Describe the bug
The Cosmos Spark change feed connector can spend excessive driver time planning a microbatch when a single streaming query reads a container with many feed ranges / continuation tokens.
The hot path appears to be ChangeFeedMicroBatchStream.planInputPartitions, which extracts a per-partition starting ChangeFeedState from the same start offset. Today, that repeatedly copies and sorts the full continuation-token list for each planned partition.
If P is the number of planned Spark input partitions / feed ranges and T is the number of continuation tokens in the starting offset, this behaves roughly like:
P * (T log T + T)
For large containers, this can dominate or stall microbatch planning before executors start consuming data. In some examples, planning took more than 15 minutes for jobs with more than 30k continuation tokens / feed ranges.
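As a rough back-of-the-envelope estimate (my assumption of P ≈ T ≈ 30,000 and log2(30,000) ≈ 15), that works out to roughly 30,000 * (30,000 * 15 + 30,000) ≈ 1.4 * 10^10 token copies/comparisons per planning pass, all performed on the driver before the first executor task starts.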
Exception or Stack Trace
No exception is thrown. The symptom is very long driver-side microbatch planning / offset planning time.
Observed stack / hot path from investigation:
ChangeFeedMicroBatchStream.planInputPartitions
  - per partition:
    - SparkBridgeImplementationInternal.extractChangeFeedStateForRange(...)
    - ChangeFeedState.extractForEffectiveRange(...)
    - ChangeFeedState.extractContinuationTokens(...)
      - repeated copy/sort/scan of the same continuation-token list
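The repeated work follows roughly this pattern (a hypothetical simplification for illustration only; Token, feedRanges, and startTokens are made-up names, not the connector's actual code):

final case class Token(min: String, max: String)

def planAllPartitions(
    feedRanges: Seq[(String, String)],
    startTokens: Seq[Token]): Seq[Seq[Token]] =
  feedRanges.map { case (rangeMin, rangeMax) =>
    // Re-copied and re-sorted for every planned partition: O(T log T) + O(T), repeated P times.
    val sortedCopy = startTokens.sortBy(_.min)
    sortedCopy.filter(t => t.min < rangeMax && t.max > rangeMin)
  }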
To Reproduce
Steps to reproduce the behavior:
- Use a Cosmos container with a large number of physical feed ranges / continuation tokens.
- Run a Spark Structured Streaming change feed query over the full container with one checkpoint / one streaming query.
- Use a starting offset/checkpoint that contains many composite continuation tokens.
- Trigger a microbatch and observe driver-side planning time in planInputPartitions.
- Compare planning time with an implementation that snapshots/sorts the start continuation tokens once per planning pass and reuses that sorted view for all per-partition extraction calls.
Code Snippet
Representative Spark read:
// Single streaming query over the full container's change feed.
val df = spark.readStream
  .format("cosmos.oltp.changeFeed")
  .options(cosmosConfig)
  .option("spark.cosmos.changeFeed.mode", "Incremental")
  .option("spark.cosmos.changeFeed.startFrom", "Beginning")
  .load()

// The checkpoint/start offset ends up holding many composite continuation tokens.
df.writeStream
  .format("delta")
  .option("checkpointLocation", checkpointPath)
  .start(outputPath)
The issue is most visible when the checkpoint/start offset contains many continuation tokens and Spark plans many feed-range partitions.
Expected behavior
The connector should avoid repeatedly sorting/parsing the same start-offset continuation tokens for each planned partition. For example, in this particular call site (extractForEffectiveRange) and the equivalent ones across the version implementations, see azure-sdk-for-java/sdk/cosmos/azure-cosmos-spark_3-5/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala, lines 120 to 132 at commit 8c1c59e:

val parsedStartChangeFeedState = SparkBridgeImplementationInternal.parseChangeFeedState(start.changeFeedState)

end
  .inputPartitions
  .get
  .map(partition => {
    val index = partitionIndexMap.asScala.getOrElseUpdate(partition.feedRange, partitionIndex.incrementAndGet())
    partition
      .withContinuationState(
        SparkBridgeImplementationInternal
          .extractChangeFeedStateForRange(parsedStartChangeFeedState, partition.feedRange),
        clearEndLsn = false)
      .withIndex(index)
  })
If we were to:
- Sort the continuation tokens ahead of time
- Use binary search when looking for overlapping feed ranges
It would drastically reduce the overhead. I do acknowledge that for malformed or legacy overlapping ranges, we may want to simply fall back to the full scan of the sorted list.
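A minimal sketch of that idea (my own illustration with hypothetical names such as TokenRange and StartOffsetTokenIndex rather than the connector's actual types; it assumes each continuation token covers an effective range [min, max) whose bounds compare lexicographically, and that well-formed tokens do not overlap):

final case class TokenRange(min: String, max: String, token: String)

final class StartOffsetTokenIndex(tokens: Seq[TokenRange]) {

  // Sorted once per planning pass instead of once per planned partition.
  private val sortedByMin: Vector[TokenRange] = tokens.sortBy(_.min).toVector

  // Legacy/malformed offsets may contain overlapping ranges; detect that once up front.
  private val hasOverlaps: Boolean =
    sortedByMin.sliding(2).exists {
      case Vector(a, b) => a.max > b.min
      case _            => false
    }

  // Tokens whose range intersects the planned feed range [rangeMin, rangeMax).
  def tokensForRange(rangeMin: String, rangeMax: String): Seq[TokenRange] =
    if (hasOverlaps) {
      // Fallback for overlapping/legacy ranges: full scan of the already sorted list.
      sortedByMin.filter(t => t.min < rangeMax && t.max > rangeMin)
    } else {
      // Binary search for the first token whose range can intersect the feed range.
      var lo = 0
      var hi = sortedByMin.length
      while (lo < hi) {
        val mid = (lo + hi) >>> 1
        if (sortedByMin(mid).max <= rangeMin) lo = mid + 1 else hi = mid
      }
      sortedByMin.iterator.drop(lo).takeWhile(_.min < rangeMax).toSeq
    }
}

Building one such index once per planning pass would turn the per-partition lookup from O(T log T + T) into roughly O(log T + k), where k is the number of tokens overlapping that partition's feed range, while the overlapping/legacy case keeps full-scan behavior.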
Screenshots
N/A
Setup (please complete the following information):
- OS: Linux
- IDE: N/A
- Library/Libraries: com.azure.cosmos.spark:azure-cosmos-spark_3-5_2-12 (Cosmos Spark change feed connector)
- Java version: 17
- App Server/Environment: Apache Spark Structured Streaming
- Frameworks: Spark 3.x
Additional context
This issue is a bottleneck before Spark executors even begin to read rows, because it occurs during the driver's microbatch planning.
Increasing spark.cosmos.changeFeed.itemCountPerTriggerHint may amortize fixed per-trigger overhead over more rows, but it does not remove the repeated continuation-token planning work.
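For reference, the hint is set as just another read option on the same source (a sketch only; the value shown is illustrative):

spark.readStream
  .format("cosmos.oltp.changeFeed")
  .options(cosmosConfig)
  .option("spark.cosmos.changeFeed.itemCountPerTriggerHint", "5000000") // illustrative value
  .load()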
The issue is more severe for unsharded/full-container streaming queries because both the planned partition count and continuation-token count can be large. Sharded consumers may avoid the worst case because each query/checkpoint owns a smaller subset of feed ranges.
Information Checklist