Merge pull request #4804 from ipfs/feat/coreapi/dht

coreapi: DHT API
This commit is contained in:
Steven Allen 2018-09-11 01:13:37 +00:00 committed by GitHub
commit eb45644104
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 431 additions and 69 deletions

View File

@ -62,3 +62,8 @@ func (api *CoreAPI) Object() coreiface.ObjectAPI {
func (api *CoreAPI) Pin() coreiface.PinAPI {
return (*PinAPI)(api)
}
// Dht returns the DhtAPI interface implementation backed by the go-ipfs node
func (api *CoreAPI) Dht() coreiface.DhtAPI {
return (*DhtAPI)(api)
}

132
core/coreapi/dht.go Normal file
View File

@ -0,0 +1,132 @@
package coreapi
import (
"context"
"errors"
"fmt"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
dag "gx/ipfs/QmNr4E8z9bGTztvHJktp7uQaMdx9p3r9Asrq6eYk7iCh4a/go-merkledag"
offline "gx/ipfs/QmPuLWvxK1vg6ckKUpT53Dow9VLCcQGdL5Trwxa8PTLp7r/go-ipfs-exchange-offline"
cidutil "gx/ipfs/QmPyxJ2QS7L5FhGkNYkNcXHGjDhvGHueJ4auqAstFHYxy5/go-cidutil"
blockservice "gx/ipfs/QmQLG22wSEStiociTSKQpZAuuaaWoF1B3iKyjPFvWiTQ77/go-blockservice"
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
routing "gx/ipfs/QmY9JUvS8kbgao3XbPh6WAV3ChE2nxGKhcGTHiwMC4gmcU/go-libp2p-routing"
cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid"
pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore"
blockstore "gx/ipfs/Qmeg56ecxRnVv7VWViMrDeEMoBHaNFMs4vQnyQrJ79Zz7i/go-ipfs-blockstore"
)
type DhtAPI CoreAPI
func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) {
pi, err := api.node.Routing.FindPeer(ctx, peer.ID(p))
if err != nil {
return pstore.PeerInfo{}, err
}
return pi, nil
}
func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error) {
settings, err := caopts.DhtFindProvidersOptions(opts...)
if err != nil {
return nil, err
}
rp, err := api.core().ResolvePath(ctx, p)
if err != nil {
return nil, err
}
numProviders := settings.NumProviders
if numProviders < 1 {
return nil, fmt.Errorf("number of providers must be greater than 0")
}
pchan := api.node.Routing.FindProvidersAsync(ctx, rp.Cid(), numProviders)
return pchan, nil
}
func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...caopts.DhtProvideOption) error {
settings, err := caopts.DhtProvideOptions(opts...)
if err != nil {
return err
}
if api.node.Routing == nil {
return errors.New("cannot provide in offline mode")
}
rp, err := api.core().ResolvePath(ctx, path)
if err != nil {
return err
}
c := rp.Cid()
has, err := api.node.Blockstore.Has(c)
if err != nil {
return err
}
if !has {
return fmt.Errorf("block %s not found locally, cannot provide", c)
}
if settings.Recursive {
err = provideKeysRec(ctx, api.node.Routing, api.node.Blockstore, []*cid.Cid{c})
} else {
err = provideKeys(ctx, api.node.Routing, []*cid.Cid{c})
}
if err != nil {
return err
}
return nil
}
func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) error {
for _, c := range cids {
err := r.Provide(ctx, c, true)
if err != nil {
return err
}
}
return nil
}
func provideKeysRec(ctx context.Context, r routing.IpfsRouting, bs blockstore.Blockstore, cids []*cid.Cid) error {
provided := cidutil.NewStreamingSet()
errCh := make(chan error)
go func() {
dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
for _, c := range cids {
err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, provided.Visitor(ctx))
if err != nil {
errCh <- err
}
}
}()
for {
select {
case k := <-provided.New:
err := r.Provide(ctx, k, true)
if err != nil {
return err
}
case err := <-errCh:
return err
case <-ctx.Done():
return ctx.Err()
}
}
}
func (api *DhtAPI) core() coreiface.CoreAPI {
return (*CoreAPI)(api)
}

109
core/coreapi/dht_test.go Normal file
View File

