core: deprecate CoreAPI.Dht, introduce CoreAPI.Routing

This commit is contained in:
Henrique Dias 2024-01-31 11:07:30 +01:00
parent dccbfcf6b5
commit 80973d87cc
18 changed files with 541 additions and 470 deletions

View File

@ -227,6 +227,8 @@ func (api *HttpApi) Object() iface.ObjectAPI {
return (*ObjectAPI)(api) return (*ObjectAPI)(api)
} }
// nolint deprecated
// Deprecated: use [HttpApi.Routing] instead.
func (api *HttpApi) Dht() iface.DhtAPI { func (api *HttpApi) Dht() iface.DhtAPI {
return (*DhtAPI)(api) return (*DhtAPI)(api)
} }

View File

@ -2,110 +2,30 @@ package rpc
import ( import (
"context" "context"
"encoding/json"
"github.com/ipfs/boxo/path" "github.com/ipfs/boxo/path"
caopts "github.com/ipfs/kubo/core/coreiface/options" caopts "github.com/ipfs/kubo/core/coreiface/options"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
) )
type DhtAPI HttpApi type DhtAPI HttpApi
// nolint deprecated
// Deprecated: use [RoutingAPI.FindPeer] instead.
func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) { func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) {
var out struct { return api.core().Routing().FindPeer(ctx, p)
Type routing.QueryEventType
Responses []peer.AddrInfo
}
resp, err := api.core().Request("dht/findpeer", p.String()).Send(ctx)
if err != nil {
return peer.AddrInfo{}, err
}
if resp.Error != nil {
return peer.AddrInfo{}, resp.Error
}
defer resp.Close()
dec := json.NewDecoder(resp.Output)
for {
if err := dec.Decode(&out); err != nil {
return peer.AddrInfo{}, err
}
if out.Type == routing.FinalPeer {
return out.Responses[0], nil
}
}
} }
// nolint deprecated
// Deprecated: use [RoutingAPI.FindProviders] instead.
func (api *DhtAPI) FindProviders(ctx context.Context, p path.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peer.AddrInfo, error) { func (api *DhtAPI) FindProviders(ctx context.Context, p path.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peer.AddrInfo, error) {
options, err := caopts.DhtFindProvidersOptions(opts...) return api.core().Routing().FindProviders(ctx, p, opts...)
if err != nil {
return nil, err
}
rp, _, err := api.core().ResolvePath(ctx, p)
if err != nil {
return nil, err
}
resp, err := api.core().Request("dht/findprovs", rp.RootCid().String()).
Option("num-providers", options.NumProviders).
Send(ctx)
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
res := make(chan peer.AddrInfo)
go func() {
defer resp.Close()
defer close(res)
dec := json.NewDecoder(resp.Output)
for {
var out struct {
Extra string
Type routing.QueryEventType
Responses []peer.AddrInfo
}
if err := dec.Decode(&out); err != nil {
return // todo: handle this somehow
}
if out.Type == routing.QueryError {
return // usually a 'not found' error
// todo: handle other errors
}
if out.Type == routing.Provider {
for _, pi := range out.Responses {
select {
case res <- pi:
case <-ctx.Done():
return
}
}
}
}
}()
return res, nil
} }
// nolint deprecated
// Deprecated: use [RoutingAPI.Provide] instead.
func (api *DhtAPI) Provide(ctx context.Context, p path.Path, opts ...caopts.DhtProvideOption) error { func (api *DhtAPI) Provide(ctx context.Context, p path.Path, opts ...caopts.DhtProvideOption) error {
options, err := caopts.DhtProvideOptions(opts...) return api.core().Routing().Provide(ctx, p, opts...)
if err != nil {
return err
}
rp, _, err := api.core().ResolvePath(ctx, p)
if err != nil {
return err
}
return api.core().Request("dht/provide", rp.RootCid().String()).
Option("recursive", options.Recursive).
Exec(ctx, nil)
} }
func (api *DhtAPI) core() *HttpApi { func (api *DhtAPI) core() *HttpApi {

View File

@ -6,7 +6,9 @@ import (
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"github.com/ipfs/boxo/path"
"github.com/ipfs/kubo/core/coreiface/options" "github.com/ipfs/kubo/core/coreiface/options"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing" "github.com/libp2p/go-libp2p/core/routing"
) )
@ -58,6 +60,102 @@ func (api *RoutingAPI) Put(ctx context.Context, key string, value []byte, opts .
return nil return nil
} }
func (api *RoutingAPI) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) {
var out struct {
Type routing.QueryEventType
Responses []peer.AddrInfo
}
resp, err := api.core().Request("routing/findpeer", p.String()).Send(ctx)
if err != nil {
return peer.AddrInfo{}, err
}
if resp.Error != nil {
return peer.AddrInfo{}, resp.Error
}
defer resp.Close()
dec := json.NewDecoder(resp.Output)
for {
if err := dec.Decode(&out); err != nil {
return peer.AddrInfo{}, err
}
if out.Type == routing.FinalPeer {
return out.Responses[0], nil
}
}
}
func (api *RoutingAPI) FindProviders(ctx context.Context, p path.Path, opts ...options.RoutingFindProvidersOption) (<-chan peer.AddrInfo, error) {
options, err := options.RoutingFindProvidersOptions(opts...)
if err != nil {
return nil, err
}
rp, _, err := api.core().ResolvePath(ctx, p)
if err != nil {
return nil, err
}
resp, err := api.core().Request("routing/findprovs", rp.RootCid().String()).
Option("num-providers", options.NumProviders).
Send(ctx)
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
res := make(chan peer.AddrInfo)
go func() {
defer resp.Close()
defer close(res)
dec := json.NewDecoder(resp.Output)
for {
var out struct {
Extra string
Type routing.QueryEventType
Responses []peer.AddrInfo
}
if err := dec.Decode(&out); err != nil {
return // todo: handle this somehow
}
if out.Type == routing.QueryError {
return // usually a 'not found' error
// todo: handle other errors
}
if out.Type == routing.Provider {
for _, pi := range out.Responses {
select {
case res <- pi:
case <-ctx.Done():
return
}
}
}
}
}()
return res, nil
}
func (api *RoutingAPI) Provide(ctx context.Context, p path.Path, opts ...options.RoutingProvideOption) error {
options, err := options.RoutingProvideOptions(opts...)
if err != nil {
return err
}
rp, _, err := api.core().ResolvePath(ctx, p)
if err != nil {
return err
}
return api.core().Request("routing/provide", rp.RootCid().String()).
Option("recursive", options.Recursive).
Exec(ctx, nil)
}
func (api *RoutingAPI) core() *HttpApi { func (api *RoutingAPI) core() *HttpApi {
return (*HttpApi)(api) return (*HttpApi)(api)
} }

