From 8d98d4b48dd4d47cf7a4c0aca9e9c192db64617a Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 31 Jul 2014 21:55:44 -0700 Subject: [PATCH] making connections between nodes get closer to working --- identify/identify.go | 14 ++++++-------- routing/dht/dht.go | 32 +++++++++++++++++++++++++------- swarm/swarm.go | 40 +++++++++++++++++++++++----------------- util/util.go | 4 ++-- 4 files changed, 56 insertions(+), 34 deletions(-) diff --git a/identify/identify.go b/identify/identify.go index 87a8ec60d..5d12b3cab 100644 --- a/identify/identify.go +++ b/identify/identify.go @@ -4,12 +4,12 @@ package identify import ( peer "github.com/jbenet/go-ipfs/peer" - swarm "github.com/jbenet/go-ipfs/swarm" + u "github.com/jbenet/go-ipfs/util" ) // Perform initial communication with this peer to share node ID's and // initiate communication -func Handshake(self *peer.Peer, conn *swarm.Conn) error { +func Handshake(self, remote *peer.Peer, in, out chan []byte) error { // temporary: // put your own id in a 16byte buffer and send that over to @@ -17,12 +17,10 @@ func Handshake(self *peer.Peer, conn *swarm.Conn) error { // Once that trade is finished, the handshake is complete and // both sides should 'trust' each other - id := make([]byte, 16) - copy(id, self.ID) - - conn.Outgoing.MsgChan <- id - resp := <-conn.Incoming.MsgChan - conn.Peer.ID = peer.ID(resp) + out <- self.ID + resp := <-in + remote.ID = peer.ID(resp) + u.DOut("Got node id: %s", string(remote.ID)) return nil } diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 62d4a71cf..9d6d1223a 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -55,6 +55,10 @@ func NewDHT(p *peer.Peer) (*IpfsDHT, error) { return dht, nil } +func (dht *IpfsDHT) Start() { + go dht.handleMessages() +} + // Connect to a new peer at the given address func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) error { peer := new(peer.Peer) @@ -65,24 +69,26 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) error { return err } - err = identify.Handshake(dht.self, conn) + err = identify.Handshake(dht.self, peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan) if err != nil { return err } - dht.network.StartConn(conn.Peer.Key(), conn) + dht.network.StartConn(conn) // TODO: Add this peer to our routing table return nil } - // Read in all messages from swarm and handle them appropriately // NOTE: this function is just a quick sketch func (dht *IpfsDHT) handleMessages() { + u.DOut("Being message handling routine") for { select { case mes := <-dht.network.Chan.Incoming: + u.DOut("recieved message from swarm.") + pmes := new(DHTMessage) err := proto.Unmarshal(mes.Data, pmes) if err != nil { @@ -118,6 +124,8 @@ func (dht *IpfsDHT) handleMessages() { dht.handleFindNode(mes.Peer, pmes) } + case err := <-dht.network.Chan.Errors: + panic(err) case <-dht.shutdown: return } @@ -158,10 +166,6 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) { } } -func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) { - panic("Not implemented.") -} - func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) { isResponse := true resp := new(DHTMessage) @@ -172,6 +176,18 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) { dht.network.Chan.Outgoing <-swarm.NewMessage(p, []byte(resp.String())) } +func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) { + panic("Not implemented.") +} + +func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) { + panic("Not implemented.") +} + +func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) { + panic("Not implemented.") +} + // Register a handler for a specific message ID, used for getting replies // to certain messages (i.e. response to a GET_VALUE message) @@ -202,6 +218,8 @@ func (dht *IpfsDHT) Halt() { // Ping a node, log the time it took func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) { // Thoughts: maybe this should accept an ID and do a peer lookup? + u.DOut("Enter Ping.") + id := GenerateMessageID() mes_type := DHTMessage_PING pmes := new(DHTMessage) diff --git a/swarm/swarm.go b/swarm/swarm.go index 8f6676190..de238768b 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -8,6 +8,7 @@ import ( peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" ma "github.com/jbenet/go-multiaddr" + ident "github.com/jbenet/go-ipfs/identify" ) // Message represents a packet of information sent to or received from a @@ -109,27 +110,21 @@ func (s *Swarm) connListen(maddr *ma.Multiaddr) error { // Handle getting ID from this peer and adding it into the map func (s *Swarm) handleNewConn(nconn net.Conn) { - p := MakePeerFromConn(nconn) - - var addr *ma.Multiaddr - - //naddr := nconn.RemoteAddr() - //addr := ma.FromDialArgs(naddr.Network(), naddr.String()) + p := new(peer.Peer) conn := &Conn{ Peer: p, - Addr: addr, + Addr: nil, Conn: nconn, } - newConnChans(conn) - go s.fanIn(conn) -} -// Negotiate with peer for its ID and create a peer object -// TODO: this might belong in the peer package -func MakePeerFromConn(conn net.Conn) *peer.Peer { - panic("Not yet implemented.") + err := ident.Handshake(s.local, p, conn.Incoming.MsgChan, conn.Outgoing.MsgChan) + if err != nil { + panic(err) + } + + s.StartConn(conn) } // Close closes a swarm. @@ -170,14 +165,19 @@ func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) { return nil, err } - s.StartConn(k, conn) + s.StartConn(conn) return conn, nil } -func (s *Swarm) StartConn(k u.Key, conn *Conn) { +func (s *Swarm) StartConn(conn *Conn) { + if conn == nil { + panic("tried to start nil Conn!") + } + + u.DOut("Starting connection: %s", string(conn.Peer.ID)) // add to conns s.connsLock.Lock() - s.conns[k] = conn + s.conns[conn.Peer.Key()] = conn s.connsLock.Unlock() // kick off reader goroutine @@ -191,6 +191,7 @@ func (s *Swarm) fanOut() { case <-s.Chan.Close: return // told to close. case msg, ok := <-s.Chan.Outgoing: + u.DOut("fanOut: outgoing message for: '%s'", msg.Peer.Key()) if !ok { return } @@ -198,14 +199,17 @@ func (s *Swarm) fanOut() { s.connsLock.RLock() conn, found := s.conns[msg.Peer.Key()] s.connsLock.RUnlock() + if !found { e := fmt.Errorf("Sent msg to peer without open conn: %v", msg.Peer) s.Chan.Errors <- e + continue } // queue it in the connection's buffer conn.Outgoing.MsgChan <- msg.Data + u.DOut("fanOut: message off.") } } } @@ -225,6 +229,7 @@ Loop: break Loop case data, ok := <-conn.Incoming.MsgChan: + u.DOut("fanIn: got message from incoming channel.") if !ok { e := fmt.Errorf("Error retrieving from conn: %v", conn) s.Chan.Errors <- e @@ -234,6 +239,7 @@ Loop: // wrap it for consumers. msg := &Message{Peer: conn.Peer, Data: data} s.Chan.Incoming <- msg + u.DOut("fanIn: message off.") } } diff --git a/util/util.go b/util/util.go index 69831ff8d..d54be111e 100644 --- a/util/util.go +++ b/util/util.go @@ -41,12 +41,12 @@ func TildeExpansion(filename string) (string, error) { // PErr is a shorthand printing function to output to Stderr. func PErr(format string, a ...interface{}) { - fmt.Fprintf(os.Stderr, format, a...) + fmt.Fprintf(os.Stderr, format + "\n", a...) } // POut is a shorthand printing function to output to Stdout. func POut(format string, a ...interface{}) { - fmt.Fprintf(os.Stdout, format, a...) + fmt.Fprintf(os.Stdout, format + "\n", a...) } // DErr is a shorthand debug printing function to output to Stderr.