add progress info, make sync a little smarter

This commit is contained in:
Cassandra Heart 2025-02-24 03:00:02 -06:00
parent 6f2839083a
commit d2048d0acf
No known key found for this signature in database
GPG Key ID: 6352152859385958
5 changed files with 202 additions and 140 deletions

View File

@ -369,9 +369,44 @@ func NewTokenExecutionEngine(
e.proverPublicKey = publicKeyBytes
e.provingKeyAddress = provingKeyAddress
// debug carveout for M5 testing
iter, err := e.coinStore.RangeCoins(
[]byte{0x00},
[]byte{0xff},
)
if err != nil {
panic(err)
}
totalCoins := 0
specificRange := 0
if e.engineConfig.RebuildStart == "" {
e.engineConfig.RebuildStart = "0000000000000000000000000000000000000000000000000000000000000000"
}
if e.engineConfig.RebuildEnd == "" {
e.engineConfig.RebuildEnd = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
}
start, err := hex.DecodeString(e.engineConfig.RebuildStart)
if err != nil {
panic(err)
}
end, err := hex.DecodeString(e.engineConfig.RebuildEnd)
if err != nil {
panic(err)
}
for iter.First(); iter.Valid(); iter.Next() {
if bytes.Compare(iter.Key()[2:], start) >= 0 && bytes.Compare(iter.Key()[2:], end) < 0 {
specificRange++
}
totalCoins++
}
iter.Close()
// end debug carveout for M5 testing
_, _, err = e.clockStore.GetLatestDataClockFrame(e.intrinsicFilter)
if err != nil {
e.rebuildHypergraph()
e.rebuildHypergraph(specificRange)
} else {
e.hypergraph, err = e.hypergraphStore.LoadHypergraph()
if err != nil && !errors.Is(err, store.ErrNotFound) {
@ -382,7 +417,7 @@ func NewTokenExecutionEngine(
}
if e.hypergraph == nil || len(e.hypergraph.GetVertexAdds()) == 0 {
e.rebuildHypergraph()
e.rebuildHypergraph(specificRange)
}
}
@ -396,6 +431,7 @@ func NewTokenExecutionEngine(
e.hypergraphStore,
e.hypergraph,
e.syncController,
totalCoins,
)
protobufs.RegisterHypergraphComparisonServiceServer(syncServer, hyperSync)
go func() {
@ -412,7 +448,7 @@ func NewTokenExecutionEngine(
for {
select {
case <-gotime.After(5 * gotime.Second):
e.hyperSync()
e.hyperSync(totalCoins)
case <-e.ctx.Done():
return
}
@ -578,7 +614,12 @@ func (e *TokenExecutionEngine) addBatchToHypergraph(batchKey [][]byte, batchValu
}
}
func (e *TokenExecutionEngine) hyperSync() {
func (e *TokenExecutionEngine) hyperSync(totalCoins int) {
if !e.syncController.TryEstablishSyncSession() {
return
}
defer e.syncController.EndSyncSession()
peerId, err := e.pubSub.GetRandomPeer(
append([]byte{0x00}, e.intrinsicFilter...),
)
@ -624,8 +665,9 @@ func (e *TokenExecutionEngine) hyperSync() {
append(append([]byte{}, key.L1[:]...), key.L2[:]...),
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
e.hypergraphStore,
set.GetTree(),
set,
e.syncController,
totalCoins,
false,
)
if err != nil {
@ -645,7 +687,7 @@ func (e *TokenExecutionEngine) hyperSync() {
}
}
func (e *TokenExecutionEngine) rebuildHypergraph() {
func (e *TokenExecutionEngine) rebuildHypergraph(totalRange int) {
e.logger.Info("rebuilding hypergraph")
e.hypergraph = hypergraph.NewHypergraph()
if e.engineConfig.RebuildStart == "" {
@ -670,7 +712,9 @@ func (e *TokenExecutionEngine) rebuildHypergraph() {
panic(err)
}
var batchKey, batchValue [][]byte
processed := 0
for iter.First(); iter.Valid(); iter.Next() {
processed++
key := make([]byte, len(iter.Key()[2:]))
copy(key, iter.Key()[2:])
batchKey = append(batchKey, key)
@ -694,6 +738,10 @@ func (e *TokenExecutionEngine) rebuildHypergraph() {
if len(batchKey) == runtime.NumCPU() {
e.addBatchToHypergraph(batchKey, batchValue)
e.logger.Info(
"processed batch",
zap.Float32("percentage", float32(processed)/float32(totalRange)),
)
batchKey = [][]byte{}
batchValue = [][]byte{}
}

View File

@ -482,6 +482,8 @@ type HypergraphComparisonResponse struct {
// (For instance, if the node is a branch you may want to send the commitment
// for each child.)
Children []*BranchChild `protobuf:"bytes,3,rep,name=children,proto3" json:"children,omitempty"`
// If set, indicates the response is for the root of the given set tree.
IsRoot bool `protobuf:"varint,4,opt,name=is_root,json=isRoot,proto3" json:"is_root,omitempty"`
}
func (x *HypergraphComparisonResponse) Reset() {
@ -537,6 +539,13 @@ func (x *HypergraphComparisonResponse) GetChildren() []*BranchChild {
return nil
}
func (x *HypergraphComparisonResponse) GetIsRoot() bool {
if x != nil {
return x.IsRoot
}
return false
}
// Defines the child commitment.
type BranchChild struct {
state protoimpl.MessageState
@ -826,7 +835,7 @@ var file_application_proto_rawDesc = []byte{
0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x12, 0x2a, 0x0a, 0x11, 0x69, 0x6e, 0x63, 0x6c, 0x75,
0x64, 0x65, 0x5f, 0x6c, 0x65, 0x61, 0x66, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x06, 0x20, 0x01,
0x28, 0x08, 0x52, 0x0f, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x4c, 0x65, 0x61, 0x66, 0x44,
0x61, 0x74, 0x61, 0x22, 0x9b, 0x01, 0x0a, 0x1c, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61,
0x61, 0x74, 0x61, 0x22, 0xb4, 0x01, 0x0a, 0x1c, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61,
0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x03,
0x28, 0x05, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x6d,
@ -836,71 +845,73 @@ var file_application_proto_rawDesc = []byte{
0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70,
0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x72, 0x61, 0x6e,
0x63, 0x68, 0x43, 0x68, 0x69, 0x6c, 0x64, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65,
0x6e, 0x22, 0x43, 0x0a, 0x0b, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x43, 0x68, 0x69, 0x6c, 0x64,
0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52,
0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74,
0x6d, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x6d,
0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x90, 0x01, 0x0a, 0x08, 0x4c, 0x65, 0x61, 0x66, 0x44,
0x61, 0x74, 0x61, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c,
0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02,
0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x68,
0x61, 0x73, 0x68, 0x5f, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c,
0x52, 0x0a, 0x68, 0x61, 0x73, 0x68, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x12, 0x0a, 0x04,
0x73, 0x69, 0x7a, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65,
0x12, 0x27, 0x0a, 0x0f, 0x75, 0x6e, 0x64, 0x65, 0x72, 0x6c, 0x79, 0x69, 0x6e, 0x67, 0x5f, 0x64,
0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x75, 0x6e, 0x64, 0x65, 0x72,
0x6c, 0x79, 0x69, 0x6e, 0x67, 0x44, 0x61, 0x74, 0x61, 0x22, 0x99, 0x02, 0x0a, 0x14, 0x48, 0x79,
0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73,
0x6f, 0x6e, 0x12, 0x51, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x39, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e,
0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e,
0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d,
0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x48, 0x00, 0x52, 0x05,
0x71, 0x75, 0x65, 0x72, 0x79, 0x12, 0x5a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3c, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62,
0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72,
0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x12, 0x47, 0x0a, 0x09, 0x6c, 0x65, 0x61, 0x66, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03,
0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75,
0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69,
0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x4c, 0x65, 0x61, 0x66, 0x44, 0x61, 0x74, 0x61, 0x48, 0x00,
0x52, 0x08, 0x6c, 0x65, 0x61, 0x66, 0x44, 0x61, 0x74, 0x61, 0x42, 0x09, 0x0a, 0x07, 0x70, 0x61,
0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2a, 0x76, 0x0a, 0x10, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69,
0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, 0x1f, 0x0a, 0x1b, 0x45, 0x58, 0x45,
0x43, 0x55, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x43, 0x4f, 0x4e, 0x54, 0x45, 0x58, 0x54, 0x5f, 0x49,
0x4e, 0x54, 0x52, 0x49, 0x4e, 0x53, 0x49, 0x43, 0x10, 0x00, 0x12, 0x20, 0x0a, 0x1c, 0x45, 0x58,
0x45, 0x43, 0x55, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x43, 0x4f, 0x4e, 0x54, 0x45, 0x58, 0x54, 0x5f,
0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x10, 0x01, 0x12, 0x1f, 0x0a, 0x1b,
0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x43, 0x4f, 0x4e, 0x54, 0x45, 0x58,
0x54, 0x5f, 0x45, 0x58, 0x54, 0x52, 0x49, 0x4e, 0x53, 0x49, 0x43, 0x10, 0x02, 0x2a, 0xb8, 0x01,
0x0a, 0x12, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x50, 0x68, 0x61, 0x73,
0x65, 0x53, 0x65, 0x74, 0x12, 0x24, 0x0a, 0x20, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41,
0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x56, 0x45, 0x52,
0x54, 0x45, 0x58, 0x5f, 0x41, 0x44, 0x44, 0x53, 0x10, 0x00, 0x12, 0x27, 0x0a, 0x23, 0x48, 0x59,
0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, 0x53,
0x45, 0x54, 0x5f, 0x56, 0x45, 0x52, 0x54, 0x45, 0x58, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45,
0x53, 0x10, 0x01, 0x12, 0x27, 0x0a, 0x23, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50,
0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x48, 0x59, 0x50, 0x45,
0x52, 0x45, 0x44, 0x47, 0x45, 0x5f, 0x41, 0x44, 0x44, 0x53, 0x10, 0x02, 0x12, 0x2a, 0x0a, 0x26,
0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45,
0x5f, 0x53, 0x45, 0x54, 0x5f, 0x48, 0x59, 0x50, 0x45, 0x52, 0x45, 0x44, 0x47, 0x45, 0x5f, 0x52,
0x45, 0x4d, 0x4f, 0x56, 0x45, 0x53, 0x10, 0x03, 0x32, 0x9c, 0x01, 0x0a, 0x1b, 0x48, 0x79, 0x70,
0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f,
0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x7d, 0x0a, 0x0b, 0x48, 0x79, 0x70, 0x65,
0x72, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x34, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62,
0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72,
0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x1a, 0x34, 0x2e,
0x6e, 0x12, 0x17, 0x0a, 0x07, 0x69, 0x73, 0x5f, 0x72, 0x6f, 0x6f, 0x74, 0x18, 0x04, 0x20, 0x01,
0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x52, 0x6f, 0x6f, 0x74, 0x22, 0x43, 0x0a, 0x0b, 0x42, 0x72,
0x61, 0x6e, 0x63, 0x68, 0x43, 0x68, 0x69, 0x6c, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64,
0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12,
0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20,
0x01, 0x28, 0x0c, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x22,
0x90, 0x01, 0x0a, 0x08, 0x4c, 0x65, 0x61, 0x66, 0x44, 0x61, 0x74, 0x61, 0x12, 0x10, 0x0a, 0x03,
0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14,
0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76,
0x61, 0x6c, 0x75, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x68, 0x61, 0x73, 0x68, 0x5f, 0x74, 0x61, 0x72,
0x67, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x68, 0x61, 0x73, 0x68, 0x54,
0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x04, 0x20,
0x01, 0x28, 0x0c, 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x12, 0x27, 0x0a, 0x0f, 0x75, 0x6e, 0x64,
0x65, 0x72, 0x6c, 0x79, 0x69, 0x6e, 0x67, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x01,
0x28, 0x0c, 0x52, 0x0e, 0x75, 0x6e, 0x64, 0x65, 0x72, 0x6c, 0x79, 0x69, 0x6e, 0x67, 0x44, 0x61,
0x74, 0x61, 0x22, 0x99, 0x02, 0x0a, 0x14, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70,
0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x12, 0x51, 0x0a, 0x05, 0x71,
0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x39, 0x2e, 0x71, 0x75, 0x69,
0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70,
0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65,
0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e,
0x51, 0x75, 0x65, 0x72, 0x79, 0x48, 0x00, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x12, 0x5a,
0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b,
0x32, 0x3c, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f,
0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70,
0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70,
0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00,
0x52, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x47, 0x0a, 0x09, 0x6c, 0x65,
0x61, 0x66, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e,
0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e,
0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48,
0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69,
0x73, 0x6f, 0x6e, 0x28, 0x01, 0x30, 0x01, 0x42, 0x3a, 0x5a, 0x38, 0x73, 0x6f, 0x75, 0x72, 0x63,
0x65, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x63, 0x6f, 0x6d,
0x2f, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2f, 0x6d, 0x6f, 0x6e, 0x6f,
0x72, 0x65, 0x70, 0x6f, 0x2f, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x4c,
0x65, 0x61, 0x66, 0x44, 0x61, 0x74, 0x61, 0x48, 0x00, 0x52, 0x08, 0x6c, 0x65, 0x61, 0x66, 0x44,
0x61, 0x74, 0x61, 0x42, 0x09, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2a, 0x76,
0x0a, 0x10, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x74, 0x65,
0x78, 0x74, 0x12, 0x1f, 0x0a, 0x1b, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x49, 0x4f, 0x4e, 0x5f,
0x43, 0x4f, 0x4e, 0x54, 0x45, 0x58, 0x54, 0x5f, 0x49, 0x4e, 0x54, 0x52, 0x49, 0x4e, 0x53, 0x49,
0x43, 0x10, 0x00, 0x12, 0x20, 0x0a, 0x1c, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x49, 0x4f, 0x4e,
0x5f, 0x43, 0x4f, 0x4e, 0x54, 0x45, 0x58, 0x54, 0x5f, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52,
0x41, 0x50, 0x48, 0x10, 0x01, 0x12, 0x1f, 0x0a, 0x1b, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x49,
0x4f, 0x4e, 0x5f, 0x43, 0x4f, 0x4e, 0x54, 0x45, 0x58, 0x54, 0x5f, 0x45, 0x58, 0x54, 0x52, 0x49,
0x4e, 0x53, 0x49, 0x43, 0x10, 0x02, 0x2a, 0xb8, 0x01, 0x0a, 0x12, 0x48, 0x79, 0x70, 0x65, 0x72,
0x67, 0x72, 0x61, 0x70, 0x68, 0x50, 0x68, 0x61, 0x73, 0x65, 0x53, 0x65, 0x74, 0x12, 0x24, 0x0a,
0x20, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53,
0x45, 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x56, 0x45, 0x52, 0x54, 0x45, 0x58, 0x5f, 0x41, 0x44, 0x44,
0x53, 0x10, 0x00, 0x12, 0x27, 0x0a, 0x23, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50,
0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x56, 0x45, 0x52, 0x54,
0x45, 0x58, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x53, 0x10, 0x01, 0x12, 0x27, 0x0a, 0x23,
0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45,
0x5f, 0x53, 0x45, 0x54, 0x5f, 0x48, 0x59, 0x50, 0x45, 0x52, 0x45, 0x44, 0x47, 0x45, 0x5f, 0x41,
0x44, 0x44, 0x53, 0x10, 0x02, 0x12, 0x2a, 0x0a, 0x26, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52,
0x41, 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x48, 0x59,
0x50, 0x45, 0x52, 0x45, 0x44, 0x47, 0x45, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x53, 0x10,
0x03, 0x32, 0x9c, 0x01, 0x0a, 0x1b, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68,
0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63,
0x65, 0x12, 0x7d, 0x0a, 0x0b, 0x48, 0x79, 0x70, 0x65, 0x72, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d,
0x12, 0x34, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f,
0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70,
0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70,
0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x1a, 0x34, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72,
0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61,
0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x28, 0x01, 0x30, 0x01,
0x42, 0x3a, 0x5a, 0x38, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69,
0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62,
0x72, 0x69, 0x75, 0x6d, 0x2f, 0x6d, 0x6f, 0x6e, 0x6f, 0x72, 0x65, 0x70, 0x6f, 0x2f, 0x6e, 0x6f,
0x64, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x73, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33,
}
var (

View File

@ -82,6 +82,8 @@ message HypergraphComparisonResponse {
// (For instance, if the node is a branch you may want to send the commitment
// for each child.)
repeated BranchChild children = 3;
// If set, indicates the response is for the root of the given set tree.
bool is_root = 4;
}
// Defines the child commitment.

View File

@ -6,7 +6,6 @@ import (
"encoding/hex"
"fmt"
"io"
"math/big"
"sync/atomic"
"time"
@ -44,6 +43,7 @@ type hypergraphComparisonServer struct {
localHypergraphStore store.HypergraphStore
localHypergraph *hypergraph.Hypergraph
syncController *SyncController
debugTotalCoins int
}
func NewHypergraphComparisonServer(
@ -51,12 +51,14 @@ func NewHypergraphComparisonServer(
hypergraphStore store.HypergraphStore,
hypergraph *hypergraph.Hypergraph,
syncController *SyncController,
debugTotalCoins int,
) *hypergraphComparisonServer {
return &hypergraphComparisonServer{
logger: logger,
localHypergraphStore: hypergraphStore,
localHypergraph: hypergraph,
syncController: syncController,
debugTotalCoins: debugTotalCoins,
}
}
@ -202,6 +204,7 @@ func getBranchInfoFromTree(tree *crypto.VectorCommitmentTree, path []int32) (
branchInfo := &protobufs.HypergraphComparisonResponse{
Path: path,
Commitment: commitment,
IsRoot: len(path) == 0,
}
if branch, ok := node.(*crypto.VectorCommitmentBranchNode); ok {
@ -299,6 +302,7 @@ func syncTreeBidirectionallyServer(
localHypergraphStore store.HypergraphStore,
localHypergraph *hypergraph.Hypergraph,
metadataOnly bool,
debugTotalCoins int,
) error {
msg, err := stream.Recv()
if err != nil {
@ -340,27 +344,14 @@ func syncTreeBidirectionallyServer(
return errors.New("server does not have phase set")
}
// Send our root branch info.
rootPath := []int32{}
rootInfo, err := getBranchInfoFromTree(idSet.GetTree(), rootPath)
if err != nil {
return err
}
if err := stream.Send(&protobufs.HypergraphComparison{
Payload: &protobufs.HypergraphComparison_Response{
Response: rootInfo,
},
}); err != nil {
return err
}
requested := map[string]struct{}{}
sent := map[string]struct{}{}
inserts := []*protobufs.LeafData{}
pendingIn, pendingOut := UnboundedChan[[]int32]("server pending")
pendingIn <- rootPath
pendingIn <- []int32{}
incomingIn, incomingOut := UnboundedChan[protobufs.HypergraphComparison]("server incoming")
incomingIn, incomingOut := UnboundedChan[protobufs.HypergraphComparison](
"server incoming",
)
go func() {
for {
msg, err := stream.Recv()
@ -380,6 +371,9 @@ func syncTreeBidirectionallyServer(
}
}()
knownRemoteCommitment := []byte{}
previousKnownRemoteCommitment := []byte{}
outer:
for {
select {
@ -418,6 +412,12 @@ outer:
idSet.GetTree(),
remoteInfo.Path,
)
if remoteInfo.IsRoot {
previousKnownRemoteCommitment = knownRemoteCommitment
knownRemoteCommitment = remoteInfo.Commitment
}
if err != nil {
logger.Info(
"server requesting missing node",
@ -586,18 +586,25 @@ outer:
}
}
inserts = append(inserts, remoteUpdate)
idSet.Add(hypergraph.AtomFromBytes(remoteUpdate.Value))
}
case <-time.After(5 * time.Second):
logger.Info("server timed out")
break outer
commitment := idSet.GetTree().Commit(false)
if !bytes.Equal(commitment, knownRemoteCommitment) &&
!bytes.Equal(knownRemoteCommitment, previousKnownRemoteCommitment) &&
!bytes.Equal(knownRemoteCommitment, []byte{}) {
time.Sleep(1 * time.Second)
requested = map[string]struct{}{}
sent = map[string]struct{}{}
pendingIn <- []int32{}
} else {
break outer
}
}
}
for _, remoteUpdate := range inserts {
idSet.Add(hypergraph.AtomFromBytes(remoteUpdate.Value))
}
total, _ := idSet.GetTree().GetMetadata()
logger.Info("current progress", zap.Float32("percentage", float32(total*100)/float32(debugTotalCoins)))
return nil
}
@ -616,6 +623,7 @@ func (s *hypergraphComparisonServer) HyperStream(
s.localHypergraphStore,
s.localHypergraph,
false,
s.debugTotalCoins,
)
}
@ -629,15 +637,11 @@ func SyncTreeBidirectionally(
shardKey []byte,
phaseSet protobufs.HypergraphPhaseSet,
hypergraphStore store.HypergraphStore,
localTree *crypto.VectorCommitmentTree,
set *hypergraph.IdSet,
syncController *SyncController,
debugTotalCoins int,
metadataOnly bool,
) error {
if !syncController.TryEstablishSyncSession() {
return errors.New("unavailable")
}
defer syncController.EndSyncSession()
logger.Info(
"sending initialization message",
zap.String("shard_key", hex.EncodeToString(shardKey)),
@ -649,7 +653,7 @@ func SyncTreeBidirectionally(
ShardKey: shardKey,
PhaseSet: phaseSet,
Path: []int32{},
Commitment: localTree.Commit(false),
Commitment: set.GetTree().Commit(false),
IncludeLeafData: false,
},
},
@ -657,26 +661,14 @@ func SyncTreeBidirectionally(
return err
}
rootPath := []int32{}
rootInfo, err := getBranchInfoFromTree(localTree, rootPath)
if err != nil {
return err
}
if err := stream.Send(&protobufs.HypergraphComparison{
Payload: &protobufs.HypergraphComparison_Response{
Response: rootInfo,
},
}); err != nil {
return err
}
requested := map[string]struct{}{}
sent := map[string]struct{}{}
inserts := []*protobufs.LeafData{}
pendingIn, pendingOut := UnboundedChan[[]int32]("client pending")
pendingIn <- []int32{}
incomingIn, incomingOut := UnboundedChan[protobufs.HypergraphComparison]("client incoming")
incomingIn, incomingOut := UnboundedChan[protobufs.HypergraphComparison](
"client incoming",
)
go func() {
for {
msg, err := stream.Recv()
@ -695,6 +687,9 @@ func SyncTreeBidirectionally(
}
}()
knownRemoteCommitment := []byte{}
previousKnownRemoteCommitment := []byte{}
outer:
for {
select {
@ -725,7 +720,13 @@ outer:
"handling response",
zap.String("path", hex.EncodeToString(packNibbles(remoteInfo.Path))),
)
localInfo, err := getBranchInfoFromTree(localTree, remoteInfo.Path)
if remoteInfo.IsRoot {
previousKnownRemoteCommitment = knownRemoteCommitment
knownRemoteCommitment = remoteInfo.Commitment
}
localInfo, err := getBranchInfoFromTree(set.GetTree(), remoteInfo.Path)
if err != nil {
logger.Info(
"requesting missing node",
@ -771,7 +772,7 @@ outer:
stream,
hypergraphStore,
sent,
localTree,
set.GetTree(),
remoteInfo.Path,
metadataOnly,
); err != nil {
@ -826,7 +827,7 @@ outer:
stream,
hypergraphStore,
sent,
localTree,
set.GetTree(),
queryPath,
metadataOnly,
); err != nil {
@ -840,7 +841,7 @@ outer:
hex.EncodeToString(packNibbles(queryPath)),
),
)
branchInfo, err := getBranchInfoFromTree(localTree, queryPath)
branchInfo, err := getBranchInfoFromTree(set.GetTree(), queryPath)
if err != nil {
continue
}
@ -888,25 +889,25 @@ outer:
}
}
inserts = append(inserts, remoteUpdate)
set.Add(hypergraph.AtomFromBytes(remoteUpdate.Value))
}
case <-time.After(5 * time.Second):
logger.Info("timed out")
break outer
commitment := set.GetTree().Commit(false)
if !bytes.Equal(commitment, knownRemoteCommitment) &&
!bytes.Equal(knownRemoteCommitment, previousKnownRemoteCommitment) &&
!bytes.Equal(knownRemoteCommitment, []byte{}) {
time.Sleep(1 * time.Second)
requested = map[string]struct{}{}
sent = map[string]struct{}{}
pendingIn <- []int32{}
} else {
break outer
}
}
}
for _, remoteUpdate := range inserts {
size := new(big.Int).SetBytes(remoteUpdate.Size)
localTree.Insert(
remoteUpdate.Key,
remoteUpdate.Value,
remoteUpdate.HashTarget,
size,
)
}
total, _ := set.GetTree().GetMetadata()
logger.Info("current progress", zap.Float32("percentage", float32(total*100)/float32(debugTotalCoins)))
return nil
}

View File

@ -220,7 +220,7 @@ func TestHypergraphSyncServer(t *testing.T) {
grpcServer := grpc.NewServer()
protobufs.RegisterHypergraphComparisonServiceServer(
grpcServer,
rpc.NewHypergraphComparisonServer(logger, serverHypergraphStore, crdts[0], rpc.NewSyncController()),
rpc.NewHypergraphComparisonServer(logger, serverHypergraphStore, crdts[0], rpc.NewSyncController(), 10000),
)
log.Println("Server listening on :50051")
go func() {
@ -240,7 +240,7 @@ func TestHypergraphSyncServer(t *testing.T) {
syncController := rpc.NewSyncController()
err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey].GetTree(), syncController, false)
err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey], syncController, 10000, false)
if err != nil {
log.Fatalf("Client: failed to sync 1: %v", err)
}
@ -249,7 +249,7 @@ func TestHypergraphSyncServer(t *testing.T) {
crdts[0].GetVertexAdds()[shardKey].GetTree(),
crdts[1].GetVertexAdds()[shardKey].GetTree(),
)
fmt.Println(len(leaves))
fmt.Println("pass completed, orphans:", len(leaves))
crdts[0].GetVertexAdds()[shardKey].GetTree().Commit(false)
crdts[1].GetVertexAdds()[shardKey].GetTree().Commit(false)
@ -259,7 +259,7 @@ func TestHypergraphSyncServer(t *testing.T) {
log.Fatalf("Client: failed to stream: %v", err)
}
err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey].GetTree(), syncController, false)
err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey], syncController, 10000, false)
if err != nil {
log.Fatalf("Client: failed to sync 2: %v", err)
}