View File

@ -130,7 +130,8 @@ func (api *CoreAPI) Pin() coreiface.PinAPI {
return (*PinAPI)(api) return (*PinAPI)(api)
} }
// Dht returns the DhtAPI interface implementation backed by the go-ipfs node // nolint deprecated
// Deprecated: use [CoreAPI.Routing] instead.
func (api *CoreAPI) Dht() coreiface.DhtAPI { func (api *CoreAPI) Dht() coreiface.DhtAPI {
return (*DhtAPI)(api) return (*DhtAPI)(api)
} }

View File

@ -2,151 +2,31 @@ package coreapi
import ( import (
"context" "context"
"fmt"
blockservice "github.com/ipfs/boxo/blockservice"
blockstore "github.com/ipfs/boxo/blockstore"
offline "github.com/ipfs/boxo/exchange/offline"
dag "github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/path" "github.com/ipfs/boxo/path"
cid "github.com/ipfs/go-cid"
cidutil "github.com/ipfs/go-cidutil"
coreiface "github.com/ipfs/kubo/core/coreiface" coreiface "github.com/ipfs/kubo/core/coreiface"
caopts "github.com/ipfs/kubo/core/coreiface/options" caopts "github.com/ipfs/kubo/core/coreiface/options"
"github.com/ipfs/kubo/tracing"
peer "github.com/libp2p/go-libp2p/core/peer" peer "github.com/libp2p/go-libp2p/core/peer"
routing "github.com/libp2p/go-libp2p/core/routing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
) )
type DhtAPI CoreAPI type DhtAPI CoreAPI
// nolint deprecated
// Deprecated: use [RoutingAPI.FindPeer] instead.
func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) { func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.DhtAPI", "FindPeer", trace.WithAttributes(attribute.String("peer", p.String()))) return api.core().Routing().FindPeer(ctx, p)
defer span.End()
err := api.checkOnline(false)
if err != nil {
return peer.AddrInfo{}, err
}
pi, err := api.routing.FindPeer(ctx, peer.ID(p))
if err != nil {
return peer.AddrInfo{}, err
}
return pi, nil
} }
// nolint deprecated
// Deprecated: use [RoutingAPI.FindProviders] instead.
func (api *DhtAPI) FindProviders(ctx context.Context, p path.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peer.AddrInfo, error) { func (api *DhtAPI) FindProviders(ctx context.Context, p path.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peer.AddrInfo, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.DhtAPI", "FindProviders", trace.WithAttributes(attribute.String("path", p.String()))) return api.core().Routing().FindProviders(ctx, p, opts...)
defer span.End()
settings, err := caopts.DhtFindProvidersOptions(opts...)
if err != nil {
return nil, err
}
span.SetAttributes(attribute.Int("numproviders", settings.NumProviders))
err = api.checkOnline(false)
if err != nil {
return nil, err
}
rp, _, err := api.core().ResolvePath(ctx, p)
if err != nil {
return nil, err
}
numProviders := settings.NumProviders
if numProviders < 1 {
return nil, fmt.Errorf("number of providers must be greater than 0")
}
pchan := api.routing.FindProvidersAsync(ctx, rp.RootCid(), numProviders)
return pchan, nil
} }
func (api *DhtAPI) Provide(ctx context.Context, path path.Path, opts ...caopts.DhtProvideOption) error { // nolint deprecated
ctx, span := tracing.Span(ctx, "CoreAPI.DhtAPI", "Provide", trace.WithAttributes(attribute.String("path", path.String()))) // Deprecated: use [RoutingAPI.Provide] instead.
defer span.End() func (api *DhtAPI) Provide(ctx context.Context, p path.Path, opts ...caopts.DhtProvideOption) error {
return api.core().Routing().Provide(ctx, p, opts...)
settings, err := caopts.DhtProvideOptions(opts...)
if err != nil {
return err
}
span.SetAttributes(attribute.Bool("recursive", settings.Recursive))
err = api.checkOnline(false)
if err != nil {
return err
}
rp, _, err := api.core().ResolvePath(ctx, path)
if err != nil {
return err
}
c := rp.RootCid()
has, err := api.blockstore.Has(ctx, c)
if err != nil {
return err
}
if !has {
return fmt.Errorf("block %s not found locally, cannot provide", c)
}
if settings.Recursive {
err = provideKeysRec(ctx, api.routing, api.blockstore, []cid.Cid{c})
} else {
err = provideKeys(ctx, api.routing, []cid.Cid{c})
}
if err != nil {
return err
}
return nil
}
func provideKeys(ctx context.Context, r routing.Routing, cids []cid.Cid) error {
for _, c := range cids {
err := r.Provide(ctx, c, true)
if err != nil {
return err
}
}
return nil
}
func provideKeysRec(ctx context.Context, r routing.Routing, bs blockstore.Blockstore, cids []cid.Cid) error {
provided := cidutil.NewStreamingSet()
errCh := make(chan error)
go func() {
dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
for _, c := range cids {
err := dag.Walk(ctx, dag.GetLinksDirect(dserv), c, provided.Visitor(ctx))
if err != nil {
errCh <- err
}
}
}()
for {
select {
case k := <-provided.New:
err := r.Provide(ctx, k, true)
if err != nil {
return err
}
case err := <-errCh:
return err
case <-ctx.Done():
return ctx.Err()
}
}
} }
func (api *DhtAPI) core() coreiface.CoreAPI { func (api *DhtAPI) core() coreiface.CoreAPI {

View File

@ -3,17 +3,29 @@ package coreapi
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"strings" "strings"
blockservice "github.com/ipfs/boxo/blockservice"
blockstore "github.com/ipfs/boxo/blockstore"
offline "github.com/ipfs/boxo/exchange/offline"
dag "github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/path"
cid "github.com/ipfs/go-cid"
cidutil "github.com/ipfs/go-cidutil"
coreiface "github.com/ipfs/kubo/core/coreiface" coreiface "github.com/ipfs/kubo/core/coreiface"
caopts "github.com/ipfs/kubo/core/coreiface/options" caopts "github.com/ipfs/kubo/core/coreiface/options"
"github.com/ipfs/kubo/tracing"
peer "github.com/libp2p/go-libp2p/core/peer" peer "github.com/libp2p/go-libp2p/core/peer"
routing "github.com/libp2p/go-libp2p/core/routing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
) )
type RoutingAPI CoreAPI type RoutingAPI CoreAPI
func (r *RoutingAPI) Get(ctx context.Context, key string) ([]byte, error) { func (api *RoutingAPI) Get(ctx context.Context, key string) ([]byte, error) {
if !r.nd.IsOnline { if !api.nd.IsOnline {
return nil, coreiface.ErrOffline return nil, coreiface.ErrOffline
} }
@ -22,16 +34,16 @@ func (r *RoutingAPI) Get(ctx context.Context, key string) ([]byte, error) {
return nil, err return nil, err
} }
return r.routing.GetValue(ctx, dhtKey) return api.routing.GetValue(ctx, dhtKey)
} }
func (r *RoutingAPI) Put(ctx context.Context, key string, value []byte, opts ...caopts.RoutingPutOption) error { func (api *RoutingAPI) Put(ctx context.Context, key string, value []byte, opts ...caopts.RoutingPutOption) error {
options, err := caopts.RoutingPutOptions(opts...) options, err := caopts.RoutingPutOptions(opts...)
if err != nil { if err != nil {
return err return err
} }
err = r.checkOnline(options.AllowOffline) err = api.checkOnline(options.AllowOffline)
if err != nil { if err != nil {
return err return err
} }
@ -41,7 +53,7 @@ func (r *RoutingAPI) Put(ctx context.Context, key string, value []byte, opts ...
return err return err
} }
return r.routing.PutValue(ctx, dhtKey, value) return api.routing.PutValue(ctx, dhtKey, value)
} }
func normalizeKey(s string) (string, error) { func normalizeKey(s string) (string, error) {
@ -58,3 +70,134 @@ func normalizeKey(s string) (string, error) {
} }
return strings.Join(append(parts[:2], string(k)), "/"), nil return strings.Join(append(parts[:2], string(k)), "/"), nil
} }
func (api *RoutingAPI) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.DhtAPI", "FindPeer", trace.WithAttributes(attribute.String("peer", p.String())))
defer span.End()
err := api.checkOnline(false)
if err != nil {
return peer.AddrInfo{}, err
}
pi, err := api.routing.FindPeer(ctx, peer.ID(p))
if err != nil {
return peer.AddrInfo{}, err
}
return pi, nil
}
func (api *RoutingAPI) FindProviders(ctx context.Context, p path.Path, opts ...caopts.RoutingFindProvidersOption) (<-chan peer.AddrInfo, error) {
ctx, span := tracing.Span(ctx, "CoreAPI.DhtAPI", "FindProviders", trace.WithAttributes(attribute.String("path", p.String())))
defer span.End()
settings, err := caopts.RoutingFindProvidersOptions(opts...)
if err != nil {
return nil, err
}
span.SetAttributes(attribute.Int("numproviders", settings.NumProviders))
err = api.checkOnline(false)
if err != nil {
return nil, err
}
rp, _, err := api.core().ResolvePath(ctx, p)
if err != nil {
return nil, err
}
numProviders := settings.NumProviders
if numProviders < 1 {
return nil, fmt.Errorf("number of providers must be greater than 0")
}
pchan := api.routing.FindProvidersAsync(ctx, rp.RootCid(), numProviders)
return pchan, nil
}
func (api *RoutingAPI) Provide(ctx context.Context, path path.Path, opts ...caopts.RoutingProvideOption) error {
ctx, span := tracing.Span(ctx, "CoreAPI.DhtAPI", "Provide", trace.WithAttributes(attribute.String("path", path.String())))
defer span.End()
settings, err := caopts.RoutingProvideOptions(opts...)
if err != nil {
return err
}
span.SetAttributes(attribute.Bool("recursive", settings.Recursive))
err = api.checkOnline(false)
if err != nil {
return err
}
rp, _, err := api.core().ResolvePath(ctx, path)
if err != nil {
return err
}
c := rp.RootCid()
has, err := api.blockstore.Has(ctx, c)
if err != nil {
return err
}
if !has {
return fmt.Errorf("block %s not found locally, cannot provide", c)
}
if settings.Recursive {
err = provideKeysRec(ctx, api.routing, api.blockstore, []cid.Cid{c})
} else {
err = provideKeys(ctx, api.routing, []cid.Cid{c})
}
if err != nil {
return err
}
return nil
}
func provideKeys(ctx context.Context, r routing.Routing, cids []cid.Cid) error {
for _, c := range cids {
err := r.Provide(ctx, c, true)
if err != nil {
return err
}
}
return nil
}
func provideKeysRec(ctx context.Context, r routing.Routing, bs blockstore.Blockstore, cids []cid.Cid) error {
provided := cidutil.NewStreamingSet()
errCh := make(chan error)
go func() {
dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
for _, c := range cids {
err := dag.Walk(ctx, dag.GetLinksDirect(dserv), c, provided.Visitor(ctx))
if err != nil {
errCh <- err
}
}
}()
for {
select {
case k := <-provided.New:
err := r.Provide(ctx, k, true)
if err != nil {
return err
}
case err := <-errCh:
return err
case <-ctx.Done():
return ctx.Err()
}
}
}
func (api *RoutingAPI) core() coreiface.CoreAPI {
return (*CoreAPI)(api)
}

