mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-01 06:17:56 +08:00
Merge pull request #254 from jbenet/comments
comment comment comment comment
This commit is contained in:
commit
60ef8e5e8c
@ -19,6 +19,9 @@ func NewBlock(data []byte) *Block {
|
||||
return &Block{Data: data, Multihash: u.Hash(data)}
|
||||
}
|
||||
|
||||
// NewBlockWithHash creates a new block when the hash of the data
|
||||
// is already known, this is used to save time in situations where
|
||||
// we are able to be confident that the data is correct
|
||||
func NewBlockWithHash(data []byte, h mh.Multihash) (*Block, error) {
|
||||
if u.Debug {
|
||||
chk := u.Hash(data)
|
||||
|
||||
@ -93,6 +93,7 @@ func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, er
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteBlock deletes a block in the blockservice from the datastore
|
||||
func (s *BlockService) DeleteBlock(k u.Key) error {
|
||||
return s.Datastore.Delete(k.DsKey())
|
||||
}
|
||||
|
||||
@ -28,6 +28,7 @@ const (
|
||||
RSA = iota
|
||||
)
|
||||
|
||||
// Key represents a crypto key that can be compared to another key
|
||||
type Key interface {
|
||||
// Bytes returns a serialized, storeable representation of this key
|
||||
Bytes() ([]byte, error)
|
||||
@ -39,6 +40,8 @@ type Key interface {
|
||||
Equals(Key) bool
|
||||
}
|
||||
|
||||
// PrivKey represents a private key that can be used to generate a public key,
|
||||
// sign data, and decrypt data that was encrypted with a public key
|
||||
type PrivKey interface {
|
||||
Key
|
||||
|
||||
@ -60,12 +63,14 @@ type PubKey interface {
|
||||
// Verify that 'sig' is the signed hash of 'data'
|
||||
Verify(data []byte, sig []byte) (bool, error)
|
||||
|
||||
// Encrypt data in a way that can be decrypted by a paired private key
|
||||
Encrypt(data []byte) ([]byte, error)
|
||||
}
|
||||
|
||||
// Given a public key, generates the shared key.
|
||||
type GenSharedKey func([]byte) ([]byte, error)
|
||||
|
||||
// Generates a keypair of the given type and bitsize
|
||||
func GenerateKeyPair(typ, bits int) (PrivKey, PubKey, error) {
|
||||
switch typ {
|
||||
case RSA:
|
||||
@ -217,6 +222,8 @@ func KeyStretcher(cmp int, cipherType string, hashType string, secret []byte) ([
|
||||
return myIV, theirIV, myCKey, theirCKey, myMKey, theirMKey
|
||||
}
|
||||
|
||||
// UnmarshalPublicKey converts a protobuf serialized public key into its
|
||||
// representative object
|
||||
func UnmarshalPublicKey(data []byte) (PubKey, error) {
|
||||
pmes := new(pb.PublicKey)
|
||||
err := proto.Unmarshal(data, pmes)
|
||||
@ -232,6 +239,8 @@ func UnmarshalPublicKey(data []byte) (PubKey, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// UnmarshalPrivateKey converts a protobuf serialized private key into its
|
||||
// representative object
|
||||
func UnmarshalPrivateKey(data []byte) (PrivKey, error) {
|
||||
pmes := new(pb.PrivateKey)
|
||||
err := proto.Unmarshal(data, pmes)
|
||||
|
||||
@ -24,6 +24,8 @@ var log = util.Logger("diagnostics")
|
||||
|
||||
const ResponseTimeout = time.Second * 10
|
||||
|
||||
// Diagnostics is a net service that manages requesting and responding to diagnostic
|
||||
// requests
|
||||
type Diagnostics struct {
|
||||
network net.Network
|
||||
sender net.Sender
|
||||
@ -34,6 +36,7 @@ type Diagnostics struct {
|
||||
birth time.Time
|
||||
}
|
||||
|
||||
// NewDiagnostics instantiates a new diagnostics service running on the given network
|
||||
func NewDiagnostics(self peer.Peer, inet net.Network, sender net.Sender) *Diagnostics {
|
||||
return &Diagnostics{
|
||||
network: inet,
|
||||
@ -50,15 +53,30 @@ type connDiagInfo struct {
|
||||
}
|
||||
|
||||
type DiagInfo struct {
|
||||
ID string
|
||||
// This nodes ID
|
||||
ID string
|
||||
|
||||
// A list of peers this node currently has open connections to
|
||||
Connections []connDiagInfo
|
||||
Keys []string
|
||||
LifeSpan time.Duration
|
||||
BwIn uint64
|
||||
BwOut uint64
|
||||
|
||||
// A list of keys provided by this node
|
||||
// (currently not filled)
|
||||
Keys []string
|
||||
|
||||
// How long this node has been running for
|
||||
LifeSpan time.Duration
|
||||
|
||||
// Incoming Bandwidth Usage
|
||||
BwIn uint64
|
||||
|
||||
// Outgoing Bandwidth Usage
|
||||
BwOut uint64
|
||||
|
||||
// Information about the version of code this node is running
|
||||
CodeVersion string
|
||||
}
|
||||
|
||||
// Marshal to json
|
||||
func (di *DiagInfo) Marshal() []byte {
|
||||
b, err := json.Marshal(di)
|
||||
if err != nil {
|
||||
@ -93,6 +111,7 @@ func newID() string {
|
||||
return string(id)
|
||||
}
|
||||
|
||||
// GetDiagnostic runs a diagnostics request across the entire network
|
||||
func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error) {
|
||||
log.Debug("Getting diagnostic.")
|
||||
ctx, _ := context.WithTimeout(context.TODO(), timeout)
|
||||
@ -134,12 +153,12 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error)
|
||||
if data == nil {
|
||||
continue
|
||||
}
|
||||
out = AppendDiagnostics(data, out)
|
||||
out = appendDiagnostics(data, out)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func AppendDiagnostics(data []byte, cur []*DiagInfo) []*DiagInfo {
|
||||
func appendDiagnostics(data []byte, cur []*DiagInfo) []*DiagInfo {
|
||||
buf := bytes.NewBuffer(data)
|
||||
dec := json.NewDecoder(buf)
|
||||
for {
|
||||
@ -202,6 +221,8 @@ func (d *Diagnostics) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Mes
|
||||
func (d *Diagnostics) handleDiagnostic(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
|
||||
log.Debugf("HandleDiagnostic from %s for id = %s", p, pmes.GetDiagID())
|
||||
resp := newMessage(pmes.GetDiagID())
|
||||
|
||||
// Make sure we havent already handled this request to prevent loops
|
||||
d.diagLock.Lock()
|
||||
_, found := d.diagMap[pmes.GetDiagID()]
|
||||
if found {
|
||||
|
||||
@ -29,7 +29,7 @@ func NetMessageSession(parent context.Context, p peer.Peer,
|
||||
networkAdapter := bsnet.NetMessageAdapter(srv, net, nil)
|
||||
bs := &bitswap{
|
||||
blockstore: blockstore.NewBlockstore(d),
|
||||
notifications: notifications.New(),
|
||||
notifications: notifications.New(), // TODO Shutdown()
|
||||
strategy: strategy.New(nice),
|
||||
routing: directory,
|
||||
sender: networkAdapter,
|
||||
|
||||
@ -29,6 +29,7 @@ func NewDagFromReader(r io.Reader) (*dag.Node, error) {
|
||||
return NewDagFromReaderWithSplitter(r, chunk.DefaultSplitter)
|
||||
}
|
||||
|
||||
// Creates an in memory DAG from data in the given reader
|
||||
func NewDagFromReaderWithSplitter(r io.Reader, spl chunk.BlockSplitter) (*dag.Node, error) {
|
||||
blkChan := spl.Split(r)
|
||||
first := <-blkChan
|
||||
@ -75,6 +76,8 @@ func NewDagFromFile(fpath string) (*dag.Node, error) {
|
||||
return NewDagFromReader(f)
|
||||
}
|
||||
|
||||
// Builds a DAG from the given file, writing created blocks to disk as they are
|
||||
// created
|
||||
func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*dag.Node, error) {
|
||||
stat, err := os.Stat(fpath)
|
||||
if err != nil {
|
||||
@ -94,6 +97,8 @@ func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*da
|
||||
return BuildDagFromReader(f, ds, mp, chunk.DefaultSplitter)
|
||||
}
|
||||
|
||||
// Builds a DAG from the data in the given reader, writing created blocks to disk
|
||||
// as they are created
|
||||
func BuildDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
|
||||
blkChan := spl.Split(r)
|
||||
|
||||
|
||||
@ -48,6 +48,7 @@ type Link struct {
|
||||
Node *Node
|
||||
}
|
||||
|
||||
// MakeLink creates a link to the given node
|
||||
func MakeLink(n *Node) (*Link, error) {
|
||||
s, err := n.Size()
|
||||
if err != nil {
|
||||
@ -64,6 +65,7 @@ func MakeLink(n *Node) (*Link, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetNode returns the MDAG Node that this link points to
|
||||
func (l *Link) GetNode(serv DAGService) (*Node, error) {
|
||||
if l.Node != nil {
|
||||
return l.Node, nil
|
||||
@ -98,6 +100,7 @@ func (n *Node) AddNodeLinkClean(name string, that *Node) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove a link on this node by the given name
|
||||
func (n *Node) RemoveNodeLink(name string) error {
|
||||
for i, l := range n.Links {
|
||||
if l.Name == name {
|
||||
@ -196,6 +199,7 @@ func (n *dagService) Add(nd *Node) (u.Key, error) {
|
||||
return n.Blocks.AddBlock(b)
|
||||
}
|
||||
|
||||
// AddRecursive adds the given node and all child nodes to the BlockService
|
||||
func (n *dagService) AddRecursive(nd *Node) error {
|
||||
_, err := n.Add(nd)
|
||||
if err != nil {
|
||||
@ -230,6 +234,7 @@ func (n *dagService) Get(k u.Key) (*Node, error) {
|
||||
return Decoded(b.Data)
|
||||
}
|
||||
|
||||
// Remove deletes the given node and all of its children from the BlockService
|
||||
func (n *dagService) Remove(nd *Node) error {
|
||||
for _, l := range nd.Links {
|
||||
if l.Node != nil {
|
||||
@ -243,6 +248,8 @@ func (n *dagService) Remove(nd *Node) error {
|
||||
return n.Blocks.DeleteBlock(k)
|
||||
}
|
||||
|
||||
// FetchGraph asynchronously fetches all nodes that are children of the given
|
||||
// node, and returns a channel that may be waited upon for the fetch to complete
|
||||
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} {
|
||||
var wg sync.WaitGroup
|
||||
done := make(chan struct{})
|
||||
|
||||
@ -26,6 +26,7 @@ func NewRoutingPublisher(route routing.IpfsRouting) Publisher {
|
||||
}
|
||||
|
||||
// Publish implements Publisher. Accepts a keypair and a value,
|
||||
// and publishes it out to the routing system
|
||||
func (p *ipnsPublisher) Publish(k ci.PrivKey, value string) error {
|
||||
log.Debugf("namesys: Publish %s", value)
|
||||
|
||||
|
||||
@ -28,6 +28,7 @@ const (
|
||||
HandshakeTimeout = time.Second * 5
|
||||
)
|
||||
|
||||
// global static buffer pool for byte arrays of size MaxMessageSize
|
||||
var BufferPool *sync.Pool
|
||||
|
||||
func init() {
|
||||
@ -38,6 +39,8 @@ func init() {
|
||||
}
|
||||
}
|
||||
|
||||
// ReleaseBuffer puts the given byte array back into the buffer pool,
|
||||
// first verifying that it is the correct size
|
||||
func ReleaseBuffer(b []byte) {
|
||||
log.Warningf("Releasing buffer! (cap,size = %d, %d)", cap(b), len(b))
|
||||
if cap(b) != MaxMessageSize {
|
||||
|
||||
@ -274,6 +274,7 @@ func (p *peer) VerifyAndSetPubKey(pk ic.PubKey) error {
|
||||
panic("invariant violated: unexpected key mismatch")
|
||||
}
|
||||
|
||||
// Updates this peer with information from another peer instance
|
||||
func (p *peer) Update(other Peer) error {
|
||||
if !p.ID().Equal(other.ID()) {
|
||||
return errors.New("peer ids do not match")
|
||||
|
||||
15
pin/pin.go
15
pin/pin.go
@ -1,9 +1,6 @@
|
||||
package pin
|
||||
|
||||
import (
|
||||
|
||||
//ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
|
||||
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"sync"
|
||||
@ -36,12 +33,14 @@ type Pinner interface {
|
||||
}
|
||||
|
||||
// ManualPinner is for manually editing the pin structure
|
||||
// Use with care
|
||||
// Use with care! If used improperly, garbage collection
|
||||
// may not be successful
|
||||
type ManualPinner interface {
|
||||
PinWithMode(util.Key, PinMode)
|
||||
Pinner
|
||||
}
|
||||
|
||||
// pinner implements the Pinner interface
|
||||
type pinner struct {
|
||||
lock sync.RWMutex
|
||||
recursePin set.BlockSet
|
||||
@ -51,6 +50,7 @@ type pinner struct {
|
||||
dstore ds.Datastore
|
||||
}
|
||||
|
||||
// NewPinner creates a new pinner using the given datastore as a backend
|
||||
func NewPinner(dstore ds.Datastore, serv mdag.DAGService) Pinner {
|
||||
|
||||
// Load set from given datastore...
|
||||
@ -70,6 +70,7 @@ func NewPinner(dstore ds.Datastore, serv mdag.DAGService) Pinner {
|
||||
}
|
||||
}
|
||||
|
||||
// Pin the given node, optionally recursive
|
||||
func (p *pinner) Pin(node *mdag.Node, recurse bool) error {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
@ -95,6 +96,7 @@ func (p *pinner) Pin(node *mdag.Node, recurse bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Unpin a given key with optional recursive unpinning
|
||||
func (p *pinner) Unpin(k util.Key, recurse bool) error {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
@ -158,6 +160,7 @@ func (p *pinner) pinLinks(node *mdag.Node) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsPinned returns whether or not the given key is pinned
|
||||
func (p *pinner) IsPinned(key util.Key) bool {
|
||||
p.lock.RLock()
|
||||
defer p.lock.RUnlock()
|
||||
@ -166,6 +169,7 @@ func (p *pinner) IsPinned(key util.Key) bool {
|
||||
p.indirPin.HasKey(key)
|
||||
}
|
||||
|
||||
// LoadPinner loads a pinner and its keysets from the given datastore
|
||||
func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) {
|
||||
p := new(pinner)
|
||||
|
||||
@ -200,6 +204,7 @@ func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) {
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// Flush encodes and writes pinner keysets to the datastore
|
||||
func (p *pinner) Flush() error {
|
||||
p.lock.RLock()
|
||||
defer p.lock.RUnlock()
|
||||
@ -244,6 +249,8 @@ func loadSet(d ds.Datastore, k ds.Key, val interface{}) error {
|
||||
return json.Unmarshal(bf, val)
|
||||
}
|
||||
|
||||
// PinWithMode is a method on ManualPinners, allowing the user to have fine
|
||||
// grained control over pin counts
|
||||
func (p *pinner) PinWithMode(k util.Key, mode PinMode) {
|
||||
switch mode {
|
||||
case Recursive:
|
||||
|
||||
@ -228,6 +228,8 @@ func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
|
||||
return nil
|
||||
}
|
||||
|
||||
// putProvider sends a message to peer 'p' saying that the local node
|
||||
// can provide the value of 'key'
|
||||
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error {
|
||||
|
||||
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)
|
||||
@ -384,6 +386,7 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
|
||||
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*pb.Message, error) {
|
||||
pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), level)
|
||||
return dht.sendRequest(ctx, p, pmes)
|
||||
@ -457,6 +460,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.Peer
|
||||
return filtered
|
||||
}
|
||||
|
||||
// getPeer searches the peerstore for a peer with the given peer ID
|
||||
func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
|
||||
p, err := dht.peerstore.Get(id)
|
||||
if err != nil {
|
||||
@ -467,6 +471,8 @@ func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// peerFromInfo returns a peer using info in the protobuf peer struct
|
||||
// to lookup or create a peer
|
||||
func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) {
|
||||
|
||||
id := peer.ID(pbp.GetId())
|
||||
|
||||
@ -48,7 +48,7 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Follows the next link in line and loads it from the DAGService,
|
||||
// precalcNextBuf follows the next link in line and loads it from the DAGService,
|
||||
// setting the next buffer to read from
|
||||
func (dr *DagReader) precalcNextBuf() error {
|
||||
if dr.position >= len(dr.node.Links) {
|
||||
@ -67,6 +67,7 @@ func (dr *DagReader) precalcNextBuf() error {
|
||||
|
||||
switch pb.GetType() {
|
||||
case ftpb.Data_Directory:
|
||||
// A directory should not exist within a file
|
||||
return ft.ErrInvalidDirLocation
|
||||
case ftpb.Data_File:
|
||||
//TODO: this *should* work, needs testing first
|
||||
@ -85,6 +86,7 @@ func (dr *DagReader) precalcNextBuf() error {
|
||||
}
|
||||
}
|
||||
|
||||
// Read reads data from the DAG structured file
|
||||
func (dr *DagReader) Read(b []byte) (int, error) {
|
||||
// If no cached buffer, load one
|
||||
if dr.buf == nil {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user