Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changes

- Improve reaper to better sustain bursts of transactions [#3236](https://github.com/evstack/ev-node/pull/3236)

## v1.1.0

No changes from v1.1.0-rc.2.
Expand Down
2 changes: 1 addition & 1 deletion block/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,9 @@ func newAggregatorComponents(
sequencer,
genesis,
logger,
executor,
cacheManager,
config.Node.ScrapeInterval.Duration,
executor.NotifyNewTransactions,
)
if err != nil {
return nil, fmt.Errorf("failed to create reaper: %w", err)
Expand Down
16 changes: 9 additions & 7 deletions block/internal/cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type CacheManager interface {
// Transaction operations
IsTxSeen(hash string) bool
SetTxSeen(hash string)
SetTxsSeen(hashes []string)
CleanupOldTxs(olderThan time.Duration) int

// Pending events syncing coordination
Expand Down Expand Up @@ -210,6 +211,14 @@ func (m *implementation) SetTxSeen(hash string) {
m.txTimestamps.Store(hash, time.Now())
}

// SetTxsSeen marks every hash in hashes as seen in the transaction cache,
// stamping all of them with a single timestamp taken once up front (cheaper
// than calling time.Now per hash, and keeps the batch's retention uniform).
func (m *implementation) SetTxsSeen(hashes []string) {
	seenAt := time.Now()
	for i := range hashes {
		m.txCache.setSeen(hashes[i], 0)
		m.txTimestamps.Store(hashes[i], seenAt)
	}
}

// CleanupOldTxs removes transaction hashes older than olderThan and returns
// the count removed. Defaults to DefaultTxCacheRetention if olderThan <= 0.
func (m *implementation) CleanupOldTxs(olderThan time.Duration) int {
Expand Down Expand Up @@ -237,13 +246,6 @@ func (m *implementation) CleanupOldTxs(olderThan time.Duration) int {
return true
})

if removed > 0 {
m.logger.Debug().
Int("removed", removed).
Dur("older_than", olderThan).
Msg("cleaned up old transaction hashes from cache")
}

return removed
}

Expand Down
3 changes: 3 additions & 0 deletions block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ func (e *Executor) SetBlockProducer(bp BlockProducer) {

// Start begins the execution component
func (e *Executor) Start(ctx context.Context) error {
if e.cancel != nil {
return errors.New("executor already started")
}
e.ctx, e.cancel = context.WithCancel(ctx)

// Initialize state
Expand Down
3 changes: 3 additions & 0 deletions block/internal/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ func New(

// Start begins the pruning loop.
func (p *Pruner) Start(ctx context.Context) error {
if p.cancel != nil {
return errors.New("pruner already started")
}
if !p.cfg.IsPruningEnabled() {
p.logger.Info().Msg("pruning is disabled, not starting pruner")
return nil
Expand Down
225 changes: 129 additions & 96 deletions block/internal/reaping/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"encoding/hex"
"errors"
"fmt"
"runtime"
"sync"
"time"

"github.com/rs/zerolog"

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/executing"
coreexecutor "github.com/evstack/ev-node/core/execution"
coresequencer "github.com/evstack/ev-node/core/sequencer"
"github.com/evstack/ev-node/pkg/genesis"
Expand All @@ -21,40 +21,35 @@ import (
const (
// MaxBackoffInterval is the maximum backoff interval for retries
MaxBackoffInterval = 30 * time.Second
CleanupInterval = 1 * time.Hour
)

// Reaper is responsible for periodically retrieving transactions from the executor,
// filtering out already seen transactions, and submitting new transactions to the sequencer.
type Reaper struct {
exec coreexecutor.Executor
sequencer coresequencer.Sequencer
chainID string
interval time.Duration
cache cache.CacheManager
executor *executing.Executor

// shared components
exec coreexecutor.Executor
sequencer coresequencer.Sequencer
chainID string
interval time.Duration
cache cache.CacheManager
onTxsSubmitted func()

logger zerolog.Logger

// Lifecycle
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}

// NewReaper creates a new Reaper instance.
func NewReaper(
exec coreexecutor.Executor,
sequencer coresequencer.Sequencer,
genesis genesis.Genesis,
logger zerolog.Logger,
executor *executing.Executor,
cache cache.CacheManager,
scrapeInterval time.Duration,
onTxsSubmitted func(),
) (*Reaper, error) {
if executor == nil {
return nil, errors.New("executor cannot be nil")
}
if cache == nil {
return nil, errors.New("cache cannot be nil")
}
Expand All @@ -63,71 +58,88 @@ func NewReaper(
}

return &Reaper{
exec: exec,
sequencer: sequencer,
chainID: genesis.ChainID,
interval: scrapeInterval,
logger: logger.With().Str("component", "reaper").Logger(),
cache: cache,
executor: executor,
exec: exec,
sequencer: sequencer,
chainID: genesis.ChainID,
interval: scrapeInterval,
logger: logger.With().Str("component", "reaper").Logger(),
cache: cache,
onTxsSubmitted: onTxsSubmitted,
}, nil
}

// Start begins the reaper's background scrape loop. Calling Start a second
// time is an error; the non-nil cancel func doubles as the "already started"
// sentinel.
func (r *Reaper) Start(ctx context.Context) error {
	if r.cancel != nil {
		return errors.New("reaper already started")
	}
	r.ctx, r.cancel = context.WithCancel(ctx)

	// Run the loop on the reaper's WaitGroup so Stop can wait for it to exit.
	r.wg.Go(r.reaperLoop)

	r.logger.Info().Dur("idle_interval", r.interval).Msg("reaper started")
	return nil
}

func (r *Reaper) reaperLoop() {
ticker := time.NewTicker(r.interval)
defer ticker.Stop()

cleanupTicker := time.NewTicker(1 * time.Hour)
cleanupTicker := time.NewTicker(CleanupInterval)
defer cleanupTicker.Stop()

consecutiveFailures := 0

for {
select {
case <-r.ctx.Done():
return
case <-ticker.C:
err := r.SubmitTxs()
if err != nil {
// Increment failure counter and apply exponential backoff
consecutiveFailures++
backoff := r.interval * time.Duration(1<<min(consecutiveFailures, 5)) // Cap at 2^5 = 32x
backoff = min(backoff, MaxBackoffInterval)
r.logger.Warn().
Err(err).
Int("consecutive_failures", consecutiveFailures).
Dur("next_retry_in", backoff).
Msg("reaper encountered error, applying backoff")

// Reset ticker with backoff interval
ticker.Reset(backoff)
} else {
// Reset failure counter and backoff on success
if consecutiveFailures > 0 {
r.logger.Info().Msg("reaper recovered from errors, resetting backoff")
consecutiveFailures = 0
ticker.Reset(r.interval)
}
}
case <-cleanupTicker.C:
// Clean up transaction hashes older than 24 hours
// This prevents unbounded growth of the transaction seen cache
removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention)
if removed > 0 {
r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes")
submitted, err := r.drainMempool()

if err != nil && !errors.Is(err, context.Canceled) {
consecutiveFailures++
backoff := r.interval * time.Duration(1<<min(consecutiveFailures, 5))
backoff = min(backoff, MaxBackoffInterval)
r.logger.Warn().
Err(err).
Int("consecutive_failures", consecutiveFailures).
Dur("next_retry_in", backoff).
Msg("reaper error, backing off")
if r.wait(backoff, nil) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the main change, if we get txs we retry immediately. if we do not we wait for the interval

return
}
continue
}

if consecutiveFailures > 0 {
r.logger.Info().Msg("reaper recovered from errors")
consecutiveFailures = 0
}

if submitted {
runtime.Gosched()
continue
}

// Note: if the cleanup ticker fires before the idle interval elapses,
// the remaining idle duration is discarded. drainMempool() is called
// immediately and a fresh idle wait starts from scratch.
if r.wait(r.interval, cleanupTicker.C) {
return
}
}
}

// wait sleeps for d, returning true if the reaper's context was cancelled
// before the duration elapsed. When cleanupCh is non-nil and fires first, a
// cache cleanup pass runs instead of the sleep completing; a nil cleanupCh
// blocks forever on that case and is effectively ignored.
func (r *Reaper) wait(d time.Duration, cleanupCh <-chan time.Time) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-cleanupCh:
		if removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention); removed > 0 {
			r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes")
		}
	case <-r.ctx.Done():
		return true
	}
	return false
}

Expand All @@ -137,60 +149,81 @@ func (r *Reaper) Stop() error {
r.cancel()
}
r.wg.Wait()

r.logger.Info().Msg("reaper stopped")
return nil
}

// SubmitTxs retrieves transactions from the executor and submits them to the sequencer.
// Returns an error if any critical operation fails.
func (r *Reaper) SubmitTxs() error {
txs, err := r.exec.GetTxs(r.ctx)
if err != nil {
r.logger.Error().Err(err).Msg("failed to get txs from executor")
return fmt.Errorf("failed to get txs from executor: %w", err)
// pendingTx pairs a raw transaction with its precomputed hash so the hash is
// computed once during filtering and reused when marking the tx as seen.
type pendingTx struct {
	tx []byte
	hash string
}

func (r *Reaper) drainMempool() (bool, error) {
var totalSubmitted int

defer func() {
if totalSubmitted > 0 && r.onTxsSubmitted != nil {
r.onTxsSubmitted()
}
}()

for {
txs, err := r.exec.GetTxs(r.ctx)
if err != nil {
return totalSubmitted > 0, fmt.Errorf("failed to get txs from executor: %w", err)
}
if len(txs) == 0 {
break
}

filtered := r.filterNewTxs(txs)
if len(filtered) == 0 {
break
}

n, err := r.submitFiltered(filtered)
if err != nil {
return totalSubmitted > 0, err
}
totalSubmitted += n
}
if len(txs) == 0 {
r.logger.Debug().Msg("no new txs")
return nil

if totalSubmitted > 0 {
r.logger.Debug().Int("total_txs", totalSubmitted).Msg("drained mempool")
}

var newTxs [][]byte
return totalSubmitted > 0, nil
}

func (r *Reaper) filterNewTxs(txs [][]byte) []pendingTx {
pending := make([]pendingTx, 0, len(txs))
for _, tx := range txs {
txHash := hashTx(tx)
if !r.cache.IsTxSeen(txHash) {
newTxs = append(newTxs, tx)
h := hashTx(tx)
if !r.cache.IsTxSeen(h) {
pending = append(pending, pendingTx{tx: tx, hash: h})
}
}
return pending
}

if len(newTxs) == 0 {
r.logger.Debug().Msg("no new txs to submit")
return nil
func (r *Reaper) submitFiltered(batch []pendingTx) (int, error) {
txs := make([][]byte, len(batch))
hashes := make([]string, len(batch))
for i, p := range batch {
txs[i] = p.tx
hashes[i] = p.hash
}

r.logger.Debug().Int("txCount", len(newTxs)).Msg("submitting txs to sequencer")

_, err = r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{
_, err := r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{
Id: []byte(r.chainID),
Batch: &coresequencer.Batch{Transactions: newTxs},
Batch: &coresequencer.Batch{Transactions: txs},
})
if err != nil {
return fmt.Errorf("failed to submit txs to sequencer: %w", err)
return 0, fmt.Errorf("failed to submit txs to sequencer: %w", err)
}

for _, tx := range newTxs {
txHash := hashTx(tx)
r.cache.SetTxSeen(txHash)
}

// Notify the executor that new transactions are available
if len(newTxs) > 0 {
r.logger.Debug().Msg("notifying executor of new transactions")
r.executor.NotifyNewTransactions()
}

r.logger.Debug().Msg("successfully submitted txs")
return nil
r.cache.SetTxsSeen(hashes)
return len(txs), nil
}

func hashTx(tx []byte) string {
Expand Down
Loading
Loading