mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-09 10:18:04 +08:00
logging, logging, and some minor logging
This commit is contained in:
parent
e1f2fe75f8
commit
ab7491f809
@ -66,7 +66,7 @@ type bitswap struct {
|
||||
//
|
||||
// TODO ensure only one active request per key
|
||||
func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) {
|
||||
log.Debug("Get Block %v", k)
|
||||
log.Debugf("Get Block %v", k)
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(parent)
|
||||
bs.wantlist.Add(k)
|
||||
@ -82,10 +82,10 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
|
||||
}
|
||||
message.AppendWanted(k)
|
||||
for peerToQuery := range peersToQuery {
|
||||
log.Debug("bitswap got peersToQuery: %s", peerToQuery)
|
||||
log.Debugf("bitswap got peersToQuery: %s", peerToQuery)
|
||||
go func(p peer.Peer) {
|
||||
|
||||
log.Debug("bitswap dialing peer: %s", p)
|
||||
log.Debugf("bitswap dialing peer: %s", p)
|
||||
err := bs.sender.DialPeer(p)
|
||||
if err != nil {
|
||||
log.Errorf("Error sender.DialPeer(%s)", p)
|
||||
@ -124,7 +124,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
|
||||
// HasBlock announces the existance of a block to bitswap, potentially sending
|
||||
// it to peers (Partners) whose WantLists include it.
|
||||
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
|
||||
log.Debug("Has Block %v", blk.Key())
|
||||
log.Debugf("Has Block %v", blk.Key())
|
||||
bs.wantlist.Remove(blk.Key())
|
||||
bs.sendToPeersThatWant(ctx, blk)
|
||||
return bs.routing.Provide(ctx, blk.Key())
|
||||
@ -133,17 +133,24 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
|
||||
// TODO(brian): handle errors
|
||||
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
|
||||
peer.Peer, bsmsg.BitSwapMessage) {
|
||||
log.Debug("ReceiveMessage from %v", p.Key())
|
||||
log.Debugf("ReceiveMessage from %v", p.Key())
|
||||
log.Debugf("Message wantlist: %v", incoming.Wantlist())
|
||||
log.Debugf("Message blockset: %v", incoming.Blocks())
|
||||
|
||||
if p == nil {
|
||||
log.Error("Received message from nil peer!")
|
||||
// TODO propagate the error upward
|
||||
return nil, nil
|
||||
}
|
||||
if incoming == nil {
|
||||
log.Error("Got nil bitswap message!")
|
||||
// TODO propagate the error upward
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Record message bytes in ledger
|
||||
// TODO: this is bad, and could be easily abused.
|
||||
// Should only track *useful* messages in ledger
|
||||
bs.strategy.MessageReceived(p, incoming) // FIRST
|
||||
|
||||
for _, block := range incoming.Blocks() {
|
||||
@ -153,7 +160,10 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
|
||||
}
|
||||
go bs.notifications.Publish(block)
|
||||
go func(block blocks.Block) {
|
||||
_ = bs.HasBlock(ctx, block) // FIXME err ignored
|
||||
err := bs.HasBlock(ctx, block) // FIXME err ignored
|
||||
if err != nil {
|
||||
log.Errorf("HasBlock errored: %s", err)
|
||||
}
|
||||
}(block)
|
||||
}
|
||||
|
||||
@ -162,6 +172,8 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
|
||||
message.AppendWanted(wanted)
|
||||
}
|
||||
for _, key := range incoming.Wantlist() {
|
||||
// TODO: might be better to check if we have the block before checking
|
||||
// if we should send it to someone
|
||||
if bs.strategy.ShouldSendBlockToPeer(key, p) {
|
||||
if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
|
||||
continue
|
||||
@ -171,10 +183,13 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
|
||||
}
|
||||
}
|
||||
defer bs.strategy.MessageSent(p, message)
|
||||
|
||||
log.Debug("Returning message.")
|
||||
return p, message
|
||||
}
|
||||
|
||||
func (bs *bitswap) ReceiveError(err error) {
|
||||
log.Errorf("Bitswap ReceiveError: %s", err)
|
||||
// TODO log the network error
|
||||
// TODO bubble the network error up to the parent context/error logger
|
||||
}
|
||||
@ -187,10 +202,10 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage
|
||||
}
|
||||
|
||||
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
|
||||
log.Debug("Sending %v to peers that want it", block.Key())
|
||||
log.Debugf("Sending %v to peers that want it", block.Key())
|
||||
for _, p := range bs.strategy.Peers() {
|
||||
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
|
||||
log.Debug("%v wants %v", p, block.Key())
|
||||
log.Debugf("%v wants %v", p, block.Key())
|
||||
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
|
||||
message := bsmsg.New()
|
||||
message.AppendBlock(block)
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
package network
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
|
||||
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
||||
@ -48,6 +50,7 @@ func (adapter *impl) HandleMessage(
|
||||
|
||||
// TODO(brian): put this in a helper function
|
||||
if bsmsg == nil || p == nil {
|
||||
adapter.receiver.ReceiveError(errors.New("ReceiveMessage returned nil peer or message"))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -92,7 +92,7 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) {
|
||||
return nil, errors.New("Tried to start nil connection.")
|
||||
}
|
||||
|
||||
log.Debug("%s Started connection: %s", c.LocalPeer(), c.RemotePeer())
|
||||
log.Debugf("%s Started connection: %s", c.LocalPeer(), c.RemotePeer())
|
||||
|
||||
// add address of connection to Peer. Maybe it should happen in connSecure.
|
||||
// NOT adding this address here, because the incoming address in TCP
|
||||
@ -167,7 +167,7 @@ func (s *Swarm) fanOut() {
|
||||
}
|
||||
|
||||
i++
|
||||
log.Debugf("%s sent message to %s (%d)", s.local, msg.Peer(), i)
|
||||
//log.Debugf("%s sent message to %s (%d)", s.local, msg.Peer(), i)
|
||||
// queue it in the connection's buffer
|
||||
c.Out() <- msg.Data()
|
||||
}
|
||||
@ -206,7 +206,7 @@ func (s *Swarm) fanInSingle(c conn.Conn) {
|
||||
return // channel closed.
|
||||
}
|
||||
i++
|
||||
log.Debugf("%s received message from %s (%d)", s.local, c.RemotePeer(), i)
|
||||
//log.Debugf("%s received message from %s (%d)", s.local, c.RemotePeer(), i)
|
||||
s.Incoming <- msg.New(c.RemotePeer(), data)
|
||||
}
|
||||
}
|
||||
|
||||
@ -23,7 +23,7 @@ import (
|
||||
|
||||
var log = u.Logger("dht")
|
||||
|
||||
const doPinging = true
|
||||
const doPinging = false
|
||||
|
||||
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user