diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 53c89a7d9..4511e188e 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -150,7 +150,8 @@ func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (*blocks.Block, e ctx, cancelFunc := context.WithCancel(parent) ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest")) - defer log.EventBegin(ctx, "GetBlockRequest", &k).Done() + log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k) + defer log.Event(ctx, "Bitswap.GetBlockRequest.End", &k) defer func() { cancelFunc() @@ -200,6 +201,10 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *block } promise := bs.notifications.Subscribe(ctx, keys...) + for _, k := range keys { + log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k) + } + bs.wm.WantBlocks(keys) req := &blockRequest{ @@ -310,6 +315,9 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg return } + k := b.Key() + log.Event(ctx, "Bitswap.GetBlockRequest.End", &k) + log.Debugf("got block %s from %s (%d,%d)", b, p, brecvd, bdup) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) if err := bs.HasBlock(hasBlockCtx, b); err != nil { diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 17c74a879..edd05bfb3 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -7,7 +7,9 @@ import ( process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + key "github.com/ipfs/go-ipfs/blocks/key" + eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" ) var TaskWorkerCount = 8 @@ -36,8 +38,9 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { // Start up workers to handle requests from other nodes for the data on this node for i := 0; i < TaskWorkerCount; i++ { + i := i px.Go(func(px process.Process) { - bs.taskWorker(ctx) + bs.taskWorker(ctx, i) }) } @@ -55,15 +58,18 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { // consider increasing number if providing blocks bottlenecks // file transfers for i := 0; i < provideWorkers; i++ { + i := i px.Go(func(px process.Process) { - bs.provideWorker(ctx) + bs.provideWorker(ctx, i) }) } } -func (bs *Bitswap) taskWorker(ctx context.Context) { +func (bs *Bitswap) taskWorker(ctx context.Context, id int) { + idmap := eventlog.LoggableMap{"ID": id} defer log.Info("bitswap task worker shutting down...") for { + log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap) select { case nextEnvelope := <-bs.engine.Outbox(): select { @@ -71,6 +77,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { if !ok { continue } + log.Event(ctx, "Bitswap.TaskWorker.Work", eventlog.LoggableMap{"ID": id, "Target": envelope.Peer.Pretty(), "Block": envelope.Block.Multihash.B58String()}) bs.wm.SendBlock(ctx, envelope) case <-ctx.Done(): @@ -82,10 +89,13 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { } } -func (bs *Bitswap) provideWorker(ctx context.Context) { +func (bs *Bitswap) provideWorker(ctx context.Context, id int) { + idmap := eventlog.LoggableMap{"ID": id} for { + log.Event(ctx, "Bitswap.ProvideWorker.Loop", idmap) select { case k, ok := <-bs.provideKeys: + log.Event(ctx, "Bitswap.ProvideWorker.Work", idmap, &k) if !ok { log.Debug("provideKeys channel closed") return @@ -139,6 +149,7 @@ func (bs *Bitswap) providerConnector(parent context.Context) { defer log.Info("bitswap client worker shutting down...") for { + log.Event(parent, "Bitswap.ProviderConnector.Loop") select { case req := <-bs.findKeys: keys := req.keys @@ -146,6 +157,7 @@ func (bs *Bitswap) providerConnector(parent context.Context) { log.Warning("Received batch request for zero blocks") continue } + log.Event(parent, "Bitswap.ProviderConnector.Work", eventlog.LoggableMap{"Keys": keys}) // NB: Optimization. Assumes that providers of key[0] are likely to // be able to provide for all keys. This currently holds true in most @@ -174,6 +186,7 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) { defer tick.Stop() for { + log.Event(ctx, "Bitswap.Rebroadcast.idle") select { case <-tick.C: n := bs.wm.wl.Len() @@ -181,6 +194,7 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) { log.Debug(n, "keys in bitswap wantlist") } case <-broadcastSignal.C: // resend unfulfilled wantlist keys + log.Event(ctx, "Bitswap.Rebroadcast.active") entries := bs.wm.wl.Entries() if len(entries) > 0 { bs.connectToProviders(ctx, entries)