diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index 6d5e9fc77..66d733e7c 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -3,6 +3,7 @@ package main import ( "fmt" "net/http" + "os" manners "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/braintree/manners" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" @@ -23,6 +24,7 @@ const ( ipnsMountKwd = "mount-ipns" // apiAddrKwd = "address-api" // swarmAddrKwd = "address-swarm" + originEnvKey = "API_ORIGIN" ) var daemonCmd = &cmds.Command{ @@ -144,9 +146,11 @@ func listenAndServeAPI(node *core.IpfsNode, req cmds.Request, addr ma.Multiaddr) return err } + origin := os.Getenv(originEnvKey) + server := manners.NewServer() mux := http.NewServeMux() - cmdHandler := cmdsHttp.NewHandler(*req.Context(), commands.Root) + cmdHandler := cmdsHttp.NewHandler(*req.Context(), commands.Root, origin) mux.Handle(cmdsHttp.ApiPath+"/", cmdHandler) ifpsHandler := &ipfsHandler{node} diff --git a/cmd/ipfs/main.go b/cmd/ipfs/main.go index 8a30a9a71..d8485476c 100644 --- a/cmd/ipfs/main.go +++ b/cmd/ipfs/main.go @@ -14,6 +14,7 @@ import ( ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" cmds "github.com/jbenet/go-ipfs/commands" cmdsCli "github.com/jbenet/go-ipfs/commands/cli" cmdsHttp "github.com/jbenet/go-ipfs/commands/http" @@ -53,6 +54,7 @@ type cmdInvocation struct { // - output the response // - if anything fails, print error, maybe with help func main() { + ctx := context.Background() var err error var invoc cmdInvocation defer invoc.close() @@ -114,7 +116,7 @@ func main() { } // ok, finally, run the command invocation. - output, err := invoc.Run() + output, err := invoc.Run(ctx) if err != nil { printErr(err) @@ -129,7 +131,7 @@ func main() { io.Copy(os.Stdout, output) } -func (i *cmdInvocation) Run() (output io.Reader, err error) { +func (i *cmdInvocation) Run(ctx context.Context) (output io.Reader, err error) { // setup our global interrupt handler. i.setupInterruptHandler() @@ -153,7 +155,7 @@ func (i *cmdInvocation) Run() (output io.Reader, err error) { defer stopProfilingFunc() // to be executed as late as possible } - res, err := callCommand(i.req, Root) + res, err := callCommand(ctx, i.req, Root) if err != nil { return nil, err } @@ -243,8 +245,9 @@ func (i *cmdInvocation) requestedHelp() (short bool, long bool, err error) { return longHelp, shortHelp, nil } -func callPreCommandHooks(details cmdDetails, req cmds.Request, root *cmds.Command) error { +func callPreCommandHooks(ctx context.Context, details cmdDetails, req cmds.Request, root *cmds.Command) error { + log.Event(ctx, "callPreCommandHooks", &details) log.Debug("Calling pre-command hooks...") // some hooks only run when the command is executed locally @@ -284,7 +287,7 @@ func callPreCommandHooks(details cmdDetails, req cmds.Request, root *cmds.Comman return nil } -func callCommand(req cmds.Request, root *cmds.Command) (cmds.Response, error) { +func callCommand(ctx context.Context, req cmds.Request, root *cmds.Command) (cmds.Response, error) { var res cmds.Response details, err := commandDetails(req.Path(), root) @@ -297,7 +300,7 @@ func callCommand(req cmds.Request, root *cmds.Command) (cmds.Response, error) { return nil, err } - err = callPreCommandHooks(*details, req, root) + err = callPreCommandHooks(ctx, *details, req, root) if err != nil { return nil, err } diff --git a/commands/http/handler.go b/commands/http/handler.go index 658d6ebe4..7f7280c88 100644 --- a/commands/http/handler.go +++ b/commands/http/handler.go @@ -12,8 +12,9 @@ import ( var log = u.Logger("commands/http") type Handler struct { - ctx cmds.Context - root *cmds.Command + ctx cmds.Context + root *cmds.Command + origin string } var ErrNotFound = errors.New("404 page not found") @@ -29,13 +30,23 @@ var mimeTypes = map[string]string{ cmds.Text: "text/plain", } -func NewHandler(ctx cmds.Context, root *cmds.Command) *Handler { - return &Handler{ctx, root} +func NewHandler(ctx cmds.Context, root *cmds.Command, origin string) *Handler { + // allow whitelisted origins (so we can make API requests from the browser) + if len(origin) > 0 { + log.Info("Allowing API requests from origin: " + origin) + } + + return &Handler{ctx, root, origin} } func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Debug("Incoming API request: ", r.URL) + if len(i.origin) > 0 { + w.Header().Set("Access-Control-Allow-Origin", i.origin) + } + w.Header().Set("Access-Control-Allow-Headers", "Content-Type") + req, err := Parse(r, i.root) if err != nil { if err == ErrNotFound { diff --git a/config/config.go b/config/config.go index 46fc8badd..56eb58347 100644 --- a/config/config.go +++ b/config/config.go @@ -146,7 +146,7 @@ func (i *Identity) DecodePrivateKey(passphrase string) (crypto.PrivateKey, error // Load reads given file and returns the read config, or error. func Load(filename string) (*Config, error) { // if nothing is there, fail. User must run 'ipfs init' - if _, err := os.Stat(filename); os.IsNotExist(err) { + if !u.FileExists(filename) { return nil, debugerror.New("ipfs not initialized, please run 'ipfs init'") } diff --git a/core/core.go b/core/core.go index 1fe9c9992..58d99b6f4 100644 --- a/core/core.go +++ b/core/core.go @@ -29,11 +29,12 @@ import ( u "github.com/jbenet/go-ipfs/util" ctxc "github.com/jbenet/go-ipfs/util/ctxcloser" "github.com/jbenet/go-ipfs/util/debugerror" + "github.com/jbenet/go-ipfs/util/eventlog" ) const IpnsValidatorTag = "ipns" -var log = u.Logger("core") +var log = eventlog.Logger("core") // IpfsNode is IPFS Core module. It represents an IPFS instance. type IpfsNode struct { @@ -242,9 +243,11 @@ func initIdentity(cfg *config.Identity, peers peer.Peerstore, online bool) (peer } func initConnections(ctx context.Context, cfg *config.Config, pstore peer.Peerstore, route *dht.IpfsDHT) { + // TODO consider stricter error handling + // TODO consider Criticalf error logging for _, p := range cfg.Bootstrap { if p.PeerID == "" { - log.Errorf("error: peer does not include PeerID. %v", p) + log.Criticalf("error: peer does not include PeerID. %v", p) } maddr, err := ma.NewMultiaddr(p.Address) @@ -256,14 +259,16 @@ func initConnections(ctx context.Context, cfg *config.Config, pstore peer.Peerst // setup peer npeer, err := pstore.Get(peer.DecodePrettyID(p.PeerID)) if err != nil { - log.Errorf("Bootstrapping error: %v", err) + log.Criticalf("Bootstrapping error: %v", err) continue } npeer.AddAddress(maddr) if _, err = route.Connect(ctx, npeer); err != nil { - log.Errorf("Bootstrapping error: %v", err) + log.Criticalf("Bootstrapping error: %v", err) + continue } + log.Event(ctx, "bootstrap", npeer) } } diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 529c78689..8af8426d3 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -31,10 +31,8 @@ func New(ctx context.Context, p peer.Peer, notif := notifications.New() go func() { - select { - case <-ctx.Done(): - notif.Shutdown() - } + <-ctx.Done() + notif.Shutdown() }() bs := &bitswap{ diff --git a/exchange/bitswap/notifications/notifications.go b/exchange/bitswap/notifications/notifications.go index 2da2b7fad..34888d510 100644 --- a/exchange/bitswap/notifications/notifications.go +++ b/exchange/bitswap/notifications/notifications.go @@ -34,7 +34,7 @@ func (ps *impl) Publish(block blocks.Block) { func (ps *impl) Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block { topic := string(k) subChan := ps.wrapped.SubOnce(topic) - blockChannel := make(chan blocks.Block) + blockChannel := make(chan blocks.Block, 1) // buffered so the sender doesn't wait on receiver go func() { defer close(blockChannel) select { diff --git a/importer/chunk/splitting.go b/importer/chunk/splitting.go index 87d48be85..65a79d5ad 100644 --- a/importer/chunk/splitting.go +++ b/importer/chunk/splitting.go @@ -28,28 +28,17 @@ func (ss *SizeSplitter) Split(r io.Reader) chan []byte { for { // log.Infof("making chunk with size: %d", ss.Size) chunk := make([]byte, ss.Size) - sofar := 0 - - // this-chunk loop (keep reading until this chunk full) - for { - nread, err := r.Read(chunk[sofar:]) - sofar += nread - if err == io.EOF { - if sofar > 0 { - // log.Infof("sending out chunk with size: %d", sofar) - out <- chunk[:sofar] - } - return - } - if err != nil { - log.Errorf("Block split error: %s", err) - return - } - if sofar == ss.Size { - // log.Infof("sending out chunk with size: %d", sofar) - out <- chunk[:sofar] - break // break out of this-chunk loop - } + nread, err := io.ReadFull(r, chunk) + if nread > 0 { + // log.Infof("sending out chunk with size: %d", sofar) + out <- chunk[:nread] + } + if err == io.EOF || err == io.ErrUnexpectedEOF { + return + } + if err != nil { + log.Errorf("Block split error: %s", err) + return } } }() diff --git a/net/message/message.go b/net/message/message.go index b7fe0d972..7db5d21da 100644 --- a/net/message/message.go +++ b/net/message/message.go @@ -10,6 +10,7 @@ import ( type NetMessage interface { Peer() peer.Peer Data() []byte + Loggable() map[string]interface{} } // New is the interface for constructing a new message. @@ -35,6 +36,16 @@ func (m *message) Data() []byte { return m.data } +func (m *message) Loggable() map[string]interface{} { + return map[string]interface{}{ + "netMessage": map[string]interface{}{ + "recipient": m.Peer(), + // TODO sizeBytes? bytes? lenBytes? + "size": len(m.Data()), + }, + } +} + // FromObject creates a message from a protobuf-marshallable message. func FromObject(p peer.Peer, data proto.Message) (NetMessage, error) { bytes, err := proto.Marshal(data) diff --git a/net/swarm/conn.go b/net/swarm/conn.go index 74c3b03d4..f95334d0d 100644 --- a/net/swarm/conn.go +++ b/net/swarm/conn.go @@ -122,7 +122,6 @@ func (s *Swarm) peerMultiConn(p peer.Peer) (*conn.MultiConn, error) { s.Children().Add(1) mc.Children().Add(1) // child of Conn as well. go s.fanInSingle(mc) - log.Debugf("added new multiconn: %s", mc) return mc, nil } @@ -133,7 +132,7 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) { return nil, errors.New("Tried to start nil connection.") } - log.Debugf("%s Started connection: %s", c.LocalPeer(), c.RemotePeer()) + log.Event(context.TODO(), "connSetupBegin", c.LocalPeer(), c.RemotePeer()) // add address of connection to Peer. Maybe it should happen in connSecure. // NOT adding this address here, because the incoming address in TCP @@ -163,8 +162,7 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) { return nil, err } mc.Add(c) - log.Debugf("multiconn added new conn %s", c) - + log.Event(context.TODO(), "connSetupSuccess", c.LocalPeer(), c.RemotePeer()) return c, nil } @@ -200,6 +198,7 @@ func (s *Swarm) fanOut() { i++ log.Debugf("%s sent message to %s (%d)", s.local, msg.Peer(), i) + log.Event(context.TODO(), "sendMessage", s.local, msg) // queue it in the connection's buffer c.Out() <- msg.Data() } diff --git a/net/swarm/swarm.go b/net/swarm/swarm.go index 9328ed7cd..f840d4fff 100644 --- a/net/swarm/swarm.go +++ b/net/swarm/swarm.go @@ -151,6 +151,8 @@ func (s *Swarm) Dial(peer peer.Peer) (conn.Conn, error) { return nil, err } + // TODO replace the TODO ctx with a context passed in from caller + log.Event(context.TODO(), "dial", peer) return c, nil } diff --git a/routing/dht/dht.go b/routing/dht/dht.go index f4d2948bc..f76ca8f59 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -98,8 +98,6 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia // Connect to a new peer at the given address, ping and add to the routing table func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, error) { - log.Debugf("Connect to new peer: %s", npeer) - // TODO(jbenet,whyrusleeping) // // Connect should take in a Peer (with ID). In a sense, we shouldn't be @@ -120,8 +118,9 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, er if err != nil { return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err) } + log.Event(ctx, "connect", dht.self, npeer) - dht.Update(npeer) + dht.Update(ctx, npeer) return npeer, nil } @@ -150,7 +149,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N } // update the peer (on valid msgs only) - dht.Update(mPeer) + dht.Update(ctx, mPeer) log.Event(ctx, "foo", dht.self, mPeer, pmes) @@ -397,8 +396,8 @@ func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error { // Update signals to all routingTables to Update their last-seen status // on the given peer. -func (dht *IpfsDHT) Update(p peer.Peer) { - log.Debugf("updating peer: %s latency = %f\n", p, p.GetLatency().Seconds()) +func (dht *IpfsDHT) Update(ctx context.Context, p peer.Peer) { + log.Event(ctx, "updatePeer", p) removedCount := 0 for _, route := range dht.routingTables { removed := route.Update(p) diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index 55a68ef9e..791c1066c 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -129,7 +129,7 @@ func TestGetFailures(t *testing.T) { d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) other := makePeer(nil) - d.Update(other) + d.Update(ctx, other) // This one should time out // u.POut("Timout Test\n") @@ -232,7 +232,7 @@ func TestNotFound(t *testing.T) { var ps []peer.Peer for i := 0; i < 5; i++ { ps = append(ps, _randPeer()) - d.Update(ps[i]) + d.Update(ctx, ps[i]) } // Reply with random peers to every message @@ -298,7 +298,7 @@ func TestLessThanKResponses(t *testing.T) { var ps []peer.Peer for i := 0; i < 5; i++ { ps = append(ps, _randPeer()) - d.Update(ps[i]) + d.Update(ctx, ps[i]) } other := _randPeer() diff --git a/util/eventlog/context.go b/util/eventlog/context.go index caaa426ad..c11f47179 100644 --- a/util/eventlog/context.go +++ b/util/eventlog/context.go @@ -10,7 +10,10 @@ type key int const metadataKey key = 0 -func ContextWithMetadata(ctx context.Context, l Loggable) context.Context { +// ContextWithLoggable returns a derived context which contains the provided +// Loggable. Any Events logged with the derived context will include the +// provided Loggable. +func ContextWithLoggable(ctx context.Context, l Loggable) context.Context { existing, err := MetadataFromContext(ctx) if err != nil { // context does not contain meta. just set the new metadata diff --git a/util/eventlog/context_test.go b/util/eventlog/context_test.go index 1bab63269..8fd6c3280 100644 --- a/util/eventlog/context_test.go +++ b/util/eventlog/context_test.go @@ -10,7 +10,7 @@ func TestContextContainsMetadata(t *testing.T) { t.Parallel() m := Metadata{"foo": "bar"} - ctx := ContextWithMetadata(context.Background(), m) + ctx := ContextWithLoggable(context.Background(), m) got, err := MetadataFromContext(ctx) if err != nil { t.Fatal(err) @@ -25,8 +25,8 @@ func TestContextContainsMetadata(t *testing.T) { func TestContextWithPreexistingMetadata(t *testing.T) { t.Parallel() - ctx := ContextWithMetadata(context.Background(), Metadata{"hello": "world"}) - ctx = ContextWithMetadata(ctx, Metadata{"goodbye": "earth"}) + ctx := ContextWithLoggable(context.Background(), Metadata{"hello": "world"}) + ctx = ContextWithLoggable(ctx, Metadata{"goodbye": "earth"}) got, err := MetadataFromContext(ctx) if err != nil { diff --git a/util/eventlog/loggable.go b/util/eventlog/loggable.go new file mode 100644 index 000000000..f2c50e460 --- /dev/null +++ b/util/eventlog/loggable.go @@ -0,0 +1,6 @@ +package eventlog + +// Loggable describes objects that can be marshalled into Metadata for logging +type Loggable interface { + Loggable() map[string]interface{} +} diff --git a/util/eventlog/metadata.go b/util/eventlog/metadata.go index ea3cad6b5..d0b296733 100644 --- a/util/eventlog/metadata.go +++ b/util/eventlog/metadata.go @@ -11,11 +11,6 @@ import ( // Metadata is a convenience type for generic maps type Metadata map[string]interface{} -// Loggable describes objects that can be marshalled into Metadata for logging -type Loggable interface { - Loggable() map[string]interface{} -} - // Uuid returns a Metadata with the string key and UUID value func Uuid(key string) Metadata { return Metadata{ diff --git a/util/eventlog/option.go b/util/eventlog/option.go index 563708f80..9ab80cb4c 100644 --- a/util/eventlog/option.go +++ b/util/eventlog/option.go @@ -19,17 +19,19 @@ func init() { type Option func() +// Configure applies the provided options sequentially from left to right func Configure(options ...Option) { for _, f := range options { f() } } -// LdJSONFormatter formats the event log as line-delimited JSON +// LdJSONFormatter Option formats the event log as line-delimited JSON var LdJSONFormatter = func() { logrus.SetFormatter(&PoliteJSONFormatter{}) } +// TextFormatter Option formats the event log as human-readable plain-text var TextFormatter = func() { logrus.SetFormatter(&logrus.TextFormatter{}) } @@ -60,14 +62,17 @@ func OutputRotatingLogFile(config LogRotatorConfig) Option { } } +// LevelDebug Option sets the log level to debug var LevelDebug = func() { logrus.SetLevel(logrus.DebugLevel) } +// LevelDebug Option sets the log level to error var LevelError = func() { logrus.SetLevel(logrus.ErrorLevel) } +// LevelDebug Option sets the log level to info var LevelInfo = func() { logrus.SetLevel(logrus.InfoLevel) }