From 60708ea60ed7ccb258c73f8a95818390fcda74d6 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 9 May 2018 09:21:55 +0100 Subject: [PATCH] store IPNS records *outside* of the DHT fixes #4749 License: MIT Signed-off-by: Steven Allen --- core/core.go | 2 +- namesys/namesys.go | 111 ++++---------- namesys/namesys_test.go | 4 +- namesys/publisher.go | 232 +++++++++++++++++++++--------- namesys/publisher_test.go | 7 +- namesys/republisher/repub.go | 52 +++---- namesys/republisher/repub_test.go | 2 +- namesys/resolve_test.go | 13 +- 8 files changed, 236 insertions(+), 187 deletions(-) diff --git a/core/core.go b/core/core.go index 28ae84ba0..e7a423c68 100644 --- a/core/core.go +++ b/core/core.go @@ -513,7 +513,7 @@ func (n *IpfsNode) setupIpnsRepublisher() error { return err } - n.IpnsRepub = ipnsrp.NewRepublisher(n.Routing, n.Repo.Datastore(), n.PrivateKey, n.Repo.Keystore()) + n.IpnsRepub = ipnsrp.NewRepublisher(n.Namesys, n.Repo.Datastore(), n.PrivateKey, n.Repo.Keystore()) if cfg.Ipns.RepublishPeriod != "" { d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod) diff --git a/namesys/namesys.go b/namesys/namesys.go index c1f3f7c6d..afd264ac1 100644 --- a/namesys/namesys.go +++ b/namesys/namesys.go @@ -3,7 +3,6 @@ package namesys import ( "context" "strings" - "sync" "time" opts "github.com/ipfs/go-ipfs/namesys/opts" @@ -37,10 +36,10 @@ func NewNameSystem(r routing.ValueStore, ds ds.Datastore, cachesize int) NameSys resolvers: map[string]resolver{ "dns": NewDNSResolver(), "proquint": new(ProquintResolver), - "dht": NewRoutingResolver(r, cachesize), + "ipns": NewRoutingResolver(r, cachesize), }, publishers: map[string]Publisher{ - "dht": NewRoutingPublisher(r, ds), + "ipns": NewRoutingPublisher(r, ds), }, } } @@ -71,66 +70,32 @@ func (ns *mpns) resolveOnce(ctx context.Context, name string, options *opts.Reso return "", ErrResolveFailed } - makePath := func(p path.Path) (path.Path, error) { - if len(segments) > 3 { - return path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3]) - } else { - return p, nil - } - } - // Resolver selection: - // 1. if it is a multihash resolve through "pubsub" (if available), - // with fallback to "dht" + // 1. if it is a multihash resolve through "ipns". // 2. if it is a domain name, resolve through "dns" // 3. otherwise resolve through the "proquint" resolver key := segments[2] + resName := "proquint" + if _, err := mh.FromB58String(key); err == nil { + resName = "ipns" + } else if isd.IsDomain(key) { + resName = "dns" + } - _, err := mh.FromB58String(key) - if err == nil { - res, ok := ns.resolvers["pubsub"] - if ok { - p, err := res.resolveOnce(ctx, key, options) - if err == nil { - return makePath(p) - } - } - - res, ok = ns.resolvers["dht"] - if ok { - p, err := res.resolveOnce(ctx, key, options) - if err == nil { - return makePath(p) - } - } - + res, ok := ns.resolvers[resName] + if !ok { + log.Debugf("no resolver found for %s", name) + return "", ErrResolveFailed + } + p, err := res.resolveOnce(ctx, key, options) + if err != nil { return "", ErrResolveFailed } - if isd.IsDomain(key) { - res, ok := ns.resolvers["dns"] - if ok { - p, err := res.resolveOnce(ctx, key, options) - if err == nil { - return makePath(p) - } - } - - return "", ErrResolveFailed + if len(segments) > 3 { + return path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3]) } - - res, ok := ns.resolvers["proquint"] - if ok { - p, err := res.resolveOnce(ctx, key, options) - if err == nil { - return makePath(p) - } - - return "", ErrResolveFailed - } - - log.Debugf("no resolver found for %s", name) - return "", ErrResolveFailed + return p, nil } // Publish implements Publisher @@ -139,39 +104,23 @@ func (ns *mpns) Publish(ctx context.Context, name ci.PrivKey, value path.Path) e } func (ns *mpns) PublishWithEOL(ctx context.Context, name ci.PrivKey, value path.Path, eol time.Time) error { - var dhtErr error - - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - dhtErr = ns.publishers["dht"].PublishWithEOL(ctx, name, value, eol) - if dhtErr == nil { - ns.addToDHTCache(name, value, eol) - } - wg.Done() - }() - - pub, ok := ns.publishers["pubsub"] - if ok { - wg.Add(1) - go func() { - err := pub.PublishWithEOL(ctx, name, value, eol) - if err != nil { - log.Warningf("error publishing %s with pubsub: %s", name, err.Error()) - } - wg.Done() - }() + pub, ok := ns.publishers["ipns"] + if !ok { + return ErrPublishFailed } + if err := pub.PublishWithEOL(ctx, name, value, eol); err != nil { + return err + } + ns.addToIpnsCache(name, value, eol) + return nil - wg.Wait() - return dhtErr } -func (ns *mpns) addToDHTCache(key ci.PrivKey, value path.Path, eol time.Time) { - rr, ok := ns.resolvers["dht"].(*routingResolver) +func (ns *mpns) addToIpnsCache(key ci.PrivKey, value path.Path, eol time.Time) { + rr, ok := ns.resolvers["ipns"].(*routingResolver) if !ok { // should never happen, purely for sanity - log.Panicf("unexpected type %T as DHT resolver.", ns.resolvers["dht"]) + log.Panicf("unexpected type %T as DHT resolver.", ns.resolvers["ipns"]) } if rr.cache == nil { // resolver has no caching diff --git a/namesys/namesys_test.go b/namesys/namesys_test.go index 03ba60ed0..766217296 100644 --- a/namesys/namesys_test.go +++ b/namesys/namesys_test.go @@ -59,8 +59,8 @@ func mockResolverTwo() *mockResolver { func TestNamesysResolution(t *testing.T) { r := &mpns{ resolvers: map[string]resolver{ - "dht": mockResolverOne(), - "dns": mockResolverTwo(), + "ipns": mockResolverOne(), + "dns": mockResolverTwo(), }, } diff --git a/namesys/publisher.go b/namesys/publisher.go index 2e470d66b..8c376d1af 100644 --- a/namesys/publisher.go +++ b/namesys/publisher.go @@ -4,6 +4,8 @@ import ( "bytes" "context" "fmt" + "strings" + "sync" "time" pb "github.com/ipfs/go-ipfs/namesys/pb" @@ -12,15 +14,17 @@ import ( ft "github.com/ipfs/go-ipfs/unixfs" u "gx/ipfs/QmNiJuT8Ja3hMVpBHXv3Q6dwmperaQ6JjLtpMQgMCD7xvx/go-ipfs-util" - dhtpb "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record/pb" routing "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing" - dshelp "gx/ipfs/QmYJgz1Z5PbBGP7n2XA8uv5sF1EKLfYUjL7kFemVAjMNqC/go-ipfs-ds-help" proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer" ci "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto" ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore" + dsquery "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/query" + base32 "gx/ipfs/QmfVj3x4D6Jkq9SEoi5n2NmoUomLwoeiwnYz2KQa15wRw6/base32" ) +const ipnsPrefix = "/ipns/" + const PublishPutValTimeout = time.Minute const DefaultRecordTTL = 24 * time.Hour @@ -29,6 +33,9 @@ const DefaultRecordTTL = 24 * time.Hour type ipnsPublisher struct { routing routing.ValueStore ds ds.Datastore + + // Used to ensure we assign IPNS records *sequential* sequence numbers. + mu sync.Mutex } // NewRoutingPublisher constructs a publisher for the IPFS Routing name system. @@ -46,69 +53,157 @@ func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Pa return p.PublishWithEOL(ctx, k, value, time.Now().Add(DefaultRecordTTL)) } +func IpnsDsKey(id peer.ID) ds.Key { + return ds.NewKey("/ipns/" + base32.RawStdEncoding.EncodeToString([]byte(id))) +} + +// PublishedNames returns the latest IPNS records published by this node and +// their expiration times. +// +// This method will not search the routing system for records published by other +// nodes. +func (p *ipnsPublisher) ListPublished(ctx context.Context) (map[peer.ID]*pb.IpnsEntry, error) { + query, err := p.ds.Query(dsquery.Query{ + Prefix: ipnsPrefix, + }) + if err != nil { + return nil, err + } + defer query.Close() + + records := make(map[peer.ID]*pb.IpnsEntry) + for { + select { + case result, ok := <-query.Next(): + if !ok { + return records, nil + } + if result.Error != nil { + return nil, result.Error + } + value, ok := result.Value.([]byte) + if !ok { + log.Error("found ipns record that we couldn't convert to a value") + continue + } + e := new(pb.IpnsEntry) + if err := proto.Unmarshal(value, e); err != nil { + // Might as well return what we can. + log.Error("found an invalid IPNS entry:", err) + continue + } + if !strings.HasPrefix(result.Key, ipnsPrefix) { + log.Errorf("datastore query for keys with prefix %s returned a key: %s", ipnsPrefix, result.Key) + continue + } + k := result.Key[len(ipnsPrefix):] + pid, err := base32.RawStdEncoding.DecodeString(k) + if err != nil { + log.Errorf("ipns ds key invalid: %s", result.Key) + continue + } + records[peer.ID(pid)] = e + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} + +// GetPublished returns the record this node has published corresponding to the +// given peer ID. +// +// If `checkRouting` is true and we have no existing record, this method will +// check the routing system for any existing records. +func (p *ipnsPublisher) GetPublished(ctx context.Context, id peer.ID, checkRouting bool) (*pb.IpnsEntry, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + + dsVal, err := p.ds.Get(IpnsDsKey(id)) + var value []byte + switch err { + case nil: + var ok bool + value, ok = dsVal.([]byte) + if !ok { + return nil, fmt.Errorf("found ipns record that we couldn't convert to a value") + } + case ds.ErrNotFound: + if !checkRouting { + return nil, nil + } + _, ipnskey := IpnsKeysForID(id) + value, err = p.routing.GetValue(ctx, ipnskey) + if err != nil { + // Not found or other network issue. Can't really do + // anything about this case. + return nil, nil + } + default: + return nil, err + } + e := new(pb.IpnsEntry) + if err := proto.Unmarshal(value, e); err != nil { + return nil, err + } + return e, nil +} + +func (p *ipnsPublisher) updateRecord(ctx context.Context, k ci.PrivKey, value path.Path, eol time.Time) (*pb.IpnsEntry, error) { + id, err := peer.IDFromPrivateKey(k) + if err != nil { + return nil, err + } + + p.mu.Lock() + defer p.mu.Unlock() + + // get previous records sequence number + rec, err := p.GetPublished(ctx, id, true) + if err != nil { + return nil, err + } + + seqno := rec.GetSequence() // returns 0 if rec is nil + if rec != nil && value != path.Path(rec.GetValue()) { + // Don't bother incrementing the sequence number unless the + // value changes. + seqno++ + } + + // Create record + entry, err := CreateRoutingEntryData(k, value, seqno, eol) + if err != nil { + return nil, err + } + + // Set the TTL + // TODO: Make this less hacky. + ttl, ok := checkCtxTTL(ctx) + if ok { + entry.Ttl = proto.Uint64(uint64(ttl.Nanoseconds())) + } + + data, err := proto.Marshal(entry) + if err != nil { + return nil, err + } + + // Put the new record. + if err := p.ds.Put(IpnsDsKey(id), data); err != nil { + return nil, err + } + return entry, nil +} + // PublishWithEOL is a temporary stand in for the ipns records implementation // see here for more details: https://github.com/ipfs/specs/tree/master/records func (p *ipnsPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value path.Path, eol time.Time) error { - - id, err := peer.IDFromPrivateKey(k) + record, err := p.updateRecord(ctx, k, value, eol) if err != nil { return err } - _, ipnskey := IpnsKeysForID(id) - - // get previous records sequence number - seqnum, err := p.getPreviousSeqNo(ctx, ipnskey) - if err != nil { - return err - } - - // increment it - seqnum++ - - return PutRecordToRouting(ctx, k, value, seqnum, eol, p.routing, id) -} - -func (p *ipnsPublisher) getPreviousSeqNo(ctx context.Context, ipnskey string) (uint64, error) { - prevrec, err := p.ds.Get(dshelp.NewKeyFromBinary([]byte(ipnskey))) - if err != nil && err != ds.ErrNotFound { - // None found, lets start at zero! - return 0, err - } - var val []byte - if err == nil { - prbytes, ok := prevrec.([]byte) - if !ok { - return 0, fmt.Errorf("unexpected type returned from datastore: %#v", prevrec) - } - dhtrec := new(dhtpb.Record) - err := proto.Unmarshal(prbytes, dhtrec) - if err != nil { - return 0, err - } - - val = dhtrec.GetValue() - } else { - // try and check the dht for a record - ctx, cancel := context.WithTimeout(ctx, time.Second*30) - defer cancel() - - rv, err := p.routing.GetValue(ctx, ipnskey) - if err != nil { - // no such record found, start at zero! - return 0, nil - } - - val = rv - } - - e := new(pb.IpnsEntry) - err = proto.Unmarshal(val, e) - if err != nil { - return 0, err - } - - return e.GetSequence(), nil + return PutRecordToRouting(ctx, p.routing, k.GetPublic(), record) } // setting the TTL on published records is an experimental feature. @@ -124,25 +219,24 @@ func checkCtxTTL(ctx context.Context) (time.Duration, bool) { return d, ok } -func PutRecordToRouting(ctx context.Context, k ci.PrivKey, value path.Path, seqnum uint64, eol time.Time, r routing.ValueStore, id peer.ID) error { +func PutRecordToRouting(ctx context.Context, r routing.ValueStore, k ci.PubKey, entry *pb.IpnsEntry) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - namekey, ipnskey := IpnsKeysForID(id) - entry, err := CreateRoutingEntryData(k, value, seqnum, eol) + errs := make(chan error, 2) // At most two errors (IPNS, and public key) + + id, err := peer.IDFromPublicKey(k) if err != nil { return err } - ttl, ok := checkCtxTTL(ctx) - if ok { - entry.Ttl = proto.Uint64(uint64(ttl.Nanoseconds())) + // Attempt to extract the public key from the ID + extractedPublicKey, err := id.ExtractPublicKey() + if err != nil { + return err } - errs := make(chan error, 2) // At most two errors (IPNS, and public key) - - // Attempt to extract the public key from the ID - extractedPublicKey, _ := id.ExtractPublicKey() + namekey, ipnskey := IpnsKeysForID(id) go func() { errs <- PublishEntry(ctx, r, ipnskey, entry) @@ -151,7 +245,7 @@ func PutRecordToRouting(ctx context.Context, k ci.PrivKey, value path.Path, seqn // Publish the public key if a public key cannot be extracted from the ID if extractedPublicKey == nil { go func() { - errs <- PublishPublicKey(ctx, r, namekey, k.GetPublic()) + errs <- PublishPublicKey(ctx, r, namekey, k) }() if err := waitOnErrChan(ctx, errs); err != nil { diff --git a/namesys/publisher_test.go b/namesys/publisher_test.go index 39a975332..8f544d0c1 100644 --- a/namesys/publisher_test.go +++ b/namesys/publisher_test.go @@ -75,7 +75,12 @@ func testNamekeyPublisher(t *testing.T, keyType int, expectedErr error, expected serv := mockrouting.NewServer() r := serv.ClientWithDatastore(context.Background(), &identity{p}, dstore) - err = PutRecordToRouting(ctx, privKey, value, seqnum, eol, r, id) + entry, err := CreateRoutingEntryData(privKey, value, seqnum, eol) + if err != nil { + t.Fatal(err) + } + + err = PutRecordToRouting(ctx, r, pubKey, entry) if err != nil { t.Fatal(err) } diff --git a/namesys/republisher/repub.go b/namesys/republisher/repub.go index db7ae590e..aa1f85647 100644 --- a/namesys/republisher/repub.go +++ b/namesys/republisher/repub.go @@ -13,9 +13,6 @@ import ( goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" gpctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" logging "gx/ipfs/QmTG23dvpBCBjqQwyDxV8CQT6jmS4PSftNr1VqHhE3MLy7/go-log" - recpb "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record/pb" - routing "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing" - dshelp "gx/ipfs/QmYJgz1Z5PbBGP7n2XA8uv5sF1EKLfYUjL7kFemVAjMNqC/go-ipfs-ds-help" proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer" ic "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto" @@ -39,7 +36,7 @@ var FailureRetryInterval = time.Minute * 5 const DefaultRecordLifetime = time.Hour * 24 type Republisher struct { - r routing.ValueStore + ns namesys.Publisher ds ds.Datastore self ic.PrivKey ks keystore.Keystore @@ -51,9 +48,9 @@ type Republisher struct { } // NewRepublisher creates a new Republisher -func NewRepublisher(r routing.ValueStore, ds ds.Datastore, self ic.PrivKey, ks keystore.Keystore) *Republisher { +func NewRepublisher(ns namesys.Publisher, ds ds.Datastore, self ic.PrivKey, ks keystore.Keystore) *Republisher { return &Republisher{ - r: r, + ns: ns, ds: ds, self: self, ks: ks, @@ -90,6 +87,10 @@ func (rp *Republisher) republishEntries(p goprocess.Process) error { ctx, cancel := context.WithCancel(gpctx.OnClosingContext(p)) defer cancel() + // TODO: Use rp.ipns.ListPublished(). We can't currently *do* that + // because: + // 1. There's no way to get keys from the keystore by ID. + // 2. We don't actually have access to the IPNS publisher. err := rp.republishEntry(ctx, rp.self) if err != nil { return err @@ -125,8 +126,7 @@ func (rp *Republisher) republishEntry(ctx context.Context, priv ic.PrivKey) erro log.Debugf("republishing ipns entry for %s", id) // Look for it locally only - _, ipnskey := namesys.IpnsKeysForID(id) - p, seq, err := rp.getLastVal(ipnskey) + p, err := rp.getLastVal(id) if err != nil { if err == errNoEntry { return nil @@ -136,33 +136,25 @@ func (rp *Republisher) republishEntry(ctx context.Context, priv ic.PrivKey) erro // update record with same sequence number eol := time.Now().Add(rp.RecordLifetime) - err = namesys.PutRecordToRouting(ctx, priv, p, seq, eol, rp.r, id) - if err != nil { - return err - } - - return nil + return rp.ns.PublishWithEOL(ctx, priv, p, eol) } -func (rp *Republisher) getLastVal(k string) (path.Path, uint64, error) { - ival, err := rp.ds.Get(dshelp.NewKeyFromBinary([]byte(k))) - if err != nil { - // not found means we dont have a previously published entry - return "", 0, errNoEntry +func (rp *Republisher) getLastVal(id peer.ID) (path.Path, error) { + // Look for it locally only + vali, err := rp.ds.Get(namesys.IpnsDsKey(id)) + switch err { + case nil: + case ds.ErrNotFound: + return "", errNoEntry + default: + return "", err } - val := ival.([]byte) - dhtrec := new(recpb.Record) - err = proto.Unmarshal(val, dhtrec) - if err != nil { - return "", 0, err - } + val := vali.([]byte) - // extract published data from record e := new(pb.IpnsEntry) - err = proto.Unmarshal(dhtrec.GetValue(), e) - if err != nil { - return "", 0, err + if err := proto.Unmarshal(val, e); err != nil { + return "", err } - return path.Path(e.Value), e.GetSequence(), nil + return path.Path(e.Value), nil } diff --git a/namesys/republisher/repub_test.go b/namesys/republisher/repub_test.go index f47df4696..8a9ab366f 100644 --- a/namesys/republisher/repub_test.go +++ b/namesys/republisher/repub_test.go @@ -78,7 +78,7 @@ func TestRepublish(t *testing.T) { // The republishers that are contained within the nodes have their timeout set // to 12 hours. Instead of trying to tweak those, we're just going to pretend // they dont exist and make our own. - repub := NewRepublisher(publisher.Routing, publisher.Repo.Datastore(), publisher.PrivateKey, publisher.Repo.Keystore()) + repub := NewRepublisher(rp, publisher.Repo.Datastore(), publisher.PrivateKey, publisher.Repo.Keystore()) repub.Interval = time.Second repub.RecordLifetime = time.Second * 5 diff --git a/namesys/resolve_test.go b/namesys/resolve_test.go index 39a670088..206329667 100644 --- a/namesys/resolve_test.go +++ b/namesys/resolve_test.go @@ -70,7 +70,12 @@ func TestPrexistingExpiredRecord(t *testing.T) { // Make an expired record and put it in the datastore h := path.FromString("/ipfs/QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN") eol := time.Now().Add(time.Hour * -1) - err = PutRecordToRouting(context.Background(), privk, h, 0, eol, d, id) + + entry, err := CreateRoutingEntryData(privk, h, 0, eol) + if err != nil { + t.Fatal(err) + } + err = PutRecordToRouting(context.Background(), d, pubk, entry) if err != nil { t.Fatal(err) } @@ -107,7 +112,11 @@ func TestPrexistingRecord(t *testing.T) { // Make a good record and put it in the datastore h := path.FromString("/ipfs/QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN") eol := time.Now().Add(time.Hour) - err = PutRecordToRouting(context.Background(), privk, h, 0, eol, d, id) + entry, err := CreateRoutingEntryData(privk, h, 0, eol) + if err != nil { + t.Fatal(err) + } + err = PutRecordToRouting(context.Background(), d, pubk, entry) if err != nil { t.Fatal(err) }