Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,17 @@ class RegionExecutionCoordinator(
private def executeNonDependeePortPhase(): Future[Unit] = {
setPhase(ExecutingNonDependeePortsPhase)
// Allocate output port storage objects
region.resourceConfig.get.portConfigs
.collect {
case (id, cfg: OutputPortConfig) => id -> cfg
}
.foreach {
case (pid, cfg) =>
createOutputPortStorageObjects(Map(pid -> cfg))
}
val outputPortConfigs = region.resourceConfig.get.portConfigs.collect {
case (id, cfg: OutputPortConfig) => id -> cfg
}
// TODO: remove once `operator_port_executions` is reliably populated for compare-versions runs.
logger.info(
s"[compare-debug] region=${region.id.id} outputPortConfigs.size=${outputPortConfigs.size} isRestart=$isRestart"
)
outputPortConfigs.foreach {
case (pid, cfg) =>
createOutputPortStorageObjects(Map(pid -> cfg))
}

val ops = region.getOperators.filter(_.dependeeInputs.isEmpty)

Expand Down Expand Up @@ -576,10 +579,28 @@ class RegionExecutionCoordinator(
schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing"))
DocumentFactory.createDocument(storageUriToAdd, schema)
if (!isRestart) {
WorkflowExecutionsResource.insertOperatorPortResultUri(
eid = eid,
globalPortId = outputPortId,
uri = storageUriToAdd
// TODO: remove once `operator_port_executions` is reliably populated for compare-versions runs.
logger.info(
s"[compare-debug] inserting operator_port_executions row eid=${eid.id} port=${outputPortId.opId.logicalOpId.id}/${outputPortId.portId.id} uri=$storageUriToAdd"
)
try {
WorkflowExecutionsResource.insertOperatorPortResultUri(
eid = eid,
globalPortId = outputPortId,
uri = storageUriToAdd
)
} catch {
case e: Throwable =>
logger.error(
s"[compare-debug] insertOperatorPortResultUri FAILED eid=${eid.id} port=${outputPortId.opId.logicalOpId.id}/${outputPortId.portId.id}: ${e.getMessage}",
e
)
throw e
}
} else {
// TODO: remove once `operator_port_executions` is reliably populated for compare-versions runs.
logger.info(
s"[compare-debug] SKIPPING insert (isRestart=true) eid=${eid.id} port=${outputPortId.opId.logicalOpId.id}/${outputPortId.portId.id}"
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,14 @@ import org.apache.texera.dao.SqlServer
import org.apache.texera.dao.jooq.generated.Tables.OPERATOR_EXECUTIONS
import org.apache.texera.web.model.websocket.request.{LogicalPlanPojo, WorkflowExecuteRequest}
import org.apache.texera.workflow.{LogicalLink, WorkflowCompiler}
import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
import org.apache.texera.web.resource.dashboard.user.workflow.{
WorkflowExecutionsResource,
WorkflowVersionResource
}
import org.apache.texera.web.service.{ExecutionResultService, WorkflowService}
import org.apache.texera.dao.jooq.generated.tables.daos.WorkflowDao
import com.fasterxml.jackson.core.`type`.TypeReference
import org.apache.texera.amber.util.JSONUtils.objectMapper
import org.apache.texera.web.storage.ExecutionStateStore.updateWorkflowState

import java.net.URI
Expand All @@ -70,7 +76,11 @@ case class SyncExecutionRequest(
targetOperatorIds: List[String],
timeoutSeconds: Int,
maxOperatorResultCharLimit: Int,
maxOperatorResultCellCharLimit: Int
maxOperatorResultCellCharLimit: Int,
// When true, do not wipe the previous execution's persisted operator-port results
// before starting this run. Used by the workflow-compare auto-run flow, which needs
// BOTH sides' results to coexist in `operator_port_executions`.
preservePreviousResults: Boolean = false
)

case class ConsoleMessageInfo(
Expand Down Expand Up @@ -104,7 +114,12 @@ case class SyncExecutionResult(
state: String,
operators: Map[String, OperatorInfo],
compilationErrors: Option[Map[String, String]],
errors: Option[List[String]]
errors: Option[List[String]],
// The eid of the execution this call created. Always present once the execution row
// is inserted (even on failure paths); -1 if we never got that far (e.g. service init
// failed before the DB insert). Used by the workflow-compare feature to navigate to
// the compare view after a fresh run.
eid: Long = -1L
)

sealed trait TerminationReason
Expand Down Expand Up @@ -171,7 +186,8 @@ class SyncExecutionResource extends LazyLogging {
workflowService.initExecutionService(
executeRequest,
Some(user.getUser),
new URI(s"sync-execution://$workflowId")
new URI(s"sync-execution://$workflowId"),
preservePreviousResults = request.preservePreviousResults
)

val executionService = workflowService.executionService.getValue
Expand Down Expand Up @@ -259,7 +275,8 @@ class SyncExecutionResource extends LazyLogging {
state = "Killed",
operators = Map.empty,
compilationErrors = None,
errors = Some(List(s"Timeout after $timeoutSeconds seconds"))
errors = Some(List(s"Timeout after $timeoutSeconds seconds")),
eid = executionService.workflowContext.executionId.id
)
case e: Exception =>
logger.error(s"Error waiting for execution: ${e.getMessage}", e)
Expand All @@ -268,7 +285,8 @@ class SyncExecutionResource extends LazyLogging {
state = "Error",
operators = Map.empty,
compilationErrors = None,
errors = Some(List(e.getMessage))
errors = Some(List(e.getMessage)),
eid = executionService.workflowContext.executionId.id
)
}
}
Expand Down Expand Up @@ -333,7 +351,8 @@ class SyncExecutionResource extends LazyLogging {
state = stateString,
operators = operatorInfos,
compilationErrors = None,
errors = if (fatalErrors.nonEmpty) Some(fatalErrors) else None
errors = if (fatalErrors.nonEmpty) Some(fatalErrors) else None,
eid = executionId.id
)

} catch {
Expand All @@ -343,6 +362,188 @@ class SyncExecutionResource extends LazyLogging {
}
}

/**
  * Run a historical workflow version end-to-end and return the new eid.
  *
  * Used by the workflow-compare feature: when the user selects two versions to compare
  * and a version has no completed execution, this endpoint reconstructs that version's
  * workflow content (via the same delta-replay logic the version-history UI uses) and
  * runs it synchronously on the given computing unit.
  *
  * Inline result payload is suppressed (both result char limits are set to 0); the eid in
  * the response is the link to the materialized results.
  *
  * @param workflowId      id of the workflow whose history is replayed; must fit in an Int
  *                        because the workflow DAO and version lookup are keyed by Int
  * @param computingUnitId computing unit the reconstructed plan is executed on
  * @param versionId       version to reconstruct and run
  * @param user            authenticated caller, forwarded to `executeWorkflowSync`
  * @return the result of `executeWorkflowSync` for the reconstructed plan, or a result with
  *         `success = false` / `state = "Error"` (and the default `eid`) when the version
  *         cannot be loaded, parsed, or contains no operators
  */
@POST
@Path("/{wid}/{cuid}/run-version/{vid}")
@RolesAllowed(Array("REGULAR", "ADMIN"))
def executeWorkflowVersionSync(
    @PathParam("wid") workflowId: Long,
    @PathParam("cuid") computingUnitId: Int,
    @PathParam("vid") versionId: Int,
    @Auth user: SessionUser
): SyncExecutionResult = {
  // NOTE(review): no ownership/access check on `workflowId` is visible in this method;
  // confirm that `executeWorkflowSync` (or a filter) enforces it before the version
  // content is reconstructed for the caller.

  // Every pre-execution failure reports the same shape; `eid` keeps its default.
  def failure(message: String): SyncExecutionResult =
    SyncExecutionResult(
      success = false,
      state = "Error",
      operators = Map.empty,
      compilationErrors = None,
      errors = Some(List(message))
    )

  // Reconstruct the workflow at the requested vid via the same delta-replay used by
  // WorkflowVersionResource.retrieveWorkflowVersion. Operates on a fresh Workflow
  // fetched directly so we don't mutate any cached state.
  def loadVersionContent(): Either[String, String] =
    for {
      // A Long id outside the Int range would wrap under `.toInt` and could address a
      // different workflow, so it is rejected up front as "not found".
      wid <- Either.cond(
        workflowId.isValidInt,
        workflowId.toInt,
        s"Workflow $workflowId not found"
      )
      ctx = SqlServer.getInstance().createDSLContext()
      currentWorkflow <- Option(new WorkflowDao(ctx.configuration()).fetchOneByWid(wid))
        .toRight(s"Workflow $workflowId not found")
      versionEntries = WorkflowVersionResource.fetchSubsequentVersions(wid, versionId, ctx)
      historicalWorkflow = WorkflowVersionResource.applyPatch(versionEntries, currentWorkflow)
      content <- Option(historicalWorkflow.getContent)
        .filter(_.nonEmpty)
        .toRight(s"Version $versionId of workflow $workflowId has empty content")
    } yield content

  // Parse the saved workflow JSON into a full-plan execution request.
  def buildRequest(content: String): Either[String, SyncExecutionRequest] = {
    val rootNode = objectMapper.readTree(content)

    // The saved workflow JSON keeps operator properties under a nested "operatorProperties"
    // key and uses link shape { source: {operatorID, portID}, target: {operatorID, portID} }
    // with port IDs like "input-0" / "output-0". LogicalOp subclasses expect properties at
    // the top level, and LogicalLink expects { fromOpId, fromPortId: {id, internal},
    // toOpId, toPortId: {id, internal} }. These transforms mirror what
    // ExecuteWorkflowService.getLogicalPlanRequest does on the frontend.
    val flatOperatorsNode = flattenSavedOperators(rootNode.path("operators"))
    val convertedLinksNode = convertSavedLinks(rootNode.path("links"))

    val operators: List[LogicalOp] = objectMapper
      .readValue(flatOperatorsNode.toString, new TypeReference[java.util.List[LogicalOp]] {})
      .asScala
      .toList
    val links: List[LogicalLink] = objectMapper
      .readValue(convertedLinksNode.toString, new TypeReference[java.util.List[LogicalLink]] {})
      .asScala
      .toList

    Either.cond(
      operators.nonEmpty,
      SyncExecutionRequest(
        executionName = s"Compare auto-run (vid=$versionId)",
        logicalPlan = LogicalPlanPojo(
          operators = operators,
          links = links,
          // Mark every operator as "view result" so the engine materializes their outputs.
          // The compare endpoint then has port URIs for every operator on each side.
          opsToViewResult = operators.map(_.operatorIdentifier.id),
          opsToReuseResult = List.empty
        ),
        workflowSettings = None,
        targetOperatorIds = List.empty, // run the whole plan, not a sub-DAG
        timeoutSeconds = 600,
        maxOperatorResultCharLimit = 0, // suppress inline result payload
        maxOperatorResultCellCharLimit = 0,
        // Don't wipe the previous execution's port results — the compare flow needs both
        // sides' rows to coexist in `operator_port_executions`.
        preservePreviousResults = true
      ),
      s"Version $versionId of workflow $workflowId has no operators"
    )
  }

  try {
    loadVersionContent().flatMap(content => buildRequest(content)) match {
      case Right(request) => executeWorkflowSync(workflowId, computingUnitId, request, user)
      case Left(message)  => failure(message)
    }
  } catch {
    case e: Exception =>
      logger.error(s"Compare auto-run failed for wid=$workflowId vid=$versionId: ${e.getMessage}", e)
      failure(s"Failed to run version $versionId: ${e.getMessage}")
  }
}

/**
  * Reshape the saved-workflow `operators` array into the layout LogicalPlanPojo expects.
  *
  * Saved:  { operatorID, operatorType, operatorVersion, operatorProperties: {...}, inputPorts, outputPorts, ...UI-only... }
  * Wanted: { operatorID, operatorType, operatorVersion, ...properties at top level..., inputPorts, outputPorts }
  *
  * Each element is deep-copied before it is modified, so the input node is left untouched.
  *
  * @param operatorsNode the `operators` node of a saved workflow; a node with no elements
  *                      yields an empty array
  * @return a new array holding one flattened object per saved operator, in input order
  */
private def flattenSavedOperators(
    operatorsNode: com.fasterxml.jackson.databind.JsonNode
): com.fasterxml.jackson.databind.node.ArrayNode = {
  val flattened = objectMapper.createArrayNode()
  for (savedOperator <- operatorsNode.elements().asScala) {
    val operator =
      savedOperator.deepCopy().asInstanceOf[com.fasterxml.jackson.databind.node.ObjectNode]

    // Hoist the nested properties; a key already present at the top level is kept as-is.
    val nestedProperties = operator.remove("operatorProperties")
    if (nestedProperties != null && nestedProperties.isObject) {
      for (property <- nestedProperties.fields().asScala if !operator.has(property.getKey)) {
        operator.set[com.fasterxml.jackson.databind.JsonNode](property.getKey, property.getValue)
      }
    }

    // Strip editor-only fields after hoisting, so a hoisted key with one of these names
    // is dropped as well.
    Seq(
      "showAdvanced",
      "isDisabled",
      "customDisplayName",
      "dynamicInputPorts",
      "dynamicOutputPorts"
    ).foreach(fieldName => operator.remove(fieldName))

    flattened.add(operator)
  }
  flattened
}

/**
  * Reshape the saved-workflow `links` array into the layout LogicalLink expects.
  *
  * Saved:  { linkID, source: {operatorID, portID:"output-0"}, target: {operatorID, portID:"input-0"} }
  * Wanted: { fromOpId, fromPortId: {id, internal:false}, toOpId, toPortId: {id, internal:false} }
  *
  * @param linksNode the `links` node of a saved workflow; a node with no elements yields
  *                  an empty array
  * @return a new array holding one converted link per saved link, in input order
  */
private def convertSavedLinks(
    linksNode: com.fasterxml.jackson.databind.JsonNode
): com.fasterxml.jackson.databind.node.ArrayNode = {
  // portID is "input-N" / "output-N"; take the trailing integer, falling back to 0 when
  // the last "-"-separated segment is not an integer.
  def portIndexOf(endpoint: com.fasterxml.jackson.databind.JsonNode): Int = {
    val segments = endpoint.path("portID").asText().split("-")
    segments.lastOption
      .flatMap(segment => scala.util.Try(segment.toInt).toOption)
      .getOrElse(0)
  }

  // Port identity object; `internal` is always written as false here.
  def portIdentity(index: Int): com.fasterxml.jackson.databind.node.ObjectNode =
    objectMapper.createObjectNode().put("id", index).put("internal", false)

  val convertedLinks = objectMapper.createArrayNode()
  for (savedLink <- linksNode.elements().asScala) {
    val sourceEnd = savedLink.path("source")
    val targetEnd = savedLink.path("target")

    val logicalLink = objectMapper.createObjectNode()
    logicalLink.put("fromOpId", sourceEnd.path("operatorID").asText())
    logicalLink.put("toOpId", targetEnd.path("operatorID").asText())
    logicalLink.set[com.fasterxml.jackson.databind.JsonNode](
      "fromPortId",
      portIdentity(portIndexOf(sourceEnd))
    )
    logicalLink.set[com.fasterxml.jackson.databind.JsonNode](
      "toPortId",
      portIdentity(portIndexOf(targetEnd))
    )
    convertedLinks.add(logicalLink)
  }
  convertedLinks
}

private def shutdownPreviousExecution(workflowService: WorkflowService): Unit = {
try {
val previousEs = workflowService.executionService.getValue
Expand Down
Loading
Loading