msgio pooling first hack

This commit is contained in:
Jeromy 2014-10-27 20:23:14 +00:00
parent ffc59fff1b
commit 07733b17b3
8 changed files with 96 additions and 25 deletions

View File

@ -2,6 +2,7 @@ package msgio
import (
"io"
"sync"
)
type Chan struct {
@ -9,13 +10,15 @@ type Chan struct {
MsgChan chan []byte
ErrChan chan error
CloseChan chan bool
BufPool *sync.Pool
}
func NewChan(chanSize int) *Chan {
func NewChan(chanSize int, pool *sync.Pool) *Chan {
return &Chan{
MsgChan: make(chan []byte, chanSize),
ErrChan: make(chan error, 1),
CloseChan: make(chan bool, 2),
BufPool: pool,
}
}
@ -25,7 +28,11 @@ func (s *Chan) ReadFrom(r io.Reader, maxMsgLen int) {
mr := NewReader(r)
Loop:
for {
buf := make([]byte, maxMsgLen)
bufi := s.BufPool.Get()
buf, ok := bufi.([]byte)
if !ok {
panic("Got invalid type from sync pool!")
}
l, err := mr.ReadMsg(buf)
if err != nil {
if err == io.EOF {

View File

@ -5,6 +5,7 @@ import (
randbuf "github.com/jbenet/go-randbuf"
"io"
"math/rand"
"sync"
"testing"
"time"
)
@ -12,7 +13,8 @@ import (
func TestReadChan(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewWriter(buf)
rchan := NewChan(10)
p := &sync.Pool{New: func() interface{} { return make([]byte, 1000) }}
rchan := NewChan(10, p)
msgs := [1000][]byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
@ -58,7 +60,7 @@ Loop:
func TestWriteChan(t *testing.T) {
buf := bytes.NewBuffer(nil)
reader := NewReader(buf)
wchan := NewChan(10)
wchan := NewChan(10, nil)
msgs := [1000][]byte{}
go wchan.WriteTo(buf)

View File

@ -184,10 +184,41 @@ func (s *SecurePipe) handshake() error {
}
cmp := bytes.Compare(myPubKey, proposeResp.GetPubkey())
mIV, tIV, mCKey, tCKey, mMKey, tMKey := ci.KeyStretcher(cmp, cipherType, hashType, secret)
//mIV, tIV, mCKey, tCKey, mMKey, tMKey := ci.KeyStretcher(cmp, cipherType, hashType, secret)
ci.KeyStretcher(cmp, cipherType, hashType, secret)
go s.handleSecureIn(hashType, cipherType, tIV, tCKey, tMKey)
go s.handleSecureOut(hashType, cipherType, mIV, mCKey, mMKey)
//go s.handleSecureIn(hashType, cipherType, tIV, tCKey, tMKey)
//go s.handleSecureOut(hashType, cipherType, mIV, mCKey, mMKey)
// Disable Secure Channel
go func(sp *SecurePipe) {
for {
select {
case <-sp.ctx.Done():
return
case m, ok := <-sp.insecure.In:
if !ok {
sp.cancel()
return
}
sp.In <- m
}
}
}(s)
go func(sp *SecurePipe) {
for {
select {
case <-sp.ctx.Done():
return
case m, ok := <-sp.Out:
if !ok {
sp.cancel()
return
}
sp.insecure.Out <- m
}
}
}(s)
finished := []byte("Finished")

View File

@ -241,3 +241,21 @@ func (n *dagService) Remove(nd *Node) error {
}
return n.Blocks.DeleteBlock(k)
}
func FetchGraph(ctx context.Context, root *Node, serv *DAGService) {
for _, l := range root.Links {
go func(lnk *Link) {
select {
case <-ctx.Done():
return
}
nd, err := lnk.GetNode(serv)
if err != nil {
log.Error(err)
return
}
FetchGraph(ctx, nd, serv)
}(l)
}
}

View File

@ -2,6 +2,7 @@ package conn
import (
"fmt"
"sync"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
@ -27,16 +28,31 @@ const (
HandshakeTimeout = time.Second * 5
)
var BufferPool *sync.Pool
func init() {
BufferPool = new(sync.Pool)
BufferPool.New = func() interface{} {
log.Warning("Pool returning new object")
return make([]byte, MaxMessageSize)
}
}
func ReleaseBuffer(b []byte) {
log.Warningf("Releasing buffer! (size = %d)", cap(b))
BufferPool.Put(b[:cap(b)])
}
// msgioPipe is a pipe using msgio channels.
type msgioPipe struct {
outgoing *msgio.Chan
incoming *msgio.Chan
}
func newMsgioPipe(size int) *msgioPipe {
func newMsgioPipe(size int, pool *sync.Pool) *msgioPipe {
return &msgioPipe{
outgoing: msgio.NewChan(10),
incoming: msgio.NewChan(10),
outgoing: msgio.NewChan(size, nil),
incoming: msgio.NewChan(size, pool),
}
}
@ -58,7 +74,7 @@ func newSingleConn(ctx context.Context, local, remote peer.Peer,
local: local,
remote: remote,
maconn: maconn,
msgio: newMsgioPipe(10),
msgio: newMsgioPipe(10, BufferPool),
}
conn.ContextCloser = ctxc.NewContextCloser(ctx, conn.close)

View File

@ -1,9 +1,10 @@
package mux
tpackage mux
import (
"errors"
"sync"
conn "github.com/jbenet/go-ipfs/net/conn"
msg "github.com/jbenet/go-ipfs/net/message"
pb "github.com/jbenet/go-ipfs/net/mux/internal/pb"
u "github.com/jbenet/go-ipfs/util"
@ -130,6 +131,7 @@ func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) {
log.Errorf("muxer de-serializing error: %v", err)
return
}
conn.ReleaseBuffer(m1.Data())
m2 := msg.New(m1.Peer(), data)
proto, found := m.Protocols[pid]

View File

@ -13,7 +13,7 @@ It has these top-level messages:
*/
package dht_pb
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto"
import proto "code.google.com/p/gogoprotobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.

View File

@ -9,7 +9,6 @@ import (
mdag "github.com/jbenet/go-ipfs/merkledag"
ft "github.com/jbenet/go-ipfs/unixfs"
ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
u "github.com/jbenet/go-ipfs/util"
)
var ErrIsDir = errors.New("this dag node is a directory")
@ -36,11 +35,12 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
// Dont allow reading directories
return nil, ErrIsDir
case ftpb.Data_File:
return &DagReader{
dr := &DagReader{
node: n,
serv: serv,
buf: bytes.NewBuffer(pb.GetData()),
}, nil
}
return dr, nil
case ftpb.Data_Raw:
// Raw block will just be a single level, return a byte buffer
return bytes.NewBuffer(pb.GetData()), nil
@ -55,17 +55,12 @@ func (dr *DagReader) precalcNextBuf() error {
if dr.position >= len(dr.node.Links) {
return io.EOF
}
nxtLink := dr.node.Links[dr.position]
nxt := nxtLink.Node
if nxt == nil {
nxtNode, err := dr.serv.Get(u.Key(nxtLink.Hash))
if err != nil {
return err
}
nxt = nxtNode
nxt, err := dr.node.Links[dr.position].GetNode(dr.serv)
if err != nil {
return err
}
pb := new(ftpb.Data)
err := proto.Unmarshal(nxt.Data, pb)
err = proto.Unmarshal(nxt.Data, pb)
if err != nil {
return err
}