View File

@ -34,7 +34,8 @@ type CoreAPI interface {
// Object returns an implementation of Object API // Object returns an implementation of Object API
Object() ObjectAPI Object() ObjectAPI
// Dht returns an implementation of Dht API // nolint deprecated
// Deprecated: use [Routing] instead.
Dht() DhtAPI Dht() DhtAPI
// Swarm returns an implementation of Swarm API // Swarm returns an implementation of Swarm API

View File

@ -4,24 +4,22 @@ import (
"context" "context"
"github.com/ipfs/boxo/path" "github.com/ipfs/boxo/path"
"github.com/ipfs/kubo/core/coreiface/options" "github.com/ipfs/kubo/core/coreiface/options"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
) )
// DhtAPI specifies the interface to the DHT // nolint deprecated
// Note: This API will likely get deprecated in near future, see // Deprecated: use [RoutingAPI] instead.
// https://github.com/ipfs/interface-ipfs-core/issues/249 for more context.
type DhtAPI interface { type DhtAPI interface {
// FindPeer queries the DHT for all of the multiaddresses associated with a // nolint deprecated
// Peer ID // Deprecated: use [RoutingAPI.FindPeer] instead.
FindPeer(context.Context, peer.ID) (peer.AddrInfo, error) FindPeer(context.Context, peer.ID) (peer.AddrInfo, error)
// FindProviders finds peers in the DHT who can provide a specific value // nolint deprecated
// given a key. // Deprecated: use [RoutingAPI.FindProviders] instead.
FindProviders(context.Context, path.Path, ...options.DhtFindProvidersOption) (<-chan peer.AddrInfo, error) FindProviders(context.Context, path.Path, ...options.DhtFindProvidersOption) (<-chan peer.AddrInfo, error)
// Provide announces to the network that you are providing given values // nolint deprecated
// Deprecated: use [RoutingAPI.Provide] instead.
Provide(context.Context, path.Path, ...options.DhtProvideOption) error Provide(context.Context, path.Path, ...options.DhtProvideOption) error
} }

