
refactor: reaper to drain mempool#3236

Open
julienrbrt wants to merge 12 commits into main from julien/reaper

Conversation


@julienrbrt julienrbrt commented Apr 9, 2026

Overview

Improve the reaper by effectively removing the polling interval during transaction bursts.

Summary by CodeRabbit

  • Bug Fixes

    • Components (executor, pruner, submitter, syncer) now fail fast when started twice to avoid duplicate starts.
  • Improvements

    • Reaper redesigned to drain the mempool with improved error recovery, exponential backoff, and optional notification callback.
    • Cache optimized for bulk marking of seen transactions and periodic cleanup.
  • Tests

    • Updated tests to cover the new reaper flow and component start behavior.
  • Documentation

    • ADR and changelog updated to reflect reaping and aggregation notes.


coderabbitai bot commented Apr 9, 2026

Warning

Rate limit exceeded

@julienrbrt has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 1 minute and 20 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 1 minute and 20 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 73c65065-9937-475f-9325-fc40fcb52bbb

📥 Commits

Reviewing files that changed from the base of the PR and between 70c8b89 and 6c44381.

📒 Files selected for processing (1)
  • block/internal/reaping/reaper.go
📝 Walkthrough

Walkthrough

Reaper was decoupled from Executor and now uses an optional onTxsSubmitted callback; its loop changed from ticker-driven SubmitTxs() to drain-and-backoff mempool submission. CacheManager gained a bulk SetTxsSeen method. Multiple components (Executor, Pruner, Submitter, Syncer) now guard against double-start. Tests and docs updated accordingly.

Changes

Cohort / File(s) Summary
Reaper & tests
block/internal/reaping/reaper.go, block/internal/reaping/reaper_test.go
Removed Executor dependency, added onTxsSubmitted func() callback, replaced ticker SubmitTxs flow with drainMempool() + exponential backoff; added helpers (filterNewTxs, submitFiltered, wait); tests reworked to use new flow and callback.
Cache Manager
block/internal/cache/manager.go
Added SetTxsSeen(hashes []string) to interface and implementation to mark txs seen in bulk; removed conditional debug log in CleanupOldTxs.
Component startup guards
block/internal/executing/executor.go, block/internal/pruner/pruner.go, block/internal/submitting/submitter.go, block/internal/syncing/syncer.go
Added early-return checks for cancel != nil to prevent double-starts, returning an error when already started.
Component wiring
block/components.go
newAggregatorComponents now passes executor.NotifyNewTransactions (callback) into Reaper instead of the Executor instance.
Submitter tests
block/internal/submitting/submitter_test.go
Removed manual assignment of test context/cancel into Submitter before initializeDAIncludedHeight.
Documentation / Changelog
docs/adr/adr-021-lazy-aggregation.md, CHANGELOG.md
Formatting/indentation tweaks in ADR examples; added CHANGELOG entry noting reaper improvements.

Sequence Diagrams

sequenceDiagram
    participant Reaper
    participant Mempool
    participant Cache
    participant Sequencer
    participant Callback

    loop drainMempool (repeat, backoff on error)
        Reaper->>Mempool: GetTxs() batch
        Mempool-->>Reaper: transactions
        Reaper->>Cache: filter unseen txs
        Cache-->>Reaper: unseen transactions + hashes

        alt unseen txs exist
            Reaper->>Sequencer: submitFiltered() batch
            alt success
                Sequencer-->>Reaper: submitted count
                Reaper->>Cache: SetTxsSeen(hashes) (bulk)
                Reaper->>Callback: onTxsSubmitted()
                Callback-->>Reaper: invoked
            else error
                Sequencer-->>Reaper: error
                Note over Reaper: apply exponential backoff and wait
            end
        else no unseen txs
            Note over Reaper: continue draining or wait for cleanup interval
        end
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~70 minutes

Suggested reviewers

  • tuxcanfly
  • tzdybal
  • gupadhyaya
  • randygrok

Poem

🐰 The Reaper hops free, no Executor chain,
Drains mempool rounds, then rests through the rain.
Cache marks a crowd with one gentle sweep,
Callbacks ring out when submissions leap.
Start guards stand watch — now everything sleeps.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Description check ⚠️ Warning: The description provides a high-level goal but lacks the implementation details, context, and issue references required by the template. Resolution: add implementation details explaining why the interval-based approach was replaced and the benefits of continuous mempool draining, and link any related issues (e.g., PR #3236 or a GitHub issue).
✅ Passed checks (2 passed)
  • Title check ✅ Passed: The title accurately reflects the primary refactoring change: the reaper component now drains the mempool continuously instead of using an interval-based approach.
  • Docstring Coverage ✅ Passed: No functions found in the changed files to evaluate docstring coverage; skipping the check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch julien/reaper

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.


claude bot commented Apr 9, 2026

Claude finished @julienrbrt's task in 2m 21s.


Code Review

  • Gather context (PR description, changed files list)
  • Read repository guidelines (AGENTS.md)
  • Read all changed files
  • Analyze concurrency, error handling, architecture
  • Post comprehensive review

Overall the redesign is a clear improvement — decoupling the Reaper from *executing.Executor via a callback is the right direction, and the drain-and-Gosched loop eliminates the fixed-ticker latency during bursts. A few correctness issues need attention before merge.


Critical / Major

1. Lifecycle rollback missing on failed startup (executor.go, submitter.go)

e.cancel (and s.cancel in the submitter) is set before initializeState() completes. If initialization fails, cancel != nil for all future calls to Start(), so the instance appears "already started" forever — turning any transient boot error into a permanently dead component.

```go
// block/internal/executing/executor.go:165-174
func (e *Executor) Start(ctx context.Context) error {
    if e.cancel != nil {
        return errors.New("executor already started")
    }
    e.ctx, e.cancel = context.WithCancel(ctx)   // ← set here
    if err := e.initializeState(); err != nil { // ← fails here
        return fmt.Errorf("failed to initialize state: %w", err)
        // cancel is now set; the next Start() returns "already started"
    }
    ...
}
```

Fix: defer a rollback when initialization fails.

```go
func (e *Executor) Start(ctx context.Context) (err error) {
    if e.cancel != nil {
        return errors.New("executor already started")
    }
    e.ctx, e.cancel = context.WithCancel(ctx)
    defer func() {
        if err != nil {
            e.cancel()
            e.ctx, e.cancel = nil, nil
        }
    }()
    if err = e.initializeState(); err != nil {
        return fmt.Errorf("failed to initialize state: %w", err)
    }
    ...
}
```

The same pattern applies to submitter.go:121-128, where s.cancel is set before initializeDAIncludedHeight, which can also fail.


2. Syncer restart is unsafe after Stop() (syncer.go)

Stop() closes s.heightInCh and resets s.cancel = nil. This means the new guard if s.cancel != nil passes after a stop, but processLoop and any sender will operate on a closed channel — either receiving a zero-value indefinitely or panicking on send.

```go
// Stop():
close(s.heightInCh) // closed here
s.cancel = nil      // guard reset here → Start() guard now passes

// A subsequent Start() succeeds; processLoop reads from the closed s.heightInCh → spin
```

The other components (Executor, Reaper, Pruner, Submitter) do not reset cancel in Stop(), making them intentionally single-use. Syncer should be consistent. Either:

  • Option A (preferred): Remove s.cancel = nil from Stop() to make Syncer single-use too.
  • Option B: Reinitialize s.heightInCh = make(chan ...) in Start() before the guard check.



Moderate

3. Perpetual drain starves the cleanup ticker (reaper.go:84-125)

drainMempool() loops until GetTxs returns empty — if the mempool grows continuously (constant influx of unique txs), drainMempool() never returns, and the cleanupTicker in reaperLoop can never fire. The tx cache could grow unbounded in this scenario.

Consider capping the number of drain iterations per cycle, or moving the cleanup check inside drainMempool:

```go
const maxDrainRounds = 100 // example cap

func (r *Reaper) drainMempool() (bool, error) {
    for i := 0; i < maxDrainRounds; i++ {
        ...
    }
    ...
}
```

4. Error filter misses context.DeadlineExceeded (reaper.go:93)

```go
if err != nil && !errors.Is(err, context.Canceled) {
```

A parent context with a deadline would yield context.DeadlineExceeded, which is not filtered. This would trigger backoff + log spam on normal deadline-driven shutdown. Use r.ctx.Err() != nil to cover both:

```go
if err != nil && r.ctx.Err() == nil {
    // genuine error, apply backoff
}
```



Minor / Nitpicks

5. CleanupInterval constant should be unexported (reaper.go:24)

```go
CleanupInterval = 1 * time.Hour // exported but only used internally
```

CleanupInterval is exported but has no external callers. Use cleanupInterval unless external configuration is planned.

6. runtime.Gosched() needs a comment (reaper.go:114)

```go
if submitted {
    runtime.Gosched()
    continue
}
```

It's not obvious why Gosched() is used here rather than just continue. A one-line comment ("yield to allow executor/submission goroutines to react before we fetch the next batch") would help future readers understand the intent.

7. Test TestReaper_HashComputedOnce is misnamed (reaper_test.go:192)

The test only verifies that the hash is present in the cache after draining. It does not verify that the hash function was called once (vs. multiple times). Rename to TestReaper_CachesHashAfterSubmission or add actual call-count verification via a hash-function hook.
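The call-count approach could look like the sketch below. The counting hook is hypothetical; the real test would need to inject it into the reaper's actual hash function rather than a standalone map loop.

```go
package main

import "fmt"

// countingHasher wraps a hash function and records how often it runs,
// so a test can assert "hashed exactly once per tx". Illustrative only.
type countingHasher struct {
	calls int
}

func (h *countingHasher) hash(tx []byte) string {
	h.calls++
	return fmt.Sprintf("%x", tx) // stand-in for the real hash function
}

func main() {
	h := &countingHasher{}
	txs := [][]byte{[]byte("a"), []byte("b")}
	seen := make(map[string]struct{})
	for _, tx := range txs {
		seen[h.hash(tx)] = struct{}{} // each tx hashed exactly once
	}
	fmt.Println(h.calls, len(seen))
}
```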

8. ADR references outdated callback owner (docs/adr/adr-021-lazy-aggregation.md:92)

The ADR still says Manager.NotifyNewTransactions() but the implementation now wires executor.NotifyNewTransactions (see block/components.go:283). Update the ADR to reflect the actual owner.


Positive Observations

  • Decoupling Reaper from *executing.Executor via a callback is the right abstraction — it breaks a direct dependency on a concrete type and makes the Reaper independently testable.
  • The drainMempool → filterNewTxs → submitFiltered decomposition is clean, and each function has a single responsibility.
  • SetTxsSeen([]string) bulk cache update with a single time.Now() for the batch is correct and avoids per-tx syscall overhead.
  • The exponential backoff capped at MaxBackoffInterval is well-designed.
  • The wait() helper cleanly unifies context cancellation, cleanup ticker, and idle timer into one blocking select.
  • Test coverage for the new flow is good — nil callback, sequencer errors, multi-round draining are all exercised.


github-actions bot commented Apr 9, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build: ✅ passed | Format: ⏩ skipped | Lint: ✅ passed | Breaking: ✅ passed | Updated (UTC): Apr 9, 2026, 7:45 PM

github-actions bot commented Apr 9, 2026

PR Preview Action v1.8.1


🚀 View preview at
https://evstack.github.io/docs-preview/pr-3236/

Built to branch main at 2026-04-09 19:46 UTC.
Preview will be ready when the GitHub Pages deployment is complete.


codecov bot commented Apr 9, 2026

Codecov Report

❌ Patch coverage is 56.70103% with 42 lines in your changes missing coverage. Please review.
✅ Project coverage is 61.82%. Comparing base (3abebca) to head (6c44381).

Files with missing lines | Patch % | Lines
block/internal/reaping/reaper.go | 65.06% | 24 missing, 5 partials ⚠️
block/internal/cache/manager.go | 0.00% | 5 missing ⚠️
block/internal/executing/executor.go | 0.00% | 2 missing ⚠️
block/internal/pruner/pruner.go | 0.00% | 2 missing ⚠️
block/internal/submitting/submitter.go | 0.00% | 1 missing, 1 partial ⚠️
block/internal/syncing/syncer.go | 0.00% | 2 missing ⚠️
Additional details and impacted files
```
@@            Coverage Diff             @@
##             main    #3236      +/-   ##
==========================================
+ Coverage   61.67%   61.82%   +0.14%
==========================================
  Files         120      120
  Lines       12635    12662      +27
==========================================
+ Hits         7793     7828      +35
+ Misses       3968     3957      -11
- Partials      874      877       +3
```
Flag: combined | Coverage Δ: 61.82% <56.70%> (+0.14%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Quoted diff context (block/internal/reaping/reaper.go):

```go
    Int("consecutive_failures", consecutiveFailures).
    Dur("backoff", backoff).
    Msg("reaper error, backing off")
if r.wait(backoff, nil) {
```
Member Author


This is the main change: if we get txs we retry immediately; if we do not, we wait for the interval.

@julienrbrt julienrbrt marked this pull request as ready for review April 9, 2026 19:16
@julienrbrt julienrbrt requested review from chatton and tac0turtle April 9, 2026 19:21

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
block/internal/executing/executor.go (1)

166-174: ⚠️ Potential issue | 🟠 Major

Rollback the lifecycle state when startup fails.

e.cancel is set before initializeState(). If initialization fails, no worker is running, but every later Start() now returns "executor already started". That turns transient boot failures into a permanently unusable executor instance.

Suggested rollback pattern
```diff
-func (e *Executor) Start(ctx context.Context) error {
+func (e *Executor) Start(ctx context.Context) (err error) {
 	if e.cancel != nil {
 		return errors.New("executor already started")
 	}
 	e.ctx, e.cancel = context.WithCancel(ctx)
+	defer func() {
+		if err != nil {
+			e.cancel()
+			e.ctx = nil
+			e.cancel = nil
+		}
+	}()

 	// Initialize state
-	if err := e.initializeState(); err != nil {
+	if err = e.initializeState(); err != nil {
 		return fmt.Errorf("failed to initialize state: %w", err)
 	}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/executing/executor.go` around lines 166 - 174, The code sets
e.cancel before calling initializeState(), so if initializeState() fails the
executor remains marked as started (e.cancel != nil) and future Start() calls
fail; update Start() to defer a rollback that clears e.ctx and e.cancel when
initialization fails: after setting e.ctx, e.cancel = context.WithCancel(ctx),
add a defer function (or conditional cleanup on error) that calls e.cancel() and
sets e.ctx, e.cancel back to nil if initializeState() returns an error, ensuring
the executor's lifecycle fields (e.ctx and e.cancel) are only left set on
successful initialization in initializeState().
🧹 Nitpick comments (1)
block/internal/reaping/reaper_test.go (1)

192-208: Test name doesn't match what's being verified.

The test name TestReaper_HashComputedOnce implies verification that hash computation occurs only once, but the test only checks that the expected hash exists in the cache after draining. This doesn't actually prove the hash was computed once versus multiple times.

Consider either:

  1. Renaming to something like TestReaper_CacheLookupByHash to reflect what's actually tested, or
  2. If single-computation is the intent, instrumenting the hash function to verify call count.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/reaping/reaper_test.go` around lines 192 - 208, The test
TestReaper_HashComputedOnce is misnamed because it only asserts the hash is
present in the cache after calling env.reaper.drainMempool() and does not verify
the hash function was invoked exactly once; either rename the test to reflect
its behavior (e.g., TestReaper_CacheLookupByHash) or change the test to actually
assert single computation by instrumenting hashTx (or injecting a hash function
into reaper) and counting calls around env.reaper.drainMempool(), then assert
the call count equals one while still checking env.cache.IsTxSeen(expectedHash).

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: aa4a7354-a77a-4989-a619-2024cd382a3b

📥 Commits

Reviewing files that changed from the base of the PR and between 3abebca and d28180e.

📒 Files selected for processing (10)
  • block/components.go
  • block/internal/cache/manager.go
  • block/internal/executing/executor.go
  • block/internal/pruner/pruner.go
  • block/internal/reaping/reaper.go
  • block/internal/reaping/reaper_test.go
  • block/internal/submitting/submitter.go
  • block/internal/submitting/submitter_test.go
  • block/internal/syncing/syncer.go
  • docs/adr/adr-021-lazy-aggregation.md
💤 Files with no reviewable changes (1)
  • block/internal/submitting/submitter_test.go

Comment on lines +166 to +168
```go
if s.cancel != nil {
	return errors.New("syncer already started")
}
```

⚠️ Potential issue | 🔴 Critical

Restart is still unsafe after Stop().

Stop() clears s.cancel but also closes s.heightInCh, so a second Start() passes this guard and reuses a closed channel. That can make processLoop spin on an always-ready receive and any later send to heightInCh panic. Either keep Syncer single-use by not clearing s.cancel, or fully recreate restart-sensitive state before allowing another start.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/syncer.go` around lines 166 - 168, The Start/Stop
lifecycle is unsafe because Stop() clears s.cancel but closes s.heightInCh, so a
subsequent Start() can reuse a closed channel; fix by making Start fully
reinitialize restart-sensitive state instead of relying on s.cancel: in Start()
recreate channels (e.g., s.heightInCh, any other closed/internal channels) and
reset any state used by processLoop before clearing/setting s.cancel, or
alternatively make Stop() leave s.cancel set to prevent restarts; update the
guard around s.cancel and the Stop() implementation to either preserve s.cancel
for single-use Syncer or ensure all channels and state are recreated in Start()
(refer to Start, Stop, s.cancel, s.heightInCh, and processLoop).

A dedicated lazy aggregation loop has been implemented with dual timer mechanisms. The `lazyTimer` ensures blocks are produced at regular intervals even during network inactivity, while the `blockTimer` handles normal block production when transactions are available. Transaction notifications from the `Reaper` to the `Manager` are now handled via the `txNotifyCh` channel: when the `Reaper` detects new transactions, it calls `Manager.NotifyNewTransactions()`, which performs a non-blocking signal on this channel. See the tests in `block/lazy_aggregation_test.go` for verification of this behavior.

⚠️ Potential issue | 🟡 Minor

The callback owner is outdated in the ADR.

The implementation now wires reaping.NewReaper(..., executor.NotifyNewTransactions) in block/components.go, so this should reference the executor callback rather than Manager.NotifyNewTransactions(). As written, the design doc no longer matches the code path.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/adr/adr-021-lazy-aggregation.md` at line 92, The ADR text references
Manager.NotifyNewTransactions but the code now wires reaping.NewReaper(...,
executor.NotifyNewTransactions), so update the document to reflect the actual
callback owner: replace references to Manager.NotifyNewTransactions with
executor.NotifyNewTransactions and explain that the Reaper calls the executor
callback (executor.NotifyNewTransactions) which signals txNotifyCh; mention
reaping.NewReaper and txNotifyCh to make the connection explicit.
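The notification path discussed in this thread — a non-blocking signal on txNotifyCh when the Reaper submits new transactions — can be sketched as follows. The types here are illustrative, not the repository's actual wiring.

```go
package main

import "fmt"

// notifier sketches the non-blocking notification: a one-slot buffered
// channel signaled via select/default, so the caller never blocks when
// a notification is already pending.
type notifier struct {
	txNotifyCh chan struct{}
}

func newNotifier() *notifier {
	return &notifier{txNotifyCh: make(chan struct{}, 1)}
}

func (n *notifier) NotifyNewTransactions() {
	select {
	case n.txNotifyCh <- struct{}{}:
	default: // a signal is already pending; dropping this one is safe
	}
}

func main() {
	n := newNotifier()
	n.NotifyNewTransactions()
	n.NotifyNewTransactions() // would block here without the default case
	fmt.Println(len(n.txNotifyCh))
}
```

A consumer loop drains the channel in its select alongside its timers, so repeated bursts collapse into a single wakeup.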
