From d2048d0acf48e960ff2a50b083611add94a3e5c4 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Mon, 24 Feb 2025 03:00:02 -0600 Subject: [PATCH] add progress info, make sync a little smarter --- .../token/token_execution_engine.go | 60 +++++++- node/protobufs/application.pb.go | 141 ++++++++++-------- node/protobufs/application.proto | 2 + node/rpc/hypergraph_sync_rpc_server.go | 131 ++++++++-------- node/rpc/hypergraph_sync_rpc_server_test.go | 8 +- 5 files changed, 202 insertions(+), 140 deletions(-) diff --git a/node/execution/intrinsics/token/token_execution_engine.go b/node/execution/intrinsics/token/token_execution_engine.go index aef848d..39f511a 100644 --- a/node/execution/intrinsics/token/token_execution_engine.go +++ b/node/execution/intrinsics/token/token_execution_engine.go @@ -369,9 +369,44 @@ func NewTokenExecutionEngine( e.proverPublicKey = publicKeyBytes e.provingKeyAddress = provingKeyAddress + // debug carveout for M5 testing + iter, err := e.coinStore.RangeCoins( + []byte{0x00}, + []byte{0xff}, + ) + if err != nil { + panic(err) + } + + totalCoins := 0 + specificRange := 0 + if e.engineConfig.RebuildStart == "" { + e.engineConfig.RebuildStart = "0000000000000000000000000000000000000000000000000000000000000000" + } + if e.engineConfig.RebuildEnd == "" { + e.engineConfig.RebuildEnd = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + } + start, err := hex.DecodeString(e.engineConfig.RebuildStart) + if err != nil { + panic(err) + } + end, err := hex.DecodeString(e.engineConfig.RebuildEnd) + if err != nil { + panic(err) + } + + for iter.First(); iter.Valid(); iter.Next() { + if bytes.Compare(iter.Key()[2:], start) >= 0 && bytes.Compare(iter.Key()[2:], end) < 0 { + specificRange++ + } + totalCoins++ + } + iter.Close() + // end debug carveout for M5 testing + _, _, err = e.clockStore.GetLatestDataClockFrame(e.intrinsicFilter) if err != nil { - e.rebuildHypergraph() + e.rebuildHypergraph(specificRange) } else { e.hypergraph, err = e.hypergraphStore.LoadHypergraph() if err != nil && !errors.Is(err, store.ErrNotFound) { @@ -382,7 +417,7 @@ func NewTokenExecutionEngine( } if e.hypergraph == nil || len(e.hypergraph.GetVertexAdds()) == 0 { - e.rebuildHypergraph() + e.rebuildHypergraph(specificRange) } } @@ -396,6 +431,7 @@ func NewTokenExecutionEngine( e.hypergraphStore, e.hypergraph, e.syncController, + totalCoins, ) protobufs.RegisterHypergraphComparisonServiceServer(syncServer, hyperSync) go func() { @@ -412,7 +448,7 @@ func NewTokenExecutionEngine( for { select { case <-gotime.After(5 * gotime.Second): - e.hyperSync() + e.hyperSync(totalCoins) case <-e.ctx.Done(): return } @@ -578,7 +614,12 @@ func (e *TokenExecutionEngine) addBatchToHypergraph(batchKey [][]byte, batchValu } } -func (e *TokenExecutionEngine) hyperSync() { +func (e *TokenExecutionEngine) hyperSync(totalCoins int) { + if !e.syncController.TryEstablishSyncSession() { + return + } + defer e.syncController.EndSyncSession() + peerId, err := e.pubSub.GetRandomPeer( append([]byte{0x00}, e.intrinsicFilter...), ) @@ -624,8 +665,9 @@ func (e *TokenExecutionEngine) hyperSync() { append(append([]byte{}, key.L1[:]...), key.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, e.hypergraphStore, - set.GetTree(), + set, e.syncController, + totalCoins, false, ) if err != nil { @@ -645,7 +687,7 @@ func (e *TokenExecutionEngine) hyperSync() { } } -func (e *TokenExecutionEngine) rebuildHypergraph() { +func (e *TokenExecutionEngine) rebuildHypergraph(totalRange int) { e.logger.Info("rebuilding hypergraph") e.hypergraph = hypergraph.NewHypergraph() if e.engineConfig.RebuildStart == "" { @@ -670,7 +712,9 @@ func (e *TokenExecutionEngine) rebuildHypergraph() { panic(err) } var batchKey, batchValue [][]byte + processed := 0 for iter.First(); iter.Valid(); iter.Next() { + processed++ key := make([]byte, len(iter.Key()[2:])) copy(key, iter.Key()[2:]) batchKey = append(batchKey, key) @@ -694,6 +738,10 @@ func (e *TokenExecutionEngine) rebuildHypergraph() { if len(batchKey) == runtime.NumCPU() { e.addBatchToHypergraph(batchKey, batchValue) + e.logger.Info( + "processed batch", + zap.Float32("percentage", float32(processed)/float32(totalRange)), + ) batchKey = [][]byte{} batchValue = [][]byte{} } diff --git a/node/protobufs/application.pb.go b/node/protobufs/application.pb.go index 2458c52..1d0d963 100644 --- a/node/protobufs/application.pb.go +++ b/node/protobufs/application.pb.go @@ -482,6 +482,8 @@ type HypergraphComparisonResponse struct { // (For instance, if the node is a branch you may want to send the commitment // for each child.) Children []*BranchChild `protobuf:"bytes,3,rep,name=children,proto3" json:"children,omitempty"` + // If set, indicates the response is for the root of the given set tree. + IsRoot bool `protobuf:"varint,4,opt,name=is_root,json=isRoot,proto3" json:"is_root,omitempty"` } func (x *HypergraphComparisonResponse) Reset() { @@ -537,6 +539,13 @@ func (x *HypergraphComparisonResponse) GetChildren() []*BranchChild { return nil } +func (x *HypergraphComparisonResponse) GetIsRoot() bool { + if x != nil { + return x.IsRoot + } + return false +} + // Defines the child commitment. type BranchChild struct { state protoimpl.MessageState @@ -826,7 +835,7 @@ var file_application_proto_rawDesc = []byte{ 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x12, 0x2a, 0x0a, 0x11, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x5f, 0x6c, 0x65, 0x61, 0x66, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0f, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x4c, 0x65, 0x61, 0x66, 0x44, - 0x61, 0x74, 0x61, 0x22, 0x9b, 0x01, 0x0a, 0x1c, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, + 0x61, 0x74, 0x61, 0x22, 0xb4, 0x01, 0x0a, 0x1c, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x03, 0x28, 0x05, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, @@ -836,71 +845,73 @@ var file_application_proto_rawDesc = []byte{ 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x43, 0x68, 0x69, 0x6c, 0x64, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, - 0x6e, 0x22, 0x43, 0x0a, 0x0b, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x43, 0x68, 0x69, 0x6c, 0x64, - 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, - 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, - 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, - 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x90, 0x01, 0x0a, 0x08, 0x4c, 0x65, 0x61, 0x66, 0x44, - 0x61, 0x74, 0x61, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x68, - 0x61, 0x73, 0x68, 0x5f, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x0a, 0x68, 0x61, 0x73, 0x68, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x12, 0x0a, 0x04, - 0x73, 0x69, 0x7a, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, - 0x12, 0x27, 0x0a, 0x0f, 0x75, 0x6e, 0x64, 0x65, 0x72, 0x6c, 0x79, 0x69, 0x6e, 0x67, 0x5f, 0x64, - 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x75, 0x6e, 0x64, 0x65, 0x72, - 0x6c, 0x79, 0x69, 0x6e, 0x67, 0x44, 0x61, 0x74, 0x61, 0x22, 0x99, 0x02, 0x0a, 0x14, 0x48, 0x79, - 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, - 0x6f, 0x6e, 0x12, 0x51, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x39, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, - 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, - 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, - 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x48, 0x00, 0x52, 0x05, - 0x71, 0x75, 0x65, 0x72, 0x79, 0x12, 0x5a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3c, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, - 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, - 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x47, 0x0a, 0x09, 0x6c, 0x65, 0x61, 0x66, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, - 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x4c, 0x65, 0x61, 0x66, 0x44, 0x61, 0x74, 0x61, 0x48, 0x00, - 0x52, 0x08, 0x6c, 0x65, 0x61, 0x66, 0x44, 0x61, 0x74, 0x61, 0x42, 0x09, 0x0a, 0x07, 0x70, 0x61, - 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2a, 0x76, 0x0a, 0x10, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, - 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, 0x1f, 0x0a, 0x1b, 0x45, 0x58, 0x45, - 0x43, 0x55, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x43, 0x4f, 0x4e, 0x54, 0x45, 0x58, 0x54, 0x5f, 0x49, - 0x4e, 0x54, 0x52, 0x49, 0x4e, 0x53, 0x49, 0x43, 0x10, 0x00, 0x12, 0x20, 0x0a, 0x1c, 0x45, 0x58, - 0x45, 0x43, 0x55, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x43, 0x4f, 0x4e, 0x54, 0x45, 0x58, 0x54, 0x5f, - 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x10, 0x01, 0x12, 0x1f, 0x0a, 0x1b, - 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x43, 0x4f, 0x4e, 0x54, 0x45, 0x58, - 0x54, 0x5f, 0x45, 0x58, 0x54, 0x52, 0x49, 0x4e, 0x53, 0x49, 0x43, 0x10, 0x02, 0x2a, 0xb8, 0x01, - 0x0a, 0x12, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x50, 0x68, 0x61, 0x73, - 0x65, 0x53, 0x65, 0x74, 0x12, 0x24, 0x0a, 0x20, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, - 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x56, 0x45, 0x52, - 0x54, 0x45, 0x58, 0x5f, 0x41, 0x44, 0x44, 0x53, 0x10, 0x00, 0x12, 0x27, 0x0a, 0x23, 0x48, 0x59, - 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, 0x53, - 0x45, 0x54, 0x5f, 0x56, 0x45, 0x52, 0x54, 0x45, 0x58, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, - 0x53, 0x10, 0x01, 0x12, 0x27, 0x0a, 0x23, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, - 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x48, 0x59, 0x50, 0x45, - 0x52, 0x45, 0x44, 0x47, 0x45, 0x5f, 0x41, 0x44, 0x44, 0x53, 0x10, 0x02, 0x12, 0x2a, 0x0a, 0x26, - 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, - 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x48, 0x59, 0x50, 0x45, 0x52, 0x45, 0x44, 0x47, 0x45, 0x5f, 0x52, - 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x53, 0x10, 0x03, 0x32, 0x9c, 0x01, 0x0a, 0x1b, 0x48, 0x79, 0x70, - 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, - 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x7d, 0x0a, 0x0b, 0x48, 0x79, 0x70, 0x65, - 0x72, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x34, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, - 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, - 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x1a, 0x34, 0x2e, + 0x6e, 0x12, 0x17, 0x0a, 0x07, 0x69, 0x73, 0x5f, 0x72, 0x6f, 0x6f, 0x74, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x52, 0x6f, 0x6f, 0x74, 0x22, 0x43, 0x0a, 0x0b, 0x42, 0x72, + 0x61, 0x6e, 0x63, 0x68, 0x43, 0x68, 0x69, 0x6c, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, + 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12, + 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x22, + 0x90, 0x01, 0x0a, 0x08, 0x4c, 0x65, 0x61, 0x66, 0x44, 0x61, 0x74, 0x61, 0x12, 0x10, 0x0a, 0x03, + 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, + 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x68, 0x61, 0x73, 0x68, 0x5f, 0x74, 0x61, 0x72, + 0x67, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x68, 0x61, 0x73, 0x68, 0x54, + 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x12, 0x27, 0x0a, 0x0f, 0x75, 0x6e, 0x64, + 0x65, 0x72, 0x6c, 0x79, 0x69, 0x6e, 0x67, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x0e, 0x75, 0x6e, 0x64, 0x65, 0x72, 0x6c, 0x79, 0x69, 0x6e, 0x67, 0x44, 0x61, + 0x74, 0x61, 0x22, 0x99, 0x02, 0x0a, 0x14, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, + 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x12, 0x51, 0x0a, 0x05, 0x71, + 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x39, 0x2e, 0x71, 0x75, 0x69, + 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, + 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, + 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, + 0x51, 0x75, 0x65, 0x72, 0x79, 0x48, 0x00, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x12, 0x5a, + 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x3c, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, + 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, + 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, + 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, + 0x52, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x47, 0x0a, 0x09, 0x6c, 0x65, + 0x61, 0x66, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, - 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, - 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, - 0x73, 0x6f, 0x6e, 0x28, 0x01, 0x30, 0x01, 0x42, 0x3a, 0x5a, 0x38, 0x73, 0x6f, 0x75, 0x72, 0x63, - 0x65, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2f, 0x6d, 0x6f, 0x6e, 0x6f, - 0x72, 0x65, 0x70, 0x6f, 0x2f, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x4c, + 0x65, 0x61, 0x66, 0x44, 0x61, 0x74, 0x61, 0x48, 0x00, 0x52, 0x08, 0x6c, 0x65, 0x61, 0x66, 0x44, + 0x61, 0x74, 0x61, 0x42, 0x09, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2a, 0x76, + 0x0a, 0x10, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x74, 0x65, + 0x78, 0x74, 0x12, 0x1f, 0x0a, 0x1b, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x49, 0x4f, 0x4e, 0x5f, + 0x43, 0x4f, 0x4e, 0x54, 0x45, 0x58, 0x54, 0x5f, 0x49, 0x4e, 0x54, 0x52, 0x49, 0x4e, 0x53, 0x49, + 0x43, 0x10, 0x00, 0x12, 0x20, 0x0a, 0x1c, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x49, 0x4f, 0x4e, + 0x5f, 0x43, 0x4f, 0x4e, 0x54, 0x45, 0x58, 0x54, 0x5f, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, + 0x41, 0x50, 0x48, 0x10, 0x01, 0x12, 0x1f, 0x0a, 0x1b, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x49, + 0x4f, 0x4e, 0x5f, 0x43, 0x4f, 0x4e, 0x54, 0x45, 0x58, 0x54, 0x5f, 0x45, 0x58, 0x54, 0x52, 0x49, + 0x4e, 0x53, 0x49, 0x43, 0x10, 0x02, 0x2a, 0xb8, 0x01, 0x0a, 0x12, 0x48, 0x79, 0x70, 0x65, 0x72, + 0x67, 0x72, 0x61, 0x70, 0x68, 0x50, 0x68, 0x61, 0x73, 0x65, 0x53, 0x65, 0x74, 0x12, 0x24, 0x0a, + 0x20, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, + 0x45, 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x56, 0x45, 0x52, 0x54, 0x45, 0x58, 0x5f, 0x41, 0x44, 0x44, + 0x53, 0x10, 0x00, 0x12, 0x27, 0x0a, 0x23, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, + 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x56, 0x45, 0x52, 0x54, + 0x45, 0x58, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x53, 0x10, 0x01, 0x12, 0x27, 0x0a, 0x23, + 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, + 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x48, 0x59, 0x50, 0x45, 0x52, 0x45, 0x44, 0x47, 0x45, 0x5f, 0x41, + 0x44, 0x44, 0x53, 0x10, 0x02, 0x12, 0x2a, 0x0a, 0x26, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, + 0x41, 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x48, 0x59, + 0x50, 0x45, 0x52, 0x45, 0x44, 0x47, 0x45, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x53, 0x10, + 0x03, 0x32, 0x9c, 0x01, 0x0a, 0x1b, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, + 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x12, 0x7d, 0x0a, 0x0b, 0x48, 0x79, 0x70, 0x65, 0x72, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x12, 0x34, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, + 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, + 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, + 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x1a, 0x34, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, + 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, + 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x28, 0x01, 0x30, 0x01, + 0x42, 0x3a, 0x5a, 0x38, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, + 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, + 0x72, 0x69, 0x75, 0x6d, 0x2f, 0x6d, 0x6f, 0x6e, 0x6f, 0x72, 0x65, 0x70, 0x6f, 0x2f, 0x6e, 0x6f, + 0x64, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x73, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/node/protobufs/application.proto b/node/protobufs/application.proto index 92d2736..5cf7c6f 100644 --- a/node/protobufs/application.proto +++ b/node/protobufs/application.proto @@ -82,6 +82,8 @@ message HypergraphComparisonResponse { // (For instance, if the node is a branch you may want to send the commitment // for each child.) repeated BranchChild children = 3; + // If set, indicates the response is for the root of the given set tree. + bool is_root = 4; } // Defines the child commitment. diff --git a/node/rpc/hypergraph_sync_rpc_server.go b/node/rpc/hypergraph_sync_rpc_server.go index 9afbfd0..c25402b 100644 --- a/node/rpc/hypergraph_sync_rpc_server.go +++ b/node/rpc/hypergraph_sync_rpc_server.go @@ -6,7 +6,6 @@ import ( "encoding/hex" "fmt" "io" - "math/big" "sync/atomic" "time" @@ -44,6 +43,7 @@ type hypergraphComparisonServer struct { localHypergraphStore store.HypergraphStore localHypergraph *hypergraph.Hypergraph syncController *SyncController + debugTotalCoins int } func NewHypergraphComparisonServer( @@ -51,12 +51,14 @@ func NewHypergraphComparisonServer( hypergraphStore store.HypergraphStore, hypergraph *hypergraph.Hypergraph, syncController *SyncController, + debugTotalCoins int, ) *hypergraphComparisonServer { return &hypergraphComparisonServer{ logger: logger, localHypergraphStore: hypergraphStore, localHypergraph: hypergraph, syncController: syncController, + debugTotalCoins: debugTotalCoins, } } @@ -202,6 +204,7 @@ func getBranchInfoFromTree(tree *crypto.VectorCommitmentTree, path []int32) ( branchInfo := &protobufs.HypergraphComparisonResponse{ Path: path, Commitment: commitment, + IsRoot: len(path) == 0, } if branch, ok := node.(*crypto.VectorCommitmentBranchNode); ok { @@ -299,6 +302,7 @@ func syncTreeBidirectionallyServer( localHypergraphStore store.HypergraphStore, localHypergraph *hypergraph.Hypergraph, metadataOnly bool, + debugTotalCoins int, ) error { msg, err := stream.Recv() if err != nil { @@ -340,27 +344,14 @@ func syncTreeBidirectionallyServer( return errors.New("server does not have phase set") } - // Send our root branch info. - rootPath := []int32{} - rootInfo, err := getBranchInfoFromTree(idSet.GetTree(), rootPath) - if err != nil { - return err - } - if err := stream.Send(&protobufs.HypergraphComparison{ - Payload: &protobufs.HypergraphComparison_Response{ - Response: rootInfo, - }, - }); err != nil { - return err - } - requested := map[string]struct{}{} sent := map[string]struct{}{} - inserts := []*protobufs.LeafData{} pendingIn, pendingOut := UnboundedChan[[]int32]("server pending") - pendingIn <- rootPath + pendingIn <- []int32{} - incomingIn, incomingOut := UnboundedChan[protobufs.HypergraphComparison]("server incoming") + incomingIn, incomingOut := UnboundedChan[protobufs.HypergraphComparison]( + "server incoming", + ) go func() { for { msg, err := stream.Recv() @@ -380,6 +371,9 @@ func syncTreeBidirectionallyServer( } }() + knownRemoteCommitment := []byte{} + previousKnownRemoteCommitment := []byte{} + outer: for { select { @@ -418,6 +412,12 @@ outer: idSet.GetTree(), remoteInfo.Path, ) + + if remoteInfo.IsRoot { + previousKnownRemoteCommitment = knownRemoteCommitment + knownRemoteCommitment = remoteInfo.Commitment + } + if err != nil { logger.Info( "server requesting missing node", @@ -586,18 +586,25 @@ outer: } } - inserts = append(inserts, remoteUpdate) + idSet.Add(hypergraph.AtomFromBytes(remoteUpdate.Value)) } case <-time.After(5 * time.Second): - logger.Info("server timed out") - break outer + commitment := idSet.GetTree().Commit(false) + if !bytes.Equal(commitment, knownRemoteCommitment) && + !bytes.Equal(knownRemoteCommitment, previousKnownRemoteCommitment) && + !bytes.Equal(knownRemoteCommitment, []byte{}) { + time.Sleep(1 * time.Second) + requested = map[string]struct{}{} + sent = map[string]struct{}{} + pendingIn <- []int32{} + } else { + break outer + } } } - for _, remoteUpdate := range inserts { - idSet.Add(hypergraph.AtomFromBytes(remoteUpdate.Value)) - } - + total, _ := idSet.GetTree().GetMetadata() + logger.Info("current progress", zap.Float32("percentage", float32(total*100)/float32(debugTotalCoins))) return nil } @@ -616,6 +623,7 @@ func (s *hypergraphComparisonServer) HyperStream( s.localHypergraphStore, s.localHypergraph, false, + s.debugTotalCoins, ) } @@ -629,15 +637,11 @@ func SyncTreeBidirectionally( shardKey []byte, phaseSet protobufs.HypergraphPhaseSet, hypergraphStore store.HypergraphStore, - localTree *crypto.VectorCommitmentTree, + set *hypergraph.IdSet, syncController *SyncController, + debugTotalCoins int, metadataOnly bool, ) error { - if !syncController.TryEstablishSyncSession() { - return errors.New("unavailable") - } - defer syncController.EndSyncSession() - logger.Info( "sending initialization message", zap.String("shard_key", hex.EncodeToString(shardKey)), @@ -649,7 +653,7 @@ func SyncTreeBidirectionally( ShardKey: shardKey, PhaseSet: phaseSet, Path: []int32{}, - Commitment: localTree.Commit(false), + Commitment: set.GetTree().Commit(false), IncludeLeafData: false, }, }, @@ -657,26 +661,14 @@ func SyncTreeBidirectionally( return err } - rootPath := []int32{} - rootInfo, err := getBranchInfoFromTree(localTree, rootPath) - if err != nil { - return err - } - if err := stream.Send(&protobufs.HypergraphComparison{ - Payload: &protobufs.HypergraphComparison_Response{ - Response: rootInfo, - }, - }); err != nil { - return err - } - requested := map[string]struct{}{} sent := map[string]struct{}{} - inserts := []*protobufs.LeafData{} pendingIn, pendingOut := UnboundedChan[[]int32]("client pending") pendingIn <- []int32{} - incomingIn, incomingOut := UnboundedChan[protobufs.HypergraphComparison]("client incoming") + incomingIn, incomingOut := UnboundedChan[protobufs.HypergraphComparison]( + "client incoming", + ) go func() { for { msg, err := stream.Recv() @@ -695,6 +687,9 @@ func SyncTreeBidirectionally( } }() + knownRemoteCommitment := []byte{} + previousKnownRemoteCommitment := []byte{} + outer: for { select { @@ -725,7 +720,13 @@ outer: "handling response", zap.String("path", hex.EncodeToString(packNibbles(remoteInfo.Path))), ) - localInfo, err := getBranchInfoFromTree(localTree, remoteInfo.Path) + + if remoteInfo.IsRoot { + previousKnownRemoteCommitment = knownRemoteCommitment + knownRemoteCommitment = remoteInfo.Commitment + } + + localInfo, err := getBranchInfoFromTree(set.GetTree(), remoteInfo.Path) if err != nil { logger.Info( "requesting missing node", @@ -771,7 +772,7 @@ outer: stream, hypergraphStore, sent, - localTree, + set.GetTree(), remoteInfo.Path, metadataOnly, ); err != nil { @@ -826,7 +827,7 @@ outer: stream, hypergraphStore, sent, - localTree, + set.GetTree(), queryPath, metadataOnly, ); err != nil { @@ -840,7 +841,7 @@ outer: hex.EncodeToString(packNibbles(queryPath)), ), ) - branchInfo, err := getBranchInfoFromTree(localTree, queryPath) + branchInfo, err := getBranchInfoFromTree(set.GetTree(), queryPath) if err != nil { continue } @@ -888,25 +889,25 @@ outer: } } - inserts = append(inserts, remoteUpdate) + set.Add(hypergraph.AtomFromBytes(remoteUpdate.Value)) } case <-time.After(5 * time.Second): - logger.Info("timed out") - break outer + commitment := set.GetTree().Commit(false) + if !bytes.Equal(commitment, knownRemoteCommitment) && + !bytes.Equal(knownRemoteCommitment, previousKnownRemoteCommitment) && + !bytes.Equal(knownRemoteCommitment, []byte{}) { + time.Sleep(1 * time.Second) + requested = map[string]struct{}{} + sent = map[string]struct{}{} + pendingIn <- []int32{} + } else { + break outer + } } } - for _, remoteUpdate := range inserts { - size := new(big.Int).SetBytes(remoteUpdate.Size) - - localTree.Insert( - remoteUpdate.Key, - remoteUpdate.Value, - remoteUpdate.HashTarget, - size, - ) - } - + total, _ := set.GetTree().GetMetadata() + logger.Info("current progress", zap.Float32("percentage", float32(total*100)/float32(debugTotalCoins))) return nil } diff --git a/node/rpc/hypergraph_sync_rpc_server_test.go b/node/rpc/hypergraph_sync_rpc_server_test.go index aada06d..3aae769 100644 --- a/node/rpc/hypergraph_sync_rpc_server_test.go +++ b/node/rpc/hypergraph_sync_rpc_server_test.go @@ -220,7 +220,7 @@ func TestHypergraphSyncServer(t *testing.T) { grpcServer := grpc.NewServer() protobufs.RegisterHypergraphComparisonServiceServer( grpcServer, - rpc.NewHypergraphComparisonServer(logger, serverHypergraphStore, crdts[0], rpc.NewSyncController()), + rpc.NewHypergraphComparisonServer(logger, serverHypergraphStore, crdts[0], rpc.NewSyncController(), 10000), ) log.Println("Server listening on :50051") go func() { @@ -240,7 +240,7 @@ func TestHypergraphSyncServer(t *testing.T) { syncController := rpc.NewSyncController() - err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey].GetTree(), syncController, false) + err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey], syncController, 10000, false) if err != nil { log.Fatalf("Client: failed to sync 1: %v", err) } @@ -249,7 +249,7 @@ func TestHypergraphSyncServer(t *testing.T) { crdts[0].GetVertexAdds()[shardKey].GetTree(), crdts[1].GetVertexAdds()[shardKey].GetTree(), ) - fmt.Println(len(leaves)) + fmt.Println("pass completed, orphans:", len(leaves)) crdts[0].GetVertexAdds()[shardKey].GetTree().Commit(false) crdts[1].GetVertexAdds()[shardKey].GetTree().Commit(false) @@ -259,7 +259,7 @@ func TestHypergraphSyncServer(t *testing.T) { log.Fatalf("Client: failed to stream: %v", err) } - err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey].GetTree(), syncController, false) + err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey], syncController, 10000, false) if err != nil { log.Fatalf("Client: failed to sync 2: %v", err) }