From 291e31cb2fc610cec51b0cb4eff54607be0bfe76 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Tue, 23 Jun 2026 14:15:29 -0700 Subject: [PATCH 1/5] increase amount of data for csv scan to avoid engine finishing before a pause is generated leading to a failed ci --- .../amber/engine/e2e/ReconfigurationIntegrationSpec.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala index 5d2ed7e5e4c..473becb7d51 100644 --- a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala +++ b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala @@ -167,7 +167,8 @@ class ReconfigurationIntegrationSpec TestUtils.shouldReconfigure(system, ctx, operators, links, targetOps, newOpExecInitInfo) "Engine" should "be able to modify a python UDF worker in workflow" in { - val sourceOpDesc = TestOperators.smallCsvScanOpDesc() + // Medium source keeps the run in flight long enough to pause before it completes. + val sourceOpDesc = TestOperators.mediumCsvScanOpDesc() val udfOpDesc = TestOperators.pythonOpDesc() val code = """ |from pytexera import * @@ -228,7 +229,8 @@ class ReconfigurationIntegrationSpec } "Engine" should "be able to modify two python UDFs in workflow" in { - val sourceOpDesc = TestOperators.smallCsvScanOpDesc() + // Medium source keeps the run in flight long enough to pause before it completes. + val sourceOpDesc = TestOperators.mediumCsvScanOpDesc() val udfOpDesc1 = TestOperators.pythonOpDesc() val udfOpDesc2 = TestOperators.pythonOpDesc() val code = """ From a0841b4e9ac59e4dce92ae6c77d0d921c2ac9aec Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Fri, 26 Jun 2026 16:37:14 -0700 Subject: [PATCH 2/5] bound the csv source spec to 10000 --- .../engine/e2e/ReconfigurationIntegrationSpec.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala index 473becb7d51..dd41162ee90 100644 --- a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala +++ b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala @@ -166,9 +166,14 @@ class ReconfigurationIntegrationSpec ): Map[OperatorIdentity, List[Tuple]] = TestUtils.shouldReconfigure(system, ctx, operators, links, targetOps, newOpExecInitInfo) + private def boundedCsvSource() = { + val src = TestOperators.mediumCsvScanOpDesc() + src.limit = Some(10000) + src + } + "Engine" should "be able to modify a python UDF worker in workflow" in { - // Medium source keeps the run in flight long enough to pause before it completes. - val sourceOpDesc = TestOperators.mediumCsvScanOpDesc() + val sourceOpDesc = boundedCsvSource() val udfOpDesc = TestOperators.pythonOpDesc() val code = """ |from pytexera import * @@ -229,8 +234,7 @@ class ReconfigurationIntegrationSpec } "Engine" should "be able to modify two python UDFs in workflow" in { - // Medium source keeps the run in flight long enough to pause before it completes. - val sourceOpDesc = TestOperators.mediumCsvScanOpDesc() + val sourceOpDesc = boundedCsvSource() val udfOpDesc1 = TestOperators.pythonOpDesc() val udfOpDesc2 = TestOperators.pythonOpDesc() val code = """ From b1728beec57078c0b87380d9448004a01a18c8bb Mon Sep 17 00:00:00 2001 From: "Matthew B." Date: Tue, 30 Jun 2026 15:35:06 -0700 Subject: [PATCH 3/5] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Matthew B. --- .../amber/engine/e2e/ReconfigurationIntegrationSpec.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala index dd41162ee90..57419ef0fc8 100644 --- a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala +++ b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala @@ -167,9 +167,7 @@ class ReconfigurationIntegrationSpec TestUtils.shouldReconfigure(system, ctx, operators, links, targetOps, newOpExecInitInfo) private def boundedCsvSource() = { - val src = TestOperators.mediumCsvScanOpDesc() - src.limit = Some(10000) - src + TestOperators.mediumCsvScanOpDesc() } "Engine" should "be able to modify a python UDF worker in workflow" in { From a0cef707800333db70b84de57f6af27c735231dd Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Wed, 1 Jul 2026 11:06:59 -0700 Subject: [PATCH 4/5] fix slow python startup time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The test that keeps failing: it starts a workflow, pauses it, swaps some code, then resumes it. To pause, it asks each worker "please pause" and waits 5 seconds for a reply. The problem: some workers run Python. Starting Python is slow (it has to boot the interpreter and load libraries). If a worker is still busy starting up Python, it can't reply "paused" within 5 seconds → the test gives up → CI fails. The test that uses two Python workers is slowest to start, so it fails most. --- .../e2e/ReconfigurationIntegrationSpec.scala | 29 +++++++++++-------- .../texera/amber/engine/e2e/TestUtils.scala | 12 ++++---- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala index 57419ef0fc8..cff8ce74e36 100644 --- a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala +++ b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala @@ -105,26 +105,31 @@ class ReconfigurationIntegrationSpec } /** - * Run a trivial pure-Scala workflow (TextInput → terminal) once before the - * timed tests start, so the first 5-second `startWorkflow` await in - * [[TestUtils.shouldReconfigure]] doesn't have to absorb JVM JIT - * warmup, pekko dispatcher first-touch, and `RegionExecutionCoordinator` - * class loading. - * - * Hard-capped at 10 seconds total, defensively wrapped: if warmup itself - * times out or throws, log and continue — the existing `Retries` mixin - * still backs up individual test cases. This ensures warmup can never - * hang the suite. + * Runs a TextInput -> Python UDF workflow once before the timed tests so + * Python worker cold-start is paid here, not inside a timed test. Capped and + * wrapped so warmup can never fail or hang the suite. */ private def warmupOnce(): Unit = { - val warmupCap = Duration.fromSeconds(10) + val warmupCap = Duration.fromSeconds(60) setUpWorkflowExecutionData(specId) var client: AmberClient = null try { val src = new TextInputSourceOpDesc() src.textInput = "warmup" + val udf = TestOperators.pythonOpDesc() val warmupCtx = TestUtils.workflowContext(specId) - val workflow = buildWorkflow(List(src), List.empty, warmupCtx) + val workflow = buildWorkflow( + List(src, udf), + List( + LogicalLink( + src.operatorIdentifier, + PortIdentity(), + udf.operatorIdentifier, + PortIdentity() + ) + ), + warmupCtx + ) client = new AmberClient( system, workflow.context, diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala index ac71483a5d3..d001c4471c1 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala @@ -301,6 +301,8 @@ object TestUtils { ControllerConfig.default, error => {} ) + // Timeout for control-command acks (start/pause/reconfigure/resume). + val commandTimeout = Duration.fromSeconds(30) val completion = Promise[Unit]() var result: Map[OperatorIdentity, List[Tuple]] = null client.registerCallback[ExecutionStateUpdate](evt => { @@ -311,14 +313,14 @@ object TestUtils { }) Await.result( client.controllerInterface.startWorkflow(EmptyRequest(), ()), - Duration.fromSeconds(5) + commandTimeout ) val pausedReached = stateReached(client, PAUSED) Await.result( client.controllerInterface.pauseWorkflow(EmptyRequest(), ()), - Duration.fromSeconds(5) + commandTimeout ) - Await.result(pausedReached, Duration.fromSeconds(10)) + Await.result(pausedReached, commandTimeout) val physicalOps = targetOps.flatMap(op => workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier) ) @@ -330,11 +332,11 @@ object TestUtils { ), () ), - Duration.fromSeconds(5) + commandTimeout ) Await.result( client.controllerInterface.resumeWorkflow(EmptyRequest(), ()), - Duration.fromSeconds(5) + commandTimeout ) Await.result(completion, Duration.fromMinutes(1)) result From 12e0a6e595ebd44d1a1df4159f9c38f79949740e Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Wed, 1 Jul 2026 11:50:02 -0700 Subject: [PATCH 5/5] add slow source to avoid ci failure --- .../e2e/ReconfigurationIntegrationSpec.scala | 17 ++++++++-------- .../texera/amber/operator/TestOperators.scala | 20 +++++++++++++++++++ 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala index cff8ce74e36..b4545a19b15 100644 --- a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala +++ b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala @@ -171,12 +171,13 @@ class ReconfigurationIntegrationSpec ): Map[OperatorIdentity, List[Tuple]] = TestUtils.shouldReconfigure(system, ctx, operators, links, targetOps, newOpExecInitInfo) - private def boundedCsvSource() = { - TestOperators.mediumCsvScanOpDesc() - } + // Small source that emits slowly (30 rows, 0.25s apart) so a pause lands + // mid-run and the workflow still completes quickly after resume. + private def slowSource() = + TestOperators.slowRegionSourceOpDesc(numTuple = 30, delaySeconds = 0.25) "Engine" should "be able to modify a python UDF worker in workflow" in { - val sourceOpDesc = boundedCsvSource() + val sourceOpDesc = slowSource() val udfOpDesc = TestOperators.pythonOpDesc() val code = """ |from pytexera import * @@ -207,7 +208,7 @@ class ReconfigurationIntegrationSpec } "Engine" should "propagate reconfiguration through a source operator in workflow" in { - val sourceOpDesc = TestOperators.pythonSourceOpDesc(10000) + val sourceOpDesc = slowSource() val udfOpDesc = TestOperators.pythonOpDesc() val code = """ |from pytexera import * @@ -215,7 +216,7 @@ class ReconfigurationIntegrationSpec |class ProcessTupleOperator(UDFOperatorV2): | @overrides | def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: - | tuple_['field_1'] = tuple_['field_1'] + '_reconfigured' + | tuple_['Region'] = tuple_['Region'] + '_reconfigured' | yield tuple_ |""".stripMargin val result = shouldReconfigure( @@ -232,12 +233,12 @@ class ReconfigurationIntegrationSpec OpExecWithCode(code, "python") ) assert(result(udfOpDesc.operatorIdentifier).exists { t => - t.getField("field_1").asInstanceOf[String].contains("_reconfigured") + t.getField("Region").asInstanceOf[String].contains("_reconfigured") }) } "Engine" should "be able to modify two python UDFs in workflow" in { - val sourceOpDesc = boundedCsvSource() + val sourceOpDesc = slowSource() val udfOpDesc1 = TestOperators.pythonOpDesc() val udfOpDesc2 = TestOperators.pythonOpDesc() val code = """ diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala index b3c38735957..69783b5e1fb 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala @@ -181,4 +181,24 @@ object TestOperators { udf } + // Emits `numTuple` rows with a "Region" column, sleeping `delaySeconds` + // between rows. + def slowRegionSourceOpDesc(numTuple: Int, delaySeconds: Double): PythonUDFSourceOpDescV2 = { + val udf = new PythonUDFSourceOpDescV2() + udf.workers = 1 + udf.columns = List(new Attribute("Region", AttributeType.STRING)) + udf.code = s""" + |from pytexera import * + |import time + | + |class UDFSourceOperator(UDFSourceOperator): + | @overrides + | def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]: + | for i in range($numTuple): + | time.sleep($delaySeconds) + | yield {'Region': 'Asia'} + |""".stripMargin + udf + } + }