Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
// HTTP thread utilization
HTTP_THREAD_UTILIZATION("httpThreadUtilization", true),
// Track the concurrent executions of the API resources that use @ManagedAsync
MANAGED_ASYNC_ACTIVE_THREADS("threads", true);
MANAGED_ASYNC_ACTIVE_THREADS("threads", true),
// Backfill circuit breaking: number of active backfill Kafka topics currently running for this table
BACKFILL_TOPICS_IN_PROGRESS("backfillTopicsInProgress", false);


private final String _gaugeName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
PARTITION_GROUP_METADATA_FETCH_ERROR("failures", true),
OFFSET_AUTO_RESET_SKIPPED_OFFSETS("autoResetSkippedOffsets", false),
OFFSET_AUTO_RESET_BACKFILL_OFFSETS("autoResetBackfillOffsets", false),
OFFSET_AUTO_RESET_BACKFILL_SKIPPED_PAUSED("BackfillSkippedPaused", false),
OFFSET_AUTO_RESET_BACKFILL_SKIPPED_MAX_SEGMENTS("BackfillSkippedMaxSegments", false),
OFFSET_AUTO_RESET_BACKFILL_SKIPPED_MAX_CONCURRENT("BackfillSkippedMaxConcurrent", false),
OFFSET_AUTO_RESET_BACKFILL_SKIPPED_IN_FLIGHT("BackfillSkippedInFlight", false),
OFFSET_AUTO_RESET_HANDLER_INIT_FAILURE("BackfillHandlerInitFailure", false),
OFFSET_AUTO_RESET_AUTO_PAUSE_FAILURE("BackfillAutoPauseFailure", false),
OFFSET_AUTO_RESET_BACKFILL_CLEANUP_COMPLETED("BackfillCleanupCompleted", false),
// Audit logging metrics
AUDIT_REQUEST_FAILURES("failures", true),
AUDIT_RESPONSE_FAILURES("failures", true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ public static class ControllerPeriodicTasksConf {
"controller.realtime.offsetAutoReset.backfill.frequencyPeriod";
public static final String REALTIME_OFFSET_AUTO_RESET_BACKFILL_INITIAL_DELAY_IN_SECONDS =
"controller.realtime.offsetAutoReset.backfill.initialDelayInSeconds";
public static final String MAX_CONCURRENT_BACKFILLS_PER_CONTROLLER =
"controller.realtime.offsetAutoReset.maxConcurrentBackfillsPerController";
public static final String MAX_BACKFILL_COLLISIONS_BEFORE_AUTO_PAUSE =
"controller.realtime.offsetAutoReset.maxBackfillCollisionsBeforeAutoPause";
public static final String BROKER_RESOURCE_VALIDATION_FREQUENCY_PERIOD =
"controller.broker.resource.validation.frequencyPeriod";
public static final String BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS =
Expand Down Expand Up @@ -1208,6 +1212,14 @@ public long getRealtimeOffsetAutoResetBackfillInitialDelaySeconds() {
getPeriodicTaskInitialDelayInSeconds());
}

/**
 * Reads {@code ControllerPeriodicTasksConf.MAX_CONCURRENT_BACKFILLS_PER_CONTROLLER} from the controller config.
 *
 * @return the configured value, or {@code -1} when the property is not set
 */
public int getMaxConcurrentBackfillsPerController() {
  final int valueWhenUnset = -1;
  return getProperty(ControllerPeriodicTasksConf.MAX_CONCURRENT_BACKFILLS_PER_CONTROLLER, valueWhenUnset);
}

/**
 * Reads {@code ControllerPeriodicTasksConf.MAX_BACKFILL_COLLISIONS_BEFORE_AUTO_PAUSE} from the controller config.
 *
 * @return the configured value, or {@code 3} when the property is not set
 */
public int getMaxBackfillCollisionsBeforeAutoPause() {
  final int valueWhenUnset = 3;
  return getProperty(ControllerPeriodicTasksConf.MAX_BACKFILL_COLLISIONS_BEFORE_AUTO_PAUSE, valueWhenUnset);
}

/**
 * Reads {@code ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT} from the controller config.
 *
 * @return the configured flag, or {@code false} when the property is not set
 */
public boolean isDeepStoreRetryUploadLLCSegmentEnabled() {
  final boolean valueWhenUnset = false;
  return getProperty(ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, valueWhenUnset);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,24 @@ private String computeStartOffset(String nextOffset, StreamConfig streamConfig,
if (!streamConfig.isEnableOffsetAutoReset() || streamConfig.isBackfillTopic()) {
return nextOffset;
}
// Skip if backfill is manually paused for this topic
if (streamConfig.isOffsetAutoResetPaused()) {
LOGGER.info("Skipping offset auto reset for table {} topic {} — backfill is paused",
streamConfig.getTableNameWithType(), streamConfig.getTopicName());
return nextOffset;
}
// Skip if the table already has too many segments (lightweight ZK child-name list, no data deserialization)
int maxSegments = streamConfig.getOffsetAutoResetMaxSegmentsBeforeSkip();
if (maxSegments > 0) {
int segmentCount = ZKMetadataProvider.getSegments(_propertyStore, streamConfig.getTableNameWithType()).size();
if (segmentCount >= maxSegments) {
LOGGER.info("Skipping offset auto reset for table {} topic {} — segment count {} >= maxSegmentsBeforeSkip {}",
streamConfig.getTableNameWithType(), streamConfig.getTopicName(), segmentCount, maxSegments);
_controllerMetrics.addMeteredTableValue(streamConfig.getTableNameWithType(),
ControllerMeter.OFFSET_AUTO_RESET_BACKFILL_SKIPPED_MAX_SEGMENTS, 1L);
return nextOffset;
}
}
long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
if (timeThreshold <= 0 && offsetThreshold <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
Expand All @@ -37,6 +38,7 @@
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
Expand All @@ -48,9 +50,12 @@ public class RealtimeOffsetAutoResetManager extends ControllerPeriodicTask<Realt
private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeOffsetAutoResetManager.class);
private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
private final PinotHelixResourceManager _pinotHelixResourceManager;
private final ControllerConf _controllerConf;
private final Map<String, RealtimeOffsetAutoResetHandler> _tableToHandler;
private final Map<String, Set<String>> _tableTopicsUnderBackfill;
private final Map<String, Set<String>> _tableBackfillTopics;
// Key: "tableNameWithType:topicName:partitionId" — tracks consecutive in-flight collisions per partition
private final Map<String, Integer> _partitionInFlightCollisionCount;

public RealtimeOffsetAutoResetManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
Expand All @@ -60,9 +65,11 @@ public RealtimeOffsetAutoResetManager(ControllerConf config, PinotHelixResourceM
leadControllerManager, controllerMetrics);
_llcRealtimeSegmentManager = llcRealtimeSegmentManager;
_pinotHelixResourceManager = pinotHelixResourceManager;
_controllerConf = config;
_tableToHandler = new ConcurrentHashMap<>();
_tableTopicsUnderBackfill = new ConcurrentHashMap<>();
_tableBackfillTopics = new ConcurrentHashMap<>();
_partitionInFlightCollisionCount = new ConcurrentHashMap<>();
}

@Override
Expand Down Expand Up @@ -105,36 +112,81 @@ protected void processTable(String tableNameWithType, RealtimeOffsetAutoResetMan
return;
}

// Skip triggering if the controller is already handling the maximum number of concurrent backfills
int maxConcurrent = _controllerConf.getMaxConcurrentBackfillsPerController();
if (context._shouldTriggerBackfillJobs && maxConcurrent > 0
&& _tableBackfillTopics.size() >= maxConcurrent) {
LOGGER.warn("Skipping backfill trigger for table {} — max concurrent backfills ({}) reached",
tableNameWithType, maxConcurrent);
_controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.OFFSET_AUTO_RESET_BACKFILL_SKIPPED_MAX_CONCURRENT, 1L);
context._shouldTriggerBackfillJobs = false;
}

if (context._shouldTriggerBackfillJobs) {
_tableTopicsUnderBackfill.putIfAbsent(tableNameWithType, ConcurrentHashMap.newKeySet());
String topicName = context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_NAME);
String partitionStr = context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_PARTITION);
_tableTopicsUnderBackfill.get(tableNameWithType).add(topicName);

