Change Process interface into object variable

License: MIT
Signed-off-by: rht <rhtbot@gmail.com>
This commit is contained in:
rht 2015-06-18 17:17:38 +07:00
parent 330b213777
commit 007a12e7ef
14 changed files with 110 additions and 64 deletions

View File

@ -106,7 +106,8 @@ type IpfsNode struct {
IpnsFs *ipnsfs.Filesystem
goprocess.Process
proc goprocess.Process
ctx context.Context
mode mode
}
@ -121,22 +122,23 @@ type Mounts struct {
type ConfigOption func(ctx context.Context) (*IpfsNode, error)
func NewIPFSNode(parent context.Context, option ConfigOption) (*IpfsNode, error) {
procctx := goprocessctx.WithContext(parent)
ctx := parent
success := false // flip to true after all sub-system inits succeed
defer func() {
if !success {
procctx.Close()
}
}()
func NewIPFSNode(ctx context.Context, option ConfigOption) (*IpfsNode, error) {
node, err := option(ctx)
if err != nil {
return nil, err
}
node.Process = procctx
ctxg.SetTeardown(node.teardown)
proc := goprocessctx.WithContext(ctx)
proc.SetTeardown(node.teardown)
node.proc = proc
node.ctx = ctx
success := false // flip to true after all sub-system inits succeed
defer func() {
if !success {
proc.Close()
}
}()
// Need to make sure it's perfectly clear 1) which variables are expected
// to be initialized at this point, and 2) which variables will be
@ -346,6 +348,21 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
return nil
}
// Process returns the Process object
func (n *IpfsNode) Process() goprocess.Process {
return n.proc
}
// Close calls Close() on the Process object
func (n *IpfsNode) Close() error {
return n.proc.Close()
}
// Context returns the IpfsNode context
func (n *IpfsNode) Context() context.Context {
return n.ctx
}
// teardown closes owned children. If any errors occur, this function returns
// the first error.
func (n *IpfsNode) teardown() error {
@ -372,7 +389,7 @@ func (n *IpfsNode) teardown() error {
}
if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
closers = append(closers, dht)
closers = append(closers, dht.Process())
}
if n.PeerHost != nil {

View File

@ -12,6 +12,7 @@ import (
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
core "github.com/ipfs/go-ipfs/core"
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
)
@ -78,20 +79,17 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error
var serverError error
serverExited := make(chan struct{})
node.Children().Add(1)
defer node.Children().Done()
go func() {
node.Process().Go(func(p goprocess.Process) {
serverError = http.Serve(lis, handler)
close(serverExited)
}()
})
// wait for server to exit.
select {
case <-serverExited:
// if node being closed before server exits, close server
case <-node.Closing():
case <-node.Process().Closing():
log.Infof("server at %s terminating...", addr)
lis.Close()

View File

@ -10,9 +10,9 @@ import (
func Cat(n *core.IpfsNode, pstr string) (io.Reader, error) {
p := path.FromString(pstr)
dagNode, err := n.Resolver.ResolvePath(n.ContextGroup.Context(), p)
dagNode, err := n.Resolver.ResolvePath(n.Context(), p)
if err != nil {
return nil, err
}
return uio.NewDagReader(n.ContextGroup.Context(), dagNode, n.DAG)
return uio.NewDagReader(n.Context(), dagNode, n.DAG)
}

View File

@ -3,7 +3,6 @@ package coremock
import (
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
syncds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/ipfs/go-ipfs/blocks/blockstore"
blockservice "github.com/ipfs/go-ipfs/blockservice"
@ -16,6 +15,7 @@ import (
path "github.com/ipfs/go-ipfs/path"
pin "github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/repo"
config "github.com/ipfs/go-ipfs/repo/config"
offrt "github.com/ipfs/go-ipfs/routing/offline"
ds2 "github.com/ipfs/go-ipfs/util/datastore2"
testutil "github.com/ipfs/go-ipfs/util/testutil"
@ -28,31 +28,36 @@ import (
// NewMockNode constructs an IpfsNode for use in tests.
func NewMockNode() (*core.IpfsNode, error) {
ctx := context.TODO()
nd := new(core.IpfsNode)
// Generate Identity
ident, err := testutil.RandIdentity()
if err != nil {
return nil, err
}
p := ident.ID()
nd.Identity = p
nd.PrivateKey = ident.PrivateKey()
nd.Peerstore = peer.NewPeerstore()
nd.Peerstore.AddPrivKey(p, ident.PrivateKey())
nd.Peerstore.AddPubKey(p, ident.PublicKey())
nd.Process = goprocessctx.WithContext(ctx)
nd.PeerHost, err = mocknet.New(ctx).AddPeer(ident.PrivateKey(), ident.Address()) // effectively offline
c := config.Config{
Identity: config.Identity{
PeerID: p.String(),
},
}
nd, err := core.Offline(&repo.Mock{
C: c,
D: ds2.CloserWrap(syncds.MutexWrap(datastore.NewMapDatastore())),
})(ctx)
if err != nil {
return nil, err
}
// Temp Datastore
nd.Repo = &repo.Mock{
// TODO C: conf,
D: ds2.CloserWrap(syncds.MutexWrap(datastore.NewMapDatastore())),
nd.PrivateKey = ident.PrivateKey()
nd.Peerstore = peer.NewPeerstore()
nd.Peerstore.AddPrivKey(p, ident.PrivateKey())
nd.Peerstore.AddPubKey(p, ident.PublicKey())
nd.PeerHost, err = mocknet.New(ctx).AddPeer(ident.PrivateKey(), ident.Address()) // effectively offline
if err != nil {
return nil, err
}
// Routing

View File

@ -18,5 +18,5 @@ func Mount(ipfs *core.IpfsNode, ipnsmp, ipfsmp string) (mount.Mount, error) {
return nil, err
}
return mount.NewMount(ipfs, fsys, ipnsmp, allow_other)
return mount.NewMount(ipfs.Process(), fsys, ipnsmp, allow_other)
}

View File

@ -13,5 +13,5 @@ func Mount(ipfs *core.IpfsNode, mountpoint string) (mount.Mount, error) {
cfg := ipfs.Repo.Config()
allow_other := cfg.Mounts.FuseAllowOther
fsys := NewFileSystem(ipfs)
return mount.NewMount(ipfs, fsys, mountpoint, allow_other)
return mount.NewMount(ipfs.Process(), fsys, mountpoint, allow_other)
}

View File

@ -33,6 +33,7 @@ type mocknet struct {
linkDefaults LinkOptions
proc goprocess.Process // for Context closing
ctx context.Context
sync.RWMutex
}
@ -42,6 +43,7 @@ func New(ctx context.Context) Mocknet {
hosts: map[peer.ID]*bhost.BasicHost{},
links: map[peer.ID]map[peer.ID]map[*link]struct{}{},
proc: goprocessctx.WithContext(ctx),
ctx: ctx,
}
}
@ -62,7 +64,7 @@ func (mn *mocknet) GenPeer() (host.Host, error) {
}
func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) {
n, err := newPeernet(mn.cg.Context(), mn, k, a)
n, err := newPeernet(mn.ctx, mn, k, a)
if err != nil {
return nil, err
}
@ -70,7 +72,7 @@ func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) {
h := bhost.New(n)
log.Debugf("mocknet added listen addr for peer: %s -- %s", n.LocalPeer(), a)
mn.cg.AddChild(n.cg)
mn.proc.AddChild(n.proc)
mn.Lock()
mn.nets[n.peer] = n
@ -298,11 +300,11 @@ func (mn *mocknet) ConnectAll() error {
}
func (mn *mocknet) ConnectPeers(a, b peer.ID) (inet.Conn, error) {
return mn.Net(a).DialPeer(mn.cg.Context(), b)
return mn.Net(a).DialPeer(mn.ctx, b)
}
func (mn *mocknet) ConnectNets(a, b inet.Network) (inet.Conn, error) {
return a.DialPeer(mn.cg.Context(), b.LocalPeer())
return a.DialPeer(mn.ctx, b.LocalPeer())
}
func (mn *mocknet) DisconnectPeers(p1, p2 peer.ID) error {

View File

@ -66,7 +66,7 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey,
notifs: make(map[inet.Notifiee]struct{}),
}
n.cg.SetTeardown(n.teardown)
n.proc.SetTeardown(n.teardown)
return n, nil
}
@ -94,7 +94,7 @@ func (pn *peernet) allConns() []*conn {
// Close calls the ContextCloser func
func (pn *peernet) Close() error {
return pn.cg.Close()
return pn.proc.Close()
}
func (pn *peernet) Peerstore() peer.Peerstore {

View File

@ -65,6 +65,7 @@ type Swarm struct {
Filters *filter.Filters
proc goprocess.Process
ctx context.Context
bwc metrics.Reporter
}
@ -82,6 +83,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
local: local,
peers: peers,
proc: goprocessctx.WithContext(ctx),
ctx: ctx,
dialT: DialTimeout,
notifs: make(map[inet.Notifiee]ps.Notifiee),
bwc: bwc,
@ -137,6 +139,11 @@ func (s *Swarm) Process() goprocess.Process {
return s.proc
}
// Context returns the context of the swarm
func (s *Swarm) Context() context.Context {
return s.ctx
}
// Close stops the Swarm.
func (s *Swarm) Close() error {
return s.proc.Close()

View File

@ -64,7 +64,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
}
log.Debugf("Swarm Listening at %s", maddr)
list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk)
list, err := conn.Listen(s.Context(), maddr, s.local, sk)
if err != nil {
return err
}
@ -112,7 +112,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
return
}
}
}(s.cg.Context(), sl)
}(s.Context(), sl)
return nil
}

View File

@ -58,8 +58,8 @@ type IpfsDHT struct {
Validator record.Validator // record validator funcs
Context context.Context
goprocess.Process
ctx context.Context
proc goprocess.Process
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
@ -73,18 +73,18 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
procctx = goprocessctx.WithContext(ctx)
procctx.SetTeardown(func() error {
proc := goprocessctx.WithContext(ctx)
proc.SetTeardown(func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})
dht.Process = procctx
dht.Context = ctx
dht.proc = proc
dht.ctx = ctx
h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
dht.providers = NewProviderManager(dht.Context, dht.self)
dht.AddChild(dht.providers)
dht.providers = NewProviderManager(dht.ctx, dht.self)
dht.proc.AddChild(dht.providers.proc)
dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore)
dht.birth = time.Now()
@ -93,7 +93,9 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
dht.Validator["pk"] = record.PublicKeyValidator
if doPinging {
dht.Go(func() { dht.PingRoutine(time.Second * 10) })
dht.proc.Go(func(p goprocess.Process) {
dht.PingRoutine(time.Second * 10)
})
}
return dht
}
@ -360,15 +362,30 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
rand.Read(id)
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.Key(id)), 5)
for _, p := range peers {
ctx, cancel := context.WithTimeout(dht.Context, time.Second*5)
ctx, cancel := context.WithTimeout(dht.Context(), time.Second*5)
_, err := dht.Ping(ctx, p)
if err != nil {
log.Debugf("Ping error: %s", err)
}
cancel()
}
case <-dht.Closing():
case <-dht.proc.Closing():
return
}
}
}
// Context return dht's context
func (dht *IpfsDHT) Context() context.Context {
return dht.ctx
}
// Process return dht's process
func (dht *IpfsDHT) Process() goprocess.Process {
return dht.proc
}
// Close calls Process Close
func (dht *IpfsDHT) Close() error {
return dht.proc.Close()
}

