Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -105,26 +105,31 @@ class ReconfigurationIntegrationSpec
}

/**
* Run a trivial pure-Scala workflow (TextInput → terminal) once before the
* timed tests start, so the first 5-second `startWorkflow` await in
* [[TestUtils.shouldReconfigure]] doesn't have to absorb JVM JIT
* warmup, pekko dispatcher first-touch, and `RegionExecutionCoordinator`
* class loading.
*
* Hard-capped at 10 seconds total, defensively wrapped: if warmup itself
* times out or throws, log and continue — the existing `Retries` mixin
* still backs up individual test cases. This ensures warmup can never
* hang the suite.
* Runs a TextInput -> Python UDF workflow once before the timed tests so
* Python worker cold-start is paid here, not inside a timed test. Capped and
* wrapped so warmup can never fail or hang the suite.
*/
private def warmupOnce(): Unit = {
val warmupCap = Duration.fromSeconds(10)
val warmupCap = Duration.fromSeconds(60)
setUpWorkflowExecutionData(specId)
var client: AmberClient = null
try {
val src = new TextInputSourceOpDesc()
src.textInput = "warmup"
val udf = TestOperators.pythonOpDesc()
val warmupCtx = TestUtils.workflowContext(specId)
val workflow = buildWorkflow(List(src), List.empty, warmupCtx)
val workflow = buildWorkflow(
List(src, udf),
List(
LogicalLink(
src.operatorIdentifier,
PortIdentity(),
udf.operatorIdentifier,
PortIdentity()
)
),
warmupCtx
)
client = new AmberClient(
system,
workflow.context,
Expand Down Expand Up @@ -166,8 +171,13 @@ class ReconfigurationIntegrationSpec
): Map[OperatorIdentity, List[Tuple]] =
TestUtils.shouldReconfigure(system, ctx, operators, links, targetOps, newOpExecInitInfo)

// Deliberately slow source for the reconfiguration tests: 30 rows emitted
// 0.25s apart, so that a pause request arrives while the source is still
// producing, yet the workflow finishes soon after it is resumed.
private def slowSource() = {
  val rowCount = 30
  val gapSeconds = 0.25
  TestOperators.slowRegionSourceOpDesc(numTuple = rowCount, delaySeconds = gapSeconds)
}

"Engine" should "be able to modify a python UDF worker in workflow" in {
val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
val sourceOpDesc = slowSource()
val udfOpDesc = TestOperators.pythonOpDesc()
val code = """
|from pytexera import *
Expand Down Expand Up @@ -198,15 +208,15 @@ class ReconfigurationIntegrationSpec
}

"Engine" should "propagate reconfiguration through a source operator in workflow" in {
val sourceOpDesc = TestOperators.pythonSourceOpDesc(10000)
val sourceOpDesc = slowSource()
val udfOpDesc = TestOperators.pythonOpDesc()
val code = """
|from pytexera import *
|
|class ProcessTupleOperator(UDFOperatorV2):
| @overrides
| def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
| tuple_['field_1'] = tuple_['field_1'] + '_reconfigured'
| tuple_['Region'] = tuple_['Region'] + '_reconfigured'
| yield tuple_
|""".stripMargin
val result = shouldReconfigure(
Expand All @@ -223,12 +233,12 @@ class ReconfigurationIntegrationSpec
OpExecWithCode(code, "python")
)
assert(result(udfOpDesc.operatorIdentifier).exists { t =>
t.getField("field_1").asInstanceOf[String].contains("_reconfigured")
t.getField("Region").asInstanceOf[String].contains("_reconfigured")
})
}

"Engine" should "be able to modify two python UDFs in workflow" in {
val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
val sourceOpDesc = slowSource()
val udfOpDesc1 = TestOperators.pythonOpDesc()
val udfOpDesc2 = TestOperators.pythonOpDesc()
val code = """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ object TestUtils {
ControllerConfig.default,
error => {}
)
// Timeout for control-command acks (start/pause/reconfigure/resume) and for
// waiting until the workflow reports the PAUSED state.
val commandTimeout = Duration.fromSeconds(30)
val completion = Promise[Unit]()
var result: Map[OperatorIdentity, List[Tuple]] = null
client.registerCallback[ExecutionStateUpdate](evt => {
Expand All @@ -311,14 +313,14 @@ object TestUtils {
})
Await.result(
client.controllerInterface.startWorkflow(EmptyRequest(), ()),
Duration.fromSeconds(5)
commandTimeout
)
val pausedReached = stateReached(client, PAUSED)
Await.result(
client.controllerInterface.pauseWorkflow(EmptyRequest(), ()),
Duration.fromSeconds(5)
commandTimeout
)
Await.result(pausedReached, Duration.fromSeconds(10))
Await.result(pausedReached, commandTimeout)
val physicalOps = targetOps.flatMap(op =>
workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
)
Expand All @@ -330,11 +332,11 @@ object TestUtils {
),
()
),
Duration.fromSeconds(5)
commandTimeout
)
Await.result(
client.controllerInterface.resumeWorkflow(EmptyRequest(), ()),
Duration.fromSeconds(5)
commandTimeout
)
Await.result(completion, Duration.fromMinutes(1))
result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,24 @@ object TestOperators {
udf
}

/**
  * Builds a single-worker Python UDF source that emits `numTuple` rows, each
  * holding one STRING column "Region" with the value 'Asia', sleeping
  * `delaySeconds` before every row.
  *
  * Both arguments are spliced verbatim into the generated Python code, so the
  * delay is validated here: a bad value would otherwise only fail later inside
  * the Python worker, far from the call site.
  *
  * @param numTuple     number of rows to emit; a value <= 0 produces no rows
  *                     (Python `range` semantics)
  * @param delaySeconds pause before each row, in seconds; must be finite and
  *                     non-negative
  * @return the configured source operator descriptor
  * @throws IllegalArgumentException if `delaySeconds` is negative, NaN or infinite
  */
def slowRegionSourceOpDesc(numTuple: Int, delaySeconds: Double): PythonUDFSourceOpDescV2 = {
  // `NaN >= 0` is false, so the first clause rejects NaN as well as negatives.
  // Infinity is rejected separately: it interpolates as `Infinity`, which is
  // not a valid Python literal.
  require(
    delaySeconds >= 0 && !delaySeconds.isInfinity,
    s"delaySeconds must be a finite, non-negative number, got $delaySeconds"
  )
  val udf = new PythonUDFSourceOpDescV2()
  udf.workers = 1
  udf.columns = List(new Attribute("Region", AttributeType.STRING))
  // NOTE(review): the generated class reuses the name of its base class
  // (`UDFSourceOperator`). That is legal Python but shadows the imported name;
  // confirm the operator loader does not depend on the class name.
  udf.code = s"""
      |from pytexera import *
      |import time
      |
      |class UDFSourceOperator(UDFSourceOperator):
      |    @overrides
      |    def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:
      |        for i in range($numTuple):
      |            time.sleep($delaySeconds)
      |            yield {'Region': 'Asia'}
      |""".stripMargin
  udf
}

}
Loading