Add Kafka offset-commit restart case for delivery verification #9

Merged
yusufozturk merged 2 commits into main from kafka-offset-commit-restart on Jun 10, 2026

@namles (Member) commented Jun 10, 2026

Introduces a new test case that verifies delivery-bound source acknowledgments persist across a Kafka offset-commit restart. The case checks that, after a graceful restart, the consumer resumes from its committed offsets without re-consuming messages, which enforces delivery limits and prevents over-delivery.

Summary by CodeRabbit

  • New Features
    • Added a kafka_offset_commit_restart test case type to validate message delivery behavior during offset-commit restart scenarios.
    • Added MaxOverDeliveryPct configuration parameter to set a non-negative tolerance threshold for duplicate re-delivery in correctness checks.

@coderabbitai (Bot) commented Jun 10, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 4c5cac91-3b8e-4250-a473-50d16e46050f

📥 Commits

Reviewing files that changed from the base of the PR and between 12b665c and 30f0a79.

📒 Files selected for processing (1)
  • internal/config/case.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/config/case.go

Walkthrough

This PR adds a new test case type kafka_offset_commit_restart that verifies Kafka consumer offset-commit behavior across graceful subject restarts, adds Correctness.MaxOverDeliveryPct with validation, and implements the full runner flow including setup, restart, evaluation, and result persistence.

Changes

Kafka Offset Commit Restart Correctness Verification

| Layer | File(s) | Summary |
| --- | --- | --- |
| MaxOverDeliveryPct configuration field | `internal/config/case.go` | `CorrectnessConfig` gains `MaxOverDeliveryPct` (`max_overdelivery_pct`), and `TestCase.Validate()` rejects negative values. |
| kafka_offset_commit_restart case type dispatch | `internal/runner/runner.go` | `Runner.Run` recognizes `kafka_offset_commit_restart` and routes execution to `runKafkaOffsetCommitRestart`. |
| Offset-commit restart test execution flow | `internal/runner/runner.go` | `runKafkaOffsetCommitRestart` sets up the environment, starts the generator, subject, and receiver, waits for full delivery, settles, gracefully restarts the subject, observes the receiver for re-consumption, computes loss and over-delivery percentages, validates them against `ExpectedLossPct` and `MaxOverDeliveryPct`, and persists the `RunResult`. |
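
The validation described in the first row can be sketched roughly as follows. This is a minimal illustration, not the code from `internal/config/case.go`: the struct layouts, the `Name` field, and the error text are assumptions, and only the names `CorrectnessConfig`, `TestCase`, `Validate`, `ExpectedLossPct`, and `MaxOverDeliveryPct` come from the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// CorrectnessConfig is a trimmed, hypothetical stand-in for the real struct.
// MaxOverDeliveryPct corresponds to the max_overdelivery_pct key in a case file.
type CorrectnessConfig struct {
	ExpectedLossPct    float64
	MaxOverDeliveryPct float64
}

// TestCase is reduced to the fields this sketch needs (Name is an assumption).
type TestCase struct {
	Name        string
	Correctness CorrectnessConfig
}

// errNegativeOverDelivery uses illustrative wording, not the project's message.
var errNegativeOverDelivery = errors.New("max_overdelivery_pct must be non-negative")

// Validate rejects a negative tolerance so a malformed case file fails
// during validation instead of later in the runner.
func (tc TestCase) Validate() error {
	if tc.Correctness.MaxOverDeliveryPct < 0 {
		return fmt.Errorf("case %q: %w", tc.Name, errNegativeOverDelivery)
	}
	return nil
}

func main() {
	bad := TestCase{Name: "restart", Correctness: CorrectnessConfig{MaxOverDeliveryPct: -1}}
	fmt.Println(bad.Validate())
}
```

Wrapping a sentinel error with `%w` lets callers match it with `errors.Is` while the message still names the offending case.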

Sequence Diagram

sequenceDiagram
  participant TestRunner
  participant Generator
  participant Subject
  participant Receiver
  participant MetricsStore
  
  TestRunner->>Generator: Start
  TestRunner->>Subject: Start
  TestRunner->>Receiver: Start
  Generator->>Receiver: Produce messages
  Receiver->>Receiver: Consume and track count
  Generator->>TestRunner: Complete (sent count)
  Receiver->>TestRunner: Reached full delivery count
  TestRunner->>TestRunner: Settle period
  TestRunner->>Subject: Stop gracefully
  TestRunner->>Subject: Restart
  Subject->>Receiver: Resume consumption
  Receiver->>TestRunner: Final count (detect re-consumption)
  TestRunner->>MetricsStore: Compute loss and over-delivery %
  TestRunner->>MetricsStore: Validate against thresholds
  TestRunner->>MetricsStore: Persist RunResult
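
The last steps of the diagram (compute percentages, then validate against thresholds) might look something like the sketch below. The formulas are an assumption, since the PR page does not show them: loss is taken as the shortfall relative to the sent count, over-delivery as the excess relative to the sent count, and both are compared with `<=`. The `Verdict` type and the `evaluate` function are hypothetical names.

```go
package main

import "fmt"

// Verdict holds both percentages and whether they fit the thresholds.
type Verdict struct {
	LossPct float64
	OverPct float64
	Pass    bool
}

// evaluate compares the receiver's final count with the generator's sent count.
// Assumed formulas (not confirmed by the PR):
//
//	loss = max(0, sent-received) / sent * 100
//	over = max(0, received-sent) / sent * 100
func evaluate(sent, received int64, expectedLossPct, maxOverDeliveryPct float64) (Verdict, error) {
	if sent <= 0 {
		return Verdict{}, fmt.Errorf("sent count must be positive, got %d", sent)
	}
	var v Verdict
	if received < sent {
		v.LossPct = float64(sent-received) / float64(sent) * 100
	}
	if received > sent {
		v.OverPct = float64(received-sent) / float64(sent) * 100
	}
	v.Pass = v.LossPct <= expectedLossPct && v.OverPct <= maxOverDeliveryPct
	return v, nil
}

func main() {
	// 1000 lines sent; after the restart the receiver reports 1020,
	// meaning 20 lines were consumed again.
	v, err := evaluate(1000, 1020, 0, 1)
	if err != nil {
		panic(err)
	}
	fmt.Printf("loss=%.1f%% over=%.1f%% pass=%v\n", v.LossPct, v.OverPct, v.Pass)
}
```

With a tolerance of 1%, the 20 re-consumed lines in this example exceed the limit, so the case would fail.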

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 I nibble configs, count each hop,
Restarting subjects—never stop.
Duplicates measured, thresholds met,
Kafka's tales in logs are set.
Hop, test, persist — and then I plot. 🥕

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately summarizes the main change: adding a new Kafka offset-commit restart test case for delivery verification, which aligns with the primary modifications across both files. |
| Docstring Coverage | ✅ Passed | No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai (Bot) left a comment

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/config/case.go`:
- Around line 526-533: The Case validation currently does not reject negative
MaxOverDeliveryPct, so add a validation check in the Case.Validate() method to
ensure MaxOverDeliveryPct is >= 0 and return an explicit validation error when
it's negative; reference the MaxOverDeliveryPct field on the Case struct and
update Validate() to produce a clear error message like "max_overdelivery_pct
must be non-negative" (or equivalent) so malformed case files fail fast during
parsing/validation rather than later in the runner.

In `@internal/runner/runner.go`:
- Around line 1767-1770: The check currently reads only tc.Generator.TotalLines
and aborts if it's 0; update the logic in the runner (the block referencing
tc.Generator.TotalLines) to account for plural generators or actual generator
output by computing the effective total lines across tc.Generators when
tc.Generator is not set or has TotalLines==0 (e.g., sum TotalLines from
tc.Generators or derive from provided generator outputs) and only error if the
computed total is <= 0; apply the same fix to the second occurrence around the
block that references the same fields (the block near the other check at lines
~1795-1800) so both paths accept cases using plural generators or non-singular
generator configurations.
- Around line 1802-1893: The runner currently uses LinesReceived
(rm.LinesReceived / recvMetrics.LinesReceived) to detect full delivery and
compute loss/over-delivery, which can false-pass when unique records are lost
but total deliveries match; update the logic to use UniqueLines for delivery and
stability checks (replace rm.LinesReceived checks in the "waiting for full
delivery" loop and the "observing for re-consumption" loop with rm.UniqueLines),
and compute lossPct/overPct using recvMetrics.UniqueLines (and consider
recvMetrics.Duplicates separately for reporting); also update printed messages
that show counts to reflect UniqueLines where appropriate (keep
LinesReceived/Duplicates available in logs but base correctness decisions on
UniqueLines).
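
The two runner findings above can be illustrated with a small sketch. It is not the code in `internal/runner/runner.go`: the types are hypothetical reductions that keep only the field names mentioned in the comments (`TotalLines`, `LinesReceived`, `UniqueLines`, `Duplicates`), and `effectiveTotalLines` and `lossPct` are made-up helper names. Summing `TotalLines` across plural generators is one of the options the comment suggests, chosen here as an assumption.

```go
package main

import "fmt"

// GeneratorConfig and ReceiverMetrics are hypothetical, trimmed types.
type GeneratorConfig struct{ TotalLines int64 }

type ReceiverMetrics struct {
	LinesReceived int64 // every delivery, duplicates included
	UniqueLines   int64 // distinct records seen
	Duplicates    int64 // deliveries beyond the first for a record
}

// effectiveTotalLines prefers the singular generator and falls back to the
// sum over the plural list when the singular one is unset or zero.
func effectiveTotalLines(single *GeneratorConfig, plural []GeneratorConfig) (int64, error) {
	if single != nil && single.TotalLines > 0 {
		return single.TotalLines, nil
	}
	var total int64
	for _, g := range plural {
		total += g.TotalLines
	}
	if total <= 0 {
		return 0, fmt.Errorf("no generator declares a positive TotalLines")
	}
	return total, nil
}

// lossPct returns the shortfall of delivered against sent, as a percentage.
// It assumes sent > 0, which effectiveTotalLines guarantees on success.
func lossPct(sent, delivered int64) float64 {
	if delivered >= sent {
		return 0
	}
	return float64(sent-delivered) / float64(sent) * 100
}

func main() {
	sent, err := effectiveTotalLines(nil, []GeneratorConfig{{TotalLines: 60}, {TotalLines: 40}})
	if err != nil {
		panic(err)
	}
	// 5 records lost and 5 others delivered twice: the totals still match.
	rm := ReceiverMetrics{LinesReceived: 100, UniqueLines: 95, Duplicates: 5}
	fmt.Printf("by LinesReceived: loss=%.1f%%\n", lossPct(sent, rm.LinesReceived))
	fmt.Printf("by UniqueLines: loss=%.1f%% duplicates=%d\n", lossPct(sent, rm.UniqueLines), rm.Duplicates)
}
```

The example shows the false pass the review warns about: judged by `LinesReceived` the run looks lossless, while `UniqueLines` exposes the five missing records.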

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e3e3d78f-297b-4266-8684-d8357d624dbc

📥 Commits

Reviewing files that changed from the base of the PR and between 4cdfcf9 and 12b665c.

📒 Files selected for processing (2)
  • internal/config/case.go
  • internal/runner/runner.go

Comment thread internal/config/case.go
Comment thread internal/runner/runner.go
Comment thread internal/runner/runner.go
@yusufozturk (Member) commented

@namles can you resolve the AI reviews? Thanks in advance!

@cloudflare-workers-and-pages (Bot) commented Jun 10, 2026

Deploying pipebench with Cloudflare Pages

Latest commit: 30f0a79
Status: ✅  Deploy successful!
Preview URL: https://d6619978.pipebench.pages.dev
Branch Preview URL: https://kafka-offset-commit-restart.pipebench.pages.dev

@yusufozturk merged commit c89f5b0 into main on Jun 10, 2026
4 checks passed
@yusufozturk deleted the kafka-offset-commit-restart branch on June 10, 2026 22:34

2 participants