mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 10:27:46 +08:00
remove unneeded thirdparty packages (#10871)
Some checks failed
CodeQL / codeql (push) Has been cancelled
Docker Build / docker-build (push) Has been cancelled
Gateway Conformance / gateway-conformance (push) Has been cancelled
Gateway Conformance / gateway-conformance-libp2p-experiment (push) Has been cancelled
Go Build / go-build (push) Has been cancelled
Go Check / go-check (push) Has been cancelled
Go Lint / go-lint (push) Has been cancelled
Go Test / go-test (push) Has been cancelled
Interop / interop-prep (push) Has been cancelled
Sharness / sharness-test (push) Has been cancelled
Spell Check / spellcheck (push) Has been cancelled
Interop / helia-interop (push) Has been cancelled
Interop / ipfs-webui (push) Has been cancelled
Some checks failed
CodeQL / codeql (push) Has been cancelled
Docker Build / docker-build (push) Has been cancelled
Gateway Conformance / gateway-conformance (push) Has been cancelled
Gateway Conformance / gateway-conformance-libp2p-experiment (push) Has been cancelled
Go Build / go-build (push) Has been cancelled
Go Check / go-check (push) Has been cancelled
Go Lint / go-lint (push) Has been cancelled
Go Test / go-test (push) Has been cancelled
Interop / interop-prep (push) Has been cancelled
Sharness / sharness-test (push) Has been cancelled
Spell Check / spellcheck (push) Has been cancelled
Interop / helia-interop (push) Has been cancelled
Interop / ipfs-webui (push) Has been cancelled
* remove unneeded thirdparty packages Remove unnecessary packages from `thirdparty` in repo. - Remove `thirdparty/assert` (replaced by `github.com/stretchr/testify/require`) - Remove `thirdparty/dir` (replacd by `misc/fsutil`) - Remove `thirdparty/notifier` (unused)
This commit is contained in:
parent
a22efea6f3
commit
bb58ca4737
@ -6,11 +6,11 @@ package main
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/ipfs/kubo/thirdparty/assert"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestIsHidden(t *testing.T) {
|
func TestIsHidden(t *testing.T) {
|
||||||
assert.True(IsHidden("bar/.git"), t, "dirs beginning with . should be recognized as hidden")
|
require.True(t, IsHidden("bar/.git"), "dirs beginning with . should be recognized as hidden")
|
||||||
assert.False(IsHidden("."), t, ". for current dir should not be considered hidden")
|
require.False(t, IsHidden("."), ". for current dir should not be considered hidden")
|
||||||
assert.False(IsHidden("bar/baz"), t, "normal dirs should not be hidden")
|
require.False(t, IsHidden("bar/baz"), "normal dirs should not be hidden")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -5,9 +5,8 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/ipfs/kubo/thirdparty/assert"
|
|
||||||
|
|
||||||
protocol "github.com/libp2p/go-libp2p/core/protocol"
|
protocol "github.com/libp2p/go-libp2p/core/protocol"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TestCase struct {
|
type TestCase struct {
|
||||||
@ -29,12 +28,10 @@ func TestParseRequest(t *testing.T) {
|
|||||||
req, _ := http.NewRequest(http.MethodGet, url, strings.NewReader(""))
|
req, _ := http.NewRequest(http.MethodGet, url, strings.NewReader(""))
|
||||||
|
|
||||||
parsed, err := parseRequest(req)
|
parsed, err := parseRequest(req)
|
||||||
if err != nil {
|
require.NoError(t, err)
|
||||||
t.Fatal(err)
|
require.Equal(t, tc.path, parsed.httpPath, "proxy request path")
|
||||||
}
|
require.Equal(t, protocol.ID(tc.name), parsed.name, "proxy request name")
|
||||||
assert.True(parsed.httpPath == tc.path, t, "proxy request path")
|
require.Equal(t, tc.target, parsed.target, "proxy request peer-id")
|
||||||
assert.True(parsed.name == protocol.ID(tc.name), t, "proxy request name")
|
|
||||||
assert.True(parsed.target == tc.target, t, "proxy request peer-id")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -49,8 +46,6 @@ func TestParseRequestInvalidPath(t *testing.T) {
|
|||||||
req, _ := http.NewRequest(http.MethodGet, url, strings.NewReader(""))
|
req, _ := http.NewRequest(http.MethodGet, url, strings.NewReader(""))
|
||||||
|
|
||||||
_, err := parseRequest(req)
|
_, err := parseRequest(req)
|
||||||
if err == nil {
|
require.Error(t, err)
|
||||||
t.Fail()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -11,6 +11,7 @@ This release was brought to you by the [Interplanetary Shipyard](https://ipship
|
|||||||
- [Overview](#overview)
|
- [Overview](#overview)
|
||||||
- [🔦 Highlights](#-highlights)
|
- [🔦 Highlights](#-highlights)
|
||||||
- [Clear provide queue when reprovide strategy changes](#clear-provide-queue-when-reprovide-strategy-changes)
|
- [Clear provide queue when reprovide strategy changes](#clear-provide-queue-when-reprovide-strategy-changes)
|
||||||
|
- [Remove unnecessary packages from thirdparty](#remove-unnecessary-packages-from-thirdparty)
|
||||||
- [📦️ Important dependency updates](#-important-dependency-updates)
|
- [📦️ Important dependency updates](#-important-dependency-updates)
|
||||||
- [📝 Changelog](#-changelog)
|
- [📝 Changelog](#-changelog)
|
||||||
- [👨👩👧👦 Contributors](#-contributors)
|
- [👨👩👧👦 Contributors](#-contributors)
|
||||||
@ -30,6 +31,14 @@ A new `ipfs provide clear` command also allows manual queue clearing for debuggi
|
|||||||
> [!NOTE]
|
> [!NOTE]
|
||||||
> Upgrading to Kubo 0.37 will automatically clear any preexisting provide queue. The next time `Reprovider.Interval` hits, `Reprovider.Strategy` will be executed on a clean slate, ensuring consistent behavior with your current configuration.
|
> Upgrading to Kubo 0.37 will automatically clear any preexisting provide queue. The next time `Reprovider.Interval` hits, `Reprovider.Strategy` will be executed on a clean slate, ensuring consistent behavior with your current configuration.
|
||||||
|
|
||||||
|
#### Remove unnecessary packages from thirdparty
|
||||||
|
|
||||||
|
Removed unnecessary packages from the `thirdparty` area of kubo repositroy.
|
||||||
|
|
||||||
|
- Removed `thirdparty/assert` (replaced by `github.com/stretchr/testify/require`)
|
||||||
|
- Removed `thirdparty/dir` (replaced by `misc/fsutil)`
|
||||||
|
- Removed `thirdparty/notifier` (unused)
|
||||||
|
|
||||||
#### 📦️ Important dependency updates
|
#### 📦️ Important dependency updates
|
||||||
|
|
||||||
- update `boxo` to [v0.34.0](https://github.com/ipfs/boxo/releases/tag/v0.34.0)
|
- update `boxo` to [v0.34.0](https://github.com/ipfs/boxo/releases/tag/v0.34.0)
|
||||||
|
|||||||
@ -3,7 +3,7 @@ package common
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/ipfs/kubo/thirdparty/assert"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMapMergeDeepReturnsNew(t *testing.T) {
|
func TestMapMergeDeepReturnsNew(t *testing.T) {
|
||||||
@ -15,7 +15,7 @@ func TestMapMergeDeepReturnsNew(t *testing.T) {
|
|||||||
|
|
||||||
MapMergeDeep(leftMap, rightMap)
|
MapMergeDeep(leftMap, rightMap)
|
||||||
|
|
||||||
assert.True(leftMap["A"] == "Hello World", t, "MapMergeDeep should return a new map instance")
|
require.Equal(t, "Hello World", leftMap["A"], "MapMergeDeep should return a new map instance")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapMergeDeepNewKey(t *testing.T) {
|
func TestMapMergeDeepNewKey(t *testing.T) {
|
||||||
@ -46,7 +46,7 @@ func TestMapMergeDeepNewKey(t *testing.T) {
|
|||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
assert.True(result["B"] == "Bar", t, "New keys in right map should exist in resulting map")
|
require.Equal(t, "Bar", result["B"], "New keys in right map should exist in resulting map")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapMergeDeepRecursesOnMaps(t *testing.T) {
|
func TestMapMergeDeepRecursesOnMaps(t *testing.T) {
|
||||||
@ -92,8 +92,8 @@ func TestMapMergeDeepRecursesOnMaps(t *testing.T) {
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
resultA := result["A"].(map[string]interface{})
|
resultA := result["A"].(map[string]interface{})
|
||||||
assert.True(resultA["B"] == "A value!", t, "Unaltered values should not change")
|
require.Equal(t, "A value!", resultA["B"], "Unaltered values should not change")
|
||||||
assert.True(resultA["C"] == "A different value!", t, "Nested values should be altered")
|
require.Equal(t, "A different value!", resultA["C"], "Nested values should be altered")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapMergeDeepRightNotAMap(t *testing.T) {
|
func TestMapMergeDeepRightNotAMap(t *testing.T) {
|
||||||
@ -128,5 +128,5 @@ func TestMapMergeDeepRightNotAMap(t *testing.T) {
|
|||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
assert.True(result["A"] == "Not a map!", t, "Right values that are not a map should be set on the result")
|
require.Equal(t, "Not a map!", result["A"], "Right values that are not a map should be set on the result")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,7 +16,6 @@ import (
|
|||||||
keystore "github.com/ipfs/boxo/keystore"
|
keystore "github.com/ipfs/boxo/keystore"
|
||||||
repo "github.com/ipfs/kubo/repo"
|
repo "github.com/ipfs/kubo/repo"
|
||||||
"github.com/ipfs/kubo/repo/common"
|
"github.com/ipfs/kubo/repo/common"
|
||||||
dir "github.com/ipfs/kubo/thirdparty/dir"
|
|
||||||
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
|
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
|
||||||
|
|
||||||
ds "github.com/ipfs/go-datastore"
|
ds "github.com/ipfs/go-datastore"
|
||||||
@ -192,7 +191,7 @@ func open(repoPath string, userConfigFilePath string) (repo.Repo, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// check repo path, then check all constituent parts.
|
// check repo path, then check all constituent parts.
|
||||||
if err := dir.Writable(r.path); err != nil {
|
if err := fsutil.DirWritable(r.path); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -7,17 +7,16 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/ipfs/kubo/thirdparty/assert"
|
|
||||||
|
|
||||||
datastore "github.com/ipfs/go-datastore"
|
datastore "github.com/ipfs/go-datastore"
|
||||||
config "github.com/ipfs/kubo/config"
|
config "github.com/ipfs/kubo/config"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestInitIdempotence(t *testing.T) {
|
func TestInitIdempotence(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
path := t.TempDir()
|
path := t.TempDir()
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
assert.Nil(Init(path, &config.Config{Datastore: config.DefaultDatastoreConfig()}), t, "multiple calls to init should succeed")
|
require.NoError(t, Init(path, &config.Config{Datastore: config.DefaultDatastoreConfig()}), "multiple calls to init should succeed")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -32,78 +31,78 @@ func TestCanManageReposIndependently(t *testing.T) {
|
|||||||
pathB := t.TempDir()
|
pathB := t.TempDir()
|
||||||
|
|
||||||
t.Log("initialize two repos")
|
t.Log("initialize two repos")
|
||||||
assert.Nil(Init(pathA, &config.Config{Datastore: config.DefaultDatastoreConfig()}), t, "a", "should initialize successfully")
|
require.NoError(t, Init(pathA, &config.Config{Datastore: config.DefaultDatastoreConfig()}), "a", "should initialize successfully")
|
||||||
assert.Nil(Init(pathB, &config.Config{Datastore: config.DefaultDatastoreConfig()}), t, "b", "should initialize successfully")
|
require.NoError(t, Init(pathB, &config.Config{Datastore: config.DefaultDatastoreConfig()}), "b", "should initialize successfully")
|
||||||
|
|
||||||
t.Log("ensure repos initialized")
|
t.Log("ensure repos initialized")
|
||||||
assert.True(IsInitialized(pathA), t, "a should be initialized")
|
require.True(t, IsInitialized(pathA), "a should be initialized")
|
||||||
assert.True(IsInitialized(pathB), t, "b should be initialized")
|
require.True(t, IsInitialized(pathB), "b should be initialized")
|
||||||
|
|
||||||
t.Log("open the two repos")
|
t.Log("open the two repos")
|
||||||
repoA, err := Open(pathA)
|
repoA, err := Open(pathA)
|
||||||
assert.Nil(err, t, "a")
|
require.NoError(t, err, "a")
|
||||||
repoB, err := Open(pathB)
|
repoB, err := Open(pathB)
|
||||||
assert.Nil(err, t, "b")
|
require.NoError(t, err, "b")
|
||||||
|
|
||||||
t.Log("close and remove b while a is open")
|
t.Log("close and remove b while a is open")
|
||||||
assert.Nil(repoB.Close(), t, "close b")
|
require.NoError(t, repoB.Close(), "close b")
|
||||||
assert.Nil(Remove(pathB), t, "remove b")
|
require.NoError(t, Remove(pathB), "remove b")
|
||||||
|
|
||||||
t.Log("close and remove a")
|
t.Log("close and remove a")
|
||||||
assert.Nil(repoA.Close(), t)
|
require.NoError(t, repoA.Close())
|
||||||
assert.Nil(Remove(pathA), t)
|
require.NoError(t, Remove(pathA))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDatastoreGetNotAllowedAfterClose(t *testing.T) {
|
func TestDatastoreGetNotAllowedAfterClose(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
path := t.TempDir()
|
path := t.TempDir()
|
||||||
|
|
||||||
assert.True(!IsInitialized(path), t, "should NOT be initialized")
|
require.False(t, IsInitialized(path), "should NOT be initialized")
|
||||||
assert.Nil(Init(path, &config.Config{Datastore: config.DefaultDatastoreConfig()}), t, "should initialize successfully")
|
require.NoError(t, Init(path, &config.Config{Datastore: config.DefaultDatastoreConfig()}), "should initialize successfully")
|
||||||
r, err := Open(path)
|
r, err := Open(path)
|
||||||
assert.Nil(err, t, "should open successfully")
|
require.NoError(t, err, "should open successfully")
|
||||||
|
|
||||||
k := "key"
|
k := "key"
|
||||||
data := []byte(k)
|
data := []byte(k)
|
||||||
assert.Nil(r.Datastore().Put(context.Background(), datastore.NewKey(k), data), t, "Put should be successful")
|
require.NoError(t, r.Datastore().Put(context.Background(), datastore.NewKey(k), data), "Put should be successful")
|
||||||
|
|
||||||
assert.Nil(r.Close(), t)
|
require.NoError(t, r.Close())
|
||||||
_, err = r.Datastore().Get(context.Background(), datastore.NewKey(k))
|
_, err = r.Datastore().Get(context.Background(), datastore.NewKey(k))
|
||||||
assert.Err(err, t, "after closer, Get should be fail")
|
require.Error(t, err, "after closer, Get should be fail")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDatastorePersistsFromRepoToRepo(t *testing.T) {
|
func TestDatastorePersistsFromRepoToRepo(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
path := t.TempDir()
|
path := t.TempDir()
|
||||||
|
|
||||||
assert.Nil(Init(path, &config.Config{Datastore: config.DefaultDatastoreConfig()}), t)
|
require.NoError(t, Init(path, &config.Config{Datastore: config.DefaultDatastoreConfig()}))
|
||||||
r1, err := Open(path)
|
r1, err := Open(path)
|
||||||
assert.Nil(err, t)
|
require.NoError(t, err)
|
||||||
|
|
||||||
k := "key"
|
k := "key"
|
||||||
expected := []byte(k)
|
expected := []byte(k)
|
||||||
assert.Nil(r1.Datastore().Put(context.Background(), datastore.NewKey(k), expected), t, "using first repo, Put should be successful")
|
require.NoError(t, r1.Datastore().Put(context.Background(), datastore.NewKey(k), expected), "using first repo, Put should be successful")
|
||||||
assert.Nil(r1.Close(), t)
|
require.NoError(t, r1.Close())
|
||||||
|
|
||||||
r2, err := Open(path)
|
r2, err := Open(path)
|
||||||
assert.Nil(err, t)
|
require.NoError(t, err)
|
||||||
actual, err := r2.Datastore().Get(context.Background(), datastore.NewKey(k))
|
actual, err := r2.Datastore().Get(context.Background(), datastore.NewKey(k))
|
||||||
assert.Nil(err, t, "using second repo, Get should be successful")
|
require.NoError(t, err, "using second repo, Get should be successful")
|
||||||
assert.Nil(r2.Close(), t)
|
require.NoError(t, r2.Close())
|
||||||
assert.True(bytes.Equal(expected, actual), t, "data should match")
|
require.True(t, bytes.Equal(expected, actual), "data should match")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOpenMoreThanOnceInSameProcess(t *testing.T) {
|
func TestOpenMoreThanOnceInSameProcess(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
path := t.TempDir()
|
path := t.TempDir()
|
||||||
assert.Nil(Init(path, &config.Config{Datastore: config.DefaultDatastoreConfig()}), t)
|
require.NoError(t, Init(path, &config.Config{Datastore: config.DefaultDatastoreConfig()}))
|
||||||
|
|
||||||
r1, err := Open(path)
|
r1, err := Open(path)
|
||||||
assert.Nil(err, t, "first repo should open successfully")
|
require.NoError(t, err, "first repo should open successfully")
|
||||||
r2, err := Open(path)
|
r2, err := Open(path)
|
||||||
assert.Nil(err, t, "second repo should open successfully")
|
require.NoError(t, err, "second repo should open successfully")
|
||||||
assert.True(r1 == r2, t, "second open returns same value")
|
require.Equal(t, r1, r2, "second open returns same value")
|
||||||
|
|
||||||
assert.Nil(r1.Close(), t)
|
require.NoError(t, r1.Close())
|
||||||
assert.Nil(r2.Close(), t)
|
require.NoError(t, r2.Close())
|
||||||
}
|
}
|
||||||
|
|||||||
5
thirdparty/README.md
vendored
5
thirdparty/README.md
vendored
@ -1,5 +1,2 @@
|
|||||||
thirdparty consists of Golang packages that contain no go-ipfs dependencies and
|
|
||||||
may be vendored ipfs/go-ipfs at a later date.
|
|
||||||
|
|
||||||
packages under this directory _must not_ import packages under
|
packages under this directory _must not_ import packages under
|
||||||
`ipfs/go-ipfs` that are not also under `thirdparty`.
|
`ipfs/kubo` that are not also under `thirdparty`.
|
||||||
|
|||||||
25
thirdparty/assert/assert.go
vendored
25
thirdparty/assert/assert.go
vendored
@ -1,25 +0,0 @@
|
|||||||
package assert
|
|
||||||
|
|
||||||
import "testing"
|
|
||||||
|
|
||||||
func Nil(err error, t *testing.T, msgs ...string) {
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(msgs, "error:", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func True(v bool, t *testing.T, msgs ...string) {
|
|
||||||
if !v {
|
|
||||||
t.Fatal(msgs)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func False(v bool, t *testing.T, msgs ...string) {
|
|
||||||
True(!v, t, msgs...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Err(err error, t *testing.T, msgs ...string) {
|
|
||||||
if err == nil {
|
|
||||||
t.Fatal(msgs, "error:", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
25
thirdparty/dir/dir.go
vendored
25
thirdparty/dir/dir.go
vendored
@ -1,25 +0,0 @@
|
|||||||
package dir
|
|
||||||
|
|
||||||
// TODO move somewhere generic
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Writable ensures the directory exists and is writable.
|
|
||||||
func Writable(path string) error {
|
|
||||||
// Construct the path if missing
|
|
||||||
if err := os.MkdirAll(path, os.ModePerm); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Check the directory is writable
|
|
||||||
if f, err := os.Create(filepath.Join(path, "._check_writable")); err == nil {
|
|
||||||
f.Close()
|
|
||||||
os.Remove(f.Name())
|
|
||||||
} else {
|
|
||||||
return errors.New("'" + path + "' is not writable")
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
142
thirdparty/notifier/notifier.go
vendored
142
thirdparty/notifier/notifier.go
vendored
@ -1,142 +0,0 @@
|
|||||||
// Package notifier provides a simple notification dispatcher
|
|
||||||
// meant to be embedded in larger structures who wish to allow
|
|
||||||
// clients to sign up for event notifications.
|
|
||||||
package notifier
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
process "github.com/jbenet/goprocess"
|
|
||||||
ratelimit "github.com/jbenet/goprocess/ratelimit"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Notifiee is a generic interface. Clients implement
|
|
||||||
// their own Notifiee interfaces to ensure type-safety
|
|
||||||
// of notifications:
|
|
||||||
//
|
|
||||||
// type RocketNotifiee interface{
|
|
||||||
// Countdown(r Rocket, countdown time.Duration)
|
|
||||||
// LiftedOff(Rocket)
|
|
||||||
// ReachedOrbit(Rocket)
|
|
||||||
// Detached(Rocket, Capsule)
|
|
||||||
// Landed(Rocket)
|
|
||||||
// }
|
|
||||||
type Notifiee interface{}
|
|
||||||
|
|
||||||
// Notifier is a notification dispatcher. It's meant
|
|
||||||
// to be composed, and its zero-value is ready to be used.
|
|
||||||
//
|
|
||||||
// type Rocket struct {
|
|
||||||
// notifier notifier.Notifier
|
|
||||||
// }
|
|
||||||
type Notifier struct {
|
|
||||||
mu sync.RWMutex // guards notifiees
|
|
||||||
nots map[Notifiee]struct{}
|
|
||||||
lim *ratelimit.RateLimiter
|
|
||||||
}
|
|
||||||
|
|
||||||
// RateLimited returns a rate limited Notifier. only limit goroutines
|
|
||||||
// will be spawned. If limit is zero, no rate limiting happens. This
|
|
||||||
// is the same as `Notifier{}`.
|
|
||||||
func RateLimited(limit int) *Notifier {
|
|
||||||
n := &Notifier{}
|
|
||||||
if limit > 0 {
|
|
||||||
n.lim = ratelimit.NewRateLimiter(process.Background(), limit)
|
|
||||||
}
|
|
||||||
return n
|
|
||||||
}
|
|
||||||
|
|
||||||
// Notify signs up Notifiee e for notifications. This function
|
|
||||||
// is meant to be called behind your own type-safe function(s):
|
|
||||||
//
|
|
||||||
// // generic function for pattern-following
|
|
||||||
// func (r *Rocket) Notify(n Notifiee) {
|
|
||||||
// r.notifier.Notify(n)
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // or as part of other functions
|
|
||||||
// func (r *Rocket) Onboard(a Astronaut) {
|
|
||||||
// r.astronauts = append(r.austronauts, a)
|
|
||||||
// r.notifier.Notify(a)
|
|
||||||
// }
|
|
||||||
func (n *Notifier) Notify(e Notifiee) {
|
|
||||||
n.mu.Lock()
|
|
||||||
if n.nots == nil { // so that zero-value is ready to be used.
|
|
||||||
n.nots = make(map[Notifiee]struct{})
|
|
||||||
}
|
|
||||||
n.nots[e] = struct{}{}
|
|
||||||
n.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// StopNotify stops notifying Notifiee e. This function
|
|
||||||
// is meant to be called behind your own type-safe function(s):
|
|
||||||
//
|
|
||||||
// // generic function for pattern-following
|
|
||||||
// func (r *Rocket) StopNotify(n Notifiee) {
|
|
||||||
// r.notifier.StopNotify(n)
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // or as part of other functions
|
|
||||||
// func (r *Rocket) Detach(c Capsule) {
|
|
||||||
// r.notifier.StopNotify(c)
|
|
||||||
// r.capsule = nil
|
|
||||||
// }
|
|
||||||
func (n *Notifier) StopNotify(e Notifiee) {
|
|
||||||
n.mu.Lock()
|
|
||||||
if n.nots != nil { // so that zero-value is ready to be used.
|
|
||||||
delete(n.nots, e)
|
|
||||||
}
|
|
||||||
n.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// NotifyAll messages the notifier's notifiees with a given notification.
|
|
||||||
// This is done by calling the given function with each notifiee. It is
|
|
||||||
// meant to be called with your own type-safe notification functions:
|
|
||||||
//
|
|
||||||
// func (r *Rocket) Launch() {
|
|
||||||
// r.notifyAll(func(n Notifiee) {
|
|
||||||
// n.Launched(r)
|
|
||||||
// })
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // make it private so only you can use it. This function is necessary
|
|
||||||
// // to make sure you only up-cast in one place. You control who you added
|
|
||||||
// // to be a notifiee. If Go adds generics, maybe we can get rid of this
|
|
||||||
// // method but for now it is like wrapping a type-less container with
|
|
||||||
// // a type safe interface.
|
|
||||||
// func (r *Rocket) notifyAll(notify func(Notifiee)) {
|
|
||||||
// r.notifier.NotifyAll(func(n notifier.Notifiee) {
|
|
||||||
// notify(n.(Notifiee))
|
|
||||||
// })
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// Note well: each notification is launched in its own goroutine, so they
|
|
||||||
// can be processed concurrently, and so that whatever the notification does
|
|
||||||
// it _never_ blocks out the client. This is so that consumers _cannot_ add
|
|
||||||
// hooks into your object that block you accidentally.
|
|
||||||
func (n *Notifier) NotifyAll(notify func(Notifiee)) {
|
|
||||||
n.mu.Lock()
|
|
||||||
defer n.mu.Unlock()
|
|
||||||
|
|
||||||
if n.nots == nil { // so that zero-value is ready to be used.
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// no rate limiting.
|
|
||||||
if n.lim == nil {
|
|
||||||
for notifiee := range n.nots {
|
|
||||||
go notify(notifiee)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// with rate limiting.
|
|
||||||
n.lim.Go(func(worker process.Process) {
|
|
||||||
for notifiee := range n.nots {
|
|
||||||
notifiee := notifiee // rebind for loop data races
|
|
||||||
n.lim.LimitedGo(func(worker process.Process) {
|
|
||||||
notify(notifiee)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
289
thirdparty/notifier/notifier_test.go
vendored
289
thirdparty/notifier/notifier_test.go
vendored
@ -1,289 +0,0 @@
|
|||||||
package notifier
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// test data structures.
|
|
||||||
type Router struct {
|
|
||||||
queue chan Packet
|
|
||||||
notifier Notifier
|
|
||||||
}
|
|
||||||
|
|
||||||
type Packet struct{}
|
|
||||||
|
|
||||||
type RouterNotifiee interface {
|
|
||||||
Enqueued(*Router, Packet)
|
|
||||||
Forwarded(*Router, Packet)
|
|
||||||
Dropped(*Router, Packet)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Router) Notify(n RouterNotifiee) {
|
|
||||||
r.notifier.Notify(n)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Router) StopNotify(n RouterNotifiee) {
|
|
||||||
r.notifier.StopNotify(n)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Router) notifyAll(notify func(n RouterNotifiee)) {
|
|
||||||
r.notifier.NotifyAll(func(n Notifiee) {
|
|
||||||
notify(n.(RouterNotifiee))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Router) Receive(p Packet) {
|
|
||||||
select {
|
|
||||||
case r.queue <- p: // enqueued
|
|
||||||
r.notifyAll(func(n RouterNotifiee) {
|
|
||||||
n.Enqueued(r, p)
|
|
||||||
})
|
|
||||||
|
|
||||||
default: // drop
|
|
||||||
r.notifyAll(func(n RouterNotifiee) {
|
|
||||||
n.Dropped(r, p)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Router) Forward() {
|
|
||||||
p := <-r.queue
|
|
||||||
r.notifyAll(func(n RouterNotifiee) {
|
|
||||||
n.Forwarded(r, p)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
type Metrics struct {
|
|
||||||
enqueued int
|
|
||||||
forwarded int
|
|
||||||
dropped int
|
|
||||||
received chan struct{}
|
|
||||||
sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Metrics) Enqueued(*Router, Packet) {
|
|
||||||
m.Lock()
|
|
||||||
m.enqueued++
|
|
||||||
m.Unlock()
|
|
||||||
if m.received != nil {
|
|
||||||
m.received <- struct{}{}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Metrics) Forwarded(*Router, Packet) {
|
|
||||||
m.Lock()
|
|
||||||
m.forwarded++
|
|
||||||
m.Unlock()
|
|
||||||
if m.received != nil {
|
|
||||||
m.received <- struct{}{}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Metrics) Dropped(*Router, Packet) {
|
|
||||||
m.Lock()
|
|
||||||
m.dropped++
|
|
||||||
m.Unlock()
|
|
||||||
if m.received != nil {
|
|
||||||
m.received <- struct{}{}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Metrics) String() string {
|
|
||||||
m.Lock()
|
|
||||||
defer m.Unlock()
|
|
||||||
return fmt.Sprintf("%d enqueued, %d forwarded, %d in queue, %d dropped",
|
|
||||||
m.enqueued, m.forwarded, m.enqueued-m.forwarded, m.dropped)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNotifies(t *testing.T) {
|
|
||||||
m := Metrics{received: make(chan struct{})}
|
|
||||||
r := Router{queue: make(chan Packet, 10)}
|
|
||||||
r.Notify(&m)
|
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
r.Receive(Packet{})
|
|
||||||
<-m.received
|
|
||||||
if m.enqueued != (1 + i) {
|
|
||||||
t.Error("not notifying correctly", m.enqueued, 1+i)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
r.Receive(Packet{})
|
|
||||||
<-m.received
|
|
||||||
if m.enqueued != 10 {
|
|
||||||
t.Error("not notifying correctly", m.enqueued, 10)
|
|
||||||
}
|
|
||||||
if m.dropped != (1 + i) {
|
|
||||||
t.Error("not notifying correctly", m.dropped, 1+i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStopsNotifying(t *testing.T) {
|
|
||||||
m := Metrics{received: make(chan struct{})}
|
|
||||||
r := Router{queue: make(chan Packet, 10)}
|
|
||||||
r.Notify(&m)
|
|
||||||
|
|
||||||
for i := 0; i < 5; i++ {
|
|
||||||
r.Receive(Packet{})
|
|
||||||
<-m.received
|
|
||||||
if m.enqueued != (1 + i) {
|
|
||||||
t.Error("not notifying correctly")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
r.StopNotify(&m)
|
|
||||||
|
|
||||||
for i := 0; i < 5; i++ {
|
|
||||||
r.Receive(Packet{})
|
|
||||||
select {
|
|
||||||
case <-m.received:
|
|
||||||
t.Error("did not stop notifying")
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
if m.enqueued != 5 {
|
|
||||||
t.Error("did not stop notifying")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestThreadsafe(t *testing.T) {
|
|
||||||
N := 1000
|
|
||||||
r := Router{queue: make(chan Packet, 10)}
|
|
||||||
m1 := Metrics{received: make(chan struct{})}
|
|
||||||
m2 := Metrics{received: make(chan struct{})}
|
|
||||||
m3 := Metrics{received: make(chan struct{})}
|
|
||||||
r.Notify(&m1)
|
|
||||||
r.Notify(&m2)
|
|
||||||
r.Notify(&m3)
|
|
||||||
|
|
||||||
var n int
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
for i := 0; i < N; i++ {
|
|
||||||
n++
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
r.Receive(Packet{})
|
|
||||||
}()
|
|
||||||
|
|
||||||
if i%3 == 0 {
|
|
||||||
n++
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
r.Forward()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// drain queues
|
|
||||||
for i := 0; i < (n * 3); i++ {
|
|
||||||
select {
|
|
||||||
case <-m1.received:
|
|
||||||
case <-m2.received:
|
|
||||||
case <-m3.received:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
// counts should be correct and all agree. and this should
|
|
||||||
// run fine under `go test -race -cpu=5`
|
|
||||||
|
|
||||||
t.Log("m1", m1.String())
|
|
||||||
t.Log("m2", m2.String())
|
|
||||||
t.Log("m3", m3.String())
|
|
||||||
|
|
||||||
if m1.String() != m2.String() || m2.String() != m3.String() {
|
|
||||||
t.Error("counts disagree")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type highwatermark struct {
|
|
||||||
mu sync.Mutex
|
|
||||||
mark int
|
|
||||||
limit int
|
|
||||||
errs chan error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *highwatermark) incr() {
|
|
||||||
m.mu.Lock()
|
|
||||||
m.mark++
|
|
||||||
// fmt.Println("incr", m.mark)
|
|
||||||
if m.mark > m.limit {
|
|
||||||
m.errs <- fmt.Errorf("went over limit: %d/%d", m.mark, m.limit)
|
|
||||||
}
|
|
||||||
m.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *highwatermark) decr() {
|
|
||||||
m.mu.Lock()
|
|
||||||
m.mark--
|
|
||||||
// fmt.Println("decr", m.mark)
|
|
||||||
if m.mark < 0 {
|
|
||||||
m.errs <- fmt.Errorf("went under zero: %d/%d", m.mark, m.limit)
|
|
||||||
}
|
|
||||||
m.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestLimited(t *testing.T) {
|
|
||||||
timeout := 10 * time.Second // huge timeout.
|
|
||||||
limit := 9
|
|
||||||
|
|
||||||
hwm := highwatermark{limit: limit, errs: make(chan error, 100)}
|
|
||||||
n := RateLimited(limit) // will stop after 3 rounds
|
|
||||||
n.Notify(1)
|
|
||||||
n.Notify(2)
|
|
||||||
n.Notify(3)
|
|
||||||
|
|
||||||
entr := make(chan struct{})
|
|
||||||
exit := make(chan struct{})
|
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
// fmt.Printf("round: %d\n", i)
|
|
||||||
n.NotifyAll(func(e Notifiee) {
|
|
||||||
hwm.incr()
|
|
||||||
entr <- struct{}{}
|
|
||||||
<-exit // wait
|
|
||||||
hwm.decr()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
done <- struct{}{}
|
|
||||||
}()
|
|
||||||
|
|
||||||
for i := 0; i < 30; {
|
|
||||||
select {
|
|
||||||
case <-entr:
|
|
||||||
continue // let as many enter as possible
|
|
||||||
case <-time.After(1 * time.Millisecond):
|
|
||||||
}
|
|
||||||
|
|
||||||
// let one exit
|
|
||||||
select {
|
|
||||||
case <-entr:
|
|
||||||
continue // in case of timing issues.
|
|
||||||
case exit <- struct{}{}:
|
|
||||||
case <-time.After(timeout):
|
|
||||||
t.Error("got stuck")
|
|
||||||
}
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-done: // two parts done
|
|
||||||
case <-time.After(timeout):
|
|
||||||
t.Error("did not finish")
|
|
||||||
}
|
|
||||||
|
|
||||||
close(hwm.errs)
|
|
||||||
for err := range hwm.errs {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
Reference in New Issue
Block a user