diff --git a/node/app/node.go b/node/app/node.go index e0ed4ae..40ef797 100644 --- a/node/app/node.go +++ b/node/app/node.go @@ -15,10 +15,19 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token" "source.quilibrium.com/quilibrium/monorepo/node/keys" "source.quilibrium.com/quilibrium/monorepo/node/p2p" + "source.quilibrium.com/quilibrium/monorepo/node/rpc" "source.quilibrium.com/quilibrium/monorepo/node/store" ) +type NodeMode string + +const ( + NormalNodeMode = NodeMode("normal") + StrictSyncNodeMode = NodeMode("strict-sync") +) + type Node struct { + mode NodeMode logger *zap.Logger dataProofStore store.DataProofStore clockStore store.ClockStore @@ -28,6 +37,7 @@ type Node struct { execEngines map[string]execution.ExecutionEngine engine consensus.ConsensusEngine pebble store.KVDB + synchronizer rpc.Synchronizer } type DHTNode struct { @@ -44,6 +54,27 @@ func newDHTNode( }, nil } +func newStrictSyncNode( + logger *zap.Logger, + dataProofStore store.DataProofStore, + clockStore store.ClockStore, + coinStore store.CoinStore, + keyManager keys.KeyManager, + pebble store.KVDB, + synchronizer rpc.Synchronizer, +) (*Node, error) { + return &Node{ + mode: StrictSyncNodeMode, + logger: logger, + dataProofStore: dataProofStore, + clockStore: clockStore, + coinStore: coinStore, + keyManager: keyManager, + pebble: pebble, + synchronizer: synchronizer, + }, nil +} + func newNode( logger *zap.Logger, dataProofStore store.DataProofStore, @@ -65,15 +96,17 @@ func newNode( } return &Node{ - logger, - dataProofStore, - clockStore, - coinStore, - keyManager, - pubSub, - execEngines, - engine, - pebble, + mode: NormalNodeMode, + logger: logger, + dataProofStore: dataProofStore, + clockStore: clockStore, + coinStore: coinStore, + keyManager: keyManager, + pubSub: pubSub, + execEngines: execEngines, + engine: engine, + pebble: pebble, + synchronizer: nil, }, nil } @@ -157,29 +190,39 @@ func (d *DHTNode) Stop() { } func (n *Node) Start() { - err := <-n.engine.Start() - if err != nil { - panic(err) - } + switch n.mode { + case NormalNodeMode: + err := <-n.engine.Start() + if err != nil { + panic(err) + } - // TODO: add config mapping to engine name/frame registration - wg := sync.WaitGroup{} - for _, e := range n.execEngines { - wg.Add(1) - go func(e execution.ExecutionEngine) { - defer wg.Done() - if err := <-n.engine.RegisterExecutor(e, 0); err != nil { - panic(err) - } - }(e) + // TODO: add config mapping to engine name/frame registration + wg := sync.WaitGroup{} + for _, e := range n.execEngines { + wg.Add(1) + go func(e execution.ExecutionEngine) { + defer wg.Done() + if err := <-n.engine.RegisterExecutor(e, 0); err != nil { + panic(err) + } + }(e) + } + wg.Wait() + case StrictSyncNodeMode: + go n.synchronizer.Start(n.logger, n.pebble) } - wg.Wait() } func (n *Node) Stop() { - err := <-n.engine.Stop(false) - if err != nil { - panic(err) + switch n.mode { + case NormalNodeMode: + err := <-n.engine.Stop(false) + if err != nil { + panic(err) + } + case StrictSyncNodeMode: + n.synchronizer.Stop() } n.pebble.Close() diff --git a/node/app/wire.go b/node/app/wire.go index c7a8d4f..445268e 100644 --- a/node/app/wire.go +++ b/node/app/wire.go @@ -15,6 +15,7 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/keys" "source.quilibrium.com/quilibrium/monorepo/node/p2p" "source.quilibrium.com/quilibrium/monorepo/node/protobufs" + "source.quilibrium.com/quilibrium/monorepo/node/rpc" "source.quilibrium.com/quilibrium/monorepo/node/store" ) @@ -113,6 +114,15 @@ func NewDebugNode(*config.Config, *protobufs.SelfTestReport) (*Node, error) { )) } +func NewStrictSyncNode(*config.Config, *protobufs.SelfTestReport, rpc.Synchronizer) (*Node, error) { + panic(wire.Build( + loggerSet, + keyManagerSet, + storeSet, + newStrictSyncNode, + )) +} + func NewNode(*config.Config, *protobufs.SelfTestReport) (*Node, error) { panic(wire.Build( loggerSet, diff --git a/node/app/wire_gen.go b/node/app/wire_gen.go index dc4e212..ce154e1 100644 --- a/node/app/wire_gen.go +++ b/node/app/wire_gen.go @@ -18,6 +18,7 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/keys" "source.quilibrium.com/quilibrium/monorepo/node/p2p" "source.quilibrium.com/quilibrium/monorepo/node/protobufs" + "source.quilibrium.com/quilibrium/monorepo/node/rpc" "source.quilibrium.com/quilibrium/monorepo/node/store" ) @@ -61,6 +62,22 @@ func NewDebugNode(configConfig *config.Config, selfTestReport *protobufs.SelfTes return node, nil } +func NewStrictSyncNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport, synchronizer rpc.Synchronizer) (*Node, error) { + zapLogger := logger() + dbConfig := configConfig.DB + pebbleDB := store.NewPebbleDB(dbConfig) + pebbleDataProofStore := store.NewPebbleDataProofStore(pebbleDB, zapLogger) + pebbleClockStore := store.NewPebbleClockStore(pebbleDB, zapLogger) + pebbleCoinStore := store.NewPebbleCoinStore(pebbleDB, zapLogger) + keyConfig := configConfig.Key + fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger) + node, err := newStrictSyncNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, pebbleDB, synchronizer) + if err != nil { + return nil, err + } + return node, nil +} + func NewNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport) (*Node, error) { zapLogger := logger() dbConfig := configConfig.DB diff --git a/node/execution/intrinsics/token/token_execution_engine.go b/node/execution/intrinsics/token/token_execution_engine.go index 9bd904b..b03a994 100644 --- a/node/execution/intrinsics/token/token_execution_engine.go +++ b/node/execution/intrinsics/token/token_execution_engine.go @@ -478,6 +478,7 @@ func NewTokenExecutionEngine( e.hypergraph, e.syncController, totalCoins, + false, ) hypersyncMetadataFilter := slices.Concat( diff --git a/node/main.go b/node/main.go index 56512f5..b9e8d66 100644 --- a/node/main.go +++ b/node/main.go @@ -142,6 +142,16 @@ var ( false, "compacts the database and exits", ) + strictSyncServer = flag.String( + "strict-sync-server", + "", + "runs only a server to listen for hypersync requests, uses multiaddr format (e.g. /ip4/0.0.0.0/tcp/8339)", + ) + strictSyncClient = flag.String( + "strict-sync-client", + "", + "runs only a client to connect to a server listening for hypersync requests, uses multiaddr format (e.g. /ip4/127.0.0.1/tcp/8339)", + ) ) func signatureCheckDefault() bool { @@ -493,8 +503,27 @@ func main() { done := make(chan os.Signal, 1) signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) var node *app.Node - if *debug { + if *debug && *strictSyncServer == "" && *strictSyncClient == "" { node, err = app.NewDebugNode(nodeConfig, report) + } else if *strictSyncServer != "" { + fmt.Println("Running in strict sync server mode, will not connect to regular p2p network...") + + node, err = app.NewStrictSyncNode( + nodeConfig, + report, + rpc.NewStandaloneHypersyncServer( + nodeConfig.DB, + *strictSyncServer, + ), + ) + } else if *strictSyncClient != "" { + fmt.Println("Running in strict sync client mode, will not connect to regular p2p network...") + + node, err = app.NewStrictSyncNode( + nodeConfig, + report, + rpc.NewStandaloneHypersyncClient(nodeConfig.DB, *strictSyncClient, done), + ) } else { node, err = app.NewNode(nodeConfig, report) } @@ -515,7 +544,8 @@ func main() { node.Start() defer node.Stop() - if nodeConfig.ListenGRPCMultiaddr != "" { + if nodeConfig.ListenGRPCMultiaddr != "" && *strictSyncServer == "" && + *strictSyncClient == "" { srv, err := rpc.NewRPCServer( nodeConfig.ListenGRPCMultiaddr, nodeConfig.ListenRestMultiaddr, diff --git a/node/rpc/hypergraph_sync_rpc_server.go b/node/rpc/hypergraph_sync_rpc_server.go index a469a41..b9b9cee 100644 --- a/node/rpc/hypergraph_sync_rpc_server.go +++ b/node/rpc/hypergraph_sync_rpc_server.go @@ -7,13 +7,21 @@ import ( "encoding/hex" "fmt" "io" + "os" "slices" "sync" "sync/atomic" + "syscall" "time" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + mn "github.com/multiformats/go-multiaddr/net" "github.com/pkg/errors" "go.uber.org/zap" + gogrpc "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "source.quilibrium.com/quilibrium/monorepo/node/config" "source.quilibrium.com/quilibrium/monorepo/node/crypto" hypergraph "source.quilibrium.com/quilibrium/monorepo/node/hypergraph/application" "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc" @@ -21,6 +29,226 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/store" ) +type Synchronizer interface { + Start(*zap.Logger, store.KVDB) + Stop() +} + +type StandaloneHypersyncServer struct { + listenAddr multiaddr.Multiaddr + dbConfig *config.DBConfig + grpcServer *gogrpc.Server + quit chan struct{} +} + +type StandaloneHypersyncClient struct { + serverAddr multiaddr.Multiaddr + dbConfig *config.DBConfig + done chan os.Signal +} + +func NewStandaloneHypersyncServer( + dbConfig *config.DBConfig, + strictSyncServer string, +) Synchronizer { + listenAddr, err := multiaddr.NewMultiaddr(strictSyncServer) + if err != nil { + panic(err) + } + + return &StandaloneHypersyncServer{ + dbConfig: dbConfig, + listenAddr: listenAddr, + quit: make(chan struct{}), + } +} + +func NewStandaloneHypersyncClient( + dbConfig *config.DBConfig, + strictSyncClient string, + done chan os.Signal, +) Synchronizer { + serverAddr, err := multiaddr.NewMultiaddr(strictSyncClient) + if err != nil { + panic(err) + } + + return &StandaloneHypersyncClient{ + dbConfig: dbConfig, + serverAddr: serverAddr, + done: done, + } +} + +func (s *StandaloneHypersyncServer) Start( + logger *zap.Logger, + db store.KVDB, +) { + lis, err := mn.Listen(s.listenAddr) + if err != nil { + panic(err) + } + + s.grpcServer = grpc.NewServer( + gogrpc.MaxRecvMsgSize(600*1024*1024), + gogrpc.MaxSendMsgSize(600*1024*1024), + ) + + hypergraphStore := store.NewPebbleHypergraphStore(s.dbConfig, db, logger) + hypergraph, err := hypergraphStore.LoadHypergraph() + if err != nil { + panic(err) + } + + totalCoins := 0 + + coinStore := store.NewPebbleCoinStore(db, logger) + + iter, err := coinStore.RangeCoins( + []byte{0x00}, + []byte{0xff}, + ) + if err != nil { + panic(err) + } + + for iter.First(); iter.Valid(); iter.Next() { + totalCoins++ + } + iter.Close() + + server := NewHypergraphComparisonServer( + logger, + hypergraphStore, + hypergraph, + NewSyncController(), + totalCoins, + true, + ) + protobufs.RegisterHypergraphComparisonServiceServer( + s.grpcServer, + server, + ) + + go func() { + if err := s.grpcServer.Serve(mn.NetListener(lis)); err != nil { + logger.Error("serve error", zap.Error(err)) + } + }() + <-s.quit +} + +func (s *StandaloneHypersyncServer) Stop() { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + s.grpcServer.GracefulStop() + }() + wg.Wait() + s.quit <- struct{}{} +} + +func (s *StandaloneHypersyncClient) Start( + logger *zap.Logger, + db store.KVDB, +) { + hypergraphStore := store.NewPebbleHypergraphStore(s.dbConfig, db, logger) + hypergraph, err := hypergraphStore.LoadHypergraph() + if err != nil { + panic(err) + } + + totalCoins := 0 + + coinStore := store.NewPebbleCoinStore(db, logger) + + iter, err := coinStore.RangeCoins( + []byte{0x00}, + []byte{0xff}, + ) + if err != nil { + panic(err) + } + + for iter.First(); iter.Valid(); iter.Next() { + totalCoins++ + } + iter.Close() + + sets := hypergraph.GetVertexAdds() + for key, set := range sets { + dialCtx, cancelDial := context.WithTimeout(context.Background(), 10*time.Second) + defer cancelDial() + + _, addr, err := mn.DialArgs(s.serverAddr) + if err != nil { + panic(err) + } + credentials := insecure.NewCredentials() + + cc, err := gogrpc.DialContext( + dialCtx, + addr, + gogrpc.WithTransportCredentials( + credentials, + ), + gogrpc.WithDefaultCallOptions( + gogrpc.MaxCallSendMsgSize(600*1024*1024), + gogrpc.MaxCallRecvMsgSize(600*1024*1024), + ), + ) + if err != nil { + panic(err) + } + + client := protobufs.NewHypergraphComparisonServiceClient(cc) + + stream, err := client.HyperStream(context.Background()) + if err != nil { + logger.Error("could not open stream", zap.Error(err)) + return + } + + err = SyncTreeBidirectionally( + stream, + logger, + append(append([]byte{}, key.L1[:]...), key.L2[:]...), + protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + hypergraphStore, + hypergraph, + set, + NewSyncController(), + totalCoins, + false, + ) + + if err != nil { + logger.Error("error while synchronizing", zap.Error(err)) + } + if err := cc.Close(); err != nil { + logger.Error("error while closing connection", zap.Error(err)) + } + break + } + + roots := hypergraph.Commit() + logger.Info( + "hypergraph root commit", + zap.String("root", hex.EncodeToString(roots[0])), + ) + + logger.Info("saving hypergraph") + if err := hypergraphStore.SaveHypergraph(hypergraph); err != nil { + logger.Error("error while saving", zap.Error(err)) + } + logger.Info("hypergraph saved") + s.done <- syscall.SIGINT +} + +func (s *StandaloneHypersyncClient) Stop() { +} + type SyncController struct { isSyncing atomic.Bool SyncStatus map[string]*SyncInfo @@ -49,7 +277,7 @@ func NewSyncController() *SyncController { // hypergraphComparisonServer implements the bidirectional sync service. type hypergraphComparisonServer struct { protobufs.UnimplementedHypergraphComparisonServiceServer - + isDetachedServer bool logger *zap.Logger localHypergraphStore store.HypergraphStore localHypergraph *hypergraph.Hypergraph @@ -63,8 +291,10 @@ func NewHypergraphComparisonServer( hypergraph *hypergraph.Hypergraph, syncController *SyncController, debugTotalCoins int, + isDetachedServer bool, ) *hypergraphComparisonServer { return &hypergraphComparisonServer{ + isDetachedServer: isDetachedServer, logger: logger, localHypergraphStore: hypergraphStore, localHypergraph: hypergraph, @@ -75,6 +305,7 @@ func NewHypergraphComparisonServer( type streamManager struct { ctx context.Context + cancel context.CancelFunc logger *zap.Logger stream HyperStream hypergraphStore store.HypergraphStore @@ -133,6 +364,12 @@ func (s *streamManager) sendLeafData( return nil } + select { + case <-s.ctx.Done(): + return s.ctx.Err() + default: + } + node := getNodeAtPath(s.localTree.Root, path, 0) leaf, ok := node.(*crypto.VectorCommitmentLeafNode) if !ok { @@ -142,7 +379,7 @@ func (s *streamManager) sendLeafData( continue } s.leavesSent++ - if s.leavesSent > 50000 { + if s.leavesSent > 100000 { return nil } @@ -431,7 +668,7 @@ func (s *streamManager) walk( incomingResponses <-chan *protobufs.HypergraphComparisonResponse, metadataOnly bool, ) error { - if s.leavesSent > 50000 { + if s.leavesSent > 100000 { return nil } @@ -752,18 +989,30 @@ func syncTreeBidirectionallyServer( ) } + ctx, cancel := context.WithCancel(stream.Context()) + incomingQueriesIn, incomingQueriesOut := - UnboundedChan[*protobufs.HypergraphComparisonQuery]("server incoming") + UnboundedChan[*protobufs.HypergraphComparisonQuery]( + cancel, + "server incoming", + ) incomingResponsesIn, incomingResponsesOut := - UnboundedChan[*protobufs.HypergraphComparisonResponse]("server incoming") + UnboundedChan[*protobufs.HypergraphComparisonResponse]( + cancel, + "server incoming", + ) incomingLeavesIn, incomingLeavesOut := - UnboundedChan[*protobufs.LeafData]("server incoming") + UnboundedChan[*protobufs.LeafData]( + cancel, + "server incoming", + ) go func() { for { msg, err := stream.Recv() if err == io.EOF { logger.Info("received disconnect") + cancel() close(incomingQueriesIn) close(incomingResponsesIn) close(incomingLeavesIn) @@ -771,6 +1020,7 @@ func syncTreeBidirectionallyServer( } if err != nil { logger.Info("received error", zap.Error(err)) + cancel() close(incomingQueriesIn) close(incomingResponsesIn) close(incomingLeavesIn) @@ -794,7 +1044,8 @@ func syncTreeBidirectionallyServer( wg.Add(1) manager := &streamManager{ - ctx: stream.Context(), + ctx: ctx, + cancel: cancel, logger: logger, stream: stream, hypergraphStore: localHypergraphStore, @@ -827,7 +1078,7 @@ outer: break outer } leavesReceived++ - if leavesReceived > 50000 { + if leavesReceived > 100000 { break outer } @@ -898,13 +1149,15 @@ outer: zap.String("root", hex.EncodeToString(roots[0])), ) + logger.Info("saving hypergraph") if err = localHypergraphStore.SaveHypergraph(localHypergraph); err != nil { logger.Error("error while saving", zap.Error(err)) } + logger.Info("hypergraph saved") total, _ := idSet.GetTree().GetMetadata() logger.Info( - "current progress", + "current progress, ready to resume connections", zap.Float32("percentage", float32(total*100)/float32(debugTotalCoins)), ) return nil @@ -919,14 +1172,18 @@ func (s *hypergraphComparisonServer) HyperStream( } defer s.syncController.EndSyncSession() - peerId, ok := grpc.PeerIDFromContext(stream.Context()) - if !ok { - return errors.New("could not identify peer") - } + var peerId peer.ID + var ok bool + if !s.isDetachedServer { + peerId, ok = grpc.PeerIDFromContext(stream.Context()) + if !ok { + return errors.New("could not identify peer") + } - status, ok := s.syncController.SyncStatus[peerId.String()] - if ok && time.Since(status.LastSynced) < 30*time.Minute { - return errors.New("peer too recently synced") + status, ok := s.syncController.SyncStatus[peerId.String()] + if ok && time.Since(status.LastSynced) < 30*time.Minute { + return errors.New("peer too recently synced") + } } err := syncTreeBidirectionallyServer( @@ -937,9 +1194,12 @@ func (s *hypergraphComparisonServer) HyperStream( false, s.debugTotalCoins, ) - s.syncController.SyncStatus[peerId.String()] = &SyncInfo{ - Unreachable: false, - LastSynced: time.Now(), + + if !s.isDetachedServer { + s.syncController.SyncStatus[peerId.String()] = &SyncInfo{ + Unreachable: false, + LastSynced: time.Now(), + } } return err @@ -1008,23 +1268,36 @@ func SyncTreeBidirectionally( return err } + ctx, cancel := context.WithCancel(stream.Context()) + incomingQueriesIn, incomingQueriesOut := - UnboundedChan[*protobufs.HypergraphComparisonQuery]("server incoming") + UnboundedChan[*protobufs.HypergraphComparisonQuery]( + cancel, + "client incoming", + ) incomingResponsesIn, incomingResponsesOut := - UnboundedChan[*protobufs.HypergraphComparisonResponse]("server incoming") + UnboundedChan[*protobufs.HypergraphComparisonResponse]( + cancel, + "client incoming", + ) incomingLeavesIn, incomingLeavesOut := - UnboundedChan[*protobufs.LeafData]("server incoming") + UnboundedChan[*protobufs.LeafData]( + cancel, + "client incoming", + ) go func() { for { msg, err := stream.Recv() if err == io.EOF { + cancel() close(incomingQueriesIn) close(incomingResponsesIn) close(incomingLeavesIn) return } if err != nil { + cancel() close(incomingQueriesIn) close(incomingResponsesIn) close(incomingLeavesIn) @@ -1048,7 +1321,8 @@ func SyncTreeBidirectionally( wg.Add(1) manager := &streamManager{ - ctx: stream.Context(), + ctx: ctx, + cancel: cancel, logger: logger, stream: stream, hypergraphStore: hypergraphStore, @@ -1083,7 +1357,7 @@ outer: } leavesReceived++ - if leavesReceived > 50000 { + if leavesReceived > 100000 { break outer } @@ -1156,7 +1430,10 @@ outer: return nil } -func UnboundedChan[T any](purpose string) (chan<- T, <-chan T) { +func UnboundedChan[T any]( + cancel context.CancelFunc, + purpose string, +) (chan<- T, <-chan T) { in := make(chan T) out := make(chan T) go func() { @@ -1171,6 +1448,7 @@ func UnboundedChan[T any](purpose string) (chan<- T, <-chan T) { select { case msg, ok := <-in: if !ok { + cancel() close(out) return } diff --git a/node/rpc/hypergraph_sync_rpc_server_test.go b/node/rpc/hypergraph_sync_rpc_server_test.go index 6c403c6..a38e698 100644 --- a/node/rpc/hypergraph_sync_rpc_server_test.go +++ b/node/rpc/hypergraph_sync_rpc_server_test.go @@ -253,7 +253,7 @@ func TestHypergraphSyncServer(t *testing.T) { ) protobufs.RegisterHypergraphComparisonServiceServer( grpcServer, - rpc.NewHypergraphComparisonServer(logger, serverHypergraphStore, crdts[0], rpc.NewSyncController(), numOperations), + rpc.NewHypergraphComparisonServer(logger, serverHypergraphStore, crdts[0], rpc.NewSyncController(), numOperations, false), ) log.Println("Server listening on :50051") go func() {