mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-22 10:57:42 +08:00
commit
fbbf062e2b
@ -675,6 +675,10 @@ func (n *IpfsNode) teardown() error {
|
||||
// needs to use another during its shutdown/cleanup process, it should be
|
||||
// closed before that other object
|
||||
|
||||
if n.Provider != nil {
|
||||
closers = append(closers, n.Provider)
|
||||
}
|
||||
|
||||
if n.FilesRoot != nil {
|
||||
closers = append(closers, n.FilesRoot)
|
||||
}
|
||||
|
||||
@ -14,3 +14,7 @@ func (op *offlineProvider) Run() {}
|
||||
func (op *offlineProvider) Provide(cid cid.Cid) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (op *offlineProvider) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -20,6 +20,8 @@ type Provider interface {
|
||||
Run()
|
||||
// Provide takes a cid and makes an attempt to announce it to the network
|
||||
Provide(cid.Cid) error
|
||||
// Close stops the provider
|
||||
Close() error
|
||||
}
|
||||
|
||||
type provider struct {
|
||||
@ -39,6 +41,12 @@ func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.Conte
|
||||
}
|
||||
}
|
||||
|
||||
// Close stops the provider
|
||||
func (p *provider) Close() error {
|
||||
p.queue.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start workers to handle provide requests.
|
||||
func (p *provider) Run() {
|
||||
p.handleAnnouncements()
|
||||
|
||||
@ -27,6 +27,8 @@ type Queue struct {
|
||||
ds datastore.Datastore // Must be threadsafe
|
||||
dequeue chan cid.Cid
|
||||
enqueue chan cid.Cid
|
||||
close context.CancelFunc
|
||||
closed chan struct{}
|
||||
}
|
||||
|
||||
// NewQueue creates a queue for cids
|
||||
@ -36,19 +38,29 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cancelCtx, cancel := context.WithCancel(ctx)
|
||||
q := &Queue{
|
||||
name: name,
|
||||
ctx: ctx,
|
||||
ctx: cancelCtx,
|
||||
head: head,
|
||||
tail: tail,
|
||||
ds: namespaced,
|
||||
dequeue: make(chan cid.Cid),
|
||||
enqueue: make(chan cid.Cid),
|
||||
close: cancel,
|
||||
closed: make(chan struct{}, 1),
|
||||
}
|
||||
q.work()
|
||||
return q, nil
|
||||
}
|
||||
|
||||
// Close stops the queue
|
||||
func (q *Queue) Close() error {
|
||||
q.close()
|
||||
<-q.closed
|
||||
return nil
|
||||
}
|
||||
|
||||
// Enqueue puts a cid in the queue
|
||||
func (q *Queue) Enqueue(cid cid.Cid) {
|
||||
select {
|
||||
@ -103,6 +115,10 @@ func (q *Queue) work() {
|
||||
var k datastore.Key = datastore.Key{}
|
||||
var c cid.Cid = cid.Undef
|
||||
|
||||
defer func() {
|
||||
close(q.closed)
|
||||
}()
|
||||
|
||||
for {
|
||||
if c == cid.Undef {
|
||||
k, c = q.nextEntry()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user