@ -0,0 +1,109 @@
package coreapi_test
import (
"context"
"io"
"io/ioutil"
"testing"
"github.com/ipfs/go-ipfs/core/coreapi/interface"
"github.com/ipfs/go-ipfs/core/coreapi/interface/options"
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
blocks "gx/ipfs/QmWAzSEoqZ6xU6pu8yL8e5WaMb7wtbfbhhN4p1DknUPtr3/go-block-format"
)
func TestDhtFindPeer(t *testing.T) {
ctx := context.Background()
nds, apis, err := makeAPISwarm(ctx, true, 5)
if err != nil {
t.Fatal(err)
}
pi, err := apis[2].Dht().FindPeer(ctx, peer.ID(nds[0].Identity))
if err != nil {
t.Fatal(err)
}
if pi.Addrs[0].String() != "/ip4/127.0.0.1/tcp/4001" {
t.Errorf("got unexpected address from FindPeer: %s", pi.Addrs[0].String())
}
pi, err = apis[1].Dht().FindPeer(ctx, peer.ID(nds[2].Identity))
if err != nil {
t.Fatal(err)
}
if pi.Addrs[0].String() != "/ip4/127.0.2.1/tcp/4001" {
t.Errorf("got unexpected address from FindPeer: %s", pi.Addrs[0].String())
}
}
func TestDhtFindProviders(t *testing.T) {
ctx := context.Background()
nds, apis, err := makeAPISwarm(ctx, true, 5)
if err != nil {
t.Fatal(err)
}
p, err := addTestObject(ctx, apis[0])
if err != nil {
t.Fatal(err)
}
out, err := apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1))
if err != nil {
t.Fatal(err)
}
provider := <-out
if provider.ID.String() != nds[0].Identity.String() {
t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String())
}
}
func TestDhtProvide(t *testing.T) {
ctx := context.Background()
nds, apis, err := makeAPISwarm(ctx, true, 5)
if err != nil {
t.Fatal(err)
}
// TODO: replace once there is local add on unixfs or somewhere
data, err := ioutil.ReadAll(&io.LimitedReader{R: rnd, N: 4092})
if err != nil {
t.Fatal(err)
}
b := blocks.NewBlock(data)
nds[0].Blockstore.Put(b)
p := iface.IpfsPath(b.Cid())
out, err := apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1))
if err != nil {
t.Fatal(err)
}
provider := <-out
if provider.ID.String() != "<peer.ID >" {
t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String())
}
err = apis[0].Dht().Provide(ctx, p)
if err != nil {
t.Fatal(err)
}
out, err = apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1))
if err != nil {
t.Fatal(err)
}
provider = <-out
if provider.ID.String() != nds[0].Identity.String() {
t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String())
}
}

View File

@ -31,6 +31,9 @@ type CoreAPI interface {
// ObjectAPI returns an implementation of Object API
Object() ObjectAPI
// Dht returns an implementation of Dht API
Dht() DhtAPI
// ResolvePath resolves the path using Unixfs resolver
ResolvePath(context.Context, Path) (ResolvedPath, error)

View File

@ -0,0 +1,26 @@
package iface
import (
"context"
"github.com/ipfs/go-ipfs/core/coreapi/interface/options"
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore"
)
// DhtAPI specifies the interface to the DHT
// Note: This API will likely get deprecated in near future, see
// https://github.com/ipfs/interface-ipfs-core/issues/249 for more context.
type DhtAPI interface {
// FindPeer queries the DHT for all of the multiaddresses associated with a
// Peer ID
FindPeer(context.Context, peer.ID) (pstore.PeerInfo, error)
// FindProviders finds peers in the DHT who can provide a specific value
// given a key.
FindProviders(context.Context, Path, ...options.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error)
// Provide announces to the network that you are providing given values
Provide(context.Context, Path, ...options.DhtProvideOption) error
}

View File

@ -0,0 +1,62 @@
package options
type DhtProvideSettings struct {
Recursive bool
}
type DhtFindProvidersSettings struct {
NumProviders int
}
type DhtProvideOption func(*DhtProvideSettings) error
type DhtFindProvidersOption func(*DhtFindProvidersSettings) error
func DhtProvideOptions(opts ...DhtProvideOption) (*DhtProvideSettings, error) {
options := &DhtProvideSettings{
Recursive: false,
}
for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}
return options, nil
}
func DhtFindProvidersOptions(opts ...DhtFindProvidersOption) (*DhtFindProvidersSettings, error) {
options := &DhtFindProvidersSettings{
NumProviders: 20,
}
for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}
return options, nil
}
type dhtOpts struct{}
var Dht dhtOpts
// Recursive is an option for Dht.Provide which specifies whether to provide
// the given path recursively
func (dhtOpts) Recursive(recursive bool) DhtProvideOption {
return func(settings *DhtProvideSettings) error {
settings.Recursive = recursive
return nil
}
}
// NumProviders is an option for Dht.FindProviders which specifies the
// number of peers to look for. Default is 20
func (dhtOpts) NumProviders(numProviders int) DhtFindProvidersOption {
return func(settings *DhtFindProvidersSettings) error {
settings.NumProviders = numProviders
return nil
}
}

