mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-24 11:57:44 +08:00
@whyrusleeping may want to have a look and make sure I didn't break
anything here
BenchmarkInstantaneousAddCat1MB-4 200 10763761 ns/op
97.42 MB/s
BenchmarkInstantaneousAddCat2MB-4 panic: runtime error: invalid
memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0xbedd]
goroutine 14297 [running]:
github.com/jbenet/go-ipfs/exchange/bitswap/decision.(*taskQueue).Remove(0xc2087553a0,
0xc2085ef200, 0x22, 0x56f570, 0xc208367a40)
/Users/btc/go/src/github.com/jbenet/go-ipfs/exchange/bitswap/decision/taskqueue.go:66
+0x82
github.com/jbenet/go-ipfs/exchange/bitswap/decision.(*Engine).MessageSent(0xc20871b5c0,
0x56f570, 0xc208367a40, 0x570040, 0xc208753d40, 0x0, 0x0)
/Users/btc/go/src/github.com/jbenet/go-ipfs/exchange/bitswap/decision/engine.go:177
+0x29e
github.com/jbenet/go-ipfs/exchange/bitswap.(*bitswap).send(0xc20871b7a0,
0x56f4d8, 0xc208379800, 0x56f570, 0xc208367a40,
0x570040, 0xc208753d40, 0x0, 0x0)
/Users/btc/go/src/github.com/jbenet/go-ipfs/exchange/bitswap/bitswap.go:352
+0x11c
github.com/jbenet/go-ipfs/exchange/bitswap.(*bitswap).taskWorker(0xc20871b7a0,
0x56f4d8, 0xc208379800)
/Users/btc/go/src/github.com/jbenet/go-ipfs/exchange/bitswap/bitswap.go:238
+0x165
created by
github.com/jbenet/go-ipfs/exchange/bitswap.New
/Users/btc/go/src/github.com/jbenet/go-ipfs/exchange/bitswap/bitswap.go:66
+0x49e
85 lines
2.0 KiB
Go
85 lines
2.0 KiB
Go
package decision
|
|
|
|
import (
|
|
"sync"
|
|
|
|
wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
|
|
peer "github.com/jbenet/go-ipfs/peer"
|
|
u "github.com/jbenet/go-ipfs/util"
|
|
)
|
|
|
|
// TODO: at some point, the strategy needs to plug in here
|
|
// to help decide how to sort tasks (on add) and how to select
|
|
// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
|
|
type taskQueue struct {
|
|
// TODO: make this into a priority queue
|
|
lock sync.Mutex
|
|
tasks []*task
|
|
taskmap map[string]*task
|
|
}
|
|
|
|
func newTaskQueue() *taskQueue {
|
|
return &taskQueue{
|
|
taskmap: make(map[string]*task),
|
|
}
|
|
}
|
|
|
|
type task struct {
|
|
Entry wantlist.Entry
|
|
Target peer.Peer
|
|
Trash bool
|
|
}
|
|
|
|
// Push currently adds a new task to the end of the list
|
|
func (tl *taskQueue) Push(entry wantlist.Entry, to peer.Peer) {
|
|
tl.lock.Lock()
|
|
defer tl.lock.Unlock()
|
|
if task, ok := tl.taskmap[taskKey(to, entry.Key)]; ok {
|
|
// TODO: when priority queue is implemented,
|
|
// rearrange this task
|
|
task.Entry.Priority = entry.Priority
|
|
return
|
|
}
|
|
task := &task{
|
|
Entry: entry,
|
|
Target: to,
|
|
}
|
|
tl.tasks = append(tl.tasks, task)
|
|
tl.taskmap[taskKey(to, entry.Key)] = task
|
|
}
|
|
|
|
// Pop 'pops' the next task to be performed. Returns nil no task exists.
|
|
func (tl *taskQueue) Pop() *task {
|
|
tl.lock.Lock()
|
|
defer tl.lock.Unlock()
|
|
var out *task
|
|
for len(tl.tasks) > 0 {
|
|
// TODO: instead of zero, use exponential distribution
|
|
// it will help reduce the chance of receiving
|
|
// the same block from multiple peers
|
|
out = tl.tasks[0]
|
|
tl.tasks = tl.tasks[1:]
|
|
delete(tl.taskmap, taskKey(out.Target, out.Entry.Key))
|
|
if out.Trash {
|
|
continue // discarding tasks that have been removed
|
|
}
|
|
break // and return |out|
|
|
}
|
|
return out
|
|
}
|
|
|
|
// Remove lazily removes a task from the queue
|
|
func (tl *taskQueue) Remove(k u.Key, p peer.Peer) {
|
|
tl.lock.Lock()
|
|
t, ok := tl.taskmap[taskKey(p, k)]
|
|
if ok {
|
|
t.Trash = true
|
|
}
|
|
tl.lock.Unlock()
|
|
}
|
|
|
|
// taskKey returns a key that uniquely identifies a task.
|
|
func taskKey(p peer.Peer, k u.Key) string {
|
|
return string(p.Key() + k)
|
|
}
|