From 07733b17b3fa5be48a89d497ca7649fef7ef3d73 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 27 Oct 2014 20:23:14 +0000 Subject: [PATCH] msgio pooling first hack --- .../src/github.com/jbenet/go-msgio/chan.go | 11 +++++- .../github.com/jbenet/go-msgio/chan_test.go | 6 ++- crypto/spipe/handshake.go | 37 +++++++++++++++++-- merkledag/merkledag.go | 18 +++++++++ net/conn/conn.go | 24 ++++++++++-- net/mux/mux.go | 4 +- routing/dht/pb/dht.pb.go | 2 +- unixfs/io/dagreader.go | 19 ++++------ 8 files changed, 96 insertions(+), 25 deletions(-) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan.go b/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan.go index 4d5af5b8c..84ccbe355 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan.go @@ -2,6 +2,7 @@ package msgio import ( "io" + "sync" ) type Chan struct { @@ -9,13 +10,15 @@ type Chan struct { MsgChan chan []byte ErrChan chan error CloseChan chan bool + BufPool *sync.Pool } -func NewChan(chanSize int) *Chan { +func NewChan(chanSize int, pool *sync.Pool) *Chan { return &Chan{ MsgChan: make(chan []byte, chanSize), ErrChan: make(chan error, 1), CloseChan: make(chan bool, 2), + BufPool: pool, } } @@ -25,7 +28,11 @@ func (s *Chan) ReadFrom(r io.Reader, maxMsgLen int) { mr := NewReader(r) Loop: for { - buf := make([]byte, maxMsgLen) + bufi := s.BufPool.Get() + buf, ok := bufi.([]byte) + if !ok { + panic("Got invalid type from sync pool!") + } l, err := mr.ReadMsg(buf) if err != nil { if err == io.EOF { diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan_test.go b/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan_test.go index 043baa2bb..5ca431198 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan_test.go @@ -5,6 +5,7 @@ import ( randbuf "github.com/jbenet/go-randbuf" "io" "math/rand" + "sync" "testing" "time" ) @@ -12,7 +13,8 @@ import ( func TestReadChan(t *testing.T) { buf := bytes.NewBuffer(nil) writer := NewWriter(buf) - rchan := NewChan(10) + p := &sync.Pool{New: func() interface{} { return make([]byte, 1000) }} + rchan := NewChan(10, p) msgs := [1000][]byte{} r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -58,7 +60,7 @@ Loop: func TestWriteChan(t *testing.T) { buf := bytes.NewBuffer(nil) reader := NewReader(buf) - wchan := NewChan(10) + wchan := NewChan(10, nil) msgs := [1000][]byte{} go wchan.WriteTo(buf) diff --git a/crypto/spipe/handshake.go b/crypto/spipe/handshake.go index b3796248c..a5f647afd 100644 --- a/crypto/spipe/handshake.go +++ b/crypto/spipe/handshake.go @@ -184,10 +184,41 @@ func (s *SecurePipe) handshake() error { } cmp := bytes.Compare(myPubKey, proposeResp.GetPubkey()) - mIV, tIV, mCKey, tCKey, mMKey, tMKey := ci.KeyStretcher(cmp, cipherType, hashType, secret) + //mIV, tIV, mCKey, tCKey, mMKey, tMKey := ci.KeyStretcher(cmp, cipherType, hashType, secret) + ci.KeyStretcher(cmp, cipherType, hashType, secret) - go s.handleSecureIn(hashType, cipherType, tIV, tCKey, tMKey) - go s.handleSecureOut(hashType, cipherType, mIV, mCKey, mMKey) + //go s.handleSecureIn(hashType, cipherType, tIV, tCKey, tMKey) + //go s.handleSecureOut(hashType, cipherType, mIV, mCKey, mMKey) + + // Disable Secure Channel + go func(sp *SecurePipe) { + for { + select { + case <-sp.ctx.Done(): + return + case m, ok := <-sp.insecure.In: + if !ok { + sp.cancel() + return + } + sp.In <- m + } + } + }(s) + go func(sp *SecurePipe) { + for { + select { + case <-sp.ctx.Done(): + return + case m, ok := <-sp.Out: + if !ok { + sp.cancel() + return + } + sp.insecure.Out <- m + } + } + }(s) finished := []byte("Finished") diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 3134899bd..19e145254 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -241,3 +241,21 @@ func (n *dagService) Remove(nd *Node) error { } return n.Blocks.DeleteBlock(k) } + +func FetchGraph(ctx context.Context, root *Node, serv *DAGService) { + for _, l := range root.Links { + go func(lnk *Link) { + select { + case <-ctx.Done(): + return + } + + nd, err := lnk.GetNode(serv) + if err != nil { + log.Error(err) + return + } + FetchGraph(ctx, nd, serv) + }(l) + } +} diff --git a/net/conn/conn.go b/net/conn/conn.go index eb262c9fa..537f30870 100644 --- a/net/conn/conn.go +++ b/net/conn/conn.go @@ -2,6 +2,7 @@ package conn import ( "fmt" + "sync" "time" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -27,16 +28,31 @@ const ( HandshakeTimeout = time.Second * 5 ) +var BufferPool *sync.Pool + +func init() { + BufferPool = new(sync.Pool) + BufferPool.New = func() interface{} { + log.Warning("Pool returning new object") + return make([]byte, MaxMessageSize) + } +} + +func ReleaseBuffer(b []byte) { + log.Warningf("Releasing buffer! (size = %d)", cap(b)) + BufferPool.Put(b[:cap(b)]) +} + // msgioPipe is a pipe using msgio channels. type msgioPipe struct { outgoing *msgio.Chan incoming *msgio.Chan } -func newMsgioPipe(size int) *msgioPipe { +func newMsgioPipe(size int, pool *sync.Pool) *msgioPipe { return &msgioPipe{ - outgoing: msgio.NewChan(10), - incoming: msgio.NewChan(10), + outgoing: msgio.NewChan(size, nil), + incoming: msgio.NewChan(size, pool), } } @@ -58,7 +74,7 @@ func newSingleConn(ctx context.Context, local, remote peer.Peer, local: local, remote: remote, maconn: maconn, - msgio: newMsgioPipe(10), + msgio: newMsgioPipe(10, BufferPool), } conn.ContextCloser = ctxc.NewContextCloser(ctx, conn.close) diff --git a/net/mux/mux.go b/net/mux/mux.go index a8865bb73..cd5f7f876 100644 --- a/net/mux/mux.go +++ b/net/mux/mux.go @@ -1,9 +1,10 @@ -package mux +tpackage mux import ( "errors" "sync" + conn "github.com/jbenet/go-ipfs/net/conn" msg "github.com/jbenet/go-ipfs/net/message" pb "github.com/jbenet/go-ipfs/net/mux/internal/pb" u "github.com/jbenet/go-ipfs/util" @@ -130,6 +131,7 @@ func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) { log.Errorf("muxer de-serializing error: %v", err) return } + conn.ReleaseBuffer(m1.Data()) m2 := msg.New(m1.Peer(), data) proto, found := m.Protocols[pid] diff --git a/routing/dht/pb/dht.pb.go b/routing/dht/pb/dht.pb.go index 6c488c51a..22c87bac9 100644 --- a/routing/dht/pb/dht.pb.go +++ b/routing/dht/pb/dht.pb.go @@ -13,7 +13,7 @@ It has these top-level messages: */ package dht_pb -import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto" +import proto "code.google.com/p/gogoprotobuf/proto" import math "math" // Reference imports to suppress errors if they are not otherwise used. diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go index 17ad87371..a2dbeb2f2 100644 --- a/unixfs/io/dagreader.go +++ b/unixfs/io/dagreader.go @@ -9,7 +9,6 @@ import ( mdag "github.com/jbenet/go-ipfs/merkledag" ft "github.com/jbenet/go-ipfs/unixfs" ftpb "github.com/jbenet/go-ipfs/unixfs/pb" - u "github.com/jbenet/go-ipfs/util" ) var ErrIsDir = errors.New("this dag node is a directory") @@ -36,11 +35,12 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) { // Dont allow reading directories return nil, ErrIsDir case ftpb.Data_File: - return &DagReader{ + dr := &DagReader{ node: n, serv: serv, buf: bytes.NewBuffer(pb.GetData()), - }, nil + } + return dr, nil case ftpb.Data_Raw: // Raw block will just be a single level, return a byte buffer return bytes.NewBuffer(pb.GetData()), nil @@ -55,17 +55,12 @@ func (dr *DagReader) precalcNextBuf() error { if dr.position >= len(dr.node.Links) { return io.EOF } - nxtLink := dr.node.Links[dr.position] - nxt := nxtLink.Node - if nxt == nil { - nxtNode, err := dr.serv.Get(u.Key(nxtLink.Hash)) - if err != nil { - return err - } - nxt = nxtNode + nxt, err := dr.node.Links[dr.position].GetNode(dr.serv) + if err != nil { + return err } pb := new(ftpb.Data) - err := proto.Unmarshal(nxt.Data, pb) + err = proto.Unmarshal(nxt.Data, pb) if err != nil { return err }