mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 10:27:46 +08:00
refactor: simplify logic for MFS pinning (#10506)
This commit is contained in:
parent
836d51650d
commit
ca4f486781
@ -586,7 +586,7 @@ take effect.
|
||||
prometheus.MustRegister(&corehttp.IpfsNodeCollector{Node: node})
|
||||
|
||||
// start MFS pinning thread
|
||||
startPinMFS(daemonConfigPollInterval, cctx, &ipfsPinMFSNode{node})
|
||||
startPinMFS(cctx, daemonConfigPollInterval, &ipfsPinMFSNode{node})
|
||||
|
||||
// The daemon is *finally* ready.
|
||||
fmt.Printf("Daemon is ready\n")
|
||||
|
||||
@ -12,7 +12,7 @@ import (
|
||||
pinclient "github.com/ipfs/boxo/pinning/remote/client"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
logging "github.com/ipfs/go-log"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
|
||||
config "github.com/ipfs/kubo/config"
|
||||
"github.com/ipfs/kubo/core"
|
||||
@ -40,6 +40,7 @@ func init() {
|
||||
d, err := time.ParseDuration(pollDurStr)
|
||||
if err != nil {
|
||||
mfslog.Error("error parsing MFS_PIN_POLL_INTERVAL, using default:", err)
|
||||
return
|
||||
}
|
||||
daemonConfigPollInterval = d
|
||||
}
|
||||
@ -74,56 +75,28 @@ func (x *ipfsPinMFSNode) PeerHost() host.Host {
|
||||
return x.node.PeerHost
|
||||
}
|
||||
|
||||
func startPinMFS(configPollInterval time.Duration, cctx pinMFSContext, node pinMFSNode) {
|
||||
errCh := make(chan error)
|
||||
go pinMFSOnChange(configPollInterval, cctx, node, errCh)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case err, isOpen := <-errCh:
|
||||
if !isOpen {
|
||||
return
|
||||
}
|
||||
mfslog.Errorf("%v", err)
|
||||
case <-cctx.Context().Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
func startPinMFS(cctx pinMFSContext, configPollInterval time.Duration, node pinMFSNode) {
|
||||
go pinMFSOnChange(cctx, configPollInterval, node)
|
||||
}
|
||||
|
||||
func pinMFSOnChange(configPollInterval time.Duration, cctx pinMFSContext, node pinMFSNode, errCh chan<- error) {
|
||||
defer close(errCh)
|
||||
|
||||
var tmo *time.Timer
|
||||
defer func() {
|
||||
if tmo != nil {
|
||||
tmo.Stop()
|
||||
}
|
||||
}()
|
||||
func pinMFSOnChange(cctx pinMFSContext, configPollInterval time.Duration, node pinMFSNode) {
|
||||
tmo := time.NewTimer(configPollInterval)
|
||||
defer tmo.Stop()
|
||||
|
||||
lastPins := map[string]lastPin{}
|
||||
for {
|
||||
// polling sleep
|
||||
if tmo == nil {
|
||||
tmo = time.NewTimer(configPollInterval)
|
||||
} else {
|
||||
tmo.Reset(configPollInterval)
|
||||
}
|
||||
select {
|
||||
case <-cctx.Context().Done():
|
||||
return
|
||||
case <-tmo.C:
|
||||
tmo.Reset(configPollInterval)
|
||||
}
|
||||
|
||||
// reread the config, which may have changed in the meantime
|
||||
cfg, err := cctx.GetConfig()
|
||||
if err != nil {
|
||||
select {
|
||||
case errCh <- fmt.Errorf("pinning reading config (%v)", err):
|
||||
case <-cctx.Context().Done():
|
||||
return
|
||||
}
|
||||
mfslog.Errorf("pinning reading config (%v)", err)
|
||||
continue
|
||||
}
|
||||
mfslog.Debugf("pinning loop is awake, %d remote services", len(cfg.Pinning.RemoteServices))
|
||||
@ -131,30 +104,29 @@ func pinMFSOnChange(configPollInterval time.Duration, cctx pinMFSContext, node p
|
||||
// get the most recent MFS root cid
|
||||
rootNode, err := node.RootNode()
|
||||
if err != nil {
|
||||
select {
|
||||
case errCh <- fmt.Errorf("pinning reading MFS root (%v)", err):
|
||||
case <-cctx.Context().Done():
|
||||
return
|
||||
}
|
||||
mfslog.Errorf("pinning reading MFS root (%v)", err)
|
||||
continue
|
||||
}
|
||||
rootCid := rootNode.Cid()
|
||||
|
||||
// pin to all remote services in parallel
|
||||
pinAllMFS(cctx.Context(), node, cfg, rootCid, lastPins, errCh)
|
||||
pinAllMFS(cctx.Context(), node, cfg, rootNode.Cid(), lastPins)
|
||||
}
|
||||
}
|
||||
|
||||
// pinAllMFS pins on all remote services in parallel to overcome DoS attacks.
|
||||
func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid cid.Cid, lastPins map[string]lastPin, errCh chan<- error) {
|
||||
ch := make(chan lastPin, len(cfg.Pinning.RemoteServices))
|
||||
for svcName_, svcConfig_ := range cfg.Pinning.RemoteServices {
|
||||
func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid cid.Cid, lastPins map[string]lastPin) {
|
||||
ch := make(chan lastPin)
|
||||
var started int
|
||||
|
||||
for svcName, svcConfig := range cfg.Pinning.RemoteServices {
|
||||
if ctx.Err() != nil {
|
||||
break
|
||||
}
|
||||
|
||||
// skip services where MFS is not enabled
|
||||
svcName, svcConfig := svcName_, svcConfig_
|
||||
mfslog.Debugf("pinning MFS root considering service %q", svcName)
|
||||
if !svcConfig.Policies.MFS.Enable {
|
||||
mfslog.Debugf("pinning service %q is not enabled", svcName)
|
||||
ch <- lastPin{}
|
||||
continue
|
||||
}
|
||||
// read mfs pin interval for this service
|
||||
@ -165,11 +137,7 @@ func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid
|
||||
var err error
|
||||
repinInterval, err = time.ParseDuration(svcConfig.Policies.MFS.RepinInterval)
|
||||
if err != nil {
|
||||
select {
|
||||
case errCh <- fmt.Errorf("remote pinning service %q has invalid MFS.RepinInterval (%v)", svcName, err):
|
||||
case <-ctx.Done():
|
||||
}
|
||||
ch <- lastPin{}
|
||||
mfslog.Errorf("remote pinning service %q has invalid MFS.RepinInterval (%v)", svcName, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
@ -182,38 +150,30 @@ func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid
|
||||
} else {
|
||||
mfslog.Debugf("pinning MFS root to %q: skipped due to MFS.RepinInterval=%s (remaining: %s)", svcName, repinInterval.String(), (repinInterval - time.Since(last.Time)).String())
|
||||
}
|
||||
ch <- lastPin{}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
mfslog.Debugf("pinning MFS root %q to %q", rootCid, svcName)
|
||||
go func() {
|
||||
if r, err := pinMFS(ctx, node, rootCid, svcName, svcConfig); err != nil {
|
||||
select {
|
||||
case errCh <- fmt.Errorf("pinning MFS root %q to %q (%v)", rootCid, svcName, err):
|
||||
case <-ctx.Done():
|
||||
}
|
||||
ch <- lastPin{}
|
||||
} else {
|
||||
ch <- r
|
||||
go func(svcName string, svcConfig config.RemotePinningService) {
|
||||
r, err := pinMFS(ctx, node, rootCid, svcName, svcConfig)
|
||||
if err != nil {
|
||||
mfslog.Errorf("pinning MFS root %q to %q (%v)", rootCid, svcName, err)
|
||||
}
|
||||
}()
|
||||
ch <- r
|
||||
}(svcName, svcConfig)
|
||||
started++
|
||||
}
|
||||
for i := 0; i < len(cfg.Pinning.RemoteServices); i++ {
|
||||
|
||||
// Collect results from all started goroutines.
|
||||
for i := 0; i < started; i++ {
|
||||
if x := <-ch; x.IsValid() {
|
||||
lastPins[x.ServiceName] = x
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func pinMFS(
|
||||
ctx context.Context,
|
||||
node pinMFSNode,
|
||||
cid cid.Cid,
|
||||
svcName string,
|
||||
svcConfig config.RemotePinningService,
|
||||
) (lastPin, error) {
|
||||
func pinMFS(ctx context.Context, node pinMFSNode, cid cid.Cid, svcName string, svcConfig config.RemotePinningService) (lastPin, error) {
|
||||
c := pinclient.NewClient(svcConfig.API.Endpoint, svcConfig.API.Key)
|
||||
|
||||
pinName := svcConfig.Policies.MFS.PinName
|
||||
@ -243,43 +203,46 @@ func pinMFS(
|
||||
}
|
||||
for range lsPinCh { // in case the prior loop exits early
|
||||
}
|
||||
if err := <-lsErrCh; err != nil {
|
||||
err := <-lsErrCh
|
||||
if err != nil {
|
||||
return lastPin{}, fmt.Errorf("error while listing remote pins: %v", err)
|
||||
}
|
||||
|
||||
// CID of the current MFS root is already being pinned, nothing to do
|
||||
if pinning {
|
||||
mfslog.Debugf("pinning MFS to %q: pin for %q exists since %s, skipping", svcName, cid, pinTime.String())
|
||||
return lastPin{Time: pinTime, ServiceName: svcName, ServiceConfig: svcConfig, CID: cid}, nil
|
||||
}
|
||||
if !pinning {
|
||||
// Prepare Pin.name
|
||||
addOpts := []pinclient.AddOption{pinclient.PinOpts.WithName(pinName)}
|
||||
|
||||
// Prepare Pin.name
|
||||
addOpts := []pinclient.AddOption{pinclient.PinOpts.WithName(pinName)}
|
||||
|
||||
// Prepare Pin.origins
|
||||
// Add own multiaddrs to the 'origins' array, so Pinning Service can
|
||||
// use that as a hint and connect back to us (if possible)
|
||||
if node.PeerHost() != nil {
|
||||
addrs, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(node.PeerHost()))
|
||||
if err != nil {
|
||||
return lastPin{}, err
|
||||
// Prepare Pin.origins
|
||||
// Add own multiaddrs to the 'origins' array, so Pinning Service can
|
||||
// use that as a hint and connect back to us (if possible)
|
||||
if node.PeerHost() != nil {
|
||||
addrs, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(node.PeerHost()))
|
||||
if err != nil {
|
||||
return lastPin{}, err
|
||||
}
|
||||
addOpts = append(addOpts, pinclient.PinOpts.WithOrigins(addrs...))
|
||||
}
|
||||
addOpts = append(addOpts, pinclient.PinOpts.WithOrigins(addrs...))
|
||||
}
|
||||
|
||||
// Create or replace pin for MFS root
|
||||
if existingRequestID != "" {
|
||||
mfslog.Debugf("pinning to %q: replacing existing MFS root pin with %q", svcName, cid)
|
||||
_, err := c.Replace(ctx, existingRequestID, cid, addOpts...)
|
||||
if err != nil {
|
||||
return lastPin{}, err
|
||||
// Create or replace pin for MFS root
|
||||
if existingRequestID != "" {
|
||||
mfslog.Debugf("pinning to %q: replacing existing MFS root pin with %q", svcName, cid)
|
||||
if _, err = c.Replace(ctx, existingRequestID, cid, addOpts...); err != nil {
|
||||
return lastPin{}, err
|
||||
}
|
||||
} else {
|
||||
mfslog.Debugf("pinning to %q: creating a new MFS root pin for %q", svcName, cid)
|
||||
if _, err = c.Add(ctx, cid, addOpts...); err != nil {
|
||||
return lastPin{}, err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
mfslog.Debugf("pinning to %q: creating a new MFS root pin for %q", svcName, cid)
|
||||
_, err := c.Add(ctx, cid, addOpts...)
|
||||
if err != nil {
|
||||
return lastPin{}, err
|
||||
}
|
||||
mfslog.Debugf("pinning MFS to %q: pin for %q exists since %s, skipping", svcName, cid, pinTime.String())
|
||||
}
|
||||
return lastPin{Time: pinTime, ServiceName: svcName, ServiceConfig: svcConfig, CID: cid}, nil
|
||||
|
||||
return lastPin{
|
||||
Time: pinTime,
|
||||
ServiceName: svcName,
|
||||
ServiceConfig: svcConfig,
|
||||
CID: cid,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -1,14 +1,19 @@
|
||||
package kubo
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
merkledag "github.com/ipfs/boxo/ipld/merkledag"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
config "github.com/ipfs/kubo/config"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
peer "github.com/libp2p/go-libp2p/core/peer"
|
||||
@ -60,25 +65,37 @@ func isErrorSimilar(e1, e2 error) bool {
|
||||
}
|
||||
|
||||
func TestPinMFSConfigError(t *testing.T) {
|
||||
ctx := &testPinMFSContext{
|
||||
ctx: context.Background(),
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*testConfigPollInterval)
|
||||
defer cancel()
|
||||
|
||||
cctx := &testPinMFSContext{
|
||||
ctx: ctx,
|
||||
cfg: nil,
|
||||
err: fmt.Errorf("couldn't read config"),
|
||||
}
|
||||
node := &testPinMFSNode{}
|
||||
errCh := make(chan error)
|
||||
go pinMFSOnChange(testConfigPollInterval, ctx, node, errCh)
|
||||
if !isErrorSimilar(<-errCh, ctx.err) {
|
||||
t.Errorf("error did not propagate")
|
||||
|
||||
logReader := logging.NewPipeReader()
|
||||
go func() {
|
||||
pinMFSOnChange(cctx, testConfigPollInterval, node)
|
||||
logReader.Close()
|
||||
}()
|
||||
|
||||
level, msg := readLogLine(t, logReader)
|
||||
if level != "error" {
|
||||
t.Error("expected error to be logged")
|
||||
}
|
||||
if !isErrorSimilar(<-errCh, ctx.err) {
|
||||
if !isErrorSimilar(errors.New(msg), cctx.err) {
|
||||
t.Errorf("error did not propagate")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPinMFSRootNodeError(t *testing.T) {
|
||||
ctx := &testPinMFSContext{
|
||||
ctx: context.Background(),
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*testConfigPollInterval)
|
||||
defer cancel()
|
||||
|
||||
cctx := &testPinMFSContext{
|
||||
ctx: ctx,
|
||||
cfg: &config.Config{
|
||||
Pinning: config.Pinning{},
|
||||
},
|
||||
@ -87,12 +104,16 @@ func TestPinMFSRootNodeError(t *testing.T) {
|
||||
node := &testPinMFSNode{
|
||||
err: fmt.Errorf("cannot create root node"),
|
||||
}
|
||||
errCh := make(chan error)
|
||||
go pinMFSOnChange(testConfigPollInterval, ctx, node, errCh)
|
||||
if !isErrorSimilar(<-errCh, node.err) {
|
||||
t.Errorf("error did not propagate")
|
||||
logReader := logging.NewPipeReader()
|
||||
go func() {
|
||||
pinMFSOnChange(cctx, testConfigPollInterval, node)
|
||||
logReader.Close()
|
||||
}()
|
||||
level, msg := readLogLine(t, logReader)
|
||||
if level != "error" {
|
||||
t.Error("expected error to be logged")
|
||||
}
|
||||
if !isErrorSimilar(<-errCh, node.err) {
|
||||
if !isErrorSimilar(errors.New(msg), node.err) {
|
||||
t.Errorf("error did not propagate")
|
||||
}
|
||||
}
|
||||
@ -155,7 +176,8 @@ func TestPinMFSService(t *testing.T) {
|
||||
}
|
||||
|
||||
func testPinMFSServiceWithError(t *testing.T, cfg *config.Config, expectedErrorPrefix string) {
|
||||
goctx, cancel := context.WithCancel(context.Background())
|
||||
goctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
ctx := &testPinMFSContext{
|
||||
ctx: goctx,
|
||||
cfg: cfg,
|
||||
@ -164,16 +186,36 @@ func testPinMFSServiceWithError(t *testing.T, cfg *config.Config, expectedErrorP
|
||||
node := &testPinMFSNode{
|
||||
err: nil,
|
||||
}
|
||||
errCh := make(chan error)
|
||||
go pinMFSOnChange(testConfigPollInterval, ctx, node, errCh)
|
||||
defer cancel()
|
||||
// first pass through the pinning loop
|
||||
err := <-errCh
|
||||
if !strings.Contains((err).Error(), expectedErrorPrefix) {
|
||||
t.Errorf("expecting error containing %q", expectedErrorPrefix)
|
||||
logReader := logging.NewPipeReader()
|
||||
go func() {
|
||||
pinMFSOnChange(ctx, testConfigPollInterval, node)
|
||||
logReader.Close()
|
||||
}()
|
||||
level, msg := readLogLine(t, logReader)
|
||||
if level != "error" {
|
||||
t.Error("expected error to be logged")
|
||||
}
|
||||
// second pass through the pinning loop
|
||||
if !strings.Contains((err).Error(), expectedErrorPrefix) {
|
||||
if !strings.Contains(msg, expectedErrorPrefix) {
|
||||
t.Errorf("expecting error containing %q", expectedErrorPrefix)
|
||||
}
|
||||
}
|
||||
|
||||
func readLogLine(t *testing.T, logReader io.Reader) (string, string) {
|
||||
t.Helper()
|
||||
|
||||
r := bufio.NewReader(logReader)
|
||||
data, err := r.ReadBytes('\n')
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
logInfo := struct {
|
||||
Level string `json:"level"`
|
||||
Msg string `json:"msg"`
|
||||
}{}
|
||||
err = json.Unmarshal(data, &logInfo)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return logInfo.Level, logInfo.Msg
|
||||
}
|
||||
|
||||
@ -221,6 +221,8 @@ NOTE: a comma-separated notation is supported in CLI for convenience:
|
||||
|
||||
// Block unless --background=true is passed
|
||||
if !req.Options[pinBackgroundOptionName].(bool) {
|
||||
const pinWaitTime = 500 * time.Millisecond
|
||||
var timer *time.Timer
|
||||
requestID := ps.GetRequestId()
|
||||
for {
|
||||
ps, err = c.GetStatusByID(ctx, requestID)
|
||||
@ -237,10 +239,15 @@ NOTE: a comma-separated notation is supported in CLI for convenience:
|
||||
if s == pinclient.StatusFailed {
|
||||
return fmt.Errorf("remote service failed to pin requestid=%q", requestID)
|
||||
}
|
||||
tmr := time.NewTimer(time.Second / 2)
|
||||
if timer == nil {
|
||||
timer = time.NewTimer(pinWaitTime)
|
||||
} else {
|
||||
timer.Reset(pinWaitTime)
|
||||
}
|
||||
select {
|
||||
case <-tmr.C:
|
||||
case <-timer.C:
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return fmt.Errorf("waiting for pin interrupted, requestid=%q remains on remote service", requestID)
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user