Add Kafka (Redpanda) support: consume, round-trip, batch, and crash/restart cases#6
Conversation
Generator gains a `kafka` producer mode (franz-go) with per-message vs JSON-array packing (kafka_batch); orchestrator renders a Redpanda broker + one-shot topic-init and wires generator/subject depends_on; new kafka_performance / kafka_correctness types reuse the perf/correctness verdict flow. Example cases for vmetric/vector/fluent-bit/logstash, and the subject registry bumps the director to 2.0.2. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
kafka_restart_correctness (SIGTERM) and kafka_crash_correctness (SIGKILL) buffer with the receiver down, restart the subject, and verify every record is delivered. kafka_inflight_crash_correctness keeps the receiver up and SIGKILLs the subject mid-delivery to exercise the at-least-once over-delivery window. All assert no loss; duplicates are reported, not failed. Adds the runner dispatch + runKafkaInflightCrash, IsKafkaType() + validation, and drops the subject->receiver depends_on so the receiver-down phase can run. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- tcp_to_kafka_to_tcp_correctness / _performance: TCP -> Kafka target -> Kafka device -> TCP round-trip inside one director with two if-isolated routes. The device-static `if` on each route confines a device to a single target; without it vmetric fans every device to every target and kafka-in loops back into kafka-out (infinite Kafka feedback loop). - kafka_batch_to_tcp_correctness: generator packs 10 JSON objects per message as a JSON array (kafka_batch: 10); the device splits them back into events (split_mode: json_array). Tests device array parsing. - tcp_to_kafka_batch_to_tcp_correctness: target batches up to 100 events per message (batch_mode: json_array, max_events: 100); the device splits them. Tests target batching + device splitting together. - orchestrator: gate the generator's depends_on on generator mode — TCP generators wait on the subject, Kafka generators wait on redpanda-init. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
A single shared client batches thin across partitions and caps produce throughput (~1.6M rec/s across 4 partitions). Giving each of the cfg.Connections workers its own client plus a deeper MaxBufferedRecords buffer scales produce to ~8.4M rec/s, so the subject's consumer — not the generator — becomes the bottleneck in kafka_to_tcp_performance. Both Kafka correctness cases still pass 2000/2000 with the change. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Bump to partitions=8 + device workers=8 + Redpanda smp=6/6G + 8 generator connections. Each consumer worker joins the group as its own member and owns one partition, so consumption runs in parallel. With the per-connection generator clients and pre-sized VMFL buffers this reaches ~3.35M EPS vs ~1.9M on a single partition. Redpanda needs >=1 GB/shard (smp=8/2G segfaults). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- kafka_batch_to_tcp_correctness: add vector + logstash. Both split a batched JSON-array message into one event per element (vector via json decoding, logstash via codec=>json), so counts reconcile 1:1. fluent-bit is omitted — its kafka input cannot split array messages. - tcp_to_kafka_to_tcp_correctness / _performance (round-trip): add vector, fluent-bit, logstash. Each produces to AND consumes from Kafka in one pipeline without crossing the legs — vector by explicit `inputs`, fluent-bit by Tag/Match, logstash by tag conditionals (avoiding the fan-out loop). Correctness is paced (rate: 200) so slower JVM subjects round-trip losslessly; the performance case floods (loss informational). - tcp_to_kafka_batch_to_tcp_correctness stays vmetric-only and documents why: only vmetric's kafka target packs json_array on produce. Verified: array-split + round-trip correctness are 2000/2000, 0% loss for every listed subject; round-trip perf produces EPS for all four. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…zir, telegraf
Expand the Kafka consume cases from 4 to 9 subjects. Each new subject consumes
the topic and forwards to the TCP receiver with exact 1:1 counts in the
correctness run (verified 2000/2000, 0% loss):
- otel-collector / bindplane-agent: kafka receiver (logs.topics + encoding raw)
-> syslog exporter (rfc3164/tcp).
- grafana-alloy: otelcol.receiver.kafka { logs { topics encoding } } -> syslog.
- tenzir: from_kafka with broker options -> write_ndjson -> save_tcp.
- telegraf: kafka_consumer (version 2.8.0 — sarama default is too old for
Redpanda and EOFs) -> socket_writer.
Excluded (documented in the subjects block): axosyslog (pinned image exposes no
kafka source driver), cribl-stream (re-emits ~26 duplicate events/run —
at-least-once on the tcpjson output), filebeat (no plain-TCP output;
elasticsearch-only), fluentd/nxlog/rotel (no Kafka in their pinned images).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
cribl sustains ~940K EPS consuming Kafka -> TCP, second only to vmetric among the non-trivial subjects. It's listed in the performance case but NOT in kafka_to_tcp_correctness because it re-emits ~26 duplicate events per run (at-least-once flush on the tcpjson output) — harmless for a throughput measurement, but it fails an exact-count correctness check. Kafka source via a kafka-typed input routed to the existing tcpjson output. auth/ stays gitignored and is regenerated on boot (same as the tcp cribl case). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Every vmetric otel device enabled BOTH otlp_grpc_status and otlp_http_status, so the device bound 4317 (gRPC) and 4318 (HTTP) at once. Combined with the generator picking the transport (GENERATOR_OTLP_TRANSPORT) and the receiver dual-binding both ports, a gRPC<->HTTP mismatch could never surface: whatever was sent always had a listener, and the correctness check only counts records. Now each case serves only the transport under test: - otlp_grpc_to_otlp_grpc_* -> otlp_http_status: false (gRPC only) - otlp_to_otlp_generic_* -> otlp_grpc_status: false (HTTP only) - otlp_pipeline_to_otlp_* -> otlp_grpc_status: false (HTTP only) A regression that served the wrong transport now fails (nothing bound on the other port -> 100% loss) instead of passing silently. Verified all three correctness cases still pass 500/500 over their intended transport. Note: this isolates INGRESS (generator->device). The egress leg (vmetric target->receiver) still can't catch a mismatch because the bench receiver dual-binds 4317+4318 by design. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
WalkthroughThis PR adds comprehensive Kafka/Redpanda support to the PipeBench framework, enabling producer and consumer benchmarking. It introduces Kafka configuration primitives, a franz-go-based generator, Docker compose orchestration with Redpanda services, and seven new test case families covering correctness, performance, batching, and crash-recovery scenarios across ten subject implementations. ChangesKafka framework integration
Kafka test cases and configurations
OTLP configuration updates
🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@cases/kafka_to_tcp_performance/configs/bindplane-agent.yaml`:
- Around line 2-4: The inline header comment incorrectly says `encoding: text`;
update it to reflect the actual kafka receiver configuration by changing the
comment to indicate `encoding: "raw"` (i.e., refer to the kafka receiver
encoding setting `encoding: "raw"` used in the bindplane-agent kafka receiver)
so the comment matches the active config and prevents confusion during
troubleshooting.
In `@internal/orchestrator/docker.go`:
- Around line 171-175: The docker compose template currently emits a depends_on
reference to redpanda-init whenever .GenMode == "kafka", but redpanda-init is
only rendered when .KafkaEnabled is true, causing a broken dependency; update
the template to wrap the depends_on redpanda-init block with the same
KafkaEnabled guard (i.e., only emit the depends_on: redpanda-init: condition:
service_completed_successfully when .KafkaEnabled is true) so references to
redpanda-init are emitted only when the redpanda-init service is rendered (also
apply the same guard to the other redpanda-init usages around the section
covering the existing 348-392 range).
- Around line 220-223: The plural generator services are missing the Kafka env
vars; update the template that renders each generator-<id> (the loop that
creates per-generator services) to include GENERATOR_KAFKA_TOPIC and
GENERATOR_KAFKA_BATCH when .KafkaEnabled is true, using .KafkaTopic and
.GenKafkaBatch (same keys currently added only to the singular generator block).
Locate the portion that emits generator-<id> service blocks (the range/loop for
generators) and copy the conditional env entries ({{- if .KafkaEnabled }}
GENERATOR_KAFKA_TOPIC: "{{ .KafkaTopic }}" GENERATOR_KAFKA_BATCH: "{{
.GenKafkaBatch }}" {{- end }}) into that per-generator template so each
generator-<id> inherits the case-configured Kafka values.
In `@internal/runner/runner.go`:
- Around line 1632-1655: The function runKafkaInflightCrash constructs a
results.RunResult (variable result) but never persists it; add a call to
r.store.Save(...) to save result to the results store before returning, mirror
the same error handling and context usage used in
runPersistenceCorrectness/runPersistenceShutdownCorrectness (i.e., call
r.store.Save with the same ctx/result and return any save error), and ensure
FailReason is preserved when saving.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 404142e5-fe46-416f-91bd-7da377a249a7
⛔ Files ignored due to path filters (1)
containers/generator/go.sumis excluded by!**/*.sum
📒 Files selected for processing (64)
README.mdcases/kafka_batch_to_tcp_correctness/case.yamlcases/kafka_batch_to_tcp_correctness/configs/logstash.confcases/kafka_batch_to_tcp_correctness/configs/vector.tomlcases/kafka_batch_to_tcp_correctness/configs/vmetric.ymlcases/kafka_crash_correctness/case.yamlcases/kafka_crash_correctness/configs/vmetric.ymlcases/kafka_inflight_crash_correctness/case.yamlcases/kafka_inflight_crash_correctness/configs/vmetric.ymlcases/kafka_restart_correctness/case.yamlcases/kafka_restart_correctness/configs/vmetric.ymlcases/kafka_to_tcp_correctness/case.yamlcases/kafka_to_tcp_correctness/configs/bindplane-agent.yamlcases/kafka_to_tcp_correctness/configs/fluent-bit.confcases/kafka_to_tcp_correctness/configs/grafana-alloy.alloycases/kafka_to_tcp_correctness/configs/logstash.confcases/kafka_to_tcp_correctness/configs/otel-collector.yamlcases/kafka_to_tcp_correctness/configs/telegraf.confcases/kafka_to_tcp_correctness/configs/tenzir.yamlcases/kafka_to_tcp_correctness/configs/vector.tomlcases/kafka_to_tcp_correctness/configs/vmetric.ymlcases/kafka_to_tcp_performance/case.yamlcases/kafka_to_tcp_performance/configs/bindplane-agent.yamlcases/kafka_to_tcp_performance/configs/cribl-stream/cribl.initedcases/kafka_to_tcp_performance/configs/cribl-stream/cribl.ymlcases/kafka_to_tcp_performance/configs/cribl-stream/inputs.ymlcases/kafka_to_tcp_performance/configs/cribl-stream/messages.ymlcases/kafka_to_tcp_performance/configs/cribl-stream/outputs.ymlcases/kafka_to_tcp_performance/configs/cribl-stream/pipelines/route.ymlcases/kafka_to_tcp_performance/configs/fluent-bit.confcases/kafka_to_tcp_performance/configs/grafana-alloy.alloycases/kafka_to_tcp_performance/configs/logstash.confcases/kafka_to_tcp_performance/configs/otel-collector.yamlcases/kafka_to_tcp_performance/configs/telegraf.confcases/kafka_to_tcp_performance/configs/tenzir.yamlcases/kafka_to_tcp_performance/configs/vector.tomlcases/kafka_to_tcp_performance/configs/vmetric.ymlcases/otlp_grpc_to_otlp_grpc_correctness/configs/vmetric.ymlcases/otlp_grpc_to_otlp_grpc_performance/configs/vmetric.ymlcases/otlp_pipeline_to_otlp_correctness/configs/vmetric.ymlcases/otlp_pipeline_to_otlp_performance/configs/vmetric.ymlcases/otlp_to_otlp_generic_correctness/configs/vmetric.ymlcases/otlp_to_otlp_generic_performance/configs/vmetric.ymlcases/tcp_to_kafka_batch_to_tcp_correctness/case.yamlcases/tcp_to_kafka_batch_to_tcp_correctness/configs/vmetric.ymlcases/tcp_to_kafka_to_tcp_correctness/case.yamlcases/tcp_to_kafka_to_tcp_correctness/configs/fluent-bit.confcases/tcp_to_kafka_to_tcp_correctness/configs/logstash.confcases/tcp_to_kafka_to_tcp_correctness/configs/vector.tomlcases/tcp_to_kafka_to_tcp_correctness/configs/vmetric.ymlcases/tcp_to_kafka_to_tcp_performance/case.yamlcases/tcp_to_kafka_to_tcp_performance/configs/fluent-bit.confcases/tcp_to_kafka_to_tcp_performance/configs/logstash.confcases/tcp_to_kafka_to_tcp_performance/configs/vector.tomlcases/tcp_to_kafka_to_tcp_performance/configs/vmetric.ymlcmd/harness/main.gocontainers/generator/go.modcontainers/generator/kafka.gocontainers/generator/main.gointernal/config/case.gointernal/config/subject.gointernal/orchestrator/compose_render_test.gointernal/orchestrator/docker.gointernal/runner/runner.go
… result save - bindplane-agent.yaml (kafka_to_tcp correctness + performance): header comment said "encoding: text" but the kafka receiver uses encoding: "raw". Corrected so the comment matches the active config. - orchestrator/docker.go: the singular generator's `depends_on: redpanda-init` was gated on GenMode=="kafka" alone, while redpanda-init only renders when KafkaEnabled. Now gated on `and (eq .GenMode "kafka") .KafkaEnabled`, matching the subject's already-guarded dep — a dep never references an unrendered service. No output change for any current case (mode=kafka always pairs with a kafka block); closes a latent compose-parse failure for a misconfigured case. - runner/runKafkaInflightCrash: built the RunResult but never called r.store.Save, so the in-flight crash result was never persisted (Run's contract is to return the *persisted* result). Added Save mirroring the other persistence/crash runners. Verified the result now lands in the store and the case still passes 20000/20000. Skipped (not currently valid): the plural generator-<id> template block lacks the Kafka env vars, but no case uses plural `generators:`, so that block is never rendered — fixing it would be speculative dead-path maintenance. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Deploying pipebench with
|
| Latest commit: |
bb6fbb2
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://dbcafad1.pipebench.pages.dev |
| Branch Preview URL: | https://kafka-support.pipebench.pages.dev |
Summary
Adds first-class Kafka support to the bench. The generator can now produce to
Kafka, the orchestrator stands up a Redpanda broker, and a new test type
exercises the consumer path through the perf/correctness verdict flow. On top
of that come nine new cases covering straight consume, full round-trip,
batch packing, and crash/restart resilience — run against up to nine subjects.
Harness changes
kafka_performance/kafka_correctness), reusing the existing perf/correctness verdicts.IsKafkaType()+ validation in the runner.kafkaproducer mode (franz-go) with per-message vsJSON-array packing (
kafka_batch). One franz-go client per connection plus adeeper buffer scales produce from ~1.6M to ~8.4M rec/s, so the subject's
consumer — not the generator — is the bottleneck in the perf case.
and gates generator/subject
depends_onon the generator mode (TCP waits onthe subject; Kafka waits on
redpanda-init).runKafkaInflightCrash.vmetric/director2.0.1 → 2.0.2.New cases
Consume path
kafka_to_tcp_correctness— Kafka → TCP, exact 1:1 counts, 9 subjects.kafka_to_tcp_performance— 8-partition parallel consume (8 device workers,8 generator connections, Redpanda smp=6/6G), ~3.35M EPS vs ~1.9M single-partition.
Round-trip (TCP → Kafka target → Kafka device → TCP, two
if-isolated routesto avoid the Kafka feedback loop)
tcp_to_kafka_to_tcp_correctness(paced, lossless) /tcp_to_kafka_to_tcp_performance(flood, loss informational).Batch mode
kafka_batch_to_tcp_correctness— generator packs 10 objects/message as a JSONarray; device splits them back (
split_mode: json_array).tcp_to_kafka_batch_to_tcp_correctness— target batches up to 100 events/message;device splits. vmetric-only (only its Kafka target packs
json_array).Resilience (all assert no loss; duplicates reported, not failed)
kafka_restart_correctness(SIGTERM) — buffer with receiver down, restart, verify.kafka_crash_correctness(SIGKILL) — same, hard kill.kafka_inflight_crash_correctness— SIGKILL mid-delivery to exercise theat-least-once over-delivery window.
Subjects
kafka_to_tcp_correctnessruns 9 subjects (verified 2000/2000, 0% loss):vmetric, vector, fluent-bit, logstash, otel-collector, bindplane-agent,
grafana-alloy, tenzir, telegraf. cribl-stream is added to the perf case only
(re-emits ~26 dup events/run — fine for throughput, fails exact-count).
Exclusions (axosyslog, filebeat, fluentd, nxlog, rotel) are documented inline.
Also included
under test (gRPC or HTTP, not both), so a transport mismatch fails with
100% loss instead of passing silently. Isolates the ingress leg; egress is
noted as still dual-bound by the bench receiver.
Verification
Consume, round-trip, batch-split, and crash/restart correctness cases all pass
with 0% loss for every listed subject; perf cases produce EPS across subjects.
Summary by CodeRabbit
New Features
Documentation