diff --git a/config/version.go b/config/version.go index 92e8034..0705ed8 100644 --- a/config/version.go +++ b/config/version.go @@ -43,7 +43,7 @@ func FormatVersion(version []byte) string { } func GetPatchNumber() byte { - return 0x02 + return 0x03 } func GetRCNumber() byte { diff --git a/consensus/example/generic_consensus_example.go b/consensus/example/generic_consensus_example.go index 826252c..db4ce96 100644 --- a/consensus/example/generic_consensus_example.go +++ b/consensus/example/generic_consensus_example.go @@ -299,6 +299,34 @@ func (m *MockVotingProvider) DecideAndSendVote( return PeerID{ID: chosenID}, vote, nil } +func (m *MockVotingProvider) SendVote(vote *Vote, ctx context.Context) ( + PeerID, + error, +) { + m.logger.Info("re-sent vote", + zap.String("node_id", vote.NodeID), + zap.String("vote", vote.VoteValue), + zap.Uint64("round", vote.Round), + zap.String("for_proposal", vote.ProposerID)) + + if m.messageBus != nil { + // Make a copy to avoid sharing pointers + voteCopy := &Vote{ + NodeID: vote.NodeID, + Round: vote.Round, + VoteValue: vote.VoteValue, + Timestamp: vote.Timestamp, + ProposerID: vote.ProposerID, + } + m.messageBus.Broadcast(Message{ + Type: "vote", + Sender: m.nodeID, + Data: voteCopy, + }) + } + return PeerID{ID: vote.ProposerID}, nil +} + func (m *MockVotingProvider) IsQuorum( proposalVotes map[consensus.Identity]*Vote, ctx context.Context, diff --git a/consensus/state_machine.go b/consensus/state_machine.go index 37c21e6..204b121 100644 --- a/consensus/state_machine.go +++ b/consensus/state_machine.go @@ -139,6 +139,8 @@ type VotingProvider[StateT Unique, VoteT Unique, PeerIDT Unique] interface { proposals map[Identity]*StateT, ctx context.Context, ) (PeerIDT, *VoteT, error) + // Re-publishes a vote message, used to help lagging peers catch up. + SendVote(vote *VoteT, ctx context.Context) (PeerIDT, error) // IsQuorum returns a response indicating whether or not quorum has been // reached. IsQuorum( @@ -603,32 +605,96 @@ func (sm *StateMachine[ return } - proposal, err := sm.leaderProvider.ProveNextState( - data, - *collected, - ctx, - ) + peers, err := sm.leaderProvider.GetNextLeaders(data, ctx) if err != nil { - sm.traceLogger.Error( - fmt.Sprintf("error encountered in %s", sm.machineState), - err, - ) - + sm.traceLogger.Error("could not obtain leaders", err) sm.SendEvent(EventInduceSync) return } - sm.mu.Lock() - sm.traceLogger.Trace( - fmt.Sprintf("adding proposal with rank %d", (*proposal).Rank()), - ) - if _, ok := sm.proposals[(*proposal).Rank()]; !ok { - sm.proposals[(*proposal).Rank()] = make(map[Identity]*StateT) - } - sm.proposals[(*proposal).Rank()][sm.id.Identity()] = proposal - sm.mu.Unlock() + proposalCh := make(chan *StateT) + go func() { + proposal, err := sm.leaderProvider.ProveNextState( + data, + *collected, + ctx, + ) + if err != nil { + sm.traceLogger.Error( + fmt.Sprintf("error encountered in %s", sm.machineState), + err, + ) - sm.SendEvent(EventProofComplete) + proposalCh <- nil + return + } + + proposalCh <- proposal + }() + + timer := time.NewTicker(1 * time.Second) + checks := 0 + + for { + select { + case proposal, ok := <-proposalCh: + if !ok || proposal == nil { + sm.SendEvent(EventInduceSync) + return + } + + sm.mu.Lock() + sm.traceLogger.Trace( + fmt.Sprintf("adding proposal with rank %d", (*proposal).Rank()), + ) + if _, ok := sm.proposals[(*proposal).Rank()]; !ok { + sm.proposals[(*proposal).Rank()] = make(map[Identity]*StateT) + } + sm.proposals[(*proposal).Rank()][sm.id.Identity()] = proposal + sm.mu.Unlock() + + sm.SendEvent(EventProofComplete) + return + case <-timer.C: + checks++ + sm.mu.Lock() + proposals, ok := sm.proposals[(*data).Rank()+1] + if !ok { + sm.mu.Unlock() + continue + } + + // We have the winner, move on + if _, ok := proposals[peers[0].Identity()]; ok { + sm.mu.Unlock() + + sm.SendEvent(EventPublishTimeout) + return + } + + // Reverse decay acceptance on target time + for i := range peers { + if i == 0 { + // already checked + continue + } + + checkTime := i + 10 + if checkTime <= checks { + if _, ok := proposals[peers[i].Identity()]; ok { + sm.mu.Unlock() + + sm.SendEvent(EventPublishTimeout) + return + } + } + } + sm.mu.Unlock() + case <-ctx.Done(): + sm.traceLogger.Trace("context canceled") + return + } + } }, Timeout: 120 * time.Second, OnTimeout: EventPublishTimeout, @@ -685,18 +751,6 @@ func (sm *StateMachine[ } } - if len(sm.proposals[(*sm.activeState).Rank()+1]) < int(sm.minimumProvers()) { - sm.traceLogger.Trace( - fmt.Sprintf( - "insufficient proposal count: %d, need %d", - len(sm.proposals[(*sm.activeState).Rank()+1]), - int(sm.minimumProvers()), - ), - ) - sm.mu.Unlock() - return - } - if ctx == nil { sm.traceLogger.Trace("context null") sm.mu.Unlock() @@ -716,6 +770,16 @@ func (sm *StateMachine[ proposals[k] = &state } + if len(proposals) == 0 { + sm.mu.Unlock() + sm.traceLogger.Error( + "no proposals to vote on", + errors.New("no proposals"), + ) + sm.SendEvent(EventInduceSync) + break + } + sm.mu.Unlock() selectedPeer, vote, err := sm.votingProvider.DecideAndSendVote( proposals, @@ -745,39 +809,64 @@ func (sm *StateMachine[ } } else { sm.traceLogger.Trace("proposal chosen, checking for quorum") - proposalVotes := map[Identity]*VoteT{} - for p, vp := range sm.votes[(*sm.activeState).Rank()+1] { - vclone := (*vp).Clone().(VoteT) - proposalVotes[p] = &vclone - } - haveEnoughProposals := len(sm.proposals[(*sm.activeState).Rank()+1]) >= - int(sm.minimumProvers()) - sm.mu.Unlock() - isQuorum, err := sm.votingProvider.IsQuorum(proposalVotes, ctx) - if err != nil { - sm.traceLogger.Error( - fmt.Sprintf("error encountered in %s", sm.machineState), - err, - ) - sm.SendEvent(EventInduceSync) - return - } + for { + proposalVotes := map[Identity]*VoteT{} + for p, vp := range sm.votes[(*sm.activeState).Rank()+1] { + vclone := (*vp).Clone().(VoteT) + proposalVotes[p] = &vclone + } + sm.mu.Unlock() + isQuorum, err := sm.votingProvider.IsQuorum(proposalVotes, ctx) + if err != nil { + sm.traceLogger.Error( + fmt.Sprintf("error encountered in %s", sm.machineState), + err, + ) + sm.SendEvent(EventInduceSync) + return + } - if isQuorum && haveEnoughProposals { - sm.traceLogger.Trace("quorum reached") - sm.SendEvent(EventQuorumReached) - } else { - sm.traceLogger.Trace( - fmt.Sprintf( - "quorum not reached: proposals: %d, needed: %d", - len(sm.proposals[(*sm.activeState).Rank()+1]), - sm.minimumProvers(), - ), - ) + if isQuorum { + sm.traceLogger.Trace("quorum reached") + sm.SendEvent(EventQuorumReached) + return + } else { + select { + case <-time.After(1 * time.Second): + vote, ok := proposalVotes[sm.id.Identity()] + if !ok { + sm.traceLogger.Error( + "no vote found", + errors.New("prover has no vote"), + ) + sm.SendEvent(EventInduceSync) + return + } + _, err := sm.votingProvider.SendVote(vote, ctx) + if err != nil { + sm.traceLogger.Error( + fmt.Sprintf("error encountered in %s", sm.machineState), + err, + ) + sm.SendEvent(EventInduceSync) + return + } + case <-ctx.Done(): + return + } + sm.traceLogger.Trace( + fmt.Sprintf( + "quorum not reached: votes: %d, needed: %d", + len(proposalVotes), + sm.minimumProvers(), + ), + ) + } + sm.mu.Lock() } } }, - Timeout: 1 * time.Second, + Timeout: 16 * time.Second, OnTimeout: EventVotingTimeout, } diff --git a/consensus/state_machine_test.go b/consensus/state_machine_test.go index 0580220..a2b1b75 100644 --- a/consensus/state_machine_test.go +++ b/consensus/state_machine_test.go @@ -175,6 +175,10 @@ func (m *mockVotingProvider) DecideAndSendVote( return "", nil, errors.New("no proposal to vote for") } +func (m *mockVotingProvider) SendVote(vote *TestVote, ctx context.Context) (TestPeerID, error) { + return "", nil +} + func (m *mockVotingProvider) IsQuorum(proposalVotes map[Identity]*TestVote, ctx context.Context) (bool, error) { totalVotes := 0 voteCount := map[string]int{} @@ -999,6 +1003,10 @@ func (m *mockPanickingVotingProvider) FinalizeVotes( return nil, "", nil } +func (m *mockPanickingVotingProvider) SendVote(vote *TestVote, ctx context.Context) (TestPeerID, error) { + return "", nil +} + func (m *mockPanickingVotingProvider) SendConfirmation(finalized *TestState, ctx context.Context) error { panic("PANIC HERE") } diff --git a/node/app/node.go b/node/app/node.go index 475eed8..733bc2d 100644 --- a/node/app/node.go +++ b/node/app/node.go @@ -5,7 +5,6 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/consensus/global" consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time" "source.quilibrium.com/quilibrium/monorepo/node/execution/manager" - "source.quilibrium.com/quilibrium/monorepo/node/rpc" "source.quilibrium.com/quilibrium/monorepo/types/consensus" "source.quilibrium.com/quilibrium/monorepo/types/keys" "source.quilibrium.com/quilibrium/monorepo/types/p2p" @@ -25,6 +24,7 @@ type MasterNode struct { coinStore store.TokenStore keyManager keys.KeyManager pubSub p2p.PubSub + peerInfoManager p2p.PeerInfoManager globalConsensus *global.GlobalConsensusEngine globalTimeReel *consensustime.GlobalTimeReel pebble store.KVDB @@ -48,6 +48,7 @@ func newMasterNode( coinStore store.TokenStore, keyManager keys.KeyManager, pubSub p2p.PubSub, + peerInfoManager p2p.PeerInfoManager, globalConsensus *global.GlobalConsensusEngine, globalTimeReel *consensustime.GlobalTimeReel, pebble store.KVDB, @@ -61,6 +62,7 @@ func newMasterNode( coinStore: coinStore, keyManager: keyManager, pubSub: pubSub, + peerInfoManager: peerInfoManager, globalConsensus: globalConsensus, globalTimeReel: globalTimeReel, pebble: pebble, @@ -154,8 +156,8 @@ func (m *MasterNode) GetCoreId() uint { return m.coreId } -func (m *MasterNode) GetPeerInfoProvider() rpc.PeerInfoProvider { - return m.globalConsensus +func (m *MasterNode) GetPeerInfoManager() p2p.PeerInfoManager { + return m.peerInfoManager } func (m *MasterNode) GetWorkerManager() worker.WorkerManager { diff --git a/node/app/wire.go b/node/app/wire.go index bf823be..738d37a 100644 --- a/node/app/wire.go +++ b/node/app/wire.go @@ -108,7 +108,7 @@ var pubSubSet = wire.NewSet( p2p.NewBlossomSub, channel.NewDoubleRatchetEncryptedChannel, wire.Bind(new(tp2p.PubSub), new(*p2p.BlossomSub)), - wire.Bind(new(p2p.PeerInfoManager), new(*p2p.InMemoryPeerInfoManager)), + wire.Bind(new(tp2p.PeerInfoManager), new(*p2p.InMemoryPeerInfoManager)), wire.Bind( new(tchannel.EncryptedChannel), new(*channel.DoubleRatchetEncryptedChannel), @@ -122,7 +122,7 @@ var proxyPubSubSet = wire.NewSet( rpc.NewProxyBlossomSub, channel.NewDoubleRatchetEncryptedChannel, wire.Bind(new(tp2p.PubSub), new(*rpc.ProxyBlossomSub)), - wire.Bind(new(p2p.PeerInfoManager), new(*p2p.InMemoryPeerInfoManager)), + wire.Bind(new(tp2p.PeerInfoManager), new(*p2p.InMemoryPeerInfoManager)), wire.Bind( new(tchannel.EncryptedChannel), new(*channel.DoubleRatchetEncryptedChannel), @@ -288,7 +288,7 @@ func provideDataWorkerIPC( signerRegistry consensus.SignerRegistry, proverRegistry consensus.ProverRegistry, appConsensusEngineFactory *app.AppConsensusEngineFactory, - peerInfoManager p2p.PeerInfoManager, + peerInfoManager tp2p.PeerInfoManager, frameProver crypto.FrameProver, logger *zap.Logger, coreId uint, diff --git a/node/app/wire_gen.go b/node/app/wire_gen.go index 1e01181..6485880 100644 --- a/node/app/wire_gen.go +++ b/node/app/wire_gen.go @@ -197,6 +197,7 @@ func NewMasterNode(logger *zap.Logger, config2 *config.Config, coreId uint) (*Ma p2PConfig := config2.P2P engineConfig := config2.Engine blossomSub := p2p.NewBlossomSub(p2PConfig, engineConfig, logger, coreId) + inMemoryPeerInfoManager := p2p.NewInMemoryPeerInfoManager(logger) mpCitHVerifiableEncryptor := newVerifiableEncryptor() kzgInclusionProver := bls48581.NewKZGInclusionProver(logger) pebbleHypergraphStore := store2.NewPebbleHypergraphStore(dbConfig, pebbleDB, logger, mpCitHVerifiableEncryptor, kzgInclusionProver) @@ -228,7 +229,6 @@ func NewMasterNode(logger *zap.Logger, config2 *config.Config, coreId uint) (*Ma pebbleWorkerStore := store2.NewPebbleWorkerStore(pebbleDB, logger) doubleRatchetEncryptedChannel := channel.NewDoubleRatchetEncryptedChannel() bedlamCompiler := compiler.NewBedlamCompiler() - inMemoryPeerInfoManager := p2p.NewInMemoryPeerInfoManager(logger) consensusEngineFactory := global.NewConsensusEngineFactory(logger, config2, blossomSub, hypergraph, fileKeyManager, pebbleKeyStore, frameProver, kzgInclusionProver, cachedSignerRegistry, proverRegistry, dynamicFeeManager, blsAppFrameValidator, blsGlobalFrameValidator, asertDifficultyAdjuster, optimizedProofOfMeaningfulWorkRewardIssuance, pebbleClockStore, pebbleInboxStore, pebbleHypergraphStore, pebbleShardsStore, pebbleWorkerStore, doubleRatchetEncryptedChannel, decaf448BulletproofProver, mpCitHVerifiableEncryptor, decaf448KeyConstructor, bedlamCompiler, bls48581KeyConstructor, inMemoryPeerInfoManager) globalConsensusComponents, err := provideGlobalConsensusComponents(consensusEngineFactory, config2) if err != nil { @@ -236,7 +236,7 @@ func NewMasterNode(logger *zap.Logger, config2 *config.Config, coreId uint) (*Ma } globalConsensusEngine := provideGlobalConsensusEngine(globalConsensusComponents) globalTimeReel := provideGlobalTimeReelFromComponents(globalConsensusComponents) - masterNode, err := newMasterNode(logger, pebbleDataProofStore, pebbleClockStore, pebbleTokenStore, fileKeyManager, blossomSub, globalConsensusEngine, globalTimeReel, pebbleDB, coreId) + masterNode, err := newMasterNode(logger, pebbleDataProofStore, pebbleClockStore, pebbleTokenStore, fileKeyManager, blossomSub, inMemoryPeerInfoManager, globalConsensusEngine, globalTimeReel, pebbleDB, coreId) if err != nil { return nil, err } @@ -274,13 +274,13 @@ var verencSet = wire.NewSet( var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store2.NewPebbleDB, wire.Bind(new(store.KVDB), new(*store2.PebbleDB)), store2.NewPebbleClockStore, store2.NewPebbleTokenStore, store2.NewPebbleDataProofStore, store2.NewPebbleHypergraphStore, store2.NewPebbleInboxStore, store2.NewPebbleKeyStore, store2.NewPeerstoreDatastore, store2.NewPebbleShardsStore, store2.NewPebbleWorkerStore, wire.Bind(new(store.ClockStore), new(*store2.PebbleClockStore)), wire.Bind(new(store.TokenStore), new(*store2.PebbleTokenStore)), wire.Bind(new(store.DataProofStore), new(*store2.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store2.PebbleHypergraphStore)), wire.Bind(new(store.InboxStore), new(*store2.PebbleInboxStore)), wire.Bind(new(store.KeyStore), new(*store2.PebbleKeyStore)), wire.Bind(new(tries.TreeBackingStore), new(*store2.PebbleHypergraphStore)), wire.Bind(new(store.ShardsStore), new(*store2.PebbleShardsStore)), wire.Bind(new(store.WorkerStore), new(*store2.PebbleWorkerStore))) -var pubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), wire.FieldsOf(new(*config.Config), "Engine"), p2p.NewInMemoryPeerInfoManager, p2p.NewBlossomSub, channel.NewDoubleRatchetEncryptedChannel, wire.Bind(new(p2p2.PubSub), new(*p2p.BlossomSub)), wire.Bind(new(p2p.PeerInfoManager), new(*p2p.InMemoryPeerInfoManager)), wire.Bind( +var pubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), wire.FieldsOf(new(*config.Config), "Engine"), p2p.NewInMemoryPeerInfoManager, p2p.NewBlossomSub, channel.NewDoubleRatchetEncryptedChannel, wire.Bind(new(p2p2.PubSub), new(*p2p.BlossomSub)), wire.Bind(new(p2p2.PeerInfoManager), new(*p2p.InMemoryPeerInfoManager)), wire.Bind( new(channel2.EncryptedChannel), new(*channel.DoubleRatchetEncryptedChannel), ), ) -var proxyPubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), wire.FieldsOf(new(*config.Config), "Engine"), p2p.NewInMemoryPeerInfoManager, rpc.NewProxyBlossomSub, channel.NewDoubleRatchetEncryptedChannel, wire.Bind(new(p2p2.PubSub), new(*rpc.ProxyBlossomSub)), wire.Bind(new(p2p.PeerInfoManager), new(*p2p.InMemoryPeerInfoManager)), wire.Bind( +var proxyPubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), wire.FieldsOf(new(*config.Config), "Engine"), p2p.NewInMemoryPeerInfoManager, rpc.NewProxyBlossomSub, channel.NewDoubleRatchetEncryptedChannel, wire.Bind(new(p2p2.PubSub), new(*rpc.ProxyBlossomSub)), wire.Bind(new(p2p2.PeerInfoManager), new(*p2p.InMemoryPeerInfoManager)), wire.Bind( new(channel2.EncryptedChannel), new(*channel.DoubleRatchetEncryptedChannel), ), @@ -351,7 +351,7 @@ func provideDataWorkerIPC( signerRegistry consensus.SignerRegistry, proverRegistry consensus.ProverRegistry, appConsensusEngineFactory *app.AppConsensusEngineFactory, - peerInfoManager p2p.PeerInfoManager, + peerInfoManager p2p2.PeerInfoManager, frameProver crypto.FrameProver, logger *zap.Logger, coreId uint, diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go index 17a989e..66598f8 100644 --- a/node/consensus/app/app_consensus_engine.go +++ b/node/consensus/app/app_consensus_engine.go @@ -82,7 +82,7 @@ type AppConsensusEngine struct { executors map[string]execution.ShardExecutionEngine executorsMu sync.RWMutex executionManager *manager.ExecutionEngineManager - peerInfoManager p2p.PeerInfoManager + peerInfoManager tp2p.PeerInfoManager currentDifficulty uint32 currentDifficultyMu sync.RWMutex pendingMessages []*protobufs.Message @@ -171,7 +171,7 @@ func NewAppConsensusEngine( difficultyAdjuster typesconsensus.DifficultyAdjuster, rewardIssuance typesconsensus.RewardIssuance, eventDistributor typesconsensus.EventDistributor, - peerInfoManager p2p.PeerInfoManager, + peerInfoManager tp2p.PeerInfoManager, appTimeReel *consensustime.AppTimeReel, globalTimeReel *consensustime.GlobalTimeReel, blsConstructor crypto.BlsConstructor, diff --git a/node/consensus/app/consensus_voting_provider.go b/node/consensus/app/consensus_voting_provider.go index 7cb80cc..834ae32 100644 --- a/node/consensus/app/consensus_voting_provider.go +++ b/node/consensus/app/consensus_voting_provider.go @@ -273,6 +273,41 @@ func (p *AppVotingProvider) DecideAndSendVote( return PeerID{ID: chosenProposal.Header.Prover}, &vote, nil } +func (p *AppVotingProvider) SendVote( + vote **protobufs.FrameVote, + ctx context.Context, +) (PeerID, error) { + if vote == nil || *vote == nil { + return PeerID{}, errors.Wrap( + errors.New("no vote provided"), + "send vote", + ) + } + + bumpVote := &protobufs.FrameVote{ + Filter: p.engine.appAddress, + FrameNumber: (*vote).FrameNumber, + Proposer: (*vote).Proposer, + Approve: true, + Timestamp: time.Now().UnixMilli(), + PublicKeySignatureBls48581: (*vote).PublicKeySignatureBls48581, + } + + data, err := (*bumpVote).ToCanonicalBytes() + if err != nil { + return PeerID{}, errors.Wrap(err, "serialize vote") + } + + if err := p.engine.pubsub.PublishToBitmask( + p.engine.getConsensusMessageBitmask(), + data, + ); err != nil { + p.engine.logger.Error("failed to publish vote", zap.Error(err)) + } + + return PeerID{ID: (*vote).Proposer}, nil +} + func (p *AppVotingProvider) IsQuorum( proposalVotes map[consensus.Identity]**protobufs.FrameVote, ctx context.Context, diff --git a/node/consensus/app/factory.go b/node/consensus/app/factory.go index 1e01732..8483aa2 100644 --- a/node/consensus/app/factory.go +++ b/node/consensus/app/factory.go @@ -7,7 +7,6 @@ import ( "source.quilibrium.com/quilibrium/monorepo/config" "source.quilibrium.com/quilibrium/monorepo/node/consensus/events" "source.quilibrium.com/quilibrium/monorepo/node/consensus/time" - qp2p "source.quilibrium.com/quilibrium/monorepo/node/p2p" "source.quilibrium.com/quilibrium/monorepo/types/channel" "source.quilibrium.com/quilibrium/monorepo/types/compiler" "source.quilibrium.com/quilibrium/monorepo/types/consensus" @@ -39,7 +38,7 @@ type AppConsensusEngineFactory struct { compiler compiler.CircuitCompiler signerRegistry consensus.SignerRegistry proverRegistry consensus.ProverRegistry - peerInfoManager qp2p.PeerInfoManager + peerInfoManager p2p.PeerInfoManager dynamicFeeManager consensus.DynamicFeeManager frameValidator consensus.AppFrameValidator globalFrameValidator consensus.GlobalFrameValidator @@ -69,7 +68,7 @@ func NewAppConsensusEngineFactory( compiler compiler.CircuitCompiler, signerRegistry consensus.SignerRegistry, proverRegistry consensus.ProverRegistry, - peerInfoManager qp2p.PeerInfoManager, + peerInfoManager p2p.PeerInfoManager, dynamicFeeManager consensus.DynamicFeeManager, frameValidator consensus.AppFrameValidator, globalFrameValidator consensus.GlobalFrameValidator, diff --git a/node/consensus/app/integration_helper_test.go b/node/consensus/app/integration_helper_test.go index b47d040..37a7946 100644 --- a/node/consensus/app/integration_helper_test.go +++ b/node/consensus/app/integration_helper_test.go @@ -573,6 +573,9 @@ func (m *mockGlobalClientLocks) GetAppShards(ctx context.Context, in *protobufs. func (m *mockGlobalClientLocks) GetGlobalShards(ctx context.Context, in *protobufs.GetGlobalShardsRequest, opts ...grpc.CallOption) (*protobufs.GetGlobalShardsResponse, error) { return nil, errors.New("not used in this test") } +func (m *mockGlobalClientLocks) GetWorkerInfo(ctx context.Context, in *protobufs.GlobalGetWorkerInfoRequest, opts ...grpc.CallOption) (*protobufs.GlobalGetWorkerInfoResponse, error) { + return nil, errors.New("not used in this test") +} func (m *mockGlobalClientLocks) GetLockedAddresses(ctx context.Context, in *protobufs.GetLockedAddressesRequest, opts ...grpc.CallOption) (*protobufs.GetLockedAddressesResponse, error) { out := &protobufs.GetLockedAddressesResponse{Transactions: []*protobufs.LockedTransaction{}} m.shardAddressesMu.Lock() diff --git a/node/consensus/global/consensus_leader_provider.go b/node/consensus/global/consensus_leader_provider.go index 983022f..e27a16d 100644 --- a/node/consensus/global/consensus_leader_provider.go +++ b/node/consensus/global/consensus_leader_provider.go @@ -88,7 +88,7 @@ func (p *GlobalLeaderProvider) ProveNextState( } // Get current timestamp and difficulty - timestamp := time.Now().UnixMilli() + timestamp := time.Now().UnixMilli() + 30000 difficulty := p.engine.difficultyAdjuster.GetNextDifficulty( (*prior).Rank()+1, timestamp, diff --git a/node/consensus/global/consensus_voting_provider.go b/node/consensus/global/consensus_voting_provider.go index 8fda510..92ed5c1 100644 --- a/node/consensus/global/consensus_voting_provider.go +++ b/node/consensus/global/consensus_voting_provider.go @@ -222,6 +222,40 @@ func (p *GlobalVotingProvider) DecideAndSendVote( return GlobalPeerID{ID: proposerID}, &vote, nil } +func (p *GlobalVotingProvider) SendVote( + vote **protobufs.FrameVote, + ctx context.Context, +) (GlobalPeerID, error) { + if vote == nil || *vote == nil { + return GlobalPeerID{}, errors.Wrap( + errors.New("no vote provided"), + "send vote", + ) + } + + bumpVote := &protobufs.FrameVote{ + FrameNumber: (*vote).FrameNumber, + Proposer: (*vote).Proposer, + Approve: true, + Timestamp: time.Now().UnixMilli(), + PublicKeySignatureBls48581: (*vote).PublicKeySignatureBls48581, + } + + data, err := (*bumpVote).ToCanonicalBytes() + if err != nil { + return GlobalPeerID{}, errors.Wrap(err, "serialize vote") + } + + if err := p.engine.pubsub.PublishToBitmask( + GLOBAL_CONSENSUS_BITMASK, + data, + ); err != nil { + p.engine.logger.Error("failed to publish vote", zap.Error(err)) + } + + return GlobalPeerID{ID: (*vote).Proposer}, nil +} + func (p *GlobalVotingProvider) IsQuorum( proposalVotes map[consensus.Identity]**protobufs.FrameVote, ctx context.Context, diff --git a/node/consensus/global/factory.go b/node/consensus/global/factory.go index ea176ae..132c3fd 100644 --- a/node/consensus/global/factory.go +++ b/node/consensus/global/factory.go @@ -7,7 +7,6 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/consensus/events" consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time" "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global/compat" - "source.quilibrium.com/quilibrium/monorepo/node/p2p" "source.quilibrium.com/quilibrium/monorepo/types/channel" "source.quilibrium.com/quilibrium/monorepo/types/compiler" "source.quilibrium.com/quilibrium/monorepo/types/consensus" @@ -47,7 +46,7 @@ type ConsensusEngineFactory struct { decafConstructor crypto.DecafConstructor compiler compiler.CircuitCompiler blsConstructor crypto.BlsConstructor - peerInfoManager p2p.PeerInfoManager + peerInfoManager tp2p.PeerInfoManager } // NewConsensusEngineFactory creates a new factory for consensus engines @@ -78,7 +77,7 @@ func NewConsensusEngineFactory( decafConstructor crypto.DecafConstructor, compiler compiler.CircuitCompiler, blsConstructor crypto.BlsConstructor, - peerInfoManager p2p.PeerInfoManager, + peerInfoManager tp2p.PeerInfoManager, ) *ConsensusEngineFactory { // Initialize peer seniority data compat.RebuildPeerSeniority(uint(config.P2P.Network)) diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index 2b742e9..a1fae47 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -28,7 +28,6 @@ import ( "source.quilibrium.com/quilibrium/monorepo/config" "source.quilibrium.com/quilibrium/monorepo/consensus" "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" - qhypergraph "source.quilibrium.com/quilibrium/monorepo/hypergraph" "source.quilibrium.com/quilibrium/monorepo/node/consensus/provers" "source.quilibrium.com/quilibrium/monorepo/node/consensus/reward" consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time" @@ -111,7 +110,7 @@ type GlobalConsensusEngine struct { executorsMu sync.RWMutex executionManager *manager.ExecutionEngineManager mixnet typesconsensus.Mixnet - peerInfoManager p2p.PeerInfoManager + peerInfoManager tp2p.PeerInfoManager workerManager worker.WorkerManager proposer *provers.Manager alertPublicKey []byte @@ -221,7 +220,7 @@ func NewGlobalConsensusEngine( decafConstructor crypto.DecafConstructor, compiler compiler.CircuitCompiler, blsConstructor crypto.BlsConstructor, - peerInfoManager p2p.PeerInfoManager, + peerInfoManager tp2p.PeerInfoManager, ) (*GlobalConsensusEngine, error) { engine := &GlobalConsensusEngine{ logger: logger, @@ -497,42 +496,7 @@ func (e *GlobalConsensusEngine) Start(quit chan struct{}) <-chan error { var initialState **protobufs.GlobalFrame = nil if frame != nil { - if frame.Header.FrameNumber == 244200 && e.config.P2P.Network == 0 { - e.logger.Warn("purging previous genesis to start new") - err = e.clockStore.DeleteGlobalClockFrameRange(0, 244201) - if err != nil { - panic(err) - } - set := e.hypergraph.(*qhypergraph.HypergraphCRDT).GetVertexAddsSet( - tries.ShardKey{ - L1: [3]byte{}, - L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)), - }, - ) - leaves := tries.GetAllLeaves( - set.GetTree().SetType, - set.GetTree().PhaseType, - set.GetTree().ShardKey, - set.GetTree().Root, - ) - txn, err := e.hypergraph.NewTransaction(false) - if err != nil { - panic(err) - } - for _, l := range leaves { - err = set.GetTree().Delete(txn, l.Key) - if err != nil { - txn.Abort() - panic(err) - } - } - if err = txn.Commit(); err != nil { - panic(err) - } - frame = nil - } else { - initialState = &frame - } + initialState = &frame } if e.config.P2P.Network == 99 || e.config.Engine.ArchiveMode { @@ -615,6 +579,8 @@ func (e *GlobalConsensusEngine) Start(quit chan struct{}) <-chan error { return errChan } + e.peerInfoManager.Start() + // Start consensus message queue processor e.wg.Add(1) go e.processGlobalConsensusMessageQueue() @@ -732,6 +698,7 @@ func (e *GlobalConsensusEngine) setupGRPCServer() error { "quilibrium.node.global.pb.GlobalService": channel.AnyPeer, "quilibrium.node.global.pb.OnionService": channel.AnyPeer, "quilibrium.node.global.pb.KeyRegistryService": channel.OnlySelfPeer, + "quilibrium.node.proxy.pb.PubSubProxy": channel.OnlySelfPeer, }, map[string]channel.AllowedPeerPolicyType{ // Alternative nodes may not need to make this only self peer, but this @@ -851,6 +818,8 @@ func (e *GlobalConsensusEngine) Stop(force bool) <-chan error { e.pubsub.Unsubscribe(GLOBAL_ALERT_BITMASK, false) e.pubsub.UnregisterValidator(GLOBAL_ALERT_BITMASK) + e.peerInfoManager.Stop() + // Wait for goroutines to finish done := make(chan struct{}) go func() { diff --git a/node/consensus/registration/cached_signer_registry_test.go b/node/consensus/registration/cached_signer_registry_test.go index 7779564..b96fe61 100644 --- a/node/consensus/registration/cached_signer_registry_test.go +++ b/node/consensus/registration/cached_signer_registry_test.go @@ -7,6 +7,8 @@ import ( "github.com/cloudflare/circl/sign/ed448" "github.com/iden3/go-iden3-crypto/poseidon" + pcrypto "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -322,10 +324,19 @@ func TestValidateSignedKey(t *testing.T) { // Fix the parent key address to match the signature's public key pubKey := signedKey.Signature.(*protobufs.SignedX448Key_Ed448Signature).Ed448Signature.PublicKey.KeyValue - addressBI, _ := poseidon.HashBytes(pubKey) - signedKey.ParentKeyAddress = addressBI.FillBytes(make([]byte, 32)) + pubkey, err := pcrypto.UnmarshalEd448PublicKey(pubKey) + if err != nil { + panic(err) + } - err := registry.ValidateSignedX448Key(signedKey) + peerID, err := peer.IDFromPublicKey(pubkey) + if err != nil { + panic(err) + } + + signedKey.ParentKeyAddress = []byte(peerID) + + err = registry.ValidateSignedX448Key(signedKey) require.NoError(t, err) }) @@ -438,8 +449,17 @@ func TestPutSignedKey(t *testing.T) { // Fix the parent key address to match the signature's public key pubKey := signedKey.Signature.(*protobufs.SignedX448Key_Ed448Signature).Ed448Signature.PublicKey.KeyValue - addressBI, _ := poseidon.HashBytes(pubKey) - signedKey.ParentKeyAddress = addressBI.FillBytes(make([]byte, 32)) + pubkey, err := pcrypto.UnmarshalEd448PublicKey(pubKey) + if err != nil { + panic(err) + } + + peerID, err := peer.IDFromPublicKey(pubkey) + if err != nil { + panic(err) + } + + signedKey.ParentKeyAddress = []byte(peerID) // Mock the keyStore PutSignedKey keyStore.On("PutSignedX448Key", mockTxn, address, signedKey).Return(nil) diff --git a/node/datarpc/data_worker_ipc_server.go b/node/datarpc/data_worker_ipc_server.go index d11a8c0..81e1f85 100644 --- a/node/datarpc/data_worker_ipc_server.go +++ b/node/datarpc/data_worker_ipc_server.go @@ -20,6 +20,7 @@ import ( "source.quilibrium.com/quilibrium/monorepo/types/channel" "source.quilibrium.com/quilibrium/monorepo/types/consensus" "source.quilibrium.com/quilibrium/monorepo/types/crypto" + tp2p "source.quilibrium.com/quilibrium/monorepo/types/p2p" ) type DataWorkerIPCServer struct { @@ -33,7 +34,7 @@ type DataWorkerIPCServer struct { signer crypto.Signer signerRegistry consensus.SignerRegistry proverRegistry consensus.ProverRegistry - peerInfoManager p2p.PeerInfoManager + peerInfoManager tp2p.PeerInfoManager authProvider channel.AuthenticationProvider appConsensusEngineFactory *app.AppConsensusEngineFactory appConsensusEngine *app.AppConsensusEngine @@ -47,7 +48,7 @@ func NewDataWorkerIPCServer( config *config.Config, signerRegistry consensus.SignerRegistry, proverRegistry consensus.ProverRegistry, - peerInfoManager p2p.PeerInfoManager, + peerInfoManager tp2p.PeerInfoManager, frameProver crypto.FrameProver, appConsensusEngineFactory *app.AppConsensusEngineFactory, logger *zap.Logger, diff --git a/node/main.go b/node/main.go index 9418378..564627b 100644 --- a/node/main.go +++ b/node/main.go @@ -5,6 +5,7 @@ package main import ( "bytes" "context" + "crypto/sha3" _ "embed" "encoding/hex" "flag" @@ -33,7 +34,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/zap" - "golang.org/x/crypto/sha3" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "source.quilibrium.com/quilibrium/monorepo/config" @@ -81,6 +81,11 @@ var ( false, "print node related information", ) + peerInfo = flag.Bool( + "peer-info", + false, + "prints peer info", + ) debug = flag.Bool( "debug", false, @@ -129,6 +134,24 @@ var ( ver *string = &bver ) +var capabilityLabels = map[uint32]string{ + 0x00010001: "Compute Protocol v1", + 0x00010101: "KZG Verify (BLS48581)", + 0x00010201: "Bulletproof Range Verify (Decaf448)", + 0x00010301: "Bulletproof Sum Verify (Decaf448)", + 0x00010401: "SECP256K1 ECDSA Verify", + 0x00010501: "ED25519 EdDSA Verify", + 0x00010601: "ED448 EdDSA Verify", + 0x00010701: "DECAF448 Schnorr Verify", + 0x00010801: "SECP256R1 ECDSA Verify", + 0x00020001: "Global Protocol v1", + 0x00030001: "Hypergraph Protocol v1", + 0x00040001: "Token Protocol v1", + 0x0101: "Double Ratchet v1", + 0x0201: "Triple Ratchet v1", + 0x0301: "Onion Routing v1", +} + func signatureCheckDefault() bool { envVarValue, envVarExists := os.LookupEnv("QUILIBRIUM_SIGNATURE_CHECK") if envVarExists { @@ -340,6 +363,16 @@ func main() { return } + if *peerInfo { + config, err := config.LoadConfig(*configDirectory, "", false) + if err != nil { + logger.Fatal("failed to load config", zap.Error(err)) + } + + printPeerInfo(logger, config) + return + } + if *core == 0 { config.PrintLogo(*char) config.PrintVersion(uint8(*network), *char, *ver) @@ -538,7 +571,7 @@ func main() { masterNode.GetLogger(), masterNode.GetKeyManager(), masterNode.GetPubSub(), - masterNode.GetPeerInfoProvider(), + masterNode.GetPeerInfoManager(), masterNode.GetWorkerManager(), masterNode.GetProverRegistry(), masterNode.GetExecutionEngineManager(), @@ -629,10 +662,179 @@ func printNodeInfo(logger *zap.Logger, cfg *config.Config) { fmt.Println("Active Workers:", nodeInfo.Workers) } +func printPeerInfo(logger *zap.Logger, cfg *config.Config) { + if cfg.ListenGRPCMultiaddr == "" { + logger.Fatal("gRPC Not Enabled, Please Configure") + } + + printPeerID(logger, cfg.P2P) + + conn, err := ConnectToNode(logger, cfg) + if err != nil { + logger.Fatal( + "could not connect to node. if it is still booting, please wait.", + zap.Error(err), + ) + } + defer conn.Close() + + client := protobufs.NewNodeServiceClient(conn) + + peerInfo, err := client.GetPeerInfo( + context.Background(), + &protobufs.GetPeerInfoRequest{}, + ) + if err != nil { + logger.Panic("failed to fetch node info", zap.Error(err)) + } + + for idx, p := range peerInfo.PeerInfo { + if p == nil { + continue + } + fmt.Printf("Peer %d:\n", idx+1) + + if peerID := formatPeerID(p.PeerId); peerID != "" { + fmt.Println(" Peer ID:", peerID) + } + + if len(p.Version) >= 3 { + fmt.Println(" Version:", config.FormatVersion(p.Version)) + } + + if patch := formatPatchVersion(p.PatchVersion); patch != "" { + fmt.Println(" Patch Version:", patch) + } + + if p.Timestamp != 0 { + fmt.Println( + " Last Seen:", + time.UnixMilli(p.Timestamp).UTC().Format(time.RFC3339), + ) + } + + printReachability(p.Reachability) + printCapabilities(p.Capabilities) + + if len(p.PublicKey) > 0 { + fmt.Println(" Public Key:", hex.EncodeToString(p.PublicKey)) + } + + if len(p.Signature) > 0 { + fmt.Println(" Signature:", hex.EncodeToString(p.Signature)) + } + + if idx < len(peerInfo.PeerInfo)-1 { + fmt.Println() + } + } +} + +func formatPeerID(raw []byte) string { + if len(raw) == 0 { + return "" + } + id, err := peer.IDFromBytes(raw) + if err != nil { + return hex.EncodeToString(raw) + } + return id.String() +} + +func capabilityDescription(id uint32) string { + if name, ok := capabilityLabels[id]; ok { + return fmt.Sprintf("%s (0x%08X)", name, id) + } + return fmt.Sprintf("0x%08X", id) +} + +func formatPatchVersion(raw []byte) string { + if len(raw) == 0 { + return "" + } + if len(raw) == 1 { + return fmt.Sprintf("%d", raw[0]) + } + return fmt.Sprintf("0x%s", hex.EncodeToString(raw)) +} + +func nonEmptyStrings(values []string) []string { + out := make([]string, 0, len(values)) + for _, v := range values { + if v != "" { + out = append(out, v) + } + } + return out +} + +func printReachability(reach []*protobufs.Reachability) { + printed := false + for _, r := range reach { + if r == nil { + continue + } + filter := hex.EncodeToString(r.Filter) + pubsub := nonEmptyStrings(r.PubsubMultiaddrs) + stream := nonEmptyStrings(r.StreamMultiaddrs) + if filter == "" && len(pubsub) == 0 && len(stream) == 0 { + continue + } + if !printed { + fmt.Println(" Reachability:") + printed = true + } + fmt.Println(" -") + if filter != "" { + fmt.Println(" Filter:", filter) + } + if len(pubsub) > 0 { + fmt.Println(" Pubsub Multiaddrs:") + for _, addr := range pubsub { + fmt.Println(" " + addr) + } + } + if len(stream) > 0 { + fmt.Println(" Stream Multiaddrs:") + for _, addr := range stream { + fmt.Println(" " + addr) + } + } + } +} + +func printCapabilities(list []*protobufs.Capability) { + entries := make([]string, 0, len(list)) + for _, capability := range list { + if capability == nil { + continue + } + desc := capabilityDescription(capability.ProtocolIdentifier) + if len(capability.AdditionalMetadata) > 0 { + desc = fmt.Sprintf( + "%s (metadata: %s)", + desc, + hex.EncodeToString(capability.AdditionalMetadata), + ) + } + entries = append(entries, desc) + } + if len(entries) == 0 { + return + } + fmt.Println(" Capabilities:") + for _, entry := range entries { + fmt.Println(" - " + entry) + } +} + var defaultGrpcAddress = "localhost:8337" // Connect to the node via GRPC -func ConnectToNode(logger *zap.Logger, nodeConfig *config.Config) (*grpc.ClientConn, error) { +func ConnectToNode(logger *zap.Logger, nodeConfig *config.Config) ( + *grpc.ClientConn, + error, +) { addr := defaultGrpcAddress if nodeConfig.ListenGRPCMultiaddr != "" { ma, err := multiaddr.NewMultiaddr(nodeConfig.ListenGRPCMultiaddr) diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index 4ab113e..e7afe7a 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -261,6 +261,7 @@ func NewBlossomSubWithHost( p2pConfig.PingTimeout, p2pConfig.PingPeriod, p2pConfig.PingAttempts, + nil, ) var tracer *blossomsub.JSONTracer @@ -682,6 +683,7 @@ func NewBlossomSub( p2pConfig.PingTimeout, p2pConfig.PingPeriod, p2pConfig.PingAttempts, + directPeers, ) var tracer *blossomsub.JSONTracer diff --git a/node/p2p/internal/peer_monitor.go b/node/p2p/internal/peer_monitor.go index 373bf72..06385eb 100644 --- a/node/p2p/internal/peer_monitor.go +++ b/node/p2p/internal/peer_monitor.go @@ -6,16 +6,21 @@ import ( "time" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/p2p/protocol/ping" "go.uber.org/zap" ) type peerMonitor struct { - ps *ping.PingService - timeout time.Duration - period time.Duration - attempts int + ps *ping.PingService + h host.Host + timeout time.Duration + period time.Duration + attempts int + direct []peer.AddrInfo + directPeriod time.Duration } func (pm *peerMonitor) pingOnce( @@ -51,23 +56,76 @@ func (pm *peerMonitor) ping( } func (pm *peerMonitor) run(ctx context.Context, logger *zap.Logger) { + var ( + pingTicker = time.NewTicker(pm.period) + directTicker *time.Ticker + directChannel <-chan time.Time + ) + defer pingTicker.Stop() + + if len(pm.direct) > 0 && pm.directPeriod > 0 { + directTicker = time.NewTicker(pm.directPeriod) + directChannel = directTicker.C + defer directTicker.Stop() + } + for { select { case <-ctx.Done(): return - case <-time.After(pm.period): - peers := pm.ps.Host.Network().Peers() - wg := &sync.WaitGroup{} - for _, id := range peers { - slogger := logger.With(zap.String("peer_id", id.String())) - wg.Add(1) - go pm.ping(ctx, slogger, wg, id) - } - wg.Wait() + case <-pingTicker.C: + pm.pingConnectedPeers(ctx, logger) + case <-directChannel: + pm.ensureDirectPeers(ctx, logger) } } } +func (pm *peerMonitor) pingConnectedPeers( + ctx context.Context, + logger *zap.Logger, +) { + peers := pm.h.Network().Peers() + wg := &sync.WaitGroup{} + for _, id := range peers { + slogger := logger.With(zap.String("peer_id", id.String())) + wg.Add(1) + go pm.ping(ctx, slogger, wg, id) + } + wg.Wait() +} + +func (pm *peerMonitor) ensureDirectPeers( + ctx context.Context, + logger *zap.Logger, +) { + for _, info := range pm.direct { + if info.ID == pm.h.ID() { + continue + } + + slogger := logger.With(zap.String("peer_id", info.ID.String())) + pm.h.Peerstore().AddAddrs( + info.ID, + info.Addrs, + peerstore.PermanentAddrTTL, + ) + + if pm.h.Network().Connectedness(info.ID) == network.Connected { + continue + } + + connectCtx, cancel := context.WithTimeout(ctx, pm.timeout) + err := pm.h.Connect(connectCtx, info) + cancel() + if err != nil { + slogger.Debug("failed to connect to direct peer", zap.Error(err)) + continue + } + slogger.Info("connected to direct peer") + } +} + // MonitorPeers periodically looks up the peers connected to the host and pings // them repeatedly to ensure they are still reachable. If the peer is not // reachable after the attempts, the connections to the peer are closed. @@ -77,13 +135,17 @@ func MonitorPeers( h host.Host, timeout, period time.Duration, attempts int, + directPeers []peer.AddrInfo, ) { ps := ping.NewPingService(h) pm := &peerMonitor{ - ps: ps, - timeout: timeout, - period: period, - attempts: attempts, + ps: ps, + h: h, + timeout: timeout, + period: period, + attempts: attempts, + direct: directPeers, + directPeriod: 10 * time.Second, } go pm.run(ctx, logger) diff --git a/node/p2p/onion/grpc_transport.go b/node/p2p/onion/grpc_transport.go index b0931dd..1326e69 100644 --- a/node/p2p/onion/grpc_transport.go +++ b/node/p2p/onion/grpc_transport.go @@ -14,9 +14,9 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "source.quilibrium.com/quilibrium/monorepo/node/p2p" "source.quilibrium.com/quilibrium/monorepo/protobufs" "source.quilibrium.com/quilibrium/monorepo/types/consensus" + "source.quilibrium.com/quilibrium/monorepo/types/p2p" ) // GRPCTransport implements the Transport interface using gRPC bidirectional diff --git a/node/p2p/onion/router.go b/node/p2p/onion/router.go index 11bf704..d6c2e97 100644 --- a/node/p2p/onion/router.go +++ b/node/p2p/onion/router.go @@ -22,9 +22,9 @@ import ( "golang.org/x/crypto/chacha20poly1305" "golang.org/x/crypto/hkdf" - "source.quilibrium.com/quilibrium/monorepo/node/p2p" "source.quilibrium.com/quilibrium/monorepo/types/consensus" "source.quilibrium.com/quilibrium/monorepo/types/keys" + tp2p "source.quilibrium.com/quilibrium/monorepo/types/p2p" ) type createdWaitKey struct { @@ -38,7 +38,7 @@ type createdWaitKey struct { // to provide better performance where hardware acceleration is unavailable. type OnionRouter struct { logger *zap.Logger - peers p2p.PeerInfoManager + peers tp2p.PeerInfoManager signers consensus.SignerRegistry keyManager keys.KeyManager link Transport @@ -71,7 +71,7 @@ func WithSharedSecret(f SharedSecretFn) Option { func NewOnionRouter( logger *zap.Logger, - peerManager p2p.PeerInfoManager, + peerManager tp2p.PeerInfoManager, signerRegistry consensus.SignerRegistry, keyManager keys.KeyManager, opts ...Option, @@ -335,7 +335,7 @@ func (r *OnionRouter) BuildCircuitToExit( } // Filter out the chosen exit. - filtered := make([]*p2p.PeerInfo, 0, len(cands)) + filtered := make([]*tp2p.PeerInfo, 0, len(cands)) for _, pm := range cands { if string(pm.PeerId) == string(exitPeerID) { continue @@ -356,7 +356,7 @@ func (r *OnionRouter) BuildCircuitToExit( // Pick first (hops-1) from filtered (already shuffled in selectRoutingPeers), // then append exit at the end. - route := make([]*p2p.PeerInfo, 0, hops) + route := make([]*tp2p.PeerInfo, 0, hops) if hops > 1 { route = append(route, filtered[:hops-1]...) } @@ -904,9 +904,9 @@ func (r *OnionRouter) closeStream(c *Circuit, s *onionStream) { r.mu.Unlock() } -func (r *OnionRouter) selectRoutingPeers() ([]*p2p.PeerInfo, error) { +func (r *OnionRouter) selectRoutingPeers() ([]*tp2p.PeerInfo, error) { ids := r.peers.GetPeersBySpeed() - out := make([]*p2p.PeerInfo, 0, len(ids)) + out := make([]*tp2p.PeerInfo, 0, len(ids)) seen := map[string]struct{}{} for _, id := range ids { @@ -933,7 +933,7 @@ func (r *OnionRouter) selectRoutingPeers() ([]*p2p.PeerInfo, error) { return out, nil } -func hasCapability(pm *p2p.PeerInfo, x uint32) bool { +func hasCapability(pm *tp2p.PeerInfo, x uint32) bool { for _, c := range pm.Capabilities { if c.ProtocolIdentifier == x { return true diff --git a/node/p2p/peer_authenticator.go b/node/p2p/peer_authenticator.go index acbd9b1..98dced4 100644 --- a/node/p2p/peer_authenticator.go +++ b/node/p2p/peer_authenticator.go @@ -32,6 +32,7 @@ import ( qgrpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc" "source.quilibrium.com/quilibrium/monorepo/types/channel" "source.quilibrium.com/quilibrium/monorepo/types/consensus" + "source.quilibrium.com/quilibrium/monorepo/types/p2p" ) type PeerAuthenticator struct { @@ -41,7 +42,7 @@ type PeerAuthenticator struct { config *config.P2PConfig // peer info manager is used to identify live peers - peerInfoManager PeerInfoManager + peerInfoManager p2p.PeerInfoManager // prover registry and signer registry are used to confirm membership of // a given shard or global prover status @@ -65,7 +66,7 @@ type PeerAuthenticator struct { func NewPeerAuthenticator( logger *zap.Logger, config *config.P2PConfig, - peerInfoManager PeerInfoManager, + peerInfoManager p2p.PeerInfoManager, proverRegistry consensus.ProverRegistry, signerRegistry consensus.SignerRegistry, filter []byte, diff --git a/node/p2p/peer_info_manager.go b/node/p2p/peer_info_manager.go index 25126c1..13571eb 100644 --- a/node/p2p/peer_info_manager.go +++ b/node/p2p/peer_info_manager.go @@ -7,55 +7,27 @@ import ( "go.uber.org/zap" "source.quilibrium.com/quilibrium/monorepo/protobufs" + "source.quilibrium.com/quilibrium/monorepo/types/p2p" ) -type PeerInfoManager interface { - Start() - Stop() - AddPeerInfo(info *protobufs.PeerInfo) - GetPeerInfo(peerId []byte) *PeerInfo - GetPeerMap() map[string]*PeerInfo - GetPeersBySpeed() [][]byte -} - -type Reachability struct { - Filter []byte - PubsubMultiaddrs []string - StreamMultiaddrs []string -} - -type Capability struct { - ProtocolIdentifier uint32 - AdditionalMetadata []byte -} - -type PeerInfo struct { - PeerId []byte - Cores uint32 - Capabilities []Capability - Reachability []Reachability - Bandwidth uint64 - LastSeen int64 -} - type InMemoryPeerInfoManager struct { logger *zap.Logger peerInfoCh chan *protobufs.PeerInfo quitCh chan struct{} peerInfoMx sync.RWMutex - peerMap map[string]*PeerInfo - fastestPeers []*PeerInfo + peerMap map[string]*p2p.PeerInfo + fastestPeers []*p2p.PeerInfo } -var _ PeerInfoManager = (*InMemoryPeerInfoManager)(nil) +var _ p2p.PeerInfoManager = (*InMemoryPeerInfoManager)(nil) func NewInMemoryPeerInfoManager(logger *zap.Logger) *InMemoryPeerInfoManager { return &InMemoryPeerInfoManager{ logger: logger, peerInfoCh: make(chan *protobufs.PeerInfo, 1000), - fastestPeers: []*PeerInfo{}, - peerMap: make(map[string]*PeerInfo), + fastestPeers: []*p2p.PeerInfo{}, + peerMap: make(map[string]*p2p.PeerInfo), } } @@ -65,23 +37,23 @@ func (m *InMemoryPeerInfoManager) Start() { select { case info := <-m.peerInfoCh: m.peerInfoMx.Lock() - reachability := []Reachability{} + reachability := []p2p.Reachability{} for _, r := range info.Reachability { - reachability = append(reachability, Reachability{ + reachability = append(reachability, p2p.Reachability{ Filter: r.Filter, PubsubMultiaddrs: r.PubsubMultiaddrs, StreamMultiaddrs: r.StreamMultiaddrs, }) } - capabilities := []Capability{} + capabilities := []p2p.Capability{} for _, c := range info.Capabilities { - capabilities = append(capabilities, Capability{ + capabilities = append(capabilities, p2p.Capability{ ProtocolIdentifier: c.ProtocolIdentifier, AdditionalMetadata: c.AdditionalMetadata, }) } seen := time.Now().UnixMilli() - m.peerMap[string(info.PeerId)] = &PeerInfo{ + m.peerMap[string(info.PeerId)] = &p2p.PeerInfo{ PeerId: info.PeerId, Bandwidth: 100, Capabilities: capabilities, @@ -89,7 +61,7 @@ func (m *InMemoryPeerInfoManager) Start() { Cores: uint32(len(reachability)), LastSeen: seen, } - m.searchAndInsertPeer(&PeerInfo{ + m.searchAndInsertPeer(&p2p.PeerInfo{ PeerId: info.PeerId, Bandwidth: 100, Capabilities: capabilities, @@ -117,7 +89,7 @@ func (m *InMemoryPeerInfoManager) AddPeerInfo(info *protobufs.PeerInfo) { }() } -func (m *InMemoryPeerInfoManager) GetPeerInfo(peerId []byte) *PeerInfo { +func (m *InMemoryPeerInfoManager) GetPeerInfo(peerId []byte) *p2p.PeerInfo { m.peerInfoMx.RLock() manifest, ok := m.peerMap[string(peerId)] m.peerInfoMx.RUnlock() @@ -127,8 +99,8 @@ func (m *InMemoryPeerInfoManager) GetPeerInfo(peerId []byte) *PeerInfo { return manifest } -func (m *InMemoryPeerInfoManager) GetPeerMap() map[string]*PeerInfo { - data := make(map[string]*PeerInfo) +func (m *InMemoryPeerInfoManager) GetPeerMap() map[string]*p2p.PeerInfo { + data := make(map[string]*p2p.PeerInfo) m.peerInfoMx.RLock() for k, v := range m.peerMap { data[k] = v @@ -150,7 +122,7 @@ func (m *InMemoryPeerInfoManager) GetPeersBySpeed() [][]byte { // blatantly lifted from slices.BinarySearchFunc, optimized for direct insertion // and uint64 comparison without overflow -func (m *InMemoryPeerInfoManager) searchAndInsertPeer(info *PeerInfo) { +func (m *InMemoryPeerInfoManager) searchAndInsertPeer(info *p2p.PeerInfo) { n := len(m.fastestPeers) i, j := 0, n for i < j { @@ -166,7 +138,7 @@ func (m *InMemoryPeerInfoManager) searchAndInsertPeer(info *PeerInfo) { bytes.Equal(m.fastestPeers[i].PeerId, info.PeerId) { m.fastestPeers[i] = info } else { - m.fastestPeers = append(m.fastestPeers, new(PeerInfo)) + m.fastestPeers = append(m.fastestPeers, new(p2p.PeerInfo)) copy(m.fastestPeers[i+1:], m.fastestPeers[i:]) m.fastestPeers[i] = info } diff --git a/node/rpc/node_rpc_server.go b/node/rpc/node_rpc_server.go index b0a07a6..e298743 100644 --- a/node/rpc/node_rpc_server.go +++ b/node/rpc/node_rpc_server.go @@ -35,10 +35,6 @@ import ( up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p" ) -type PeerInfoProvider interface { - GetPeerInfo() *protobufs.PeerInfo -} - // RPCServer strictly implements NodeService. type RPCServer struct { protobufs.NodeServiceServer @@ -48,7 +44,7 @@ type RPCServer struct { logger *zap.Logger keyManager keys.KeyManager pubSub p2p.PubSub - peerInfoProvider PeerInfoProvider + peerInfoProvider p2p.PeerInfoManager workerManager worker.WorkerManager proverRegistry consensus.ProverRegistry executionManager *manager.ExecutionEngineManager @@ -141,13 +137,38 @@ func (r *RPCServer) GetPeerInfo( ctx context.Context, _ *protobufs.GetPeerInfoRequest, ) (*protobufs.PeerInfoResponse, error) { - self := r.peerInfoProvider.GetPeerInfo() - if self == nil { + set := r.peerInfoProvider.GetPeerMap() + if set == nil { return nil, errors.Wrap(errors.New("no peer info"), "get peer info") } + out := []*protobufs.PeerInfo{} + for _, pi := range set { + re := []*protobufs.Reachability{} + for _, e := range pi.Reachability { + re = append(re, &protobufs.Reachability{ + Filter: e.Filter, + PubsubMultiaddrs: e.PubsubMultiaddrs, + StreamMultiaddrs: e.StreamMultiaddrs, + }) + } + cs := []*protobufs.Capability{} + for _, e := range pi.Capabilities { + cs = append(cs, &protobufs.Capability{ + ProtocolIdentifier: e.ProtocolIdentifier, + AdditionalMetadata: e.AdditionalMetadata, + }) + } + out = append(out, &protobufs.PeerInfo{ + PeerId: pi.PeerId, + Reachability: re, + Timestamp: pi.LastSeen, + Capabilities: cs, + }) + } + return &protobufs.PeerInfoResponse{ - PeerInfo: self, + PeerInfo: out, }, nil } @@ -320,7 +341,7 @@ func NewRPCServer( logger *zap.Logger, keyManager keys.KeyManager, pubSub p2p.PubSub, - peerInfoProvider PeerInfoProvider, + peerInfoProvider p2p.PeerInfoManager, workerManager worker.WorkerManager, proverRegistry consensus.ProverRegistry, executionManager *manager.ExecutionEngineManager, diff --git a/node/worker/manager.go b/node/worker/manager.go index 6980d80..239e777 100644 --- a/node/worker/manager.go +++ b/node/worker/manager.go @@ -520,6 +520,10 @@ func (w *WorkerManager) loadWorkersFromStore() error { continue } } + workers, err = w.store.RangeWorkers() + if err != nil { + return errors.Wrap(err, "load workers from store") + } } var totalStorage uint64 diff --git a/protobufs/keys.go b/protobufs/keys.go index 98059a9..45c9a46 100644 --- a/protobufs/keys.go +++ b/protobufs/keys.go @@ -433,7 +433,7 @@ func (s *SignedX448Key) Verify( } pubKey, err := pcrypto.UnmarshalEd448PublicKey( - s.Key.KeyValue, + sig.Ed448Signature.PublicKey.KeyValue, ) if err != nil { return errors.Wrap(err, "verify signature") @@ -444,7 +444,7 @@ func (s *SignedX448Key) Verify( } // Check that parent key address matches the public key - identityPeerID := []byte(peerID.String()) + identityPeerID := []byte(peerID) if !bytes.Equal(identityPeerID, s.ParentKeyAddress) { return errors.Wrap( @@ -559,7 +559,7 @@ func (s *SignedDecaf448Key) Verify( } pubKey, err := pcrypto.UnmarshalEd448PublicKey( - s.Key.KeyValue, + sig.Ed448Signature.PublicKey.KeyValue, ) if err != nil { return errors.Wrap(err, "verify signature") @@ -570,7 +570,7 @@ func (s *SignedDecaf448Key) Verify( } // Check that parent key address matches the public key - identityPeerID := []byte(peerID.String()) + identityPeerID := []byte(peerID) if !bytes.Equal(identityPeerID, s.ParentKeyAddress) { return errors.Wrap( errors.New("parent key address does not match public key"), diff --git a/protobufs/node.pb.go b/protobufs/node.pb.go index 85c5189..974701f 100644 --- a/protobufs/node.pb.go +++ b/protobufs/node.pb.go @@ -318,7 +318,7 @@ type PeerInfoResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - PeerInfo *PeerInfo `protobuf:"bytes,1,opt,name=peer_info,json=peerInfo,proto3" json:"peer_info,omitempty"` + PeerInfo []*PeerInfo `protobuf:"bytes,1,rep,name=peer_info,json=peerInfo,proto3" json:"peer_info,omitempty"` } func (x *PeerInfoResponse) Reset() { @@ -353,7 +353,7 @@ func (*PeerInfoResponse) Descriptor() ([]byte, []int) { return file_node_proto_rawDescGZIP(), []int{5} } -func (x *PeerInfoResponse) GetPeerInfo() *PeerInfo { +func (x *PeerInfoResponse) GetPeerInfo() []*PeerInfo { if x != nil { return x.PeerInfo } @@ -1937,7 +1937,7 @@ var file_node_proto_rawDesc = []byte{ 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x22, 0x52, 0x0a, 0x10, 0x50, 0x65, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3e, 0x0a, 0x09, 0x70, 0x65, 0x65, 0x72, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x70, 0x65, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0xa5, 0x01, 0x0a, 0x10, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, diff --git a/protobufs/node.proto b/protobufs/node.proto index 0ca2ade..09ab340 100644 --- a/protobufs/node.proto +++ b/protobufs/node.proto @@ -44,7 +44,7 @@ message PeerInfo { } message PeerInfoResponse { - PeerInfo peer_info = 1; + repeated PeerInfo peer_info = 1; } message NodeInfoResponse { diff --git a/types/p2p/peer_info_manager.go b/types/p2p/peer_info_manager.go new file mode 100644 index 0000000..bdeeb7d --- /dev/null +++ b/types/p2p/peer_info_manager.go @@ -0,0 +1,32 @@ +package p2p + +import "source.quilibrium.com/quilibrium/monorepo/protobufs" + +type PeerInfoManager interface { + Start() + Stop() + AddPeerInfo(info *protobufs.PeerInfo) + GetPeerInfo(peerId []byte) *PeerInfo + GetPeerMap() map[string]*PeerInfo + GetPeersBySpeed() [][]byte +} + +type Reachability struct { + Filter []byte + PubsubMultiaddrs []string + StreamMultiaddrs []string +} + +type Capability struct { + ProtocolIdentifier uint32 + AdditionalMetadata []byte +} + +type PeerInfo struct { + PeerId []byte + Cores uint32 + Capabilities []Capability + Reachability []Reachability + Bandwidth uint64 + LastSeen int64 +}