diff --git a/core/commands/bitswap.go b/core/commands/bitswap.go new file mode 100644 index 000000000..284e75e13 --- /dev/null +++ b/core/commands/bitswap.go @@ -0,0 +1,96 @@ +package commands + +import ( + "bytes" + "fmt" + cmds "github.com/jbenet/go-ipfs/commands" + bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" + u "github.com/jbenet/go-ipfs/util" + "io" +) + +var BitswapCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "A set of commands to manipulate the bitswap agent", + ShortDescription: ``, + }, + Subcommands: map[string]*cmds.Command{ + "wantlist": showWantlistCmd, + "stat": bitswapStatCmd, + }, +} + +var showWantlistCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Show blocks currently on the wantlist", + ShortDescription: ` +Print out all blocks currently on the bitswap wantlist for the local peer`, + }, + Type: KeyList{}, + Run: func(req cmds.Request, res cmds.Response) { + nd, err := req.Context().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + bs, ok := nd.Exchange.(*bitswap.Bitswap) + if !ok { + res.SetError(u.ErrCast(), cmds.ErrNormal) + return + } + + res.SetOutput(&KeyList{bs.GetWantlist()}) + }, + Marshalers: cmds.MarshalerMap{ + cmds.Text: KeyListTextMarshaler, + }, +} + +var bitswapStatCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "show some diagnostic information on the bitswap agent", + ShortDescription: ``, + }, + Type: bitswap.Stat{}, + Run: func(req cmds.Request, res cmds.Response) { + nd, err := req.Context().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + bs, ok := nd.Exchange.(*bitswap.Bitswap) + if !ok { + res.SetError(u.ErrCast(), cmds.ErrNormal) + return + } + + st, err := bs.Stat() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + res.SetOutput(st) + }, + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + out, ok := res.Output().(*bitswap.Stat) + if !ok { + return nil, u.ErrCast() + } + buf := new(bytes.Buffer) + fmt.Fprintln(buf, "bitswap status") + fmt.Fprintf(buf, "\tprovides buffer: %d / %d\n", out.ProvideBufLen, bitswap.HasBlockBufferSize) + fmt.Fprintf(buf, "\twantlist [%d keys]\n", len(out.Wantlist)) + for _, k := range out.Wantlist { + fmt.Fprintf(buf, "\t\t%s\n", k.B58String()) + } + fmt.Fprintf(buf, "\tpartners [%d]\n", len(out.Peers)) + for _, p := range out.Peers { + fmt.Fprintf(buf, "\t\t%s\n", p) + } + return buf, nil + }, + }, +} diff --git a/core/commands/root.go b/core/commands/root.go index 0ab07fecb..cb609930a 100644 --- a/core/commands/root.go +++ b/core/commands/root.go @@ -5,10 +5,10 @@ import ( "strings" cmds "github.com/jbenet/go-ipfs/commands" - u "github.com/jbenet/go-ipfs/util" + evlog "github.com/jbenet/go-ipfs/thirdparty/eventlog" ) -var log = u.Logger("core/commands") +var log = evlog.Logger("core/commands") type TestOutput struct { Foo string @@ -98,6 +98,7 @@ var rootSubcommands = map[string]*cmds.Command{ "swarm": SwarmCmd, "update": UpdateCmd, "version": VersionCmd, + "bitswap": BitswapCmd, } func init() { diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 5508f66e3..3a81015be 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -40,7 +40,7 @@ const ( // kMaxPriority is the max priority as defined by the bitswap protocol kMaxPriority = math.MaxInt32 - hasBlockBufferSize = 256 + HasBlockBufferSize = 256 provideWorkers = 4 ) @@ -79,7 +79,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, px.Close() }() - bs := &bitswap{ + bs := &Bitswap{ self: p, blockstore: bstore, notifications: notif, @@ -88,7 +88,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, wantlist: wantlist.NewThreadSafe(), batchRequests: make(chan *blockRequest, sizeBatchRequestChan), process: px, - newBlocks: make(chan *blocks.Block, hasBlockBufferSize), + newBlocks: make(chan *blocks.Block, HasBlockBufferSize), } network.SetDelegate(bs) @@ -97,8 +97,8 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, return bs } -// bitswap instances implement the bitswap protocol. -type bitswap struct { +// Bitswap instances implement the bitswap protocol. +type Bitswap struct { // the ID of the peer to act on behalf of self peer.ID @@ -133,7 +133,7 @@ type blockRequest struct { // GetBlock attempts to retrieve a particular block from peers within the // deadline enforced by the context. -func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) { +func (bs *Bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) { // Any async work initiated by this function must end when this function // returns. To ensure this, derive a new context. Note that it is okay to @@ -179,7 +179,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err // NB: Your request remains open until the context expires. To conserve // resources, provide a context with a reasonably short deadline (ie. not one // that lasts throughout the lifetime of the server) -func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) { +func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) { select { case <-bs.process.Closing(): return nil, errors.New("bitswap is closed") @@ -201,7 +201,7 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks. // HasBlock announces the existance of a block to this bitswap service. The // service will potentially notify its peers. -func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { +func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { log.Event(ctx, "hasBlock", blk) select { case <-bs.process.Closing(): @@ -221,7 +221,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { return nil } -func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error { +func (bs *Bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error { set := pset.New() wg := sync.WaitGroup{} for peerToQuery := range peers { @@ -242,7 +242,7 @@ func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMe return nil } -func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error { +func (bs *Bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error { message := bsmsg.New() message.SetFull(true) for _, wanted := range bs.wantlist.Entries() { @@ -251,7 +251,7 @@ func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID return bs.sendWantlistMsgToPeers(ctx, message, peers) } -func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantlist.Entry) { +func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantlist.Entry) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -286,7 +286,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli } // TODO(brian): handle errors -func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) ( +func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) ( peer.ID, bsmsg.BitSwapMessage) { defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done() @@ -325,7 +325,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg } // Connected/Disconnected warns bitswap about peer connections -func (bs *bitswap) PeerConnected(p peer.ID) { +func (bs *Bitswap) PeerConnected(p peer.ID) { // TODO: add to clientWorker?? peers := make(chan peer.ID, 1) peers <- p @@ -337,11 +337,11 @@ func (bs *bitswap) PeerConnected(p peer.ID) { } // Connected/Disconnected warns bitswap about peer connections -func (bs *bitswap) PeerDisconnected(p peer.ID) { +func (bs *Bitswap) PeerDisconnected(p peer.ID) { bs.engine.PeerDisconnected(p) } -func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { +func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { if len(bkeys) < 1 { return } @@ -358,7 +358,7 @@ func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { } } -func (bs *bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) { +func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) { if len(bkeys) < 1 { return } @@ -383,7 +383,7 @@ func (bs *bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) { wg.Wait() } -func (bs *bitswap) ReceiveError(err error) { +func (bs *Bitswap) ReceiveError(err error) { log.Debugf("Bitswap ReceiveError: %s", err) // TODO log the network error // TODO bubble the network error up to the parent context/error logger @@ -391,7 +391,7 @@ func (bs *bitswap) ReceiveError(err error) { // send strives to ensure that accounting is always performed when a message is // sent -func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error { +func (bs *Bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error { defer log.EventBegin(ctx, "sendMessage", p, m).Done() if err := bs.network.SendMessage(ctx, p, m); err != nil { return errors.Wrap(err) @@ -399,6 +399,14 @@ func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) return bs.engine.MessageSent(p, m) } -func (bs *bitswap) Close() error { +func (bs *Bitswap) Close() error { return bs.process.Close() } + +func (bs *Bitswap) GetWantlist() []u.Key { + var out []u.Key + for _, e := range bs.wantlist.Entries() { + out = append(out, e.Key) + } + return out +} diff --git a/exchange/bitswap/stat.go b/exchange/bitswap/stat.go new file mode 100644 index 000000000..4e37443ef --- /dev/null +++ b/exchange/bitswap/stat.go @@ -0,0 +1,25 @@ +package bitswap + +import ( + u "github.com/jbenet/go-ipfs/util" + "sort" +) + +type Stat struct { + ProvideBufLen int + Wantlist []u.Key + Peers []string +} + +func (bs *Bitswap) Stat() (*Stat, error) { + st := new(Stat) + st.ProvideBufLen = len(bs.newBlocks) + st.Wantlist = bs.GetWantlist() + + for _, p := range bs.engine.Peers() { + st.Peers = append(st.Peers, p.Pretty()) + } + sort.Strings(st.Peers) + + return st, nil +} diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 3753edb62..1b28aedb1 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -8,7 +8,7 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ) -func (bs *bitswap) startWorkers(px process.Process, ctx context.Context) { +func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { // Start up a worker to handle block requests this node is making px.Go(func(px process.Process) { bs.clientWorker(ctx) @@ -34,7 +34,7 @@ func (bs *bitswap) startWorkers(px process.Process, ctx context.Context) { } } -func (bs *bitswap) taskWorker(ctx context.Context) { +func (bs *Bitswap) taskWorker(ctx context.Context) { defer log.Info("bitswap task worker shutting down...") for { select { @@ -55,7 +55,7 @@ func (bs *bitswap) taskWorker(ctx context.Context) { } } -func (bs *bitswap) provideWorker(ctx context.Context) { +func (bs *Bitswap) provideWorker(ctx context.Context) { for { select { case blk, ok := <-bs.newBlocks: @@ -75,7 +75,7 @@ func (bs *bitswap) provideWorker(ctx context.Context) { } // TODO ensure only one active request per key -func (bs *bitswap) clientWorker(parent context.Context) { +func (bs *Bitswap) clientWorker(parent context.Context) { defer log.Info("bitswap client worker shutting down...") for { @@ -115,7 +115,7 @@ func (bs *bitswap) clientWorker(parent context.Context) { } } -func (bs *bitswap) rebroadcastWorker(parent context.Context) { +func (bs *Bitswap) rebroadcastWorker(parent context.Context) { ctx, cancel := context.WithCancel(parent) defer cancel()