mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-22 02:47:26 +08:00
* v2.1.0.2 * restore tweaks to simlibp2p * fix: nil ref on size calc * fix: panic should induce shutdown from event_distributor * fix: friendlier initialization that requires less manual kickstarting for test/devnets * fix: fewer available shards than provers should choose shard length * fix: update stored worker registry, improve logging for debug mode * fix: shut the fuck up, peer log * qol: log value should be snake cased * fix:non-archive snap sync issues * fix: separate X448/Decaf448 signed keys, add onion key to registry * fix: overflow arithmetic on frame number comparison * fix: worker registration should be idempotent if inputs are same, otherwise permit updated records * fix: remove global prover state from size calculation * fix: divide by zero case * fix: eager prover * fix: broadcast listener default * qol: diagnostic data for peer authenticator * fix: master/worker connectivity issue in sparse networks tight coupling of peer and workers can sometimes interfere if mesh is sparse, so give workers a pseudoidentity but publish messages with the proper peer key * fix: reorder steps of join creation * fix: join verify frame source + ensure domain is properly padded (unnecessary but good for consistency) * fix: add delegate to protobuf <-> reified join conversion * fix: preempt prover from planning with no workers * fix: use the unallocated workers to generate a proof * qol: underflow causes join fail in first ten frames on test/devnets * qol: small logging tweaks for easier log correlation in debug mode * qol: use fisher-yates shuffle to ensure prover allocations are evenly distributed when scores are equal * qol: separate decisional logic on post-enrollment confirmation into consensus engine, proposer, and worker manager where relevant, refactor out scoring * reuse shard descriptors for both join planning and confirm/reject decisions * fix: add missing interface method and amend test blossomsub to use new peer id basis * fix: only check allocations if they 
exist * fix: pomw mint proof data needs to be hierarchically under global intrinsic domain * staging temporary state under diagnostics * fix: first phase of distributed lock refactoring * fix: compute intrinsic locking * fix: hypergraph intrinsic locking * fix: token intrinsic locking * fix: update execution engines to support new locking model * fix: adjust tests with new execution shape * fix: weave in lock/unlock semantics to liveness provider * fix lock fallthrough, add missing allocation update * qol: additional logging for diagnostics, also testnet/devnet handling for confirmations * fix: establish grace period on halt scenario to permit recovery * fix: support test/devnet defaults for coverage scenarios * fix: nil ref on consensus halts for non-archive nodes * fix: remove unnecessary prefix from prover ref * add test coverage for fork choice behaviors and replay – once passing, blocker (2) is resolved * fix: no fork replay on repeat for non-archive nodes, snap now behaves correctly * rollup of pre-liveness check lock interactions * ahead of tests, get the protobuf/metrics-related changes out so teams can prepare * add test coverage for distributed lock behaviors – once passing, blocker (3) is resolved * fix: blocker (3) * Dev docs improvements (#445) * Make install deps script more robust * Improve testing instructions * Worker node should stop upon OS SIGINT/SIGTERM signal (#447) * move pebble close to Stop() * move deferred Stop() to Start() * add core id to worker stop log message * create done os signal channel and stop worker upon message to it --------- Co-authored-by: Cassandra Heart <7929478+CassOnMars@users.noreply.github.com> --------- Co-authored-by: Daz <daz_the_corgi@proton.me> Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
623 lines
17 KiB
Go
623 lines
17 KiB
Go
package store
|
||
|
||
import (
|
||
"bytes"
|
||
"crypto/sha256"
|
||
"encoding/binary"
|
||
"slices"
|
||
"sort"
|
||
|
||
"github.com/pkg/errors"
|
||
"go.uber.org/zap"
|
||
"google.golang.org/protobuf/proto"
|
||
"source.quilibrium.com/quilibrium/monorepo/protobufs"
|
||
"source.quilibrium.com/quilibrium/monorepo/types/store"
|
||
up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p"
|
||
)
|
||
|
||
var _ store.InboxStore = (*PebbleInboxStore)(nil)
|
||
|
||
type PebbleInboxStore struct {
|
||
db store.KVDB
|
||
logger *zap.Logger
|
||
}
|
||
|
||
func NewPebbleInboxStore(
|
||
db store.KVDB,
|
||
logger *zap.Logger,
|
||
) *PebbleInboxStore {
|
||
return &PebbleInboxStore{
|
||
db: db,
|
||
logger: logger,
|
||
}
|
||
}
|
||
|
||
// Key construction functions for CRDT operations
|
||
|
||
// messageKey constructs key for message data in grow-only set:
|
||
// [INBOX][INBOX_MESSAGE] + filter + timestamp + address_hash + message_hash
|
||
func messageKey(msg *protobufs.InboxMessage) []byte {
|
||
filter := up2p.GetBloomFilterIndices(msg.Address, 256, 3)
|
||
slices.Sort(filter)
|
||
msgHash := sha256.Sum256(msg.Message)
|
||
addressHash := sha256.Sum256(msg.Address)
|
||
|
||
key := []byte{INBOX, INBOX_MESSAGE}
|
||
key = append(key, filter...)
|
||
key = binary.BigEndian.AppendUint64(key, msg.Timestamp)
|
||
key = append(key, addressHash[:]...)
|
||
key = append(key, msgHash[:]...)
|
||
return key
|
||
}
|
||
|
||
// messagesByFilterPrefix constructs prefix for ranging messages by filter
|
||
func messagesByFilterPrefix(filter []byte) []byte {
|
||
key := []byte{INBOX, INBOX_MESSAGE}
|
||
sorted := slices.Clone(filter)
|
||
slices.Sort(sorted)
|
||
|
||
key = append(key, sorted...)
|
||
return key
|
||
}
|
||
|
||
// hubAddKey constructs key for hub add operations (2P-Set adds):
|
||
// [INBOX][INBOX_HUB_ADDS] + filter + hub_address_hash + hub_public_key +
|
||
// inbox_public_key
|
||
func hubAddKey(add *protobufs.HubAddInboxMessage) []byte {
|
||
filter := up2p.GetBloomFilterIndices(add.Address, 256, 3)
|
||
|
||
sorted := slices.Clone(filter)
|
||
slices.Sort(sorted)
|
||
key := []byte{INBOX, INBOX_HUB_ADDS}
|
||
addressHash := sha256.Sum256(add.Address)
|
||
|
||
key = append(key, sorted...)
|
||
key = append(key, addressHash[:]...)
|
||
key = append(key, add.HubPublicKey...)
|
||
key = append(key, add.InboxPublicKey...)
|
||
return key
|
||
}
|
||
|
||
// hubDeleteKey constructs key for hub delete operations (2P-Set deletes):
|
||
// [INBOX][INBOX_HUB_DELETES] + filter + hub_address_hash + hub_public_key +
|
||
// inbox_public_key
|
||
func hubDeleteKey(delete *protobufs.HubDeleteInboxMessage) []byte {
|
||
filter := up2p.GetBloomFilterIndices(delete.Address, 256, 3)
|
||
|
||
sorted := slices.Clone(filter)
|
||
slices.Sort(sorted)
|
||
key := []byte{INBOX, INBOX_HUB_DELETES}
|
||
addressHash := sha256.Sum256(delete.Address)
|
||
key = append(key, sorted...)
|
||
key = append(key, addressHash[:]...)
|
||
key = append(key, delete.HubPublicKey...)
|
||
key = append(key, delete.InboxPublicKey...)
|
||
return key
|
||
}
|
||
|
||
// hubAddsPrefix for ranging all add operations for a hub
|
||
// [INBOX][INBOX_HUB_ADDS] + filter + hub_address_hash
|
||
func hubAddsPrefix(filter []byte, hubAddress []byte) []byte {
|
||
key := []byte{INBOX, INBOX_HUB_ADDS}
|
||
|
||
sorted := slices.Clone(filter)
|
||
slices.Sort(sorted)
|
||
addrHash := sha256.Sum256(hubAddress)
|
||
key = append(key, sorted...)
|
||
key = append(key, addrHash[:]...)
|
||
return key
|
||
}
|
||
|
||
// hubDeletesPrefix for ranging all delete operations for a hub
|
||
// [INBOX][INBOX_HUB_DELETES] + filter + hub_address_hash
|
||
func hubDeletesPrefix(filter []byte, hubAddress []byte) []byte {
|
||
key := []byte{INBOX, INBOX_HUB_DELETES}
|
||
|
||
sorted := slices.Clone(filter)
|
||
slices.Sort(sorted)
|
||
addrHash := sha256.Sum256(hubAddress)
|
||
key = append(key, sorted...)
|
||
key = append(key, addrHash[:]...)
|
||
return key
|
||
}
|
||
|
||
// hubMaterializedKey constructs key for materialized hub state:
|
||
// [INBOX][INBOX_HUB_BY_ADDR] + filter + hub_address
|
||
func hubMaterializedKey(filter []byte, hubAddress []byte) []byte {
|
||
key := []byte{INBOX, INBOX_HUB_BY_ADDR}
|
||
|
||
sorted := slices.Clone(filter)
|
||
slices.Sort(sorted)
|
||
key = append(key, sorted...)
|
||
key = append(key, hubAddress...)
|
||
return key
|
||
}
|
||
|
||
// AddMessage adds a message to the grow-only message set
|
||
func (p *PebbleInboxStore) AddMessage(msg *protobufs.InboxMessage) error {
|
||
if msg == nil {
|
||
return errors.New("message is nil")
|
||
}
|
||
|
||
key := messageKey(msg)
|
||
value, err := msg.ToCanonicalBytes()
|
||
if err != nil {
|
||
return errors.Wrap(err, "serialize message")
|
||
}
|
||
|
||
if err := p.db.Set(key, value); err != nil {
|
||
return errors.Wrap(err, "store message")
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// GetMessagesByFilter returns all messages for a filter
|
||
func (p *PebbleInboxStore) GetMessagesByFilter(filter [3]byte) (
|
||
[]*protobufs.InboxMessage,
|
||
error,
|
||
) {
|
||
prefix := messagesByFilterPrefix(filter[:])
|
||
return p.scanMessages(prefix)
|
||
}
|
||
|
||
// GetMessagesByAddress returns all messages for a specific address within a
|
||
// filter
|
||
func (p *PebbleInboxStore) GetMessagesByAddress(
|
||
filter [3]byte,
|
||
address []byte,
|
||
) ([]*protobufs.InboxMessage, error) {
|
||
// Get all messages for filter and filter by address
|
||
messages, err := p.GetMessagesByFilter(filter)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
var filtered []*protobufs.InboxMessage
|
||
for _, msg := range messages {
|
||
if bytes.Equal(msg.Address, address) {
|
||
filtered = append(filtered, msg)
|
||
}
|
||
}
|
||
|
||
return filtered, nil
|
||
}
|
||
|
||
// GetMessagesByTimeRange returns messages within a timestamp range
|
||
func (p *PebbleInboxStore) GetMessagesByTimeRange(
|
||
filter [3]byte,
|
||
address []byte,
|
||
fromTimestamp, toTimestamp uint64,
|
||
) ([]*protobufs.InboxMessage, error) {
|
||
messages, err := p.GetMessagesByAddress(filter, address)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
var filtered []*protobufs.InboxMessage
|
||
for _, msg := range messages {
|
||
if msg.Timestamp >= fromTimestamp &&
|
||
(toTimestamp == 0 || msg.Timestamp <= toTimestamp) {
|
||
filtered = append(filtered, msg)
|
||
}
|
||
}
|
||
|
||
return filtered, nil
|
||
}
|
||
|
||
// ReapMessages removes messages older than the specified timestamp (age-based
|
||
// truncation)
|
||
func (p *PebbleInboxStore) ReapMessages(
|
||
filter [3]byte,
|
||
cutoffTimestamp uint64,
|
||
) error {
|
||
prefix := messagesByFilterPrefix(filter[:])
|
||
upper := nextPrefix(prefix)
|
||
iter, err := p.db.NewIter(prefix, upper)
|
||
if err != nil {
|
||
return errors.Wrap(err, "create iterator")
|
||
}
|
||
defer iter.Close()
|
||
|
||
batch := p.db.NewBatch(false)
|
||
defer batch.Abort()
|
||
|
||
for iter.First(); iter.Valid(); iter.Next() {
|
||
key := iter.Key()
|
||
|
||
// Extract timestamp from key:
|
||
// [INBOX][INBOX_MESSAGE][filter][timestamp]...
|
||
if len(key) < 1+1+3+8 {
|
||
continue
|
||
}
|
||
|
||
timestampBytes := key[1+1+3 : 1+1+3+8]
|
||
timestamp := binary.BigEndian.Uint64(timestampBytes)
|
||
|
||
if timestamp < cutoffTimestamp {
|
||
if err := batch.Delete(key); err != nil {
|
||
return errors.Wrap(err, "delete old message")
|
||
}
|
||
}
|
||
}
|
||
|
||
return batch.Commit()
|
||
}
|
||
|
||
// AddHubInboxAssociation adds an association to the 2P-Set (never deleted)
|
||
func (p *PebbleInboxStore) AddHubInboxAssociation(
|
||
add *protobufs.HubAddInboxMessage,
|
||
) error {
|
||
if add == nil {
|
||
return errors.New("add message is nil")
|
||
}
|
||
|
||
key := hubAddKey(add)
|
||
value, err := add.ToCanonicalBytes()
|
||
if err != nil {
|
||
return errors.Wrap(err, "serialize add message")
|
||
}
|
||
|
||
batch := p.db.NewBatch(false)
|
||
defer batch.Abort()
|
||
|
||
// Store the add operation (never deleted)
|
||
if err := batch.Set(key, value); err != nil {
|
||
return errors.Wrap(err, "store add operation")
|
||
}
|
||
|
||
// Update materialized view
|
||
if err := p.updateMaterializedHub(batch, add.Address, add, nil); err != nil {
|
||
return errors.Wrap(err, "update materialized view")
|
||
}
|
||
|
||
return batch.Commit()
|
||
}
|
||
|
||
// DeleteHubInboxAssociation marks an association as deleted in the 2P-Set
|
||
func (p *PebbleInboxStore) DeleteHubInboxAssociation(
|
||
delete *protobufs.HubDeleteInboxMessage,
|
||
) error {
|
||
if delete == nil {
|
||
return errors.New("delete message is nil")
|
||
}
|
||
|
||
key := hubDeleteKey(delete)
|
||
value, err := delete.ToCanonicalBytes()
|
||
if err != nil {
|
||
return errors.Wrap(err, "serialize delete message")
|
||
}
|
||
|
||
batch := p.db.NewBatch(false)
|
||
defer batch.Abort()
|
||
|
||
// Store the delete operation (never deleted)
|
||
if err := batch.Set(key, value); err != nil {
|
||
return errors.Wrap(err, "store delete operation")
|
||
}
|
||
|
||
// Update materialized view
|
||
if err := p.updateMaterializedHub(
|
||
batch,
|
||
delete.Address,
|
||
nil,
|
||
delete,
|
||
); err != nil {
|
||
return errors.Wrap(err, "update materialized view")
|
||
}
|
||
|
||
return batch.Commit()
|
||
}
|
||
|
||
// GetHubAssociations returns the current effective associations for a hub (adds
|
||
// minus deletes in the 2P-Set)
|
||
func (p *PebbleInboxStore) GetHubAssociations(
|
||
filter [3]byte,
|
||
hubAddress []byte,
|
||
) (*protobufs.HubResponse, error) {
|
||
// Try materialized view first
|
||
materializedKey := hubMaterializedKey(filter[:], hubAddress)
|
||
if value, closer, err := p.db.Get(materializedKey); err == nil {
|
||
defer closer.Close()
|
||
|
||
response := &protobufs.HubResponse{}
|
||
if err := proto.Unmarshal(value, response); err == nil {
|
||
return response, nil
|
||
}
|
||
}
|
||
|
||
// Fallback to computing from CRDT operations
|
||
return p.computeHubAssociations(filter, hubAddress)
|
||
}
|
||
|
||
// computeHubAssociations computes current associations from CRDT operations
|
||
func (p *PebbleInboxStore) computeHubAssociations(
|
||
filter [3]byte,
|
||
hubAddress []byte,
|
||
) (*protobufs.HubResponse, error) {
|
||
// Get all add operations
|
||
adds, err := p.GetHubAddHistory(filter, hubAddress)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// Get all delete operations
|
||
deletes, err := p.GetHubDeleteHistory(filter, hubAddress)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// Create a map to track effective associations
|
||
associations := make(map[string]*protobufs.HubAddInboxMessage)
|
||
deletions := make(map[string]bool)
|
||
|
||
// Process all adds
|
||
for _, add := range adds {
|
||
key := string(add.InboxPublicKey) + string(add.HubPublicKey)
|
||
associations[key] = add
|
||
}
|
||
|
||
// Process all deletes
|
||
for _, delete := range deletes {
|
||
key := string(delete.InboxPublicKey) + string(delete.HubPublicKey)
|
||
deletions[key] = true
|
||
}
|
||
|
||
// Compute effective adds (adds that haven't been deleted)
|
||
var effectiveAdds []*protobufs.HubAddInboxMessage
|
||
var effectiveDeletes []*protobufs.HubDeleteInboxMessage
|
||
|
||
for key, add := range associations {
|
||
if deletions[key] {
|
||
// Find the corresponding delete
|
||
for _, delete := range deletes {
|
||
deleteKey := string(delete.InboxPublicKey) + string(delete.HubPublicKey)
|
||
if deleteKey == key {
|
||
effectiveDeletes = append(effectiveDeletes, delete)
|
||
break
|
||
}
|
||
}
|
||
} else {
|
||
effectiveAdds = append(effectiveAdds, add)
|
||
}
|
||
}
|
||
|
||
return &protobufs.HubResponse{
|
||
Adds: effectiveAdds,
|
||
Deletes: effectiveDeletes,
|
||
}, nil
|
||
}
|
||
|
||
// updateMaterializedHub updates the materialized view for a hub
|
||
// IMPORTANT: This may be called while a batch has uncommitted writes. Since
|
||
// DB iterators won't see those yet, we include the pending op (if provided)
|
||
// in-memory so the materialized view reflects the state as-of this batch.
|
||
func (p *PebbleInboxStore) updateMaterializedHub(
|
||
batch store.Transaction,
|
||
hubAddress []byte,
|
||
pendingAdd *protobufs.HubAddInboxMessage,
|
||
pendingDelete *protobufs.HubDeleteInboxMessage,
|
||
) error {
|
||
filter := up2p.GetBloomFilterIndices(hubAddress, 256, 3)
|
||
// Build from history (committed) and then layer on the pending op.
|
||
adds, err := p.GetHubAddHistory([3]byte(filter), hubAddress)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
deletes, err := p.GetHubDeleteHistory([3]byte(filter), hubAddress)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// Apply the pending op (read-your-writes for the batch)
|
||
if pendingAdd != nil {
|
||
adds = append(adds, pendingAdd)
|
||
}
|
||
if pendingDelete != nil {
|
||
deletes = append(deletes, pendingDelete)
|
||
}
|
||
|
||
// Compute effective state (adds minus deletes) deterministically
|
||
type kd string
|
||
keyAdd := func(a *protobufs.HubAddInboxMessage) kd {
|
||
return kd(string(a.InboxPublicKey) + string(a.HubPublicKey))
|
||
}
|
||
keyDel := func(d *protobufs.HubDeleteInboxMessage) kd {
|
||
return kd(string(d.InboxPublicKey) + string(d.HubPublicKey))
|
||
}
|
||
|
||
addMap := make(map[kd]*protobufs.HubAddInboxMessage, len(adds))
|
||
delSet := make(map[kd]*protobufs.HubDeleteInboxMessage, len(deletes))
|
||
|
||
for _, a := range adds {
|
||
addMap[keyAdd(a)] = a
|
||
}
|
||
for _, d := range deletes {
|
||
delSet[keyDel(d)] = d
|
||
}
|
||
|
||
var effAdds []*protobufs.HubAddInboxMessage
|
||
var effDels []*protobufs.HubDeleteInboxMessage
|
||
for k, a := range addMap {
|
||
if del, ok := delSet[k]; ok {
|
||
effDels = append(effDels, del)
|
||
} else {
|
||
effAdds = append(effAdds, a)
|
||
}
|
||
}
|
||
|
||
response := &protobufs.HubResponse{Adds: effAdds, Deletes: effDels}
|
||
|
||
materializedKey := hubMaterializedKey(filter, hubAddress)
|
||
value, err := proto.Marshal(response)
|
||
if err != nil {
|
||
return errors.Wrap(err, "marshal hub response")
|
||
}
|
||
|
||
return batch.Set(materializedKey, value)
|
||
}
|
||
|
||
// GetAllHubAssociations returns all hub associations for the given filters
|
||
func (p *PebbleInboxStore) GetAllHubAssociations(filters [][3]byte) (
|
||
[]*protobufs.HubResponse,
|
||
error,
|
||
) {
|
||
var allResponses []*protobufs.HubResponse
|
||
|
||
for _, filter := range filters {
|
||
prefix := []byte{INBOX, INBOX_HUB_BY_ADDR}
|
||
prefix = append(prefix, filter[:]...)
|
||
upper := nextPrefix(prefix)
|
||
iter, err := p.db.NewIter(prefix, upper)
|
||
if err != nil {
|
||
return nil, errors.Wrap(err, "create materialized iterator")
|
||
}
|
||
|
||
for iter.First(); iter.Valid(); iter.Next() {
|
||
value := iter.Value()
|
||
response := &protobufs.HubResponse{}
|
||
if err := proto.Unmarshal(value, response); err != nil {
|
||
p.logger.Warn("failed to deserialize hub response", zap.Error(err))
|
||
continue
|
||
}
|
||
allResponses = append(allResponses, response)
|
||
}
|
||
iter.Close()
|
||
}
|
||
|
||
return allResponses, nil
|
||
}
|
||
|
||
// GetHubAddHistory returns all add operations for CRDT synchronization
|
||
func (p *PebbleInboxStore) GetHubAddHistory(
|
||
filter [3]byte,
|
||
hubAddress []byte,
|
||
) ([]*protobufs.HubAddInboxMessage, error) {
|
||
prefix := hubAddsPrefix(filter[:], hubAddress)
|
||
upper := nextPrefix(prefix)
|
||
iter, err := p.db.NewIter(prefix, upper)
|
||
if err != nil {
|
||
return nil, errors.Wrap(err, "create add iterator")
|
||
}
|
||
defer iter.Close()
|
||
|
||
var adds []*protobufs.HubAddInboxMessage
|
||
for iter.First(); iter.Valid(); iter.Next() {
|
||
value := iter.Value()
|
||
add := &protobufs.HubAddInboxMessage{}
|
||
if err := add.FromCanonicalBytes(value); err != nil {
|
||
return nil, errors.Wrap(err, "deserialize add message")
|
||
}
|
||
adds = append(adds, add)
|
||
}
|
||
|
||
return adds, nil
|
||
}
|
||
|
||
// GetHubDeleteHistory returns all delete operations for CRDT synchronization
|
||
func (p *PebbleInboxStore) GetHubDeleteHistory(
|
||
filter [3]byte,
|
||
hubAddress []byte,
|
||
) ([]*protobufs.HubDeleteInboxMessage, error) {
|
||
prefix := hubDeletesPrefix(filter[:], hubAddress)
|
||
upper := nextPrefix(prefix)
|
||
iter, err := p.db.NewIter(prefix, upper)
|
||
if err != nil {
|
||
return nil, errors.Wrap(err, "create delete iterator")
|
||
}
|
||
defer iter.Close()
|
||
|
||
var deletes []*protobufs.HubDeleteInboxMessage
|
||
for iter.First(); iter.Valid(); iter.Next() {
|
||
value := iter.Value()
|
||
delete := &protobufs.HubDeleteInboxMessage{}
|
||
if err := delete.FromCanonicalBytes(value); err != nil {
|
||
return nil, errors.Wrap(err, "deserialize delete message")
|
||
}
|
||
deletes = append(deletes, delete)
|
||
}
|
||
|
||
return deletes, nil
|
||
}
|
||
|
||
// GetAllMessagesCRDT returns all messages for CRDT synchronization
|
||
func (p *PebbleInboxStore) GetAllMessagesCRDT(filters [][3]byte) (
|
||
[]*protobufs.InboxMessage,
|
||
error,
|
||
) {
|
||
var allMessages []*protobufs.InboxMessage
|
||
|
||
for _, filter := range filters {
|
||
messages, err := p.GetMessagesByFilter(filter)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
allMessages = append(allMessages, messages...)
|
||
}
|
||
|
||
return allMessages, nil
|
||
}
|
||
|
||
// GetAllHubsCRDT returns all hub CRDT data for synchronization
|
||
func (p *PebbleInboxStore) GetAllHubsCRDT(filters [][3]byte) (
|
||
[]*protobufs.HubResponse,
|
||
error,
|
||
) {
|
||
return p.GetAllHubAssociations(filters)
|
||
}
|
||
|
||
// scanMessages scans messages with the given prefix
|
||
func (p *PebbleInboxStore) scanMessages(prefix []byte) (
|
||
[]*protobufs.InboxMessage,
|
||
error,
|
||
) {
|
||
upper := nextPrefix(prefix)
|
||
iter, err := p.db.NewIter(prefix, upper)
|
||
if err != nil {
|
||
return nil, errors.Wrap(err, "create iterator")
|
||
}
|
||
defer iter.Close()
|
||
|
||
var messages []*protobufs.InboxMessage
|
||
for iter.First(); iter.Valid(); iter.Next() {
|
||
value := iter.Value()
|
||
msg := &protobufs.InboxMessage{}
|
||
if err := msg.FromCanonicalBytes(value); err != nil {
|
||
return nil, errors.Wrap(err, "deserialize message")
|
||
}
|
||
messages = append(messages, msg)
|
||
}
|
||
|
||
// Sort by timestamp
|
||
sort.Slice(messages, func(i, j int) bool {
|
||
return messages[i].Timestamp < messages[j].Timestamp
|
||
})
|
||
|
||
return messages, nil
|
||
}
|
||
|
||
// nextPrefix returns an exclusive upper bound for a scan over all keys that
// start with prefix. The prefix is copied, trailing 0xFF bytes are dropped,
// and the last remaining byte is incremented — i.e. the prefix is treated as
// a big-endian integer and increased by one, truncated at the changed byte.
// The caller's slice is never modified.
//
// Example: []{0x01, 0x10, 0x00} -> []{0x01, 0x10, 0x01}
//
//	[]{0x01, 0x10, 0xFF} -> []{0x01, 0x11}
//
// Note: when every byte is 0xFF (or the prefix is empty) there is nothing to
// increment. In that case the prefix with 0x00 appended is returned so callers
// that require a non-nil bound still get one; that value is not a true upper
// bound for the prefix.
func nextPrefix(prefix []byte) []byte {
	bound := make([]byte, len(prefix))
	copy(bound, prefix)

	// Skip the trailing run of 0xFF bytes; they cannot be incremented.
	end := len(bound)
	for end > 0 && bound[end-1] == 0xFF {
		end--
	}

	if end == 0 {
		// Overflow (all 0xFF) – practically unreachable with our prefixes.
		return append(bound, 0x00)
	}

	bound[end-1]++
	return bound[:end]
}
|