extract IPNS over pubsub as a ValueStore

And:

* Update for DHT changes.
* Switch to the new record validation system.

License: MIT
Signed-off-by: Steven Allen <steven@stebalien.com>
This commit is contained in:
Steven Allen 2018-05-03 21:55:38 -07:00
parent 1e9e2f453c
commit 5dc0b7326e
13 changed files with 232 additions and 938 deletions

View File

@ -2,15 +2,15 @@ package commands
import (
"errors"
"fmt"
"io"
"strings"
cmds "github.com/ipfs/go-ipfs/commands"
e "github.com/ipfs/go-ipfs/core/commands/e"
ns "github.com/ipfs/go-ipfs/namesys"
"gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit"
record "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record"
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
cmdkit "gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit"
)
type ipnsPubsubState struct {
@ -49,8 +49,7 @@ var ipnspsStateCmd = &cmds.Command{
return
}
_, ok := n.Namesys.GetResolver("pubsub")
res.SetOutput(&ipnsPubsubState{ok})
res.SetOutput(&ipnsPubsubState{n.PSRouter != nil})
},
Type: ipnsPubsubState{},
Marshalers: cmds.MarshalerMap{
@ -88,19 +87,26 @@ var ipnspsSubsCmd = &cmds.Command{
return
}
r, ok := n.Namesys.GetResolver("pubsub")
if !ok {
if n.PSRouter == nil {
res.SetError(errors.New("IPNS pubsub subsystem is not enabled"), cmdkit.ErrClient)
return
}
psr, ok := r.(*ns.PubsubResolver)
if !ok {
res.SetError(fmt.Errorf("unexpected resolver type: %v", r), cmdkit.ErrNormal)
return
var paths []string
for _, key := range n.PSRouter.GetSubscriptions() {
ns, k, err := record.SplitKey(key)
if err != nil || ns != "ipns" {
// Not necessarily an error.
continue
}
pid, err := peer.IDFromString(k)
if err != nil {
log.Errorf("ipns key not a valid peer ID: %s", err)
continue
}
paths = append(paths, "/ipns/"+peer.IDB58Encode(pid))
}
res.SetOutput(&stringList{psr.GetSubscriptions()})
res.SetOutput(&stringList{paths})
},
Type: stringList{},
Marshalers: cmds.MarshalerMap{
@ -119,19 +125,20 @@ var ipnspsCancelCmd = &cmds.Command{
return
}
r, ok := n.Namesys.GetResolver("pubsub")
if !ok {
if n.PSRouter == nil {
res.SetError(errors.New("IPNS pubsub subsystem is not enabled"), cmdkit.ErrClient)
return
}
psr, ok := r.(*ns.PubsubResolver)
if !ok {
res.SetError(fmt.Errorf("unexpected resolver type: %v", r), cmdkit.ErrNormal)
name := req.Arguments()[0]
name = strings.TrimPrefix(name, "/ipns/")
pid, err := peer.IDB58Decode(name)
if err != nil {
res.SetError(err, cmdkit.ErrClient)
return
}
ok = psr.Cancel(req.Arguments()[0])
ok := n.PSRouter.Cancel("/ipns/" + string(pid))
res.SetOutput(&ipnsPubsubCancel{ok})
},
Arguments: []cmdkit.Argument{

View File

@ -48,6 +48,7 @@ import (
mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter"
logging "gx/ipfs/QmTG23dvpBCBjqQwyDxV8CQT6jmS4PSftNr1VqHhE3MLy7/go-log"
addrutil "gx/ipfs/QmTGSre9j1otFgsr1opCUQDXTPSM6BTZnMWwPeA5nYJM7w/go-addr-util"
record "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record"
routing "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing"
floodsub "gx/ipfs/QmVKrsEgixRtMWcMd6WQzuwqCUC3jfLf7Q7xcjnKoMMikS/go-libp2p-floodsub"
mssmux "gx/ipfs/QmVniQJkdzLZaZwzwMdd3dJTvWiJ1DQEkreVy6hs6h7Vk5/go-smux-multistream"
@ -65,13 +66,16 @@ import (
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
dht "gx/ipfs/Qmd3jqhBQFvhfBNTSJMQL15GgyVMpdxKTta69Napvx6Myd/go-libp2p-kad-dht"
dhtopts "gx/ipfs/Qmd3jqhBQFvhfBNTSJMQL15GgyVMpdxKTta69Napvx6Myd/go-libp2p-kad-dht/opts"
ipnet "gx/ipfs/Qmd3oYWVLCVWryDV6Pobv6whZcvDXAHqS3chemZ658y4a8/go-libp2p-interface-pnet"
psrouter "gx/ipfs/QmdSX2uedxXdsfNSqbkfSxcYi7pXBBdvp1Km2ZsjPWfydt/go-libp2p-pubsub-router"
exchange "gx/ipfs/QmdcAXgEHUueP4A7b5hjabKn2EooeHgMreMvFC249dGCgc/go-ipfs-exchange-interface"
pstore "gx/ipfs/QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh/go-libp2p-peerstore"
ic "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
mplex "gx/ipfs/QmenmFuirGzv8S1R3DyvbZ6tFmQapkGeDCebgYzni1Ntn3/go-smux-multiplex"
rhelpers "gx/ipfs/QmeoG1seQ8a9b2vS3XJ8HPz9tXr6dzpS5NizjuDDSntQEk/go-libp2p-routing-helpers"
mafilter "gx/ipfs/Qmf2UAmRwDG4TvnkQpHZWPAzw7rpCYVhxmRXmYxXr5LD1g/go-maddr-filter"
ifconnmgr "gx/ipfs/QmfQNieWBPwmnUjXWPZbjJPzhNwFFabTb5RQ79dyVWGujQ/go-libp2p-interface-connmgr"
p2phost "gx/ipfs/QmfZTdmunzKzAGJrSvXXQbQ5kLLUiEMX5vdwux7iXkdk7D/go-libp2p-host"
@ -135,6 +139,7 @@ type IpfsNode struct {
IpnsRepub *ipnsrp.Republisher
Floodsub *floodsub.PubSub
PSRouter *psrouter.PubsubValueStore
P2P *p2p.P2P
proc goprocess.Process
@ -240,7 +245,7 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
return err
}
if err := n.startOnlineServicesWithHost(ctx, peerhost, routingOption); err != nil {
if err := n.startOnlineServicesWithHost(ctx, peerhost, routingOption, pubsub, ipnsps); err != nil {
return err
}
@ -249,21 +254,6 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
return err
}
if pubsub || ipnsps {
service, err := floodsub.NewFloodSub(ctx, peerhost)
if err != nil {
return err
}
n.Floodsub = service
}
if ipnsps {
err = namesys.AddPubsubNameSystem(ctx, n.Namesys, n.PeerHost, n.Routing, n.Repo.Datastore(), n.Floodsub)
if err != nil {
return err
}
}
n.P2P = p2p.NewP2P(n.Identity, n.PeerHost, n.Peerstore)
// setup local discovery
@ -437,17 +427,50 @@ func (n *IpfsNode) HandlePeerFound(p pstore.PeerInfo) {
// startOnlineServicesWithHost is the set of services which need to be
// initialized with the host and _before_ we start listening.
func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost.Host, routingOption RoutingOption) error {
func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost.Host, routingOption RoutingOption, pubsub bool, ipnsps bool) error {
// setup diagnostics service
n.Ping = ping.NewPingService(host)
if pubsub || ipnsps {
service, err := floodsub.NewFloodSub(ctx, host)
if err != nil {
return err
}
n.Floodsub = service
}
validator := record.NamespacedValidator{
"pk": record.PublicKeyValidator{},
"ipns": namesys.IpnsValidator{KeyBook: host.Peerstore()},
}
// setup routing service
r, err := routingOption(ctx, host, n.Repo.Datastore())
r, err := routingOption(ctx, host, n.Repo.Datastore(), validator)
if err != nil {
return err
}
n.Routing = r
if ipnsps {
n.PSRouter = psrouter.NewPubsubValueStore(
ctx,
host,
n.Routing,
n.Floodsub,
validator,
)
n.Routing = rhelpers.Tiered{
// Always check pubsub first.
&rhelpers.Compose{
ValueStore: &rhelpers.LimitedValueStore{
ValueStore: n.PSRouter,
Namespaces: []string{"ipns"},
},
},
n.Routing,
}
}
// Wrap standard peer host with routing system to allow unknown peer lookups
n.PeerHost = rhost.Wrap(host, n.Routing)
@ -935,21 +958,24 @@ func startListening(host p2phost.Host, cfg *config.Config) error {
return nil
}
func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching) (routing.IpfsRouting, error) {
dhtRouting := dht.NewDHT(ctx, host, dstore)
dhtRouting.Validator[IpnsValidatorTag] = namesys.NewIpnsRecordValidator(host.Peerstore())
dhtRouting.Selector[IpnsValidatorTag] = namesys.IpnsSelectorFunc
return dhtRouting, nil
// constructDHTRouting builds a full DHT routing node, storing records in
// dstore and checking incoming records with validator.
func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) {
	return dht.New(
		ctx, host,
		dhtopts.Datastore(dstore),
		dhtopts.Validator(validator),
	)
}
func constructClientDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching) (routing.IpfsRouting, error) {
dhtRouting := dht.NewDHTClient(ctx, host, dstore)
dhtRouting.Validator[IpnsValidatorTag] = namesys.NewIpnsRecordValidator(host.Peerstore())
dhtRouting.Selector[IpnsValidatorTag] = namesys.IpnsSelectorFunc
return dhtRouting, nil
// constructClientDHTRouting builds a DHT routing node in client mode
// (dhtopts.Client(true)), storing records in dstore and checking incoming
// records with validator.
func constructClientDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) {
	return dht.New(
		ctx, host,
		dhtopts.Client(true),
		dhtopts.Datastore(dstore),
		dhtopts.Validator(validator),
	)
}
type RoutingOption func(context.Context, p2phost.Host, ds.Batching) (routing.IpfsRouting, error)
type RoutingOption func(context.Context, p2phost.Host, ds.Batching, record.Validator) (routing.IpfsRouting, error)
type DiscoveryOption func(context.Context, p2phost.Host) (discovery.Service, error)

View File

@ -61,7 +61,6 @@ var ErrPublishFailed = errors.New("could not publish name")
type NameSystem interface {
Resolver
Publisher
ResolverLookup
}
// Resolver is an object capable of resolving names.
@ -95,10 +94,3 @@ type Publisher interface {
// call once the records spec is implemented
PublishWithEOL(ctx context.Context, name ci.PrivKey, value path.Path, eol time.Time) error
}
// ResolverLookup is an object capable of finding resolvers for a subsystem
type ResolverLookup interface {
// GetResolver retrieves a resolver associated with a subsystem
GetResolver(subs string) (Resolver, bool)
}

View File

@ -12,8 +12,8 @@ import (
u "gx/ipfs/QmNiJuT8Ja3hMVpBHXv3Q6dwmperaQ6JjLtpMQgMCD7xvx/go-ipfs-util"
mockrouting "gx/ipfs/QmPuPdzoG4b5uyYSQCjLEHB8NM593m3BW19UHX2jZ6Wzfm/go-ipfs-routing/mock"
record "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record"
recordpb "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record/pb"
routing "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing"
ropts "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing/options"
testutil "gx/ipfs/QmUJzxQQ2kzwQubsMqBTr1NGDpLfh7pGA2E1oaJULcKDPq/go-testutil"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
@ -23,8 +23,10 @@ import (
dssync "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/sync"
)
func testValidatorCase(t *testing.T, priv ci.PrivKey, kbook pstore.KeyBook, ns string, key string, val []byte, eol time.Time, exp error) {
validChecker := NewIpnsRecordValidator(kbook)
func testValidatorCase(t *testing.T, priv ci.PrivKey, kbook pstore.KeyBook, key string, val []byte, eol time.Time, exp error) {
t.Helper()
validator := IpnsValidator{kbook}
p := path.Path("/ipfs/QmfM2r8seH2GiRaC4esTjeraXEachRt8ZsSeGaWTPLyMoG")
entry, err := CreateRoutingEntryData(priv, p, 1, eol)
@ -39,15 +41,9 @@ func testValidatorCase(t *testing.T, priv ci.PrivKey, kbook pstore.KeyBook, ns s
t.Fatal(err)
}
}
rec := &record.ValidationRecord{
Namespace: ns,
Key: key,
Value: data,
}
err = validChecker.Func(rec)
err = validator.Validate(key, data)
if err != exp {
params := fmt.Sprintf("namespace: %s\nkey: %s\neol: %s\n", ns, key, eol)
params := fmt.Sprintf("key: %s\neol: %s\n", key, eol)
if exp == nil {
t.Fatalf("Unexpected error %s for params %s", err, params)
} else if err == nil {
@ -67,15 +63,15 @@ func TestValidator(t *testing.T) {
kbook.AddPubKey(id, priv.GetPublic())
emptyKbook := pstore.NewPeerstore()
testValidatorCase(t, priv, kbook, "ipns", string(id), nil, ts.Add(time.Hour), nil)
testValidatorCase(t, priv, kbook, "ipns", string(id), nil, ts.Add(time.Hour*-1), ErrExpiredRecord)
testValidatorCase(t, priv, kbook, "ipns", string(id), []byte("bad data"), ts.Add(time.Hour), ErrBadRecord)
testValidatorCase(t, priv, kbook, "ipns", "bad key", nil, ts.Add(time.Hour), ErrKeyFormat)
testValidatorCase(t, priv, emptyKbook, "ipns", string(id), nil, ts.Add(time.Hour), ErrPublicKeyNotFound)
testValidatorCase(t, priv2, kbook, "ipns", string(id2), nil, ts.Add(time.Hour), ErrPublicKeyNotFound)
testValidatorCase(t, priv2, kbook, "ipns", string(id), nil, ts.Add(time.Hour), ErrSignature)
testValidatorCase(t, priv, kbook, "", string(id), nil, ts.Add(time.Hour), ErrInvalidPath)
testValidatorCase(t, priv, kbook, "wrong", string(id), nil, ts.Add(time.Hour), ErrInvalidPath)
testValidatorCase(t, priv, kbook, "/ipns/"+string(id), nil, ts.Add(time.Hour), nil)
testValidatorCase(t, priv, kbook, "/ipns/"+string(id), nil, ts.Add(time.Hour*-1), ErrExpiredRecord)
testValidatorCase(t, priv, kbook, "/ipns/"+string(id), []byte("bad data"), ts.Add(time.Hour), ErrBadRecord)
testValidatorCase(t, priv, kbook, "/ipns/"+"bad key", nil, ts.Add(time.Hour), ErrKeyFormat)
testValidatorCase(t, priv, emptyKbook, "/ipns/"+string(id), nil, ts.Add(time.Hour), ErrPublicKeyNotFound)
testValidatorCase(t, priv2, kbook, "/ipns/"+string(id2), nil, ts.Add(time.Hour), ErrPublicKeyNotFound)
testValidatorCase(t, priv2, kbook, "/ipns/"+string(id), nil, ts.Add(time.Hour), ErrSignature)
testValidatorCase(t, priv, kbook, "//"+string(id), nil, ts.Add(time.Hour), ErrInvalidPath)
testValidatorCase(t, priv, kbook, "/wrong/"+string(id), nil, ts.Add(time.Hour), ErrInvalidPath)
}
func TestResolverValidation(t *testing.T) {
@ -85,13 +81,6 @@ func TestResolverValidation(t *testing.T) {
peerstore := pstore.NewPeerstore()
vstore := newMockValueStore(rid, dstore, peerstore)
vstore.Validator["ipns"] = NewIpnsRecordValidator(peerstore)
vstore.Validator["pk"] = &record.ValidChecker{
Func: func(r *record.ValidationRecord) error {
return nil
},
Sign: false,
}
resolver := NewRoutingResolver(vstore, 0)
// Create entry with expiry in one hour
@ -224,19 +213,19 @@ type mockValueStore struct {
func newMockValueStore(id testutil.Identity, dstore ds.Datastore, kbook pstore.KeyBook) *mockValueStore {
serv := mockrouting.NewServer()
r := serv.ClientWithDatastore(context.Background(), id, dstore)
return &mockValueStore{r, kbook, make(record.Validator)}
return &mockValueStore{r, kbook, record.NamespacedValidator{
"ipns": IpnsValidator{kbook},
"pk": record.PublicKeyValidator{},
}}
}
func (m *mockValueStore) GetValue(ctx context.Context, k string) ([]byte, error) {
data, err := m.r.GetValue(ctx, k)
func (m *mockValueStore) GetValue(ctx context.Context, k string, opts ...ropts.Option) ([]byte, error) {
data, err := m.r.GetValue(ctx, k, opts...)
if err != nil {
return data, err
}
rec := new(recordpb.Record)
rec.Key = proto.String(k)
rec.Value = data
if err = m.Validator.VerifyRecord(rec); err != nil {
if err = m.Validator.Validate(k, data); err != nil {
return nil, err
}
@ -263,23 +252,6 @@ func (m *mockValueStore) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey
return pk, m.kbook.AddPubKey(p, pk)
}
func (m *mockValueStore) GetValues(ctx context.Context, k string, count int) ([]routing.RecvdVal, error) {
vals, err := m.r.GetValues(ctx, k, count)
if err != nil {
return nil, err
}
valid := make([]routing.RecvdVal, 0, len(vals))
for _, v := range vals {
rec := new(recordpb.Record)
rec.Key = proto.String(k)
rec.Value = v.Val
if err = m.Validator.VerifyRecord(rec); err == nil {
valid = append(valid, v)
}
}
return valid, nil
}
func (m *mockValueStore) PutValue(ctx context.Context, k string, d []byte) error {
return m.r.PutValue(ctx, k, d)
func (m *mockValueStore) PutValue(ctx context.Context, k string, d []byte, opts ...ropts.Option) error {
return m.r.PutValue(ctx, k, d, opts...)
}

View File

@ -2,7 +2,6 @@ package namesys
import (
"context"
"errors"
"strings"
"sync"
"time"
@ -11,13 +10,11 @@ import (
path "github.com/ipfs/go-ipfs/path"
routing "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing"
floodsub "gx/ipfs/QmVKrsEgixRtMWcMd6WQzuwqCUC3jfLf7Q7xcjnKoMMikS/go-libp2p-floodsub"
isd "gx/ipfs/QmZmmuAXgX73UQmX1jRKjTGmjzq24Jinqkq8vzkBtno4uX/go-is-domain"
mh "gx/ipfs/QmZyZDi491cCNTLfAhwcaDii2Kg4pwKRkhqQzURGDvY6ua/go-multihash"
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
ci "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto"
ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
p2phost "gx/ipfs/QmfZTdmunzKzAGJrSvXXQbQ5kLLUiEMX5vdwux7iXkdk7D/go-libp2p-host"
)
// mpns (a multi-protocol NameSystem) implements generic IPFS naming.
@ -48,23 +45,6 @@ func NewNameSystem(r routing.ValueStore, ds ds.Datastore, cachesize int) NameSys
}
}
// AddPubsubNameSystem adds the pubsub publisher and resolver to the namesystem
func AddPubsubNameSystem(ctx context.Context, ns NameSystem, host p2phost.Host, r routing.IpfsRouting, ds ds.Datastore, ps *floodsub.PubSub) error {
mpns, ok := ns.(*mpns)
if !ok {
return errors.New("unexpected NameSystem; not an mpns instance")
}
pkf, ok := r.(routing.PubKeyFetcher)
if !ok {
return errors.New("unexpected IpfsRouting; not a PubKeyFetcher instance")
}
mpns.resolvers["pubsub"] = NewPubsubResolver(ctx, host, r, pkf, ps)
mpns.publishers["pubsub"] = NewPubsubPublisher(ctx, host, ds, r, ps)
return nil
}
const DefaultResolverCacheTTL = time.Minute
// Resolve implements Resolver.
@ -219,16 +199,3 @@ func (ns *mpns) addToDHTCache(key ci.PrivKey, value path.Path, eol time.Time) {
eol: eol,
})
}
// GetResolver implements ResolverLookup
func (ns *mpns) GetResolver(subs string) (Resolver, bool) {
res, ok := ns.resolvers[subs]
if ok {
ires, ok := res.(Resolver)
if ok {
return ires, true
}
}
return nil, false
}

View File

@ -142,7 +142,7 @@ func PutRecordToRouting(ctx context.Context, k ci.PrivKey, value path.Path, seqn
errs := make(chan error, 2) // At most two errors (IPNS, and public key)
// Attempt to extract the public key from the ID
extractedPublicKey := id.ExtractPublicKey()
extractedPublicKey, _ := id.ExtractPublicKey()
go func() {
errs <- PublishEntry(ctx, r, ipnskey, entry)

View File

@ -49,14 +49,7 @@ func testNamekeyPublisher(t *testing.T, keyType int, expectedErr error, expected
}
// ID
var id peer.ID
switch keyType {
case ci.Ed25519:
id, err = peer.IDFromEd25519PublicKey(pubKey)
default:
id, err = peer.IDFromPublicKey(pubKey)
}
id, err := peer.IDFromPublicKey(pubKey)
if err != nil {
t.Fatal(err)
}

View File

@ -1,426 +0,0 @@
package namesys
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
opts "github.com/ipfs/go-ipfs/namesys/opts"
pb "github.com/ipfs/go-ipfs/namesys/pb"
path "github.com/ipfs/go-ipfs/path"
u "gx/ipfs/QmNiJuT8Ja3hMVpBHXv3Q6dwmperaQ6JjLtpMQgMCD7xvx/go-ipfs-util"
record "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record"
dhtpb "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record/pb"
routing "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing"
floodsub "gx/ipfs/QmVKrsEgixRtMWcMd6WQzuwqCUC3jfLf7Q7xcjnKoMMikS/go-libp2p-floodsub"
dshelp "gx/ipfs/QmYJgz1Z5PbBGP7n2XA8uv5sF1EKLfYUjL7kFemVAjMNqC/go-ipfs-ds-help"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
mh "gx/ipfs/QmZyZDi491cCNTLfAhwcaDii2Kg4pwKRkhqQzURGDvY6ua/go-multihash"
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
pstore "gx/ipfs/QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh/go-libp2p-peerstore"
ci "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto"
ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
dssync "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/sync"
p2phost "gx/ipfs/QmfZTdmunzKzAGJrSvXXQbQ5kLLUiEMX5vdwux7iXkdk7D/go-libp2p-host"
)
// PubsubPublisher is a publisher that distributes IPNS records through pubsub
type PubsubPublisher struct {
	ctx  context.Context
	ds   ds.Datastore           // shared with the routing publisher to persist sequence numbers
	host p2phost.Host           // needed only to bootstrap the pubsub topic
	cr   routing.ContentRouting // needed only to bootstrap the pubsub topic
	ps   *floodsub.PubSub

	mx   sync.Mutex          // guards subs
	subs map[string]struct{} // topics already published to (and bootstrapped)
}
// PubsubResolver is a resolver that receives IPNS records through pubsub
type PubsubResolver struct {
	ctx  context.Context
	ds   ds.Datastore           // private in-memory store of the latest record per name
	host p2phost.Host           // needed only to bootstrap the pubsub topic
	cr   routing.ContentRouting // needed only to bootstrap the pubsub topic
	pkf  routing.PubKeyFetcher  // fallback source of public keys for signature checks
	ps   *floodsub.PubSub

	mx   sync.Mutex                        // guards subs
	subs map[string]*floodsub.Subscription // active subscription per "/ipns/..." topic
}
// NewPubsubPublisher constructs a new Publisher that publishes IPNS records through pubsub.
// The constructor interface is complicated by the need to bootstrap the pubsub topic.
// This could be greatly simplified if the pubsub implementation handled bootstrap itself
func NewPubsubPublisher(ctx context.Context, host p2phost.Host, ds ds.Datastore, cr routing.ContentRouting, ps *floodsub.PubSub) *PubsubPublisher {
	pub := new(PubsubPublisher)
	pub.ctx = ctx
	pub.ds = ds
	pub.host = host // needed for pubsub bootstrap
	pub.cr = cr     // needed for pubsub bootstrap
	pub.ps = ps
	pub.subs = make(map[string]struct{})
	return pub
}
// NewPubsubResolver constructs a new Resolver that resolves IPNS records through pubsub.
// same as above for pubsub bootstrap dependencies
func NewPubsubResolver(ctx context.Context, host p2phost.Host, cr routing.ContentRouting, pkf routing.PubKeyFetcher, ps *floodsub.PubSub) *PubsubResolver {
	res := new(PubsubResolver)
	res.ctx = ctx
	res.ds = dssync.MutexWrap(ds.NewMapDatastore())
	res.host = host // needed for pubsub bootstrap
	res.cr = cr     // needed for pubsub bootstrap
	res.pkf = pkf
	res.ps = ps
	res.subs = make(map[string]*floodsub.Subscription)
	return res
}
// Publish publishes an IPNS record through pubsub with default TTL
func (p *PubsubPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Path) error {
	eol := time.Now().Add(DefaultRecordTTL)
	return p.PublishWithEOL(ctx, k, value, eol)
}
// PublishWithEOL publishes an IPNS record through pubsub
func (p *PubsubPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value path.Path, eol time.Time) error {
	id, err := peer.IDFromPrivateKey(k)
	if err != nil {
		return err
	}

	_, ipnskey := IpnsKeysForID(id)

	// continue the persisted sequence from the shared datastore
	prevSeqno, err := p.getPreviousSeqNo(ctx, ipnskey)
	if err != nil {
		return err
	}

	return p.publishRecord(ctx, k, value, prevSeqno+1, eol, ipnskey, id)
}
// getPreviousSeqNo returns the sequence number of the last IPNS record
// persisted under ipnskey, or 0 when no prior record exists.
func (p *PubsubPublisher) getPreviousSeqNo(ctx context.Context, ipnskey string) (uint64, error) {
	// the datastore is shared with the routing publisher to properly increment and persist
	// ipns record sequence numbers.
	prevrec, err := p.ds.Get(dshelp.NewKeyFromBinary([]byte(ipnskey)))
	if err != nil {
		if err == ds.ErrNotFound {
			// None found, lets start at zero!
			return 0, nil
		}
		return 0, err
	}
	// this datastore API returns interface{}; the stored value must be raw bytes
	prbytes, ok := prevrec.([]byte)
	if !ok {
		return 0, fmt.Errorf("unexpected type returned from datastore: %#v", prevrec)
	}
	// the stored bytes are a signed DHT record wrapping the IPNS entry
	var dsrec dhtpb.Record
	err = proto.Unmarshal(prbytes, &dsrec)
	if err != nil {
		return 0, err
	}
	var entry pb.IpnsEntry
	err = proto.Unmarshal(dsrec.GetValue(), &entry)
	if err != nil {
		return 0, err
	}
	return entry.GetSequence(), nil
}
// publishRecord signs an IPNS entry, persists it to the shared datastore
// (so the routing publisher sees the same sequence numbers), bootstraps the
// topic on first publish, and finally publishes the entry over pubsub.
func (p *PubsubPublisher) publishRecord(ctx context.Context, k ci.PrivKey, value path.Path, seqno uint64, eol time.Time, ipnskey string, ID peer.ID) error {
	entry, err := CreateRoutingEntryData(k, value, seqno, eol)
	if err != nil {
		return err
	}
	data, err := proto.Marshal(entry)
	if err != nil {
		return err
	}
	// the datastore is shared with the routing publisher to properly increment and persist
	// ipns record sequence numbers; so we need to Record our new entry in the datastore
	dsrec, err := record.MakePutRecord(k, ipnskey, data, true)
	if err != nil {
		return err
	}
	dsdata, err := proto.Marshal(dsrec)
	if err != nil {
		return err
	}
	err = p.ds.Put(dshelp.NewKeyFromBinary([]byte(ipnskey)), dsdata)
	if err != nil {
		return err
	}
	// now we publish, but we also need to bootstrap pubsub for our messages to propagate
	topic := "/ipns/" + ID.Pretty()
	p.mx.Lock()
	_, ok := p.subs[topic]
	if !ok {
		// first publish on this topic: mark it, then bootstrap outside the lock
		// (bootstrapPubsub dials peers and can take a while)
		p.subs[topic] = struct{}{}
		p.mx.Unlock()
		bootstrapPubsub(p.ctx, p.cr, p.host, topic)
	} else {
		p.mx.Unlock()
	}
	log.Debugf("PubsubPublish: publish IPNS record for %s (%d)", topic, seqno)
	return p.ps.Publish(topic, data)
}
// Resolve resolves a name through pubsub and default depth limit
func (r *PubsubResolver) Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (path.Path, error) {
	resolveOpts := opts.ProcessOpts(options)
	return resolve(ctx, r, name, resolveOpts, "/ipns/")
}
// resolveOnce performs a single resolution step for name: it ensures a pubsub
// subscription exists for the name's topic (subscribing and bootstrapping on
// first use), then answers from whatever record is already in the local
// datastore. Returns ErrResolveFailed when no (unexpired) record is cached yet.
func (r *PubsubResolver) resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (path.Path, error) {
	log.Debugf("PubsubResolve: resolve '%s'", name)

	// retrieve the public key once (for verifying messages)
	xname := strings.TrimPrefix(name, "/ipns/")
	hash, err := mh.FromB58String(xname)
	if err != nil {
		log.Warningf("PubsubResolve: bad input hash: [%s]", xname)
		return "", err
	}

	id := peer.ID(hash)
	// our own records are published locally; resolving them over pubsub is a misuse
	if r.host.Peerstore().PrivKey(id) != nil {
		return "", errors.New("cannot resolve own name through pubsub")
	}

	// try to derive the key from the ID itself; fall back to fetching it
	pubk := id.ExtractPublicKey()
	if pubk == nil {
		pubk, err = r.pkf.GetPublicKey(ctx, id)
		if err != nil {
			log.Warningf("PubsubResolve: error fetching public key: %s [%s]", err.Error(), xname)
			return "", err
		}
	}

	// the topic is /ipns/Qmhash
	if !strings.HasPrefix(name, "/ipns/") {
		name = "/ipns/" + name
	}

	r.mx.Lock()
	// see if we already have a pubsub subscription; if not, subscribe
	sub, ok := r.subs[name]
	if !ok {
		sub, err = r.ps.Subscribe(name)
		if err != nil {
			r.mx.Unlock()
			return "", err
		}

		log.Debugf("PubsubResolve: subscribed to %s", name)
		r.subs[name] = sub

		// cancel tears down the bootstrap loop when the subscription handler exits
		ctx, cancel := context.WithCancel(r.ctx)
		go r.handleSubscription(sub, name, pubk, cancel)
		go bootstrapPubsub(ctx, r.cr, r.host, name)
	}
	r.mx.Unlock()

	// resolve to what we may already have in the datastore
	dsval, err := r.ds.Get(dshelp.NewKeyFromBinary([]byte(name)))
	if err != nil {
		if err == ds.ErrNotFound {
			return "", ErrResolveFailed
		}
		return "", err
	}

	data := dsval.([]byte)
	entry := new(pb.IpnsEntry)

	err = proto.Unmarshal(data, entry)
	if err != nil {
		return "", err
	}

	// check EOL; if the entry has expired, delete from datastore and return ds.ErrNotFound
	eol, ok := checkEOL(entry)
	if ok && eol.Before(time.Now()) {
		err = r.ds.Delete(dshelp.NewKeyFromBinary([]byte(name)))
		if err != nil {
			log.Warningf("PubsubResolve: error deleting stale value for %s: %s", name, err.Error())
		}

		return "", ErrResolveFailed
	}

	value, err := path.ParsePath(string(entry.GetValue()))
	return value, err
}
// GetSubscriptions retrieves a list of active topic subscriptions
func (r *PubsubResolver) GetSubscriptions() []string {
	r.mx.Lock()
	defer r.mx.Unlock()

	var topics []string
	for topic := range r.subs {
		topics = append(topics, topic)
	}

	return topics
}
// Cancel cancels a topic subscription; returns true if an active
// subscription was canceled
func (r *PubsubResolver) Cancel(name string) bool {
	r.mx.Lock()
	defer r.mx.Unlock()

	sub, found := r.subs[name]
	if !found {
		return false
	}

	sub.Cancel()
	delete(r.subs, name)
	return true
}
// handleSubscription pumps messages from a pubsub subscription for name,
// validating and storing each update via receive. It runs until the
// resolver's context is canceled, then cancels the subscription and the
// associated bootstrap loop (via the cancel callback).
func (r *PubsubResolver) handleSubscription(sub *floodsub.Subscription, name string, pubk ci.PubKey, cancel func()) {
	defer sub.Cancel()
	defer cancel()

	for {
		msg, err := sub.Next(r.ctx)
		if err != nil {
			// context.Canceled is the normal shutdown path; anything else is worth logging
			if err != context.Canceled {
				log.Warningf("PubsubResolve: subscription error in %s: %s", name, err.Error())
			}
			return
		}

		// per-message failures are logged but do not stop the subscription
		err = r.receive(msg, name, pubk)
		if err != nil {
			log.Warningf("PubsubResolve: error processing update for %s: %s", name, err.Error())
		}
	}
}
// receive validates a single pubsub message for name and, if it is a fresh,
// correctly signed, unexpired IPNS entry with a higher sequence number than
// the cached one, stores it in the local datastore.
func (r *PubsubResolver) receive(msg *floodsub.Message, name string, pubk ci.PubKey) error {
	data := msg.GetData()
	if data == nil {
		return errors.New("empty message")
	}

	entry := new(pb.IpnsEntry)
	err := proto.Unmarshal(data, entry)
	if err != nil {
		return err
	}

	// verify the entry signature against the name owner's public key
	ok, err := pubk.Verify(ipnsEntryDataForSig(entry), entry.GetSignature())
	if err != nil || !ok {
		return errors.New("signature verification failed")
	}

	// the record value must parse as a path before we accept it
	_, err = path.ParsePath(string(entry.GetValue()))
	if err != nil {
		return err
	}

	// reject entries that have already expired
	eol, ok := checkEOL(entry)
	if ok && eol.Before(time.Now()) {
		return errors.New("stale update; EOL exceeded")
	}

	// check the sequence number against what we may already have in our datastore
	oval, err := r.ds.Get(dshelp.NewKeyFromBinary([]byte(name)))
	if err == nil {
		odata := oval.([]byte)
		oentry := new(pb.IpnsEntry)

		err = proto.Unmarshal(odata, oentry)
		if err != nil {
			return err
		}

		if entry.GetSequence() <= oentry.GetSequence() {
			return errors.New("stale update; sequence number too small")
		}
	}

	log.Debugf("PubsubResolve: receive IPNS record for %s", name)

	return r.ds.Put(dshelp.NewKeyFromBinary([]byte(name)), data)
}
// rendezvous with peers in the name topic through provider records
// Note: rendezvous/bootstrap should really be handled by the pubsub implementation itself!
//
// bootstrapPubsub announces this node as a provider of a rendezvous CID
// derived from the topic (re-announcing every 8 hours until ctx is done),
// then finds up to 10 other providers and connects to them so pubsub
// messages have routes to flow over.
func bootstrapPubsub(ctx context.Context, cr routing.ContentRouting, host p2phost.Host, name string) {
	topic := "floodsub:" + name
	// rendezvous point: a raw CID of the hashed topic string
	hash := u.Hash([]byte(topic))
	rz := cid.NewCidV1(cid.Raw, hash)

	err := cr.Provide(ctx, rz, true)
	if err != nil {
		log.Warningf("bootstrapPubsub: error providing rendezvous for %s: %s", topic, err.Error())
	}

	// keep the provider record fresh for as long as the context lives
	go func() {
		for {
			select {
			case <-time.After(8 * time.Hour):
				err := cr.Provide(ctx, rz, true)
				if err != nil {
					log.Warningf("bootstrapPubsub: error providing rendezvous for %s: %s", topic, err.Error())
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	// search for other providers for at most 10 seconds
	rzctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	wg := &sync.WaitGroup{}
	for pi := range cr.FindProvidersAsync(rzctx, rz, 10) {
		if pi.ID == host.ID() {
			// skip ourselves
			continue
		}

		wg.Add(1)
		go func(pi pstore.PeerInfo) {
			defer wg.Done()

			// bound each dial independently of the overall search
			ctx, cancel := context.WithTimeout(ctx, time.Second*10)
			defer cancel()

			err := host.Connect(ctx, pi)
			if err != nil {
				log.Debugf("Error connecting to pubsub peer %s: %s", pi.ID, err.Error())
				return
			}

			// delay to let pubsub perform its handshake
			time.Sleep(time.Millisecond * 250)

			log.Debugf("Connected to pubsub peer %s", pi.ID)
		}(pi)
	}

	wg.Wait()
}

View File

@ -1,197 +0,0 @@
package namesys
import (
"context"
"sync"
"testing"
"time"
path "github.com/ipfs/go-ipfs/path"
mockrouting "gx/ipfs/QmPuPdzoG4b5uyYSQCjLEHB8NM593m3BW19UHX2jZ6Wzfm/go-ipfs-routing/mock"
routing "gx/ipfs/QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz/go-libp2p-routing"
testutil "gx/ipfs/QmUJzxQQ2kzwQubsMqBTr1NGDpLfh7pGA2E1oaJULcKDPq/go-testutil"
floodsub "gx/ipfs/QmVKrsEgixRtMWcMd6WQzuwqCUC3jfLf7Q7xcjnKoMMikS/go-libp2p-floodsub"
netutil "gx/ipfs/Qmb6BsZf6Y3kxffXMNTubGPF1w1bkHtpvhfYbmnwP3NQyw/go-libp2p-netutil"
bhost "gx/ipfs/Qmc64U41EEB4nPG7wxjEqFwKJajS2f8kk5q2TvUrQf78Xu/go-libp2p-blankhost"
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
pstore "gx/ipfs/QmdeiKhUy1TVGBaKxt7y1QmBDLBdisSrLJ1x58Eoj4PXUh/go-libp2p-peerstore"
ci "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto"
ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
p2phost "gx/ipfs/QmfZTdmunzKzAGJrSvXXQbQ5kLLUiEMX5vdwux7iXkdk7D/go-libp2p-host"
)
// newNetHost creates a blank libp2p host backed by a freshly generated swarm network.
func newNetHost(ctx context.Context, t *testing.T) p2phost.Host {
	return bhost.NewBlankHost(netutil.GenSwarmNetwork(t, ctx))
}
// newNetHosts creates n blank test hosts.
func newNetHosts(ctx context.Context, t *testing.T, n int) []p2phost.Host {
	var hosts []p2phost.Host
	for i := 0; i < n; i++ {
		hosts = append(hosts, newNetHost(ctx, t))
	}
	return hosts
}
// PubKeyFetcher implementation with a global key store
type mockKeyStore struct {
	keys map[peer.ID]ci.PubKey // public keys by peer ID, shared across test hosts
	mx   sync.Mutex            // guards keys
}
// addPubKey records a public key for the given peer.
func (m *mockKeyStore) addPubKey(id peer.ID, pkey ci.PubKey) {
	m.mx.Lock()
	m.keys[id] = pkey
	m.mx.Unlock()
}
// getPubKey looks up the public key for a peer, returning
// routing.ErrNotFound when no key has been registered.
func (m *mockKeyStore) getPubKey(id peer.ID) (ci.PubKey, error) {
	m.mx.Lock()
	defer m.mx.Unlock()

	if pkey, found := m.keys[id]; found {
		return pkey, nil
	}
	return nil, routing.ErrNotFound
}
// GetPublicKey implements the PubKeyFetcher interface over the in-memory table.
func (m *mockKeyStore) GetPublicKey(ctx context.Context, id peer.ID) (ci.PubKey, error) {
	pkey, err := m.getPubKey(id)
	return pkey, err
}
// newMockKeyStore returns an empty key store ready for use.
func newMockKeyStore() *mockKeyStore {
	ks := new(mockKeyStore)
	ks.keys = make(map[peer.ID]ci.PubKey)
	return ks
}
// ContentRouting mock: registers the host's public key with the shared key
// store and returns a mock-routing client acting as the host's identity.
func newMockRouting(ms mockrouting.Server, ks *mockKeyStore, host p2phost.Host) routing.ContentRouting {
	id := host.ID()

	privk := host.Peerstore().PrivKey(id)
	pubk := host.Peerstore().PubKey(id)
	pi := host.Peerstore().PeerInfo(id)

	// make this host's key resolvable through the global mock key store
	ks.addPubKey(id, pubk)

	return ms.Client(testutil.NewIdentity(id, pi.Addrs[0], privk, pubk))
}
// newMockRoutingForHosts builds one mock content router per host.
func newMockRoutingForHosts(ms mockrouting.Server, ks *mockKeyStore, hosts []p2phost.Host) []routing.ContentRouting {
	routers := make([]routing.ContentRouting, len(hosts))
	for i, h := range hosts {
		routers[i] = newMockRouting(ms, ks, h)
	}
	return routers
}
// tests
// TestPubsubPublishSubscribe is an end-to-end check of the pubsub name
// system: one publisher host floods IPNS records to five resolver hosts.
// It verifies (1) resolution fails before any publish, (2) resolvers see
// each published value, and (3) after Cancel they keep the last value and
// ignore further publishes. Timing relies on fixed sleeps — presumably
// tuned empirically; may be flaky on slow machines (NOTE(review): confirm).
func TestPubsubPublishSubscribe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Shared mock routing server and key store used by publisher and resolvers.
ms := mockrouting.NewServer()
ks := newMockKeyStore()
// Set up the single publisher host and its pubsub instance.
pubhost := newNetHost(ctx, t)
pubmr := newMockRouting(ms, ks, pubhost)
fs, err := floodsub.NewFloodSub(ctx, pubhost)
if err != nil {
t.Fatal(err)
}
pub := NewPubsubPublisher(ctx, pubhost, ds.NewMapDatastore(), pubmr, fs)
privk := pubhost.Peerstore().PrivKey(pubhost.ID())
pubpinfo := pstore.PeerInfo{ID: pubhost.ID(), Addrs: pubhost.Addrs()}
// The IPNS name under test is derived from the publisher's peer ID.
name := "/ipns/" + pubhost.ID().Pretty()
// Set up five resolver hosts, each with its own floodsub, and connect
// each of them directly to the publisher.
reshosts := newNetHosts(ctx, t, 5)
resmrs := newMockRoutingForHosts(ms, ks, reshosts)
res := make([]*PubsubResolver, len(reshosts))
for i := 0; i < len(res); i++ {
fs, err := floodsub.NewFloodSub(ctx, reshosts[i])
if err != nil {
t.Fatal(err)
}
res[i] = NewPubsubResolver(ctx, reshosts[i], resmrs[i], ks, fs)
if err := reshosts[i].Connect(ctx, pubpinfo); err != nil {
t.Fatal(err)
}
}
time.Sleep(time.Millisecond * 100)
// Before anything is published, every resolver must fail with
// ErrResolveFailed.
for i := 0; i < len(res); i++ {
checkResolveNotFound(ctx, t, i, res[i], name)
// delay to avoid connection storms
time.Sleep(time.Millisecond * 100)
}
// let the bootstrap finish
time.Sleep(time.Second * 1)
// First publish: all resolvers should converge on val.
val := path.Path("/ipfs/QmP1DfoUjiWH2ZBo1PBH6FupdBucbDepx3HpWmEY6JMUpY")
err = pub.Publish(ctx, privk, val)
if err != nil {
t.Fatal(err)
}
// let the flood propagate
time.Sleep(time.Second * 1)
for i := 0; i < len(res); i++ {
checkResolve(ctx, t, i, res[i], name, val)
}
// Second publish: resolvers must pick up the updated value.
val = path.Path("/ipfs/QmP1wMAqk6aZYRZirbaAwmrNeqFRgQrwBt3orUtvSa1UYD")
err = pub.Publish(ctx, privk, val)
if err != nil {
t.Fatal(err)
}
// let the flood propagate
time.Sleep(time.Second * 1)
for i := 0; i < len(res); i++ {
checkResolve(ctx, t, i, res[i], name, val)
}
// cancel subscriptions
for i := 0; i < len(res); i++ {
res[i].Cancel(name)
}
time.Sleep(time.Millisecond * 100)
// Publish once more after cancellation; resolvers must NOT see nval.
nval := path.Path("/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr")
err = pub.Publish(ctx, privk, nval)
if err != nil {
t.Fatal(err)
}
// check we still have the old value in the resolver
time.Sleep(time.Second * 1)
for i := 0; i < len(res); i++ {
checkResolve(ctx, t, i, res[i], name, val)
}
}
// checkResolveNotFound asserts that resolving name fails with
// ErrResolveFailed; i identifies the resolver in failure output.
func checkResolveNotFound(ctx context.Context, t *testing.T, i int, resolver Resolver, name string) {
	_, err := resolver.Resolve(ctx, name)
	if err != ErrResolveFailed {
		// Format with %v and pass err directly: the previous code called
		// err.Error(), which panics when the resolve unexpectedly
		// succeeds and err is nil.
		t.Fatalf("[resolver %d] unexpected error: %v", i, err)
	}
}
// checkResolve asserts that resolver yields exactly val for name; any
// error or mismatched value fails the test, tagged with resolver index i.
func checkResolve(ctx context.Context, t *testing.T, i int, resolver Resolver, name string, val path.Path) {
	got, err := resolver.Resolve(ctx, name)
	if err != nil {
		t.Fatalf("[resolver %d] resolve failed: %s", i, err.Error())
	}
	if got != val {
		t.Fatalf("[resolver %d] unexpected value: %s %s", i, val, got)
	}
}

View File

@ -17,6 +17,7 @@ import (
mh "gx/ipfs/QmZyZDi491cCNTLfAhwcaDii2Kg4pwKRkhqQzURGDvY6ua/go-multihash"
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
dht "gx/ipfs/Qmd3jqhBQFvhfBNTSJMQL15GgyVMpdxKTta69Napvx6Myd/go-libp2p-kad-dht"
)
var log = logging.Logger("namesys")
@ -133,28 +134,28 @@ func (r *routingResolver) resolveOnce(ctx context.Context, name string, options
return "", err
}
pid, err := peer.IDFromBytes(hash)
if err != nil {
log.Debugf("RoutingResolver: could not convert public key hash %s to peer ID: %s\n", name, err)
return "", err
}
// Name should be the hash of a public key retrievable from ipfs.
// We retrieve the public key here to make certain that it's in the peer
// store before calling GetValue() on the DHT - the DHT will call the
// ipns validator, which in turn will get the public key from the peer
// store to verify the record signature
_, err = routing.GetPublicKey(r.routing, ctx, hash)
_, err = routing.GetPublicKey(r.routing, ctx, pid)
if err != nil {
log.Debugf("RoutingResolver: could not retrieve public key %s: %s\n", name, err)
return "", err
}
pid, err := peer.IDFromBytes(hash)
if err != nil {
log.Debugf("RoutingResolver: could not convert public key hash %s to peer ID: %s\n", name, err)
return "", err
}
// Use the routing system to get the name.
// Note that the DHT will call the ipns validator when retrieving
// the value, which in turn verifies the ipns record signature
_, ipnsKey := IpnsKeysForID(pid)
val, err := r.getValue(ctx, ipnsKey, options)
val, err := r.routing.GetValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount)))
if err != nil {
log.Debugf("RoutingResolver: dht get for name %s failed: %s", name, err)
return "", err
@ -187,39 +188,6 @@ func (r *routingResolver) resolveOnce(ctx context.Context, name string, options
}
}
func (r *routingResolver) getValue(ctx context.Context, ipnsKey string, options *opts.ResolveOpts) ([]byte, error) {
// Get specified number of values from the DHT
vals, err := r.routing.GetValues(ctx, ipnsKey, int(options.DhtRecordCount))
if err != nil {
return nil, err
}
// Select the best value
recs := make([][]byte, 0, len(vals))
for _, v := range vals {
if v.Val != nil {
recs = append(recs, v.Val)
}
}
if len(recs) == 0 {
return nil, routing.ErrNotFound
}
i, err := IpnsSelectorFunc(ipnsKey, recs)
if err != nil {
return nil, err
}
best := recs[i]
if best == nil {
log.Errorf("GetValues %s yielded record with nil value", ipnsKey)
return nil, routing.ErrNotFound
}
return best, nil
}
func checkEOL(e *pb.IpnsEntry) (time.Time, bool) {
if e.GetValidityType() == pb.IpnsEntry_EOL {
eol, err := u.ParseRFC3339(string(e.GetValidity()))

View File

@ -1,63 +0,0 @@
package namesys
import (
"bytes"
"errors"
pb "github.com/ipfs/go-ipfs/namesys/pb"
u "gx/ipfs/QmNiJuT8Ja3hMVpBHXv3Q6dwmperaQ6JjLtpMQgMCD7xvx/go-ipfs-util"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
)
// IpnsSelectorFunc selects the best record by checking which has the highest
// sequence number and latest EOL
// IpnsSelectorFunc selects the best record by checking which has the highest
// sequence number and latest EOL. Records that fail to unmarshal are kept
// as nil placeholders so indices still line up with vals.
func IpnsSelectorFunc(k string, vals [][]byte) (int, error) {
	parsed := make([]*pb.IpnsEntry, len(vals))
	for i, data := range vals {
		entry := new(pb.IpnsEntry)
		if proto.Unmarshal(data, entry) == nil {
			parsed[i] = entry
		}
	}
	return selectRecord(parsed, vals)
}
// selectRecord returns the index of the best entry in recs: the highest
// sequence number wins; among equal sequence numbers the latest EOL wins;
// exact EOL ties are broken deterministically by the larger raw record
// bytes. It errors if no entry is usable (nil or unparseable EOL).
func selectRecord(recs []*pb.IpnsEntry, vals [][]byte) (int, error) {
	var bestSeq uint64
	besti := -1
	for i, r := range recs {
		// Skip entries that failed to parse or cannot beat the current best.
		if r == nil || r.GetSequence() < bestSeq {
			continue
		}
		rt, err := u.ParseRFC3339(string(r.GetValidity()))
		if err != nil {
			log.Errorf("failed to parse ipns record EOL %s", r.GetValidity())
			continue
		}
		if besti == -1 || r.GetSequence() > bestSeq {
			bestSeq = r.GetSequence()
			besti = i
		} else if r.GetSequence() == bestSeq {
			// Error ignored: recs[besti] already parsed successfully above.
			bestt, _ := u.ParseRFC3339(string(recs[besti].GetValidity()))
			if rt.After(bestt) {
				besti = i
			} else if rt.Equal(bestt) {
				// Compare instants with time.Time.Equal rather than ==:
				// == also compares location/monotonic-clock fields and can
				// report false for times denoting the same instant.
				if bytes.Compare(vals[i], vals[besti]) > 0 {
					besti = i
				}
			}
		}
	}
	if besti == -1 {
		return 0, errors.New("no usable records in given set")
	}
	return besti, nil
}

View File

@ -1,6 +1,7 @@
package namesys
import (
"bytes"
"errors"
"time"
@ -41,64 +42,106 @@ var ErrKeyFormat = errors.New("record key could not be parsed into peer ID")
// from the peer store
var ErrPublicKeyNotFound = errors.New("public key not found in peer store")
// NewIpnsRecordValidator returns a ValidChecker for IPNS records.
// The validator function will get a public key from the KeyBook
// to verify the record's signature. Note that the public key must
// already have been fetched from the network and put into the KeyBook
// by the caller.
func NewIpnsRecordValidator(kbook pstore.KeyBook) *record.ValidChecker {
// ValidateIpnsRecord implements ValidatorFunc and verifies that the
// given record's value is an IpnsEntry, that the entry has been correctly
// signed, and that the entry has not expired
ValidateIpnsRecord := func(r *record.ValidationRecord) error {
if r.Namespace != "ipns" {
return ErrInvalidPath
}
// Parse the value into an IpnsEntry
entry := new(pb.IpnsEntry)
err := proto.Unmarshal(r.Value, entry)
if err != nil {
return ErrBadRecord
}
// Get the public key defined by the ipns path
pid, err := peer.IDFromString(r.Key)
if err != nil {
log.Debugf("failed to parse ipns record key %s into peer ID", r.Key)
return ErrKeyFormat
}
pubk := kbook.PubKey(pid)
if pubk == nil {
log.Debugf("public key with hash %s not found in peer store", pid)
return ErrPublicKeyNotFound
}
// Check the ipns record signature with the public key
if ok, err := pubk.Verify(ipnsEntryDataForSig(entry), entry.GetSignature()); err != nil || !ok {
log.Debugf("failed to verify signature for ipns record %s", r.Key)
return ErrSignature
}
// Check that record has not expired
switch entry.GetValidityType() {
case pb.IpnsEntry_EOL:
t, err := u.ParseRFC3339(string(entry.GetValidity()))
if err != nil {
log.Debugf("failed parsing time for ipns record EOL in record %s", r.Key)
return err
}
if time.Now().After(t) {
return ErrExpiredRecord
}
default:
return ErrUnrecognizedValidity
}
return nil
}
return &record.ValidChecker{
Func: ValidateIpnsRecord,
Sign: false,
}
// IpnsValidator validates and selects IPNS records. Signatures are checked
// against public keys looked up in KeyBook, so the relevant key must
// already be present in the peer store before Validate is called.
type IpnsValidator struct {
KeyBook pstore.KeyBook
}
// Validate checks that key is a well-formed "/ipns/<peer-id>" record key,
// that value unmarshals to an IpnsEntry correctly signed by the peer's
// public key (taken from v.KeyBook), and that the entry has not passed its
// EOL. It returns a specific Err* sentinel for each failure mode.
func (v IpnsValidator) Validate(key string, value []byte) error {
ns, pidString, err := record.SplitKey(key)
if err != nil || ns != "ipns" {
return ErrInvalidPath
}
// Parse the value into an IpnsEntry
entry := new(pb.IpnsEntry)
err = proto.Unmarshal(value, entry)
if err != nil {
return ErrBadRecord
}
// Get the public key defined by the ipns path
pid, err := peer.IDFromString(pidString)
if err != nil {
log.Debugf("failed to parse ipns record key %s into peer ID", pidString)
return ErrKeyFormat
}
// The key must already be in the KeyBook; Validate does not fetch it
// from the network.
pubk := v.KeyBook.PubKey(pid)
if pubk == nil {
log.Debugf("public key with hash %s not found in peer store", pid)
return ErrPublicKeyNotFound
}
// Check the ipns record signature with the public key
if ok, err := pubk.Verify(ipnsEntryDataForSig(entry), entry.GetSignature()); err != nil || !ok {
log.Debugf("failed to verify signature for ipns record %s", pidString)
return ErrSignature
}
// Check that record has not expired
switch entry.GetValidityType() {
case pb.IpnsEntry_EOL:
t, err := u.ParseRFC3339(string(entry.GetValidity()))
if err != nil {
log.Debugf("failed parsing time for ipns record EOL in record %s", pidString)
return err
}
if time.Now().After(t) {
return ErrExpiredRecord
}
default:
// Only EOL validity is supported; anything else is rejected.
return ErrUnrecognizedValidity
}
return nil
}
// IpnsSelectorFunc selects the best record by checking which has the highest
// sequence number and latest EOL
// Entries that fail to unmarshal stay nil so indices align with vals.
func (v IpnsValidator) Select(k string, vals [][]byte) (int, error) {
	entries := make([]*pb.IpnsEntry, len(vals))
	for i, raw := range vals {
		e := new(pb.IpnsEntry)
		if proto.Unmarshal(raw, e) == nil {
			entries[i] = e
		}
	}
	return selectRecord(entries, vals)
}
// selectRecord returns the index of the best entry in recs: the highest
// sequence number wins; among equal sequence numbers the latest EOL wins;
// exact EOL ties are broken deterministically by the larger raw record
// bytes. It errors if no entry is usable (nil or unparseable EOL).
func selectRecord(recs []*pb.IpnsEntry, vals [][]byte) (int, error) {
	var bestSeq uint64
	besti := -1
	for i, r := range recs {
		// Skip entries that failed to parse or cannot beat the current best.
		if r == nil || r.GetSequence() < bestSeq {
			continue
		}
		rt, err := u.ParseRFC3339(string(r.GetValidity()))
		if err != nil {
			log.Errorf("failed to parse ipns record EOL %s", r.GetValidity())
			continue
		}
		if besti == -1 || r.GetSequence() > bestSeq {
			bestSeq = r.GetSequence()
			besti = i
		} else if r.GetSequence() == bestSeq {
			// Error ignored: recs[besti] already parsed successfully above.
			bestt, _ := u.ParseRFC3339(string(recs[besti].GetValidity()))
			if rt.After(bestt) {
				besti = i
			} else if rt.Equal(bestt) {
				// Compare instants with time.Time.Equal rather than ==:
				// == also compares location/monotonic-clock fields and can
				// report false for times denoting the same instant.
				if bytes.Compare(vals[i], vals[besti]) > 0 {
					besti = i
				}
			}
		}
	}
	if besti == -1 {
		return 0, errors.New("no usable records in given set")
	}
	return besti, nil
}

View File

@ -599,6 +599,18 @@
"hash": "QmTbBs3Y3u5F69XNJzdnnc6SP5GKgcXxCDzx6w8m6piVRT",
"name": "go-bitfield",
"version": "0.1.1"
},
{
"author": "stebalien",
"hash": "QmdSX2uedxXdsfNSqbkfSxcYi7pXBBdvp1Km2ZsjPWfydt",
"name": "go-libp2p-pubsub-router",
"version": "0.2.1"
},
{
"author": "Stebalien",
"hash": "QmeoG1seQ8a9b2vS3XJ8HPz9tXr6dzpS5NizjuDDSntQEk",
"name": "go-libp2p-routing-helpers",
"version": "0.1.0"
}
],
"gxVersion": "0.10.0",