diff --git a/node/consensus/provers/proposer.go b/node/consensus/provers/proposer.go index ec19144..1c516de 100644 --- a/node/consensus/provers/proposer.go +++ b/node/consensus/provers/proposer.go @@ -2,6 +2,7 @@ package provers import ( "bytes" + "encoding/hex" "math/big" "sort" @@ -93,11 +94,12 @@ func (m *Manager) PlanAndAllocate( maxAllocations int, ) ([]Proposal, error) { if len(shards) == 0 { + m.logger.Debug("no shards to allocate") return nil, nil } // Enumerate free workers (unallocated). - all, err := m.store.RangeWorkers() + all, err := m.workerMgr.RangeWorkers() if err != nil { return nil, errors.Wrap(err, "plan and allocate") } @@ -109,6 +111,7 @@ func (m *Manager) PlanAndAllocate( } if len(free) == 0 { + m.logger.Debug("no workers free") return nil, nil } @@ -132,6 +135,11 @@ func (m *Manager) PlanAndAllocate( for i, s := range shards { if len(s.Filter) == 0 || s.Size == 0 { + m.logger.Debug( + "filtering out empty shard", + zap.String("filter", hex.EncodeToString(s.Filter)), + zap.Uint64("size", s.Size), + ) continue } if s.Shards == 0 { @@ -173,10 +181,17 @@ func (m *Manager) PlanAndAllocate( score.Quo(score, ringDiv) score.Quo(score, shardsSqrt.BigInt()) } + + m.logger.Debug( + "adding score proposal", + zap.Int("index", i), + zap.String("score", score.String()), + ) scores = append(scores, scored{idx: i, score: score}) } if len(scores) == 0 { + m.logger.Debug("no scores") return nil, nil } diff --git a/node/worker/manager.go b/node/worker/manager.go index 1845de6..c7cdd94 100644 --- a/node/worker/manager.go +++ b/node/worker/manager.go @@ -143,6 +143,10 @@ func (w *WorkerManager) RegisterWorker(info *typesStore.WorkerInfo) error { w.mu.Lock() defer w.mu.Unlock() + return w.registerWorker(info) +} + +func (w *WorkerManager) registerWorker(info *typesStore.WorkerInfo) error { if !w.started { workerOperationsTotal.WithLabelValues("register", "error").Inc() return errors.New("worker manager not started") @@ -196,6 +200,7 @@ func (w *WorkerManager) RegisterWorker(info *typesStore.WorkerInfo) error { "worker registered successfully", zap.Uint("core_id", info.CoreId), ) + return nil } @@ -494,6 +499,16 @@ func (w *WorkerManager) loadWorkersFromStore() error { return errors.Wrap(err, "load workers from store") } + if len(workers) != w.config.Engine.DataWorkerCount { + for i := range w.config.Engine.DataWorkerCount { + _, err := w.getIPCOfWorker(uint(i + 1)) + if err != nil { + w.logger.Error("could not obtain IPC for worker", zap.Error(err)) + continue + } + } + } + var totalStorage uint64 var allocatedCount int for _, worker := range workers { @@ -549,6 +564,23 @@ func (w *WorkerManager) getMultiaddrOfWorker(coreId uint) ( return ma, errors.Wrap(err, "get multiaddr of worker") } +func (w *WorkerManager) getP2PMultiaddrOfWorker(coreId uint) ( + multiaddr.Multiaddr, + error, +) { + p2pMultiaddr := fmt.Sprintf( + w.config.Engine.DataWorkerBaseListenMultiaddr, + int(w.config.Engine.DataWorkerBaseP2PPort)+int(coreId-1), + ) + + if len(w.config.Engine.DataWorkerP2PMultiaddrs) != 0 { + p2pMultiaddr = w.config.Engine.DataWorkerP2PMultiaddrs[coreId-1] + } + + ma, err := multiaddr.StringCast(p2pMultiaddr) + return ma, errors.Wrap(err, "get p2p multiaddr of worker") +} + func (w *WorkerManager) getIPCOfWorker(coreId uint) ( protobufs.DataIPCServiceClient, error, @@ -572,6 +604,25 @@ func (w *WorkerManager) getIPCOfWorker(coreId uint) ( return nil, errors.Wrap(err, "get ipc of worker") } + if _, ok := w.filtersByWorker[coreId]; !ok { + p2pAddr, err := w.getP2PMultiaddrOfWorker(coreId) + if err != nil { + return nil, errors.Wrap(err, "get ipc of worker") + } + err = w.registerWorker(&typesStore.WorkerInfo{ + CoreId: coreId, + ListenMultiaddr: p2pAddr.String(), + StreamListenMultiaddr: addr.String(), + Filter: nil, + TotalStorage: 0, + Automatic: len(w.config.Engine.DataWorkerP2PMultiaddrs) == 0, + Allocated: false, + }) + if err != nil { + return nil, errors.Wrap(err, "get ipc of worker") + } + } + privKey, err := crypto.UnmarshalEd448PrivateKey(peerPrivKey) if err != nil { w.logger.Error("error unmarshaling peerkey", zap.Error(err))