diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java index 2ecfca09df..227db3fabc 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java @@ -265,7 +265,9 @@ public SplitEnumerator restoreEnumerator new LeaseContext( sourceEnumeratorState.getLeaseId(), leaseContext.getKvSnapshotLeaseDurationMs()), - true); + true, + sourceEnumeratorState.isInitialDiscoveryFinished(), + sourceEnumeratorState.getUnassignedSplits()); } @Override diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index d107712ff3..2f4f9749a6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -155,6 +155,22 @@ public class FlinkSourceEnumerator private final OffsetsInitializer startingOffsetsInitializer; private final OffsetsInitializer stoppingOffsetsInitializer; + /** + * The offsets initializer used for partitions discovered after the initial startup. Following + * FLIP-288 semantics, newly discovered partitions always start from earliest to prevent data + * loss. + */ + private final OffsetsInitializer newDiscoveryOffsetsInitializer; + + /** + * Splits whose starting offsets have been initialized but that have not yet been assigned to + * any reader. This map is persisted in checkpoint state (via {@link #snapshotState}) so that on + * failover restore these splits are directly placed into {@link #pendingSplitAssignment} + * without re-initialization, preserving the original offset strategy determined at first + * discovery time (FLIP-288). + */ + private final Map unassignedSplits; + private final LeaseContext leaseContext; /** checkpointId -> tableBuckets who finished consume kv snapshots. */ @@ -170,6 +186,14 @@ public class FlinkSourceEnumerator // split initializing has finished. private boolean noMoreNewSplits = false; + /** + * Whether the initial partition discovery has been completed. Following FLIP-288, this flag + * alone determines offset strategy: partitions discovered before this flag is set to {@code + * true} use the user-configured {@link #startingOffsetsInitializer}, while partitions + * discovered after use {@link #newDiscoveryOffsetsInitializer} (earliest) to prevent data loss. + */ + private boolean initialDiscoveryFinished; + private boolean lakeEnabled = false; private volatile boolean closed = false; @@ -260,7 +284,9 @@ public FlinkSourceEnumerator( partitionFilters, lakeSource, leaseContext, - checkpointTriggeredBefore); + checkpointTriggeredBefore, + false, + Collections.emptyList()); } public FlinkSourceEnumerator( @@ -278,7 +304,9 @@ public FlinkSourceEnumerator( @Nullable Predicate partitionFilters, @Nullable LakeSource lakeSource, LeaseContext leaseContext, - boolean checkpointTriggeredBefore) { + boolean checkpointTriggeredBefore, + boolean initialDiscoveryFinished, + List unassignedSplits) { this( tablePath, flussConf, @@ -295,7 +323,9 @@ public FlinkSourceEnumerator( partitionFilters, lakeSource, leaseContext, - checkpointTriggeredBefore); + checkpointTriggeredBefore, + initialDiscoveryFinished, + unassignedSplits); } public FlinkSourceEnumerator( @@ -314,7 +344,9 @@ public FlinkSourceEnumerator( @Nullable Predicate partitionFilters, @Nullable LakeSource lakeSource, LeaseContext leaseContext, - boolean checkpointTriggeredBefore) { + boolean checkpointTriggeredBefore, + boolean initialDiscoveryFinished, + Collection unassignedSplits) { this( tablePath, flussConf, @@ -332,7 +364,9 @@ public FlinkSourceEnumerator( lakeSource, new WorkerExecutor(context), leaseContext, - checkpointTriggeredBefore); + checkpointTriggeredBefore, + initialDiscoveryFinished, + unassignedSplits); } FlinkSourceEnumerator( @@ -369,7 +403,9 @@ public FlinkSourceEnumerator( lakeSource, workerExecutor, leaseContext, - checkpointTriggeredBefore); + checkpointTriggeredBefore, + false, + Collections.emptyList()); } FlinkSourceEnumerator( @@ -389,7 +425,9 @@ public FlinkSourceEnumerator( @Nullable LakeSource lakeSource, WorkerExecutor workerExecutor, LeaseContext leaseContext, - boolean checkpointTriggeredBefore) { + boolean checkpointTriggeredBefore, + boolean initialDiscoveryFinished, + Collection unassignedSplits) { checkArgument( splitPerAssignmentBatchSize > 0, "Split assignment batch size must be positive, but was %s.", @@ -407,6 +445,7 @@ public FlinkSourceEnumerator( ? null : new LinkedList<>(pendingHybridLakeFlussSplits); this.startingOffsetsInitializer = startingOffsetsInitializer; + this.newDiscoveryOffsetsInitializer = OffsetsInitializer.earliest(); this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs; this.streaming = streaming; this.partitionFilters = partitionFilters; @@ -417,6 +456,8 @@ public FlinkSourceEnumerator( this.leaseContext = leaseContext; this.checkpointTriggeredBefore = checkpointTriggeredBefore; this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize; + this.initialDiscoveryFinished = initialDiscoveryFinished; + this.unassignedSplits = indexByPartition(unassignedSplits); } @Override @@ -434,6 +475,13 @@ public void start() { ExceptionUtils.stripCompletionException(e)); } + // Find splits where the start offset has been initialized but not yet assigned to readers. + // These splits must not be reinitialized to keep offsets consistent with first discovery. + final Collection preinitializedSplits = unassignedSplits.values(); + if (!unassignedSplits.isEmpty()) { + addSplitToPendingAssignments(preinitializedSplits); + } + if (isPartitioned) { if (streaming) { if (lakeSource != null) { @@ -509,7 +557,9 @@ private void startInBatchMode() { // Use log-only splits to avoid generating mixed split // types (HybridSnapshotLogSplit + LogSplit) for // primary-key tables, which is not supported. - splits = this.initLogTablePartitionSplits(partitions); + splits = + this.initLogTablePartitionSplits( + partitions, startingOffsetsInitializer); } else { splits = this.getLogSplit(null, null); } @@ -526,6 +576,14 @@ private void startInBatchMode() { } private void startInStreamModeForNonPartitionedTable() { + // If we have restored unassigned splits from checkpoint state, skip re-initialization. + // These splits already have their offsets resolved and will be assigned to readers + // when they register (via addReader -> assignPendingSplits). + if (!pendingSplitAssignment.isEmpty()) { + noMoreNewSplits = true; + return; + } + if (lakeSource != null) { // Generate lake splits synchronously so that they are available before the // first checkpoint. This is consistent with the partitioned-table path in @@ -620,8 +678,16 @@ private void checkPartitionChanges(Set partitionInfos, Throwable tablePath, partitionInfos.size()); - final PartitionChange partitionChange = getPartitionChange(partitionInfos); + final PartitionChange partitionChange = + getPartitionChange(partitionInfos, !initialDiscoveryFinished); + if (partitionChange.isEmpty()) { + // No partition changes found. For the empty-table case (no initial partitions + // to track), mark initial discovery as finished immediately since there are + // no splits that need to be persisted in state first. + if (!initialDiscoveryFinished) { + initialDiscoveryFinished = true; + } LOG.debug("No partition changes detected for table {}", tablePath); return; } @@ -636,21 +702,32 @@ private void checkPartitionChanges(Set partitionInfos, Throwable handlePartitionsRemoved(partitionChange.removedPartitions); } - // handle new partitions - if (!partitionChange.newPartitions.isEmpty()) { + // handle initial partitions and new partitions + boolean hasNewOrInitialPartitions = + !partitionChange.initialPartitions.isEmpty() + || !partitionChange.newPartitions.isEmpty(); + if (hasNewOrInitialPartitions) { + Collection allNewPartitions = new ArrayList<>(); + allNewPartitions.addAll(partitionChange.initialPartitions); + allNewPartitions.addAll(partitionChange.newPartitions); LOG.info( - "Handling {} new partitions for table {}: {}", - partitionChange.newPartitions.size(), + "Handling {} partitions for table {} (initial={}, new={}): {}", + allNewPartitions.size(), tablePath, - partitionChange.newPartitions); + partitionChange.initialPartitions.size(), + partitionChange.newPartitions.size(), + allNewPartitions); workerExecutor.callAsync( - () -> initPartitionedSplits(partitionChange.newPartitions), - this::handleSplitsAdd); + () -> initPartitionedSplits(partitionChange), + (splits, throwable) -> { + handleSplitsAdd(splits, throwable); + }); } } - private PartitionChange getPartitionChange(Set fetchedPartitionInfos) { - final Set newPartitions = + private PartitionChange getPartitionChange( + Set fetchedPartitionInfos, boolean initialDiscovery) { + final Set allNewPartitions = fetchedPartitionInfos.stream() .map(p -> new Partition(p.getPartitionId(), p.getPartitionName())) .collect(Collectors.toSet()); @@ -679,7 +756,7 @@ private PartitionChange getPartitionChange(Set fetchedPartitionIn assignedOrPendingPartitions.forEach( p -> { - if (!newPartitions.remove(p)) { + if (!allNewPartitions.remove(p)) { removedPartitions.add(p); } }); @@ -687,25 +764,63 @@ private PartitionChange getPartitionChange(Set fetchedPartitionIn if (!removedPartitions.isEmpty()) { LOG.info("Discovered removed partitions: {}", removedPartitions); } - if (!newPartitions.isEmpty()) { - LOG.info("Discovered new partitions: {}", newPartitions); + if (!allNewPartitions.isEmpty()) { + LOG.info("Discovered new partitions: {}", allNewPartitions); } - return new PartitionChange(newPartitions, removedPartitions); + // Following Kafka's FLIP-288 pattern: if this is the initial discovery, + // all new partitions are classified as "initial partitions" and will use + // the user-configured offset initializer. After initial discovery is done, + // all new partitions are classified as "new partitions" and will use earliest. + Set initialPartitions = new HashSet<>(); + Set newPartitions; + if (initialDiscovery) { + initialPartitions.addAll(allNewPartitions); + newPartitions = Collections.emptySet(); + } else { + newPartitions = allNewPartitions; + } + + return new PartitionChange(initialPartitions, newPartitions, removedPartitions); } - private List initPartitionedSplits(Collection newPartitions) { + private List initPartitionedSplits(PartitionChange partitionChange) { + Collection initialPartitions = partitionChange.initialPartitions; + Collection newPartitions = partitionChange.newPartitions; + if (hasPrimaryKey && startingOffsetsInitializer instanceof SnapshotOffsetsInitializer) { - return initPrimaryKeyTablePartitionSplits(newPartitions); + // Snapshot mode for PK tables is already safe: it reads the snapshot or falls back + // to earliest offsets when no snapshot is available. + List allPartitions = new ArrayList<>(); + allPartitions.addAll(initialPartitions); + allPartitions.addAll(newPartitions); + return initPrimaryKeyTablePartitionSplits(allPartitions); } else { - return initLogTablePartitionSplits(newPartitions); + // For log tables (or PK tables in non-snapshot mode), use FLIP-288 semantics: + // - Initial partitions: use user-configured offset + // - New partitions: use earliest to prevent data loss + List splits = new ArrayList<>(); + if (!initialPartitions.isEmpty()) { + splits.addAll( + initLogTablePartitionSplits(initialPartitions, startingOffsetsInitializer)); + } + if (!newPartitions.isEmpty()) { + splits.addAll( + initLogTablePartitionSplits(newPartitions, newDiscoveryOffsetsInitializer)); + } + return splits; } } - private List initLogTablePartitionSplits(Collection newPartitions) { + private List initLogTablePartitionSplits( + Collection newPartitions, OffsetsInitializer effectiveOffsetsInitializer) { List splits = new ArrayList<>(); for (Partition partition : newPartitions) { - splits.addAll(getLogSplit(partition.getPartitionId(), partition.getPartitionName())); + splits.addAll( + getLogSplit( + partition.getPartitionId(), + partition.getPartitionName(), + effectiveOffsetsInitializer)); } return splits; } @@ -848,6 +963,13 @@ private List getSnapshotAndLogSplits( private List getLogSplit( @Nullable Long partitionId, @Nullable String partitionName) { + return getLogSplit(partitionId, partitionName, startingOffsetsInitializer); + } + + private List getLogSplit( + @Nullable Long partitionId, + @Nullable String partitionName, + OffsetsInitializer effectiveStartingOffsetsInitializer) { // always assume the bucket is from 0 to bucket num List splits = new ArrayList<>(); List bucketsNeedInitOffset = new ArrayList<>(); @@ -862,7 +984,7 @@ private List getLogSplit( if (!bucketsNeedInitOffset.isEmpty()) { Map startingOffsets = - startingOffsetsInitializer.getBucketOffsets( + effectiveStartingOffsetsInitializer.getBucketOffsets( partitionName, bucketsNeedInitOffset, bucketOffsetsRetriever); Map stoppingOffsets = stoppingOffsetsInitializer.getBucketOffsets( @@ -978,6 +1100,12 @@ private void handleSplitsAdd(List splits, Throwable t) { t); } } + + initialDiscoveryFinished = true; + for (SourceSplitBase split : splits) { + unassignedSplits.put(split.getTableBucket(), split); + } + if (isPartitioned) { if (!streaming || scanPartitionDiscoveryIntervalMs <= 0) { // if not streaming or partition discovery is disabled @@ -1026,6 +1154,7 @@ private void assignPendingSplits(Set pendingReaders) { split -> { TableBucket tableBucket = split.getTableBucket(); assignedTableBuckets.add(tableBucket); + unassignedSplits.remove(tableBucket); if (isPartitioned) { long partitionId = @@ -1204,6 +1333,13 @@ Map> getPendingSplitAssignment() { @Override public void addSplitsBack(List splits, int subtaskId) { LOG.debug("Flink Source Enumerator adds splits back: {}", splits); + for (SourceSplitBase split : splits) { + unassignedSplits.put(split.getTableBucket(), split); + assignedTableBuckets.remove(split.getTableBucket()); + if (isPartitioned) { + assignedPartitions.remove(split.getTableBucket().getPartitionId()); + } + } addSplitToPendingAssignments(splits); // If the failed subtask has already restarted, we need to assign pending splits to it @@ -1225,8 +1361,13 @@ public SourceEnumeratorState snapshotState(long checkpointId) { assignedTableBuckets, assignedPartitions, pendingHybridLakeFlussSplits, - leaseContext.getKvSnapshotLeaseId()); - LOG.debug("Source Checkpoint is {}", enumeratorState); + leaseContext.getKvSnapshotLeaseId(), + initialDiscoveryFinished, + unassignedSplits.values()); + LOG.debug( + "Source Checkpoint is {}, initialDiscoveryFinished={}", + enumeratorState, + initialDiscoveryFinished); return enumeratorState; } @@ -1349,20 +1490,31 @@ private void maybeDropKvSnapshotLease() throws Exception { } } + private static Map indexByPartition( + Collection splits) { + return splits.stream().collect(Collectors.toMap(SourceSplitBase::getTableBucket, e -> e)); + } + // --------------- private class --------------- /** A container class to hold the newly added partitions and removed partitions. */ private static class PartitionChange { + private final Collection initialPartitions; private final Collection newPartitions; private final Collection removedPartitions; PartitionChange( - Collection newPartitions, Collection removedPartitions) { + Collection initialPartitions, + Collection newPartitions, + Collection removedPartitions) { + this.initialPartitions = initialPartitions; this.newPartitions = newPartitions; this.removedPartitions = removedPartitions; } public boolean isEmpty() { - return newPartitions.isEmpty() && removedPartitions.isEmpty(); + return initialPartitions.isEmpty() + && newPartitions.isEmpty() + && removedPartitions.isEmpty(); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java index 996313e607..3d0374d404 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -70,11 +71,13 @@ public class FlussSourceEnumeratorStateSerializer private static final int VERSION_0 = 0; private static final int VERSION_1 = 1; private static final int VERSION_2 = 2; + private static final int VERSION_3 = 3; + private static final int VERSION_4 = 4; private static final ThreadLocal SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); - private static final int CURRENT_VERSION = VERSION_2; + private static final int CURRENT_VERSION = VERSION_4; public FlussSourceEnumeratorStateSerializer(LakeSource lakeSource) { this.lakeSource = lakeSource; @@ -99,6 +102,12 @@ public byte[] serialize(SourceEnumeratorState state) throws IOException { // write lease context serializeLeaseId(out, state); + // write initial discovery finished flag (VERSION_3+) + out.writeBoolean(state.isInitialDiscoveryFinished()); + + // write unassigned splits (VERSION_4) + serializeUnassignedSplits(out, state.getUnassignedSplits()); + final byte[] result = out.getCopyOfBuffer(); out.clear(); return result; @@ -169,6 +178,10 @@ protected byte[] serializeV0(SourceEnumeratorState state) throws IOException { @Override public SourceEnumeratorState deserialize(int version, byte[] serialized) throws IOException { switch (version) { + case VERSION_4: + return deserializeV4(serialized); + case VERSION_3: + return deserializeV3(serialized); case VERSION_2: return deserializeV2(serialized); case VERSION_1: @@ -229,6 +242,8 @@ private SourceEnumeratorState deserializeV2(byte[] serialized) throws IOExceptio // deserialize lease context LeaseContext leaseContext = deserializeLeaseId(in); + // V2 does not have initialDiscoveryFinished flag; default to true (safe choice + // to prevent data loss: treat all newly discovered partitions as post-initial). return new SourceEnumeratorState( assignBucketAndPartitions.f0, assignBucketAndPartitions.f1, @@ -236,6 +251,60 @@ private SourceEnumeratorState deserializeV2(byte[] serialized) throws IOExceptio leaseContext.getKvSnapshotLeaseId()); } + private SourceEnumeratorState deserializeV3(byte[] serialized) throws IOException { + DataInputDeserializer in = new DataInputDeserializer(serialized); + Tuple2, Map> assignBucketAndPartitions = + deserializeAssignBucketAndPartitions(in); + List remainingHybridLakeFlussSplits = + deserializeRemainingHybridLakeFlussSplits(in); + + // deserialize lease context + LeaseContext leaseContext = deserializeLeaseId(in); + + // deserialize initial discovery finished flag + boolean initialDiscoveryFinished = in.readBoolean(); + + // skip initial partition IDs if present (backward compatibility with older V3 format) + if (in.available() > 0) { + int initialPartitionIdsSize = in.readInt(); + for (int i = 0; i < initialPartitionIdsSize; i++) { + in.readLong(); + } + } + + return new SourceEnumeratorState( + assignBucketAndPartitions.f0, + assignBucketAndPartitions.f1, + remainingHybridLakeFlussSplits, + leaseContext.getKvSnapshotLeaseId(), + initialDiscoveryFinished); + } + + private SourceEnumeratorState deserializeV4(byte[] serialized) throws IOException { + DataInputDeserializer in = new DataInputDeserializer(serialized); + Tuple2, Map> assignBucketAndPartitions = + deserializeAssignBucketAndPartitions(in); + List remainingHybridLakeFlussSplits = + deserializeRemainingHybridLakeFlussSplits(in); + + // deserialize lease context + LeaseContext leaseContext = deserializeLeaseId(in); + + // deserialize initial discovery finished flag + boolean initialDiscoveryFinished = in.readBoolean(); + + // deserialize unassigned splits + List unassignedSplits = deserializeUnassignedSplits(in); + + return new SourceEnumeratorState( + assignBucketAndPartitions.f0, + assignBucketAndPartitions.f1, + remainingHybridLakeFlussSplits, + leaseContext.getKvSnapshotLeaseId(), + initialDiscoveryFinished, + unassignedSplits); + } + private Tuple2, Map> deserializeAssignBucketAndPartitions( DataInputDeserializer in) throws IOException { // deserialize assigned buckets @@ -302,4 +371,37 @@ private LeaseContext deserializeLeaseId(final DataInputDeserializer in) throws I return new LeaseContext( kvSnapshotLeaseId, LeaseContext.DEFAULT.getKvSnapshotLeaseDurationMs()); } + + private void serializeUnassignedSplits( + final DataOutputSerializer out, Collection unassignedSplits) + throws IOException { + out.writeInt(unassignedSplits.size()); + if (!unassignedSplits.isEmpty()) { + SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer(lakeSource); + out.writeInt(sourceSplitSerializer.getVersion()); + for (SourceSplitBase split : unassignedSplits) { + byte[] serializeBytes = sourceSplitSerializer.serialize(split); + out.writeInt(serializeBytes.length); + out.write(serializeBytes); + } + } + } + + private List deserializeUnassignedSplits(final DataInputDeserializer in) + throws IOException { + int numSplits = in.readInt(); + if (numSplits == 0) { + return new ArrayList<>(); + } + SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer(lakeSource); + int version = in.readInt(); + List splits = new ArrayList<>(numSplits); + for (int i = 0; i < numSplits; i++) { + int splitSizeInBytes = in.readInt(); + byte[] splitBytes = new byte[splitSizeInBytes]; + in.readFully(splitBytes); + splits.add(sourceSplitSerializer.deserialize(version, splitBytes)); + } + return splits; + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java index 7ee021ff58..32fce1c772 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java @@ -22,6 +22,8 @@ import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -44,15 +46,62 @@ public class SourceEnumeratorState { // lease context for restore. private final String leaseId; + /** + * Whether the initial partition discovery has been completed. Following FLIP-288, partitions + * discovered after the initial startup always use earliest offsets to prevent data loss. + */ + private final boolean initialDiscoveryFinished; + + /** + * Splits that have been initialized (offsets resolved) but not yet assigned to readers. + * Following Kafka's FLIP-288 pattern, these are persisted in checkpoint state so that on + * restore they can be directly assigned without re-initialization, preserving the original + * offset strategy. + */ + private final Collection unassignedSplits; + public SourceEnumeratorState( Set assignedBuckets, Map assignedPartitions, @Nullable List remainingHybridLakeFlussSplits, String leaseId) { + this( + assignedBuckets, + assignedPartitions, + remainingHybridLakeFlussSplits, + leaseId, + false, + Collections.emptyList()); + } + + public SourceEnumeratorState( + Set assignedBuckets, + Map assignedPartitions, + @Nullable List remainingHybridLakeFlussSplits, + String leaseId, + boolean initialDiscoveryFinished) { + this( + assignedBuckets, + assignedPartitions, + remainingHybridLakeFlussSplits, + leaseId, + initialDiscoveryFinished, + Collections.emptyList()); + } + + public SourceEnumeratorState( + Set assignedBuckets, + Map assignedPartitions, + @Nullable List remainingHybridLakeFlussSplits, + String leaseId, + boolean initialDiscoveryFinished, + Collection unassignedSplits) { this.assignedBuckets = assignedBuckets; this.assignedPartitions = assignedPartitions; this.remainingHybridLakeFlussSplits = remainingHybridLakeFlussSplits; this.leaseId = leaseId; + this.initialDiscoveryFinished = initialDiscoveryFinished; + this.unassignedSplits = unassignedSplits; } public Set getAssignedBuckets() { @@ -72,6 +121,14 @@ public String getLeaseId() { return leaseId; } + public boolean isInitialDiscoveryFinished() { + return initialDiscoveryFinished; + } + + public Collection getUnassignedSplits() { + return unassignedSplits; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -85,12 +142,19 @@ public boolean equals(Object o) { && Objects.equals(assignedPartitions, that.assignedPartitions) && Objects.equals( remainingHybridLakeFlussSplits, that.remainingHybridLakeFlussSplits) - && Objects.equals(leaseId, that.leaseId); + && Objects.equals(leaseId, that.leaseId) + && initialDiscoveryFinished == that.initialDiscoveryFinished + && Objects.equals(unassignedSplits, that.unassignedSplits); } @Override public int hashCode() { - return Objects.hash(assignedBuckets, assignedPartitions, remainingHybridLakeFlussSplits); + return Objects.hash( + assignedBuckets, + assignedPartitions, + remainingHybridLakeFlussSplits, + initialDiscoveryFinished, + unassignedSplits); } @Override @@ -104,6 +168,10 @@ public String toString() { + remainingHybridLakeFlussSplits + ", leaseId=" + leaseId + + ", initialDiscoveryFinished=" + + initialDiscoveryFinished + + ", unassignedSplits=" + + unassignedSplits + '}'; } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index 184e1b6e71..da2db33c8f 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -556,7 +556,9 @@ void testRestore() throws Throwable { null, null, LeaseContext.DEFAULT, - true); + true, + true, + Collections.emptyList()); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); @@ -577,6 +579,210 @@ void testRestore() throws Throwable { } } + @Test + void testNewPartitionsUseEarliestOffset() throws Throwable { + int numSubtasks = 3; + createTable(DEFAULT_TABLE_PATH, DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR); + ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(numSubtasks); + MockWorkExecutor workExecutor = new MockWorkExecutor(context); + FlinkSourceEnumerator enumerator = + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + flussConf, + false, + true, + context, + Collections.emptySet(), + Collections.emptyMap(), + null, + OffsetsInitializer.latest(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + streaming, + null, + null, + workExecutor, + LeaseContext.DEFAULT, + false)) { + + Map partitionNameByIds = + waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH); + enumerator.start(); + + // register readers + for (int i = 0; i < numSubtasks; i++) { + registerReader(context, enumerator, i); + } + + // First partition discovery: initial partitions should use latest offset + runPeriodicPartitionDiscovery(workExecutor); + runPeriodicPartitionDiscovery(workExecutor); + + // Snapshot state - initialDiscoveryFinished should be true + SourceEnumeratorState state = enumerator.snapshotState(1L); + assertThat(state.isInitialDiscoveryFinished()).isTrue(); + + // Verify initial partitions got latest offset (offset >= 0, not EARLIEST_OFFSET) + List initialAssignedSplits = + getReadersAssignments(context).values().stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + for (SourceSplitBase split : initialAssignedSplits) { + assertThat(split).isInstanceOf(LogSplit.class); + LogSplit logSplit = (LogSplit) split; + // latest offset for empty partition should be 0 + assertThat(logSplit.getStartingOffset()) + .as("Initial partitions should use latest offset (>=0), not earliest (-2)") + .isGreaterThanOrEqualTo(0L); + } + + // Create new partitions + List newPartitions = Arrays.asList("newPartition1", "newPartition2"); + createPartitions(zooKeeperClient, DEFAULT_TABLE_PATH, newPartitions); + + // Second partition discovery: new partitions should use earliest offset + int assignmentStart = context.getSplitsAssignmentSequence().size(); + runPeriodicPartitionDiscovery(workExecutor); + + List newAssignedSplits = + getReadersAssignments(context, assignmentStart).values().stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + assertThat(newAssignedSplits).isNotEmpty(); + for (SourceSplitBase split : newAssignedSplits) { + assertThat(split).isInstanceOf(LogSplit.class); + LogSplit logSplit = (LogSplit) split; + assertThat(logSplit.getStartingOffset()) + .as("Newly discovered partitions should use earliest offset") + .isEqualTo(EARLIEST_OFFSET); + } + + // Verify snapshot state is consistent after new partitions + SourceEnumeratorState state2 = enumerator.snapshotState(2L); + assertThat(state2.isInitialDiscoveryFinished()).isTrue(); + } + } + + /** + * Tests FLIP-288 restore: after restore with initialDiscoveryFinished=true, unassigned splits + * from state preserve their original offset (latest), while newly discovered partitions still + * use earliest offset. + */ + @Test + void testRestorePreservesInitialDiscoveryFinished() throws Throwable { + int numSubtasks = 3; + long tableId = + createTable(DEFAULT_TABLE_PATH, DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR); + ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); + Map partitions = waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH); + + // Pick one partition to simulate as an unassigned split from previous checkpoint. + // This split was initialized with "latest" offset (resolved to 0 for empty partition) + // before the failover, but had not been assigned to any reader yet. + Map.Entry unassignedPartitionEntry = partitions.entrySet().iterator().next(); + long unassignedPartitionId = unassignedPartitionEntry.getKey(); + String unassignedPartitionName = unassignedPartitionEntry.getValue(); + long latestResolvedOffset = 0L; + LogSplit unassignedSplit = + new LogSplit( + new TableBucket(tableId, unassignedPartitionId, 0), + unassignedPartitionName, + latestResolvedOffset); + + // Simulate restore: initialDiscoveryFinished=true, one unassigned split from state + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(numSubtasks); + MockWorkExecutor workExecutor = new MockWorkExecutor(context); + FlinkSourceEnumerator enumerator = + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + flussConf, + false, + true, + context, + Collections.emptySet(), + Collections.emptyMap(), + null, + OffsetsInitializer.latest(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + Integer.MAX_VALUE, + streaming, + null, + null, + workExecutor, + LeaseContext.DEFAULT, + false, + true, + Collections.singletonList(unassignedSplit))) { + + enumerator.start(); + + // register readers + for (int i = 0; i < numSubtasks; i++) { + registerReader(context, enumerator, i); + } + + // Partition discovery after restore: since initialDiscoveryFinished=true, + // newly discovered partitions should use earliest offset. + // The unassigned split's partition should be filtered out (already in pending). + runPeriodicPartitionDiscovery(workExecutor); + + // Collect all assigned splits across all readers + List assignedSplits = + getReadersAssignments(context).values().stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + + // Verify the restored unassigned split preserves its original latest offset + List restoredSplits = + assignedSplits.stream() + .filter( + s -> + s.getTableBucket() + .equals( + new TableBucket( + tableId, + unassignedPartitionId, + 0))) + .collect(Collectors.toList()); + assertThat(restoredSplits).hasSize(1); + LogSplit restoredLogSplit = (LogSplit) restoredSplits.get(0); + assertThat(restoredLogSplit.getStartingOffset()) + .as( + "Unassigned split from state should preserve its original " + + "latest offset, not be re-initialized") + .isEqualTo(latestResolvedOffset); + + // Verify other partitions (truly new discoveries) use earliest offset + List newDiscoverySplits = + assignedSplits.stream() + .filter( + s -> + !s.getTableBucket() + .equals( + new TableBucket( + tableId, + unassignedPartitionId, + 0))) + .collect(Collectors.toList()); + assertThat(newDiscoverySplits).isNotEmpty(); + for (SourceSplitBase split : newDiscoverySplits) { + assertThat(split).isInstanceOf(LogSplit.class); + LogSplit logSplit = (LogSplit) split; + assertThat(logSplit.getStartingOffset()) + .as( + "After restore with initialDiscoveryFinished=true, " + + "newly discovered partitions should use earliest offset") + .isEqualTo(EARLIEST_OFFSET); + } + + // Verify state after restore + discovery + SourceEnumeratorState state = enumerator.snapshotState(3L); + assertThat(state.isInitialDiscoveryFinished()).isTrue(); + } + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwable { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java index fa90eda96f..a2713a9708 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java @@ -191,4 +191,104 @@ void testEmptyPendingSplitsCheckpointSerdeWithoutLakeSource() throws Exception { .isNotNull() .isEmpty(); } + + @Test + void testV3SerdeWithInitialDiscoveryFinished() throws Exception { + FlussSourceEnumeratorStateSerializer serializer = + new FlussSourceEnumeratorStateSerializer(null); + + Set assignedBuckets = + new HashSet<>(Arrays.asList(new TableBucket(1, 0), new TableBucket(1, 4L, 1))); + Map assignedPartitions = new HashMap<>(); + assignedPartitions.put(1L, "partition1"); + assignedPartitions.put(2L, "partition2"); + + SourceEnumeratorState sourceEnumeratorState = + new SourceEnumeratorState( + assignedBuckets, assignedPartitions, null, "testLeaseId", true); + + byte[] serialized = serializer.serialize(sourceEnumeratorState); + SourceEnumeratorState deserialized = + serializer.deserialize(serializer.getVersion(), serialized); + + assertThat(deserialized).isEqualTo(sourceEnumeratorState); + assertThat(deserialized.isInitialDiscoveryFinished()).isTrue(); + } + + @Test + void testV3SerdeWithInitialDiscoveryNotFinished() throws Exception { + FlussSourceEnumeratorStateSerializer serializer = + new FlussSourceEnumeratorStateSerializer(null); + + Set assignedBuckets = + new HashSet<>(Collections.singletonList(new TableBucket(1, 5L, 0))); + Map assignedPartitions = new HashMap<>(); + assignedPartitions.put(5L, "p5"); + + SourceEnumeratorState sourceEnumeratorState = + new SourceEnumeratorState( + assignedBuckets, + assignedPartitions, + Collections.emptyList(), + "leaseId2", + false); + + byte[] serialized = serializer.serialize(sourceEnumeratorState); + SourceEnumeratorState deserialized = + serializer.deserialize(serializer.getVersion(), serialized); + + assertThat(deserialized).isEqualTo(sourceEnumeratorState); + assertThat(deserialized.isInitialDiscoveryFinished()).isFalse(); + } + + @Test + void testV2CompatibilityDefaultsInitialDiscoveryFinished() throws Exception { + // Serialize a state using V2 format (without initialDiscoveryFinished) and verify + // that deserialization defaults to safe values. + FlussSourceEnumeratorStateSerializer serializer = + new FlussSourceEnumeratorStateSerializer(null); + + Set assignedBuckets = + new HashSet<>(Arrays.asList(new TableBucket(1, 0), new TableBucket(1, 2L, 1))); + Map assignedPartitions = new HashMap<>(); + assignedPartitions.put(2L, "partition2"); + + SourceEnumeratorState originalState = + new SourceEnumeratorState(assignedBuckets, assignedPartitions, null, "v2LeaseId"); + + // Serialize using current format (V3) and strip the trailing boolean to produce V2 bytes. + // V3 appends: boolean(1 byte) for initialDiscoveryFinished + byte[] v3Bytes = serializer.serialize(originalState); + // V3 adds: 1 byte (boolean false) + int v2Length = v3Bytes.length - 1; + byte[] v2Bytes = new byte[v2Length]; + System.arraycopy(v3Bytes, 0, v2Bytes, 0, v2Length); + + SourceEnumeratorState deserialized = serializer.deserialize(2, v2Bytes); + + assertThat(deserialized.getAssignedBuckets()).isEqualTo(assignedBuckets); + assertThat(deserialized.getAssignedPartitions()).isEqualTo(assignedPartitions); + assertThat(deserialized.getLeaseId()).isEqualTo("v2LeaseId"); + // V2 uses the 4-param constructor which defaults initialDiscoveryFinished=false. + // On restore, the first discovery round will treat all discovered partitions + // as initial partitions (safe behavior). + assertThat(deserialized.isInitialDiscoveryFinished()).isFalse(); + } + + @Test + void testV3SerdeWithEmptyState() throws Exception { + FlussSourceEnumeratorStateSerializer serializer = + new FlussSourceEnumeratorStateSerializer(null); + + SourceEnumeratorState sourceEnumeratorState = + new SourceEnumeratorState( + Collections.emptySet(), Collections.emptyMap(), null, "leaseEmpty", true); + + byte[] serialized = serializer.serialize(sourceEnumeratorState); + SourceEnumeratorState deserialized = + serializer.deserialize(serializer.getVersion(), serialized); + + assertThat(deserialized).isEqualTo(sourceEnumeratorState); + assertThat(deserialized.isInitialDiscoveryFinished()).isTrue(); + } }