From 8cbd27e044f73d700e06e460aedab0ec52d50316 Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Sat, 11 Apr 2026 02:29:47 +0530 Subject: [PATCH 1/2] fix: v2 compatibility --- .github/workflows/main.yml | 1 + build.gradle | 2 +- gradle.properties | 2 +- .../kestra/plugin/transform/MapFlowTest.java | 38 +++++++++++++------ 4 files changed, 29 insertions(+), 14 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 2d9fc98..d509c2d 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -46,4 +46,5 @@ jobs: with: skip-test: ${{ github.event.inputs.skip-test == 'true' }} kestra-version: ${{ github.event.inputs.kestra-version }} + java-version: '25' secrets: inherit diff --git a/build.gradle b/build.gradle index 8a1ffbe..d591366 100644 --- a/build.gradle +++ b/build.gradle @@ -18,7 +18,7 @@ tasks.withType(JavaCompile) { options.compilerArgs.add("-parameters") } -final targetJavaVersion = JavaVersion.VERSION_21 +final targetJavaVersion = JavaVersion.VERSION_25 group "io.kestra.plugin" allprojects { diff --git a/gradle.properties b/gradle.properties index 7604614..ae65d6c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ version=1.5.2-SNAPSHOT -kestraVersion=1.3.0 +kestraVersion=2.0.0-SNAPSHOT diff --git a/plugin-transform-records/src/test/java/io/kestra/plugin/transform/MapFlowTest.java b/plugin-transform-records/src/test/java/io/kestra/plugin/transform/MapFlowTest.java index b575982..1ce9655 100644 --- a/plugin-transform-records/src/test/java/io/kestra/plugin/transform/MapFlowTest.java +++ b/plugin-transform-records/src/test/java/io/kestra/plugin/transform/MapFlowTest.java @@ -1,10 +1,13 @@ package io.kestra.plugin.transform; +import io.kestra.core.exceptions.InternalException; import io.kestra.core.junit.annotations.ExecuteFlow; import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.flows.State; +import io.kestra.core.services.TaskOutputService; +import jakarta.inject.Inject; import org.junit.jupiter.api.Test; import java.util.List; @@ -16,6 +19,9 @@ @KestraTest(startRunner = true) class MapFlowTest { + @Inject + private TaskOutputService taskOutputService; + @Test @ExecuteFlow("flows/map_flow.yaml") void executesFlow(Execution execution) { @@ -23,7 +29,7 @@ void executesFlow(Execution execution) { List taskRuns = execution.findTaskRunsByTaskId("map"); TaskRun taskRun = taskRuns.getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); List> records = (List>) outputs.get("records"); assertThat(records.size(), is(1)); @@ -35,12 +41,12 @@ void executesFlow(Execution execution) { @Test @ExecuteFlow("flows/map_flow_store.yaml") - void executesStoreFlow(Execution execution) { + void executesStoreFlow(Execution execution) throws InternalException { assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); List taskRuns = execution.findTaskRunsByTaskId("map"); TaskRun taskRun = taskRuns.getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); assertThat(outputs.containsKey("records"), is(false)); Object uri = outputs.get("uri"); @@ -55,7 +61,7 @@ void executesUnnestStoreFlow(Execution execution) { List taskRuns = execution.findTaskRunsByTaskId("explode"); TaskRun taskRun = taskRuns.getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); assertThat(outputs.containsKey("records"), is(false)); Object uri = outputs.get("uri"); @@ -70,7 +76,7 @@ void executesFilterStoreFlow(Execution execution) { List taskRuns = execution.findTaskRunsByTaskId("filter"); TaskRun taskRun = taskRuns.getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); assertThat(outputs.containsKey("records"), is(false)); Object uri = outputs.get("uri"); @@ -85,7 +91,7 @@ void executesAggregateStoreFlow(Execution execution) { List taskRuns = execution.findTaskRunsByTaskId("aggregate"); TaskRun taskRun = taskRuns.getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); assertThat(outputs.containsKey("records"), is(false)); Object uri = outputs.get("uri"); @@ -100,7 +106,7 @@ void executesZipFlow(Execution execution) { List taskRuns = execution.findTaskRunsByTaskId("zip"); TaskRun taskRun = taskRuns.getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); List> records = (List>) outputs.get("records"); assertThat(records.size(), is(2)); @@ -116,7 +122,7 @@ void executesZipStoreFlow(Execution execution) { List taskRuns = execution.findTaskRunsByTaskId("zip"); TaskRun taskRun = taskRuns.getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); assertThat(outputs.containsKey("records"), is(false)); Object uri = outputs.get("uri"); @@ -130,7 +136,7 @@ void executesSelectFlow(Execution execution) { assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); TaskRun taskRun = execution.findTaskRunsByTaskId("select").getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); List> records = (List>) outputs.get("records"); assertThat(records, hasSize(1)); @@ -175,7 +181,7 @@ void executesSelectBinaryStoreFlow(Execution execution) { assertThat(uri.toString().startsWith("kestra://"), is(true)); TaskRun readBackRun = execution.findTaskRunsByTaskId("read_back").getFirst(); - Map readBackOutputs = (Map) readBackRun.getOutputs(); + Map readBackOutputs = outputsOf(readBackRun); List> records = (List>) readBackOutputs.get("records"); assertThat(records, hasSize(1)); assertThat(((Number) records.getFirst().get("a")).longValue(), is(1L)); @@ -188,7 +194,7 @@ void executesSelectLengthMismatchSkipFlow(Execution execution) { assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); TaskRun taskRun = execution.findTaskRunsByTaskId("select").getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); List> records = (List>) outputs.get("records"); assertThat(records, hasSize(1)); @@ -202,10 +208,18 @@ void executesSelectOnErrorKeepFlow(Execution execution) { assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); TaskRun taskRun = execution.findTaskRunsByTaskId("select").getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); List> records = (List>) outputs.get("records"); assertThat(records, hasSize(1)); assertThat(records.getFirst().get("total_spent"), is("not-a-number")); } + + protected Map outputsOf(TaskRun taskRun) { + try { + return taskOutputService.getOutputs(taskRun); + } catch (InternalException e) { + throw new RuntimeException(e); + } + } } From 4eeeab90f41325e7e7ea3a2cbd3730d4b95a5796 Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Sat, 11 Apr 2026 02:42:27 +0530 Subject: [PATCH 2/2] fix: v2 compatibility --- build.gradle | 1 + gradle.properties | 1 + plugin-transform-grok/src/test/resources/application.yml | 7 +++++++ plugin-transform-json/src/test/resources/application.yml | 6 ++++++ .../test/java/io/kestra/plugin/transform/MapFlowTest.java | 6 +++--- .../src/test/resources/application.yml | 6 ++++++ 6 files changed, 24 insertions(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index d591366..e8ce3ca 100644 --- a/build.gradle +++ b/build.gradle @@ -121,6 +121,7 @@ subprojects { testImplementation group: "io.kestra", name: "repository-memory", version: kestraVersion testImplementation group: "io.kestra", name: "runner-memory", version: kestraVersion testImplementation group: "io.kestra", name: "storage-local", version: kestraVersion + testImplementation group: "io.kestra", name: "indexer", version: kestraVersion // test testImplementation "org.junit.jupiter:junit-jupiter-engine" diff --git a/gradle.properties b/gradle.properties index ae65d6c..31ad56a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,3 @@ version=1.5.2-SNAPSHOT kestraVersion=2.0.0-SNAPSHOT +org.gradle.jvmargs=-Xmx2g diff --git a/plugin-transform-grok/src/test/resources/application.yml b/plugin-transform-grok/src/test/resources/application.yml index 636ef67..520f082 100644 --- a/plugin-transform-grok/src/test/resources/application.yml +++ b/plugin-transform-grok/src/test/resources/application.yml @@ -7,3 +7,10 @@ kestra: type: local local: base-path: /tmp/unittest + worker: + controllers: + type: STATIC + static: + endpoints: + - host: localhost + diff --git a/plugin-transform-json/src/test/resources/application.yml b/plugin-transform-json/src/test/resources/application.yml index 636ef67..e264bbb 100644 --- a/plugin-transform-json/src/test/resources/application.yml +++ b/plugin-transform-json/src/test/resources/application.yml @@ -7,3 +7,9 @@ kestra: type: local local: base-path: /tmp/unittest + worker: + controllers: + type: STATIC + static: + endpoints: + - host: localhost \ No newline at end of file diff --git a/plugin-transform-records/src/test/java/io/kestra/plugin/transform/MapFlowTest.java b/plugin-transform-records/src/test/java/io/kestra/plugin/transform/MapFlowTest.java index 1ce9655..6f29cd8 100644 --- a/plugin-transform-records/src/test/java/io/kestra/plugin/transform/MapFlowTest.java +++ b/plugin-transform-records/src/test/java/io/kestra/plugin/transform/MapFlowTest.java @@ -153,14 +153,14 @@ void executesSelectStoreFlow(Execution execution) { assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); TaskRun selectRun = execution.findTaskRunsByTaskId("select").getFirst(); - Map selectOutputs = (Map) selectRun.getOutputs(); + Map selectOutputs = (Map) outputsOf(selectRun); assertThat(selectOutputs.containsKey("records"), is(false)); Object uri = selectOutputs.get("uri"); assertThat(uri != null, is(true)); assertThat(uri.toString().startsWith("kestra://"), is(true)); TaskRun readBackRun = execution.findTaskRunsByTaskId("read_back").getFirst(); - Map readBackOutputs = (Map) readBackRun.getOutputs(); + Map readBackOutputs = (Map) outputsOf(readBackRun); List> records = (List>) readBackOutputs.get("records"); assertThat(records, hasSize(2)); assertThat(((Number) records.getFirst().get("a")).longValue(), is(1L)); @@ -175,7 +175,7 @@ void executesSelectBinaryStoreFlow(Execution execution) { assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); TaskRun selectRun = execution.findTaskRunsByTaskId("select").getFirst(); - Map selectOutputs = (Map) selectRun.getOutputs(); + Map selectOutputs = (Map) outputsOf(selectRun); Object uri = selectOutputs.get("uri"); assertThat(uri != null, is(true)); assertThat(uri.toString().startsWith("kestra://"), is(true)); diff --git a/plugin-transform-records/src/test/resources/application.yml b/plugin-transform-records/src/test/resources/application.yml index 636ef67..919190e 100644 --- a/plugin-transform-records/src/test/resources/application.yml +++ b/plugin-transform-records/src/test/resources/application.yml @@ -7,3 +7,9 @@ kestra: type: local local: base-path: /tmp/unittest + worker: + controllers: + type: STATIC + static: + endpoints: + - host: localhost