Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/generated/flink_connector_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,12 @@
<td>Boolean</td>
<td>Indicates whether to further sort data belonged to each sink task after range partitioning.</td>
</tr>
<tr>
<td><h5>sink.committer-coordinator-operator.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Allow coordinator replace committer operator, only support for append table now.</td>
</tr>
<tr>
<td><h5>sink.committer-cpu</h5></td>
<td style="word-wrap: break-word;">1.0</td>
Expand Down
1 change: 1 addition & 0 deletions paimon-e2e-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ under the License.
<unzip src="/tmp/paimon-e2e-tests-jars/bundled-hadoop.jar" dest="target/temp"/>
<!-- Delete the conflicting file -->
<delete file="target/temp/org/apache/commons/cli/CommandLine.class"/>
<delete file="/tmp/paimon-e2e-tests-jars/bundled-hadoop.jar"/>
<!-- Repackage the modified content -->
<zip destfile="/tmp/paimon-e2e-tests-jars/bundled-hadoop.jar" basedir="target/temp"/>
<!-- Delete the extracted temporary folder -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public abstract class E2eTestBase {
private final boolean withKafka;
private final boolean withHive;
private final boolean withSpark;
private final int taskManagerReplicas;

protected E2eTestBase() {
this(false, false);
Expand All @@ -68,9 +69,15 @@ protected E2eTestBase(boolean withKafka, boolean withHive) {
}

protected E2eTestBase(boolean withKafka, boolean withHive, boolean withSpark) {
this(withKafka, withHive, withSpark, 1);
}

protected E2eTestBase(
boolean withKafka, boolean withHive, boolean withSpark, int taskManagerReplicas) {
this.withKafka = withKafka;
this.withHive = withHive;
this.withSpark = withSpark;
this.taskManagerReplicas = taskManagerReplicas;
}

protected static final String TEST_DATA_DIR = "/test-data";
Expand Down Expand Up @@ -104,10 +111,13 @@ public void before() throws Exception {
.getResource("docker-compose.yaml")
.toURI()))
.withEnv("NETWORK_ID", ((Network.NetworkImpl) network).getName())
.withEnv("FLINK_ENV_FILE", flinkEnvFile())
.withLogConsumer("jobmanager-1", new LogConsumer(LOG))
.withLogConsumer("taskmanager-1", new LogConsumer(LOG))
.withStartupTimeout(Duration.ofMinutes(3))
.withLocalCompose(true);
for (int i = 1; i <= taskManagerReplicas; i++) {
environment.withLogConsumer("taskmanager-" + i, new LogConsumer(LOG));
}
if (withKafka) {
services.add("kafka");
environment.withLogConsumer("kafka-1", new Slf4jLogConsumer(LOG));
Expand Down Expand Up @@ -140,11 +150,17 @@ public void before() throws Exception {
".*Master: I have been elected leader! New state: ALIVE.*", 1));
}
environment.withServices(services.toArray(new String[0])).withLocalCompose(true);
if (taskManagerReplicas > 1) {
environment.withScaledService("taskmanager", taskManagerReplicas);
environment.withExposedService("jobmanager-1", 8081);
}

environment.waitingFor("jobmanager-1", buildWaitStrategy(".*Registering TaskManager.*", 1));
environment.waitingFor(
"taskmanager-1",
buildWaitStrategy(".*Successful registration at resource manager.*", 1));
for (int i = 1; i <= taskManagerReplicas; i++) {
environment.waitingFor(
"taskmanager-" + i,
buildWaitStrategy(".*Successful registration at resource manager.*", 1));
}
environment.start();

jobManager = environment.getContainerByServiceName("jobmanager-1").get();
Expand All @@ -156,6 +172,20 @@ public void before() throws Exception {
flinkVersion = flinkVersionMatcher.find() ? flinkVersionMatcher.group(1) : null;
}

protected String flinkEnvFile() {
return "flink.env";
}

protected String flinkRestUrl() {
if (taskManagerReplicas <= 1) {
throw new IllegalStateException("Flink REST is not exposed for this test.");
}
return String.format(
"http://%s:%d",
environment.getServiceHost("jobmanager-1", 8081),
environment.getServicePort("jobmanager-1", 8081));
}

private WaitStrategy buildWaitStrategy(String regex, int times) {
// Increase timeout from 60s (default value) to 180s
return Wait.forLogMessage(regex, times).withStartupTimeout(Duration.ofSeconds(180));
Expand Down
Loading
Loading