Skip to content

feat: introduce processing rate to max ratio as poll-idle-ratio replacement#19622

Open
Fly-Style wants to merge 6 commits into
apache:masterfrom
Fly-Style:feat/poll-idle-ratio-replacement
Open

feat: introduce processing rate to max ratio as poll-idle-ratio replacement#19622
Fly-Style wants to merge 6 commits into
apache:masterfrom
Fly-Style:feat/poll-idle-ratio-replacement

Conversation

@Fly-Style

@Fly-Style Fly-Style commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

Summary

Kafka's poll-idle-ratio is not the best possible metric to represent the idle cost for autoscaler - it reflects time spent polling, not whether the task has spare processing capacity. Also, it is not supported for Kinesis, which is still in play in read-world productions.

This patch adds an alternative idle signal : 1 - (avgProcessingRate / maxObservedRate), gated behind a new opt-in flag useUtilizationRatio (default false, so existing deployments are unaffected).

  • CostBasedAutoScaler: tracks a bounded watermark of the task's best-observed processing rate (maxObservedRate), feeding CostMetrics.
  • WeightedCostFunction: when the flag is on, derives idle ratio from that utilization ratio instead of pollIdleRatio; falls back to IDEAL_IDLE_RATIO until a watermark sample exists (cold start).
  • CostBasedAutoScalerConfig: new useUtilizationRatio boolean, wired through builder/serde/equals/hashCode.
  • CostMetrics: new nullable maxObservedRate field; old constructor kept (delegates with null) since three test call sites still use it.

This PR has:

  • been self-reviewed.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.

@Fly-Style Fly-Style changed the title Introduce processing rate to max ratio as poll-idle-ratio replacement feat: introduce processing rate to max ratio as poll-idle-ratio replacement Jun 23, 2026
@Fly-Style Fly-Style requested a review from kfaraz June 24, 2026 08:45

@FrankChen021 FrankChen021 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 0
P2 1
P3 0
Total 1
Severity Findings
P0 0
P1 0
P2 1
P3 0
Total 1

Reviewed 8 of 8 changed files.


This is an automated review by Codex GPT-5.5


final int lowInitialTaskCount = 1;
// This ensures tasks are busy processing (low idle ratio)
Executors.newSingleThreadExecutor().submit(() -> {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Shut down the background publisher executor

The new test creates a single-thread ExecutorService and immediately drops it. That executor uses a non-daemon worker and is never shut down, so the embedded-test JVM can stay alive after the test returns; on timeout or failure it can also keep publishing into the topic while cleanup is running. Keep a reference and shut it down/cancel the Future in a finally block or use an existing managed executor.

@kfaraz kfaraz left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conceptually, this makes sense. Haven't reviewed the tests yet.

* bottleneck, so the rate observed is the task's own ceiling. The max of these samples
* becomes {@link CostMetrics#getMaxObservedRate()}.
*/
private final EvictingQueue<Double> lagGatedRateSamples;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private final EvictingQueue<Double> lagGatedRateSamples;
private final EvictingQueue<Double> lagGatedProcessingRateSamples;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed in other way due to new circumstances.

/**
* Derives the current idle ratio from measured utilization ({@code avgProcessingRate / maxObservedRate}).
*/
static double estimateIdleFromUtilization(CostMetrics metrics)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this method should exist in CostMetrics itself?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename this method to estimateIdleRatioFromProcessingRate() and move it to the class CostMetrics.

{
final Double maxObservedRate = metrics.getMaxObservedRate();
if (maxObservedRate == null || maxObservedRate <= 0) {
return IDEAL_IDLE_RATIO;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, if there has been no lag for a while, we will assume that we are already in the optimal usage band?
This would mean that scale down would not happen when, ideally, it should.

I wonder if it would be better to collect samples of avgProcessingRate even when there is no lag, and simply use the max to determine the max observed rate.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a slippery place, but after short consideration I'd agree with you. Reworking that part.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! We can tune it further based on how it performs in real clusters.

@Fly-Style Fly-Style requested a review from kfaraz June 25, 2026 10:11

@kfaraz kfaraz left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One final non-blocking comment. Rest looks good.

/**
* Derives the current idle ratio from measured utilization ({@code avgProcessingRate / maxObservedRate}).
*/
static double estimateIdleFromUtilization(CostMetrics metrics)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename this method to estimateIdleRatioFromProcessingRate() and move it to the class CostMetrics.

@FrankChen021 FrankChen021 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 0
P2 1
P3 0
Total 1

Reviewed 8 of 8 changed files.


This is an automated review by Codex GPT-5.5

static double estimateIdleFromUtilization(CostMetrics metrics)
{
final Double maxObservedRate = metrics.getMaxObservedRate();
final double utilization = Math.min(1.0, metrics.getAvgProcessingRate() / maxObservedRate);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Restore fallback before dividing by maxObservedRate

When usePollIdleRatio is false, this method derives idle ratio from maxObservedRate, but CostMetrics still allows that value to be null and the added tests explicitly cover null and zero watermarks. Dividing by the boxed value now throws on null, and a zero watermark yields Infinity/NaN behavior instead of the intended IDEAL_IDLE_RATIO cold-start fallback. Please keep the maxObservedRate == null || maxObservedRate <= 0 guard before this division.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants