diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 2971e4c4f4e..f991050dfad 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -285,14 +285,17 @@ class RegionExecutionCoordinator( private def executeNonDependeePortPhase(): Future[Unit] = { setPhase(ExecutingNonDependeePortsPhase) // Allocate output port storage objects - region.resourceConfig.get.portConfigs - .collect { - case (id, cfg: OutputPortConfig) => id -> cfg - } - .foreach { - case (pid, cfg) => - createOutputPortStorageObjects(Map(pid -> cfg)) - } + val outputPortConfigs = region.resourceConfig.get.portConfigs.collect { + case (id, cfg: OutputPortConfig) => id -> cfg + } + // TODO: remove once `operator_port_executions` is reliably populated for compare-versions runs. + logger.info( + s"[compare-debug] region=${region.id.id} outputPortConfigs.size=${outputPortConfigs.size} isRestart=$isRestart" + ) + outputPortConfigs.foreach { + case (pid, cfg) => + createOutputPortStorageObjects(Map(pid -> cfg)) + } val ops = region.getOperators.filter(_.dependeeInputs.isEmpty) @@ -576,10 +579,28 @@ class RegionExecutionCoordinator( schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing")) DocumentFactory.createDocument(storageUriToAdd, schema) if (!isRestart) { - WorkflowExecutionsResource.insertOperatorPortResultUri( - eid = eid, - globalPortId = outputPortId, - uri = storageUriToAdd + // TODO: remove once `operator_port_executions` is reliably populated for compare-versions runs. + logger.info( + s"[compare-debug] inserting operator_port_executions row eid=${eid.id} port=${outputPortId.opId.logicalOpId.id}/${outputPortId.portId.id} uri=$storageUriToAdd" + ) + try { + WorkflowExecutionsResource.insertOperatorPortResultUri( + eid = eid, + globalPortId = outputPortId, + uri = storageUriToAdd + ) + } catch { + case e: Throwable => + logger.error( + s"[compare-debug] insertOperatorPortResultUri FAILED eid=${eid.id} port=${outputPortId.opId.logicalOpId.id}/${outputPortId.portId.id}: ${e.getMessage}", + e + ) + throw e + } + } else { + // TODO: remove once `operator_port_executions` is reliably populated for compare-versions runs. + logger.info( + s"[compare-debug] SKIPPING insert (isRestart=true) eid=${eid.id} port=${outputPortId.opId.logicalOpId.id}/${outputPortId.portId.id}" ) } } diff --git a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala index d3047db5802..fe54d154f62 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala @@ -50,8 +50,14 @@ import org.apache.texera.dao.SqlServer import org.apache.texera.dao.jooq.generated.Tables.OPERATOR_EXECUTIONS import org.apache.texera.web.model.websocket.request.{LogicalPlanPojo, WorkflowExecuteRequest} import org.apache.texera.workflow.{LogicalLink, WorkflowCompiler} -import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource +import org.apache.texera.web.resource.dashboard.user.workflow.{ + WorkflowExecutionsResource, + WorkflowVersionResource +} import org.apache.texera.web.service.{ExecutionResultService, WorkflowService} +import org.apache.texera.dao.jooq.generated.tables.daos.WorkflowDao +import com.fasterxml.jackson.core.`type`.TypeReference +import org.apache.texera.amber.util.JSONUtils.objectMapper import org.apache.texera.web.storage.ExecutionStateStore.updateWorkflowState import java.net.URI @@ -70,7 +76,11 @@ case class SyncExecutionRequest( targetOperatorIds: List[String], timeoutSeconds: Int, maxOperatorResultCharLimit: Int, - maxOperatorResultCellCharLimit: Int + maxOperatorResultCellCharLimit: Int, + // When true, do not wipe the previous execution's persisted operator-port results + // before starting this run. Used by the workflow-compare auto-run flow, which needs + // BOTH side's results to coexist in `operator_port_executions`. + preservePreviousResults: Boolean = false ) case class ConsoleMessageInfo( @@ -104,7 +114,12 @@ case class SyncExecutionResult( state: String, operators: Map[String, OperatorInfo], compilationErrors: Option[Map[String, String]], - errors: Option[List[String]] + errors: Option[List[String]], + // The eid of the execution this call created. Always present once the execution row + // is inserted (even on failure paths); -1 if we never got that far (e.g. service init + // failed before the DB insert). Used by the workflow-compare feature to navigate to + // the compare view after a fresh run. + eid: Long = -1L ) sealed trait TerminationReason @@ -171,7 +186,8 @@ class SyncExecutionResource extends LazyLogging { workflowService.initExecutionService( executeRequest, Some(user.getUser), - new URI(s"sync-execution://$workflowId") + new URI(s"sync-execution://$workflowId"), + preservePreviousResults = request.preservePreviousResults ) val executionService = workflowService.executionService.getValue @@ -259,7 +275,8 @@ class SyncExecutionResource extends LazyLogging { state = "Killed", operators = Map.empty, compilationErrors = None, - errors = Some(List(s"Timeout after $timeoutSeconds seconds")) + errors = Some(List(s"Timeout after $timeoutSeconds seconds")), + eid = executionService.workflowContext.executionId.id ) case e: Exception => logger.error(s"Error waiting for execution: ${e.getMessage}", e) @@ -268,7 +285,8 @@ class SyncExecutionResource extends LazyLogging { state = "Error", operators = Map.empty, compilationErrors = None, - errors = Some(List(e.getMessage)) + errors = Some(List(e.getMessage)), + eid = executionService.workflowContext.executionId.id ) } } @@ -333,7 +351,8 @@ class SyncExecutionResource extends LazyLogging { state = stateString, operators = operatorInfos, compilationErrors = None, - errors = if (fatalErrors.nonEmpty) Some(fatalErrors) else None + errors = if (fatalErrors.nonEmpty) Some(fatalErrors) else None, + eid = executionId.id ) } catch { @@ -343,6 +362,188 @@ class SyncExecutionResource extends LazyLogging { } } + /** + * Run a historical workflow version end-to-end and return the new eid. + * + * Used by the workflow-compare feature: when the user selects two versions to compare + * and a version has no completed execution, this endpoint reconstructs that version's + * workflow content (via the same delta-replay logic the version-history UI uses) and + * runs it synchronously on the given computing unit. + * + * Inline result payload is suppressed (caller doesn't need it — they'll fetch via the + * per-operator paginated result endpoint), but the eid in the response is the link to + * the materialized results. + */ + @POST + @Path("/{wid}/{cuid}/run-version/{vid}") + @RolesAllowed(Array("REGULAR", "ADMIN")) + def executeWorkflowVersionSync( + @PathParam("wid") workflowId: Long, + @PathParam("cuid") computingUnitId: Int, + @PathParam("vid") versionId: Int, + @Auth user: SessionUser + ): SyncExecutionResult = { + try { + // Reconstruct the workflow at the requested vid via the same delta-replay used by + // WorkflowVersionResource.retrieveWorkflowVersion. Operates on a fresh Workflow + // fetched directly so we don't mutate any cached state. + val ctx = SqlServer.getInstance().createDSLContext() + val workflowDao = new WorkflowDao(ctx.configuration()) + val currentWorkflow = workflowDao.fetchOneByWid(workflowId.toInt) + if (currentWorkflow == null) { + return SyncExecutionResult( + success = false, + state = "Error", + operators = Map.empty, + compilationErrors = None, + errors = Some(List(s"Workflow $workflowId not found")) + ) + } + val versionEntries = + WorkflowVersionResource.fetchSubsequentVersions(workflowId.toInt, versionId, ctx) + val historicalWorkflow = WorkflowVersionResource.applyPatch(versionEntries, currentWorkflow) + + val content = historicalWorkflow.getContent + if (content == null || content.isEmpty) { + return SyncExecutionResult( + success = false, + state = "Error", + operators = Map.empty, + compilationErrors = None, + errors = Some(List(s"Version $versionId of workflow $workflowId has empty content")) + ) + } + + val rootNode = objectMapper.readTree(content) + + // The saved workflow JSON keeps operator properties under a nested "operatorProperties" + // key and uses link shape { source: {operatorID, portID}, target: {operatorID, portID} } + // with port IDs like "input-0" / "output-0". LogicalOp subclasses expect properties at + // the top level, and LogicalLink expects { fromOpId, fromPortId: {id, internal}, + // toOpId, toPortId: {id, internal} }. These transforms mirror what + // ExecuteWorkflowService.getLogicalPlanRequest does on the frontend. + val flatOperatorsNode = flattenSavedOperators(rootNode.path("operators")) + val convertedLinksNode = convertSavedLinks(rootNode.path("links")) + + val operators: List[LogicalOp] = objectMapper + .readValue(flatOperatorsNode.toString, new TypeReference[java.util.List[LogicalOp]] {}) + .asScala + .toList + val links: List[LogicalLink] = objectMapper + .readValue(convertedLinksNode.toString, new TypeReference[java.util.List[LogicalLink]] {}) + .asScala + .toList + + if (operators.isEmpty) { + return SyncExecutionResult( + success = false, + state = "Error", + operators = Map.empty, + compilationErrors = None, + errors = Some(List(s"Version $versionId of workflow $workflowId has no operators")) + ) + } + + val logicalPlan = LogicalPlanPojo( + operators = operators, + links = links, + // Mark every operator as "view result" so the engine materializes their outputs. + // The compare endpoint then has port URIs for every operator on each side. + opsToViewResult = operators.map(_.operatorIdentifier.id), + opsToReuseResult = List.empty + ) + + val request = SyncExecutionRequest( + executionName = s"Compare auto-run (vid=$versionId)", + logicalPlan = logicalPlan, + workflowSettings = None, + targetOperatorIds = List.empty, // run the whole plan, not a sub-DAG + timeoutSeconds = 600, + maxOperatorResultCharLimit = 0, // suppress inline result payload + maxOperatorResultCellCharLimit = 0, + // Don't wipe the previous execution's port results — the compare flow needs both + // sides' rows to coexist in `operator_port_executions`. + preservePreviousResults = true + ) + + executeWorkflowSync(workflowId, computingUnitId, request, user) + } catch { + case e: Exception => + logger.error(s"Compare auto-run failed for wid=$workflowId vid=$versionId: ${e.getMessage}", e) + SyncExecutionResult( + success = false, + state = "Error", + operators = Map.empty, + compilationErrors = None, + errors = Some(List(s"Failed to run version $versionId: ${e.getMessage}")) + ) + } + } + + /** + * Transform the operators array from saved-workflow JSON into the LogicalPlanPojo shape. + * + * Saved: { operatorID, operatorType, operatorVersion, operatorProperties: {...}, inputPorts, outputPorts, ...UI-only... } + * Wanted: { operatorID, operatorType, operatorVersion, ...properties at top level..., inputPorts, outputPorts } + */ + private def flattenSavedOperators( + operatorsNode: com.fasterxml.jackson.databind.JsonNode + ): com.fasterxml.jackson.databind.node.ArrayNode = { + val uiOnlyFields = Seq( + "showAdvanced", + "isDisabled", + "customDisplayName", + "dynamicInputPorts", + "dynamicOutputPorts" + ) + val out = objectMapper.createArrayNode() + operatorsNode.elements().asScala.foreach { op => + val flat = op.deepCopy().asInstanceOf[com.fasterxml.jackson.databind.node.ObjectNode] + val props = flat.remove("operatorProperties") + if (props != null && props.isObject) { + props.fields().asScala.foreach { entry => + // Only copy keys that aren't already set on the operator (top-level keys win). + if (!flat.has(entry.getKey)) flat.set[com.fasterxml.jackson.databind.JsonNode](entry.getKey, entry.getValue) + } + } + uiOnlyFields.foreach(flat.remove) + out.add(flat) + } + out + } + + /** + * Transform the links array from saved-workflow JSON into LogicalLink shape. + * + * Saved: { linkID, source: {operatorID, portID:"output-0"}, target: {operatorID, portID:"input-0"} } + * Wanted: { fromOpId, fromPortId: {id, internal:false}, toOpId, toPortId: {id, internal:false} } + */ + private def convertSavedLinks( + linksNode: com.fasterxml.jackson.databind.JsonNode + ): com.fasterxml.jackson.databind.node.ArrayNode = { + val out = objectMapper.createArrayNode() + linksNode.elements().asScala.foreach { link => + val converted = objectMapper.createObjectNode() + val source = link.path("source") + val target = link.path("target") + converted.put("fromOpId", source.path("operatorID").asText()) + converted.put("toOpId", target.path("operatorID").asText()) + // portID is "input-N" / "output-N"; take the trailing integer. + val sourceId = source.path("portID").asText().split("-").lastOption.flatMap(s => scala.util.Try(s.toInt).toOption).getOrElse(0) + val targetId = target.path("portID").asText().split("-").lastOption.flatMap(s => scala.util.Try(s.toInt).toOption).getOrElse(0) + converted.set[com.fasterxml.jackson.databind.JsonNode]( + "fromPortId", + objectMapper.createObjectNode().put("id", sourceId).put("internal", false) + ) + converted.set[com.fasterxml.jackson.databind.JsonNode]( + "toPortId", + objectMapper.createObjectNode().put("id", targetId).put("internal", false) + ) + out.add(converted) + } + out + } + private def shutdownPreviousExecution(workflowService: WorkflowService): Unit = { try { val previousEs = workflowService.executionService.getValue diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 72fb1c364e5..3ee38c7bfd4 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -33,6 +33,7 @@ import org.apache.texera.amber.engine.architecture.logreplay.{ReplayDestination, import org.apache.texera.amber.engine.common.Utils.{maptoStatusCode, stringToAggregatedState} import org.apache.texera.amber.engine.common.storage.SequentialRecordStorage import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps import org.apache.texera.auth.{JwtParser, SessionUser} import org.apache.texera.dao.SqlServer @@ -45,6 +46,7 @@ import org.apache.texera.web.model.http.request.result.ResultExportRequest import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._ import org.apache.texera.web.service.{ExecutionsMetadataPersistService, ResultExportService} import org.jooq.DSLContext +import org.jooq.impl.DSL import play.api.libs.json.Json import java.net.URI @@ -331,6 +333,20 @@ object WorkflowExecutionsResource { ) } + // True when at least one operator-port row was persisted for this eid. The compare + // flow filters on this so it can skip executions left over from older backend builds + // that completed but never wrote to operator_port_executions. + val hasResultsField = DSL + .field( + DSL.exists( + DSL + .selectOne() + .from(OPERATOR_PORT_EXECUTIONS) + .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(WORKFLOW_EXECUTIONS.EID)) + ) + ) + .as("has_results") + context .select( WORKFLOW_EXECUTIONS.EID, @@ -344,7 +360,8 @@ object WorkflowExecutionsResource { WORKFLOW_EXECUTIONS.LAST_UPDATE_TIME, WORKFLOW_EXECUTIONS.BOOKMARKED, WORKFLOW_EXECUTIONS.NAME, - WORKFLOW_EXECUTIONS.LOG_LOCATION + WORKFLOW_EXECUTIONS.LOG_LOCATION, + hasResultsField ) .from(WORKFLOW_EXECUTIONS) .join(WORKFLOW_VERSION) @@ -509,6 +526,105 @@ object WorkflowExecutionsResource { urisOfEid.find(isMatchingExternalPortURI) } + /** + * Structural comparison of two executions' external output ports. + * + * Each entry is at (logicalOperatorId, externalOutputPortId) granularity. Status is + * `shared` when both executions produced a result for that key, `onlyInA` / `onlyInB` + * otherwise. Internal ports and input ports are excluded — only external output ports + * contribute, since those are the operator outputs surfaced to the user. + */ + def compareOperatorPortStructure( + eidA: ExecutionIdentity, + eidB: ExecutionIdentity + ): List[OperatorPortComparisonEntry] = { + def loadEntries(eid: ExecutionIdentity): Map[(String, Int), String] = { + context + .select(OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID, OPERATOR_PORT_EXECUTIONS.RESULT_URI) + .from(OPERATOR_PORT_EXECUTIONS) + .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt)) + .fetch() + .asScala + .iterator + .flatMap { record => + val serialized = record.value1() + val uri = record.value2() + if (serialized == null || uri == null || uri.isEmpty) None + else { + scala.util + .Try(GlobalPortIdentitySerde.deserializeFromString(serialized)) + .toOption + .filter(gpi => !gpi.input && !gpi.portId.internal) + .map(gpi => (gpi.opId.logicalOpId.id, gpi.portId.id) -> uri) + } + } + .toMap + } + + val mapA = loadEntries(eidA) + val mapB = loadEntries(eidB) + val allKeys = (mapA.keySet ++ mapB.keySet).toList.sortBy { case (op, port) => (op, port) } + + allKeys.map { + case key @ (opId, portId) => + val a = mapA.get(key) + val b = mapB.get(key) + val status = (a.isDefined, b.isDefined) match { + case (true, true) => "shared" + case (true, false) => "onlyInA" + case (false, true) => "onlyInB" + case _ => "missing" + } + OperatorPortComparisonEntry( + operatorId = opId, + portId = portId, + status = status, + resultUriA = a, + resultUriB = b + ) + } + } + + case class OperatorPortComparisonEntry( + operatorId: String, + portId: Int, + status: String, // "shared" | "onlyInA" | "onlyInB" + resultUriA: Option[String], + resultUriB: Option[String] + ) + + case class AttributeMeta(name: String, typeName: String) + + case class OperatorPortCompareResult( + operatorId: String, + portId: Int, + status: String, + rowCountA: Option[Long], + rowCountB: Option[Long], + schemaA: List[AttributeMeta], + schemaB: List[AttributeMeta], + schemaMatches: Boolean + ) + + case class WorkflowExecutionCompareSummary( + wid: Integer, + eidA: Integer, + eidB: Integer, + // vid each eid was executed against. The frontend uses these to load the + // historical workflow content for each side's DAG canvas. + vidA: Integer, + vidB: Integer, + operators: List[OperatorPortCompareResult] + ) + + case class ExecutionOperatorResultPage( + schema: List[AttributeMeta], + rows: List[com.fasterxml.jackson.databind.node.ObjectNode], + totalRowCount: Long, + pageIndex: Int, + pageSize: Int + ) + case class WorkflowExecutionEntry( eId: Integer, vId: Integer, @@ -521,7 +637,12 @@ object WorkflowExecutionsResource { completionTime: Timestamp, bookmarked: Boolean, name: String, - logLocation: String + logLocation: String, + // True when this execution has at least one row in `operator_port_executions` + // i.e. produced per-operator output that can be paginated / compared. The compare + // flow uses this to skip "completed but empty" executions left over from older + // backend builds that didn't reliably persist port results. + hasResults: Boolean ) case class WorkflowRuntimeStatistics( @@ -675,6 +796,210 @@ class WorkflowExecutionsResource { } } + /** + * Compare per-operator-port results between two executions of the same workflow. + * + * For each external output port that exists in either execution, returns the operator + * id, port id, status (`shared` / `onlyInA` / `onlyInB`), row counts, schemas, and a + * schemaMatches flag. Both executions must belong to the workflow `wid`. + */ + @GET + @Produces(Array(MediaType.APPLICATION_JSON)) + @Path("/{wid}/{eidA}/compare/{eidB}") + @RolesAllowed(Array("REGULAR", "ADMIN")) + def compareTwoExecutions( + @PathParam("wid") wid: Integer, + @PathParam("eidA") eidA: Integer, + @PathParam("eidB") eidB: Integer, + @Auth sessionUser: SessionUser + ): WorkflowExecutionCompareSummary = { + validateUserCanAccessWorkflow(sessionUser.getUser.getUid, wid) + + // Validate that both executions belong to this workflow. + val belongs = context + .select(WORKFLOW_EXECUTIONS.EID) + .from(WORKFLOW_EXECUTIONS) + .join(WORKFLOW_VERSION) + .on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID)) + .where( + WORKFLOW_VERSION.WID + .eq(wid) + .and(WORKFLOW_EXECUTIONS.EID.in(eidA, eidB)) + ) + .fetchInto(classOf[Integer]) + .asScala + .toSet + if (!belongs.contains(eidA) || !belongs.contains(eidB)) { + throw new BadRequestException(s"Executions $eidA / $eidB do not both belong to workflow $wid") + } + + val structure = compareOperatorPortStructure( + ExecutionIdentity(eidA.longValue()), + ExecutionIdentity(eidB.longValue()) + ) + + def describe(uri: Option[String]): (Option[Long], List[AttributeMeta]) = { + uri match { + case None => (None, List.empty) + case Some(u) => + try { + val (doc, schemaOption) = DocumentFactory.openDocument(URI.create(u)) + val attrs = schemaOption + .map(_.getAttributes.map(a => AttributeMeta(a.getName, a.getType.name()))) + .getOrElse(List.empty) + (Some(doc.getCount), attrs) + } catch { + case _: Throwable => (None, List.empty) + } + } + } + + val enriched = structure.map { entry => + val (countA, schemaA) = describe(entry.resultUriA) + val (countB, schemaB) = describe(entry.resultUriB) + OperatorPortCompareResult( + operatorId = entry.operatorId, + portId = entry.portId, + status = entry.status, + rowCountA = countA, + rowCountB = countB, + schemaA = schemaA, + schemaB = schemaB, + schemaMatches = entry.status == "shared" && schemaA == schemaB + ) + } + + // Order operators by their position in the workflow content (which preserves the + // user's creation / DAG order), so the left rail reads source → sink instead of + // alphabetical-by-id. Build a union order: A's operators first in A's order, then + // any operators that only existed in B (appended in B's order). + val operatorOrder = buildOperatorOrder(wid, eidA, eidB) + val rank: String => Int = opId => operatorOrder.indexOf(opId) match { + case -1 => Int.MaxValue // unknown operators float to the bottom + case n => n + } + val sorted = enriched.sortBy(e => (rank(e.operatorId), e.operatorId, e.portId)) + + val vidA = vidForExecution(eidA).getOrElse(Integer.valueOf(-1)) + val vidB = vidForExecution(eidB).getOrElse(Integer.valueOf(-1)) + WorkflowExecutionCompareSummary(wid, eidA, eidB, vidA, vidB, sorted) + } + + private def vidForExecution(eid: Integer): Option[Integer] = { + val opt = context + .select(WORKFLOW_EXECUTIONS.VID) + .from(WORKFLOW_EXECUTIONS) + .where(WORKFLOW_EXECUTIONS.EID.eq(eid)) + .fetchOptionalInto(classOf[Integer]) + if (opt.isPresent) Some(opt.get()) else None + } + + /** Return the ordered list of operator IDs across both executions' workflow versions. */ + private def buildOperatorOrder(wid: Integer, eidA: Integer, eidB: Integer): List[String] = { + def operatorIdsForExecution(eid: Integer): List[String] = { + try { + val vidOpt = context + .select(WORKFLOW_EXECUTIONS.VID) + .from(WORKFLOW_EXECUTIONS) + .where(WORKFLOW_EXECUTIONS.EID.eq(eid)) + .fetchOptionalInto(classOf[Integer]) + if (!vidOpt.isPresent) return List.empty + val workflowDao = + new org.apache.texera.dao.jooq.generated.tables.daos.WorkflowDao(context.configuration()) + val workflow = workflowDao.fetchOneByWid(wid) + if (workflow == null) return List.empty + val versions = WorkflowVersionResource.fetchSubsequentVersions(wid, vidOpt.get(), context) + val historical = WorkflowVersionResource.applyPatch(versions, workflow) + val content = historical.getContent + if (content == null || content.isEmpty) return List.empty + objectMapper + .readTree(content) + .path("operators") + .elements() + .asScala + .map(_.path("operatorID").asText("")) + .filter(_.nonEmpty) + .toList + } catch { + case _: Throwable => List.empty + } + } + + val orderA = operatorIdsForExecution(eidA) + val orderB = operatorIdsForExecution(eidB) + val seen = scala.collection.mutable.LinkedHashSet[String]() + (orderA ++ orderB).foreach(seen.add) + seen.toList + } + + /** + * Fetch a paginated page of rows from a specific operator's external output port for a + * past execution. Used by the workflow-compare feature to render results from arbitrary + * historical executions without an active websocket session. + */ + @GET + @Produces(Array(MediaType.APPLICATION_JSON)) + @Path("/{wid}/{eid}/result/{opId}/{portId}") + @RolesAllowed(Array("REGULAR", "ADMIN")) + def retrieveExecutionResultPage( + @PathParam("wid") wid: Integer, + @PathParam("eid") eid: Integer, + @PathParam("opId") opId: String, + @PathParam("portId") portId: Integer, + @QueryParam("page") page: Integer, + @QueryParam("pageSize") pageSize: Integer, + @Auth sessionUser: SessionUser + ): ExecutionOperatorResultPage = { + validateUserCanAccessWorkflow(sessionUser.getUser.getUid, wid) + + val pageIndex = Option(page).map(_.toInt).getOrElse(0).max(0) + val effectivePageSize = Option(pageSize).map(_.toInt).getOrElse(10).max(1).min(500) + + val executionBelongs = context + .select(WORKFLOW_EXECUTIONS.EID) + .from(WORKFLOW_EXECUTIONS) + .join(WORKFLOW_VERSION) + .on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID)) + .where(WORKFLOW_VERSION.WID.eq(wid).and(WORKFLOW_EXECUTIONS.EID.eq(eid))) + .fetchOneInto(classOf[Integer]) + if (executionBelongs == null) { + throw new BadRequestException(s"Execution $eid does not belong to workflow $wid") + } + + val resultUriOpt = getResultUriByLogicalPortId( + ExecutionIdentity(eid.longValue()), + OperatorIdentity(opId), + PortIdentity(id = portId.toInt, internal = false) + ) + val uri = resultUriOpt.getOrElse { + throw new NotFoundException(s"No result URI for op=$opId port=$portId in execution $eid") + } + + val (document, schemaOption) = DocumentFactory.openDocument(uri) + val virtualDocument = document.asInstanceOf[ + org.apache.texera.amber.core.storage.model.VirtualDocument[Tuple] + ] + val totalCount = virtualDocument.getCount + val from = pageIndex * effectivePageSize + val until = from + effectivePageSize + val pageTuples = + if (from >= totalCount) Iterable.empty[Tuple] + else virtualDocument.getRange(from, until.toInt, None).to(Iterable) + + val rows = org.apache.texera.web.service.ExecutionResultService.convertTuplesToJson(pageTuples) + val schema = schemaOption + .map(_.getAttributes.map(a => AttributeMeta(a.getName, a.getType.name()))) + .getOrElse(List.empty) + + ExecutionOperatorResultPage( + schema = schema, + rows = rows, + totalRowCount = totalCount, + pageIndex = pageIndex, + pageSize = effectivePageSize + ) + } + @GET @Produces(Array(MediaType.APPLICATION_JSON)) @Path("/{wid}/stats/{eid}") diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowVersionResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowVersionResource.scala index 7be74ae5b00..8fd19bde6e3 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowVersionResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowVersionResource.scala @@ -268,11 +268,16 @@ object WorkflowVersionResource { } /** - * Fetches all versions of a workflow from a specific version ID to the latest version + * Fetches the patches that need to be applied to the current workflow content to + * reconstruct the state at `vid`. Each row in workflow_version stores a reverse-patch + * (JsonDiff.asJson(new, old)) that, when applied to `new`'s content, produces `old`'s + * content. So to reconstruct state at `vid`, we apply patches for all versions STRICTLY + * NEWER than `vid` — applying `vid`'s own patch would over-shoot by one and land us at + * `vid - 1`. * * @param wid workflow ID to query - * @param vid starting version ID (inclusive) - * @return List of workflow versions ordered from latest to earliest + * @param vid target version ID (exclusive — its own patch is NOT included) + * @return List of workflow versions ordered from latest to earliest, with vid > given */ def fetchSubsequentVersions( wid: Integer, @@ -282,7 +287,7 @@ object WorkflowVersionResource { context .select(WORKFLOW_VERSION.VID, WORKFLOW_VERSION.CREATION_TIME, WORKFLOW_VERSION.CONTENT) .from(WORKFLOW_VERSION) - .where(WORKFLOW_VERSION.WID.eq(wid).and(WORKFLOW_VERSION.VID.ge(vid))) + .where(WORKFLOW_VERSION.WID.eq(wid).and(WORKFLOW_VERSION.VID.gt(vid))) .orderBy(WORKFLOW_VERSION.VID.desc()) .fetchInto(classOf[WorkflowVersion]) .asScala diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala index aa593cdcc65..9b06c0f6929 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala @@ -183,7 +183,11 @@ class WorkflowService( def initExecutionService( req: WorkflowExecuteRequest, userOpt: Option[User], - sessionUri: URI + sessionUri: URI, + // When true, skip the wipe-previous-execution-resources step. The compare-versions + // auto-run flow sets this so both sides' `operator_port_executions` rows survive + // long enough to be diffed. + preservePreviousResults: Boolean = false ): Unit = { if (executionService.hasValue) { @@ -195,12 +199,14 @@ class WorkflowService( val workflowContext: WorkflowContext = createWorkflowContext() var controllerConf = ControllerConfig.default - // clean up results from previous run - val previousExecutionId = - WorkflowExecutionService.getLatestExecutionId(workflowId, req.computingUnitId) - previousExecutionId.foreach(eid => { - clearExecutionResources(eid) - }) // TODO: change this behavior after enabling cache. + if (!preservePreviousResults) { + // clean up results from previous run + val previousExecutionId = + WorkflowExecutionService.getLatestExecutionId(workflowId, req.computingUnitId) + previousExecutionId.foreach(eid => { + clearExecutionResources(eid) + }) // TODO: change this behavior after enabling cache. + } workflowContext.executionId = ExecutionsMetadataPersistService.insertNewExecution( workflowContext.workflowId, diff --git a/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala b/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala index bd55124a729..ac4811c28e5 100644 --- a/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala +++ b/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala @@ -216,4 +216,119 @@ class WorkflowExecutionsResourceSpec assert(rows.get(0).getResultUri == uri.toString) } + // --- compareOperatorPortStructure --------------------------------------- + + private def insertExecutionForTestWorkflow(name: String): WorkflowExecutions = { + val execution = new WorkflowExecutions + execution.setVid(testVersion.getVid) + execution.setUid(testUser.getUid) + execution.setStatus(0.toByte) + execution.setResult("") + execution.setStartingTime(new Timestamp(System.currentTimeMillis())) + execution.setBookmarked(false) + execution.setName(name) + execution.setEnvironmentVersion("test-env-1.0") + workflowExecutionsDao.insert(execution) + execution + } + + private def insertPortRow( + eid: Integer, + logicalOpId: String, + portId: Int, + internal: Boolean = false, + isInput: Boolean = false + ): GlobalPortIdentity = { + val gpi = GlobalPortIdentity( + PhysicalOpIdentity(OperatorIdentity(logicalOpId), "main"), + PortIdentity(id = portId, internal = internal), + input = isInput + ) + WorkflowExecutionsResource.insertOperatorPortResultUri( + ExecutionIdentity(eid.longValue()), + gpi, + URI.create(s"vfs:///wid/${testWorkflowWid}/eid/${eid}/op/${logicalOpId}/port/${portId}") + ) + gpi + } + + "WorkflowExecutionsResource.compareOperatorPortStructure" should + "classify ports as shared, onlyInA, or onlyInB by logical op id and external port id" in { + val execA = insertExecutionForTestWorkflow("execA") + val execB = insertExecutionForTestWorkflow("execB") + + // shared operator with two external output ports + insertPortRow(execA.getEid, "op-shared", 0) + insertPortRow(execA.getEid, "op-shared", 1) + insertPortRow(execB.getEid, "op-shared", 0) + insertPortRow(execB.getEid, "op-shared", 1) + + // only on A + insertPortRow(execA.getEid, "op-only-a", 0) + + // only on B + insertPortRow(execB.getEid, "op-only-b", 0) + + val entries = WorkflowExecutionsResource.compareOperatorPortStructure( + ExecutionIdentity(execA.getEid.longValue()), + ExecutionIdentity(execB.getEid.longValue()) + ) + + // Granularity is (operator, external output port) + val keyed = entries.map(e => (e.operatorId, e.portId) -> e.status).toMap + assert(keyed.size == 4, s"expected 4 entries, got ${keyed.size}: $entries") + assert(keyed(("op-shared", 0)) == "shared") + assert(keyed(("op-shared", 1)) == "shared") + assert(keyed(("op-only-a", 0)) == "onlyInA") + assert(keyed(("op-only-b", 0)) == "onlyInB") + + // shared entries carry both URIs; one-sided entries carry only their own + val sharedP0 = entries.find(e => e.operatorId == "op-shared" && e.portId == 0).get + assert(sharedP0.resultUriA.isDefined && sharedP0.resultUriB.isDefined) + + val onlyA = entries.find(_.operatorId == "op-only-a").get + assert(onlyA.resultUriA.isDefined && onlyA.resultUriB.isEmpty) + + val onlyB = entries.find(_.operatorId == "op-only-b").get + assert(onlyB.resultUriA.isEmpty && onlyB.resultUriB.isDefined) + } + + it should "filter out internal ports and input ports" in { + val execA = insertExecutionForTestWorkflow("execA-filter") + val execB = insertExecutionForTestWorkflow("execB-filter") + + // valid external output that should appear + insertPortRow(execA.getEid, "op-x", 0) + insertPortRow(execB.getEid, "op-x", 0) + + // internal ports - should be ignored + insertPortRow(execA.getEid, "op-x", 5, internal = true) + insertPortRow(execB.getEid, "op-x", 5, internal = true) + + // input ports - should be ignored + insertPortRow(execA.getEid, "op-x", 6, isInput = true) + insertPortRow(execB.getEid, "op-x", 6, isInput = true) + + val entries = WorkflowExecutionsResource.compareOperatorPortStructure( + ExecutionIdentity(execA.getEid.longValue()), + ExecutionIdentity(execB.getEid.longValue()) + ) + + assert(entries.size == 1, s"only the external output port should survive, got: $entries") + assert(entries.head.operatorId == "op-x" && entries.head.portId == 0) + assert(entries.head.status == "shared") + } + + it should "return an empty list when neither execution has any port entries" in { + val execA = insertExecutionForTestWorkflow("execA-empty") + val execB = insertExecutionForTestWorkflow("execB-empty") + + val entries = WorkflowExecutionsResource.compareOperatorPortStructure( + ExecutionIdentity(execA.getEid.longValue()), + ExecutionIdentity(execB.getEid.longValue()) + ) + + assert(entries.isEmpty) + } + } diff --git a/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowVersionResourceSpec.scala b/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowVersionResourceSpec.scala index ecf704f663b..1560b02d6a4 100644 --- a/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowVersionResourceSpec.scala +++ b/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowVersionResourceSpec.scala @@ -99,9 +99,13 @@ class WorkflowVersionResourceSpec val newJson = objectMapper.createObjectNode() newJson.put("value", newValue) + // Match production direction: insertVersion stores a *reverse* patch + // (JsonDiff.asJson(NEW, OLD)) — applying it to NEW yields OLD. Tests previously + // built forward patches and relied on last-write-wins to pass; that masked the + // off-by-one in fetchSubsequentVersions. val patch = com.flipkart.zjsonpatch.JsonDiff.asJson( - oldJson, - newJson + newJson, + oldJson ) patch.toString } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala index 06d04e407f5..46c0619d67b 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala @@ -25,11 +25,13 @@ import org.apache.texera.amber.util.IcebergUtil import org.apache.iceberg.catalog.Catalog import org.apache.iceberg.data.Record import org.apache.iceberg.data.parquet.GenericParquetWriter +import org.apache.iceberg.exceptions.CommitFailedException import org.apache.iceberg.io.{DataWriter, OutputFile} import org.apache.iceberg.parquet.Parquet -import org.apache.iceberg.{Schema, Table} +import org.apache.iceberg.{DataFile, Schema, Table} import scala.collection.mutable.ArrayBuffer +import scala.util.Random /** * IcebergTableWriter writes data to the given Iceberg table in an append-only way. @@ -126,14 +128,42 @@ private[storage] class IcebergTableWriter[T]( } finally { dataWriter.close() } - // Commit the new file to the table + // Commit the new file to the table. + // Multiple workers of the same operator share one Iceberg table per output port and + // all race to commit at the end. Iceberg uses optimistic concurrency on the JDBC + // catalog, so concurrent commits manifest as `CommitFailedException`. Iceberg's own + // internal retry budget (~4 attempts) is sometimes exhausted under high-parallelism + // fast operators; this wrapper retries with jittered exponential backoff so the + // worker doesn't fail the whole execution just because two workers raced. val dataFile = dataWriter.toDataFile - table.newAppend().appendFile(dataFile).commit() + commitWithRetry(dataFile) // Clear the item buffer buffer.clear() } } + private def commitWithRetry(dataFile: DataFile): Unit = { + val maxAttempts = 10 + var attempt = 0 + var delayMs = 100L + val maxDelayMs = 2000L + while (true) { + try { + table.newAppend().appendFile(dataFile).commit() + return + } catch { + case e: CommitFailedException => + attempt += 1 + if (attempt >= maxAttempts) throw e + // Jittered exponential backoff: avoid thundering-herd retries when many workers + // collide on the same table at the same instant. + val jitter = Random.nextLong(delayMs) + Thread.sleep(delayMs + jitter) + delayMs = (delayMs * 2).min(maxDelayMs) + } + } + } + /** * Close the writer, ensuring any remaining buffered items are flushed. */ diff --git a/frontend/proxy.config.json b/frontend/proxy.config.json index f68602e0714..b3c05792be0 100755 --- a/frontend/proxy.config.json +++ b/frontend/proxy.config.json @@ -50,6 +50,11 @@ "secure": false, "changeOrigin": true }, + "/api/execution/**": { + "target": "http://localhost:8085", + "secure": false, + "changeOrigin": false + }, "/api": { "target": "http://localhost:8080", "secure": false, diff --git a/frontend/src/app/app-routing.module.ts b/frontend/src/app/app-routing.module.ts index 179caf5c088..37fafb92519 100644 --- a/frontend/src/app/app-routing.module.ts +++ b/frontend/src/app/app-routing.module.ts @@ -26,6 +26,7 @@ import { UserProjectSectionComponent } from "./dashboard/component/user/user-pro import { UserProjectComponent } from "./dashboard/component/user/user-project/user-project.component"; import { UserComputingUnitComponent } from "./dashboard/component/user/user-computing-unit/user-computing-unit.component"; import { WorkspaceComponent } from "./workspace/component/workspace.component"; +import { CompareWorkspaceComponent } from "./workspace/component/compare-workspace/compare-workspace.component"; import { AboutComponent } from "./hub/component/about/about.component"; import { AuthGuardService } from "./common/service/user/auth-guard.service"; import { AdminUserComponent } from "./dashboard/component/admin/user/admin-user.component"; @@ -119,6 +120,10 @@ routes.push({ path: "workflow/:id", component: WorkspaceComponent, }, + { + path: "workflow/:wid/compare/:eidA/:eidB", + component: CompareWorkspaceComponent, + }, { path: "dataset", component: UserDatasetComponent, diff --git a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts index b15f668bff3..95f68d7a6f6 100644 --- a/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts +++ b/frontend/src/app/dashboard/service/user/workflow-executions/workflow-executions.service.ts @@ -26,6 +26,7 @@ import { WorkflowRuntimeStatistics } from "../../../type/workflow-runtime-statis import { ExecutionState } from "../../../../workspace/types/execute-workflow.interface"; export const WORKFLOW_EXECUTIONS_API_BASE_URL = `${AppSettings.getApiEndpoint()}/executions`; +export const SYNC_EXECUTION_API_BASE_URL = `${AppSettings.getApiEndpoint()}/execution`; @Injectable({ providedIn: "root", @@ -91,4 +92,85 @@ export class WorkflowExecutionsService { params, }); } + + /** + * Side-by-side comparison summary for two executions of the same workflow. + */ + compareTwoExecutions(wid: number, eidA: number, eidB: number): Observable { + return this.http.get( + `${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/${eidA}/compare/${eidB}` + ); + } + + /** + * Fetch a paginated page of rows from a specific operator port's persisted result for a + * past execution. Powers the per-operator result panel of the workflow-compare view. + */ + retrieveExecutionResultPage( + wid: number, + eid: number, + opId: string, + portId: number, + page: number, + pageSize: number + ): Observable { + const params = new HttpParams().set("page", page.toString()).set("pageSize", pageSize.toString()); + return this.http.get( + `${WORKFLOW_EXECUTIONS_API_BASE_URL}/${wid}/${eid}/result/${encodeURIComponent(opId)}/${portId}`, + { params } + ); + } + + /** + * Run a historical workflow version end-to-end on the given computing unit and return + * the new execution's eid. Synchronous: the call blocks server-side until the workflow + * finishes (success or failure). Used by the compare-versions flow when a selected + * version has no completed execution to compare against. + */ + runWorkflowVersion(wid: number, cuid: number, vid: number): Observable { + return this.http.post( + `${SYNC_EXECUTION_API_BASE_URL}/${wid}/${cuid}/run-version/${vid}`, + {} + ); + } +} + +export interface SyncRunVersionResult { + readonly success: boolean; + readonly state: string; + readonly errors?: ReadonlyArray; + readonly eid: number; +} + +export interface CompareAttributeMeta { + readonly name: string; + readonly typeName: string; +} + +export interface OperatorPortCompareResult { + readonly operatorId: string; + readonly portId: number; + readonly status: "shared" | "onlyInA" | "onlyInB"; + readonly rowCountA: number | null; + readonly rowCountB: number | null; + readonly schemaA: ReadonlyArray; + readonly schemaB: ReadonlyArray; + readonly schemaMatches: boolean; +} + +export interface WorkflowExecutionCompareSummary { + readonly wid: number; + readonly eidA: number; + readonly eidB: number; + readonly vidA: number; + readonly vidB: number; + readonly operators: ReadonlyArray; +} + +export interface ExecutionOperatorResultPage { + readonly schema: ReadonlyArray; + readonly rows: ReadonlyArray>; + readonly totalRowCount: number; + readonly pageIndex: number; + readonly pageSize: number; } diff --git a/frontend/src/app/dashboard/type/workflow-executions-entry.ts b/frontend/src/app/dashboard/type/workflow-executions-entry.ts index 4f2542ec5f3..8d48fe84c49 100644 --- a/frontend/src/app/dashboard/type/workflow-executions-entry.ts +++ b/frontend/src/app/dashboard/type/workflow-executions-entry.ts @@ -32,6 +32,10 @@ export interface WorkflowExecutionsEntry { result: string; bookmarked: boolean; logLocation: string; + // True when this execution actually persisted per-operator output rows + // (`operator_port_executions`). Used by the compare-versions flow to avoid reusing + // "completed but empty" executions left over from older backend builds. + hasResults: boolean; } export const EXECUTION_STATUS_CODE: Record = { diff --git a/frontend/src/app/workspace/component/compare-workspace/compare-workspace.component.html b/frontend/src/app/workspace/component/compare-workspace/compare-workspace.component.html new file mode 100644 index 00000000000..f5b8438d741 --- /dev/null +++ b/frontend/src/app/workspace/component/compare-workspace/compare-workspace.component.html @@ -0,0 +1,184 @@ +
+
+
Compare workflow executions
+
+ Workflow #{{ wid }} · Execution A: #{{ eidA }} vs. Execution B: #{{ eidB }} +
+
+ +
+ Note: + rows are compared positionally (row 1 vs row 1, etc.). Texera's parallel execution can reshuffle row order between runs — to make row-level diffs meaningful, end your workflow with a deterministic Sort on a stable key. Differences below may reflect ordering, not data. +
+ +
Loading comparison…
+
{{ loadError }}
+ +
+
+ + {{ diffCounts.identical }} match{{ diffCounts.identical === 1 ? "" : "es" }} + + + {{ diffCounts.outputDiffer }} output differ{{ diffCounts.outputDiffer === 1 ? "s" : "" }} + + + {{ diffCounts.propsDiffer }} props differ + + + {{ diffCounts.onlyInA }} only in A + + + {{ diffCounts.onlyInB }} only in B + + + Click an operator in either DAG to highlight it on both sides. + +
+ +
+ + + + +
+ +
+ +
+
+ {{ displayOperatorName(selected.operatorId) }} + output port {{ selected.portId }} +
+
+ A: {{ rowsOnA }} rows + B: {{ rowsOnB }} rows + + Δ {{ rowsOnB - rowsOnA > 0 ? "+" : "" }}{{ rowsOnB - rowsOnA }} + + + {{ schemaDiffSummary(selected) }} + +
+
+ +
{{ pageLoadError }}
+ +
+
+
+ identical + row differs + only in A + only in B +
+ +
+ +
+
+
+ Execution A + #{{ eidA }} +
+
+ + + + + + + + + + + +
{{ col }}
+ + {{ cell.value }} +
+
+ +
No rows captured for this operator in execution A.
+
+
+ +
+
+ Execution B + #{{ eidB }} +
+
+ + + + + + + + + + + +
{{ col }}
+ + {{ cell.value }} +
+
+ +
No rows captured for this operator in execution B.
+
+
+
+
+
+ +
+ {{ displayOperatorName(clickedOperatorMissing) }} + has no captured results in either execution. +
+ Operators only show up here once Texera persists their output port results to + operator_port_executions. If you're seeing this for every operator, neither + execution actually wrote per-operator results — try re-running the workflow so the + output ports get persisted, then refresh this page. +
+
+ +
+ Neither execution #{{ eidA }} nor #{{ eidB }} has any per-operator results stored. + Clicking operators in the DAG above can't show outputs because there's nothing + to compare. Re-run the workflow (on a setup that writes to + operator_port_executions) and try again. +
+ +
Click an operator in either DAG above to see its results.
+
+
+
+
+
+
diff --git a/frontend/src/app/workspace/component/compare-workspace/compare-workspace.component.scss b/frontend/src/app/workspace/component/compare-workspace/compare-workspace.component.scss new file mode 100644 index 00000000000..b4c0047fd73 --- /dev/null +++ b/frontend/src/app/workspace/component/compare-workspace/compare-workspace.component.scss @@ -0,0 +1,477 @@ +.compare-workspace { + display: flex; + flex-direction: column; + height: 100%; + width: 100%; + font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif; + background: #fafafa; +} + +.compare-header { + padding: 12px 20px; + border-bottom: 1px solid #e0e0e0; + background: #fff; + + .title { + font-size: 18px; + font-weight: 600; + } + .subtitle { + font-size: 13px; + color: #666; + margin-top: 2px; + } +} + +.determinism-banner { + margin: 10px 20px 0; + padding: 8px 12px; + background: #fff9e6; + border: 1px solid #f1c40f; + border-radius: 4px; + font-size: 12px; + color: #6e5400; + line-height: 1.5; + + strong { + margin-right: 6px; + } +} + +.status { + padding: 20px; + color: #666; +} +.status-error { + color: #c0392b; +} +.status-warning { + color: #92651a; + background: #fff9e6; + border: 1px solid #f1c40f; + border-radius: 4px; + margin: 12px 0; + + code { + background: rgba(0, 0, 0, 0.06); + padding: 1px 4px; + border-radius: 3px; + font-size: 11px; + } +} +.status-detail { + margin-top: 6px; + font-size: 12px; + color: #6e5400; + line-height: 1.5; +} + +.body { + flex: 1; + display: flex; + flex-direction: column; + min-height: 0; + padding: 12px 20px 0; + gap: 12px; + overflow: auto; +} + +.diff-summary { + display: flex; + flex-wrap: wrap; + align-items: center; + gap: 6px; + flex-shrink: 0; + padding: 0; + font-size: 12px; + + .diff-chip { + padding: 2px 10px; + border-radius: 12px; + font-family: ui-monospace, "SF Mono", Menlo, monospace; + border: 1px solid; + line-height: 1.4; + } + .diff-chip-identical { + color: #1a6e3a; + border-color: #9ed2a3; + background: #dff5e0; + } + .diff-chip-output { + color: #6e5400; + border-color: #e0c66a; + background: #fff4ce; + } + .diff-chip-props { + color: #5b3000; + border-color: #e0a36a; + background: #ffe6cf; + } + .diff-chip-only { + color: #7a1f1f; + border-color: #e7a3a3; + background: #fde2e2; + } + .diff-help { + margin-left: 8px; + color: #777; + font-size: 11px; + font-style: italic; + } +} + +.dag-stack { + display: flex; + flex-direction: column; + gap: 10px; + // Give each DAG a concrete height so its inner flexbox can size the paper. + // Without this, height: 100% inside compare-dag has nothing to resolve against + // and the JointJS paper collapses to a few pixels. + flex-shrink: 0; + + texera-compare-dag { + display: block; + height: 420px; + } +} + +.result-panel { + background: #fff; + border: 1px solid #e0e0e0; + border-radius: 4px; + padding: 12px; + margin-bottom: 20px; +} + +.selected-header { + display: flex; + justify-content: space-between; + align-items: center; + margin-bottom: 14px; + gap: 12px; + flex-wrap: wrap; + + .selected-name { + font-size: 16px; + display: flex; + align-items: baseline; + gap: 8px; + + strong { + color: #1f1f1f; + } + } + .port-suffix { + color: #777; + font-weight: 400; + font-size: 12px; + padding: 2px 8px; + background: #f0f0f0; + border-radius: 10px; + } + .selected-meta { + display: inline-flex; + align-items: center; + gap: 6px; + flex-wrap: wrap; + } + .meta-pill { + font-size: 12px; + color: #444; + background: #f5f5f5; + border: 1px solid #e0e0e0; + padding: 2px 8px; + border-radius: 10px; + font-family: ui-monospace, "SF Mono", Menlo, monospace; + } + .meta-pill-warn { + background: #fff4e6; + border-color: #f5b66c; + color: #6e4500; + } +} + +.op-list { + width: 320px; + border-right: 1px solid #e0e0e0; + background: #fff; + overflow-y: auto; + + .op-list-header { + padding: 8px 16px; + font-weight: 600; + border-bottom: 1px solid #eee; + background: #f5f5f5; + } + + ul { + list-style: none; + margin: 0; + padding: 0; + } + + li { + padding: 10px 16px; + cursor: pointer; + border-bottom: 1px solid #f0f0f0; + display: flex; + align-items: center; + gap: 10px; + + &:hover { + background: #f5f9ff; + } + &.selected { + background: #e6f1ff; + } + } + + .badge { + width: 12px; + height: 12px; + border-radius: 50%; + flex-shrink: 0; + } + .badge-green { + background: #2ecc71; + } + .badge-yellow { + background: #f1c40f; + } + .badge-red { + background: #e74c3c; + } + + .op-meta { + flex: 1; + min-width: 0; + } + .op-id { + font-weight: 500; + font-size: 13px; + text-overflow: ellipsis; + overflow: hidden; + white-space: nowrap; + } + .port { + color: #888; + font-weight: 400; + } + .op-detail { + font-size: 11px; + color: #888; + margin-top: 2px; + } +} + +.op-detail-pane { + flex: 1; + padding: 16px 20px; + overflow: auto; + min-width: 0; +} + +.schema-strip, +.stats-strip { + padding: 8px 12px; + background: #fff; + border: 1px solid #e0e0e0; + border-radius: 4px; + margin-bottom: 10px; + display: flex; + gap: 16px; + align-items: baseline; + font-size: 13px; +} + +.schema-label { + font-weight: 600; + color: #555; +} +.schema-summary { + font-family: monospace; + color: #333; +} + +.result-toolbar { + display: flex; + justify-content: space-between; + align-items: center; + gap: 16px; + margin-bottom: 10px; + flex-wrap: wrap; +} + +.diff-legend { + display: inline-flex; + align-items: center; + gap: 14px; + font-size: 11px; + color: #666; + + .legend-chip { + display: inline-block; + width: 14px; + height: 10px; + border-radius: 2px; + margin-right: 4px; + vertical-align: middle; + } + .legend-same { + background: #ffffff; + border: 1px solid #d9d9d9; + } + .legend-changed { + background: #fff4ce; + border: 1px solid #e0c66a; + } + .legend-removed { + background: #fde2e2; + border: 1px solid #e7a3a3; + } + .legend-added { + background: #dff5e0; + border: 1px solid #9ed2a3; + } +} + +.pagination { + display: inline-flex; + align-items: center; + gap: 8px; + font-size: 12px; + color: #555; + + button { + padding: 3px 10px; + border: 1px solid #d0d0d0; + background: #fff; + border-radius: 3px; + cursor: pointer; + font-size: 12px; + } + button:hover:not(:disabled) { + background: #f5f5f5; + } + button:disabled { + opacity: 0.4; + cursor: not-allowed; + } + .page-label { + font-family: ui-monospace, "SF Mono", Menlo, monospace; + } +} + +.side-by-side { + display: grid; + grid-template-columns: 1fr 1fr; + gap: 12px; +} + +.empty-tpl { + padding: 24px; + text-align: center; + color: #999; + font-size: 12px; + font-style: italic; +} + +.side { + background: #fff; + border: 1px solid #e0e0e0; + border-radius: 4px; + overflow: hidden; + display: flex; + flex-direction: column; + min-width: 0; + + .side-header { + padding: 8px 12px; + background: #f7f7f7; + border-bottom: 1px solid #e0e0e0; + flex-shrink: 0; + display: flex; + align-items: baseline; + gap: 8px; + + .side-title { + font-weight: 600; + font-size: 13px; + color: #1f1f1f; + } + .side-eid { + color: #888; + font-size: 11px; + font-family: ui-monospace, "SF Mono", Menlo, monospace; + } + } + + .side-scroll { + overflow-x: auto; + } +} + +.diff-table { + width: 100%; + border-collapse: collapse; + font-size: 12px; + font-family: ui-monospace, "SF Mono", Menlo, monospace; + + th, + td { + padding: 6px 10px; + text-align: left; + border-bottom: 1px solid #f0f0f0; + max-width: 240px; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; + } + th { + background: #fafafa; + color: #444; + font-weight: 600; + font-size: 11px; + text-transform: uppercase; + letter-spacing: 0.04em; + border-bottom: 1px solid #e6e6e6; + position: sticky; + top: 0; + z-index: 1; + } + + // git-diff–style row tints + tr.row-same td { + background: #ffffff; + } + tr.row-changed td { + background: #fff8dc; + } + tr.row-onlyA td { + background: #fde2e2; + } + tr.row-onlyB td { + background: #dff5e0; + } + // hover effect + tbody tr:hover td { + filter: brightness(0.97); + } + + // Cell-level highlight on top of the row tint, so the user can pinpoint which value + // changed within a "row differs" row. + td.cell-diff { + background: rgba(231, 76, 60, 0.28) !important; + color: #7a1f1f; + font-weight: 600; + } + + td.cell-missing { + color: #bbb; + font-style: italic; + text-align: center; + } + .missing-mark { + color: #bbb; + } +} diff --git a/frontend/src/app/workspace/component/compare-workspace/compare-workspace.component.spec.ts b/frontend/src/app/workspace/component/compare-workspace/compare-workspace.component.spec.ts new file mode 100644 index 00000000000..e2932d534aa --- /dev/null +++ b/frontend/src/app/workspace/component/compare-workspace/compare-workspace.component.spec.ts @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { ComponentFixture, TestBed } from "@angular/core/testing"; +import { ActivatedRoute, convertToParamMap } from "@angular/router"; +import { of } from "rxjs"; +import { HttpClientTestingModule } from "@angular/common/http/testing"; +import { CompareWorkspaceComponent } from "./compare-workspace.component"; +import { + OperatorPortCompareResult, + WorkflowExecutionCompareSummary, + WorkflowExecutionsService, +} from "../../../dashboard/service/user/workflow-executions/workflow-executions.service"; + +class ExecutionsServiceStub { + compareTwoExecutions(): unknown { + const summary: WorkflowExecutionCompareSummary = { + wid: 1, + eidA: 2, + eidB: 3, + operators: [], + }; + return of(summary); + } + retrieveExecutionResultPage(): unknown { + return of({ schema: [], rows: [], totalRowCount: 0, pageIndex: 0, pageSize: 25 }); + } +} + +describe("CompareWorkspaceComponent", () => { + let component: CompareWorkspaceComponent; + let fixture: ComponentFixture; + + beforeEach(async () => { + await TestBed.configureTestingModule({ + imports: [CompareWorkspaceComponent, HttpClientTestingModule], + providers: [ + { provide: WorkflowExecutionsService, useClass: ExecutionsServiceStub }, + { + provide: ActivatedRoute, + useValue: { + snapshot: { paramMap: convertToParamMap({ wid: "1", eidA: "2", eidB: "3" }) }, + }, + }, + ], + }).compileComponents(); + + fixture = TestBed.createComponent(CompareWorkspaceComponent); + component = fixture.componentInstance; + fixture.detectChanges(); + }); + + it("renders header with the workflow + execution ids from the route", () => { + expect(component.wid).toBe(1); + expect(component.eidA).toBe(2); + expect(component.eidB).toBe(3); + }); + + it("classifies badges by status, schema match, and row count", () => { + const shared: OperatorPortCompareResult = { + operatorId: "op", + portId: 0, + status: "shared", + rowCountA: 10, + rowCountB: 10, + schemaA: [], + schemaB: [], + schemaMatches: true, + }; + expect(component.badgeClass(shared)).toBe("badge-green"); + + expect(component.badgeClass({ ...shared, rowCountA: 9 })).toBe("badge-yellow"); + expect(component.badgeClass({ ...shared, schemaMatches: false })).toBe("badge-red"); + expect(component.badgeClass({ ...shared, status: "onlyInA" })).toBe("badge-red"); + }); + + it("summarises schema diffs as adds / removes", () => { + const entry: OperatorPortCompareResult = { + operatorId: "op", + portId: 0, + status: "shared", + rowCountA: 1, + rowCountB: 1, + schemaA: [ + { name: "id", typeName: "INTEGER" }, + { name: "old", typeName: "STRING" }, + ], + schemaB: [ + { name: "id", typeName: "INTEGER" }, + { name: "new", typeName: "STRING" }, + ], + schemaMatches: false, + }; + const summary = component.schemaDiffSummary(entry); + expect(summary).toContain("+ new"); + expect(summary).toContain("− old"); + }); + + it("reports schemas match when both schemas are equal", () => { + const entry: OperatorPortCompareResult = { + operatorId: "op", + portId: 0, + status: "shared", + rowCountA: 1, + rowCountB: 1, + schemaA: [{ name: "id", typeName: "INTEGER" }], + schemaB: [{ name: "id", typeName: "INTEGER" }], + schemaMatches: true, + }; + expect(component.schemaDiffSummary(entry)).toBe("schemas match"); + }); +}); diff --git a/frontend/src/app/workspace/component/compare-workspace/compare-workspace.component.ts b/frontend/src/app/workspace/component/compare-workspace/compare-workspace.component.ts new file mode 100644 index 00000000000..1f23ccdc400 --- /dev/null +++ b/frontend/src/app/workspace/component/compare-workspace/compare-workspace.component.ts @@ -0,0 +1,404 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { CommonModule } from "@angular/common"; +import { Component, OnInit } from "@angular/core"; +import { ActivatedRoute } from "@angular/router"; +import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; +import { forkJoin } from "rxjs"; +import { + ExecutionOperatorResultPage, + OperatorPortCompareResult, + WorkflowExecutionCompareSummary, + WorkflowExecutionsService, +} from "../../../dashboard/service/user/workflow-executions/workflow-executions.service"; +import { WorkflowVersionService } from "../../../dashboard/service/user/workflow-version/workflow-version.service"; +import { CompareDagComponent, OperatorDiffStatus } from "./compare-dag/compare-dag.component"; + +/** + * One row's worth of side-by-side data. `kind` controls the visual treatment: + * - `same` both sides identical → no tint + * - `changed` both rows exist but differ on at least one cell → yellow tint, + * cells that differ get the per-cell red highlight + * - `onlyA` row exists in A, B is padding → red tint on A, "—" on B + * - `onlyB` row exists in B, A is padding → green tint on B, "—" on A + */ +type RowKind = "same" | "changed" | "onlyA" | "onlyB"; + +interface DiffCell { + readonly value: string; + readonly differs: boolean; + readonly missing: boolean; +} + +interface DiffRow { + readonly kind: RowKind; + readonly cellsA: ReadonlyArray; + readonly cellsB: ReadonlyArray; +} + +@UntilDestroy() +@Component({ + selector: "texera-compare-workspace", + standalone: true, + imports: [CommonModule, CompareDagComponent], + templateUrl: "./compare-workspace.component.html", + styleUrls: ["./compare-workspace.component.scss"], +}) +export class CompareWorkspaceComponent implements OnInit { + wid = 0; + eidA = 0; + eidB = 0; + + loading = true; + loadError: string | null = null; + summary: WorkflowExecutionCompareSummary | null = null; + + // Workflow content for each side's version, fed into the DAG canvases. + // The WorkflowVersionService parses content from string into a structured object, so we + // accept `any` here and let the DAG component handle either shape. + contentA: any = null; + contentB: any = null; + // operatorId → combined diff status (presence + properties + output). Single map + // shared by both DAGs so the labels and colors agree on each side. Onlyin-A status + // visibly renders only on side A's DAG (and vice versa) since that operator isn't + // present in the other side's content. + diffStatusMap: ReadonlyMap = new Map(); + // Counts for the header summary chip. + diffCounts = { identical: 0, propsDiffer: 0, outputDiffer: 0, onlyInA: 0, onlyInB: 0 }; + + selected: OperatorPortCompareResult | null = null; + // When set, the user clicked an operator that isn't in summary.operators — meaning + // neither execution captured per-operator results for it. Used to show a hint in the + // result panel instead of failing silently. + clickedOperatorMissing: string | null = null; + pageIndex = 0; + pageSize = 25; + pageA: ExecutionOperatorResultPage | null = null; + pageB: ExecutionOperatorResultPage | null = null; + pageLoadError: string | null = null; + diffRows: ReadonlyArray = []; + unionColumnNames: ReadonlyArray = []; + // Row count summaries shown above each side's table. + rowsOnA = 0; + rowsOnB = 0; + + constructor( + private route: ActivatedRoute, + private executionsService: WorkflowExecutionsService, + private workflowVersionService: WorkflowVersionService + ) {} + + ngOnInit(): void { + this.wid = Number(this.route.snapshot.paramMap.get("wid")); + this.eidA = Number(this.route.snapshot.paramMap.get("eidA")); + this.eidB = Number(this.route.snapshot.paramMap.get("eidB")); + this.executionsService + .compareTwoExecutions(this.wid, this.eidA, this.eidB) + .pipe(untilDestroyed(this)) + .subscribe({ + next: summary => { + this.summary = summary; + this.loadWorkflowContents(summary); + this.loading = false; + const firstShared = summary.operators.find(o => o.status === "shared") ?? summary.operators[0]; + if (firstShared) { + this.selectOperator(firstShared); + } + }, + error: err => { + this.loadError = err?.error?.message ?? err?.message ?? "Failed to load comparison"; + this.loading = false; + }, + }); + } + + private loadWorkflowContents(summary: WorkflowExecutionCompareSummary): void { + this.contentA = null; + this.contentB = null; + if (summary.vidA > 0) { + this.workflowVersionService + .retrieveWorkflowByVersion(summary.wid, summary.vidA) + .pipe(untilDestroyed(this)) + .subscribe(wf => { + this.contentA = wf?.content ?? null; + this.recomputeDiffStatus(); + }); + } + if (summary.vidB > 0) { + this.workflowVersionService + .retrieveWorkflowByVersion(summary.wid, summary.vidB) + .pipe(untilDestroyed(this)) + .subscribe(wf => { + this.contentB = wf?.content ?? null; + this.recomputeDiffStatus(); + }); + } + } + + /** + * Walk both side's operator sets and compute a per-operator diff status that combines + * - presence: operator missing on the other side → `onlyInA` / `onlyInB` + * - properties: same id on both sides but `operatorProperties` JSON differs → `propsDiffer` + * - output: ports' row counts or schemas differ → `outputDiffer` + * - otherwise → `identical` + * Pure presence/property checks come from the workflow content; output checks fall + * back to the per-port summary that the compare endpoint returned. + */ + private recomputeDiffStatus(): void { + if (!this.summary || !this.contentA || !this.contentB) return; + const opsA = new Map(); + const opsB = new Map(); + for (const op of this.parseOperators(this.contentA)) opsA.set(op.operatorID, op); + for (const op of this.parseOperators(this.contentB)) opsB.set(op.operatorID, op); + + // Build a quick lookup: operatorId → true when at least one of its ports has a + // row-count or schema mismatch. Cheaper than re-scanning summary inside the loop. + const opIdsWithOutputDiff = new Set(); + for (const entry of this.summary.operators) { + const portDiffers = + entry.status === "onlyInA" || + entry.status === "onlyInB" || + !entry.schemaMatches || + entry.rowCountA !== entry.rowCountB; + if (portDiffers) opIdsWithOutputDiff.add(entry.operatorId); + } + + const result = new Map(); + const counts = { identical: 0, propsDiffer: 0, outputDiffer: 0, onlyInA: 0, onlyInB: 0 }; + const allIds = new Set([...opsA.keys(), ...opsB.keys()]); + for (const id of allIds) { + const a = opsA.get(id); + const b = opsB.get(id); + let status: OperatorDiffStatus; + if (a && !b) { + status = "onlyInA"; + } else if (!a && b) { + status = "onlyInB"; + } else if (this.opPropertiesEqual(a, b)) { + status = opIdsWithOutputDiff.has(id) ? "outputDiffer" : "identical"; + } else { + status = "propsDiffer"; + } + result.set(id, status); + counts[status]++; + } + this.diffStatusMap = result; + this.diffCounts = counts; + } + + /** + * Pull the operators array out of a parsed-or-string workflow content. WorkflowVersion + * service usually parses it, but some legacy paths hand back a string; tolerate both. + */ + private parseOperators(content: any): ReadonlyArray<{ operatorID: string; operatorProperties?: any }> { + let parsed: any = content; + if (typeof content === "string") { + try { + parsed = JSON.parse(content); + } catch { + return []; + } + } + return Array.isArray(parsed?.operators) ? parsed.operators : []; + } + + private opPropertiesEqual(a: any, b: any): boolean { + // Deep-equal operatorProperties via JSON-string canonicalization. Field order matters + // here; if that becomes a real issue we'd switch to a stable stringify, but Texera's + // current persistence keeps key order consistent. + return JSON.stringify(a?.operatorProperties ?? {}) === JSON.stringify(b?.operatorProperties ?? {}); + } + + /** Click handler from either DAG canvas. Resolves to the first port of the operator. */ + onDagOperatorClicked(operatorId: string): void { + if (!this.summary) return; + const entry = this.summary.operators.find(o => o.operatorId === operatorId); + if (entry) { + this.clickedOperatorMissing = null; + this.selectOperator(entry); + } else { + // No per-operator results were persisted for this operator in either execution. + // Surface this in the result panel so the click isn't silently ignored. + this.clickedOperatorMissing = operatorId; + this.selected = null; + } + } + + selectOperator(entry: OperatorPortCompareResult): void { + this.selected = entry; + this.pageIndex = 0; + this.loadPage(); + } + + changePage(delta: number): void { + const next = Math.max(0, this.pageIndex + delta); + if (next === this.pageIndex) return; + this.pageIndex = next; + this.loadPage(); + } + + private loadPage(): void { + if (!this.selected) return; + this.pageLoadError = null; + this.pageA = null; + this.pageB = null; + this.diffRows = []; + this.unionColumnNames = []; + this.rowsOnA = 0; + this.rowsOnB = 0; + + const entry = this.selected; + const fetchA = + entry.status === "onlyInB" + ? null + : this.executionsService.retrieveExecutionResultPage( + this.wid, + this.eidA, + entry.operatorId, + entry.portId, + this.pageIndex, + this.pageSize + ); + const fetchB = + entry.status === "onlyInA" + ? null + : this.executionsService.retrieveExecutionResultPage( + this.wid, + this.eidB, + entry.operatorId, + entry.portId, + this.pageIndex, + this.pageSize + ); + + forkJoin({ + a: fetchA ?? Promise.resolve(null), + b: fetchB ?? Promise.resolve(null), + }) + .pipe(untilDestroyed(this)) + .subscribe({ + next: ({ a, b }) => { + this.pageA = a as ExecutionOperatorResultPage | null; + this.pageB = b as ExecutionOperatorResultPage | null; + this.computeRowDiff(); + }, + error: err => { + this.pageLoadError = err?.error?.message ?? err?.message ?? "Failed to load result page"; + }, + }); + } + + /** + * Build the side-by-side diff row list from the two fetched pages. Rows are paired + * positionally (A[i] vs B[i]) — the same positional convention the existing + * "deterministic sort" banner warns about. Each pair is classified into one of four + * kinds so the template can apply git-diff–style coloring. + */ + private computeRowDiff(): void { + const rowsA = this.pageA?.rows ?? []; + const rowsB = this.pageB?.rows ?? []; + const colsA = this.pageA?.schema.map(s => s.name) ?? []; + const colsB = this.pageB?.schema.map(s => s.name) ?? []; + this.rowsOnA = this.pageA?.totalRowCount ?? rowsA.length; + this.rowsOnB = this.pageB?.totalRowCount ?? rowsB.length; + + // Display the union of columns so per-cell diffs line up when schemas overlap. + // Order: columns from A first (preserving A's column order), then any + // B-only columns appended at the end. + const seen = new Set(); + const union: string[] = []; + [...colsA, ...colsB].forEach(name => { + if (!seen.has(name)) { + seen.add(name); + union.push(name); + } + }); + this.unionColumnNames = union; + + const maxRows = Math.max(rowsA.length, rowsB.length); + const rows: DiffRow[] = []; + for (let i = 0; i < maxRows; i++) { + const rowA = rowsA[i]; + const rowB = rowsB[i]; + const cellsA: DiffCell[] = []; + const cellsB: DiffCell[] = []; + let anyCellDiffers = false; + + union.forEach(col => { + const aPresent = !!rowA && col in rowA; + const bPresent = !!rowB && col in rowB; + const aVal = aPresent ? this.stringifyCell(rowA[col]) : ""; + const bVal = bPresent ? this.stringifyCell(rowB[col]) : ""; + const differs = aPresent && bPresent && aVal !== bVal; + if (differs) anyCellDiffers = true; + cellsA.push({ value: aVal, differs, missing: !aPresent }); + cellsB.push({ value: bVal, differs, missing: !bPresent }); + }); + + const kind: RowKind = !rowA ? "onlyB" : !rowB ? "onlyA" : anyCellDiffers ? "changed" : "same"; + rows.push({ kind, cellsA, cellsB }); + } + this.diffRows = rows; + } + + private stringifyCell(value: unknown): string { + if (value === null || value === undefined) return "NULL"; + if (typeof value === "string") return value; + if (typeof value === "number" || typeof value === "boolean") return String(value); + try { + return JSON.stringify(value); + } catch { + return String(value); + } + } + + schemaDiffSummary(entry: OperatorPortCompareResult): string { + const namesA = new Set(entry.schemaA.map(a => a.name)); + const namesB = new Set(entry.schemaB.map(a => a.name)); + const added = entry.schemaB.filter(a => !namesA.has(a.name)).map(a => a.name); + const removed = entry.schemaA.filter(a => !namesB.has(a.name)).map(a => a.name); + const parts: string[] = []; + if (added.length) parts.push(`+ ${added.join(", ")}`); + if (removed.length) parts.push(`− ${removed.join(", ")}`); + return parts.length ? parts.join(" ") : "schemas match"; + } + + /** + * Operator IDs are of the form `{OperatorType}-operator-{uuid}`. Strip the suffix so the + * left rail just shows the operator type (e.g. "TextInput" instead of + * "TextInput-operator-bd06d395-…"). + */ + displayOperatorName(operatorId: string): string { + const idx = operatorId.indexOf("-operator-"); + return idx > 0 ? operatorId.substring(0, idx) : operatorId; + } + + /** + * True when the current page is the last page of results on both sides. Used to disable + * the Next button so the user can't page past the end into an empty view. + */ + isLastPage(): boolean { + if (!this.pageA && !this.pageB) return true; + const totalA = this.pageA?.totalRowCount ?? 0; + const totalB = this.pageB?.totalRowCount ?? 0; + const maxTotal = Math.max(totalA, totalB); + return (this.pageIndex + 1) * this.pageSize >= maxTotal; + } +} diff --git a/frontend/src/app/workspace/component/left-panel/versions-list/versions-list.component.html b/frontend/src/app/workspace/component/left-panel/versions-list/versions-list.component.html index bed3bd1567a..9296708c764 100644 --- a/frontend/src/app/workspace/component/left-panel/versions-list/versions-list.component.html +++ b/frontend/src/app/workspace/component/left-panel/versions-list/versions-list.component.html @@ -17,6 +17,27 @@ under the License. --> +
+ + + {{ compareStatus }} + {{ compareError }} +
+ + + + (); + public compareError: string | null = null; + public compareLoading = false; + public compareStatus: string | null = null; constructor( private workflowActionService: WorkflowActionService, public workflowVersionService: WorkflowVersionService, + private workflowExecutionsService: WorkflowExecutionsService, + private computingUnitStatusService: ComputingUnitStatusService, + private router: Router, public route: ActivatedRoute ) {} @@ -124,4 +140,134 @@ export class VersionsListComponent implements OnInit { this.workflowVersionService.displayParticularVersion(workflow, vid, displayedVersionId); }); } + + toggleCompareMode(): void { + this.compareMode = !this.compareMode; + this.compareSelection.clear(); + this.compareError = null; + this.compareStatus = null; + } + + isVersionSelectedForCompare(vid: number): boolean { + return this.compareSelection.has(vid); + } + + onCompareCheckChange(vid: number, checked: boolean): void { + if (checked) { + if (this.compareSelection.size >= 2) { + // drop the earliest-added entry to keep at most 2 + const first = this.compareSelection.values().next().value; + if (first !== undefined) this.compareSelection.delete(first); + } + this.compareSelection.add(vid); + } else { + this.compareSelection.delete(vid); + } + } + + canCompare(): boolean { + return this.compareSelection.size === 2 && !this.compareLoading; + } + + runCompare(): void { + if (!this.canCompare()) return; + const wid = this.workflowActionService.getWorkflowMetadata()?.wid; + if (!wid) { + this.compareError = "Save the workflow first to enable comparison"; + return; + } + const [vidA, vidB] = Array.from(this.compareSelection); + this.compareLoading = true; + this.compareError = null; + this.compareStatus = "Resolving versions…"; + + // Resolve cuid once up front — needed if either version has to be auto-run. + this.computingUnitStatusService + .getSelectedComputingUnit() + .pipe(take(1), untilDestroyed(this)) + .subscribe(unit => { + const cuid = unit?.computingUnit?.cuid; + if (!cuid) { + this.compareLoading = false; + this.compareStatus = null; + this.compareError = "Select a computing unit before comparing versions"; + return; + } + + this.workflowExecutionsService + .retrieveWorkflowExecutions(wid) + .pipe( + switchMap(executions => + // Sequential A then B: the sync endpoint shuts down any in-progress execution + // on the same workflow, so parallel auto-runs would cancel each other. + this.resolveEidForVersion(wid, cuid, vidA, executions, "A").pipe( + switchMap(eidA => + this.resolveEidForVersion(wid, cuid, vidB, executions, "B").pipe( + switchMap(eidB => of({ eidA, eidB })) + ) + ) + ) + ), + untilDestroyed(this) + ) + .subscribe({ + next: ({ eidA, eidB }) => { + this.compareLoading = false; + this.compareStatus = null; + if (eidA === eidB) { + this.compareError = + "Both selected versions resolved to the same execution. " + + "Pick versions that span at least one separate run."; + return; + } + this.router.navigate(["/dashboard/user/workflow", wid, "compare", eidA, eidB]); + }, + error: err => { + this.compareLoading = false; + this.compareStatus = null; + this.compareError = err?.message ?? "Compare failed"; + }, + }); + }); + } + + /** + * For a given version, return the eid of a completed execution at that exact vid, or + * trigger a fresh sync run of that version and return the resulting eid. + * + * status === 3 corresponds to ExecutionState.Completed in EXECUTION_STATUS_CODE. + */ + private resolveEidForVersion( + wid: number, + cuid: number, + vid: number, + executions: ReadonlyArray<{ eId: number; vId: number; status: number; hasResults?: boolean }>, + label: "A" | "B" + ): Observable { + // Require both status=Completed AND hasResults — without per-operator rows in + // `operator_port_executions`, the compare endpoint can't show anything, so reusing + // such an eid is worse than running a fresh one. + const completed = executions + .filter(e => e.vId === vid && e.status === 3 && e.hasResults === true) + .reduce<{ eId: number; vId: number; status: number } | null>( + (latest, cur) => (latest === null || cur.eId > latest.eId ? cur : latest), + null + ); + if (completed) { + return of(completed.eId); + } + this.compareStatus = `Running version ${vid} (side ${label})… this may take a while`; + return this.workflowExecutionsService.runWorkflowVersion(wid, cuid, vid).pipe( + switchMap(result => { + if (!result.success) { + const why = result.errors && result.errors.length ? result.errors.join("; ") : result.state; + return throwError(() => new Error(`Auto-run for version ${vid} (side ${label}) failed: ${why}`)); + } + return of(result.eid); + }), + catchError(err => + throwError(() => new Error(err?.error?.message ?? err?.message ?? `Auto-run failed for version ${vid}`)) + ) + ); + } }