diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java index 62821c252fdf..d4b991e54b7a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java @@ -38,6 +38,7 @@ import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; +import org.apache.pinot.spi.utils.IngestionConfigUtils; /** @@ -115,6 +116,11 @@ private List assignConsumingSegment(String segmentName, InstancePartitio } protected List assignConsumingSegment(int segmentPartitionId, InstancePartitions instancePartitions) { + // For multi-stream tables, Pinot partition IDs are encoded as (streamIndex * 10000 + streamPartitionId). + // Extract the stream-level partition id before computing the instance index to avoid incorrect slot mapping. + segmentPartitionId = + IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_tableConfig, segmentPartitionId); + int numReplicaGroups = instancePartitions.getNumReplicaGroups(); int numPartitions = instancePartitions.getNumPartitions(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java index 15047747a733..70d897b187f0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java @@ -33,6 +33,7 @@ import org.apache.pinot.segment.local.utils.TableConfigUtils; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +42,7 @@ public class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentS private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaGroupSegmentAssignmentStrategy.class); protected HelixManager _helixManager; + protected TableConfig _tableConfig; protected String _tableName; protected String _partitionColumn; protected int _replication; @@ -48,6 +50,7 @@ public class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentS @Override public void init(HelixManager helixManager, TableConfig tableConfig) { _helixManager = helixManager; + _tableConfig = tableConfig; _tableName = tableConfig.getTableName(); SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig(); Preconditions.checkState(validationAndRetentionConfig != null, "segmentsConfig is null"); @@ -77,9 +80,7 @@ public List assignSegment(String segmentName, Map> reassignSegments(Map> instancePartitionIdToSegmentsMap = Maps.newHashMapWithExpectedSize(numPartitions); for (String segmentName : currentAssignment.keySet()) { - int instancePartitionId = - SegmentUtils.getSegmentPartitionIdOrDefault(segmentName, _tableName, _helixManager, _partitionColumn) - % numPartitions; + int instancePartitionId = getPartitionIdFromSegmentName(segmentName, numPartitions); instancePartitionIdToSegmentsMap.computeIfAbsent(instancePartitionId, k -> new ArrayList<>()).add(segmentName); } @@ -125,6 +124,16 @@ public Map> reassignSegments(Map 1; + IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); + if (ingestionConfig == null || ingestionConfig.getStreamIngestionConfig() == null) { + return false; + } + List> streamConfigMaps = ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps(); + return streamConfigMaps != null && streamConfigMaps.size() > 1; } /** @@ -117,6 +124,7 @@ public static int getPinotPartitionIdFromStreamPartitionId(int partitionId, int } /// Returns the stream partition id from the Pinot segment partition id. + /// Returns {@code partitionId} unchanged for tables without multiple streams. public static int getStreamPartitionIdFromPinotPartitionId(TableConfig tableConfig, int partitionId) { return hasMultipleStreams(tableConfig) ? getStreamPartitionIdFromPinotPartitionId(partitionId) : partitionId; } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java index 3390126b2ffb..b55fb22e71c9 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java @@ -160,6 +160,54 @@ public void testGetConfigMapWithPrefix() { Assert.assertEquals(2, IngestionConfigUtils.getConfigMapWithPrefix(testMap, "k1.").size()); } + @Test + public void testHasMultipleStreams() { + // OFFLINE table — must not throw, must return false + TableConfig offlineTable = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build(); + Assert.assertFalse(IngestionConfigUtils.hasMultipleStreams(offlineTable)); + + // REALTIME, no ingestionConfig + TableConfig realtimeTable = + new TableConfigBuilder(TableType.REALTIME).setTableName("myTable").setTimeColumnName("timeColumn").build(); + Assert.assertFalse(IngestionConfigUtils.hasMultipleStreams(realtimeTable)); + + // REALTIME, single stream + Map streamConfigMap1 = Collections.singletonMap("streamType", "kafka"); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(Collections.singletonList(streamConfigMap1))); + realtimeTable.setIngestionConfig(ingestionConfig); + Assert.assertFalse(IngestionConfigUtils.hasMultipleStreams(realtimeTable)); + + // REALTIME, two streams + Map streamConfigMap2 = Collections.singletonMap("streamType", "kinesis"); + ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigMap1, streamConfigMap2))); + Assert.assertTrue(IngestionConfigUtils.hasMultipleStreams(realtimeTable)); + } + + @Test + public void testGetStreamPartitionIdFromPinotPartitionId() { + // OFFLINE table — must return partitionId unchanged + TableConfig offlineTable = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build(); + Assert.assertEquals(IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(offlineTable, 42), 42); + + // REALTIME, single stream — must return partitionId unchanged + Map streamConfigMap = Collections.singletonMap("streamType", "kafka"); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(Collections.singletonList(streamConfigMap))); + TableConfig realtimeTable = + new TableConfigBuilder(TableType.REALTIME).setTableName("myTable").setTimeColumnName("timeColumn").build(); + realtimeTable.setIngestionConfig(ingestionConfig); + Assert.assertEquals(IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(realtimeTable, 42), 42); + + // REALTIME, multi-stream — encoded partition id 10003 (stream 1, partition 3) must decode to 3 + Map streamConfigMap2 = Collections.singletonMap("streamType", "kinesis"); + ingestionConfig.setStreamIngestionConfig( + new StreamIngestionConfig(List.of(streamConfigMap, streamConfigMap2))); + int pinotPartitionId = IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(3, 1); + Assert.assertEquals(IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(realtimeTable, pinotPartitionId), + 3); + } + @Test public void testGetStreamConfigIndexToStreamPartitions() { Set pinotPartitionIds = new HashSet<>(2);