[Cosmos] Add azure-cosmos-avad-test module for AVAD Change Feed Processor soak testing#49050
Draft
jeet1995 wants to merge 28 commits intoAzure:mainfrom
Draft
[Cosmos] Add azure-cosmos-avad-test module for AVAD Change Feed Processor soak testing#49050jeet1995 wants to merge 28 commits intoAzure:mainfrom
jeet1995 wants to merge 28 commits intoAzure:mainfrom
Conversation
… testing Adds a new internal soak test tool under sdk/cosmos/ for validating All Versions and Deletes (AVAD) Change Feed Processor correctness. Components: - Ingestor: generates create/replace/upsert/delete workload - AvadReader: CFP in AVAD mode with previous-image and CRTS validation - LatestVersionReader: CFP in latest-version mode for parity checks - Reconciler: gap detection, LSN/CRTS ordering, AVAD-superset-of-LV checks - HealthMonitor: online health checks via Cosmos reconciliation container - Helm chart: AKS deployment with StatefulSets for consumers - Chaos scenarios: pod kill, restart storm, lease throttle, partition split, network fault, node drain Key AVAD validations: - LSN extracted from ChangeFeedMetaData (correct for delete tombstones) - CRTS (conflict resolution timestamp) captured and ordering validated - previousImage presence checked on replace/delete events - AVAD superset of LV parity assertion Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Moves AVAD code from standalone azure-cosmos-avad-test module into azure-cosmos-benchmark: - Java sources under com.azure.cosmos.avadtest package - Operational files (Dockerfile, Helm, chaos, scripts) under avad-soak/ - Converted from picocli to jcommander (no new dependencies) - Removed standalone module from sdk/cosmos/pom.xml - Reverted picocli addition to external_dependencies.txt Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Convert Java 9+ features to Java 8 equivalents: - Switch expressions -> if/else chains - ProcessHandle.current().pid() -> ManagementFactory.getRuntimeMXBean().getName() - Path.of() -> Paths.get() - String.isBlank() -> .trim().isEmpty() - List.of() -> Collections.singletonList() - String.repeat() -> StringBuilder loop - var -> explicit types Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Blocking fixes:
- AvadReader: add shutdown hook (matching LatestVersionReader pattern)
- Ingestor: implement rate limiting with Flux.interval instead of
unbounded Flux.generate + sample (which did not enforce OPS_PER_SEC)
- Ingestor: fix delete correlation by stamping eventId into document
before delete so AVAD reader's previous image has correct key
- Reconciler: fix LSN/CRTS ordering checks to sort by delivery order
(seqNo) then verify monotonicity, instead of sorting by value (no-op)
- Ingestor: fix SLF4J format - {:.1f} is not valid SLF4J syntax
Watch fixes:
- ReconciliationWriter: fix close() to drain before dispose
- ReconciliationWriter: use CosmosException.getStatusCode() for retry
- LatestVersionReader: use metadata.getLogSequenceNumber() not _lsn
- AvadReader: implement CRTS ordering validation per partition
- HealthMonitor: treat -1 (query failures) as unhealthy
- Dockerfile: fix build context path (point to module root)
- setup-acr.sh: fix PROJECT_DIR to reach module root
- application.properties: remove hardcoded endpoint
- infra/README: document secret prerequisites, remove hardcoded names
Scope changes:
- Remove 4 chaos scenarios, keep only pod-kill and partition-split
- Pre-compute payload string instead of per-operation allocation
- Clear deleted IDs from ring buffer to avoid retargeting
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Replace individual point operations (createItem, replaceItem, upsertItem, deleteItem) with executeBulkOperations for higher throughput. Changes: - Build batch of CosmosItemOperation per tick, submit via bulk API - Merge replace into upsert (both trigger AVAD previousImage) - Operation mix: 40% create, 40% upsert, 20% delete - Tick interval increased to 100ms (larger batches, better bulk efficiency) - Track OpMeta per operation for correlating bulk responses to event log - Remove read-before-write for replaces (upsert handles it) - Delete uses docId as correlation key (no read-before-delete needed) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
SoakMetrics was never wired to any workload component — /metrics endpoint always returned zeros. Each component already tracks its own counters via LongAdder and reports via SLF4J. Removed SoakMetrics.java and the /metrics endpoint from HealthServer. HealthServer now only serves /health and /ready (K8s probes). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
All components now follow the same close() sequence:
1. Log 'Closing {Component}...'
2. Stop workload-specific resources (CFP processors, running flag)
3. Close eventLog (try-catch)
4. Close reconWriter
5. Close Cosmos client
6. Log summary and 'closed' confirmation
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Each component (Ingestor, AvadReader, LatestVersionReader) was creating 2 CosmosAsyncClient instances — one for the workload, one inside ReconciliationWriter. This wastes connections and memory. Now ReconciliationWriter accepts a shared client instead of creating its own. The component owns the client lifecycle and closes it in close(). ReconciliationWriter only drains its sink and disposes its subscription. Also: store and dispose Reactor subscriptions in Ingestor to prevent leaks, and explicitly shut down HealthServer's ExecutorService. Before: 6 CosmosAsyncClients across 3 components After: 3 CosmosAsyncClients (1 per component) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Replace application.properties with a structured JSON config file. TestConfig now supports two load paths: --config config.json → JSON file with env var overrides (no --config) → env vars only (backward compatible) Precedence: env var > JSON value > built-in default. Secrets (COSMOS_KEY) should always come from env vars. JSON schema groups settings by concern: cosmos.* — endpoint, database, containers, region ingestor.* — opsPerSec, docSize, partitions, duration, workers logging.* — produced/consumed log file paths Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Local mode (run-local.sh / run-local.ps1): - Launches ingestor + avad-reader + lv-reader as local JVM processes - Each on its own health port (8080/8081/8082) - Builds module automatically on first run - Filters classpath to use logback over log4j - Monitors process health, logs to output dir - Ctrl+C stops all three cleanly AKS mode (run-soak.sh): - Cleaned up to reference only pod-kill + partition-split chaos - Removed references to deleted chaos scripts - Simplified health check (pod readiness, no /metrics scraping) Removed run-cutover.sh (SSH-to-VM orchestration — superseded). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
AllVersionsAndDeletes mode does not support startTime option. Also removed from LV reader (not needed for fresh soak runs). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
CFP lease management requires contentResponseOnWriteEnabled(true). AvadReader and Ingestor were missing it — only LatestVersionReader had it. Now all three clients set it consistently. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
CFP's handleChanges callback must complete before the lease is checkpointed. The old async Sink/Flux pipeline let the lease advance before writes were persisted — causing drops on backpressure and data loss on crash. Now record() blocks until the upsert succeeds or retries exhaust. This ensures no lease checkpoint races. Retries use exponential backoff (500ms, 1s, 2s, 4s) with the same CosmosException-based retryability check. Removed: Sinks buffer, Disposable subscription, requeue logic, dropCount (writes either succeed or error — no silent drops). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The old approach matched bulk responses to metadata by scanning a List for matching docId — this failed when multiple ops targeted the same docId or when op.getId() didn't match expectations. Now uses IdentityHashMap<CosmosItemOperation, OpMeta> keyed by object reference. executeBulkOperations returns the same CosmosItemOperation in the response, so reference equality gives O(1) lookup with no ambiguity. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Flux.interval + concatMap causes OverflowException when bulk batch takes longer than the tick interval — concatMap requests 1 at a time but interval keeps emitting ticks that can't be buffered. Now uses a simple while loop: submit batch (blocking via toStream), sleep for remaining tick time. This guarantees rate limiting without reactive backpressure issues. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
CRTS ordering validation belongs in the Reconciler (offline), not in the reader. The reader should just record what it sees — the Reconciler already has checkOrderingByCrts() for this. Removes ConcurrentHashMap<String, AtomicLong> that would grow unboundedly over multi-day runs (one entry per partition key). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
record() now throws RuntimeException on permanent write failure instead of silently returning. This causes CFP's handleChanges to fail, which prevents the lease continuation token from advancing. CFP will retry the batch on the next poll. Without this, a recon write failure would silently drop the event and advance the lease — making the gap undetectable. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
HealthMonitor no longer writes snapshots to soak-health. It still queries the reconciliation container for gap/parity/previousImage checks and logs results, but doesn't persist them. The offline Reconciler is the authoritative validation path. soak-health was redundant and produced misleading UNHEALTHY entries due to flaky N+1 gap detection queries. Removed: healthContainer field, writeHealthSnapshot method, soak-health from setup-cosmos.sh, unused Jackson imports. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Remove EventLog (CSV file logging) entirely. Single reconciliation path through Cosmos 'reconciliation' container. Reconciler rewritten to query Cosmos by source field: - 5 source types: ingestor, cfp-lv, cfp-avad, spark-lv, spark-avad - Per-reader gap detection, LSN/CRTS ordering, previousImage checks - Auto-selects checks by source type (AVAD gets CRTS + previousImage) - Skips sources that don't exist yet (e.g., spark-* before Spark runs) CLI modes: --mode reconcile --source ingestor --against cfp-avad --mode reconcile --full (runs all 8 check pairs) Removed: EventLog.java, producedLogFile/consumedLogFile config, --produced/--consumed/--lv/--avad CLI args, logging section from config.json. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Databricks-compatible PySpark notebooks that read the change feed using azure-cosmos-spark connector and write to the reconciliation container with source='spark-lv' and source='spark-avad'. spark_lv_reader.py: - Incremental (Latest Version) mode - Structured streaming with 10s trigger - Writes correlationId, seqNo, opType, partitionKey, lsn spark_avad_reader.py: - Full Fidelity (AVAD) mode - Extracts operationType, lsn, crts from metadata - Checks previousImage on replace/delete - Handles delete tombstones (extracts fields from previous image) - Includes ad-hoc correctness check cell Both write to the reconciliation container using the same doc schema as the Java CFP readers, enabling cross-engine parity checks via the Reconciler's --full suite. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The Java reconciler chokes on cross-partition SELECT DISTINCT over millions of docs. Spark handles this natively with bulk reads + DataFrame operations. spark_reconciler.py runs all 8 reconciliation checks: Q1: Summary dashboard (count/unique/min/max per source) Q2: Gap detection (ingestor → each of 4 consumers) Q3: Parity (cfp-lv ⊆ cfp-avad, spark-lv ⊆ spark-avad) Q4: Cross-engine (cfp ↔ spark, both modes) Q5: LSN ordering per partition (window function + lag) Q6: CRTS ordering per partition (AVAD only) Q7: previousImage validation (AVAD only) Q8: Duplicate detection (at-least-once rate) Skips sources with 0 events (e.g., spark-* before notebooks run). Returns PASS/FAIL via dbutils.notebook.exit for job orchestration. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The fat jar's manifest mainClass is com.azure.cosmos.benchmark.Main (the benchmark's main), not our avadtest.Main. Use -cp instead of -jar so we can specify the correct main class. Simplified to single-stage Dockerfile — build fat jar locally first (mvn package -Dpackage-with-dependencies), then COPY into runtime image. Avoids dependency resolution failures in ACR Tasks. Updated setup-acr.sh to build locally before pushing. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
ReconciliationWriter now uses bulk upsert (executeBulkOperations) instead of per-event blocking upserts. Events are buffered via record(), then flushed as a single bulk batch at the end of each handleChanges callback. Still synchronous — flush blocks until all writes complete, so lease doesn't advance until persisted. CFP tuning: - feedPollDelay: 1s → 100ms (tighter polling loop) - maxItemCount: default(100) → 1000 (larger pages per poll) - preferredRegion should match AKS region (East US) AKS result: ~14x improvement (1,405 → 19,398 ops in 2 min). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Config: use try/except for dbutils.widgets.get() instead of getAll().name (which returns strings, not objects) - Checkpoint: use /Workspace/ path instead of /tmp/ (DBFS disabled) - LV reader: remove _lsn column reference (not exposed by connector) - AVAD reader: remove _rawBody/previous/metadata references — Spark connector flattens AVAD events to the same schema as LV. Use available columns directly. Verified: spark-lv wrote 60,801 docs to reconciliation container. spark-avad running but needs live ingestor data (startFrom=Now). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The Spark connector uses 'AllVersionsAndDeletes' (not 'FullFidelity') as the changeFeed.mode value. startFrom must be 'Now' for AVAD mode. Note: spark-avad requires the container to have changeFeedPolicy enabled (Full Fidelity retention). Without it, the AVAD change feed stream produces no events. Enable via Azure Portal: Container → Settings → Change Feed → Full Fidelity. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Root cause: notebook referenced non-existent columns (eventId, seqNo, tenantId) which are user document fields flattened only in LV mode. In AVAD mode, these are nested inside _rawBody JSON. The filter on correlationId (aliased from null eventId) dropped all rows. Changes: - Use get_json_object(_rawBody, $.eventId) to extract user fields - Use actual AVAD columns: _lsn, operationType, crts, previous - Check previous column for hasPreviousImage instead of hardcoded False - Change startFrom to Beginning (Now returns nothing without active writes) - Document the AVAD schema columns from ChangeFeedTable.scala Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
Adds \�zure-cosmos-avad-test\ — an internal soak test tool under \sdk/cosmos/\ for validating All Versions and Deletes (AVAD) Change Feed Processor correctness under sustained load and chaos conditions.
Components
Key AVAD Validations
Structure
Follows the pattern of \�zure-cosmos-benchmark:
Verification
\\�ash
cd sdk/cosmos/azure-cosmos-avad-test
mvn clean compile
\\