mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-25 20:37:53 +08:00
a little error handling and some work on providers
This commit is contained in:
parent
248e06f759
commit
3a76ef0478
@ -35,6 +35,7 @@ type IpfsDHT struct {
|
||||
// Map keys to peers that can provide their value
|
||||
// TODO: implement a TTL on each of these keys
|
||||
providers map[u.Key][]*peer.Peer
|
||||
providerLock sync.RWMutex
|
||||
|
||||
// map of channels waiting for reply messages
|
||||
listeners map[uint64]chan *swarm.Message
|
||||
@ -46,6 +47,9 @@ type IpfsDHT struct {
|
||||
|
||||
// Create a new DHT object with the given peer as the 'local' host
|
||||
func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
|
||||
if p == nil {
|
||||
panic("Tried to create new dht with nil peer")
|
||||
}
|
||||
network := swarm.NewSwarm(p)
|
||||
err := network.Listen()
|
||||
if err != nil {
|
||||
@ -68,24 +72,27 @@ func (dht *IpfsDHT) Start() {
|
||||
}
|
||||
|
||||
// Connect to a new peer at the given address
|
||||
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) error {
|
||||
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
|
||||
if addr == nil {
|
||||
panic("addr was nil!")
|
||||
}
|
||||
peer := new(peer.Peer)
|
||||
peer.AddAddress(addr)
|
||||
|
||||
conn,err := swarm.Dial("tcp", peer)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = identify.Handshake(dht.self, peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dht.network.StartConn(conn)
|
||||
|
||||
dht.routes.Update(peer)
|
||||
return nil
|
||||
return peer, nil
|
||||
}
|
||||
|
||||
// Read in all messages from swarm and handle them appropriately
|
||||
@ -195,6 +202,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
|
||||
// ?????
|
||||
}
|
||||
|
||||
// This is just a quick hack, formalize method of sending addrs later
|
||||
var addrs []string
|
||||
for _,prov := range providers {
|
||||
ma := prov.NetAddress("tcp")
|
||||
|
||||
@ -3,9 +3,12 @@ package dht
|
||||
import (
|
||||
"math/rand"
|
||||
"time"
|
||||
"encoding/json"
|
||||
|
||||
proto "code.google.com/p/goprotobuf/proto"
|
||||
|
||||
ma "github.com/jbenet/go-multiaddr"
|
||||
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
swarm "github.com/jbenet/go-ipfs/swarm"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
@ -125,7 +128,32 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
panic("Not yet implemented.")
|
||||
var addrs map[string]string
|
||||
err := json.Unmarshal(pmes_out.GetValue(), &addrs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for key,addr := range addrs {
|
||||
p := s.network.Find(u.Key(key))
|
||||
if p == nil {
|
||||
maddr,err := ma.NewMultiaddr(addr)
|
||||
if err != nil {
|
||||
u.PErr("error connecting to new peer: %s", err)
|
||||
continue
|
||||
}
|
||||
p, err := s.Connect(maddr)
|
||||
if err != nil {
|
||||
u.PErr("error connecting to new peer: %s", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
s.providerLock.Lock()
|
||||
prov_arr := s.providers[key]
|
||||
s.providers[key] = append(prov_arr, p)
|
||||
s.providerLock.Unlock()
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -290,3 +290,6 @@ Loop:
|
||||
delete(s.conns, conn.Peer.Key())
|
||||
s.connsLock.Unlock()
|
||||
}
|
||||
|
||||
func (s *Swarm) Find(addr *ma.Multiaddr) {
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user