ceremonyclient/node/rpc/node_rpc_server.go
Cassandra Heart 9e1d07d1a0
Squashed commit of the following:
commit d05a4d5f688dbd09900ceccdcc5f8109dd0671c2
Author: Cassandra Heart <cassandra@quilibrium.com>
Date:   Wed Jun 12 00:50:16 2024 -0500

    merge

commit db57ff1f191f9dedc87ca77da1c71244dd2325bd
Merge: 7b43494 2e3279a
Author: Cassandra Heart <cassandra@quilibrium.com>
Date:   Wed Jun 12 00:49:32 2024 -0500

    Merge branch 'v1.4.19' into not-release

commit 7b43494246e28152b46710c8c9821429d4231f7e
Author: Cassandra Heart <cassandra@quilibrium.com>
Date:   Wed Jun 12 00:49:13 2024 -0500

    pull from release site

commit 2e3279ac930ac630d9ca2b26cf4f3232abe79823
Author: Cassandra Heart <cassandra@quilibrium.com>
Date:   Sat Jun 8 06:31:02 2024 -0500

    remove binaries

commit 2768a8778b3860c5736352c8aa950e3496a46e56
Author: Cassandra Heart <cassandra@quilibrium.com>
Date:   Sat Jun 8 06:24:44 2024 -0500

    signatory #8 added

commit 6a944628575ccadd17c9f9f4a11a49c032fa0c1d
Author: Cassandra Heart <cassandra@quilibrium.com>
Date:   Sat Jun 8 06:08:26 2024 -0500

    signatory #6 added

commit b401fb65e5ddbe0340fe85aab1182d6120a4e161
Author: Cassandra Heart <cassandra@quilibrium.com>
Date:   Sat Jun 8 05:39:03 2024 -0500

    signatory #3 added

commit e5700913c0f6246fb607bcd3e219c257cb4a80e9
Author: Cassandra Heart <cassandra@quilibrium.com>
Date:   Sat Jun 8 05:31:24 2024 -0500

    signatory #15 added

commit 9b1da6c03e517135bfcd59226f900adab42f3687
Author: Cassandra Heart <cassandra@quilibrium.com>
Date:   Sat Jun 8 05:23:02 2024 -0500

    signatories #4 and #16 added

commit 9c97d1bbc399a070ac21b35ed9b1af127fa4c7ea
Author: Cassandra Heart <cassandra@quilibrium.com>
Date:   Sat Jun 8 04:59:27 2024 -0500

    signatories #1 and #2 added

commit 905e3f78a8121eade1c331ae910ed25dd534f27a
Author: Cassandra Heart <cassandra@quilibrium.com>
Date:   Sat Jun 8 04:40:32 2024 -0500

    build, binaries, signatory #13

commit ebfb57bc29d9ed1fb25d0dd100e38709354b3d84
Author: Cassandra Heart <cassandra@quilibrium.com>
Date:   Sat Jun 8 03:38:53 2024 -0500

    tests pass, let's go

commit 5d4612c6c624c3dc18f9a5657936034ac9d9d8dd
Author: Cassandra Heart <cassandra@quilibrium.com>
Date:   Fri Jun 7 03:53:15 2024 -0500

    update version info + readme

commit 6b0dd69e930d01b98acb8d7b56bb5d572e1a4324
Merge: 090d630 859221b
Author: Cassie Heart <cassandra@quilibrium.com>
Date:   Fri Jun 7 08:25:16 2024 +0000

    Merge branch 'feat-data-worker-direct-config' into 'v1.4.19'

    feat: support detached configuration mode for data workers

    See merge request quilibrium/ceremonyclient!7

commit 859221b179ab2631fa474be2494259afaaa6bd51
Author: Cassandra Heart <cassandra@quilibrium.com>
Date:   Fri Jun 7 03:24:22 2024 -0500

    feat: support detached configuration mode for data workers

commit 090d6301d44a2aa88886120783cd5a6e537aa6d1
Merge: 62db30c d1cae94
Author: Cassie Heart <cassandra@quilibrium.com>
Date:   Fri Jun 7 06:25:43 2024 +0000

    Merge branch 'feat-go-1-22' into 'v1.4.19'

    feat: go 1.22 support

    See merge request quilibrium/ceremonyclient!6

