fix: update stored worker registry, improve logging for debug mode

This commit is contained in:
Cassandra Heart 2025-10-05 23:50:16 -05:00
parent 0174e0c324
commit f6bade89bf
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
2 changed files with 67 additions and 1 deletion

View File

@ -2,6 +2,7 @@ package provers
import (
"bytes"
"encoding/hex"
"math/big"
"sort"
@ -93,11 +94,12 @@ func (m *Manager) PlanAndAllocate(
maxAllocations int,
) ([]Proposal, error) {
if len(shards) == 0 {
m.logger.Debug("no shards to allocate")
return nil, nil
}
// Enumerate free workers (unallocated).
all, err := m.store.RangeWorkers()
all, err := m.workerMgr.RangeWorkers()
if err != nil {
return nil, errors.Wrap(err, "plan and allocate")
}
@ -109,6 +111,7 @@ func (m *Manager) PlanAndAllocate(
}
if len(free) == 0 {
m.logger.Debug("no workers free")
return nil, nil
}
@ -132,6 +135,11 @@ func (m *Manager) PlanAndAllocate(
for i, s := range shards {
if len(s.Filter) == 0 || s.Size == 0 {
m.logger.Debug(
"filtering out empty shard",
zap.String("filter", hex.EncodeToString(s.Filter)),
zap.Uint64("size", s.Size),
)
continue
}
if s.Shards == 0 {
@ -173,10 +181,17 @@ func (m *Manager) PlanAndAllocate(
score.Quo(score, ringDiv)
score.Quo(score, shardsSqrt.BigInt())
}
m.logger.Debug(
"adding score proposal",
zap.Int("index", i),
zap.String("score", score.String()),
)
scores = append(scores, scored{idx: i, score: score})
}
if len(scores) == 0 {
m.logger.Debug("no scores")
return nil, nil
}

View File

@ -143,6 +143,10 @@ func (w *WorkerManager) RegisterWorker(info *typesStore.WorkerInfo) error {
w.mu.Lock()
defer w.mu.Unlock()
return w.registerWorker(info)
}
func (w *WorkerManager) registerWorker(info *typesStore.WorkerInfo) error {
if !w.started {
workerOperationsTotal.WithLabelValues("register", "error").Inc()
return errors.New("worker manager not started")
@ -196,6 +200,7 @@ func (w *WorkerManager) RegisterWorker(info *typesStore.WorkerInfo) error {
"worker registered successfully",
zap.Uint("core_id", info.CoreId),
)
return nil
}
@ -494,6 +499,16 @@ func (w *WorkerManager) loadWorkersFromStore() error {
return errors.Wrap(err, "load workers from store")
}
if len(workers) != w.config.Engine.DataWorkerCount {
for i := range w.config.Engine.DataWorkerCount {
_, err := w.getIPCOfWorker(uint(i + 1))
if err != nil {
w.logger.Error("could not obtain IPC for worker", zap.Error(err))
continue
}
}
}
var totalStorage uint64
var allocatedCount int
for _, worker := range workers {
@ -549,6 +564,23 @@ func (w *WorkerManager) getMultiaddrOfWorker(coreId uint) (
return ma, errors.Wrap(err, "get multiaddr of worker")
}
// getP2PMultiaddrOfWorker returns the p2p multiaddr for the data worker
// identified by coreId (core ids are 1-based).
//
// By default the address is derived from the configured base listen
// multiaddr template, with the port set to DataWorkerBaseP2PPort offset
// by coreId-1. When explicit per-worker addresses are configured via
// DataWorkerP2PMultiaddrs, the coreId-th entry is used instead.
//
// Returns an error if coreId is zero, if an explicit address list is
// configured but does not contain an entry for coreId, or if the
// resulting address string is not a valid multiaddr.
func (w *WorkerManager) getP2PMultiaddrOfWorker(coreId uint) (
	multiaddr.Multiaddr,
	error,
) {
	// coreId is 1-based; 0 would underflow the uint subtraction below.
	if coreId == 0 {
		return nil, errors.New("get p2p multiaddr of worker: invalid core id 0")
	}
	p2pMultiaddr := fmt.Sprintf(
		w.config.Engine.DataWorkerBaseListenMultiaddr,
		int(w.config.Engine.DataWorkerBaseP2PPort)+int(coreId-1),
	)
	if len(w.config.Engine.DataWorkerP2PMultiaddrs) != 0 {
		// Guard against a configured list shorter than the worker count:
		// indexing past the end would panic instead of returning an error.
		if int(coreId) > len(w.config.Engine.DataWorkerP2PMultiaddrs) {
			return nil, errors.New(
				"get p2p multiaddr of worker: core id exceeds configured multiaddrs",
			)
		}
		p2pMultiaddr = w.config.Engine.DataWorkerP2PMultiaddrs[coreId-1]
	}
	ma, err := multiaddr.StringCast(p2pMultiaddr)
	return ma, errors.Wrap(err, "get p2p multiaddr of worker")
}
func (w *WorkerManager) getIPCOfWorker(coreId uint) (
protobufs.DataIPCServiceClient,
error,
@ -572,6 +604,25 @@ func (w *WorkerManager) getIPCOfWorker(coreId uint) (
return nil, errors.Wrap(err, "get ipc of worker")
}
if _, ok := w.filtersByWorker[coreId]; !ok {
p2pAddr, err := w.getP2PMultiaddrOfWorker(coreId)
if err != nil {
return nil, errors.Wrap(err, "get ipc of worker")
}
err = w.registerWorker(&typesStore.WorkerInfo{
CoreId: coreId,
ListenMultiaddr: p2pAddr.String(),
StreamListenMultiaddr: addr.String(),
Filter: nil,
TotalStorage: 0,
Automatic: len(w.config.Engine.DataWorkerP2PMultiaddrs) == 0,
Allocated: false,
})
if err != nil {
return nil, errors.Wrap(err, "get ipc of worker")
}
}
privKey, err := crypto.UnmarshalEd448PrivateKey(peerPrivKey)
if err != nil {
w.logger.Error("error unmarshaling peerkey", zap.Error(err))