Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ import org.apache.texera.amber.operator.source.apis.twitter.v2.{
}
import org.apache.texera.amber.operator.source.dataset.FileListerSourceOpDesc
import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc
import org.apache.texera.amber.operator.source.http.{PollingHttpSourceOpDesc, WebSocketSourceOpDesc}
import org.apache.texera.amber.operator.http.HttpRequestOpDesc
import org.apache.texera.amber.operator.llm.LLMAgentOpDesc
import org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpDesc
import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc
import org.apache.texera.amber.operator.source.scan.csvOld.CSVOldScanSourceOpDesc
Expand Down Expand Up @@ -182,6 +185,10 @@ trait StateTransferFunc
value = classOf[TwitterSearchSourceOpDesc],
name = "TwitterSearch"
),
new Type(value = classOf[PollingHttpSourceOpDesc], name = "PollingHttpSource"),
new Type(value = classOf[WebSocketSourceOpDesc], name = "WebSocketSource"),
new Type(value = classOf[HttpRequestOpDesc], name = "HttpRequest"),
new Type(value = classOf[LLMAgentOpDesc], name = "LLMAgent"),
new Type(value = classOf[ChoroplethMapOpDesc], name = "ChoroplethMap"),
new Type(value = classOf[TimeSeriesOpDesc], name = "TimeSeriesPlot"),
new Type(value = classOf[CandlestickChartOpDesc], name = "CandlestickChart"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.operator.http

import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import org.apache.texera.amber.core.executor.OpExecWithClassName
import org.apache.texera.amber.core.tuple.AttributeType
import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp, SchemaPropagationFunc}
import org.apache.texera.amber.operator.LogicalOp
import org.apache.texera.amber.operator.http.util.{HttpMethod, KeyValuePair}
import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
import org.apache.texera.amber.util.JSONUtils.objectMapper

class HttpRequestOpDesc extends LogicalOp {

@JsonProperty(required = true)
@JsonSchemaTitle("URL")
@JsonPropertyDescription(
"Target URL. Supports ${fieldName} placeholders that will be filled from the input tuple."
)
var url: String = _

@JsonProperty(required = true)
@JsonSchemaTitle("HTTP Method")
@JsonPropertyDescription("HTTP method to use for each request")
var method: HttpMethod = HttpMethod.POST

@JsonProperty
@JsonSchemaTitle("Headers")
@JsonPropertyDescription("Optional headers, e.g. Authorization: Bearer <token>")
var headers: java.util.List[KeyValuePair] = new java.util.ArrayList[KeyValuePair]()

@JsonProperty
@JsonSchemaTitle("Request Body Template")
@JsonPropertyDescription(
"Body sent with each request. Supports ${fieldName} placeholders from the input tuple. Ignored for GET."
)
var bodyTemplate: String = ""

@JsonProperty(required = true)
@JsonSchemaTitle("Timeout (seconds)")
@JsonPropertyDescription("Per-request timeout in seconds")
var timeoutSeconds: Int = 10

@JsonProperty(required = true)
@JsonSchemaTitle("Fail on error")
@JsonPropertyDescription(
"If true, a non-2xx response or transport error fails the workflow. " +
"If false (default), the error is recorded in the output tuple and processing continues."
)
var failOnError: Boolean = false

override def getPhysicalOp(
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity
): PhysicalOp = {
PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecWithClassName(
"org.apache.texera.amber.operator.http.HttpRequestOpExec",
objectMapper.writeValueAsString(this)
)
)
.withInputPorts(operatorInfo.inputPorts)
.withOutputPorts(operatorInfo.outputPorts)
.withPropagateSchema(SchemaPropagationFunc(inputSchemas => {
val inputSchema = inputSchemas.values.head
val outputSchema = inputSchema
.add("http_request_status", AttributeType.INTEGER)
.add("http_request_body", AttributeType.STRING)
.add("http_request_error", AttributeType.STRING)
Map(operatorInfo.outputPorts.head.id -> outputSchema)
}))
}

