mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 10:27:26 +08:00
202 lines
5.0 KiB
Go
202 lines
5.0 KiB
Go
package keyedaggregator
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus"
|
|
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
|
|
)
|
|
|
|
type testItem struct {
|
|
sequence uint64
|
|
value string
|
|
}
|
|
|
|
type stubCollector struct {
|
|
mu sync.Mutex
|
|
items []*testItem
|
|
}
|
|
|
|
func (c *stubCollector) Add(item *testItem) error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
c.items = append(c.items, item)
|
|
return nil
|
|
}
|
|
|
|
func (c *stubCollector) Records() []*testItem {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
out := make([]*testItem, len(c.items))
|
|
copy(out, c.items)
|
|
return out
|
|
}
|
|
|
|
func (c *stubCollector) Remove(item *testItem) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
for i, existing := range c.items {
|
|
if existing == item {
|
|
c.items = append(c.items[:i], c.items[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
type trackingFactory struct {
|
|
mu sync.Mutex
|
|
collectors map[uint64]*stubCollector
|
|
}
|
|
|
|
func newTrackingFactory() *trackingFactory {
|
|
return &trackingFactory{
|
|
collectors: make(map[uint64]*stubCollector),
|
|
}
|
|
}
|
|
|
|
func (f *trackingFactory) Create(
|
|
sequence uint64,
|
|
) (Collector[testItem], error) {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
collector := &stubCollector{}
|
|
f.collectors[sequence] = collector
|
|
return collector, nil
|
|
}
|
|
|
|
func (f *trackingFactory) collector(sequence uint64) *stubCollector {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
return f.collectors[sequence]
|
|
}
|
|
|
|
type noopTracer struct{}
|
|
|
|
func (noopTracer) Trace(string, ...consensus.LogParam) {}
|
|
func (noopTracer) Error(string, error, ...consensus.LogParam) {}
|
|
func (noopTracer) With(...consensus.LogParam) consensus.TraceLogger { return noopTracer{} }
|
|
|
|
func startTestAggregator(
|
|
t *testing.T,
|
|
aggregator *SequencedAggregator[testItem],
|
|
) func() {
|
|
t.Helper()
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
signalCtx, _ := lifecycle.WithSignaler(ctx)
|
|
require.NoError(t, aggregator.ComponentManager.Start(signalCtx))
|
|
<-aggregator.ComponentManager.Ready()
|
|
return func() {
|
|
cancel()
|
|
<-aggregator.ComponentManager.Done()
|
|
}
|
|
}
|
|
|
|
func TestSequencedAggregatorDispatchesItems(t *testing.T) {
|
|
t.Parallel()
|
|
factory := newTrackingFactory()
|
|
collectors := NewSequencedCollectors[testItem](noopTracer{}, 0, factory)
|
|
aggregator, err := NewSequencedAggregator[testItem](
|
|
noopTracer{},
|
|
0,
|
|
collectors,
|
|
func(item *testItem) uint64 { return item.sequence },
|
|
)
|
|
require.NoError(t, err)
|
|
stop := startTestAggregator(t, aggregator)
|
|
defer stop()
|
|
|
|
expected := &testItem{sequence: 2, value: "payload"}
|
|
aggregator.Add(expected)
|
|
|
|
require.Eventually(t, func() bool {
|
|
c := factory.collector(2)
|
|
if c == nil {
|
|
return false
|
|
}
|
|
items := c.Records()
|
|
return len(items) == 1 && items[0] == expected
|
|
}, time.Second, 10*time.Millisecond)
|
|
}
|
|
|
|
func TestSequencedAggregatorDropsStaleItems(t *testing.T) {
|
|
t.Parallel()
|
|
factory := newTrackingFactory()
|
|
collectors := NewSequencedCollectors[testItem](noopTracer{}, 5, factory)
|
|
aggregator, err := NewSequencedAggregator[testItem](
|
|
noopTracer{},
|
|
5,
|
|
collectors,
|
|
func(item *testItem) uint64 { return item.sequence },
|
|
)
|
|
require.NoError(t, err)
|
|
stop := startTestAggregator(t, aggregator)
|
|
defer stop()
|
|
|
|
aggregator.Add(&testItem{sequence: 2})
|
|
|
|
// Item is dropped before it ever enters the queue, so no collector
|
|
// should have been created for the stale sequence.
|
|
time.Sleep(50 * time.Millisecond)
|
|
require.Nil(t, factory.collector(2))
|
|
}
|
|
|
|
func TestSequencedAggregatorPrunesCollectorsOnSequenceChange(t *testing.T) {
|
|
t.Parallel()
|
|
factory := newTrackingFactory()
|
|
collectors := NewSequencedCollectors[testItem](noopTracer{}, 0, factory)
|
|
aggregator, err := NewSequencedAggregator[testItem](
|
|
noopTracer{},
|
|
0,
|
|
collectors,
|
|
func(item *testItem) uint64 { return item.sequence },
|
|
)
|
|
require.NoError(t, err)
|
|
stop := startTestAggregator(t, aggregator)
|
|
defer stop()
|
|
|
|
aggregator.Add(&testItem{sequence: 1})
|
|
require.Eventually(t, func() bool {
|
|
_, found, err := collectors.getCollector(1)
|
|
return err == nil && found
|
|
}, time.Second, 10*time.Millisecond)
|
|
|
|
aggregator.OnSequenceChange(0, 3)
|
|
require.Eventually(t, func() bool {
|
|
_, _, err := collectors.getCollector(1)
|
|
return errors.Is(err, ErrSequenceBelowRetention)
|
|
}, time.Second, 10*time.Millisecond)
|
|
}
|
|
|
|
func TestSequencedCollectorsGetOrCreateReusesInstances(t *testing.T) {
|
|
t.Parallel()
|
|
collectors := NewSequencedCollectors[testItem](noopTracer{}, 0, newTrackingFactory())
|
|
|
|
first, created, err := collectors.GetOrCreateCollector(4)
|
|
require.NoError(t, err)
|
|
require.True(t, created)
|
|
|
|
second, created, err := collectors.GetOrCreateCollector(4)
|
|
require.NoError(t, err)
|
|
require.False(t, created)
|
|
require.Equal(t, first, second)
|
|
}
|
|
|
|
func TestSequencedCollectorsPruneRemovesOldSequences(t *testing.T) {
|
|
t.Parallel()
|
|
collectors := NewSequencedCollectors[testItem](noopTracer{}, 0, newTrackingFactory())
|
|
|
|
_, _, err := collectors.GetOrCreateCollector(2)
|
|
require.NoError(t, err)
|
|
|
|
collectors.PruneUpToSequence(5)
|
|
|
|
_, _, err = collectors.GetOrCreateCollector(2)
|
|
require.ErrorIs(t, err, ErrSequenceBelowRetention)
|
|
}
|