mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-26 04:47:45 +08:00
Merge pull request #363 from jbenet/misc/2014-10-2X
miscellaneous fixes
This commit is contained in:
commit
bad4db3135
@ -3,6 +3,7 @@ package main
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
manners "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/braintree/manners"
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
@ -23,6 +24,7 @@ const (
|
||||
ipnsMountKwd = "mount-ipns"
|
||||
// apiAddrKwd = "address-api"
|
||||
// swarmAddrKwd = "address-swarm"
|
||||
originEnvKey = "API_ORIGIN"
|
||||
)
|
||||
|
||||
var daemonCmd = &cmds.Command{
|
||||
@ -144,9 +146,11 @@ func listenAndServeAPI(node *core.IpfsNode, req cmds.Request, addr ma.Multiaddr)
|
||||
return err
|
||||
}
|
||||
|
||||
origin := os.Getenv(originEnvKey)
|
||||
|
||||
server := manners.NewServer()
|
||||
mux := http.NewServeMux()
|
||||
cmdHandler := cmdsHttp.NewHandler(*req.Context(), commands.Root)
|
||||
cmdHandler := cmdsHttp.NewHandler(*req.Context(), commands.Root, origin)
|
||||
mux.Handle(cmdsHttp.ApiPath+"/", cmdHandler)
|
||||
|
||||
ifpsHandler := &ipfsHandler{node}
|
||||
|
||||
@ -14,6 +14,7 @@ import (
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
cmds "github.com/jbenet/go-ipfs/commands"
|
||||
cmdsCli "github.com/jbenet/go-ipfs/commands/cli"
|
||||
cmdsHttp "github.com/jbenet/go-ipfs/commands/http"
|
||||
@ -53,6 +54,7 @@ type cmdInvocation struct {
|
||||
// - output the response
|
||||
// - if anything fails, print error, maybe with help
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
var err error
|
||||
var invoc cmdInvocation
|
||||
defer invoc.close()
|
||||
@ -114,7 +116,7 @@ func main() {
|
||||
}
|
||||
|
||||
// ok, finally, run the command invocation.
|
||||
output, err := invoc.Run()
|
||||
output, err := invoc.Run(ctx)
|
||||
if err != nil {
|
||||
printErr(err)
|
||||
|
||||
@ -129,7 +131,7 @@ func main() {
|
||||
io.Copy(os.Stdout, output)
|
||||
}
|
||||
|
||||
func (i *cmdInvocation) Run() (output io.Reader, err error) {
|
||||
func (i *cmdInvocation) Run(ctx context.Context) (output io.Reader, err error) {
|
||||
// setup our global interrupt handler.
|
||||
i.setupInterruptHandler()
|
||||
|
||||
@ -153,7 +155,7 @@ func (i *cmdInvocation) Run() (output io.Reader, err error) {
|
||||
defer stopProfilingFunc() // to be executed as late as possible
|
||||
}
|
||||
|
||||
res, err := callCommand(i.req, Root)
|
||||
res, err := callCommand(ctx, i.req, Root)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -243,8 +245,9 @@ func (i *cmdInvocation) requestedHelp() (short bool, long bool, err error) {
|
||||
return longHelp, shortHelp, nil
|
||||
}
|
||||
|
||||
func callPreCommandHooks(details cmdDetails, req cmds.Request, root *cmds.Command) error {
|
||||
func callPreCommandHooks(ctx context.Context, details cmdDetails, req cmds.Request, root *cmds.Command) error {
|
||||
|
||||
log.Event(ctx, "callPreCommandHooks", &details)
|
||||
log.Debug("Calling pre-command hooks...")
|
||||
|
||||
// some hooks only run when the command is executed locally
|
||||
@ -284,7 +287,7 @@ func callPreCommandHooks(details cmdDetails, req cmds.Request, root *cmds.Comman
|
||||
return nil
|
||||
}
|
||||
|
||||
func callCommand(req cmds.Request, root *cmds.Command) (cmds.Response, error) {
|
||||
func callCommand(ctx context.Context, req cmds.Request, root *cmds.Command) (cmds.Response, error) {
|
||||
var res cmds.Response
|
||||
|
||||
details, err := commandDetails(req.Path(), root)
|
||||
@ -297,7 +300,7 @@ func callCommand(req cmds.Request, root *cmds.Command) (cmds.Response, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = callPreCommandHooks(*details, req, root)
|
||||
err = callPreCommandHooks(ctx, *details, req, root)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -12,8 +12,9 @@ import (
|
||||
var log = u.Logger("commands/http")
|
||||
|
||||
type Handler struct {
|
||||
ctx cmds.Context
|
||||
root *cmds.Command
|
||||
ctx cmds.Context
|
||||
root *cmds.Command
|
||||
origin string
|
||||
}
|
||||
|
||||
var ErrNotFound = errors.New("404 page not found")
|
||||
@ -29,13 +30,23 @@ var mimeTypes = map[string]string{
|
||||
cmds.Text: "text/plain",
|
||||
}
|
||||
|
||||
func NewHandler(ctx cmds.Context, root *cmds.Command) *Handler {
|
||||
return &Handler{ctx, root}
|
||||
func NewHandler(ctx cmds.Context, root *cmds.Command, origin string) *Handler {
|
||||
// allow whitelisted origins (so we can make API requests from the browser)
|
||||
if len(origin) > 0 {
|
||||
log.Info("Allowing API requests from origin: " + origin)
|
||||
}
|
||||
|
||||
return &Handler{ctx, root, origin}
|
||||
}
|
||||
|
||||
func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug("Incoming API request: ", r.URL)
|
||||
|
||||
if len(i.origin) > 0 {
|
||||
w.Header().Set("Access-Control-Allow-Origin", i.origin)
|
||||
}
|
||||
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
|
||||
|
||||
req, err := Parse(r, i.root)
|
||||
if err != nil {
|
||||
if err == ErrNotFound {
|
||||
|
||||
@ -146,7 +146,7 @@ func (i *Identity) DecodePrivateKey(passphrase string) (crypto.PrivateKey, error
|
||||
// Load reads given file and returns the read config, or error.
|
||||
func Load(filename string) (*Config, error) {
|
||||
// if nothing is there, fail. User must run 'ipfs init'
|
||||
if _, err := os.Stat(filename); os.IsNotExist(err) {
|
||||
if !u.FileExists(filename) {
|
||||
return nil, debugerror.New("ipfs not initialized, please run 'ipfs init'")
|
||||
}
|
||||
|
||||
|
||||
13
core/core.go
13
core/core.go
@ -29,11 +29,12 @@ import (
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
|
||||
"github.com/jbenet/go-ipfs/util/debugerror"
|
||||
"github.com/jbenet/go-ipfs/util/eventlog"
|
||||
)
|
||||
|
||||
const IpnsValidatorTag = "ipns"
|
||||
|
||||
var log = u.Logger("core")
|
||||
var log = eventlog.Logger("core")
|
||||
|
||||
// IpfsNode is IPFS Core module. It represents an IPFS instance.
|
||||
type IpfsNode struct {
|
||||
@ -242,9 +243,11 @@ func initIdentity(cfg *config.Identity, peers peer.Peerstore, online bool) (peer
|
||||
}
|
||||
|
||||
func initConnections(ctx context.Context, cfg *config.Config, pstore peer.Peerstore, route *dht.IpfsDHT) {
|
||||
// TODO consider stricter error handling
|
||||
// TODO consider Criticalf error logging
|
||||
for _, p := range cfg.Bootstrap {
|
||||
if p.PeerID == "" {
|
||||
log.Errorf("error: peer does not include PeerID. %v", p)
|
||||
log.Criticalf("error: peer does not include PeerID. %v", p)
|
||||
}
|
||||
|
||||
maddr, err := ma.NewMultiaddr(p.Address)
|
||||
@ -256,14 +259,16 @@ func initConnections(ctx context.Context, cfg *config.Config, pstore peer.Peerst
|
||||
// setup peer
|
||||
npeer, err := pstore.Get(peer.DecodePrettyID(p.PeerID))
|
||||
if err != nil {
|
||||
log.Errorf("Bootstrapping error: %v", err)
|
||||
log.Criticalf("Bootstrapping error: %v", err)
|
||||
continue
|
||||
}
|
||||
npeer.AddAddress(maddr)
|
||||
|
||||
if _, err = route.Connect(ctx, npeer); err != nil {
|
||||
log.Errorf("Bootstrapping error: %v", err)
|
||||
log.Criticalf("Bootstrapping error: %v", err)
|
||||
continue
|
||||
}
|
||||
log.Event(ctx, "bootstrap", npeer)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -31,10 +31,8 @@ func New(ctx context.Context, p peer.Peer,
|
||||
|
||||
notif := notifications.New()
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
notif.Shutdown()
|
||||
}
|
||||
<-ctx.Done()
|
||||
notif.Shutdown()
|
||||
}()
|
||||
|
||||
bs := &bitswap{
|
||||
|
||||
@ -34,7 +34,7 @@ func (ps *impl) Publish(block blocks.Block) {
|
||||
func (ps *impl) Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block {
|
||||
topic := string(k)
|
||||
subChan := ps.wrapped.SubOnce(topic)
|
||||
blockChannel := make(chan blocks.Block)
|
||||
blockChannel := make(chan blocks.Block, 1) // buffered so the sender doesn't wait on receiver
|
||||
go func() {
|
||||
defer close(blockChannel)
|
||||
select {
|
||||
|
||||
@ -28,28 +28,17 @@ func (ss *SizeSplitter) Split(r io.Reader) chan []byte {
|
||||
for {
|
||||
// log.Infof("making chunk with size: %d", ss.Size)
|
||||
chunk := make([]byte, ss.Size)
|
||||
sofar := 0
|
||||
|
||||
// this-chunk loop (keep reading until this chunk full)
|
||||
for {
|
||||
nread, err := r.Read(chunk[sofar:])
|
||||
sofar += nread
|
||||
if err == io.EOF {
|
||||
if sofar > 0 {
|
||||
// log.Infof("sending out chunk with size: %d", sofar)
|
||||
out <- chunk[:sofar]
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
log.Errorf("Block split error: %s", err)
|
||||
return
|
||||
}
|
||||
if sofar == ss.Size {
|
||||
// log.Infof("sending out chunk with size: %d", sofar)
|
||||
out <- chunk[:sofar]
|
||||
break // break out of this-chunk loop
|
||||
}
|
||||
nread, err := io.ReadFull(r, chunk)
|
||||
if nread > 0 {
|
||||
// log.Infof("sending out chunk with size: %d", sofar)
|
||||
out <- chunk[:nread]
|
||||
}
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
log.Errorf("Block split error: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
type NetMessage interface {
|
||||
Peer() peer.Peer
|
||||
Data() []byte
|
||||
Loggable() map[string]interface{}
|
||||
}
|
||||
|
||||
// New is the interface for constructing a new message.
|
||||
@ -35,6 +36,16 @@ func (m *message) Data() []byte {
|
||||
return m.data
|
||||
}
|
||||
|
||||
func (m *message) Loggable() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"netMessage": map[string]interface{}{
|
||||
"recipient": m.Peer(),
|
||||
// TODO sizeBytes? bytes? lenBytes?
|
||||
"size": len(m.Data()),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// FromObject creates a message from a protobuf-marshallable message.
|
||||
func FromObject(p peer.Peer, data proto.Message) (NetMessage, error) {
|
||||
bytes, err := proto.Marshal(data)
|
||||
|
||||
@ -122,7 +122,6 @@ func (s *Swarm) peerMultiConn(p peer.Peer) (*conn.MultiConn, error) {
|
||||
s.Children().Add(1)
|
||||
mc.Children().Add(1) // child of Conn as well.
|
||||
go s.fanInSingle(mc)
|
||||
log.Debugf("added new multiconn: %s", mc)
|
||||
return mc, nil
|
||||
}
|
||||
|
||||
@ -133,7 +132,7 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) {
|
||||
return nil, errors.New("Tried to start nil connection.")
|
||||
}
|
||||
|
||||
log.Debugf("%s Started connection: %s", c.LocalPeer(), c.RemotePeer())
|
||||
log.Event(context.TODO(), "connSetupBegin", c.LocalPeer(), c.RemotePeer())
|
||||
|
||||
// add address of connection to Peer. Maybe it should happen in connSecure.
|
||||
// NOT adding this address here, because the incoming address in TCP
|
||||
@ -163,8 +162,7 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) {
|
||||
return nil, err
|
||||
}
|
||||
mc.Add(c)
|
||||
log.Debugf("multiconn added new conn %s", c)
|
||||
|
||||
log.Event(context.TODO(), "connSetupSuccess", c.LocalPeer(), c.RemotePeer())
|
||||
return c, nil
|
||||
}
|
||||
|
||||
@ -200,6 +198,7 @@ func (s *Swarm) fanOut() {
|
||||
|
||||
i++
|
||||
log.Debugf("%s sent message to %s (%d)", s.local, msg.Peer(), i)
|
||||
log.Event(context.TODO(), "sendMessage", s.local, msg)
|
||||
// queue it in the connection's buffer
|
||||
c.Out() <- msg.Data()
|
||||
}
|
||||
|
||||
@ -151,6 +151,8 @@ func (s *Swarm) Dial(peer peer.Peer) (conn.Conn, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO replace the TODO ctx with a context passed in from caller
|
||||
log.Event(context.TODO(), "dial", peer)
|
||||
return c, nil
|
||||
}
|
||||
|
||||
|
||||
@ -98,8 +98,6 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia
|
||||
|
||||
// Connect to a new peer at the given address, ping and add to the routing table
|
||||
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, error) {
|
||||
log.Debugf("Connect to new peer: %s", npeer)
|
||||
|
||||
// TODO(jbenet,whyrusleeping)
|
||||
//
|
||||
// Connect should take in a Peer (with ID). In a sense, we shouldn't be
|
||||
@ -120,8 +118,9 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, er
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err)
|
||||
}
|
||||
log.Event(ctx, "connect", dht.self, npeer)
|
||||
|
||||
dht.Update(npeer)
|
||||
dht.Update(ctx, npeer)
|
||||
|
||||
return npeer, nil
|
||||
}
|
||||
@ -150,7 +149,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
|
||||
}
|
||||
|
||||
// update the peer (on valid msgs only)
|
||||
dht.Update(mPeer)
|
||||
dht.Update(ctx, mPeer)
|
||||
|
||||
log.Event(ctx, "foo", dht.self, mPeer, pmes)
|
||||
|
||||
@ -397,8 +396,8 @@ func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
|
||||
|
||||
// Update signals to all routingTables to Update their last-seen status
|
||||
// on the given peer.
|
||||
func (dht *IpfsDHT) Update(p peer.Peer) {
|
||||
log.Debugf("updating peer: %s latency = %f\n", p, p.GetLatency().Seconds())
|
||||
func (dht *IpfsDHT) Update(ctx context.Context, p peer.Peer) {
|
||||
log.Event(ctx, "updatePeer", p)
|
||||
removedCount := 0
|
||||
for _, route := range dht.routingTables {
|
||||
removed := route.Update(p)
|
||||
|
||||
@ -129,7 +129,7 @@ func TestGetFailures(t *testing.T) {
|
||||
|
||||
d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore())
|
||||
other := makePeer(nil)
|
||||
d.Update(other)
|
||||
d.Update(ctx, other)
|
||||
|
||||
// This one should time out
|
||||
// u.POut("Timout Test\n")
|
||||
@ -232,7 +232,7 @@ func TestNotFound(t *testing.T) {
|
||||
var ps []peer.Peer
|
||||
for i := 0; i < 5; i++ {
|
||||
ps = append(ps, _randPeer())
|
||||
d.Update(ps[i])
|
||||
d.Update(ctx, ps[i])
|
||||
}
|
||||
|
||||
// Reply with random peers to every message
|
||||
@ -298,7 +298,7 @@ func TestLessThanKResponses(t *testing.T) {
|
||||
var ps []peer.Peer
|
||||
for i := 0; i < 5; i++ {
|
||||
ps = append(ps, _randPeer())
|
||||
d.Update(ps[i])
|
||||
d.Update(ctx, ps[i])
|
||||
}
|
||||
other := _randPeer()
|
||||
|
||||
|
||||
@ -10,7 +10,10 @@ type key int
|
||||
|
||||
const metadataKey key = 0
|
||||
|
||||
func ContextWithMetadata(ctx context.Context, l Loggable) context.Context {
|
||||
// ContextWithLoggable returns a derived context which contains the provided
|
||||
// Loggable. Any Events logged with the derived context will include the
|
||||
// provided Loggable.
|
||||
func ContextWithLoggable(ctx context.Context, l Loggable) context.Context {
|
||||
existing, err := MetadataFromContext(ctx)
|
||||
if err != nil {
|
||||
// context does not contain meta. just set the new metadata
|
||||
|
||||
@ -10,7 +10,7 @@ func TestContextContainsMetadata(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
m := Metadata{"foo": "bar"}
|
||||
ctx := ContextWithMetadata(context.Background(), m)
|
||||
ctx := ContextWithLoggable(context.Background(), m)
|
||||
got, err := MetadataFromContext(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -25,8 +25,8 @@ func TestContextContainsMetadata(t *testing.T) {
|
||||
func TestContextWithPreexistingMetadata(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := ContextWithMetadata(context.Background(), Metadata{"hello": "world"})
|
||||
ctx = ContextWithMetadata(ctx, Metadata{"goodbye": "earth"})
|
||||
ctx := ContextWithLoggable(context.Background(), Metadata{"hello": "world"})
|
||||
ctx = ContextWithLoggable(ctx, Metadata{"goodbye": "earth"})
|
||||
|
||||
got, err := MetadataFromContext(ctx)
|
||||
if err != nil {
|
||||
|
||||
6
util/eventlog/loggable.go
Normal file
6
util/eventlog/loggable.go
Normal file
@ -0,0 +1,6 @@
|
||||
package eventlog
|
||||
|
||||
// Loggable describes objects that can be marshalled into Metadata for logging
|
||||
type Loggable interface {
|
||||
Loggable() map[string]interface{}
|
||||
}
|
||||
@ -11,11 +11,6 @@ import (
|
||||
// Metadata is a convenience type for generic maps
|
||||
type Metadata map[string]interface{}
|
||||
|
||||
// Loggable describes objects that can be marshalled into Metadata for logging
|
||||
type Loggable interface {
|
||||
Loggable() map[string]interface{}
|
||||
}
|
||||
|
||||
// Uuid returns a Metadata with the string key and UUID value
|
||||
func Uuid(key string) Metadata {
|
||||
return Metadata{
|
||||
|
||||
@ -19,17 +19,19 @@ func init() {
|
||||
|
||||
type Option func()
|
||||
|
||||
// Configure applies the provided options sequentially from left to right
|
||||
func Configure(options ...Option) {
|
||||
for _, f := range options {
|
||||
f()
|
||||
}
|
||||
}
|
||||
|
||||
// LdJSONFormatter formats the event log as line-delimited JSON
|
||||
// LdJSONFormatter Option formats the event log as line-delimited JSON
|
||||
var LdJSONFormatter = func() {
|
||||
logrus.SetFormatter(&PoliteJSONFormatter{})
|
||||
}
|
||||
|
||||
// TextFormatter Option formats the event log as human-readable plain-text
|
||||
var TextFormatter = func() {
|
||||
logrus.SetFormatter(&logrus.TextFormatter{})
|
||||
}
|
||||
@ -60,14 +62,17 @@ func OutputRotatingLogFile(config LogRotatorConfig) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// LevelDebug Option sets the log level to debug
|
||||
var LevelDebug = func() {
|
||||
logrus.SetLevel(logrus.DebugLevel)
|
||||
}
|
||||
|
||||
// LevelDebug Option sets the log level to error
|
||||
var LevelError = func() {
|
||||
logrus.SetLevel(logrus.ErrorLevel)
|
||||
}
|
||||
|
||||
// LevelDebug Option sets the log level to info
|
||||
var LevelInfo = func() {
|
||||
logrus.SetLevel(logrus.InfoLevel)
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user