diff --git a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala index 5d2ed7e5e4c..b4545a19b15 100644 --- a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala +++ b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala @@ -105,26 +105,31 @@ class ReconfigurationIntegrationSpec } /** - * Run a trivial pure-Scala workflow (TextInput → terminal) once before the - * timed tests start, so the first 5-second `startWorkflow` await in - * [[TestUtils.shouldReconfigure]] doesn't have to absorb JVM JIT - * warmup, pekko dispatcher first-touch, and `RegionExecutionCoordinator` - * class loading. - * - * Hard-capped at 10 seconds total, defensively wrapped: if warmup itself - * times out or throws, log and continue — the existing `Retries` mixin - * still backs up individual test cases. This ensures warmup can never - * hang the suite. + * Runs a TextInput -> Python UDF workflow once before the timed tests so + * Python worker cold-start is paid here, not inside a timed test. Capped and + * wrapped so warmup can never fail or hang the suite. */ private def warmupOnce(): Unit = { - val warmupCap = Duration.fromSeconds(10) + val warmupCap = Duration.fromSeconds(60) setUpWorkflowExecutionData(specId) var client: AmberClient = null try { val src = new TextInputSourceOpDesc() src.textInput = "warmup" + val udf = TestOperators.pythonOpDesc() val warmupCtx = TestUtils.workflowContext(specId) - val workflow = buildWorkflow(List(src), List.empty, warmupCtx) + val workflow = buildWorkflow( + List(src, udf), + List( + LogicalLink( + src.operatorIdentifier, + PortIdentity(), + udf.operatorIdentifier, + PortIdentity() + ) + ), + warmupCtx + ) client = new AmberClient( system, workflow.context, @@ -166,8 +171,13 @@ class ReconfigurationIntegrationSpec ): Map[OperatorIdentity, List[Tuple]] = TestUtils.shouldReconfigure(system, ctx, operators, links, targetOps, newOpExecInitInfo) + // Small source that emits slowly (30 rows, 0.25s apart) so a pause lands + // mid-run and the workflow still completes quickly after resume. + private def slowSource() = + TestOperators.slowRegionSourceOpDesc(numTuple = 30, delaySeconds = 0.25) + "Engine" should "be able to modify a python UDF worker in workflow" in { - val sourceOpDesc = TestOperators.smallCsvScanOpDesc() + val sourceOpDesc = slowSource() val udfOpDesc = TestOperators.pythonOpDesc() val code = """ |from pytexera import * @@ -198,7 +208,7 @@ class ReconfigurationIntegrationSpec } "Engine" should "propagate reconfiguration through a source operator in workflow" in { - val sourceOpDesc = TestOperators.pythonSourceOpDesc(10000) + val sourceOpDesc = slowSource() val udfOpDesc = TestOperators.pythonOpDesc() val code = """ |from pytexera import * @@ -206,7 +216,7 @@ class ReconfigurationIntegrationSpec |class ProcessTupleOperator(UDFOperatorV2): | @overrides | def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: - | tuple_['field_1'] = tuple_['field_1'] + '_reconfigured' + | tuple_['Region'] = tuple_['Region'] + '_reconfigured' | yield tuple_ |""".stripMargin val result = shouldReconfigure( @@ -223,12 +233,12 @@ class ReconfigurationIntegrationSpec OpExecWithCode(code, "python") ) assert(result(udfOpDesc.operatorIdentifier).exists { t => - t.getField("field_1").asInstanceOf[String].contains("_reconfigured") + t.getField("Region").asInstanceOf[String].contains("_reconfigured") }) } "Engine" should "be able to modify two python UDFs in workflow" in { - val sourceOpDesc = TestOperators.smallCsvScanOpDesc() + val sourceOpDesc = slowSource() val udfOpDesc1 = TestOperators.pythonOpDesc() val udfOpDesc2 = TestOperators.pythonOpDesc() val code = """ diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala index ac71483a5d3..d001c4471c1 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala @@ -301,6 +301,8 @@ object TestUtils { ControllerConfig.default, error => {} ) + // Timeout for control-command acks (start/pause/reconfigure/resume). + val commandTimeout = Duration.fromSeconds(30) val completion = Promise[Unit]() var result: Map[OperatorIdentity, List[Tuple]] = null client.registerCallback[ExecutionStateUpdate](evt => { @@ -311,14 +313,14 @@ object TestUtils { }) Await.result( client.controllerInterface.startWorkflow(EmptyRequest(), ()), - Duration.fromSeconds(5) + commandTimeout ) val pausedReached = stateReached(client, PAUSED) Await.result( client.controllerInterface.pauseWorkflow(EmptyRequest(), ()), - Duration.fromSeconds(5) + commandTimeout ) - Await.result(pausedReached, Duration.fromSeconds(10)) + Await.result(pausedReached, commandTimeout) val physicalOps = targetOps.flatMap(op => workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier) ) @@ -330,11 +332,11 @@ object TestUtils { ), () ), - Duration.fromSeconds(5) + commandTimeout ) Await.result( client.controllerInterface.resumeWorkflow(EmptyRequest(), ()), - Duration.fromSeconds(5) + commandTimeout ) Await.result(completion, Duration.fromMinutes(1)) result diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala index b3c38735957..69783b5e1fb 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala @@ -181,4 +181,24 @@ object TestOperators { udf } + // Emits `numTuple` rows with a "Region" column, sleeping `delaySeconds` + // between rows. + def slowRegionSourceOpDesc(numTuple: Int, delaySeconds: Double): PythonUDFSourceOpDescV2 = { + val udf = new PythonUDFSourceOpDescV2() + udf.workers = 1 + udf.columns = List(new Attribute("Region", AttributeType.STRING)) + udf.code = s""" + |from pytexera import * + |import time + | + |class UDFSourceOperator(UDFSourceOperator): + | @overrides + | def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]: + | for i in range($numTuple): + | time.sleep($delaySeconds) + | yield {'Region': 'Asia'} + |""".stripMargin + udf + } + }