mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-24 11:57:44 +08:00
Merge pull request #711 from jbenet/fix-hanging-notifs
misc fixes: hanging connects + test output
This commit is contained in:
commit
d0f60432d8
@ -1,8 +1,8 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
cmds "github.com/jbenet/go-ipfs/commands"
|
||||
@ -51,9 +51,8 @@ the daemon.
|
||||
}
|
||||
|
||||
func daemonFunc(req cmds.Request, res cmds.Response) {
|
||||
var out bytes.Buffer
|
||||
res.SetOutput(&out)
|
||||
writef(&out, "Initializing daemon...\n")
|
||||
// let the user know we're going.
|
||||
fmt.Printf("Initializing daemon...\n")
|
||||
|
||||
// first, whether user has provided the initialization flag. we may be
|
||||
// running in an uninitialized state.
|
||||
@ -70,7 +69,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
|
||||
// `IsInitialized` where the quality of the signal can be improved over
|
||||
// time, and many call-sites can benefit.
|
||||
if !util.FileExists(req.Context().ConfigRoot) {
|
||||
err := initWithDefaults(&out, req.Context().ConfigRoot)
|
||||
err := initWithDefaults(os.Stdout, req.Context().ConfigRoot)
|
||||
if err != nil {
|
||||
res.SetError(debugerror.Wrap(err), cmds.ErrNormal)
|
||||
return
|
||||
@ -155,8 +154,8 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
}
|
||||
writef(&out, "IPFS mounted at: %s\n", fsdir)
|
||||
writef(&out, "IPNS mounted at: %s\n", nsdir)
|
||||
fmt.Printf("IPFS mounted at: %s\n", fsdir)
|
||||
fmt.Printf("IPNS mounted at: %s\n", nsdir)
|
||||
}
|
||||
|
||||
var rootRedirect corehttp.ServeOption
|
||||
@ -173,10 +172,6 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
|
||||
writable = cfg.Gateway.Writable
|
||||
}
|
||||
|
||||
if writable {
|
||||
fmt.Printf("IPNS gateway mounted read-write\n")
|
||||
}
|
||||
|
||||
if gatewayMaddr != nil {
|
||||
go func() {
|
||||
var opts = []corehttp.ServeOption{corehttp.GatewayOption(writable)}
|
||||
@ -184,6 +179,9 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
|
||||
opts = append(opts, rootRedirect)
|
||||
}
|
||||
fmt.Printf("Gateway server listening on %s\n", gatewayMaddr)
|
||||
if writable {
|
||||
fmt.Printf("Gateway server is writable\n")
|
||||
}
|
||||
err := corehttp.ListenAndServe(node, gatewayMaddr.String(), opts...)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
|
||||
@ -18,6 +18,7 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
|
||||
select {
|
||||
case <-dht.Closing():
|
||||
return
|
||||
default:
|
||||
}
|
||||
dht.Update(dht.Context(), v.RemotePeer())
|
||||
}
|
||||
@ -27,6 +28,7 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
|
||||
select {
|
||||
case <-dht.Closing():
|
||||
return
|
||||
default:
|
||||
}
|
||||
dht.routingTable.Remove(v.RemotePeer())
|
||||
}
|
||||
|
||||
@ -74,15 +74,16 @@ test_run_repeat_10_sec() {
|
||||
}
|
||||
|
||||
test_wait_output_n_lines_60_sec() {
|
||||
echo "$2" >expected_waitn
|
||||
for i in 1 2 3 4 5 6 7 8 9 10
|
||||
for i in 1 2 3 4 5 6
|
||||
do
|
||||
cat "$1" | wc -l | tr -d " " >actual_waitn
|
||||
test_cmp "expected_waitn" "actual_waitn" && return
|
||||
sleep 2
|
||||
for i in 1 2 3 4 5 6 7 8 9 10
|
||||
do
|
||||
test $(cat "$1" | wc -l | tr -d " ") -ge $2 && return
|
||||
sleep 1
|
||||
done
|
||||
done
|
||||
cat "$1" | wc -l | tr -d " " >actual_waitn
|
||||
test_cmp "expected_waitn" "actual_waitn"
|
||||
actual=$(cat "$1" | wc -l | tr -d " ")
|
||||
fsh "expected $2 lines of output. got $actual"
|
||||
}
|
||||
|
||||
test_wait_open_tcp_port_10_sec() {
|
||||
@ -130,6 +131,13 @@ test_config_ipfs_gateway_writable() {
|
||||
|
||||
test_launch_ipfs_daemon() {
|
||||
|
||||
ADDR_API="/ip4/127.0.0.1/tcp/5001"
|
||||
ADDR_GWAY=`ipfs config Addresses.Gateway`
|
||||
NLINES="2"
|
||||
if test "$ADDR_GWAY" != ""; then
|
||||
NLINES="3"
|
||||
fi
|
||||
|
||||
test_expect_success "'ipfs daemon' succeeds" '
|
||||
ipfs daemon >actual_daemon 2>daemon_err &
|
||||
'
|
||||
@ -138,19 +146,17 @@ test_launch_ipfs_daemon() {
|
||||
# and we make sure there are no errors
|
||||
test_expect_success "'ipfs daemon' is ready" '
|
||||
IPFS_PID=$! &&
|
||||
test_run_repeat_10_sec "cat actual_daemon | grep \"API server listening on\"" &&
|
||||
test_wait_output_n_lines_60_sec actual_daemon $NLINES &&
|
||||
printf "" >empty && test_cmp daemon_err empty ||
|
||||
fsh cat actual_daemon || fsh cat daemon_err
|
||||
'
|
||||
|
||||
ADDR_API="/ip4/127.0.0.1/tcp/5001"
|
||||
test_expect_success "'ipfs daemon' output includes API address" '
|
||||
cat actual_daemon | grep "API server listening on $ADDR_API" ||
|
||||
fsh cat actual_daemon ||
|
||||
fsh "cat actual_daemon | grep \"API server listening on $ADDR_API\""
|
||||
'
|
||||
|
||||
ADDR_GWAY=`ipfs config Addresses.Gateway`
|
||||
if test "$ADDR_GWAY" != ""; then
|
||||
test_expect_success "'ipfs daemon' output includes Gateway address" '
|
||||
cat actual_daemon | grep "Gateway server listening on $ADDR_GWAY" ||
|
||||
|
||||
26
thirdparty/notifier/notifier.go
vendored
26
thirdparty/notifier/notifier.go
vendored
@ -5,6 +5,9 @@ package notifier
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||
ratelimit "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
|
||||
)
|
||||
|
||||
// Notifiee is a generic interface. Clients implement
|
||||
@ -31,6 +34,18 @@ type Notifiee interface{}
|
||||
type Notifier struct {
|
||||
mu sync.RWMutex // guards notifiees
|
||||
nots map[Notifiee]struct{}
|
||||
lim *ratelimit.RateLimiter
|
||||
}
|
||||
|
||||
// RateLimited returns a rate limited Notifier. only limit goroutines
|
||||
// will be spawned. If limit is zero, no rate limiting happens. This
|
||||
// is the same as `Notifier{}`.
|
||||
func RateLimited(limit int) Notifier {
|
||||
n := Notifier{}
|
||||
if limit > 0 {
|
||||
n.lim = ratelimit.NewRateLimiter(process.Background(), limit)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// Notify signs up Notifiee e for notifications. This function
|
||||
@ -107,8 +122,15 @@ func (n *Notifier) NotifyAll(notify func(Notifiee)) {
|
||||
n.mu.Lock()
|
||||
if n.nots != nil { // so that zero-value is ready to be used.
|
||||
for notifiee := range n.nots {
|
||||
go notify(notifiee)
|
||||
// TODO find a good way to rate limit this without blocking notifier.
|
||||
|
||||
if n.lim == nil { // no rate limit
|
||||
go notify(notifiee)
|
||||
} else {
|
||||
notifiee := notifiee // rebind for data races
|
||||
n.lim.LimitedGo(func(worker process.Process) {
|
||||
notify(notifiee)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
n.mu.Unlock()
|
||||
|
||||
84
thirdparty/notifier/notifier_test.go
vendored
84
thirdparty/notifier/notifier_test.go
vendored
@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// test data structures
|
||||
@ -205,3 +206,86 @@ func TestThreadsafe(t *testing.T) {
|
||||
t.Error("counts disagree")
|
||||
}
|
||||
}
|
||||
|
||||
type highwatermark struct {
|
||||
mu sync.Mutex
|
||||
mark int
|
||||
limit int
|
||||
errs chan error
|
||||
}
|
||||
|
||||
func (m *highwatermark) incr() {
|
||||
m.mu.Lock()
|
||||
m.mark++
|
||||
// fmt.Println("incr", m.mark)
|
||||
if m.mark > m.limit {
|
||||
m.errs <- fmt.Errorf("went over limit: %d/%d", m.mark, m.limit)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *highwatermark) decr() {
|
||||
m.mu.Lock()
|
||||
m.mark--
|
||||
// fmt.Println("decr", m.mark)
|
||||
if m.mark < 0 {
|
||||
m.errs <- fmt.Errorf("went under zero: %d/%d", m.mark, m.limit)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestLimited(t *testing.T) {
|
||||
timeout := 10 * time.Second // huge timeout.
|
||||
limit := 9
|
||||
|
||||
hwm := highwatermark{limit: limit, errs: make(chan error, 100)}
|
||||
n := RateLimited(limit) // will stop after 3 rounds
|
||||
n.Notify(1)
|
||||
n.Notify(2)
|
||||
n.Notify(3)
|
||||
|
||||
entr := make(chan struct{})
|
||||
exit := make(chan struct{})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for i := 0; i < 10; i++ {
|
||||
// fmt.Printf("round: %d\n", i)
|
||||
n.NotifyAll(func(e Notifiee) {
|
||||
hwm.incr()
|
||||
entr <- struct{}{}
|
||||
<-exit // wait
|
||||
hwm.decr()
|
||||
})
|
||||
}
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
for i := 0; i < 30; {
|
||||
select {
|
||||
case <-entr:
|
||||
continue // let as many enter as possible
|
||||
case <-time.After(1 * time.Millisecond):
|
||||
}
|
||||
|
||||
// let one exit
|
||||
select {
|
||||
case <-entr:
|
||||
continue // in case of timing issues.
|
||||
case exit <- struct{}{}:
|
||||
case <-time.After(timeout):
|
||||
t.Error("got stuck")
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
select {
|
||||
case <-done: // two parts done
|
||||
case <-time.After(timeout):
|
||||
t.Error("did not finish")
|
||||
}
|
||||
|
||||
close(hwm.errs)
|
||||
for err := range hwm.errs {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user