mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-25 12:27:43 +08:00
Merge pull request #476 from jbenet/epictest-race-fix
epictest race fix
This commit is contained in:
commit
7e0ea18e79
@ -94,14 +94,16 @@ func DirectAddCat(data []byte, conf Config) error {
|
||||
Bandwidth: math.MaxInt32,
|
||||
})
|
||||
|
||||
if len(mn.Peers()) < numPeers {
|
||||
peers := mn.Peers()
|
||||
if len(peers) < numPeers {
|
||||
return errors.New("test initialization error")
|
||||
}
|
||||
adder, err := makeCore(ctx, MocknetTestRepo(mn.Peers()[0], mn.Net(mn.Peers()[0]), conf))
|
||||
|
||||
adder, err := makeCore(ctx, MocknetTestRepo(peers[0], mn.Net(peers[0]), conf))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
catter, err := makeCore(ctx, MocknetTestRepo(mn.Peers()[1], mn.Net(mn.Peers()[1]), conf))
|
||||
catter, err := makeCore(ctx, MocknetTestRepo(peers[1], mn.Net(peers[1]), conf))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||
|
||||
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
|
||||
blockservice "github.com/jbenet/go-ipfs/blockservice"
|
||||
exchange "github.com/jbenet/go-ipfs/exchange"
|
||||
@ -22,8 +23,11 @@ import (
|
||||
util "github.com/jbenet/go-ipfs/util"
|
||||
"github.com/jbenet/go-ipfs/util/datastore2"
|
||||
delay "github.com/jbenet/go-ipfs/util/delay"
|
||||
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
|
||||
)
|
||||
|
||||
var log = eventlog.Logger("epictest")
|
||||
|
||||
// TODO merge with core.IpfsNode
|
||||
type core struct {
|
||||
repo Repo
|
||||
@ -128,6 +132,8 @@ func MocknetTestRepo(p peer.ID, n net.Network, conf Config) RepoFactory {
|
||||
const alwaysSendToPeer = true
|
||||
dsDelay := delay.Fixed(conf.BlockstoreLatency)
|
||||
ds := sync.MutexWrap(datastore2.WithDelay(datastore.NewMapDatastore(), dsDelay))
|
||||
|
||||
log.Debugf("MocknetTestRepo: %s %s %s", p, n.LocalPeer(), n)
|
||||
dhtt := dht.NewDHT(ctx, p, n, ds)
|
||||
bsn := bsnet.NewFromIpfsNetwork(n, dhtt)
|
||||
bstore, err := blockstore.WriteCached(blockstore.NewBlockstore(ds), kWriteCacheElems)
|
||||
|
||||
@ -38,18 +38,19 @@ func RunThreeLeggedCat(data []byte, conf Config) error {
|
||||
Bandwidth: math.MaxInt32,
|
||||
})
|
||||
|
||||
if len(mn.Peers()) < numPeers {
|
||||
peers := mn.Peers()
|
||||
if len(peers) < numPeers {
|
||||
return errors.New("test initialization error")
|
||||
}
|
||||
adder, err := makeCore(ctx, MocknetTestRepo(mn.Peers()[0], mn.Net(mn.Peers()[0]), conf))
|
||||
adder, err := makeCore(ctx, MocknetTestRepo(peers[0], mn.Net(peers[0]), conf))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
catter, err := makeCore(ctx, MocknetTestRepo(mn.Peers()[1], mn.Net(mn.Peers()[1]), conf))
|
||||
catter, err := makeCore(ctx, MocknetTestRepo(peers[1], mn.Net(peers[1]), conf))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bootstrap, err := makeCore(ctx, MocknetTestRepo(mn.Peers()[2], mn.Net(mn.Peers()[2]), conf))
|
||||
bootstrap, err := makeCore(ctx, MocknetTestRepo(peers[2], mn.Net(peers[2]), conf))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -26,7 +26,7 @@ type Mocknet interface {
|
||||
// ID is derived from PrivKey
|
||||
AddPeer(ic.PrivKey, ma.Multiaddr) (inet.Network, error)
|
||||
|
||||
// retrieve things
|
||||
// retrieve things (with randomized iteration order)
|
||||
Peers() []peer.ID
|
||||
Net(peer.ID) inet.Network
|
||||
Nets() []inet.Network
|
||||
|
||||
12
net/mux.go
12
net/mux.go
@ -1,7 +1,6 @@
|
||||
package net
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
@ -66,17 +65,24 @@ func (m *Mux) ReadProtocolHeader(s io.Reader) (string, StreamHandler, error) {
|
||||
case !found && m.Default != nil:
|
||||
return name, m.Default, nil
|
||||
case !found && m.Default == nil:
|
||||
return name, nil, errors.New("no handler with name: " + name)
|
||||
return name, nil, fmt.Errorf("%s no handler with name: %s (%d)", m, name, len(name))
|
||||
default:
|
||||
return name, h, nil
|
||||
}
|
||||
}
|
||||
|
||||
// String returns the muxer's printing representation
|
||||
func (m *Mux) String() string {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
return fmt.Sprintf("<Muxer %p %d>", m, len(m.Handlers))
|
||||
}
|
||||
|
||||
// SetHandler sets the protocol handler on the Network's Muxer.
|
||||
// This operation is threadsafe.
|
||||
func (m *Mux) SetHandler(p ProtocolID, h StreamHandler) {
|
||||
log.Debugf("%s setting handler for protocol: %s (%d)", m, p, len(p))
|
||||
m.Lock()
|
||||
log.Debug("setting protocol ", p)
|
||||
m.Handlers[p] = h
|
||||
m.Unlock()
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user