From 87407a99b9904cd2ad30caf1dac5344a5db7a0c2 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 25 Oct 2014 12:38:32 -0700 Subject: [PATCH 1/4] add context to blockservice Get --- blockservice/blocks_test.go | 6 +++++- blockservice/blockservice.go | 4 +--- core/commands/block.go | 6 +++++- merkledag/merkledag.go | 6 +++++- net/conn/multiconn.go | 14 +++++++------- 5 files changed, 23 insertions(+), 13 deletions(-) diff --git a/blockservice/blocks_test.go b/blockservice/blocks_test.go index 41eceaae6..69a62d322 100644 --- a/blockservice/blocks_test.go +++ b/blockservice/blocks_test.go @@ -3,6 +3,9 @@ package blockservice import ( "bytes" "testing" + "time" + + "code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" blocks "github.com/jbenet/go-ipfs/blocks" @@ -37,7 +40,8 @@ func TestBlocks(t *testing.T) { t.Error("returned key is not equal to block key", err) } - b2, err := bs.GetBlock(b.Key()) + ctx, _ := context.WithTimeout(context.TODO(), time.Second*5) + b2, err := bs.GetBlock(ctx, b.Key()) if err != nil { t.Error("failed to retrieve block from BlockService", err) return diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 9f914dc38..51e9ad7d6 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -2,7 +2,6 @@ package blockservice import ( "fmt" - "time" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" @@ -52,7 +51,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { // GetBlock retrieves a particular block from the service, // Getting it from the datastore using the key (hash). -func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) { +func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, error) { log.Debug("BlockService GetBlock: '%s'", k) datai, err := s.Datastore.Get(k.DsKey()) if err == nil { @@ -67,7 +66,6 @@ func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) { }, nil } else if err == ds.ErrNotFound && s.Remote != nil { log.Debug("Blockservice: Searching bitswap.") - ctx, _ := context.WithTimeout(context.TODO(), 5*time.Second) blk, err := s.Remote.Block(ctx, k) if err != nil { return nil, err diff --git a/core/commands/block.go b/core/commands/block.go index 14375cf9f..089ed3734 100644 --- a/core/commands/block.go +++ b/core/commands/block.go @@ -5,6 +5,9 @@ import ( "io" "io/ioutil" "os" + "time" + + "code.google.com/p/go.net/context" mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" "github.com/jbenet/go-ipfs/blocks" @@ -26,7 +29,8 @@ func BlockGet(n *core.IpfsNode, args []string, opts map[string]interface{}, out k := u.Key(h) log.Debug("BlockGet key: '%q'", k) - b, err := n.Blocks.GetBlock(k) + ctx, _ := context.WithTimeout(context.TODO(), time.Second*5) + b, err := n.Blocks.GetBlock(ctx, k) if err != nil { return fmt.Errorf("block get: %v", err) } diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 8ce2fed6b..a6dbc6ebf 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -2,6 +2,9 @@ package merkledag import ( "fmt" + "time" + + "code.google.com/p/go.net/context" mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" blocks "github.com/jbenet/go-ipfs/blocks" @@ -204,7 +207,8 @@ func (n *DAGService) Get(k u.Key) (*Node, error) { return nil, fmt.Errorf("DAGService is nil") } - b, err := n.Blocks.GetBlock(k) + ctx, _ := context.WithTimeout(context.TODO(), time.Second*5) + b, err := n.Blocks.GetBlock(ctx, k) if err != nil { return nil, err } diff --git a/net/conn/multiconn.go b/net/conn/multiconn.go index aa45c24c6..13f9a5852 100644 --- a/net/conn/multiconn.go +++ b/net/conn/multiconn.go @@ -67,7 +67,7 @@ func (c *MultiConn) Add(conns ...Conn) { defer c.Unlock() for _, c2 := range conns { - log.Info("MultiConn: adding %s", c2) + log.Infof("MultiConn: adding %s", c2) if c.LocalPeer() != c2.LocalPeer() || c.RemotePeer() != c2.RemotePeer() { log.Error(c2) c.Unlock() // ok to unlock (to log). panicing. @@ -82,7 +82,7 @@ func (c *MultiConn) Add(conns ...Conn) { c.conns[c2.ID()] = c2 go c.fanInSingle(c2) - log.Info("MultiConn: added %s", c2) + log.Infof("MultiConn: added %s", c2) } } @@ -146,7 +146,7 @@ func (c *MultiConn) fanOut() { // send data out through our "best connection" case m, more := <-c.duplex.Out: if !more { - log.Info("%s out channel closed", c) + log.Infof("%s out channel closed", c) return } sc := c.BestConn() @@ -156,7 +156,7 @@ func (c *MultiConn) fanOut() { } i++ - log.Info("%s sending (%d)", sc, i) + log.Infof("%s sending (%d)", sc, i) sc.Out() <- m } } @@ -170,7 +170,7 @@ func (c *MultiConn) fanInSingle(child Conn) { // cleanup all data associated with this child Connection. defer func() { - log.Info("closing: %s", child) + log.Infof("closing: %s", child) // in case it still is in the map, remove it. c.Lock() @@ -197,11 +197,11 @@ func (c *MultiConn) fanInSingle(child Conn) { case m, more := <-child.In(): // receiving data if !more { - log.Info("%s in channel closed", child) + log.Infof("%s in channel closed", child) return // closed } i++ - log.Info("%s received (%d)", child, i) + log.Infof("%s received (%d)", child, i) c.duplex.In <- m } } From e1f2fe75f8420e122580135fc6b523a6f8cbb159 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 25 Oct 2014 03:36:00 -0700 Subject: [PATCH 2/4] add in dag removal --- blockservice/blockservice.go | 4 ++++ exchange/bitswap/bitswap.go | 2 +- merkledag/merkledag.go | 13 +++++++++++++ routing/dht/dht.go | 6 +++++- 4 files changed, 23 insertions(+), 2 deletions(-) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 51e9ad7d6..0ca533b19 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -76,3 +76,7 @@ func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, er return nil, u.ErrNotFound } } + +func (s *BlockService) DeleteBlock(k u.Key) error { + return s.Datastore.Delete(k.DsKey()) +} diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 64dcf96a8..9d3abccc2 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -94,7 +94,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) response, err := bs.sender.SendRequest(ctx, p, message) if err != nil { - log.Errorf("Error sender.SendRequest(%s)", p) + log.Error("Error sender.SendRequest(%s) = %s", p, err) return } // FIXME ensure accounting is handled correctly when diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index a6dbc6ebf..7834677a8 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -215,3 +215,16 @@ func (n *DAGService) Get(k u.Key) (*Node, error) { return Decoded(b.Data) } + +func (n *DAGService) Remove(nd *Node) error { + for _, l := range nd.Links { + if l.Node != nil { + n.Remove(l.Node) + } + } + k, err := nd.Key() + if err != nil { + return err + } + return n.Blocks.DeleteBlock(k) +} diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 52ae1f76c..b3ca010b7 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -540,7 +540,11 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) { func (dht *IpfsDHT) Bootstrap(ctx context.Context) { id := make([]byte, 16) rand.Read(id) - _, err := dht.FindPeer(ctx, peer.ID(id)) + p, err := dht.FindPeer(ctx, peer.ID(id)) + if err != nil { + log.Error("Bootstrap peer error: %s", err) + } + err = dht.dialer.DialPeer(p) if err != nil { log.Errorf("Bootstrap peer error: %s", err) } From ab7491f809aeba551d9de6bdbcb722249a0382f9 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 25 Oct 2014 14:50:22 -0700 Subject: [PATCH 3/4] logging, logging, and some minor logging --- exchange/bitswap/bitswap.go | 31 ++++++++++++++----- .../bitswap/network/net_message_adapter.go | 3 ++ net/swarm/conn.go | 6 ++-- routing/dht/dht.go | 2 +- 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 9d3abccc2..f631c651c 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -66,7 +66,7 @@ type bitswap struct { // // TODO ensure only one active request per key func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) { - log.Debug("Get Block %v", k) + log.Debugf("Get Block %v", k) ctx, cancelFunc := context.WithCancel(parent) bs.wantlist.Add(k) @@ -82,10 +82,10 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) } message.AppendWanted(k) for peerToQuery := range peersToQuery { - log.Debug("bitswap got peersToQuery: %s", peerToQuery) + log.Debugf("bitswap got peersToQuery: %s", peerToQuery) go func(p peer.Peer) { - log.Debug("bitswap dialing peer: %s", p) + log.Debugf("bitswap dialing peer: %s", p) err := bs.sender.DialPeer(p) if err != nil { log.Errorf("Error sender.DialPeer(%s)", p) @@ -124,7 +124,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) // HasBlock announces the existance of a block to bitswap, potentially sending // it to peers (Partners) whose WantLists include it. func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { - log.Debug("Has Block %v", blk.Key()) + log.Debugf("Has Block %v", blk.Key()) bs.wantlist.Remove(blk.Key()) bs.sendToPeersThatWant(ctx, blk) return bs.routing.Provide(ctx, blk.Key()) @@ -133,17 +133,24 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { // TODO(brian): handle errors func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) ( peer.Peer, bsmsg.BitSwapMessage) { - log.Debug("ReceiveMessage from %v", p.Key()) + log.Debugf("ReceiveMessage from %v", p.Key()) + log.Debugf("Message wantlist: %v", incoming.Wantlist()) + log.Debugf("Message blockset: %v", incoming.Blocks()) if p == nil { + log.Error("Received message from nil peer!") // TODO propagate the error upward return nil, nil } if incoming == nil { + log.Error("Got nil bitswap message!") // TODO propagate the error upward return nil, nil } + // Record message bytes in ledger + // TODO: this is bad, and could be easily abused. + // Should only track *useful* messages in ledger bs.strategy.MessageReceived(p, incoming) // FIRST for _, block := range incoming.Blocks() { @@ -153,7 +160,10 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm } go bs.notifications.Publish(block) go func(block blocks.Block) { - _ = bs.HasBlock(ctx, block) // FIXME err ignored + err := bs.HasBlock(ctx, block) // FIXME err ignored + if err != nil { + log.Errorf("HasBlock errored: %s", err) + } }(block) } @@ -162,6 +172,8 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm message.AppendWanted(wanted) } for _, key := range incoming.Wantlist() { + // TODO: might be better to check if we have the block before checking + // if we should send it to someone if bs.strategy.ShouldSendBlockToPeer(key, p) { if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil { continue @@ -171,10 +183,13 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm } } defer bs.strategy.MessageSent(p, message) + + log.Debug("Returning message.") return p, message } func (bs *bitswap) ReceiveError(err error) { + log.Errorf("Bitswap ReceiveError: %s", err) // TODO log the network error // TODO bubble the network error up to the parent context/error logger } @@ -187,10 +202,10 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage } func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) { - log.Debug("Sending %v to peers that want it", block.Key()) + log.Debugf("Sending %v to peers that want it", block.Key()) for _, p := range bs.strategy.Peers() { if bs.strategy.BlockIsWantedByPeer(block.Key(), p) { - log.Debug("%v wants %v", p, block.Key()) + log.Debugf("%v wants %v", p, block.Key()) if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) { message := bsmsg.New() message.AppendBlock(block) diff --git a/exchange/bitswap/network/net_message_adapter.go b/exchange/bitswap/network/net_message_adapter.go index 3ae11a2c6..9f51e9010 100644 --- a/exchange/bitswap/network/net_message_adapter.go +++ b/exchange/bitswap/network/net_message_adapter.go @@ -1,6 +1,8 @@ package network import ( + "errors" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" @@ -48,6 +50,7 @@ func (adapter *impl) HandleMessage( // TODO(brian): put this in a helper function if bsmsg == nil || p == nil { + adapter.receiver.ReceiveError(errors.New("ReceiveMessage returned nil peer or message")) return nil } diff --git a/net/swarm/conn.go b/net/swarm/conn.go index 43f935cf7..46b04309a 100644 --- a/net/swarm/conn.go +++ b/net/swarm/conn.go @@ -92,7 +92,7 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) { return nil, errors.New("Tried to start nil connection.") } - log.Debug("%s Started connection: %s", c.LocalPeer(), c.RemotePeer()) + log.Debugf("%s Started connection: %s", c.LocalPeer(), c.RemotePeer()) // add address of connection to Peer. Maybe it should happen in connSecure. // NOT adding this address here, because the incoming address in TCP @@ -167,7 +167,7 @@ func (s *Swarm) fanOut() { } i++ - log.Debugf("%s sent message to %s (%d)", s.local, msg.Peer(), i) + //log.Debugf("%s sent message to %s (%d)", s.local, msg.Peer(), i) // queue it in the connection's buffer c.Out() <- msg.Data() } @@ -206,7 +206,7 @@ func (s *Swarm) fanInSingle(c conn.Conn) { return // channel closed. } i++ - log.Debugf("%s received message from %s (%d)", s.local, c.RemotePeer(), i) + //log.Debugf("%s received message from %s (%d)", s.local, c.RemotePeer(), i) s.Incoming <- msg.New(c.RemotePeer(), data) } } diff --git a/routing/dht/dht.go b/routing/dht/dht.go index b3ca010b7..fdb9f96f2 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -23,7 +23,7 @@ import ( var log = u.Logger("dht") -const doPinging = true +const doPinging = false // TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js From d92db1246078fb18e8921fa5c6f79d818f521386 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 26 Oct 2014 00:45:40 +0000 Subject: [PATCH 4/4] lots of logging --- blocks/blocks.go | 6 ++++++ blockservice/blockservice.go | 2 +- crypto/spipe/handshake.go | 6 +++--- exchange/bitswap/bitswap.go | 1 - exchange/bitswap/network/net_message_adapter.go | 4 ++++ net/conn/conn.go | 13 ++++++++++++- net/conn/handshake.go | 2 +- net/conn/interface.go | 2 ++ net/conn/multiconn.go | 16 +++++++++++++++- net/conn/secure_conn.go | 4 ++++ net/swarm/conn.go | 7 +++++-- routing/dht/handlers.go | 2 +- 12 files changed, 54 insertions(+), 11 deletions(-) diff --git a/blocks/blocks.go b/blocks/blocks.go index 696c774ab..9bf556f5a 100644 --- a/blocks/blocks.go +++ b/blocks/blocks.go @@ -1,6 +1,8 @@ package blocks import ( + "fmt" + mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" u "github.com/jbenet/go-ipfs/util" ) @@ -20,3 +22,7 @@ func NewBlock(data []byte) *Block { func (b *Block) Key() u.Key { return u.Key(b.Multihash) } + +func (b *Block) String() string { + return fmt.Sprintf("[Block %s]", b.Key()) +} diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 0ca533b19..acb6564ed 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -52,7 +52,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { // GetBlock retrieves a particular block from the service, // Getting it from the datastore using the key (hash). func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, error) { - log.Debug("BlockService GetBlock: '%s'", k) + log.Debugf("BlockService GetBlock: '%s'", k) datai, err := s.Datastore.Get(k.DsKey()) if err == nil { log.Debug("Blockservice: Got data in datastore.") diff --git a/crypto/spipe/handshake.go b/crypto/spipe/handshake.go index 687c6a541..7a88665ef 100644 --- a/crypto/spipe/handshake.go +++ b/crypto/spipe/handshake.go @@ -53,7 +53,7 @@ func (s *SecurePipe) handshake() error { return err } - log.Debug("handshake: %s <--> %s", s.local, s.remote) + log.Debugf("handshake: %s <--> %s", s.local, s.remote) myPubKey, err := s.local.PubKey().Bytes() if err != nil { return err @@ -105,7 +105,7 @@ func (s *SecurePipe) handshake() error { if err != nil { return err } - log.Debug("%s Remote Peer Identified as %s", s.local, s.remote) + log.Debugf("%s Remote Peer Identified as %s", s.local, s.remote) exchange, err := selectBest(SupportedExchanges, proposeResp.GetExchanges()) if err != nil { @@ -209,7 +209,7 @@ func (s *SecurePipe) handshake() error { return fmt.Errorf("Negotiation failed, got: %s", resp2) } - log.Debug("%s handshake: Got node id: %s", s.local, s.remote) + log.Debugf("%s handshake: Got node id: %s", s.local, s.remote) return nil } diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index f631c651c..5e00a5888 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -135,7 +135,6 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm peer.Peer, bsmsg.BitSwapMessage) { log.Debugf("ReceiveMessage from %v", p.Key()) log.Debugf("Message wantlist: %v", incoming.Wantlist()) - log.Debugf("Message blockset: %v", incoming.Blocks()) if p == nil { log.Error("Received message from nil peer!") diff --git a/exchange/bitswap/network/net_message_adapter.go b/exchange/bitswap/network/net_message_adapter.go index 9f51e9010..c7e1a852d 100644 --- a/exchange/bitswap/network/net_message_adapter.go +++ b/exchange/bitswap/network/net_message_adapter.go @@ -4,6 +4,7 @@ import ( "errors" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + "github.com/jbenet/go-ipfs/util" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" inet "github.com/jbenet/go-ipfs/net" @@ -11,6 +12,8 @@ import ( peer "github.com/jbenet/go-ipfs/peer" ) +var log = util.Logger("net_message_adapter") + // NetMessageAdapter wraps a NetMessage network service func NetMessageAdapter(s inet.Service, n inet.Network, r Receiver) Adapter { adapter := impl{ @@ -60,6 +63,7 @@ func (adapter *impl) HandleMessage( return nil } + log.Debugf("Message size: %d", len(outgoing.Data())) return outgoing } diff --git a/net/conn/conn.go b/net/conn/conn.go index 444616b6c..3df82d15d 100644 --- a/net/conn/conn.go +++ b/net/conn/conn.go @@ -21,7 +21,7 @@ const ( ChanBuffer = 10 // MaxMessageSize is the size of the largest single message - MaxMessageSize = 1 << 20 // 1 MB + MaxMessageSize = 1 << 22 // 4 MB // HandshakeTimeout for when nodes first connect HandshakeTimeout = time.Second * 5 @@ -97,6 +97,17 @@ func (c *singleConn) close() error { return err } +func (c *singleConn) GetError() error { + select { + case err := <-c.msgio.incoming.ErrChan: + return err + case err := <-c.msgio.outgoing.ErrChan: + return err + default: + return nil + } +} + // ID is an identifier unique to this connection. func (c *singleConn) ID() string { return ID(c) diff --git a/net/conn/handshake.go b/net/conn/handshake.go index f8c88a0b6..ab8cea8e3 100644 --- a/net/conn/handshake.go +++ b/net/conn/handshake.go @@ -46,7 +46,7 @@ func Handshake1(ctx context.Context, c Conn) error { return fmt.Errorf("could not decode remote version: %q", err) } - log.Debug("Received remote version (%s) from %s", remoteH, rpeer) + log.Debugf("Received remote version (%s) from %s", remoteH, rpeer) } if err := handshake.Handshake1Compatible(localH, remoteH); err != nil { diff --git a/net/conn/interface.go b/net/conn/interface.go index 36ff4131e..689b6c9d6 100644 --- a/net/conn/interface.go +++ b/net/conn/interface.go @@ -37,6 +37,8 @@ type Conn interface { // Out returns a writable message channel Out() chan<- []byte + GetError() error + // Close ends the connection // Close() error -- already in ContextCloser } diff --git a/net/conn/multiconn.go b/net/conn/multiconn.go index 13f9a5852..885ad6008 100644 --- a/net/conn/multiconn.go +++ b/net/conn/multiconn.go @@ -198,6 +198,10 @@ func (c *MultiConn) fanInSingle(child Conn) { case m, more := <-child.In(): // receiving data if !more { log.Infof("%s in channel closed", child) + err := c.GetError() + if err != nil { + log.Errorf("Found error on connection: %s", err) + } return // closed } i++ @@ -209,7 +213,7 @@ func (c *MultiConn) fanInSingle(child Conn) { // close is the internal close function, called by ContextCloser.Close func (c *MultiConn) close() error { - log.Debug("%s closing Conn with %s", c.local, c.remote) + log.Debugf("%s closing Conn with %s", c.local, c.remote) // get connections c.RLock() @@ -291,3 +295,13 @@ func (c *MultiConn) In() <-chan []byte { func (c *MultiConn) Out() chan<- []byte { return c.duplex.Out } + +func (c *MultiConn) GetError() error { + for _, sub := range c.conns { + err := sub.GetError() + if err != nil { + return err + } + } + return nil +} diff --git a/net/conn/secure_conn.go b/net/conn/secure_conn.go index 3c80f3273..128b8d283 100644 --- a/net/conn/secure_conn.go +++ b/net/conn/secure_conn.go @@ -134,3 +134,7 @@ func (c *secureConn) In() <-chan []byte { func (c *secureConn) Out() chan<- []byte { return c.secure.Out } + +func (c *secureConn) GetError() error { + return c.insecure.GetError() +} diff --git a/net/swarm/conn.go b/net/swarm/conn.go index 46b04309a..1b0aa933b 100644 --- a/net/swarm/conn.go +++ b/net/swarm/conn.go @@ -154,6 +154,9 @@ func (s *Swarm) fanOut() { log.Infof("%s outgoing channel closed", s) return } + if len(msg.Data()) >= conn.MaxMessageSize { + log.Critical("Attempted to send message bigger than max size.") + } s.connsLock.RLock() c, found := s.conns[msg.Peer().Key()] @@ -167,7 +170,7 @@ func (s *Swarm) fanOut() { } i++ - //log.Debugf("%s sent message to %s (%d)", s.local, msg.Peer(), i) + log.Debugf("%s sent message to %s (%d)", s.local, msg.Peer(), i) // queue it in the connection's buffer c.Out() <- msg.Data() } @@ -206,7 +209,7 @@ func (s *Swarm) fanInSingle(c conn.Conn) { return // channel closed. } i++ - //log.Debugf("%s received message from %s (%d)", s.local, c.RemotePeer(), i) + log.Debugf("%s received message from %s (%d)", s.local, c.RemotePeer(), i) s.Incoming <- msg.New(c.RemotePeer(), data) } } diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 35355b32f..d5db8d1da 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -145,7 +145,7 @@ func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *pb.Message) (*pb.Messa resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) // check if we have this value, to add ourselves as provider. - log.Debugf("handling GetProviders: '%s'", pmes.GetKey()) + log.Debugf("handling GetProviders: '%s'", u.Key(pmes.GetKey())) dsk := u.Key(pmes.GetKey()).DsKey() has, err := dht.datastore.Has(dsk) if err != nil && err != ds.ErrNotFound {