mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 18:37:26 +08:00
* v2.1.0 [omit consensus and adjacent] - this commit will be amended with the full release after the file copy is complete * 2.1.0 main node rollup
807 lines
23 KiB
Go
807 lines
23 KiB
Go
package dispatch
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/ed25519"
|
|
"crypto/rand"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/mock"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/protobuf/types/known/emptypb"
|
|
"source.quilibrium.com/quilibrium/monorepo/protobufs"
|
|
tchannel "source.quilibrium.com/quilibrium/monorepo/types/channel"
|
|
"source.quilibrium.com/quilibrium/monorepo/types/crypto"
|
|
"source.quilibrium.com/quilibrium/monorepo/types/keys"
|
|
"source.quilibrium.com/quilibrium/monorepo/types/mocks"
|
|
up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p"
|
|
)
|
|
|
|
var _ tchannel.EncryptedChannel = (*mockEncryptedChannel)(nil)
|
|
|
|
type mockEncryptedChannel struct{}
|
|
|
|
// DecryptTwoPartyMessage implements channel.EncryptedChannel.
|
|
func (m *mockEncryptedChannel) DecryptTwoPartyMessage(
|
|
ratchetState string,
|
|
envelope *tchannel.P2PChannelEnvelope,
|
|
) (newRatchetState string, message []byte, err error) {
|
|
// We just pass through the message for testing
|
|
return "", envelope.MessageBody.Ciphertext, nil
|
|
}
|
|
|
|
// EncryptTwoPartyMessage implements channel.EncryptedChannel.
|
|
func (m *mockEncryptedChannel) EncryptTwoPartyMessage(
|
|
ratchetState string,
|
|
message []byte,
|
|
) (newRatchetState string, envelope *tchannel.P2PChannelEnvelope, err error) {
|
|
return "", &tchannel.P2PChannelEnvelope{MessageBody: tchannel.MessageCiphertext{Ciphertext: message}}, nil
|
|
}
|
|
|
|
// EstablishTwoPartyChannel implements channel.EncryptedChannel.
|
|
func (m *mockEncryptedChannel) EstablishTwoPartyChannel(
|
|
isSender bool,
|
|
sendingIdentityPrivateKey []byte,
|
|
sendingSignedPrePrivateKey []byte,
|
|
receivingIdentityKey []byte,
|
|
receivingSignedPreKey []byte,
|
|
) (string, error) {
|
|
return "", nil
|
|
}
|
|
|
|
func setupTestService(t *testing.T) (*DispatchService, *mocks.MockInboxStore, *mocks.MockKeyManager, *mocks.MockPubSub) {
|
|
logger, _ := zap.NewDevelopment()
|
|
mockStore := new(mocks.MockInboxStore)
|
|
mockKeyManager := new(mocks.MockKeyManager)
|
|
mockPubSub := new(mocks.MockPubSub)
|
|
|
|
mockPubSub.On("GetPeerID").Return(make([]byte, 32))
|
|
|
|
// Setup key manager mocks
|
|
mockKeyManager.On("GetAgreementKey", mock.Anything).Return(nil, nil)
|
|
mockKeyManager.On("GetRawKey", mock.Anything).Return(&keys.Key{
|
|
Id: "",
|
|
Type: crypto.KeyTypeX448,
|
|
PrivateKey: make([]byte, 56),
|
|
PublicKey: make([]byte, 57),
|
|
}, nil)
|
|
|
|
service := NewDispatchService(
|
|
mockStore,
|
|
logger,
|
|
mockKeyManager,
|
|
mockPubSub,
|
|
)
|
|
|
|
// Set some default responsible filters
|
|
service.SetResponsibleFilters([][3]byte{
|
|
{0x01, 0x02, 0x03},
|
|
{0x04, 0x05, 0x06},
|
|
})
|
|
|
|
return service, mockStore, mockKeyManager, mockPubSub
|
|
}
|
|
|
|
func setupBenchService(b *testing.B) (*DispatchService, *mocks.MockInboxStore, *mocks.MockKeyManager, *mocks.MockPubSub) {
|
|
logger, _ := zap.NewDevelopment()
|
|
mockStore := new(mocks.MockInboxStore)
|
|
mockKeyManager := new(mocks.MockKeyManager)
|
|
mockPubSub := new(mocks.MockPubSub)
|
|
|
|
mockPubSub.On("GetPeerID").Return(make([]byte, 32))
|
|
|
|
// Setup key manager mocks
|
|
mockKeyManager.On("GetAgreementKey", mock.Anything).Return(nil, nil)
|
|
mockKeyManager.On("GetRawKey", mock.Anything).Return(&keys.Key{
|
|
Id: "",
|
|
Type: crypto.KeyTypeX448,
|
|
PrivateKey: make([]byte, 56),
|
|
PublicKey: make([]byte, 57),
|
|
}, nil)
|
|
|
|
service := NewDispatchService(
|
|
mockStore,
|
|
logger,
|
|
mockKeyManager,
|
|
mockPubSub,
|
|
)
|
|
|
|
// Set some default responsible filters
|
|
service.SetResponsibleFilters([][3]byte{
|
|
{0x01, 0x02, 0x03},
|
|
{0x04, 0x05, 0x06},
|
|
})
|
|
|
|
return service, mockStore, mockKeyManager, mockPubSub
|
|
}
|
|
|
|
func createValidEd448Keys() ([]byte, []byte) {
|
|
// Generate ED25519 keys for testing (ED448 is too complex for test)
|
|
// In real tests, we'd need actual ED448 keys, but for signature verification tests
|
|
// we'll mock the key manager
|
|
pub, priv, _ := ed25519.GenerateKey(rand.Reader)
|
|
|
|
// Pad to ED448 sizes for interface compatibility
|
|
ed448Priv := make([]byte, 114) // ED448 private key size
|
|
ed448Pub := make([]byte, 57) // ED448 public key size
|
|
copy(ed448Priv, priv)
|
|
copy(ed448Pub, pub)
|
|
|
|
return ed448Pub, ed448Priv
|
|
}
|
|
|
|
func createTestMessage(address []byte, timestamp uint64, content string) *protobufs.InboxMessage {
|
|
return &protobufs.InboxMessage{
|
|
Address: address,
|
|
Timestamp: timestamp,
|
|
EphemeralPublicKey: []byte("ephemeral_key"),
|
|
Message: []byte(content),
|
|
}
|
|
}
|
|
|
|
func createTestHubAddMessage(hubAddress, inboxPubKey, hubPubKey []byte) *protobufs.HubAddInboxMessage {
|
|
return &protobufs.HubAddInboxMessage{
|
|
Address: hubAddress,
|
|
InboxPublicKey: inboxPubKey,
|
|
HubPublicKey: hubPubKey,
|
|
HubSignature: bytes.Repeat([]byte{0xAA}, 114), // Mock signature
|
|
InboxSignature: bytes.Repeat([]byte{0xBB}, 114), // Mock signature
|
|
}
|
|
}
|
|
|
|
func createTestHubDeleteMessage(hubAddress, inboxPubKey, hubPubKey []byte) *protobufs.HubDeleteInboxMessage {
|
|
return &protobufs.HubDeleteInboxMessage{
|
|
Address: hubAddress,
|
|
InboxPublicKey: inboxPubKey,
|
|
HubPublicKey: hubPubKey,
|
|
HubSignature: bytes.Repeat([]byte{0xCC}, 114), // Mock signature
|
|
InboxSignature: bytes.Repeat([]byte{0xDD}, 114), // Mock signature
|
|
}
|
|
}
|
|
|
|
func TestDispatchService_SetResponsibleFilters(t *testing.T) {
|
|
service, _, _, _ := setupTestService(t)
|
|
|
|
filters := [][3]byte{
|
|
{0x07, 0x08, 0x09},
|
|
{0x0A, 0x0B, 0x0C},
|
|
}
|
|
|
|
service.SetResponsibleFilters(filters)
|
|
|
|
// Check that new filters are set
|
|
assert.True(t, service.IsResponsibleForFilter([3]byte{0x07, 0x08, 0x09}))
|
|
assert.True(t, service.IsResponsibleForFilter([3]byte{0x0A, 0x0B, 0x0C}))
|
|
|
|
// Check that old filters are no longer set
|
|
assert.False(t, service.IsResponsibleForFilter([3]byte{0x01, 0x02, 0x03}))
|
|
}
|
|
|
|
func TestDispatchService_AddInboxMessage(t *testing.T) {
|
|
service, mockStore, _, _ := setupTestService(t)
|
|
ctx := context.Background()
|
|
|
|
address := bytes.Repeat([]byte{0xAA}, 32)
|
|
filter := up2p.GetBloomFilterIndices(address, 256, 3)
|
|
msg := createTestMessage(address, uint64(time.Now().UnixMilli()), "Test message")
|
|
|
|
// Set service to be responsible for this filter
|
|
service.SetResponsibleFilters([][3]byte{[3]byte(filter)})
|
|
|
|
t.Run("Success", func(t *testing.T) {
|
|
mockStore.On("AddMessage", msg).Return(nil).Once()
|
|
|
|
err := service.AddInboxMessage(ctx, msg)
|
|
require.NoError(t, err)
|
|
|
|
mockStore.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("NotResponsibleForFilter", func(t *testing.T) {
|
|
// Set different filter responsibility
|
|
service.SetResponsibleFilters([][3]byte{{0xFF, 0xFE, 0xFD}})
|
|
|
|
err := service.AddInboxMessage(ctx, msg)
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "not responsible for filter")
|
|
})
|
|
|
|
t.Run("NilMessage", func(t *testing.T) {
|
|
err := service.AddInboxMessage(ctx, nil)
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "message is nil")
|
|
})
|
|
}
|
|
|
|
func TestDispatchService_GetInboxMessages(t *testing.T) {
|
|
service, mockStore, _, _ := setupTestService(t)
|
|
ctx := context.Background()
|
|
|
|
filter := [3]byte{0x01, 0x02, 0x03}
|
|
address := bytes.Repeat([]byte{0xAA}, 32)
|
|
|
|
t.Run("GetByFilter", func(t *testing.T) {
|
|
messages := []*protobufs.InboxMessage{
|
|
createTestMessage(address, 1000, "Message 1"),
|
|
createTestMessage(address, 2000, "Message 2"),
|
|
}
|
|
|
|
mockStore.On("GetMessagesByFilter", filter).Return(messages, nil).Once()
|
|
|
|
req := &protobufs.InboxMessageRequest{
|
|
Filter: filter[:],
|
|
}
|
|
|
|
resp, err := service.getInboxMessages(ctx, req)
|
|
require.NoError(t, err)
|
|
assert.Len(t, resp.Messages, 2)
|
|
|
|
mockStore.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("GetByAddress", func(t *testing.T) {
|
|
messages := []*protobufs.InboxMessage{
|
|
createTestMessage(address, 1000, "Message 1"),
|
|
}
|
|
|
|
mockStore.On("GetMessagesByAddress", filter, address).Return(messages, nil).Once()
|
|
|
|
req := &protobufs.InboxMessageRequest{
|
|
Filter: filter[:],
|
|
Address: address,
|
|
}
|
|
|
|
resp, err := service.getInboxMessages(ctx, req)
|
|
require.NoError(t, err)
|
|
assert.Len(t, resp.Messages, 1)
|
|
|
|
mockStore.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("GetByTimeRange", func(t *testing.T) {
|
|
messages := []*protobufs.InboxMessage{
|
|
createTestMessage(address, 1500, "Message in range"),
|
|
}
|
|
|
|
mockStore.On("GetMessagesByTimeRange", filter, address, uint64(1000), uint64(2000)).Return(messages, nil).Once()
|
|
|
|
req := &protobufs.InboxMessageRequest{
|
|
Filter: filter[:],
|
|
Address: address,
|
|
FromTimestamp: 1000,
|
|
ToTimestamp: 2000,
|
|
}
|
|
|
|
resp, err := service.getInboxMessages(ctx, req)
|
|
require.NoError(t, err)
|
|
assert.Len(t, resp.Messages, 1)
|
|
|
|
mockStore.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("InvalidFilterLength", func(t *testing.T) {
|
|
req := &protobufs.InboxMessageRequest{
|
|
Filter: []byte{0x01, 0x02}, // Invalid length
|
|
}
|
|
|
|
_, err := service.getInboxMessages(ctx, req)
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "invalid filter length")
|
|
})
|
|
|
|
t.Run("NotResponsibleForFilter", func(t *testing.T) {
|
|
service.SetResponsibleFilters([][3]byte{{0xFF, 0xFE, 0xFD}})
|
|
|
|
req := &protobufs.InboxMessageRequest{
|
|
Filter: filter[:],
|
|
}
|
|
|
|
_, err := service.getInboxMessages(ctx, req)
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "not responsible for filter")
|
|
})
|
|
}
|
|
|
|
func TestDispatchService_HubAssociations(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
hubAddress := bytes.Repeat([]byte{0xCC}, 32)
|
|
inboxPubKey, _ := createValidEd448Keys()
|
|
hubPubKey, _ := createValidEd448Keys()
|
|
filter := up2p.GetBloomFilterIndices(hubAddress, 256, 3)
|
|
|
|
// Set service to be responsible for this filter
|
|
|
|
t.Run("AddHubInboxAssociation_Success", func(t *testing.T) {
|
|
addMsg := createTestHubAddMessage(hubAddress, inboxPubKey, hubPubKey)
|
|
service, mockStore, mockKeyManager, _ := setupTestService(t)
|
|
service.SetResponsibleFilters([][3]byte{[3]byte(filter)})
|
|
// Mock signature verification
|
|
mockKeyManager.On("ValidateSignature",
|
|
crypto.KeyTypeEd448,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
).Return(true, nil)
|
|
|
|
mockStore.On("AddHubInboxAssociation", addMsg).Return(nil).Once()
|
|
|
|
err := service.AddHubInboxAssociation(ctx, addMsg)
|
|
require.NoError(t, err)
|
|
|
|
mockStore.AssertExpectations(t)
|
|
mockKeyManager.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("AddHubInboxAssociation_InvalidSignature", func(t *testing.T) {
|
|
addMsg := createTestHubAddMessage(hubAddress, inboxPubKey, hubPubKey)
|
|
service, _, mockKeyManager, _ := setupTestService(t)
|
|
service.SetResponsibleFilters([][3]byte{[3]byte(filter)})
|
|
// Mock signature verification failure
|
|
mockKeyManager.On("ValidateSignature",
|
|
crypto.KeyTypeEd448,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
).Return(false, nil)
|
|
|
|
err := service.AddHubInboxAssociation(ctx, addMsg)
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "signature verification failed")
|
|
|
|
mockKeyManager.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("DeleteHubInboxAssociation_Success", func(t *testing.T) {
|
|
deleteMsg := createTestHubDeleteMessage(hubAddress, inboxPubKey, hubPubKey)
|
|
service, mockStore, mockKeyManager, _ := setupTestService(t)
|
|
service.SetResponsibleFilters([][3]byte{[3]byte(filter)})
|
|
// Mock signature verification
|
|
mockKeyManager.On("ValidateSignature",
|
|
crypto.KeyTypeEd448,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
).Return(true, nil)
|
|
|
|
mockStore.On("DeleteHubInboxAssociation", deleteMsg).Return(nil).Once()
|
|
|
|
err := service.DeleteHubInboxAssociation(ctx, deleteMsg)
|
|
require.NoError(t, err)
|
|
|
|
mockStore.AssertExpectations(t)
|
|
mockKeyManager.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("GetHub", func(t *testing.T) {
|
|
service, mockStore, _, _ := setupTestService(t)
|
|
service.SetResponsibleFilters([][3]byte{[3]byte(filter)})
|
|
hubResponse := &protobufs.HubResponse{
|
|
Adds: []*protobufs.HubAddInboxMessage{
|
|
createTestHubAddMessage(hubAddress, inboxPubKey, hubPubKey),
|
|
},
|
|
Deletes: []*protobufs.HubDeleteInboxMessage{},
|
|
}
|
|
|
|
mockStore.On("GetHubAssociations", [3]byte(filter), hubAddress).Return(hubResponse, nil).Once()
|
|
|
|
req := &protobufs.HubRequest{
|
|
Filter: filter,
|
|
HubAddress: hubAddress,
|
|
}
|
|
|
|
resp, err := service.getHub(ctx, req)
|
|
require.NoError(t, err)
|
|
assert.Len(t, resp.Adds, 1)
|
|
assert.Len(t, resp.Deletes, 0)
|
|
|
|
mockStore.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("NilMessages", func(t *testing.T) {
|
|
service, _, _, _ := setupTestService(t)
|
|
service.SetResponsibleFilters([][3]byte{[3]byte(filter)})
|
|
err := service.AddHubInboxAssociation(ctx, nil)
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "add message is nil")
|
|
|
|
err = service.DeleteHubInboxAssociation(ctx, nil)
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "delete message is nil")
|
|
})
|
|
}
|
|
|
|
func TestDispatchService_Sync(t *testing.T) {
|
|
service, mockStore, _, _ := setupTestService(t)
|
|
ctx := context.Background()
|
|
|
|
filter1 := [3]byte{0x01, 0x02, 0x03}
|
|
filter2 := [3]byte{0x04, 0x05, 0x06}
|
|
|
|
t.Run("Success", func(t *testing.T) {
|
|
messages := []*protobufs.InboxMessage{
|
|
createTestMessage(bytes.Repeat([]byte{0xAA}, 32), 1000, "Message 1"),
|
|
createTestMessage(bytes.Repeat([]byte{0xBB}, 32), 2000, "Message 2"),
|
|
}
|
|
|
|
hubs := []*protobufs.HubResponse{
|
|
{
|
|
Adds: []*protobufs.HubAddInboxMessage{
|
|
createTestHubAddMessage(bytes.Repeat([]byte{0xCC}, 32), bytes.Repeat([]byte{0x11}, 57), bytes.Repeat([]byte{0x22}, 57)),
|
|
},
|
|
Deletes: []*protobufs.HubDeleteInboxMessage{},
|
|
},
|
|
}
|
|
|
|
mockStore.On("GetAllMessagesCRDT", [][3]byte{filter1, filter2}).Return(messages, nil).Once()
|
|
mockStore.On("GetAllHubsCRDT", [][3]byte{filter1, filter2}).Return(hubs, nil).Once()
|
|
|
|
req := &protobufs.DispatchSyncRequest{
|
|
Filters: [][]byte{filter1[:], filter2[:]},
|
|
}
|
|
|
|
resp, err := service.Sync(ctx, req)
|
|
require.NoError(t, err)
|
|
assert.Len(t, resp.Messages, 2)
|
|
assert.Len(t, resp.Hubs, 1)
|
|
|
|
mockStore.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("NoResponsibleFilters", func(t *testing.T) {
|
|
// Set service to not be responsible for any of the requested filters
|
|
service.SetResponsibleFilters([][3]byte{{0xFF, 0xFE, 0xFD}})
|
|
|
|
req := &protobufs.DispatchSyncRequest{
|
|
Filters: [][]byte{filter1[:], filter2[:]},
|
|
}
|
|
|
|
resp, err := service.Sync(ctx, req)
|
|
require.NoError(t, err)
|
|
assert.Len(t, resp.Messages, 0)
|
|
assert.Len(t, resp.Hubs, 0)
|
|
})
|
|
|
|
t.Run("InvalidFilterLength", func(t *testing.T) {
|
|
req := &protobufs.DispatchSyncRequest{
|
|
Filters: [][]byte{{0x01, 0x02}}, // Invalid length
|
|
}
|
|
|
|
resp, err := service.Sync(ctx, req)
|
|
require.NoError(t, err)
|
|
assert.Len(t, resp.Messages, 0)
|
|
assert.Len(t, resp.Hubs, 0)
|
|
})
|
|
}
|
|
|
|
func TestDispatchService_gRPCMethods(t *testing.T) {
|
|
service, mockStore, mockKeyManager, _ := setupTestService(t)
|
|
ctx := context.Background()
|
|
|
|
address := bytes.Repeat([]byte{0xAA}, 32)
|
|
filter := up2p.GetBloomFilterIndices(address, 256, 3)
|
|
service.SetResponsibleFilters([][3]byte{[3]byte(filter)})
|
|
|
|
t.Run("PutInboxMessage", func(t *testing.T) {
|
|
msg := createTestMessage(address, uint64(time.Now().UnixMilli()), "gRPC test")
|
|
mockStore.On("AddMessage", msg).Return(nil).Once()
|
|
|
|
req := &protobufs.InboxMessagePut{
|
|
Message: msg,
|
|
}
|
|
|
|
resp, err := service.PutInboxMessage(ctx, req)
|
|
require.NoError(t, err)
|
|
assert.IsType(t, &emptypb.Empty{}, resp)
|
|
|
|
mockStore.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("GetInboxMessages_gRPC", func(t *testing.T) {
|
|
messages := []*protobufs.InboxMessage{
|
|
createTestMessage(address, 1000, "gRPC Message"),
|
|
}
|
|
|
|
mockStore.On("GetMessagesByFilter", [3]byte(filter)).Return(messages, nil).Once()
|
|
|
|
req := &protobufs.InboxMessageRequest{
|
|
Filter: filter,
|
|
}
|
|
|
|
resp, err := service.GetInboxMessages(ctx, req)
|
|
require.NoError(t, err)
|
|
assert.Len(t, resp.Messages, 1)
|
|
|
|
mockStore.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("PutHub", func(t *testing.T) {
|
|
hubAddress := bytes.Repeat([]byte{0xCC}, 32)
|
|
inboxPubKey, _ := createValidEd448Keys()
|
|
hubPubKey, _ := createValidEd448Keys()
|
|
|
|
addMsg := createTestHubAddMessage(hubAddress, inboxPubKey, hubPubKey)
|
|
deleteMsg := createTestHubDeleteMessage(hubAddress, inboxPubKey, hubPubKey)
|
|
|
|
// Mock signature verification
|
|
mockKeyManager.On("ValidateSignature",
|
|
crypto.KeyTypeEd448,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
).Return(true, nil)
|
|
|
|
mockStore.On("AddHubInboxAssociation", addMsg).Return(nil).Once()
|
|
mockStore.On("DeleteHubInboxAssociation", deleteMsg).Return(nil).Once()
|
|
|
|
req := &protobufs.HubPut{
|
|
Add: addMsg,
|
|
Delete: deleteMsg,
|
|
}
|
|
|
|
resp, err := service.PutHub(ctx, req)
|
|
require.NoError(t, err)
|
|
assert.IsType(t, &emptypb.Empty{}, resp)
|
|
|
|
mockStore.AssertExpectations(t)
|
|
mockKeyManager.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("GetHub_gRPC", func(t *testing.T) {
|
|
hubAddress := bytes.Repeat([]byte{0xCC}, 32)
|
|
hubResponse := &protobufs.HubResponse{
|
|
Adds: []*protobufs.HubAddInboxMessage{},
|
|
Deletes: []*protobufs.HubDeleteInboxMessage{},
|
|
}
|
|
|
|
mockStore.On("GetHubAssociations", [3]byte(filter), hubAddress).Return(hubResponse, nil).Once()
|
|
|
|
req := &protobufs.HubRequest{
|
|
Filter: filter,
|
|
HubAddress: hubAddress,
|
|
}
|
|
|
|
resp, err := service.GetHub(ctx, req)
|
|
require.NoError(t, err)
|
|
assert.NotNil(t, resp)
|
|
|
|
mockStore.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("Sync_gRPC", func(t *testing.T) {
|
|
messages := []*protobufs.InboxMessage{}
|
|
hubs := []*protobufs.HubResponse{}
|
|
|
|
mockStore.On("GetAllMessagesCRDT", [][3]byte{[3]byte(filter)}).Return(messages, nil).Once()
|
|
mockStore.On("GetAllHubsCRDT", [][3]byte{[3]byte(filter)}).Return(hubs, nil).Once()
|
|
|
|
req := &protobufs.DispatchSyncRequest{
|
|
Filters: [][]byte{filter},
|
|
}
|
|
|
|
resp, err := service.Sync(ctx, req)
|
|
require.NoError(t, err)
|
|
assert.NotNil(t, resp)
|
|
|
|
mockStore.AssertExpectations(t)
|
|
})
|
|
}
|
|
|
|
func TestDispatchService_SignatureVerification(t *testing.T) {
|
|
hubAddress := bytes.Repeat([]byte{0xCC}, 32)
|
|
inboxPubKey, _ := createValidEd448Keys()
|
|
hubPubKey, _ := createValidEd448Keys()
|
|
|
|
t.Run("VerifyHubAddSignatures_Success", func(t *testing.T) {
|
|
msg := createTestHubAddMessage(hubAddress, inboxPubKey, hubPubKey)
|
|
service, _, mockKeyManager, _ := setupTestService(t)
|
|
|
|
// Mock successful signature verification
|
|
mockKeyManager.On("ValidateSignature",
|
|
crypto.KeyTypeEd448,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
).Return(true, nil)
|
|
|
|
err := service.verifyHubAddSignatures(msg)
|
|
require.NoError(t, err)
|
|
|
|
mockKeyManager.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("VerifyHubAddSignatures_InvalidHub", func(t *testing.T) {
|
|
msg := createTestHubAddMessage(hubAddress, inboxPubKey, hubPubKey)
|
|
service, _, mockKeyManager, _ := setupTestService(t)
|
|
|
|
// Mock hub signature failure
|
|
mockKeyManager.On("ValidateSignature",
|
|
crypto.KeyTypeEd448,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
).Return(false, nil)
|
|
|
|
err := service.verifyHubAddSignatures(msg)
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "invalid hub signature")
|
|
|
|
mockKeyManager.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("VerifyHubDeleteSignatures_Success", func(t *testing.T) {
|
|
msg := createTestHubDeleteMessage(hubAddress, inboxPubKey, hubPubKey)
|
|
service, _, mockKeyManager, _ := setupTestService(t)
|
|
|
|
// Mock successful signature verification
|
|
mockKeyManager.On("ValidateSignature",
|
|
crypto.KeyTypeEd448,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
).Return(true, nil)
|
|
|
|
err := service.verifyHubDeleteSignatures(msg)
|
|
require.NoError(t, err)
|
|
|
|
mockKeyManager.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("VerifyEd448Signature_InvalidKeySize", func(t *testing.T) {
|
|
service, _, _, _ := setupTestService(t)
|
|
// Test with invalid key sizes
|
|
assert.False(t, service.verifyEd448Signature([]byte{0x01}, []byte("message"), []byte{0x01}))
|
|
assert.False(t, service.verifyEd448Signature(make([]byte, 57), []byte("message"), []byte{0x01}))
|
|
assert.False(t, service.verifyEd448Signature(make([]byte, 10), []byte("message"), make([]byte, 114)))
|
|
})
|
|
|
|
t.Run("VerifyEd448Signature_KeyManagerError", func(t *testing.T) {
|
|
service, _, mockKeyManager, _ := setupTestService(t)
|
|
validPubKey := make([]byte, 57)
|
|
validSignature := make([]byte, 114)
|
|
message := []byte("test message")
|
|
|
|
// Mock key manager error
|
|
mockKeyManager.On("ValidateSignature",
|
|
crypto.KeyTypeEd448,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
).Return(false, errors.New("!!!"))
|
|
|
|
result := service.verifyEd448Signature(validPubKey, message, validSignature)
|
|
assert.False(t, result)
|
|
|
|
mockKeyManager.AssertExpectations(t)
|
|
})
|
|
}
|
|
|
|
func TestDispatchService_BackgroundProcesses(t *testing.T) {
|
|
t.Skip()
|
|
filter1 := [3]byte{0x01, 0x02, 0x03}
|
|
filter2 := [3]byte{0x04, 0x05, 0x06}
|
|
|
|
t.Run("ReapMessages", func(t *testing.T) {
|
|
service, mockStore, _, _ := setupTestService(t)
|
|
// Set responsible filters and short retention period
|
|
service.SetResponsibleFilters([][3]byte{filter1, filter2})
|
|
service.SetRetentionPeriod(1 * time.Hour)
|
|
|
|
// Calculate expected cutoff time
|
|
expectedCutoff := time.Now().Add(-1 * time.Hour).UnixMilli()
|
|
|
|
// Mock the ReapMessages calls for both filters
|
|
mockStore.On("ReapMessages", mock.Anything, mock.MatchedBy(func(ts uint64) bool {
|
|
// Allow some time variance (within 5 seconds)
|
|
return int64(ts) >= expectedCutoff-5000 && int64(ts) <= expectedCutoff+5000
|
|
})).Return(nil).Once()
|
|
|
|
mockStore.On("ReapMessages", mock.Anything, mock.MatchedBy(func(ts uint64) bool {
|
|
return int64(ts) >= expectedCutoff-5000 && int64(ts) <= expectedCutoff+5000
|
|
})).Return(nil).Once()
|
|
|
|
// Manually trigger reaping
|
|
service.reapMessages()
|
|
|
|
mockStore.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("ReapMessages_WithError", func(t *testing.T) {
|
|
service, mockStore, _, _ := setupTestService(t)
|
|
service.SetResponsibleFilters([][3]byte{filter1})
|
|
service.SetRetentionPeriod(1 * time.Hour)
|
|
|
|
// Mock error from store
|
|
mockStore.On("ReapMessages", mock.Anything, mock.Anything).Return(errors.New("reap error")).Once()
|
|
|
|
// Should not panic even with error
|
|
service.reapMessages()
|
|
|
|
mockStore.AssertExpectations(t)
|
|
})
|
|
|
|
t.Run("StartStop", func(t *testing.T) {
|
|
// Create a fresh service for lifecycle testing
|
|
freshService, _, _, _ := setupTestService(t)
|
|
freshService.SetReapInterval(10 * time.Millisecond) // Fast interval for testing
|
|
|
|
// Test that Start and Stop work correctly
|
|
freshService.Start()
|
|
|
|
// Give it a moment to start
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
// Stop should gracefully shut down
|
|
freshService.Stop()
|
|
|
|
// Multiple stops should not panic
|
|
freshService.Stop()
|
|
})
|
|
|
|
t.Run("ConfigurationSetters", func(t *testing.T) {
|
|
service, _, _, _ := setupTestService(t)
|
|
newInterval := 30 * time.Minute
|
|
newPeriod := 3 * 24 * time.Hour
|
|
|
|
service.SetReapInterval(newInterval)
|
|
service.SetRetentionPeriod(newPeriod)
|
|
|
|
// Verify configuration was set
|
|
service.mu.RLock()
|
|
assert.Equal(t, newInterval, service.reapInterval)
|
|
assert.Equal(t, newPeriod, service.retentionPeriod)
|
|
service.mu.RUnlock()
|
|
})
|
|
}
|
|
|
|
func BenchmarkDispatchService(b *testing.B) {
|
|
service, mockStore, mockKeyManager, _ := setupBenchService(b)
|
|
ctx := context.Background()
|
|
|
|
address := bytes.Repeat([]byte{0xAA}, 32)
|
|
filter := up2p.GetBloomFilterIndices(address, 256, 3)
|
|
service.SetResponsibleFilters([][3]byte{[3]byte(filter)})
|
|
|
|
b.Run("AddInboxMessage", func(b *testing.B) {
|
|
mockStore.On("AddMessage", mock.Anything).Return(nil)
|
|
|
|
for i := 0; i < b.N; i++ {
|
|
msg := createTestMessage(address, uint64(i), "Benchmark message")
|
|
_ = service.AddInboxMessage(ctx, msg)
|
|
}
|
|
})
|
|
|
|
b.Run("AddHubInboxAssociation", func(b *testing.B) {
|
|
mockKeyManager.On("ValidateSignature",
|
|
crypto.KeyTypeEd448,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
mock.Anything,
|
|
).Return(true, nil)
|
|
mockStore.On("AddHubInboxAssociation", mock.Anything).Return(nil)
|
|
|
|
hubAddress := bytes.Repeat([]byte{0xCC}, 32)
|
|
inboxPubKey := make([]byte, 57)
|
|
hubPubKey := make([]byte, 57)
|
|
|
|
for i := 0; i < b.N; i++ {
|
|
msg := createTestHubAddMessage(hubAddress, inboxPubKey, hubPubKey)
|
|
_ = service.AddHubInboxAssociation(ctx, msg)
|
|
}
|
|
})
|
|
}
|