diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index f487713e7..9f5fc729a 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -89,6 +89,10 @@ "ImportPath": "github.com/inconshreveable/go-update", "Rev": "221d034a558b4c21b0624b2a450c076913854a57" }, + { + "ImportPath": "github.com/hashicorp/golang-lru", + "Rev": "253b2dc1ca8bae42c3b5b6e53dd2eab1a7551116" + }, { "ImportPath": "github.com/jbenet/go-base58", "Rev": "568a28d73fd97651d3442392036a658b6976eed5" diff --git a/Godeps/_workspace/src/github.com/hashicorp/golang-lru/.gitignore b/Godeps/_workspace/src/github.com/hashicorp/golang-lru/.gitignore new file mode 100644 index 000000000..836562412 --- /dev/null +++ b/Godeps/_workspace/src/github.com/hashicorp/golang-lru/.gitignore @@ -0,0 +1,23 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test diff --git a/Godeps/_workspace/src/github.com/hashicorp/golang-lru/LICENSE b/Godeps/_workspace/src/github.com/hashicorp/golang-lru/LICENSE new file mode 100644 index 000000000..be2cc4dfb --- /dev/null +++ b/Godeps/_workspace/src/github.com/hashicorp/golang-lru/LICENSE @@ -0,0 +1,362 @@ +Mozilla Public License, version 2.0 + +1. Definitions + +1.1. "Contributor" + + means each individual or legal entity that creates, contributes to the + creation of, or owns Covered Software. + +1.2. "Contributor Version" + + means the combination of the Contributions of others (if any) used by a + Contributor and that particular Contributor's Contribution. + +1.3. "Contribution" + + means Covered Software of a particular Contributor. + +1.4. "Covered Software" + + means Source Code Form to which the initial Contributor has attached the + notice in Exhibit A, the Executable Form of such Source Code Form, and + Modifications of such Source Code Form, in each case including portions + thereof. + +1.5. "Incompatible With Secondary Licenses" + means + + a. that the initial Contributor has attached the notice described in + Exhibit B to the Covered Software; or + + b. that the Covered Software was made available under the terms of + version 1.1 or earlier of the License, but not also under the terms of + a Secondary License. + +1.6. "Executable Form" + + means any form of the work other than Source Code Form. + +1.7. "Larger Work" + + means a work that combines Covered Software with other material, in a + separate file or files, that is not Covered Software. + +1.8. "License" + + means this document. + +1.9. "Licensable" + + means having the right to grant, to the maximum extent possible, whether + at the time of the initial grant or subsequently, any and all of the + rights conveyed by this License. + +1.10. "Modifications" + + means any of the following: + + a. any file in Source Code Form that results from an addition to, + deletion from, or modification of the contents of Covered Software; or + + b. any new file in Source Code Form that contains any Covered Software. + +1.11. "Patent Claims" of a Contributor + + means any patent claim(s), including without limitation, method, + process, and apparatus claims, in any patent Licensable by such + Contributor that would be infringed, but for the grant of the License, + by the making, using, selling, offering for sale, having made, import, + or transfer of either its Contributions or its Contributor Version. + +1.12. "Secondary License" + + means either the GNU General Public License, Version 2.0, the GNU Lesser + General Public License, Version 2.1, the GNU Affero General Public + License, Version 3.0, or any later versions of those licenses. + +1.13. "Source Code Form" + + means the form of the work preferred for making modifications. + +1.14. "You" (or "Your") + + means an individual or a legal entity exercising rights under this + License. For legal entities, "You" includes any entity that controls, is + controlled by, or is under common control with You. For purposes of this + definition, "control" means (a) the power, direct or indirect, to cause + the direction or management of such entity, whether by contract or + otherwise, or (b) ownership of more than fifty percent (50%) of the + outstanding shares or beneficial ownership of such entity. + + +2. License Grants and Conditions + +2.1. Grants + + Each Contributor hereby grants You a world-wide, royalty-free, + non-exclusive license: + + a. under intellectual property rights (other than patent or trademark) + Licensable by such Contributor to use, reproduce, make available, + modify, display, perform, distribute, and otherwise exploit its + Contributions, either on an unmodified basis, with Modifications, or + as part of a Larger Work; and + + b. under Patent Claims of such Contributor to make, use, sell, offer for + sale, have made, import, and otherwise transfer either its + Contributions or its Contributor Version. + +2.2. Effective Date + + The licenses granted in Section 2.1 with respect to any Contribution + become effective for each Contribution on the date the Contributor first + distributes such Contribution. + +2.3. Limitations on Grant Scope + + The licenses granted in this Section 2 are the only rights granted under + this License. No additional rights or licenses will be implied from the + distribution or licensing of Covered Software under this License. + Notwithstanding Section 2.1(b) above, no patent license is granted by a + Contributor: + + a. for any code that a Contributor has removed from Covered Software; or + + b. for infringements caused by: (i) Your and any other third party's + modifications of Covered Software, or (ii) the combination of its + Contributions with other software (except as part of its Contributor + Version); or + + c. under Patent Claims infringed by Covered Software in the absence of + its Contributions. + + This License does not grant any rights in the trademarks, service marks, + or logos of any Contributor (except as may be necessary to comply with + the notice requirements in Section 3.4). + +2.4. Subsequent Licenses + + No Contributor makes additional grants as a result of Your choice to + distribute the Covered Software under a subsequent version of this + License (see Section 10.2) or under the terms of a Secondary License (if + permitted under the terms of Section 3.3). + +2.5. Representation + + Each Contributor represents that the Contributor believes its + Contributions are its original creation(s) or it has sufficient rights to + grant the rights to its Contributions conveyed by this License. + +2.6. Fair Use + + This License is not intended to limit any rights You have under + applicable copyright doctrines of fair use, fair dealing, or other + equivalents. + +2.7. Conditions + + Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted in + Section 2.1. + + +3. Responsibilities + +3.1. Distribution of Source Form + + All distribution of Covered Software in Source Code Form, including any + Modifications that You create or to which You contribute, must be under + the terms of this License. You must inform recipients that the Source + Code Form of the Covered Software is governed by the terms of this + License, and how they can obtain a copy of this License. You may not + attempt to alter or restrict the recipients' rights in the Source Code + Form. + +3.2. Distribution of Executable Form + + If You distribute Covered Software in Executable Form then: + + a. such Covered Software must also be made available in Source Code Form, + as described in Section 3.1, and You must inform recipients of the + Executable Form how they can obtain a copy of such Source Code Form by + reasonable means in a timely manner, at a charge no more than the cost + of distribution to the recipient; and + + b. You may distribute such Executable Form under the terms of this + License, or sublicense it under different terms, provided that the + license for the Executable Form does not attempt to limit or alter the + recipients' rights in the Source Code Form under this License. + +3.3. Distribution of a Larger Work + + You may create and distribute a Larger Work under terms of Your choice, + provided that You also comply with the requirements of this License for + the Covered Software. If the Larger Work is a combination of Covered + Software with a work governed by one or more Secondary Licenses, and the + Covered Software is not Incompatible With Secondary Licenses, this + License permits You to additionally distribute such Covered Software + under the terms of such Secondary License(s), so that the recipient of + the Larger Work may, at their option, further distribute the Covered + Software under the terms of either this License or such Secondary + License(s). + +3.4. Notices + + You may not remove or alter the substance of any license notices + (including copyright notices, patent notices, disclaimers of warranty, or + limitations of liability) contained within the Source Code Form of the + Covered Software, except that You may alter any license notices to the + extent required to remedy known factual inaccuracies. + +3.5. Application of Additional Terms + + You may choose to offer, and to charge a fee for, warranty, support, + indemnity or liability obligations to one or more recipients of Covered + Software. However, You may do so only on Your own behalf, and not on + behalf of any Contributor. You must make it absolutely clear that any + such warranty, support, indemnity, or liability obligation is offered by + You alone, and You hereby agree to indemnify every Contributor for any + liability incurred by such Contributor as a result of warranty, support, + indemnity or liability terms You offer. You may include additional + disclaimers of warranty and limitations of liability specific to any + jurisdiction. + +4. Inability to Comply Due to Statute or Regulation + + If it is impossible for You to comply with any of the terms of this License + with respect to some or all of the Covered Software due to statute, + judicial order, or regulation then You must: (a) comply with the terms of + this License to the maximum extent possible; and (b) describe the + limitations and the code they affect. Such description must be placed in a + text file included with all distributions of the Covered Software under + this License. Except to the extent prohibited by statute or regulation, + such description must be sufficiently detailed for a recipient of ordinary + skill to be able to understand it. + +5. Termination + +5.1. The rights granted under this License will terminate automatically if You + fail to comply with any of its terms. However, if You become compliant, + then the rights granted under this License from a particular Contributor + are reinstated (a) provisionally, unless and until such Contributor + explicitly and finally terminates Your grants, and (b) on an ongoing + basis, if such Contributor fails to notify You of the non-compliance by + some reasonable means prior to 60 days after You have come back into + compliance. Moreover, Your grants from a particular Contributor are + reinstated on an ongoing basis if such Contributor notifies You of the + non-compliance by some reasonable means, this is the first time You have + received notice of non-compliance with this License from such + Contributor, and You become compliant prior to 30 days after Your receipt + of the notice. + +5.2. If You initiate litigation against any entity by asserting a patent + infringement claim (excluding declaratory judgment actions, + counter-claims, and cross-claims) alleging that a Contributor Version + directly or indirectly infringes any patent, then the rights granted to + You by any and all Contributors for the Covered Software under Section + 2.1 of this License shall terminate. + +5.3. In the event of termination under Sections 5.1 or 5.2 above, all end user + license agreements (excluding distributors and resellers) which have been + validly granted by You or Your distributors under this License prior to + termination shall survive termination. + +6. Disclaimer of Warranty + + Covered Software is provided under this License on an "as is" basis, + without warranty of any kind, either expressed, implied, or statutory, + including, without limitation, warranties that the Covered Software is free + of defects, merchantable, fit for a particular purpose or non-infringing. + The entire risk as to the quality and performance of the Covered Software + is with You. Should any Covered Software prove defective in any respect, + You (not any Contributor) assume the cost of any necessary servicing, + repair, or correction. This disclaimer of warranty constitutes an essential + part of this License. No use of any Covered Software is authorized under + this License except under this disclaimer. + +7. Limitation of Liability + + Under no circumstances and under no legal theory, whether tort (including + negligence), contract, or otherwise, shall any Contributor, or anyone who + distributes Covered Software as permitted above, be liable to You for any + direct, indirect, special, incidental, or consequential damages of any + character including, without limitation, damages for lost profits, loss of + goodwill, work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses, even if such party shall have been + informed of the possibility of such damages. This limitation of liability + shall not apply to liability for death or personal injury resulting from + such party's negligence to the extent applicable law prohibits such + limitation. Some jurisdictions do not allow the exclusion or limitation of + incidental or consequential damages, so this exclusion and limitation may + not apply to You. + +8. Litigation + + Any litigation relating to this License may be brought only in the courts + of a jurisdiction where the defendant maintains its principal place of + business and such litigation shall be governed by laws of that + jurisdiction, without reference to its conflict-of-law provisions. Nothing + in this Section shall prevent a party's ability to bring cross-claims or + counter-claims. + +9. Miscellaneous + + This License represents the complete agreement concerning the subject + matter hereof. If any provision of this License is held to be + unenforceable, such provision shall be reformed only to the extent + necessary to make it enforceable. Any law or regulation which provides that + the language of a contract shall be construed against the drafter shall not + be used to construe this License against a Contributor. + + +10. Versions of the License + +10.1. New Versions + + Mozilla Foundation is the license steward. Except as provided in Section + 10.3, no one other than the license steward has the right to modify or + publish new versions of this License. Each version will be given a + distinguishing version number. + +10.2. Effect of New Versions + + You may distribute the Covered Software under the terms of the version + of the License under which You originally received the Covered Software, + or under the terms of any subsequent version published by the license + steward. + +10.3. Modified Versions + + If you create software not governed by this License, and you want to + create a new license for such software, you may create and use a + modified version of this License if you rename the license and remove + any references to the name of the license steward (except to note that + such modified license differs from this License). + +10.4. Distributing Source Code Form that is Incompatible With Secondary + Licenses If You choose to distribute Source Code Form that is + Incompatible With Secondary Licenses under the terms of this version of + the License, the notice described in Exhibit B of this License must be + attached. + +Exhibit A - Source Code Form License Notice + + This Source Code Form is subject to the + terms of the Mozilla Public License, v. + 2.0. If a copy of the MPL was not + distributed with this file, You can + obtain one at + http://mozilla.org/MPL/2.0/. + +If it is not possible or desirable to put the notice in a particular file, +then You may include the notice in a location (such as a LICENSE file in a +relevant directory) where a recipient would be likely to look for such a +notice. + +You may add additional accurate notices of copyright ownership. + +Exhibit B - "Incompatible With Secondary Licenses" Notice + + This Source Code Form is "Incompatible + With Secondary Licenses", as defined by + the Mozilla Public License, v. 2.0. diff --git a/Godeps/_workspace/src/github.com/hashicorp/golang-lru/README.md b/Godeps/_workspace/src/github.com/hashicorp/golang-lru/README.md new file mode 100644 index 000000000..37fa54d42 --- /dev/null +++ b/Godeps/_workspace/src/github.com/hashicorp/golang-lru/README.md @@ -0,0 +1,25 @@ +golang-lru +========== + +This provides the `lru` package which implements a fixed-size +thread safe LRU cache. It is based on the cache in Groupcache. + +Documentation +============= + +Full docs are available on [Godoc](http://godoc.org/github.com/hashicorp/golang-lru) + +Example +======= + +Using the LRU is very simple: + +```go +l, _ := New(128) +for i := 0; i < 256; i++ { + l.Add(i, nil) +} +if l.Len() != 128 { + panic("bad len: %v", l.Len()) +} +``` diff --git a/Godeps/_workspace/src/github.com/hashicorp/golang-lru/lru.go b/Godeps/_workspace/src/github.com/hashicorp/golang-lru/lru.go new file mode 100644 index 000000000..174767c54 --- /dev/null +++ b/Godeps/_workspace/src/github.com/hashicorp/golang-lru/lru.go @@ -0,0 +1,134 @@ +// This package provides a simple LRU cache. It is based on the +// LRU implementation in groupcache: +// https://github.com/golang/groupcache/tree/master/lru +package lru + +import ( + "container/list" + "errors" + "sync" +) + +// Cache is a thread-safe fixed size LRU cache. +type Cache struct { + size int + evictList *list.List + items map[interface{}]*list.Element + lock sync.Mutex +} + +// entry is used to hold a value in the evictList +type entry struct { + key interface{} + value interface{} +} + +// New creates an LRU of the given size +func New(size int) (*Cache, error) { + if size <= 0 { + return nil, errors.New("Must provide a positive size") + } + c := &Cache{ + size: size, + evictList: list.New(), + items: make(map[interface{}]*list.Element, size), + } + return c, nil +} + +// Purge is used to completely clear the cache +func (c *Cache) Purge() { + c.lock.Lock() + defer c.lock.Unlock() + c.evictList = list.New() + c.items = make(map[interface{}]*list.Element, c.size) +} + +// Add adds a value to the cache. +func (c *Cache) Add(key, value interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + + // Check for existing item + if ent, ok := c.items[key]; ok { + c.evictList.MoveToFront(ent) + ent.Value.(*entry).value = value + return + } + + // Add new item + ent := &entry{key, value} + entry := c.evictList.PushFront(ent) + c.items[key] = entry + + // Verify size not exceeded + if c.evictList.Len() > c.size { + c.removeOldest() + } +} + +// Get looks up a key's value from the cache. +func (c *Cache) Get(key interface{}) (value interface{}, ok bool) { + c.lock.Lock() + defer c.lock.Unlock() + + if ent, ok := c.items[key]; ok { + c.evictList.MoveToFront(ent) + return ent.Value.(*entry).value, true + } + return +} + +// Remove removes the provided key from the cache. +func (c *Cache) Remove(key interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + + if ent, ok := c.items[key]; ok { + c.removeElement(ent) + } +} + +// RemoveOldest removes the oldest item from the cache. +func (c *Cache) RemoveOldest() { + c.lock.Lock() + defer c.lock.Unlock() + c.removeOldest() +} + +// Keys returns a slice of the keys in the cache. +func (c *Cache) Keys() []interface{} { + c.lock.Lock() + defer c.lock.Unlock() + + keys := make([]interface{}, len(c.items)) + i := 0 + for k := range c.items { + keys[i] = k + i++ + } + + return keys +} + +// removeOldest removes the oldest item from the cache. +func (c *Cache) removeOldest() { + ent := c.evictList.Back() + if ent != nil { + c.removeElement(ent) + } +} + +// removeElement is used to remove a given list element from the cache +func (c *Cache) removeElement(e *list.Element) { + c.evictList.Remove(e) + kv := e.Value.(*entry) + delete(c.items, kv.key) +} + +// Len returns the number of items in the cache. +func (c *Cache) Len() int { + c.lock.Lock() + defer c.lock.Unlock() + return c.evictList.Len() +} diff --git a/Godeps/_workspace/src/github.com/hashicorp/golang-lru/lru_test.go b/Godeps/_workspace/src/github.com/hashicorp/golang-lru/lru_test.go new file mode 100644 index 000000000..626b95fbe --- /dev/null +++ b/Godeps/_workspace/src/github.com/hashicorp/golang-lru/lru_test.go @@ -0,0 +1,48 @@ +package lru + +import "testing" + +func TestLRU(t *testing.T) { + l, err := New(128) + if err != nil { + t.Fatalf("err: %v", err) + } + for i := 0; i < 256; i++ { + l.Add(i, i) + } + if l.Len() != 128 { + t.Fatalf("bad len: %v", l.Len()) + } + for _, k := range l.Keys() { + if v, ok := l.Get(k); !ok || v != k { + t.Fatalf("bad key: %v", k) + } + } + for i := 0; i < 128; i++ { + _, ok := l.Get(i) + if ok { + t.Fatalf("should be evicted") + } + } + for i := 128; i < 256; i++ { + _, ok := l.Get(i) + if !ok { + t.Fatalf("should not be evicted") + } + } + for i := 128; i < 192; i++ { + l.Remove(i) + _, ok := l.Get(i) + if ok { + t.Fatalf("should be deleted") + } + } + + l.Purge() + if l.Len() != 0 { + t.Fatalf("bad len: %v", l.Len()) + } + if _, ok := l.Get(200); ok { + t.Fatalf("should contain nothing") + } +} diff --git a/blocks/blockstore/write_cache.go b/blocks/blockstore/write_cache.go new file mode 100644 index 000000000..b46d05846 --- /dev/null +++ b/blocks/blockstore/write_cache.go @@ -0,0 +1,45 @@ +package blockstore + +import ( + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/hashicorp/golang-lru" + "github.com/jbenet/go-ipfs/blocks" + u "github.com/jbenet/go-ipfs/util" +) + +// WriteCached returns a blockstore that caches up to |size| unique writes (bs.Put). +func WriteCached(bs Blockstore, size int) (Blockstore, error) { + c, err := lru.New(size) + if err != nil { + return nil, err + } + return &writecache{blockstore: bs, cache: c}, nil +} + +type writecache struct { + cache *lru.Cache // pointer b/c Cache contains a Mutex as value (complicates copying) + blockstore Blockstore +} + +func (w *writecache) DeleteBlock(k u.Key) error { + w.cache.Remove(k) + return w.blockstore.DeleteBlock(k) +} + +func (w *writecache) Has(k u.Key) (bool, error) { + if _, ok := w.cache.Get(k); ok { + return true, nil + } + return w.blockstore.Has(k) +} + +func (w *writecache) Get(k u.Key) (*blocks.Block, error) { + return w.blockstore.Get(k) +} + +func (w *writecache) Put(b *blocks.Block) error { + if _, ok := w.cache.Get(b.Key()); ok { + return nil + } + w.cache.Add(b.Key(), struct{}{}) + return w.blockstore.Put(b) +} diff --git a/blocks/blockstore/write_cache_test.go b/blocks/blockstore/write_cache_test.go new file mode 100644 index 000000000..2be865903 --- /dev/null +++ b/blocks/blockstore/write_cache_test.go @@ -0,0 +1,88 @@ +package blockstore + +import ( + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" + "github.com/jbenet/go-ipfs/blocks" + "testing" +) + +func TestReturnsErrorWhenSizeNegative(t *testing.T) { + bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore())) + _, err := WriteCached(bs, -1) + if err != nil { + return + } + t.Fail() +} + +func TestRemoveCacheEntryOnDelete(t *testing.T) { + b := blocks.NewBlock([]byte("foo")) + cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()} + bs := NewBlockstore(syncds.MutexWrap(cd)) + cachedbs, err := WriteCached(bs, 1) + if err != nil { + t.Fatal(err) + } + cachedbs.Put(b) + + writeHitTheDatastore := false + cd.SetFunc(func() { + writeHitTheDatastore = true + }) + + cachedbs.DeleteBlock(b.Key()) + cachedbs.Put(b) + if !writeHitTheDatastore { + t.Fail() + } +} + +func TestElideDuplicateWrite(t *testing.T) { + cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()} + bs := NewBlockstore(syncds.MutexWrap(cd)) + cachedbs, err := WriteCached(bs, 1) + if err != nil { + t.Fatal(err) + } + + b1 := blocks.NewBlock([]byte("foo")) + + cachedbs.Put(b1) + cd.SetFunc(func() { + t.Fatal("write hit the datastore") + }) + cachedbs.Put(b1) +} + +type callbackDatastore struct { + f func() + ds ds.Datastore +} + +func (c *callbackDatastore) SetFunc(f func()) { c.f = f } + +func (c *callbackDatastore) Put(key ds.Key, value interface{}) (err error) { + c.f() + return c.ds.Put(key, value) +} + +func (c *callbackDatastore) Get(key ds.Key) (value interface{}, err error) { + c.f() + return c.ds.Get(key) +} + +func (c *callbackDatastore) Has(key ds.Key) (exists bool, err error) { + c.f() + return c.ds.Has(key) +} + +func (c *callbackDatastore) Delete(key ds.Key) (err error) { + c.f() + return c.ds.Delete(key) +} + +func (c *callbackDatastore) KeyList() ([]ds.Key, error) { + c.f() + return c.ds.KeyList() +} diff --git a/blockservice/blocks_test.go b/blockservice/blocks_test.go index 1a75723e2..2645b2024 100644 --- a/blockservice/blocks_test.go +++ b/blockservice/blocks_test.go @@ -19,9 +19,8 @@ import ( ) func TestBlocks(t *testing.T) { - d := ds.NewMapDatastore() - tsds := dssync.MutexWrap(d) - bs, err := New(blockstore.NewBlockstore(tsds), offline.Exchange()) + bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + bs, err := New(bstore, offline.Exchange(bstore)) if err != nil { t.Error("failed to construct block service", err) return diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 0ddec7955..0ebe30a4d 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -25,7 +25,7 @@ var ErrNotFound = errors.New("blockservice: key not found") type BlockService struct { // TODO don't expose underlying impl details Blockstore blockstore.Blockstore - Remote exchange.Interface + Exchange exchange.Interface } // NewBlockService creates a BlockService with given datastore instance. @@ -36,7 +36,7 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) (*BlockService, error if rem == nil { log.Warning("blockservice running in local (offline) mode.") } - return &BlockService{Blockstore: bs, Remote: rem}, nil + return &BlockService{Blockstore: bs, Exchange: rem}, nil } // AddBlock adds a particular block to the service, Putting it into the datastore. @@ -66,9 +66,9 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { // TODO this operation rate-limits blockservice operations, we should // consider moving this to an sync process. - if s.Remote != nil { + if s.Exchange != nil { ctx := context.TODO() - err = s.Remote.HasBlock(ctx, b) + err = s.Exchange.HasBlock(ctx, b) } return k, err } @@ -82,9 +82,9 @@ func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, er return block, nil // TODO be careful checking ErrNotFound. If the underlying // implementation changes, this will break. - } else if err == ds.ErrNotFound && s.Remote != nil { + } else if err == ds.ErrNotFound && s.Exchange != nil { log.Debug("Blockservice: Searching bitswap.") - blk, err := s.Remote.GetBlock(ctx, k) + blk, err := s.Exchange.GetBlock(ctx, k) if err != nil { return nil, err } @@ -117,7 +117,7 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks } } - rblocks, err := s.Remote.GetBlocks(ctx, misses) + rblocks, err := s.Exchange.GetBlocks(ctx, misses) if err != nil { log.Errorf("Error with GetBlocks: %s", err) return diff --git a/core/core.go b/core/core.go index 148853bfc..007c01b80 100644 --- a/core/core.go +++ b/core/core.go @@ -8,7 +8,7 @@ import ( b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - blockstore "github.com/jbenet/go-ipfs/blocks/blockstore" + bstore "github.com/jbenet/go-ipfs/blocks/blockstore" bserv "github.com/jbenet/go-ipfs/blockservice" config "github.com/jbenet/go-ipfs/config" diag "github.com/jbenet/go-ipfs/diagnostics" @@ -35,6 +35,7 @@ import ( ) const IpnsValidatorTag = "ipns" +const kSizeBlockstoreWriteCache = 100 var log = eventlog.Logger("core") @@ -130,7 +131,8 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) { return nil, debugerror.Wrap(err) } - n.Exchange = offline.Exchange() + blockstore, err := bstore.WriteCached(bstore.NewBlockstore(n.Datastore), kSizeBlockstoreWriteCache) + n.Exchange = offline.Exchange(blockstore) // setup online services if online { @@ -174,16 +176,15 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) { // setup exchange service const alwaysSendToPeer = true // use YesManStrategy bitswapNetwork := bsnet.NewFromIpfsNetwork(exchangeService, n.Network) - bstore := blockstore.NewBlockstore(n.Datastore) - n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Routing, bstore, alwaysSendToPeer) + n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Routing, blockstore, alwaysSendToPeer) go initConnections(ctx, n.Config, n.Peerstore, dhtRouting) } // TODO(brian): when offline instantiate the BlockService with a bitswap // session that simply doesn't return blocks - n.Blocks, err = bserv.New(blockstore.NewBlockstore(n.Datastore), n.Exchange) + n.Blocks, err = bserv.New(blockstore, n.Exchange) if err != nil { return nil, debugerror.Wrap(err) } diff --git a/core/mock.go b/core/mock.go index 2d2359277..ab1149787 100644 --- a/core/mock.go +++ b/core/mock.go @@ -45,7 +45,8 @@ func NewMockNode() (*IpfsNode, error) { nd.Routing = dht // Bitswap - bserv, err := blockservice.New(blockstore.NewBlockstore(nd.Datastore), offline.Exchange()) + bstore := blockstore.NewBlockstore(nd.Datastore) + bserv, err := blockservice.New(bstore, offline.Exchange(bstore)) if err != nil { return nil, err } diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index e00b23f91..8dbf05314 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -145,6 +145,22 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks. } } +// HasBlock announces the existance of a block to this bitswap service. The +// service will potentially notify its peers. +func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { + if err := bs.blockstore.Put(blk); err != nil { + return err + } + bs.wantlist.Remove(blk.Key()) + bs.notifications.Publish(blk) + child, _ := context.WithTimeout(ctx, hasBlockTimeout) + if err := bs.sendToPeersThatWant(child, blk); err != nil { + return err + } + child, _ = context.WithTimeout(ctx, hasBlockTimeout) + return bs.routing.Provide(child, blk.Key()) +} + func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error { if peers == nil { panic("Cant send wantlist to nil peerchan") @@ -245,33 +261,6 @@ func (bs *bitswap) loop(parent context.Context) { } } -// HasBlock announces the existance of a block to this bitswap service. The -// service will potentially notify its peers. -func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { - // TODO check all errors - log.Debugf("Has Block %s", blk.Key()) - bs.wantlist.Remove(blk.Key()) - bs.notifications.Publish(blk) - - child, _ := context.WithTimeout(ctx, hasBlockTimeout) - bs.sendToPeersThatWant(child, blk) - child, _ = context.WithTimeout(ctx, hasBlockTimeout) - return bs.routing.Provide(child, blk.Key()) -} - -// receiveBlock handles storing the block in the blockstore and calling HasBlock -func (bs *bitswap) receiveBlock(ctx context.Context, block *blocks.Block) { - // TODO verify blocks? - if err := bs.blockstore.Put(block); err != nil { - log.Criticalf("error putting block: %s", err) - return - } - err := bs.HasBlock(ctx, block) - if err != nil { - log.Warningf("HasBlock errored: %s", err) - } -} - // TODO(brian): handle errors func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) ( peer.Peer, bsmsg.BitSwapMessage) { @@ -295,11 +284,11 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm // and number of bytes transfered. bs.strategy.MessageReceived(p, incoming) - go func() { - for _, block := range incoming.Blocks() { - bs.receiveBlock(ctx, block) + for _, block := range incoming.Blocks() { + if err := bs.HasBlock(ctx, block); err != nil { + log.Error(err) } - }() + } for _, key := range incoming.Wantlist() { if bs.strategy.ShouldSendBlockToPeer(key, p) { @@ -334,27 +323,29 @@ func (bs *bitswap) ReceiveError(err error) { // send strives to ensure that accounting is always performed when a message is // sent -func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) { - bs.sender.SendMessage(ctx, p, m) - bs.strategy.MessageSent(p, m) +func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) error { + if err := bs.sender.SendMessage(ctx, p, m); err != nil { + return err + } + return bs.strategy.MessageSent(p, m) } -func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block) { - log.Debugf("Sending %s to peers that want it", block) - +func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block) error { for _, p := range bs.strategy.Peers() { if bs.strategy.BlockIsWantedByPeer(block.Key(), p) { - log.Debugf("%v wants %v", p, block.Key()) if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) { message := bsmsg.New() message.AddBlock(block) for _, wanted := range bs.wantlist.Keys() { message.AddWanted(wanted) } - bs.send(ctx, p, message) + if err := bs.send(ctx, p, message); err != nil { + return err + } } } } + return nil } func (bs *bitswap) Close() error { diff --git a/exchange/offline/offline.go b/exchange/offline/offline.go index 24a89e038..f1a6aaa61 100644 --- a/exchange/offline/offline.go +++ b/exchange/offline/offline.go @@ -3,42 +3,66 @@ package offline import ( - "errors" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - blocks "github.com/jbenet/go-ipfs/blocks" + "github.com/jbenet/go-ipfs/blocks/blockstore" exchange "github.com/jbenet/go-ipfs/exchange" u "github.com/jbenet/go-ipfs/util" ) -var OfflineMode = errors.New("Block unavailable. Operating in offline mode") - -func Exchange() exchange.Interface { - return &offlineExchange{} +func Exchange(bs blockstore.Blockstore) exchange.Interface { + return &offlineExchange{bs: bs} } // offlineExchange implements the Exchange interface but doesn't return blocks. // For use in offline mode. -type offlineExchange struct{} +type offlineExchange struct { + bs blockstore.Blockstore +} // GetBlock returns nil to signal that a block could not be retrieved for the // given key. // NB: This function may return before the timeout expires. -func (_ *offlineExchange) GetBlock(context.Context, u.Key) (*blocks.Block, error) { - return nil, OfflineMode +func (e *offlineExchange) GetBlock(_ context.Context, k u.Key) (*blocks.Block, error) { + return e.bs.Get(k) } // HasBlock always returns nil. -func (_ *offlineExchange) HasBlock(context.Context, *blocks.Block) error { - return nil +func (e *offlineExchange) HasBlock(_ context.Context, b *blocks.Block) error { + return e.bs.Put(b) } // Close always returns nil. func (_ *offlineExchange) Close() error { + // NB: exchange doesn't own the blockstore's underlying datastore, so it is + // not responsible for closing it. return nil } -func (_ *offlineExchange) GetBlocks(context.Context, []u.Key) (<-chan *blocks.Block, error) { - return nil, OfflineMode +func (e *offlineExchange) GetBlocks(ctx context.Context, ks []u.Key) (<-chan *blocks.Block, error) { + out := make(chan *blocks.Block, 0) + go func() { + defer close(out) + var misses []u.Key + for _, k := range ks { + hit, err := e.bs.Get(k) + if err != nil { + misses = append(misses, k) + // a long line of misses should abort when context is cancelled. + select { + // TODO case send misses down channel + case <-ctx.Done(): + return + default: + continue + } + } + select { + case out <- hit: + case <-ctx.Done(): + return + } + } + }() + return out, nil } diff --git a/exchange/offline/offline_test.go b/exchange/offline/offline_test.go index ac02d2101..d32f336d0 100644 --- a/exchange/offline/offline_test.go +++ b/exchange/offline/offline_test.go @@ -4,13 +4,16 @@ import ( "testing" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" blocks "github.com/jbenet/go-ipfs/blocks" + "github.com/jbenet/go-ipfs/blocks/blockstore" + "github.com/jbenet/go-ipfs/blocks/blocksutil" u "github.com/jbenet/go-ipfs/util" ) func TestBlockReturnsErr(t *testing.T) { - off := Exchange() + off := Exchange(bstore()) _, err := off.GetBlock(context.Background(), u.Key("foo")) if err != nil { return // as desired @@ -19,10 +22,56 @@ func TestBlockReturnsErr(t *testing.T) { } func TestHasBlockReturnsNil(t *testing.T) { - off := Exchange() + store := bstore() + ex := Exchange(store) block := blocks.NewBlock([]byte("data")) - err := off.HasBlock(context.Background(), block) + + err := ex.HasBlock(context.Background(), block) if err != nil { - t.Fatal("") + t.Fail() + } + + if _, err := store.Get(block.Key()); err != nil { + t.Fatal(err) } } + +func TestGetBlocks(t *testing.T) { + store := bstore() + ex := Exchange(store) + g := blocksutil.NewBlockGenerator() + + expected := g.Blocks(2) + + for _, b := range expected { + if err := ex.HasBlock(context.Background(), b); err != nil { + t.Fail() + } + } + + request := func() []u.Key { + var ks []u.Key + + for _, b := range expected { + ks = append(ks, b.Key()) + } + return ks + }() + + received, err := ex.GetBlocks(context.Background(), request) + if err != nil { + t.Fatal(err) + } + + var count int + for _ = range received { + count++ + } + if len(expected) != count { + t.Fail() + } +} + +func bstore() blockstore.Blockstore { + return blockstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) +} diff --git a/pin/pin_test.go b/pin/pin_test.go index fc9dc215d..dada99803 100644 --- a/pin/pin_test.go +++ b/pin/pin_test.go @@ -23,7 +23,7 @@ func randNode() (*mdag.Node, util.Key) { func TestPinnerBasic(t *testing.T) { dstore := ds.NewMapDatastore() bstore := blockstore.NewBlockstore(dssync.MutexWrap(dstore)) - bserv, err := bs.New(bstore, offline.Exchange()) + bserv, err := bs.New(bstore, offline.Exchange(bstore)) if err != nil { t.Fatal(err) } diff --git a/routing/dht/providers.go b/routing/dht/providers.go index 7f70056d3..0deea6324 100644 --- a/routing/dht/providers.go +++ b/routing/dht/providers.go @@ -107,10 +107,15 @@ func (pm *ProviderManager) GetProviders(ctx context.Context, k u.Key) []peer.Pee resp: make(chan []peer.Peer, 1), // buffered to prevent sender from blocking } select { - case pm.getprovs <- gp: - return <-gp.resp case <-ctx.Done(): return nil + case pm.getprovs <- gp: + } + select { + case <-ctx.Done(): + return nil + case peers := <-gp.resp: + return peers } } diff --git a/unixfs/io/dagmodifier_test.go b/unixfs/io/dagmodifier_test.go index d0aa83795..ed5b10d69 100644 --- a/unixfs/io/dagmodifier_test.go +++ b/unixfs/io/dagmodifier_test.go @@ -24,7 +24,7 @@ func getMockDagServ(t *testing.T) mdag.DAGService { dstore := ds.NewMapDatastore() tsds := sync.MutexWrap(dstore) bstore := blockstore.NewBlockstore(tsds) - bserv, err := bs.New(bstore, offline.Exchange()) + bserv, err := bs.New(bstore, offline.Exchange(bstore)) if err != nil { t.Fatal(err) } diff --git a/util/testutil/gen.go b/util/testutil/gen.go index c7f310c2a..186740ea9 100644 --- a/util/testutil/gen.go +++ b/util/testutil/gen.go @@ -16,9 +16,8 @@ import ( ) func GetDAGServ(t testing.TB) dag.DAGService { - dstore := ds.NewMapDatastore() - tsds := dssync.MutexWrap(dstore) - bserv, err := bsrv.New(blockstore.NewBlockstore(tsds), offline.Exchange()) + bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + bserv, err := bsrv.New(bstore, offline.Exchange(bstore)) if err != nil { t.Fatal(err) }