package store import ( "bytes" "encoding/binary" "encoding/gob" "math/big" "slices" "github.com/cockroachdb/pebble/v2" "github.com/pkg/errors" "go.uber.org/zap" "google.golang.org/protobuf/proto" "source.quilibrium.com/quilibrium/monorepo/protobufs" "source.quilibrium.com/quilibrium/monorepo/types/store" "source.quilibrium.com/quilibrium/monorepo/types/tries" ) type PebbleClockStore struct { db store.KVDB logger *zap.Logger } var _ store.ClockStore = (*PebbleClockStore)(nil) type PebbleGlobalClockIterator struct { i store.Iterator db *PebbleClockStore } type PebbleGlobalClockCandidateIterator struct { i store.Iterator db *PebbleClockStore } type PebbleClockIterator struct { filter []byte start uint64 end uint64 cur uint64 db *PebbleClockStore } type PebbleStagedShardFrameIterator struct { filter []byte start uint64 end uint64 cur uint64 db *PebbleClockStore } type PebbleGlobalStateIterator struct { i store.Iterator db *PebbleClockStore } type PebbleAppShardStateIterator struct { filter []byte start uint64 end uint64 cur uint64 db *PebbleClockStore } type PebbleQuorumCertificateIterator struct { filter []byte start uint64 end uint64 cur uint64 db *PebbleClockStore } type PebbleTimeoutCertificateIterator struct { filter []byte start uint64 end uint64 cur uint64 db *PebbleClockStore } var _ store.TypedIterator[*protobufs.GlobalFrame] = (*PebbleGlobalClockIterator)(nil) var _ store.TypedIterator[*protobufs.GlobalFrame] = (*PebbleGlobalClockCandidateIterator)(nil) var _ store.TypedIterator[*protobufs.AppShardFrame] = (*PebbleClockIterator)(nil) var _ store.TypedIterator[*protobufs.AppShardFrame] = (*PebbleStagedShardFrameIterator)(nil) var _ store.TypedIterator[*protobufs.GlobalProposal] = (*PebbleGlobalStateIterator)(nil) var _ store.TypedIterator[*protobufs.AppShardProposal] = (*PebbleAppShardStateIterator)(nil) var _ store.TypedIterator[*protobufs.QuorumCertificate] = (*PebbleQuorumCertificateIterator)(nil) var _ store.TypedIterator[*protobufs.TimeoutCertificate] = (*PebbleTimeoutCertificateIterator)(nil) func (p *PebbleGlobalClockIterator) First() bool { return p.i.First() } func (p *PebbleGlobalClockIterator) Next() bool { return p.i.Next() } func (p *PebbleGlobalClockIterator) Valid() bool { return p.i.Valid() } func (p *PebbleGlobalClockIterator) Value() (*protobufs.GlobalFrame, error) { if !p.i.Valid() { return nil, store.ErrNotFound } key := p.i.Key() value := p.i.Value() frameNumber, err := extractFrameNumberFromGlobalFrameKey(key) if err != nil { return nil, errors.Wrap(err, "get global clock frame iterator value") } // Deserialize the GlobalFrameHeader header := &protobufs.GlobalFrameHeader{} if err := proto.Unmarshal(value, header); err != nil { return nil, errors.Wrap(err, "get global clock frame iterator value") } frame := &protobufs.GlobalFrame{ Header: header, } // Retrieve all requests for this frame var requests []*protobufs.MessageBundle requestIndex := uint16(0) for { requestKey := clockGlobalFrameRequestKey(frameNumber, requestIndex) requestData, closer, err := p.db.db.Get(requestKey) if err != nil { if errors.Is(err, pebble.ErrNotFound) { // No more requests break } return nil, errors.Wrap(err, "get global clock frame requests") } defer closer.Close() request := &protobufs.MessageBundle{} if err := proto.Unmarshal(requestData, request); err != nil { return nil, errors.Wrap(err, "get global clock frame requests") } requests = append(requests, request) requestIndex++ } frame.Requests = requests return frame, nil } func (p *PebbleGlobalClockIterator) TruncatedValue() ( *protobufs.GlobalFrame, error, ) { if !p.i.Valid() { return nil, store.ErrNotFound } value := p.i.Value() // Deserialize the GlobalFrameHeader header := &protobufs.GlobalFrameHeader{} if err := proto.Unmarshal(value, header); err != nil { return nil, errors.Wrap(err, "get global clock frame iterator value") } frame := &protobufs.GlobalFrame{ Header: header, } // TruncatedValue doesn't include requests return frame, nil } func (p *PebbleGlobalClockIterator) Close() error { return errors.Wrap(p.i.Close(), "closing global clock iterator") } func (p *PebbleGlobalClockCandidateIterator) First() bool { return p.i.First() } func (p *PebbleGlobalClockCandidateIterator) Next() bool { return p.i.Next() } func (p *PebbleGlobalClockCandidateIterator) Valid() bool { return p.i.Valid() } func (p *PebbleGlobalClockCandidateIterator) Value() ( *protobufs.GlobalFrame, error, ) { if !p.i.Valid() { return nil, store.ErrNotFound } key := p.i.Key() value := p.i.Value() frameNumber, selector, err := extractFrameNumberAndSelectorFromCandidateKey(key) if err != nil { return nil, errors.Wrap(err, "get candidate clock frame iterator value") } header := &protobufs.GlobalFrameHeader{} if err := proto.Unmarshal(value, header); err != nil { return nil, errors.Wrap(err, "get candidate clock frame iterator value") } frame := &protobufs.GlobalFrame{ Header: header, } var requests []*protobufs.MessageBundle requestIndex := uint16(0) for { requestKey := clockGlobalFrameRequestCandidateKey( selector, frameNumber, requestIndex, ) requestData, closer, err := p.db.db.Get(requestKey) if err != nil { if errors.Is(err, pebble.ErrNotFound) { break } return nil, errors.Wrap(err, "get candidate clock frame iterator value") } defer closer.Close() request := &protobufs.MessageBundle{} if err := proto.Unmarshal(requestData, request); err != nil { return nil, errors.Wrap(err, "get candidate clock frame iterator value") } requests = append(requests, request) requestIndex++ } frame.Requests = requests return frame, nil } func (p *PebbleGlobalClockCandidateIterator) TruncatedValue() ( *protobufs.GlobalFrame, error, ) { if !p.i.Valid() { return nil, store.ErrNotFound } value := p.i.Value() header := &protobufs.GlobalFrameHeader{} if err := proto.Unmarshal(value, header); err != nil { return nil, errors.Wrap(err, "get candidate clock frame iterator value") } return &protobufs.GlobalFrame{Header: header}, nil } func (p *PebbleGlobalClockCandidateIterator) Close() error { return errors.Wrap(p.i.Close(), "closing global clock candidate iterator") } func (p *PebbleClockIterator) First() bool { p.cur = p.start return true } func (p *PebbleClockIterator) Next() bool { p.cur++ return p.cur < p.end } func (p *PebbleClockIterator) Prev() bool { p.cur-- return p.cur >= p.start } func (p *PebbleClockIterator) Valid() bool { return p.cur >= p.start && p.cur < p.end } func (p *PebbleClockIterator) TruncatedValue() ( *protobufs.AppShardFrame, error, ) { if !p.Valid() { return nil, store.ErrNotFound } return p.Value() } func (p *PebbleClockIterator) Value() (*protobufs.AppShardFrame, error) { if !p.Valid() { return nil, store.ErrNotFound } frame, _, err := p.db.GetShardClockFrame(p.filter, p.cur, false) if err != nil { return nil, errors.Wrap(err, "get clock frame iterator value") } return frame, nil } func (p *PebbleStagedShardFrameIterator) First() bool { p.cur = p.start return true } func (p *PebbleStagedShardFrameIterator) Next() bool { p.cur++ return p.cur <= p.end } func (p *PebbleStagedShardFrameIterator) Valid() bool { return p.cur >= p.start && p.cur <= p.end } func (p *PebbleStagedShardFrameIterator) Value() (*protobufs.AppShardFrame, error) { if !p.Valid() { return nil, store.ErrNotFound } frames, err := p.db.GetStagedShardClockFramesForFrameNumber(p.filter, p.cur) if err != nil { return nil, errors.Wrap(err, "get staged shard clocks") } if len(frames) == 0 { return nil, store.ErrNotFound } return frames[len(frames)-1], nil } func (p *PebbleStagedShardFrameIterator) TruncatedValue() ( *protobufs.AppShardFrame, error, ) { return p.Value() } func (p *PebbleStagedShardFrameIterator) Close() error { return nil } func (p *PebbleClockIterator) Close() error { return nil } func (p *PebbleGlobalStateIterator) First() bool { return p.i.First() } func (p *PebbleGlobalStateIterator) Next() bool { return p.i.Next() } func (p *PebbleGlobalStateIterator) Prev() bool { return p.i.Prev() } func (p *PebbleGlobalStateIterator) Valid() bool { return p.i.Valid() } func (p *PebbleGlobalStateIterator) Value() ( *protobufs.GlobalProposal, error, ) { if !p.Valid() { return nil, store.ErrNotFound } value := p.i.Value() if len(value) != 24 { return nil, errors.Wrap( store.ErrInvalidData, "get certified global state", ) } frameNumber := binary.BigEndian.Uint64(value[:8]) qcRank := binary.BigEndian.Uint64(value[8:16]) tcRank := binary.BigEndian.Uint64(value[16:]) frame, err := p.db.GetGlobalClockFrame(frameNumber) if err != nil && !errors.Is(err, store.ErrNotFound) { return nil, errors.Wrap(err, "get certified global state") } qc, err := p.db.GetQuorumCertificate(nil, qcRank) if err != nil && !errors.Is(err, store.ErrNotFound) { return nil, errors.Wrap(err, "get certified global state") } tc, err := p.db.GetTimeoutCertificate(nil, tcRank) if err != nil && !errors.Is(err, store.ErrNotFound) { return nil, errors.Wrap(err, "get certified global state") } return &protobufs.GlobalProposal{ State: frame, ParentQuorumCertificate: qc, PriorRankTimeoutCertificate: tc, }, nil } func (p *PebbleGlobalStateIterator) TruncatedValue() ( *protobufs.GlobalProposal, error, ) { return p.Value() } func (p *PebbleGlobalStateIterator) Close() error { return p.i.Close() } func (p *PebbleAppShardStateIterator) First() bool { p.cur = p.start return true } func (p *PebbleAppShardStateIterator) Next() bool { p.cur++ return p.cur < p.end } func (p *PebbleAppShardStateIterator) Prev() bool { p.cur-- return p.cur >= p.start } func (p *PebbleAppShardStateIterator) Valid() bool { return p.cur >= p.start && p.cur < p.end } func (p *PebbleAppShardStateIterator) Close() error { return nil } func (p *PebbleAppShardStateIterator) Value() ( *protobufs.AppShardProposal, error, ) { if !p.Valid() { return nil, store.ErrNotFound } state, err := p.db.GetCertifiedAppShardState(p.filter, p.cur) if err != nil { return nil, errors.Wrap(err, "get app shard state iterator value") } return state, nil } func (p *PebbleAppShardStateIterator) TruncatedValue() ( *protobufs.AppShardProposal, error, ) { if !p.Valid() { return nil, store.ErrNotFound } return p.Value() } func (p *PebbleQuorumCertificateIterator) First() bool { p.cur = p.start return true } func (p *PebbleQuorumCertificateIterator) Next() bool { p.cur++ return p.cur < p.end } func (p *PebbleQuorumCertificateIterator) Prev() bool { p.cur-- return p.cur >= p.start } func (p *PebbleQuorumCertificateIterator) Valid() bool { return p.cur >= p.start && p.cur < p.end } func (p *PebbleQuorumCertificateIterator) Close() error { return nil } func (p *PebbleQuorumCertificateIterator) Value() ( *protobufs.QuorumCertificate, error, ) { if !p.Valid() { return nil, store.ErrNotFound } qc, err := p.db.GetQuorumCertificate(p.filter, p.cur) if err != nil { return nil, errors.Wrap(err, "get quorum certificate iterator value") } return qc, nil } func (p *PebbleQuorumCertificateIterator) TruncatedValue() ( *protobufs.QuorumCertificate, error, ) { return p.Value() } func (p *PebbleTimeoutCertificateIterator) First() bool { p.cur = p.start return true } func (p *PebbleTimeoutCertificateIterator) Next() bool { p.cur++ return p.cur < p.end } func (p *PebbleTimeoutCertificateIterator) Prev() bool { p.cur-- return p.cur >= p.start } func (p *PebbleTimeoutCertificateIterator) Valid() bool { return p.cur >= p.start && p.cur < p.end } func (p *PebbleTimeoutCertificateIterator) Close() error { return nil } func (p *PebbleTimeoutCertificateIterator) Value() ( *protobufs.TimeoutCertificate, error, ) { if !p.Valid() { return nil, store.ErrNotFound } tc, err := p.db.GetTimeoutCertificate(p.filter, p.cur) if err != nil { return nil, errors.Wrap(err, "get timeout certificate iterator value") } return tc, nil } func (p *PebbleTimeoutCertificateIterator) TruncatedValue() ( *protobufs.TimeoutCertificate, error, ) { if !p.Valid() { return nil, store.ErrNotFound } return p.Value() } func NewPebbleClockStore(db store.KVDB, logger *zap.Logger) *PebbleClockStore { return &PebbleClockStore{ db, logger, } } func (p *PebbleClockStore) updateEarliestIndex( txn store.Transaction, key []byte, rank uint64, ) error { existing, closer, err := p.db.Get(key) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return txn.Set( key, binary.BigEndian.AppendUint64(nil, rank), ) } return err } defer closer.Close() if len(existing) != 8 { return errors.Wrap( store.ErrInvalidData, "earliest index contained unexpected length", ) } if binary.BigEndian.Uint64(existing) > rank { return txn.Set( key, binary.BigEndian.AppendUint64(nil, rank), ) } return nil } func (p *PebbleClockStore) updateLatestIndex( txn store.Transaction, key []byte, rank uint64, ) error { existing, closer, err := p.db.Get(key) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return txn.Set( key, binary.BigEndian.AppendUint64(nil, rank), ) } return err } defer closer.Close() if len(existing) != 8 { return errors.Wrap( store.ErrInvalidData, "latest index contained unexpected length", ) } if binary.BigEndian.Uint64(existing) < rank { return txn.Set( key, binary.BigEndian.AppendUint64(nil, rank), ) } return nil } func deleteIfExists(txn store.Transaction, key []byte) error { if err := txn.Delete(key); err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil } return err } return nil } // // DB Keys // // Keys are structured as: // [] // Increment necessarily must be full width – elsewise the frame number would // easily produce conflicts if filters are stepped by byte: // 0x01 || 0xffff == 0x01ff || 0xff func clockFrameKey(filter []byte, frameNumber uint64, frameType byte) []byte { key := []byte{CLOCK_FRAME, frameType} key = binary.BigEndian.AppendUint64(key, frameNumber) key = append(key, filter...) return key } func clockGlobalFrameKey(frameNumber uint64) []byte { return clockFrameKey([]byte{}, frameNumber, CLOCK_GLOBAL_FRAME) } func clockGlobalFrameCandidateKey(frameNumber uint64, selector []byte) []byte { key := []byte{CLOCK_FRAME, CLOCK_GLOBAL_FRAME_CANDIDATE} key = binary.BigEndian.AppendUint64(key, frameNumber) key = append(key, selector...) return key } func extractFrameNumberFromGlobalFrameKey( key []byte, ) (uint64, error) { if len(key) < 10 { return 0, errors.Wrap( store.ErrInvalidData, "extract frame number and filter from global frame key", ) } copied := make([]byte, len(key)) copy(copied, key) return binary.BigEndian.Uint64(copied[2:10]), nil } func extractFrameNumberAndSelectorFromCandidateKey( key []byte, ) (uint64, []byte, error) { frameNumber, err := extractFrameNumberFromGlobalFrameKey(key) if err != nil { return 0, nil, err } if len(key) < 10 { return 0, nil, errors.Wrap( store.ErrInvalidData, "extract selector from global frame candidate key", ) } selector := slices.Clone(key[10:]) return frameNumber, selector, nil } func clockShardFrameKey( filter []byte, frameNumber uint64, ) []byte { return clockFrameKey(filter, frameNumber, CLOCK_SHARD_FRAME_SHARD) } func clockLatestIndex(filter []byte, frameType byte) []byte { key := []byte{CLOCK_FRAME, frameType} key = append(key, filter...) return key } func clockGlobalLatestIndex() []byte { return clockLatestIndex([]byte{}, CLOCK_GLOBAL_FRAME_INDEX_LATEST) } func clockShardLatestIndex(filter []byte) []byte { return clockLatestIndex(filter, CLOCK_SHARD_FRAME_INDEX_LATEST) } func clockEarliestIndex(filter []byte, frameType byte) []byte { key := []byte{CLOCK_FRAME, frameType} key = append(key, filter...) return key } func clockGlobalEarliestIndex() []byte { return clockEarliestIndex([]byte{}, CLOCK_GLOBAL_FRAME_INDEX_EARLIEST) } func clockDataEarliestIndex(filter []byte) []byte { return clockEarliestIndex(filter, CLOCK_SHARD_FRAME_INDEX_EARLIEST) } func clockGlobalCertifiedStateEarliestIndex() []byte { return []byte{CLOCK_FRAME, CLOCK_GLOBAL_CERTIFIED_STATE_INDEX_EARLIEST} } func clockShardCertifiedStateEarliestIndex(filter []byte) []byte { return slices.Concat( []byte{CLOCK_FRAME, CLOCK_SHARD_CERTIFIED_STATE_INDEX_EARLIEST}, filter, ) } func clockGlobalCertifiedStateLatestIndex() []byte { return []byte{CLOCK_FRAME, CLOCK_GLOBAL_CERTIFIED_STATE_INDEX_LATEST} } func clockShardCertifiedStateLatestIndex(filter []byte) []byte { return slices.Concat( []byte{CLOCK_FRAME, CLOCK_SHARD_CERTIFIED_STATE_INDEX_LATEST}, filter, ) } func clockGlobalCertifiedStateKey(rank uint64) []byte { key := []byte{CLOCK_FRAME, CLOCK_GLOBAL_CERTIFIED_STATE} key = binary.BigEndian.AppendUint64(key, rank) return key } func clockShardCertifiedStateKey(rank uint64, filter []byte) []byte { key := []byte{CLOCK_FRAME, CLOCK_SHARD_CERTIFIED_STATE} key = binary.BigEndian.AppendUint64(key, rank) key = append(key, filter...) return key } func clockQuorumCertificateKey(rank uint64, filter []byte) []byte { key := []byte{CLOCK_FRAME, CLOCK_QUORUM_CERTIFICATE} key = binary.BigEndian.AppendUint64(key, rank) return key } func clockQuorumCertificateEarliestIndex(filter []byte) []byte { return slices.Concat( []byte{CLOCK_FRAME, CLOCK_QUORUM_CERTIFICATE_INDEX_EARLIEST}, filter, ) } func clockQuorumCertificateLatestIndex(filter []byte) []byte { return slices.Concat( []byte{CLOCK_FRAME, CLOCK_QUORUM_CERTIFICATE_INDEX_LATEST}, filter, ) } func clockTimeoutCertificateKey(rank uint64, filter []byte) []byte { key := []byte{CLOCK_FRAME, CLOCK_TIMEOUT_CERTIFICATE} key = binary.BigEndian.AppendUint64(key, rank) key = append(key, filter...) return key } func clockTimeoutCertificateEarliestIndex(filter []byte) []byte { return slices.Concat( []byte{CLOCK_FRAME, CLOCK_TIMEOUT_CERTIFICATE_INDEX_EARLIEST}, filter, ) } func clockTimeoutCertificateLatestIndex(filter []byte) []byte { return slices.Concat( []byte{CLOCK_FRAME, CLOCK_TIMEOUT_CERTIFICATE_INDEX_LATEST}, filter, ) } // Produces an index key of size: len(filter) + 42 func clockParentIndexKey( filter []byte, frameNumber uint64, selector []byte, frameType byte, ) []byte { key := []byte{CLOCK_FRAME, frameType} key = binary.BigEndian.AppendUint64(key, frameNumber) key = append(key, filter...) key = append(key, rightAlign(selector, 32)...) return key } func clockShardParentIndexKey( address []byte, frameNumber uint64, selector []byte, ) []byte { return clockParentIndexKey( address, frameNumber, rightAlign(selector, 32), CLOCK_SHARD_FRAME_INDEX_PARENT, ) } func clockProverTrieKey(filter []byte, ring uint16, frameNumber uint64) []byte { key := []byte{CLOCK_FRAME, CLOCK_SHARD_FRAME_FRECENCY_SHARD} key = binary.BigEndian.AppendUint16(key, ring) key = binary.BigEndian.AppendUint64(key, frameNumber) key = append(key, filter...) return key } func clockDataTotalDistanceKey( filter []byte, frameNumber uint64, selector []byte, ) []byte { key := []byte{CLOCK_FRAME, CLOCK_SHARD_FRAME_DISTANCE_SHARD} key = binary.BigEndian.AppendUint64(key, frameNumber) key = append(key, filter...) key = append(key, rightAlign(selector, 32)...) return key } func clockDataSeniorityKey( filter []byte, ) []byte { key := []byte{CLOCK_FRAME, CLOCK_SHARD_FRAME_SENIORITY_SHARD} key = append(key, filter...) return key } func clockShardStateTreeKey( filter []byte, ) []byte { key := []byte{CLOCK_FRAME, CLOCK_SHARD_FRAME_STATE_TREE} key = append(key, filter...) return key } func clockGlobalFrameRequestKey( frameNumber uint64, requestIndex uint16, ) []byte { key := []byte{CLOCK_FRAME, CLOCK_GLOBAL_FRAME_REQUEST} key = binary.BigEndian.AppendUint64(key, frameNumber) key = binary.BigEndian.AppendUint16(key, requestIndex) return key } func clockGlobalFrameRequestCandidateKey( selector []byte, frameNumber uint64, requestIndex uint16, ) []byte { key := []byte{CLOCK_FRAME, CLOCK_GLOBAL_FRAME_REQUEST_CANDIDATE} key = append(key, selector...) key = binary.BigEndian.AppendUint64(key, frameNumber) key = binary.BigEndian.AppendUint16(key, requestIndex) return key } func clockProposalVoteKey(rank uint64, filter []byte, identity []byte) []byte { key := []byte{CLOCK_FRAME, CLOCK_PROPOSAL_VOTE} key = binary.BigEndian.AppendUint64(key, rank) key = append(key, filter...) key = append(key, identity...) return key } func clockTimeoutVoteKey(rank uint64, filter []byte, identity []byte) []byte { key := []byte{CLOCK_FRAME, CLOCK_TIMEOUT_VOTE} key = binary.BigEndian.AppendUint64(key, rank) key = append(key, filter...) key = append(key, identity...) return key } func (p *PebbleClockStore) NewTransaction(indexed bool) ( store.Transaction, error, ) { return p.db.NewBatch(indexed), nil } // GetEarliestGlobalClockFrame implements ClockStore. func (p *PebbleClockStore) GetEarliestGlobalClockFrame() ( *protobufs.GlobalFrame, error, ) { idxValue, closer, err := p.db.Get(clockGlobalEarliestIndex()) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get earliest global clock frame") } defer closer.Close() frameNumber := binary.BigEndian.Uint64(idxValue) frame, err := p.GetGlobalClockFrame(frameNumber) if err != nil { return nil, errors.Wrap(err, "get earliest global clock frame") } return frame, nil } // GetLatestGlobalClockFrame implements ClockStore. func (p *PebbleClockStore) GetLatestGlobalClockFrame() ( *protobufs.GlobalFrame, error, ) { idxValue, closer, err := p.db.Get(clockGlobalLatestIndex()) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get latest global clock frame") } defer closer.Close() frameNumber := binary.BigEndian.Uint64(idxValue) frame, err := p.GetGlobalClockFrame(frameNumber) if err != nil { return nil, errors.Wrap(err, "get latest global clock frame") } return frame, nil } // GetGlobalClockFrame implements ClockStore. func (p *PebbleClockStore) GetGlobalClockFrame( frameNumber uint64, ) (*protobufs.GlobalFrame, error) { value, closer, err := p.db.Get(clockGlobalFrameKey(frameNumber)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get global clock frame") } defer closer.Close() // Deserialize the GlobalFrameHeader header := &protobufs.GlobalFrameHeader{} if err := proto.Unmarshal(value, header); err != nil { return nil, errors.Wrap(err, "get global clock frame") } frame := &protobufs.GlobalFrame{ Header: header, } // Retrieve all requests for this frame var requests []*protobufs.MessageBundle requestIndex := uint16(0) for { requestKey := clockGlobalFrameRequestKey(frameNumber, requestIndex) requestData, closer, err := p.db.Get(requestKey) if err != nil { if errors.Is(err, pebble.ErrNotFound) { // No more requests break } return nil, errors.Wrap(err, "get global clock frame") } defer closer.Close() request := &protobufs.MessageBundle{} if err := proto.Unmarshal(requestData, request); err != nil { return nil, errors.Wrap(err, "get global clock frame") } requests = append(requests, request) requestIndex++ } frame.Requests = requests return frame, nil } // RangeGlobalClockFrames implements ClockStore. func (p *PebbleClockStore) RangeGlobalClockFrames( startFrameNumber uint64, endFrameNumber uint64, ) (store.TypedIterator[*protobufs.GlobalFrame], error) { if startFrameNumber > endFrameNumber { temp := endFrameNumber endFrameNumber = startFrameNumber startFrameNumber = temp } iter, err := p.db.NewIter( clockGlobalFrameKey(startFrameNumber), clockGlobalFrameKey(endFrameNumber+1), ) if err != nil { return nil, errors.Wrap(err, "range global clock frames") } return &PebbleGlobalClockIterator{i: iter, db: p}, nil } // RangeGlobalClockFrameCandidates implements ClockStore. func (p *PebbleClockStore) RangeGlobalClockFrameCandidates( startFrameNumber uint64, endFrameNumber uint64, ) (store.TypedIterator[*protobufs.GlobalFrame], error) { if startFrameNumber > endFrameNumber { temp := endFrameNumber endFrameNumber = startFrameNumber startFrameNumber = temp } iter, err := p.db.NewIter( clockGlobalFrameCandidateKey(startFrameNumber, nil), clockGlobalFrameCandidateKey(endFrameNumber+1, nil), ) if err != nil { return nil, errors.Wrap(err, "range global clock frame candidates") } return &PebbleGlobalClockCandidateIterator{i: iter, db: p}, nil } // PutGlobalClockFrame implements ClockStore. func (p *PebbleClockStore) PutGlobalClockFrame( frame *protobufs.GlobalFrame, txn store.Transaction, ) error { if frame.Header == nil { return errors.Wrap( errors.New("frame header is required"), "put global clock frame", ) } frameNumber := frame.Header.FrameNumber // Serialize the full header using protobuf headerData, err := proto.Marshal(frame.Header) if err != nil { return errors.Wrap(err, "put global clock frame") } if err := txn.Set( clockGlobalFrameKey(frameNumber), headerData, ); err != nil { return errors.Wrap(err, "put global clock frame") } // Store requests separately with iterative keys for i, request := range frame.Requests { requestData, err := proto.Marshal(request) if err != nil { return errors.Wrap(err, "put global clock frame request") } if err := txn.Set( clockGlobalFrameRequestKey(frameNumber, uint16(i)), requestData, ); err != nil { return errors.Wrap(err, "put global clock frame request") } } if err = p.updateEarliestIndex( txn, clockGlobalEarliestIndex(), frameNumber, ); err != nil { return errors.Wrap(err, "put global clock frame") } if err = p.updateLatestIndex( txn, clockGlobalLatestIndex(), frameNumber, ); err != nil { return errors.Wrap(err, "put global clock frame") } return nil } // PutGlobalClockFrameCandidate implements ClockStore. func (p *PebbleClockStore) PutGlobalClockFrameCandidate( frame *protobufs.GlobalFrame, txn store.Transaction, ) error { if frame.Header == nil { return errors.Wrap( errors.New("frame header is required"), "put global clock frame candidate", ) } frameNumber := frame.Header.FrameNumber // Serialize the full header using protobuf headerData, err := proto.Marshal(frame.Header) if err != nil { return errors.Wrap(err, "put global clock frame candidate") } if err := txn.Set( clockGlobalFrameCandidateKey(frameNumber, []byte(frame.Identity())), headerData, ); err != nil { return errors.Wrap(err, "put global clock frame candidate") } // Store requests separately with iterative keys for i, request := range frame.Requests { requestData, err := proto.Marshal(request) if err != nil { return errors.Wrap(err, "put global clock frame candidate request") } if err := txn.Set( clockGlobalFrameRequestCandidateKey( []byte(frame.Identity()), frameNumber, uint16(i), ), requestData, ); err != nil { return errors.Wrap(err, "put global clock frame candidate request") } } return nil } // GetGlobalClockFrameCandidate implements ClockStore. func (p *PebbleClockStore) GetGlobalClockFrameCandidate( frameNumber uint64, selector []byte, ) (*protobufs.GlobalFrame, error) { value, closer, err := p.db.Get(clockGlobalFrameCandidateKey( frameNumber, selector, )) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get global clock frame candidate") } defer closer.Close() // Deserialize the GlobalFrameHeader header := &protobufs.GlobalFrameHeader{} if err := proto.Unmarshal(value, header); err != nil { return nil, errors.Wrap(err, "get global clock frame candidate") } frame := &protobufs.GlobalFrame{ Header: header, } // Retrieve all requests for this frame var requests []*protobufs.MessageBundle requestIndex := uint16(0) for { requestKey := clockGlobalFrameRequestCandidateKey( selector, frameNumber, requestIndex, ) requestData, closer, err := p.db.Get(requestKey) if err != nil { if errors.Is(err, pebble.ErrNotFound) { // No more requests break } return nil, errors.Wrap(err, "get global clock frame candidate") } defer closer.Close() request := &protobufs.MessageBundle{} if err := proto.Unmarshal(requestData, request); err != nil { return nil, errors.Wrap(err, "get global clock frame candidate") } requests = append(requests, request) requestIndex++ } frame.Requests = requests return frame, nil } // GetShardClockFrame implements ClockStore. func (p *PebbleClockStore) GetShardClockFrame( filter []byte, frameNumber uint64, truncate bool, ) (*protobufs.AppShardFrame, []*tries.RollingFrecencyCritbitTrie, error) { value, closer, err := p.db.Get(clockShardFrameKey(filter, frameNumber)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, nil, store.ErrNotFound } return nil, nil, errors.Wrap(err, "get shard clock frame") } defer closer.Close() frame := &protobufs.AppShardFrame{} // We do a bit of a cheap trick here while things are still stuck in the old // ways: we use the size of the parent index key to determine if it's the new // format, or the old raw frame if len(value) == (len(filter) + 42) { frameValue, frameCloser, err := p.db.Get(value) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, nil, store.ErrNotFound } return nil, nil, errors.Wrap(err, "get shard clock frame") } defer frameCloser.Close() if err := proto.Unmarshal(frameValue, frame); err != nil { return nil, nil, errors.Wrap( errors.Wrap(err, store.ErrInvalidData.Error()), "get shard clock frame", ) } } else { if err := proto.Unmarshal(value, frame); err != nil { return nil, nil, errors.Wrap( errors.Wrap(err, store.ErrInvalidData.Error()), "get shard clock frame", ) } } if !truncate { proverTries := []*tries.RollingFrecencyCritbitTrie{} i := uint16(0) for { proverTrie := &tries.RollingFrecencyCritbitTrie{} trieData, closer, err := p.db.Get( clockProverTrieKey(filter, i, frameNumber), ) if err != nil { if !errors.Is(err, pebble.ErrNotFound) { return nil, nil, errors.Wrap(err, "get shard clock frame") } break } defer closer.Close() if err := proverTrie.Deserialize(trieData); err != nil { return nil, nil, errors.Wrap(err, "get shard clock frame") } i++ proverTries = append(proverTries, proverTrie) } return frame, proverTries, nil } return frame, nil, nil } // GetEarliestShardClockFrame implements ClockStore. func (p *PebbleClockStore) GetEarliestShardClockFrame( filter []byte, ) (*protobufs.AppShardFrame, error) { idxValue, closer, err := p.db.Get(clockDataEarliestIndex(filter)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get earliest shard clock frame") } defer closer.Close() frameNumber := binary.BigEndian.Uint64(idxValue) frame, _, err := p.GetShardClockFrame(filter, frameNumber, false) if err != nil { return nil, errors.Wrap(err, "get earliest shard clock frame") } return frame, nil } // GetLatestShardClockFrame implements ClockStore. func (p *PebbleClockStore) GetLatestShardClockFrame( filter []byte, ) (*protobufs.AppShardFrame, []*tries.RollingFrecencyCritbitTrie, error) { idxValue, closer, err := p.db.Get(clockShardLatestIndex(filter)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, nil, store.ErrNotFound } return nil, nil, errors.Wrap(err, "get latest shard clock frame") } defer closer.Close() frameNumber := binary.BigEndian.Uint64(idxValue) frame, tries, err := p.GetShardClockFrame(filter, frameNumber, false) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, nil, store.ErrNotFound } return nil, nil, errors.Wrap(err, "get latest shard clock frame") } return frame, tries, nil } // GetStagedShardClockFrame implements ClockStore. func (p *PebbleClockStore) GetStagedShardClockFrame( filter []byte, frameNumber uint64, parentSelector []byte, truncate bool, ) (*protobufs.AppShardFrame, error) { data, closer, err := p.db.Get( clockShardParentIndexKey(filter, frameNumber, parentSelector), ) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, errors.Wrap(store.ErrNotFound, "get parent shard clock frame") } return nil, errors.Wrap(err, "get parent shard clock frame") } defer closer.Close() parent := &protobufs.AppShardFrame{} if err := proto.Unmarshal(data, parent); err != nil { return nil, errors.Wrap(err, "get parent shard clock frame") } return parent, nil } func (p *PebbleClockStore) GetStagedShardClockFramesForFrameNumber( filter []byte, frameNumber uint64, ) ([]*protobufs.AppShardFrame, error) { iter, err := p.db.NewIter( clockShardParentIndexKey( filter, frameNumber, bytes.Repeat([]byte{0x00}, 32), ), clockShardParentIndexKey( filter, frameNumber, bytes.Repeat([]byte{0xff}, 32), ), ) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, errors.Wrap( store.ErrNotFound, "get staged shard clock frames", ) } return nil, errors.Wrap(err, "get staged shard clock frames") } defer iter.Close() frames := []*protobufs.AppShardFrame{} for iter.First(); iter.Valid(); iter.Next() { data := iter.Value() frame := &protobufs.AppShardFrame{} if err := proto.Unmarshal(data, frame); err != nil { return nil, errors.Wrap(err, "get staged shard clock frames") } frames = append(frames, frame) } return frames, nil } // StageShardClockFrame implements ClockStore. func (p *PebbleClockStore) StageShardClockFrame( selector []byte, frame *protobufs.AppShardFrame, txn store.Transaction, ) error { data, err := proto.Marshal(frame) if err != nil { return errors.Wrap( errors.Wrap(err, store.ErrInvalidData.Error()), "stage shard clock frame", ) } if err = txn.Set( clockShardParentIndexKey( frame.Header.Address, frame.Header.FrameNumber, selector, ), data, ); err != nil { return errors.Wrap(err, "stage shard clock frame") } return nil } // CommitShardClockFrame implements ClockStore. func (p *PebbleClockStore) CommitShardClockFrame( filter []byte, frameNumber uint64, selector []byte, proverTries []*tries.RollingFrecencyCritbitTrie, txn store.Transaction, backfill bool, ) error { frameNumberBytes := make([]byte, 8) binary.BigEndian.PutUint64(frameNumberBytes, frameNumber) if err := txn.Set( clockShardFrameKey(filter, frameNumber), clockShardParentIndexKey(filter, frameNumber, selector), ); err != nil { return errors.Wrap(err, "commit shard clock frame") } for i, proverTrie := range proverTries { proverData, err := proverTrie.Serialize() if err != nil { return errors.Wrap(err, "commit shard clock frame") } if err = txn.Set( clockProverTrieKey(filter, uint16(i), frameNumber), proverData, ); err != nil { return errors.Wrap(err, "commit shard clock frame") } } _, closer, err := p.db.Get(clockDataEarliestIndex(filter)) if err != nil { if !errors.Is(err, pebble.ErrNotFound) { return errors.Wrap(err, "commit shard clock frame") } if err = txn.Set( clockDataEarliestIndex(filter), frameNumberBytes, ); err != nil { return errors.Wrap(err, "commit shard clock frame") } } else { _ = closer.Close() } if !backfill { if err = txn.Set( clockShardLatestIndex(filter), frameNumberBytes, ); err != nil { return errors.Wrap(err, "commit shard clock frame") } } return nil } // RangeShardClockFrames implements ClockStore. func (p *PebbleClockStore) RangeShardClockFrames( filter []byte, startFrameNumber uint64, endFrameNumber uint64, ) (store.TypedIterator[*protobufs.AppShardFrame], error) { if startFrameNumber > endFrameNumber { temp := endFrameNumber endFrameNumber = startFrameNumber startFrameNumber = temp } return &PebbleClockIterator{ filter: filter, // buildutils:allow-slice-alias slice is static start: startFrameNumber, end: endFrameNumber + 1, cur: startFrameNumber, db: p, }, nil } func (p *PebbleClockStore) RangeStagedShardClockFrames( filter []byte, startFrameNumber uint64, endFrameNumber uint64, ) (store.TypedIterator[*protobufs.AppShardFrame], error) { if startFrameNumber > endFrameNumber { startFrameNumber, endFrameNumber = endFrameNumber, startFrameNumber } return &PebbleStagedShardFrameIterator{ filter: filter, // buildutils:allow-slice-alias slice is static start: startFrameNumber, end: endFrameNumber, cur: startFrameNumber, db: p, }, nil } func (p *PebbleClockStore) SetLatestShardClockFrameNumber( filter []byte, frameNumber uint64, ) error { err := p.db.Set( clockShardLatestIndex(filter), binary.BigEndian.AppendUint64(nil, frameNumber), ) return errors.Wrap(err, "set latest shard clock frame number") } func (p *PebbleClockStore) DeleteGlobalClockFrameRange( minFrameNumber uint64, maxFrameNumber uint64, ) error { err := p.db.DeleteRange( clockGlobalFrameKey(minFrameNumber), clockGlobalFrameKey(maxFrameNumber), ) return errors.Wrap(err, "delete global clock frame range") } func (p *PebbleClockStore) DeleteQuorumCertificateRange( filter []byte, minRank uint64, maxRank uint64, ) error { err := p.db.DeleteRange( clockQuorumCertificateKey(minRank, filter), clockQuorumCertificateKey(maxRank, filter), ) if err != nil { return errors.Wrap(err, "delete quorum certificate range") } err = p.db.Set( clockQuorumCertificateLatestIndex(nil), binary.BigEndian.AppendUint64(nil, minRank-1), ) return errors.Wrap(err, "delete quorum certificate range") } func (p *PebbleClockStore) DeleteTimeoutCertificateRange( filter []byte, minRank uint64, maxRank uint64, priorLatestRank uint64, ) error { err := p.db.DeleteRange( clockTimeoutCertificateKey(minRank, filter), clockTimeoutCertificateKey(maxRank, filter), ) if err != nil { return errors.Wrap(err, "delete timeout certificate range") } err = p.db.Set( clockTimeoutCertificateLatestIndex(nil), binary.BigEndian.AppendUint64(nil, priorLatestRank), ) return errors.Wrap(err, "delete timeout certificate range") } func (p *PebbleClockStore) DeleteShardClockFrameRange( filter []byte, fromFrameNumber uint64, toFrameNumber uint64, ) error { txn, err := p.NewTransaction(false) if err != nil { return errors.Wrap(err, "delete shard clock frame range") } for i := fromFrameNumber; i < toFrameNumber; i++ { if err := txn.DeleteRange( clockShardParentIndexKey(filter, i, bytes.Repeat([]byte{0x00}, 32)), clockShardParentIndexKey(filter, i, bytes.Repeat([]byte{0xff}, 32)), ); err != nil { if !errors.Is(err, pebble.ErrNotFound) { _ = txn.Abort() return errors.Wrap(err, "delete shard clock frame range") } } if err := txn.Delete(clockShardFrameKey(filter, i)); err != nil { if !errors.Is(err, pebble.ErrNotFound) { _ = txn.Abort() return errors.Wrap(err, "delete shard clock frame range") } } // The prover trie keys are not stored continuously with respect // to the same frame number. As such, we need to manually iterate // and discover such keys. for t := uint16(0); true; t++ { _, closer, err := p.db.Get(clockProverTrieKey(filter, t, i)) if err != nil { if !errors.Is(err, pebble.ErrNotFound) { _ = txn.Abort() return errors.Wrap(err, "delete shard clock frame range") } else { break } } _ = closer.Close() if err := txn.Delete(clockProverTrieKey(filter, t, i)); err != nil { _ = txn.Abort() return errors.Wrap(err, "delete shard clock frame range") } } if err := txn.DeleteRange( clockDataTotalDistanceKey(filter, i, bytes.Repeat([]byte{0x00}, 32)), clockDataTotalDistanceKey(filter, i, bytes.Repeat([]byte{0xff}, 32)), ); err != nil { if !errors.Is(err, pebble.ErrNotFound) { _ = txn.Abort() return errors.Wrap(err, "delete shard clock frame range") } } } if err := txn.Commit(); err != nil { return errors.Wrap(err, "delete shard clock frame range") } return nil } func (p *PebbleClockStore) ResetGlobalClockFrames() error { if err := p.db.DeleteRange( clockGlobalFrameKey(0), clockGlobalFrameKey(20000000), ); err != nil { return errors.Wrap(err, "reset global clock frames") } if err := p.db.Delete(clockGlobalEarliestIndex()); err != nil { return errors.Wrap(err, "reset global clock frames") } if err := p.db.Delete(clockGlobalLatestIndex()); err != nil { return errors.Wrap(err, "reset global clock frames") } return nil } func (p *PebbleClockStore) ResetShardClockFrames(filter []byte) error { if err := p.db.DeleteRange( clockShardFrameKey(filter, 0), clockShardFrameKey(filter, 200000), ); err != nil { return errors.Wrap(err, "reset shard clock frames") } if err := p.db.Delete(clockDataEarliestIndex(filter)); err != nil { return errors.Wrap(err, "reset shard clock frames") } if err := p.db.Delete(clockShardLatestIndex(filter)); err != nil { return errors.Wrap(err, "reset shard clock frames") } return nil } func (p *PebbleClockStore) Compact( dataFilter []byte, ) error { return nil } func (p *PebbleClockStore) GetTotalDistance( filter []byte, frameNumber uint64, selector []byte, ) (*big.Int, error) { value, closer, err := p.db.Get( clockDataTotalDistanceKey(filter, frameNumber, selector), ) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get total distance") } defer closer.Close() dist := new(big.Int).SetBytes(value) return dist, nil } func (p *PebbleClockStore) SetTotalDistance( filter []byte, frameNumber uint64, selector []byte, totalDistance *big.Int, ) error { err := p.db.Set( clockDataTotalDistanceKey(filter, frameNumber, selector), totalDistance.Bytes(), ) return errors.Wrap(err, "set total distance") } func (p *PebbleClockStore) GetPeerSeniorityMap(filter []byte) ( map[string]uint64, error, ) { value, closer, err := p.db.Get(clockDataSeniorityKey(filter)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get peer seniority map") } defer closer.Close() var b bytes.Buffer b.Write(value) dec := gob.NewDecoder(&b) var seniorityMap map[string]uint64 if err = dec.Decode(&seniorityMap); err != nil { return nil, errors.Wrap(err, "get peer seniority map") } return seniorityMap, nil } func (p *PebbleClockStore) PutPeerSeniorityMap( txn store.Transaction, filter []byte, seniorityMap map[string]uint64, ) error { b := new(bytes.Buffer) enc := gob.NewEncoder(b) if err := enc.Encode(&seniorityMap); err != nil { return errors.Wrap(err, "put peer seniority map") } return errors.Wrap( txn.Set(clockDataSeniorityKey(filter), b.Bytes()), "put peer seniority map", ) } func (p *PebbleClockStore) SetProverTriesForGlobalFrame( frame *protobufs.GlobalFrame, tries []*tries.RollingFrecencyCritbitTrie, ) error { // For global frames, filter is typically empty filter := []byte{} frameNumber := frame.Header.FrameNumber start := 0 for i, proverTrie := range tries { proverData, err := proverTrie.Serialize() if err != nil { return errors.Wrap(err, "set prover tries for frame") } if err = p.db.Set( clockProverTrieKey(filter, uint16(i), frameNumber), proverData, ); err != nil { return errors.Wrap(err, "set prover tries for frame") } start = i } start++ for { _, closer, err := p.db.Get( clockProverTrieKey(filter, uint16(start), frameNumber), ) if err != nil { if !errors.Is(err, pebble.ErrNotFound) { return errors.Wrap(err, "set prover tries for frame") } break } _ = closer.Close() if err = p.db.Delete( clockProverTrieKey(filter, uint16(start), frameNumber), ); err != nil { return errors.Wrap(err, "set prover tries for frame") } start++ } return nil } func (p *PebbleClockStore) SetProverTriesForShardFrame( frame *protobufs.AppShardFrame, tries []*tries.RollingFrecencyCritbitTrie, ) error { filter := frame.Header.Address frameNumber := frame.Header.FrameNumber start := 0 for i, proverTrie := range tries { proverData, err := proverTrie.Serialize() if err != nil { return errors.Wrap(err, "set prover tries for frame") } if err = p.db.Set( clockProverTrieKey(filter, uint16(i), frameNumber), proverData, ); err != nil { return errors.Wrap(err, "set prover tries for frame") } start = i } start++ for { _, closer, err := p.db.Get( clockProverTrieKey(filter, uint16(start), frameNumber), ) if err != nil { if !errors.Is(err, pebble.ErrNotFound) { return errors.Wrap(err, "set prover tries for frame") } break } _ = closer.Close() if err = p.db.Delete( clockProverTrieKey(filter, uint16(start), frameNumber), ); err != nil { return errors.Wrap(err, "set prover tries for frame") } start++ } return nil } func (p *PebbleClockStore) GetShardStateTree(filter []byte) ( *tries.VectorCommitmentTree, error, ) { data, closer, err := p.db.Get(clockShardStateTreeKey(filter)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get data state tree") } defer closer.Close() tree := &tries.VectorCommitmentTree{} var b bytes.Buffer b.Write(data) dec := gob.NewDecoder(&b) if err = dec.Decode(tree); err != nil { return nil, errors.Wrap(err, "get data state tree") } return tree, nil } func (p *PebbleClockStore) SetShardStateTree( txn store.Transaction, filter []byte, tree *tries.VectorCommitmentTree, ) error { b := new(bytes.Buffer) enc := gob.NewEncoder(b) if err := enc.Encode(tree); err != nil { return errors.Wrap(err, "set data state tree") } return errors.Wrap( txn.Set(clockShardStateTreeKey(filter), b.Bytes()), "set data state tree", ) } func (p *PebbleClockStore) GetLatestCertifiedGlobalState() ( *protobufs.GlobalProposal, error, ) { idxValue, closer, err := p.db.Get(clockGlobalCertifiedStateLatestIndex()) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get latest certified global state") } defer closer.Close() if len(idxValue) != 8 { return nil, errors.Wrap( store.ErrInvalidData, "get latest certified global state", ) } rank := binary.BigEndian.Uint64(idxValue) return p.GetCertifiedGlobalState(rank) } func (p *PebbleClockStore) GetEarliestCertifiedGlobalState() ( *protobufs.GlobalProposal, error, ) { idxValue, closer, err := p.db.Get(clockGlobalCertifiedStateEarliestIndex()) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get earliest certified global state") } defer closer.Close() if len(idxValue) != 8 { return nil, errors.Wrap( store.ErrInvalidData, "get earliest certified global state", ) } rank := binary.BigEndian.Uint64(idxValue) return p.GetCertifiedGlobalState(rank) } func (p *PebbleClockStore) GetCertifiedGlobalState(rank uint64) ( *protobufs.GlobalProposal, error, ) { key := clockGlobalCertifiedStateKey(rank) value, closer, err := p.db.Get(key) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get certified global state") } defer closer.Close() if len(value) != 24 { return nil, errors.Wrap( store.ErrInvalidData, "get certified global state", ) } frameNumber := binary.BigEndian.Uint64(value[:8]) qcRank := binary.BigEndian.Uint64(value[8:16]) tcRank := binary.BigEndian.Uint64(value[16:]) frame, err := p.GetGlobalClockFrame(frameNumber) if err != nil && !errors.Is(err, store.ErrNotFound) { return nil, errors.Wrap(err, "get certified global state") } vote, err := p.GetProposalVote(nil, frame.GetRank(), frame.Header.Prover) if err != nil && !errors.Is(err, store.ErrNotFound) { return nil, errors.Wrap(err, "get certified app shard state") } qc, err := p.GetQuorumCertificate(nil, qcRank) if err != nil && !errors.Is(err, store.ErrNotFound) { return nil, errors.Wrap(err, "get certified global state") } tc, err := p.GetTimeoutCertificate(nil, tcRank) if err != nil && !errors.Is(err, store.ErrNotFound) { return nil, errors.Wrap(err, "get certified global state") } return &protobufs.GlobalProposal{ State: frame, ParentQuorumCertificate: qc, PriorRankTimeoutCertificate: tc, Vote: vote, }, nil } func (p *PebbleClockStore) RangeCertifiedGlobalStates( startRank uint64, endRank uint64, ) (store.TypedIterator[*protobufs.GlobalProposal], error) { if startRank > endRank { startRank, endRank = endRank, startRank } iter, err := p.db.NewIter( clockGlobalCertifiedStateKey(startRank), clockGlobalCertifiedStateKey(endRank+1), ) if err != nil { return nil, errors.Wrap(err, "range certified global states") } return &PebbleGlobalStateIterator{i: iter, db: p}, nil } func (p *PebbleClockStore) PutCertifiedGlobalState( state *protobufs.GlobalProposal, txn store.Transaction, ) error { if state == nil { return errors.Wrap( errors.New("proposal is required"), "put certified global state", ) } rank := uint64(0) frameNumber := uint64(0xffffffffffffffff) qcRank := uint64(0xffffffffffffffff) tcRank := uint64(0xffffffffffffffff) if state.State != nil { if state.State.Header.Rank > rank { rank = state.State.Header.Rank } frameNumber = state.State.Header.FrameNumber if err := p.PutGlobalClockFrame(state.State, txn); err != nil { return errors.Wrap(err, "put certified global state") } if err := p.PutProposalVote(txn, state.Vote); err != nil { return errors.Wrap(err, "put certified global state") } } if state.ParentQuorumCertificate != nil { if state.ParentQuorumCertificate.Rank > rank { rank = state.ParentQuorumCertificate.Rank } qcRank = state.ParentQuorumCertificate.Rank if err := p.PutQuorumCertificate( state.ParentQuorumCertificate, txn, ); err != nil { return errors.Wrap(err, "put certified global state") } } if state.PriorRankTimeoutCertificate != nil { if state.PriorRankTimeoutCertificate.Rank > rank { rank = state.PriorRankTimeoutCertificate.Rank } tcRank = state.PriorRankTimeoutCertificate.Rank if err := p.PutTimeoutCertificate( state.PriorRankTimeoutCertificate, txn, ); err != nil { return errors.Wrap(err, "put certified global state") } } key := clockGlobalCertifiedStateKey(rank) value := []byte{} value = binary.BigEndian.AppendUint64(value, frameNumber) value = binary.BigEndian.AppendUint64(value, qcRank) value = binary.BigEndian.AppendUint64(value, tcRank) if err := txn.Set(key, value); err != nil { return errors.Wrap(err, "put certified global state") } if err := p.updateEarliestIndex( txn, clockGlobalCertifiedStateEarliestIndex(), rank, ); err != nil { return errors.Wrap(err, "put certified global state") } if err := txn.Set( clockGlobalCertifiedStateLatestIndex(), binary.BigEndian.AppendUint64(nil, rank), ); err != nil { return errors.Wrap(err, "put certified global state") } return nil } func (p *PebbleClockStore) GetLatestCertifiedAppShardState( filter []byte, ) ( *protobufs.AppShardProposal, error, ) { idxValue, closer, err := p.db.Get( clockShardCertifiedStateLatestIndex([]byte{}), ) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get latest certified app shard state") } defer closer.Close() if len(idxValue) != 8 { return nil, errors.Wrap( store.ErrInvalidData, "get latest certified app shard state", ) } rank := binary.BigEndian.Uint64(idxValue) return p.GetCertifiedAppShardState(filter, rank) } func (p *PebbleClockStore) GetEarliestCertifiedAppShardState( filter []byte, ) ( *protobufs.AppShardProposal, error, ) { idxValue, closer, err := p.db.Get( clockShardCertifiedStateEarliestIndex([]byte{}), ) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get earliest certified app shard state") } defer closer.Close() if len(idxValue) != 8 { return nil, errors.Wrap( store.ErrInvalidData, "get earliest certified app shard state", ) } rank := binary.BigEndian.Uint64(idxValue) return p.GetCertifiedAppShardState(filter, rank) } func (p *PebbleClockStore) GetCertifiedAppShardState( filter []byte, rank uint64, ) ( *protobufs.AppShardProposal, error, ) { key := clockShardCertifiedStateKey(rank, filter) value, closer, err := p.db.Get(key) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get certified app shard state") } defer closer.Close() if len(value) != 24 { return nil, errors.Wrap( store.ErrInvalidData, "get certified app shard state", ) } frameNumber := binary.BigEndian.Uint64(value[:8]) qcRank := binary.BigEndian.Uint64(value[8:16]) tcRank := binary.BigEndian.Uint64(value[16:]) frame, _, err := p.GetShardClockFrame(filter, frameNumber, false) if err != nil && !errors.Is(err, store.ErrNotFound) { return nil, errors.Wrap(err, "get certified app shard state") } vote, err := p.GetProposalVote(filter, frame.GetRank(), frame.Header.Prover) if err != nil && !errors.Is(err, store.ErrNotFound) { return nil, errors.Wrap(err, "get certified app shard state") } qc, err := p.GetQuorumCertificate(filter, qcRank) if err != nil && !errors.Is(err, store.ErrNotFound) { return nil, errors.Wrap(err, "get certified app shard state") } tc, err := p.GetTimeoutCertificate(filter, tcRank) if err != nil && !errors.Is(err, store.ErrNotFound) { return nil, errors.Wrap(err, "get certified app shard state") } return &protobufs.AppShardProposal{ State: frame, ParentQuorumCertificate: qc, PriorRankTimeoutCertificate: tc, Vote: vote, }, nil } func (p *PebbleClockStore) RangeCertifiedAppShardStates( filter []byte, startRank uint64, endRank uint64, ) (store.TypedIterator[*protobufs.AppShardProposal], error) { if startRank > endRank { startRank, endRank = endRank, startRank } return &PebbleAppShardStateIterator{ filter: filter, // buildutils:allow-slice-alias slice is static start: startRank, end: endRank + 1, cur: startRank, db: p, }, nil } func (p *PebbleClockStore) PutCertifiedAppShardState( state *protobufs.AppShardProposal, txn store.Transaction, ) error { if state == nil { return errors.Wrap( errors.New("proposal is required"), "put certified app shard state", ) } rank := uint64(0) filter := []byte{} frameNumber := uint64(0xffffffffffffffff) qcRank := uint64(0xffffffffffffffff) tcRank := uint64(0xffffffffffffffff) if state.State != nil { if state.State.Header.Rank > rank { rank = state.State.Header.Rank } frameNumber = state.State.Header.FrameNumber if err := p.StageShardClockFrame( []byte(state.State.Identity()), state.State, txn, ); err != nil { return errors.Wrap(err, "put certified app shard state") } if err := p.CommitShardClockFrame( state.State.Header.Address, frameNumber, []byte(state.State.Identity()), nil, txn, false, ); err != nil { return errors.Wrap(err, "put certified app shard state") } if err := p.PutProposalVote(txn, state.Vote); err != nil { return errors.Wrap(err, "put certified app shard state") } filter = state.State.Header.Address } if state.ParentQuorumCertificate != nil { if state.ParentQuorumCertificate.Rank > rank { rank = state.ParentQuorumCertificate.Rank } qcRank = state.ParentQuorumCertificate.Rank if err := p.PutQuorumCertificate( state.ParentQuorumCertificate, txn, ); err != nil { return errors.Wrap(err, "put certified app shard state") } filter = state.ParentQuorumCertificate.Filter } if state.PriorRankTimeoutCertificate != nil { if state.PriorRankTimeoutCertificate.Rank > rank { rank = state.PriorRankTimeoutCertificate.Rank } tcRank = state.PriorRankTimeoutCertificate.Rank if err := p.PutTimeoutCertificate( state.PriorRankTimeoutCertificate, txn, ); err != nil { return errors.Wrap(err, "put certified app shard state") } filter = state.PriorRankTimeoutCertificate.Filter } if bytes.Equal(filter, []byte{}) { return errors.Wrap( errors.New("invalid filter"), "put certified app shard state", ) } key := clockShardCertifiedStateKey(rank, filter) value := []byte{} value = binary.BigEndian.AppendUint64(value, frameNumber) value = binary.BigEndian.AppendUint64(value, qcRank) value = binary.BigEndian.AppendUint64(value, tcRank) if err := txn.Set(key, value); err != nil { return errors.Wrap(err, "put certified app shard state") } if err := p.updateEarliestIndex( txn, clockShardCertifiedStateEarliestIndex(filter), rank, ); err != nil { return errors.Wrap(err, "put certified app shard state") } if err := txn.Set( clockShardCertifiedStateLatestIndex(filter), binary.BigEndian.AppendUint64(nil, rank), ); err != nil { return errors.Wrap(err, "put certified app shard state") } return nil } func (p *PebbleClockStore) GetLatestQuorumCertificate( filter []byte, ) (*protobufs.QuorumCertificate, error) { idxValue, closer, err := p.db.Get( clockQuorumCertificateLatestIndex(filter), ) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get latest quorum certificate") } defer closer.Close() if len(idxValue) != 8 { return nil, errors.Wrap( store.ErrInvalidData, "get latest quorum certificate", ) } rank := binary.BigEndian.Uint64(idxValue) return p.GetQuorumCertificate(filter, rank) } func (p *PebbleClockStore) GetEarliestQuorumCertificate( filter []byte, ) (*protobufs.QuorumCertificate, error) { idxValue, closer, err := p.db.Get( clockQuorumCertificateEarliestIndex(filter), ) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get earliest quorum certificate") } defer closer.Close() if len(idxValue) != 8 { return nil, errors.Wrap( store.ErrInvalidData, "get earliest quorum certificate", ) } rank := binary.BigEndian.Uint64(idxValue) return p.GetQuorumCertificate(filter, rank) } func (p *PebbleClockStore) GetQuorumCertificate( filter []byte, rank uint64, ) (*protobufs.QuorumCertificate, error) { key := clockQuorumCertificateKey(rank, filter) value, closer, err := p.db.Get(key) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get quorum certificate") } defer closer.Close() qc := &protobufs.QuorumCertificate{} if err := qc.FromCanonicalBytes(slices.Clone(value)); err != nil { return nil, errors.Wrap( errors.Wrap(err, store.ErrInvalidData.Error()), "get quorum certificate", ) } return qc, nil } func (p *PebbleClockStore) RangeQuorumCertificates( filter []byte, startRank uint64, endRank uint64, ) (store.TypedIterator[*protobufs.QuorumCertificate], error) { if startRank > endRank { startRank, endRank = endRank, startRank } return &PebbleQuorumCertificateIterator{ filter: filter, // buildutils:allow-slice-alias slice is static start: startRank, end: endRank + 1, cur: startRank, db: p, }, nil } func (p *PebbleClockStore) PutQuorumCertificate( qc *protobufs.QuorumCertificate, txn store.Transaction, ) error { if qc == nil { return errors.Wrap( errors.New("quorum certificate is required"), "put quorum certificate", ) } rank := qc.Rank filter := qc.Filter data, err := qc.ToCanonicalBytes() if err != nil { return errors.Wrap( errors.Wrap(err, store.ErrInvalidData.Error()), "put quorum certificate", ) } key := clockQuorumCertificateKey(rank, filter) if err := txn.Set(key, data); err != nil { return errors.Wrap(err, "put quorum certificate") } if err := p.updateEarliestIndex( txn, clockQuorumCertificateEarliestIndex(filter), rank, ); err != nil { return errors.Wrap(err, "put quorum certificate") } if err := p.updateLatestIndex( txn, clockQuorumCertificateLatestIndex(filter), rank, ); err != nil { return errors.Wrap(err, "put quorum certificate") } return nil } func (p *PebbleClockStore) GetLatestTimeoutCertificate( filter []byte, ) (*protobufs.TimeoutCertificate, error) { idxValue, closer, err := p.db.Get( clockTimeoutCertificateLatestIndex(filter), ) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get latest timeout certificate") } defer closer.Close() if len(idxValue) != 8 { return nil, errors.Wrap( store.ErrInvalidData, "get latest timeout certificate", ) } rank := binary.BigEndian.Uint64(idxValue) return p.GetTimeoutCertificate(filter, rank) } func (p *PebbleClockStore) GetEarliestTimeoutCertificate( filter []byte, ) (*protobufs.TimeoutCertificate, error) { idxValue, closer, err := p.db.Get( clockTimeoutCertificateEarliestIndex(filter), ) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get earliest timeout certificate") } defer closer.Close() if len(idxValue) != 8 { return nil, errors.Wrap( store.ErrInvalidData, "get earliest timeout certificate", ) } rank := binary.BigEndian.Uint64(idxValue) return p.GetTimeoutCertificate(filter, rank) } func (p *PebbleClockStore) GetTimeoutCertificate( filter []byte, rank uint64, ) (*protobufs.TimeoutCertificate, error) { key := clockTimeoutCertificateKey(rank, filter) value, closer, err := p.db.Get(key) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get timeout certificate") } defer closer.Close() tc := &protobufs.TimeoutCertificate{} if err := tc.FromCanonicalBytes(slices.Clone(value)); err != nil { return nil, errors.Wrap( errors.Wrap(err, store.ErrInvalidData.Error()), "get timeout certificate", ) } return tc, nil } func (p *PebbleClockStore) RangeTimeoutCertificates( filter []byte, startRank uint64, endRank uint64, ) (store.TypedIterator[*protobufs.TimeoutCertificate], error) { if startRank > endRank { startRank, endRank = endRank, startRank } return &PebbleTimeoutCertificateIterator{ filter: filter, // buildutils:allow-slice-alias slice is static start: startRank, end: endRank + 1, cur: startRank, db: p, }, nil } func (p *PebbleClockStore) PutTimeoutCertificate( tc *protobufs.TimeoutCertificate, txn store.Transaction, ) error { if tc == nil { return errors.Wrap( errors.New("timeout certificate is required"), "put timeout certificate", ) } rank := tc.Rank filter := tc.Filter data, err := tc.ToCanonicalBytes() if err != nil { return errors.Wrap( errors.Wrap(err, store.ErrInvalidData.Error()), "put timeout certificate", ) } key := clockTimeoutCertificateKey(rank, filter) if err := txn.Set(key, data); err != nil { return errors.Wrap(err, "put timeout certificate") } if err := p.updateEarliestIndex( txn, clockTimeoutCertificateEarliestIndex(filter), rank, ); err != nil { return errors.Wrap(err, "put timeout certificate") } if err := p.updateLatestIndex( txn, clockTimeoutCertificateLatestIndex(filter), rank, ); err != nil { return errors.Wrap(err, "put timeout certificate") } return nil } func (p *PebbleClockStore) PutProposalVote( txn store.Transaction, vote *protobufs.ProposalVote, ) error { if vote == nil { return errors.Wrap( errors.New("proposal vote is required"), "put proposal vote", ) } rank := vote.Rank filter := vote.Filter identity := vote.Identity() data, err := vote.ToCanonicalBytes() if err != nil { return errors.Wrap( errors.Wrap(err, store.ErrInvalidData.Error()), "put proposal vote", ) } key := clockProposalVoteKey(rank, filter, []byte(identity)) err = txn.Set(key, data) return errors.Wrap(err, "put proposal vote") } func (p *PebbleClockStore) GetProposalVote( filter []byte, rank uint64, identity []byte, ) ( *protobufs.ProposalVote, error, ) { key := clockProposalVoteKey(rank, filter, []byte(identity)) value, closer, err := p.db.Get(key) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get proposal vote") } defer closer.Close() vote := &protobufs.ProposalVote{} if err := vote.FromCanonicalBytes(slices.Clone(value)); err != nil { return nil, errors.Wrap( errors.Wrap(err, store.ErrInvalidData.Error()), "get proposal vote", ) } return vote, nil } func (p *PebbleClockStore) GetProposalVotes(filter []byte, rank uint64) ( []*protobufs.ProposalVote, error, ) { results := []*protobufs.ProposalVote{} startKey := clockProposalVoteKey(rank, filter, nil) endKey := clockProposalVoteKey(rank+1, filter, nil) iterator, err := p.db.NewIter(startKey, endKey) if err != nil { return nil, errors.Wrap(err, "get proposal votes") } defer iterator.Close() for iterator.First(); iterator.Valid(); iterator.Next() { key := iterator.Key() if len(key) != len(startKey)+32 { continue } value := iterator.Value() vote := &protobufs.ProposalVote{} if err := vote.FromCanonicalBytes(slices.Clone(value)); err != nil { return nil, errors.Wrap( errors.Wrap(err, store.ErrInvalidData.Error()), "get proposal votes", ) } results = append(results, vote) } return results, nil } func (p *PebbleClockStore) PutTimeoutVote( txn store.Transaction, vote *protobufs.TimeoutState, ) error { if vote == nil { return errors.Wrap( errors.New("timeout vote is required"), "put timeout vote", ) } rank := vote.Vote.Rank filter := vote.Vote.Filter identity := vote.Vote.Identity() data, err := vote.ToCanonicalBytes() if err != nil { return errors.Wrap( errors.Wrap(err, store.ErrInvalidData.Error()), "put timeout vote", ) } key := clockTimeoutVoteKey(rank, filter, []byte(identity)) err = txn.Set(key, data) return errors.Wrap(err, "put timeout vote") } func (p *PebbleClockStore) GetTimeoutVote( filter []byte, rank uint64, identity []byte, ) ( *protobufs.TimeoutState, error, ) { key := clockTimeoutVoteKey(rank, filter, []byte(identity)) value, closer, err := p.db.Get(key) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, store.ErrNotFound } return nil, errors.Wrap(err, "get proposal vote") } defer closer.Close() vote := &protobufs.TimeoutState{} if err := vote.FromCanonicalBytes(slices.Clone(value)); err != nil { return nil, errors.Wrap( errors.Wrap(err, store.ErrInvalidData.Error()), "get proposal vote", ) } return vote, nil } func (p *PebbleClockStore) GetTimeoutVotes(filter []byte, rank uint64) ( []*protobufs.TimeoutState, error, ) { results := []*protobufs.TimeoutState{} startKey := clockTimeoutVoteKey(rank, filter, nil) endKey := clockTimeoutVoteKey(rank+1, filter, nil) iterator, err := p.db.NewIter(startKey, endKey) if err != nil { return nil, errors.Wrap(err, "get timeout votes") } defer iterator.Close() for iterator.First(); iterator.Valid(); iterator.Next() { key := iterator.Key() if len(key) != len(startKey)+32 { continue } value := iterator.Value() vote := &protobufs.TimeoutState{} if err := vote.FromCanonicalBytes(slices.Clone(value)); err != nil { return nil, errors.Wrap( errors.Wrap(err, store.ErrInvalidData.Error()), "get timeout votes", ) } results = append(results, vote) } return results, nil }