diff --git a/core/commands/ipnsps.go b/core/commands/ipnsps.go index a13d48aa3..9b10b3641 100644 --- a/core/commands/ipnsps.go +++ b/core/commands/ipnsps.go @@ -2,15 +2,15 @@ package commands import ( "errors" - "fmt" "io" "strings" cmds "github.com/ipfs/go-ipfs/commands" e "github.com/ipfs/go-ipfs/core/commands/e" - ns "github.com/ipfs/go-ipfs/namesys" - "gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit" + record "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record" + peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer" + cmdkit "gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit" ) type ipnsPubsubState struct { @@ -49,8 +49,7 @@ var ipnspsStateCmd = &cmds.Command{ return } - _, ok := n.Namesys.GetResolver("pubsub") - res.SetOutput(&ipnsPubsubState{ok}) + res.SetOutput(&ipnsPubsubState{n.PSRouter != nil}) }, Type: ipnsPubsubState{}, Marshalers: cmds.MarshalerMap{ @@ -88,19 +87,26 @@ var ipnspsSubsCmd = &cmds.Command{ return } - r, ok := n.Namesys.GetResolver("pubsub") - if !ok { + if n.PSRouter == nil { res.SetError(errors.New("IPNS pubsub subsystem is not enabled"), cmdkit.ErrClient) return } - - psr, ok := r.(*ns.PubsubResolver) - if !ok { - res.SetError(fmt.Errorf("unexpected resolver type: %v", r), cmdkit.ErrNormal) - return + var paths []string + for _, key := range n.PSRouter.GetSubscriptions() { + ns, k, err := record.SplitKey(key) + if err != nil || ns != "ipns" { + // Not necessarily an error. + continue + } + pid, err := peer.IDFromString(k) + if err != nil { + log.Errorf("ipns key not a valid peer ID: %s", err) + continue + } + paths = append(paths, "/ipns/"+peer.IDB58Encode(pid)) } - res.SetOutput(&stringList{psr.GetSubscriptions()}) + res.SetOutput(&stringList{paths}) }, Type: stringList{}, Marshalers: cmds.MarshalerMap{ @@ -119,19 +125,20 @@ var ipnspsCancelCmd = &cmds.Command{ return } - r, ok := n.Namesys.GetResolver("pubsub") - if !ok { + if n.PSRouter == nil { res.SetError(errors.New("IPNS pubsub subsystem is not enabled"), cmdkit.ErrClient) return } - psr, ok := r.(*ns.PubsubResolver) - if !ok { - res.SetError(fmt.Errorf("unexpected resolver type: %v", r), cmdkit.ErrNormal) + name := req.Arguments()[0] + name = strings.TrimPrefix(name, "/ipns/") + pid, err := peer.IDB58Decode(name) + if err != nil { + res.SetError(err, cmdkit.ErrClient) return } - ok = psr.Cancel(req.Arguments()[0]) + ok := n.PSRouter.Cancel("/ipns/" + string(pid)) res.SetOutput(&ipnsPubsubCancel{ok}) }, Arguments: []cmdkit.Argument{ diff --git a/core/core.go b/core/core.go index 3d78b1b24..28ae84ba0 100644 --- a/core/core.go +++ b/core/core.go @@ -48,6 +48,7 @@ import ( mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter" logging "gx/ipfs/QmTG23dvpBCBjqQwyDxV8CQT6jmS4PSftNr1VqHhE3MLy7/go-log" addrutil "gx/ipfs/QmTGSre9j1otFgsr1opCUQDXTPSM6BTZnMWwPeA5nYJM7w/go-addr-util" + record "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record" routing "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing" floodsub "gx/ipfs/QmVKrsEgixRtMWcMd6WQzuwqCUC3jfLf7Q7xcjnKoMMikS/go-libp2p-floodsub" mssmux "gx/ipfs/QmVniQJkdzLZaZwzwMdd3dJTvWiJ1DQEkreVy6hs6h7Vk5/go-smux-multistream" @@ -65,13 +66,16 @@ import ( peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer" cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid" dht "gx/ipfs/Qmd3jqhBQFvhfBNTSJMQL15GgyVMpdxKTta69Napvx6Myd/go-libp2p-kad-dht" + dhtopts "gx/ipfs/Qmd3jqhBQFvhfBNTSJMQL15GgyVMpdxKTta69Napvx6Myd/go-libp2p-kad-dht/opts" ipnet "gx/ipfs/Qmd3oYWVLCVWryDV6Pobv6whZcvDXAHqS3chemZ658y4a8/go-libp2p-interface-pnet" + psrouter "gx/ipfs/QmdSX2uedxXdsfNSqbkfSxcYi7pXBBdvp1Km2ZsjPWfydt/go-libp2p-pubsub-router" exchange "gx/ipfs/QmdcAXgEHUueP4A7b5hjabKn2EooeHgMreMvFC249dGCgc/go-ipfs-exchange-interface" pstore "gx/ipfs/QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh/go-libp2p-peerstore" ic "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto" ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format" ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore" mplex "gx/ipfs/QmenmFuirGzv8S1R3DyvbZ6tFmQapkGeDCebgYzni1Ntn3/go-smux-multiplex" + rhelpers "gx/ipfs/QmeoG1seQ8a9b2vS3XJ8HPz9tXr6dzpS5NizjuDDSntQEk/go-libp2p-routing-helpers" mafilter "gx/ipfs/Qmf2UAmRwDG4TvnkQpHZWPAzw7rpCYVhxmRXmYxXr5LD1g/go-maddr-filter" ifconnmgr "gx/ipfs/QmfQNieWBPwmnUjXWPZbjJPzhNwFFabTb5RQ79dyVWGujQ/go-libp2p-interface-connmgr" p2phost "gx/ipfs/QmfZTdmunzKzAGJrSvXXQbQ5kLLUiEMX5vdwux7iXkdk7D/go-libp2p-host" @@ -135,6 +139,7 @@ type IpfsNode struct { IpnsRepub *ipnsrp.Republisher Floodsub *floodsub.PubSub + PSRouter *psrouter.PubsubValueStore P2P *p2p.P2P proc goprocess.Process @@ -240,7 +245,7 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin return err } - if err := n.startOnlineServicesWithHost(ctx, peerhost, routingOption); err != nil { + if err := n.startOnlineServicesWithHost(ctx, peerhost, routingOption, pubsub, ipnsps); err != nil { return err } @@ -249,21 +254,6 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin return err } - if pubsub || ipnsps { - service, err := floodsub.NewFloodSub(ctx, peerhost) - if err != nil { - return err - } - n.Floodsub = service - } - - if ipnsps { - err = namesys.AddPubsubNameSystem(ctx, n.Namesys, n.PeerHost, n.Routing, n.Repo.Datastore(), n.Floodsub) - if err != nil { - return err - } - } - n.P2P = p2p.NewP2P(n.Identity, n.PeerHost, n.Peerstore) // setup local discovery @@ -437,17 +427,50 @@ func (n *IpfsNode) HandlePeerFound(p pstore.PeerInfo) { // startOnlineServicesWithHost is the set of services which need to be // initialized with the host and _before_ we start listening. -func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost.Host, routingOption RoutingOption) error { +func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost.Host, routingOption RoutingOption, pubsub bool, ipnsps bool) error { // setup diagnostics service n.Ping = ping.NewPingService(host) + if pubsub || ipnsps { + service, err := floodsub.NewFloodSub(ctx, host) + if err != nil { + return err + } + n.Floodsub = service + } + + validator := record.NamespacedValidator{ + "pk": record.PublicKeyValidator{}, + "ipns": namesys.IpnsValidator{KeyBook: host.Peerstore()}, + } + // setup routing service - r, err := routingOption(ctx, host, n.Repo.Datastore()) + r, err := routingOption(ctx, host, n.Repo.Datastore(), validator) if err != nil { return err } n.Routing = r + if ipnsps { + n.PSRouter = psrouter.NewPubsubValueStore( + ctx, + host, + n.Routing, + n.Floodsub, + validator, + ) + n.Routing = rhelpers.Tiered{ + // Always check pubsub first. + &rhelpers.Compose{ + ValueStore: &rhelpers.LimitedValueStore{ + ValueStore: n.PSRouter, + Namespaces: []string{"ipns"}, + }, + }, + n.Routing, + } + } + // Wrap standard peer host with routing system to allow unknown peer lookups n.PeerHost = rhost.Wrap(host, n.Routing) @@ -935,21 +958,24 @@ func startListening(host p2phost.Host, cfg *config.Config) error { return nil } -func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching) (routing.IpfsRouting, error) { - dhtRouting := dht.NewDHT(ctx, host, dstore) - dhtRouting.Validator[IpnsValidatorTag] = namesys.NewIpnsRecordValidator(host.Peerstore()) - dhtRouting.Selector[IpnsValidatorTag] = namesys.IpnsSelectorFunc - return dhtRouting, nil +func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) { + return dht.New( + ctx, host, + dhtopts.Datastore(dstore), + dhtopts.Validator(validator), + ) } -func constructClientDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching) (routing.IpfsRouting, error) { - dhtRouting := dht.NewDHTClient(ctx, host, dstore) - dhtRouting.Validator[IpnsValidatorTag] = namesys.NewIpnsRecordValidator(host.Peerstore()) - dhtRouting.Selector[IpnsValidatorTag] = namesys.IpnsSelectorFunc - return dhtRouting, nil +func constructClientDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) { + return dht.New( + ctx, host, + dhtopts.Client(true), + dhtopts.Datastore(dstore), + dhtopts.Validator(validator), + ) } -type RoutingOption func(context.Context, p2phost.Host, ds.Batching) (routing.IpfsRouting, error) +type RoutingOption func(context.Context, p2phost.Host, ds.Batching, record.Validator) (routing.IpfsRouting, error) type DiscoveryOption func(context.Context, p2phost.Host) (discovery.Service, error) diff --git a/namesys/interface.go b/namesys/interface.go index fcd619b49..6536ac712 100644 --- a/namesys/interface.go +++ b/namesys/interface.go @@ -61,7 +61,6 @@ var ErrPublishFailed = errors.New("could not publish name") type NameSystem interface { Resolver Publisher - ResolverLookup } // Resolver is an object capable of resolving names. @@ -95,10 +94,3 @@ type Publisher interface { // call once the records spec is implemented PublishWithEOL(ctx context.Context, name ci.PrivKey, value path.Path, eol time.Time) error } - -// ResolverLookup is an object capable of finding resolvers for a subsystem -type ResolverLookup interface { - - // GetResolver retrieves a resolver associated with a subsystem - GetResolver(subs string) (Resolver, bool) -} diff --git a/namesys/ipns_validate_test.go b/namesys/ipns_validate_test.go index 44149180e..bcffcc5e6 100644 --- a/namesys/ipns_validate_test.go +++ b/namesys/ipns_validate_test.go @@ -12,8 +12,8 @@ import ( u "gx/ipfs/QmNiJuT8Ja3hMVpBHXv3Q6dwmperaQ6JjLtpMQgMCD7xvx/go-ipfs-util" mockrouting "gx/ipfs/QmPuPdzoG4b5uyYSQCjLEHB8NM593m3BW19UHX2jZ6Wzfm/go-ipfs-routing/mock" record "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record" - recordpb "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record/pb" routing "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing" + ropts "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing/options" testutil "gx/ipfs/QmUJzxQQ2kzwQubsMqBTr1NGDpLfh7pGA2E1oaJULcKDPq/go-testutil" proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer" @@ -23,8 +23,10 @@ import ( dssync "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/sync" ) -func testValidatorCase(t *testing.T, priv ci.PrivKey, kbook pstore.KeyBook, ns string, key string, val []byte, eol time.Time, exp error) { - validChecker := NewIpnsRecordValidator(kbook) +func testValidatorCase(t *testing.T, priv ci.PrivKey, kbook pstore.KeyBook, key string, val []byte, eol time.Time, exp error) { + t.Helper() + + validator := IpnsValidator{kbook} p := path.Path("/ipfs/QmfM2r8seH2GiRaC4esTjeraXEachRt8ZsSeGaWTPLyMoG") entry, err := CreateRoutingEntryData(priv, p, 1, eol) @@ -39,15 +41,9 @@ func testValidatorCase(t *testing.T, priv ci.PrivKey, kbook pstore.KeyBook, ns s t.Fatal(err) } } - rec := &record.ValidationRecord{ - Namespace: ns, - Key: key, - Value: data, - } - - err = validChecker.Func(rec) + err = validator.Validate(key, data) if err != exp { - params := fmt.Sprintf("namespace: %s\nkey: %s\neol: %s\n", ns, key, eol) + params := fmt.Sprintf("key: %s\neol: %s\n", key, eol) if exp == nil { t.Fatalf("Unexpected error %s for params %s", err, params) } else if err == nil { @@ -67,15 +63,15 @@ func TestValidator(t *testing.T) { kbook.AddPubKey(id, priv.GetPublic()) emptyKbook := pstore.NewPeerstore() - testValidatorCase(t, priv, kbook, "ipns", string(id), nil, ts.Add(time.Hour), nil) - testValidatorCase(t, priv, kbook, "ipns", string(id), nil, ts.Add(time.Hour*-1), ErrExpiredRecord) - testValidatorCase(t, priv, kbook, "ipns", string(id), []byte("bad data"), ts.Add(time.Hour), ErrBadRecord) - testValidatorCase(t, priv, kbook, "ipns", "bad key", nil, ts.Add(time.Hour), ErrKeyFormat) - testValidatorCase(t, priv, emptyKbook, "ipns", string(id), nil, ts.Add(time.Hour), ErrPublicKeyNotFound) - testValidatorCase(t, priv2, kbook, "ipns", string(id2), nil, ts.Add(time.Hour), ErrPublicKeyNotFound) - testValidatorCase(t, priv2, kbook, "ipns", string(id), nil, ts.Add(time.Hour), ErrSignature) - testValidatorCase(t, priv, kbook, "", string(id), nil, ts.Add(time.Hour), ErrInvalidPath) - testValidatorCase(t, priv, kbook, "wrong", string(id), nil, ts.Add(time.Hour), ErrInvalidPath) + testValidatorCase(t, priv, kbook, "/ipns/"+string(id), nil, ts.Add(time.Hour), nil) + testValidatorCase(t, priv, kbook, "/ipns/"+string(id), nil, ts.Add(time.Hour*-1), ErrExpiredRecord) + testValidatorCase(t, priv, kbook, "/ipns/"+string(id), []byte("bad data"), ts.Add(time.Hour), ErrBadRecord) + testValidatorCase(t, priv, kbook, "/ipns/"+"bad key", nil, ts.Add(time.Hour), ErrKeyFormat) + testValidatorCase(t, priv, emptyKbook, "/ipns/"+string(id), nil, ts.Add(time.Hour), ErrPublicKeyNotFound) + testValidatorCase(t, priv2, kbook, "/ipns/"+string(id2), nil, ts.Add(time.Hour), ErrPublicKeyNotFound) + testValidatorCase(t, priv2, kbook, "/ipns/"+string(id), nil, ts.Add(time.Hour), ErrSignature) + testValidatorCase(t, priv, kbook, "//"+string(id), nil, ts.Add(time.Hour), ErrInvalidPath) + testValidatorCase(t, priv, kbook, "/wrong/"+string(id), nil, ts.Add(time.Hour), ErrInvalidPath) } func TestResolverValidation(t *testing.T) { @@ -85,13 +81,6 @@ func TestResolverValidation(t *testing.T) { peerstore := pstore.NewPeerstore() vstore := newMockValueStore(rid, dstore, peerstore) - vstore.Validator["ipns"] = NewIpnsRecordValidator(peerstore) - vstore.Validator["pk"] = &record.ValidChecker{ - Func: func(r *record.ValidationRecord) error { - return nil - }, - Sign: false, - } resolver := NewRoutingResolver(vstore, 0) // Create entry with expiry in one hour @@ -224,19 +213,19 @@ type mockValueStore struct { func newMockValueStore(id testutil.Identity, dstore ds.Datastore, kbook pstore.KeyBook) *mockValueStore { serv := mockrouting.NewServer() r := serv.ClientWithDatastore(context.Background(), id, dstore) - return &mockValueStore{r, kbook, make(record.Validator)} + return &mockValueStore{r, kbook, record.NamespacedValidator{ + "ipns": IpnsValidator{kbook}, + "pk": record.PublicKeyValidator{}, + }} } -func (m *mockValueStore) GetValue(ctx context.Context, k string) ([]byte, error) { - data, err := m.r.GetValue(ctx, k) +func (m *mockValueStore) GetValue(ctx context.Context, k string, opts ...ropts.Option) ([]byte, error) { + data, err := m.r.GetValue(ctx, k, opts...) if err != nil { return data, err } - rec := new(recordpb.Record) - rec.Key = proto.String(k) - rec.Value = data - if err = m.Validator.VerifyRecord(rec); err != nil { + if err = m.Validator.Validate(k, data); err != nil { return nil, err } @@ -263,23 +252,6 @@ func (m *mockValueStore) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey return pk, m.kbook.AddPubKey(p, pk) } -func (m *mockValueStore) GetValues(ctx context.Context, k string, count int) ([]routing.RecvdVal, error) { - vals, err := m.r.GetValues(ctx, k, count) - if err != nil { - return nil, err - } - valid := make([]routing.RecvdVal, 0, len(vals)) - for _, v := range vals { - rec := new(recordpb.Record) - rec.Key = proto.String(k) - rec.Value = v.Val - if err = m.Validator.VerifyRecord(rec); err == nil { - valid = append(valid, v) - } - } - return valid, nil -} - -func (m *mockValueStore) PutValue(ctx context.Context, k string, d []byte) error { - return m.r.PutValue(ctx, k, d) +func (m *mockValueStore) PutValue(ctx context.Context, k string, d []byte, opts ...ropts.Option) error { + return m.r.PutValue(ctx, k, d, opts...) } diff --git a/namesys/namesys.go b/namesys/namesys.go index 47e1f874f..6e37d1c6f 100644 --- a/namesys/namesys.go +++ b/namesys/namesys.go @@ -2,7 +2,6 @@ package namesys import ( "context" - "errors" "strings" "sync" "time" @@ -11,13 +10,11 @@ import ( path "github.com/ipfs/go-ipfs/path" routing "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing" - floodsub "gx/ipfs/QmVKrsEgixRtMWcMd6WQzuwqCUC3jfLf7Q7xcjnKoMMikS/go-libp2p-floodsub" isd "gx/ipfs/QmZmmuAXgX73UQmX1jRKjTGmjzq24Jinqkq8vzkBtno4uX/go-is-domain" mh "gx/ipfs/QmZyZDi491cCNTLfAhwcaDii2Kg4pwKRkhqQzURGDvY6ua/go-multihash" peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer" ci "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto" ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore" - p2phost "gx/ipfs/QmfZTdmunzKzAGJrSvXXQbQ5kLLUiEMX5vdwux7iXkdk7D/go-libp2p-host" ) // mpns (a multi-protocol NameSystem) implements generic IPFS naming. @@ -48,23 +45,6 @@ func NewNameSystem(r routing.ValueStore, ds ds.Datastore, cachesize int) NameSys } } -// AddPubsubNameSystem adds the pubsub publisher and resolver to the namesystem -func AddPubsubNameSystem(ctx context.Context, ns NameSystem, host p2phost.Host, r routing.IpfsRouting, ds ds.Datastore, ps *floodsub.PubSub) error { - mpns, ok := ns.(*mpns) - if !ok { - return errors.New("unexpected NameSystem; not an mpns instance") - } - - pkf, ok := r.(routing.PubKeyFetcher) - if !ok { - return errors.New("unexpected IpfsRouting; not a PubKeyFetcher instance") - } - - mpns.resolvers["pubsub"] = NewPubsubResolver(ctx, host, r, pkf, ps) - mpns.publishers["pubsub"] = NewPubsubPublisher(ctx, host, ds, r, ps) - return nil -} - const DefaultResolverCacheTTL = time.Minute // Resolve implements Resolver. @@ -219,16 +199,3 @@ func (ns *mpns) addToDHTCache(key ci.PrivKey, value path.Path, eol time.Time) { eol: eol, }) } - -// GetResolver implements ResolverLookup -func (ns *mpns) GetResolver(subs string) (Resolver, bool) { - res, ok := ns.resolvers[subs] - if ok { - ires, ok := res.(Resolver) - if ok { - return ires, true - } - } - - return nil, false -} diff --git a/namesys/publisher.go b/namesys/publisher.go index d75a4e5cf..2e470d66b 100644 --- a/namesys/publisher.go +++ b/namesys/publisher.go @@ -142,7 +142,7 @@ func PutRecordToRouting(ctx context.Context, k ci.PrivKey, value path.Path, seqn errs := make(chan error, 2) // At most two errors (IPNS, and public key) // Attempt to extract the public key from the ID - extractedPublicKey := id.ExtractPublicKey() + extractedPublicKey, _ := id.ExtractPublicKey() go func() { errs <- PublishEntry(ctx, r, ipnskey, entry) diff --git a/namesys/publisher_test.go b/namesys/publisher_test.go index e4ad1fa69..39a975332 100644 --- a/namesys/publisher_test.go +++ b/namesys/publisher_test.go @@ -49,14 +49,7 @@ func testNamekeyPublisher(t *testing.T, keyType int, expectedErr error, expected } // ID - var id peer.ID - switch keyType { - case ci.Ed25519: - id, err = peer.IDFromEd25519PublicKey(pubKey) - default: - id, err = peer.IDFromPublicKey(pubKey) - } - + id, err := peer.IDFromPublicKey(pubKey) if err != nil { t.Fatal(err) } diff --git a/namesys/pubsub.go b/namesys/pubsub.go deleted file mode 100644 index ba4a73f66..000000000 --- a/namesys/pubsub.go +++ /dev/null @@ -1,426 +0,0 @@ -package namesys - -import ( - "context" - "errors" - "fmt" - "strings" - "sync" - "time" - - opts "github.com/ipfs/go-ipfs/namesys/opts" - pb "github.com/ipfs/go-ipfs/namesys/pb" - path "github.com/ipfs/go-ipfs/path" - - u "gx/ipfs/QmNiJuT8Ja3hMVpBHXv3Q6dwmperaQ6JjLtpMQgMCD7xvx/go-ipfs-util" - record "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record" - dhtpb "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record/pb" - routing "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing" - floodsub "gx/ipfs/QmVKrsEgixRtMWcMd6WQzuwqCUC3jfLf7Q7xcjnKoMMikS/go-libp2p-floodsub" - dshelp "gx/ipfs/QmYJgz1Z5PbBGP7n2XA8uv5sF1EKLfYUjL7kFemVAjMNqC/go-ipfs-ds-help" - proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" - mh "gx/ipfs/QmZyZDi491cCNTLfAhwcaDii2Kg4pwKRkhqQzURGDvY6ua/go-multihash" - peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer" - cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid" - pstore "gx/ipfs/QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh/go-libp2p-peerstore" - ci "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto" - ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore" - dssync "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/sync" - p2phost "gx/ipfs/QmfZTdmunzKzAGJrSvXXQbQ5kLLUiEMX5vdwux7iXkdk7D/go-libp2p-host" -) - -// PubsubPublisher is a publisher that distributes IPNS records through pubsub -type PubsubPublisher struct { - ctx context.Context - ds ds.Datastore - host p2phost.Host - cr routing.ContentRouting - ps *floodsub.PubSub - - mx sync.Mutex - subs map[string]struct{} -} - -// PubsubResolver is a resolver that receives IPNS records through pubsub -type PubsubResolver struct { - ctx context.Context - ds ds.Datastore - host p2phost.Host - cr routing.ContentRouting - pkf routing.PubKeyFetcher - ps *floodsub.PubSub - - mx sync.Mutex - subs map[string]*floodsub.Subscription -} - -// NewPubsubPublisher constructs a new Publisher that publishes IPNS records through pubsub. -// The constructor interface is complicated by the need to bootstrap the pubsub topic. -// This could be greatly simplified if the pubsub implementation handled bootstrap itself -func NewPubsubPublisher(ctx context.Context, host p2phost.Host, ds ds.Datastore, cr routing.ContentRouting, ps *floodsub.PubSub) *PubsubPublisher { - return &PubsubPublisher{ - ctx: ctx, - ds: ds, - host: host, // needed for pubsub bootstrap - cr: cr, // needed for pubsub bootstrap - ps: ps, - subs: make(map[string]struct{}), - } -} - -// NewPubsubResolver constructs a new Resolver that resolves IPNS records through pubsub. -// same as above for pubsub bootstrap dependencies -func NewPubsubResolver(ctx context.Context, host p2phost.Host, cr routing.ContentRouting, pkf routing.PubKeyFetcher, ps *floodsub.PubSub) *PubsubResolver { - return &PubsubResolver{ - ctx: ctx, - ds: dssync.MutexWrap(ds.NewMapDatastore()), - host: host, // needed for pubsub bootstrap - cr: cr, // needed for pubsub bootstrap - pkf: pkf, - ps: ps, - subs: make(map[string]*floodsub.Subscription), - } -} - -// Publish publishes an IPNS record through pubsub with default TTL -func (p *PubsubPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Path) error { - return p.PublishWithEOL(ctx, k, value, time.Now().Add(DefaultRecordTTL)) -} - -// PublishWithEOL publishes an IPNS record through pubsub -func (p *PubsubPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value path.Path, eol time.Time) error { - id, err := peer.IDFromPrivateKey(k) - if err != nil { - return err - } - - _, ipnskey := IpnsKeysForID(id) - - seqno, err := p.getPreviousSeqNo(ctx, ipnskey) - if err != nil { - return err - } - - seqno++ - - return p.publishRecord(ctx, k, value, seqno, eol, ipnskey, id) -} - -func (p *PubsubPublisher) getPreviousSeqNo(ctx context.Context, ipnskey string) (uint64, error) { - // the datastore is shared with the routing publisher to properly increment and persist - // ipns record sequence numbers. - prevrec, err := p.ds.Get(dshelp.NewKeyFromBinary([]byte(ipnskey))) - if err != nil { - if err == ds.ErrNotFound { - // None found, lets start at zero! - return 0, nil - } - return 0, err - } - - prbytes, ok := prevrec.([]byte) - if !ok { - return 0, fmt.Errorf("unexpected type returned from datastore: %#v", prevrec) - } - - var dsrec dhtpb.Record - err = proto.Unmarshal(prbytes, &dsrec) - if err != nil { - return 0, err - } - - var entry pb.IpnsEntry - err = proto.Unmarshal(dsrec.GetValue(), &entry) - if err != nil { - return 0, err - } - - return entry.GetSequence(), nil -} - -func (p *PubsubPublisher) publishRecord(ctx context.Context, k ci.PrivKey, value path.Path, seqno uint64, eol time.Time, ipnskey string, ID peer.ID) error { - entry, err := CreateRoutingEntryData(k, value, seqno, eol) - if err != nil { - return err - } - - data, err := proto.Marshal(entry) - if err != nil { - return err - } - - // the datastore is shared with the routing publisher to properly increment and persist - // ipns record sequence numbers; so we need to Record our new entry in the datastore - dsrec, err := record.MakePutRecord(k, ipnskey, data, true) - if err != nil { - return err - } - - dsdata, err := proto.Marshal(dsrec) - if err != nil { - return err - } - - err = p.ds.Put(dshelp.NewKeyFromBinary([]byte(ipnskey)), dsdata) - if err != nil { - return err - } - - // now we publish, but we also need to bootstrap pubsub for our messages to propagate - topic := "/ipns/" + ID.Pretty() - - p.mx.Lock() - _, ok := p.subs[topic] - - if !ok { - p.subs[topic] = struct{}{} - p.mx.Unlock() - - bootstrapPubsub(p.ctx, p.cr, p.host, topic) - } else { - p.mx.Unlock() - } - - log.Debugf("PubsubPublish: publish IPNS record for %s (%d)", topic, seqno) - return p.ps.Publish(topic, data) -} - -// Resolve resolves a name through pubsub and default depth limit -func (r *PubsubResolver) Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (path.Path, error) { - return resolve(ctx, r, name, opts.ProcessOpts(options), "/ipns/") -} - -func (r *PubsubResolver) resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (path.Path, error) { - log.Debugf("PubsubResolve: resolve '%s'", name) - - // retrieve the public key once (for verifying messages) - xname := strings.TrimPrefix(name, "/ipns/") - hash, err := mh.FromB58String(xname) - if err != nil { - log.Warningf("PubsubResolve: bad input hash: [%s]", xname) - return "", err - } - - id := peer.ID(hash) - if r.host.Peerstore().PrivKey(id) != nil { - return "", errors.New("cannot resolve own name through pubsub") - } - - pubk := id.ExtractPublicKey() - if pubk == nil { - pubk, err = r.pkf.GetPublicKey(ctx, id) - if err != nil { - log.Warningf("PubsubResolve: error fetching public key: %s [%s]", err.Error(), xname) - return "", err - } - } - - // the topic is /ipns/Qmhash - if !strings.HasPrefix(name, "/ipns/") { - name = "/ipns/" + name - } - - r.mx.Lock() - // see if we already have a pubsub subscription; if not, subscribe - sub, ok := r.subs[name] - if !ok { - sub, err = r.ps.Subscribe(name) - if err != nil { - r.mx.Unlock() - return "", err - } - - log.Debugf("PubsubResolve: subscribed to %s", name) - - r.subs[name] = sub - - ctx, cancel := context.WithCancel(r.ctx) - go r.handleSubscription(sub, name, pubk, cancel) - go bootstrapPubsub(ctx, r.cr, r.host, name) - } - r.mx.Unlock() - - // resolve to what we may already have in the datastore - dsval, err := r.ds.Get(dshelp.NewKeyFromBinary([]byte(name))) - if err != nil { - if err == ds.ErrNotFound { - return "", ErrResolveFailed - } - return "", err - } - - data := dsval.([]byte) - entry := new(pb.IpnsEntry) - - err = proto.Unmarshal(data, entry) - if err != nil { - return "", err - } - - // check EOL; if the entry has expired, delete from datastore and return ds.ErrNotFound - eol, ok := checkEOL(entry) - if ok && eol.Before(time.Now()) { - err = r.ds.Delete(dshelp.NewKeyFromBinary([]byte(name))) - if err != nil { - log.Warningf("PubsubResolve: error deleting stale value for %s: %s", name, err.Error()) - } - - return "", ErrResolveFailed - } - - value, err := path.ParsePath(string(entry.GetValue())) - return value, err -} - -// GetSubscriptions retrieves a list of active topic subscriptions -func (r *PubsubResolver) GetSubscriptions() []string { - r.mx.Lock() - defer r.mx.Unlock() - - var res []string - for sub := range r.subs { - res = append(res, sub) - } - - return res -} - -// Cancel cancels a topic subscription; returns true if an active -// subscription was canceled -func (r *PubsubResolver) Cancel(name string) bool { - r.mx.Lock() - defer r.mx.Unlock() - - sub, ok := r.subs[name] - if ok { - sub.Cancel() - delete(r.subs, name) - } - - return ok -} - -func (r *PubsubResolver) handleSubscription(sub *floodsub.Subscription, name string, pubk ci.PubKey, cancel func()) { - defer sub.Cancel() - defer cancel() - - for { - msg, err := sub.Next(r.ctx) - if err != nil { - if err != context.Canceled { - log.Warningf("PubsubResolve: subscription error in %s: %s", name, err.Error()) - } - return - } - - err = r.receive(msg, name, pubk) - if err != nil { - log.Warningf("PubsubResolve: error processing update for %s: %s", name, err.Error()) - } - } -} - -func (r *PubsubResolver) receive(msg *floodsub.Message, name string, pubk ci.PubKey) error { - data := msg.GetData() - if data == nil { - return errors.New("empty message") - } - - entry := new(pb.IpnsEntry) - err := proto.Unmarshal(data, entry) - if err != nil { - return err - } - - ok, err := pubk.Verify(ipnsEntryDataForSig(entry), entry.GetSignature()) - if err != nil || !ok { - return errors.New("signature verification failed") - } - - _, err = path.ParsePath(string(entry.GetValue())) - if err != nil { - return err - } - - eol, ok := checkEOL(entry) - if ok && eol.Before(time.Now()) { - return errors.New("stale update; EOL exceeded") - } - - // check the sequence number against what we may already have in our datastore - oval, err := r.ds.Get(dshelp.NewKeyFromBinary([]byte(name))) - if err == nil { - odata := oval.([]byte) - oentry := new(pb.IpnsEntry) - - err = proto.Unmarshal(odata, oentry) - if err != nil { - return err - } - - if entry.GetSequence() <= oentry.GetSequence() { - return errors.New("stale update; sequence number too small") - } - } - - log.Debugf("PubsubResolve: receive IPNS record for %s", name) - - return r.ds.Put(dshelp.NewKeyFromBinary([]byte(name)), data) -} - -// rendezvous with peers in the name topic through provider records -// Note: rendezvous/boostrap should really be handled by the pubsub implementation itself! -func bootstrapPubsub(ctx context.Context, cr routing.ContentRouting, host p2phost.Host, name string) { - topic := "floodsub:" + name - hash := u.Hash([]byte(topic)) - rz := cid.NewCidV1(cid.Raw, hash) - - err := cr.Provide(ctx, rz, true) - if err != nil { - log.Warningf("bootstrapPubsub: error providing rendezvous for %s: %s", topic, err.Error()) - } - - go func() { - for { - select { - case <-time.After(8 * time.Hour): - err := cr.Provide(ctx, rz, true) - if err != nil { - log.Warningf("bootstrapPubsub: error providing rendezvous for %s: %s", topic, err.Error()) - } - case <-ctx.Done(): - return - } - } - }() - - rzctx, cancel := context.WithTimeout(ctx, time.Second*10) - defer cancel() - - wg := &sync.WaitGroup{} - for pi := range cr.FindProvidersAsync(rzctx, rz, 10) { - if pi.ID == host.ID() { - continue - } - wg.Add(1) - go func(pi pstore.PeerInfo) { - defer wg.Done() - - ctx, cancel := context.WithTimeout(ctx, time.Second*10) - defer cancel() - - err := host.Connect(ctx, pi) - if err != nil { - log.Debugf("Error connecting to pubsub peer %s: %s", pi.ID, err.Error()) - return - } - - // delay to let pubsub perform its handshake - time.Sleep(time.Millisecond * 250) - - log.Debugf("Connected to pubsub peer %s", pi.ID) - }(pi) - } - - wg.Wait() -} diff --git a/namesys/pubsub_test.go b/namesys/pubsub_test.go deleted file mode 100644 index 46e414b9d..000000000 --- a/namesys/pubsub_test.go +++ /dev/null @@ -1,197 +0,0 @@ -package namesys - -import ( - "context" - "sync" - "testing" - "time" - - path "github.com/ipfs/go-ipfs/path" - - mockrouting "gx/ipfs/QmPuPdzoG4b5uyYSQCjLEHB8NM593m3BW19UHX2jZ6Wzfm/go-ipfs-routing/mock" - routing "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing" - testutil "gx/ipfs/QmUJzxQQ2kzwQubsMqBTr1NGDpLfh7pGA2E1oaJULcKDPq/go-testutil" - floodsub "gx/ipfs/QmVKrsEgixRtMWcMd6WQzuwqCUC3jfLf7Q7xcjnKoMMikS/go-libp2p-floodsub" - netutil "gx/ipfs/Qmb6BsZf6Y3kxffXMNTubGPF1w1bkHtpvhfYbmnwP3NQyw/go-libp2p-netutil" - bhost "gx/ipfs/Qmc64U41EEB4nPG7wxjEqFwKJajS2f8kk5q2TvUrQf78Xu/go-libp2p-blankhost" - peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer" - pstore "gx/ipfs/QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh/go-libp2p-peerstore" - ci "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto" - ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore" - p2phost "gx/ipfs/QmfZTdmunzKzAGJrSvXXQbQ5kLLUiEMX5vdwux7iXkdk7D/go-libp2p-host" -) - -func newNetHost(ctx context.Context, t *testing.T) p2phost.Host { - netw := netutil.GenSwarmNetwork(t, ctx) - return bhost.NewBlankHost(netw) -} - -func newNetHosts(ctx context.Context, t *testing.T, n int) []p2phost.Host { - var out []p2phost.Host - - for i := 0; i < n; i++ { - h := newNetHost(ctx, t) - out = append(out, h) - } - - return out -} - -// PubKeyFetcher implementation with a global key store -type mockKeyStore struct { - keys map[peer.ID]ci.PubKey - mx sync.Mutex -} - -func (m *mockKeyStore) addPubKey(id peer.ID, pkey ci.PubKey) { - m.mx.Lock() - defer m.mx.Unlock() - m.keys[id] = pkey -} - -func (m *mockKeyStore) getPubKey(id peer.ID) (ci.PubKey, error) { - m.mx.Lock() - defer m.mx.Unlock() - pkey, ok := m.keys[id] - if ok { - return pkey, nil - } - - return nil, routing.ErrNotFound -} - -func (m *mockKeyStore) GetPublicKey(ctx context.Context, id peer.ID) (ci.PubKey, error) { - return m.getPubKey(id) -} - -func newMockKeyStore() *mockKeyStore { - return &mockKeyStore{ - keys: make(map[peer.ID]ci.PubKey), - } -} - -// ConentRouting mock -func newMockRouting(ms mockrouting.Server, ks *mockKeyStore, host p2phost.Host) routing.ContentRouting { - id := host.ID() - - privk := host.Peerstore().PrivKey(id) - pubk := host.Peerstore().PubKey(id) - pi := host.Peerstore().PeerInfo(id) - - ks.addPubKey(id, pubk) - return ms.Client(testutil.NewIdentity(id, pi.Addrs[0], privk, pubk)) -} - -func newMockRoutingForHosts(ms mockrouting.Server, ks *mockKeyStore, hosts []p2phost.Host) []routing.ContentRouting { - rs := make([]routing.ContentRouting, len(hosts)) - for i := 0; i < len(hosts); i++ { - rs[i] = newMockRouting(ms, ks, hosts[i]) - } - return rs -} - -// tests -func TestPubsubPublishSubscribe(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ms := mockrouting.NewServer() - ks := newMockKeyStore() - - pubhost := newNetHost(ctx, t) - pubmr := newMockRouting(ms, ks, pubhost) - fs, err := floodsub.NewFloodSub(ctx, pubhost) - if err != nil { - t.Fatal(err) - } - pub := NewPubsubPublisher(ctx, pubhost, ds.NewMapDatastore(), pubmr, fs) - privk := pubhost.Peerstore().PrivKey(pubhost.ID()) - pubpinfo := pstore.PeerInfo{ID: pubhost.ID(), Addrs: pubhost.Addrs()} - - name := "/ipns/" + pubhost.ID().Pretty() - - reshosts := newNetHosts(ctx, t, 5) - resmrs := newMockRoutingForHosts(ms, ks, reshosts) - res := make([]*PubsubResolver, len(reshosts)) - for i := 0; i < len(res); i++ { - - fs, err := floodsub.NewFloodSub(ctx, reshosts[i]) - if err != nil { - t.Fatal(err) - } - - res[i] = NewPubsubResolver(ctx, reshosts[i], resmrs[i], ks, fs) - if err := reshosts[i].Connect(ctx, pubpinfo); err != nil { - t.Fatal(err) - } - } - - time.Sleep(time.Millisecond * 100) - for i := 0; i < len(res); i++ { - checkResolveNotFound(ctx, t, i, res[i], name) - // delay to avoid connection storms - time.Sleep(time.Millisecond * 100) - } - - // let the bootstrap finish - time.Sleep(time.Second * 1) - - val := path.Path("/ipfs/QmP1DfoUjiWH2ZBo1PBH6FupdBucbDepx3HpWmEY6JMUpY") - err = pub.Publish(ctx, privk, val) - if err != nil { - t.Fatal(err) - } - - // let the flood propagate - time.Sleep(time.Second * 1) - for i := 0; i < len(res); i++ { - checkResolve(ctx, t, i, res[i], name, val) - } - - val = path.Path("/ipfs/QmP1wMAqk6aZYRZirbaAwmrNeqFRgQrwBt3orUtvSa1UYD") - err = pub.Publish(ctx, privk, val) - if err != nil { - t.Fatal(err) - } - - // let the flood propagate - time.Sleep(time.Second * 1) - for i := 0; i < len(res); i++ { - checkResolve(ctx, t, i, res[i], name, val) - } - - // cancel subscriptions - for i := 0; i < len(res); i++ { - res[i].Cancel(name) - } - time.Sleep(time.Millisecond * 100) - - nval := path.Path("/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr") - err = pub.Publish(ctx, privk, nval) - if err != nil { - t.Fatal(err) - } - - // check we still have the old value in the resolver - time.Sleep(time.Second * 1) - for i := 0; i < len(res); i++ { - checkResolve(ctx, t, i, res[i], name, val) - } -} - -func checkResolveNotFound(ctx context.Context, t *testing.T, i int, resolver Resolver, name string) { - _, err := resolver.Resolve(ctx, name) - if err != ErrResolveFailed { - t.Fatalf("[resolver %d] unexpected error: %s", i, err.Error()) - } -} - -func checkResolve(ctx context.Context, t *testing.T, i int, resolver Resolver, name string, val path.Path) { - xval, err := resolver.Resolve(ctx, name) - if err != nil { - t.Fatalf("[resolver %d] resolve failed: %s", i, err.Error()) - } - if xval != val { - t.Fatalf("[resolver %d] unexpected value: %s %s", i, val, xval) - } -} diff --git a/namesys/routing.go b/namesys/routing.go index 5831e3ea6..73670145e 100644 --- a/namesys/routing.go +++ b/namesys/routing.go @@ -17,6 +17,7 @@ import ( mh "gx/ipfs/QmZyZDi491cCNTLfAhwcaDii2Kg4pwKRkhqQzURGDvY6ua/go-multihash" peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer" cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid" + dht "gx/ipfs/Qmd3jqhBQFvhfBNTSJMQL15GgyVMpdxKTta69Napvx6Myd/go-libp2p-kad-dht" ) var log = logging.Logger("namesys") @@ -133,28 +134,28 @@ func (r *routingResolver) resolveOnce(ctx context.Context, name string, options return "", err } + pid, err := peer.IDFromBytes(hash) + if err != nil { + log.Debugf("RoutingResolver: could not convert public key hash %s to peer ID: %s\n", name, err) + return "", err + } + // Name should be the hash of a public key retrievable from ipfs. // We retrieve the public key here to make certain that it's in the peer // store before calling GetValue() on the DHT - the DHT will call the // ipns validator, which in turn will get the public key from the peer // store to verify the record signature - _, err = routing.GetPublicKey(r.routing, ctx, hash) + _, err = routing.GetPublicKey(r.routing, ctx, pid) if err != nil { log.Debugf("RoutingResolver: could not retrieve public key %s: %s\n", name, err) return "", err } - pid, err := peer.IDFromBytes(hash) - if err != nil { - log.Debugf("RoutingResolver: could not convert public key hash %s to peer ID: %s\n", name, err) - return "", err - } - // Use the routing system to get the name. // Note that the DHT will call the ipns validator when retrieving // the value, which in turn verifies the ipns record signature _, ipnsKey := IpnsKeysForID(pid) - val, err := r.getValue(ctx, ipnsKey, options) + val, err := r.routing.GetValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount))) if err != nil { log.Debugf("RoutingResolver: dht get for name %s failed: %s", name, err) return "", err @@ -187,39 +188,6 @@ func (r *routingResolver) resolveOnce(ctx context.Context, name string, options } } -func (r *routingResolver) getValue(ctx context.Context, ipnsKey string, options *opts.ResolveOpts) ([]byte, error) { - // Get specified number of values from the DHT - vals, err := r.routing.GetValues(ctx, ipnsKey, int(options.DhtRecordCount)) - if err != nil { - return nil, err - } - - // Select the best value - recs := make([][]byte, 0, len(vals)) - for _, v := range vals { - if v.Val != nil { - recs = append(recs, v.Val) - } - } - - if len(recs) == 0 { - return nil, routing.ErrNotFound - } - - i, err := IpnsSelectorFunc(ipnsKey, recs) - if err != nil { - return nil, err - } - - best := recs[i] - if best == nil { - log.Errorf("GetValues %s yielded record with nil value", ipnsKey) - return nil, routing.ErrNotFound - } - - return best, nil -} - func checkEOL(e *pb.IpnsEntry) (time.Time, bool) { if e.GetValidityType() == pb.IpnsEntry_EOL { eol, err := u.ParseRFC3339(string(e.GetValidity())) diff --git a/namesys/selector.go b/namesys/selector.go deleted file mode 100644 index aebfb1533..000000000 --- a/namesys/selector.go +++ /dev/null @@ -1,63 +0,0 @@ -package namesys - -import ( - "bytes" - "errors" - - pb "github.com/ipfs/go-ipfs/namesys/pb" - - u "gx/ipfs/QmNiJuT8Ja3hMVpBHXv3Q6dwmperaQ6JjLtpMQgMCD7xvx/go-ipfs-util" - proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" -) - -// IpnsSelectorFunc selects the best record by checking which has the highest -// sequence number and latest EOL -func IpnsSelectorFunc(k string, vals [][]byte) (int, error) { - var recs []*pb.IpnsEntry - for _, v := range vals { - e := new(pb.IpnsEntry) - err := proto.Unmarshal(v, e) - if err == nil { - recs = append(recs, e) - } else { - recs = append(recs, nil) - } - } - - return selectRecord(recs, vals) -} - -func selectRecord(recs []*pb.IpnsEntry, vals [][]byte) (int, error) { - var bestSeq uint64 - besti := -1 - - for i, r := range recs { - if r == nil || r.GetSequence() < bestSeq { - continue - } - rt, err := u.ParseRFC3339(string(r.GetValidity())) - if err != nil { - log.Errorf("failed to parse ipns record EOL %s", r.GetValidity()) - continue - } - - if besti == -1 || r.GetSequence() > bestSeq { - bestSeq = r.GetSequence() - besti = i - } else if r.GetSequence() == bestSeq { - bestt, _ := u.ParseRFC3339(string(recs[besti].GetValidity())) - if rt.After(bestt) { - besti = i - } else if rt == bestt { - if bytes.Compare(vals[i], vals[besti]) > 0 { - besti = i - } - } - } - } - if besti == -1 { - return 0, errors.New("no usable records in given set") - } - - return besti, nil -} diff --git a/namesys/validator.go b/namesys/validator.go index cde4e92ed..941d6a667 100644 --- a/namesys/validator.go +++ b/namesys/validator.go @@ -1,6 +1,7 @@ package namesys import ( + "bytes" "errors" "time" @@ -41,64 +42,106 @@ var ErrKeyFormat = errors.New("record key could not be parsed into peer ID") // from the peer store var ErrPublicKeyNotFound = errors.New("public key not found in peer store") -// NewIpnsRecordValidator returns a ValidChecker for IPNS records. -// The validator function will get a public key from the KeyBook -// to verify the record's signature. Note that the public key must -// already have been fetched from the network and put into the KeyBook -// by the caller. -func NewIpnsRecordValidator(kbook pstore.KeyBook) *record.ValidChecker { - // ValidateIpnsRecord implements ValidatorFunc and verifies that the - // given record's value is an IpnsEntry, that the entry has been correctly - // signed, and that the entry has not expired - ValidateIpnsRecord := func(r *record.ValidationRecord) error { - if r.Namespace != "ipns" { - return ErrInvalidPath - } - - // Parse the value into an IpnsEntry - entry := new(pb.IpnsEntry) - err := proto.Unmarshal(r.Value, entry) - if err != nil { - return ErrBadRecord - } - - // Get the public key defined by the ipns path - pid, err := peer.IDFromString(r.Key) - if err != nil { - log.Debugf("failed to parse ipns record key %s into peer ID", r.Key) - return ErrKeyFormat - } - pubk := kbook.PubKey(pid) - if pubk == nil { - log.Debugf("public key with hash %s not found in peer store", pid) - return ErrPublicKeyNotFound - } - - // Check the ipns record signature with the public key - if ok, err := pubk.Verify(ipnsEntryDataForSig(entry), entry.GetSignature()); err != nil || !ok { - log.Debugf("failed to verify signature for ipns record %s", r.Key) - return ErrSignature - } - - // Check that record has not expired - switch entry.GetValidityType() { - case pb.IpnsEntry_EOL: - t, err := u.ParseRFC3339(string(entry.GetValidity())) - if err != nil { - log.Debugf("failed parsing time for ipns record EOL in record %s", r.Key) - return err - } - if time.Now().After(t) { - return ErrExpiredRecord - } - default: - return ErrUnrecognizedValidity - } - return nil - } - - return &record.ValidChecker{ - Func: ValidateIpnsRecord, - Sign: false, - } +type IpnsValidator struct { + KeyBook pstore.KeyBook +} + +func (v IpnsValidator) Validate(key string, value []byte) error { + ns, pidString, err := record.SplitKey(key) + if err != nil || ns != "ipns" { + return ErrInvalidPath + } + + // Parse the value into an IpnsEntry + entry := new(pb.IpnsEntry) + err = proto.Unmarshal(value, entry) + if err != nil { + return ErrBadRecord + } + + // Get the public key defined by the ipns path + pid, err := peer.IDFromString(pidString) + if err != nil { + log.Debugf("failed to parse ipns record key %s into peer ID", pidString) + return ErrKeyFormat + } + pubk := v.KeyBook.PubKey(pid) + if pubk == nil { + log.Debugf("public key with hash %s not found in peer store", pid) + return ErrPublicKeyNotFound + } + + // Check the ipns record signature with the public key + if ok, err := pubk.Verify(ipnsEntryDataForSig(entry), entry.GetSignature()); err != nil || !ok { + log.Debugf("failed to verify signature for ipns record %s", pidString) + return ErrSignature + } + + // Check that record has not expired + switch entry.GetValidityType() { + case pb.IpnsEntry_EOL: + t, err := u.ParseRFC3339(string(entry.GetValidity())) + if err != nil { + log.Debugf("failed parsing time for ipns record EOL in record %s", pidString) + return err + } + if time.Now().After(t) { + return ErrExpiredRecord + } + default: + return ErrUnrecognizedValidity + } + return nil +} + +// IpnsSelectorFunc selects the best record by checking which has the highest +// sequence number and latest EOL +func (v IpnsValidator) Select(k string, vals [][]byte) (int, error) { + var recs []*pb.IpnsEntry + for _, v := range vals { + e := new(pb.IpnsEntry) + err := proto.Unmarshal(v, e) + if err == nil { + recs = append(recs, e) + } else { + recs = append(recs, nil) + } + } + + return selectRecord(recs, vals) +} + +func selectRecord(recs []*pb.IpnsEntry, vals [][]byte) (int, error) { + var bestSeq uint64 + besti := -1 + + for i, r := range recs { + if r == nil || r.GetSequence() < bestSeq { + continue + } + rt, err := u.ParseRFC3339(string(r.GetValidity())) + if err != nil { + log.Errorf("failed to parse ipns record EOL %s", r.GetValidity()) + continue + } + + if besti == -1 || r.GetSequence() > bestSeq { + bestSeq = r.GetSequence() + besti = i + } else if r.GetSequence() == bestSeq { + bestt, _ := u.ParseRFC3339(string(recs[besti].GetValidity())) + if rt.After(bestt) { + besti = i + } else if rt == bestt { + if bytes.Compare(vals[i], vals[besti]) > 0 { + besti = i + } + } + } + } + if besti == -1 { + return 0, errors.New("no usable records in given set") + } + + return besti, nil } diff --git a/package.json b/package.json index 9a9ad21cd..0beeef1c7 100644 --- a/package.json +++ b/package.json @@ -599,6 +599,18 @@ "hash": "QmTbBs3Y3u5F69XNJzdnnc6SP5GKgcXxCDzx6w8m6piVRT", "name": "go-bitfield", "version": "0.1.1" + }, + { + "author": "stebalien", + "hash": "QmdSX2uedxXdsfNSqbkfSxcYi7pXBBdvp1Km2ZsjPWfydt", + "name": "go-libp2p-pubsub-router", + "version": "0.2.1" + }, + { + "author": "Stebalien", + "hash": "QmeoG1seQ8a9b2vS3XJ8HPz9tXr6dzpS5NizjuDDSntQEk", + "name": "go-libp2p-routing-helpers", + "version": "0.1.0" } ], "gxVersion": "0.10.0",