commit d1cae942165f4871f8051e266722c0ca717780cb
Author: Cassie Heart <cassandra@quilibrium.com>
Date:   Fri Jun 7 06:25:43 2024 +0000

    feat: go 1.22 support

commit 62db30c54f9258c92113c6664ce817670a339083
Merge: 0cbc0d0 f36cea3
Author: Cassie Heart <cassandra@quilibrium.com>
Date:   Fri Jun 7 03:52:17 2024 +0000

    Merge branch 'rust-vdf' into 'v1.4.19'

    Switch to Rust VDF

    See merge request quilibrium/ceremonyclient!2

commit f36cea323bfe5e56f519f59f9a0cce35f0f8b6ab
Author: Agost Biro <agostbiro@gmail.com>
Date:   Fri Jun 7 03:52:16 2024 +0000

    Switch to Rust VDF

commit 0cbc0d0d319713e20ca7f48588c4153833e58429
Merge: 986e12c 0c48a83
Author: Cassie Heart <cassandra@quilibrium.com>
Date:   Fri Jun 7 00:50:15 2024 +0000

    Merge branch 'release_image' into 'v1.4.19'

    create docker image based on release binaries

    See merge request quilibrium/ceremonyclient!4

commit 0c48a83bb5751abf7c8c0ff188bfdc2130631e78
Author: Marius Scurtescu <marius.scurtescu@gmail.com>
Date:   Fri Jun 7 00:50:15 2024 +0000

    create docker image based on release binaries

commit 986e12c88bb2d2b412b59f7db1ae39f828304dbe
Merge: 58456c1 a3ef5c6
Author: Cassie Heart <cassandra@quilibrium.com>
Date:   Wed Jun 5 22:01:37 2024 +0000

    Merge branch 'signature_check' into 'v1.4.19'

    add default of signature check from QUILIBRIUM_SIGNATURE_CHECK env var

    See merge request quilibrium/ceremonyclient!1

commit a3ef5c6af2d5de107d01c45a62d7324165e2551b
Author: Marius Scurtescu <marius.scurtescu@gmail.com>
Date:   Wed Jun 5 14:37:50 2024 -0700

    add default of signature check from QUILIBRIUM_SIGNATURE_CHECK env var
2024-06-12 00:51:16 -05:00

429 lines
11 KiB
Go

