From e45df729bea560b879e69a6200755be797d63c74 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 7 Jul 2017 18:32:59 +0300 Subject: [PATCH] namesys/pubsub: publisher and resolver Commits: namesys: pubsub Publisher and Resolver namesys/pubsub: pacify code climate. namesys/pubsub: timeout for rendezvous namesys/pubsub: filter self in bootstrap connections namesys/pubsub: Publish to the correct topic License: MIT Signed-off-by: vyzo namesys/pubsub: unit test Commits: namesys/pubsub: test namesys/pubsub_test: pacify code climate namesys/pubsub: update test to use extant mock routing License: MIT Signed-off-by: vyzo namesys/pubsub: integrate namesys pubsub namesys: integrate pubsub resolvers namesys/pubsub_test: tweak delays - trying to make travis happy. namesys/pubsub: fix duplicate bootstraps - subscription key is topic, not ipnskey. namesys/pubsub: no warning needed on cancellation namesys/pubsub: warning for receive errors - and more informative error messages at that. namesys/pubsub_test: smaller test - make it work with seemingly low fdlimits in travis/macosx. also, more informative test failures. namesys/pubsub: add delay to let pubsub perform handshake namesys/pubsub: update gx imports namesys/pubsub_test: preconnect publisher, reduce delays - preconnects the publisher to the receivers in order to avoid bootstrap flakiness with connectivity problems in travis. reduces sleeps to 1s for flood propagation (3s seems excessive with 5 hosts). namesys/pubsub: drop named return values in resolveOnce - per review comment. namesys/pubsub: check errors namesys/pubsub: store bytes in resolver datastore namesys/pubsub: resolver Cancel - for canceling subscriptions, pre whyrusleeping's request. namesys/pubsub: fix resolution without /ipns prefix - also improve the logging a bit. namesys/pubsub: don't resolve own keys through pubsub namesys/pubsub: signal ErrResolveFailed on resolution failure namesys/pubsub: use sync datastore, resolver lock only for subs namesys/pubsub_test: coverage for Cancel License: MIT Signed-off-by: vyzo namesys/pubsub: parallelize dht and pubsub publishing Commits: namesys/pubsub: code cosmetics namesys: parallelize publishing with dht and pubsub namesys/pubsub: periodically reprovide topic rendezvous namesys/pubsub: cancelation for rendezvous goroutine namesys/pubsub: log ipns record seqno on publish License: MIT Signed-off-by: vyzo namesys/pubsub: error checking License: MIT Signed-off-by: vyzo namesys/pubsub: --enable-namesys-pubsub option and management Commits: package.json: update go-libp2p-blankhost namesys: fix stale package imports update go-testutil namesys/pubsub: reduce bootstrap provide period to 8hr namesys/pubsub: try to extract the key from id first option to enable ipns pubsub: --enable-namesys-pubsub ipfs name pubsub management subcommands corehttp/gateway_test: mockNamesys needs to implement GetResolver pacify code climate License: MIT Signed-off-by: vyzo namesys/pubsub: pubsub sharness test test/sharness: test for ipns pubsub namesys/pubsub: return boolean indicator on Cancel package.json: remove duplicate entry for go-testutil update gx deps, testutil to 1.1.12 fix jenkins failure: use tabs in t0183-namesys-pubsub t0183: use 4 spaces for tabification License: MIT Signed-off-by: vyzo namesys/pubsub: update for new command interface License: MIT Signed-off-by: vyzo namesys/pubsub: fix sharness test for broken MacOS echo echo -n "" should print -n, but hey it's a mac. License: MIT Signed-off-by: vyzo --- cmd/ipfs/daemon.go | 4 + core/builder.go | 2 +- core/commands/ipnsps.go | 163 ++++++++++ core/commands/mount_windows.go | 2 +- core/commands/name.go | 1 + core/core.go | 11 +- core/corehttp/gateway_test.go | 4 + namesys/interface.go | 8 + namesys/namesys.go | 140 +++++++-- namesys/namesys_test.go | 4 +- namesys/pubsub.go | 430 ++++++++++++++++++++++++++ namesys/pubsub_test.go | 187 +++++++++++ test/sharness/t0183-namesys-pubsub.sh | 80 +++++ 13 files changed, 1009 insertions(+), 27 deletions(-) create mode 100644 core/commands/ipnsps.go create mode 100644 namesys/pubsub.go create mode 100644 namesys/pubsub_test.go create mode 100755 test/sharness/t0183-namesys-pubsub.sh diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index bc6c2d9d8..9f8776328 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -46,6 +46,7 @@ const ( unrestrictedApiAccessKwd = "unrestricted-api" writableKwd = "writable" enableFloodSubKwd = "enable-pubsub-experiment" + enableIPNSPubSubKwd = "enable-namesys-pubsub" enableMultiplexKwd = "enable-mplex-experiment" // apiAddrKwd = "address-api" // swarmAddrKwd = "address-swarm" @@ -157,6 +158,7 @@ Headers. cmdkit.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API."), cmdkit.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."), cmdkit.BoolOption(enableFloodSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."), + cmdkit.BoolOption(enableIPNSPubSubKwd, "Enable IPNS record distribution through pubsub; enables pubsub."), cmdkit.BoolOption(enableMultiplexKwd, "Add the experimental 'go-multiplex' stream muxer to libp2p on construction.").WithDefault(true), // TODO: add way to override addresses. tricky part: updating the config if also --init. @@ -283,6 +285,7 @@ func daemonFunc(req cmds.Request, re cmds.ResponseEmitter) { offline, _, _ := req.Option(offlineKwd).Bool() pubsub, _, _ := req.Option(enableFloodSubKwd).Bool() + ipnsps, _, _ := req.Option(enableIPNSPubSubKwd).Bool() mplex, _, _ := req.Option(enableMultiplexKwd).Bool() // Start assembling node config @@ -292,6 +295,7 @@ func daemonFunc(req cmds.Request, re cmds.ResponseEmitter) { Online: !offline, ExtraOpts: map[string]bool{ "pubsub": pubsub, + "ipnsps": ipnsps, "mplex": mplex, }, //TODO(Kubuxu): refactor Online vs Offline by adding Permanent vs Ephemeral diff --git a/core/builder.go b/core/builder.go index 065c9cbd8..d3cdcc1e6 100644 --- a/core/builder.go +++ b/core/builder.go @@ -210,7 +210,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { if cfg.Online { do := setupDiscoveryOption(rcfg.Discovery) - if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("mplex")); err != nil { + if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("ipnsps"), cfg.getOpt("mplex")); err != nil { return err } } else { diff --git a/core/commands/ipnsps.go b/core/commands/ipnsps.go new file mode 100644 index 000000000..d8c9046ae --- /dev/null +++ b/core/commands/ipnsps.go @@ -0,0 +1,163 @@ +package commands + +import ( + "errors" + "fmt" + "io" + "strings" + + cmds "github.com/ipfs/go-ipfs/commands" + e "github.com/ipfs/go-ipfs/core/commands/e" + ns "github.com/ipfs/go-ipfs/namesys" + + cmdkit "gx/ipfs/QmUyfy4QSr3NXym4etEiRyxBLqqAeKHJuRdi8AACxg63fZ/go-ipfs-cmdkit" +) + +type ipnsPubsubState struct { + Enabled bool +} + +type ipnsPubsubCancel struct { + Canceled bool +} + +// IpnsPubsubCmd is the subcommand that allows us to manage the IPNS pubsub system +var IpnsPubsubCmd = &cmds.Command{ + Helptext: cmdkit.HelpText{ + Tagline: "IPNS pubsub management", + ShortDescription: ` +Manage and inspect the state of the IPNS pubsub resolver. + +Note: this command is experimental and subject to change as the system is refined +`, + }, + Subcommands: map[string]*cmds.Command{ + "state": ipnspsStateCmd, + "subs": ipnspsSubsCmd, + "cancel": ipnspsCancelCmd, + }, +} + +var ipnspsStateCmd = &cmds.Command{ + Helptext: cmdkit.HelpText{ + Tagline: "Query the state of IPNS pubsub", + }, + Run: func(req cmds.Request, res cmds.Response) { + n, err := req.InvocContext().GetNode() + if err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + + _, ok := n.Namesys.GetResolver("pubsub") + res.SetOutput(&ipnsPubsubState{ok}) + }, + Type: ipnsPubsubState{}, + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + v, err := unwrapOutput(res.Output()) + if err != nil { + return nil, err + } + + output, ok := v.(*ipnsPubsubState) + if !ok { + return nil, e.TypeErr(output, v) + } + + var state string + if output.Enabled { + state = "enabled" + } else { + state = "disabled" + } + + return strings.NewReader(state + "\n"), nil + }, + }, +} + +var ipnspsSubsCmd = &cmds.Command{ + Helptext: cmdkit.HelpText{ + Tagline: "Show current name subscriptions", + }, + Run: func(req cmds.Request, res cmds.Response) { + n, err := req.InvocContext().GetNode() + if err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + + r, ok := n.Namesys.GetResolver("pubsub") + if !ok { + res.SetError(errors.New("IPNS pubsub subsystem is not enabled"), cmdkit.ErrClient) + return + } + + psr, ok := r.(*ns.PubsubResolver) + if !ok { + res.SetError(fmt.Errorf("unexpected resolver type: %v", r), cmdkit.ErrNormal) + return + } + + res.SetOutput(&stringList{psr.GetSubscriptions()}) + }, + Type: stringList{}, + Marshalers: cmds.MarshalerMap{ + cmds.Text: stringListMarshaler, + }, +} + +var ipnspsCancelCmd = &cmds.Command{ + Helptext: cmdkit.HelpText{ + Tagline: "Cancel a name subscription", + }, + Run: func(req cmds.Request, res cmds.Response) { + n, err := req.InvocContext().GetNode() + if err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + + r, ok := n.Namesys.GetResolver("pubsub") + if !ok { + res.SetError(errors.New("IPNS pubsub subsystem is not enabled"), cmdkit.ErrClient) + return + } + + psr, ok := r.(*ns.PubsubResolver) + if !ok { + res.SetError(fmt.Errorf("unexpected resolver type: %v", r), cmdkit.ErrNormal) + return + } + + ok = psr.Cancel(req.Arguments()[0]) + res.SetOutput(&ipnsPubsubCancel{ok}) + }, + Arguments: []cmdkit.Argument{ + cmdkit.StringArg("name", true, false, "Name to cancel the subscription for."), + }, + Type: ipnsPubsubCancel{}, + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + v, err := unwrapOutput(res.Output()) + if err != nil { + return nil, err + } + + output, ok := v.(*ipnsPubsubCancel) + if !ok { + return nil, e.TypeErr(output, v) + } + + var state string + if output.Canceled { + state = "canceled" + } else { + state = "no subscription" + } + + return strings.NewReader(state + "\n"), nil + }, + }, +} diff --git a/core/commands/mount_windows.go b/core/commands/mount_windows.go index e09ba1b9a..f56eb6ed3 100644 --- a/core/commands/mount_windows.go +++ b/core/commands/mount_windows.go @@ -5,7 +5,7 @@ import ( cmds "github.com/ipfs/go-ipfs/commands" - cmdkit "gx/ipfs/QmSNbH2A1evCCbJSDC6u3RV3GGDhgu6pRGbXHvrN89tMKf/go-ipfs-cmdkit" + cmdkit "gx/ipfs/QmUyfy4QSr3NXym4etEiRyxBLqqAeKHJuRdi8AACxg63fZ/go-ipfs-cmdkit" ) var MountCmd = &cmds.Command{ diff --git a/core/commands/name.go b/core/commands/name.go index 982dad295..204a5c251 100644 --- a/core/commands/name.go +++ b/core/commands/name.go @@ -63,5 +63,6 @@ Resolve the value of a dnslink: Subcommands: map[string]*cmds.Command{ "publish": PublishCmd, "resolve": IpnsCmd, + "pubsub": IpnsPubsubCmd, }, } diff --git a/core/core.go b/core/core.go index a1ea823e9..d7545ed97 100644 --- a/core/core.go +++ b/core/core.go @@ -152,7 +152,7 @@ type Mounts struct { Ipns mount.Mount } -func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, mplex bool) error { +func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, ipnsps, mplex bool) error { if n.PeerHost != nil { // already online. return errors.New("node already online") @@ -249,10 +249,17 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin return err } - if pubsub { + if pubsub || ipnsps { n.Floodsub = floodsub.NewFloodSub(ctx, peerhost) } + if ipnsps { + err = namesys.AddPubsubNameSystem(ctx, n.Namesys, n.PeerHost, n.Routing, n.Repo.Datastore(), n.Floodsub) + if err != nil { + return err + } + } + n.P2P = p2p.NewP2P(n.Identity, n.PeerHost, n.Peerstore) // setup local discovery diff --git a/core/corehttp/gateway_test.go b/core/corehttp/gateway_test.go index 47e5d03cc..518fa678c 100644 --- a/core/corehttp/gateway_test.go +++ b/core/corehttp/gateway_test.go @@ -48,6 +48,10 @@ func (m mockNamesys) PublishWithEOL(ctx context.Context, name ci.PrivKey, value return errors.New("not implemented for mockNamesys") } +func (m mockNamesys) GetResolver(subs string) (namesys.Resolver, bool) { + return nil, false +} + func newNodeWithMockNamesys(ns mockNamesys) (*core.IpfsNode, error) { c := config.Config{ Identity: config.Identity{ diff --git a/namesys/interface.go b/namesys/interface.go index acaec1740..8097ac616 100644 --- a/namesys/interface.go +++ b/namesys/interface.go @@ -70,6 +70,7 @@ var ErrPublishFailed = errors.New("Could not publish name.") type NameSystem interface { Resolver Publisher + ResolverLookup } // Resolver is an object capable of resolving names. @@ -112,3 +113,10 @@ type Publisher interface { // call once the records spec is implemented PublishWithEOL(ctx context.Context, name ci.PrivKey, value path.Path, eol time.Time) error } + +// ResolverLookup is an object capable of finding resolvers for a subsystem +type ResolverLookup interface { + + // GetResolver retrieves a resolver associated with a subsystem + GetResolver(subs string) (Resolver, bool) +} diff --git a/namesys/namesys.go b/namesys/namesys.go index d1cda0870..82952f250 100644 --- a/namesys/namesys.go +++ b/namesys/namesys.go @@ -2,14 +2,20 @@ package namesys import ( "context" + "errors" "strings" + "sync" "time" path "github.com/ipfs/go-ipfs/path" routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing" + p2phost "gx/ipfs/QmRS46AyqtpJBsf1zmQdeizSDEzo1qkWR7rdEuPFAv8237/go-libp2p-host" + mh "gx/ipfs/QmU9a9NV9RdPNwZQDYd5uKsm6N6LJLSvLbywDDYFbaaC6P/go-multihash" + floodsub "gx/ipfs/QmVNv1WV6XxzQV4MBuiLX5729wMazaf8TNzm2Sq6ejyHh7/go-libp2p-floodsub" ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore" peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" + isd "gx/ipfs/QmZmmuAXgX73UQmX1jRKjTGmjzq24Jinqkq8vzkBtno4uX/go-is-domain" ci "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto" ) @@ -36,11 +42,28 @@ func NewNameSystem(r routing.ValueStore, ds ds.Datastore, cachesize int) NameSys "dht": NewRoutingResolver(r, cachesize), }, publishers: map[string]Publisher{ - "/ipns/": NewRoutingPublisher(r, ds), + "dht": NewRoutingPublisher(r, ds), }, } } +// AddPubsubNameSystem adds the pubsub publisher and resolver to the namesystem +func AddPubsubNameSystem(ctx context.Context, ns NameSystem, host p2phost.Host, r routing.IpfsRouting, ds ds.Datastore, ps *floodsub.PubSub) error { + mpns, ok := ns.(*mpns) + if !ok { + return errors.New("unexpected NameSystem; not an mpns instance") + } + + pkf, ok := r.(routing.PubKeyFetcher) + if !ok { + return errors.New("unexpected IpfsRouting; not a PubKeyFetcher instance") + } + + mpns.resolvers["pubsub"] = NewPubsubResolver(ctx, host, r, pkf, ps) + mpns.publishers["pubsub"] = NewPubsubPublisher(ctx, host, ds, r, ps) + return nil +} + const DefaultResolverCacheTTL = time.Minute // Resolve implements Resolver. @@ -72,38 +95,100 @@ func (ns *mpns) resolveOnce(ctx context.Context, name string) (path.Path, error) return "", ErrResolveFailed } - for protocol, resolver := range ns.resolvers { - log.Debugf("Attempting to resolve %s with %s", segments[2], protocol) - p, err := resolver.resolveOnce(ctx, segments[2]) - if err == nil { - if len(segments) > 3 { - return path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3]) - } else { - return p, err - } + makePath := func(p path.Path) (path.Path, error) { + if len(segments) > 3 { + return path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3]) + } else { + return p, nil } } + + // Resolver selection: + // 1. if it is a multihash resolve through "pubsub" (if available), + // with fallback to "dht" + // 2. if it is a domain name, resolve through "dns" + // 3. otherwise resolve through the "proquint" resolver + key := segments[2] + + _, err := mh.FromB58String(key) + if err == nil { + res, ok := ns.resolvers["pubsub"] + if ok { + p, err := res.resolveOnce(ctx, key) + if err == nil { + return makePath(p) + } + } + + res, ok = ns.resolvers["dht"] + if ok { + p, err := res.resolveOnce(ctx, key) + if err == nil { + return makePath(p) + } + } + + return "", ErrResolveFailed + } + + if isd.IsDomain(key) { + res, ok := ns.resolvers["dns"] + if ok { + p, err := res.resolveOnce(ctx, key) + if err == nil { + return makePath(p) + } + } + + return "", ErrResolveFailed + } + + res, ok := ns.resolvers["proquint"] + if ok { + p, err := res.resolveOnce(ctx, key) + if err == nil { + return makePath(p) + } + + return "", ErrResolveFailed + } + log.Warningf("No resolver found for %s", name) return "", ErrResolveFailed } // Publish implements Publisher func (ns *mpns) Publish(ctx context.Context, name ci.PrivKey, value path.Path) error { - err := ns.publishers["/ipns/"].Publish(ctx, name, value) - if err != nil { - return err - } - ns.addToDHTCache(name, value, time.Now().Add(DefaultRecordTTL)) - return nil + return ns.PublishWithEOL(ctx, name, value, time.Now().Add(DefaultRecordTTL)) } func (ns *mpns) PublishWithEOL(ctx context.Context, name ci.PrivKey, value path.Path, eol time.Time) error { - err := ns.publishers["/ipns/"].PublishWithEOL(ctx, name, value, eol) - if err != nil { - return err + var dhtErr error + + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + dhtErr = ns.publishers["dht"].PublishWithEOL(ctx, name, value, eol) + if dhtErr == nil { + ns.addToDHTCache(name, value, eol) + } + wg.Done() + }() + + pub, ok := ns.publishers["pubsub"] + if ok { + wg.Add(1) + go func() { + err := pub.PublishWithEOL(ctx, name, value, eol) + if err != nil { + log.Warningf("error publishing %s with pubsub: %s", name, err.Error()) + } + wg.Done() + }() } - ns.addToDHTCache(name, value, eol) - return nil + + wg.Wait() + return dhtErr } func (ns *mpns) addToDHTCache(key ci.PrivKey, value path.Path, eol time.Time) { @@ -138,3 +223,16 @@ func (ns *mpns) addToDHTCache(key ci.PrivKey, value path.Path, eol time.Time) { eol: eol, }) } + +// GetResolver implements ResolverLookup +func (ns *mpns) GetResolver(subs string) (Resolver, bool) { + res, ok := ns.resolvers[subs] + if ok { + ires, ok := res.(Resolver) + if ok { + return ires, true + } + } + + return nil, false +} diff --git a/namesys/namesys_test.go b/namesys/namesys_test.go index 1507f5510..78396c25e 100644 --- a/namesys/namesys_test.go +++ b/namesys/namesys_test.go @@ -58,8 +58,8 @@ func mockResolverTwo() *mockResolver { func TestNamesysResolution(t *testing.T) { r := &mpns{ resolvers: map[string]resolver{ - "one": mockResolverOne(), - "two": mockResolverTwo(), + "dht": mockResolverOne(), + "dns": mockResolverTwo(), }, } diff --git a/namesys/pubsub.go b/namesys/pubsub.go new file mode 100644 index 000000000..6c1284f27 --- /dev/null +++ b/namesys/pubsub.go @@ -0,0 +1,430 @@ +package namesys + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "time" + + pb "github.com/ipfs/go-ipfs/namesys/pb" + path "github.com/ipfs/go-ipfs/path" + dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help" + + cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" + routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing" + pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore" + p2phost "gx/ipfs/QmRS46AyqtpJBsf1zmQdeizSDEzo1qkWR7rdEuPFAv8237/go-libp2p-host" + u "gx/ipfs/QmSU6eubNdhXjFBJBSksTp8kv8YRub8mGAPv8tVJHmL2EU/go-ipfs-util" + mh "gx/ipfs/QmU9a9NV9RdPNwZQDYd5uKsm6N6LJLSvLbywDDYFbaaC6P/go-multihash" + floodsub "gx/ipfs/QmVNv1WV6XxzQV4MBuiLX5729wMazaf8TNzm2Sq6ejyHh7/go-libp2p-floodsub" + ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore" + dssync "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore/sync" + peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" + proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" + ci "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto" + record "gx/ipfs/QmbxkgUceEcuSZ4ZdBA3x74VUDSSYjHYmmeEqkjxbtZ6Jg/go-libp2p-record" + dhtpb "gx/ipfs/QmbxkgUceEcuSZ4ZdBA3x74VUDSSYjHYmmeEqkjxbtZ6Jg/go-libp2p-record/pb" +) + +// PubsubPublisher is a publisher that distributes IPNS records through pubsub +type PubsubPublisher struct { + ctx context.Context + ds ds.Datastore + host p2phost.Host + cr routing.ContentRouting + ps *floodsub.PubSub + + mx sync.Mutex + subs map[string]struct{} +} + +// PubsubResolver is a resolver that receives IPNS records through pubsub +type PubsubResolver struct { + ctx context.Context + ds ds.Datastore + host p2phost.Host + cr routing.ContentRouting + pkf routing.PubKeyFetcher + ps *floodsub.PubSub + + mx sync.Mutex + subs map[string]*floodsub.Subscription +} + +// NewPubsubPublisher constructs a new Publisher that publishes IPNS records through pubsub. +// The constructor interface is complicated by the need to bootstrap the pubsub topic. +// This could be greatly simplified if the pubsub implementation handled bootstrap itself +func NewPubsubPublisher(ctx context.Context, host p2phost.Host, ds ds.Datastore, cr routing.ContentRouting, ps *floodsub.PubSub) *PubsubPublisher { + return &PubsubPublisher{ + ctx: ctx, + ds: ds, + host: host, // needed for pubsub bootstrap + cr: cr, // needed for pubsub bootstrap + ps: ps, + subs: make(map[string]struct{}), + } +} + +// NewPubsubResolver constructs a new Resolver that resolves IPNS records through pubsub. +// same as above for pubsub bootstrap dependencies +func NewPubsubResolver(ctx context.Context, host p2phost.Host, cr routing.ContentRouting, pkf routing.PubKeyFetcher, ps *floodsub.PubSub) *PubsubResolver { + return &PubsubResolver{ + ctx: ctx, + ds: dssync.MutexWrap(ds.NewMapDatastore()), + host: host, // needed for pubsub bootstrap + cr: cr, // needed for pubsub bootstrap + pkf: pkf, + ps: ps, + subs: make(map[string]*floodsub.Subscription), + } +} + +// Publish publishes an IPNS record through pubsub with default TTL +func (p *PubsubPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Path) error { + return p.PublishWithEOL(ctx, k, value, time.Now().Add(DefaultRecordTTL)) +} + +// PublishWithEOL publishes an IPNS record through pubsub +func (p *PubsubPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value path.Path, eol time.Time) error { + id, err := peer.IDFromPrivateKey(k) + if err != nil { + return err + } + + _, ipnskey := IpnsKeysForID(id) + + seqno, err := p.getPreviousSeqNo(ctx, ipnskey) + if err != nil { + return err + } + + seqno++ + + return p.publishRecord(ctx, k, value, seqno, eol, ipnskey, id) +} + +func (p *PubsubPublisher) getPreviousSeqNo(ctx context.Context, ipnskey string) (uint64, error) { + // the datastore is shared with the routing publisher to properly increment and persist + // ipns record sequence numbers. + prevrec, err := p.ds.Get(dshelp.NewKeyFromBinary([]byte(ipnskey))) + if err != nil { + if err == ds.ErrNotFound { + // None found, lets start at zero! + return 0, nil + } + return 0, err + } + + prbytes, ok := prevrec.([]byte) + if !ok { + return 0, fmt.Errorf("unexpected type returned from datastore: %#v", prevrec) + } + + var dsrec dhtpb.Record + err = proto.Unmarshal(prbytes, &dsrec) + if err != nil { + return 0, err + } + + var entry pb.IpnsEntry + err = proto.Unmarshal(dsrec.GetValue(), &entry) + if err != nil { + return 0, err + } + + return entry.GetSequence(), nil +} + +func (p *PubsubPublisher) publishRecord(ctx context.Context, k ci.PrivKey, value path.Path, seqno uint64, eol time.Time, ipnskey string, ID peer.ID) error { + entry, err := CreateRoutingEntryData(k, value, seqno, eol) + if err != nil { + return err + } + + data, err := proto.Marshal(entry) + if err != nil { + return err + } + + // the datastore is shared with the routing publisher to properly increment and persist + // ipns record sequence numbers; so we need to Record our new entry in the datastore + dsrec, err := record.MakePutRecord(k, ipnskey, data, true) + if err != nil { + return err + } + + dsdata, err := proto.Marshal(dsrec) + if err != nil { + return err + } + + err = p.ds.Put(dshelp.NewKeyFromBinary([]byte(ipnskey)), dsdata) + if err != nil { + return err + } + + // now we publish, but we also need to bootstrap pubsub for our messages to propagate + topic := "/ipns/" + ID.Pretty() + + p.mx.Lock() + _, ok := p.subs[topic] + + if !ok { + p.subs[topic] = struct{}{} + p.mx.Unlock() + + bootstrapPubsub(p.ctx, p.cr, p.host, topic) + } else { + p.mx.Unlock() + } + + log.Debugf("PubsubPublish: publish IPNS record for %s (%d)", topic, seqno) + return p.ps.Publish(topic, data) +} + +// Resolve resolves a name through pubsub and default depth limit +func (r *PubsubResolver) Resolve(ctx context.Context, name string) (path.Path, error) { + return r.ResolveN(ctx, name, DefaultDepthLimit) +} + +// ResolveN resolves a name through pubsub with the specified depth limit +func (r *PubsubResolver) ResolveN(ctx context.Context, name string, depth int) (path.Path, error) { + return resolve(ctx, r, name, depth, "/ipns/") +} + +func (r *PubsubResolver) resolveOnce(ctx context.Context, name string) (path.Path, error) { + log.Debugf("PubsubResolve: resolve '%s'", name) + + // retrieve the public key once (for verifying messages) + xname := strings.TrimPrefix(name, "/ipns/") + hash, err := mh.FromB58String(xname) + if err != nil { + log.Warningf("PubsubResolve: bad input hash: [%s]", xname) + return "", err + } + + id := peer.ID(hash) + if r.host.Peerstore().PrivKey(id) != nil { + return "", errors.New("Cannot resolve own name through pubsub") + } + + pubk := id.ExtractPublicKey() + if pubk == nil { + pubk, err = r.pkf.GetPublicKey(ctx, id) + if err != nil { + log.Warningf("PubsubResolve: error fetching public key: %s [%s]", err.Error(), xname) + return "", err + } + } + + // the topic is /ipns/Qmhash + if !strings.HasPrefix(name, "/ipns/") { + name = "/ipns/" + name + } + + r.mx.Lock() + // see if we already have a pubsub subscription; if not, subscribe + sub, ok := r.subs[name] + if !ok { + sub, err = r.ps.Subscribe(name) + if err != nil { + r.mx.Unlock() + return "", err + } + + log.Debugf("PubsubResolve: subscribed to %s", name) + + r.subs[name] = sub + + ctx, cancel := context.WithCancel(r.ctx) + go r.handleSubscription(sub, name, pubk, cancel) + go bootstrapPubsub(ctx, r.cr, r.host, name) + } + r.mx.Unlock() + + // resolve to what we may already have in the datastore + dsval, err := r.ds.Get(dshelp.NewKeyFromBinary([]byte(name))) + if err != nil { + if err == ds.ErrNotFound { + return "", ErrResolveFailed + } + return "", err + } + + data := dsval.([]byte) + entry := new(pb.IpnsEntry) + + err = proto.Unmarshal(data, entry) + if err != nil { + return "", err + } + + // check EOL; if the entry has expired, delete from datastore and return ds.ErrNotFound + eol, ok := checkEOL(entry) + if ok && eol.Before(time.Now()) { + err = r.ds.Delete(dshelp.NewKeyFromBinary([]byte(name))) + if err != nil { + log.Warningf("PubsubResolve: error deleting stale value for %s: %s", name, err.Error()) + } + + return "", ErrResolveFailed + } + + value, err := path.ParsePath(string(entry.GetValue())) + return value, err +} + +// GetSubscriptions retrieves a list of active topic subscriptions +func (r *PubsubResolver) GetSubscriptions() []string { + r.mx.Lock() + defer r.mx.Unlock() + + var res []string + for sub := range r.subs { + res = append(res, sub) + } + + return res +} + +// Cancel cancels a topic subscription; returns true if an active +// subscription was canceled +func (r *PubsubResolver) Cancel(name string) bool { + r.mx.Lock() + defer r.mx.Unlock() + + sub, ok := r.subs[name] + if ok { + sub.Cancel() + delete(r.subs, name) + } + + return ok +} + +func (r *PubsubResolver) handleSubscription(sub *floodsub.Subscription, name string, pubk ci.PubKey, cancel func()) { + defer sub.Cancel() + defer cancel() + + for { + msg, err := sub.Next(r.ctx) + if err != nil { + if err != context.Canceled { + log.Warningf("PubsubResolve: subscription error in %s: %s", name, err.Error()) + } + return + } + + err = r.receive(msg, name, pubk) + if err != nil { + log.Warningf("PubsubResolve: error proessing update for %s: %s", name, err.Error()) + } + } +} + +func (r *PubsubResolver) receive(msg *floodsub.Message, name string, pubk ci.PubKey) error { + data := msg.GetData() + if data == nil { + return errors.New("empty message") + } + + entry := new(pb.IpnsEntry) + err := proto.Unmarshal(data, entry) + if err != nil { + return err + } + + ok, err := pubk.Verify(ipnsEntryDataForSig(entry), entry.GetSignature()) + if err != nil || !ok { + return errors.New("signature verification failed") + } + + _, err = path.ParsePath(string(entry.GetValue())) + if err != nil { + return err + } + + eol, ok := checkEOL(entry) + if ok && eol.Before(time.Now()) { + return errors.New("stale update; EOL exceeded") + } + + // check the sequence number against what we may already have in our datastore + oval, err := r.ds.Get(dshelp.NewKeyFromBinary([]byte(name))) + if err == nil { + odata := oval.([]byte) + oentry := new(pb.IpnsEntry) + + err = proto.Unmarshal(odata, oentry) + if err != nil { + return err + } + + if entry.GetSequence() <= oentry.GetSequence() { + return errors.New("stale update; sequence number too small") + } + } + + log.Debugf("PubsubResolve: receive IPNS record for %s", name) + + return r.ds.Put(dshelp.NewKeyFromBinary([]byte(name)), data) +} + +// rendezvous with peers in the name topic through provider records +// Note: rendezbous/boostrap should really be handled by the pubsub implementation itself! +func bootstrapPubsub(ctx context.Context, cr routing.ContentRouting, host p2phost.Host, name string) { + topic := "floodsub:" + name + hash := u.Hash([]byte(topic)) + rz := cid.NewCidV1(cid.Raw, hash) + + err := cr.Provide(ctx, rz, true) + if err != nil { + log.Warningf("bootstrapPubsub: error providing rendezvous for %s: %s", topic, err.Error()) + } + + go func() { + for { + select { + case <-time.After(8 * time.Hour): + err := cr.Provide(ctx, rz, true) + if err != nil { + log.Warningf("bootstrapPubsub: error providing rendezvous for %s: %s", topic, err.Error()) + } + case <-ctx.Done(): + return + } + } + }() + + rzctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + wg := &sync.WaitGroup{} + for pi := range cr.FindProvidersAsync(rzctx, rz, 10) { + if pi.ID == host.ID() { + continue + } + wg.Add(1) + go func(pi pstore.PeerInfo) { + defer wg.Done() + + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + err := host.Connect(ctx, pi) + if err != nil { + log.Debugf("Error connecting to pubsub peer %s: %s", pi.ID, err.Error()) + return + } + + // delay to let pubsub perform its handshake + time.Sleep(time.Millisecond * 250) + + log.Debugf("Connected to pubsub peer %s", pi.ID) + }(pi) + } + + wg.Wait() +} diff --git a/namesys/pubsub_test.go b/namesys/pubsub_test.go new file mode 100644 index 000000000..a7fa7be9b --- /dev/null +++ b/namesys/pubsub_test.go @@ -0,0 +1,187 @@ +package namesys + +import ( + "context" + "sync" + "testing" + "time" + + path "github.com/ipfs/go-ipfs/path" + mockrouting "github.com/ipfs/go-ipfs/routing/mock" + + routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing" + pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore" + testutil "gx/ipfs/QmQgLZP9haZheimMHqqAjJh2LhRmNfEoZDfbtkpeMhi9xK/go-testutil" + p2phost "gx/ipfs/QmRS46AyqtpJBsf1zmQdeizSDEzo1qkWR7rdEuPFAv8237/go-libp2p-host" + netutil "gx/ipfs/QmUUNDRYXgfqdjxTg79ogkciczU5y4WY1tKMU2vEX9CRN7/go-libp2p-netutil" + floodsub "gx/ipfs/QmVNv1WV6XxzQV4MBuiLX5729wMazaf8TNzm2Sq6ejyHh7/go-libp2p-floodsub" + ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore" + peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" + ci "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto" + bhost "gx/ipfs/Qmb37wDRoh9VZMZXmmZktN35szvj9GeBYDtA9giDmXwwd7/go-libp2p-blankhost" +) + +func newNetHost(ctx context.Context, t *testing.T) p2phost.Host { + netw := netutil.GenSwarmNetwork(t, ctx) + return bhost.NewBlankHost(netw) +} + +func newNetHosts(ctx context.Context, t *testing.T, n int) []p2phost.Host { + var out []p2phost.Host + + for i := 0; i < n; i++ { + h := newNetHost(ctx, t) + out = append(out, h) + } + + return out +} + +// PubKeyFetcher implementation with a global key store +type mockKeyStore struct { + keys map[peer.ID]ci.PubKey + mx sync.Mutex +} + +func (m *mockKeyStore) addPubKey(id peer.ID, pkey ci.PubKey) { + m.mx.Lock() + defer m.mx.Unlock() + m.keys[id] = pkey +} + +func (m *mockKeyStore) getPubKey(id peer.ID) (ci.PubKey, error) { + m.mx.Lock() + defer m.mx.Unlock() + pkey, ok := m.keys[id] + if ok { + return pkey, nil + } + + return nil, routing.ErrNotFound +} + +func (m *mockKeyStore) GetPublicKey(ctx context.Context, id peer.ID) (ci.PubKey, error) { + return m.getPubKey(id) +} + +func newMockKeyStore() *mockKeyStore { + return &mockKeyStore{ + keys: make(map[peer.ID]ci.PubKey), + } +} + +// ConentRouting mock +func newMockRouting(ms mockrouting.Server, ks *mockKeyStore, host p2phost.Host) routing.ContentRouting { + id := host.ID() + + privk := host.Peerstore().PrivKey(id) + pubk := host.Peerstore().PubKey(id) + pi := host.Peerstore().PeerInfo(id) + + ks.addPubKey(id, pubk) + return ms.Client(testutil.NewIdentity(id, pi.Addrs[0], privk, pubk)) +} + +func newMockRoutingForHosts(ms mockrouting.Server, ks *mockKeyStore, hosts []p2phost.Host) []routing.ContentRouting { + rs := make([]routing.ContentRouting, len(hosts)) + for i := 0; i < len(hosts); i++ { + rs[i] = newMockRouting(ms, ks, hosts[i]) + } + return rs +} + +// tests +func TestPubsubPublishSubscribe(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ms := mockrouting.NewServer() + ks := newMockKeyStore() + + pubhost := newNetHost(ctx, t) + pubmr := newMockRouting(ms, ks, pubhost) + pub := NewPubsubPublisher(ctx, pubhost, ds.NewMapDatastore(), pubmr, floodsub.NewFloodSub(ctx, pubhost)) + privk := pubhost.Peerstore().PrivKey(pubhost.ID()) + pubpinfo := pstore.PeerInfo{ID: pubhost.ID(), Addrs: pubhost.Addrs()} + + name := "/ipns/" + pubhost.ID().Pretty() + + reshosts := newNetHosts(ctx, t, 5) + resmrs := newMockRoutingForHosts(ms, ks, reshosts) + res := make([]*PubsubResolver, len(reshosts)) + for i := 0; i < len(res); i++ { + res[i] = NewPubsubResolver(ctx, reshosts[i], resmrs[i], ks, floodsub.NewFloodSub(ctx, reshosts[i])) + if err := reshosts[i].Connect(ctx, pubpinfo); err != nil { + t.Fatal(err) + } + } + + time.Sleep(time.Millisecond * 100) + for i := 0; i < len(res); i++ { + checkResolveNotFound(ctx, t, i, res[i], name) + // delay to avoid connection storms + time.Sleep(time.Millisecond * 100) + } + + // let the bootstrap finish + time.Sleep(time.Second * 1) + + val := path.Path("/ipfs/QmP1DfoUjiWH2ZBo1PBH6FupdBucbDepx3HpWmEY6JMUpY") + err := pub.Publish(ctx, privk, val) + if err != nil { + t.Fatal(err) + } + + // let the flood propagate + time.Sleep(time.Second * 1) + for i := 0; i < len(res); i++ { + checkResolve(ctx, t, i, res[i], name, val) + } + + val = path.Path("/ipfs/QmP1wMAqk6aZYRZirbaAwmrNeqFRgQrwBt3orUtvSa1UYD") + err = pub.Publish(ctx, privk, val) + if err != nil { + t.Fatal(err) + } + + // let the flood propagate + time.Sleep(time.Second * 1) + for i := 0; i < len(res); i++ { + checkResolve(ctx, t, i, res[i], name, val) + } + + // cancel subscriptions + for i := 0; i < len(res); i++ { + res[i].Cancel(name) + } + time.Sleep(time.Millisecond * 100) + + nval := path.Path("/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr") + err = pub.Publish(ctx, privk, nval) + if err != nil { + t.Fatal(err) + } + + // check we still have the old value in the resolver + time.Sleep(time.Second * 1) + for i := 0; i < len(res); i++ { + checkResolve(ctx, t, i, res[i], name, val) + } +} + +func checkResolveNotFound(ctx context.Context, t *testing.T, i int, resolver Resolver, name string) { + _, err := resolver.Resolve(ctx, name) + if err != ErrResolveFailed { + t.Fatalf("[resolver %d] unexpected error: %s", i, err.Error()) + } +} + +func checkResolve(ctx context.Context, t *testing.T, i int, resolver Resolver, name string, val path.Path) { + xval, err := resolver.Resolve(ctx, name) + if err != nil { + t.Fatalf("[resolver %d] resolve failed: %s", i, err.Error()) + } + if xval != val { + t.Fatalf("[resolver %d] unexpected value: %s %s", i, val, xval) + } +} diff --git a/test/sharness/t0183-namesys-pubsub.sh b/test/sharness/t0183-namesys-pubsub.sh new file mode 100755 index 000000000..f4684af4c --- /dev/null +++ b/test/sharness/t0183-namesys-pubsub.sh @@ -0,0 +1,80 @@ +#!/bin/sh + +test_description="Test IPNS pubsub" + +. lib/test-lib.sh + +# start iptb + wait for peering +NUM_NODES=5 +test_expect_success 'init iptb' ' + iptb init -n $NUM_NODES --bootstrap=none --port=0 +' + +startup_cluster $NUM_NODES --enable-namesys-pubsub + +test_expect_success 'peer ids' ' + PEERID_0=$(iptb get id 0) +' + +test_expect_success 'check namesys pubsub state' ' + echo enabled > expected && + ipfsi 0 name pubsub state > state0 && + ipfsi 1 name pubsub state > state1 && + ipfsi 2 name pubsub state > state2 && + test_cmp expected state0 && + test_cmp expected state1 && + test_cmp expected state2 +' + +test_expect_success 'subscribe nodes to the publisher topic' ' + ipfsi 1 name resolve /ipns/$PEERID_0 && + ipfsi 2 name resolve /ipns/$PEERID_0 +' + +test_expect_success 'check subscriptions' ' + echo /ipns/$PEERID_0 > expected && + ipfsi 1 name pubsub subs > subs1 && + ipfsi 2 name pubsub subs > subs2 && + test_cmp expected subs1 && + test_cmp expected subs2 +' + +test_expect_success 'add an obect on publisher node' ' + echo "ipns is super fun" > file && + HASH_FILE=$(ipfsi 0 add -q file) +' + +test_expect_success 'publish that object as an ipns entry' ' + ipfsi 0 name publish $HASH_FILE +' + +test_expect_success 'wait for the flood' ' + sleep 1 +' + +test_expect_success 'resolve name in subscriber nodes' ' + echo "/ipfs/$HASH_FILE" > expected && + ipfsi 1 name resolve /ipns/$PEERID_0 > name1 && + ipfsi 2 name resolve /ipns/$PEERID_0 > name2 && + test_cmp expected name1 && + test_cmp expected name2 +' + +test_expect_success 'cancel subscriptions to the publisher topic' ' + ipfsi 1 name pubsub cancel /ipns/$PEERID_0 && + ipfsi 2 name pubsub cancel /ipns/$PEERID_0 +' + +test_expect_success 'check subscriptions' ' + rm -f expected && touch expected && + ipfsi 1 name pubsub subs > subs1 && + ipfsi 2 name pubsub subs > subs2 && + test_cmp expected subs1 && + test_cmp expected subs2 +' + +test_expect_success "shut down iptb" ' + iptb stop +' + +test_done