// Per-partition in-flight guard: _tableBackfillTopics contains the backfill Kafka topic names
// (not the main topic names), so any non-empty set indicates a backfill is already in flight.
// Track collisions per (table, topic, partition); auto-pause if collisions exceed the threshold.
String partitionKey = tableNameWithType + ":" + topicName + ":" + partitionStr;
Set<String> activeBackfillTopics = _tableBackfillTopics.get(tableNameWithType);
boolean anyBackfillInFlight = activeBackfillTopics != null && !activeBackfillTopics.isEmpty();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaker is described as per-partition, but the guard is actually table-wide: anyBackfillInFlight becomes true whenever any backfill topic exists for the table. After that, every unrelated (topic, partition) trigger increments its own collision counter and can auto-pause its source topic even though there was no collision on that partition. The check needs to prove that the active backfill matches the same source topic/partition, not just that the table has some backfill running.

if (anyBackfillInFlight) {
int collisions = _partitionInFlightCollisionCount.merge(partitionKey, 1, Integer::sum);
int maxCollisions = _controllerConf.getMaxBackfillCollisionsBeforeAutoPause();
_controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.OFFSET_AUTO_RESET_BACKFILL_SKIPPED_IN_FLIGHT, 1L);
LOGGER.warn("In-flight backfill collision #{} for partition key {} (active backfill topics: {})",
collisions, partitionKey, activeBackfillTopics);
if (maxCollisions > 0 && collisions >= maxCollisions) {
LOGGER.warn("Auto-pausing backfill for table {} topic {} after {} collisions",
tableNameWithType, topicName, collisions);
_controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.OFFSET_AUTO_RESET_BACKFILL_SKIPPED_PAUSED, 1L);
setPauseFlag(tableNameWithType, tableConfig, topicName);
context._shouldTriggerBackfillJobs = false;
}
// Below threshold: allow trigger to proceed (new backfill coexists with the ongoing one)
} else {
_partitionInFlightCollisionCount.remove(partitionKey);
}

