Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ merge.
| Python | 3.12 |
| Node | 24 |

JDK 17 is required, not just recommended. The `sbt-jacoco 3.5.0` plugin
([`project/plugins.sbt`](project/plugins.sbt)) ships JaCoCo 0.8.11, which
cannot instrument class files compiled to newer bytecode versions — under
JDK 21+ `sbt test` fails during the instrumentation pass with
`java.io.IOException: Error while instrumenting <Class>.class with JaCoCo`,
before any test runs. Point `JAVA_HOME` at a Temurin 17 install for sbt
invocations until the plugin is upgraded.

One Python venv shared across worktrees, sibling of the texera checkout:

```
Expand All @@ -90,6 +98,21 @@ in [`udf.conf`](common/config/src/main/resources/udf.conf) or
`export UDF_PYTHON_PATH="$(pwd)/../venv312/bin/python"` (env var overrides).
Without it, `sbt` Python-integration tests fail to launch a worker.

Backend services that touch datasets (`FileService`,
`WorkflowComputingUnitManagingService`, anything calling
`S3StorageClient`/`LakeFSStorageClient`) need MinIO + LakeFS running locally.
Bring them up with:

```bash
docker compose -f file-service/src/main/resources/docker-compose.yml up -d minio lakefs
```

MinIO listens on `:9000`, LakeFS on `:8000`; both are wired to the
defaults in [`storage.conf`](common/config/src/main/resources/storage.conf).
Without these, startup fails at
[`S3StorageClient.createBucketIfNotExist`](file-service/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala)
with `Connection refused` against `localhost:9000`.

[`.jvmopts`](.jvmopts) holds every `--add-opens` flag Texera needs for
JDK 17+, with each group annotated by its upstream source (Kryo,
Apache Arrow, Apache Pekko). sbt's launcher and the [`.run/`](.run)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import org.apache.texera.service.resource.{
AccessControlResource,
HealthCheckResource,
LiteLLMModelsResource,
LiteLLMProxyResource
LiteLLMProxyResource,
OpenRouterModelsResource
}
import org.eclipse.jetty.server.session.SessionHandler
import java.nio.file.Path
Expand Down Expand Up @@ -69,6 +70,7 @@ class AccessControlService extends Application[AccessControlServiceConfiguration
environment.jersey.register(classOf[AccessControlResource])
environment.jersey.register(classOf[LiteLLMProxyResource])
environment.jersey.register(classOf[LiteLLMModelsResource])
environment.jersey.register(new OpenRouterModelsResource)

// Register JWT authentication filter
environment.jersey.register(new AuthDynamicFeature(classOf[JwtAuthFilter]))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.texera.service.resource

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.typesafe.scalalogging.LazyLogging
import jakarta.ws.rs.core.{MediaType, Response}
import jakarta.ws.rs.{GET, Path, Produces}
import org.apache.texera.config.{GuiConfig, LLMConfig}

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.nio.charset.StandardCharsets
import java.time.{Clock, Duration}
import scala.jdk.CollectionConverters.IteratorHasAsScala

/**
  * Reduced view of one entry of the OpenRouter model list, as produced by
  * `OpenRouterModelsResource.summarizeModels`.
  *
  * @param id            value of the entry's `id` field (trimmed, never empty)
  * @param name          value of the entry's `name` field (trimmed, never empty)
  * @param contextLength value of the entry's `context_length` field when it is numeric,
  *                      otherwise `None`
  * @param pricing       non-null members of the entry's `pricing` object, each value
  *                      rendered as text; empty when `pricing` is not a JSON object
  */
case class OpenRouterModelSummary(
id: String,
name: String,
contextLength: Option[Long],
pricing: Map[String, String]
)

/**
  * Payload returned by `GET /models/openrouter`; also the value kept in the
  * resource's in-memory cache.
  *
  * @param data                 summarized models from the last successful fetch
  * @param cachedAtEpochMillis  clock reading (epoch millis) taken when `data` was fetched
  * @param expiresAtEpochMillis epoch millis after which the resource attempts a new fetch
  * @param stale                `true` when a refresh failed and previously cached data is
  *                             being served instead
  * @param error                message of the failure that caused a stale response;
  *                             `None` for fresh responses
  */
case class OpenRouterModelsResponse(
data: Seq[OpenRouterModelSummary],
cachedAtEpochMillis: Long,
expiresAtEpochMillis: Long,
stale: Boolean,
error: Option[String] = None
)

/**
  * Error body used for the 503 response when fetching the model list fails and
  * no cached response exists to fall back on.
  *
  * @param error human-readable description of the failure
  */
case class OpenRouterModelsError(error: String)

/**
  * Stateless helpers for talking to the OpenRouter model-list endpoint and for
  * reducing its JSON payload to [[OpenRouterModelSummary]] values.
  */
object OpenRouterModelsResource {
  private val mapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
  private val openRouterModelsUri =
    URI.create("https://openrouter.ai/api/v1/models?output_modalities=text")
  private val client = HttpClient
    .newBuilder()
    .connectTimeout(Duration.ofSeconds(3))
    .build()

  /**
    * Builds the GET request for the OpenRouter model list.
    *
    * @param openRouterApiKey optional API key; an `Authorization: Bearer` header is added
    *                         only when the trimmed key is non-empty
    * @return a request with a 5 second timeout and JSON `Accept` header
    */
  def buildOpenRouterModelsRequest(openRouterApiKey: Option[String]): HttpRequest = {
    val requestBuilder = HttpRequest
      .newBuilder(openRouterModelsUri)
      .timeout(Duration.ofSeconds(5))
      .header("Accept", "application/json")
      .header("User-Agent", "Apache-Texera")

    // Blank or whitespace-only keys are treated the same as an absent key.
    val usableKey = openRouterApiKey.map(_.trim).filter(_.nonEmpty)
    for (key <- usableKey) requestBuilder.header("Authorization", s"Bearer $key")

    requestBuilder.GET().build()
  }

  /**
    * Performs a blocking call to OpenRouter and returns the raw response body.
    *
    * @param openRouterApiKey optional API key, defaulting to the configured one
    * @return the response body decoded as UTF-8
    * @throws RuntimeException when the HTTP status is not in the 2xx range
    */
  def fetchOpenRouterModelsJson(
      openRouterApiKey: Option[String] = LLMConfig.openRouterApiKey
  ): String = {
    val bodyHandler = HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8)
    val reply = client.send(buildOpenRouterModelsRequest(openRouterApiKey), bodyHandler)
    val status = reply.statusCode()
    val isSuccess = status >= 200 && status < 300
    if (!isSuccess) {
      throw new RuntimeException(s"OpenRouter returned HTTP $status")
    }
    reply.body()
  }

  /**
    * Parses the OpenRouter payload and keeps only the fields Texera needs.
    * Entries lacking a non-empty `id` or `name` are dropped.
    *
    * @param rawJson JSON document expected to contain a top-level `data` array
    * @return one summary per usable entry, in payload order
    * @throws IllegalArgumentException when `data` is absent or not an array
    */
  def summarizeModels(rawJson: String): Seq[OpenRouterModelSummary] = {
    val entries = mapper.readTree(rawJson).path("data")
    if (!entries.isArray) {
      throw new IllegalArgumentException("OpenRouter response does not contain a data array")
    }

    entries.elements().asScala.flatMap(entry => toSummary(entry)).toSeq
  }

  /** Converts one model entry to a summary, or `None` when `id` or `name` is unusable. */
  private def toSummary(entry: JsonNode): Option[OpenRouterModelSummary] =
    nonEmptyText(entry, "id").flatMap { modelId =>
      nonEmptyText(entry, "name").map { modelName =>
        OpenRouterModelSummary(
          id = modelId,
          name = modelName,
          contextLength = longValue(entry, "context_length"),
          pricing = pricing(entry.path("pricing"))
        )
      }
    }

  /** Reads `fieldName` as trimmed text; absent, JSON-null and blank values give `None`. */
  private def nonEmptyText(node: JsonNode, fieldName: String): Option[String] =
    for {
      field <- Option(node.get(fieldName))
      if !field.isNull && !field.isMissingNode
      text = field.asText().trim
      if text.nonEmpty
    } yield text

  /** Reads `fieldName` as a `Long` when the field exists and holds a JSON number. */
  private def longValue(node: JsonNode, fieldName: String): Option[Long] =
    Option(node.get(fieldName)).collect { case field if field.isNumber => field.asLong() }

  /** Flattens a JSON object into key -> text pairs, skipping null members. */
  private def pricing(node: JsonNode): Map[String, String] =
    if (node != null && node.isObject) {
      node
        .fields()
        .asScala
        .collect {
          case entry if !(entry.getValue.isNull || entry.getValue.isMissingNode) =>
            entry.getKey -> entry.getValue.asText()
        }
        .toMap
    } else {
      Map.empty
    }
}

/**
  * REST resource exposing a cached summary of the OpenRouter model list at
  * `GET /models/openrouter`.
  *
  * The instance keeps the last response in memory. Access to the cache is
  * serialized through `synchronized`, so at most one upstream fetch runs at a
  * time. When a refresh fails and a previous response exists, that response is
  * served again flagged as stale and retried after `staleFailureRetryTtl`.
  *
  * @param fetchModelsJson      supplies the raw OpenRouter JSON; may throw on failure
  * @param clock                time source used for cache timestamps
  * @param cacheTtl             lifetime of a successfully fetched response
  * @param staleFailureRetryTtl how long a stale response is served before the next retry
  * @param isCopilotEnabled     feature gate; when it returns `false` the endpoint answers 403
  */
@Path("/models/openrouter")
@Produces(Array(MediaType.APPLICATION_JSON))
class OpenRouterModelsResource(
    fetchModelsJson: () => String = () => OpenRouterModelsResource.fetchOpenRouterModelsJson(),
    clock: Clock = Clock.systemUTC(),
    cacheTtl: Duration = Duration.ofHours(1),
    staleFailureRetryTtl: Duration = Duration.ofMinutes(5),
    isCopilotEnabled: () => Boolean = () => GuiConfig.guiWorkflowWorkspaceCopilotEnabled
) extends LazyLogging {

  // Only read and written while holding this instance's monitor.
  private var cachedResponse: Option[OpenRouterModelsResponse] = None

  /**
    * Returns the model list, served from the cache while it has not expired.
    *
    * @return 200 with an [[OpenRouterModelsResponse]] (fresh or stale), 403 when the
    *         copilot feature is disabled, or 503 when the fetch fails and nothing is cached
    */
  @GET
  def getOpenRouterModels: Response =
    synchronized {
      if (!isCopilotEnabled()) {
        Response
          .status(Response.Status.FORBIDDEN)
          .entity("""{"error": "Copilot feature is disabled"}""")
          .build()
      } else {
        val now = clock.millis()
        cachedResponse.filter(_.expiresAtEpochMillis > now) match {
          case Some(cached) => Response.ok(cached).build()
          case None         => refresh(now)
        }
      }
    }

  /** Fetches a fresh model list and caches it; on failure falls back to stale data or 503. */
  private def refresh(now: Long): Response =
    try {
      val models = OpenRouterModelsResource.summarizeModels(fetchModelsJson())
      val response = OpenRouterModelsResponse(
        data = models,
        cachedAtEpochMillis = now,
        expiresAtEpochMillis = now + cacheTtl.toMillis,
        stale = false
      )
      cachedResponse = Some(response)
      Response.ok(response).build()
    } catch {
      case e: InterruptedException =>
        // Keep the interruption visible to the caller's thread instead of swallowing it.
        Thread.currentThread().interrupt()
        handleRefreshFailure(now, e)
      case e: Exception =>
        handleRefreshFailure(now, e)
    }

  /** Serves the previous response as stale when one exists, otherwise answers 503. */
  private def handleRefreshFailure(now: Long, e: Exception): Response = {
    val reason = failureReason(e)
    logger.warn(s"Failed to fetch OpenRouter models: $reason", e)
    cachedResponse match {
      case Some(cached) =>
        val staleResponse = cached.copy(
          expiresAtEpochMillis = now + staleFailureRetryTtl.toMillis,
          stale = true,
          error = Some(reason)
        )
        cachedResponse = Some(staleResponse)
        Response.ok(staleResponse).build()
      case None =>
        Response
          .status(Response.Status.SERVICE_UNAVAILABLE)
          .entity(OpenRouterModelsError(s"Failed to fetch OpenRouter models: $reason"))
          .build()
    }
  }

  /** Exception message, or the exception class name when the message is null or blank. */
  private def failureReason(e: Exception): String =
    Option(e.getMessage).filter(_.trim.nonEmpty).getOrElse(e.getClass.getName)
}
Loading
Loading