mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 18:37:26 +08:00
* v2.1.0 [omit consensus and adjacent] - this commit will be amended with the full release after the file copy is complete * 2.1.0 main node rollup
626 lines
19 KiB
Go
626 lines
19 KiB
Go
package store
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/sha256"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/zap"
|
|
"source.quilibrium.com/quilibrium/monorepo/config"
|
|
"source.quilibrium.com/quilibrium/monorepo/protobufs"
|
|
up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p"
|
|
)
|
|
|
|
func setupTestInboxStore(t *testing.T) *PebbleInboxStore {
|
|
logger, _ := zap.NewDevelopment()
|
|
tempDB := NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/store"}, 0)
|
|
return NewPebbleInboxStore(tempDB, logger)
|
|
}
|
|
|
|
func createTestMessage(address []byte, timestamp uint64, content string) *protobufs.InboxMessage {
|
|
return &protobufs.InboxMessage{
|
|
Address: address,
|
|
Timestamp: timestamp,
|
|
EphemeralPublicKey: []byte("ephemeral_key"),
|
|
Message: []byte(content),
|
|
}
|
|
}
|
|
|
|
func createTestHubAddMessage(hubAddress, inboxPubKey, hubPubKey []byte) *protobufs.HubAddInboxMessage {
|
|
return &protobufs.HubAddInboxMessage{
|
|
Address: hubAddress,
|
|
InboxPublicKey: inboxPubKey,
|
|
HubPublicKey: hubPubKey,
|
|
HubSignature: bytes.Repeat([]byte{0xAA}, 114), // Mock signature
|
|
InboxSignature: bytes.Repeat([]byte{0xBB}, 114), // Mock signature
|
|
}
|
|
}
|
|
|
|
func createTestHubDeleteMessage(hubAddress, inboxPubKey, hubPubKey []byte) *protobufs.HubDeleteInboxMessage {
|
|
return &protobufs.HubDeleteInboxMessage{
|
|
Address: hubAddress,
|
|
InboxPublicKey: inboxPubKey,
|
|
HubPublicKey: hubPubKey,
|
|
HubSignature: bytes.Repeat([]byte{0xCC}, 114), // Mock signature
|
|
InboxSignature: bytes.Repeat([]byte{0xDD}, 114), // Mock signature
|
|
}
|
|
}
|
|
|
|
func TestCRDTMessageOperations(t *testing.T) {
|
|
store := setupTestInboxStore(t)
|
|
|
|
address1 := bytes.Repeat([]byte{0xAA}, 32)
|
|
address2 := bytes.Repeat([]byte{0xBB}, 32)
|
|
timestamp1 := uint64(time.Now().UnixMilli())
|
|
timestamp2 := timestamp1 + 1000
|
|
|
|
msg1 := createTestMessage(address1, timestamp1, "Hello World 1")
|
|
msg2 := createTestMessage(address1, timestamp2, "Hello World 2")
|
|
msg3 := createTestMessage(address2, timestamp1, "Hello World 3")
|
|
|
|
t.Run("AddMessage_GrowOnlySet", func(t *testing.T) {
|
|
// Add messages to the grow-only set
|
|
err := store.AddMessage(msg1)
|
|
require.NoError(t, err)
|
|
|
|
err = store.AddMessage(msg2)
|
|
require.NoError(t, err)
|
|
|
|
err = store.AddMessage(msg3)
|
|
require.NoError(t, err)
|
|
|
|
// Adding the same message again should not error (grow-only set)
|
|
err = store.AddMessage(msg1)
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
t.Run("GetMessagesByFilter", func(t *testing.T) {
|
|
filter1 := up2p.GetBloomFilterIndices(address1, 256, 3)
|
|
filter2 := up2p.GetBloomFilterIndices(address2, 256, 3)
|
|
|
|
// Get messages for address1's filter
|
|
messages, err := store.GetMessagesByFilter([3]byte(filter1))
|
|
require.NoError(t, err)
|
|
|
|
// Should contain at least msg1 and msg2 (might have duplicates from previous test)
|
|
assert.True(t, len(messages) >= 2)
|
|
|
|
// Verify messages are sorted by timestamp
|
|
for i := 1; i < len(messages); i++ {
|
|
assert.True(t, messages[i-1].Timestamp <= messages[i].Timestamp)
|
|
}
|
|
|
|
// Get messages for address2's filter
|
|
messages, err = store.GetMessagesByFilter([3]byte(filter2))
|
|
require.NoError(t, err)
|
|
assert.True(t, len(messages) >= 1)
|
|
})
|
|
|
|
t.Run("GetMessagesByAddress", func(t *testing.T) {
|
|
filter1 := up2p.GetBloomFilterIndices(address1, 256, 3)
|
|
|
|
messages, err := store.GetMessagesByAddress([3]byte(filter1), address1)
|
|
require.NoError(t, err)
|
|
|
|
// All messages should be for address1
|
|
for _, msg := range messages {
|
|
assert.Equal(t, address1, msg.Address)
|
|
}
|
|
})
|
|
|
|
t.Run("GetMessagesByTimeRange", func(t *testing.T) {
|
|
filter1 := up2p.GetBloomFilterIndices(address1, 256, 3)
|
|
|
|
// Get messages in a specific time range
|
|
messages, err := store.GetMessagesByTimeRange([3]byte(filter1), address1, timestamp1, timestamp2)
|
|
require.NoError(t, err)
|
|
|
|
// All messages should be within the time range
|
|
for _, msg := range messages {
|
|
assert.True(t, msg.Timestamp >= timestamp1)
|
|
assert.True(t, msg.Timestamp <= timestamp2)
|
|
}
|
|
|
|
// Test open-ended range (toTimestamp = 0)
|
|
messages, err = store.GetMessagesByTimeRange([3]byte(filter1), address1, timestamp1, 0)
|
|
require.NoError(t, err)
|
|
|
|
for _, msg := range messages {
|
|
assert.True(t, msg.Timestamp >= timestamp1)
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestCRDTMessageReaping(t *testing.T) {
|
|
store := setupTestInboxStore(t)
|
|
|
|
address := bytes.Repeat([]byte{0xCC}, 32)
|
|
filter := up2p.GetBloomFilterIndices(address, 256, 3)
|
|
|
|
// Add messages at different timestamps
|
|
timestamps := []uint64{1000, 2000, 3000, 4000, 5000}
|
|
for i, ts := range timestamps {
|
|
msg := createTestMessage(address, ts, "Message "+string(rune('A'+i)))
|
|
err := store.AddMessage(msg)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// Verify all messages exist
|
|
messages, err := store.GetMessagesByFilter([3]byte(filter))
|
|
require.NoError(t, err)
|
|
assert.True(t, len(messages) >= 5)
|
|
|
|
// Reap messages older than 3500 (should remove messages at 1000, 2000, 3000)
|
|
err = store.ReapMessages([3]byte(filter), 3500)
|
|
require.NoError(t, err)
|
|
|
|
// Verify remaining messages
|
|
messages, err = store.GetMessagesByFilter([3]byte(filter))
|
|
require.NoError(t, err)
|
|
|
|
// Should only have messages with timestamps >= 3500
|
|
for _, msg := range messages {
|
|
assert.True(t, msg.Timestamp >= 3500)
|
|
}
|
|
}
|
|
|
|
func TestCRDTHubAssociations(t *testing.T) {
|
|
store := setupTestInboxStore(t)
|
|
|
|
hubAddress := bytes.Repeat([]byte{0xDD}, 32)
|
|
inboxPubKey1 := bytes.Repeat([]byte{0x11}, 57) // Ed448 key size
|
|
inboxPubKey2 := bytes.Repeat([]byte{0x22}, 57)
|
|
hubPubKey := bytes.Repeat([]byte{0xAA}, 57)
|
|
filter := up2p.GetBloomFilterIndices(hubAddress, 256, 3)
|
|
|
|
t.Run("AddHubInboxAssociation_2PSet", func(t *testing.T) {
|
|
// Add first association
|
|
add1 := createTestHubAddMessage(hubAddress, inboxPubKey1, hubPubKey)
|
|
err := store.AddHubInboxAssociation(add1)
|
|
require.NoError(t, err)
|
|
|
|
// Add second association
|
|
add2 := createTestHubAddMessage(hubAddress, inboxPubKey2, hubPubKey)
|
|
err = store.AddHubInboxAssociation(add2)
|
|
require.NoError(t, err)
|
|
|
|
// Adding the same association again should not error (2P-Set allows re-adds)
|
|
err = store.AddHubInboxAssociation(add1)
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
t.Run("GetHubAssociations_EffectiveState", func(t *testing.T) {
|
|
response, err := store.GetHubAssociations([3]byte(filter), hubAddress)
|
|
require.NoError(t, err)
|
|
|
|
// Should have 2 effective associations
|
|
assert.Len(t, response.Adds, 2)
|
|
assert.Len(t, response.Deletes, 0)
|
|
|
|
// Verify the associations
|
|
foundKeys := make(map[string]bool)
|
|
for _, add := range response.Adds {
|
|
key := string(add.InboxPublicKey)
|
|
foundKeys[key] = true
|
|
}
|
|
assert.True(t, foundKeys[string(inboxPubKey1)])
|
|
assert.True(t, foundKeys[string(inboxPubKey2)])
|
|
})
|
|
|
|
t.Run("DeleteHubInboxAssociation_2PSet", func(t *testing.T) {
|
|
// Delete first association
|
|
delete1 := createTestHubDeleteMessage(hubAddress, inboxPubKey1, hubPubKey)
|
|
err := store.DeleteHubInboxAssociation(delete1)
|
|
require.NoError(t, err)
|
|
|
|
// Verify effective state
|
|
response, err := store.GetHubAssociations([3]byte(filter), hubAddress)
|
|
require.NoError(t, err)
|
|
|
|
// Should have 1 effective add and 1 delete
|
|
assert.Len(t, response.Adds, 1)
|
|
assert.Len(t, response.Deletes, 1)
|
|
|
|
// The remaining add should be for inboxPubKey2
|
|
assert.Equal(t, inboxPubKey2, response.Adds[0].InboxPublicKey)
|
|
assert.Equal(t, inboxPubKey1, response.Deletes[0].InboxPublicKey)
|
|
})
|
|
|
|
t.Run("GetHubAddHistory_CRDTSync", func(t *testing.T) {
|
|
// Get all add operations (never deleted)
|
|
adds, err := store.GetHubAddHistory([3]byte(filter), hubAddress)
|
|
require.NoError(t, err)
|
|
|
|
// Should have all add operations ever performed
|
|
assert.True(t, len(adds) >= 2) // At least the original 2 adds
|
|
|
|
// Verify operations are preserved
|
|
foundKeys := make(map[string]int)
|
|
for _, add := range adds {
|
|
key := string(add.InboxPublicKey)
|
|
foundKeys[key]++
|
|
}
|
|
assert.True(t, foundKeys[string(inboxPubKey1)] >= 1)
|
|
assert.True(t, foundKeys[string(inboxPubKey2)] >= 1)
|
|
})
|
|
|
|
t.Run("GetHubDeleteHistory_CRDTSync", func(t *testing.T) {
|
|
// Get all delete operations (never deleted)
|
|
deletes, err := store.GetHubDeleteHistory([3]byte(filter), hubAddress)
|
|
require.NoError(t, err)
|
|
|
|
// Should have 1 delete operation
|
|
assert.Len(t, deletes, 1)
|
|
assert.Equal(t, inboxPubKey1, deletes[0].InboxPublicKey)
|
|
})
|
|
|
|
t.Run("2PSet_Idempotency", func(t *testing.T) {
|
|
// Re-add a deleted association
|
|
add1Again := createTestHubAddMessage(hubAddress, inboxPubKey1, hubPubKey)
|
|
err := store.AddHubInboxAssociation(add1Again)
|
|
require.NoError(t, err)
|
|
|
|
// It should still be considered deleted (2P-Set semantics)
|
|
response, err := store.GetHubAssociations([3]byte(filter), hubAddress)
|
|
require.NoError(t, err)
|
|
|
|
// Should still have 1 effective add and 1 delete
|
|
assert.Len(t, response.Adds, 1)
|
|
assert.Len(t, response.Deletes, 1)
|
|
assert.Equal(t, inboxPubKey2, response.Adds[0].InboxPublicKey)
|
|
|
|
// Re-delete the same association (should be idempotent)
|
|
delete1Again := createTestHubDeleteMessage(hubAddress, inboxPubKey1, hubPubKey)
|
|
err = store.DeleteHubInboxAssociation(delete1Again)
|
|
require.NoError(t, err)
|
|
|
|
// State should remain the same
|
|
response, err = store.GetHubAssociations([3]byte(filter), hubAddress)
|
|
require.NoError(t, err)
|
|
assert.Len(t, response.Adds, 1)
|
|
assert.Len(t, response.Deletes, 1)
|
|
})
|
|
}
|
|
|
|
func TestCRDTSynchronization(t *testing.T) {
|
|
store := setupTestInboxStore(t)
|
|
|
|
// Set up test data
|
|
address1 := bytes.Repeat([]byte{0xAA}, 32)
|
|
address2 := bytes.Repeat([]byte{0xBB}, 32)
|
|
filter1 := up2p.GetBloomFilterIndices(address1, 256, 3)
|
|
filter2 := up2p.GetBloomFilterIndices(address2, 256, 3)
|
|
|
|
// Add messages
|
|
msg1 := createTestMessage(address1, 1000, "Message 1")
|
|
msg2 := createTestMessage(address2, 2000, "Message 2")
|
|
|
|
err := store.AddMessage(msg1)
|
|
require.NoError(t, err)
|
|
err = store.AddMessage(msg2)
|
|
require.NoError(t, err)
|
|
|
|
// Add hub associations
|
|
hubAddress := bytes.Repeat([]byte{0xCC}, 32)
|
|
inboxPubKey := bytes.Repeat([]byte{0x11}, 57)
|
|
hubPubKey := bytes.Repeat([]byte{0xAA}, 57)
|
|
|
|
add := createTestHubAddMessage(hubAddress, inboxPubKey, hubPubKey)
|
|
err = store.AddHubInboxAssociation(add)
|
|
require.NoError(t, err)
|
|
|
|
t.Run("GetAllMessagesCRDT", func(t *testing.T) {
|
|
filters := [][3]byte{[3]byte(filter1), [3]byte(filter2)}
|
|
messages, err := store.GetAllMessagesCRDT(filters)
|
|
require.NoError(t, err)
|
|
|
|
assert.True(t, len(messages) >= 2)
|
|
|
|
// Verify we have messages from both filters
|
|
foundAddresses := make(map[string]bool)
|
|
for _, msg := range messages {
|
|
foundAddresses[string(msg.Address)] = true
|
|
}
|
|
assert.True(t, foundAddresses[string(address1)])
|
|
assert.True(t, foundAddresses[string(address2)])
|
|
})
|
|
|
|
t.Run("GetAllHubAssociations", func(t *testing.T) {
|
|
hubFilter := up2p.GetBloomFilterIndices(hubAddress, 256, 3)
|
|
filters := [][3]byte{[3]byte(hubFilter)}
|
|
|
|
hubs, err := store.GetAllHubAssociations(filters)
|
|
require.NoError(t, err)
|
|
|
|
assert.True(t, len(hubs) >= 1)
|
|
|
|
// Find our hub
|
|
var foundHub *protobufs.HubResponse
|
|
for _, hub := range hubs {
|
|
if len(hub.Adds) > 0 && bytes.Equal(hub.Adds[0].Address, hubAddress) {
|
|
foundHub = hub
|
|
break
|
|
}
|
|
}
|
|
|
|
require.NotNil(t, foundHub)
|
|
assert.Len(t, foundHub.Adds, 1)
|
|
assert.Equal(t, inboxPubKey, foundHub.Adds[0].InboxPublicKey)
|
|
})
|
|
|
|
t.Run("GetAllHubsCRDT", func(t *testing.T) {
|
|
hubFilter := up2p.GetBloomFilterIndices(hubAddress, 256, 3)
|
|
filters := [][3]byte{[3]byte(hubFilter)}
|
|
|
|
hubs, err := store.GetAllHubsCRDT(filters)
|
|
require.NoError(t, err)
|
|
|
|
assert.True(t, len(hubs) >= 1)
|
|
})
|
|
}
|
|
|
|
func TestCRDTMaterializedViews(t *testing.T) {
|
|
store := setupTestInboxStore(t)
|
|
|
|
hubAddress := bytes.Repeat([]byte{0xEE}, 32)
|
|
inboxPubKey1 := bytes.Repeat([]byte{0x11}, 57)
|
|
inboxPubKey2 := bytes.Repeat([]byte{0x22}, 57)
|
|
hubPubKey := bytes.Repeat([]byte{0xAA}, 57)
|
|
filter := up2p.GetBloomFilterIndices(hubAddress, 256, 3)
|
|
|
|
t.Run("MaterializedView_UpdatesOnAdd", func(t *testing.T) {
|
|
// Add association
|
|
add1 := createTestHubAddMessage(hubAddress, inboxPubKey1, hubPubKey)
|
|
err := store.AddHubInboxAssociation(add1)
|
|
require.NoError(t, err)
|
|
|
|
// Materialized view should be updated
|
|
response, err := store.GetHubAssociations([3]byte(filter), hubAddress)
|
|
require.NoError(t, err)
|
|
assert.Len(t, response.Adds, 1)
|
|
})
|
|
|
|
t.Run("MaterializedView_UpdatesOnDelete", func(t *testing.T) {
|
|
// Add another association
|
|
add2 := createTestHubAddMessage(hubAddress, inboxPubKey2, hubPubKey)
|
|
err := store.AddHubInboxAssociation(add2)
|
|
require.NoError(t, err)
|
|
|
|
// Delete first association
|
|
delete1 := createTestHubDeleteMessage(hubAddress, inboxPubKey1, hubPubKey)
|
|
err = store.DeleteHubInboxAssociation(delete1)
|
|
require.NoError(t, err)
|
|
|
|
// Materialized view should reflect the deletion
|
|
response, err := store.GetHubAssociations([3]byte(filter), hubAddress)
|
|
require.NoError(t, err)
|
|
assert.Len(t, response.Adds, 1)
|
|
assert.Len(t, response.Deletes, 1)
|
|
assert.Equal(t, inboxPubKey2, response.Adds[0].InboxPublicKey)
|
|
assert.Equal(t, inboxPubKey1, response.Deletes[0].InboxPublicKey)
|
|
})
|
|
}
|
|
|
|
func TestCRDTKeyConstruction(t *testing.T) {
|
|
address := bytes.Repeat([]byte{0xAA}, 32)
|
|
timestamp := uint64(1234567890)
|
|
msg := createTestMessage(address, timestamp, "Test message")
|
|
|
|
t.Run("messageKey", func(t *testing.T) {
|
|
key := messageKey(msg)
|
|
// Verify key structure:
|
|
// [INBOX][INBOX_MESSAGE][filter (3)][timestamp (8)][address_hash (32)][message_hash (32)]
|
|
assert.Equal(t, byte(INBOX), key[0])
|
|
assert.Equal(t, byte(INBOX_MESSAGE), key[1])
|
|
|
|
// Extract filter (next 3 bytes starting at index 2)
|
|
filter := up2p.GetBloomFilterIndices(address, 256, 3)
|
|
assert.Equal(t, filter, key[2:5])
|
|
|
|
// Verify key is deterministic
|
|
key2 := messageKey(msg)
|
|
assert.Equal(t, key, key2)
|
|
})
|
|
|
|
t.Run("HubAddKey", func(t *testing.T) {
|
|
hubAddress := bytes.Repeat([]byte{0xBB}, 32)
|
|
inboxPubKey := bytes.Repeat([]byte{0x11}, 57)
|
|
hubPubKey := bytes.Repeat([]byte{0x22}, 57)
|
|
|
|
add := createTestHubAddMessage(hubAddress, inboxPubKey, hubPubKey)
|
|
key := hubAddKey(add)
|
|
|
|
// Verify key structure
|
|
assert.Equal(t, byte(INBOX), key[0])
|
|
assert.Equal(t, byte(INBOX_HUB_ADDS), key[1])
|
|
|
|
// Verify key is deterministic
|
|
key2 := hubAddKey(add)
|
|
assert.Equal(t, key, key2)
|
|
})
|
|
|
|
t.Run("HubDeleteKey", func(t *testing.T) {
|
|
hubAddress := bytes.Repeat([]byte{0xBB}, 32)
|
|
inboxPubKey := bytes.Repeat([]byte{0x11}, 57)
|
|
hubPubKey := bytes.Repeat([]byte{0x22}, 57)
|
|
|
|
delete := createTestHubDeleteMessage(hubAddress, inboxPubKey, hubPubKey)
|
|
key := hubDeleteKey(delete)
|
|
|
|
// Verify key structure
|
|
assert.Equal(t, byte(INBOX), key[0])
|
|
assert.Equal(t, byte(INBOX_HUB_DELETES), key[1])
|
|
|
|
// Verify key is deterministic
|
|
key2 := hubDeleteKey(delete)
|
|
assert.Equal(t, key, key2)
|
|
})
|
|
}
|
|
|
|
func TestCRDTErrorCases(t *testing.T) {
|
|
store := setupTestInboxStore(t)
|
|
|
|
t.Run("AddMessage_NilMessage", func(t *testing.T) {
|
|
err := store.AddMessage(nil)
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "message is nil")
|
|
})
|
|
|
|
t.Run("AddHubInboxAssociation_NilMessage", func(t *testing.T) {
|
|
err := store.AddHubInboxAssociation(nil)
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "add message is nil")
|
|
})
|
|
|
|
t.Run("DeleteHubInboxAssociation_NilMessage", func(t *testing.T) {
|
|
err := store.DeleteHubInboxAssociation(nil)
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "delete message is nil")
|
|
})
|
|
|
|
t.Run("GetHubAssociations_NonExistentHub", func(t *testing.T) {
|
|
nonExistentHub := bytes.Repeat([]byte{0xFF}, 32)
|
|
filter := up2p.GetBloomFilterIndices(nonExistentHub, 256, 3)
|
|
|
|
response, err := store.GetHubAssociations([3]byte(filter), nonExistentHub)
|
|
require.NoError(t, err)
|
|
|
|
// Should return empty response, not error
|
|
assert.Len(t, response.Adds, 0)
|
|
assert.Len(t, response.Deletes, 0)
|
|
})
|
|
}
|
|
|
|
func TestCRDTConcurrentScenarios(t *testing.T) {
|
|
store := setupTestInboxStore(t)
|
|
|
|
hubAddress := bytes.Repeat([]byte{0xFF}, 32)
|
|
inboxPubKey := bytes.Repeat([]byte{0x11}, 57)
|
|
hubPubKey := bytes.Repeat([]byte{0x22}, 57)
|
|
filter := up2p.GetBloomFilterIndices(hubAddress, 256, 3)
|
|
|
|
t.Run("AddDeleteAdd_Sequence", func(t *testing.T) {
|
|
// Simulate concurrent operations that could arrive in different orders
|
|
|
|
// Add
|
|
add1 := createTestHubAddMessage(hubAddress, inboxPubKey, hubPubKey)
|
|
err := store.AddHubInboxAssociation(add1)
|
|
require.NoError(t, err)
|
|
|
|
// Delete
|
|
delete1 := createTestHubDeleteMessage(hubAddress, inboxPubKey, hubPubKey)
|
|
err = store.DeleteHubInboxAssociation(delete1)
|
|
require.NoError(t, err)
|
|
|
|
// Add again (should not revert the delete in 2P-Set)
|
|
add2 := createTestHubAddMessage(hubAddress, inboxPubKey, hubPubKey)
|
|
err = store.AddHubInboxAssociation(add2)
|
|
require.NoError(t, err)
|
|
|
|
// Verify final state: delete wins in 2P-Set
|
|
response, err := store.GetHubAssociations([3]byte(filter), hubAddress)
|
|
require.NoError(t, err)
|
|
assert.Len(t, response.Adds, 0) // No effective adds
|
|
assert.Len(t, response.Deletes, 1) // One delete
|
|
})
|
|
|
|
t.Run("MultipleAdds_SameAssociation", func(t *testing.T) {
|
|
// Different hub for this test
|
|
hubAddress2 := bytes.Repeat([]byte{0xEE}, 32)
|
|
filter2 := up2p.GetBloomFilterIndices(hubAddress2, 256, 3)
|
|
|
|
// Add the same association multiple times
|
|
for i := 0; i < 5; i++ {
|
|
add := createTestHubAddMessage(hubAddress2, inboxPubKey, hubPubKey)
|
|
err := store.AddHubInboxAssociation(add)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// Should still have only one effective association
|
|
response, err := store.GetHubAssociations([3]byte(filter2), hubAddress2)
|
|
require.NoError(t, err)
|
|
assert.Len(t, response.Adds, 1)
|
|
assert.Len(t, response.Deletes, 0)
|
|
|
|
// History should remain idempotent
|
|
adds, err := store.GetHubAddHistory([3]byte(filter2), hubAddress2)
|
|
require.NoError(t, err)
|
|
assert.True(t, len(adds) == 1)
|
|
})
|
|
}
|
|
|
|
func BenchmarkCRDTOperations(b *testing.B) {
|
|
logger, _ := zap.NewDevelopment()
|
|
tempDB := NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".test/store"}, 0)
|
|
store := NewPebbleInboxStore(tempDB, logger)
|
|
|
|
address := bytes.Repeat([]byte{0xAA}, 32)
|
|
|
|
b.Run("AddMessage", func(b *testing.B) {
|
|
for i := 0; i < b.N; i++ {
|
|
msg := createTestMessage(address, uint64(i), "Benchmark message")
|
|
err := store.AddMessage(msg)
|
|
if err != nil {
|
|
b.Fatal(err)
|
|
}
|
|
}
|
|
})
|
|
|
|
b.Run("GetMessagesByFilter", func(b *testing.B) {
|
|
// Pre-populate with some messages
|
|
for i := 0; i < 1000; i++ {
|
|
msg := createTestMessage(address, uint64(i), "Benchmark message")
|
|
_ = store.AddMessage(msg)
|
|
}
|
|
|
|
filter := up2p.GetBloomFilterIndices(address, 256, 3)
|
|
b.ResetTimer()
|
|
|
|
for i := 0; i < b.N; i++ {
|
|
_, err := store.GetMessagesByFilter([3]byte(filter))
|
|
if err != nil {
|
|
b.Fatal(err)
|
|
}
|
|
}
|
|
})
|
|
|
|
b.Run("AddHubInboxAssociation", func(b *testing.B) {
|
|
hubAddress := bytes.Repeat([]byte{0xBB}, 32)
|
|
hubPubKey := bytes.Repeat([]byte{0xAA}, 57)
|
|
|
|
for i := 0; i < b.N; i++ {
|
|
inboxPubKey := sha256.Sum256([]byte("inbox" + string(rune(i))))
|
|
add := createTestHubAddMessage(hubAddress, inboxPubKey[:], hubPubKey)
|
|
err := store.AddHubInboxAssociation(add)
|
|
if err != nil {
|
|
b.Fatal(err)
|
|
}
|
|
}
|
|
})
|
|
|
|
b.Run("GetHubAssociations", func(b *testing.B) {
|
|
hubAddress := bytes.Repeat([]byte{0xCC}, 32)
|
|
hubPubKey := bytes.Repeat([]byte{0xBB}, 57)
|
|
filter := up2p.GetBloomFilterIndices(hubAddress, 256, 3)
|
|
|
|
// Pre-populate with some associations
|
|
for i := 0; i < 100; i++ {
|
|
inboxPubKey := sha256.Sum256([]byte("inbox" + string(rune(i))))
|
|
add := createTestHubAddMessage(hubAddress, inboxPubKey[:], hubPubKey)
|
|
_ = store.AddHubInboxAssociation(add)
|
|
}
|
|
|
|
b.ResetTimer()
|
|
|
|
for i := 0; i < b.N; i++ {
|
|
_, err := store.GetHubAssociations([3]byte(filter), hubAddress)
|
|
if err != nil {
|
|
b.Fatal(err)
|
|
}
|
|
}
|
|
})
|
|
}
|