Skip to content

[feature] Support sink.committer-coordinator-operator.enabled#8372

Open
fishfishfishfishaa wants to merge 1 commit into
apache:masterfrom
fishfishfishfishaa:pip30-redesign-0628
Open

[feature] Support sink.committer-coordinator-operator.enabled#8372
fishfishfishfishaa wants to merge 1 commit into
apache:masterfrom
fishfishfishfishaa:pip30-redesign-0628

Conversation

@fishfishfishfishaa

Copy link
Copy Markdown

Description

This PR implements PIP-30: Improvement For Paimon Committer In Flink.

One important background is that, for a given Flink checkpoint, the coordinator checkpoint is triggered before writer tasks start their checkpoint. Because of this ordering, I first tried a minimal prototype which maintained coordinator state through a custom state mechanism when Flink did not expose checkpoint state for this use case: #7963.

However, that approach introduced several problems, including two independent state systems, higher complexity, extra dependency on HDFS, and additional blocking while waiting for custom state to be persisted to HDFS.

Thanks to @liubiao for proposing the current design:
https://docs.google.com/document/d/1asWKzoytfeB1D8bS_yRIAHnpR40frLf0fnPn2-WSL74/edit?tab=t.0

The key idea is to move the durable file-info state to the writer task side, so it is persisted together with Flink checkpoint state. Meanwhile, PWC keeps an in-memory pending state for received file info. If the coordinator itself fails over, the in-memory committable map can be rebuilt by writer tasks replaying their pending file info from writer state. The coordinator only persists necessary metadata through the normal coordinator checkpoint interface, such as commitUser.

Following this direction, this PR focuses the optimization on the unaware bucket append path, while keeping the original FixedBucketSink path unaffected.

Design

When the option is enabled, writer tasks still flush data and produce committables during prepareSnapshotPreBarrier.

In addition, each writer task records the produced file info in writer-side state in a checkpointId -> fileInfo form. This state is part of Flink operator state and is persisted together with Flink checkpoints.

During snapshotState:

  1. The writer first updates its local pending file-info state.
  2. Then it sends the unacknowledged file info to PaimonWriterCoordinator through FileInfoRequest.
  3. The writer waits for the PWC ACK before returning from snapshotState.

If the request fails, writer snapshot fails and the checkpoint cannot complete. If the request blocks for too long, it is guarded by Flink checkpoint timeout.

On the PWC side:

  1. PWC validates whether the request comes from a valid registered subtask attempt.
  2. PWC stores the received file info as reliable in-memory pending state.
  3. If the current checkpoint envelope has not received all subtasks, PWC replies ACK immediately.
  4. If the current request is from the last subtask of the checkpoint envelope, PWC stages pending file info up to this checkpoint into the committable map, and then replies ACK.
  5. Later, Flink checkpoint coordinator triggers notifyCheckpointComplete, and PWC commits all committables up to that checkpoint.

After commit succeeds, PWC sends CommitCompleteEvent to active writer subtasks. Writers use this event to clean local pending state up to the committed checkpoint, preventing writer-side state from growing forever.

Recovery Semantics

Writer Task Failover

When a writer task fails over, the new attempt restores writer state in initializeState.

The recovered writer then sends its pending file info to PWC through the same request path. PWC tracks valid execution attempts through executionAttemptReady and executionAttemptFailed, so stale attempt requests can be rejected, and pending file info from failed attempts can be cleaned or replaced.

For recovered file info, PWC waits until all recovered writer requests are received, and then calls filterAndCommitUpToCheckpoint. If this commit creates a new snapshot, PWC treats it as an incomplete previous commit and triggers one extra failover, following the original recommit semantics.

JM / PWC Failover

For JM / PWC failover, PWC does not persist the full in-memory committable map as coordinator state.

Instead, PWC only restores necessary coordinator metadata from Flink coordinator checkpoint state, such as commitUser. The actual pending file info is restored from writer task state. After recovery, writer tasks replay their pending file info through FileInfoRequest, and PWC rebuilds its in-memory committable map from these recovered requests.

In other words, PWC pending commit state is recovered by writer-state replay, not by independently persisting the full coordinator-side committable map.

Checkpoint Abort Without Task Failure

If a checkpoint is aborted but writer tasks do not fail, already ACKed file info is kept as reliable pending state in PWC memory.

A later successful checkpoint envelope can stage and commit this file info. This preserves the original semantics that checkpoint abort does not mean data rollback.

Scope

This PR focuses on the unaware bucket append path, especially RowAppendTableSink, where removing the committer operator can help reduce the failover region.

It does not change the original FixedBucketSink commit path. Fixed bucket and other paths involving shuffle, assigner, index, or compact operators are not included in this optimization, because those operators may already bind regions together or require different recovery semantics.

Tests

This PR adds and updates tests for the new PWC commit path.

Unit tests cover:

  • writer-side coordinated committable state and ACK tracking;
  • FileInfoRequest request / ACK behavior;
  • checkpoint complete and checkpoint abort handling in PWC;
  • recovered file info and recommit behavior;
  • stale attempt validation;
  • writer operator interaction with the coordinated file-info sender.

IT coverage verifies coordinator commit behavior and ensures non-target sink paths are not affected.

E2E coverage verifies:

  • committing through PWC without the original committer / compact operators in the job plan;
  • checkpoint abort without task failure, followed by successful commit through a later checkpoint;
  • TaskManager failure / region failover behavior;
  • savepoint restore replaying pending file info.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant