From 066544ce4197e31d4f4981844d06f4177097e650 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 9 Apr 2026 14:08:38 +0200 Subject: [PATCH 01/14] refactor: reaper to drain mempool --- block/components.go | 2 +- block/internal/cache/manager.go | 9 + block/internal/reaping/reaper.go | 206 +++++++++--------- block/internal/reaping/reaper_test.go | 291 ++++++++++++++------------ docs/adr/adr-021-lazy-aggregation.md | 1 - 5 files changed, 275 insertions(+), 234 deletions(-) diff --git a/block/components.go b/block/components.go index 71b6f60523..ac5f782cdd 100644 --- a/block/components.go +++ b/block/components.go @@ -278,9 +278,9 @@ func newAggregatorComponents( sequencer, genesis, logger, - executor, cacheManager, config.Node.ScrapeInterval.Duration, + executor.NotifyNewTransactions, ) if err != nil { return nil, fmt.Errorf("failed to create reaper: %w", err) diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index 6f1b9d9cf3..e021c5ca44 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -53,6 +53,7 @@ type CacheManager interface { // Transaction operations IsTxSeen(hash string) bool SetTxSeen(hash string) + SetTxsSeen(hashes []string) CleanupOldTxs(olderThan time.Duration) int // Pending events syncing coordination @@ -210,6 +211,14 @@ func (m *implementation) SetTxSeen(hash string) { m.txTimestamps.Store(hash, time.Now()) } +func (m *implementation) SetTxsSeen(hashes []string) { + now := time.Now() + for _, hash := range hashes { + m.txCache.setSeen(hash, 0) + m.txTimestamps.Store(hash, now) + } +} + // CleanupOldTxs removes transaction hashes older than olderThan and returns // the count removed. Defaults to DefaultTxCacheRetention if olderThan <= 0. func (m *implementation) CleanupOldTxs(olderThan time.Duration) int { diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index 67b2020216..436727fdf2 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -12,7 +12,6 @@ import ( "github.com/rs/zerolog" "github.com/evstack/ev-node/block/internal/cache" - "github.com/evstack/ev-node/block/internal/executing" coreexecutor "github.com/evstack/ev-node/core/execution" coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/genesis" @@ -21,40 +20,35 @@ import ( const ( // MaxBackoffInterval is the maximum backoff interval for retries MaxBackoffInterval = 30 * time.Second + CleanupInterval = 1 * time.Hour ) // Reaper is responsible for periodically retrieving transactions from the executor, // filtering out already seen transactions, and submitting new transactions to the sequencer. type Reaper struct { - exec coreexecutor.Executor - sequencer coresequencer.Sequencer - chainID string - interval time.Duration - cache cache.CacheManager - executor *executing.Executor - - // shared components + exec coreexecutor.Executor + sequencer coresequencer.Sequencer + chainID string + interval time.Duration + cache cache.CacheManager + onTxsSubmitted func() + logger zerolog.Logger - // Lifecycle ctx context.Context cancel context.CancelFunc wg sync.WaitGroup } -// NewReaper creates a new Reaper instance. func NewReaper( exec coreexecutor.Executor, sequencer coresequencer.Sequencer, genesis genesis.Genesis, logger zerolog.Logger, - executor *executing.Executor, cache cache.CacheManager, scrapeInterval time.Duration, + onTxsSubmitted func(), ) (*Reaper, error) { - if executor == nil { - return nil, errors.New("executor cannot be nil") - } if cache == nil { return nil, errors.New("cache cannot be nil") } @@ -63,13 +57,13 @@ func NewReaper( } return &Reaper{ - exec: exec, - sequencer: sequencer, - chainID: genesis.ChainID, - interval: scrapeInterval, - logger: logger.With().Str("component", "reaper").Logger(), - cache: cache, - executor: executor, + exec: exec, + sequencer: sequencer, + chainID: genesis.ChainID, + interval: scrapeInterval, + logger: logger.With().Str("component", "reaper").Logger(), + cache: cache, + onTxsSubmitted: onTxsSubmitted, }, nil } @@ -80,54 +74,56 @@ func (r *Reaper) Start(ctx context.Context) error { // Start reaper loop r.wg.Go(r.reaperLoop) - r.logger.Info().Dur("interval", r.interval).Msg("reaper started") + r.logger.Info().Dur("idle_interval", r.interval).Msg("reaper started") return nil } func (r *Reaper) reaperLoop() { - ticker := time.NewTicker(r.interval) - defer ticker.Stop() - - cleanupTicker := time.NewTicker(1 * time.Hour) + cleanupTicker := time.NewTicker(CleanupInterval) defer cleanupTicker.Stop() consecutiveFailures := 0 for { - select { - case <-r.ctx.Done(): - return - case <-ticker.C: - err := r.SubmitTxs() - if err != nil { - // Increment failure counter and apply exponential backoff - consecutiveFailures++ - backoff := r.interval * time.Duration(1< 0 { - r.logger.Info().Msg("reaper recovered from errors, resetting backoff") - consecutiveFailures = 0 - ticker.Reset(r.interval) - } - } - case <-cleanupTicker.C: - // Clean up transaction hashes older than 24 hours - // This prevents unbounded growth of the transaction seen cache - removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention) - if removed > 0 { - r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes") - } + submitted, err := r.drainMempool() + + if err != nil { + consecutiveFailures++ + backoff := r.interval * time.Duration(1< 0 { + r.logger.Info().Msg("reaper recovered from errors") + consecutiveFailures = 0 + } + + if submitted { + continue + } + + r.wait(r.interval, cleanupTicker.C) + } +} + +func (r *Reaper) wait(d time.Duration, cleanupCh <-chan time.Time) { + timer := time.NewTimer(d) + defer timer.Stop() + select { + case <-r.ctx.Done(): + case <-cleanupCh: + removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention) + if removed > 0 { + r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes") } + case <-timer.C: } } @@ -137,60 +133,78 @@ func (r *Reaper) Stop() error { r.cancel() } r.wg.Wait() - r.logger.Info().Msg("reaper stopped") return nil } -// SubmitTxs retrieves transactions from the executor and submits them to the sequencer. -// Returns an error if any critical operation fails. -func (r *Reaper) SubmitTxs() error { - txs, err := r.exec.GetTxs(r.ctx) - if err != nil { - r.logger.Error().Err(err).Msg("failed to get txs from executor") - return fmt.Errorf("failed to get txs from executor: %w", err) +type pendingTx struct { + tx []byte + hash string +} + +func (r *Reaper) drainMempool() (bool, error) { + var totalSubmitted int + + for { + txs, err := r.exec.GetTxs(r.ctx) + if err != nil { + return totalSubmitted > 0, fmt.Errorf("failed to get txs from executor: %w", err) + } + if len(txs) == 0 { + break + } + + filtered := r.filterNewTxs(txs) + if len(filtered) == 0 { + continue + } + + n, err := r.submitFiltered(filtered) + if err != nil { + return totalSubmitted > 0, err + } + totalSubmitted += n } - if len(txs) == 0 { - r.logger.Debug().Msg("no new txs") - return nil + + if totalSubmitted > 0 { + r.logger.Debug().Int("total_txs", totalSubmitted).Msg("drained mempool") + if r.onTxsSubmitted != nil { + r.onTxsSubmitted() + } } - var newTxs [][]byte + return totalSubmitted > 0, nil +} + +func (r *Reaper) filterNewTxs(txs [][]byte) []pendingTx { + pending := make([]pendingTx, 0, len(txs)) for _, tx := range txs { - txHash := hashTx(tx) - if !r.cache.IsTxSeen(txHash) { - newTxs = append(newTxs, tx) + h := hashTx(tx) + if !r.cache.IsTxSeen(h) { + pending = append(pending, pendingTx{tx: tx, hash: h}) } } + return pending +} - if len(newTxs) == 0 { - r.logger.Debug().Msg("no new txs to submit") - return nil +func (r *Reaper) submitFiltered(batch []pendingTx) (int, error) { + txs := make([][]byte, len(batch)) + hashes := make([]string, len(batch)) + for i, p := range batch { + txs[i] = p.tx + hashes[i] = p.hash } - r.logger.Debug().Int("txCount", len(newTxs)).Msg("submitting txs to sequencer") - - _, err = r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{ + _, err := r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{ Id: []byte(r.chainID), - Batch: &coresequencer.Batch{Transactions: newTxs}, + Batch: &coresequencer.Batch{Transactions: txs}, }) if err != nil { - return fmt.Errorf("failed to submit txs to sequencer: %w", err) - } - - for _, tx := range newTxs { - txHash := hashTx(tx) - r.cache.SetTxSeen(txHash) - } - - // Notify the executor that new transactions are available - if len(newTxs) > 0 { - r.logger.Debug().Msg("notifying executor of new transactions") - r.executor.NotifyNewTransactions() + return 0, fmt.Errorf("failed to submit txs to sequencer: %w", err) } - r.logger.Debug().Msg("successfully submitted txs") - return nil + r.cache.SetTxsSeen(hashes) + return len(txs), nil } func hashTx(tx []byte) string { diff --git a/block/internal/reaping/reaper_test.go b/block/internal/reaping/reaper_test.go index 5700882e8a..cc8d71a748 100644 --- a/block/internal/reaping/reaper_test.go +++ b/block/internal/reaping/reaper_test.go @@ -2,214 +2,233 @@ package reaping import ( "context" - crand "crypto/rand" + "sync/atomic" "testing" "time" ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" - "github.com/libp2p/go-libp2p/core/crypto" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/evstack/ev-node/block/internal/cache" - "github.com/evstack/ev-node/block/internal/common" - "github.com/evstack/ev-node/block/internal/executing" coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/signer/noop" "github.com/evstack/ev-node/pkg/store" testmocks "github.com/evstack/ev-node/test/mocks" ) -// helper to create a minimal executor to capture notifications -func newTestExecutor(t *testing.T) *executing.Executor { +func newTestCache(t *testing.T) cache.CacheManager { t.Helper() - - // signer is required by NewExecutor - priv, _, err := crypto.GenerateEd25519Key(crand.Reader) - require.NoError(t, err) - s, err := noop.NewNoopSigner(priv) - require.NoError(t, err) - - // Get the signer's address to use as proposer - signerAddr, err := s.GetAddress() - require.NoError(t, err) - - exec, err := executing.NewExecutor( - nil, // store (unused) - nil, // core executor (unused) - nil, // sequencer (unused) - s, // signer (required) - nil, // cache (unused) - nil, // metrics (unused) - config.DefaultConfig(), - genesis.Genesis{ // minimal genesis - ChainID: "test-chain", - InitialHeight: 1, - StartTime: time.Now(), - ProposerAddress: signerAddr, - }, - nil, // header broadcaster - nil, // data broadcaster - zerolog.Nop(), - common.DefaultBlockOptions(), - make(chan error, 1), // error channel - nil, - ) + cfg := config.Config{RootDir: t.TempDir()} + memDS := dssync.MutexWrap(ds.NewMapDatastore()) + st := store.New(memDS) + cm, err := cache.NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) + return cm +} - return exec +type testEnv struct { + execMock *testmocks.MockExecutor + seqMock *testmocks.MockSequencer + cache cache.CacheManager + reaper *Reaper + notified atomic.Bool } -// helper to create a cache manager for tests -func newTestCache(t *testing.T) cache.CacheManager { +func newTestEnv(t *testing.T) *testEnv { t.Helper() + mockExec := testmocks.NewMockExecutor(t) + mockSeq := testmocks.NewMockSequencer(t) + cm := newTestCache(t) - cfg := config.Config{ - RootDir: t.TempDir(), + env := &testEnv{ + execMock: mockExec, + seqMock: mockSeq, + cache: cm, } - // Create an in-memory store for the cache - memDS := dssync.MutexWrap(ds.NewMapDatastore()) - st := store.New(memDS) - - cacheManager, err := cache.NewManager(cfg, st, zerolog.Nop()) + r, err := NewReaper( + mockExec, mockSeq, + genesis.Genesis{ChainID: "test-chain"}, + zerolog.Nop(), cm, + 100*time.Millisecond, + env.notify, + ) require.NoError(t, err) + env.reaper = r - return cacheManager + return env } -// reaper with mocks and cache manager -func newTestReaper(t *testing.T, chainID string, execMock *testmocks.MockExecutor, seqMock *testmocks.MockSequencer, e *executing.Executor, cm cache.CacheManager) *Reaper { - t.Helper() - - r, err := NewReaper(execMock, seqMock, genesis.Genesis{ChainID: chainID}, zerolog.Nop(), e, cm, 100*time.Millisecond) - require.NoError(t, err) +func (e *testEnv) notify() { + e.notified.Store(true) +} - return r +func (e *testEnv) wasNotified() bool { + return e.notified.Load() } -func TestReaper_SubmitTxs_NewTxs_SubmitsAndPersistsAndNotifies(t *testing.T) { - mockExec := testmocks.NewMockExecutor(t) - mockSeq := testmocks.NewMockSequencer(t) +func TestReaper_NewTxs_SubmitsAndPersistsAndNotifies(t *testing.T) { + env := newTestEnv(t) - // Two new transactions tx1 := []byte("tx1") tx2 := []byte("tx2") - mockExec.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once() - // Expect a single SubmitBatchTxs with both txs - mockSeq.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). + env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once() + env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() + + env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). RunAndReturn(func(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { - require.Equal(t, []byte("chain-A"), req.Id) - require.NotNil(t, req.Batch) assert.Equal(t, [][]byte{tx1, tx2}, req.Batch.Transactions) return &coresequencer.SubmitBatchTxsResponse{}, nil }).Once() - // Minimal executor to capture NotifyNewTransactions - e := newTestExecutor(t) - cm := newTestCache(t) - - r := newTestReaper(t, "chain-A", mockExec, mockSeq, e, cm) - - assert.NoError(t, r.SubmitTxs()) - - // Verify transactions are marked as seen in cache - assert.True(t, cm.IsTxSeen(hashTx(tx1))) - assert.True(t, cm.IsTxSeen(hashTx(tx2))) - - // Executor notified - check using test helper - if !e.HasPendingTxNotification() { - t.Fatal("expected NotifyNewTransactions to signal txNotifyCh") - } + submitted, err := env.reaper.drainMempool() + assert.NoError(t, err) + assert.True(t, submitted) + assert.True(t, env.cache.IsTxSeen(hashTx(tx1))) + assert.True(t, env.cache.IsTxSeen(hashTx(tx2))) + assert.True(t, env.wasNotified()) } -func TestReaper_SubmitTxs_AllSeen_NoSubmit(t *testing.T) { - mockExec := testmocks.NewMockExecutor(t) - mockSeq := testmocks.NewMockSequencer(t) +func TestReaper_AllSeen_NoSubmit(t *testing.T) { + env := newTestEnv(t) tx1 := []byte("tx1") tx2 := []byte("tx2") - // Pre-populate cache with seen transactions - e := newTestExecutor(t) - cm := newTestCache(t) - cm.SetTxSeen(hashTx(tx1)) - cm.SetTxSeen(hashTx(tx2)) - - r := newTestReaper(t, "chain-B", mockExec, mockSeq, e, cm) - - mockExec.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once() - // No SubmitBatchTxs expected + env.cache.SetTxSeen(hashTx(tx1)) + env.cache.SetTxSeen(hashTx(tx2)) - assert.NoError(t, r.SubmitTxs()) + env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once() + env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() - // Ensure no notification occurred - if e.HasPendingTxNotification() { - t.Fatal("did not expect notification when all txs are seen") - } + submitted, err := env.reaper.drainMempool() + assert.NoError(t, err) + assert.False(t, submitted) + assert.False(t, env.wasNotified()) } -func TestReaper_SubmitTxs_PartialSeen_FiltersAndPersists(t *testing.T) { - mockExec := testmocks.NewMockExecutor(t) - mockSeq := testmocks.NewMockSequencer(t) +func TestReaper_PartialSeen_FiltersAndPersists(t *testing.T) { + env := newTestEnv(t) txOld := []byte("old") txNew := []byte("new") - e := newTestExecutor(t) - cm := newTestCache(t) - - // Mark txOld as seen - cm.SetTxSeen(hashTx(txOld)) + env.cache.SetTxSeen(hashTx(txOld)) - r := newTestReaper(t, "chain-C", mockExec, mockSeq, e, cm) + env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{txOld, txNew}, nil).Once() + env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() - mockExec.EXPECT().GetTxs(mock.Anything).Return([][]byte{txOld, txNew}, nil).Once() - mockSeq.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). + env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). RunAndReturn(func(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { - // Should only include txNew assert.Equal(t, [][]byte{txNew}, req.Batch.Transactions) return &coresequencer.SubmitBatchTxsResponse{}, nil }).Once() - assert.NoError(t, r.SubmitTxs()) - - // Both should be seen after successful submit - assert.True(t, cm.IsTxSeen(hashTx(txOld))) - assert.True(t, cm.IsTxSeen(hashTx(txNew))) - - // Notification should occur since a new tx was submitted - if !e.HasPendingTxNotification() { - t.Fatal("expected notification when new tx submitted") - } + submitted, err := env.reaper.drainMempool() + assert.NoError(t, err) + assert.True(t, submitted) + assert.True(t, env.cache.IsTxSeen(hashTx(txOld))) + assert.True(t, env.cache.IsTxSeen(hashTx(txNew))) + assert.True(t, env.wasNotified()) } -func TestReaper_SubmitTxs_SequencerError_NoPersistence_NoNotify(t *testing.T) { - mockExec := testmocks.NewMockExecutor(t) - mockSeq := testmocks.NewMockSequencer(t) +func TestReaper_SequencerError_NoPersistence_NoNotify(t *testing.T) { + env := newTestEnv(t) tx := []byte("oops") - mockExec.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx}, nil).Once() - mockSeq.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). + + env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx}, nil).Once() + + env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). Return((*coresequencer.SubmitBatchTxsResponse)(nil), assert.AnError).Once() - e := newTestExecutor(t) + _, err := env.reaper.drainMempool() + assert.Error(t, err) + assert.False(t, env.cache.IsTxSeen(hashTx(tx))) + assert.False(t, env.wasNotified()) +} + +func TestReaper_DrainsMempoolInMultipleRounds(t *testing.T) { + env := newTestEnv(t) + + tx1 := []byte("tx1") + tx2 := []byte("tx2") + tx3 := []byte("tx3") + + env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once() + env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx3}, nil).Once() + env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() + + env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). + RunAndReturn(func(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { + return &coresequencer.SubmitBatchTxsResponse{}, nil + }).Twice() + + submitted, err := env.reaper.drainMempool() + assert.NoError(t, err) + assert.True(t, submitted) + assert.True(t, env.cache.IsTxSeen(hashTx(tx1))) + assert.True(t, env.cache.IsTxSeen(hashTx(tx2))) + assert.True(t, env.cache.IsTxSeen(hashTx(tx3))) + assert.True(t, env.wasNotified()) +} + +func TestReaper_EmptyMempool_NoAction(t *testing.T) { + env := newTestEnv(t) + + env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() + + submitted, err := env.reaper.drainMempool() + assert.NoError(t, err) + assert.False(t, submitted) + assert.False(t, env.wasNotified()) +} + +func TestReaper_HashComputedOnce(t *testing.T) { + env := newTestEnv(t) + + tx := []byte("unique-tx") + expectedHash := hashTx(tx) + + env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx}, nil).Once() + env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() + + env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). + Return(&coresequencer.SubmitBatchTxsResponse{}, nil).Once() + + submitted, err := env.reaper.drainMempool() + assert.NoError(t, err) + assert.True(t, submitted) + assert.True(t, env.cache.IsTxSeen(expectedHash)) +} + +func TestReaper_NilCallback_NoPanic(t *testing.T) { + mockExec := testmocks.NewMockExecutor(t) + mockSeq := testmocks.NewMockSequencer(t) cm := newTestCache(t) - r := newTestReaper(t, "chain-D", mockExec, mockSeq, e, cm) - assert.Error(t, r.SubmitTxs()) + r, err := NewReaper( + mockExec, mockSeq, + genesis.Genesis{ChainID: "test-chain"}, + zerolog.Nop(), cm, + 100*time.Millisecond, + nil, + ) + require.NoError(t, err) - // Should not be marked seen - assert.False(t, cm.IsTxSeen(hashTx(tx))) + tx := []byte("tx") + mockExec.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx}, nil).Once() + mockExec.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() + mockSeq.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). + Return(&coresequencer.SubmitBatchTxsResponse{}, nil).Once() - // Should not notify - if e.HasPendingTxNotification() { - t.Fatal("did not expect notification on sequencer error") - } + submitted, err := r.drainMempool() + assert.NoError(t, err) + assert.True(t, submitted) } diff --git a/docs/adr/adr-021-lazy-aggregation.md b/docs/adr/adr-021-lazy-aggregation.md index 37ece2c65c..152c9fa7c4 100644 --- a/docs/adr/adr-021-lazy-aggregation.md +++ b/docs/adr/adr-021-lazy-aggregation.md @@ -92,7 +92,6 @@ Leverage the existing empty batch mechanism and `dataHashForEmptyTxs` to maintai A dedicated lazy aggregation loop has been implemented with dual timer mechanisms. The `lazyTimer` ensures blocks are produced at regular intervals even during network inactivity, while the `blockTimer` handles normal block production when transactions are available. Transaction notifications from the `Reaper` to the `Manager` are now handled via the `txNotifyCh` channel: when the `Reaper` detects new transactions, it calls `Manager.NotifyNewTransactions()`, which performs a non-blocking signal on this channel. See the tests in `block/lazy_aggregation_test.go` for verification of this behavior. ```go - // In Reaper.SubmitTxs if r.manager != nil && len(newTxs) > 0 { r.logger.Debug("Notifying manager of new transactions") r.manager.NotifyNewTransactions() // Signals txNotifyCh From bc016bda2555e058331ae40a560224f8258d6d60 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 9 Apr 2026 14:24:41 +0200 Subject: [PATCH 02/14] feedback --- block/internal/reaping/reaper.go | 17 +++++++++++++---- block/internal/reaping/reaper_test.go | 20 +++++++++++++++++++- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index 436727fdf2..f9e1690394 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -96,7 +96,9 @@ func (r *Reaper) reaperLoop() { Int("consecutive_failures", consecutiveFailures). Dur("backoff", backoff). Msg("reaper error, backing off") - r.wait(backoff, cleanupTicker.C) + if r.wait(backoff, nil) { + return + } continue } @@ -109,21 +111,28 @@ func (r *Reaper) reaperLoop() { continue } - r.wait(r.interval, cleanupTicker.C) + if r.wait(r.interval, cleanupTicker.C) { + return + } } } -func (r *Reaper) wait(d time.Duration, cleanupCh <-chan time.Time) { +// wait blocks for the given duration. Returns true if the context was cancelled. +// When cleanupCh is non-nil, processes cache cleanup if that channel fires first. +func (r *Reaper) wait(d time.Duration, cleanupCh <-chan time.Time) bool { timer := time.NewTimer(d) defer timer.Stop() select { case <-r.ctx.Done(): + return true case <-cleanupCh: removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention) if removed > 0 { r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes") } + return false case <-timer.C: + return false } } @@ -156,7 +165,7 @@ func (r *Reaper) drainMempool() (bool, error) { filtered := r.filterNewTxs(txs) if len(filtered) == 0 { - continue + break } n, err := r.submitFiltered(filtered) diff --git a/block/internal/reaping/reaper_test.go b/block/internal/reaping/reaper_test.go index cc8d71a748..2cff6bdfa3 100644 --- a/block/internal/reaping/reaper_test.go +++ b/block/internal/reaping/reaper_test.go @@ -105,7 +105,6 @@ func TestReaper_AllSeen_NoSubmit(t *testing.T) { env.cache.SetTxSeen(hashTx(tx2)) env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once() - env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() submitted, err := env.reaper.drainMempool() assert.NoError(t, err) @@ -232,3 +231,22 @@ func TestReaper_NilCallback_NoPanic(t *testing.T) { assert.NoError(t, err) assert.True(t, submitted) } + +func TestReaper_StopTerminates(t *testing.T) { + env := newTestEnv(t) + env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Maybe() + + require.NoError(t, env.reaper.Start(context.Background())) + + done := make(chan struct{}) + go func() { + env.reaper.Stop() + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("Stop() did not return in time") + } +} From 6815a246bf9a14a98d7b4127a0081a0c14390093 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 9 Apr 2026 15:07:23 +0200 Subject: [PATCH 03/14] fix partial drain --- block/internal/reaping/reaper.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index f9e1690394..a0fb642874 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -170,6 +170,10 @@ func (r *Reaper) drainMempool() (bool, error) { n, err := r.submitFiltered(filtered) if err != nil { + // partial drain, still submit + if totalSubmitted > 0 && r.onTxsSubmitted != nil { + r.onTxsSubmitted() + } return totalSubmitted > 0, err } totalSubmitted += n From 0940b1b20dd58894c4bab6a6e7037988b6715971 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 9 Apr 2026 15:14:05 +0200 Subject: [PATCH 04/14] cleanup old readme --- docs/adr/adr-021-lazy-aggregation.md | 211 +++++++++++---------------- 1 file changed, 83 insertions(+), 128 deletions(-) diff --git a/docs/adr/adr-021-lazy-aggregation.md b/docs/adr/adr-021-lazy-aggregation.md index 152c9fa7c4..988116db70 100644 --- a/docs/adr/adr-021-lazy-aggregation.md +++ b/docs/adr/adr-021-lazy-aggregation.md @@ -20,143 +20,98 @@ Leverage the existing empty batch mechanism and `dataHashForEmptyTxs` to maintai 1. **Modified Batch Retrieval**: - The batch retrieval mechanism has been modified to handle empty batches differently. Instead of discarding empty batches, we now return them with the ErrNoBatch error, allowing the caller to create empty blocks with proper timestamps. This ensures that block timing remains consistent even during periods of inactivity. - - ```go - func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) { - res, err := m.sequencer.GetNextBatch(ctx, req) - if err != nil { - return nil, err - } - - if res != nil && res.Batch != nil { - m.logger.Debug("Retrieved batch", - "txCount", len(res.Batch.Transactions), - "timestamp", res.Timestamp) - - var errRetrieveBatch error - // Even if there are no transactions, return the batch with timestamp - // This allows empty blocks to maintain proper timing - if len(res.Batch.Transactions) == 0 { - errRetrieveBatch = ErrNoBatch - } - // Even if there are no transactions, update lastBatchData so we don't - // repeatedly emit the same empty batch, and persist it to metadata. - if err := m.store.SetMetadata(ctx, LastBatchDataKey, convertBatchDataToBytes(res.BatchData)); err != nil { - m.logger.Error("error while setting last batch hash", "error", err) - } - m.lastBatchData = res.BatchData - return &BatchData{Batch: res.Batch, Time: res.Timestamp, Data: res.BatchData}, errRetrieveBatch - } - return nil, ErrNoBatch - } - ``` + The batch retrieval mechanism has been modified to handle empty batches differently. Instead of discarding empty batches, we now return them with the ErrNoBatch error, allowing the caller to create empty blocks with proper timestamps. This ensures that block timing remains consistent even during periods of inactivity. + + ```go + func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) { + res, err := m.sequencer.GetNextBatch(ctx, req) + if err != nil { + return nil, err + } + + if res != nil && res.Batch != nil { + m.logger.Debug("Retrieved batch", + "txCount", len(res.Batch.Transactions), + "timestamp", res.Timestamp) + + var errRetrieveBatch error + // Even if there are no transactions, return the batch with timestamp + // This allows empty blocks to maintain proper timing + if len(res.Batch.Transactions) == 0 { + errRetrieveBatch = ErrNoBatch + } + // Even if there are no transactions, update lastBatchData so we don't + // repeatedly emit the same empty batch, and persist it to metadata. + if err := m.store.SetMetadata(ctx, LastBatchDataKey, convertBatchDataToBytes(res.BatchData)); err != nil { + m.logger.Error("error while setting last batch hash", "error", err) + } + m.lastBatchData = res.BatchData + return &BatchData{Batch: res.Batch, Time: res.Timestamp, Data: res.BatchData}, errRetrieveBatch + } + return nil, ErrNoBatch + } + ``` 2. **Empty Block Creation**: - The block publishing logic has been enhanced to create empty blocks when a batch with no transactions is received. This uses the special `dataHashForEmptyTxs` value to indicate an empty batch, maintaining the block height consistency with the DA layer while minimizing overhead. - - ```go - // In publishBlock method - batchData, err := m.retrieveBatch(ctx) - if err != nil { - if errors.Is(err, ErrNoBatch) { - if batchData == nil { - m.logger.Info("No batch retrieved from sequencer, skipping block production") - return nil - } - m.logger.Info("Creating empty block, height: ", newHeight) - } else { - return fmt.Errorf("failed to get transactions from batch: %w", err) - } - } else { - if batchData.Before(lastHeaderTime) { - return fmt.Errorf("timestamp is not monotonically increasing: %s < %s", batchData.Time, m.getLastBlockTime()) - } - m.logger.Info("Creating and publishing block, height: ", newHeight) - m.logger.Debug("block info", "num_tx", len(batchData.Batch.Transactions)) - } - - header, data, err = m.createBlock(ctx, newHeight, lastSignature, lastHeaderHash, batchData) - if err != nil { - return err - } - - if err = m.store.SaveBlockData(ctx, header, data, &signature); err != nil { - return SaveBlockError{err} - } - ``` + The block publishing logic has been enhanced to create empty blocks when a batch with no transactions is received. This uses the special `dataHashForEmptyTxs` value to indicate an empty batch, maintaining the block height consistency with the DA layer while minimizing overhead. + + ```go + // In publishBlock method + batchData, err := m.retrieveBatch(ctx) + if err != nil { + if errors.Is(err, ErrNoBatch) { + if batchData == nil { + m.logger.Info("No batch retrieved from sequencer, skipping block production") + return nil + } + m.logger.Info("Creating empty block, height: ", newHeight) + } else { + return fmt.Errorf("failed to get transactions from batch: %w", err) + } + } else { + if batchData.Before(lastHeaderTime) { + return fmt.Errorf("timestamp is not monotonically increasing: %s < %s", batchData.Time, m.getLastBlockTime()) + } + m.logger.Info("Creating and publishing block, height: ", newHeight) + m.logger.Debug("block info", "num_tx", len(batchData.Batch.Transactions)) + } + + header, data, err = m.createBlock(ctx, newHeight, lastSignature, lastHeaderHash, batchData) + if err != nil { + return err + } + + if err = m.store.SaveBlockData(ctx, header, data, &signature); err != nil { + return SaveBlockError{err} + } + ``` 3. **Lazy Aggregation Loop**: - A dedicated lazy aggregation loop has been implemented with dual timer mechanisms. The `lazyTimer` ensures blocks are produced at regular intervals even during network inactivity, while the `blockTimer` handles normal block production when transactions are available. Transaction notifications from the `Reaper` to the `Manager` are now handled via the `txNotifyCh` channel: when the `Reaper` detects new transactions, it calls `Manager.NotifyNewTransactions()`, which performs a non-blocking signal on this channel. See the tests in `block/lazy_aggregation_test.go` for verification of this behavior. - - ```go - if r.manager != nil && len(newTxs) > 0 { - r.logger.Debug("Notifying manager of new transactions") - r.manager.NotifyNewTransactions() // Signals txNotifyCh - } - - // In Manager.NotifyNewTransactions - func (m *Manager) NotifyNewTransactions() { - select { - case m.txNotifyCh <- struct{}{}: - // Successfully sent notification - default: - // Channel buffer is full, notification already pending - } - } - // Modified lazyAggregationLoop - func (m *Manager) lazyAggregationLoop(ctx context.Context, blockTimer *time.Timer) { - // lazyTimer triggers block publication even during inactivity - lazyTimer := time.NewTimer(0) - defer lazyTimer.Stop() - - for { - select { - case <-ctx.Done(): - return - - case <-lazyTimer.C: - m.logger.Debug("Lazy timer triggered block production") - m.produceBlock(ctx, "lazy_timer", lazyTimer, blockTimer) - - case <-blockTimer.C: - if m.txsAvailable { - m.produceBlock(ctx, "block_timer", lazyTimer, blockTimer) - m.txsAvailable = false - } else { - // Ensure we keep ticking even when there are no txs - blockTimer.Reset(m.config.Node.BlockTime.Duration) - } - case <-m.txNotifyCh: - m.txsAvailable = true - } - } - } - ``` + A dedicated lazy aggregation loop has been implemented with dual timer mechanisms. The `lazyTimer` ensures blocks are produced at regular intervals even during network inactivity, while the `blockTimer` handles normal block production when transactions are available. Transaction notifications from the `Reaper` to the `Manager` are now handled via the `txNotifyCh` channel: when the `Reaper` detects new transactions, it calls `Manager.NotifyNewTransactions()`, which performs a non-blocking signal on this channel. See the tests in `block/lazy_aggregation_test.go` for verification of this behavior. 4. **Block Production**: - The block production function centralizes the logic for publishing blocks and resetting timers. It records the start time, attempts to publish a block, and then intelligently resets both timers based on the elapsed time. This ensures that block production remains on schedule even if the block creation process takes significant time. - - ```go - func (m *Manager) produceBlock(ctx context.Context, trigger string, lazyTimer, blockTimer *time.Timer) { - // Record the start time - start := time.Now() - - // Attempt to publish the block - if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil { - m.logger.Error("error while publishing block", "trigger", trigger, "error", err) - } else { - m.logger.Debug("Successfully published block", "trigger", trigger) - } - - // Reset both timers for the next aggregation window - lazyTimer.Reset(getRemainingSleep(start, m.config.Node.LazyBlockInterval.Duration)) - blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) - } - ``` + The block production function centralizes the logic for publishing blocks and resetting timers. It records the start time, attempts to publish a block, and then intelligently resets both timers based on the elapsed time. This ensures that block production remains on schedule even if the block creation process takes significant time. + + ```go + func (m *Manager) produceBlock(ctx context.Context, trigger string, lazyTimer, blockTimer *time.Timer) { + // Record the start time + start := time.Now() + + // Attempt to publish the block + if err := m.publishBlock(ctx); err != nil && ctx.Err() == nil { + m.logger.Error("error while publishing block", "trigger", trigger, "error", err) + } else { + m.logger.Debug("Successfully published block", "trigger", trigger) + } + + // Reset both timers for the next aggregation window + lazyTimer.Reset(getRemainingSleep(start, m.config.Node.LazyBlockInterval.Duration)) + blockTimer.Reset(getRemainingSleep(start, m.config.Node.BlockTime.Duration)) + } + ``` ### Key Changes From 28033e80921f0ca5518082afe4ca9ce5e1a0767e Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 9 Apr 2026 15:33:15 +0200 Subject: [PATCH 05/14] Prevent multiple Start() calls across components --- block/internal/cache/manager.go | 7 ------- block/internal/executing/executor.go | 3 +++ block/internal/pruner/pruner.go | 3 +++ block/internal/reaping/reaper.go | 4 +++- block/internal/submitting/submitter.go | 3 +++ block/internal/syncing/syncer.go | 3 +++ 6 files changed, 15 insertions(+), 8 deletions(-) diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index e021c5ca44..a91b1410d7 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -246,13 +246,6 @@ func (m *implementation) CleanupOldTxs(olderThan time.Duration) int { return true }) - if removed > 0 { - m.logger.Debug(). - Int("removed", removed). - Dur("older_than", olderThan). - Msg("cleaned up old transaction hashes from cache") - } - return removed } diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 3ddc90211a..4cdf94a885 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -163,6 +163,9 @@ func (e *Executor) SetBlockProducer(bp BlockProducer) { // Start begins the execution component func (e *Executor) Start(ctx context.Context) error { + if e.cancel != nil { + return errors.New("executor already started") + } e.ctx, e.cancel = context.WithCancel(ctx) // Initialize state diff --git a/block/internal/pruner/pruner.go b/block/internal/pruner/pruner.go index 797f911d50..e56d9c2849 100644 --- a/block/internal/pruner/pruner.go +++ b/block/internal/pruner/pruner.go @@ -52,6 +52,9 @@ func New( // Start begins the pruning loop. func (p *Pruner) Start(ctx context.Context) error { + if p.cancel != nil { + return errors.New("pruner already started") + } if !p.cfg.IsPruningEnabled() { p.logger.Info().Msg("pruning is disabled, not starting pruner") return nil diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index a0fb642874..818772ed72 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -69,9 +69,11 @@ func NewReaper( // Start begins the execution component func (r *Reaper) Start(ctx context.Context) error { + if r.cancel != nil { + return errors.New("reaper already started") + } r.ctx, r.cancel = context.WithCancel(ctx) - // Start reaper loop r.wg.Go(r.reaperLoop) r.logger.Info().Dur("idle_interval", r.interval).Msg("reaper started") diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 34fab216de..73a05602fc 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -119,6 +119,9 @@ func NewSubmitter( // Start begins the submitting component func (s *Submitter) Start(ctx context.Context) error { + if s.cancel != nil { + return errors.New("submitter already started") + } s.ctx, s.cancel = context.WithCancel(ctx) // Initialize DA included height diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 802c1b243d..d9546d076c 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -163,6 +163,9 @@ func (s *Syncer) SetBlockSyncer(bs BlockSyncer) { // Start begins the syncing component func (s *Syncer) Start(ctx context.Context) (err error) { + if s.cancel != nil { + return errors.New("syncer already started") + } ctx, cancel := context.WithCancel(ctx) s.ctx, s.cancel = ctx, cancel From 2e58f04256f7f84944842ab8a5e0ede47c37a59f Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 9 Apr 2026 16:14:03 +0200 Subject: [PATCH 06/14] fix unwanted log --- block/internal/reaping/reaper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index 818772ed72..ab9c2045bf 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -89,7 +89,7 @@ func (r *Reaper) reaperLoop() { for { submitted, err := r.drainMempool() - if err != nil { + if err != nil && !errors.Is(err, context.Canceled) { consecutiveFailures++ backoff := r.interval * time.Duration(1< Date: Thu, 9 Apr 2026 16:28:24 +0200 Subject: [PATCH 07/14] lock --- block/internal/submitting/submitter_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index 946fba0ab6..f74beddf96 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -289,7 +289,6 @@ func TestSubmitter_processDAInclusionLoop_advances(t *testing.T) { cm.SetHeaderDAIncluded(h2.Hash().String(), 101, 2) cm.SetDataDAIncluded(d2.DACommitment().String(), 101, 2) - s.ctx, s.cancel = ctx, cancel require.NoError(t, s.initializeDAIncludedHeight(ctx)) require.Equal(t, uint64(0), s.GetDAIncludedHeight()) @@ -518,7 +517,6 @@ func TestSubmitter_CacheClearedOnHeightInclusion(t *testing.T) { cm.SetHeaderDAIncluded(h2.Hash().String(), 101, 2) cm.SetDataDAIncluded(d2.DACommitment().String(), 101, 2) - s.ctx, s.cancel = ctx, cancel require.NoError(t, s.initializeDAIncludedHeight(ctx)) require.Equal(t, uint64(0), s.GetDAIncludedHeight()) From 6909b3099c30676864963d425bfc641843187274 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 9 Apr 2026 16:50:26 +0200 Subject: [PATCH 08/14] updates --- block/internal/reaping/reaper.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index ab9c2045bf..7a38155bfe 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -155,6 +155,13 @@ type pendingTx struct { func (r *Reaper) drainMempool() (bool, error) { var totalSubmitted int + submitted := false + + defer func() { + if submitted && r.onTxsSubmitted != nil { + r.onTxsSubmitted() + } + }() for { txs, err := r.exec.GetTxs(r.ctx) @@ -172,20 +179,14 @@ func (r *Reaper) drainMempool() (bool, error) { n, err := r.submitFiltered(filtered) if err != nil { - // partial drain, still submit - if totalSubmitted > 0 && r.onTxsSubmitted != nil { - r.onTxsSubmitted() - } return totalSubmitted > 0, err } totalSubmitted += n + submitted = true } if totalSubmitted > 0 { r.logger.Debug().Int("total_txs", totalSubmitted).Msg("drained mempool") - if r.onTxsSubmitted != nil { - r.onTxsSubmitted() - } } return totalSubmitted > 0, nil From d28180e56927a99416c5d76106b924e88cc5c782 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 9 Apr 2026 21:16:12 +0200 Subject: [PATCH 09/14] remove redundant --- block/internal/reaping/reaper.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index 7a38155bfe..0a1e38d8ff 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -155,10 +155,9 @@ type pendingTx struct { func (r *Reaper) drainMempool() (bool, error) { var totalSubmitted int - submitted := false defer func() { - if submitted && r.onTxsSubmitted != nil { + if totalSubmitted > 0 && r.onTxsSubmitted != nil { r.onTxsSubmitted() } }() @@ -182,7 +181,6 @@ func (r *Reaper) drainMempool() (bool, error) { return totalSubmitted > 0, err } totalSubmitted += n - submitted = true } if totalSubmitted > 0 { From 70c8b897754ee181fe7d29bfdcfed26d905e3afb Mon Sep 17 00:00:00 2001 From: julienrbrt Date: Thu, 9 Apr 2026 21:21:06 +0200 Subject: [PATCH 10/14] changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e62149b481..5eaa327134 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changes + +- Improve reaper to sustain txs burst better [#3236](https://github.com/evstack/ev-node/pull/3236) + ## v1.1.0 No changes from v1.1.0-rc.2. From 6c443814afea00d15d1cb3c018fdb01242f07e2c Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 9 Apr 2026 21:45:10 +0200 Subject: [PATCH 11/14] feedback --- block/internal/reaping/reaper.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index 0a1e38d8ff..7448c7d1a3 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "runtime" "sync" "time" @@ -96,7 +97,7 @@ func (r *Reaper) reaperLoop() { r.logger.Warn(). Err(err). Int("consecutive_failures", consecutiveFailures). - Dur("backoff", backoff). + Dur("next_retry_in", backoff). Msg("reaper error, backing off") if r.wait(backoff, nil) { return @@ -110,9 +111,13 @@ func (r *Reaper) reaperLoop() { } if submitted { + runtime.Gosched() continue } + // Note: if the cleanup ticker fires before the idle interval elapses, + // the remaining idle duration is discarded. drainMempool() is called + // immediately and a fresh idle wait starts from scratch. if r.wait(r.interval, cleanupTicker.C) { return } From 485cff27234b5fd07f1a1bde058c5130ef3b2193 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 10 Apr 2026 13:30:51 +0200 Subject: [PATCH 12/14] feedback --- block/internal/executing/executor.go | 10 +++++++-- block/internal/reaping/reaper.go | 31 +++++++++++++------------- block/internal/reaping/reaper_test.go | 16 ++++++------- block/internal/submitting/submitter.go | 11 +++++++-- block/internal/syncing/syncer.go | 1 + 5 files changed, 41 insertions(+), 28 deletions(-) diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 4cdf94a885..e1c335f1e5 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -162,14 +162,20 @@ func (e *Executor) SetBlockProducer(bp BlockProducer) { } // Start begins the execution component -func (e *Executor) Start(ctx context.Context) error { +func (e *Executor) Start(ctx context.Context) (err error) { if e.cancel != nil { return errors.New("executor already started") } e.ctx, e.cancel = context.WithCancel(ctx) + defer func() { // if error during init cancel context + if err != nil { + e.cancel() + e.ctx, e.cancel = nil, nil + } + }() // Initialize state - if err := e.initializeState(); err != nil { + if err = e.initializeState(); err != nil { return fmt.Errorf("failed to initialize state: %w", err) } diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index 7448c7d1a3..796505e462 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -88,9 +88,9 @@ func (r *Reaper) reaperLoop() { consecutiveFailures := 0 for { - submitted, err := r.drainMempool() + submitted, err := r.drainMempool(cleanupTicker.C) - if err != nil && !errors.Is(err, context.Canceled) { + if err != nil && r.ctx.Err() == nil { consecutiveFailures++ backoff := r.interval * time.Duration(1< 0 { - r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes") - } - return false case <-timer.C: return false } @@ -158,7 +148,7 @@ type pendingTx struct { hash string } -func (r *Reaper) drainMempool() (bool, error) { +func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) { var totalSubmitted int defer func() { @@ -168,6 +158,15 @@ func (r *Reaper) drainMempool() (bool, error) { }() for { + select { + case <-cleanupCh: + removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention) + if removed > 0 { + r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes") + } + default: + } + txs, err := r.exec.GetTxs(r.ctx) if err != nil { return totalSubmitted > 0, fmt.Errorf("failed to get txs from executor: %w", err) diff --git a/block/internal/reaping/reaper_test.go b/block/internal/reaping/reaper_test.go index 2cff6bdfa3..6bf4426d4a 100644 --- a/block/internal/reaping/reaper_test.go +++ b/block/internal/reaping/reaper_test.go @@ -87,7 +87,7 @@ func TestReaper_NewTxs_SubmitsAndPersistsAndNotifies(t *testing.T) { return &coresequencer.SubmitBatchTxsResponse{}, nil }).Once() - submitted, err := env.reaper.drainMempool() + submitted, err := env.reaper.drainMempool(nil) assert.NoError(t, err) assert.True(t, submitted) assert.True(t, env.cache.IsTxSeen(hashTx(tx1))) @@ -106,7 +106,7 @@ func TestReaper_AllSeen_NoSubmit(t *testing.T) { env.execMock.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1, tx2}, nil).Once() - submitted, err := env.reaper.drainMempool() + submitted, err := env.reaper.drainMempool(nil) assert.NoError(t, err) assert.False(t, submitted) assert.False(t, env.wasNotified()) @@ -129,7 +129,7 @@ func TestReaper_PartialSeen_FiltersAndPersists(t *testing.T) { return &coresequencer.SubmitBatchTxsResponse{}, nil }).Once() - submitted, err := env.reaper.drainMempool() + submitted, err := env.reaper.drainMempool(nil) assert.NoError(t, err) assert.True(t, submitted) assert.True(t, env.cache.IsTxSeen(hashTx(txOld))) @@ -147,7 +147,7 @@ func TestReaper_SequencerError_NoPersistence_NoNotify(t *testing.T) { env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). Return((*coresequencer.SubmitBatchTxsResponse)(nil), assert.AnError).Once() - _, err := env.reaper.drainMempool() + _, err := env.reaper.drainMempool(nil) assert.Error(t, err) assert.False(t, env.cache.IsTxSeen(hashTx(tx))) assert.False(t, env.wasNotified()) @@ -169,7 +169,7 @@ func TestReaper_DrainsMempoolInMultipleRounds(t *testing.T) { return &coresequencer.SubmitBatchTxsResponse{}, nil }).Twice() - submitted, err := env.reaper.drainMempool() + submitted, err := env.reaper.drainMempool(nil) assert.NoError(t, err) assert.True(t, submitted) assert.True(t, env.cache.IsTxSeen(hashTx(tx1))) @@ -183,7 +183,7 @@ func TestReaper_EmptyMempool_NoAction(t *testing.T) { env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Once() - submitted, err := env.reaper.drainMempool() + submitted, err := env.reaper.drainMempool(nil) assert.NoError(t, err) assert.False(t, submitted) assert.False(t, env.wasNotified()) @@ -201,7 +201,7 @@ func TestReaper_HashComputedOnce(t *testing.T) { env.seqMock.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). Return(&coresequencer.SubmitBatchTxsResponse{}, nil).Once() - submitted, err := env.reaper.drainMempool() + submitted, err := env.reaper.drainMempool(nil) assert.NoError(t, err) assert.True(t, submitted) assert.True(t, env.cache.IsTxSeen(expectedHash)) @@ -227,7 +227,7 @@ func TestReaper_NilCallback_NoPanic(t *testing.T) { mockSeq.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). Return(&coresequencer.SubmitBatchTxsResponse{}, nil).Once() - submitted, err := r.drainMempool() + submitted, err := r.drainMempool(nil) assert.NoError(t, err) assert.True(t, submitted) } diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 73a05602fc..d92cf9258c 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -118,14 +118,21 @@ func NewSubmitter( } // Start begins the submitting component -func (s *Submitter) Start(ctx context.Context) error { +func (s *Submitter) Start(ctx context.Context) (err error) { if s.cancel != nil { return errors.New("submitter already started") } + s.ctx, s.cancel = context.WithCancel(ctx) + defer func() { // if error during init cancel context + if err != nil { + s.cancel() + s.ctx, s.cancel = nil, nil + } + }() // Initialize DA included height - if err := s.initializeDAIncludedHeight(ctx); err != nil { + if err = s.initializeDAIncludedHeight(ctx); err != nil { return err } diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index d9546d076c..13cdad4c1f 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -162,6 +162,7 @@ func (s *Syncer) SetBlockSyncer(bs BlockSyncer) { } // Start begins the syncing component +// The component should not be started after being stopped. func (s *Syncer) Start(ctx context.Context) (err error) { if s.cancel != nil { return errors.New("syncer already started") From efb43cb3a4b1637317ef70d54c127a87da760d1f Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 10 Apr 2026 13:52:09 +0200 Subject: [PATCH 13/14] feedback --- block/internal/syncing/syncer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 13cdad4c1f..4d2cbb4afe 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -279,7 +279,6 @@ func (s *Syncer) Stop(ctx context.Context) error { s.logger.Info().Msg("syncer stopped") close(s.heightInCh) - s.cancel = nil return nil } From 2a071a29d8db8799d6f58107841f8b5e9e5a6ec0 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 10 Apr 2026 15:58:51 +0200 Subject: [PATCH 14/14] add bench --- block/internal/reaping/bench_test.go | 330 +++++++++++++++++++++++++++ 1 file changed, 330 insertions(+) create mode 100644 block/internal/reaping/bench_test.go diff --git a/block/internal/reaping/bench_test.go b/block/internal/reaping/bench_test.go new file mode 100644 index 0000000000..5ec0aaa69d --- /dev/null +++ b/block/internal/reaping/bench_test.go @@ -0,0 +1,330 @@ +package reaping + +import ( + "context" + "crypto/rand" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/rs/zerolog" + + "github.com/evstack/ev-node/block/internal/cache" + coreexecutor "github.com/evstack/ev-node/core/execution" + coresequencer "github.com/evstack/ev-node/core/sequencer" + "github.com/evstack/ev-node/pkg/config" + "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/store" +) + +func newBenchCache(b *testing.B) cache.CacheManager { + b.Helper() + cfg := config.Config{RootDir: b.TempDir()} + memDS := dssync.MutexWrap(ds.NewMapDatastore()) + st := store.New(memDS) + cm, err := cache.NewManager(cfg, st, zerolog.Nop()) + if err != nil { + b.Fatal(err) + } + return cm +} + +type infiniteExecutor struct { + mu sync.Mutex + batch [][]byte + pending atomic.Int64 +} + +func (e *infiniteExecutor) Feed(n int, txSize int) { + txs := make([][]byte, n) + for i := range txs { + tx := make([]byte, txSize) + _, _ = rand.Read(tx) + txs[i] = tx + } + e.mu.Lock() + e.batch = txs + e.mu.Unlock() + e.pending.Add(int64(n)) +} + +func (e *infiniteExecutor) GetTxs(_ context.Context) ([][]byte, error) { + e.mu.Lock() + txs := e.batch + e.batch = nil + e.mu.Unlock() + return txs, nil +} + +func (e *infiniteExecutor) ExecuteTxs(_ context.Context, _ [][]byte, _ uint64, _ time.Time, _ []byte) ([]byte, error) { + return nil, nil +} + +func (e *infiniteExecutor) FilterTxs(_ context.Context, txs [][]byte, _ uint64, _ uint64, _ bool) ([]coreexecutor.FilterStatus, error) { + return make([]coreexecutor.FilterStatus, len(txs)), nil +} + +func (e *infiniteExecutor) GetExecutionInfo(_ context.Context) (coreexecutor.ExecutionInfo, error) { + return coreexecutor.ExecutionInfo{}, nil +} + +func (e *infiniteExecutor) InitChain(_ context.Context, _ time.Time, _ uint64, _ string) ([]byte, error) { + return nil, nil +} + +func (e *infiniteExecutor) SetFinal(_ context.Context, _ uint64) error { return nil } + +type countingSequencer struct { + submitted atomic.Int64 +} + +func (s *countingSequencer) SubmitBatchTxs(_ context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { + s.submitted.Add(int64(len(req.Batch.Transactions))) + return &coresequencer.SubmitBatchTxsResponse{}, nil +} + +func (s *countingSequencer) GetNextBatch(_ context.Context, _ coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { + return &coresequencer.GetNextBatchResponse{}, nil +} + +func (s *countingSequencer) VerifyBatch(_ context.Context, _ coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) { + return &coresequencer.VerifyBatchResponse{}, nil +} + +func (s *countingSequencer) GetDAHeight() uint64 { return 0 } +func (s *countingSequencer) SetDAHeight(_ uint64) {} + +func benchmarkReaperFlow(b *testing.B, batchSize int, txSize int, feedInterval time.Duration) { + scenario := fmt.Sprintf("batch=%d/txSize=%d", batchSize, txSize) + + b.Run(scenario, func(b *testing.B) { + exec := &infiniteExecutor{} + seq := &countingSequencer{} + + cm := newBenchCache(b) + var notified atomic.Int64 + + r, err := NewReaper( + exec, + seq, + genesis.Genesis{ChainID: "bench"}, + zerolog.Nop(), + cm, + 50*time.Millisecond, + func() { notified.Add(1) }, + ) + if err != nil { + b.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if err := r.Start(ctx); err != nil { + b.Fatal(err) + } + + stopFeeding := make(chan struct{}) + feederDone := make(chan struct{}) + go func() { + defer close(feederDone) + ticker := time.NewTicker(feedInterval) + defer ticker.Stop() + for { + select { + case <-stopFeeding: + return + case <-ticker.C: + exec.Feed(batchSize, txSize) + } + } + }() + + b.ResetTimer() + + var lastCount int64 + for i := 0; i < b.N; i++ { + exec.Feed(batchSize, txSize) + + deadline := time.After(5 * time.Second) + expected := seq.submitted.Load() + int64(batchSize) + for { + cur := seq.submitted.Load() + if cur >= expected { + lastCount = cur + break + } + select { + case <-deadline: + b.Fatalf("timeout: submitted %d, expected >= %d", cur, expected) + default: + } + } + } + + b.StopTimer() + close(stopFeeding) + <-feederDone + cancel() + r.Stop() + + b.ReportMetric(float64(lastCount)/b.Elapsed().Seconds(), "txs/sec") + }) +} + +func BenchmarkReaperFlow_Throughput(b *testing.B) { + sizes := []int{256, 1024, 4096} + batches := []int{10, 100, 500} + + for _, batchSize := range batches { + for _, txSize := range sizes { + benchmarkReaperFlow(b, batchSize, txSize, 10*time.Millisecond) + } + } +} + +func BenchmarkReaperFlow_Sustained(b *testing.B) { + b.Run("steady_100txs_256B", func(b *testing.B) { + exec := &infiniteExecutor{} + seq := &countingSequencer{} + cm := newBenchCache(b) + var notified atomic.Int64 + + r, err := NewReaper( + exec, seq, + genesis.Genesis{ChainID: "bench"}, + zerolog.Nop(), cm, + 10*time.Millisecond, + func() { notified.Add(1) }, + ) + if err != nil { + b.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if err := r.Start(ctx); err != nil { + b.Fatal(err) + } + + const batchSize = 100 + const txSize = 256 + const duration = 3 * time.Second + + feederDone := make(chan struct{}) + go func() { + defer close(feederDone) + for { + exec.Feed(batchSize, txSize) + time.Sleep(time.Millisecond) + select { + case <-ctx.Done(): + return + default: + } + } + }() + + b.ResetTimer() + time.Sleep(duration) + b.StopTimer() + + cancel() + r.Stop() + <-feederDone + + total := seq.submitted.Load() + elapsed := b.Elapsed().Seconds() + b.ReportMetric(float64(total)/elapsed, "txs/sec") + b.ReportMetric(float64(notified.Load())/elapsed, "notifies/sec") + b.Logf("submitted %d txs in %.1fs (%.0f txs/sec, %d notifications)", total, elapsed, float64(total)/elapsed, notified.Load()) + }) +} + +func BenchmarkReaperFlow_StartStop(b *testing.B) { + b.Run("lifecycle", func(b *testing.B) { + exec := &infiniteExecutor{} + seq := &countingSequencer{} + cm := newBenchCache(b) + + for i := 0; i < b.N; i++ { + r, err := NewReaper( + exec, seq, + genesis.Genesis{ChainID: "bench"}, + zerolog.Nop(), cm, + 100*time.Millisecond, + func() {}, + ) + if err != nil { + b.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + if err := r.Start(ctx); err != nil { + cancel() + b.Fatal(err) + } + r.Stop() + cancel() + } + }) + + b.Run("start_with_backlog", func(b *testing.B) { + const batchSize = 500 + const txSize = 256 + + for i := 0; i < b.N; i++ { + b.StopTimer() + + exec := &infiniteExecutor{} + seq := &countingSequencer{} + cm := newBenchCache(b) + + txs := make([][]byte, batchSize) + for j := range txs { + txs[j] = make([]byte, txSize) + _, _ = rand.Read(txs[j]) + } + exec.mu.Lock() + exec.batch = txs + exec.mu.Unlock() + + r, err := NewReaper( + exec, seq, + genesis.Genesis{ChainID: "bench"}, + zerolog.Nop(), cm, + 10*time.Millisecond, + func() {}, + ) + if err != nil { + b.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + + b.StartTimer() + if err := r.Start(ctx); err != nil { + cancel() + b.Fatal(err) + } + + deadline := time.After(5 * time.Second) + for seq.submitted.Load() < batchSize { + select { + case <-deadline: + b.Fatal("timeout waiting for backlog drain") + default: + } + } + b.StopTimer() + + r.Stop() + cancel() + } + }) +}