View File

@ -1,64 +1,29 @@
package options package options
type DhtProvideSettings struct { // nolint deprecated
Recursive bool // Deprecated: use [RoutingProvideSettings] instead.
} type DhtProvideSettings = RoutingProvideSettings
type DhtFindProvidersSettings struct { // nolint deprecated
NumProviders int // Deprecated: use [RoutingFindProvidersSettings] instead.
} type DhtFindProvidersSettings = RoutingFindProvidersSettings
type ( // nolint deprecated
DhtProvideOption func(*DhtProvideSettings) error // Deprecated: use [RoutingProvideOption] instead.
DhtFindProvidersOption func(*DhtFindProvidersSettings) error type DhtProvideOption = RoutingProvideOption
)
func DhtProvideOptions(opts ...DhtProvideOption) (*DhtProvideSettings, error) { // nolint deprecated
options := &DhtProvideSettings{ // Deprecated: use [RoutingFindProvidersOption] instead.
Recursive: false, type DhtFindProvidersOption = RoutingFindProvidersOption
}
for _, opt := range opts { // nolint deprecated
err := opt(options) // Deprecated: use [RoutingProvideOptions] instead.
if err != nil { var DhtProvideOptions = RoutingProvideOptions
return nil, err
}
}
return options, nil
}
func DhtFindProvidersOptions(opts ...DhtFindProvidersOption) (*DhtFindProvidersSettings, error) { // nolint deprecated
options := &DhtFindProvidersSettings{ // Deprecated: use [RoutingFindProvidersOptions] instead.
NumProviders: 20, var DhtFindProvidersOptions = RoutingFindProvidersOptions
}
for _, opt := range opts { // nolint deprecated
err := opt(options) // Deprecated: use [Routing] instead.
if err != nil { var Dht = Routing
return nil, err
}
}
return options, nil
}
type dhtOpts struct{}
var Dht dhtOpts
// Recursive is an option for Dht.Provide which specifies whether to provide
// the given path recursively
func (dhtOpts) Recursive(recursive bool) DhtProvideOption {
return func(settings *DhtProvideSettings) error {
settings.Recursive = recursive
return nil
}
}
// NumProviders is an option for Dht.FindProviders which specifies the
// number of peers to look for. Default is 20
func (dhtOpts) NumProviders(numProviders int) DhtFindProvidersOption {
return func(settings *DhtFindProvidersSettings) error {
settings.NumProviders = numProviders
return nil
}
}

