mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-06 16:58:11 +08:00
clarify synhronization constructs
This commit is contained in:
parent
c273a3bd4f
commit
b71a0aced0
@ -14,23 +14,17 @@ import (
|
||||
)
|
||||
|
||||
type WantManager struct {
|
||||
receiver bsnet.Receiver
|
||||
|
||||
incoming chan []*bsmsg.Entry
|
||||
|
||||
// notification channel for new peers connecting
|
||||
connect chan peer.ID
|
||||
|
||||
// notification channel for peers disconnecting
|
||||
disconnect chan peer.ID
|
||||
// sync channels for Run loop
|
||||
incoming chan []*bsmsg.Entry
|
||||
connect chan peer.ID // notification channel for new peers connecting
|
||||
disconnect chan peer.ID // notification channel for peers disconnecting
|
||||
|
||||
// synchronized by Run loop, only touch inside there
|
||||
peers map[peer.ID]*msgQueue
|
||||
|
||||
wl *wantlist.Wantlist
|
||||
wl *wantlist.Wantlist
|
||||
|
||||
network bsnet.BitSwapNetwork
|
||||
|
||||
ctx context.Context
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
|
||||
@ -58,8 +52,9 @@ type cancellation struct {
|
||||
type msgQueue struct {
|
||||
p peer.ID
|
||||
|
||||
outlk sync.Mutex
|
||||
out bsmsg.BitSwapMessage
|
||||
outlk sync.Mutex
|
||||
out bsmsg.BitSwapMessage
|
||||
network bsnet.BitSwapNetwork
|
||||
|
||||
work chan struct{}
|
||||
done chan struct{}
|
||||
@ -112,7 +107,7 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
|
||||
return nil
|
||||
}
|
||||
|
||||
mq := newMsgQueue(p)
|
||||
mq := pm.newMsgQueue(p)
|
||||
|
||||
// new peer, we will want to give them our full wantlist
|
||||
fullwantlist := bsmsg.New(true)
|
||||
@ -123,7 +118,7 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
|
||||
mq.work <- struct{}{}
|
||||
|
||||
pm.peers[p] = mq
|
||||
go pm.runQueue(mq)
|
||||
go mq.runQueue(pm.ctx)
|
||||
return mq
|
||||
}
|
||||
|
||||
@ -138,12 +133,12 @@ func (pm *WantManager) stopPeerHandler(p peer.ID) {
|
||||
delete(pm.peers, p)
|
||||
}
|
||||
|
||||
func (pm *WantManager) runQueue(mq *msgQueue) {
|
||||
func (mq *msgQueue) runQueue(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-mq.work: // there is work to be done
|
||||
|
||||
err := pm.network.ConnectTo(pm.ctx, mq.p)
|
||||
err := mq.network.ConnectTo(ctx, mq.p)
|
||||
if err != nil {
|
||||
log.Errorf("cant connect to peer %s: %s", mq.p, err)
|
||||
// TODO: cant connect, what now?
|
||||
@ -161,7 +156,7 @@ func (pm *WantManager) runQueue(mq *msgQueue) {
|
||||
mq.outlk.Unlock()
|
||||
|
||||
// send wantlist updates
|
||||
err = pm.network.SendMessage(pm.ctx, mq.p, wlm)
|
||||
err = mq.network.SendMessage(ctx, mq.p, wlm)
|
||||
if err != nil {
|
||||
log.Error("bitswap send error: ", err)
|
||||
// TODO: what do we do if this fails?
|
||||
@ -224,10 +219,11 @@ func (pm *WantManager) Run() {
|
||||
}
|
||||
}
|
||||
|
||||
func newMsgQueue(p peer.ID) *msgQueue {
|
||||
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
|
||||
mq := new(msgQueue)
|
||||
mq.done = make(chan struct{})
|
||||
mq.work = make(chan struct{}, 1)
|
||||
mq.network = wm.network
|
||||
mq.p = p
|
||||
|
||||
return mq
|
||||
|
||||
Loading…
Reference in New Issue
Block a user