Merge branch 'v2.0.6' into develop-2.1-pre-milestone3

Cassandra Heart 2025-01-09 06:12:14 -06:00
commit f1f9067988
No known key found for this signature in database
GPG Key ID: 6352152859385958
11 changed files with 199 additions and 280 deletions

View File

@@ -27,15 +27,13 @@ func (e *DataClockConsensusEngine) syncWithMesh() error {
if err != nil {
return errors.Wrap(err, "sync")
}
var doneChs []<-chan struct{}
for {
candidates := e.GetAheadPeers(max(latest.FrameNumber, e.latestFrameReceived))
if len(candidates) == 0 {
break
}
for _, candidate := range candidates {
if candidate.MaxFrame <= max(latest.FrameNumber, e.latestFrameReceived) {
continue
}
head, err := e.dataTimeReel.Head()
if err != nil {
return errors.Wrap(err, "sync")
@@ -43,13 +41,24 @@ func (e *DataClockConsensusEngine) syncWithMesh() error {
if latest.FrameNumber < head.FrameNumber {
latest = head
}
latest, err = e.syncWithPeer(latest, candidate.MaxFrame, candidate.PeerID)
if candidate.MaxFrame <= max(latest.FrameNumber, e.latestFrameReceived) {
continue
}
latest, doneChs, err = e.syncWithPeer(latest, doneChs, candidate.MaxFrame, candidate.PeerID)
if err != nil {
e.logger.Debug("error syncing frame", zap.Error(err))
}
}
}
for _, doneCh := range doneChs {
select {
case <-e.ctx.Done():
return e.ctx.Err()
case <-doneCh:
}
}
e.logger.Info(
"returning leader frame",
zap.Uint64("frame_number", latest.FrameNumber),
@@ -312,13 +321,13 @@ func (e *DataClockConsensusEngine) GetAheadPeers(frameNumber uint64) []internal.
}
func (e *DataClockConsensusEngine) syncWithPeer(
currentLatest *protobufs.ClockFrame,
latest *protobufs.ClockFrame,
doneChs []<-chan struct{},
maxFrame uint64,
peerId []byte,
) (*protobufs.ClockFrame, error) {
) (*protobufs.ClockFrame, []<-chan struct{}, error) {
e.syncingStatus = SyncStatusSynchronizing
defer func() { e.syncingStatus = SyncStatusNotSyncing }()
latest := currentLatest
e.logger.Info(
"polling peer for new frames",
zap.String("peer_id", peer.ID(peerId).String()),
@@ -350,7 +359,7 @@ func (e *DataClockConsensusEngine) syncWithPeer(
zap.Error(err),
)
cooperative = false
return latest, errors.Wrap(err, "sync")
return latest, doneChs, errors.Wrap(err, "sync")
}
defer func() {
if err := cc.Close(); err != nil {
@@ -378,12 +387,12 @@ func (e *DataClockConsensusEngine) syncWithPeer(
zap.Error(err),
)
cooperative = false
return latest, errors.Wrap(err, "sync")
return latest, doneChs, errors.Wrap(err, "sync")
}
if response == nil {
e.logger.Debug("received no response from peer")
return latest, nil
return latest, doneChs, nil
}
if response.ClockFrame == nil ||
@@ -391,7 +400,7 @@ func (e *DataClockConsensusEngine) syncWithPeer(
response.ClockFrame.Timestamp < latest.Timestamp {
e.logger.Debug("received invalid response from peer")
cooperative = false
return latest, nil
return latest, doneChs, nil
}
e.logger.Info(
"received new leading frame",
@@ -406,12 +415,16 @@ func (e *DataClockConsensusEngine) syncWithPeer(
if err := e.frameProver.VerifyDataClockFrame(
response.ClockFrame,
); err != nil {
return nil, errors.Wrap(err, "sync")
return latest, doneChs, errors.Wrap(err, "sync")
}
e.dataTimeReel.Insert(e.ctx, response.ClockFrame, true)
doneCh, err := e.dataTimeReel.Insert(e.ctx, response.ClockFrame)
if err != nil {
return latest, doneChs, errors.Wrap(err, "sync")
}
doneChs = append(doneChs, doneCh)
latest = response.ClockFrame
if latest.FrameNumber >= maxFrame {
return latest, nil
return latest, doneChs, nil
}
}
}
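Note on this change: syncWithPeer now appends the done channel returned by each dataTimeReel.Insert to the doneChs slice it returns, and syncWithMesh waits on all of them (or on engine shutdown) before logging the leader frame. A minimal sketch of that wait step factored into a helper; the waitAll name is illustrative and not part of this commit:

// waitAll blocks until every pending insert has been fully applied by the
// time reel, or until the supplied context is cancelled.
func waitAll(ctx context.Context, doneChs []<-chan struct{}) error {
    for _, doneCh := range doneChs {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-doneCh:
        }
    }
    return nil
}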

View File

@@ -141,6 +141,7 @@ type DataClockConsensusEngine struct {
infoMessageProcessorCh chan *pb.Message
report *protobufs.SelfTestReport
clients []protobufs.DataIPCServiceClient
clientsMx sync.Mutex
grpcRateLimiter *RateLimiter
previousFrameProven *protobufs.ClockFrame
previousTree *mt.MerkleTree
@@ -562,19 +563,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
e.wg.Add(1)
go func() {
defer e.wg.Done()
if len(e.config.Engine.DataWorkerMultiaddrs) != 0 {
e.clients, err = e.createParallelDataClientsFromList()
if err != nil {
panic(err)
}
} else {
e.clients, err = e.createParallelDataClientsFromBaseMultiaddr(
e.config.Engine.DataWorkerCount,
)
if err != nil {
panic(err)
}
}
e.createParallelDataWorkerClients()
}()
return errChan
@@ -831,211 +820,155 @@ func (e *DataClockConsensusEngine) createCommunicationKeys() error {
return nil
}
func (e *DataClockConsensusEngine) createParallelDataClientsFromListAndIndex(
index uint32,
func (e *DataClockConsensusEngine) connectToClient(
index int,
useList bool,
) (
protobufs.DataIPCServiceClient,
error,
) {
ma, err := multiaddr.NewMultiaddr(e.config.Engine.DataWorkerMultiaddrs[index])
if err != nil {
return nil, errors.Wrap(err, "create parallel data client")
}
_, addr, err := mn.DialArgs(ma)
if err != nil {
return nil, errors.Wrap(err, "create parallel data client")
}
ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
defer cancel()
conn, err := qgrpc.DialContext(
ctx,
addr,
grpc.WithTransportCredentials(
insecure.NewCredentials(),
),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(10*1024*1024),
grpc.MaxCallRecvMsgSize(10*1024*1024),
),
grpc.WithBlock(),
)
if err != nil {
return nil, errors.Wrap(err, "create parallel data client")
}
client := protobufs.NewDataIPCServiceClient(conn)
e.logger.Info(
"connected to data worker process",
zap.Uint32("client", index),
)
return client, nil
}
func (
e *DataClockConsensusEngine,
) createParallelDataClientsFromBaseMultiaddrAndIndex(
index uint32,
) (
protobufs.DataIPCServiceClient,
error,
) {
e.logger.Info(
"re-connecting to data worker process",
zap.Uint32("client", index),
)
ma, err := multiaddr.NewMultiaddr(
fmt.Sprintf(
e.config.Engine.DataWorkerBaseListenMultiaddr,
int(e.config.Engine.DataWorkerBaseListenPort)+int(index),
),
)
if err != nil {
return nil, errors.Wrap(err, "create parallel data client")
}
_, addr, err := mn.DialArgs(ma)
if err != nil {
return nil, errors.Wrap(err, "create parallel data client")
}
ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
defer cancel()
conn, err := qgrpc.DialContext(
ctx,
addr,
grpc.WithTransportCredentials(
insecure.NewCredentials(),
),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(10*1024*1024),
grpc.MaxCallRecvMsgSize(10*1024*1024),
),
grpc.WithBlock(),
)
if err != nil {
return nil, errors.Wrap(err, "create parallel data client")
}
client := protobufs.NewDataIPCServiceClient(conn)
e.logger.Info(
"connected to data worker process",
zap.Uint32("client", index),
)
return client, nil
}
func (e *DataClockConsensusEngine) createParallelDataClientsFromList() (
[]protobufs.DataIPCServiceClient,
error,
) {
parallelism := len(e.config.Engine.DataWorkerMultiaddrs)
e.logger.Info(
"connecting to data worker processes",
zap.Int("parallelism", parallelism),
)
clients := make([]protobufs.DataIPCServiceClient, parallelism)
for i := 0; i < parallelism; i++ {
ma, err := multiaddr.NewMultiaddr(e.config.Engine.DataWorkerMultiaddrs[i])
if err != nil {
panic(err)
}
_, addr, err := mn.DialArgs(ma)
if err != nil {
e.logger.Error("could not get dial args", zap.Error(err))
continue
}
ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
defer cancel()
conn, err := qgrpc.DialContext(
ctx,
addr,
grpc.WithTransportCredentials(
insecure.NewCredentials(),
),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(10*1024*1024),
grpc.MaxCallRecvMsgSize(10*1024*1024),
),
grpc.WithBlock(),
)
if err != nil {
e.logger.Error("could not dial", zap.Error(err))
continue
}
clients[i] = protobufs.NewDataIPCServiceClient(conn)
}
e.logger.Info(
"connected to data worker processes",
zap.Int("parallelism", parallelism),
)
return clients, nil
}
func (e *DataClockConsensusEngine) createParallelDataClientsFromBaseMultiaddr(
parallelism int,
) ([]protobufs.DataIPCServiceClient, error) {
e.logger.Info(
"connecting to data worker processes",
zap.Int("parallelism", parallelism),
)
clients := make([]protobufs.DataIPCServiceClient, parallelism)
for i := 0; i < parallelism; i++ {
ma, err := multiaddr.NewMultiaddr(
var ma multiaddr.Multiaddr
var err error
if useList {
ma, err = multiaddr.NewMultiaddr(e.config.Engine.DataWorkerMultiaddrs[index])
} else {
ma, err = multiaddr.NewMultiaddr(
fmt.Sprintf(
e.config.Engine.DataWorkerBaseListenMultiaddr,
int(e.config.Engine.DataWorkerBaseListenPort)+i,
int(e.config.Engine.DataWorkerBaseListenPort)+int(index),
),
)
if err != nil {
panic(err)
}
}
if err != nil {
e.logger.Error("failed to create multiaddr", zap.Error(err))
return nil, err
}
_, addr, err := mn.DialArgs(ma)
if err != nil {
e.logger.Error("could not get dial args", zap.Error(err))
continue
}
ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
defer cancel()
conn, err := qgrpc.DialContext(
ctx,
addr,
grpc.WithTransportCredentials(
insecure.NewCredentials(),
),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(10*1024*1024),
grpc.MaxCallRecvMsgSize(10*1024*1024),
),
grpc.WithBlock(),
_, addr, err := mn.DialArgs(ma)
if err != nil {
e.logger.Error("could not get dial args",
zap.Error(err),
zap.String("multiaddr", ma.String()),
zap.Int("index", index),
)
if err != nil {
e.logger.Error("could not dial", zap.Error(err))
continue
}
return nil, err
}
clients[i] = protobufs.NewDataIPCServiceClient(conn)
ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
defer cancel()
conn, err := qgrpc.DialContext(
ctx,
addr,
grpc.WithTransportCredentials(
insecure.NewCredentials(),
),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(10*1024*1024),
grpc.MaxCallRecvMsgSize(10*1024*1024),
),
grpc.WithBlock(),
)
if err != nil {
e.logger.Error("could not dial",
zap.Error(err),
zap.String("multiaddr", ma.String()),
zap.Int("index", index),
)
return nil, err
}
e.logger.Info(
"connected to data worker processes",
"connected to data worker process",
zap.String("multiaddr", ma.String()),
)
return protobufs.NewDataIPCServiceClient(conn), nil
}
func (e *DataClockConsensusEngine) createParallelDataWorkerClients() {
parallelism := len(e.config.Engine.DataWorkerMultiaddrs)
useList := true
if parallelism == 0 {
parallelism = e.config.Engine.DataWorkerCount
useList = false
}
e.clientsMx.Lock()
e.clients = make([]protobufs.DataIPCServiceClient, parallelism)
e.clientsMx.Unlock()
e.logger.Info(
"connecting to data worker processes",
zap.Int("parallelism", parallelism),
)
return clients, nil
wg := sync.WaitGroup{}
wg.Add(parallelism)
for i := 0; i < parallelism; i++ {
index := i
go func() {
defer wg.Done()
client, err := e.connectToClient(index, useList)
if err != nil {
e.clientsMx.Lock()
e.clients[index] = nil
e.clientsMx.Unlock()
e.logger.Error("failed to connect to data worker", zap.Error(err))
return
}
e.clientsMx.Lock()
e.clients[index] = client
e.clientsMx.Unlock()
}()
}
wg.Wait()
}
func (e *DataClockConsensusEngine) tryReconnectDataWorkerClients() {
// could reload worker list config here
parallelism := len(e.config.Engine.DataWorkerMultiaddrs)
useList := true
if parallelism == 0 {
parallelism = e.config.Engine.DataWorkerCount
useList = false
}
wg := sync.WaitGroup{}
wg.Add(parallelism)
for i := 0; i < parallelism; i++ {
index := i
go func() {
defer wg.Done()
if e.clients[index] != nil {
return
}
for j := 3; j >= 0; j-- {
client, err := e.connectToClient(index, useList)
if err != nil {
e.clientsMx.Lock()
e.clients[index] = nil
e.clientsMx.Unlock()
e.logger.Error("failed to connect to data worker",
zap.Error(err),
zap.Int("index", index),
)
time.Sleep(50 * time.Millisecond)
continue
}
e.clientsMx.Lock()
e.logger.Info("reconnected to data worker",
zap.Int("index", index),
)
e.clients[index] = client
e.clientsMx.Unlock()
break
}
}()
}
wg.Wait()
}
func (e *DataClockConsensusEngine) GetWorkerCount() uint32 {
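Note on this change: the list-based and base-multiaddr connect paths are collapsed into a single connectToClient(index, useList) helper, and createParallelDataWorkerClients / tryReconnectDataWorkerClients dial all workers concurrently, guarding writes to the shared e.clients slice with the new clientsMx mutex. Reads from other goroutines (for example the nil check at the top of the reconnect goroutine) still happen outside the lock, so callers that need a race-free read could use a small guarded accessor along these lines; getClient is illustrative and not part of this commit:

// getClient returns the data worker client at the given index while holding
// clientsMx, so the read cannot race with the concurrent connect goroutines.
func (e *DataClockConsensusEngine) getClient(index int) protobufs.DataIPCServiceClient {
    e.clientsMx.Lock()
    defer e.clientsMx.Unlock()
    if index < 0 || index >= len(e.clients) {
        return nil
    }
    return e.clients[index]
}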

View File

@@ -2,7 +2,6 @@ package data
import (
"bytes"
"sync"
"time"
"github.com/iden3/go-iden3-crypto/poseidon"
@@ -234,6 +233,7 @@ func (e *DataClockConsensusEngine) processFrame(
latestFrame *protobufs.ClockFrame,
dataFrame *protobufs.ClockFrame,
) *protobufs.ClockFrame {
e.logger.Info(
"current frame head",
zap.Uint64("frame_number", dataFrame.FrameNumber),
@@ -276,7 +276,9 @@ func (e *DataClockConsensusEngine) processFrame(
return dataFrame
}
e.dataTimeReel.Insert(e.ctx, nextFrame, true)
if _, err := e.dataTimeReel.Insert(e.ctx, nextFrame); err != nil {
e.logger.Debug("could not insert frame", zap.Error(err))
}
return nextFrame
} else {
@@ -311,49 +313,7 @@ func (e *DataClockConsensusEngine) processFrame(
e.clientReconnectTest++
if e.clientReconnectTest >= 10 {
wg := sync.WaitGroup{}
wg.Add(len(e.clients))
for i, client := range e.clients {
i := i
client := client
go func() {
for j := 3; j >= 0; j-- {
var err error
if client == nil {
if len(e.config.Engine.DataWorkerMultiaddrs) != 0 {
e.logger.Error(
"client failed, reconnecting after 50ms",
zap.Uint32("client", uint32(i)),
)
time.Sleep(50 * time.Millisecond)
client, err = e.createParallelDataClientsFromListAndIndex(uint32(i))
if err != nil {
e.logger.Error("failed to reconnect", zap.Error(err))
}
} else if len(e.config.Engine.DataWorkerMultiaddrs) == 0 {
e.logger.Error(
"client failed, reconnecting after 50ms",
zap.Uint32("client", uint32(i)),
)
time.Sleep(50 * time.Millisecond)
client, err =
e.createParallelDataClientsFromBaseMultiaddrAndIndex(uint32(i))
if err != nil {
e.logger.Error(
"failed to reconnect",
zap.Uint32("client", uint32(i)),
zap.Error(err),
)
}
}
e.clients[i] = client
continue
}
}
wg.Done()
}()
}
wg.Wait()
e.tryReconnectDataWorkerClients()
e.clientReconnectTest = 0
}

View File

@@ -253,7 +253,9 @@ func (e *DataClockConsensusEngine) handleClockFrame(
}
if frame.FrameNumber > head.FrameNumber {
e.dataTimeReel.Insert(e.ctx, frame, false)
if _, err := e.dataTimeReel.Insert(e.ctx, frame); err != nil {
e.logger.Debug("could not insert frame", zap.Error(err))
}
}
return nil

View File

@@ -155,7 +155,7 @@ func (e *MasterClockConsensusEngine) publishProof(
zap.Uint64("frame_number", frame.FrameNumber),
)
e.masterTimeReel.Insert(context.TODO(), frame, false)
e.masterTimeReel.Insert(context.TODO(), frame)
}
e.state = consensus.EngineStateCollecting

View File

@@ -208,7 +208,7 @@ func (e *MasterClockConsensusEngine) Start() <-chan error {
continue
}
e.masterTimeReel.Insert(context.TODO(), newFrame, false)
e.masterTimeReel.Insert(context.TODO(), newFrame)
}
}
}()

View File

@@ -29,6 +29,7 @@ type pendingFrame struct {
selector *big.Int
parentSelector *big.Int
frameNumber uint64
done chan struct{}
}
type DataTimeReel struct {
@@ -190,12 +191,18 @@ func (d *DataTimeReel) Head() (*protobufs.ClockFrame, error) {
return d.head, nil
}
var alreadyDone chan struct{} = func() chan struct{} {
done := make(chan struct{})
close(done)
return done
}()
// Insert enqueues a structurally valid frame into the time reel. If the frame
// is the next one in sequence, it advances the reel head forward and emits a
// new frame on the new frame channel.
func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame, isSync bool) error {
func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame) (<-chan struct{}, error) {
if err := d.ctx.Err(); err != nil {
return err
return nil, err
}
d.logger.Debug(
@@ -222,21 +229,24 @@ func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame,
d.storePending(selector, parent, distance, frame)
if d.head.FrameNumber+1 == frame.FrameNumber {
done := make(chan struct{})
select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
case <-d.ctx.Done():
return d.ctx.Err()
return nil, d.ctx.Err()
case d.frames <- &pendingFrame{
selector: selector,
parentSelector: parent,
frameNumber: frame.FrameNumber,
done: done,
}:
return done, nil
}
}
}
return nil
return alreadyDone, nil
}
func (
@@ -393,6 +403,7 @@ func (d *DataTimeReel) runLoop() {
// Otherwise set it as the next and process all pending
if err = d.setHead(rawFrame, distance); err != nil {
close(frame.done)
continue
}
d.processPending(d.head, frame)
@@ -559,6 +570,7 @@ func (d *DataTimeReel) processPending(
frame *protobufs.ClockFrame,
lastReceived *pendingFrame,
) {
defer close(lastReceived.done)
// d.logger.Debug(
// "process pending",
// zap.Uint64("head_frame", frame.FrameNumber),
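Note on this change: Insert now returns a channel that the reel's run loop closes once the frame has actually been applied (processPending defers the close, and a setHead failure closes it immediately), while frames that are not the immediate successor of the head fall through to the pre-closed alreadyDone channel so callers can wait on the result unconditionally. A minimal self-contained sketch of this completion-signal pattern, with illustrative names rather than the reel's own types:

package main

import (
    "context"
    "fmt"
)

type item struct {
    value int
    done  chan struct{}
}

// enqueue hands the item to the processing loop and returns a channel that
// is closed once the item has been fully applied.
func enqueue(ctx context.Context, queue chan<- *item, v int) (<-chan struct{}, error) {
    it := &item{value: v, done: make(chan struct{})}
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    case queue <- it:
        return it.done, nil
    }
}

func main() {
    queue := make(chan *item)
    go func() {
        for it := range queue {
            // ... apply the item to local state here ...
            close(it.done) // signal completion exactly once
        }
    }()

    done, err := enqueue(context.Background(), queue, 42)
    if err != nil {
        panic(err)
    }
    <-done
    fmt.Println("item fully applied")
}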

View File

@@ -233,7 +233,7 @@ func TestDataTimeReel(t *testing.T) {
i+1,
10,
)
d.Insert(ctx, frame, false)
d.Insert(ctx, frame)
prevBI, _ := frame.GetSelector()
prev = prevBI.FillBytes(make([]byte, 32))
}
@@ -264,7 +264,7 @@ func TestDataTimeReel(t *testing.T) {
}
for i := 99; i >= 0; i-- {
err := d.Insert(ctx, insertFrames[i], false)
_, err := d.Insert(ctx, insertFrames[i])
assert.NoError(t, err)
}
@@ -286,7 +286,7 @@ func TestDataTimeReel(t *testing.T) {
i+1,
10,
)
d.Insert(ctx, frame, false)
d.Insert(ctx, frame)
prevBI, _ := frame.GetSelector()
prev = prevBI.FillBytes(make([]byte, 32))
@@ -334,7 +334,7 @@ func TestDataTimeReel(t *testing.T) {
}
for i := 99; i >= 0; i-- {
err := d.Insert(ctx, insertFrames[i], false)
_, err := d.Insert(ctx, insertFrames[i])
assert.NoError(t, err)
}
@@ -397,7 +397,7 @@ func TestDataTimeReel(t *testing.T) {
// Someone is honest, but running backwards:
for i := 99; i >= 0; i-- {
err := d.Insert(ctx, insertFrames[i], false)
_, err := d.Insert(ctx, insertFrames[i])
gotime.Sleep(1 * gotime.Second)
assert.NoError(t, err)
}

View File

@@ -123,13 +123,12 @@ func (m *MasterTimeReel) Head() (*protobufs.ClockFrame, error) {
func (m *MasterTimeReel) Insert(
ctx context.Context,
frame *protobufs.ClockFrame,
isSync bool,
) error {
) (<-chan struct{}, error) {
go func() {
m.frames <- frame
}()
return nil
return alreadyDone, nil
}
// NewFrameCh implements TimeReel.

View File

@@ -61,7 +61,7 @@ func TestMasterTimeReel(t *testing.T) {
)
assert.NoError(t, err)
err := m.Insert(ctx, frame, false)
_, err := m.Insert(ctx, frame)
assert.NoError(t, err)
}
@@ -81,7 +81,7 @@ func TestMasterTimeReel(t *testing.T) {
}
for i := 99; i >= 0; i-- {
err := m.Insert(ctx, insertFrames[i], false)
_, err := m.Insert(ctx, insertFrames[i])
assert.NoError(t, err)
}

View File

@@ -9,7 +9,7 @@ import (
type TimeReel interface {
Start() error
Stop()
Insert(ctx context.Context, frame *protobufs.ClockFrame, isSync bool) error
Insert(ctx context.Context, frame *protobufs.ClockFrame) (<-chan struct{}, error)
Head() (*protobufs.ClockFrame, error)
NewFrameCh() <-chan *protobufs.ClockFrame
BadFrameCh() <-chan *protobufs.ClockFrame
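Note on this change: with the isSync flag dropped, both the data and master reels satisfy the same Insert(ctx, frame) (<-chan struct{}, error) shape (the master reel simply returns the pre-closed alreadyDone channel), so interface-level callers can insert and then block until the frame is processed. A possible convenience wrapper over the updated interface; insertAndWait is illustrative and not part of this commit:

// insertAndWait inserts a frame through the TimeReel interface and blocks
// until the reel reports that the frame has been fully processed.
func insertAndWait(ctx context.Context, reel TimeReel, frame *protobufs.ClockFrame) error {
    done, err := reel.Insert(ctx, frame)
    if err != nil {
        return err
    }
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-done:
        return nil
    }
}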