StreamConfig topicStreamConfig = IngestionConfigUtils.getStreamConfigs(tableConfig).stream()
.filter(config -> topicName.equals(config.getTopicName()))
.findFirst().orElseThrow(() -> new RuntimeException("No matching topic found"));
LOGGER.info("Triggering backfill jobs with StreamConfig {}, topicName {}, properties {}",
topicStreamConfig, topicName, context._backfillJobProperties);
try {
long startOffset = Long.parseLong(context._backfillJobProperties.get(Constants.RESET_OFFSET_FROM));
long endOffset = Long.parseLong(context._backfillJobProperties.get(Constants.RESET_OFFSET_TO));
if (_tableToHandler.get(tableNameWithType).triggerBackfillJob(tableNameWithType,
topicStreamConfig,
topicName,
Integer.parseInt(context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_PARTITION)),
startOffset,
endOffset)) {
_controllerMetrics.addMeteredTableValue(tableNameWithType, ControllerMeter.OFFSET_AUTO_RESET_BACKFILL_OFFSETS,
endOffset - startOffset);
if (context._shouldTriggerBackfillJobs) {
LOGGER.info("Triggering backfill jobs with StreamConfig {}, topicName {}, properties {}",
topicStreamConfig, topicName, context._backfillJobProperties);
try {
long startOffset = Long.parseLong(context._backfillJobProperties.get(Constants.RESET_OFFSET_FROM));
long endOffset = Long.parseLong(context._backfillJobProperties.get(Constants.RESET_OFFSET_TO));
if (_tableToHandler.get(tableNameWithType).triggerBackfillJob(tableNameWithType,
topicStreamConfig,
topicName,
Integer.parseInt(partitionStr),
startOffset,
endOffset)) {
_controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.OFFSET_AUTO_RESET_BACKFILL_OFFSETS, endOffset - startOffset);
}
} catch (NumberFormatException e) {
LOGGER.error("Invalid backfill job properties for table: {}, properties: {}, error: {}",
tableNameWithType, context._backfillJobProperties, e.getMessage(), e);
}
} catch (NumberFormatException e) {
LOGGER.error("Invalid backfill job properties for table: {}, properties: {}, error: {}",
tableNameWithType, context._backfillJobProperties, e.getMessage(), e);
}
}

ensureBackfillJobsRunning(tableNameWithType);
ensureCompletedBackfillJobsCleanedUp(tableConfig);

Set<String> activeTopics = _tableBackfillTopics.get(tableNameWithType);
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.BACKFILL_TOPICS_IN_PROGRESS,
activeTopics != null ? activeTopics.size() : 0L);
}

