package coreapi import ( "context" "fmt" blockservice "github.com/ipfs/go-blockservice" cid "github.com/ipfs/go-cid" cidutil "github.com/ipfs/go-cidutil" blockstore "github.com/ipfs/go-ipfs-blockstore" offline "github.com/ipfs/go-ipfs-exchange-offline" "github.com/ipfs/go-ipfs/tracing" dag "github.com/ipfs/go-merkledag" coreiface "github.com/ipfs/interface-go-ipfs-core" caopts "github.com/ipfs/interface-go-ipfs-core/options" path "github.com/ipfs/interface-go-ipfs-core/path" peer "github.com/libp2p/go-libp2p-core/peer" routing "github.com/libp2p/go-libp2p-core/routing" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) type DhtAPI CoreAPI func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) { ctx, span := tracing.Span(ctx, "CoreAPI.DhtAPI", "FindPeer", trace.WithAttributes(attribute.String("peer", p.String()))) defer span.End() err := api.checkOnline(false) if err != nil { return peer.AddrInfo{}, err } pi, err := api.routing.FindPeer(ctx, peer.ID(p)) if err != nil { return peer.AddrInfo{}, err } return pi, nil } func (api *DhtAPI) FindProviders(ctx context.Context, p path.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peer.AddrInfo, error) { ctx, span := tracing.Span(ctx, "CoreAPI.DhtAPI", "FindProviders", trace.WithAttributes(attribute.String("path", p.String()))) defer span.End() settings, err := caopts.DhtFindProvidersOptions(opts...) if err != nil { return nil, err } span.SetAttributes(attribute.Int("numproviders", settings.NumProviders)) err = api.checkOnline(false) if err != nil { return nil, err } rp, err := api.core().ResolvePath(ctx, p) if err != nil { return nil, err } numProviders := settings.NumProviders if numProviders < 1 { return nil, fmt.Errorf("number of providers must be greater than 0") } pchan := api.routing.FindProvidersAsync(ctx, rp.Cid(), numProviders) return pchan, nil } func (api *DhtAPI) Provide(ctx context.Context, path path.Path, opts ...caopts.DhtProvideOption) error { ctx, span := tracing.Span(ctx, "CoreAPI.DhtAPI", "Provide", trace.WithAttributes(attribute.String("path", path.String()))) defer span.End() settings, err := caopts.DhtProvideOptions(opts...) if err != nil { return err } span.SetAttributes(attribute.Bool("recursive", settings.Recursive)) err = api.checkOnline(false) if err != nil { return err } rp, err := api.core().ResolvePath(ctx, path) if err != nil { return err } c := rp.Cid() has, err := api.blockstore.Has(ctx, c) if err != nil { return err } if !has { return fmt.Errorf("block %s not found locally, cannot provide", c) } if settings.Recursive { err = provideKeysRec(ctx, api.routing, api.blockstore, []cid.Cid{c}) } else { err = provideKeys(ctx, api.routing, []cid.Cid{c}) } if err != nil { return err } return nil } func provideKeys(ctx context.Context, r routing.Routing, cids []cid.Cid) error { for _, c := range cids { err := r.Provide(ctx, c, true) if err != nil { return err } } return nil } func provideKeysRec(ctx context.Context, r routing.Routing, bs blockstore.Blockstore, cids []cid.Cid) error { provided := cidutil.NewStreamingSet() errCh := make(chan error) go func() { dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) for _, c := range cids { err := dag.Walk(ctx, dag.GetLinksDirect(dserv), c, provided.Visitor(ctx)) if err != nil { errCh <- err } } }() for { select { case k := <-provided.New: err := r.Provide(ctx, k, true) if err != nil { return err } case err := <-errCh: return err case <-ctx.Done(): return ctx.Err() } } } func (api *DhtAPI) core() coreiface.CoreAPI { return (*CoreAPI)(api) }