add standalone sync mode, handle disconnect issue, improve logging to clarify current state
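
Introduces a NodeMode type ("normal" / "strict-sync") and an rpc.Synchronizer interface so the node can run as a standalone hypersync server or client, selected via the new -strict-sync-server and -strict-sync-client flags, without joining the regular p2p network. Disconnects now cancel a shared per-stream context so in-flight sync goroutines unwind instead of hanging on a dead stream; the per-session leaf limit is raised from 50000 to 100000, and the progress log now states when the server is ready to resume connections.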

Cassandra Heart 2025-03-12 02:14:59 -05:00
parent 10b8344498
commit 586b68c296
7 changed files with 434 additions and 55 deletions

View File

@@ -15,10 +15,19 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token"
"source.quilibrium.com/quilibrium/monorepo/node/keys"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/rpc"
"source.quilibrium.com/quilibrium/monorepo/node/store"
)
type NodeMode string
const (
NormalNodeMode = NodeMode("normal")
StrictSyncNodeMode = NodeMode("strict-sync")
)
type Node struct {
mode NodeMode
logger *zap.Logger
dataProofStore store.DataProofStore
clockStore store.ClockStore
@@ -28,6 +37,7 @@ type Node struct {
execEngines map[string]execution.ExecutionEngine
engine consensus.ConsensusEngine
pebble store.KVDB
synchronizer rpc.Synchronizer
}
type DHTNode struct {
@@ -44,6 +54,27 @@ func newDHTNode(
}, nil
}
func newStrictSyncNode(
logger *zap.Logger,
dataProofStore store.DataProofStore,
clockStore store.ClockStore,
coinStore store.CoinStore,
keyManager keys.KeyManager,
pebble store.KVDB,
synchronizer rpc.Synchronizer,
) (*Node, error) {
return &Node{
mode: StrictSyncNodeMode,
logger: logger,
dataProofStore: dataProofStore,
clockStore: clockStore,
coinStore: coinStore,
keyManager: keyManager,
pebble: pebble,
synchronizer: synchronizer,
}, nil
}
func newNode(
logger *zap.Logger,
dataProofStore store.DataProofStore,
@@ -65,15 +96,17 @@ func newNode(
}
return &Node{
logger,
dataProofStore,
clockStore,
coinStore,
keyManager,
pubSub,
execEngines,
engine,
pebble,
mode: NormalNodeMode,
logger: logger,
dataProofStore: dataProofStore,
clockStore: clockStore,
coinStore: coinStore,
keyManager: keyManager,
pubSub: pubSub,
execEngines: execEngines,
engine: engine,
pebble: pebble,
synchronizer: nil,
}, nil
}
@@ -157,29 +190,39 @@ func (d *DHTNode) Stop() {
}
func (n *Node) Start() {
err := <-n.engine.Start()
if err != nil {
panic(err)
}
switch n.mode {
case NormalNodeMode:
err := <-n.engine.Start()
if err != nil {
panic(err)
}
// TODO: add config mapping to engine name/frame registration
wg := sync.WaitGroup{}
for _, e := range n.execEngines {
wg.Add(1)
go func(e execution.ExecutionEngine) {
defer wg.Done()
if err := <-n.engine.RegisterExecutor(e, 0); err != nil {
panic(err)
}
}(e)
// TODO: add config mapping to engine name/frame registration
wg := sync.WaitGroup{}
for _, e := range n.execEngines {
wg.Add(1)
go func(e execution.ExecutionEngine) {
defer wg.Done()
if err := <-n.engine.RegisterExecutor(e, 0); err != nil {
panic(err)
}
}(e)
}
wg.Wait()
case StrictSyncNodeMode:
go n.synchronizer.Start(n.logger, n.pebble)
}
wg.Wait()
}
func (n *Node) Stop() {
err := <-n.engine.Stop(false)
if err != nil {
panic(err)
switch n.mode {
case NormalNodeMode:
err := <-n.engine.Stop(false)
if err != nil {
panic(err)
}
case StrictSyncNodeMode:
n.synchronizer.Stop()
}
n.pebble.Close()

View File

@@ -15,6 +15,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/keys"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
"source.quilibrium.com/quilibrium/monorepo/node/rpc"
"source.quilibrium.com/quilibrium/monorepo/node/store"
)
@@ -113,6 +114,15 @@ func NewDebugNode(*config.Config, *protobufs.SelfTestReport) (*Node, error) {
))
}
func NewStrictSyncNode(*config.Config, *protobufs.SelfTestReport, rpc.Synchronizer) (*Node, error) {
panic(wire.Build(
loggerSet,
keyManagerSet,
storeSet,
newStrictSyncNode,
))
}
func NewNode(*config.Config, *protobufs.SelfTestReport) (*Node, error) {
panic(wire.Build(
loggerSet,

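Note: the panic(wire.Build(...)) bodies in this file are compile-time stubs for the google/wire dependency injection generator and never execute at runtime; the wire tool reads them and emits the concrete constructors shown in the generated file below. Assuming the standard wire CLI, regeneration is typically:

go run github.com/google/wire/cmd/wire ./...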
View File

@@ -18,6 +18,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/keys"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
"source.quilibrium.com/quilibrium/monorepo/node/rpc"
"source.quilibrium.com/quilibrium/monorepo/node/store"
)
@@ -61,6 +62,22 @@ func NewDebugNode(configConfig *config.Config, selfTestReport *protobufs.SelfTes
return node, nil
}
func NewStrictSyncNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport, synchronizer rpc.Synchronizer) (*Node, error) {
zapLogger := logger()
dbConfig := configConfig.DB
pebbleDB := store.NewPebbleDB(dbConfig)
pebbleDataProofStore := store.NewPebbleDataProofStore(pebbleDB, zapLogger)
pebbleClockStore := store.NewPebbleClockStore(pebbleDB, zapLogger)
pebbleCoinStore := store.NewPebbleCoinStore(pebbleDB, zapLogger)
keyConfig := configConfig.Key
fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger)
node, err := newStrictSyncNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, pebbleDB, synchronizer)
if err != nil {
return nil, err
}
return node, nil
}
func NewNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport) (*Node, error) {
zapLogger := logger()
dbConfig := configConfig.DB

View File

@@ -478,6 +478,7 @@ func NewTokenExecutionEngine(
e.hypergraph,
e.syncController,
totalCoins,
false,
)
hypersyncMetadataFilter := slices.Concat(

View File

@@ -142,6 +142,16 @@ var (
false,
"compacts the database and exits",
)
strictSyncServer = flag.String(
"strict-sync-server",
"",
"runs only a server to listen for hypersync requests, uses multiaddr format (e.g. /ip4/0.0.0.0/tcp/8339)",
)
strictSyncClient = flag.String(
"strict-sync-client",
"",
"runs only a client to connect to a server listening for hypersync requests, uses multiaddr format (e.g. /ip4/127.0.0.1/tcp/8339)",
)
)
func signatureCheckDefault() bool {
@@ -493,8 +503,27 @@ func main() {
done := make(chan os.Signal, 1)
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
var node *app.Node
if *debug {
if *debug && *strictSyncServer == "" && *strictSyncClient == "" {
node, err = app.NewDebugNode(nodeConfig, report)
} else if *strictSyncServer != "" {
fmt.Println("Running in strict sync server mode, will not connect to regular p2p network...")
node, err = app.NewStrictSyncNode(
nodeConfig,
report,
rpc.NewStandaloneHypersyncServer(
nodeConfig.DB,
*strictSyncServer,
),
)
} else if *strictSyncClient != "" {
fmt.Println("Running in strict sync client mode, will not connect to regular p2p network...")
node, err = app.NewStrictSyncNode(
nodeConfig,
report,
rpc.NewStandaloneHypersyncClient(nodeConfig.DB, *strictSyncClient, done),
)
} else {
node, err = app.NewNode(nodeConfig, report)
}
@@ -515,7 +544,8 @@
node.Start()
defer node.Stop()
if nodeConfig.ListenGRPCMultiaddr != "" {
if nodeConfig.ListenGRPCMultiaddr != "" && *strictSyncServer == "" &&
*strictSyncClient == "" {
srv, err := rpc.NewRPCServer(
nodeConfig.ListenGRPCMultiaddr,
nodeConfig.ListenRestMultiaddr,

View File

@@ -7,13 +7,21 @@ import (
"encoding/hex"
"fmt"
"io"
"os"
"slices"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
mn "github.com/multiformats/go-multiaddr/net"
"github.com/pkg/errors"
"go.uber.org/zap"
gogrpc "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"source.quilibrium.com/quilibrium/monorepo/node/config"
"source.quilibrium.com/quilibrium/monorepo/node/crypto"
hypergraph "source.quilibrium.com/quilibrium/monorepo/node/hypergraph/application"
"source.quilibrium.com/quilibrium/monorepo/node/internal/grpc"
@@ -21,6 +29,226 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/store"
)
type Synchronizer interface {
Start(*zap.Logger, store.KVDB)
Stop()
}
type StandaloneHypersyncServer struct {
listenAddr multiaddr.Multiaddr
dbConfig *config.DBConfig
grpcServer *gogrpc.Server
quit chan struct{}
}
type StandaloneHypersyncClient struct {
serverAddr multiaddr.Multiaddr
dbConfig *config.DBConfig
done chan os.Signal
}
func NewStandaloneHypersyncServer(
dbConfig *config.DBConfig,
strictSyncServer string,
) Synchronizer {
listenAddr, err := multiaddr.NewMultiaddr(strictSyncServer)
if err != nil {
panic(err)
}
return &StandaloneHypersyncServer{
dbConfig: dbConfig,
listenAddr: listenAddr,
quit: make(chan struct{}),
}
}
func NewStandaloneHypersyncClient(
dbConfig *config.DBConfig,
strictSyncClient string,
done chan os.Signal,
) Synchronizer {
serverAddr, err := multiaddr.NewMultiaddr(strictSyncClient)
if err != nil {
panic(err)
}
return &StandaloneHypersyncClient{
dbConfig: dbConfig,
serverAddr: serverAddr,
done: done,
}
}
func (s *StandaloneHypersyncServer) Start(
logger *zap.Logger,
db store.KVDB,
) {
lis, err := mn.Listen(s.listenAddr)
if err != nil {
panic(err)
}
s.grpcServer = grpc.NewServer(
gogrpc.MaxRecvMsgSize(600*1024*1024),
gogrpc.MaxSendMsgSize(600*1024*1024),
)
hypergraphStore := store.NewPebbleHypergraphStore(s.dbConfig, db, logger)
hypergraph, err := hypergraphStore.LoadHypergraph()
if err != nil {
panic(err)
}
totalCoins := 0
coinStore := store.NewPebbleCoinStore(db, logger)
iter, err := coinStore.RangeCoins(
[]byte{0x00},
[]byte{0xff},
)
if err != nil {
panic(err)
}
for iter.First(); iter.Valid(); iter.Next() {
totalCoins++
}
iter.Close()
server := NewHypergraphComparisonServer(
logger,
hypergraphStore,
hypergraph,
NewSyncController(),
totalCoins,
true,
)
protobufs.RegisterHypergraphComparisonServiceServer(
s.grpcServer,
server,
)
go func() {
if err := s.grpcServer.Serve(mn.NetListener(lis)); err != nil {
logger.Error("serve error", zap.Error(err))
}
}()
<-s.quit
}
func (s *StandaloneHypersyncServer) Stop() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
s.grpcServer.GracefulStop()
}()
wg.Wait()
s.quit <- struct{}{}
}
func (s *StandaloneHypersyncClient) Start(
logger *zap.Logger,
db store.KVDB,
) {
hypergraphStore := store.NewPebbleHypergraphStore(s.dbConfig, db, logger)
hypergraph, err := hypergraphStore.LoadHypergraph()
if err != nil {
panic(err)
}
totalCoins := 0
coinStore := store.NewPebbleCoinStore(db, logger)
iter, err := coinStore.RangeCoins(
[]byte{0x00},
[]byte{0xff},
)
if err != nil {
panic(err)
}
for iter.First(); iter.Valid(); iter.Next() {
totalCoins++
}
iter.Close()
sets := hypergraph.GetVertexAdds()
for key, set := range sets {
dialCtx, cancelDial := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelDial()
_, addr, err := mn.DialArgs(s.serverAddr)
if err != nil {
panic(err)
}
credentials := insecure.NewCredentials()
cc, err := gogrpc.DialContext(
dialCtx,
addr,
gogrpc.WithTransportCredentials(
credentials,
),
gogrpc.WithDefaultCallOptions(
gogrpc.MaxCallSendMsgSize(600*1024*1024),
gogrpc.MaxCallRecvMsgSize(600*1024*1024),
),
)
if err != nil {
panic(err)
}
client := protobufs.NewHypergraphComparisonServiceClient(cc)
stream, err := client.HyperStream(context.Background())
if err != nil {
logger.Error("could not open stream", zap.Error(err))
return
}
err = SyncTreeBidirectionally(
stream,
logger,
append(append([]byte{}, key.L1[:]...), key.L2[:]...),
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
hypergraphStore,
hypergraph,
set,
NewSyncController(),
totalCoins,
false,
)
if err != nil {
logger.Error("error while synchronizing", zap.Error(err))
}
if err := cc.Close(); err != nil {
logger.Error("error while closing connection", zap.Error(err))
}
break
}
roots := hypergraph.Commit()
logger.Info(
"hypergraph root commit",
zap.String("root", hex.EncodeToString(roots[0])),
)
logger.Info("saving hypergraph")
if err := hypergraphStore.SaveHypergraph(hypergraph); err != nil {
logger.Error("error while saving", zap.Error(err))
}
logger.Info("hypergraph saved")
s.done <- syscall.SIGINT
}
func (s *StandaloneHypersyncClient) Stop() {
}
type SyncController struct {
isSyncing atomic.Bool
SyncStatus map[string]*SyncInfo
@@ -49,7 +277,7 @@ func NewSyncController() *SyncController {
// hypergraphComparisonServer implements the bidirectional sync service.
type hypergraphComparisonServer struct {
protobufs.UnimplementedHypergraphComparisonServiceServer
isDetachedServer bool
logger *zap.Logger
localHypergraphStore store.HypergraphStore
localHypergraph *hypergraph.Hypergraph
@@ -63,8 +291,10 @@ func NewHypergraphComparisonServer(
hypergraph *hypergraph.Hypergraph,
syncController *SyncController,
debugTotalCoins int,
isDetachedServer bool,
) *hypergraphComparisonServer {
return &hypergraphComparisonServer{
isDetachedServer: isDetachedServer,
logger: logger,
localHypergraphStore: hypergraphStore,
localHypergraph: hypergraph,
@@ -75,6 +305,7 @@
type streamManager struct {
ctx context.Context
cancel context.CancelFunc
logger *zap.Logger
stream HyperStream
hypergraphStore store.HypergraphStore
@@ -133,6 +364,12 @@ func (s *streamManager) sendLeafData(
return nil
}
select {
case <-s.ctx.Done():
return s.ctx.Err()
default:
}
node := getNodeAtPath(s.localTree.Root, path, 0)
leaf, ok := node.(*crypto.VectorCommitmentLeafNode)
if !ok {
@@ -142,7 +379,7 @@
continue
}
s.leavesSent++
if s.leavesSent > 50000 {
if s.leavesSent > 100000 {
return nil
}
@@ -431,7 +668,7 @@ func (s *streamManager) walk(
incomingResponses <-chan *protobufs.HypergraphComparisonResponse,
metadataOnly bool,
) error {
if s.leavesSent > 50000 {
if s.leavesSent > 100000 {
return nil
}
@@ -752,18 +989,30 @@ func syncTreeBidirectionallyServer(
)
}
ctx, cancel := context.WithCancel(stream.Context())
incomingQueriesIn, incomingQueriesOut :=
UnboundedChan[*protobufs.HypergraphComparisonQuery]("server incoming")
UnboundedChan[*protobufs.HypergraphComparisonQuery](
cancel,
"server incoming",
)
incomingResponsesIn, incomingResponsesOut :=
UnboundedChan[*protobufs.HypergraphComparisonResponse]("server incoming")
UnboundedChan[*protobufs.HypergraphComparisonResponse](
cancel,
"server incoming",
)
incomingLeavesIn, incomingLeavesOut :=
UnboundedChan[*protobufs.LeafData]("server incoming")
UnboundedChan[*protobufs.LeafData](
cancel,
"server incoming",
)
go func() {
for {
msg, err := stream.Recv()
if err == io.EOF {
logger.Info("received disconnect")
cancel()
close(incomingQueriesIn)
close(incomingResponsesIn)
close(incomingLeavesIn)
@@ -771,6 +1020,7 @@ }
}
if err != nil {
logger.Info("received error", zap.Error(err))
cancel()
close(incomingQueriesIn)
close(incomingResponsesIn)
close(incomingLeavesIn)
@@ -794,7 +1044,8 @@
wg.Add(1)
manager := &streamManager{
ctx: stream.Context(),
ctx: ctx,
cancel: cancel,
logger: logger,
stream: stream,
hypergraphStore: localHypergraphStore,
@@ -827,7 +1078,7 @@
break outer
}
leavesReceived++
if leavesReceived > 50000 {
if leavesReceived > 100000 {
break outer
}
@@ -898,13 +1149,15 @@
zap.String("root", hex.EncodeToString(roots[0])),
)
logger.Info("saving hypergraph")
if err = localHypergraphStore.SaveHypergraph(localHypergraph); err != nil {
logger.Error("error while saving", zap.Error(err))
}
logger.Info("hypergraph saved")
total, _ := idSet.GetTree().GetMetadata()
logger.Info(
"current progress",
"current progress, ready to resume connections",
zap.Float32("percentage", float32(total*100)/float32(debugTotalCoins)),
)
return nil
@@ -919,14 +1172,18 @@ func (s *hypergraphComparisonServer) HyperStream(
}
defer s.syncController.EndSyncSession()
peerId, ok := grpc.PeerIDFromContext(stream.Context())
if !ok {
return errors.New("could not identify peer")
}
var peerId peer.ID
var ok bool
if !s.isDetachedServer {
peerId, ok = grpc.PeerIDFromContext(stream.Context())
if !ok {
return errors.New("could not identify peer")
}
status, ok := s.syncController.SyncStatus[peerId.String()]
if ok && time.Since(status.LastSynced) < 30*time.Minute {
return errors.New("peer too recently synced")
status, ok := s.syncController.SyncStatus[peerId.String()]
if ok && time.Since(status.LastSynced) < 30*time.Minute {
return errors.New("peer too recently synced")
}
}
err := syncTreeBidirectionallyServer(
@@ -937,9 +1194,12 @@
false,
s.debugTotalCoins,
)
s.syncController.SyncStatus[peerId.String()] = &SyncInfo{
Unreachable: false,
LastSynced: time.Now(),
if !s.isDetachedServer {
s.syncController.SyncStatus[peerId.String()] = &SyncInfo{
Unreachable: false,
LastSynced: time.Now(),
}
}
return err
@@ -1008,23 +1268,36 @@ func SyncTreeBidirectionally(
return err
}
ctx, cancel := context.WithCancel(stream.Context())
incomingQueriesIn, incomingQueriesOut :=
UnboundedChan[*protobufs.HypergraphComparisonQuery]("server incoming")
UnboundedChan[*protobufs.HypergraphComparisonQuery](
cancel,
"client incoming",
)
incomingResponsesIn, incomingResponsesOut :=
UnboundedChan[*protobufs.HypergraphComparisonResponse]("server incoming")
UnboundedChan[*protobufs.HypergraphComparisonResponse](
cancel,
"client incoming",
)
incomingLeavesIn, incomingLeavesOut :=
UnboundedChan[*protobufs.LeafData]("server incoming")
UnboundedChan[*protobufs.LeafData](
cancel,
"client incoming",
)
go func() {
for {
msg, err := stream.Recv()
if err == io.EOF {
cancel()
close(incomingQueriesIn)
close(incomingResponsesIn)
close(incomingLeavesIn)
return
}
if err != nil {
cancel()
close(incomingQueriesIn)
close(incomingResponsesIn)
close(incomingLeavesIn)
@@ -1048,7 +1321,8 @@
wg.Add(1)
manager := &streamManager{
ctx: stream.Context(),
ctx: ctx,
cancel: cancel,
logger: logger,
stream: stream,
hypergraphStore: hypergraphStore,
@@ -1083,7 +1357,7 @@
}
leavesReceived++
if leavesReceived > 50000 {
if leavesReceived > 100000 {
break outer
}
@@ -1156,7 +1430,10 @@
return nil
}
func UnboundedChan[T any](purpose string) (chan<- T, <-chan T) {
func UnboundedChan[T any](
cancel context.CancelFunc,
purpose string,
) (chan<- T, <-chan T) {
in := make(chan T)
out := make(chan T)
go func() {
@@ -1171,6 +1448,7 @@ func UnboundedChan[T any](purpose string) (chan<- T, <-chan T) {
select {
case msg, ok := <-in:
if !ok {
cancel()
close(out)
return
}
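
To make the disconnect fix concrete, here is a self-contained Go sketch of the cancel-on-close pattern UnboundedChan now follows. This is an illustrative reconstruction, not the monorepo code: the buffering loop is assumed, and only the cancel plumbing mirrors the diff above. Closing the inbound side, which the stream Recv loops do on io.EOF or a stream error, cancels the shared context so every goroutine selecting on it unwinds instead of blocking forever.

package main

import (
	"context"
	"fmt"
)

// unboundedChan mirrors the new signature above: a cancel func shared with
// the stream's context, plus a diagnostic label (unused in this sketch).
func unboundedChan[T any](
	cancel context.CancelFunc,
	purpose string,
) (chan<- T, <-chan T) {
	in := make(chan T)
	out := make(chan T)
	go func() {
		var buf []T // grows without bound so senders never block
		for {
			// Offer on out only when something is buffered; a nil
			// channel leaves that select arm permanently not ready.
			var send chan T
			var next T
			if len(buf) > 0 {
				send = out
				next = buf[0]
			}
			select {
			case msg, ok := <-in:
				if !ok {
					// Inbound side closed (the stream saw io.EOF or an
					// error): cancel the shared context so readers
					// blocked elsewhere unwind, then release the consumer.
					cancel()
					close(out)
					return
				}
				buf = append(buf, msg)
			case send <- next:
				buf = buf[1:]
			}
		}
	}()
	return in, out
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	in, out := unboundedChan[int](cancel, "demo")
	for i := 1; i <= 3; i++ {
		in <- i
		fmt.Println("got", <-out)
	}
	close(in) // simulate the remote side disconnecting
	<-ctx.Done()
	fmt.Println("shared context cancelled:", ctx.Err())
}

The same cancel func is handed to all three per-stream queues (queries, responses, leaves), so whichever queue observes the disconnect first tears down the whole session.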

View File

@@ -253,7 +253,7 @@ func TestHypergraphSyncServer(t *testing.T) {
)
protobufs.RegisterHypergraphComparisonServiceServer(
grpcServer,
rpc.NewHypergraphComparisonServer(logger, serverHypergraphStore, crdts[0], rpc.NewSyncController(), numOperations),
rpc.NewHypergraphComparisonServer(logger, serverHypergraphStore, crdts[0], rpc.NewSyncController(), numOperations, false),
)
log.Println("Server listening on :50051")
go func() {