net: threadsafe mux handler add

This commit is contained in:
Juan Batiz-Benet 2014-12-16 06:20:38 -08:00
parent f18bbde344
commit c150668a1c
3 changed files with 62 additions and 37 deletions

View File

@ -11,10 +11,12 @@ import (
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
type ProtocolID string
const (
ProtocolBitswap = "/ipfs/bitswap"
ProtocolDHT = "/ipfs/dht"
ProtocolDiag = "/ipfs/diagnostics"
ProtocolBitswap ProtocolID = "/ipfs/bitswap"
ProtocolDHT ProtocolID = "/ipfs/dht"
ProtocolDiag ProtocolID = "/ipfs/diagnostics"
)
// Stream represents a bidirectional channel between two agents in
@ -34,7 +36,7 @@ type Stream interface {
// incoming streams must implement.
type StreamHandler func(Stream)
type StreamHandlerMap map[string]StreamHandler
type StreamHandlerMap map[ProtocolID]StreamHandler
// Conn is a connection to a remote peer. It multiplexes streams.
// Usually there is no need to use a Conn directly, but it may
@ -47,28 +49,6 @@ type Conn interface {
NewStream(p peer.Peer) (Stream, error)
}
// Mux provides simple stream multixplexing.
// It helps you precisely when:
// * You have many streams
// * You have function handlers
//
// It contains the handlers for each protocol accepted.
// It dispatches handlers for streams opened by remote peers.
//
// We use a totally ad-hoc encoding:
// <1 byte length in bytes><string name>
// So "bitswap" is 0x0762697473776170
//
// NOTE: only the dialer specifies this muxing line.
// This is because we're using Streams :)
//
// WARNING: this datastructure IS NOT threadsafe.
// do not modify it once the network is using it.
type Mux struct {
Default StreamHandler // handles unknown protocols.
Handlers StreamHandlerMap
}
// Network is the interface IPFS uses for connecting to the world.
// It dials and listens for connections. it uses a Swarm to pool
// connnections (see swarm pkg, and peerstream.Swarm). Connections
@ -77,6 +57,10 @@ type Network interface {
Dialer
io.Closer
// SetHandler sets the protocol handler on the Network's Muxer.
// This operation is threadsafe.
SetHandler(ProtocolID, StreamHandler)
// NewStream returns a new stream to given peer p.
// If there is no connection to p, attempts to create one.
NewStream(p peer.Peer) (Stream, error)

View File

@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"sync"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
@ -12,6 +13,30 @@ import (
var log = eventlog.Logger("mux2")
// Mux provides simple stream multixplexing.
// It helps you precisely when:
// * You have many streams
// * You have function handlers
//
// It contains the handlers for each protocol accepted.
// It dispatches handlers for streams opened by remote peers.
//
// We use a totally ad-hoc encoding:
// <1 byte length in bytes><string name>
// So "bitswap" is 0x0762697473776170
//
// NOTE: only the dialer specifies this muxing line.
// This is because we're using Streams :)
//
// WARNING: this datastructure IS NOT threadsafe.
// do not modify it once the network is using it.
type Mux struct {
Default StreamHandler // handles unknown protocols.
Handlers StreamHandlerMap
sync.RWMutex
}
// NextName reads the stream and returns the next protocol name
// according to the muxer encoding.
func (m *Mux) NextName(s io.Reader) (string, error) {
@ -41,16 +66,26 @@ func (m *Mux) NextHandler(s io.Reader) (string, StreamHandler, error) {
return "", nil, err
}
h, found := m.Handlers[name]
if !found {
if m.Default == nil {
return name, nil, errors.New("no handler with name: " + name)
}
m.RLock()
h, found := m.Handlers[ProtocolID(name)]
m.RUnlock()
switch {
case !found && m.Default != nil:
return name, m.Default, nil
case !found && m.Default == nil:
return name, nil, errors.New("no handler with name: " + name)
default:
return name, h, nil
}
}
return name, h, nil
// SetHandler sets the protocol handler on the Network's Muxer.
// This operation is threadsafe.
func (m *Mux) SetHandler(p ProtocolID, h StreamHandler) {
m.Lock()
m.Handlers[p] = h
m.Unlock()
}
// Handle reads the next name off the Stream, and calls a function

View File

@ -81,9 +81,9 @@ type network struct {
cg ctxgroup.ContextGroup // for Context closing
}
// NewConn is the structure that implements the network interface
func NewConn(ctx context.Context, listen []ma.Multiaddr, local peer.Peer,
peers peer.Peerstore, m Mux) (*network, error) {
// NewNetwork constructs a new network and starts listening on given addresses.
func NewNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.Peer,
peers peer.Peerstore) (*network, error) {
s, err := swarm.NewSwarm(ctx, listen, local, peers)
if err != nil {
@ -93,12 +93,12 @@ func NewConn(ctx context.Context, listen []ma.Multiaddr, local peer.Peer,
n := &network{
local: local,
swarm: s,
mux: m,
mux: Mux{},
cg: ctxgroup.WithContext(ctx),
}
s.SetStreamHandler(func(s *swarm.Stream) {
m.Handle((*stream)(s))
n.mux.Handle((*stream)(s))
})
n.cg.AddChildGroup(s.CtxGroup())
@ -153,3 +153,9 @@ func (n *network) Connectedness(p peer.Peer) Connectedness {
}
return NotConnected
}
// SetHandler sets the protocol handler on the Network's Muxer.
// This operation is threadsafe.
func (n *network) SetHandler(p ProtocolID, h StreamHandler) {
n.mux.SetHandler(p, h)
}