diff --git a/CHANGELOG.md b/CHANGELOG.md
index e62149b481..5eaa327134 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Changed
+
+- Improve the reaper to better sustain bursts of transactions [#3236](https://github.com/evstack/ev-node/pull/3236)
+
 ## v1.1.0
 
 No changes from v1.1.0-rc.2.
diff --git a/block/components.go b/block/components.go
index 71b6f60523..ac5f782cdd 100644
--- a/block/components.go
+++ b/block/components.go
@@ -278,9 +278,9 @@ func newAggregatorComponents(
 		sequencer,
 		genesis,
 		logger,
-		executor,
 		cacheManager,
 		config.Node.ScrapeInterval.Duration,
+		executor.NotifyNewTransactions,
 	)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create reaper: %w", err)
diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go
index 6f1b9d9cf3..a91b1410d7 100644
--- a/block/internal/cache/manager.go
+++ b/block/internal/cache/manager.go
@@ -53,6 +53,7 @@ type CacheManager interface {
 	// Transaction operations
 	IsTxSeen(hash string) bool
 	SetTxSeen(hash string)
+	SetTxsSeen(hashes []string)
 	CleanupOldTxs(olderThan time.Duration) int
 
 	// Pending events syncing coordination
@@ -210,6 +211,15 @@ func (m *implementation) SetTxSeen(hash string) {
 	m.txTimestamps.Store(hash, time.Now())
}
 
+// SetTxsSeen marks a batch of transaction hashes as seen, sharing a single timestamp.
+func (m *implementation) SetTxsSeen(hashes []string) {
+	now := time.Now()
+	for _, hash := range hashes {
+		m.txCache.setSeen(hash, 0)
+		m.txTimestamps.Store(hash, now)
+	}
+}
+
 // CleanupOldTxs removes transaction hashes older than olderThan and returns
 // the count removed. Defaults to DefaultTxCacheRetention if olderThan <= 0.
 func (m *implementation) CleanupOldTxs(olderThan time.Duration) int {
@@ -237,13 +246,6 @@ func (m *implementation) CleanupOldTxs(olderThan time.Duration) int {
 		return true
 	})
 
-	if removed > 0 {
-		m.logger.Debug().
-			Int("removed", removed).
-			Dur("older_than", olderThan).
-			Msg("cleaned up old transaction hashes from cache")
-	}
-
 	return removed
 }
diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go
index 3ddc90211a..e1c335f1e5 100644
--- a/block/internal/executing/executor.go
+++ b/block/internal/executing/executor.go
@@ -162,11 +162,20 @@ func (e *Executor) SetBlockProducer(bp BlockProducer) {
 }
 
 // Start begins the execution component
-func (e *Executor) Start(ctx context.Context) error {
+func (e *Executor) Start(ctx context.Context) (err error) {
+	if e.cancel != nil {
+		return errors.New("executor already started")
+	}
 	e.ctx, e.cancel = context.WithCancel(ctx)
+	defer func() { // cancel the context if initialization fails
+		if err != nil {
+			e.cancel()
+			e.ctx, e.cancel = nil, nil
+		}
+	}()
 
 	// Initialize state
-	if err := e.initializeState(); err != nil {
+	if err = e.initializeState(); err != nil {
 		return fmt.Errorf("failed to initialize state: %w", err)
 	}
diff --git a/block/internal/pruner/pruner.go b/block/internal/pruner/pruner.go
index 797f911d50..e56d9c2849 100644
--- a/block/internal/pruner/pruner.go
+++ b/block/internal/pruner/pruner.go
@@ -52,6 +52,9 @@ func New(
 
 // Start begins the pruning loop.
 func (p *Pruner) Start(ctx context.Context) error {
+	if p.cancel != nil {
+		return errors.New("pruner already started")
+	}
 	if !p.cfg.IsPruningEnabled() {
 		p.logger.Info().Msg("pruning is disabled, not starting pruner")
 		return nil
diff --git a/block/internal/reaping/bench_test.go b/block/internal/reaping/bench_test.go
new file mode 100644
index 0000000000..5ec0aaa69d
--- /dev/null
+++ b/block/internal/reaping/bench_test.go
@@ -0,0 +1,330 @@
+package reaping
+
+import (
+	"context"
+	"crypto/rand"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	ds "github.com/ipfs/go-datastore"
+	dssync "github.com/ipfs/go-datastore/sync"
+	"github.com/rs/zerolog"
+
+	"github.com/evstack/ev-node/block/internal/cache"
+	coreexecutor "github.com/evstack/ev-node/core/execution"
+	coresequencer "github.com/evstack/ev-node/core/sequencer"
+	"github.com/evstack/ev-node/pkg/config"
+	"github.com/evstack/ev-node/pkg/genesis"
+	"github.com/evstack/ev-node/pkg/store"
+)
+
+func newBenchCache(b *testing.B) cache.CacheManager {
+	b.Helper()
+	cfg := config.Config{RootDir: b.TempDir()}
+	memDS := dssync.MutexWrap(ds.NewMapDatastore())
+	st := store.New(memDS)
+	cm, err := cache.NewManager(cfg, st, zerolog.Nop())
+	if err != nil {
+		b.Fatal(err)
+	}
+	return cm
+}
+
+type infiniteExecutor struct {
+	mu      sync.Mutex
+	batch   [][]byte
+	pending atomic.Int64
+}
+
+func (e *infiniteExecutor) Feed(n int, txSize int) {
+	txs := make([][]byte, n)
+	for i := range txs {
+		tx := make([]byte, txSize)
+		_, _ = rand.Read(tx)
+		txs[i] = tx
+	}
+	e.mu.Lock()
+	e.batch = txs
+	e.mu.Unlock()
+	e.pending.Add(int64(n))
+}
+
+func (e *infiniteExecutor) GetTxs(_ context.Context) ([][]byte, error) {
+	e.mu.Lock()
+	txs := e.batch
+	e.batch = nil
+	e.mu.Unlock()
+	return txs, nil
+}
+
+func (e *infiniteExecutor) ExecuteTxs(_ context.Context, _ [][]byte, _ uint64, _ time.Time, _ []byte) ([]byte, error) {
+	return nil, nil
+}
+
+func (e *infiniteExecutor) FilterTxs(_ context.Context, txs [][]byte, _ uint64, _ uint64, _ bool) ([]coreexecutor.FilterStatus, error) {
+	return make([]coreexecutor.FilterStatus, len(txs)), nil
+}
+
+func (e *infiniteExecutor) GetExecutionInfo(_ context.Context) (coreexecutor.ExecutionInfo, error) {
+	return coreexecutor.ExecutionInfo{}, nil
+}
+
+func (e *infiniteExecutor) InitChain(_ context.Context, _ time.Time, _ uint64, _ string) ([]byte, error) {
+	return nil, nil
+}
+
+func (e *infiniteExecutor) SetFinal(_ context.Context, _ uint64) error { return nil }
+
+type countingSequencer struct {
+	submitted atomic.Int64
+}
+
+func (s *countingSequencer) SubmitBatchTxs(_ context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) {
+	s.submitted.Add(int64(len(req.Batch.Transactions)))
+	return &coresequencer.SubmitBatchTxsResponse{}, nil
+}
+
+func (s *countingSequencer) GetNextBatch(_ context.Context, _ coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) {
+	return &coresequencer.GetNextBatchResponse{}, nil
+}
+
+func (s *countingSequencer) VerifyBatch(_ context.Context, _ coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) {
+	return &coresequencer.VerifyBatchResponse{}, nil
+}
+
+func (s *countingSequencer) GetDAHeight() uint64  { return 0 }
+func (s *countingSequencer) SetDAHeight(_ uint64) {}
+
+func benchmarkReaperFlow(b *testing.B, batchSize int, txSize int, feedInterval time.Duration) {
+	scenario := fmt.Sprintf("batch=%d/txSize=%d", batchSize, txSize)
+
+	b.Run(scenario, func(b *testing.B) {
+		exec := &infiniteExecutor{}
+		seq := &countingSequencer{}
+
+		cm := newBenchCache(b)
+		var notified atomic.Int64
+
+		r, err := NewReaper(
+			exec,
+			seq,
+			genesis.Genesis{ChainID: "bench"},
+			zerolog.Nop(),
+			cm,
+			50*time.Millisecond,
+			func() { notified.Add(1) },
+		)
+		if err != nil {
+			b.Fatal(err)
+		}
+
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		if err := r.Start(ctx); err != nil {
+			b.Fatal(err)
+		}
+
+		stopFeeding := make(chan struct{})
+		feederDone := make(chan struct{})
+		go func() {
+			defer close(feederDone)
+			ticker := time.NewTicker(feedInterval)
+			defer ticker.Stop()
+			for {
+				select {
+				case <-stopFeeding:
+					return
+				case <-ticker.C:
+					exec.Feed(batchSize, txSize)
+				}
+			}
+		}()
+
+		b.ResetTimer()
+
+		var lastCount int64
+		for i := 0; i < b.N; i++ {
+			exec.Feed(batchSize, txSize)
+
+			deadline := time.After(5 * time.Second)
+			expected := seq.submitted.Load() + int64(batchSize)
+			for {
+				cur := seq.submitted.Load()
+				if cur >= expected {
+					lastCount = cur
+					break
+				}
+				select {
+				case <-deadline:
+					b.Fatalf("timeout: submitted %d, expected >= %d", cur, expected)
+				default:
+				}
+			}
+		}
+
+		b.StopTimer()
+		close(stopFeeding)
+		<-feederDone
+		cancel()
+		r.Stop()
+
+		b.ReportMetric(float64(lastCount)/b.Elapsed().Seconds(), "txs/sec")
+	})
+}
+
+func BenchmarkReaperFlow_Throughput(b *testing.B) {
+	sizes := []int{256, 1024, 4096}
+	batches := []int{10, 100, 500}
+
+	for _, batchSize := range batches {
+		for _, txSize := range sizes {
+			benchmarkReaperFlow(b, batchSize, txSize, 10*time.Millisecond)
+		}
+	}
+}
+
+func BenchmarkReaperFlow_Sustained(b *testing.B) {
+	b.Run("steady_100txs_256B", func(b *testing.B) {
+		exec := &infiniteExecutor{}
+		seq := &countingSequencer{}
+		cm := newBenchCache(b)
+		var notified atomic.Int64
+
+		r, err := NewReaper(
+			exec, seq,
+			genesis.Genesis{ChainID: "bench"},
+			zerolog.Nop(), cm,
+			10*time.Millisecond,
+			func() { notified.Add(1) },
+		)
+		if err != nil {
+			b.Fatal(err)
+		}
+
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		if err := r.Start(ctx); err != nil {
+			b.Fatal(err)
+		}
+
+		const batchSize = 100
+		const txSize = 256
+		const duration = 3 * time.Second
+
+		feederDone := make(chan struct{})
+		go func() {
+			defer close(feederDone)
+			for {
+				exec.Feed(batchSize, txSize)
+				time.Sleep(time.Millisecond)
+				select {
+				case <-ctx.Done():
+					return
+				default:
+				}
+			}
+		}()
+
+		b.ResetTimer()
+		time.Sleep(duration)
+		b.StopTimer()
+
+		cancel()
+		r.Stop()
+		<-feederDone
+
+		total := seq.submitted.Load()
+		elapsed := b.Elapsed().Seconds()
+		b.ReportMetric(float64(total)/elapsed, "txs/sec")
+		b.ReportMetric(float64(notified.Load())/elapsed, "notifies/sec")
+		b.Logf("submitted %d txs in %.1fs (%.0f txs/sec, %d notifications)", total, elapsed, float64(total)/elapsed, notified.Load())
+	})
+}
+
+func BenchmarkReaperFlow_StartStop(b *testing.B) {
+	b.Run("lifecycle", func(b *testing.B) {
+		exec := &infiniteExecutor{}
+		seq := &countingSequencer{}
+		cm := newBenchCache(b)
+
+		for i := 0; i < b.N; i++ {
+			r, err := NewReaper(
+				exec, seq,
+				genesis.Genesis{ChainID: "bench"},
+				zerolog.Nop(), cm,
+				100*time.Millisecond,
+				func() {},
+			)
+			if err != nil {
+				b.Fatal(err)
+			}
+
+			ctx, cancel := context.WithCancel(context.Background())
+			if err := r.Start(ctx); err != nil {
+				cancel()
+				b.Fatal(err)
+			}
+			r.Stop()
+			cancel()
+		}
+	})
+
+	b.Run("start_with_backlog", func(b *testing.B) {
+		const batchSize = 500
+		const txSize = 256
+
+		for i := 0; i < b.N; i++ {
+			b.StopTimer()
+
+			exec := &infiniteExecutor{}
+			seq := &countingSequencer{}
+			cm := newBenchCache(b)
+
+			txs := make([][]byte, batchSize)
+			for j := range txs {
+				txs[j] = make([]byte, txSize)
+				_, _ = rand.Read(txs[j])
+			}
+			exec.mu.Lock()
+			exec.batch = txs
+			exec.mu.Unlock()
+
+			r, err := NewReaper(
+				exec, seq,
+				genesis.Genesis{ChainID: "bench"},
+				zerolog.Nop(), cm,
+				10*time.Millisecond,
+				func() {},
+			)
+			if err != nil {
+				b.Fatal(err)
+			}
+
+			ctx, cancel := context.WithCancel(context.Background())
+
+			b.StartTimer()
+			if err := r.Start(ctx); err != nil {
+				cancel()
+				b.Fatal(err)
+			}
+
+			deadline := time.After(5 * time.Second)
+			for seq.submitted.Load() < batchSize {
+				select {
+				case <-deadline:
+					b.Fatal("timeout waiting for backlog drain")
+				default:
+				}
+			}
+			b.StopTimer()
+
+			r.Stop()
+			cancel()
+		}
+	})
+}
diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go
index 67b2020216..796505e462 100644
--- a/block/internal/reaping/reaper.go
+++ b/block/internal/reaping/reaper.go
@@ -6,13 +6,13 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"runtime"
 	"sync"
 	"time"
 
 	"github.com/rs/zerolog"
 
 	"github.com/evstack/ev-node/block/internal/cache"
-	"github.com/evstack/ev-node/block/internal/executing"
 	coreexecutor "github.com/evstack/ev-node/core/execution"
 	coresequencer "github.com/evstack/ev-node/core/sequencer"
 	"github.com/evstack/ev-node/pkg/genesis"
@@ -21,40 +21,37 @@ import (
 const (
 	// MaxBackoffInterval is the maximum backoff interval for retries
 	MaxBackoffInterval = 30 * time.Second
+	// CleanupInterval is how often old transaction hashes are purged from the seen-tx cache
+	CleanupInterval = 1 * time.Hour
 )
 
 // Reaper is responsible for periodically retrieving transactions from the executor,
 // filtering out already seen transactions, and submitting new transactions to the sequencer.
 type Reaper struct {
-	exec      coreexecutor.Executor
-	sequencer coresequencer.Sequencer
-	chainID   string
-	interval  time.Duration
-	cache     cache.CacheManager
-	executor  *executing.Executor
-
-	// shared components
+	exec           coreexecutor.Executor
+	sequencer      coresequencer.Sequencer
+	chainID        string
+	interval       time.Duration
+	cache          cache.CacheManager
+	onTxsSubmitted func()
+
 	logger zerolog.Logger
 
-	// Lifecycle
 	ctx    context.Context
 	cancel context.CancelFunc
 	wg     sync.WaitGroup
 }
 
 // NewReaper creates a new Reaper instance.
 func NewReaper(
 	exec coreexecutor.Executor,
 	sequencer coresequencer.Sequencer,
 	genesis genesis.Genesis,
 	logger zerolog.Logger,
-	executor *executing.Executor,
 	cache cache.CacheManager,
 	scrapeInterval time.Duration,
+	onTxsSubmitted func(),
 ) (*Reaper, error) {
-	if executor == nil {
-		return nil, errors.New("executor cannot be nil")
-	}
 	if cache == nil {
 		return nil, errors.New("cache cannot be nil")
 	}
@@ -63,134 +58,171 @@ func NewReaper(
 	}
 
 	return &Reaper{
-		exec:      exec,
-		sequencer: sequencer,
-		chainID:   genesis.ChainID,
-		interval:  scrapeInterval,
-		logger:    logger.With().Str("component", "reaper").Logger(),
-		cache:     cache,
-		executor:  executor,
+		exec:           exec,
+		sequencer:      sequencer,
+		chainID:        genesis.ChainID,
+		interval:       scrapeInterval,
+		logger:         logger.With().Str("component", "reaper").Logger(),
+		cache:          cache,
+		onTxsSubmitted: onTxsSubmitted,
 	}, nil
 }
 
-// Start begins the execution component
+// Start begins the reaper component
 func (r *Reaper) Start(ctx context.Context) error {
+	if r.cancel != nil {
+		return errors.New("reaper already started")
+	}
 	r.ctx, r.cancel = context.WithCancel(ctx)
 
-	// Start reaper loop
 	r.wg.Go(r.reaperLoop)
 
-	r.logger.Info().Dur("interval", r.interval).Msg("reaper started")
+	r.logger.Info().Dur("idle_interval", r.interval).Msg("reaper started")
 	return nil
 }
 
 func (r *Reaper) reaperLoop() {
-	ticker := time.NewTicker(r.interval)
-	defer ticker.Stop()
-
-	cleanupTicker := time.NewTicker(1 * time.Hour)
+	cleanupTicker := time.NewTicker(CleanupInterval)
 	defer cleanupTicker.Stop()
 
 	consecutiveFailures := 0
 
 	for {
-		select {
-		case <-r.ctx.Done():
-			return
-		case <-ticker.C:
-			err := r.SubmitTxs()
-			if err != nil {
-				// Increment failure counter and apply exponential backoff
-				consecutiveFailures++
-				backoff := r.interval * time.Duration(1<<consecutiveFailures)
-				backoff = min(backoff, MaxBackoffInterval)
-				r.logger.Error().Err(err).Dur("backoff", backoff).Msg("failed to submit txs, backing off")
-				ticker.Reset(backoff)
-			} else {
-				if consecutiveFailures > 0 {
-					r.logger.Info().Msg("reaper recovered from errors, resetting backoff")
-					consecutiveFailures = 0
-					ticker.Reset(r.interval)
-				}
-			}
-		case <-cleanupTicker.C:
-			// Clean up transaction hashes older than 24 hours
-			// This prevents unbounded growth of the transaction seen cache
-			removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention)
-			if removed > 0 {
-				r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes")
+		submitted, err := r.drainMempool(cleanupTicker.C)
+
+		if err != nil && r.ctx.Err() == nil {
+			consecutiveFailures++
+			backoff := r.interval * time.Duration(1<<consecutiveFailures)
+			backoff = min(backoff, MaxBackoffInterval)
+			r.logger.Warn().Err(err).Dur("backoff", backoff).Msg("failed to drain mempool, backing off")
+			if r.wait(backoff) {
+				return
 			}
+			continue
+		}
+
+		if consecutiveFailures > 0 {
+			r.logger.Info().Msg("reaper recovered from errors")
+			consecutiveFailures = 0
+		}
+
+		if submitted {
+			runtime.Gosched()
+			continue
+		}
+
+		if r.wait(r.interval) {
+			return
 		}
 	}
 }
 
+// wait blocks for the given duration. Returns true if the context was cancelled.
+func (r *Reaper) wait(d time.Duration) bool {
+	timer := time.NewTimer(d)
+	defer timer.Stop()
+	select {
+	case <-r.ctx.Done():
+		return true
+	case <-timer.C:
+		return false
+	}
+}
+
 // Stop shuts down the reaper component
 func (r *Reaper) Stop() error {
 	if r.cancel != nil {
 		r.cancel()
 	}
 	r.wg.Wait()
-	r.logger.Info().Msg("reaper stopped")
 	return nil
 }
 
-// SubmitTxs retrieves transactions from the executor and submits them to the sequencer.
-// Returns an error if any critical operation fails.
-func (r *Reaper) SubmitTxs() error {
-	txs, err := r.exec.GetTxs(r.ctx)
-	if err != nil {
-		r.logger.Error().Err(err).Msg("failed to get txs from executor")
-		return fmt.Errorf("failed to get txs from executor: %w", err)
+type pendingTx struct {
+	tx   []byte
+	hash string
+}
+
+func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) {
+	var totalSubmitted int
+
+	defer func() {
+		if totalSubmitted > 0 && r.onTxsSubmitted != nil {
+			r.onTxsSubmitted()
+		}
+	}()
+
+	for {
+		select {
+		case <-cleanupCh:
+			removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention)
+			if removed > 0 {
+				r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes")
+			}
+		default:
+		}
+
+		txs, err := r.exec.GetTxs(r.ctx)
+		if err != nil {
+			return totalSubmitted > 0, fmt.Errorf("failed to get txs from executor: %w", err)
+		}
+		if len(txs) == 0 {
+			break
+		}
+
+		filtered := r.filterNewTxs(txs)
+		if len(filtered) == 0 {
+			break
+		}
+
+		n, err := r.submitFiltered(filtered)
+		if err != nil {
+			return totalSubmitted > 0, err
+		}
+		totalSubmitted += n
 	}
-	if len(txs) == 0 {
-		r.logger.Debug().Msg("no new txs")
-		return nil
+
+	if totalSubmitted > 0 {
+		r.logger.Debug().Int("total_txs", totalSubmitted).Msg("drained mempool")
 	}
 
-	var newTxs [][]byte
+	return totalSubmitted > 0, nil
+}
+
+func (r *Reaper) filterNewTxs(txs [][]byte) []pendingTx {
+	pending := make([]pendingTx, 0, len(txs))
 	for _, tx := range txs {
-		txHash := hashTx(tx)
-		if !r.cache.IsTxSeen(txHash) {
-			newTxs = append(newTxs, tx)
+		h := hashTx(tx)
+		if !r.cache.IsTxSeen(h) {
+			pending = append(pending, pendingTx{tx: tx, hash: h})
 		}
 	}
+	return pending
+}
 
-	if len(newTxs) == 0 {
-		r.logger.Debug().Msg("no new txs to submit")
-		return nil
+func (r *Reaper) submitFiltered(batch []pendingTx) (int, error) {
+	txs := make([][]byte, len(batch))
+	hashes := make([]string, len(batch))
+	for i, p := range batch {
+		txs[i] = p.tx
+		hashes[i] = p.hash
 	}
 
-	r.logger.Debug().Int("txCount", len(newTxs)).Msg("submitting txs to sequencer")
-
-	_, err = r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{
+	_, err := r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{
 		Id:    []byte(r.chainID),
-		Batch: &coresequencer.Batch{Transactions: newTxs},
+		Batch: &coresequencer.Batch{Transactions: txs},
 	})
 	if err != nil {
-		return fmt.Errorf("failed to submit txs to sequencer: %w", err)
-	}
-
-	for _, tx := range newTxs {
-		txHash := hashTx(tx)
-		r.cache.SetTxSeen(txHash)
+		return 0, fmt.Errorf("failed to submit txs to sequencer: %w", err)
 	}
 
-	// Notify the executor that new transactions are available
-	if len(newTxs) > 0 {
-		r.logger.Debug().Msg("notifying executor of new transactions")
-		r.executor.NotifyNewTransactions()
-	}
-
-	r.logger.Debug().Msg("successfully submitted txs")
-	return nil
+	r.cache.SetTxsSeen(hashes)
+	return len(txs), nil
 }
 
 func hashTx(tx []byte) string {
diff --git a/block/internal/reaping/reaper_test.go b/block/internal/reaping/reaper_test.go
index 5700882e8a..6bf4426d4a 100644
--- a/block/internal/reaping/reaper_test.go
+++ b/block/internal/reaping/reaper_test.go
@@ -2,214 +2,251 @@ package reaping
 
 import (
 	"context"
-	crand "crypto/rand"
+	"sync/atomic"
 	"testing"
 	"time"
 
 	ds "github.com/ipfs/go-datastore"
 	dssync "github.com/ipfs/go-datastore/sync"
-	"github.com/libp2p/go-libp2p/core/crypto"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 
 	"github.com/evstack/ev-node/block/internal/cache"
-	"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/block/internal/common" - "github.com/evstack/ev-node/block/internal/executing" coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/signer/noop" "github.com/evstack/ev-node/pkg/store" testmocks "github.com/evstack/ev-node/test/mocks" ) -// helper to create a minimal executor to capture notifications -func newTestExecutor(t *testing.T) *executing.Executor { +func newTestCache(t *testing.T) cache.CacheManager { t.Helper() - - // signer is required by NewExecutor - priv, _, err := crypto.GenerateEd25519Key(crand.Reader) - require.NoError(t, err) - s, err := noop.NewNoopSigner(priv) - require.NoError(t, err) - - // Get the signer's address to use as proposer - signerAddr, err := s.GetAddress() - require.NoError(t, err) - - exec, err := executing.NewExecutor( - nil, // store (unused) - nil, // core executor (unused) - nil, // sequencer (unused) - s, // signer (required) - nil, // cache (unused) - nil, // metrics (unused) - config.DefaultConfig(), - genesis.Genesis{ // minimal genesis - ChainID: "test-chain", - InitialHeight: 1, - StartTime: time.Now(), - ProposerAddress: signerAddr, - }, - nil, // header broadcaster - nil, // data broadcaster - zerolog.Nop(), - common.DefaultBlockOptions(), - make(chan error, 1), // error channel - nil, - ) + cfg := config.Config{RootDir: t.TempDir()} + memDS := dssync.MutexWrap(ds.NewMapDatastore()) + st := store.New(memDS) + cm, err := cache.NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) + return cm +} - return exec +type testEnv struct { + execMock *testmocks.MockExecutor + seqMock *testmocks.MockSequencer + cache cache.CacheManager + reaper *Reaper + notified atomic.Bool } -// helper to create a cache manager for tests -func newTestCache(t *testing.T) cache.CacheManager { +func newTestEnv(t *testing.T) *testEnv { t.Helper() + mockExec := testmocks.NewMockExecutor(t) + mockSeq := testmocks.NewMockSequencer(t) + cm := newTestCache(t) - cfg := config.Config{ - RootDir: t.TempDir(), + env := &testEnv{ + execMock: mockExec, + seqMock: mockSeq, + cache: cm, } - // Create an in-memory store for the cache - memDS := dssync.MutexWrap(ds.NewMapDatastore()) - st := store.New(memDS) - - cacheManager, err := cache.NewManager(cfg, st, zerolog.Nop()) + r, err := NewReaper( + mockExec, mockSeq, + genesis.Genesis{ChainID: "test-chain"}, + zerolog.Nop(), cm, + 100*time.Millisecond, + env.notify, + ) require.NoError(t, err) + env.reaper = r - return cacheManager + return env } -// reaper with mocks and cache manager -func newTestReaper(t *testing.T, chainID string, execMock *testmocks.MockExecutor, seqMock *testmocks.MockSequencer, e *executing.Executor, cm cache.CacheManager) *Reaper { - t.Helper() - - r, err := NewReaper(execMock, seqMock, genesis.Genesis{ChainID: chainID}, zerolog.Nop(), e, cm, 100*time.Millisecond) - require.NoError(t, err) +func (e *testEnv) notify() { + e.notified.Store(true) +} - return r +func (e *testEnv) wasNotified() bool { + return e.notified.Load() } -func TestReaper_SubmitTxs_NewTxs_SubmitsAndPersistsAndNotifies(t *testing.T) { - mockExec := testmocks.NewMockExecutor(t) - mockSeq := testmocks.NewMockSequencer(t) +func TestReaper_NewTxs_SubmitsAndPersistsAndNotifies(t *testing.T) { + env := newTestEnv(t) - // Two new transactions tx1 := []byte("tx1") tx2 := []byte("tx2") - mockExec.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once() - // Expect a single SubmitBatchTxs 
with both txs - mockSeq.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). + env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once() + env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() + + env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). RunAndReturn(func(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { - require.Equal(t, []byte("chain-A"), req.Id) - require.NotNil(t, req.Batch) assert.Equal(t, [][]byte{tx1, tx2}, req.Batch.Transactions) return &coresequencer.SubmitBatchTxsResponse{}, nil }).Once() - // Minimal executor to capture NotifyNewTransactions - e := newTestExecutor(t) - cm := newTestCache(t) - - r := newTestReaper(t, "chain-A", mockExec, mockSeq, e, cm) - - assert.NoError(t, r.SubmitTxs()) - - // Verify transactions are marked as seen in cache - assert.True(t, cm.IsTxSeen(hashTx(tx1))) - assert.True(t, cm.IsTxSeen(hashTx(tx2))) - - // Executor notified - check using test helper - if !e.HasPendingTxNotification() { - t.Fatal("expected NotifyNewTransactions to signal txNotifyCh") - } + submitted, err := env.reaper.drainMempool(nil) + assert.NoError(t, err) + assert.True(t, submitted) + assert.True(t, env.cache.IsTxSeen(hashTx(tx1))) + assert.True(t, env.cache.IsTxSeen(hashTx(tx2))) + assert.True(t, env.wasNotified()) } -func TestReaper_SubmitTxs_AllSeen_NoSubmit(t *testing.T) { - mockExec := testmocks.NewMockExecutor(t) - mockSeq := testmocks.NewMockSequencer(t) +func TestReaper_AllSeen_NoSubmit(t *testing.T) { + env := newTestEnv(t) tx1 := []byte("tx1") tx2 := []byte("tx2") - // Pre-populate cache with seen transactions - e := newTestExecutor(t) - cm := newTestCache(t) - cm.SetTxSeen(hashTx(tx1)) - cm.SetTxSeen(hashTx(tx2)) - - r := newTestReaper(t, "chain-B", mockExec, mockSeq, e, cm) - - mockExec.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once() - // No SubmitBatchTxs expected + env.cache.SetTxSeen(hashTx(tx1)) + env.cache.SetTxSeen(hashTx(tx2)) - assert.NoError(t, r.SubmitTxs()) + env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once() - // Ensure no notification occurred - if e.HasPendingTxNotification() { - t.Fatal("did not expect notification when all txs are seen") - } + submitted, err := env.reaper.drainMempool(nil) + assert.NoError(t, err) + assert.False(t, submitted) + assert.False(t, env.wasNotified()) } -func TestReaper_SubmitTxs_PartialSeen_FiltersAndPersists(t *testing.T) { - mockExec := testmocks.NewMockExecutor(t) - mockSeq := testmocks.NewMockSequencer(t) +func TestReaper_PartialSeen_FiltersAndPersists(t *testing.T) { + env := newTestEnv(t) txOld := []byte("old") txNew := []byte("new") - e := newTestExecutor(t) - cm := newTestCache(t) - - // Mark txOld as seen - cm.SetTxSeen(hashTx(txOld)) + env.cache.SetTxSeen(hashTx(txOld)) - r := newTestReaper(t, "chain-C", mockExec, mockSeq, e, cm) + env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{txOld, txNew}, nil).Once() + env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() - mockExec.EXPECT().GetTxs(mock.Anything).Return([][]byte{txOld, txNew}, nil).Once() - mockSeq.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). + env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). 
RunAndReturn(func(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { - // Should only include txNew assert.Equal(t, [][]byte{txNew}, req.Batch.Transactions) return &coresequencer.SubmitBatchTxsResponse{}, nil }).Once() - assert.NoError(t, r.SubmitTxs()) + submitted, err := env.reaper.drainMempool(nil) + assert.NoError(t, err) + assert.True(t, submitted) + assert.True(t, env.cache.IsTxSeen(hashTx(txOld))) + assert.True(t, env.cache.IsTxSeen(hashTx(txNew))) + assert.True(t, env.wasNotified()) +} + +func TestReaper_SequencerError_NoPersistence_NoNotify(t *testing.T) { + env := newTestEnv(t) - // Both should be seen after successful submit - assert.True(t, cm.IsTxSeen(hashTx(txOld))) - assert.True(t, cm.IsTxSeen(hashTx(txNew))) + tx := []byte("oops") - // Notification should occur since a new tx was submitted - if !e.HasPendingTxNotification() { - t.Fatal("expected notification when new tx submitted") - } + env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx}, nil).Once() + + env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). + Return((*coresequencer.SubmitBatchTxsResponse)(nil), assert.AnError).Once() + + _, err := env.reaper.drainMempool(nil) + assert.Error(t, err) + assert.False(t, env.cache.IsTxSeen(hashTx(tx))) + assert.False(t, env.wasNotified()) +} + +func TestReaper_DrainsMempoolInMultipleRounds(t *testing.T) { + env := newTestEnv(t) + + tx1 := []byte("tx1") + tx2 := []byte("tx2") + tx3 := []byte("tx3") + + env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once() + env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx3}, nil).Once() + env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() + + env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). + RunAndReturn(func(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { + return &coresequencer.SubmitBatchTxsResponse{}, nil + }).Twice() + + submitted, err := env.reaper.drainMempool(nil) + assert.NoError(t, err) + assert.True(t, submitted) + assert.True(t, env.cache.IsTxSeen(hashTx(tx1))) + assert.True(t, env.cache.IsTxSeen(hashTx(tx2))) + assert.True(t, env.cache.IsTxSeen(hashTx(tx3))) + assert.True(t, env.wasNotified()) } -func TestReaper_SubmitTxs_SequencerError_NoPersistence_NoNotify(t *testing.T) { +func TestReaper_EmptyMempool_NoAction(t *testing.T) { + env := newTestEnv(t) + + env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() + + submitted, err := env.reaper.drainMempool(nil) + assert.NoError(t, err) + assert.False(t, submitted) + assert.False(t, env.wasNotified()) +} + +func TestReaper_HashComputedOnce(t *testing.T) { + env := newTestEnv(t) + + tx := []byte("unique-tx") + expectedHash := hashTx(tx) + + env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx}, nil).Once() + env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() + + env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). 
+ Return(&coresequencer.SubmitBatchTxsResponse{}, nil).Once() + + submitted, err := env.reaper.drainMempool(nil) + assert.NoError(t, err) + assert.True(t, submitted) + assert.True(t, env.cache.IsTxSeen(expectedHash)) +} + +func TestReaper_NilCallback_NoPanic(t *testing.T) { mockExec := testmocks.NewMockExecutor(t) mockSeq := testmocks.NewMockSequencer(t) + cm := newTestCache(t) - tx := []byte("oops") + r, err := NewReaper( + mockExec, mockSeq, + genesis.Genesis{ChainID: "test-chain"}, + zerolog.Nop(), cm, + 100*time.Millisecond, + nil, + ) + require.NoError(t, err) + + tx := []byte("tx") mockExec.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx}, nil).Once() + mockExec.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() mockSeq.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). - Return((*coresequencer.SubmitBatchTxsResponse)(nil), assert.AnError).Once() + Return(&coresequencer.SubmitBatchTxsResponse{}, nil).Once() - e := newTestExecutor(t) - cm := newTestCache(t) - r := newTestReaper(t, "chain-D", mockExec, mockSeq, e, cm) + submitted, err := r.drainMempool(nil) + assert.NoError(t, err) + assert.True(t, submitted) +} + +func TestReaper_StopTerminates(t *testing.T) { + env := newTestEnv(t) + env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Maybe() - assert.Error(t, r.SubmitTxs()) + require.NoError(t, env.reaper.Start(context.Background())) - // Should not be marked seen - assert.False(t, cm.IsTxSeen(hashTx(tx))) + done := make(chan struct{}) + go func() { + env.reaper.Stop() + close(done) + }() - // Should not notify - if e.HasPendingTxNotification() { - t.Fatal("did not expect notification on sequencer error") + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("Stop() did not return in time") } } diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 34fab216de..d92cf9258c 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -118,11 +118,21 @@ func NewSubmitter( } // Start begins the submitting component -func (s *Submitter) Start(ctx context.Context) error { +func (s *Submitter) Start(ctx context.Context) (err error) { + if s.cancel != nil { + return errors.New("submitter already started") + } + s.ctx, s.cancel = context.WithCancel(ctx) + defer func() { // if error during init cancel context + if err != nil { + s.cancel() + s.ctx, s.cancel = nil, nil + } + }() // Initialize DA included height - if err := s.initializeDAIncludedHeight(ctx); err != nil { + if err = s.initializeDAIncludedHeight(ctx); err != nil { return err } diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index 946fba0ab6..f74beddf96 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -289,7 +289,6 @@ func TestSubmitter_processDAInclusionLoop_advances(t *testing.T) { cm.SetHeaderDAIncluded(h2.Hash().String(), 101, 2) cm.SetDataDAIncluded(d2.DACommitment().String(), 101, 2) - s.ctx, s.cancel = ctx, cancel require.NoError(t, s.initializeDAIncludedHeight(ctx)) require.Equal(t, uint64(0), s.GetDAIncludedHeight()) @@ -518,7 +517,6 @@ func TestSubmitter_CacheClearedOnHeightInclusion(t *testing.T) { cm.SetHeaderDAIncluded(h2.Hash().String(), 101, 2) cm.SetDataDAIncluded(d2.DACommitment().String(), 101, 2) - s.ctx, s.cancel = ctx, cancel require.NoError(t, s.initializeDAIncludedHeight(ctx)) require.Equal(t, uint64(0), s.GetDAIncludedHeight()) diff 
diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go
index 802c1b243d..4d2cbb4afe 100644
--- a/block/internal/syncing/syncer.go
+++ b/block/internal/syncing/syncer.go
@@ -162,7 +162,11 @@ func (s *Syncer) SetBlockSyncer(bs BlockSyncer) {
 }
 
 // Start begins the syncing component
+// The component should not be started again after it has been stopped.
 func (s *Syncer) Start(ctx context.Context) (err error) {
+	if s.cancel != nil {
+		return errors.New("syncer already started")
+	}
 	ctx, cancel := context.WithCancel(ctx)
 	s.ctx, s.cancel = ctx, cancel
@@ -275,7 +279,6 @@ func (s *Syncer) Stop(ctx context.Context) error {
 	s.logger.Info().Msg("syncer stopped")
 
 	close(s.heightInCh)
-	s.cancel = nil
 	return nil
 }
diff --git a/docs/adr/adr-021-lazy-aggregation.md b/docs/adr/adr-021-lazy-aggregation.md
index 37ece2c65c..988116db70 100644
--- a/docs/adr/adr-021-lazy-aggregation.md
+++ b/docs/adr/adr-021-lazy-aggregation.md
@@ -20,144 +20,98 @@ Leverage the existing empty batch mechanism and `dataHashForEmptyTxs` to maintai
 
 1. **Modified Batch Retrieval**:
-    The batch retrieval mechanism has been modified to handle empty batches differently. Instead of discarding empty batches, we now return them with the ErrNoBatch error, allowing the caller to create empty blocks with proper timestamps. This ensures that block timing remains consistent even during periods of inactivity.
-
-    ```go
-    func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) {
-        res, err := m.sequencer.GetNextBatch(ctx, req)
-        if err != nil {
-            return nil, err
-        }
-
-        if res != nil && res.Batch != nil {
-            m.logger.Debug("Retrieved batch",
-                "txCount", len(res.Batch.Transactions),
-                "timestamp", res.Timestamp)
-
-            var errRetrieveBatch error
-            // Even if there are no transactions, return the batch with timestamp
-            // This allows empty blocks to maintain proper timing
-            if len(res.Batch.Transactions) == 0 {
-                errRetrieveBatch = ErrNoBatch
-            }
-            // Even if there are no transactions, update lastBatchData so we don't
-            // repeatedly emit the same empty batch, and persist it to metadata.
-            if err := m.store.SetMetadata(ctx, LastBatchDataKey, convertBatchDataToBytes(res.BatchData)); err != nil {
-                m.logger.Error("error while setting last batch hash", "error", err)
-            }
-            m.lastBatchData = res.BatchData
-            return &BatchData{Batch: res.Batch, Time: res.Timestamp, Data: res.BatchData}, errRetrieveBatch
-        }
-        return nil, ErrNoBatch
-    }
-    ```
+   The batch retrieval mechanism has been modified to handle empty batches differently. Instead of discarding empty batches, we now return them with the ErrNoBatch error, allowing the caller to create empty blocks with proper timestamps. This ensures that block timing remains consistent even during periods of inactivity.
+
+   ```go
+   func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) {
+       res, err := m.sequencer.GetNextBatch(ctx, req)
+       if err != nil {
+           return nil, err
+       }
+
+       if res != nil && res.Batch != nil {
+           m.logger.Debug("Retrieved batch",
+               "txCount", len(res.Batch.Transactions),
+               "timestamp", res.Timestamp)
+
+           var errRetrieveBatch error
+           // Even if there are no transactions, return the batch with timestamp
+           // This allows empty blocks to maintain proper timing
+           if len(res.Batch.Transactions) == 0 {
+               errRetrieveBatch = ErrNoBatch
+           }
+           // Even if there are no transactions, update lastBatchData so we don't
+           // repeatedly emit the same empty batch, and persist it to metadata.
+           if err := m.store.SetMetadata(ctx, LastBatchDataKey, convertBatchDataToBytes(res.BatchData)); err != nil {
+               m.logger.Error("error while setting last batch hash", "error", err)
+           }
+           m.lastBatchData = res.BatchData
+           return &BatchData{Batch: res.Batch, Time: res.Timestamp, Data: res.BatchData}, errRetrieveBatch
+       }
+       return nil, ErrNoBatch
+   }
+   ```
 
 2. **Empty Block Creation**:
-    The block publishing logic has been enhanced to create empty blocks when a batch with no transactions is received. This uses the special `dataHashForEmptyTxs` value to indicate an empty batch, maintaining the block height consistency with the DA layer while minimizing overhead.
-
-    ```go
-    // In publishBlock method
-    batchData, err := m.retrieveBatch(ctx)
-    if err != nil {
-        if errors.Is(err, ErrNoBatch) {
-            if batchData == nil {
-                m.logger.Info("No batch retrieved from sequencer, skipping block production")
-                return nil
-            }
-            m.logger.Info("Creating empty block, height: ", newHeight)
-        } else {
-            return fmt.Errorf("failed to get transactions from batch: %w", err)
-        }
-    } else {
-        if batchData.Before(lastHeaderTime) {
-            return fmt.Errorf("timestamp is not monotonically increasing: %s < %s", batchData.Time, m.getLastBlockTime())
-        }
-        m.logger.Info("Creating and publishing block, height: ", newHeight)
-        m.logger.Debug("block info", "num_tx", len(batchData.Batch.Transactions))
-    }
-
-    header, data, err = m.createBlock(ctx, newHeight, lastSignature, lastHeaderHash, batchData)
-    if err != nil {
-        return err
-    }
-
-    if err = m.store.SaveBlockData(ctx, header, data, &signature); err != nil {
-        return SaveBlockError{err}
-    }
-    ```
+   The block publishing logic has been enhanced to create empty blocks when a batch with no transactions is received. This uses the special `dataHashForEmptyTxs` value to indicate an empty batch, maintaining the block height consistency with the DA layer while minimizing overhead.
+
+   ```go
+   // In publishBlock method
+   batchData, err := m.retrieveBatch(ctx)
+   if err != nil {
+       if errors.Is(err, ErrNoBatch) {
+           if batchData == nil {
+               m.logger.Info("No batch retrieved from sequencer, skipping block production")
+               return nil
+           }
+           m.logger.Info("Creating empty block, height: ", newHeight)
+       } else {
+           return fmt.Errorf("failed to get transactions from batch: %w", err)
+       }
+   } else {
+       if batchData.Before(lastHeaderTime) {
+           return fmt.Errorf("timestamp is not monotonically increasing: %s < %s", batchData.Time, m.getLastBlockTime())
+       }
+       m.logger.Info("Creating and publishing block, height: ", newHeight)
+       m.logger.Debug("block info", "num_tx", len(batchData.Batch.Transactions))
+   }
+
+   header, data, err = m.createBlock(ctx, newHeight, lastSignature, lastHeaderHash, batchData)
+   if err != nil {
+       return err
+   }
+
+   if err = m.store.SaveBlockData(ctx, header, data, &signature); err != nil {
+       return SaveBlockError{err}
+   }
+   ```
 
 3. **Lazy Aggregation Loop**:
-    A dedicated lazy aggregation loop has been implemented with dual timer mechanisms. The `lazyTimer` ensures blocks are produced at regular intervals even during network inactivity, while the `blockTimer` handles normal block production when transactions are available. Transaction notifications from the `Reaper` to the `Manager` are now handled via the `txNotifyCh` channel: when the `Reaper` detects new transactions, it calls `Manager.NotifyNewTransactions()`, which performs a non-blocking signal on this channel. See the tests in `block/lazy_aggregation_test.go` for verification of this behavior.
-
-    ```go
-    // In Reaper.SubmitTxs
-    if r.manager != nil && len(newTxs) > 0 {
-        r.logger.Debug("Notifying manager of new transactions")
-        r.manager.NotifyNewTransactions() // Signals txNotifyCh
-    }
-
-    // In Manager.NotifyNewTransactions
-    func (m *Manager) NotifyNewTransactions() {
-        select {
-        case m.txNotifyCh <- struct{}{}:
-            // Successfully sent notification
-        default:
-            // Channel buffer is full, notification already pending
-        }
-    }
-    // Modified lazyAggregationLoop
-    func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Timer) {
-        // lazyTimer triggers block publication even during inactivity
-        lazyTimer := time.NewTimer(0)
-        defer lazyTimer.Stop()
-
-        for {
-            select {
-            case <-ctx.Done():
-                return
-
-            case <-lazyTimer.C:
-                m.logger.Debug("Lazy timer triggered block production")
-                m.produceBlock(ctx, "lazy_timer", lazyTimer, blockTimer)
-
-            case <-blockTimer.C:
-                if m.txsAvailable {
-                    m.produceBlock(ctx, "block_timer", lazyTimer, blockTimer)
-                    m.txsAvailable = false
-                } else {
-                    // Ensure we keep ticking even when there are no txs
-                    blockTimer.Reset(m.config.Node.BlockTime.Duration)
-                }
-            case <-m.txNotifyCh:
-                m.txsAvailable = true
-            }
-        }
-    }
-    ```
+   A dedicated lazy aggregation loop has been implemented with dual timer mechanisms. The `lazyTimer` ensures blocks are produced at regular intervals even during network inactivity, while the `blockTimer` handles normal block production when transactions are available. Transaction notifications from the `Reaper` to the `Manager` are now handled via the `txNotifyCh` channel: when the `Reaper` detects new transactions, it calls `Manager.NotifyNewTransactions()`, which performs a non-blocking signal on this channel. See the tests in `block/lazy_aggregation_test.go` for verification of this behavior.
 
 4. **Block Production**:
-    The block production function centralizes the logic for publishing blocks and resetting timers. It records the start time, attempts to publish a block, and then intelligently resets both timers based on the elapsed time. This ensures that block production remains on schedule even if the block creation process takes significant time.
-
-    ```go
-    func (m *Manager) produceBlock(ctx context.Context, trigger string, lazyTimer, blockTimer *time.Timer) {
-        // Record the start time
-        start := time.Now()
-
-        // Attempt to publish the block
-        if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil {
-            m.logger.Error("error while publishing block", "trigger", trigger, "error", err)
-        } else {
-            m.logger.Debug("Successfully published block", "trigger", trigger)
-        }
-
-        // Reset both timers for the next aggregation window
-        lazyTimer.Reset(getRemainingSleep(start, m.config.Node.LazyBlockInterval.Duration))
-        blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration))
-    }
-    ```
+   The block production function centralizes the logic for publishing blocks and resetting timers. It records the start time, attempts to publish a block, and then intelligently resets both timers based on the elapsed time. This ensures that block production remains on schedule even if the block creation process takes significant time.
+
+   ```go
+   func (m *Manager) produceBlock(ctx context.Context, trigger string, lazyTimer, blockTimer *time.Timer) {
+       // Record the start time
+       start := time.Now()
+
+       // Attempt to publish the block
+       if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil {
+           m.logger.Error("error while publishing block", "trigger", trigger, "error", err)
+       } else {
+           m.logger.Debug("Successfully published block", "trigger", trigger)
+       }
+
+       // Reset both timers for the next aggregation window
+       lazyTimer.Reset(getRemainingSleep(start, m.config.Node.LazyBlockInterval.Duration))
+       blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration))
+   }
+   ```
 
 ### Key Changes
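The ADR hunk above drops the illustrative snippet from point 3, since the `Reaper` no longer reaches into a concrete `*executing.Executor`: it signals through the injected `onTxsSubmitted func()` instead. Below is a minimal sketch of the new wiring, assembled only from calls that appear in this patch (`NewReaper` as invoked in `block/components.go`, `Start`/`Stop`, and the deferred callback in `drainMempool`); the variable names stand for the node's existing component handles, not new API:

```go
// Aggregator wiring (mirrors block/components.go): the executor's
// notification hook is handed to the reaper as a plain callback,
// decoupling the reaping package from the executing package.
reaper, err := reaping.NewReaper(
	exec,      // coreexecutor.Executor: mempool source polled by drainMempool
	sequencer, // coresequencer.Sequencer: receives the filtered batches
	genesis,   // genesis.Genesis: supplies the chain ID used as the batch Id
	logger,
	cacheManager,                        // cache.CacheManager: seen-tx de-duplication
	config.Node.ScrapeInterval.Duration, // idle polling interval
	executor.NotifyNewTransactions,      // fired at most once per drained burst
)
if err != nil {
	return nil, fmt.Errorf("failed to create reaper: %w", err)
}
if err := reaper.Start(ctx); err != nil { // errors if already started
	return nil, err
}
defer reaper.Stop() // cancels the loop and waits for it to exit
```

Inside the reaper, `drainMempool` defers a single `onTxsSubmitted()` call that fires only when `totalSubmitted > 0`, so a burst that takes several `GetTxs` rounds to drain yields one notification rather than one per round; `TestReaper_NilCallback_NoPanic` pins down the nil-callback case.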