diff --git a/core/commands/publish.go b/core/commands/publish.go index 3d58ab431..042f3f0c5 100644 --- a/core/commands/publish.go +++ b/core/commands/publish.go @@ -35,7 +35,7 @@ func Publish(n *core.IpfsNode, args []string, opts map[string]interface{}, out i } // later, n.Keychain.Get(name).PrivKey - k := n.Identity.PrivKey + k := n.Identity.PrivKey() pub := nsys.NewRoutingPublisher(n.Routing) err := pub.Publish(k, ref) diff --git a/core/commands/resolve.go b/core/commands/resolve.go index 7307dc265..f6b462c8e 100644 --- a/core/commands/resolve.go +++ b/core/commands/resolve.go @@ -19,7 +19,7 @@ func Resolve(n *core.IpfsNode, args []string, opts map[string]interface{}, out i if n.Identity == nil { return errors.New("Identity not loaded!") } - name = n.Identity.ID.String() + name = n.Identity.ID().String() default: return fmt.Errorf("Publish expects 1 or 2 args; got %d.", len(args)) diff --git a/core/core.go b/core/core.go index 69a5c2b3e..22cf77e8c 100644 --- a/core/core.go +++ b/core/core.go @@ -36,7 +36,7 @@ type IpfsNode struct { Config *config.Config // the local node's identity - Identity *peer.Peer + Identity peer.Peer // storage for other Peer instances Peerstore peer.Peerstore @@ -177,7 +177,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { }, nil } -func initIdentity(cfg *config.Config, peers peer.Peerstore, online bool) (*peer.Peer, error) { +func initIdentity(cfg *config.Config, peers peer.Peerstore, online bool) (peer.Peer, error) { if cfg.Identity.PeerID == "" { return nil, errors.New("Identity was not set in config (was ipfs init run?)") } diff --git a/core/mock.go b/core/mock.go index 340478c1c..114acd7db 100644 --- a/core/mock.go +++ b/core/mock.go @@ -8,7 +8,7 @@ import ( mdag "github.com/jbenet/go-ipfs/merkledag" nsys "github.com/jbenet/go-ipfs/namesys" path "github.com/jbenet/go-ipfs/path" - "github.com/jbenet/go-ipfs/peer" + peer "github.com/jbenet/go-ipfs/peer" mdht "github.com/jbenet/go-ipfs/routing/mock" ) @@ -16,20 +16,18 @@ import ( func NewMockNode() (*IpfsNode, error) { nd := new(IpfsNode) - //Generate Identity - nd.Peerstore = peer.NewPeerstore() - var err error - nd.Identity, err = nd.Peerstore.Get(peer.ID("TESTING")) + // Generate Identity + sk, pk, err := ci.GenerateKeyPair(ci.RSA, 1024) if err != nil { return nil, err } - pk, sk, err := ci.GenerateKeyPair(ci.RSA, 1024) + nd.Identity, err = peer.WithKeyPair(sk, pk) if err != nil { return nil, err } - nd.Identity.PrivKey = pk - nd.Identity.PubKey = sk + nd.Peerstore = peer.NewPeerstore() + nd.Peerstore.Put(nd.Identity) // Temp Datastore dstore := ds.NewMapDatastore() diff --git a/crypto/spipe/handshake.go b/crypto/spipe/handshake.go index 140ef584f..7b7ae746e 100644 --- a/crypto/spipe/handshake.go +++ b/crypto/spipe/handshake.go @@ -53,7 +53,7 @@ func (s *SecurePipe) handshake() error { } log.Debug("handshake: %s <--> %s", s.local, s.remote) - myPubKey, err := s.local.PubKey.Bytes() + myPubKey, err := s.local.PubKey().Bytes() if err != nil { return err } @@ -132,7 +132,7 @@ func (s *SecurePipe) handshake() error { exPacket := new(Exchange) exPacket.Epubkey = epubkey - exPacket.Signature, err = s.local.PrivKey.Sign(handshake.Bytes()) + exPacket.Signature, err = s.local.PrivKey().Sign(handshake.Bytes()) if err != nil { return err } @@ -167,7 +167,7 @@ func (s *SecurePipe) handshake() error { theirHandshake.Write(exchangeResp.GetEpubkey()) // u.POut("Remote Peer Identified as %s\n", s.remote) - ok, err := s.remote.PubKey.Verify(theirHandshake.Bytes(), exchangeResp.GetSignature()) + ok, err := s.remote.PubKey().Verify(theirHandshake.Bytes(), exchangeResp.GetSignature()) if err != nil { return err } @@ -340,7 +340,7 @@ func selectBest(myPrefs, theirPrefs string) (string, error) { // getOrConstructPeer attempts to fetch a peer from a peerstore. // if succeeds, verify ID and PubKey match. // else, construct it. -func getOrConstructPeer(peers peer.Peerstore, rpk ci.PubKey) (*peer.Peer, error) { +func getOrConstructPeer(peers peer.Peerstore, rpk ci.PubKey) (peer.Peer, error) { rid, err := peer.IDFromPubKey(rpk) if err != nil { diff --git a/crypto/spipe/pipe.go b/crypto/spipe/pipe.go index b1c56f1c1..17b13f467 100644 --- a/crypto/spipe/pipe.go +++ b/crypto/spipe/pipe.go @@ -18,8 +18,8 @@ type SecurePipe struct { Duplex insecure Duplex - local *peer.Peer - remote *peer.Peer + local peer.Peer + remote peer.Peer peers peer.Peerstore params params @@ -33,7 +33,7 @@ type params struct { } // NewSecurePipe constructs a pipe with channels of a given buffer size. -func NewSecurePipe(ctx context.Context, bufsize int, local *peer.Peer, +func NewSecurePipe(ctx context.Context, bufsize int, local peer.Peer, peers peer.Peerstore, insecure Duplex) (*SecurePipe, error) { ctx, cancel := context.WithCancel(ctx) @@ -60,12 +60,12 @@ func NewSecurePipe(ctx context.Context, bufsize int, local *peer.Peer, } // LocalPeer retrieves the local peer. -func (s *SecurePipe) LocalPeer() *peer.Peer { +func (s *SecurePipe) LocalPeer() peer.Peer { return s.local } // RemotePeer retrieves the local peer. -func (s *SecurePipe) RemotePeer() *peer.Peer { +func (s *SecurePipe) RemotePeer() peer.Peer { return s.remote } diff --git a/diagnostics/diag.go b/diagnostics/diag.go index f347c79ed..6b8eaf420 100644 --- a/diagnostics/diag.go +++ b/diagnostics/diag.go @@ -26,14 +26,14 @@ const ResponseTimeout = time.Second * 10 type Diagnostics struct { network net.Network sender net.Sender - self *peer.Peer + self peer.Peer diagLock sync.Mutex diagMap map[string]time.Time birth time.Time } -func NewDiagnostics(self *peer.Peer, inet net.Network, sender net.Sender) *Diagnostics { +func NewDiagnostics(self peer.Peer, inet net.Network, sender net.Sender) *Diagnostics { return &Diagnostics{ network: inet, sender: sender, @@ -67,20 +67,21 @@ func (di *DiagInfo) Marshal() []byte { return b } -func (d *Diagnostics) getPeers() []*peer.Peer { +func (d *Diagnostics) getPeers() []peer.Peer { return d.network.GetPeerList() } func (d *Diagnostics) getDiagInfo() *DiagInfo { di := new(DiagInfo) di.CodeVersion = "github.com/jbenet/go-ipfs" - di.ID = d.self.ID.Pretty() + di.ID = d.self.ID().Pretty() di.LifeSpan = time.Since(d.birth) di.Keys = nil // Currently no way to query datastore di.BwIn, di.BwOut = d.network.GetBandwidthTotals() for _, p := range d.getPeers() { - di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID.Pretty()}) + d := connDiagInfo{p.GetLatency(), p.ID().Pretty()} + di.Connections = append(di.Connections, d) } return di } @@ -116,7 +117,7 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error) for _, p := range peers { log.Debug("Sending getDiagnostic to: %s", p) sends++ - go func(p *peer.Peer) { + go func(p peer.Peer) { data, err := d.getDiagnosticFromPeer(ctx, p, pmes) if err != nil { log.Error("GetDiagnostic error: %v", err) @@ -155,7 +156,7 @@ func AppendDiagnostics(data []byte, cur []*DiagInfo) []*DiagInfo { } // TODO: this method no longer needed. -func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p *peer.Peer, mes *Message) ([]byte, error) { +func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.Peer, mes *Message) ([]byte, error) { rpmes, err := d.sendRequest(ctx, p, mes) if err != nil { return nil, err @@ -169,7 +170,7 @@ func newMessage(diagID string) *Message { return pmes } -func (d *Diagnostics) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) { +func (d *Diagnostics) sendRequest(ctx context.Context, p peer.Peer, pmes *Message) (*Message, error) { mes, err := msg.FromObject(p, pmes) if err != nil { @@ -197,7 +198,7 @@ func (d *Diagnostics) sendRequest(ctx context.Context, p *peer.Peer, pmes *Messa return rpmes, nil } -func (d *Diagnostics) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, error) { +func (d *Diagnostics) handleDiagnostic(p peer.Peer, pmes *Message) (*Message, error) { log.Debug("HandleDiagnostic from %s for id = %s", p, pmes.GetDiagID()) resp := newMessage(pmes.GetDiagID()) d.diagLock.Lock() @@ -220,7 +221,7 @@ func (d *Diagnostics) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, e for _, p := range d.getPeers() { log.Debug("Sending diagnostic request to peer: %s", p) sendcount++ - go func(p *peer.Peer) { + go func(p peer.Peer) { out, err := d.getDiagnosticFromPeer(ctx, p, pmes) if err != nil { log.Error("getDiagnostic error: %v", err) @@ -267,7 +268,7 @@ func (d *Diagnostics) HandleMessage(ctx context.Context, mes msg.NetMessage) msg // Print out diagnostic log.Info("[peer: %s] Got message from [%s]\n", - d.self.ID.Pretty(), mPeer.ID.Pretty()) + d.self.ID().Pretty(), mPeer.ID().Pretty()) // dispatch handler. rpmes, err := d.handleDiagnostic(mPeer, pmes) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index b93b1a9b8..4a3170fac 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -20,7 +20,7 @@ var log = u.Logger("bitswap") // NetMessageSession initializes a BitSwap session that communicates over the // provided NetMessage service -func NetMessageSession(parent context.Context, p *peer.Peer, +func NetMessageSession(parent context.Context, p peer.Peer, net inet.Network, srv inet.Service, directory bsnet.Routing, d ds.Datastore, nice bool) exchange.Interface { @@ -83,7 +83,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) message.AppendWanted(k) for peerToQuery := range peersToQuery { log.Debug("bitswap got peersToQuery: %s", peerToQuery) - go func(p *peer.Peer) { + go func(p peer.Peer) { log.Debug("bitswap dialing peer: %s", p) err := bs.sender.DialPeer(p) @@ -131,8 +131,8 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { } // TODO(brian): handle errors -func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) ( - *peer.Peer, bsmsg.BitSwapMessage) { +func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) ( + peer.Peer, bsmsg.BitSwapMessage) { log.Debug("ReceiveMessage from %v", p.Key()) if p == nil { @@ -181,7 +181,7 @@ func (bs *bitswap) ReceiveError(err error) { // send strives to ensure that accounting is always performed when a message is // sent -func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessage) { +func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) { bs.sender.SendMessage(ctx, p, m) go bs.strategy.MessageSent(p, m) } diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index d1c92d8d0..8a2f1f421 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -44,7 +44,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { g := NewSessionGenerator(net, rs) block := blocks.NewBlock([]byte("block")) - rs.Announce(&peer.Peer{}, block.Key()) // but not on network + rs.Announce(peer.WithIDString("testing"), block.Key()) // but not on network solo := g.Next() @@ -263,7 +263,7 @@ func (g *SessionGenerator) Instances(n int) []instance { } type instance struct { - peer *peer.Peer + peer peer.Peer exchange exchange.Interface blockstore bstore.Blockstore } @@ -274,7 +274,7 @@ type instance struct { // sessions. To safeguard, use the SessionGenerator to generate sessions. It's // just a much better idea. func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance { - p := &peer.Peer{ID: id} + p := peer.WithID(id) adapter := net.Adapter(p) htc := rs.Client(p) diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index a724f7cc7..423cc329c 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -19,7 +19,7 @@ type BitSwapMessage interface { type Exportable interface { ToProto() *PBMessage - ToNet(p *peer.Peer) (nm.NetMessage, error) + ToNet(p peer.Peer) (nm.NetMessage, error) } // message wraps a proto message for convenience @@ -82,6 +82,6 @@ func (m *message) ToProto() *PBMessage { return pb } -func (m *message) ToNet(p *peer.Peer) (nm.NetMessage, error) { +func (m *message) ToNet(p peer.Peer) (nm.NetMessage, error) { return nm.FromObject(p, m.ToProto()) } diff --git a/exchange/bitswap/message/message_test.go b/exchange/bitswap/message/message_test.go index b5954eba8..5aa63ecc3 100644 --- a/exchange/bitswap/message/message_test.go +++ b/exchange/bitswap/message/message_test.go @@ -88,7 +88,7 @@ func TestCopyProtoByValue(t *testing.T) { func TestToNetMethodSetsPeer(t *testing.T) { m := New() - p := &peer.Peer{ID: []byte("X")} + p := peer.WithIDString("X") netmsg, err := m.ToNet(p) if err != nil { t.Fatal(err) @@ -106,7 +106,8 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { original.AppendWanted(u.Key("T")) original.AppendWanted(u.Key("F")) - netmsg, err := original.ToNet(&peer.Peer{ID: []byte("X")}) + p := peer.WithIDString("X") + netmsg, err := original.ToNet(p) if err != nil { t.Fatal(err) } @@ -136,7 +137,7 @@ func TestToAndFromNetMessage(t *testing.T) { original.AppendBlock(*blocks.NewBlock([]byte("F"))) original.AppendBlock(*blocks.NewBlock([]byte("M"))) - p := &peer.Peer{ID: []byte("X")} + p := peer.WithIDString("X") netmsg, err := original.ToNet(p) if err != nil { t.Fatal(err) diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index 03d7d3415..467b0f400 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -12,18 +12,18 @@ import ( type Adapter interface { // DialPeer ensures there is a connection to peer. - DialPeer(*peer.Peer) error + DialPeer(peer.Peer) error // SendMessage sends a BitSwap message to a peer. SendMessage( context.Context, - *peer.Peer, + peer.Peer, bsmsg.BitSwapMessage) error // SendRequest sends a BitSwap message to a peer and waits for a response. SendRequest( context.Context, - *peer.Peer, + peer.Peer, bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error) // SetDelegate registers the Reciver to handle messages received from the @@ -33,8 +33,8 @@ type Adapter interface { type Receiver interface { ReceiveMessage( - ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) ( - destination *peer.Peer, outgoing bsmsg.BitSwapMessage) + ctx context.Context, sender peer.Peer, incoming bsmsg.BitSwapMessage) ( + destination peer.Peer, outgoing bsmsg.BitSwapMessage) ReceiveError(error) } @@ -42,7 +42,7 @@ type Receiver interface { // TODO rename -> Router? type Routing interface { // FindProvidersAsync returns a channel of providers for the given key - FindProvidersAsync(context.Context, u.Key, int) <-chan *peer.Peer + FindProvidersAsync(context.Context, u.Key, int) <-chan peer.Peer // Provide provides the key to the network Provide(context.Context, u.Key) error diff --git a/exchange/bitswap/network/net_message_adapter.go b/exchange/bitswap/network/net_message_adapter.go index 52f428076..3ae11a2c6 100644 --- a/exchange/bitswap/network/net_message_adapter.go +++ b/exchange/bitswap/network/net_message_adapter.go @@ -60,13 +60,13 @@ func (adapter *impl) HandleMessage( return outgoing } -func (adapter *impl) DialPeer(p *peer.Peer) error { +func (adapter *impl) DialPeer(p peer.Peer) error { return adapter.net.DialPeer(p) } func (adapter *impl) SendMessage( ctx context.Context, - p *peer.Peer, + p peer.Peer, outgoing bsmsg.BitSwapMessage) error { nmsg, err := outgoing.ToNet(p) @@ -78,7 +78,7 @@ func (adapter *impl) SendMessage( func (adapter *impl) SendRequest( ctx context.Context, - p *peer.Peer, + p peer.Peer, outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) { outgoingMsg, err := outgoing.ToNet(p) diff --git a/exchange/bitswap/strategy/interface.go b/exchange/bitswap/strategy/interface.go index 48097b027..ac1f09a1f 100644 --- a/exchange/bitswap/strategy/interface.go +++ b/exchange/bitswap/strategy/interface.go @@ -8,25 +8,25 @@ import ( type Strategy interface { // Returns a slice of Peers with whom the local node has active sessions - Peers() []*peer.Peer + Peers() []peer.Peer // BlockIsWantedByPeer returns true if peer wants the block given by this // key - BlockIsWantedByPeer(u.Key, *peer.Peer) bool + BlockIsWantedByPeer(u.Key, peer.Peer) bool // ShouldSendTo(Peer) decides whether to send data to this Peer - ShouldSendBlockToPeer(u.Key, *peer.Peer) bool + ShouldSendBlockToPeer(u.Key, peer.Peer) bool // Seed initializes the decider to a deterministic state Seed(int64) // MessageReceived records receipt of message for accounting purposes - MessageReceived(*peer.Peer, bsmsg.BitSwapMessage) error + MessageReceived(peer.Peer, bsmsg.BitSwapMessage) error // MessageSent records sending of message for accounting purposes - MessageSent(*peer.Peer, bsmsg.BitSwapMessage) error + MessageSent(peer.Peer, bsmsg.BitSwapMessage) error - NumBytesSentTo(*peer.Peer) uint64 + NumBytesSentTo(peer.Peer) uint64 - NumBytesReceivedFrom(*peer.Peer) uint64 + NumBytesReceivedFrom(peer.Peer) uint64 } diff --git a/exchange/bitswap/strategy/ledger.go b/exchange/bitswap/strategy/ledger.go index 34f301055..3700c1f43 100644 --- a/exchange/bitswap/strategy/ledger.go +++ b/exchange/bitswap/strategy/ledger.go @@ -12,7 +12,7 @@ import ( // access/lookups. type keySet map[u.Key]struct{} -func newLedger(p *peer.Peer, strategy strategyFunc) *ledger { +func newLedger(p peer.Peer, strategy strategyFunc) *ledger { return &ledger{ wantList: keySet{}, Strategy: strategy, @@ -25,7 +25,7 @@ type ledger struct { lock sync.RWMutex // Partner is the remote Peer. - Partner *peer.Peer + Partner peer.Peer // Accounting tracks bytes sent and recieved. Accounting debtRatio diff --git a/exchange/bitswap/strategy/strategy.go b/exchange/bitswap/strategy/strategy.go index 5d09f30b5..399d7777b 100644 --- a/exchange/bitswap/strategy/strategy.go +++ b/exchange/bitswap/strategy/strategy.go @@ -37,20 +37,20 @@ type ledgerMap map[peerKey]*ledger type peerKey u.Key // Peers returns a list of peers -func (s *strategist) Peers() []*peer.Peer { - response := make([]*peer.Peer, 0) +func (s *strategist) Peers() []peer.Peer { + response := make([]peer.Peer, 0) for _, ledger := range s.ledgerMap { response = append(response, ledger.Partner) } return response } -func (s *strategist) BlockIsWantedByPeer(k u.Key, p *peer.Peer) bool { +func (s *strategist) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool { ledger := s.ledger(p) return ledger.WantListContains(k) } -func (s *strategist) ShouldSendBlockToPeer(k u.Key, p *peer.Peer) bool { +func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool { ledger := s.ledger(p) return ledger.ShouldSend() } @@ -59,7 +59,7 @@ func (s *strategist) Seed(int64) { // TODO } -func (s *strategist) MessageReceived(p *peer.Peer, m bsmsg.BitSwapMessage) error { +func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error { // TODO find a more elegant way to handle this check if p == nil { return errors.New("Strategy received nil peer") @@ -84,7 +84,7 @@ func (s *strategist) MessageReceived(p *peer.Peer, m bsmsg.BitSwapMessage) error // inconsistent. Would need to ensure that Sends and acknowledgement of the // send happen atomically -func (s *strategist) MessageSent(p *peer.Peer, m bsmsg.BitSwapMessage) error { +func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error { l := s.ledger(p) for _, block := range m.Blocks() { l.SentBytes(len(block.Data)) @@ -95,16 +95,16 @@ func (s *strategist) MessageSent(p *peer.Peer, m bsmsg.BitSwapMessage) error { return nil } -func (s *strategist) NumBytesSentTo(p *peer.Peer) uint64 { +func (s *strategist) NumBytesSentTo(p peer.Peer) uint64 { return s.ledger(p).Accounting.BytesSent } -func (s *strategist) NumBytesReceivedFrom(p *peer.Peer) uint64 { +func (s *strategist) NumBytesReceivedFrom(p peer.Peer) uint64 { return s.ledger(p).Accounting.BytesRecv } // ledger lazily instantiates a ledger -func (s *strategist) ledger(p *peer.Peer) *ledger { +func (s *strategist) ledger(p peer.Peer) *ledger { l, ok := s.ledgerMap[peerKey(p.Key())] if !ok { l = newLedger(p, s.strategyFunc) diff --git a/exchange/bitswap/strategy/strategy_test.go b/exchange/bitswap/strategy/strategy_test.go index dccc4a374..e3ffc05ea 100644 --- a/exchange/bitswap/strategy/strategy_test.go +++ b/exchange/bitswap/strategy/strategy_test.go @@ -10,13 +10,13 @@ import ( ) type peerAndStrategist struct { - *peer.Peer + peer.Peer Strategy } func newPeerAndStrategist(idStr string) peerAndStrategist { return peerAndStrategist{ - Peer: &peer.Peer{ID: peer.ID(idStr)}, + Peer: peer.WithIDString(idStr), Strategy: New(true), } } @@ -93,7 +93,7 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) { } } -func peerIsPartner(p *peer.Peer, s Strategy) bool { +func peerIsPartner(p peer.Peer, s Strategy) bool { for _, partner := range s.Peers() { if partner.Key() == p.Key() { return true diff --git a/exchange/bitswap/testnet/network.go b/exchange/bitswap/testnet/network.go index c3081337d..418f75ce0 100644 --- a/exchange/bitswap/testnet/network.go +++ b/exchange/bitswap/testnet/network.go @@ -13,20 +13,20 @@ import ( ) type Network interface { - Adapter(*peer.Peer) bsnet.Adapter + Adapter(peer.Peer) bsnet.Adapter - HasPeer(*peer.Peer) bool + HasPeer(peer.Peer) bool SendMessage( ctx context.Context, - from *peer.Peer, - to *peer.Peer, + from peer.Peer, + to peer.Peer, message bsmsg.BitSwapMessage) error SendRequest( ctx context.Context, - from *peer.Peer, - to *peer.Peer, + from peer.Peer, + to peer.Peer, message bsmsg.BitSwapMessage) ( incoming bsmsg.BitSwapMessage, err error) } @@ -43,7 +43,7 @@ type network struct { clients map[util.Key]bsnet.Receiver } -func (n *network) Adapter(p *peer.Peer) bsnet.Adapter { +func (n *network) Adapter(p peer.Peer) bsnet.Adapter { client := &networkClient{ local: p, network: n, @@ -52,7 +52,7 @@ func (n *network) Adapter(p *peer.Peer) bsnet.Adapter { return client } -func (n *network) HasPeer(p *peer.Peer) bool { +func (n *network) HasPeer(p peer.Peer) bool { _, found := n.clients[p.Key()] return found } @@ -61,8 +61,8 @@ func (n *network) HasPeer(p *peer.Peer) bool { // TODO what does the network layer do with errors received from services? func (n *network) SendMessage( ctx context.Context, - from *peer.Peer, - to *peer.Peer, + from peer.Peer, + to peer.Peer, message bsmsg.BitSwapMessage) error { receiver, ok := n.clients[to.Key()] @@ -79,7 +79,7 @@ func (n *network) SendMessage( } func (n *network) deliver( - r bsnet.Receiver, from *peer.Peer, message bsmsg.BitSwapMessage) error { + r bsnet.Receiver, from peer.Peer, message bsmsg.BitSwapMessage) error { if message == nil || from == nil { return errors.New("Invalid input") } @@ -107,8 +107,8 @@ var NoResponse = errors.New("No response received from the receiver") // TODO func (n *network) SendRequest( ctx context.Context, - from *peer.Peer, - to *peer.Peer, + from peer.Peer, + to peer.Peer, message bsmsg.BitSwapMessage) ( incoming bsmsg.BitSwapMessage, err error) { @@ -130,7 +130,7 @@ func (n *network) SendRequest( } // TODO test when receiver doesn't immediately respond to the initiator of the request - if !bytes.Equal(nextPeer.ID, from.ID) { + if !bytes.Equal(nextPeer.ID(), from.ID()) { go func() { nextReceiver, ok := n.clients[nextPeer.Key()] if !ok { @@ -144,26 +144,26 @@ func (n *network) SendRequest( } type networkClient struct { - local *peer.Peer + local peer.Peer bsnet.Receiver network Network } func (nc *networkClient) SendMessage( ctx context.Context, - to *peer.Peer, + to peer.Peer, message bsmsg.BitSwapMessage) error { return nc.network.SendMessage(ctx, nc.local, to, message) } func (nc *networkClient) SendRequest( ctx context.Context, - to *peer.Peer, + to peer.Peer, message bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error) { return nc.network.SendRequest(ctx, nc.local, to, message) } -func (nc *networkClient) DialPeer(p *peer.Peer) error { +func (nc *networkClient) DialPeer(p peer.Peer) error { // no need to do anything because dialing isn't a thing in this test net. if !nc.network.HasPeer(p) { return fmt.Errorf("Peer not in network: %s", p) diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go index fbd7c8893..c2cc28f8d 100644 --- a/exchange/bitswap/testnet/network_test.go +++ b/exchange/bitswap/testnet/network_test.go @@ -18,15 +18,15 @@ func TestSendRequestToCooperativePeer(t *testing.T) { t.Log("Get two network adapters") - initiator := net.Adapter(&peer.Peer{ID: []byte("initiator")}) - recipient := net.Adapter(&peer.Peer{ID: idOfRecipient}) + initiator := net.Adapter(peer.WithIDString("initiator")) + recipient := net.Adapter(peer.WithID(idOfRecipient)) expectedStr := "response from recipient" recipient.SetDelegate(lambda(func( ctx context.Context, - from *peer.Peer, + from peer.Peer, incoming bsmsg.BitSwapMessage) ( - *peer.Peer, bsmsg.BitSwapMessage) { + peer.Peer, bsmsg.BitSwapMessage) { t.Log("Recipient received a message from the network") @@ -43,7 +43,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) { message := bsmsg.New() message.AppendBlock(*blocks.NewBlock([]byte("data"))) response, err := initiator.SendRequest( - context.Background(), &peer.Peer{ID: idOfRecipient}, message) + context.Background(), peer.WithID(idOfRecipient), message) if err != nil { t.Fatal(err) } @@ -61,8 +61,8 @@ func TestSendRequestToCooperativePeer(t *testing.T) { func TestSendMessageAsyncButWaitForResponse(t *testing.T) { net := VirtualNetwork() idOfResponder := []byte("responder") - waiter := net.Adapter(&peer.Peer{ID: []byte("waiter")}) - responder := net.Adapter(&peer.Peer{ID: idOfResponder}) + waiter := net.Adapter(peer.WithIDString("waiter")) + responder := net.Adapter(peer.WithID(idOfResponder)) var wg sync.WaitGroup @@ -72,9 +72,9 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { responder.SetDelegate(lambda(func( ctx context.Context, - fromWaiter *peer.Peer, + fromWaiter peer.Peer, msgFromWaiter bsmsg.BitSwapMessage) ( - *peer.Peer, bsmsg.BitSwapMessage) { + peer.Peer, bsmsg.BitSwapMessage) { msgToWaiter := bsmsg.New() msgToWaiter.AppendBlock(*blocks.NewBlock([]byte(expectedStr))) @@ -84,9 +84,9 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { waiter.SetDelegate(lambda(func( ctx context.Context, - fromResponder *peer.Peer, + fromResponder peer.Peer, msgFromResponder bsmsg.BitSwapMessage) ( - *peer.Peer, bsmsg.BitSwapMessage) { + peer.Peer, bsmsg.BitSwapMessage) { // TODO assert that this came from the correct peer and that the message contents are as expected ok := false @@ -107,7 +107,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { messageSentAsync := bsmsg.New() messageSentAsync.AppendBlock(*blocks.NewBlock([]byte("data"))) errSending := waiter.SendMessage( - context.Background(), &peer.Peer{ID: idOfResponder}, messageSentAsync) + context.Background(), peer.WithID(idOfResponder), messageSentAsync) if errSending != nil { t.Fatal(errSending) } @@ -115,8 +115,8 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { wg.Wait() // until waiter delegate function is executed } -type receiverFunc func(ctx context.Context, p *peer.Peer, - incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage) +type receiverFunc func(ctx context.Context, p peer.Peer, + incoming bsmsg.BitSwapMessage) (peer.Peer, bsmsg.BitSwapMessage) // lambda returns a Receiver instance given a receiver function func lambda(f receiverFunc) bsnet.Receiver { @@ -126,13 +126,13 @@ func lambda(f receiverFunc) bsnet.Receiver { } type lambdaImpl struct { - f func(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) ( - *peer.Peer, bsmsg.BitSwapMessage) + f func(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) ( + peer.Peer, bsmsg.BitSwapMessage) } func (lam *lambdaImpl) ReceiveMessage(ctx context.Context, - p *peer.Peer, incoming bsmsg.BitSwapMessage) ( - *peer.Peer, bsmsg.BitSwapMessage) { + p peer.Peer, incoming bsmsg.BitSwapMessage) ( + peer.Peer, bsmsg.BitSwapMessage) { return lam.f(ctx, p, incoming) } diff --git a/fuse/ipns/ipns_test.go b/fuse/ipns/ipns_test.go index 0d3ed7a84..d2288198c 100644 --- a/fuse/ipns/ipns_test.go +++ b/fuse/ipns/ipns_test.go @@ -209,7 +209,7 @@ func TestFastRepublish(t *testing.T) { node, mnt := setupIpnsTest(t, nil) - h, err := node.Identity.PrivKey.GetPublic().Hash() + h, err := node.Identity.PrivKey().GetPublic().Hash() if err != nil { t.Fatal(err) } diff --git a/fuse/ipns/ipns_unix.go b/fuse/ipns/ipns_unix.go index 3f56dbb68..352c9480f 100644 --- a/fuse/ipns/ipns_unix.go +++ b/fuse/ipns/ipns_unix.go @@ -35,7 +35,7 @@ type FileSystem struct { // NewFileSystem constructs new fs using given core.IpfsNode instance. func NewIpns(ipfs *core.IpfsNode, ipfspath string) (*FileSystem, error) { - root, err := CreateRoot(ipfs, []ci.PrivKey{ipfs.Identity.PrivKey}, ipfspath) + root, err := CreateRoot(ipfs, []ci.PrivKey{ipfs.Identity.PrivKey()}, ipfspath) if err != nil { return nil, err } diff --git a/namesys/resolve_test.go b/namesys/resolve_test.go index 5e652f42f..c6ee63351 100644 --- a/namesys/resolve_test.go +++ b/namesys/resolve_test.go @@ -11,9 +11,7 @@ import ( ) func TestRoutingResolve(t *testing.T) { - local := &peer.Peer{ - ID: []byte("testID"), - } + local := peer.WithIDString("testID") lds := ds.NewMapDatastore() d := mock.NewMockRouter(local, lds) diff --git a/net/conn/conn.go b/net/conn/conn.go index 00d4a91e9..cf26ff6d2 100644 --- a/net/conn/conn.go +++ b/net/conn/conn.go @@ -42,8 +42,8 @@ func newMsgioPipe(size int) *msgioPipe { // singleConn represents a single connection to another Peer (IPFS Node). type singleConn struct { - local *peer.Peer - remote *peer.Peer + local peer.Peer + remote peer.Peer maconn manet.Conn msgio *msgioPipe @@ -51,7 +51,7 @@ type singleConn struct { } // newConn constructs a new connection -func newSingleConn(ctx context.Context, local, remote *peer.Peer, +func newSingleConn(ctx context.Context, local, remote peer.Peer, maconn manet.Conn) (Conn, error) { conn := &singleConn{ @@ -117,12 +117,12 @@ func (c *singleConn) RemoteMultiaddr() ma.Multiaddr { } // LocalPeer is the Peer on this side -func (c *singleConn) LocalPeer() *peer.Peer { +func (c *singleConn) LocalPeer() peer.Peer { return c.local } // RemotePeer is the Peer on the remote side -func (c *singleConn) RemotePeer() *peer.Peer { +func (c *singleConn) RemotePeer() peer.Peer { return c.remote } diff --git a/net/conn/dial.go b/net/conn/dial.go index 7bf85b913..050475e78 100644 --- a/net/conn/dial.go +++ b/net/conn/dial.go @@ -12,7 +12,7 @@ import ( // Dial connects to a particular peer, over a given network // Example: d.Dial(ctx, "udp", peer) -func (d *Dialer) Dial(ctx context.Context, network string, remote *peer.Peer) (Conn, error) { +func (d *Dialer) Dial(ctx context.Context, network string, remote peer.Peer) (Conn, error) { laddr := d.LocalPeer.NetAddress(network) if laddr == nil { return nil, fmt.Errorf("No local address for network %s", network) diff --git a/net/conn/dial_test.go b/net/conn/dial_test.go index e89f31040..f0fa6a3c1 100644 --- a/net/conn/dial_test.go +++ b/net/conn/dial_test.go @@ -10,7 +10,7 @@ import ( ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ) -func setupPeer(addr string) (*peer.Peer, error) { +func setupPeer(addr string) (peer.Peer, error) { tcp, err := ma.NewMultiaddr(addr) if err != nil { return nil, err @@ -21,14 +21,10 @@ func setupPeer(addr string) (*peer.Peer, error) { return nil, err } - id, err := peer.IDFromPubKey(pk) + p, err := peer.WithKeyPair(sk, pk) if err != nil { return nil, err } - - p := &peer.Peer{ID: id} - p.PrivKey = sk - p.PubKey = pk p.AddAddress(tcp) return p, nil } diff --git a/net/conn/interface.go b/net/conn/interface.go index 5cfd8336d..36ff4131e 100644 --- a/net/conn/interface.go +++ b/net/conn/interface.go @@ -23,13 +23,13 @@ type Conn interface { LocalMultiaddr() ma.Multiaddr // LocalPeer is the Peer on this side - LocalPeer() *peer.Peer + LocalPeer() peer.Peer // RemoteMultiaddr is the Multiaddr on the remote side RemoteMultiaddr() ma.Multiaddr // RemotePeer is the Peer on the remote side - RemotePeer() *peer.Peer + RemotePeer() peer.Peer // In returns a readable message channel In() <-chan []byte @@ -47,7 +47,7 @@ type Conn interface { type Dialer struct { // LocalPeer is the identity of the local Peer. - LocalPeer *peer.Peer + LocalPeer peer.Peer // Peerstore is the set of peers we know about locally. The Dialer needs it // because when an incoming connection is identified, we should reuse the @@ -65,7 +65,7 @@ type Listener interface { Multiaddr() ma.Multiaddr // LocalPeer is the identity of the local Peer. - LocalPeer() *peer.Peer + LocalPeer() peer.Peer // Peerstore is the set of peers we know about locally. The Listener needs it // because when an incoming connection is identified, we should reuse the diff --git a/net/conn/listen.go b/net/conn/listen.go index 20cfbb4fb..ce1e49e01 100644 --- a/net/conn/listen.go +++ b/net/conn/listen.go @@ -23,7 +23,7 @@ type listener struct { maddr ma.Multiaddr // LocalPeer is the identity of the local Peer. - local *peer.Peer + local peer.Peer // Peerstore is the set of peers we know about locally peers peer.Peerstore @@ -105,7 +105,7 @@ func (l *listener) Multiaddr() ma.Multiaddr { } // LocalPeer is the identity of the local Peer. -func (l *listener) LocalPeer() *peer.Peer { +func (l *listener) LocalPeer() peer.Peer { return l.local } @@ -117,7 +117,7 @@ func (l *listener) Peerstore() peer.Peerstore { } // Listen listens on the particular multiaddr, with given peer and peerstore. -func Listen(ctx context.Context, addr ma.Multiaddr, local *peer.Peer, peers peer.Peerstore) (Listener, error) { +func Listen(ctx context.Context, addr ma.Multiaddr, local peer.Peer, peers peer.Peerstore) (Listener, error) { ml, err := manet.Listen(addr) if err != nil { diff --git a/net/conn/multiconn.go b/net/conn/multiconn.go index 24b4cc994..dce3e3acc 100644 --- a/net/conn/multiconn.go +++ b/net/conn/multiconn.go @@ -27,8 +27,8 @@ type MultiConn struct { // this string is: /addr1/peer1/addr2/peer2 (peers ordered lexicographically) conns map[string]Conn - local *peer.Peer - remote *peer.Peer + local peer.Peer + remote peer.Peer // fan-in/fan-out duplex Duplex @@ -39,7 +39,7 @@ type MultiConn struct { } // NewMultiConn constructs a new connection -func NewMultiConn(ctx context.Context, local, remote *peer.Peer, conns []Conn) (*MultiConn, error) { +func NewMultiConn(ctx context.Context, local, remote peer.Peer, conns []Conn) (*MultiConn, error) { c := &MultiConn{ local: local, @@ -72,6 +72,10 @@ func (c *MultiConn) Add(conns ...Conn) { log.Error("%s", c2) c.Unlock() // ok to unlock (to log). panicing. log.Error("%s", c) + log.Error("c.LocalPeer: %s %#v", c.LocalPeer(), c.LocalPeer()) + log.Error("c2.LocalPeer: %s %#v", c2.LocalPeer(), c2.LocalPeer()) + log.Error("c.RemotePeer: %s %#v", c.RemotePeer(), c.RemotePeer()) + log.Error("c2.RemotePeer: %s %#v", c2.RemotePeer(), c2.RemotePeer()) c.Lock() // gotta relock to avoid lock panic from deferring. panic("connection addresses mismatch") } @@ -269,12 +273,12 @@ func (c *MultiConn) RemoteMultiaddr() ma.Multiaddr { } // LocalPeer is the Peer on this side -func (c *MultiConn) LocalPeer() *peer.Peer { +func (c *MultiConn) LocalPeer() peer.Peer { return c.local } // RemotePeer is the Peer on the remote side -func (c *MultiConn) RemotePeer() *peer.Peer { +func (c *MultiConn) RemotePeer() peer.Peer { return c.remote } diff --git a/net/conn/multiconn_test.go b/net/conn/multiconn_test.go index bb8404a13..e45a348d8 100644 --- a/net/conn/multiconn_test.go +++ b/net/conn/multiconn_test.go @@ -97,7 +97,7 @@ func setupMultiConns(t *testing.T, ctx context.Context) (a, b *MultiConn) { p2ps := peer.NewPeerstore() // listeners - listen := func(addr ma.Multiaddr, p *peer.Peer, ps peer.Peerstore) Listener { + listen := func(addr ma.Multiaddr, p peer.Peer, ps peer.Peerstore) Listener { l, err := Listen(ctx, addr, p, ps) if err != nil { t.Fatal(err) @@ -106,14 +106,14 @@ func setupMultiConns(t *testing.T, ctx context.Context) (a, b *MultiConn) { } log.Info("Setting up listeners") - p1l := listen(p1.Addresses[0], p1, p1ps) - p2l := listen(p2.Addresses[0], p2, p2ps) + p1l := listen(p1.Addresses()[0], p1, p1ps) + p2l := listen(p2.Addresses()[0], p2, p2ps) // dialers p1d := &Dialer{Peerstore: p1ps, LocalPeer: p1} p2d := &Dialer{Peerstore: p2ps, LocalPeer: p2} - dial := func(d *Dialer, dst *peer.Peer) <-chan Conn { + dial := func(d *Dialer, dst peer.Peer) <-chan Conn { cc := make(chan Conn) go func() { c, err := d.Dial(ctx, "tcp", dst) diff --git a/net/conn/secure_conn.go b/net/conn/secure_conn.go index dfccbaf2e..1b039a180 100644 --- a/net/conn/secure_conn.go +++ b/net/conn/secure_conn.go @@ -79,6 +79,8 @@ func (c *secureConn) secureHandshake(peers peer.Peerstore) error { // perhaps return an error. TBD. log.Error("secureConn peer mismatch. %v != %v", insecureSC.remote, c.secure.RemotePeer()) + log.Error("insecureSC.remote: %s %#v", insecureSC.remote, insecureSC.remote) + log.Error("c.secure.LocalPeer: %s %#v", c.secure.RemotePeer(), c.secure.RemotePeer()) panic("secureConn peer mismatch. consructed incorrectly?") } @@ -114,12 +116,12 @@ func (c *secureConn) RemoteMultiaddr() ma.Multiaddr { } // LocalPeer is the Peer on this side -func (c *secureConn) LocalPeer() *peer.Peer { +func (c *secureConn) LocalPeer() peer.Peer { return c.insecure.LocalPeer() } // RemotePeer is the Peer on the remote side -func (c *secureConn) RemotePeer() *peer.Peer { +func (c *secureConn) RemotePeer() peer.Peer { return c.insecure.RemotePeer() } diff --git a/net/interface.go b/net/interface.go index 379d01968..71a774456 100644 --- a/net/interface.go +++ b/net/interface.go @@ -15,19 +15,19 @@ type Network interface { // TODO: for now, only listen on addrs in local peer when initializing. // DialPeer attempts to establish a connection to a given peer - DialPeer(*peer.Peer) error + DialPeer(peer.Peer) error // ClosePeer connection to peer - ClosePeer(*peer.Peer) error + ClosePeer(peer.Peer) error // IsConnected returns whether a connection to given peer exists. - IsConnected(*peer.Peer) (bool, error) + IsConnected(peer.Peer) (bool, error) // GetProtocols returns the protocols registered in the network. GetProtocols() *mux.ProtocolMap // GetPeerList returns the list of peers currently connected in this network. - GetPeerList() []*peer.Peer + GetPeerList() []peer.Peer // GetBandwidthTotals returns the total number of bytes passed through // the network since it was instantiated diff --git a/net/message/message.go b/net/message/message.go index d414701ba..b7fe0d972 100644 --- a/net/message/message.go +++ b/net/message/message.go @@ -8,12 +8,12 @@ import ( // NetMessage is the interface for the message type NetMessage interface { - Peer() *peer.Peer + Peer() peer.Peer Data() []byte } // New is the interface for constructing a new message. -func New(p *peer.Peer, data []byte) NetMessage { +func New(p peer.Peer, data []byte) NetMessage { return &message{peer: p, data: data} } @@ -21,13 +21,13 @@ func New(p *peer.Peer, data []byte) NetMessage { // particular Peer. type message struct { // To or from, depending on direction. - peer *peer.Peer + peer peer.Peer // Opaque data data []byte } -func (m *message) Peer() *peer.Peer { +func (m *message) Peer() peer.Peer { return m.peer } @@ -36,7 +36,7 @@ func (m *message) Data() []byte { } // FromObject creates a message from a protobuf-marshallable message. -func FromObject(p *peer.Peer, data proto.Message) (NetMessage, error) { +func FromObject(p peer.Peer, data proto.Message) (NetMessage, error) { bytes, err := proto.Marshal(data) if err != nil { return nil, err diff --git a/net/mux/mux_test.go b/net/mux/mux_test.go index 1cd4830c4..027a6b22d 100644 --- a/net/mux/mux_test.go +++ b/net/mux/mux_test.go @@ -21,14 +21,14 @@ func (t *TestProtocol) GetPipe() *msg.Pipe { return t.Pipe } -func newPeer(t *testing.T, id string) *peer.Peer { +func newPeer(t *testing.T, id string) peer.Peer { mh, err := mh.FromHexString(id) if err != nil { t.Error(err) return nil } - return &peer.Peer{ID: peer.ID(mh)} + return peer.WithID(peer.ID(mh)) } func testMsg(t *testing.T, m msg.NetMessage, data []byte) { diff --git a/net/net.go b/net/net.go index 9ec7d2982..de433546a 100644 --- a/net/net.go +++ b/net/net.go @@ -15,7 +15,7 @@ import ( type IpfsNetwork struct { // local peer - local *peer.Peer + local peer.Peer // protocol multiplexing muxer *mux.Muxer @@ -29,7 +29,7 @@ type IpfsNetwork struct { } // NewIpfsNetwork is the structure that implements the network interface -func NewIpfsNetwork(ctx context.Context, local *peer.Peer, +func NewIpfsNetwork(ctx context.Context, local peer.Peer, peers peer.Peerstore, pmap *mux.ProtocolMap) (*IpfsNetwork, error) { ctx, cancel := context.WithCancel(ctx) @@ -63,19 +63,19 @@ func NewIpfsNetwork(ctx context.Context, local *peer.Peer, // func (n *IpfsNetwork) Listen(*ma.Muliaddr) error {} // DialPeer attempts to establish a connection to a given peer -func (n *IpfsNetwork) DialPeer(p *peer.Peer) error { +func (n *IpfsNetwork) DialPeer(p peer.Peer) error { _, err := n.swarm.Dial(p) return err } // ClosePeer connection to peer -func (n *IpfsNetwork) ClosePeer(p *peer.Peer) error { +func (n *IpfsNetwork) ClosePeer(p peer.Peer) error { return n.swarm.CloseConnection(p) } // IsConnected returns whether a connection to given peer exists. -func (n *IpfsNetwork) IsConnected(p *peer.Peer) (bool, error) { - return n.swarm.GetConnection(p.ID) != nil, nil +func (n *IpfsNetwork) IsConnected(p peer.Peer) (bool, error) { + return n.swarm.GetConnection(p.ID()) != nil, nil } // GetProtocols returns the protocols registered in the network. @@ -108,10 +108,12 @@ func (n *IpfsNetwork) Close() error { return nil } -func (n *IpfsNetwork) GetPeerList() []*peer.Peer { +// GetPeerList returns the networks list of connected peers +func (n *IpfsNetwork) GetPeerList() []peer.Peer { return n.swarm.GetPeerList() } +// GetBandwidthTotals returns the total amount of bandwidth transferred func (n *IpfsNetwork) GetBandwidthTotals() (in uint64, out uint64) { return n.muxer.GetBandwidthTotals() } diff --git a/net/service/service.go b/net/service/service.go index 27dc6f1b1..0eeea8987 100644 --- a/net/service/service.go +++ b/net/service/service.go @@ -133,7 +133,7 @@ func (s *service) SendMessage(ctx context.Context, m msg.NetMessage) error { func (s *service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) { // create a request - r, err := NewRequest(m.Peer().ID) + r, err := NewRequest(m.Peer().ID()) if err != nil { return nil, err } @@ -227,7 +227,7 @@ func (s *service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) { log.Error("RequestID should identify a response here.") } - key := RequestKey(m.Peer().ID, RequestID(rid)) + key := RequestKey(m.Peer().ID(), RequestID(rid)) s.RequestsLock.RLock() r, found := s.Requests[key] s.RequestsLock.RUnlock() diff --git a/net/service/service_test.go b/net/service/service_test.go index e28f5654a..ddcd93b89 100644 --- a/net/service/service_test.go +++ b/net/service/service_test.go @@ -25,14 +25,14 @@ func (t *ReverseHandler) HandleMessage(ctx context.Context, m msg.NetMessage) ms return msg.New(m.Peer(), d) } -func newPeer(t *testing.T, id string) *peer.Peer { +func newPeer(t *testing.T, id string) peer.Peer { mh, err := mh.FromHexString(id) if err != nil { t.Error(err) return nil } - return &peer.Peer{ID: peer.ID(mh)} + return peer.WithID(peer.ID(mh)) } func TestServiceHandler(t *testing.T) { diff --git a/net/swarm/conn.go b/net/swarm/conn.go index 891a191a6..a0ef4583e 100644 --- a/net/swarm/conn.go +++ b/net/swarm/conn.go @@ -14,11 +14,11 @@ import ( func (s *Swarm) listen() error { hasErr := false retErr := &ListenErr{ - Errors: make([]error, len(s.local.Addresses)), + Errors: make([]error, len(s.local.Addresses())), } // listen on every address - for i, addr := range s.local.Addresses { + for i, addr := range s.local.Addresses() { err := s.connListen(addr) if err != nil { hasErr = true diff --git a/net/swarm/simul_test.go b/net/swarm/simul_test.go index 2cffd0d2c..253d9784d 100644 --- a/net/swarm/simul_test.go +++ b/net/swarm/simul_test.go @@ -24,10 +24,10 @@ func TestSimultOpen(t *testing.T) { // connect everyone { var wg sync.WaitGroup - connect := func(s *Swarm, dst *peer.Peer) { + connect := func(s *Swarm, dst peer.Peer) { // copy for other peer - cp := &peer.Peer{ID: dst.ID} - cp.AddAddress(dst.Addresses[0]) + cp := peer.WithID(dst.ID()) + cp.AddAddress(dst.Addresses()[0]) if _, err := s.Dial(cp); err != nil { t.Fatal("error swarm dialing to peer", err) diff --git a/net/swarm/swarm.go b/net/swarm/swarm.go index 157a9ff92..1085abf61 100644 --- a/net/swarm/swarm.go +++ b/net/swarm/swarm.go @@ -45,7 +45,7 @@ func (e *ListenErr) Error() string { type Swarm struct { // local is the peer this swarm represents - local *peer.Peer + local peer.Peer // peers is a collection of peers for swarm to use peers peer.Peerstore @@ -69,7 +69,7 @@ type Swarm struct { } // NewSwarm constructs a Swarm, with a Chan. -func NewSwarm(ctx context.Context, local *peer.Peer, ps peer.Peerstore) (*Swarm, error) { +func NewSwarm(ctx context.Context, local peer.Peer, ps peer.Peerstore) (*Swarm, error) { s := &Swarm{ Pipe: msg.NewPipe(10), conns: conn.MultiConnMap{}, @@ -104,13 +104,13 @@ func (s *Swarm) close() error { // etc. to achive connection. // // For now, Dial uses only TCP. This will be extended. -func (s *Swarm) Dial(peer *peer.Peer) (conn.Conn, error) { - if peer.ID.Equal(s.local.ID) { +func (s *Swarm) Dial(peer peer.Peer) (conn.Conn, error) { + if peer.ID().Equal(s.local.ID()) { return nil, errors.New("Attempted connection to self!") } // check if we already have an open connection first - c := s.GetConnection(peer.ID) + c := s.GetConnection(peer.ID()) if c != nil { return c, nil } @@ -167,14 +167,14 @@ func (s *Swarm) Connections() []conn.Conn { } // CloseConnection removes a given peer from swarm + closes the connection -func (s *Swarm) CloseConnection(p *peer.Peer) error { - c := s.GetConnection(p.ID) +func (s *Swarm) CloseConnection(p peer.Peer) error { + c := s.GetConnection(p.ID()) if c == nil { return u.ErrNotFound } s.connsLock.Lock() - delete(s.conns, u.Key(p.ID)) + delete(s.conns, u.Key(p.ID())) s.connsLock.Unlock() return c.Close() @@ -190,8 +190,8 @@ func (s *Swarm) GetErrChan() chan error { } // GetPeerList returns a copy of the set of peers swarm is connected to. -func (s *Swarm) GetPeerList() []*peer.Peer { - var out []*peer.Peer +func (s *Swarm) GetPeerList() []peer.Peer { + var out []peer.Peer s.connsLock.RLock() for _, p := range s.conns { out = append(out, p.RemotePeer()) diff --git a/net/swarm/swarm_test.go b/net/swarm/swarm_test.go index d920b6b87..b4a9b0278 100644 --- a/net/swarm/swarm_test.go +++ b/net/swarm/swarm_test.go @@ -32,7 +32,7 @@ func pong(ctx context.Context, swarm *Swarm) { } } -func setupPeer(t *testing.T, addr string) *peer.Peer { +func setupPeer(t *testing.T, addr string) peer.Peer { tcp, err := ma.NewMultiaddr(addr) if err != nil { t.Fatal(err) @@ -43,19 +43,15 @@ func setupPeer(t *testing.T, addr string) *peer.Peer { t.Fatal(err) } - id, err := peer.IDFromPubKey(pk) + p, err := peer.WithKeyPair(sk, pk) if err != nil { t.Fatal(err) } - - p := &peer.Peer{ID: id} - p.PrivKey = sk - p.PubKey = pk p.AddAddress(tcp) return p } -func makeSwarms(ctx context.Context, t *testing.T, addrs []string) ([]*Swarm, []*peer.Peer) { +func makeSwarms(ctx context.Context, t *testing.T, addrs []string) ([]*Swarm, []peer.Peer) { swarms := []*Swarm{} for _, addr := range addrs { @@ -68,7 +64,7 @@ func makeSwarms(ctx context.Context, t *testing.T, addrs []string) ([]*Swarm, [] swarms = append(swarms, swarm) } - peers := make([]*peer.Peer, len(swarms)) + peers := make([]peer.Peer, len(swarms)) for i, s := range swarms { peers[i] = s.local } @@ -85,14 +81,14 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) { // connect everyone { var wg sync.WaitGroup - connect := func(s *Swarm, dst *peer.Peer) { + connect := func(s *Swarm, dst peer.Peer) { // copy for other peer - cp, err := s.peers.Get(dst.ID) + cp, err := s.peers.Get(dst.ID()) if err != nil { - cp = &peer.Peer{ID: dst.ID} + t.Fatal(err) } - cp.AddAddress(dst.Addresses[0]) + cp.AddAddress(dst.Addresses()[0]) log.Info("SWARM TEST: %s dialing %s", s.local, dst) if _, err := s.Dial(cp); err != nil { diff --git a/peer/peer.go b/peer/peer.go index 74e1a4ce5..93ddc6498 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -49,17 +49,48 @@ func IDFromPubKey(pk ic.PubKey) (ID, error) { return ID(hash), nil } -// Map maps Key (string) : *Peer (slices are not comparable). -type Map map[u.Key]*Peer +// Map maps Key (string) : *peer (slices are not comparable). +type Map map[u.Key]Peer // Peer represents the identity information of an IPFS Node, including // ID, and relevant Addresses. -type Peer struct { - ID ID - Addresses []ma.Multiaddr +type Peer interface { + // ID returns the peer's ID + ID() ID - PrivKey ic.PrivKey - PubKey ic.PubKey + // Key returns the ID as a Key (string) for maps. + Key() u.Key + + // Addresses returns the peer's multiaddrs + Addresses() []ma.Multiaddr + + // AddAddress adds the given Multiaddr address to Peer's addresses. + AddAddress(a ma.Multiaddr) + + // NetAddress returns the first Multiaddr found for a given network. + NetAddress(n string) ma.Multiaddr + + // Priv/PubKey returns the peer's Private Key + PrivKey() ic.PrivKey + PubKey() ic.PubKey + + // LoadAndVerifyKeyPair unmarshalls, loads a private/public key pair. + // Error if (a) unmarshalling fails, or (b) pubkey does not match id. + LoadAndVerifyKeyPair(marshalled []byte) error + VerifyAndSetPrivKey(sk ic.PrivKey) error + VerifyAndSetPubKey(pk ic.PubKey) error + + // Get/SetLatency manipulate the current latency measurement. + GetLatency() (out time.Duration) + SetLatency(laten time.Duration) +} + +type peer struct { + id ID + addresses []ma.Multiaddr + + privKey ic.PrivKey + pubKey ic.PubKey latency time.Duration @@ -67,34 +98,56 @@ type Peer struct { } // String prints out the peer. -func (p *Peer) String() string { - return "[Peer " + p.ID.String()[:12] + "]" +func (p *peer) String() string { + return "[Peer " + p.id.String()[:12] + "]" } // Key returns the ID as a Key (string) for maps. -func (p *Peer) Key() u.Key { - return u.Key(p.ID) +func (p *peer) Key() u.Key { + return u.Key(p.id) +} + +// ID returns the peer's ID +func (p *peer) ID() ID { + return p.id +} + +// PrivKey returns the peer's Private Key +func (p *peer) PrivKey() ic.PrivKey { + return p.privKey +} + +// PubKey returns the peer's Private Key +func (p *peer) PubKey() ic.PubKey { + return p.pubKey +} + +// Addresses returns the peer's multiaddrs +func (p *peer) Addresses() []ma.Multiaddr { + cp := make([]ma.Multiaddr, len(p.addresses)) + copy(cp, p.addresses) + return cp } // AddAddress adds the given Multiaddr address to Peer's addresses. -func (p *Peer) AddAddress(a ma.Multiaddr) { +func (p *peer) AddAddress(a ma.Multiaddr) { p.Lock() defer p.Unlock() - for _, addr := range p.Addresses { + for _, addr := range p.addresses { if addr.Equal(a) { return } } - p.Addresses = append(p.Addresses, a) + p.addresses = append(p.addresses, a) } // NetAddress returns the first Multiaddr found for a given network. -func (p *Peer) NetAddress(n string) ma.Multiaddr { +func (p *peer) NetAddress(n string) ma.Multiaddr { p.RLock() defer p.RUnlock() - for _, a := range p.Addresses { + for _, a := range p.addresses { for _, p := range a.Protocols() { if p.Name == n { return a @@ -105,7 +158,7 @@ func (p *Peer) NetAddress(n string) ma.Multiaddr { } // GetLatency retrieves the current latency measurement. -func (p *Peer) GetLatency() (out time.Duration) { +func (p *peer) GetLatency() (out time.Duration) { p.RLock() out = p.latency p.RUnlock() @@ -116,7 +169,7 @@ func (p *Peer) GetLatency() (out time.Duration) { // TODO: Instead of just keeping a single number, // keep a running average over the last hour or so // Yep, should be EWMA or something. (-jbenet) -func (p *Peer) SetLatency(laten time.Duration) { +func (p *peer) SetLatency(laten time.Duration) { p.Lock() if p.latency == 0 { p.latency = laten @@ -128,61 +181,98 @@ func (p *Peer) SetLatency(laten time.Duration) { // LoadAndVerifyKeyPair unmarshalls, loads a private/public key pair. // Error if (a) unmarshalling fails, or (b) pubkey does not match id. -func (p *Peer) LoadAndVerifyKeyPair(marshalled []byte) error { +func (p *peer) LoadAndVerifyKeyPair(marshalled []byte) error { sk, err := ic.UnmarshalPrivateKey(marshalled) if err != nil { return fmt.Errorf("Failed to unmarshal private key: %v", err) } + return p.VerifyAndSetPrivKey(sk) +} + +// VerifyAndSetPrivKey sets private key, given its pubkey matches the peer.ID +func (p *peer) VerifyAndSetPrivKey(sk ic.PrivKey) error { + // construct and assign pubkey. ensure it matches this peer if err := p.VerifyAndSetPubKey(sk.GetPublic()); err != nil { return err } // if we didn't have the priavte key, assign it - if p.PrivKey == nil { - p.PrivKey = sk + if p.privKey == nil { + p.privKey = sk return nil } // if we already had the keys, check they're equal. - if p.PrivKey.Equals(sk) { + if p.privKey.Equals(sk) { return nil // as expected. keep the old objects. } // keys not equal. invariant violated. this warrants a panic. // these keys should be _the same_ because peer.ID = H(pk) // this mismatch should never happen. - log.Error("%s had PrivKey: %v -- got %v", p, p.PrivKey, sk) + log.Error("%s had PrivKey: %v -- got %v", p, p.privKey, sk) panic("invariant violated: unexpected key mismatch") } // VerifyAndSetPubKey sets public key, given it matches the peer.ID -func (p *Peer) VerifyAndSetPubKey(pk ic.PubKey) error { +func (p *peer) VerifyAndSetPubKey(pk ic.PubKey) error { pkid, err := IDFromPubKey(pk) if err != nil { return fmt.Errorf("Failed to hash public key: %v", err) } - if !p.ID.Equal(pkid) { + if !p.id.Equal(pkid) { return fmt.Errorf("Public key does not match peer.ID.") } // if we didn't have the keys, assign them. - if p.PubKey == nil { - p.PubKey = pk + if p.pubKey == nil { + p.pubKey = pk return nil } // if we already had the pubkey, check they're equal. - if p.PubKey.Equals(pk) { + if p.pubKey.Equals(pk) { return nil // as expected. keep the old objects. } // keys not equal. invariant violated. this warrants a panic. // these keys should be _the same_ because peer.ID = H(pk) // this mismatch should never happen. - log.Error("%s had PubKey: %v -- got %v", p, p.PubKey, pk) + log.Error("%s had PubKey: %v -- got %v", p, p.pubKey, pk) panic("invariant violated: unexpected key mismatch") } + +// WithKeyPair returns a Peer object with given keys. +func WithKeyPair(sk ic.PrivKey, pk ic.PubKey) (Peer, error) { + if sk == nil && pk == nil { + return nil, fmt.Errorf("PeerWithKeyPair nil keys") + } + + pk2 := sk.GetPublic() + if pk == nil { + pk = pk2 + } else if !pk.Equals(pk2) { + return nil, fmt.Errorf("key mismatch. pubkey is not privkey's pubkey") + } + + pkid, err := IDFromPubKey(pk) + if err != nil { + return nil, fmt.Errorf("Failed to hash public key: %v", err) + } + + return &peer{id: pkid, pubKey: pk, privKey: sk}, nil +} + +// WithID constructs a peer with given ID. +func WithID(id ID) Peer { + return &peer{id: id} +} + +// WithIDString constructs a peer with given ID (string). +func WithIDString(id string) Peer { + return WithID(ID(id)) +} diff --git a/peer/peer_test.go b/peer/peer_test.go index a873d6a6f..f4431a6a7 100644 --- a/peer/peer_test.go +++ b/peer/peer_test.go @@ -27,12 +27,12 @@ func TestNetAddress(t *testing.T) { return } - p := Peer{ID: ID(mh)} + p := WithID(ID(mh)) p.AddAddress(tcp) p.AddAddress(udp) p.AddAddress(tcp) - if len(p.Addresses) == 3 { + if len(p.Addresses()) == 3 { t.Error("added same address twice") } diff --git a/peer/peerstore.go b/peer/peerstore.go index 510353c7c..4004371fd 100644 --- a/peer/peerstore.go +++ b/peer/peerstore.go @@ -11,8 +11,8 @@ import ( // Peerstore provides a threadsafe collection for peers. type Peerstore interface { - Get(ID) (*Peer, error) - Put(*Peer) error + Get(ID) (Peer, error) + Put(Peer) error Delete(ID) error All() (*Map, error) } @@ -29,9 +29,13 @@ func NewPeerstore() Peerstore { } } -func (p *peerstore) Get(i ID) (*Peer, error) { - p.RLock() - defer p.RUnlock() +func (p *peerstore) Get(i ID) (Peer, error) { + p.Lock() + defer p.Unlock() + + if i == nil { + panic("wat") + } k := u.Key(i).DsKey() val, err := p.peers.Get(k) @@ -43,7 +47,7 @@ func (p *peerstore) Get(i ID) (*Peer, error) { // not found, construct it ourselves, add it to datastore, and return. case ds.ErrNotFound: - peer := &Peer{ID: i} + peer := &peer{id: i} if err := p.peers.Put(k, peer); err != nil { return nil, err } @@ -51,7 +55,7 @@ func (p *peerstore) Get(i ID) (*Peer, error) { // no error, got it back fine case nil: - peer, ok := val.(*Peer) + peer, ok := val.(*peer) if !ok { return nil, errors.New("stored value was not a Peer") } @@ -59,11 +63,11 @@ func (p *peerstore) Get(i ID) (*Peer, error) { } } -func (p *peerstore) Put(peer *Peer) error { +func (p *peerstore) Put(peer Peer) error { p.Lock() defer p.Unlock() - k := u.Key(peer.ID).DsKey() + k := peer.Key().DsKey() return p.peers.Put(k, peer) } @@ -91,9 +95,9 @@ func (p *peerstore) All() (*Map, error) { continue } - pval, ok := val.(*Peer) + pval, ok := val.(*peer) if ok { - (*ps)[u.Key(pval.ID)] = pval + (*ps)[pval.Key()] = pval } } return ps, nil diff --git a/peer/peerstore_test.go b/peer/peerstore_test.go index f24f1de20..4dce4c46b 100644 --- a/peer/peerstore_test.go +++ b/peer/peerstore_test.go @@ -7,13 +7,13 @@ import ( ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ) -func setupPeer(id string, addr string) (*Peer, error) { +func setupPeer(id string, addr string) (Peer, error) { tcp, err := ma.NewMultiaddr(addr) if err != nil { return nil, err } - p := &Peer{ID: ID(id)} + p := WithIDString(id) p.AddAddress(tcp) return p, nil } diff --git a/peer/queue/distance.go b/peer/queue/distance.go index 4dbe2a7b0..8eff20288 100644 --- a/peer/queue/distance.go +++ b/peer/queue/distance.go @@ -13,7 +13,7 @@ import ( // peerMetric tracks a peer and its distance to something else. type peerMetric struct { // the peer - peer *peer.Peer + peer peer.Peer // big.Int for XOR metric metric *big.Int @@ -64,11 +64,11 @@ func (pq *distancePQ) Len() int { return len(pq.heap) } -func (pq *distancePQ) Enqueue(p *peer.Peer) { +func (pq *distancePQ) Enqueue(p peer.Peer) { pq.Lock() defer pq.Unlock() - distance := ks.XORKeySpace.Key(p.ID).Distance(pq.from) + distance := ks.XORKeySpace.Key(p.ID()).Distance(pq.from) heap.Push(&pq.heap, &peerMetric{ peer: p, @@ -76,7 +76,7 @@ func (pq *distancePQ) Enqueue(p *peer.Peer) { }) } -func (pq *distancePQ) Dequeue() *peer.Peer { +func (pq *distancePQ) Dequeue() peer.Peer { pq.Lock() defer pq.Unlock() diff --git a/peer/queue/interface.go b/peer/queue/interface.go index ce635fab4..ba17f0aa2 100644 --- a/peer/queue/interface.go +++ b/peer/queue/interface.go @@ -11,8 +11,8 @@ type PeerQueue interface { Len() int // Enqueue adds this node to the queue. - Enqueue(*peer.Peer) + Enqueue(peer.Peer) // Dequeue retrieves the highest (smallest int) priority node - Dequeue() *peer.Peer + Dequeue() peer.Peer } diff --git a/peer/queue/queue_test.go b/peer/queue/queue_test.go index a4812befa..efaf2037e 100644 --- a/peer/queue/queue_test.go +++ b/peer/queue/queue_test.go @@ -12,8 +12,8 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ) -func newPeer(id string) *peer.Peer { - return &peer.Peer{ID: peer.ID(id)} +func newPeer(id string) peer.Peer { + return peer.WithIDString(id) } func TestQueue(t *testing.T) { @@ -66,10 +66,10 @@ func TestQueue(t *testing.T) { } -func newPeerTime(t time.Time) *peer.Peer { +func newPeerTime(t time.Time) peer.Peer { s := fmt.Sprintf("hmmm time: %v", t) h := u.Hash([]byte(s)) - return &peer.Peer{ID: peer.ID(h)} + return peer.WithID(peer.ID(h)) } func TestSyncQueue(t *testing.T) { diff --git a/peer/queue/sync.go b/peer/queue/sync.go index f369493a7..c219e2671 100644 --- a/peer/queue/sync.go +++ b/peer/queue/sync.go @@ -9,8 +9,8 @@ import ( // ChanQueue makes any PeerQueue synchronizable through channels. type ChanQueue struct { Queue PeerQueue - EnqChan chan<- *peer.Peer - DeqChan <-chan *peer.Peer + EnqChan chan<- peer.Peer + DeqChan <-chan peer.Peer } // NewChanQueue creates a ChanQueue by wrapping pq. @@ -23,8 +23,8 @@ func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue { func (cq *ChanQueue) process(ctx context.Context) { // construct the channels here to be able to use them bidirectionally - enqChan := make(chan *peer.Peer, 10) - deqChan := make(chan *peer.Peer, 10) + enqChan := make(chan peer.Peer, 10) + deqChan := make(chan peer.Peer, 10) cq.EnqChan = enqChan cq.DeqChan = deqChan @@ -32,8 +32,8 @@ func (cq *ChanQueue) process(ctx context.Context) { go func() { defer close(deqChan) - var next *peer.Peer - var item *peer.Peer + var next peer.Peer + var item peer.Peer var more bool for { diff --git a/routing/dht/Message.go b/routing/dht/Message.go index 526724287..ae78d1f39 100644 --- a/routing/dht/Message.go +++ b/routing/dht/Message.go @@ -17,20 +17,21 @@ func newMessage(typ Message_MessageType, key string, level int) *Message { return m } -func peerToPBPeer(p *peer.Peer) *Message_Peer { +func peerToPBPeer(p peer.Peer) *Message_Peer { pbp := new(Message_Peer) - if len(p.Addresses) == 0 || p.Addresses[0] == nil { + addrs := p.Addresses() + if len(addrs) == 0 || addrs[0] == nil { pbp.Addr = proto.String("") } else { - addr := p.Addresses[0].String() + addr := addrs[0].String() pbp.Addr = &addr } - pid := string(p.ID) + pid := string(p.ID()) pbp.Id = &pid return pbp } -func peersToPBPeers(peers []*peer.Peer) []*Message_Peer { +func peersToPBPeers(peers []peer.Peer) []*Message_Peer { pbpeers := make([]*Message_Peer, len(peers)) for i, p := range peers { pbpeers[i] = peerToPBPeer(p) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 683b5785b..3617b7142 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -39,7 +39,7 @@ type IpfsDHT struct { sender inet.Sender // Local peer (yourself) - self *peer.Peer + self peer.Peer // Other peers peerstore peer.Peerstore @@ -60,7 +60,7 @@ type IpfsDHT struct { } // NewDHT creates a new DHT object with the given peer as the 'local' host -func NewDHT(ctx context.Context, p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT { +func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT { dht := new(IpfsDHT) dht.network = net dht.sender = sender @@ -69,12 +69,12 @@ func NewDHT(ctx context.Context, p *peer.Peer, ps peer.Peerstore, net inet.Netwo dht.peerstore = ps dht.ctx = ctx - dht.providers = NewProviderManager(p.ID) + dht.providers = NewProviderManager(p.ID()) dht.routingTables = make([]*kb.RoutingTable, 3) - dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*1000) - dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*1000) - dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour) + dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000) + dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000) + dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Hour) dht.birth = time.Now() if doPinging { @@ -84,7 +84,7 @@ func NewDHT(ctx context.Context, p *peer.Peer, ps peer.Peerstore, net inet.Netwo } // Connect to a new peer at the given address, ping and add to the routing table -func (dht *IpfsDHT) Connect(ctx context.Context, npeer *peer.Peer) (*peer.Peer, error) { +func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, error) { log.Debug("Connect to new peer: %s", npeer) // TODO(jbenet,whyrusleeping) @@ -175,7 +175,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N // sendRequest sends out a request using dht.sender, but also makes sure to // measure the RTT for latency measurements. -func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message) (*Message, error) { mes, err := msg.FromObject(p, pmes) if err != nil { @@ -208,7 +208,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message } // putValueToNetwork stores the given key/value pair at the peer 'p' -func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer, +func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer, key string, value []byte) error { pmes := newMessage(Message_PUT_VALUE, string(key), 0) @@ -224,12 +224,12 @@ func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer, return nil } -func (dht *IpfsDHT) putProvider(ctx context.Context, p *peer.Peer, key string) error { +func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error { pmes := newMessage(Message_ADD_PROVIDER, string(key), 0) // add self as the provider - pmes.ProviderPeers = peersToPBPeers([]*peer.Peer{dht.self}) + pmes.ProviderPeers = peersToPBPeers([]peer.Peer{dht.self}) rpmes, err := dht.sendRequest(ctx, p, pmes) if err != nil { @@ -244,8 +244,8 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p *peer.Peer, key string) e return nil } -func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer, - key u.Key, level int) ([]byte, []*peer.Peer, error) { +func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer, + key u.Key, level int) ([]byte, []peer.Peer, error) { pmes, err := dht.getValueSingle(ctx, p, key, level) if err != nil { @@ -270,7 +270,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer, } // Perhaps we were given closer peers - var peers []*peer.Peer + var peers []peer.Peer for _, pb := range pmes.GetCloserPeers() { pr, err := dht.addPeer(pb) if err != nil { @@ -289,8 +289,8 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer, return nil, nil, u.ErrNotFound } -func (dht *IpfsDHT) addPeer(pb *Message_Peer) (*peer.Peer, error) { - if peer.ID(pb.GetId()).Equal(dht.self.ID) { +func (dht *IpfsDHT) addPeer(pb *Message_Peer) (peer.Peer, error) { + if peer.ID(pb.GetId()).Equal(dht.self.ID()) { return nil, errors.New("cannot add self as peer") } @@ -310,7 +310,7 @@ func (dht *IpfsDHT) addPeer(pb *Message_Peer) (*peer.Peer, error) { } // getValueSingle simply performs the get value RPC with the given parameters -func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer, +func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*Message, error) { pmes := newMessage(Message_GET_VALUE, string(key), level) @@ -369,7 +369,7 @@ func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error { // Update signals to all routingTables to Update their last-seen status // on the given peer. -func (dht *IpfsDHT) Update(p *peer.Peer) { +func (dht *IpfsDHT) Update(p peer.Peer) { log.Debug("updating peer: %s latency = %f\n", p, p.GetLatency().Seconds()) removedCount := 0 for _, route := range dht.routingTables { @@ -390,7 +390,7 @@ func (dht *IpfsDHT) Update(p *peer.Peer) { } // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in. -func (dht *IpfsDHT) FindLocal(id peer.ID) (*peer.Peer, *kb.RoutingTable) { +func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) { for _, table := range dht.routingTables { p := table.Find(id) if p != nil { @@ -400,7 +400,7 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) (*peer.Peer, *kb.RoutingTable) { return nil, nil } -func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p *peer.Peer, id peer.ID, level int) (*Message, error) { +func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*Message, error) { pmes := newMessage(Message_FIND_NODE, string(id), level) return dht.sendRequest(ctx, p, pmes) } @@ -411,14 +411,14 @@ func (dht *IpfsDHT) printTables() { } } -func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p *peer.Peer, key u.Key, level int) (*Message, error) { +func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*Message, error) { pmes := newMessage(Message_GET_PROVIDERS, string(key), level) return dht.sendRequest(ctx, p, pmes) } // TODO: Could be done async -func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer { - var provArr []*peer.Peer +func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer { + var provArr []peer.Peer for _, prov := range peers { p, err := dht.peerFromInfo(prov) if err != nil { @@ -429,7 +429,7 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer log.Debug("%s adding provider: %s for %s", dht.self, p, key) // Dont add outselves to the list - if p.ID.Equal(dht.self.ID) { + if p.ID().Equal(dht.self.ID()) { continue } @@ -441,7 +441,7 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer } // nearestPeersToQuery returns the routing tables closest peers. -func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []*peer.Peer { +func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []peer.Peer { level := pmes.GetClusterLevel() cluster := dht.routingTables[level] @@ -451,7 +451,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []*peer.Peer { } // betterPeerToQuery returns nearestPeersToQuery, but iff closer than self. -func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []*peer.Peer { +func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []peer.Peer { closer := dht.nearestPeersToQuery(pmes, count) // no node? nil @@ -461,17 +461,17 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []*peer.Peer { // == to self? thats bad for _, p := range closer { - if p.ID.Equal(dht.self.ID) { + if p.ID().Equal(dht.self.ID()) { log.Error("Attempted to return self! this shouldnt happen...") return nil } } - var filtered []*peer.Peer + var filtered []peer.Peer for _, p := range closer { // must all be closer than self key := u.Key(pmes.GetKey()) - if !kb.Closer(dht.self.ID, p.ID, key) { + if !kb.Closer(dht.self.ID(), p.ID(), key) { filtered = append(filtered, p) } } @@ -480,7 +480,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []*peer.Peer { return filtered } -func (dht *IpfsDHT) getPeer(id peer.ID) (*peer.Peer, error) { +func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) { p, err := dht.peerstore.Get(id) if err != nil { err = fmt.Errorf("Failed to get peer from peerstore: %s", err) @@ -490,12 +490,12 @@ func (dht *IpfsDHT) getPeer(id peer.ID) (*peer.Peer, error) { return p, nil } -func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) { +func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (peer.Peer, error) { id := peer.ID(pbp.GetId()) // continue if it's ourselves - if id.Equal(dht.self.ID) { + if id.Equal(dht.self.ID()) { return nil, errors.New("found self") } @@ -512,7 +512,7 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) { return p, nil } -func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (*peer.Peer, error) { +func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (peer.Peer, error) { p, err := dht.peerFromInfo(pbp) if err != nil { return nil, err diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 98b196da5..23d9fcf17 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -20,7 +20,7 @@ import ( "time" ) -func setupDHT(ctx context.Context, t *testing.T, p *peer.Peer) *IpfsDHT { +func setupDHT(ctx context.Context, t *testing.T, p peer.Peer) *IpfsDHT { peerstore := peer.NewPeerstore() dhts := netservice.NewService(nil) // nil handler for now, need to patch it @@ -40,7 +40,7 @@ func setupDHT(ctx context.Context, t *testing.T, p *peer.Peer) *IpfsDHT { return d } -func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []*peer.Peer, []*IpfsDHT) { +func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.Peer, []*IpfsDHT) { var addrs []ma.Multiaddr for i := 0; i < n; i++ { a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000+i)) @@ -50,7 +50,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []*pee addrs = append(addrs, a) } - var peers []*peer.Peer + var peers []peer.Peer for i := 0; i < n; i++ { p := makePeer(addrs[i]) peers = append(peers, p) @@ -64,21 +64,16 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []*pee return addrs, peers, dhts } -func makePeer(addr ma.Multiaddr) *peer.Peer { - p := new(peer.Peer) - p.AddAddress(addr) +func makePeer(addr ma.Multiaddr) peer.Peer { sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512) if err != nil { panic(err) } - p.PrivKey = sk - p.PubKey = pk - id, err := peer.IDFromPubKey(pk) + p, err := peer.WithKeyPair(sk, pk) if err != nil { panic(err) } - - p.ID = id + p.AddAddress(addr) return p } @@ -289,7 +284,7 @@ func TestProvidesAsync(t *testing.T) { provs := dhts[0].FindProvidersAsync(ctxT, u.Key("hello"), 5) select { case p := <-provs: - if !p.ID.Equal(dhts[3].self.ID) { + if !p.ID().Equal(dhts[3].self.ID()) { t.Fatalf("got a provider, but not the right one. %s", p) } case <-ctxT.Done(): @@ -379,7 +374,7 @@ func TestFindPeer(t *testing.T) { } ctxT, _ := context.WithTimeout(ctx, time.Second) - p, err := dhts[0].FindPeer(ctxT, peers[2].ID) + p, err := dhts[0].FindPeer(ctxT, peers[2].ID()) if err != nil { t.Fatal(err) } @@ -388,7 +383,7 @@ func TestFindPeer(t *testing.T) { t.Fatal("Failed to find peer.") } - if !p.ID.Equal(peers[2].ID) { + if !p.ID().Equal(peers[2].ID()) { t.Fatal("Didnt find expected peer.") } } diff --git a/routing/dht/diag.go b/routing/dht/diag.go index 8fd581a45..e91ba9bee 100644 --- a/routing/dht/diag.go +++ b/routing/dht/diag.go @@ -32,12 +32,13 @@ func (di *diagInfo) Marshal() []byte { func (dht *IpfsDHT) getDiagInfo() *diagInfo { di := new(diagInfo) di.CodeVersion = "github.com/jbenet/go-ipfs" - di.ID = dht.self.ID + di.ID = dht.self.ID() di.LifeSpan = time.Since(dht.birth) di.Keys = nil // Currently no way to query datastore for _, p := range dht.routingTables[0].ListPeers() { - di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID}) + d := connDiagInfo{p.GetLatency(), p.ID()} + di.Connections = append(di.Connections, d) } return di } diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index b5bf48772..c4ba09414 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -9,7 +9,6 @@ import ( "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" - ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" msg "github.com/jbenet/go-ipfs/net/message" mux "github.com/jbenet/go-ipfs/net/mux" peer "github.com/jbenet/go-ipfs/peer" @@ -66,17 +65,17 @@ type fauxNet struct { } // DialPeer attempts to establish a connection to a given peer -func (f *fauxNet) DialPeer(*peer.Peer) error { +func (f *fauxNet) DialPeer(peer.Peer) error { return nil } // ClosePeer connection to peer -func (f *fauxNet) ClosePeer(*peer.Peer) error { +func (f *fauxNet) ClosePeer(peer.Peer) error { return nil } // IsConnected returns whether a connection to given peer exists. -func (f *fauxNet) IsConnected(*peer.Peer) (bool, error) { +func (f *fauxNet) IsConnected(peer.Peer) (bool, error) { return true, nil } @@ -88,7 +87,7 @@ func (f *fauxNet) SendMessage(msg.NetMessage) error { return nil } -func (f *fauxNet) GetPeerList() []*peer.Peer { +func (f *fauxNet) GetPeerList() []peer.Peer { return nil } @@ -107,11 +106,10 @@ func TestGetFailures(t *testing.T) { fs := &fauxSender{} peerstore := peer.NewPeerstore() - local := new(peer.Peer) - local.ID = peer.ID("test_peer") + local := peer.WithIDString("test_peer") d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) - other := &peer.Peer{ID: peer.ID("other_peer")} + other := peer.WithIDString("other_peer") d.Update(other) // This one should time out @@ -189,11 +187,10 @@ func TestGetFailures(t *testing.T) { } // TODO: Maybe put these in some sort of "ipfs_testutil" package -func _randPeer() *peer.Peer { - p := new(peer.Peer) - p.ID = make(peer.ID, 16) - p.Addresses = []ma.Multiaddr{nil} - crand.Read(p.ID) +func _randPeer() peer.Peer { + id := make(peer.ID, 16) + crand.Read(id) + p := peer.WithID(id) return p } @@ -204,13 +201,13 @@ func TestNotFound(t *testing.T) { fn := &fauxNet{} fs := &fauxSender{} - local := new(peer.Peer) - local.ID = peer.ID("test_peer") + local := peer.WithIDString("test_peer") peerstore := peer.NewPeerstore() + peerstore.Put(local) d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) - var ps []*peer.Peer + var ps []peer.Peer for i := 0; i < 5; i++ { ps = append(ps, _randPeer()) d.Update(ps[i]) @@ -228,7 +225,7 @@ func TestNotFound(t *testing.T) { case Message_GET_VALUE: resp := &Message{Type: pmes.Type} - peers := []*peer.Peer{} + peers := []peer.Peer{} for i := 0; i < 7; i++ { peers = append(peers, _randPeer()) } @@ -270,13 +267,13 @@ func TestLessThanKResponses(t *testing.T) { u.Debug = false fn := &fauxNet{} fs := &fauxSender{} + local := peer.WithIDString("test_peer") peerstore := peer.NewPeerstore() - local := new(peer.Peer) - local.ID = peer.ID("test_peer") + peerstore.Put(local) d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) - var ps []*peer.Peer + var ps []peer.Peer for i := 0; i < 5; i++ { ps = append(ps, _randPeer()) d.Update(ps[i]) @@ -295,7 +292,7 @@ func TestLessThanKResponses(t *testing.T) { case Message_GET_VALUE: resp := &Message{ Type: pmes.Type, - CloserPeers: peersToPBPeers([]*peer.Peer{other}), + CloserPeers: peersToPBPeers([]peer.Peer{other}), } mes, err := msg.FromObject(mes.Peer(), resp) diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 1046516b6..44802babb 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -14,7 +14,7 @@ import ( var CloserPeerCount = 4 // dhthandler specifies the signature of functions that handle DHT messages. -type dhtHandler func(*peer.Peer, *Message) (*Message, error) +type dhtHandler func(peer.Peer, *Message) (*Message, error) func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler { switch t { @@ -35,7 +35,7 @@ func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler { } } -func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *Message) (*Message, error) { log.Debug("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey()) // setup response @@ -93,7 +93,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error } // Store a value in this peer local storage -func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *Message) (*Message, error) { dht.dslock.Lock() defer dht.dslock.Unlock() dskey := u.Key(pmes.GetKey()).DsKey() @@ -102,18 +102,18 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) (*Message, error return pmes, err } -func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) handlePing(p peer.Peer, pmes *Message) (*Message, error) { log.Debug("%s Responding to ping from %s!\n", dht.self, p) return pmes, nil } -func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *Message) (*Message, error) { resp := newMessage(pmes.GetType(), "", pmes.GetClusterLevel()) - var closest []*peer.Peer + var closest []peer.Peer // if looking for self... special case where we send it on CloserPeers. - if peer.ID(pmes.GetKey()).Equal(dht.self.ID) { - closest = []*peer.Peer{dht.self} + if peer.ID(pmes.GetKey()).Equal(dht.self.ID()) { + closest = []peer.Peer{dht.self} } else { closest = dht.betterPeersToQuery(pmes, CloserPeerCount) } @@ -123,9 +123,9 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error return resp, nil } - var withAddresses []*peer.Peer + var withAddresses []peer.Peer for _, p := range closest { - if len(p.Addresses) > 0 { + if len(p.Addresses()) > 0 { withAddresses = append(withAddresses, p) } } @@ -137,7 +137,7 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error return resp, nil } -func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *Message) (*Message, error) { resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) // check if we have this value, to add ourselves as provider. @@ -171,10 +171,10 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, e type providerInfo struct { Creation time.Time - Value *peer.Peer + Value peer.Peer } -func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *Message) (*Message, error) { key := u.Key(pmes.GetKey()) log.Debug("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key)) @@ -182,7 +182,7 @@ func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, er // add provider should use the address given in the message for _, pb := range pmes.GetProviderPeers() { pid := peer.ID(pb.GetId()) - if pid.Equal(p.ID) { + if pid.Equal(p.ID()) { addr, err := pb.Address() if err != nil { diff --git a/routing/dht/providers.go b/routing/dht/providers.go index c62755cf2..204fdf7d5 100644 --- a/routing/dht/providers.go +++ b/routing/dht/providers.go @@ -20,12 +20,12 @@ type ProviderManager struct { type addProv struct { k u.Key - val *peer.Peer + val peer.Peer } type getProv struct { k u.Key - resp chan []*peer.Peer + resp chan []peer.Peer } func NewProviderManager(local peer.ID) *ProviderManager { @@ -45,7 +45,7 @@ func (pm *ProviderManager) run() { for { select { case np := <-pm.newprovs: - if np.val.ID.Equal(pm.lpeer) { + if np.val.ID().Equal(pm.lpeer) { pm.local[np.k] = struct{}{} } pi := new(providerInfo) @@ -54,7 +54,7 @@ func (pm *ProviderManager) run() { arr := pm.providers[np.k] pm.providers[np.k] = append(arr, pi) case gp := <-pm.getprovs: - var parr []*peer.Peer + var parr []peer.Peer provs := pm.providers[gp.k] for _, p := range provs { parr = append(parr, p.Value) @@ -82,17 +82,17 @@ func (pm *ProviderManager) run() { } } -func (pm *ProviderManager) AddProvider(k u.Key, val *peer.Peer) { +func (pm *ProviderManager) AddProvider(k u.Key, val peer.Peer) { pm.newprovs <- &addProv{ k: k, val: val, } } -func (pm *ProviderManager) GetProviders(k u.Key) []*peer.Peer { +func (pm *ProviderManager) GetProviders(k u.Key) []peer.Peer { gp := new(getProv) gp.k = k - gp.resp = make(chan []*peer.Peer) + gp.resp = make(chan []peer.Peer) pm.getprovs <- gp return <-gp.resp } diff --git a/routing/dht/providers_test.go b/routing/dht/providers_test.go index 0cdfa4fcc..b37327d2e 100644 --- a/routing/dht/providers_test.go +++ b/routing/dht/providers_test.go @@ -11,7 +11,7 @@ func TestProviderManager(t *testing.T) { mid := peer.ID("testing") p := NewProviderManager(mid) a := u.Key("test") - p.AddProvider(a, &peer.Peer{}) + p.AddProvider(a, peer.WithIDString("testingprovider")) resp := p.GetProviders(a) if len(resp) != 1 { t.Fatal("Could not retrieve provider.") diff --git a/routing/dht/query.go b/routing/dht/query.go index 0a9ca0bd8..ef3670c7d 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -26,10 +26,10 @@ type dhtQuery struct { } type dhtQueryResult struct { - value []byte // GetValue - peer *peer.Peer // FindPeer - providerPeers []*peer.Peer // GetProviders - closerPeers []*peer.Peer // * + value []byte // GetValue + peer peer.Peer // FindPeer + providerPeers []peer.Peer // GetProviders + closerPeers []peer.Peer // * success bool } @@ -47,10 +47,10 @@ func newQuery(k u.Key, f queryFunc) *dhtQuery { // - the value // - a list of peers potentially better able to serve the query // - an error -type queryFunc func(context.Context, *peer.Peer) (*dhtQueryResult, error) +type queryFunc func(context.Context, peer.Peer) (*dhtQueryResult, error) // Run runs the query at hand. pass in a list of peers to use first. -func (q *dhtQuery) Run(ctx context.Context, peers []*peer.Peer) (*dhtQueryResult, error) { +func (q *dhtQuery) Run(ctx context.Context, peers []peer.Peer) (*dhtQueryResult, error) { runner := newQueryRunner(ctx, q) return runner.Run(peers) } @@ -100,7 +100,7 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner { } } -func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) { +func (r *dhtQueryRunner) Run(peers []peer.Peer) (*dhtQueryResult, error) { log.Debug("Run query with %d peers.", len(peers)) if len(peers) == 0 { log.Warning("Running query with no peers!") @@ -148,7 +148,7 @@ func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) { return nil, err } -func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) { +func (r *dhtQueryRunner) addPeerToQuery(next peer.Peer, benchmark peer.Peer) { if next == nil { // wtf why are peers nil?!? log.Error("Query getting nil peers!!!\n") @@ -156,7 +156,7 @@ func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) { } // if new peer further away than whom we got it from, bother (loops) - if benchmark != nil && kb.Closer(benchmark.ID, next.ID, r.query.key) { + if benchmark != nil && kb.Closer(benchmark.ID(), next.ID(), r.query.key) { return } @@ -200,7 +200,7 @@ func (r *dhtQueryRunner) spawnWorkers() { } } -func (r *dhtQueryRunner) queryPeer(p *peer.Peer) { +func (r *dhtQueryRunner) queryPeer(p peer.Peer) { log.Debug("spawned worker for: %v\n", p) // make sure we rate limit concurrency. diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 55ef265cb..7f004dc47 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -23,13 +23,13 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error return err } - var peers []*peer.Peer + var peers []peer.Peer for _, route := range dht.routingTables { npeers := route.NearestPeers(kb.ConvertKey(key), KValue) peers = append(peers, npeers...) } - query := newQuery(key, func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) { + query := newQuery(key, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { log.Debug("%s PutValue qry part %v", dht.self, p) err := dht.putValueToNetwork(ctx, p, string(key), value) if err != nil { @@ -65,7 +65,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { } // setup the Query - query := newQuery(key, func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) { + query := newQuery(key, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { val, peers, err := dht.getValueOrPeers(ctx, p, key, routeLevel) if err != nil { @@ -117,8 +117,8 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { return nil } -func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan *peer.Peer { - peerOut := make(chan *peer.Peer, count) +func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.Peer { + peerOut := make(chan peer.Peer, count) go func() { ps := newPeerSet() provs := dht.providers.GetProviders(key) @@ -136,7 +136,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue) for _, pp := range peers { wg.Add(1) - go func(p *peer.Peer) { + go func(p peer.Peer) { defer wg.Done() pmes, err := dht.findProvidersSingle(ctx, p, key, 0) if err != nil { @@ -153,7 +153,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int } //TODO: this function could also be done asynchronously -func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet, count int, out chan *peer.Peer) { +func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet, count int, out chan peer.Peer) { for _, pbp := range peers { // construct new peer @@ -173,7 +173,7 @@ func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet // Find specific Peer // FindPeer searches for a peer with given ID. -func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error) { +func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error) { // Check if were already connected to them p, _ := dht.FindLocal(id) @@ -186,7 +186,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error if p == nil { return nil, nil } - if p.ID.Equal(id) { + if p.ID().Equal(id) { return p, nil } @@ -205,7 +205,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error continue } - if nxtPeer.ID.Equal(id) { + if nxtPeer.ID().Equal(id) { return nxtPeer, nil } @@ -214,7 +214,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error return nil, u.ErrNotFound } -func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Peer, error) { +func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (peer.Peer, error) { // Check if were already connected to them p, _ := dht.FindLocal(id) @@ -230,7 +230,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Pee } // setup query function - query := newQuery(u.Key(id), func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) { + query := newQuery(u.Key(id), func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel) if err != nil { log.Error("%s getPeer error: %v", dht.self, err) @@ -242,7 +242,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Pee routeLevel++ } - nxtprs := make([]*peer.Peer, len(plist)) + nxtprs := make([]peer.Peer, len(plist)) for i, fp := range plist { nxtp, err := dht.peerFromInfo(fp) if err != nil { @@ -250,7 +250,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Pee continue } - if nxtp.ID.Equal(id) { + if nxtp.ID().Equal(id) { return &dhtQueryResult{peer: nxtp, success: true}, nil } @@ -272,7 +272,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Pee } // Ping a peer, log the time it took -func (dht *IpfsDHT) Ping(ctx context.Context, p *peer.Peer) error { +func (dht *IpfsDHT) Ping(ctx context.Context, p peer.Peer) error { // Thoughts: maybe this should accept an ID and do a peer lookup? log.Info("ping %s start", p) diff --git a/routing/dht/util.go b/routing/dht/util.go index d12f7f9df..3cc812638 100644 --- a/routing/dht/util.go +++ b/routing/dht/util.go @@ -52,15 +52,15 @@ func newPeerSet() *peerSet { return ps } -func (ps *peerSet) Add(p *peer.Peer) { +func (ps *peerSet) Add(p peer.Peer) { ps.lk.Lock() - ps.ps[string(p.ID)] = true + ps.ps[string(p.ID())] = true ps.lk.Unlock() } -func (ps *peerSet) Contains(p *peer.Peer) bool { +func (ps *peerSet) Contains(p peer.Peer) bool { ps.lk.RLock() - _, ok := ps.ps[string(p.ID)] + _, ok := ps.ps[string(p.ID())] ps.lk.RUnlock() return ok } @@ -71,12 +71,12 @@ func (ps *peerSet) Size() int { return len(ps.ps) } -func (ps *peerSet) AddIfSmallerThan(p *peer.Peer, maxsize int) bool { +func (ps *peerSet) AddIfSmallerThan(p peer.Peer, maxsize int) bool { var success bool ps.lk.Lock() - if _, ok := ps.ps[string(p.ID)]; !ok && len(ps.ps) < maxsize { + if _, ok := ps.ps[string(p.ID())]; !ok && len(ps.ps) < maxsize { success = true - ps.ps[string(p.ID)] = true + ps.ps[string(p.ID())] = true } ps.lk.Unlock() return success diff --git a/routing/kbucket/bucket.go b/routing/kbucket/bucket.go index 3a9c71fad..b114f9e21 100644 --- a/routing/kbucket/bucket.go +++ b/routing/kbucket/bucket.go @@ -23,7 +23,7 @@ func (b *Bucket) find(id peer.ID) *list.Element { b.lk.RLock() defer b.lk.RUnlock() for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(*peer.Peer).ID.Equal(id) { + if e.Value.(peer.Peer).ID().Equal(id) { return e } } @@ -36,18 +36,18 @@ func (b *Bucket) moveToFront(e *list.Element) { b.lk.Unlock() } -func (b *Bucket) pushFront(p *peer.Peer) { +func (b *Bucket) pushFront(p peer.Peer) { b.lk.Lock() b.list.PushFront(p) b.lk.Unlock() } -func (b *Bucket) popBack() *peer.Peer { +func (b *Bucket) popBack() peer.Peer { b.lk.Lock() defer b.lk.Unlock() last := b.list.Back() b.list.Remove(last) - return last.Value.(*peer.Peer) + return last.Value.(peer.Peer) } func (b *Bucket) len() int { @@ -68,7 +68,7 @@ func (b *Bucket) Split(cpl int, target ID) *Bucket { newbuck.list = out e := b.list.Front() for e != nil { - peerID := ConvertPeerID(e.Value.(*peer.Peer).ID) + peerID := ConvertPeerID(e.Value.(peer.Peer).ID()) peerCPL := commonPrefixLen(peerID, target) if peerCPL > cpl { cur := e diff --git a/routing/kbucket/table.go b/routing/kbucket/table.go index 45ffb3cdf..6f37e94de 100644 --- a/routing/kbucket/table.go +++ b/routing/kbucket/table.go @@ -42,10 +42,10 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration) *Routing // Update adds or moves the given peer to the front of its respective bucket // If a peer gets removed from a bucket, it is returned -func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer { +func (rt *RoutingTable) Update(p peer.Peer) peer.Peer { rt.tabLock.Lock() defer rt.tabLock.Unlock() - peerID := ConvertPeerID(p.ID) + peerID := ConvertPeerID(p.ID()) cpl := commonPrefixLen(peerID, rt.local) bucketID := cpl @@ -54,7 +54,7 @@ func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer { } bucket := rt.Buckets[bucketID] - e := bucket.find(p.ID) + e := bucket.find(p.ID()) if e == nil { // New peer, add to bucket if p.GetLatency() > rt.maxLatency { @@ -93,7 +93,7 @@ func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer { // A helper struct to sort peers by their distance to the local node type peerDistance struct { - p *peer.Peer + p peer.Peer distance ID } @@ -110,8 +110,8 @@ func (p peerSorterArr) Less(a, b int) bool { func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr { for e := peerList.Front(); e != nil; e = e.Next() { - p := e.Value.(*peer.Peer) - pID := ConvertPeerID(p.ID) + p := e.Value.(peer.Peer) + pID := ConvertPeerID(p.ID()) pd := peerDistance{ p: p, distance: xor(target, pID), @@ -126,16 +126,16 @@ func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) pe } // Find a specific peer by ID or return nil -func (rt *RoutingTable) Find(id peer.ID) *peer.Peer { +func (rt *RoutingTable) Find(id peer.ID) peer.Peer { srch := rt.NearestPeers(ConvertPeerID(id), 1) - if len(srch) == 0 || !srch[0].ID.Equal(id) { + if len(srch) == 0 || !srch[0].ID().Equal(id) { return nil } return srch[0] } // NearestPeer returns a single peer that is nearest to the given ID -func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer { +func (rt *RoutingTable) NearestPeer(id ID) peer.Peer { peers := rt.NearestPeers(id, 1) if len(peers) > 0 { return peers[0] @@ -146,7 +146,7 @@ func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer { } // NearestPeers returns a list of the 'count' closest peers to the given ID -func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer { +func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.Peer { rt.tabLock.RLock() defer rt.tabLock.RUnlock() cpl := commonPrefixLen(id, rt.local) @@ -178,7 +178,7 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer { // Sort by distance to local peer sort.Sort(peerArr) - var out []*peer.Peer + var out []peer.Peer for i := 0; i < count && i < peerArr.Len(); i++ { out = append(out, peerArr[i].p) } @@ -197,11 +197,11 @@ func (rt *RoutingTable) Size() int { // ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table. // NOTE: This is potentially unsafe... use at your own risk -func (rt *RoutingTable) ListPeers() []*peer.Peer { - var peers []*peer.Peer +func (rt *RoutingTable) ListPeers() []peer.Peer { + var peers []peer.Peer for _, buck := range rt.Buckets { for e := buck.getIter(); e != nil; e = e.Next() { - peers = append(peers, e.Value.(*peer.Peer)) + peers = append(peers, e.Value.(peer.Peer)) } } return peers @@ -213,6 +213,6 @@ func (rt *RoutingTable) Print() { rt.tabLock.RLock() peers := rt.ListPeers() for i, p := range peers { - fmt.Printf("%d) %s %s\n", i, p.ID.Pretty(), p.GetLatency().String()) + fmt.Printf("%d) %s %s\n", i, p.ID().Pretty(), p.GetLatency().String()) } } diff --git a/routing/kbucket/table_test.go b/routing/kbucket/table_test.go index cc1cdfba1..2b45d1572 100644 --- a/routing/kbucket/table_test.go +++ b/routing/kbucket/table_test.go @@ -10,11 +10,10 @@ import ( peer "github.com/jbenet/go-ipfs/peer" ) -func _randPeer() *peer.Peer { - p := new(peer.Peer) - p.ID = make(peer.ID, 16) - crand.Read(p.ID) - return p +func _randPeer() peer.Peer { + id := make(peer.ID, 16) + crand.Read(id) + return peer.WithID(id) } func _randID() ID { @@ -29,25 +28,25 @@ func _randID() ID { func TestBucket(t *testing.T) { b := newBucket() - peers := make([]*peer.Peer, 100) + peers := make([]peer.Peer, 100) for i := 0; i < 100; i++ { peers[i] = _randPeer() b.pushFront(peers[i]) } local := _randPeer() - localID := ConvertPeerID(local.ID) + localID := ConvertPeerID(local.ID()) i := rand.Intn(len(peers)) - e := b.find(peers[i].ID) + e := b.find(peers[i].ID()) if e == nil { t.Errorf("Failed to find peer: %v", peers[i]) } - spl := b.Split(0, ConvertPeerID(local.ID)) + spl := b.Split(0, ConvertPeerID(local.ID())) llist := b.list for e := llist.Front(); e != nil; e = e.Next() { - p := ConvertPeerID(e.Value.(*peer.Peer).ID) + p := ConvertPeerID(e.Value.(peer.Peer).ID()) cpl := commonPrefixLen(p, localID) if cpl > 0 { t.Fatalf("Split failed. found id with cpl > 0 in 0 bucket") @@ -56,7 +55,7 @@ func TestBucket(t *testing.T) { rlist := spl.list for e := rlist.Front(); e != nil; e = e.Next() { - p := ConvertPeerID(e.Value.(*peer.Peer).ID) + p := ConvertPeerID(e.Value.(peer.Peer).ID()) cpl := commonPrefixLen(p, localID) if cpl == 0 { t.Fatalf("Split failed. found id with cpl == 0 in non 0 bucket") @@ -67,9 +66,9 @@ func TestBucket(t *testing.T) { // Right now, this just makes sure that it doesnt hang or crash func TestTableUpdate(t *testing.T) { local := _randPeer() - rt := NewRoutingTable(10, ConvertPeerID(local.ID), time.Hour) + rt := NewRoutingTable(10, ConvertPeerID(local.ID()), time.Hour) - peers := make([]*peer.Peer, 100) + peers := make([]peer.Peer, 100) for i := 0; i < 100; i++ { peers[i] = _randPeer() } @@ -93,33 +92,33 @@ func TestTableUpdate(t *testing.T) { func TestTableFind(t *testing.T) { local := _randPeer() - rt := NewRoutingTable(10, ConvertPeerID(local.ID), time.Hour) + rt := NewRoutingTable(10, ConvertPeerID(local.ID()), time.Hour) - peers := make([]*peer.Peer, 100) + peers := make([]peer.Peer, 100) for i := 0; i < 5; i++ { peers[i] = _randPeer() rt.Update(peers[i]) } t.Logf("Searching for peer: '%s'", peers[2]) - found := rt.NearestPeer(ConvertPeerID(peers[2].ID)) - if !found.ID.Equal(peers[2].ID) { + found := rt.NearestPeer(ConvertPeerID(peers[2].ID())) + if !found.ID().Equal(peers[2].ID()) { t.Fatalf("Failed to lookup known node...") } } func TestTableFindMultiple(t *testing.T) { local := _randPeer() - rt := NewRoutingTable(20, ConvertPeerID(local.ID), time.Hour) + rt := NewRoutingTable(20, ConvertPeerID(local.ID()), time.Hour) - peers := make([]*peer.Peer, 100) + peers := make([]peer.Peer, 100) for i := 0; i < 18; i++ { peers[i] = _randPeer() rt.Update(peers[i]) } t.Logf("Searching for peer: '%s'", peers[2]) - found := rt.NearestPeers(ConvertPeerID(peers[2].ID), 15) + found := rt.NearestPeers(ConvertPeerID(peers[2].ID()), 15) if len(found) != 15 { t.Fatalf("Got back different number of peers than we expected.") } @@ -131,7 +130,7 @@ func TestTableFindMultiple(t *testing.T) { func TestTableMultithreaded(t *testing.T) { local := peer.ID("localPeer") tab := NewRoutingTable(20, ConvertPeerID(local), time.Hour) - var peers []*peer.Peer + var peers []peer.Peer for i := 0; i < 500; i++ { peers = append(peers, _randPeer()) } @@ -156,7 +155,7 @@ func TestTableMultithreaded(t *testing.T) { go func() { for i := 0; i < 1000; i++ { n := rand.Intn(len(peers)) - tab.Find(peers[n].ID) + tab.Find(peers[n].ID()) } done <- struct{}{} }() @@ -170,7 +169,7 @@ func BenchmarkUpdates(b *testing.B) { local := ConvertKey("localKey") tab := NewRoutingTable(20, local, time.Hour) - var peers []*peer.Peer + var peers []peer.Peer for i := 0; i < b.N; i++ { peers = append(peers, _randPeer()) } @@ -186,7 +185,7 @@ func BenchmarkFinds(b *testing.B) { local := ConvertKey("localKey") tab := NewRoutingTable(20, local, time.Hour) - var peers []*peer.Peer + var peers []peer.Peer for i := 0; i < b.N; i++ { peers = append(peers, _randPeer()) tab.Update(peers[i]) @@ -194,6 +193,6 @@ func BenchmarkFinds(b *testing.B) { b.StartTimer() for i := 0; i < b.N; i++ { - tab.Find(peers[i].ID) + tab.Find(peers[i].ID()) } } diff --git a/routing/mock/routing.go b/routing/mock/routing.go index 954914c3b..caa74ffe3 100644 --- a/routing/mock/routing.go +++ b/routing/mock/routing.go @@ -17,10 +17,10 @@ var _ routing.IpfsRouting = &MockRouter{} type MockRouter struct { datastore ds.Datastore hashTable RoutingServer - peer *peer.Peer + peer peer.Peer } -func NewMockRouter(local *peer.Peer, dstore ds.Datastore) routing.IpfsRouting { +func NewMockRouter(local peer.Peer, dstore ds.Datastore) routing.IpfsRouting { return &MockRouter{ datastore: dstore, peer: local, @@ -50,16 +50,16 @@ func (mr *MockRouter) GetValue(ctx context.Context, key u.Key) ([]byte, error) { return data, nil } -func (mr *MockRouter) FindProviders(ctx context.Context, key u.Key) ([]*peer.Peer, error) { +func (mr *MockRouter) FindProviders(ctx context.Context, key u.Key) ([]peer.Peer, error) { return nil, nil } -func (mr *MockRouter) FindPeer(ctx context.Context, pid peer.ID) (*peer.Peer, error) { +func (mr *MockRouter) FindPeer(ctx context.Context, pid peer.ID) (peer.Peer, error) { return nil, nil } -func (mr *MockRouter) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan *peer.Peer { - out := make(chan *peer.Peer) +func (mr *MockRouter) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.Peer { + out := make(chan peer.Peer) go func() { defer close(out) for i, p := range mr.hashTable.Providers(k) { @@ -81,11 +81,11 @@ func (mr *MockRouter) Provide(_ context.Context, key u.Key) error { } type RoutingServer interface { - Announce(*peer.Peer, u.Key) error + Announce(peer.Peer, u.Key) error - Providers(u.Key) []*peer.Peer + Providers(u.Key) []peer.Peer - Client(p *peer.Peer) routing.IpfsRouting + Client(p peer.Peer) routing.IpfsRouting } func VirtualRoutingServer() RoutingServer { @@ -99,7 +99,7 @@ type hashTable struct { providers map[u.Key]peer.Map } -func (rs *hashTable) Announce(p *peer.Peer, k u.Key) error { +func (rs *hashTable) Announce(p peer.Peer, k u.Key) error { rs.lock.Lock() defer rs.lock.Unlock() @@ -111,10 +111,10 @@ func (rs *hashTable) Announce(p *peer.Peer, k u.Key) error { return nil } -func (rs *hashTable) Providers(k u.Key) []*peer.Peer { +func (rs *hashTable) Providers(k u.Key) []peer.Peer { rs.lock.RLock() defer rs.lock.RUnlock() - ret := make([]*peer.Peer, 0) + ret := make([]peer.Peer, 0) peerset, ok := rs.providers[k] if !ok { return ret @@ -131,7 +131,7 @@ func (rs *hashTable) Providers(k u.Key) []*peer.Peer { return ret } -func (rs *hashTable) Client(p *peer.Peer) routing.IpfsRouting { +func (rs *hashTable) Client(p peer.Peer) routing.IpfsRouting { return &MockRouter{ peer: p, hashTable: rs, diff --git a/routing/mock/routing_test.go b/routing/mock/routing_test.go index 650f5d3d5..196e00b5e 100644 --- a/routing/mock/routing_test.go +++ b/routing/mock/routing_test.go @@ -20,9 +20,7 @@ func TestKeyNotFound(t *testing.T) { func TestSetAndGet(t *testing.T) { pid := peer.ID([]byte("the peer id")) - p := &peer.Peer{ - ID: pid, - } + p := peer.WithID(pid) k := u.Key("42") rs := VirtualRoutingServer() err := rs.Announce(p, k) @@ -34,7 +32,7 @@ func TestSetAndGet(t *testing.T) { t.Fatal("should be one") } for _, elem := range providers { - if bytes.Equal(elem.ID, pid) { + if bytes.Equal(elem.ID(), pid) { return } } @@ -42,7 +40,7 @@ func TestSetAndGet(t *testing.T) { } func TestClientFindProviders(t *testing.T) { - peer := &peer.Peer{ID: []byte("42")} + peer := peer.WithIDString("42") rs := VirtualRoutingServer() client := rs.Client(peer) @@ -57,7 +55,7 @@ func TestClientFindProviders(t *testing.T) { isInHT := false for _, p := range providersFromHashTable { - if bytes.Equal(p.ID, peer.ID) { + if bytes.Equal(p.ID(), peer.ID()) { isInHT = true } } @@ -67,7 +65,7 @@ func TestClientFindProviders(t *testing.T) { providersFromClient := client.FindProvidersAsync(context.Background(), u.Key("hello"), max) isInClient := false for p := range providersFromClient { - if bytes.Equal(p.ID, peer.ID) { + if bytes.Equal(p.ID(), peer.ID()) { isInClient = true } } @@ -81,9 +79,7 @@ func TestClientOverMax(t *testing.T) { k := u.Key("hello") numProvidersForHelloKey := 100 for i := 0; i < numProvidersForHelloKey; i++ { - peer := &peer.Peer{ - ID: []byte(string(i)), - } + peer := peer.WithIDString(string(i)) err := rs.Announce(peer, k) if err != nil { t.Fatal(err) @@ -96,7 +92,7 @@ func TestClientOverMax(t *testing.T) { } max := 10 - peer := &peer.Peer{ID: []byte("TODO")} + peer := peer.WithIDString("TODO") client := rs.Client(peer) providersFromClient := client.FindProvidersAsync(context.Background(), k, max) @@ -118,9 +114,7 @@ func TestCanceledContext(t *testing.T) { i := 0 go func() { // infinite stream for { - peer := &peer.Peer{ - ID: []byte(string(i)), - } + peer := peer.WithIDString(string(i)) err := rs.Announce(peer, k) if err != nil { t.Fatal(err) @@ -129,7 +123,7 @@ func TestCanceledContext(t *testing.T) { } }() - local := &peer.Peer{ID: []byte("peer id doesn't matter")} + local := peer.WithIDString("peer id doesn't matter") client := rs.Client(local) t.Log("warning: max is finite so this test is non-deterministic") diff --git a/routing/routing.go b/routing/routing.go index f3dd0c9d8..cb60e5ee8 100644 --- a/routing/routing.go +++ b/routing/routing.go @@ -10,7 +10,7 @@ import ( // IpfsRouting is the routing module interface // It is implemented by things like DHTs, etc. type IpfsRouting interface { - FindProvidersAsync(context.Context, u.Key, int) <-chan *peer.Peer + FindProvidersAsync(context.Context, u.Key, int) <-chan peer.Peer // Basic Put/Get @@ -28,5 +28,5 @@ type IpfsRouting interface { // Find specific Peer // FindPeer searches for a peer with given ID. - FindPeer(context.Context, peer.ID) (*peer.Peer, error) + FindPeer(context.Context, peer.ID) (peer.Peer, error) }