Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.IngestionConfigUtils;


/**
Expand Down Expand Up @@ -115,6 +116,11 @@ private List<String> assignConsumingSegment(String segmentName, InstancePartitio
}

protected List<String> assignConsumingSegment(int segmentPartitionId, InstancePartitions instancePartitions) {
// For multi-stream tables, Pinot partition IDs are encoded as (streamIndex * 10000 + streamPartitionId).
// Extract the stream-level partition id before computing the instance index to avoid incorrect slot mapping.
segmentPartitionId =
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_tableConfig, segmentPartitionId);

int numReplicaGroups = instancePartitions.getNumReplicaGroups();
int numPartitions = instancePartitions.getNumPartitions();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,13 +42,15 @@ public class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentS
private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaGroupSegmentAssignmentStrategy.class);

protected HelixManager _helixManager;
protected TableConfig _tableConfig;
protected String _tableName;
protected String _partitionColumn;
protected int _replication;

@Override
public void init(HelixManager helixManager, TableConfig tableConfig) {
_helixManager = helixManager;
_tableConfig = tableConfig;
_tableName = tableConfig.getTableName();
SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig();
Preconditions.checkState(validationAndRetentionConfig != null, "segmentsConfig is null");
Expand Down Expand Up @@ -77,9 +80,7 @@ public List<String> assignSegment(String segmentName, Map<String, Map<String, St
if (numPartitions == 1) {
partitionId = 0;
} else {
partitionId =
SegmentUtils.getSegmentPartitionIdOrDefault(segmentName, _tableName, _helixManager, _partitionColumn)
% numPartitions;
partitionId = getPartitionIdFromSegmentName(segmentName, numPartitions);
}
return SegmentAssignmentUtils.assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionId);
}
Expand All @@ -106,9 +107,7 @@ public Map<String, Map<String, String>> reassignSegments(Map<String, Map<String,
} else {
Map<Integer, List<String>> instancePartitionIdToSegmentsMap = Maps.newHashMapWithExpectedSize(numPartitions);
for (String segmentName : currentAssignment.keySet()) {
int instancePartitionId =
SegmentUtils.getSegmentPartitionIdOrDefault(segmentName, _tableName, _helixManager, _partitionColumn)
% numPartitions;
int instancePartitionId = getPartitionIdFromSegmentName(segmentName, numPartitions);
instancePartitionIdToSegmentsMap.computeIfAbsent(instancePartitionId, k -> new ArrayList<>()).add(segmentName);
}

Expand All @@ -125,6 +124,16 @@ public Map<String, Map<String, String>> reassignSegments(Map<String, Map<String,
}
}

private int getPartitionIdFromSegmentName(String segmentName, int numPartitions) {
int rawPartitionId =
SegmentUtils.getSegmentPartitionIdOrDefault(segmentName, _tableName, _helixManager, _partitionColumn);
// For multi-stream realtime tables, translate the Pinot partition ID (which encodes stream index and stream
// partition as streamIndex * 10000 + streamPartitionId) to the stream partition id before computing the slot.
// getStreamPartitionIdFromPinotPartitionId is a no-op for offline tables and single-stream realtime tables.
rawPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_tableConfig, rawPartitionId);
return rawPartitionId % numPartitions;
}

/**
* Helper method to check whether the number of replica-groups matches the table replication for replica-group based
* instance partitions. Log a warning if they do not match and use the one inside the instance partitions. The
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,16 @@ public static StreamConfig getFirstStreamConfig(TableConfig tableConfig) {
return new StreamConfig(tableConfig.getTableName(), getFirstStreamConfigMap(tableConfig));
}

/// Returns `true` if the table contains multiple streams.
/// Returns {@code true} if the table has multiple streams configured.
/// Safe to call for any table type: returns {@code false} for OFFLINE tables and REALTIME tables that lack stream
/// configs.
public static boolean hasMultipleStreams(TableConfig tableConfig) {
return getStreamConfigMaps(tableConfig).size() > 1;
IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
if (ingestionConfig == null || ingestionConfig.getStreamIngestionConfig() == null) {
return false;
}
List<Map<String, String>> streamConfigMaps = ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps();
return streamConfigMaps != null && streamConfigMaps.size() > 1;
}

/**
Expand All @@ -117,6 +124,7 @@ public static int getPinotPartitionIdFromStreamPartitionId(int partitionId, int
}

/// Returns the stream partition id from the Pinot segment partition id.
/// Returns {@code partitionId} unchanged for tables without multiple streams.
public static int getStreamPartitionIdFromPinotPartitionId(TableConfig tableConfig, int partitionId) {
return hasMultipleStreams(tableConfig) ? getStreamPartitionIdFromPinotPartitionId(partitionId) : partitionId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,54 @@ public void testGetConfigMapWithPrefix() {
Assert.assertEquals(2, IngestionConfigUtils.getConfigMapWithPrefix(testMap, "k1.").size());
}

@Test
public void testHasMultipleStreams() {
// OFFLINE table — must not throw, must return false
TableConfig offlineTable = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
Assert.assertFalse(IngestionConfigUtils.hasMultipleStreams(offlineTable));

// REALTIME, no ingestionConfig
TableConfig realtimeTable =
new TableConfigBuilder(TableType.REALTIME).setTableName("myTable").setTimeColumnName("timeColumn").build();
Assert.assertFalse(IngestionConfigUtils.hasMultipleStreams(realtimeTable));

// REALTIME, single stream
Map<String, String> streamConfigMap1 = Collections.singletonMap("streamType", "kafka");
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(Collections.singletonList(streamConfigMap1)));
realtimeTable.setIngestionConfig(ingestionConfig);
Assert.assertFalse(IngestionConfigUtils.hasMultipleStreams(realtimeTable));

// REALTIME, two streams
Map<String, String> streamConfigMap2 = Collections.singletonMap("streamType", "kinesis");
ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigMap1, streamConfigMap2)));
Assert.assertTrue(IngestionConfigUtils.hasMultipleStreams(realtimeTable));
}

@Test
public void testGetStreamPartitionIdFromPinotPartitionId() {
// OFFLINE table — must return partitionId unchanged
TableConfig offlineTable = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
Assert.assertEquals(IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(offlineTable, 42), 42);

// REALTIME, single stream — must return partitionId unchanged
Map<String, String> streamConfigMap = Collections.singletonMap("streamType", "kafka");
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(Collections.singletonList(streamConfigMap)));
TableConfig realtimeTable =
new TableConfigBuilder(TableType.REALTIME).setTableName("myTable").setTimeColumnName("timeColumn").build();
realtimeTable.setIngestionConfig(ingestionConfig);
Assert.assertEquals(IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(realtimeTable, 42), 42);

// REALTIME, multi-stream — encoded partition id 10003 (stream 1, partition 3) must decode to 3
Map<String, String> streamConfigMap2 = Collections.singletonMap("streamType", "kinesis");
ingestionConfig.setStreamIngestionConfig(
new StreamIngestionConfig(List.of(streamConfigMap, streamConfigMap2)));
int pinotPartitionId = IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(3, 1);
Assert.assertEquals(IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(realtimeTable, pinotPartitionId),
3);
}

@Test
public void testGetStreamConfigIndexToStreamPartitions() {
Set<Integer> pinotPartitionIds = new HashSet<>(2);
Expand Down
Loading