/**
Expand Down Expand Up @@ -175,6 +227,8 @@ private void ensureCompletedBackfillJobsCleanedUp(TableConfig tableConfig) {
if (cleanedUpTopics.containsAll(_tableBackfillTopics.get(tableNameWithType))) {
_tableTopicsUnderBackfill.remove(tableNameWithType);
_tableBackfillTopics.remove(tableNameWithType);
// Remove all per-partition collision counters for this table
_partitionInFlightCollisionCount.keySet().removeIf(k -> k.startsWith(tableNameWithType + ":"));
if (_tableToHandler.get(tableNameWithType) != null) {
_tableToHandler.get(tableNameWithType).close();
_tableToHandler.remove(tableNameWithType);
Expand All @@ -184,6 +238,8 @@ private void ensureCompletedBackfillJobsCleanedUp(TableConfig tableConfig) {
}
if (cleanedUpTopics.size() > 0) {
LOGGER.info("Cleaned up complete backfill topics {} for table {}", cleanedUpTopics, tableNameWithType);
_controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.OFFSET_AUTO_RESET_BACKFILL_CLEANUP_COMPLETED, cleanedUpTopics.size());
}
}

Expand All @@ -193,6 +249,30 @@ protected void nonLeaderCleanup(List<String> tableNamesWithType) {
_tableTopicsUnderBackfill.remove(tableNameWithType);
_tableBackfillTopics.remove(tableNameWithType);
_tableToHandler.remove(tableNameWithType);
_partitionInFlightCollisionCount.keySet().removeIf(k -> k.startsWith(tableNameWithType + ":"));
}
}

/**
 * Auto-pauses offset auto reset for one topic of a table.
 *
 * <p>Finds the stream config map whose topic name equals {@code topicName}, sets
 * {@code StreamConfigProperties.OFFSET_AUTO_RESET_PAUSE} to {@code "true"} on it, and persists the table config
 * through {@code _pinotHelixResourceManager.updateTableConfig}.
 *
 * <p>If the pause cannot be applied (the table has no stream ingestion config, no stream config map matches the
 * topic, or the table config update throws), the failure is logged and counted under
 * {@code ControllerMeter.OFFSET_AUTO_RESET_AUTO_PAUSE_FAILURE}. The table config is only written when a stream
 * config map was actually modified.
 *
 * @param tableNameWithType table name with type, used for logging and metrics
 * @param tableConfig table config whose matching stream config map is modified in place and then persisted
 * @param topicName name of the source topic whose offset auto reset should be paused
 */
private void setPauseFlag(String tableNameWithType, TableConfig tableConfig, String topicName) {
  // The ingestion config and its stream ingestion config are optional on a table config, so walk the chain
  // defensively instead of dereferencing it blindly.
  List<Map<String, String>> streamConfigMaps = null;
  if (tableConfig.getIngestionConfig() != null
      && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) {
    streamConfigMaps = tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
  }

  boolean pauseFlagSet = false;
  if (streamConfigMaps != null) {
    for (Map<String, String> map : streamConfigMaps) {
      // Topic name is stored under the prefixed key "stream.<type>.topic.name"
      String streamType = map.get(StreamConfigProperties.STREAM_TYPE);
      String topicKey =
          StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME);
      if (topicName.equals(map.get(topicKey))) {
        map.put(StreamConfigProperties.OFFSET_AUTO_RESET_PAUSE, "true");
        pauseFlagSet = true;
        break;
      }
    }
  }

  if (!pauseFlagSet) {
    // Nothing was changed: skip the table config write and report the failure instead of logging a false success.
    LOGGER.error("Failed to set pause flag for table {} topic {}: no matching stream config map found",
        tableNameWithType, topicName);
    _controllerMetrics.addMeteredTableValue(tableNameWithType,
        ControllerMeter.OFFSET_AUTO_RESET_AUTO_PAUSE_FAILURE, 1L);
    return;
  }

  try {
    _pinotHelixResourceManager.updateTableConfig(tableConfig);
    LOGGER.info("Set offset auto reset pause flag for table {} topic {}", tableNameWithType, topicName);
  } catch (Exception e) {
    LOGGER.error("Failed to set pause flag for table {} topic {}", tableNameWithType, topicName, e);
    _controllerMetrics.addMeteredTableValue(tableNameWithType,
        ControllerMeter.OFFSET_AUTO_RESET_AUTO_PAUSE_FAILURE, 1L);
  }
}

Expand Down Expand Up @@ -235,6 +315,8 @@ private RealtimeOffsetAutoResetHandler getOrConstructHandler(TableConfig tableCo
return handler;
} catch (Exception e) {
LOGGER.error("Cannot create RealtimeOffsetAutoResetHandler", e);
_controllerMetrics.addMeteredTableValue(tableConfig.getTableName(),
ControllerMeter.OFFSET_AUTO_RESET_HANDLER_INIT_FAILURE, 1L);
return null;
}
}
Expand Down
Loading
Loading