mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-25 20:37:53 +08:00
commit
6f8569db7d
@ -31,6 +31,8 @@ import (
|
||||
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
|
||||
)
|
||||
|
||||
const IpnsValidatorTag = "ipns"
|
||||
|
||||
var log = u.Logger("core")
|
||||
|
||||
// IpfsNode is IPFS Core module. It represents an IPFS instance.
|
||||
@ -156,6 +158,8 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {
|
||||
|
||||
// setup routing service
|
||||
dhtRouting := dht.NewDHT(ctx, n.Identity, n.Peerstore, n.Network, dhtService, n.Datastore)
|
||||
dhtRouting.Validators[IpnsValidatorTag] = namesys.ValidateIpnsRecord
|
||||
|
||||
// TODO(brian): perform this inside NewDHT factory method
|
||||
dhtService.SetHandler(dhtRouting) // wire the handler to the service.
|
||||
n.Routing = dhtRouting
|
||||
|
||||
@ -20,10 +20,43 @@ import math "math"
|
||||
var _ = proto.Marshal
|
||||
var _ = math.Inf
|
||||
|
||||
type IpnsEntry_ValidityType int32
|
||||
|
||||
const (
|
||||
// setting an EOL says "this record is valid until..."
|
||||
IpnsEntry_EOL IpnsEntry_ValidityType = 0
|
||||
)
|
||||
|
||||
var IpnsEntry_ValidityType_name = map[int32]string{
|
||||
0: "EOL",
|
||||
}
|
||||
var IpnsEntry_ValidityType_value = map[string]int32{
|
||||
"EOL": 0,
|
||||
}
|
||||
|
||||
func (x IpnsEntry_ValidityType) Enum() *IpnsEntry_ValidityType {
|
||||
p := new(IpnsEntry_ValidityType)
|
||||
*p = x
|
||||
return p
|
||||
}
|
||||
func (x IpnsEntry_ValidityType) String() string {
|
||||
return proto.EnumName(IpnsEntry_ValidityType_name, int32(x))
|
||||
}
|
||||
func (x *IpnsEntry_ValidityType) UnmarshalJSON(data []byte) error {
|
||||
value, err := proto.UnmarshalJSONEnum(IpnsEntry_ValidityType_value, data, "IpnsEntry_ValidityType")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*x = IpnsEntry_ValidityType(value)
|
||||
return nil
|
||||
}
|
||||
|
||||
type IpnsEntry struct {
|
||||
Value []byte `protobuf:"bytes,1,req,name=value" json:"value,omitempty"`
|
||||
Signature []byte `protobuf:"bytes,2,req,name=signature" json:"signature,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
Value []byte `protobuf:"bytes,1,req,name=value" json:"value,omitempty"`
|
||||
Signature []byte `protobuf:"bytes,2,req,name=signature" json:"signature,omitempty"`
|
||||
ValidityType *IpnsEntry_ValidityType `protobuf:"varint,3,opt,name=validityType,enum=namesys.pb.IpnsEntry_ValidityType" json:"validityType,omitempty"`
|
||||
Validity []byte `protobuf:"bytes,4,opt,name=validity" json:"validity,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *IpnsEntry) Reset() { *m = IpnsEntry{} }
|
||||
@ -44,5 +77,20 @@ func (m *IpnsEntry) GetSignature() []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
func (m *IpnsEntry) GetValidityType() IpnsEntry_ValidityType {
|
||||
if m != nil && m.ValidityType != nil {
|
||||
return *m.ValidityType
|
||||
}
|
||||
return IpnsEntry_EOL
|
||||
}
|
||||
|
||||
func (m *IpnsEntry) GetValidity() []byte {
|
||||
if m != nil {
|
||||
return m.Validity
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterEnum("namesys.pb.IpnsEntry_ValidityType", IpnsEntry_ValidityType_name, IpnsEntry_ValidityType_value)
|
||||
}
|
||||
|
||||
@ -1,6 +1,13 @@
|
||||
package namesys.pb;
|
||||
|
||||
message IpnsEntry {
|
||||
enum ValidityType {
|
||||
// setting an EOL says "this record is valid until..."
|
||||
EOL = 0;
|
||||
}
|
||||
required bytes value = 1;
|
||||
required bytes signature = 2;
|
||||
|
||||
optional ValidityType validityType = 3;
|
||||
optional bytes validity = 4;
|
||||
}
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
package namesys
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
@ -14,6 +16,14 @@ import (
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
|
||||
// ErrExpiredRecord should be returned when an ipns record is
|
||||
// invalid due to being too old
|
||||
var ErrExpiredRecord = errors.New("expired record")
|
||||
|
||||
// ErrUnrecognizedValidity is returned when an IpnsRecord has an
|
||||
// unknown validity type.
|
||||
var ErrUnrecognizedValidity = errors.New("unrecognized validity type")
|
||||
|
||||
// ipnsPublisher is capable of publishing and resolving names to the IPFS
|
||||
// routing system.
|
||||
type ipnsPublisher struct {
|
||||
@ -33,34 +43,40 @@ func (p *ipnsPublisher) Publish(k ci.PrivKey, value string) error {
|
||||
// validate `value` is a ref (multihash)
|
||||
_, err := mh.FromB58String(value)
|
||||
if err != nil {
|
||||
log.Errorf("hash cast failed: %s", value)
|
||||
return fmt.Errorf("publish value must be str multihash. %v", err)
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
data, err := createRoutingEntryData(k, value)
|
||||
if err != nil {
|
||||
log.Error("entry creation failed.")
|
||||
return err
|
||||
}
|
||||
pubkey := k.GetPublic()
|
||||
pkbytes, err := pubkey.Bytes()
|
||||
if err != nil {
|
||||
return nil
|
||||
log.Error("pubkey getbytes failed.")
|
||||
return err
|
||||
}
|
||||
|
||||
nameb := u.Hash(pkbytes)
|
||||
namekey := u.Key(nameb).Pretty()
|
||||
ipnskey := u.Hash([]byte("/ipns/" + namekey))
|
||||
namekey := u.Key("/pk/" + string(nameb))
|
||||
|
||||
log.Debugf("Storing pubkey at: %s", namekey)
|
||||
// Store associated public key
|
||||
timectx, _ := context.WithDeadline(ctx, time.Now().Add(time.Second*4))
|
||||
err = p.routing.PutValue(timectx, u.Key(nameb), pkbytes)
|
||||
err = p.routing.PutValue(timectx, namekey, pkbytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Store ipns entry at h("/ipns/"+b58(h(pubkey)))
|
||||
ipnskey := u.Key("/ipns/" + string(nameb))
|
||||
|
||||
log.Debugf("Storing ipns entry at: %s", ipnskey)
|
||||
// Store ipns entry at "/ipns/"+b58(h(pubkey))
|
||||
timectx, _ = context.WithDeadline(ctx, time.Now().Add(time.Second*4))
|
||||
err = p.routing.PutValue(timectx, u.Key(ipnskey), data)
|
||||
err = p.routing.PutValue(timectx, ipnskey, data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -70,11 +86,49 @@ func (p *ipnsPublisher) Publish(k ci.PrivKey, value string) error {
|
||||
|
||||
func createRoutingEntryData(pk ci.PrivKey, val string) ([]byte, error) {
|
||||
entry := new(pb.IpnsEntry)
|
||||
sig, err := pk.Sign([]byte(val))
|
||||
|
||||
entry.Value = []byte(val)
|
||||
typ := pb.IpnsEntry_EOL
|
||||
entry.ValidityType = &typ
|
||||
entry.Validity = []byte(u.FormatRFC3339(time.Now().Add(time.Hour * 24)))
|
||||
|
||||
sig, err := pk.Sign(ipnsEntryDataForSig(entry))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
entry.Signature = sig
|
||||
entry.Value = []byte(val)
|
||||
return proto.Marshal(entry)
|
||||
}
|
||||
|
||||
func ipnsEntryDataForSig(e *pb.IpnsEntry) []byte {
|
||||
return bytes.Join([][]byte{
|
||||
e.Value,
|
||||
e.Validity,
|
||||
[]byte(fmt.Sprint(e.GetValidityType())),
|
||||
},
|
||||
[]byte{})
|
||||
}
|
||||
|
||||
// ValidateIpnsRecord implements ValidatorFunc and verifies that the
|
||||
// given 'val' is an IpnsEntry and that that entry is valid.
|
||||
func ValidateIpnsRecord(k u.Key, val []byte) error {
|
||||
entry := new(pb.IpnsEntry)
|
||||
err := proto.Unmarshal(val, entry)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch entry.GetValidityType() {
|
||||
case pb.IpnsEntry_EOL:
|
||||
t, err := u.ParseRFC3339(string(entry.GetValue()))
|
||||
if err != nil {
|
||||
log.Error("Failed parsing time for ipns record EOL")
|
||||
return err
|
||||
}
|
||||
if time.Now().After(t) {
|
||||
return ErrExpiredRecord
|
||||
}
|
||||
default:
|
||||
return ErrUnrecognizedValidity
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -46,7 +46,7 @@ func (r *routingResolver) Resolve(name string) (string, error) {
|
||||
|
||||
// use the routing system to get the name.
|
||||
// /ipns/<name>
|
||||
h := u.Hash([]byte("/ipns/" + name))
|
||||
h := []byte("/ipns/" + string(hash))
|
||||
|
||||
ipnsKey := u.Key(h)
|
||||
val, err := r.routing.GetValue(ctx, ipnsKey)
|
||||
@ -63,7 +63,7 @@ func (r *routingResolver) Resolve(name string) (string, error) {
|
||||
|
||||
// name should be a public key retrievable from ipfs
|
||||
// /ipfs/<name>
|
||||
key := u.Key(hash)
|
||||
key := u.Key("/pk/" + string(hash))
|
||||
pkval, err := r.routing.GetValue(ctx, key)
|
||||
if err != nil {
|
||||
log.Warning("RoutingResolve PubKey Get failed.")
|
||||
@ -75,9 +75,11 @@ func (r *routingResolver) Resolve(name string) (string, error) {
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
hsh, _ := pk.Hash()
|
||||
log.Debugf("pk hash = %s", u.Key(hsh))
|
||||
|
||||
// check sig with pk
|
||||
if ok, err := pk.Verify(entry.GetValue(), entry.GetSignature()); err != nil || !ok {
|
||||
if ok, err := pk.Verify(ipnsEntryDataForSig(entry), entry.GetSignature()); err != nil || !ok {
|
||||
return "", fmt.Errorf("Invalid value. Not signed by PrivateKey corresponding to %v", pk)
|
||||
}
|
||||
|
||||
|
||||
@ -60,6 +60,9 @@ type IpfsDHT struct {
|
||||
//lock to make diagnostics work better
|
||||
diaglock sync.Mutex
|
||||
|
||||
// record validator funcs
|
||||
Validators map[string]ValidatorFunc
|
||||
|
||||
ctxc.ContextCloser
|
||||
}
|
||||
|
||||
@ -82,6 +85,9 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia
|
||||
dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Hour)
|
||||
dht.birth = time.Now()
|
||||
|
||||
dht.Validators = make(map[string]ValidatorFunc)
|
||||
dht.Validators["pk"] = ValidatePublicKeyRecord
|
||||
|
||||
if doPinging {
|
||||
dht.Children().Add(1)
|
||||
go dht.PingRoutine(time.Second * 10)
|
||||
@ -215,16 +221,16 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Messa
|
||||
|
||||
// putValueToNetwork stores the given key/value pair at the peer 'p'
|
||||
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
|
||||
key string, value []byte) error {
|
||||
key string, rec *pb.Record) error {
|
||||
|
||||
pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
|
||||
pmes.Value = value
|
||||
pmes.Record = rec
|
||||
rpmes, err := dht.sendRequest(ctx, p, pmes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !bytes.Equal(rpmes.Value, pmes.Value) {
|
||||
if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
|
||||
return errors.New("value not put correctly")
|
||||
}
|
||||
return nil
|
||||
@ -260,11 +266,17 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
log.Debugf("pmes.GetValue() %v", pmes.GetValue())
|
||||
if value := pmes.GetValue(); value != nil {
|
||||
if record := pmes.GetRecord(); record != nil {
|
||||
// Success! We were given the value
|
||||
log.Debug("getValueOrPeers: got value")
|
||||
return value, nil, nil
|
||||
|
||||
// make sure record is still valid
|
||||
err = dht.verifyRecord(record)
|
||||
if err != nil {
|
||||
log.Error("Received invalid record!")
|
||||
return nil, nil, err
|
||||
}
|
||||
return record.GetValue(), nil, nil
|
||||
}
|
||||
|
||||
// TODO decide on providers. This probably shouldn't be happening.
|
||||
@ -325,10 +337,15 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
|
||||
continue
|
||||
}
|
||||
|
||||
if value := pmes.GetValue(); value != nil {
|
||||
if record := pmes.GetRecord(); record != nil {
|
||||
// Success! We were given the value
|
||||
|
||||
err := dht.verifyRecord(record)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dht.providers.AddProvider(key, p)
|
||||
return value, nil
|
||||
return record.GetValue(), nil
|
||||
}
|
||||
}
|
||||
return nil, routing.ErrNotFound
|
||||
@ -338,21 +355,47 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
|
||||
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
|
||||
dht.dslock.Lock()
|
||||
defer dht.dslock.Unlock()
|
||||
log.Debug("getLocal %s", key)
|
||||
v, err := dht.datastore.Get(key.DsKey())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.Debug("found in db")
|
||||
|
||||
byt, ok := v.([]byte)
|
||||
if !ok {
|
||||
return nil, errors.New("value stored in datastore not []byte")
|
||||
}
|
||||
return byt, nil
|
||||
rec := new(pb.Record)
|
||||
err = proto.Unmarshal(byt, rec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: 'if paranoid'
|
||||
if u.Debug {
|
||||
err = dht.verifyRecord(rec)
|
||||
if err != nil {
|
||||
log.Errorf("local record verify failed: %s", err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return rec.GetValue(), nil
|
||||
}
|
||||
|
||||
// putLocal stores the key value pair in the datastore
|
||||
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
|
||||
return dht.datastore.Put(key.DsKey(), value)
|
||||
rec, err := dht.makePutRecord(key, value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data, err := proto.Marshal(rec)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return dht.datastore.Put(key.DsKey(), data)
|
||||
}
|
||||
|
||||
// Update signals to all routingTables to Update their last-seen status
|
||||
|
||||
@ -33,6 +33,9 @@ func setupDHT(ctx context.Context, t *testing.T, p peer.Peer) *IpfsDHT {
|
||||
|
||||
d := NewDHT(ctx, p, peerstore, net, dhts, ds.NewMapDatastore())
|
||||
dhts.SetHandler(d)
|
||||
d.Validators["v"] = func(u.Key, []byte) error {
|
||||
return nil
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
@ -136,6 +139,12 @@ func TestValueGetSet(t *testing.T) {
|
||||
dhtA := setupDHT(ctx, t, peerA)
|
||||
dhtB := setupDHT(ctx, t, peerB)
|
||||
|
||||
vf := func(u.Key, []byte) error {
|
||||
return nil
|
||||
}
|
||||
dhtA.Validators["v"] = vf
|
||||
dhtB.Validators["v"] = vf
|
||||
|
||||
defer dhtA.Close()
|
||||
defer dhtB.Close()
|
||||
defer dhtA.dialer.(inet.Network).Close()
|
||||
@ -147,10 +156,10 @@ func TestValueGetSet(t *testing.T) {
|
||||
}
|
||||
|
||||
ctxT, _ := context.WithTimeout(ctx, time.Second)
|
||||
dhtA.PutValue(ctxT, "hello", []byte("world"))
|
||||
dhtA.PutValue(ctxT, "/v/hello", []byte("world"))
|
||||
|
||||
ctxT, _ = context.WithTimeout(ctx, time.Second*2)
|
||||
val, err := dhtA.GetValue(ctxT, "hello")
|
||||
val, err := dhtA.GetValue(ctxT, "/v/hello")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -160,7 +169,7 @@ func TestValueGetSet(t *testing.T) {
|
||||
}
|
||||
|
||||
ctxT, _ = context.WithTimeout(ctx, time.Second*2)
|
||||
val, err = dhtB.GetValue(ctxT, "hello")
|
||||
val, err = dhtB.GetValue(ctxT, "/v/hello")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -326,12 +335,12 @@ func TestLayeredGet(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = dhts[3].putLocal(u.Key("hello"), []byte("world"))
|
||||
err = dhts[3].putLocal(u.Key("/v/hello"), []byte("world"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = dhts[3].Provide(ctx, u.Key("hello"))
|
||||
err = dhts[3].Provide(ctx, u.Key("/v/hello"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -339,7 +348,7 @@ func TestLayeredGet(t *testing.T) {
|
||||
time.Sleep(time.Millisecond * 60)
|
||||
|
||||
ctxT, _ := context.WithTimeout(ctx, time.Second)
|
||||
val, err := dhts[0].GetValue(ctxT, u.Key("hello"))
|
||||
val, err := dhts[0].GetValue(ctxT, u.Key("/v/hello"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"testing"
|
||||
|
||||
crand "crypto/rand"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
||||
|
||||
@ -124,10 +125,10 @@ func TestGetFailures(t *testing.T) {
|
||||
fs := &fauxSender{}
|
||||
|
||||
peerstore := peer.NewPeerstore()
|
||||
local := peer.WithIDString("test_peer")
|
||||
local := makePeer(nil)
|
||||
|
||||
d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore())
|
||||
other := peer.WithIDString("other_peer")
|
||||
other := makePeer(nil)
|
||||
d.Update(other)
|
||||
|
||||
// This one should time out
|
||||
@ -173,10 +174,14 @@ func TestGetFailures(t *testing.T) {
|
||||
// Now we test this DHT's handleGetValue failure
|
||||
typ := pb.Message_GET_VALUE
|
||||
str := "hello"
|
||||
rec, err := d.makePutRecord(u.Key(str), []byte("blah"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req := pb.Message{
|
||||
Type: &typ,
|
||||
Key: &str,
|
||||
Value: []byte{0},
|
||||
Type: &typ,
|
||||
Key: &str,
|
||||
Record: rec,
|
||||
}
|
||||
|
||||
// u.POut("handleGetValue Test\n")
|
||||
@ -192,12 +197,9 @@ func TestGetFailures(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if pmes.GetValue() != nil {
|
||||
if pmes.GetRecord() != nil {
|
||||
t.Fatal("shouldnt have value")
|
||||
}
|
||||
if pmes.GetCloserPeers() != nil {
|
||||
t.Fatal("shouldnt have closer peers")
|
||||
}
|
||||
if pmes.GetProviderPeers() != nil {
|
||||
t.Fatal("shouldnt have provider peers")
|
||||
}
|
||||
@ -221,7 +223,7 @@ func TestNotFound(t *testing.T) {
|
||||
fn := &fauxNet{}
|
||||
fs := &fauxSender{}
|
||||
|
||||
local := peer.WithIDString("test_peer")
|
||||
local := makePeer(nil)
|
||||
peerstore := peer.NewPeerstore()
|
||||
peerstore.Add(local)
|
||||
|
||||
@ -287,7 +289,7 @@ func TestLessThanKResponses(t *testing.T) {
|
||||
u.Debug = false
|
||||
fn := &fauxNet{}
|
||||
fs := &fauxSender{}
|
||||
local := peer.WithIDString("test_peer")
|
||||
local := makePeer(nil)
|
||||
peerstore := peer.NewPeerstore()
|
||||
peerstore.Add(local)
|
||||
|
||||
|
||||
@ -5,6 +5,8 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
||||
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
@ -72,7 +74,14 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message,
|
||||
return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
|
||||
}
|
||||
|
||||
resp.Value = byts
|
||||
rec := new(pb.Record)
|
||||
err := proto.Unmarshal(byts, rec)
|
||||
if err != nil {
|
||||
log.Error("Failed to unmarshal dht record from datastore")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp.Record = rec
|
||||
}
|
||||
|
||||
// if we know any providers for the requested value, return those.
|
||||
@ -102,8 +111,21 @@ func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *pb.Message) (*pb.Message,
|
||||
dht.dslock.Lock()
|
||||
defer dht.dslock.Unlock()
|
||||
dskey := u.Key(pmes.GetKey()).DsKey()
|
||||
err := dht.datastore.Put(dskey, pmes.GetValue())
|
||||
log.Debugf("%s handlePutValue %v %v\n", dht.self, dskey, pmes.GetValue())
|
||||
|
||||
err := dht.verifyRecord(pmes.GetRecord())
|
||||
if err != nil {
|
||||
fmt.Println(u.Key(pmes.GetRecord().GetAuthor()))
|
||||
log.Error("Bad dht record in put request")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
data, err := proto.Marshal(pmes.GetRecord())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = dht.datastore.Put(dskey, data)
|
||||
log.Debugf("%s handlePutValue %v\n", dht.self, dskey)
|
||||
return pmes, err
|
||||
}
|
||||
|
||||
|
||||
@ -10,6 +10,7 @@ It is generated from these files:
|
||||
|
||||
It has these top-level messages:
|
||||
Message
|
||||
Record
|
||||
*/
|
||||
package dht_pb
|
||||
|
||||
@ -75,7 +76,7 @@ type Message struct {
|
||||
Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
|
||||
// Used to return a value
|
||||
// PUT_VALUE, GET_VALUE
|
||||
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
|
||||
Record *Record `protobuf:"bytes,3,opt,name=record" json:"record,omitempty"`
|
||||
// Used to return peers closer to a key in a query
|
||||
// GET_VALUE, GET_PROVIDERS, FIND_NODE
|
||||
CloserPeers []*Message_Peer `protobuf:"bytes,8,rep,name=closerPeers" json:"closerPeers,omitempty"`
|
||||
@ -110,9 +111,9 @@ func (m *Message) GetKey() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *Message) GetValue() []byte {
|
||||
func (m *Message) GetRecord() *Record {
|
||||
if m != nil {
|
||||
return m.Value
|
||||
return m.Record
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -155,6 +156,52 @@ func (m *Message_Peer) GetAddr() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
// Record represents a dht record that contains a value
|
||||
// for a key value pair
|
||||
type Record struct {
|
||||
// The key that references this record
|
||||
Key *string `protobuf:"bytes,1,opt,name=key" json:"key,omitempty"`
|
||||
// The actual value this record is storing
|
||||
Value []byte `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"`
|
||||
// hash of the authors public key
|
||||
Author *string `protobuf:"bytes,3,opt,name=author" json:"author,omitempty"`
|
||||
// A PKI signature for the key+value+author
|
||||
Signature []byte `protobuf:"bytes,4,opt,name=signature" json:"signature,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Record) Reset() { *m = Record{} }
|
||||
func (m *Record) String() string { return proto.CompactTextString(m) }
|
||||
func (*Record) ProtoMessage() {}
|
||||
|
||||
func (m *Record) GetKey() string {
|
||||
if m != nil && m.Key != nil {
|
||||
return *m.Key
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *Record) GetValue() []byte {
|
||||
if m != nil {
|
||||
return m.Value
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Record) GetAuthor() string {
|
||||
if m != nil && m.Author != nil {
|
||||
return *m.Author
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *Record) GetSignature() []byte {
|
||||
if m != nil {
|
||||
return m.Signature
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterEnum("dht.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value)
|
||||
}
|
||||
|
||||
@ -29,7 +29,7 @@ message Message {
|
||||
|
||||
// Used to return a value
|
||||
// PUT_VALUE, GET_VALUE
|
||||
optional bytes value = 3;
|
||||
optional Record record = 3;
|
||||
|
||||
// Used to return peers closer to a key in a query
|
||||
// GET_VALUE, GET_PROVIDERS, FIND_NODE
|
||||
@ -39,3 +39,19 @@ message Message {
|
||||
// GET_VALUE, ADD_PROVIDER, GET_PROVIDERS
|
||||
repeated Peer providerPeers = 9;
|
||||
}
|
||||
|
||||
// Record represents a dht record that contains a value
|
||||
// for a key value pair
|
||||
message Record {
|
||||
// The key that references this record
|
||||
optional string key = 1;
|
||||
|
||||
// The actual value this record is storing
|
||||
optional bytes value = 2;
|
||||
|
||||
// hash of the authors public key
|
||||
optional string author = 3;
|
||||
|
||||
// A PKI signature for the key+value+author
|
||||
optional bytes signature = 4;
|
||||
}
|
||||
|
||||
120
routing/dht/records.go
Normal file
120
routing/dht/records.go
Normal file
@ -0,0 +1,120 @@
|
||||
package dht
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
||||
ci "github.com/jbenet/go-ipfs/crypto"
|
||||
"github.com/jbenet/go-ipfs/peer"
|
||||
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
|
||||
// ValidatorFunc is a function that is called to validate a given
|
||||
// type of DHTRecord.
|
||||
type ValidatorFunc func(u.Key, []byte) error
|
||||
|
||||
// ErrBadRecord is returned any time a dht record is found to be
|
||||
// incorrectly formatted or signed.
|
||||
var ErrBadRecord = errors.New("bad dht record")
|
||||
|
||||
// ErrInvalidRecordType is returned if a DHTRecord keys prefix
|
||||
// is not found in the Validator map of the DHT.
|
||||
var ErrInvalidRecordType = errors.New("invalid record keytype")
|
||||
|
||||
// creates and signs a dht record for the given key/value pair
|
||||
func (dht *IpfsDHT) makePutRecord(key u.Key, value []byte) (*pb.Record, error) {
|
||||
record := new(pb.Record)
|
||||
|
||||
record.Key = proto.String(string(key))
|
||||
record.Value = value
|
||||
record.Author = proto.String(string(dht.self.ID()))
|
||||
blob := bytes.Join([][]byte{[]byte(key), value, []byte(dht.self.ID())}, []byte{})
|
||||
sig, err := dht.self.PrivKey().Sign(blob)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
record.Signature = sig
|
||||
return record, nil
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) getPublicKey(pid peer.ID) (ci.PubKey, error) {
|
||||
log.Debug("getPublicKey for: %s", pid)
|
||||
p, err := dht.peerstore.Get(pid)
|
||||
if err == nil {
|
||||
return p.PubKey(), nil
|
||||
}
|
||||
|
||||
log.Debug("not in peerstore, searching dht.")
|
||||
ctxT, _ := context.WithTimeout(dht.ContextCloser.Context(), time.Second*5)
|
||||
val, err := dht.GetValue(ctxT, u.Key("/pk/"+string(pid)))
|
||||
if err != nil {
|
||||
log.Warning("Failed to find requested public key.")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pubkey, err := ci.UnmarshalPublicKey(val)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to unmarshal public key: %s", err)
|
||||
return nil, err
|
||||
}
|
||||
return pubkey, nil
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) verifyRecord(r *pb.Record) error {
|
||||
// First, validate the signature
|
||||
p, err := dht.peerstore.Get(peer.ID(r.GetAuthor()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
k := u.Key(r.GetKey())
|
||||
|
||||
blob := bytes.Join([][]byte{[]byte(k),
|
||||
r.GetValue(),
|
||||
[]byte(r.GetAuthor())}, []byte{})
|
||||
|
||||
ok, err := p.PubKey().Verify(blob, r.GetSignature())
|
||||
if err != nil {
|
||||
log.Error("Signature verify failed.")
|
||||
return err
|
||||
}
|
||||
|
||||
if !ok {
|
||||
return ErrBadRecord
|
||||
}
|
||||
|
||||
// Now, check validity func
|
||||
parts := strings.Split(r.GetKey(), "/")
|
||||
if len(parts) < 3 {
|
||||
log.Errorf("Record had bad key: %s", u.Key(r.GetKey()))
|
||||
return ErrBadRecord
|
||||
}
|
||||
|
||||
fnc, ok := dht.Validators[parts[1]]
|
||||
if !ok {
|
||||
log.Errorf("Unrecognized key prefix: %s", parts[1])
|
||||
return ErrInvalidRecordType
|
||||
}
|
||||
|
||||
return fnc(u.Key(r.GetKey()), r.GetValue())
|
||||
}
|
||||
|
||||
// ValidatePublicKeyRecord implements ValidatorFunc and
|
||||
// verifies that the passed in record value is the PublicKey
|
||||
// that matches the passed in key.
|
||||
func ValidatePublicKeyRecord(k u.Key, val []byte) error {
|
||||
keyparts := bytes.Split([]byte(k), []byte("/"))
|
||||
if len(keyparts) < 3 {
|
||||
return errors.New("invalid key")
|
||||
}
|
||||
|
||||
pkh := u.Hash(val)
|
||||
if !bytes.Equal(keyparts[2], pkh) {
|
||||
return errors.New("public key does not match storage key")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -25,6 +25,12 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
|
||||
return err
|
||||
}
|
||||
|
||||
rec, err := dht.makePutRecord(key, value)
|
||||
if err != nil {
|
||||
log.Error("Creation of record failed!")
|
||||
return err
|
||||
}
|
||||
|
||||
var peers []peer.Peer
|
||||
for _, route := range dht.routingTables {
|
||||
npeers := route.NearestPeers(kb.ConvertKey(key), KValue)
|
||||
@ -33,7 +39,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
|
||||
|
||||
query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
|
||||
log.Debugf("%s PutValue qry part %v", dht.self, p)
|
||||
err := dht.putValueToNetwork(ctx, p, string(key), value)
|
||||
err := dht.putValueToNetwork(ctx, p, string(key), rec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -12,6 +12,8 @@ import (
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
|
||||
var log = u.Logger("mockrouter")
|
||||
|
||||
var _ routing.IpfsRouting = &MockRouter{}
|
||||
|
||||
type MockRouter struct {
|
||||
@ -33,10 +35,12 @@ func (mr *MockRouter) SetRoutingServer(rs RoutingServer) {
|
||||
}
|
||||
|
||||
func (mr *MockRouter) PutValue(ctx context.Context, key u.Key, val []byte) error {
|
||||
log.Debugf("PutValue: %s", key)
|
||||
return mr.datastore.Put(key.DsKey(), val)
|
||||
}
|
||||
|
||||
func (mr *MockRouter) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
|
||||
log.Debugf("GetValue: %s", key)
|
||||
v, err := mr.datastore.Get(key.DsKey())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -55,6 +59,7 @@ func (mr *MockRouter) FindProviders(ctx context.Context, key u.Key) ([]peer.Peer
|
||||
}
|
||||
|
||||
func (mr *MockRouter) FindPeer(ctx context.Context, pid peer.ID) (peer.Peer, error) {
|
||||
log.Debug("FindPeer: %s", pid)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
||||
17
util/time.go
Normal file
17
util/time.go
Normal file
@ -0,0 +1,17 @@
|
||||
package util
|
||||
|
||||
import "time"
|
||||
|
||||
var TimeFormatIpfs = time.RFC3339Nano
|
||||
|
||||
func ParseRFC3339(s string) (time.Time, error) {
|
||||
t, err := time.Parse(TimeFormatIpfs, s)
|
||||
if err != nil {
|
||||
return time.Time{}, err
|
||||
}
|
||||
return t.UTC(), nil
|
||||
}
|
||||
|
||||
func FormatRFC3339(t time.Time) string {
|
||||
return t.UTC().Format(TimeFormatIpfs)
|
||||
}
|
||||
16
util/time_test.go
Normal file
16
util/time_test.go
Normal file
@ -0,0 +1,16 @@
|
||||
package util
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestTimeFormatParseInversion(t *testing.T) {
|
||||
v, err := ParseRFC3339(FormatRFC3339(time.Now()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if v.Location() != time.UTC {
|
||||
t.Fatal("Time should be UTC")
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user