store IPNS records *outside* of the DHT

fixes #4749

License: MIT
Signed-off-by: Steven Allen <steven@stebalien.com>
This commit is contained in:
Steven Allen 2018-05-09 09:21:55 +01:00
parent 55cb76d4f3
commit 60708ea60e
8 changed files with 236 additions and 187 deletions

View File

@ -513,7 +513,7 @@ func (n *IpfsNode) setupIpnsRepublisher() error {
return err
}
n.IpnsRepub = ipnsrp.NewRepublisher(n.Routing, n.Repo.Datastore(), n.PrivateKey, n.Repo.Keystore())
n.IpnsRepub = ipnsrp.NewRepublisher(n.Namesys, n.Repo.Datastore(), n.PrivateKey, n.Repo.Keystore())
if cfg.Ipns.RepublishPeriod != "" {
d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)

View File

@ -3,7 +3,6 @@ package namesys
import (
"context"
"strings"
"sync"
"time"
opts "github.com/ipfs/go-ipfs/namesys/opts"
@ -37,10 +36,10 @@ func NewNameSystem(r routing.ValueStore, ds ds.Datastore, cachesize int) NameSys
resolvers: map[string]resolver{
"dns": NewDNSResolver(),
"proquint": new(ProquintResolver),
"dht": NewRoutingResolver(r, cachesize),
"ipns": NewRoutingResolver(r, cachesize),
},
publishers: map[string]Publisher{
"dht": NewRoutingPublisher(r, ds),
"ipns": NewRoutingPublisher(r, ds),
},
}
}
@ -71,66 +70,32 @@ func (ns *mpns) resolveOnce(ctx context.Context, name string, options *opts.Reso
return "", ErrResolveFailed
}
makePath := func(p path.Path) (path.Path, error) {
if len(segments) > 3 {
return path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3])
} else {
return p, nil
}
}
// Resolver selection:
// 1. if it is a multihash resolve through "pubsub" (if available),
// with fallback to "dht"
// 1. if it is a multihash resolve through "ipns".
// 2. if it is a domain name, resolve through "dns"
// 3. otherwise resolve through the "proquint" resolver
key := segments[2]
resName := "proquint"
if _, err := mh.FromB58String(key); err == nil {
resName = "ipns"
} else if isd.IsDomain(key) {
resName = "dns"
}
_, err := mh.FromB58String(key)
if err == nil {
res, ok := ns.resolvers["pubsub"]
if ok {
p, err := res.resolveOnce(ctx, key, options)
if err == nil {
return makePath(p)
}
}
res, ok = ns.resolvers["dht"]
if ok {
p, err := res.resolveOnce(ctx, key, options)
if err == nil {
return makePath(p)
}
}
res, ok := ns.resolvers[resName]
if !ok {
log.Debugf("no resolver found for %s", name)
return "", ErrResolveFailed
}
p, err := res.resolveOnce(ctx, key, options)
if err != nil {
return "", ErrResolveFailed
}
if isd.IsDomain(key) {
res, ok := ns.resolvers["dns"]
if ok {
p, err := res.resolveOnce(ctx, key, options)
if err == nil {
return makePath(p)
}
}
return "", ErrResolveFailed
if len(segments) > 3 {
return path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3])
}
res, ok := ns.resolvers["proquint"]
if ok {
p, err := res.resolveOnce(ctx, key, options)
if err == nil {
return makePath(p)
}
return "", ErrResolveFailed
}
log.Debugf("no resolver found for %s", name)
return "", ErrResolveFailed
return p, nil
}
// Publish implements Publisher
@ -139,39 +104,23 @@ func (ns *mpns) Publish(ctx context.Context, name ci.PrivKey, value path.Path) e
}
func (ns *mpns) PublishWithEOL(ctx context.Context, name ci.PrivKey, value path.Path, eol time.Time) error {
var dhtErr error
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
dhtErr = ns.publishers["dht"].PublishWithEOL(ctx, name, value, eol)
if dhtErr == nil {
ns.addToDHTCache(name, value, eol)
}
wg.Done()
}()
pub, ok := ns.publishers["pubsub"]
if ok {
wg.Add(1)
go func() {
err := pub.PublishWithEOL(ctx, name, value, eol)
if err != nil {
log.Warningf("error publishing %s with pubsub: %s", name, err.Error())
}
wg.Done()
}()
pub, ok := ns.publishers["ipns"]
if !ok {
return ErrPublishFailed
}
if err := pub.PublishWithEOL(ctx, name, value, eol); err != nil {
return err
}
ns.addToIpnsCache(name, value, eol)
return nil
wg.Wait()
return dhtErr
}
func (ns *mpns) addToDHTCache(key ci.PrivKey, value path.Path, eol time.Time) {
rr, ok := ns.resolvers["dht"].(*routingResolver)
func (ns *mpns) addToIpnsCache(key ci.PrivKey, value path.Path, eol time.Time) {
rr, ok := ns.resolvers["ipns"].(*routingResolver)
if !ok {
// should never happen, purely for sanity
log.Panicf("unexpected type %T as DHT resolver.", ns.resolvers["dht"])
log.Panicf("unexpected type %T as DHT resolver.", ns.resolvers["ipns"])
}
if rr.cache == nil {
// resolver has no caching

View File

@ -59,8 +59,8 @@ func mockResolverTwo() *mockResolver {
func TestNamesysResolution(t *testing.T) {
r := &mpns{
resolvers: map[string]resolver{
"dht": mockResolverOne(),
"dns": mockResolverTwo(),
"ipns": mockResolverOne(),
"dns": mockResolverTwo(),
},
}

View File

@ -4,6 +4,8 @@ import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"time"
pb "github.com/ipfs/go-ipfs/namesys/pb"
@ -12,15 +14,17 @@ import (
ft "github.com/ipfs/go-ipfs/unixfs"
u "gx/ipfs/QmNiJuT8Ja3hMVpBHXv3Q6dwmperaQ6JjLtpMQgMCD7xvx/go-ipfs-util"
dhtpb "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record/pb"
routing "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing"
dshelp "gx/ipfs/QmYJgz1Z5PbBGP7n2XA8uv5sF1EKLfYUjL7kFemVAjMNqC/go-ipfs-ds-help"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
ci "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto"
ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
dsquery "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/query"
base32 "gx/ipfs/QmfVj3x4D6Jkq9SEoi5n2NmoUomLwoeiwnYz2KQa15wRw6/base32"
)
const ipnsPrefix = "/ipns/"
const PublishPutValTimeout = time.Minute
const DefaultRecordTTL = 24 * time.Hour
@ -29,6 +33,9 @@ const DefaultRecordTTL = 24 * time.Hour
type ipnsPublisher struct {
routing routing.ValueStore
ds ds.Datastore
// Used to ensure we assign IPNS records *sequential* sequence numbers.
mu sync.Mutex
}
// NewRoutingPublisher constructs a publisher for the IPFS Routing name system.
@ -46,69 +53,157 @@ func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Pa
return p.PublishWithEOL(ctx, k, value, time.Now().Add(DefaultRecordTTL))
}
func IpnsDsKey(id peer.ID) ds.Key {
return ds.NewKey("/ipns/" + base32.RawStdEncoding.EncodeToString([]byte(id)))
}
// PublishedNames returns the latest IPNS records published by this node and
// their expiration times.
//
// This method will not search the routing system for records published by other
// nodes.
func (p *ipnsPublisher) ListPublished(ctx context.Context) (map[peer.ID]*pb.IpnsEntry, error) {
query, err := p.ds.Query(dsquery.Query{
Prefix: ipnsPrefix,
})
if err != nil {
return nil, err
}
defer query.Close()
records := make(map[peer.ID]*pb.IpnsEntry)
for {
select {
case result, ok := <-query.Next():
if !ok {
return records, nil
}
if result.Error != nil {
return nil, result.Error
}
value, ok := result.Value.([]byte)
if !ok {
log.Error("found ipns record that we couldn't convert to a value")
continue
}
e := new(pb.IpnsEntry)
if err := proto.Unmarshal(value, e); err != nil {
// Might as well return what we can.
log.Error("found an invalid IPNS entry:", err)
continue
}
if !strings.HasPrefix(result.Key, ipnsPrefix) {
log.Errorf("datastore query for keys with prefix %s returned a key: %s", ipnsPrefix, result.Key)
continue
}
k := result.Key[len(ipnsPrefix):]
pid, err := base32.RawStdEncoding.DecodeString(k)
if err != nil {
log.Errorf("ipns ds key invalid: %s", result.Key)
continue
}
records[peer.ID(pid)] = e
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
// GetPublished returns the record this node has published corresponding to the
// given peer ID.
//
// If `checkRouting` is true and we have no existing record, this method will
// check the routing system for any existing records.
func (p *ipnsPublisher) GetPublished(ctx context.Context, id peer.ID, checkRouting bool) (*pb.IpnsEntry, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
dsVal, err := p.ds.Get(IpnsDsKey(id))
var value []byte
switch err {
case nil:
var ok bool
value, ok = dsVal.([]byte)
if !ok {
return nil, fmt.Errorf("found ipns record that we couldn't convert to a value")
}
case ds.ErrNotFound:
if !checkRouting {
return nil, nil
}
_, ipnskey := IpnsKeysForID(id)
value, err = p.routing.GetValue(ctx, ipnskey)
if err != nil {
// Not found or other network issue. Can't really do
// anything about this case.
return nil, nil
}
default:
return nil, err
}
e := new(pb.IpnsEntry)
if err := proto.Unmarshal(value, e); err != nil {
return nil, err
}
return e, nil
}
func (p *ipnsPublisher) updateRecord(ctx context.Context, k ci.PrivKey, value path.Path, eol time.Time) (*pb.IpnsEntry, error) {
id, err := peer.IDFromPrivateKey(k)
if err != nil {
return nil, err
}
p.mu.Lock()
defer p.mu.Unlock()
// get previous records sequence number
rec, err := p.GetPublished(ctx, id, true)
if err != nil {
return nil, err
}
seqno := rec.GetSequence() // returns 0 if rec is nil
if rec != nil && value != path.Path(rec.GetValue()) {
// Don't bother incrementing the sequence number unless the
// value changes.
seqno++
}
// Create record
entry, err := CreateRoutingEntryData(k, value, seqno, eol)
if err != nil {
return nil, err
}
// Set the TTL
// TODO: Make this less hacky.
ttl, ok := checkCtxTTL(ctx)
if ok {
entry.Ttl = proto.Uint64(uint64(ttl.Nanoseconds()))
}
data, err := proto.Marshal(entry)
if err != nil {
return nil, err
}
// Put the new record.
if err := p.ds.Put(IpnsDsKey(id), data); err != nil {
return nil, err
}
return entry, nil
}
// PublishWithEOL is a temporary stand in for the ipns records implementation
// see here for more details: https://github.com/ipfs/specs/tree/master/records
func (p *ipnsPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value path.Path, eol time.Time) error {
id, err := peer.IDFromPrivateKey(k)
record, err := p.updateRecord(ctx, k, value, eol)
if err != nil {
return err
}
_, ipnskey := IpnsKeysForID(id)
// get previous records sequence number
seqnum, err := p.getPreviousSeqNo(ctx, ipnskey)
if err != nil {
return err
}
// increment it
seqnum++
return PutRecordToRouting(ctx, k, value, seqnum, eol, p.routing, id)
}
func (p *ipnsPublisher) getPreviousSeqNo(ctx context.Context, ipnskey string) (uint64, error) {
prevrec, err := p.ds.Get(dshelp.NewKeyFromBinary([]byte(ipnskey)))
if err != nil && err != ds.ErrNotFound {
// None found, lets start at zero!
return 0, err
}
var val []byte
if err == nil {
prbytes, ok := prevrec.([]byte)
if !ok {
return 0, fmt.Errorf("unexpected type returned from datastore: %#v", prevrec)
}
dhtrec := new(dhtpb.Record)
err := proto.Unmarshal(prbytes, dhtrec)
if err != nil {
return 0, err
}
val = dhtrec.GetValue()
} else {
// try and check the dht for a record
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
rv, err := p.routing.GetValue(ctx, ipnskey)
if err != nil {
// no such record found, start at zero!
return 0, nil
}
val = rv
}
e := new(pb.IpnsEntry)
err = proto.Unmarshal(val, e)
if err != nil {
return 0, err
}
return e.GetSequence(), nil
return PutRecordToRouting(ctx, p.routing, k.GetPublic(), record)
}
// setting the TTL on published records is an experimental feature.
@ -124,25 +219,24 @@ func checkCtxTTL(ctx context.Context) (time.Duration, bool) {
return d, ok
}
func PutRecordToRouting(ctx context.Context, k ci.PrivKey, value path.Path, seqnum uint64, eol time.Time, r routing.ValueStore, id peer.ID) error {
func PutRecordToRouting(ctx context.Context, r routing.ValueStore, k ci.PubKey, entry *pb.IpnsEntry) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
namekey, ipnskey := IpnsKeysForID(id)
entry, err := CreateRoutingEntryData(k, value, seqnum, eol)
errs := make(chan error, 2) // At most two errors (IPNS, and public key)
id, err := peer.IDFromPublicKey(k)
if err != nil {
return err
}
ttl, ok := checkCtxTTL(ctx)
if ok {
entry.Ttl = proto.Uint64(uint64(ttl.Nanoseconds()))
// Attempt to extract the public key from the ID
extractedPublicKey, err := id.ExtractPublicKey()
if err != nil {
return err
}
errs := make(chan error, 2) // At most two errors (IPNS, and public key)
// Attempt to extract the public key from the ID
extractedPublicKey, _ := id.ExtractPublicKey()
namekey, ipnskey := IpnsKeysForID(id)
go func() {
errs <- PublishEntry(ctx, r, ipnskey, entry)
@ -151,7 +245,7 @@ func PutRecordToRouting(ctx context.Context, k ci.PrivKey, value path.Path, seqn
// Publish the public key if a public key cannot be extracted from the ID
if extractedPublicKey == nil {
go func() {
errs <- PublishPublicKey(ctx, r, namekey, k.GetPublic())
errs <- PublishPublicKey(ctx, r, namekey, k)
}()
if err := waitOnErrChan(ctx, errs); err != nil {

View File

@ -75,7 +75,12 @@ func testNamekeyPublisher(t *testing.T, keyType int, expectedErr error, expected
serv := mockrouting.NewServer()
r := serv.ClientWithDatastore(context.Background(), &identity{p}, dstore)
err = PutRecordToRouting(ctx, privKey, value, seqnum, eol, r, id)
entry, err := CreateRoutingEntryData(privKey, value, seqnum, eol)
if err != nil {
t.Fatal(err)
}
err = PutRecordToRouting(ctx, r, pubKey, entry)
if err != nil {
t.Fatal(err)
}

View File

@ -13,9 +13,6 @@ import (
goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
gpctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
logging "gx/ipfs/QmTG23dvpBCBjqQwyDxV8CQT6jmS4PSftNr1VqHhE3MLy7/go-log"
recpb "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record/pb"
routing "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing"
dshelp "gx/ipfs/QmYJgz1Z5PbBGP7n2XA8uv5sF1EKLfYUjL7kFemVAjMNqC/go-ipfs-ds-help"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
ic "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto"
@ -39,7 +36,7 @@ var FailureRetryInterval = time.Minute * 5
const DefaultRecordLifetime = time.Hour * 24
type Republisher struct {
r routing.ValueStore
ns namesys.Publisher
ds ds.Datastore
self ic.PrivKey
ks keystore.Keystore
@ -51,9 +48,9 @@ type Republisher struct {
}
// NewRepublisher creates a new Republisher
func NewRepublisher(r routing.ValueStore, ds ds.Datastore, self ic.PrivKey, ks keystore.Keystore) *Republisher {
func NewRepublisher(ns namesys.Publisher, ds ds.Datastore, self ic.PrivKey, ks keystore.Keystore) *Republisher {
return &Republisher{
r: r,
ns: ns,
ds: ds,
self: self,
ks: ks,
@ -90,6 +87,10 @@ func (rp *Republisher) republishEntries(p goprocess.Process) error {
ctx, cancel := context.WithCancel(gpctx.OnClosingContext(p))
defer cancel()
// TODO: Use rp.ipns.ListPublished(). We can't currently *do* that
// because:
// 1. There's no way to get keys from the keystore by ID.
// 2. We don't actually have access to the IPNS publisher.
err := rp.republishEntry(ctx, rp.self)
if err != nil {
return err
@ -125,8 +126,7 @@ func (rp *Republisher) republishEntry(ctx context.Context, priv ic.PrivKey) erro
log.Debugf("republishing ipns entry for %s", id)
// Look for it locally only
_, ipnskey := namesys.IpnsKeysForID(id)
p, seq, err := rp.getLastVal(ipnskey)
p, err := rp.getLastVal(id)
if err != nil {
if err == errNoEntry {
return nil
@ -136,33 +136,25 @@ func (rp *Republisher) republishEntry(ctx context.Context, priv ic.PrivKey) erro
// update record with same sequence number
eol := time.Now().Add(rp.RecordLifetime)
err = namesys.PutRecordToRouting(ctx, priv, p, seq, eol, rp.r, id)
if err != nil {
return err
}
return nil
return rp.ns.PublishWithEOL(ctx, priv, p, eol)
}
func (rp *Republisher) getLastVal(k string) (path.Path, uint64, error) {
ival, err := rp.ds.Get(dshelp.NewKeyFromBinary([]byte(k)))
if err != nil {
// not found means we dont have a previously published entry
return "", 0, errNoEntry
func (rp *Republisher) getLastVal(id peer.ID) (path.Path, error) {
// Look for it locally only
vali, err := rp.ds.Get(namesys.IpnsDsKey(id))
switch err {
case nil:
case ds.ErrNotFound:
return "", errNoEntry
default:
return "", err
}
val := ival.([]byte)
dhtrec := new(recpb.Record)
err = proto.Unmarshal(val, dhtrec)
if err != nil {
return "", 0, err
}
val := vali.([]byte)
// extract published data from record
e := new(pb.IpnsEntry)
err = proto.Unmarshal(dhtrec.GetValue(), e)
if err != nil {
return "", 0, err
if err := proto.Unmarshal(val, e); err != nil {
return "", err
}
return path.Path(e.Value), e.GetSequence(), nil
return path.Path(e.Value), nil
}

View File

@ -78,7 +78,7 @@ func TestRepublish(t *testing.T) {
// The republishers that are contained within the nodes have their timeout set
// to 12 hours. Instead of trying to tweak those, we're just going to pretend
// they dont exist and make our own.
repub := NewRepublisher(publisher.Routing, publisher.Repo.Datastore(), publisher.PrivateKey, publisher.Repo.Keystore())
repub := NewRepublisher(rp, publisher.Repo.Datastore(), publisher.PrivateKey, publisher.Repo.Keystore())
repub.Interval = time.Second
repub.RecordLifetime = time.Second * 5

View File

@ -70,7 +70,12 @@ func TestPrexistingExpiredRecord(t *testing.T) {
// Make an expired record and put it in the datastore
h := path.FromString("/ipfs/QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN")
eol := time.Now().Add(time.Hour * -1)
err = PutRecordToRouting(context.Background(), privk, h, 0, eol, d, id)
entry, err := CreateRoutingEntryData(privk, h, 0, eol)
if err != nil {
t.Fatal(err)
}
err = PutRecordToRouting(context.Background(), d, pubk, entry)
if err != nil {
t.Fatal(err)
}
@ -107,7 +112,11 @@ func TestPrexistingRecord(t *testing.T) {
// Make a good record and put it in the datastore
h := path.FromString("/ipfs/QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN")
eol := time.Now().Add(time.Hour)
err = PutRecordToRouting(context.Background(), privk, h, 0, eol, d, id)
entry, err := CreateRoutingEntryData(privk, h, 0, eol)
if err != nil {
t.Fatal(err)
}
err = PutRecordToRouting(context.Background(), d, pubk, entry)
if err != nil {
t.Fatal(err)
}