mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-03 23:38:07 +08:00
starting on dht-- msg handler
This commit is contained in:
parent
1461feec3f
commit
9c5c49b690
@ -3,18 +3,20 @@ package dht
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
inet "github.com/jbenet/go-ipfs/net"
|
||||
msg "github.com/jbenet/go-ipfs/net/message"
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
kb "github.com/jbenet/go-ipfs/routing/kbucket"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
||||
)
|
||||
@ -28,7 +30,9 @@ type IpfsDHT struct {
|
||||
// NOTE: (currently, only a single table is used)
|
||||
routingTables []*kb.RoutingTable
|
||||
|
||||
// the network interface. service
|
||||
network inet.Network
|
||||
sender inet.Sender
|
||||
|
||||
// Local peer (yourself)
|
||||
self *peer.Peer
|
||||
@ -50,12 +54,13 @@ type IpfsDHT struct {
|
||||
}
|
||||
|
||||
// NewDHT creates a new DHT object with the given peer as the 'local' host
|
||||
func NewDHT(p *peer.Peer, net swarm.Network, dstore ds.Datastore) *IpfsDHT {
|
||||
func NewDHT(p *peer.Peer, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
|
||||
dht := new(IpfsDHT)
|
||||
dht.network = net
|
||||
dht.netChan = net.GetChannel(swarm.PBWrapper_DHT_MESSAGE)
|
||||
dht.sender = sender
|
||||
dht.datastore = dstore
|
||||
dht.self = p
|
||||
|
||||
dht.providers = NewProviderManager(p.ID)
|
||||
dht.shutdown = make(chan struct{})
|
||||
|
||||
@ -63,21 +68,32 @@ func NewDHT(p *peer.Peer, net swarm.Network, dstore ds.Datastore) *IpfsDHT {
|
||||
dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
|
||||
dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
|
||||
dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
|
||||
dht.listener = swarm.NewMessageListener()
|
||||
dht.birth = time.Now()
|
||||
return dht
|
||||
}
|
||||
|
||||
// Start up background goroutines needed by the DHT
|
||||
func (dht *IpfsDHT) Start() {
|
||||
go dht.handleMessages()
|
||||
panic("the service is already started. rmv this method")
|
||||
}
|
||||
|
||||
// Connect to a new peer at the given address, ping and add to the routing table
|
||||
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
|
||||
maddrstr, _ := addr.String()
|
||||
u.DOut("Connect to new peer: %s\n", maddrstr)
|
||||
npeer, err := dht.network.ConnectNew(addr)
|
||||
|
||||
// TODO(jbenet,whyrusleeping)
|
||||
//
|
||||
// Connect should take in a Peer (with ID). In a sense, we shouldn't be
|
||||
// allowing connections to random multiaddrs without knowing who we're
|
||||
// speaking to (i.e. peer.ID). In terms of moving around simple addresses
|
||||
// -- instead of an (ID, Addr) pair -- we can use:
|
||||
//
|
||||
// /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
|
||||
//
|
||||
npeer := &peer.Peer{}
|
||||
npeer.AddAddress(addr)
|
||||
err := dht.network.DialPeer(npeer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -94,63 +110,77 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
|
||||
return npeer, nil
|
||||
}
|
||||
|
||||
// Read in all messages from swarm and handle them appropriately
|
||||
// NOTE: this function is just a quick sketch
|
||||
func (dht *IpfsDHT) handleMessages() {
|
||||
u.DOut("Begin message handling routine\n")
|
||||
// HandleMessage implements the inet.Handler interface.
|
||||
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.NetMessage, error) {
|
||||
|
||||
errs := dht.network.GetErrChan()
|
||||
for {
|
||||
select {
|
||||
case mes, ok := <-dht.netChan.Incoming:
|
||||
if !ok {
|
||||
u.DOut("handleMessages closing, bad recv on incoming\n")
|
||||
return
|
||||
}
|
||||
pmes := new(PBDHTMessage)
|
||||
err := proto.Unmarshal(mes.Data, pmes)
|
||||
if err != nil {
|
||||
u.PErr("Failed to decode protobuf message: %s\n", err)
|
||||
continue
|
||||
}
|
||||
mData := mes.Data()
|
||||
if mData == nil {
|
||||
return nil, errors.New("message did not include Data")
|
||||
}
|
||||
|
||||
dht.Update(mes.Peer)
|
||||
mPeer := mes.Peer()
|
||||
if mPeer == nil {
|
||||
return nil, errors.New("message did not include a Peer")
|
||||
}
|
||||
|
||||
// Note: not sure if this is the correct place for this
|
||||
if pmes.GetResponse() {
|
||||
dht.listener.Respond(pmes.GetId(), mes)
|
||||
continue
|
||||
}
|
||||
//
|
||||
// deserialize msg
|
||||
pmes := new(Message)
|
||||
err := proto.Unmarshal(mData, pmes)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to decode protobuf message: %v\n", err)
|
||||
}
|
||||
|
||||
u.DOut("[peer: %s]\nGot message type: '%s' [id = %x, from = %s]\n",
|
||||
dht.self.ID.Pretty(),
|
||||
PBDHTMessage_MessageType_name[int32(pmes.GetType())],
|
||||
pmes.GetId(), mes.Peer.ID.Pretty())
|
||||
switch pmes.GetType() {
|
||||
case PBDHTMessage_GET_VALUE:
|
||||
go dht.handleGetValue(mes.Peer, pmes)
|
||||
case PBDHTMessage_PUT_VALUE:
|
||||
go dht.handlePutValue(mes.Peer, pmes)
|
||||
case PBDHTMessage_FIND_NODE:
|
||||
go dht.handleFindPeer(mes.Peer, pmes)
|
||||
case PBDHTMessage_ADD_PROVIDER:
|
||||
go dht.handleAddProvider(mes.Peer, pmes)
|
||||
case PBDHTMessage_GET_PROVIDERS:
|
||||
go dht.handleGetProviders(mes.Peer, pmes)
|
||||
case PBDHTMessage_PING:
|
||||
go dht.handlePing(mes.Peer, pmes)
|
||||
case PBDHTMessage_DIAGNOSTIC:
|
||||
go dht.handleDiagnostic(mes.Peer, pmes)
|
||||
default:
|
||||
u.PErr("Recieved invalid message type")
|
||||
}
|
||||
// update the peer (on valid msgs only)
|
||||
dht.Update(mPeer)
|
||||
|
||||
case err := <-errs:
|
||||
u.PErr("dht err: %s\n", err)
|
||||
case <-dht.shutdown:
|
||||
return
|
||||
}
|
||||
// Print out diagnostic
|
||||
u.DOut("[peer: %s]\nGot message type: '%s' [from = %s]\n",
|
||||
dht.self.ID.Pretty(),
|
||||
Message_MessageType_name[int32(pmes.GetType())], mPeer.ID.Pretty())
|
||||
|
||||
// get handler for this msg type.
|
||||
var resp *Message
|
||||
handler := dht.handlerForMsgType(pmes.GetType())
|
||||
if handler == nil {
|
||||
return nil, errors.New("Recieved invalid message type")
|
||||
}
|
||||
|
||||
// dispatch handler.
|
||||
rpmes, err := handler(mPeer, pmes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// serialize response msg
|
||||
rmes, err := msg.FromObject(mPeer, rpmes)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to encode protobuf message: %v\n", err)
|
||||
}
|
||||
|
||||
return rmes, nil
|
||||
}
|
||||
|
||||
// dhthandler specifies the signature of functions that handle DHT messages.
|
||||
type dhtHandler func(*peer.Peer, *Message) (*Message, error)
|
||||
|
||||
func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
|
||||
switch t {
|
||||
case Message_GET_VALUE:
|
||||
return dht.handleGetValue
|
||||
// case Message_PUT_VALUE:
|
||||
// return dht.handlePutValue
|
||||
// case Message_FIND_NODE:
|
||||
// return dht.handleFindPeer
|
||||
// case Message_ADD_PROVIDER:
|
||||
// return dht.handleAddProvider
|
||||
// case Message_GET_PROVIDERS:
|
||||
// return dht.handleGetProviders
|
||||
// case Message_PING:
|
||||
// return dht.handlePing
|
||||
// case Message_DIAGNOSTIC:
|
||||
// return dht.handleDiagnostic
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user