mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 10:27:46 +08:00
Some checks are pending
CodeQL / codeql (push) Waiting to run
Docker Check / lint (push) Waiting to run
Docker Check / build (push) Waiting to run
Gateway Conformance / gateway-conformance (push) Waiting to run
Gateway Conformance / gateway-conformance-libp2p-experiment (push) Waiting to run
Go Build / go-build (push) Waiting to run
Go Check / go-check (push) Waiting to run
Go Lint / go-lint (push) Waiting to run
Go Test / unit-tests (push) Waiting to run
Go Test / cli-tests (push) Waiting to run
Go Test / example-tests (push) Waiting to run
Interop / interop-prep (push) Waiting to run
Interop / helia-interop (push) Blocked by required conditions
Interop / ipfs-webui (push) Blocked by required conditions
Sharness / sharness-test (push) Waiting to run
Spell Check / spellcheck (push) Waiting to run
* feat(pubsub): persistent seqno validation and diagnostic commands

- upgrade go-libp2p-pubsub to v0.15.0
- add persistent seqno validator using BasicSeqnoValidator
  stores max seen seqno per peer at /pubsub/seqno/<peerid>
  survives daemon restarts, addresses message cycling in large networks (#9665)
- add `ipfs pubsub reset` command to clear validator state
- add `ipfs diag datastore get/count` commands for datastore inspection
  requires daemon to be stopped, useful for debugging
- change pubsub status from Deprecated to Experimental
- add CLI tests for pubsub and diag datastore commands
- remove flaky pubsub_msg_seen_cache_test.go (replaced by CLI tests)

* fix(pubsub): improve reset command and add deprecation warnings

- use batched delete for efficient bulk reset
- check key existence before reporting deleted count
- sync datastore after deletions to ensure persistence
- show "no validator state found" when resetting non-existent peer
- log deprecation warnings when using --enable-pubsub-experiment
  or --enable-namesys-pubsub CLI flags

* refactor(test): add datastore helpers to test harness

---------

Co-authored-by: Andrew Gillis <11790789+gammazero@users.noreply.github.com>
86 lines
2.6 KiB
Go
86 lines
2.6 KiB
Go
package libp2p
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"log/slog"
|
|
|
|
"github.com/ipfs/go-datastore"
|
|
logging "github.com/ipfs/go-log/v2"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
"github.com/libp2p/go-libp2p/core/discovery"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"go.uber.org/fx"
|
|
|
|
"github.com/ipfs/kubo/core/node/helpers"
|
|
"github.com/ipfs/kubo/repo"
|
|
)
|
|
|
|
type pubsubParams struct {
|
|
fx.In
|
|
|
|
Repo repo.Repo
|
|
Host host.Host
|
|
Discovery discovery.Discovery
|
|
}
|
|
|
|
func FloodSub(pubsubOptions ...pubsub.Option) interface{} {
|
|
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, params pubsubParams) (service *pubsub.PubSub, err error) {
|
|
return pubsub.NewFloodSub(
|
|
helpers.LifecycleCtx(mctx, lc),
|
|
params.Host,
|
|
append(pubsubOptions,
|
|
pubsub.WithDiscovery(params.Discovery),
|
|
pubsub.WithDefaultValidator(newSeqnoValidator(params.Repo.Datastore())))...,
|
|
)
|
|
}
|
|
}
|
|
|
|
func GossipSub(pubsubOptions ...pubsub.Option) interface{} {
|
|
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, params pubsubParams) (service *pubsub.PubSub, err error) {
|
|
return pubsub.NewGossipSub(
|
|
helpers.LifecycleCtx(mctx, lc),
|
|
params.Host,
|
|
append(pubsubOptions,
|
|
pubsub.WithDiscovery(params.Discovery),
|
|
pubsub.WithFloodPublish(true), // flood own publications to all peers for reliable IPNS delivery
|
|
pubsub.WithDefaultValidator(newSeqnoValidator(params.Repo.Datastore())))...,
|
|
)
|
|
}
|
|
}
|
|
|
|
func newSeqnoValidator(ds datastore.Datastore) pubsub.ValidatorEx {
|
|
return pubsub.NewBasicSeqnoValidator(&seqnoStore{ds: ds}, slog.New(logging.SlogHandler()).With("logger", "pubsub"))
|
|
}
|
|
|
|
// SeqnoStorePrefix is the datastore key prefix under which the pubsub seqno
// validator keeps its state. A peer's entry lives at this prefix followed by
// the peer ID string.
const SeqnoStorePrefix = "/pubsub/seqno/"
|
|
|
|
// seqnoStore implements pubsub.PeerMetadataStore using the repo datastore.
|
|
// It stores the maximum seen sequence number per peer to prevent message
|
|
// cycles when network diameter exceeds the timecache span.
|
|
type seqnoStore struct {
|
|
ds datastore.Datastore
|
|
}
|
|
|
|
// Compile-time check that *seqnoStore satisfies pubsub.PeerMetadataStore.
var _ pubsub.PeerMetadataStore = (*seqnoStore)(nil)
|
|
|
|
// Get returns the stored seqno for a peer, or (nil, nil) if the peer is unknown.
|
|
// Returning (nil, nil) for unknown peers allows BasicSeqnoValidator to accept
|
|
// the first message from any peer.
|
|
func (s *seqnoStore) Get(ctx context.Context, p peer.ID) ([]byte, error) {
|
|
key := datastore.NewKey(SeqnoStorePrefix + p.String())
|
|
val, err := s.ds.Get(ctx, key)
|
|
if errors.Is(err, datastore.ErrNotFound) {
|
|
return nil, nil
|
|
}
|
|
return val, err
|
|
}
|
|
|
|
// Put stores the seqno for a peer.
|
|
func (s *seqnoStore) Put(ctx context.Context, p peer.ID, val []byte) error {
|
|
key := datastore.NewKey(SeqnoStorePrefix + p.String())
|
|
return s.ds.Put(ctx, key, val)
|
|
}
|