package rpc
import (
"bytes"
"context"
"net/http"
"source.quilibrium.com/quilibrium/monorepo/node/config"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/multiformats/go-multiaddr"
mn "github.com/multiformats/go-multiaddr/net"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/master"
"source.quilibrium.com/quilibrium/monorepo/node/execution"
"source.quilibrium.com/quilibrium/monorepo/node/keys"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
"source.quilibrium.com/quilibrium/monorepo/node/store"
)
// RPCServer serves the node's gRPC API (it is registered as the
// protobufs.NodeServiceServer in Start) and, when an HTTP listen address is
// configured, an HTTP gateway in front of that gRPC endpoint.
type RPCServer struct {
// Generated fallback server; presumably supplies default implementations for
// any service methods RPCServer does not define itself.
protobufs.UnimplementedNodeServiceServer
// Multiaddr string the gRPC server listens on (parsed in Start).
listenAddrGRPC string
// Multiaddr string for the HTTP gateway; an empty value disables the gateway.
listenAddrHTTP string
// Logger supplied at construction; not referenced by the methods visible in
// this file section.
logger *zap.Logger
// Source of the reward total reported by GetTokenInfo.
dataProofStore store.DataProofStore
// Source of master and data clock frames for GetFrameInfo and GetFrames.
clockStore store.ClockStore
// Key manager supplied at construction; currently only referenced from the
// disabled (commented-out) portion of GetTokenInfo.
keyManager keys.KeyManager
// Provides this node's peer ID and score, network info, and peer multiaddrs.
pubSub p2p.PubSub
// Provides the current master frame and the known peer manifests.
masterClock *master.MasterClockConsensusEngine
// Execution engines supplied at construction; not referenced by the methods
// visible in this file section.
executionEngines []execution.ExecutionEngine
}
// GetFrameInfo implements protobufs.NodeServiceServer.
//
// It resolves a single clock frame by filter and frame number:
//   - a filter equal to p2p.BITMASK_ALL is answered from the master clock;
//   - any other filter without a selector is answered from the data clock;
//   - a request that carries a selector is rejected with a "not found" error.
//
// Every error is wrapped with the "get frame info" context.
func (r *RPCServer) GetFrameInfo(
	ctx context.Context,
	req *protobufs.GetFrameInfoRequest,
) (*protobufs.FrameInfoResponse, error) {
	const op = "get frame info"

	// Master clock lookup.
	if bytes.Equal(req.Filter, p2p.BITMASK_ALL) {
		masterFrame, err := r.clockStore.GetMasterClockFrame(
			req.Filter,
			req.FrameNumber,
		)
		if err != nil {
			return nil, errors.Wrap(err, op)
		}

		return &protobufs.FrameInfoResponse{ClockFrame: masterFrame}, nil
	}

	// Selector-based lookups are not served by this handler.
	if req.Selector != nil {
		return nil, errors.Wrap(errors.New("not found"), op)
	}

	// Data clock lookup; the second return value is not needed here.
	dataFrame, _, err := r.clockStore.GetDataClockFrame(
		req.Filter,
		req.FrameNumber,
		false,
	)
	if err != nil {
		return nil, errors.Wrap(err, op)
	}

	return &protobufs.FrameInfoResponse{ClockFrame: dataFrame}, nil
}
// GetFrames implements protobufs.NodeServiceServer.
//
// It returns the clock frames in the requested frame-number range. A filter
// equal to p2p.BITMASK_ALL is answered from the master clock using the
// iterator's full values; any other filter is answered from the data clock
// using the iterator's truncated values. In both cases the frames are returned
// in the TruncatedClockFrames field.
//
// Every error is wrapped with the "get frames" context.
func (r *RPCServer) GetFrames(
	ctx context.Context,
	req *protobufs.GetFramesRequest,
) (*protobufs.FramesResponse, error) {
	if bytes.Equal(req.Filter, p2p.BITMASK_ALL) {
		iter, err := r.clockStore.RangeMasterClockFrames(
			req.Filter,
			req.FromFrameNumber,
			req.ToFrameNumber,
		)
		if err != nil {
			return nil, errors.Wrap(err, "get frames")
		}

		frames := []*protobufs.ClockFrame{}
		for iter.First(); iter.Valid(); iter.Next() {
			frame, err := iter.Value()
			if err != nil {
				// Best-effort close: the read error is the one worth reporting.
				_ = iter.Close()
				return nil, errors.Wrap(err, "get frames")
			}

			frames = append(frames, frame)
		}

		if err := iter.Close(); err != nil {
			return nil, errors.Wrap(err, "get frames")
		}

		return &protobufs.FramesResponse{
			TruncatedClockFrames: frames,
		}, nil
	}

	iter, err := r.clockStore.RangeDataClockFrames(
		req.Filter,
		req.FromFrameNumber,
		req.ToFrameNumber,
	)
	if err != nil {
		// Previously wrapped as "get frame info", which pointed at the wrong
		// handler when diagnosing failures.
		return nil, errors.Wrap(err, "get frames")
	}

	frames := []*protobufs.ClockFrame{}
	for iter.First(); iter.Valid(); iter.Next() {
		frame, err := iter.TruncatedValue()
		if err != nil {
			// Best-effort close: the read error is the one worth reporting.
			_ = iter.Close()
			return nil, errors.Wrap(err, "get frames")
		}

		frames = append(frames, frame)
	}

	if err := iter.Close(); err != nil {
		return nil, errors.Wrap(err, "get frames")
	}

	return &protobufs.FramesResponse{
		TruncatedClockFrames: frames,
	}, nil
}
// GetNetworkInfo implements protobufs.NodeServiceServer. It forwards the
// network information reported by the pubsub layer and never returns an error.
func (r *RPCServer) GetNetworkInfo(
	ctx context.Context,
	req *protobufs.GetNetworkInfoRequest,
) (*protobufs.NetworkInfoResponse, error) {
	networkInfo := r.pubSub.GetNetworkInfo()
	return networkInfo, nil
}
// GetNodeInfo implements protobufs.NodeServiceServer. It reports this node's
// peer ID (in string form), the frame number of the master clock's current
// frame, the node's own peer score, and the configured version.
//
// An error is returned only when the pubsub peer ID bytes cannot be decoded
// into a peer ID.
func (r *RPCServer) GetNodeInfo(
	ctx context.Context,
	req *protobufs.GetNodeInfoRequest,
) (*protobufs.NodeInfoResponse, error) {
	id, err := peer.IDFromBytes(r.pubSub.GetPeerID())
	if err != nil {
		return nil, errors.Wrap(err, "getting id from bytes")
	}

	// NOTE(review): the score is converted to uint64 below; if GetPeerScore can
	// return a negative value the conversion would wrap — confirm.
	score := r.pubSub.GetPeerScore(r.pubSub.GetPeerID())

	info := &protobufs.NodeInfoResponse{}
	info.PeerId = id.String()
	info.MaxFrame = r.masterClock.GetFrame().GetFrameNumber()
	info.PeerScore = uint64(score)
	info.Version = config.GetVersion()

	return info, nil
}
// GetPeerInfo implements protobufs.NodeServiceServer. It produces one PeerInfo
// entry per peer manifest known to the master clock, attaching the peer's
// multiaddr when the pubsub layer reports a non-empty one. It never returns an
// error.
func (r *RPCServer) GetPeerInfo(
	ctx context.Context,
	req *protobufs.GetPeerInfoRequest,
) (*protobufs.PeerInfoResponse, error) {
	out := &protobufs.PeerInfoResponse{}

	for _, manifest := range r.masterClock.GetPeerManifests().PeerManifests {
		addrs := []string{}
		if addr := r.pubSub.GetMultiaddrOfPeer(manifest.PeerId); addr != "" {
			addrs = append(addrs, addr)
		}

		entry := &protobufs.PeerInfo{
			PeerId:     manifest.PeerId,
			Multiaddrs: addrs,
			MaxFrame:   manifest.MasterHeadFrame,
			Timestamp:  manifest.LastSeen,
			// We can get away with this for this release only, we will want to add
			// version info in manifests.
			Version: config.GetVersion(),
		}
		out.PeerInfo = append(out.PeerInfo, entry)
	}

	return out, nil
}
// GetTokenInfo reports the reward total recorded in the data proof store for
// this node's peer ID, encoded as a 32-byte big-endian value in
// UnconfirmedOwnedTokens. The other response fields are left unset.
//
// Only returns the active amounts earned under 1.4.19 until 2.0. The reward
// trie walk that previously populated the remaining fields is retained below,
// commented out.
//
// A failure to read the total is returned to the caller wrapped with the
// "get token info" context rather than panicking, so a store error fails only
// the request instead of taking down the node process.
func (r *RPCServer) GetTokenInfo(
	ctx context.Context,
	req *protobufs.GetTokenInfoRequest,
) (*protobufs.TokenInfoResponse, error) {
	// provingKey, err := r.keyManager.GetRawKey(
	// "default-proving-key",
	// )
	// if err != nil {
	// return nil, errors.Wrap(err, "get token info")
	// }
	// peerBytes := r.pubSub.GetPeerID()
	// peerAddr, err := poseidon.HashBytes(peerBytes)
	// if err != nil {
	// panic(err)
	// }
	// addr, err := poseidon.HashBytes(provingKey.PublicKey)
	// if err != nil {
	// panic(err)
	// }
	// addrBytes := addr.Bytes()
	// addrBytes = append(make([]byte, 32-len(addrBytes)), addrBytes...)
	// peerAddrBytes := peerAddr.Bytes()
	// peerAddrBytes = append(make([]byte, 32-len(peerAddrBytes)), peerAddrBytes...)
	// frame, err := r.clockStore.GetLatestDataClockFrame(
	// append(
	// p2p.GetBloomFilter(application.CEREMONY_ADDRESS, 256, 3),
	// p2p.GetBloomFilterIndices(application.CEREMONY_ADDRESS, 65536, 24)...,
	// ),
	// nil,
	// )
	// if err != nil {
	// return nil, errors.Wrap(err, "get token info")
	// }
	// confirmed, err := application.MaterializeApplicationFromFrame(frame)
	// if err != nil {
	// return nil, errors.Wrap(err, "get token info")
	// }
	// confirmedTotal := new(big.Int)
	// ownedTotal := new(big.Int)
	// if confirmed.RewardTrie.Root == nil ||
	// (confirmed.RewardTrie.Root.External == nil &&
	// confirmed.RewardTrie.Root.Internal == nil) {
	// return &protobufs.TokenInfoResponse{
	// ConfirmedTokenSupply: confirmedTotal.FillBytes(make([]byte, 32)),
	// UnconfirmedTokenSupply: confirmedTotal.FillBytes(make([]byte, 32)),
	// OwnedTokens: ownedTotal.FillBytes(make([]byte, 32)),
	// UnconfirmedOwnedTokens: ownedTotal.FillBytes(make([]byte, 32)),
	// }, nil
	// }
	// limbs := []*tries.RewardInternalNode{}
	// if confirmed.RewardTrie.Root.Internal != nil {
	// limbs = append(limbs, confirmed.RewardTrie.Root.Internal)
	// } else {
	// confirmedTotal = confirmedTotal.Add(
	// confirmedTotal,
	// new(big.Int).SetUint64(confirmed.RewardTrie.Root.External.Total),
	// )
	// if bytes.Equal(
	// confirmed.RewardTrie.Root.External.Key,
	// addrBytes,
	// ) {
	// ownedTotal = ownedTotal.Add(
	// ownedTotal,
	// new(big.Int).SetUint64(confirmed.RewardTrie.Root.External.Total),
	// )
	// }
	// }
	// for len(limbs) != 0 {
	// nextLimbs := []*tries.RewardInternalNode{}
	// for _, limb := range limbs {
	// for _, child := range limb.Child {
	// child := child
	// if child.Internal != nil {
	// nextLimbs = append(nextLimbs, child.Internal)
	// } else {
	// confirmedTotal = confirmedTotal.Add(
	// confirmedTotal,
	// new(big.Int).SetUint64(child.External.Total),
	// )
	// if bytes.Equal(
	// child.External.Key,
	// addrBytes,
	// ) {
	// ownedTotal = ownedTotal.Add(
	// ownedTotal,
	// new(big.Int).SetUint64(child.External.Total),
	// )
	// }
	// if bytes.Equal(
	// child.External.Key,
	// peerAddrBytes,
	// ) {
	// ownedTotal = ownedTotal.Add(
	// ownedTotal,
	// new(big.Int).SetUint64(child.External.Total),
	// )
	// }
	// }
	// }
	// }
	// limbs = nextLimbs
	// }
	// if err != nil {
	// return nil, errors.Wrap(err, "get token info")
	// }
	// 1 QUIL = 0x1DCD65000 units
	// conversionFactor, ok := new(big.Int).SetString("1DCD65000", 16)
	// if !ok {
	// return nil, errors.Wrap(err, "get token info")
	// }

	total, err := r.dataProofStore.GetTotalReward(r.pubSub.GetPeerID())
	if err != nil {
		// Report the failure to the caller; a panic here would escape the
		// handler and crash the whole node.
		return nil, errors.Wrap(err, "get token info")
	}

	// confirmedTotal = confirmedTotal.Mul(confirmedTotal, conversionFactor)
	// ownedTotal = ownedTotal.Mul(ownedTotal, conversionFactor)

	return &protobufs.TokenInfoResponse{
		// ConfirmedTokenSupply: confirmedTotal.FillBytes(make([]byte, 32)),
		// UnconfirmedTokenSupply: confirmedTotal.FillBytes(make([]byte, 32)),
		// OwnedTokens: ownedTotal.FillBytes(make([]byte, 32)),
		UnconfirmedOwnedTokens: total.FillBytes(make([]byte, 32)),
	}, nil
}
// GetPeerManifests returns the peer manifests currently reported by the master
// clock consensus engine. It never returns an error.
func (r *RPCServer) GetPeerManifests(
	ctx context.Context,
	req *protobufs.GetPeerManifestsRequest,
) (*protobufs.PeerManifestsResponse, error) {
	manifests := r.masterClock.GetPeerManifests()
	return manifests, nil
}
// NewRPCServer builds an RPCServer from the supplied listen addresses and
// dependencies. It only stores its arguments: nothing is validated and no
// listener is opened until Start is called. The returned error is always nil.
func NewRPCServer(
	listenAddrGRPC string,
	listenAddrHTTP string,
	logger *zap.Logger,
	dataProofStore store.DataProofStore,
	clockStore store.ClockStore,
	keyManager keys.KeyManager,
	pubSub p2p.PubSub,
	masterClock *master.MasterClockConsensusEngine,
	executionEngines []execution.ExecutionEngine,
) (*RPCServer, error) {
	server := new(RPCServer)

	// Listen addresses.
	server.listenAddrGRPC = listenAddrGRPC
	server.listenAddrHTTP = listenAddrHTTP

	// Collaborators.
	server.logger = logger
	server.dataProofStore = dataProofStore
	server.clockStore = clockStore
	server.keyManager = keyManager
	server.pubSub = pubSub
	server.masterClock = masterClock
	server.executionEngines = executionEngines

	return server, nil
}
// Start launches the gRPC server on listenAddrGRPC and, when listenAddrHTTP is
// non-empty, an HTTP gateway on listenAddrHTTP that proxies to the gRPC
// endpoint over an insecure (plaintext) connection.
//
// All configured addresses are parsed and converted before any listener is
// opened, so a malformed address makes Start return an error without leaving a
// bound port or a running server goroutine behind. Errors are wrapped with the
// "start" context.
//
// Both servers run in background goroutines and Start returns as soon as they
// are launched. A failure inside either goroutine (serving, gateway
// registration, or HTTP listen) panics.
func (r *RPCServer) Start() error {
	// Message size cap (600 MiB) applied to the gRPC server and to the
	// gateway's client calls, in both directions.
	const maxMsgSize = 600 * 1024 * 1024

	grpcAddr, err := multiaddr.NewMultiaddr(r.listenAddrGRPC)
	if err != nil {
		return errors.Wrap(err, "start")
	}

	// Resolve the gateway addresses up front so that configuration errors are
	// reported before the gRPC listener is bound.
	gatewayEnabled := r.listenAddrHTTP != ""
	var httpEndpoint, grpcEndpoint string
	if gatewayEnabled {
		httpAddr, err := multiaddr.NewMultiaddr(r.listenAddrHTTP)
		if err != nil {
			return errors.Wrap(err, "start")
		}

		httpNetAddr, err := mn.ToNetAddr(httpAddr)
		if err != nil {
			return errors.Wrap(err, "start")
		}

		grpcNetAddr, err := mn.ToNetAddr(grpcAddr)
		if err != nil {
			return errors.Wrap(err, "start")
		}

		httpEndpoint = httpNetAddr.String()
		grpcEndpoint = grpcNetAddr.String()
	}

	s := grpc.NewServer(
		grpc.MaxRecvMsgSize(maxMsgSize),
		grpc.MaxSendMsgSize(maxMsgSize),
	)
	protobufs.RegisterNodeServiceServer(s, r)
	reflection.Register(s)

	lis, err := mn.Listen(grpcAddr)
	if err != nil {
		return errors.Wrap(err, "start")
	}

	go func() {
		if err := s.Serve(mn.NetListener(lis)); err != nil {
			panic(err)
		}
	}()

	if !gatewayEnabled {
		return nil
	}

	go func() {
		mux := runtime.NewServeMux()
		opts := []grpc.DialOption{
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(maxMsgSize),
				grpc.MaxCallSendMsgSize(maxMsgSize),
			),
		}

		if err := protobufs.RegisterNodeServiceHandlerFromEndpoint(
			context.Background(),
			mux,
			grpcEndpoint,
			opts,
		); err != nil {
			panic(err)
		}

		if err := http.ListenAndServe(httpEndpoint, mux); err != nil {
			panic(err)
		}
	}()

	return nil
}