ceremonyclient/node/store/worker.go
2025-12-15 16:45:31 -06:00

327 lines
7.8 KiB
Go

package store
import (
"encoding/binary"
"fmt"
"slices"
"github.com/cockroachdb/pebble/v2"
"github.com/pkg/errors"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/types/store"
)
var _ store.WorkerStore = (*PebbleWorkerStore)(nil)
type PebbleWorkerStore struct {
db store.KVDB
logger *zap.Logger
}
func NewPebbleWorkerStore(
db store.KVDB,
logger *zap.Logger,
) *PebbleWorkerStore {
return &PebbleWorkerStore{
db,
logger,
}
}
func workerKey(coreId uint) []byte {
key := []byte{WORKER, WORKER_BY_CORE}
coreIdBytes := make([]byte, 8)
binary.BigEndian.PutUint64(coreIdBytes, uint64(coreId))
key = append(key, coreIdBytes...)
return key
}
func workerByFilterKey(filter []byte) []byte {
key := []byte{WORKER, WORKER_BY_FILTER}
key = append(key, filter...)
return key
}
func (p *PebbleWorkerStore) NewTransaction(indexed bool) (
store.Transaction,
error,
) {
return p.db.NewBatch(indexed), nil
}
func (p *PebbleWorkerStore) GetWorker(coreId uint) (*store.WorkerInfo, error) {
data, closer, err := p.db.Get(workerKey(coreId))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, store.ErrNotFound
}
return nil, errors.Wrap(err, "get worker")
}
copied := slices.Clone(data)
closer.Close()
return decodeWorkerInfo(copied)
}
func (p *PebbleWorkerStore) GetWorkerByFilter(filter []byte) (
*store.WorkerInfo,
error,
) {
if len(filter) == 0 {
return nil, errors.Wrap(
errors.New("filter cannot be empty"),
"get worker by filter",
)
}
data, closer, err := p.db.Get(workerByFilterKey(filter))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, store.ErrNotFound
}
return nil, errors.Wrap(err, "get worker by filter")
}
copied := slices.Clone(data)
closer.Close()
if len(copied) < 8 {
return nil, errors.Wrap(
fmt.Errorf("index payload too short: %d", len(copied)),
"get worker by filter",
)
}
coreId := binary.BigEndian.Uint64(copied[:8])
return p.GetWorker(uint(coreId))
}
func (p *PebbleWorkerStore) PutWorker(
txn store.Transaction,
worker *store.WorkerInfo,
) error {
// Check if worker already exists to clean up old filter index if needed
existingWorker, err := p.GetWorker(worker.CoreId)
if err == nil && existingWorker != nil {
// Delete old filter index if it exists and is different
if len(existingWorker.Filter) > 0 &&
string(existingWorker.Filter) != string(worker.Filter) {
if err := txn.Delete(
workerByFilterKey(existingWorker.Filter),
); err != nil {
return errors.Wrap(err, "put worker")
}
}
}
data, err := encodeWorkerInfo(worker)
if err != nil {
return errors.Wrap(err, "put worker")
}
if err := txn.Set(workerKey(worker.CoreId), data); err != nil {
return errors.Wrap(err, "put worker")
}
// Only set filter index if filter is not empty
if len(worker.Filter) > 0 {
coreIdBytes := make([]byte, 8)
binary.BigEndian.PutUint64(coreIdBytes, uint64(worker.CoreId))
if err := txn.Set(
workerByFilterKey(worker.Filter),
coreIdBytes,
); err != nil {
return errors.Wrap(err, "put worker")
}
}
return nil
}
func (p *PebbleWorkerStore) DeleteWorker(
txn store.Transaction,
coreId uint,
) error {
worker, err := p.GetWorker(coreId)
if err != nil {
return errors.Wrap(err, "delete worker")
}
if err := txn.Delete(workerKey(coreId)); err != nil {
return errors.Wrap(err, "delete worker")
}
// Only delete filter index if filter is not empty
if len(worker.Filter) > 0 {
if err := txn.Delete(workerByFilterKey(worker.Filter)); err != nil {
return errors.Wrap(err, "delete worker")
}
}
return nil
}
func (p *PebbleWorkerStore) RangeWorkers() ([]*store.WorkerInfo, error) {
iter, err := p.db.NewIter(
[]byte{WORKER, WORKER_BY_CORE, 0x00},
[]byte{WORKER, WORKER_BY_CORE, 0xFF},
)
if err != nil {
return nil, errors.Wrap(err, "range workers")
}
defer iter.Close()
var workers []*store.WorkerInfo
for iter.First(); iter.Valid(); iter.Next() {
val := slices.Clone(iter.Value())
worker, err := decodeWorkerInfo(val)
if err != nil {
return nil, errors.Wrap(err, "range workers")
}
workers = append(workers, worker)
}
return workers, nil
}
func encodeWorkerInfo(worker *store.WorkerInfo) ([]byte, error) {
listenMultiaddrLen := uint16(len(worker.ListenMultiaddr))
streamListenMultiaddrLen := uint16(len(worker.StreamListenMultiaddr))
filterLen := uint16(len(worker.Filter))
// totalLen = coreId(8) + totalStorage(8) + automatic(1) + allocated(1)
// + 2 + listen + 2 + stream + 2 + filter
totalLen := 8 + 8 + 1 + 1 + 2 + int(listenMultiaddrLen) + 2 +
int(streamListenMultiaddrLen) + 2 + int(filterLen) + 8
data := make([]byte, totalLen)
offset := 0
binary.BigEndian.PutUint64(data[offset:], uint64(worker.CoreId))
offset += 8
binary.BigEndian.PutUint64(data[offset:], uint64(worker.TotalStorage))
offset += 8
if worker.Automatic {
data[offset] = 1
} else {
data[offset] = 0
}
offset += 1
if worker.Allocated {
data[offset] = 1
} else {
data[offset] = 0
}
offset += 1
binary.BigEndian.PutUint16(data[offset:], listenMultiaddrLen)
offset += 2
copy(data[offset:], worker.ListenMultiaddr)
offset += int(listenMultiaddrLen)
binary.BigEndian.PutUint16(data[offset:], streamListenMultiaddrLen)
offset += 2
copy(data[offset:], worker.StreamListenMultiaddr)
offset += int(streamListenMultiaddrLen)
binary.BigEndian.PutUint16(data[offset:], filterLen)
offset += 2
copy(data[offset:], worker.Filter)
offset += int(filterLen)
binary.BigEndian.PutUint64(data[offset:], worker.PendingFilterFrame)
return data, nil
}
func decodeWorkerInfo(data []byte) (*store.WorkerInfo, error) {
if len(data) < 24 {
return nil, errors.New("invalid worker info data: too short")
}
offset := 0
if offset+8 > len(data) {
return nil, errors.New("truncated coreId")
}
coreId := binary.BigEndian.Uint64(data[offset:])
offset += 8
if offset+8 > len(data) {
return nil, errors.New("truncated totalStorage")
}
totalStorage := binary.BigEndian.Uint64(data[offset:])
offset += 8
if offset+1 > len(data) {
return nil, errors.New("truncated automatic flag")
}
automatic := data[offset] == 1
offset += 1
if offset+1 > len(data) {
return nil, errors.New("truncated allocated flag")
}
allocated := data[offset] == 1
offset += 1
if offset+2 > len(data) {
return nil, errors.New("truncated listenMultiaddr length")
}
listenMultiaddrLen := int(binary.BigEndian.Uint16(data[offset:]))
offset += 2
if listenMultiaddrLen < 0 || offset+listenMultiaddrLen > len(data) {
return nil, errors.New("invalid listen multiaddr length")
}
listenMultiaddr := string(data[offset : offset+listenMultiaddrLen])
offset += listenMultiaddrLen
if offset+2 > len(data) {
return nil, errors.New("truncated streamListenMultiaddr length")
}
streamListenMultiaddrLen := int(binary.BigEndian.Uint16(data[offset:]))
offset += 2
if streamListenMultiaddrLen < 0 ||
offset+streamListenMultiaddrLen > len(data) {
return nil, errors.New("invalid stream listen multiaddr length")
}
streamListenMultiaddr := string(
data[offset : offset+streamListenMultiaddrLen],
)
offset += streamListenMultiaddrLen
if offset+2 > len(data) {
return nil, errors.New("truncated filter length")
}
filterLen := int(binary.BigEndian.Uint16(data[offset:]))
offset += 2
if filterLen < 0 || offset+filterLen > len(data) {
return nil, errors.New("invalid filter length")
}
filter := make([]byte, filterLen)
copy(filter, data[offset:offset+filterLen])
offset += filterLen
var pendingFrame uint64
if offset+8 <= len(data) {
pendingFrame = binary.BigEndian.Uint64(data[offset:])
}
return &store.WorkerInfo{
CoreId: uint(coreId),
ListenMultiaddr: listenMultiaddr,
StreamListenMultiaddr: streamListenMultiaddr,
Filter: filter,
TotalStorage: uint(totalStorage),
Automatic: automatic,
Allocated: allocated,
PendingFilterFrame: pendingFrame,
}, nil
}