diff --git a/net/interface.go b/net/interface.go index f41c4ab9f..90f5ade65 100644 --- a/net/interface.go +++ b/net/interface.go @@ -11,10 +11,12 @@ import ( ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ) +type ProtocolID string + const ( - ProtocolBitswap = "/ipfs/bitswap" - ProtocolDHT = "/ipfs/dht" - ProtocolDiag = "/ipfs/diagnostics" + ProtocolBitswap ProtocolID = "/ipfs/bitswap" + ProtocolDHT ProtocolID = "/ipfs/dht" + ProtocolDiag ProtocolID = "/ipfs/diagnostics" ) // Stream represents a bidirectional channel between two agents in @@ -34,7 +36,7 @@ type Stream interface { // incoming streams must implement. type StreamHandler func(Stream) -type StreamHandlerMap map[string]StreamHandler +type StreamHandlerMap map[ProtocolID]StreamHandler // Conn is a connection to a remote peer. It multiplexes streams. // Usually there is no need to use a Conn directly, but it may @@ -47,28 +49,6 @@ type Conn interface { NewStream(p peer.Peer) (Stream, error) } -// Mux provides simple stream multixplexing. -// It helps you precisely when: -// * You have many streams -// * You have function handlers -// -// It contains the handlers for each protocol accepted. -// It dispatches handlers for streams opened by remote peers. -// -// We use a totally ad-hoc encoding: -// <1 byte length in bytes> -// So "bitswap" is 0x0762697473776170 -// -// NOTE: only the dialer specifies this muxing line. -// This is because we're using Streams :) -// -// WARNING: this datastructure IS NOT threadsafe. -// do not modify it once the network is using it. -type Mux struct { - Default StreamHandler // handles unknown protocols. - Handlers StreamHandlerMap -} - // Network is the interface IPFS uses for connecting to the world. // It dials and listens for connections. it uses a Swarm to pool // connnections (see swarm pkg, and peerstream.Swarm). Connections @@ -77,6 +57,10 @@ type Network interface { Dialer io.Closer + // SetHandler sets the protocol handler on the Network's Muxer. + // This operation is threadsafe. + SetHandler(ProtocolID, StreamHandler) + // NewStream returns a new stream to given peer p. // If there is no connection to p, attempts to create one. NewStream(p peer.Peer) (Stream, error) diff --git a/net/mux.go b/net/mux.go index 38f704d65..6a12d4437 100644 --- a/net/mux.go +++ b/net/mux.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "sync" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" eventlog "github.com/jbenet/go-ipfs/util/eventlog" @@ -12,6 +13,30 @@ import ( var log = eventlog.Logger("mux2") +// Mux provides simple stream multixplexing. +// It helps you precisely when: +// * You have many streams +// * You have function handlers +// +// It contains the handlers for each protocol accepted. +// It dispatches handlers for streams opened by remote peers. +// +// We use a totally ad-hoc encoding: +// <1 byte length in bytes> +// So "bitswap" is 0x0762697473776170 +// +// NOTE: only the dialer specifies this muxing line. +// This is because we're using Streams :) +// +// WARNING: this datastructure IS NOT threadsafe. +// do not modify it once the network is using it. +type Mux struct { + Default StreamHandler // handles unknown protocols. + Handlers StreamHandlerMap + + sync.RWMutex +} + // NextName reads the stream and returns the next protocol name // according to the muxer encoding. func (m *Mux) NextName(s io.Reader) (string, error) { @@ -41,16 +66,26 @@ func (m *Mux) NextHandler(s io.Reader) (string, StreamHandler, error) { return "", nil, err } - h, found := m.Handlers[name] - if !found { - if m.Default == nil { - return name, nil, errors.New("no handler with name: " + name) - } + m.RLock() + h, found := m.Handlers[ProtocolID(name)] + m.RUnlock() + switch { + case !found && m.Default != nil: return name, m.Default, nil + case !found && m.Default == nil: + return name, nil, errors.New("no handler with name: " + name) + default: + return name, h, nil } +} - return name, h, nil +// SetHandler sets the protocol handler on the Network's Muxer. +// This operation is threadsafe. +func (m *Mux) SetHandler(p ProtocolID, h StreamHandler) { + m.Lock() + m.Handlers[p] = h + m.Unlock() } // Handle reads the next name off the Stream, and calls a function diff --git a/net/net.go b/net/net.go index 1d61c5d11..01ff3c0cb 100644 --- a/net/net.go +++ b/net/net.go @@ -81,9 +81,9 @@ type network struct { cg ctxgroup.ContextGroup // for Context closing } -// NewConn is the structure that implements the network interface -func NewConn(ctx context.Context, listen []ma.Multiaddr, local peer.Peer, - peers peer.Peerstore, m Mux) (*network, error) { +// NewNetwork constructs a new network and starts listening on given addresses. +func NewNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.Peer, + peers peer.Peerstore) (*network, error) { s, err := swarm.NewSwarm(ctx, listen, local, peers) if err != nil { @@ -93,12 +93,12 @@ func NewConn(ctx context.Context, listen []ma.Multiaddr, local peer.Peer, n := &network{ local: local, swarm: s, - mux: m, + mux: Mux{}, cg: ctxgroup.WithContext(ctx), } s.SetStreamHandler(func(s *swarm.Stream) { - m.Handle((*stream)(s)) + n.mux.Handle((*stream)(s)) }) n.cg.AddChildGroup(s.CtxGroup()) @@ -153,3 +153,9 @@ func (n *network) Connectedness(p peer.Peer) Connectedness { } return NotConnected } + +// SetHandler sets the protocol handler on the Network's Muxer. +// This operation is threadsafe. +func (n *network) SetHandler(p ProtocolID, h StreamHandler) { + n.mux.SetHandler(p, h) +}