diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 8ba0b5c50..1d55d19dd 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -92,6 +92,10 @@ "ImportPath": "github.com/jbenet/go-base58", "Rev": "568a28d73fd97651d3442392036a658b6976eed5" }, + { + "ImportPath": "github.com/jbenet/go-ctxgroup", + "Rev": "6b9437e8517175306e30ffec241c523752a38303" + }, { "ImportPath": "github.com/jbenet/go-datastore", "Rev": "6a1c83bda2a71a9bdc936749fdb507df958ed949" @@ -126,6 +130,10 @@ "ImportPath": "github.com/jbenet/go-random", "Rev": "2e83344e7dc7898f94501665af34edd4aa95a013" }, + { + "ImportPath": "github.com/jbenet/go-router", + "Rev": "7a4053217b7bfe3a14cc79541557d47d0c4ad85f" + }, { "ImportPath": "github.com/kr/binarydist", "Rev": "9955b0ab8708602d411341e55fffd7e0700f86bd" diff --git a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Godeps/Godeps.json b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Godeps/Godeps.json new file mode 100644 index 000000000..0ab169bc8 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Godeps/Godeps.json @@ -0,0 +1,14 @@ +{ + "ImportPath": "github.com/jbenet/go-ctxgroup", + "GoVersion": "go1.3", + "Packages": [ + "./..." + ], + "Deps": [ + { + "ImportPath": "code.google.com/p/go.net/context", + "Comment": "null-144", + "Rev": "ad01a6fcc8a19d3a4478c836895ffe883bd2ceab" + } + ] +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Godeps/Readme b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Godeps/Readme new file mode 100644 index 000000000..4cdaa53d5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Godeps/Readme @@ -0,0 +1,5 @@ +This directory tree is generated automatically by godep. + +Please do not edit. + +See https://github.com/tools/godep for more information. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Makefile b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Makefile new file mode 100644 index 000000000..6b988cf64 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/Makefile @@ -0,0 +1,16 @@ +all: + # no-op + +GODEP=$(which godep) + +godep: ${GODEP} + +${GODEP}: + echo ${GODEP} + go get github.com/tools/godep + +# saves/vendors third-party dependencies to Godeps/_workspace +# -r flag rewrites import paths to use the vendored path +# ./... performs operation on all packages in tree +vendor: godep + godep save -r ./... diff --git a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/README.md b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/README.md new file mode 100644 index 000000000..ee5f3af55 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/README.md @@ -0,0 +1,35 @@ +# ContextGroup + + +- Godoc: https://godoc.org/github.com/jbenet/go-ctxgroup + +ContextGroup is an interface for services able to be opened and closed. +It has a parent Context, and Children. But ContextGroup is not a proper +"tree" like the Context tree. It is more like a Context-WaitGroup hybrid. +It models a main object with a few children objects -- and, unlike the +context -- concerns itself with the parent-child closing semantics: + +- Can define an optional TeardownFunc (func() error) to be run at Closetime. +- Children call Children().Add(1) to be waited upon +- Children can select on <-Closing() to know when they should shut down. +- Close() will wait until all children call Children().Done() +- <-Closed() signals when the service is completely closed. + +ContextGroup can be embedded into the main object itself. In that case, +the teardownFunc (if a member function) has to be set after the struct +is intialized: + +```Go +type service struct { + ContextGroup + net.Conn +} +func (s *service) close() error { + return s.Conn.Close() +} +func newService(ctx context.Context, c net.Conn) *service { + s := &service{c} + s.ContextGroup = NewContextGroup(ctx, s.close) + return s +} +``` diff --git a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/ctxgroup.go b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/ctxgroup.go new file mode 100644 index 000000000..631a8919f --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/ctxgroup.go @@ -0,0 +1,265 @@ +// package ctxgroup provides the ContextGroup, a hybrid between the +// context.Context and sync.WaitGroup, which models process trees. +package ctxgroup + +import ( + "sync" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +// TeardownFunc is a function used to cleanup state at the end of the +// lifecycle of a process. +type TeardownFunc func() error + +// ChildFunc is a function to register as a child. It will be automatically +// tracked. +type ChildFunc func(parent ContextGroup) + +var nilTeardownFunc = func() error { return nil } + +// ContextGroup is an interface for services able to be opened and closed. +// It has a parent Context, and Children. But ContextGroup is not a proper +// "tree" like the Context tree. It is more like a Context-WaitGroup hybrid. +// It models a main object with a few children objects -- and, unlike the +// context -- concerns itself with the parent-child closing semantics: +// +// - Can define an optional TeardownFunc (func() error) to be run at Close time. +// - Children call Children().Add(1) to be waited upon +// - Children can select on <-Closing() to know when they should shut down. +// - Close() will wait until all children call Children().Done() +// - <-Closed() signals when the service is completely closed. +// +// ContextGroup can be embedded into the main object itself. In that case, +// the teardownFunc (if a member function) has to be set after the struct +// is intialized: +// +// type service struct { +// ContextGroup +// net.Conn +// } +// +// func (s *service) close() error { +// return s.Conn.Close() +// } +// +// func newService(ctx context.Context, c net.Conn) *service { +// s := &service{c} +// s.ContextGroup = NewContextGroup(ctx, s.close) +// return s +// } +// +type ContextGroup interface { + + // Context is the context of this ContextGroup. It is "sort of" a parent. + Context() context.Context + + // SetTeardown assigns the teardown function. + // It is called exactly _once_ when the ContextGroup is Closed. + SetTeardown(tf TeardownFunc) + + // Children is a sync.Waitgroup for all children goroutines that should + // shut down completely before this service is said to be "closed". + // Follows the semantics of WaitGroup: + // + // Children().Add(1) // add one more dependent child + // Children().Done() // child signals it is done + // + // WARNING: this is deprecated and will go away soon. + Children() *sync.WaitGroup + + // AddChildGroup registers a dependent ContextGroup child. The child will + // be closed when this parent is closed, and waited upon to finish. It is + // the functional equivalent of the following: + // + // parent.Children().Add(1) // add one more dependent child + // go func(parent, child ContextGroup) { + // <-parent.Closing() // wait until parent is closing + // child.Close() // signal child to close + // parent.Children().Done() // child signals it is done + // }(a, b) + // + AddChildGroup(c ContextGroup) + + // AddChildFunc registers a dependent ChildFund. The child will receive + // its parent ContextGroup, and can wait on its signals. Child references + // tracked automatically. It equivalent to the following: + // + // go func(parent, child ContextGroup) { + // + // <-parent.Closing() // wait until parent is closing + // child.Close() // signal child to close + // parent.Children().Done() // child signals it is done + // }(a, b) + // + AddChildFunc(c ChildFunc) + + // Close is a method to call when you wish to stop this ContextGroup + Close() error + + // Closing is a signal to wait upon, like Context.Done(). + // It fires when the object should be closing (but hasn't yet fully closed). + // The primary use case is for child goroutines who need to know when + // they should shut down. (equivalent to Context().Done()) + Closing() <-chan struct{} + + // Closed is a method to wait upon, like Context.Done(). + // It fires when the entire object is fully closed. + // The primary use case is for external listeners who need to know when + // this object is completly done, and all its children closed. + Closed() <-chan struct{} +} + +// contextGroup is a Closer with a cancellable context +type contextGroup struct { + ctx context.Context + cancel context.CancelFunc + + // called to run the teardown logic. + teardownFunc TeardownFunc + + // closed is released once the close function is done. + closed chan struct{} + + // wait group for child goroutines + children sync.WaitGroup + + // sync primitive to ensure the close logic is only called once. + closeOnce sync.Once + + // error to return to clients of Close(). + closeErr error +} + +// newContextGroup constructs and returns a ContextGroup. It will call +// cf TeardownFunc before its Done() Wait signals fire. +func newContextGroup(ctx context.Context, cf TeardownFunc) ContextGroup { + ctx, cancel := context.WithCancel(ctx) + c := &contextGroup{ + ctx: ctx, + cancel: cancel, + closed: make(chan struct{}), + } + c.SetTeardown(cf) + + c.Children().Add(1) // initialize with 1. calling Close will decrement it. + go c.closeOnContextDone() + return c +} + +// SetTeardown assigns the teardown function. +func (c *contextGroup) SetTeardown(cf TeardownFunc) { + if cf == nil { + cf = nilTeardownFunc + } + c.teardownFunc = cf +} + +func (c *contextGroup) Context() context.Context { + return c.ctx +} + +func (c *contextGroup) Children() *sync.WaitGroup { + return &c.children +} + +func (c *contextGroup) AddChildGroup(child ContextGroup) { + c.children.Add(1) + go func(parent, child ContextGroup) { + <-parent.Closing() // wait until parent is closing + child.Close() // signal child to close + parent.Children().Done() // child signals it is done + }(c, child) +} + +func (c *contextGroup) AddChildFunc(child ChildFunc) { + c.children.Add(1) + go func(parent ContextGroup, child ChildFunc) { + child(parent) + parent.Children().Done() // child signals it is done + }(c, child) +} + +// Close is the external close function. it's a wrapper around internalClose +// that waits on Closed() +func (c *contextGroup) Close() error { + c.internalClose() + <-c.Closed() // wait until we're totally done. + return c.closeErr +} + +func (c *contextGroup) Closing() <-chan struct{} { + return c.Context().Done() +} + +func (c *contextGroup) Closed() <-chan struct{} { + return c.closed +} + +func (c *contextGroup) internalClose() { + go c.closeOnce.Do(c.closeLogic) +} + +// the _actual_ close process. +func (c *contextGroup) closeLogic() { + // this function should only be called once (hence the sync.Once). + // and it will panic at the bottom (on close(c.closed)) otherwise. + + c.cancel() // signal that we're shutting down (Closing) + c.closeErr = c.teardownFunc() // actually run the close logic + c.children.Wait() // wait till all children are done. + close(c.closed) // signal that we're shut down (Closed) +} + +// if parent context is shut down before we call Close explicitly, +// we need to go through the Close motions anyway. Hence all the sync +// stuff all over the place... +func (c *contextGroup) closeOnContextDone() { + <-c.Context().Done() // wait until parent (context) is done. + c.internalClose() + c.Children().Done() +} + +// WithTeardown constructs and returns a ContextGroup with +// cf TeardownFunc (and context.Background) +func WithTeardown(cf TeardownFunc) ContextGroup { + if cf == nil { + panic("nil TeardownFunc") + } + return newContextGroup(context.Background(), cf) +} + +// WithContext constructs and returns a ContextGroup with given context +func WithContext(ctx context.Context) ContextGroup { + if ctx == nil { + panic("nil Context") + } + return newContextGroup(ctx, nil) +} + +// WithContextAndTeardown constructs and returns a ContextGroup with +// cf TeardownFunc (and context.Background) +func WithContextAndTeardown(ctx context.Context, cf TeardownFunc) ContextGroup { + if ctx == nil { + panic("nil Context") + } + if cf == nil { + panic("nil TeardownFunc") + } + return newContextGroup(ctx, cf) +} + +// WithParent constructs and returns a ContextGroup with given parent +func WithParent(p ContextGroup) ContextGroup { + if p == nil { + panic("nil ContextGroup") + } + c := newContextGroup(p.Context(), nil) + p.AddChildGroup(c) + return c +} + +// WithBackground returns a ContextGroup with context.Background() +func WithBackground() ContextGroup { + return newContextGroup(context.Background(), nil) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/ctxgroup_test.go b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/ctxgroup_test.go new file mode 100644 index 000000000..883ccf634 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup/ctxgroup_test.go @@ -0,0 +1,187 @@ +package ctxgroup + +import ( + "testing" + "time" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +type tree struct { + ContextGroup + c []tree +} + +func setupCGHierarchy(ctx context.Context) tree { + t := func(n ContextGroup, ts ...tree) tree { + return tree{n, ts} + } + + if ctx == nil { + ctx = context.Background() + } + a := WithContext(ctx) + b1 := WithParent(a) + b2 := WithParent(a) + c1 := WithParent(b1) + c2 := WithParent(b1) + c3 := WithParent(b2) + c4 := WithParent(b2) + + return t(a, t(b1, t(c1), t(c2)), t(b2, t(c3), t(c4))) +} + +func TestClosingClosed(t *testing.T) { + + a := WithBackground() + Q := make(chan string) + + go func() { + <-a.Closing() + Q <- "closing" + }() + + go func() { + <-a.Closed() + Q <- "closed" + }() + + go func() { + a.Close() + Q <- "closed" + }() + + if q := <-Q; q != "closing" { + t.Error("order incorrect. closing not first") + } + if q := <-Q; q != "closed" { + t.Error("order incorrect. closing not first") + } + if q := <-Q; q != "closed" { + t.Error("order incorrect. closing not first") + } +} + +func TestChildFunc(t *testing.T) { + a := WithBackground() + + wait1 := make(chan struct{}) + wait2 := make(chan struct{}) + wait3 := make(chan struct{}) + wait4 := make(chan struct{}) + go func() { + a.Close() + wait4 <- struct{}{} + }() + + a.AddChildFunc(func(parent ContextGroup) { + wait1 <- struct{}{} + <-wait2 + wait3 <- struct{}{} + }) + + <-wait1 + select { + case <-wait3: + t.Error("should not be closed yet") + case <-wait4: + t.Error("should not be closed yet") + case <-a.Closed(): + t.Error("should not be closed yet") + default: + } + + wait2 <- struct{}{} + + select { + case <-wait3: + case <-time.After(time.Second): + t.Error("should be closed now") + } + + select { + case <-wait4: + case <-time.After(time.Second): + t.Error("should be closed now") + } +} + +func TestTeardownCalledOnce(t *testing.T) { + a := setupCGHierarchy(nil) + + onlyOnce := func() func() error { + count := 0 + return func() error { + count++ + if count > 1 { + t.Error("called", count, "times") + } + return nil + } + } + + a.SetTeardown(onlyOnce()) + a.c[0].SetTeardown(onlyOnce()) + a.c[0].c[0].SetTeardown(onlyOnce()) + a.c[0].c[1].SetTeardown(onlyOnce()) + a.c[1].SetTeardown(onlyOnce()) + a.c[1].c[0].SetTeardown(onlyOnce()) + a.c[1].c[1].SetTeardown(onlyOnce()) + + a.c[0].c[0].Close() + a.c[0].c[0].Close() + a.c[0].c[0].Close() + a.c[0].c[0].Close() + a.c[0].Close() + a.c[0].Close() + a.c[0].Close() + a.c[0].Close() + a.Close() + a.Close() + a.Close() + a.Close() + a.c[1].Close() + a.c[1].Close() + a.c[1].Close() + a.c[1].Close() +} + +func TestOnClosed(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + a := setupCGHierarchy(ctx) + Q := make(chan string, 10) + + onClosed := func(s string, c ContextGroup) { + <-c.Closed() + Q <- s + } + + go onClosed("0", a.c[0]) + go onClosed("10", a.c[1].c[0]) + go onClosed("", a) + go onClosed("00", a.c[0].c[0]) + go onClosed("1", a.c[1]) + go onClosed("01", a.c[0].c[1]) + go onClosed("11", a.c[1].c[1]) + + test := func(ss ...string) { + s1 := <-Q + for _, s2 := range ss { + if s1 == s2 { + return + } + } + t.Error("context not in group", s1, ss) + } + + cancel() + + test("00", "01", "10", "11") + test("00", "01", "10", "11") + test("00", "01", "10", "11") + test("00", "01", "10", "11") + test("0", "1") + test("0", "1") + test("") +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-router/LICENSE b/Godeps/_workspace/src/github.com/jbenet/go-router/LICENSE new file mode 100644 index 000000000..c7386b3c9 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-router/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Juan Batiz-Benet + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-router/README.md b/Godeps/_workspace/src/github.com/jbenet/go-router/README.md new file mode 100644 index 000000000..c8675d678 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-router/README.md @@ -0,0 +1,8 @@ +# go-router - networking inspired muxing + +This is a networking-inspired model. It's useful for muxing and lots of other +things. It enables you to construct your abstractions as you would compose +computer networks (endpoints, switches, routing tables). These could represent +processing workers, entire subsystems, or even real computers ;). + +See https://godoc.org/github.com/jbenet/go-router diff --git a/Godeps/_workspace/src/github.com/jbenet/go-router/distance.go b/Godeps/_workspace/src/github.com/jbenet/go-router/distance.go new file mode 100644 index 000000000..956b3b1c6 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-router/distance.go @@ -0,0 +1,33 @@ +package router + +// HammingDistance is a DistanceFunc that interprets Addresses as strings +// and uses their Hamming distance. +// Return -1 if the Addresses are not strings, or strings length don't match. +func HammingDistance(a1, a2 Address) int { + s1, ok := a1.(string) + if !ok { + return -1 + } + + s2, ok := a2.(string) + if !ok { + return -1 + } + + // runes not code points + r1 := []rune(s1) + r2 := []rune(s2) + + // hamming distance requires equal length strings + if len(r1) != len(r2) { + return -1 + } + + d := 0 + for i := range r1 { + if r1[i] != r2[i] { + d++ + } + } + return d +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-router/impl.go b/Godeps/_workspace/src/github.com/jbenet/go-router/impl.go new file mode 100644 index 000000000..81683faf2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-router/impl.go @@ -0,0 +1,81 @@ +package router + +import ( + "errors" +) + +type packet struct { + a Address + p interface{} +} + +// ErrNoRoute signals when there is no Route to a destination +var ErrNoRoute = errors.New("routing error: no route") + +// NewPacket constructs a trivial packet linking a destination Address to +// an interface{} payload. +func NewPacket(destination Address, payload interface{}) Packet { + return &packet{destination, payload} +} + +func (p *packet) Destination() Address { + return p.a +} + +func (p *packet) Payload() interface{} { + return p.p +} + +// QueueNode is a trivial node, which accepts packets into a queue +type QueueNode struct { + a Address + q chan Packet +} + +// NewQueueNode constructs a node with an internal chan Packet queue +func NewQueueNode(addr Address, q chan Packet) *QueueNode { + return &QueueNode{addr, q} +} + +// Queue returns the chan Packet queue +func (n *QueueNode) Queue() <-chan Packet { + return n.q +} + +// Address returns the QueueNode's Address +func (n *QueueNode) Address() Address { + return n.a +} + +// HandlePacket consumes the incomng packet and adds it to the queue. +func (n *QueueNode) HandlePacket(p Packet, s Node) error { + n.q <- p + return nil +} + +type switchh struct { + addr Address + router Router + nodes []Node +} + +// NewSwitch constructs a switch with given Router and list of adjacent Nodes. +func NewSwitch(a Address, r Router, adj []Node) Switch { + return &switchh{a, r, adj} +} + +func (s *switchh) Address() Address { + return s.addr +} + +func (s *switchh) Router() Router { + return s.router +} + +func (s *switchh) HandlePacket(p Packet, n Node) error { + next := s.router.Route(p) + if next != nil { + return next.HandlePacket(p, s) + } + return ErrNoRoute +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-router/interface.go b/Godeps/_workspace/src/github.com/jbenet/go-router/interface.go new file mode 100644 index 000000000..df86dce4c --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-router/interface.go @@ -0,0 +1,81 @@ +// Package router is a networking-inspired routing model. It's useful for +// muxing and lots of other things. It enables you to construct your +// abstractions as you would compose computer networks (endpoints, switches, +// routing tables). These could represent processing workers, entire +// subsystems, or even real computers ;). +package router + +// Address is our way of knowing where we're headed. Traditionally, addresses +// are things like IP Addresses, or email addresses. But filepaths, or URLs +// can be seen as addresses too. We really leave it up to you. Our routing +// is general, and forces you to pick an addressing scheme, and some logic to +// discriminate addresses that you'll plug into Routers (Address DistanceFunc). +type Address interface{} + +// Packet is the unit of moving things in our network. Anything can be routed +// in our network, as long as it has a Destination. +type Packet interface { + + // Destination is the Address of the endpoint this Packet is headed to. + // They could use the same Addressing throughout (recommended) like the + // internet, or face the pain of translating addresses at inter-network + // gateways. + Destination() Address + + // Payload is here for completeness. This can be the Packet itself, but this + // function encourages the client to think through their packet design. + Payload() interface{} +} + +// Node is an object which has interfaces to connect to networks. This +// is an "endpoint" object. +type Node interface { + + // Address returns the node's address. + Address() Address + + // HandlePacket receives a packet sent by another node. + HandlePacket(Packet, Node) error +} + +// Switch is the basic forwarding device. It listens to all its interfaces (it +// is a Node in our network) and Forwards all Packets received by any interface +// out another interface, according to its ForwardingTable. +type Switch interface { + Node + + // Router returns the Router used to decide where to forward packets. + Router() Router +} + +// Router is an object that decides how a Packet should be Routed. This should +// be as close to a static table lookup as possible, meaning it would be best +// to prepare a forwarding table in parallel, instead of blocking. +// +// Our Router captures the entire Control Plane, meaning that we can implement: +// - Static Routing - forwarding table only +// - Dynamic Routing - routing table computed with an algorithm or protocol +// - "SDN" Routing - Control Plane separated from Data Plane +// And even: +// - URL Routers (like gorilla.Muxer) +// - Protocol Muxers +// entirely within different Router implementations. +// +// Note that this is a break from traditional networking systems. Instead of +// having the abstractions of FIB, RIB, Routing/Forwarding Tables, Routers, +// and Switches, we only have the last two: +// - Router -- the things that "route" (decide where things go) +// - Switch -- connecting Nodes, "switch" Packets according to a Router. +type Router interface { + + // Route decides how to route a Packet. + // It returns the next hop Node chosen to send the Packet to. + // Route may return nil, if no route is suitable at all (equivalent of drop). + Route(Packet) Node +} + +// DistanceFunc returns a measure of distance between two Addresses. +// Examples: +// - masked longest prefix match (IP, CIDR) +// - XOR distance (Kademlia) +type DistanceFunc func(a, b Address) int diff --git a/Godeps/_workspace/src/github.com/jbenet/go-router/ip/ip.go b/Godeps/_workspace/src/github.com/jbenet/go-router/ip/ip.go new file mode 100644 index 000000000..f3d82aa72 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-router/ip/ip.go @@ -0,0 +1,6 @@ +// Package ip provides a Table for ip matching +package ip + +// import ( +// ipaddr "github.com/mikioh/ipaddr" +// ) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-router/ip/ip_test.go b/Godeps/_workspace/src/github.com/jbenet/go-router/ip/ip_test.go new file mode 100644 index 000000000..5c861f3f1 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-router/ip/ip_test.go @@ -0,0 +1 @@ +package ip diff --git a/Godeps/_workspace/src/github.com/jbenet/go-router/pkt_test.go b/Godeps/_workspace/src/github.com/jbenet/go-router/pkt_test.go new file mode 100644 index 000000000..731dcd480 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-router/pkt_test.go @@ -0,0 +1,77 @@ +package router + +import ( + "testing" +) + +type mockNode struct { + addr string + pkts []Packet +} + +func (m *mockNode) Address() Address { + return m.addr +} + +func (m *mockNode) HandlePacket(p Packet, n Node) error { + m.pkts = append(m.pkts, p) + return nil +} + +type mockPacket struct { + a string +} + +func (m *mockPacket) Destination() Address { + return m.a +} + +func (m *mockPacket) Payload() interface{} { + return m +} + +func TestAddrs(t *testing.T) { + + ta := func(a, b Address, expect int) { + actual := HammingDistance(a, b) + if actual != expect { + t.Error("address distance error:", a, b, expect, actual) + } + } + + a := "abc" + b := "abc" + c := "abd" + d := "add" + e := "ddd" + + ta(a, a, 0) + ta(a, b, 0) + ta(a, c, 1) + ta(a, d, 2) + ta(a, e, 3) +} + +func TestNodes(t *testing.T) { + + a := &mockPacket{"abc"} + b := &mockPacket{"abc"} + c := &mockPacket{"abd"} + d := &mockPacket{"add"} + e := &mockPacket{"ddd"} + + n := &mockNode{addr: "abc"} + n2 := &mockNode{addr: "ddd"} + n.HandlePacket(a, n2) + n.HandlePacket(b, n2) + n.HandlePacket(c, n2) + n.HandlePacket(d, n2) + n.HandlePacket(e, n2) + + pkts := []Packet{a, b, c, d, e} + for i, p := range pkts { + if n.pkts[i] != p { + t.Error("pkts not handled in order.") + } + } +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-router/table.go b/Godeps/_workspace/src/github.com/jbenet/go-router/table.go new file mode 100644 index 000000000..ed5ca7c71 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-router/table.go @@ -0,0 +1,137 @@ +package router + +import ( + "sync" +) + +// TableEntry is an (Address, Node) pair for a routing Table +type TableEntry interface { + Address() Address + NextHop() Node +} + +func NewTableEntry(addr Address, nexthop Node) TableEntry { + return &tableEntry{addr, nexthop} +} + +type tableEntry struct { + addr Address + next Node +} + +func (te *tableEntry) Address() Address { + return te.addr +} + +func (te *tableEntry) NextHop() Node { + return te.next +} + +// Table is a Router (Routing Table, really) based on a distance criterion. +// +// For example: +// +// n1 := NewQueueNode("aaa", make(chan Packet, 10)) +// n2 := NewQueueNode("aba", make(chan Packet, 10)) +// n3 := NewQueueNode("abc", make(chan Packet, 10)) +// +// var t router.Table +// t.Distance = router.HammingDistance +// t.AddNodes(n1, n2) +// +// p1 := NewPacket("aaa", "hello1") +// p2 := NewPacket("aba", "hello2") +// p3 := NewPacket("abc", "hello3") +// +// t.Route(p1) // n1 +// t.Route(p2) // n2 +// t.Route(p3) // n2, because we don't have n3 and n2 is closet +// +// t.AddNode(n3) +// t.Route(p3) // n3 +type Table interface { + Router + + // Entries are the entries in this routing table + Entries() []TableEntry + + // Distance returns a measure of distance between two Addresses + Distance() DistanceFunc +} + +type SimpleTable struct { + entries []TableEntry + distance DistanceFunc + sync.RWMutex +} + +// Entries are the entries in this routing table +func (t *SimpleTable) Entries() []TableEntry { + return t.entries +} + +// Distance returns a measure of distance between two Addresses. +func (t *SimpleTable) Distance() DistanceFunc { + return t.distance +} + +// AddEntry adds an (Address, NextHop) entry to the Table +func (t *SimpleTable) AddEntry(addr Address, nextHop Node) { + t.Lock() + defer t.Unlock() + t.entries = append(t.entries, NewTableEntry(addr, nextHop)) +} + +// AddNode calls AddTableEntry for the given Node +func (t *SimpleTable) AddNode(n Node) { + t.AddEntry(n.Address(), n) +} + +// AddNodes calls AddTableEntry for the given Node +func (t *SimpleTable) AddNodes(ns ...Node) { + t.Lock() + defer t.Unlock() + for _, n := range ns { + t.entries = append(t.entries, NewTableEntry(n.Address(), n)) + } +} + +// Route decides how to route a Packet out of a list of Nodes. +// It returns the Node chosen to send the Packet to. +// Route may return nil, if no route is suitable at all (equivalent of drop). +func (t *SimpleTable) Route(p Packet) Node { + if t.entries == nil { + return nil + } + + t.RLock() + defer t.RUnlock() + + dist := t.Distance() + if dist == nil { + dist = equalDistance + } + + var best Node + var bestDist int + var addr = p.Destination() + + for _, e := range t.entries { + d := dist(e.Address(), addr) + if d < 0 { + continue + } + if best == nil || d < bestDist { + bestDist = d + best = e.NextHop() + } + } + return best +} + +func equalDistance(a, b Address) int { + if a == b { + return 0 + } + return -1 +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-router/table_test.go b/Godeps/_workspace/src/github.com/jbenet/go-router/table_test.go new file mode 100644 index 000000000..8acaa7093 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-router/table_test.go @@ -0,0 +1,80 @@ +package router + +import ( + "testing" +) + +func TestTable(t *testing.T) { + + na := &mockNode{addr: "abc"} + nc := &mockNode{addr: "abd"} + nd := &mockNode{addr: "add"} + ne := &mockNode{addr: "ddd"} + + pa := &mockPacket{"abc"} + pb := &mockPacket{"abc"} + pc := &mockPacket{"abd"} + pd := &mockPacket{"add"} + pe := &mockPacket{"ddd"} + + table := &SimpleTable{ + entries: []TableEntry{ + &tableEntry{na.Address(), na}, + &tableEntry{nc.Address(), nc}, + &tableEntry{nd.Address(), nd}, + &tableEntry{ne.Address(), ne}, + }, + } + + s := NewSwitch("sss", table, []Node{na, nc, nd, ne}) + s.HandlePacket(pa, na) + s.HandlePacket(pb, na) + s.HandlePacket(pc, na) + s.HandlePacket(pd, na) + s.HandlePacket(pe, na) + + tt := func(n *mockNode, pkts []Packet) { + for i, p := range pkts { + if len(n.pkts) <= i { + t.Error("pkts not handled in order.", n, pkts) + return + } + if n.pkts[i] != p { + t.Error("pkts not handled in order.", n, pkts) + } + } + } + + tt(na, []Packet{pa, pb}) + tt(nc, []Packet{pc}) + tt(nd, []Packet{pd}) + tt(ne, []Packet{pe}) +} + +func TestTable2(t *testing.T) { + + n1 := NewQueueNode("aaa", make(chan Packet, 1)) + n2 := NewQueueNode("aba", make(chan Packet, 1)) + n3 := NewQueueNode("abc", make(chan Packet, 1)) + + var tb SimpleTable + tb.distance = HammingDistance + tb.AddNodes(n1, n2) + + p1 := NewPacket("aaa", "hello1") + p2 := NewPacket("aba", "hello2") + p3 := NewPacket("abc", "hello3") + + testRoute := func(p Packet, expect Node) { + if tb.Route(p) != expect { + t.Error(p, "route should be", expect) + } + } + + testRoute(p1, n1) + testRoute(p2, n2) + testRoute(p3, n2) // n2 because we don't have n3 and n2 is closet + + tb.AddNode(n3) + testRoute(p3, n3) +}