This commit is contained in:
Jiabao Qu 2025-11-19 21:01:40 +08:00 committed by GitHub
commit c52e1dc68e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 165 additions and 15 deletions

View File

@ -224,10 +224,9 @@ func (r *RPCServer) GetWorkerInfo(
info := []*protobufs.WorkerInfo{}
for _, worker := range workers {
info = append(info, &protobufs.WorkerInfo{
CoreId: uint32(worker.CoreId),
Filter: worker.Filter,
// TODO(2.1.1+): Expose available storage
AvailableStorage: uint64(worker.TotalStorage),
CoreId: uint32(worker.CoreId),
Filter: worker.Filter,
AvailableStorage: uint64(worker.AvailableStorage),
TotalStorage: uint64(worker.TotalStorage),
})
}

View File

@ -190,9 +190,9 @@ func encodeWorkerInfo(worker *store.WorkerInfo) ([]byte, error) {
streamListenMultiaddrLen := uint16(len(worker.StreamListenMultiaddr))
filterLen := uint16(len(worker.Filter))
// totalLen = coreId(8) + totalStorage(8) + automatic(1) + allocated(1)
// totalLen = coreId(8) + totalStorage(8) + availableStorage(8) + automatic(1) + allocated(1)
// + 2 + listen + 2 + stream + 2 + filter
totalLen := 8 + 8 + 1 + 1 + 2 + int(listenMultiaddrLen) + 2 +
totalLen := 8 + 8 + 8 + 1 + 1 + 2 + int(listenMultiaddrLen) + 2 +
int(streamListenMultiaddrLen) + 2 + int(filterLen)
data := make([]byte, totalLen)
@ -203,6 +203,9 @@ func encodeWorkerInfo(worker *store.WorkerInfo) ([]byte, error) {
binary.BigEndian.PutUint64(data[offset:], uint64(worker.TotalStorage))
offset += 8
binary.BigEndian.PutUint64(data[offset:], uint64(worker.AvailableStorage))
offset += 8
if worker.Automatic {
data[offset] = 1
} else {
@ -252,6 +255,15 @@ func decodeWorkerInfo(data []byte) (*store.WorkerInfo, error) {
totalStorage := binary.BigEndian.Uint64(data[offset:])
offset += 8
var availableStorage uint64
// Backwards compatibility
if offset+8 <= len(data) {
availableStorage = binary.BigEndian.Uint64(data[offset:])
offset += 8
} else {
availableStorage = totalStorage
}
if offset+1 > len(data) {
return nil, errors.New("truncated automatic flag")
}
@ -312,6 +324,7 @@ func decodeWorkerInfo(data []byte) (*store.WorkerInfo, error) {
StreamListenMultiaddr: streamListenMultiaddr,
Filter: filter,
TotalStorage: uint(totalStorage),
AvailableStorage: uint(availableStorage),
Automatic: automatic,
Allocated: allocated,
}, nil

View File

@ -52,6 +52,12 @@ type WorkerManager struct {
// IPC service clients
serviceClients map[uint]*grpc.ClientConn
// Reconciler configuration (partition estimate)
reconcilerInterval time.Duration
reconcilerBufferBytes uint64
reconcilerBufferPercent float64
reconcilerStatPath string
// In-memory cache for quick lookups
workersByFilter map[string]uint // filter hash -> worker id
filtersByWorker map[uint][]byte // worker id -> filter
@ -75,15 +81,16 @@ func NewWorkerManager(
) error,
) typesWorker.WorkerManager {
return &WorkerManager{
store: store,
logger: logger.Named("worker_manager"),
workersByFilter: make(map[string]uint),
filtersByWorker: make(map[uint][]byte),
allocatedWorkers: make(map[uint]bool),
serviceClients: make(map[uint]*grpc.ClientConn),
config: config,
proposeFunc: proposeFunc,
decideFunc: decideFunc,
store: store,
logger: logger.Named("worker_manager"),
workersByFilter: make(map[string]uint),
filtersByWorker: make(map[uint][]byte),
allocatedWorkers: make(map[uint]bool),
serviceClients: make(map[uint]*grpc.ClientConn),
config: config,
proposeFunc: proposeFunc,
decideFunc: decideFunc,
reconcilerStatPath: config.DB.Path,
}
}
@ -109,6 +116,8 @@ func (w *WorkerManager) Start(ctx context.Context) error {
return errors.Wrap(err, "start")
}
w.startPartitionReconciler()
w.logger.Info("worker manager started successfully")
return nil
}
@ -140,6 +149,7 @@ func (w *WorkerManager) Stop() error {
activeWorkersGauge.Set(0)
allocatedWorkersGauge.Set(0)
totalStorageGauge.Set(0)
availableStorageGauge.Set(0)
w.logger.Info("worker manager stopped")
return nil
@ -719,6 +729,7 @@ func (w *WorkerManager) getIPCOfWorker(coreId uint) (
StreamListenMultiaddr: addr.String(),
Filter: nil,
TotalStorage: 0,
AvailableStorage: 0,
Automatic: len(w.config.Engine.DataWorkerP2PMultiaddrs) == 0,
Allocated: false,
})

View File

@ -55,4 +55,13 @@ var (
},
[]string{"operation"}, // operation: register/allocate/deallocate/lookup
)
// availableStorageGauge is a single (unlabelled) gauge reporting the
// aggregated available storage, in bytes, across workers as estimated by the
// worker manager. It is registered at package initialisation via promauto.
availableStorageGauge = promauto.NewGauge(
prometheus.GaugeOpts{
Namespace: "quilibrium",
Subsystem: "worker_manager",
Name: "available_storage_bytes",
Help: "Aggregated available storage (bytes) across workers as estimated by the manager",
},
)
)

117
node/worker/reconciler.go Normal file
View File

@ -0,0 +1,117 @@
package worker
import (
"syscall"
"time"
"github.com/pkg/errors"
"go.uber.org/zap"
)
// startPartitionReconciler launches a background goroutine that calls
// reconcilePartitionAndPersist once per reconcilerInterval until w.ctx is done.
//
// Before the goroutine starts, missing configuration is defaulted in place on
// the manager: a non-positive reconcilerInterval becomes 30 seconds and an
// empty reconcilerStatPath becomes ".". A failed reconcile pass is logged and
// does not stop the loop; the next tick simply tries again.
func (w *WorkerManager) startPartitionReconciler() {
	// time.NewTicker panics on a non-positive duration, so a negative interval
	// is treated the same as an unset (zero) one.
	if w.reconcilerInterval <= 0 {
		w.reconcilerInterval = 30 * time.Second
	}
	if w.reconcilerStatPath == "" {
		w.reconcilerStatPath = "."
	}

	// Snapshot the interval so the goroutine does not re-read the field after
	// this method has returned.
	interval := w.reconcilerInterval

	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			select {
			case <-w.ctx.Done():
				return
			case <-ticker.C:
				if err := reconcilePartitionAndPersist(w); err != nil {
					w.logger.Error("partition reconcile failed", zap.Error(err))
				}
			}
		}
	}()
}
// reconcilePartitionAndPersist re-estimates every stored worker's
// AvailableStorage from the free space of the partition containing
// w.reconcilerStatPath, persists the updated records in one transaction, and
// publishes the sum through availableStorageGauge.
//
// The estimate works as follows:
//   - a safety buffer is subtracted from the partition's available bytes;
//     reconcilerBufferBytes wins when non-zero, otherwise
//     reconcilerBufferPercent (multiplied directly with the partition total,
//     i.e. used as a fraction) is applied;
//   - what remains is split evenly between the non-nil worker records;
//   - each worker's share is capped at its own TotalStorage, and a worker with
//     TotalStorage == 0 always gets 0.
//
// It returns an error when the partition cannot be inspected, when it reports
// a total size of zero, or when reading, writing or committing worker records
// fails. With no stored workers it returns nil without writing anything and
// without touching the gauge.
func reconcilePartitionAndPersist(w *WorkerManager) error {
	total, avail, err := getPartitionUsage(w.reconcilerStatPath)
	if err != nil {
		return errors.Wrap(err, "get partition usage")
	}
	if total == 0 {
		return errors.New("partition total size is zero")
	}

	// An absolute byte buffer takes precedence; the percentage is consulted
	// only when no byte count is configured.
	buffer := w.reconcilerBufferBytes
	if buffer == 0 && w.reconcilerBufferPercent > 0 {
		buffer = uint64(float64(total) * w.reconcilerBufferPercent)
	}

	// Clamp at zero instead of letting the unsigned subtraction wrap around.
	var usable uint64
	if avail > buffer {
		usable = avail - buffer
	}

	workers, err := w.store.RangeWorkers()
	if err != nil {
		return errors.Wrap(err, "range workers")
	}
	if len(workers) == 0 {
		return nil
	}

	// Nil entries are skipped when writing below, so they must not dilute the
	// share handed to the workers that are actually updated.
	var live uint64
	for _, worker := range workers {
		if worker != nil {
			live++
		}
	}
	var perWorker uint64
	if live > 0 {
		perWorker = usable / live
	}

	txn, err := w.store.NewTransaction(false)
	if err != nil {
		return errors.Wrap(err, "new transaction for reconcile")
	}

	var aggAvailable uint64
	for _, worker := range workers {
		if worker == nil {
			continue
		}

		switch {
		case perWorker == 0 || worker.TotalStorage == 0:
			worker.AvailableStorage = 0
		case perWorker > uint64(worker.TotalStorage):
			// Never advertise more than the worker's own capacity.
			worker.AvailableStorage = worker.TotalStorage
		default:
			worker.AvailableStorage = uint(perWorker)
		}

		if err := w.store.PutWorker(txn, worker); err != nil {
			txn.Abort()
			return errors.Wrap(err, "put worker during reconcile")
		}
		aggAvailable += uint64(worker.AvailableStorage)
	}

	if err := txn.Commit(); err != nil {
		txn.Abort()
		return errors.Wrap(err, "commit reconcile txn")
	}

	// Only publish the new aggregate once it has been durably committed.
	availableStorageGauge.Set(float64(aggAvailable))
	return nil
}
// getPartitionUsage queries the filesystem that contains path via statfs and
// returns, in bytes and in this order, its total size (Blocks * Bsize) and the
// space reported as available (Bavail * Bsize). On failure both sizes are zero
// and the statfs error is returned wrapped with the offending path.
func getPartitionUsage(path string) (uint64, uint64, error) {
	var fs syscall.Statfs_t
	err := syscall.Statfs(path, &fs)
	if err != nil {
		return 0, 0, errors.Wrapf(err, "statfs %s", path)
	}

	// The field types of Statfs_t differ between platforms, so everything is
	// normalised to uint64 before multiplying.
	blockSize := uint64(fs.Bsize)
	totalBytes := uint64(fs.Blocks) * blockSize
	availBytes := uint64(fs.Bavail) * blockSize

	return totalBytes, availBytes, nil
}

View File

@ -6,6 +6,7 @@ type WorkerInfo struct {
StreamListenMultiaddr string
Filter []byte
TotalStorage uint
AvailableStorage uint
Automatic bool
Allocated bool
}