override def operatorInfo: OperatorInfo =
OperatorInfo(
userFriendlyName = "HTTP Request",
operatorDescription =
"For each input tuple, make an HTTP request (URL/body support ${fieldName} interpolation) and append the response.",
operatorGroupName = OperatorGroupConstants.API_GROUP,
inputPorts = List(InputPort()),
outputPorts = List(OutputPort())
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.operator.http

import org.apache.texera.amber.core.executor.OperatorExecutor
import org.apache.texera.amber.core.tuple.{Tuple, TupleLike}
import org.apache.texera.amber.operator.http.util.{HttpClientFactory, HttpMethod, TemplateInterpolator}
import org.apache.texera.amber.util.JSONUtils.objectMapper

import java.net.URI
import java.net.http.{HttpRequest, HttpResponse}
import java.time.Duration
import scala.collection.mutable

class HttpRequestOpExec(descString: String) extends OperatorExecutor {
private val desc: HttpRequestOpDesc =
objectMapper.readValue(descString, classOf[HttpRequestOpDesc])

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
val resolvedUrl = TemplateInterpolator.interpolate(desc.url, tuple)
val resolvedBody = TemplateInterpolator.interpolate(desc.bodyTemplate, tuple)

val (status, body, error) = try {
val builder = HttpRequest
.newBuilder()
.uri(URI.create(resolvedUrl))
.timeout(Duration.ofSeconds(math.max(1, desc.timeoutSeconds).toLong))

Option(desc.headers).foreach { hs =>
hs.forEach { kv =>
if (kv != null && kv.key != null && kv.value != null) {
builder.header(kv.key, kv.value)
}
}
}

val bodyPublisher =
if (resolvedBody != null && resolvedBody.nonEmpty)
HttpRequest.BodyPublishers.ofString(resolvedBody)
else HttpRequest.BodyPublishers.noBody()

val method = if (desc.method == null) HttpMethod.POST else desc.method
method match {
case HttpMethod.GET => builder.GET()
case HttpMethod.POST => builder.POST(bodyPublisher)
case HttpMethod.PUT => builder.PUT(bodyPublisher)
case HttpMethod.PATCH => builder.method("PATCH", bodyPublisher)
case HttpMethod.DELETE => builder.DELETE()
}

val response = HttpClientFactory.sharedClient
.send(builder.build(), HttpResponse.BodyHandlers.ofString())
val code = response.statusCode()
val errOpt =
if (code >= 200 && code < 300) null
else s"HTTP $code"
(Integer.valueOf(code), response.body(), errOpt)
} catch {
case t: Throwable =>
if (desc.failOnError) throw t
(Integer.valueOf(-1), "", s"${t.getClass.getSimpleName}: ${t.getMessage}")
}

if (desc.failOnError && error != null) {
throw new RuntimeException(s"HTTP request failed: $error (body=$body)")
}

val fields = mutable.LinkedHashMap[String, Any]()
tuple.schema.getAttributeNames.foreach { name =>
fields(name) = tuple.getField[Any](name)
}
fields("http_request_status") = status
fields("http_request_body") = body
fields("http_request_error") = error
Iterator(TupleLike(fields.toSeq: _*))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.operator.http.util

import java.net.http.HttpClient
import java.time.Duration

object HttpClientFactory {
lazy val sharedClient: HttpClient = HttpClient
.newBuilder()
.connectTimeout(Duration.ofSeconds(15))
.followRedirects(HttpClient.Redirect.NORMAL)
.build()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.operator.http.util;

import com.fasterxml.jackson.annotation.JsonValue;

public enum HttpMethod {
GET("GET"),
POST("POST"),
PUT("PUT"),
PATCH("PATCH"),
DELETE("DELETE");

private final String name;

HttpMethod(String name) {
this.name = name;
}

@JsonValue
public String getName() {
return this.name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.operator.http.util

import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty}
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle

class KeyValuePair(
@JsonProperty("key") @JsonSchemaTitle("Key") val key: String,
@JsonProperty("value") @JsonSchemaTitle("Value") val value: String
) {
@JsonCreator
def this() = this(null, null)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.operator.http.util

import org.apache.texera.amber.core.tuple.Tuple

import scala.util.matching.Regex

object TemplateInterpolator {
private val placeholder: Regex = """\$\{([A-Za-z0-9_\.\- ]+)\}""".r

// Replace ${fieldName} occurrences in `template` with the matching field's string value
// from `tuple`. Unknown fields are replaced with an empty string.
def interpolate(template: String, tuple: Tuple): String = {
if (template == null) return ""
placeholder.replaceAllIn(
template,
m => {
val name = m.group(1)
val value =
try {
val raw = tuple.getField[Any](name)
if (raw == null) "" else raw.toString
} catch {
case _: Throwable => ""
}
Regex.quoteReplacement(value)
}
)
}
}
Loading
Loading