From c2b381c7d12b6a8a3b8ca3fe9943eea0f98a227b Mon Sep 17 00:00:00 2001 From: shauryachats Date: Wed, 6 May 2026 22:24:28 +0000 Subject: [PATCH 1/4] [bugfix] Fixing instance partition assignments for multi-stream realtime tables --- .../segment/RealtimeSegmentAssignment.java | 6 +++++ ...ReplicaGroupSegmentAssignmentStrategy.java | 23 ++++++++++++++----- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java index 62821c252fdf..d4b991e54b7a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java @@ -38,6 +38,7 @@ import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; +import org.apache.pinot.spi.utils.IngestionConfigUtils; /** @@ -115,6 +116,11 @@ private List assignConsumingSegment(String segmentName, InstancePartitio } protected List assignConsumingSegment(int segmentPartitionId, InstancePartitions instancePartitions) { + // For multi-stream tables, Pinot partition IDs are encoded as (streamIndex * 10000 + streamPartitionId). + // Extract the stream-level partition id before computing the instance index to avoid incorrect slot mapping. + segmentPartitionId = + IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_tableConfig, segmentPartitionId); + int numReplicaGroups = instancePartitions.getNumReplicaGroups(); int numPartitions = instancePartitions.getNumPartitions(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java index 15047747a733..32ae550685a6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java @@ -33,6 +33,8 @@ import org.apache.pinot.segment.local.utils.TableConfigUtils; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +43,7 @@ public class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentS private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaGroupSegmentAssignmentStrategy.class); protected HelixManager _helixManager; + protected TableConfig _tableConfig; protected String _tableName; protected String _partitionColumn; protected int _replication; @@ -48,6 +51,7 @@ public class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentS @Override public void init(HelixManager helixManager, TableConfig tableConfig) { _helixManager = helixManager; + _tableConfig = tableConfig; _tableName = tableConfig.getTableName(); SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig(); Preconditions.checkState(validationAndRetentionConfig != null, "segmentsConfig is null"); @@ -77,9 +81,7 @@ public List assignSegment(String segmentName, Map> reassignSegments(Map> instancePartitionIdToSegmentsMap = Maps.newHashMapWithExpectedSize(numPartitions); for (String segmentName : currentAssignment.keySet()) { - int instancePartitionId = - SegmentUtils.getSegmentPartitionIdOrDefault(segmentName, _tableName, _helixManager, _partitionColumn) - % numPartitions; + int instancePartitionId = getPartitionIdFromSegmentName(segmentName, numPartitions); instancePartitionIdToSegmentsMap.computeIfAbsent(instancePartitionId, k -> new ArrayList<>()).add(segmentName); } @@ -125,6 +125,17 @@ public Map> reassignSegments(Map Date: Thu, 7 May 2026 00:04:19 +0000 Subject: [PATCH 2/4] Fixed failing unit tests --- .../ReplicaGroupSegmentAssignmentStrategy.java | 6 ++---- .../pinot/spi/utils/IngestionConfigUtils.java | 18 ++++++++++++------ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java index 32ae550685a6..70d897b187f0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java @@ -33,7 +33,6 @@ import org.apache.pinot.segment.local.utils.TableConfigUtils; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,9 +129,8 @@ private int getPartitionIdFromSegmentName(String segmentName, int numPartitions) SegmentUtils.getSegmentPartitionIdOrDefault(segmentName, _tableName, _helixManager, _partitionColumn); // For multi-stream realtime tables, translate the Pinot partition ID (which encodes stream index and stream // partition as streamIndex * 10000 + streamPartitionId) to the stream partition id before computing the slot. - if (_tableConfig.getTableType() == TableType.REALTIME) { - rawPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_tableConfig, rawPartitionId); - } + // getStreamPartitionIdFromPinotPartitionId is a no-op for offline tables and single-stream realtime tables. + rawPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_tableConfig, rawPartitionId); return rawPartitionId % numPartitions; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index 40f647c429cd..f1f0c7d948eb 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -102,11 +102,6 @@ public static StreamConfig getFirstStreamConfig(TableConfig tableConfig) { return new StreamConfig(tableConfig.getTableName(), getFirstStreamConfigMap(tableConfig)); } - /// Returns `true` if the table contains multiple streams. - public static boolean hasMultipleStreams(TableConfig tableConfig) { - return getStreamConfigMaps(tableConfig).size() > 1; - } - /** * Getting the Pinot segment level partition id from the stream partition id. * @param partitionId the partition id from the stream @@ -117,8 +112,19 @@ public static int getPinotPartitionIdFromStreamPartitionId(int partitionId, int } /// Returns the stream partition id from the Pinot segment partition id. + /// Safe to call for any table type: returns `partitionId` unchanged for OFFLINE tables or REALTIME tables + /// that lack stream configs (treat as single-stream). public static int getStreamPartitionIdFromPinotPartitionId(TableConfig tableConfig, int partitionId) { - return hasMultipleStreams(tableConfig) ? getStreamPartitionIdFromPinotPartitionId(partitionId) : partitionId; + if (tableConfig.getTableType() != TableType.REALTIME) { + return partitionId; + } + IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); + if (ingestionConfig == null || ingestionConfig.getStreamIngestionConfig() == null) { + return partitionId; + } + List> streamConfigMaps = ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps(); + return streamConfigMaps != null && streamConfigMaps.size() > 1 + ? getStreamPartitionIdFromPinotPartitionId(partitionId) : partitionId; } /// Returns the stream partition id from the Pinot segment partition id. From 0d614e7c979fed18edea3872615af42417321430 Mon Sep 17 00:00:00 2001 From: shauryachats Date: Thu, 7 May 2026 00:16:13 +0000 Subject: [PATCH 3/4] Reverted hasMultipleStreams method --- .../org/apache/pinot/spi/utils/IngestionConfigUtils.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index f1f0c7d948eb..a0e67778d195 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -102,6 +102,11 @@ public static StreamConfig getFirstStreamConfig(TableConfig tableConfig) { return new StreamConfig(tableConfig.getTableName(), getFirstStreamConfigMap(tableConfig)); } + /// Returns `true` if the table contains multiple streams. + public static boolean hasMultipleStreams(TableConfig tableConfig) { + return getStreamConfigMaps(tableConfig).size() > 1; + } + /** * Getting the Pinot segment level partition id from the stream partition id. * @param partitionId the partition id from the stream From 647ba75875fd2e2dd4420e189e2db3172f16c1bd Mon Sep 17 00:00:00 2001 From: shauryachats Date: Fri, 22 May 2026 00:25:58 +0000 Subject: [PATCH 4/4] Addressed fixes --- .../pinot/spi/utils/IngestionConfigUtils.java | 25 +++++----- .../spi/utils/IngestionConfigUtilsTest.java | 48 +++++++++++++++++++ 2 files changed, 59 insertions(+), 14 deletions(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index a0e67778d195..b0731ad38315 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -102,9 +102,16 @@ public static StreamConfig getFirstStreamConfig(TableConfig tableConfig) { return new StreamConfig(tableConfig.getTableName(), getFirstStreamConfigMap(tableConfig)); } - /// Returns `true` if the table contains multiple streams. + /// Returns {@code true} if the table has multiple streams configured. + /// Safe to call for any table type: returns {@code false} for OFFLINE tables and REALTIME tables that lack stream + /// configs. public static boolean hasMultipleStreams(TableConfig tableConfig) { - return getStreamConfigMaps(tableConfig).size() > 1; + IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); + if (ingestionConfig == null || ingestionConfig.getStreamIngestionConfig() == null) { + return false; + } + List> streamConfigMaps = ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps(); + return streamConfigMaps != null && streamConfigMaps.size() > 1; } /** @@ -117,19 +124,9 @@ public static int getPinotPartitionIdFromStreamPartitionId(int partitionId, int } /// Returns the stream partition id from the Pinot segment partition id. - /// Safe to call for any table type: returns `partitionId` unchanged for OFFLINE tables or REALTIME tables - /// that lack stream configs (treat as single-stream). + /// Returns {@code partitionId} unchanged for tables without multiple streams. public static int getStreamPartitionIdFromPinotPartitionId(TableConfig tableConfig, int partitionId) { - if (tableConfig.getTableType() != TableType.REALTIME) { - return partitionId; - } - IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); - if (ingestionConfig == null || ingestionConfig.getStreamIngestionConfig() == null) { - return partitionId; - } - List> streamConfigMaps = ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps(); - return streamConfigMaps != null && streamConfigMaps.size() > 1 - ? getStreamPartitionIdFromPinotPartitionId(partitionId) : partitionId; + return hasMultipleStreams(tableConfig) ? getStreamPartitionIdFromPinotPartitionId(partitionId) : partitionId; } /// Returns the stream partition id from the Pinot segment partition id. diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java index 3390126b2ffb..b55fb22e71c9 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java @@ -160,6 +160,54 @@ public void testGetConfigMapWithPrefix() { Assert.assertEquals(2, IngestionConfigUtils.getConfigMapWithPrefix(testMap, "k1.").size()); } + @Test + public void testHasMultipleStreams() { + // OFFLINE table — must not throw, must return false + TableConfig offlineTable = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build(); + Assert.assertFalse(IngestionConfigUtils.hasMultipleStreams(offlineTable)); + + // REALTIME, no ingestionConfig + TableConfig realtimeTable = + new TableConfigBuilder(TableType.REALTIME).setTableName("myTable").setTimeColumnName("timeColumn").build(); + Assert.assertFalse(IngestionConfigUtils.hasMultipleStreams(realtimeTable)); + + // REALTIME, single stream + Map streamConfigMap1 = Collections.singletonMap("streamType", "kafka"); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(Collections.singletonList(streamConfigMap1))); + realtimeTable.setIngestionConfig(ingestionConfig); + Assert.assertFalse(IngestionConfigUtils.hasMultipleStreams(realtimeTable)); + + // REALTIME, two streams + Map streamConfigMap2 = Collections.singletonMap("streamType", "kinesis"); + ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigMap1, streamConfigMap2))); + Assert.assertTrue(IngestionConfigUtils.hasMultipleStreams(realtimeTable)); + } + + @Test + public void testGetStreamPartitionIdFromPinotPartitionId() { + // OFFLINE table — must return partitionId unchanged + TableConfig offlineTable = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build(); + Assert.assertEquals(IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(offlineTable, 42), 42); + + // REALTIME, single stream — must return partitionId unchanged + Map streamConfigMap = Collections.singletonMap("streamType", "kafka"); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(Collections.singletonList(streamConfigMap))); + TableConfig realtimeTable = + new TableConfigBuilder(TableType.REALTIME).setTableName("myTable").setTimeColumnName("timeColumn").build(); + realtimeTable.setIngestionConfig(ingestionConfig); + Assert.assertEquals(IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(realtimeTable, 42), 42); + + // REALTIME, multi-stream — encoded partition id 10003 (stream 1, partition 3) must decode to 3 + Map streamConfigMap2 = Collections.singletonMap("streamType", "kinesis"); + ingestionConfig.setStreamIngestionConfig( + new StreamIngestionConfig(List.of(streamConfigMap, streamConfigMap2))); + int pinotPartitionId = IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(3, 1); + Assert.assertEquals(IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(realtimeTable, pinotPartitionId), + 3); + } + @Test public void testGetStreamConfigIndexToStreamPartitions() { Set pinotPartitionIds = new HashSet<>(2);