View File

@ -21,13 +21,76 @@ func RoutingPutOptions(opts ...RoutingPutOption) (*RoutingPutSettings, error) {
return options, nil return options, nil
} }
type putOpts struct{} // nolint deprecated
// Deprecated: use [Routing] instead.
var Put = Routing
var Put putOpts type RoutingProvideSettings struct {
Recursive bool
}
// AllowOffline is an option for Routing.Put which specifies whether to allow type RoutingFindProvidersSettings struct {
NumProviders int
}
type (
RoutingProvideOption func(*DhtProvideSettings) error
RoutingFindProvidersOption func(*DhtFindProvidersSettings) error
)
func RoutingProvideOptions(opts ...RoutingProvideOption) (*RoutingProvideSettings, error) {
options := &RoutingProvideSettings{
Recursive: false,
}
for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}
return options, nil
}
func RoutingFindProvidersOptions(opts ...RoutingFindProvidersOption) (*RoutingFindProvidersSettings, error) {
options := &RoutingFindProvidersSettings{
NumProviders: 20,
}
for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}
return options, nil
}
type routingOpts struct{}
var Routing routingOpts
// Recursive is an option for [Routing.Provide] which specifies whether to provide
// the given path recursively.
func (routingOpts) Recursive(recursive bool) RoutingProvideOption {
return func(settings *DhtProvideSettings) error {
settings.Recursive = recursive
return nil
}
}
// NumProviders is an option for [Routing.FindProviders] which specifies the
// number of peers to look for. Default is 20.
func (routingOpts) NumProviders(numProviders int) RoutingFindProvidersOption {
return func(settings *DhtFindProvidersSettings) error {
settings.NumProviders = numProviders
return nil
}
}
// AllowOffline is an option for [Routing.Put] which specifies whether to allow
// publishing when the node is offline. Default value is false // publishing when the node is offline. Default value is false
func (putOpts) AllowOffline(allow bool) RoutingPutOption { func (routingOpts) AllowOffline(allow bool) RoutingPutOption {
return func(settings *RoutingPutSettings) error { return func(settings *RoutingPutSettings) error {
settings.AllowOffline = allow settings.AllowOffline = allow
return nil return nil

View File

@ -3,7 +3,9 @@ package iface
import ( import (
"context" "context"
"github.com/ipfs/boxo/path"
"github.com/ipfs/kubo/core/coreiface/options" "github.com/ipfs/kubo/core/coreiface/options"
"github.com/libp2p/go-libp2p/core/peer"
) )
// RoutingAPI specifies the interface to the routing layer. // RoutingAPI specifies the interface to the routing layer.
@ -13,4 +15,15 @@ type RoutingAPI interface {
// Put sets a value for a given key // Put sets a value for a given key
Put(ctx context.Context, key string, value []byte, opts ...options.RoutingPutOption) error Put(ctx context.Context, key string, value []byte, opts ...options.RoutingPutOption) error
// FindPeer queries the routing system for all the multiaddresses associated
// with the given [peer.ID].
FindPeer(context.Context, peer.ID) (peer.AddrInfo, error)
// FindProviders finds the peers in the routing system who can provide a specific
// value given a key.
FindProviders(context.Context, path.Path, ...options.RoutingFindProvidersOption) (<-chan peer.AddrInfo, error)
// Provide announces to the network that you are providing given values
Provide(context.Context, path.Path, ...options.RoutingProvideOption) error
} }

View File

@ -75,7 +75,6 @@ func TestApi(p Provider) func(t *testing.T) {
return func(t *testing.T) { return func(t *testing.T) {
t.Run("Block", tp.TestBlock) t.Run("Block", tp.TestBlock)
t.Run("Dag", tp.TestDag) t.Run("Dag", tp.TestDag)
t.Run("Dht", tp.TestDht)
t.Run("Key", tp.TestKey) t.Run("Key", tp.TestKey)
t.Run("Name", tp.TestName) t.Run("Name", tp.TestName)
t.Run("Object", tp.TestObject) t.Run("Object", tp.TestObject)

View File

@ -1,166 +0,0 @@
package tests
import (
"context"
"io"
"testing"
"time"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/ipfs/kubo/core/coreiface/options"
)
func (tp *TestSuite) TestDht(t *testing.T) {
tp.hasApi(t, func(api iface.CoreAPI) error {
if api.Dht() == nil {
return errAPINotImplemented
}
return nil
})
t.Run("TestDhtFindPeer", tp.TestDhtFindPeer)
t.Run("TestDhtFindProviders", tp.TestDhtFindProviders)
t.Run("TestDhtProvide", tp.TestDhtProvide)
}
func (tp *TestSuite) TestDhtFindPeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
apis, err := tp.MakeAPISwarm(t, ctx, 5)
if err != nil {
t.Fatal(err)
}
self0, err := apis[0].Key().Self(ctx)
if err != nil {
t.Fatal(err)
}
laddrs0, err := apis[0].Swarm().LocalAddrs(ctx)
if err != nil {
t.Fatal(err)
}
if len(laddrs0) != 1 {
t.Fatal("unexpected number of local addrs")
}
time.Sleep(3 * time.Second)
pi, err := apis[2].Dht().FindPeer(ctx, self0.ID())
if err != nil {
t.Fatal(err)
}
if pi.Addrs[0].String() != laddrs0[0].String() {
t.Errorf("got unexpected address from FindPeer: %s", pi.Addrs[0].String())
}
self2, err := apis[2].Key().Self(ctx)
if err != nil {
t.Fatal(err)
}
pi, err = apis[1].Dht().FindPeer(ctx, self2.ID())
if err != nil {
t.Fatal(err)
}
laddrs2, err := apis[2].Swarm().LocalAddrs(ctx)
if err != nil {
t.Fatal(err)
}
if len(laddrs2) != 1 {
t.Fatal("unexpected number of local addrs")
}
if pi.Addrs[0].String() != laddrs2[0].String() {
t.Errorf("got unexpected address from FindPeer: %s", pi.Addrs[0].String())
}
}
func (tp *TestSuite) TestDhtFindProviders(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
apis, err := tp.MakeAPISwarm(t, ctx, 5)
if err != nil {
t.Fatal(err)
}
p, err := addTestObject(ctx, apis[0])
if err != nil {
t.Fatal(err)
}
time.Sleep(3 * time.Second)
out, err := apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1))
if err != nil {
t.Fatal(err)
}
provider := <-out
self0, err := apis[0].Key().Self(ctx)
if err != nil {
t.Fatal(err)
}
if provider.ID.String() != self0.ID().String() {
t.Errorf("got wrong provider: %s != %s", provider.ID.String(), self0.ID().String())
}
}
func (tp *TestSuite) TestDhtProvide(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
apis, err := tp.MakeAPISwarm(t, ctx, 5)
if err != nil {
t.Fatal(err)
}
off0, err := apis[0].WithOptions(options.Api.Offline(true))
if err != nil {
t.Fatal(err)
}
s, err := off0.Block().Put(ctx, &io.LimitedReader{R: rnd, N: 4092})
if err != nil {
t.Fatal(err)
}
p := s.Path()
time.Sleep(3 * time.Second)
out, err := apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1))
if err != nil {
t.Fatal(err)
}
_, ok := <-out
if ok {
t.Fatal("did not expect to find any providers")
}
self0, err := apis[0].Key().Self(ctx)
if err != nil {
t.Fatal(err)
}
err = apis[0].Dht().Provide(ctx, p)
if err != nil {
t.Fatal(err)
}
out, err = apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1))
if err != nil {
t.Fatal(err)
}
provider := <-out
if provider.ID.String() != self0.ID().String() {
t.Errorf("got wrong provider: %s != %s", provider.ID.String(), self0.ID().String())
}
}