View File

@ -16,7 +16,7 @@ func (nn *netNotifiee) DHT() *IpfsDHT {
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
dht := nn.DHT()
select {
case <-dht.Closing():
case <-dht.Process().Closing():
return
default:
}
@ -26,7 +26,7 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
dht := nn.DHT()
select {
case <-dht.Closing():
case <-dht.Process().Closing():
return
default:
}

View File

@ -22,7 +22,7 @@ type ProviderManager struct {
newprovs chan *addProv
getprovs chan *getProv
period time.Duration
goprocess.Process
proc goprocess.Process
}
type providerSet struct {
@ -47,8 +47,8 @@ func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
pm.providers = make(map[key.Key]*providerSet)
pm.getlocal = make(chan chan []key.Key)
pm.local = make(map[key.Key]struct{})
pm.Process = goprocessctx.WithContext(ctx)
pm.Go(pm.run)
pm.proc = goprocessctx.WithContext(ctx)
pm.proc.Go(func(p goprocess.Process) { pm.run() })
return pm
}
@ -97,7 +97,7 @@ func (pm *ProviderManager) run() {
provs.providers = filtered
}
case <-pm.Closing():
case <-pm.proc.Closing():
return
}
}

View File

@ -19,5 +19,5 @@ func TestProviderManager(t *testing.T) {
if len(resp) != 1 {
t.Fatal("Could not retrieve provider.")
}
p.Close()
p.proc.Close()
}