View File

@ -21,11 +21,13 @@ func addTestObject(ctx context.Context, api coreiface.CoreAPI) (coreiface.Path,
func TestBasicPublishResolve(t *testing.T) {
ctx := context.Background()
n, api, err := makeAPIIdent(ctx, true)
nds, apis, err := makeAPISwarm(ctx, true, 2)
if err != nil {
t.Fatal(err)
return
}
n := nds[0]
api := apis[0]
p, err := addTestObject(ctx, api)
if err != nil {
@ -60,11 +62,12 @@ func TestBasicPublishResolve(t *testing.T) {
func TestBasicPublishResolveKey(t *testing.T) {
ctx := context.Background()
_, api, err := makeAPIIdent(ctx, true)
_, apis, err := makeAPISwarm(ctx, true, 2)
if err != nil {
t.Fatal(err)
return
}
api := apis[0]
k, err := api.Key().Generate(ctx, "foo")
if err != nil {
@ -107,12 +110,13 @@ func TestBasicPublishResolveTimeout(t *testing.T) {
t.Skip("ValidTime doesn't appear to work at this time resolution")
ctx := context.Background()
n, api, err := makeAPIIdent(ctx, true)
nds, apis, err := makeAPISwarm(ctx, true, 2)
if err != nil {
t.Fatal(err)
return
}
n := nds[0]
api := apis[0]
p, err := addTestObject(ctx, api)
if err != nil {
t.Fatal(err)

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/base64"
"fmt"
"io"
"math"
"strings"
@ -14,6 +15,7 @@ import (
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
coreunix "github.com/ipfs/go-ipfs/core/coreunix"
mock "github.com/ipfs/go-ipfs/core/mock"
keystore "github.com/ipfs/go-ipfs/keystore"
repo "github.com/ipfs/go-ipfs/repo"
@ -22,8 +24,10 @@ import (
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
datastore "gx/ipfs/QmSpg1CvpXQQow5ernt1gNBXaXV6yxyNqi7XoeerWfzB5w/go-datastore"
syncds "gx/ipfs/QmSpg1CvpXQQow5ernt1gNBXaXV6yxyNqi7XoeerWfzB5w/go-datastore/sync"
mocknet "gx/ipfs/QmUEqyXr97aUbNmQADHYNknjwjjdVpJXEt1UZXmSG81EV4/go-libp2p/p2p/net/mock"
unixfs "gx/ipfs/QmWAfTyD6KEBm7bzqNRBPvqKrZCDtn5PGbs9V1DKfnVK59/go-unixfs"
config "gx/ipfs/QmYVqYJTVjetcf1guieEgWpK1PZtHPytP624vKzTF1P3r2/go-ipfs-config"
pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore"
cbor "gx/ipfs/QmepvyyduWnXHm1G7ybmGbJfQQHTAo36DjP2nvF7H7ZXjE/go-ipld-cbor"
)
@ -36,51 +40,89 @@ var helloStr = "hello, world!"
// `echo -n | ipfs add`
var emptyFile = "/ipfs/QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH"
func makeAPIIdent(ctx context.Context, fullIdentity bool) (*core.IpfsNode, coreiface.CoreAPI, error) {
var ident config.Identity
if fullIdentity {
sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512)
func makeAPISwarm(ctx context.Context, fullIdentity bool, n int) ([]*core.IpfsNode, []coreiface.CoreAPI, error) {
mn := mocknet.New(ctx)
nodes := make([]*core.IpfsNode, n)
apis := make([]coreiface.CoreAPI, n)
for i := 0; i < n; i++ {
var ident config.Identity
if fullIdentity {
sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512)
if err != nil {
return nil, nil, err
}
id, err := peer.IDFromPublicKey(pk)
if err != nil {
return nil, nil, err
}
kbytes, err := sk.Bytes()
if err != nil {
return nil, nil, err
}
ident = config.Identity{
PeerID: id.Pretty(),
PrivKey: base64.StdEncoding.EncodeToString(kbytes),
}
} else {
ident = config.Identity{
PeerID: testPeerID,
}
}
c := config.Config{}
c.Addresses.Swarm = []string{fmt.Sprintf("/ip4/127.0.%d.1/tcp/4001", i)}
c.Identity = ident
r := &repo.Mock{
C: c,
D: syncds.MutexWrap(datastore.NewMapDatastore()),
K: keystore.NewMemKeystore(),
}
node, err := core.NewNode(ctx, &core.BuildCfg{
Repo: r,
Host: mock.MockHostOption(mn),
Online: fullIdentity,
})
if err != nil {
return nil, nil, err
}
id, err := peer.IDFromPublicKey(pk)
if err != nil {
return nil, nil, err
}
kbytes, err := sk.Bytes()
if err != nil {
return nil, nil, err
}
ident = config.Identity{
PeerID: id.Pretty(),
PrivKey: base64.StdEncoding.EncodeToString(kbytes),
}
} else {
ident = config.Identity{
PeerID: testPeerID,
}
nodes[i] = node
apis[i] = coreapi.NewCoreAPI(node)
}
r := &repo.Mock{
C: config.Config{
Identity: ident,
},
D: syncds.MutexWrap(datastore.NewMapDatastore()),
K: keystore.NewMemKeystore(),
}
node, err := core.NewNode(ctx, &core.BuildCfg{Repo: r})
err := mn.LinkAll()
if err != nil {
return nil, nil, err
}
api := coreapi.NewCoreAPI(node)
return node, api, nil
bsinf := core.BootstrapConfigWithPeers(
[]pstore.PeerInfo{
nodes[0].Peerstore.PeerInfo(nodes[0].Identity),
},
)
for _, n := range nodes[1:] {
if err := n.Bootstrap(bsinf); err != nil {
return nil, nil, err
}
}
return nodes, apis, nil
}
func makeAPI(ctx context.Context) (*core.IpfsNode, coreiface.CoreAPI, error) {
return makeAPIIdent(ctx, false)
nd, api, err := makeAPISwarm(ctx, false, 1)
if err != nil {
return nil, nil, err
}
return nd[0], api[0], nil
}
func TestAdd(t *testing.T) {

View File

@ -6,6 +6,7 @@ import (
pin "github.com/ipfs/go-ipfs/pin"
merkledag "gx/ipfs/QmNr4E8z9bGTztvHJktp7uQaMdx9p3r9Asrq6eYk7iCh4a/go-merkledag"
cidutil "gx/ipfs/QmPyxJ2QS7L5FhGkNYkNcXHGjDhvGHueJ4auqAstFHYxy5/go-cidutil"
ipld "gx/ipfs/QmX5CsuHyVZeTLxgRSYkgLSDQKb9UjE8xnhQzCEJWWWFsC/go-ipld-format"
cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid"
blocks "gx/ipfs/Qmeg56ecxRnVv7VWViMrDeEMoBHaNFMs4vQnyQrJ79Zz7i/go-ipfs-blockstore"
@ -29,7 +30,7 @@ func NewPinnedProvider(pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool)
outCh := make(chan *cid.Cid)
go func() {
defer close(outCh)
for c := range set.new {
for c := range set.New {
select {
case <-ctx.Done():
return
@ -43,21 +44,23 @@ func NewPinnedProvider(pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool)
}
}
func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*streamingSet, error) {
set := newStreamingSet()
func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*cidutil.StreamingSet, error) {
set := cidutil.NewStreamingSet()
go func() {
defer close(set.new)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(set.New)
for _, key := range pinning.DirectKeys() {
set.add(key)
set.Visitor(ctx)(key)
}
for _, key := range pinning.RecursiveKeys() {
set.add(key)
set.Visitor(ctx)(key)
if !onlyRoots {
err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), key, set.add)
err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), key, set.Visitor(ctx))
if err != nil {
log.Errorf("reprovide indirect pins: %s", err)
return
@ -68,27 +71,3 @@ func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRo
return set, nil
}
type streamingSet struct {
set *cid.Set
new chan *cid.Cid
}
// NewSet initializes and returns a new Set.
func newStreamingSet() *streamingSet {
return &streamingSet{
set: cid.NewSet(),
new: make(chan *cid.Cid),
}
}
// add adds a Cid to the set only if it is
// not in it already.
func (s *streamingSet) add(c *cid.Cid) bool {
if s.set.Visit(c) {
s.new <- c
return true
}
return false
}