View File

@ -2,6 +2,7 @@ package tests
import ( import (
"context" "context"
"io"
"testing" "testing"
"time" "time"
@ -23,6 +24,9 @@ func (tp *TestSuite) TestRouting(t *testing.T) {
t.Run("TestRoutingGet", tp.TestRoutingGet) t.Run("TestRoutingGet", tp.TestRoutingGet)
t.Run("TestRoutingPut", tp.TestRoutingPut) t.Run("TestRoutingPut", tp.TestRoutingPut)
t.Run("TestRoutingPutOffline", tp.TestRoutingPutOffline) t.Run("TestRoutingPutOffline", tp.TestRoutingPutOffline)
t.Run("TestRoutingFindPeer", tp.TestRoutingFindPeer)
t.Run("TestRoutingFindProviders", tp.TestRoutingFindProviders)
t.Run("TestRoutingProvide", tp.TestRoutingProvide)
} }
func (tp *TestSuite) testRoutingPublishKey(t *testing.T, ctx context.Context, api iface.CoreAPI, opts ...options.NamePublishOption) (path.Path, ipns.Name) { func (tp *TestSuite) testRoutingPublishKey(t *testing.T, ctx context.Context, api iface.CoreAPI, opts ...options.NamePublishOption) (path.Path, ipns.Name) {
@ -95,6 +99,148 @@ func (tp *TestSuite) TestRoutingPutOffline(t *testing.T) {
err = api.Routing().Put(ctx, ipns.NamespacePrefix+name.String(), data) err = api.Routing().Put(ctx, ipns.NamespacePrefix+name.String(), data)
require.Error(t, err, "this operation should fail because we are offline") require.Error(t, err, "this operation should fail because we are offline")
err = api.Routing().Put(ctx, ipns.NamespacePrefix+name.String(), data, options.Put.AllowOffline(true)) err = api.Routing().Put(ctx, ipns.NamespacePrefix+name.String(), data, options.Routing.AllowOffline(true))
require.NoError(t, err) require.NoError(t, err)
} }
func (tp *TestSuite) TestRoutingFindPeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
apis, err := tp.MakeAPISwarm(t, ctx, 5)
if err != nil {
t.Fatal(err)
}
self0, err := apis[0].Key().Self(ctx)
if err != nil {
t.Fatal(err)
}
laddrs0, err := apis[0].Swarm().LocalAddrs(ctx)
if err != nil {
t.Fatal(err)
}
if len(laddrs0) != 1 {
t.Fatal("unexpected number of local addrs")
}
time.Sleep(3 * time.Second)
pi, err := apis[2].Routing().FindPeer(ctx, self0.ID())
if err != nil {
t.Fatal(err)
}
if pi.Addrs[0].String() != laddrs0[0].String() {
t.Errorf("got unexpected address from FindPeer: %s", pi.Addrs[0].String())
}
self2, err := apis[2].Key().Self(ctx)
if err != nil {
t.Fatal(err)
}
pi, err = apis[1].Routing().FindPeer(ctx, self2.ID())
if err != nil {
t.Fatal(err)
}
laddrs2, err := apis[2].Swarm().LocalAddrs(ctx)
if err != nil {
t.Fatal(err)
}
if len(laddrs2) != 1 {
t.Fatal("unexpected number of local addrs")
}
if pi.Addrs[0].String() != laddrs2[0].String() {
t.Errorf("got unexpected address from FindPeer: %s", pi.Addrs[0].String())
}
}
func (tp *TestSuite) TestRoutingFindProviders(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
apis, err := tp.MakeAPISwarm(t, ctx, 5)
if err != nil {
t.Fatal(err)
}
p, err := addTestObject(ctx, apis[0])
if err != nil {
t.Fatal(err)
}
time.Sleep(3 * time.Second)
out, err := apis[2].Routing().FindProviders(ctx, p, options.Routing.NumProviders(1))
if err != nil {
t.Fatal(err)
}
provider := <-out
self0, err := apis[0].Key().Self(ctx)
if err != nil {
t.Fatal(err)
}
if provider.ID.String() != self0.ID().String() {
t.Errorf("got wrong provider: %s != %s", provider.ID.String(), self0.ID().String())
}
}
func (tp *TestSuite) TestRoutingProvide(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
apis, err := tp.MakeAPISwarm(t, ctx, 5)
if err != nil {
t.Fatal(err)
}
off0, err := apis[0].WithOptions(options.Api.Offline(true))
if err != nil {
t.Fatal(err)
}
s, err := off0.Block().Put(ctx, &io.LimitedReader{R: rnd, N: 4092})
if err != nil {
t.Fatal(err)
}
p := s.Path()
time.Sleep(3 * time.Second)
out, err := apis[2].Routing().FindProviders(ctx, p, options.Routing.NumProviders(1))
if err != nil {
t.Fatal(err)
}
_, ok := <-out
if ok {
t.Fatal("did not expect to find any providers")
}
self0, err := apis[0].Key().Self(ctx)
if err != nil {
t.Fatal(err)
}
err = apis[0].Routing().Provide(ctx, p)
if err != nil {
t.Fatal(err)
}
out, err = apis[2].Routing().FindProviders(ctx, p, options.Routing.NumProviders(1))
if err != nil {
t.Fatal(err)
}
provider := <-out
if provider.ID.String() != self0.ID().String() {
t.Errorf("got wrong provider: %s != %s", provider.ID.String(), self0.ID().String())
}
}

View File

@ -7,6 +7,8 @@
- [Overview](#overview) - [Overview](#overview)
- [🔦 Highlights](#-highlights) - [🔦 Highlights](#-highlights)
- [Gateway: support for `/api/v0` is deprecated](#gateway-support-for-apiv0-is-deprecated) - [Gateway: support for `/api/v0` is deprecated](#gateway-support-for-apiv0-is-deprecated)
- [IPNS resolver cache's TTL can now be configured](#ipns-resolver-caches-ttl-can-now-be-configured)
- [RPC client: deprecated DHT API, added Routing API](#rpc-client-deprecated-dht-api-added-routing-api)
- [📝 Changelog](#-changelog) - [📝 Changelog](#-changelog)
- [👨‍👩‍👧‍👦 Contributors](#-contributors) - [👨‍👩‍👧‍👦 Contributors](#-contributors)
@ -24,6 +26,12 @@ If you have a legacy software that relies on this behavior, and want to expose p
You can now configure the upper-bound of a cached IPNS entry's Time-To-Live via [`Ipns.MaxCacheTTL`](https://github.com/ipfs/kubo/blob/master/docs/config.md#ipnsmaxcachettl). You can now configure the upper-bound of a cached IPNS entry's Time-To-Live via [`Ipns.MaxCacheTTL`](https://github.com/ipfs/kubo/blob/master/docs/config.md#ipnsmaxcachettl).
#### RPC client: deprecated DHT API, added Routing API
The RPC client now includes a Routing API to match the available commands in `/api/v0/routing`. In addition, the DHT API has been marked as deprecated.
In the next version, all DHT deprecated methods will be removed from the Go RPC client.
### 📝 Changelog ### 📝 Changelog
### 👨‍👩‍👧‍👦 Contributors ### 👨‍👩‍👧‍👦 Contributors

View File

@ -68,12 +68,12 @@ pitfalls that people run into)
### Checking providers ### Checking providers
When requesting content on ipfs, nodes search the DHT for 'provider records' to When requesting content on ipfs, nodes search the DHT for 'provider records' to
see who has what content. Let's manually do that on node B to make sure that see who has what content. Let's manually do that on node B to make sure that
node B is able to determine that node A has the data. Run `ipfs dht findprovs node B is able to determine that node A has the data. Run `ipfs routing findprovs
<hash>`. We expect to see the peer ID of node A printed out. If this command <hash>`. We expect to see the peer ID of node A printed out. If this command
returns nothing (or returns IDs that are not node A), then no record of A returns nothing (or returns IDs that are not node A), then no record of A
having the data exists on the network. This can happen if the data is added having the data exists on the network. This can happen if the data is added
while node A does not have a daemon running. If this happens, you can run `ipfs while node A does not have a daemon running. If this happens, you can run `ipfs
dht provide <hash>` on node A to announce to the network that you have that routing provide <hash>` on node A to announce to the network that you have that
hash. Then if you restart the `ipfs get` command, node B should now be able hash. Then if you restart the `ipfs get` command, node B should now be able
to tell that node A has the content it wants. If node A's peer ID showed up in to tell that node A has the content it wants. If node A's peer ID showed up in
the initial `findprovs` call, or manually providing the hash didn't resolve the the initial `findprovs` call, or manually providing the hash didn't resolve the

View File

@ -22,7 +22,7 @@ func TestDHTOptimisticProvide(t *testing.T) {
nodes.StartDaemons().Connect() nodes.StartDaemons().Connect()
hash := nodes[0].IPFSAddStr(testutils.RandomStr(100)) hash := nodes[0].IPFSAddStr(testutils.RandomStr(100))
nodes[0].IPFS("dht", "provide", hash) nodes[0].IPFS("routing", "provide", hash)
res := nodes[1].IPFS("routing", "findprovs", "--num-providers=1", hash) res := nodes[1].IPFS("routing", "findprovs", "--num-providers=1", hash)
assert.Equal(t, nodes[0].PeerID().String(), res.Stdout.Trimmed()) assert.Equal(t, nodes[0].PeerID().String(), res.Stdout.Trimmed())

View File

@ -512,7 +512,7 @@ port_from_maddr() {
findprovs_empty() { findprovs_empty() {
test_expect_success 'findprovs '$1' succeeds' ' test_expect_success 'findprovs '$1' succeeds' '
ipfsi 1 dht findprovs -n 1 '$1' > findprovsOut ipfsi 1 routing findprovs -n 1 '$1' > findprovsOut
' '
test_expect_success "findprovs $1 output is empty" ' test_expect_success "findprovs $1 output is empty" '
@ -522,7 +522,7 @@ findprovs_empty() {
findprovs_expect() { findprovs_expect() {
test_expect_success 'findprovs '$1' succeeds' ' test_expect_success 'findprovs '$1' succeeds' '
ipfsi 1 dht findprovs -n 1 '$1' > findprovsOut && ipfsi 1 routing findprovs -n 1 '$1' > findprovsOut &&
echo '$2' > expected echo '$2' > expected
' '