mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-23 11:27:42 +08:00
add in some events to bitswap to emit worker information
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
parent
f6f9cae4e4
commit
ff1bf3058e
@ -150,7 +150,8 @@ func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (*blocks.Block, e
|
||||
ctx, cancelFunc := context.WithCancel(parent)
|
||||
|
||||
ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest"))
|
||||
defer log.EventBegin(ctx, "GetBlockRequest", &k).Done()
|
||||
log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
|
||||
defer log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)
|
||||
|
||||
defer func() {
|
||||
cancelFunc()
|
||||
@ -200,6 +201,10 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *block
|
||||
}
|
||||
promise := bs.notifications.Subscribe(ctx, keys...)
|
||||
|
||||
for _, k := range keys {
|
||||
log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
|
||||
}
|
||||
|
||||
bs.wm.WantBlocks(keys)
|
||||
|
||||
req := &blockRequest{
|
||||
@ -310,6 +315,9 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
|
||||
return
|
||||
}
|
||||
|
||||
k := b.Key()
|
||||
log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)
|
||||
|
||||
log.Debugf("got block %s from %s (%d,%d)", b, p, brecvd, bdup)
|
||||
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
|
||||
if err := bs.HasBlock(hasBlockCtx, b); err != nil {
|
||||
|
||||
@ -7,7 +7,9 @@ import (
|
||||
|
||||
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
|
||||
)
|
||||
|
||||
var TaskWorkerCount = 8
|
||||
@ -36,8 +38,9 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
|
||||
|
||||
// Start up workers to handle requests from other nodes for the data on this node
|
||||
for i := 0; i < TaskWorkerCount; i++ {
|
||||
i := i
|
||||
px.Go(func(px process.Process) {
|
||||
bs.taskWorker(ctx)
|
||||
bs.taskWorker(ctx, i)
|
||||
})
|
||||
}
|
||||
|
||||
@ -55,15 +58,18 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
|
||||
// consider increasing number if providing blocks bottlenecks
|
||||
// file transfers
|
||||
for i := 0; i < provideWorkers; i++ {
|
||||
i := i
|
||||
px.Go(func(px process.Process) {
|
||||
bs.provideWorker(ctx)
|
||||
bs.provideWorker(ctx, i)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *Bitswap) taskWorker(ctx context.Context) {
|
||||
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
|
||||
idmap := eventlog.LoggableMap{"ID": id}
|
||||
defer log.Info("bitswap task worker shutting down...")
|
||||
for {
|
||||
log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
|
||||
select {
|
||||
case nextEnvelope := <-bs.engine.Outbox():
|
||||
select {
|
||||
@ -71,6 +77,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
log.Event(ctx, "Bitswap.TaskWorker.Work", eventlog.LoggableMap{"ID": id, "Target": envelope.Peer.Pretty(), "Block": envelope.Block.Multihash.B58String()})
|
||||
|
||||
bs.wm.SendBlock(ctx, envelope)
|
||||
case <-ctx.Done():
|
||||
@ -82,10 +89,13 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *Bitswap) provideWorker(ctx context.Context) {
|
||||
func (bs *Bitswap) provideWorker(ctx context.Context, id int) {
|
||||
idmap := eventlog.LoggableMap{"ID": id}
|
||||
for {
|
||||
log.Event(ctx, "Bitswap.ProvideWorker.Loop", idmap)
|
||||
select {
|
||||
case k, ok := <-bs.provideKeys:
|
||||
log.Event(ctx, "Bitswap.ProvideWorker.Work", idmap, &k)
|
||||
if !ok {
|
||||
log.Debug("provideKeys channel closed")
|
||||
return
|
||||
@ -139,6 +149,7 @@ func (bs *Bitswap) providerConnector(parent context.Context) {
|
||||
defer log.Info("bitswap client worker shutting down...")
|
||||
|
||||
for {
|
||||
log.Event(parent, "Bitswap.ProviderConnector.Loop")
|
||||
select {
|
||||
case req := <-bs.findKeys:
|
||||
keys := req.keys
|
||||
@ -146,6 +157,7 @@ func (bs *Bitswap) providerConnector(parent context.Context) {
|
||||
log.Warning("Received batch request for zero blocks")
|
||||
continue
|
||||
}
|
||||
log.Event(parent, "Bitswap.ProviderConnector.Work", eventlog.LoggableMap{"Keys": keys})
|
||||
|
||||
// NB: Optimization. Assumes that providers of key[0] are likely to
|
||||
// be able to provide for all keys. This currently holds true in most
|
||||
@ -174,6 +186,7 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
|
||||
defer tick.Stop()
|
||||
|
||||
for {
|
||||
log.Event(ctx, "Bitswap.Rebroadcast.idle")
|
||||
select {
|
||||
case <-tick.C:
|
||||
n := bs.wm.wl.Len()
|
||||
@ -181,6 +194,7 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
|
||||
log.Debug(n, "keys in bitswap wantlist")
|
||||
}
|
||||
case <-broadcastSignal.C: // resend unfulfilled wantlist keys
|
||||
log.Event(ctx, "Bitswap.Rebroadcast.active")
|
||||
entries := bs.wm.wl.Entries()
|
||||
if len(entries) > 0 {
|
||||
bs.connectToProviders(ctx, entries)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user