ceremonyclient/node/dispatch/dispatch_service.go

542 lines
14 KiB
Go

package dispatch
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"sync"
"time"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/emptypb"
qkeys "source.quilibrium.com/quilibrium/monorepo/node/keys"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/crypto"
"source.quilibrium.com/quilibrium/monorepo/types/dispatch"
"source.quilibrium.com/quilibrium/monorepo/types/keys"
"source.quilibrium.com/quilibrium/monorepo/types/p2p"
"source.quilibrium.com/quilibrium/monorepo/types/store"
up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p"
)
// Compile-time check that DispatchService implements the interface
var _ dispatch.DispatchService = (*DispatchService)(nil)
var _ protobufs.DispatchServiceServer = (*DispatchService)(nil)
const (
// Default reap interval - how often to clean up old messages
defaultReapInterval = 1 * time.Hour
// Default message retention period - messages older than this will be reaped
defaultRetentionPeriod = 7 * 24 * time.Hour
)
// Domain constants for signature verification
const (
domainAdd = "add"
domainDelete = "delete"
)
// DispatchService handles P2P dispatch messages and CRDT-based synchronization
type DispatchService struct {
protobufs.DispatchServiceServer
store store.InboxStore
logger *zap.Logger
keyManager keys.KeyManager
pubSub p2p.PubSub
mu sync.RWMutex
// Filters this node is responsible for
responsibleFilters map[[3]byte]bool
// Node's identity keys
identityPrivateKey []byte
signedPrePrivateKey []byte
identityPublicKey []byte
signedPrePublicKey []byte
// Background process control
stopCh chan struct{}
stopOnce sync.Once
reapInterval time.Duration
retentionPeriod time.Duration
}
// NewDispatchService creates a new dispatch service instance
func NewDispatchService(
store store.InboxStore,
logger *zap.Logger,
keyManager keys.KeyManager,
pubSub p2p.PubSub,
) *DispatchService {
// Ensure keys are in place to run dispatch
_, err := keyManager.GetAgreementKey("q-device-key")
if err != nil {
if !errors.Is(err, qkeys.KeyNotFoundErr) {
panic(err)
}
_, err := keyManager.CreateAgreementKey(
"q-device-key",
crypto.KeyTypeDecaf448,
)
if err != nil {
panic(err)
}
}
_, err = keyManager.GetAgreementKey("q-device-pre-key")
if err != nil {
if !errors.Is(err, qkeys.KeyNotFoundErr) {
panic(err)
}
_, err := keyManager.CreateAgreementKey(
"q-device-pre-key",
crypto.KeyTypeDecaf448,
)
if err != nil {
panic(err)
}
}
identityKey, err := keyManager.GetRawKey("q-device-key")
if err != nil {
panic(err)
}
signedPreKey, err := keyManager.GetRawKey("q-device-pre-key")
if err != nil {
panic(err)
}
return &DispatchService{
store: store,
logger: logger,
keyManager: keyManager,
pubSub: pubSub,
responsibleFilters: make(map[[3]byte]bool),
identityPrivateKey: identityKey.PrivateKey,
signedPrePrivateKey: signedPreKey.PrivateKey,
identityPublicKey: identityKey.PublicKey,
signedPrePublicKey: signedPreKey.PublicKey,
stopCh: make(chan struct{}),
reapInterval: defaultReapInterval,
retentionPeriod: defaultRetentionPeriod,
}
}
// SetResponsibleFilters updates the filters this node is responsible for
func (d *DispatchService) SetResponsibleFilters(filters [][3]byte) {
d.mu.Lock()
defer d.mu.Unlock()
d.responsibleFilters = make(map[[3]byte]bool)
for _, filter := range filters {
d.responsibleFilters[filter] = true
}
}
// IsResponsibleForFilter checks if this node handles the given filter
func (d *DispatchService) IsResponsibleForFilter(filter [3]byte) bool {
d.mu.RLock()
defer d.mu.RUnlock()
return d.responsibleFilters[filter]
}
// AddInboxMessage adds a new message to an inbox (grow-only set)
func (d *DispatchService) AddInboxMessage(
ctx context.Context,
msg *protobufs.InboxMessage,
) error {
if msg == nil {
return errors.New("message is nil")
}
filter := up2p.GetBloomFilterIndices(msg.Address, 256, 3)
// Check if we're responsible for this filter
if !d.IsResponsibleForFilter([3]byte(filter)) {
return errors.New("not responsible for filter")
}
return d.store.AddMessage(msg)
}
// getInboxMessages retrieves messages based on filter criteria
func (d *DispatchService) getInboxMessages(
ctx context.Context,
req *protobufs.InboxMessageRequest,
) (*protobufs.InboxMessageResponse, error) {
if len(req.Filter) != 3 {
return nil, errors.New("invalid filter length")
}
var filter [3]byte
copy(filter[:], req.Filter)
// Check if we're responsible for this filter
if !d.IsResponsibleForFilter(filter) {
return nil, errors.New("not responsible for filter")
}
var messages []*protobufs.InboxMessage
var err error
// Handle different request types
if len(req.Address) > 0 && (req.FromTimestamp > 0 || req.ToTimestamp > 0) {
// Time range query
messages, err = d.store.GetMessagesByTimeRange(
filter,
req.Address,
req.FromTimestamp,
req.ToTimestamp,
)
} else if len(req.Address) > 0 {
// Address-specific query
messages, err = d.store.GetMessagesByAddress(filter, req.Address)
} else {
// All messages for filter
messages, err = d.store.GetMessagesByFilter(filter)
}
if err != nil {
return nil, errors.Wrap(err, "get messages")
}
// Filter by message ID if specified
if len(req.MessageId) > 0 {
var filtered []*protobufs.InboxMessage
for _, msg := range messages {
msgHash := sha256.Sum256(msg.Message)
if bytes.Equal(msgHash[:], req.MessageId) {
filtered = append(filtered, msg)
break
}
}
messages = filtered
}
return &protobufs.InboxMessageResponse{
Messages: messages,
}, nil
}
// AddHubInboxAssociation adds a hub-inbox association (2P-Set add operation)
func (d *DispatchService) AddHubInboxAssociation(
ctx context.Context,
msg *protobufs.HubAddInboxMessage,
) error {
if msg == nil {
return errors.New("add message is nil")
}
filter := up2p.GetBloomFilterIndices(msg.Address, 256, 3)
// Check if we're responsible for this filter
if !d.IsResponsibleForFilter([3]byte(filter)) {
return errors.New("not responsible for filter")
}
// Verify signatures according to protobuf comments
if err := d.verifyHubAddSignatures(msg); err != nil {
return errors.Wrap(err, "signature verification failed")
}
return d.store.AddHubInboxAssociation(msg)
}
// DeleteHubInboxAssociation removes a hub-inbox association (2P-Set delete
// operation)
func (d *DispatchService) DeleteHubInboxAssociation(
ctx context.Context,
msg *protobufs.HubDeleteInboxMessage,
) error {
if msg == nil {
return errors.New("delete message is nil")
}
filter := up2p.GetBloomFilterIndices(msg.Address, 256, 3)
// Check if we're responsible for this filter
if !d.IsResponsibleForFilter([3]byte(filter)) {
return errors.New("not responsible for filter")
}
// Verify signatures according to protobuf comments
if err := d.verifyHubDeleteSignatures(msg); err != nil {
return errors.Wrap(err, "signature verification failed")
}
return d.store.DeleteHubInboxAssociation(msg)
}
// getHub retrieves hub information including current associations
func (d *DispatchService) getHub(
ctx context.Context,
req *protobufs.HubRequest,
) (*protobufs.HubResponse, error) {
if len(req.Filter) != 3 {
return nil, errors.New("invalid filter length")
}
var filter [3]byte
copy(filter[:], req.Filter)
// Check if we're responsible for this filter
if !d.IsResponsibleForFilter(filter) {
return nil, errors.New("not responsible for filter")
}
return d.store.GetHubAssociations(filter, req.HubAddress)
}
// syncDispatch handles synchronization requests from peers
func (d *DispatchService) syncDispatch(
ctx context.Context,
req *protobufs.DispatchSyncRequest,
) (*protobufs.DispatchSyncResponse, error) {
var responsibleFilters [][3]byte
// Filter to only the filters we're responsible for
for _, filterBytes := range req.Filters {
if len(filterBytes) != 3 {
continue
}
var filter [3]byte
copy(filter[:], filterBytes)
if d.IsResponsibleForFilter(filter) {
responsibleFilters = append(responsibleFilters, filter)
}
}
if len(responsibleFilters) == 0 {
return &protobufs.DispatchSyncResponse{}, nil
}
// Get messages and hubs for synchronization
messages, err := d.store.GetAllMessagesCRDT(responsibleFilters)
if err != nil {
return nil, errors.Wrap(err, "get messages for sync")
}
hubs, err := d.store.GetAllHubsCRDT(responsibleFilters)
if err != nil {
return nil, errors.Wrap(err, "get hubs for sync")
}
return &protobufs.DispatchSyncResponse{
Messages: messages,
Hubs: hubs,
}, nil
}
// PutInboxMessage implements the gRPC service method
func (d *DispatchService) PutInboxMessage(
ctx context.Context,
req *protobufs.InboxMessagePut,
) (*emptypb.Empty, error) {
if err := d.AddInboxMessage(ctx, req.Message); err != nil {
return nil, err
}
return &emptypb.Empty{}, nil
}
// GetInboxMessages implements the gRPC service method
func (d *DispatchService) GetInboxMessages(
ctx context.Context,
req *protobufs.InboxMessageRequest,
) (*protobufs.InboxMessageResponse, error) {
return d.getInboxMessages(ctx, req)
}
// PutHub implements the gRPC service method
func (d *DispatchService) PutHub(
ctx context.Context,
req *protobufs.HubPut,
) (*emptypb.Empty, error) {
if req.Add != nil {
if err := d.AddHubInboxAssociation(ctx, req.Add); err != nil {
return nil, err
}
}
if req.Delete != nil {
if err := d.DeleteHubInboxAssociation(ctx, req.Delete); err != nil {
return nil, err
}
}
return &emptypb.Empty{}, nil
}
// GetHub implements the gRPC service method
func (d *DispatchService) GetHub(
ctx context.Context,
req *protobufs.HubRequest,
) (*protobufs.HubResponse, error) {
return d.getHub(ctx, req)
}
// Sync implements the gRPC service method
func (d *DispatchService) Sync(
ctx context.Context,
req *protobufs.DispatchSyncRequest,
) (*protobufs.DispatchSyncResponse, error) {
return d.syncDispatch(ctx, req)
}
// Signature verification functions
// verifyHubAddSignatures verifies ed448 signatures for hub add operations
func (d *DispatchService) verifyHubAddSignatures(
msg *protobufs.HubAddInboxMessage,
) error {
// Construct message for hub signature: domain("add") || inbox_public_key
hubMsg := append([]byte(domainAdd), msg.InboxPublicKey...)
// Verify hub signature
if !d.verifyEd448Signature(msg.HubPublicKey, hubMsg, msg.HubSignature) {
return errors.New("invalid hub signature")
}
// Construct message for inbox signature: domain("add") || hub_public_key
inboxMsg := append([]byte(domainAdd), msg.HubPublicKey...)
// Verify inbox signature
if !d.verifyEd448Signature(msg.InboxPublicKey, inboxMsg, msg.InboxSignature) {
return errors.New("invalid inbox signature")
}
return nil
}
// verifyHubDeleteSignatures verifies ed448 signatures for hub delete operations
func (d *DispatchService) verifyHubDeleteSignatures(
msg *protobufs.HubDeleteInboxMessage,
) error {
// Construct message for hub signature: domain("delete") || inbox_public_key
hubMsg := append([]byte(domainDelete), msg.InboxPublicKey...)
// Verify hub signature
if !d.verifyEd448Signature(msg.HubPublicKey, hubMsg, msg.HubSignature) {
return errors.New("invalid hub signature")
}
// Construct message for inbox signature: domain("delete") || hub_public_key
inboxMsg := append([]byte(domainDelete), msg.HubPublicKey...)
// Verify inbox signature
if !d.verifyEd448Signature(msg.InboxPublicKey, inboxMsg, msg.InboxSignature) {
return errors.New("invalid inbox signature")
}
return nil
}
// verifyEd448Signature verifies an ed448 signature
func (d *DispatchService) verifyEd448Signature(
publicKey, message, signature []byte,
) bool {
// Ed448 public keys are 57 bytes, signatures are 114 bytes
if len(publicKey) != 57 || len(signature) != 114 {
return false
}
// Use the key manager to verify the signature
valid, err := d.keyManager.ValidateSignature(
crypto.KeyTypeEd448,
publicKey,
message,
signature,
nil,
)
if err != nil {
d.logger.Debug("signature verification failed", zap.Error(err))
return false
}
return valid
}
// Start begins the background processes for the dispatch service
func (d *DispatchService) Start() {
go d.reapLoop()
d.logger.Info("dispatch service started")
}
// Stop gracefully shuts down the dispatch service
func (d *DispatchService) Stop() {
d.stopOnce.Do(func() {
// Signal stop to background processes
close(d.stopCh)
d.logger.Info("dispatch service stopped")
})
}
// SetReapInterval sets the interval between reap operations for messages
func (d *DispatchService) SetReapInterval(interval time.Duration) {
d.mu.Lock()
defer d.mu.Unlock()
d.reapInterval = interval
}
// SetRetentionPeriod sets how long messages are retained before being reaped
func (d *DispatchService) SetRetentionPeriod(period time.Duration) {
d.mu.Lock()
defer d.mu.Unlock()
d.retentionPeriod = period
}
// reapLoop runs the background process that periodically reaps old messages
func (d *DispatchService) reapLoop() {
ticker := time.NewTicker(d.reapInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
d.reapMessages()
case <-d.stopCh:
return
}
}
}
// reapMessages removes old messages from all responsible filters
func (d *DispatchService) reapMessages() {
d.mu.RLock()
filters := make([][3]byte, 0, len(d.responsibleFilters))
for filter := range d.responsibleFilters {
filters = append(filters, filter)
}
retentionPeriod := d.retentionPeriod
d.mu.RUnlock()
// Calculate cutoff timestamp
cutoffTime := time.Now().Add(-retentionPeriod).UnixMilli()
// Reap messages for each filter
for _, filter := range filters {
if err := d.store.ReapMessages(filter, uint64(cutoffTime)); err != nil {
d.logger.Error(
"failed to reap messages",
zap.String("filter", hex.EncodeToString(filter[:])),
zap.Error(err),
)
} else {
d.logger.Debug(
"reaped messages",
zap.String("filter", hex.EncodeToString(filter[:])),
zap.Int64("cutoffTimestamp", cutoffTime),
)
}
}
}