diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala index 4e9d6c6e2cd..f0663667310 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala @@ -77,6 +77,9 @@ import org.apache.texera.amber.operator.source.apis.twitter.v2.{ } import org.apache.texera.amber.operator.source.dataset.FileListerSourceOpDesc import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc +import org.apache.texera.amber.operator.source.http.{PollingHttpSourceOpDesc, WebSocketSourceOpDesc} +import org.apache.texera.amber.operator.http.HttpRequestOpDesc +import org.apache.texera.amber.operator.llm.LLMAgentOpDesc import org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpDesc import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc import org.apache.texera.amber.operator.source.scan.csvOld.CSVOldScanSourceOpDesc @@ -182,6 +185,10 @@ trait StateTransferFunc value = classOf[TwitterSearchSourceOpDesc], name = "TwitterSearch" ), + new Type(value = classOf[PollingHttpSourceOpDesc], name = "PollingHttpSource"), + new Type(value = classOf[WebSocketSourceOpDesc], name = "WebSocketSource"), + new Type(value = classOf[HttpRequestOpDesc], name = "HttpRequest"), + new Type(value = classOf[LLMAgentOpDesc], name = "LLMAgent"), new Type(value = classOf[ChoroplethMapOpDesc], name = "ChoroplethMap"), new Type(value = classOf[TimeSeriesOpDesc], name = "TimeSeriesPlot"), new Type(value = classOf[CandlestickChartOpDesc], name = "CandlestickChart"), diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/HttpRequestOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/HttpRequestOpDesc.scala new file mode 100644 index 00000000000..e44ee9f56c0 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/HttpRequestOpDesc.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.http + +import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.tuple.AttributeType +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp, SchemaPropagationFunc} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.http.util.{HttpMethod, KeyValuePair} +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import org.apache.texera.amber.util.JSONUtils.objectMapper + +class HttpRequestOpDesc extends LogicalOp { + + @JsonProperty(required = true) + @JsonSchemaTitle("URL") + @JsonPropertyDescription( + "Target URL. Supports ${fieldName} placeholders that will be filled from the input tuple." + ) + var url: String = _ + + @JsonProperty(required = true) + @JsonSchemaTitle("HTTP Method") + @JsonPropertyDescription("HTTP method to use for each request") + var method: HttpMethod = HttpMethod.POST + + @JsonProperty + @JsonSchemaTitle("Headers") + @JsonPropertyDescription("Optional headers, e.g. Authorization: Bearer ") + var headers: java.util.List[KeyValuePair] = new java.util.ArrayList[KeyValuePair]() + + @JsonProperty + @JsonSchemaTitle("Request Body Template") + @JsonPropertyDescription( + "Body sent with each request. Supports ${fieldName} placeholders from the input tuple. Ignored for GET." + ) + var bodyTemplate: String = "" + + @JsonProperty(required = true) + @JsonSchemaTitle("Timeout (seconds)") + @JsonPropertyDescription("Per-request timeout in seconds") + var timeoutSeconds: Int = 10 + + @JsonProperty(required = true) + @JsonSchemaTitle("Fail on error") + @JsonPropertyDescription( + "If true, a non-2xx response or transport error fails the workflow. " + + "If false (default), the error is recorded in the output tuple and processing continues." + ) + var failOnError: Boolean = false + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName( + "org.apache.texera.amber.operator.http.HttpRequestOpExec", + objectMapper.writeValueAsString(this) + ) + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withPropagateSchema(SchemaPropagationFunc(inputSchemas => { + val inputSchema = inputSchemas.values.head + val outputSchema = inputSchema + .add("http_request_status", AttributeType.INTEGER) + .add("http_request_body", AttributeType.STRING) + .add("http_request_error", AttributeType.STRING) + Map(operatorInfo.outputPorts.head.id -> outputSchema) + })) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + userFriendlyName = "HTTP Request", + operatorDescription = + "For each input tuple, make an HTTP request (URL/body support ${fieldName} interpolation) and append the response.", + operatorGroupName = OperatorGroupConstants.API_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/HttpRequestOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/HttpRequestOpExec.scala new file mode 100644 index 00000000000..abc5a1afa4c --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/HttpRequestOpExec.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.http + +import org.apache.texera.amber.core.executor.OperatorExecutor +import org.apache.texera.amber.core.tuple.{Tuple, TupleLike} +import org.apache.texera.amber.operator.http.util.{HttpClientFactory, HttpMethod, TemplateInterpolator} +import org.apache.texera.amber.util.JSONUtils.objectMapper + +import java.net.URI +import java.net.http.{HttpRequest, HttpResponse} +import java.time.Duration +import scala.collection.mutable + +class HttpRequestOpExec(descString: String) extends OperatorExecutor { + private val desc: HttpRequestOpDesc = + objectMapper.readValue(descString, classOf[HttpRequestOpDesc]) + + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = { + val resolvedUrl = TemplateInterpolator.interpolate(desc.url, tuple) + val resolvedBody = TemplateInterpolator.interpolate(desc.bodyTemplate, tuple) + + val (status, body, error) = try { + val builder = HttpRequest + .newBuilder() + .uri(URI.create(resolvedUrl)) + .timeout(Duration.ofSeconds(math.max(1, desc.timeoutSeconds).toLong)) + + Option(desc.headers).foreach { hs => + hs.forEach { kv => + if (kv != null && kv.key != null && kv.value != null) { + builder.header(kv.key, kv.value) + } + } + } + + val bodyPublisher = + if (resolvedBody != null && resolvedBody.nonEmpty) + HttpRequest.BodyPublishers.ofString(resolvedBody) + else HttpRequest.BodyPublishers.noBody() + + val method = if (desc.method == null) HttpMethod.POST else desc.method + method match { + case HttpMethod.GET => builder.GET() + case HttpMethod.POST => builder.POST(bodyPublisher) + case HttpMethod.PUT => builder.PUT(bodyPublisher) + case HttpMethod.PATCH => builder.method("PATCH", bodyPublisher) + case HttpMethod.DELETE => builder.DELETE() + } + + val response = HttpClientFactory.sharedClient + .send(builder.build(), HttpResponse.BodyHandlers.ofString()) + val code = response.statusCode() + val errOpt = + if (code >= 200 && code < 300) null + else s"HTTP $code" + (Integer.valueOf(code), response.body(), errOpt) + } catch { + case t: Throwable => + if (desc.failOnError) throw t + (Integer.valueOf(-1), "", s"${t.getClass.getSimpleName}: ${t.getMessage}") + } + + if (desc.failOnError && error != null) { + throw new RuntimeException(s"HTTP request failed: $error (body=$body)") + } + + val fields = mutable.LinkedHashMap[String, Any]() + tuple.schema.getAttributeNames.foreach { name => + fields(name) = tuple.getField[Any](name) + } + fields("http_request_status") = status + fields("http_request_body") = body + fields("http_request_error") = error + Iterator(TupleLike(fields.toSeq: _*)) + } +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/util/HttpClientFactory.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/util/HttpClientFactory.scala new file mode 100644 index 00000000000..204bb78021c --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/util/HttpClientFactory.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.http.util + +import java.net.http.HttpClient +import java.time.Duration + +object HttpClientFactory { + lazy val sharedClient: HttpClient = HttpClient + .newBuilder() + .connectTimeout(Duration.ofSeconds(15)) + .followRedirects(HttpClient.Redirect.NORMAL) + .build() +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/util/HttpMethod.java b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/util/HttpMethod.java new file mode 100644 index 00000000000..e6cd4e0d90e --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/util/HttpMethod.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.http.util; + +import com.fasterxml.jackson.annotation.JsonValue; + +public enum HttpMethod { + GET("GET"), + POST("POST"), + PUT("PUT"), + PATCH("PATCH"), + DELETE("DELETE"); + + private final String name; + + HttpMethod(String name) { + this.name = name; + } + + @JsonValue + public String getName() { + return this.name; + } +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/util/KeyValuePair.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/util/KeyValuePair.scala new file mode 100644 index 00000000000..470a30e58a7 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/util/KeyValuePair.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.http.util + +import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty} +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle + +class KeyValuePair( + @JsonProperty("key") @JsonSchemaTitle("Key") val key: String, + @JsonProperty("value") @JsonSchemaTitle("Value") val value: String +) { + @JsonCreator + def this() = this(null, null) +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/util/TemplateInterpolator.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/util/TemplateInterpolator.scala new file mode 100644 index 00000000000..87153ba46cc --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/http/util/TemplateInterpolator.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.http.util + +import org.apache.texera.amber.core.tuple.Tuple + +import scala.util.matching.Regex + +object TemplateInterpolator { + private val placeholder: Regex = """\$\{([A-Za-z0-9_\.\- ]+)\}""".r + + // Replace ${fieldName} occurrences in `template` with the matching field's string value + // from `tuple`. Unknown fields are replaced with an empty string. + def interpolate(template: String, tuple: Tuple): String = { + if (template == null) return "" + placeholder.replaceAllIn( + template, + m => { + val name = m.group(1) + val value = + try { + val raw = tuple.getField[Any](name) + if (raw == null) "" else raw.toString + } catch { + case _: Throwable => "" + } + Regex.quoteReplacement(value) + } + ) + } +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/llm/LLMAgentOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/llm/LLMAgentOpDesc.scala new file mode 100644 index 00000000000..c9ca878424f --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/llm/LLMAgentOpDesc.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.llm + +import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.tuple.AttributeType +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp, SchemaPropagationFunc} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import org.apache.texera.amber.util.JSONUtils.objectMapper + +class LLMAgentOpDesc extends LogicalOp { + + @JsonProperty(required = true) + @JsonSchemaTitle("Provider") + @JsonPropertyDescription("LLM provider to call") + var provider: LLMProvider = LLMProvider.ANTHROPIC + + @JsonProperty + @JsonSchemaTitle("API Key") + @JsonPropertyDescription( + "API key for the chosen provider. Leave blank to fall back to the ANTHROPIC_API_KEY or OPENAI_API_KEY environment variable on the worker JVM." + ) + var apiKey: String = "" + + @JsonProperty(required = true) + @JsonSchemaTitle("Model") + @JsonPropertyDescription( + "Model name. Anthropic examples: claude-haiku-4-5-20251001, claude-sonnet-4-6, claude-opus-4-7. OpenAI examples: gpt-4o-mini, gpt-4o." + ) + var model: String = "claude-haiku-4-5-20251001" + + @JsonProperty(required = true) + @JsonSchemaTitle("System Prompt") + @JsonPropertyDescription( + "System / instructions message. Supports ${fieldName} placeholders that will be filled from the input tuple." + ) + var systemPrompt: String = + "You are a concise analyst. Respond with a short, factual summary." + + @JsonProperty(required = true) + @JsonSchemaTitle("User Prompt Template") + @JsonPropertyDescription( + "User message sent to the model. Supports ${fieldName} placeholders that will be filled from the input tuple." + ) + var userPromptTemplate: String = "${response_body}" + + @JsonProperty(required = true) + @JsonSchemaTitle("Max tokens") + @JsonPropertyDescription("Maximum tokens in the model's reply") + var maxTokens: Int = 1024 + + @JsonProperty(required = true) + @JsonSchemaTitle("Temperature") + @JsonPropertyDescription("Sampling temperature, typically 0.0 to 1.0") + var temperature: Double = 1.0 + + @JsonProperty(required = true) + @JsonSchemaTitle("Output column name") + @JsonPropertyDescription("Name of the new column that will hold the model's reply text") + var outputColumnName: String = "llm_response" + + @JsonProperty(required = true) + @JsonSchemaTitle("Timeout (seconds)") + @JsonPropertyDescription("Per-request timeout in seconds") + var timeoutSeconds: Int = 60 + + @JsonProperty(required = true) + @JsonSchemaTitle("Fail on error") + @JsonPropertyDescription( + "If true, a non-2xx response or transport error fails the workflow. If false (default), the error is recorded in `llm_error` and processing continues." + ) + var failOnError: Boolean = false + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName( + "org.apache.texera.amber.operator.llm.LLMAgentOpExec", + objectMapper.writeValueAsString(this) + ) + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withPropagateSchema(SchemaPropagationFunc(inputSchemas => { + val inputSchema = inputSchemas.values.head + val outputColName = + if (outputColumnName == null || outputColumnName.trim.isEmpty) "llm_response" + else outputColumnName + val outputSchema = inputSchema + .add(outputColName, AttributeType.STRING) + .add("llm_error", AttributeType.STRING) + Map(operatorInfo.outputPorts.head.id -> outputSchema) + })) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + userFriendlyName = "LLM Agent", + operatorDescription = + "Call an LLM (Anthropic or OpenAI) on each input tuple using a templated prompt; append the reply as a new column.", + operatorGroupName = OperatorGroupConstants.API_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/llm/LLMAgentOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/llm/LLMAgentOpExec.scala new file mode 100644 index 00000000000..1b2748ea34d --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/llm/LLMAgentOpExec.scala @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.llm + +import com.fasterxml.jackson.databind.node.ObjectNode +import com.typesafe.scalalogging.LazyLogging +import org.apache.texera.amber.core.executor.OperatorExecutor +import org.apache.texera.amber.core.tuple.{Tuple, TupleLike} +import org.apache.texera.amber.operator.http.util.{HttpClientFactory, TemplateInterpolator} +import org.apache.texera.amber.util.JSONUtils.objectMapper + +import java.net.URI +import java.net.http.{HttpRequest, HttpResponse} +import java.time.Duration +import scala.collection.mutable + +class LLMAgentOpExec(descString: String) extends OperatorExecutor with LazyLogging { + private val desc: LLMAgentOpDesc = + objectMapper.readValue(descString, classOf[LLMAgentOpDesc]) + + private val outputColName: String = + if (desc.outputColumnName == null || desc.outputColumnName.trim.isEmpty) "llm_response" + else desc.outputColumnName + + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = { + val provider = if (desc.provider == null) LLMProvider.ANTHROPIC else desc.provider + + val (replyText, errorMessage) = try { + val apiKey = resolveApiKey(provider) + if (apiKey == null || apiKey.isEmpty) { + throw new RuntimeException( + s"No API key for $provider. Set the operator's API Key field or " + + s"the ${envVarName(provider)} environment variable on the worker JVM." + ) + } + val systemResolved = TemplateInterpolator.interpolate(desc.systemPrompt, tuple) + val userResolved = TemplateInterpolator.interpolate(desc.userPromptTemplate, tuple) + + val bodyJson = buildRequestBody(provider, systemResolved, userResolved) + + val request = buildRequest(provider, apiKey, bodyJson) + val response = HttpClientFactory.sharedClient + .send(request, HttpResponse.BodyHandlers.ofString()) + + if (response.statusCode() >= 300) { + ("", s"HTTP ${response.statusCode()}: ${response.body()}") + } else { + val parsed = extractReplyText(provider, response.body()) + (parsed, null.asInstanceOf[String]) + } + } catch { + case t: Throwable => + if (desc.failOnError) throw t + ("", s"${t.getClass.getSimpleName}: ${t.getMessage}") + } + + if (desc.failOnError && errorMessage != null) { + throw new RuntimeException(s"LLM call failed: $errorMessage") + } + + val fields = mutable.LinkedHashMap[String, Any]() + tuple.schema.getAttributeNames.foreach { name => + fields(name) = tuple.getField[Any](name) + } + fields(outputColName) = replyText + fields("llm_error") = errorMessage + Iterator(TupleLike(fields.toSeq: _*)) + } + + private def resolveApiKey(provider: LLMProvider): String = { + if (desc.apiKey != null && desc.apiKey.nonEmpty) desc.apiKey + else { + val v = System.getenv(envVarName(provider)) + if (v == null) "" else v + } + } + + private def envVarName(provider: LLMProvider): String = provider match { + case LLMProvider.ANTHROPIC => "ANTHROPIC_API_KEY" + case LLMProvider.OPENAI => "OPENAI_API_KEY" + } + + private def buildRequestBody( + provider: LLMProvider, + systemContent: String, + userContent: String + ): String = { + val root: ObjectNode = objectMapper.createObjectNode() + root.put("model", desc.model) + root.put("max_tokens", desc.maxTokens) + root.put("temperature", desc.temperature) + + provider match { + case LLMProvider.ANTHROPIC => + root.put("system", systemContent) + val messages = root.putArray("messages") + val userMsg = messages.addObject() + userMsg.put("role", "user") + userMsg.put("content", userContent) + case LLMProvider.OPENAI => + val messages = root.putArray("messages") + val sysMsg = messages.addObject() + sysMsg.put("role", "system") + sysMsg.put("content", systemContent) + val userMsg = messages.addObject() + userMsg.put("role", "user") + userMsg.put("content", userContent) + } + objectMapper.writeValueAsString(root) + } + + private def buildRequest( + provider: LLMProvider, + apiKey: String, + bodyJson: String + ): HttpRequest = { + val (url, headers) = provider match { + case LLMProvider.ANTHROPIC => + ( + "https://api.anthropic.com/v1/messages", + Seq( + "x-api-key" -> apiKey, + "anthropic-version" -> "2023-06-01", + "content-type" -> "application/json" + ) + ) + case LLMProvider.OPENAI => + ( + "https://api.openai.com/v1/chat/completions", + Seq( + "Authorization" -> s"Bearer $apiKey", + "content-type" -> "application/json" + ) + ) + } + + val builder = HttpRequest + .newBuilder() + .uri(URI.create(url)) + .timeout(Duration.ofSeconds(math.max(1, desc.timeoutSeconds).toLong)) + .POST(HttpRequest.BodyPublishers.ofString(bodyJson)) + + headers.foreach { case (k, v) => builder.header(k, v) } + builder.build() + } + + private def extractReplyText(provider: LLMProvider, responseBody: String): String = { + val root = objectMapper.readTree(responseBody) + provider match { + case LLMProvider.ANTHROPIC => + // { "content": [ { "type": "text", "text": "..." } ], ... } + val contentArr = root.path("content") + if (contentArr.isArray && contentArr.size() > 0) { + contentArr.get(0).path("text").asText("") + } else "" + case LLMProvider.OPENAI => + // { "choices": [ { "message": { "content": "..." } } ], ... } + val choices = root.path("choices") + if (choices.isArray && choices.size() > 0) { + choices.get(0).path("message").path("content").asText("") + } else "" + } + } +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/llm/LLMProvider.java b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/llm/LLMProvider.java new file mode 100644 index 00000000000..6b230e9da9f --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/llm/LLMProvider.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.llm; + +import com.fasterxml.jackson.annotation.JsonValue; + +public enum LLMProvider { + ANTHROPIC("Anthropic"), + OPENAI("OpenAI"); + + private final String name; + + LLMProvider(String name) { + this.name = name; + } + + @JsonValue + public String getName() { + return this.name; + } +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/http/PollingHttpSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/http/PollingHttpSourceOpDesc.scala new file mode 100644 index 00000000000..4292adbaf0b --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/http/PollingHttpSourceOpDesc.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.source.http + +import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.tuple.{AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{OutputPort, PhysicalOp, SchemaPropagationFunc} +import org.apache.texera.amber.operator.http.util.{HttpMethod, KeyValuePair} +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import org.apache.texera.amber.operator.source.SourceOperatorDescriptor +import org.apache.texera.amber.util.JSONUtils.objectMapper + +class PollingHttpSourceOpDesc extends SourceOperatorDescriptor { + + @JsonProperty(required = true) + @JsonSchemaTitle("URL") + @JsonPropertyDescription("Endpoint to poll (e.g. https://api.example.com/feed)") + var url: String = _ + + @JsonProperty(required = true) + @JsonSchemaTitle("HTTP Method") + @JsonPropertyDescription("HTTP method to use for each poll") + var method: HttpMethod = HttpMethod.GET + + @JsonProperty(required = true) + @JsonSchemaTitle("Interval (seconds)") + @JsonPropertyDescription("Seconds to wait between polls") + var intervalSeconds: Int = 30 + + @JsonProperty(required = true) + @JsonSchemaTitle("Max iterations") + @JsonPropertyDescription("0 means poll forever; positive value caps the number of polls") + var maxIterations: Int = 0 + + @JsonProperty + @JsonSchemaTitle("Headers") + @JsonPropertyDescription("Optional headers, e.g. Authorization: Bearer ") + var headers: java.util.List[KeyValuePair] = new java.util.ArrayList[KeyValuePair]() + + @JsonProperty + @JsonSchemaTitle("Request Body") + @JsonPropertyDescription("Body to send (POST/PUT/PATCH only); ignored for GET") + var requestBody: String = "" + + override def sourceSchema(): Schema = + Schema() + .add("response_body", AttributeType.STRING) + .add("status_code", AttributeType.INTEGER) + .add("polled_at", AttributeType.TIMESTAMP) + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .sourcePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName( + "org.apache.texera.amber.operator.source.http.PollingHttpSourceOpExec", + objectMapper.writeValueAsString(this) + ) + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withParallelizable(false) + .withPropagateSchema( + SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id -> sourceSchema())) + ) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + userFriendlyName = "Polling HTTP Source", + operatorDescription = + "Repeatedly call an HTTP endpoint at a fixed interval and emit each response as a tuple", + operatorGroupName = OperatorGroupConstants.API_GROUP, + inputPorts = List.empty, + outputPorts = List(OutputPort()) + ) +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/http/PollingHttpSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/http/PollingHttpSourceOpExec.scala new file mode 100644 index 00000000000..b8e694ac9e0 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/http/PollingHttpSourceOpExec.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.source.http + +import com.typesafe.scalalogging.LazyLogging +import org.apache.texera.amber.core.executor.SourceOperatorExecutor +import org.apache.texera.amber.core.tuple.TupleLike +import org.apache.texera.amber.operator.http.util.{HttpClientFactory, HttpMethod} +import org.apache.texera.amber.util.JSONUtils.objectMapper + +import java.net.URI +import java.net.http.{HttpRequest, HttpResponse} +import java.sql.Timestamp +import java.time.Duration + +class PollingHttpSourceOpExec(descString: String) extends SourceOperatorExecutor with LazyLogging { + private val desc: PollingHttpSourceOpDesc = + objectMapper.readValue(descString, classOf[PollingHttpSourceOpDesc]) + + logger.info( + s"[PollingHttpSource] url=${desc.url} interval=${desc.intervalSeconds}s " + + s"maxIterations=${desc.maxIterations} method=${desc.method} descRaw=$descString" + ) + + override def produceTuple(): Iterator[TupleLike] = new Iterator[TupleLike] { + private var iteration: Long = 0L + private var firstCall: Boolean = true + + override def hasNext: Boolean = + desc.maxIterations <= 0 || iteration < desc.maxIterations + + override def next(): TupleLike = { + // Sleep between polls (skip the wait on the very first iteration so the + // workflow emits its first tuple promptly). + if (firstCall) firstCall = false + else Thread.sleep(math.max(0, desc.intervalSeconds).toLong * 1000L) + iteration += 1 + poll() + } + } + + private def poll(): TupleLike = { + val requestBuilder = HttpRequest + .newBuilder() + .uri(URI.create(desc.url)) + .timeout(Duration.ofSeconds(30)) + + Option(desc.headers).foreach { hs => + hs.forEach { kv => + if (kv != null && kv.key != null && kv.value != null) { + requestBuilder.header(kv.key, kv.value) + } + } + } + + val bodyPublisher = + if (desc.requestBody != null && desc.requestBody.nonEmpty) + HttpRequest.BodyPublishers.ofString(desc.requestBody) + else HttpRequest.BodyPublishers.noBody() + + val method = if (desc.method == null) HttpMethod.GET else desc.method + method match { + case HttpMethod.GET => requestBuilder.GET() + case HttpMethod.POST => requestBuilder.POST(bodyPublisher) + case HttpMethod.PUT => requestBuilder.PUT(bodyPublisher) + case HttpMethod.PATCH => requestBuilder.method("PATCH", bodyPublisher) + case HttpMethod.DELETE => requestBuilder.DELETE() + } + + val (body, status) = + try { + val response = HttpClientFactory.sharedClient + .send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString()) + (response.body(), response.statusCode()) + } catch { + case t: Throwable => (s"ERROR: ${t.getMessage}", -1) + } + + TupleLike(body, Integer.valueOf(status), new Timestamp(System.currentTimeMillis())) + } +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/http/WebSocketSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/http/WebSocketSourceOpDesc.scala new file mode 100644 index 00000000000..30fac1ab091 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/http/WebSocketSourceOpDesc.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.source.http + +import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.tuple.{AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{OutputPort, PhysicalOp, SchemaPropagationFunc} +import org.apache.texera.amber.operator.http.util.KeyValuePair +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import org.apache.texera.amber.operator.source.SourceOperatorDescriptor +import org.apache.texera.amber.util.JSONUtils.objectMapper + +class WebSocketSourceOpDesc extends SourceOperatorDescriptor { + + @JsonProperty(required = true) + @JsonSchemaTitle("WebSocket URL") + @JsonPropertyDescription("ws:// or wss:// endpoint to subscribe to") + var wsUrl: String = _ + + @JsonProperty + @JsonSchemaTitle("Subscribe Message") + @JsonPropertyDescription( + "Optional message sent to the server immediately after connecting (e.g. a JSON subscribe payload)" + ) + var subscribeMessage: String = "" + + @JsonProperty + @JsonSchemaTitle("Headers") + @JsonPropertyDescription("Optional headers sent during the websocket handshake") + var headers: java.util.List[KeyValuePair] = new java.util.ArrayList[KeyValuePair]() + + override def sourceSchema(): Schema = + Schema() + .add("message", AttributeType.STRING) + .add("received_at", AttributeType.TIMESTAMP) + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .sourcePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName( + "org.apache.texera.amber.operator.source.http.WebSocketSourceOpExec", + objectMapper.writeValueAsString(this) + ) + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withParallelizable(false) + .withPropagateSchema( + SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id -> sourceSchema())) + ) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + userFriendlyName = "WebSocket Source", + operatorDescription = + "Connect to a websocket endpoint and emit each received frame as a tuple", + operatorGroupName = OperatorGroupConstants.API_GROUP, + inputPorts = List.empty, + outputPorts = List(OutputPort()) + ) +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/http/WebSocketSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/http/WebSocketSourceOpExec.scala new file mode 100644 index 00000000000..d192e5b857e --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/http/WebSocketSourceOpExec.scala @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.source.http + +import com.typesafe.scalalogging.LazyLogging +import org.apache.texera.amber.core.executor.SourceOperatorExecutor +import org.apache.texera.amber.core.tuple.TupleLike +import org.apache.texera.amber.operator.http.util.HttpClientFactory +import org.apache.texera.amber.util.JSONUtils.objectMapper + +import java.net.URI +import java.net.http.WebSocket +import java.net.http.WebSocket.Listener +import java.sql.Timestamp +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.{CompletionStage, LinkedBlockingQueue, TimeUnit} + +class WebSocketSourceOpExec(descString: String) + extends SourceOperatorExecutor + with LazyLogging { + private val desc: WebSocketSourceOpDesc = + objectMapper.readValue(descString, classOf[WebSocketSourceOpDesc]) + + // The end-of-stream marker. Reference identity is used to disambiguate from + // any legitimate message that happens to contain the same characters. + private val sentinel: String = new String("__WS_CLOSED_SENTINEL__") + private val queue = new LinkedBlockingQueue[String]() + private val framesReceived = new AtomicLong(0L) + @volatile private var webSocket: WebSocket = _ + + override def open(): Unit = { + logger.info( + s"[WebSocketSource] opening: url=${desc.wsUrl} subscribeMsgLen=${Option(desc.subscribeMessage).map(_.length).getOrElse(0)}" + ) + val builder = HttpClientFactory.sharedClient.newWebSocketBuilder() + Option(desc.headers).foreach { hs => + hs.forEach { kv => + if (kv != null && kv.key != null && kv.value != null) { + builder.header(kv.key, kv.value) + } + } + } + + val listener = new Listener { + private val partial = new StringBuilder() + + override def onOpen(ws: WebSocket): Unit = { + // Request unlimited messages up front so per-frame backpressure + // bookkeeping cannot accidentally stop delivery. + ws.request(Long.MaxValue) + logger.info("[WebSocketSource] onOpen — requesting unlimited messages") + if (desc.subscribeMessage != null && desc.subscribeMessage.nonEmpty) { + val send = desc.subscribeMessage + logger.info(s"[WebSocketSource] sending subscribe (${send.length} chars): $send") + ws.sendText(send, true) + } else { + logger.info("[WebSocketSource] no subscribe message configured") + } + } + + override def onText( + ws: WebSocket, + data: CharSequence, + last: Boolean + ): CompletionStage[_] = { + partial.append(data) + if (last) { + val msg = partial.toString + partial.clear() + val n = framesReceived.incrementAndGet() + if (n <= 5 || n % 100 == 0) { + val preview = msg.substring(0, math.min(120, msg.length)) + logger.info(s"[WebSocketSource] frame #$n (${msg.length} chars): $preview") + } + queue.offer(msg) + } + null + } + + override def onClose(ws: WebSocket, statusCode: Int, reason: String): CompletionStage[_] = { + logger.warn( + s"[WebSocketSource] onClose status=$statusCode reason='$reason' frames=${framesReceived.get()}" + ) + queue.offer(sentinel) + null + } + + override def onError(ws: WebSocket, error: Throwable): Unit = { + logger.error( + s"[WebSocketSource] onError frames=${framesReceived.get()}: ${error.getClass.getSimpleName}: ${error.getMessage}", + error + ) + queue.offer(sentinel) + } + } + + webSocket = builder + .buildAsync(toUri(desc.wsUrl), listener) + .toCompletableFuture + .get(30, TimeUnit.SECONDS) + logger.info("[WebSocketSource] handshake complete") + } + + // java.net.URI is strict about a few characters (notably `@`) that are + // legal in WebSocket URLs in practice — e.g. Binance stream names like + // `btcusdt@trade`. Percent-encode the most common offenders so users can + // paste the URL verbatim from the provider docs. + private def toUri(raw: String): URI = { + if (raw == null) { + throw new IllegalArgumentException("WebSocket URL must not be null") + } + // Defensive: strip whitespace (users frequently paste with trailing + // spaces/newlines) before percent-encoding characters that are legal in + // practice but rejected by java.net.URI (notably `@`). + val encoded = raw.trim.replace("@", "%40") + URI.create(encoded) + } + + override def produceTuple(): Iterator[TupleLike] = new Iterator[TupleLike] { + private var pending: String = _ + + override def hasNext: Boolean = { + if (pending != null && (pending ne sentinel)) return true + try { + pending = queue.take() + } catch { + case _: InterruptedException => + Thread.currentThread().interrupt() + return false + } + pending ne sentinel + } + + override def next(): TupleLike = { + if (pending == null || (pending eq sentinel)) { + throw new NoSuchElementException("WebSocket source has no more messages") + } + val msg = pending + pending = null + TupleLike(msg, new Timestamp(System.currentTimeMillis())) + } + } + + override def close(): Unit = { + Option(webSocket).foreach { ws => + try ws.sendClose(WebSocket.NORMAL_CLOSURE, "operator closed") + catch { case _: Throwable => /* ignore */ } + } + queue.offer(sentinel) + } +} diff --git a/frontend/src/app/workspace/component/result-panel/result-panel-modal.component.ts b/frontend/src/app/workspace/component/result-panel/result-panel-modal.component.ts index 278a01dd5a7..fa41b1116f5 100644 --- a/frontend/src/app/workspace/component/result-panel/result-panel-modal.component.ts +++ b/frontend/src/app/workspace/component/result-panel/result-panel-modal.component.ts @@ -48,7 +48,10 @@ export class RowModalComponent implements OnChanges { // Index of current displayed row in currentResult readonly operatorId: string = inject(NZ_MODAL_DATA).operatorId; rowIndex: number = inject(NZ_MODAL_DATA).rowIndex; - currentDisplayRowData: Record = {}; + // Row data already rendered in the table — used as immediate display and as a + // fallback if the paginated result service is unavailable or slow. + readonly fallbackRow: Record = inject(NZ_MODAL_DATA).fallbackRow ?? {}; + currentDisplayRowData: Record = this.fallbackRow; constructor( public modal: NzModalRef, @@ -59,12 +62,18 @@ export class RowModalComponent implements OnChanges { } ngOnChanges(): void { + // Show the in-table row immediately so the modal body is never blank. + this.currentDisplayRowData = this.fallbackRow; + // Then attempt to fetch the full row from the paginated result service — + // if available, overwrite with the canonical (full) tuple. this.workflowResultService .getPaginatedResultService(this.operatorId) ?.selectTuple(this.rowIndex, this.resizeService.pageSize) .pipe(untilDestroyed(this)) .subscribe(res => { - this.currentDisplayRowData = res.tuple; + if (res?.tuple && Object.keys(res.tuple).length > 0) { + this.currentDisplayRowData = res.tuple; + } }); } } diff --git a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts index 72a0dbbf72c..11dac1c862a 100644 --- a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts +++ b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts @@ -333,7 +333,7 @@ export class ResultTableFrameComponent implements OnInit, OnChanges { // modal title nzTitle: "Row Details", nzContent: RowModalComponent, - nzData: { operatorId: this.operatorId, rowIndex: currentRowIndex }, // set the index value and page size to the modal for navigation + nzData: { operatorId: this.operatorId, rowIndex: currentRowIndex, fallbackRow: rowData }, // also pass the row data already rendered in the table so the modal can fall back to it if the paginated lookup is unavailable // prevent browser focusing close button (ugly square highlight) nzAutofocus: null, // modal footer buttons diff --git a/frontend/src/assets/operator_images/HttpRequest.png b/frontend/src/assets/operator_images/HttpRequest.png new file mode 100644 index 00000000000..566b49ff8b6 Binary files /dev/null and b/frontend/src/assets/operator_images/HttpRequest.png differ diff --git a/frontend/src/assets/operator_images/LLMAgent.png b/frontend/src/assets/operator_images/LLMAgent.png new file mode 100644 index 00000000000..bed0b4af7c4 Binary files /dev/null and b/frontend/src/assets/operator_images/LLMAgent.png differ diff --git a/frontend/src/assets/operator_images/PollingHttpSource.png b/frontend/src/assets/operator_images/PollingHttpSource.png new file mode 100644 index 00000000000..cfd9dc7fe89 Binary files /dev/null and b/frontend/src/assets/operator_images/PollingHttpSource.png differ diff --git a/frontend/src/assets/operator_images/WebSocketSource.png b/frontend/src/assets/operator_images/WebSocketSource.png new file mode 100644 index 00000000000..0dbff88c032 Binary files /dev/null and b/frontend/src/assets/operator_images/WebSocketSource.png differ