Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions agent-service/src/agent/prompts.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { describe, expect, test } from "bun:test";
import { buildSystemPrompt } from "./prompts";
import { WorkflowSystemMetadata } from "./util/workflow-system-metadata";

// Checks that the system prompt built for an allowed operator contains the operator's
// type heading together with its user-facing display name and its description.
describe("buildSystemPrompt", () => {
  test("includes both operator type and display name", () => {
    // Register a single operator whose type ("SmartFileScan") differs from its
    // user-friendly name ("Smart Source") so both can be told apart in the output.
    const store = new WorkflowSystemMetadata();
    store.loadFromMetadata({
      operators: [
        {
          operatorType: "SmartFileScan",
          operatorVersion: "1",
          jsonSchema: {
            properties: { fileName: { type: "string" } },
            required: ["fileName"],
          },
          additionalMetadata: {
            userFriendlyName: "Smart Source",
            operatorGroupName: "Data Input",
            operatorDescription: "Auto-detects files and folders.",
            inputPorts: [],
            outputPorts: [{}],
          },
        },
      ],
      groups: [],
    });

    const systemPrompt = buildSystemPrompt(store, ["SmartFileScan"]);

    // Each fragment must appear verbatim somewhere in the generated prompt.
    const expectedFragments = [
      "## SmartFileScan",
      "Display name: Smart Source",
      "Description: Auto-detects files and folders.",
    ];
    for (const fragment of expectedFragments) {
      expect(systemPrompt).toContain(fragment);
    }
  });
});
2 changes: 2 additions & 0 deletions agent-service/src/agent/prompts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,12 @@ function buildAllowedOperatorSchemas(
for (const operatorType of operatorTypes) {
const compactSchema = metadataStore.getCompactSchema(operatorType);
const description = metadataStore.getDescription(operatorType);
const displayName = metadataStore.getAdditionalMetadata(operatorType)?.userFriendlyName;

if (compactSchema) {
schemas.push(
`## ${operatorType}\n` +
(displayName ? `Display name: ${displayName}\n` : "") +
(description ? `Description: ${description}\n` : "") +
`Schema:\n\`\`\`json\n${JSON.stringify(compactSchema, null, 2)}\n\`\`\``
);
Expand Down
27 changes: 27 additions & 0 deletions agent-service/src/types/agent.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { describe, expect, test } from "bun:test";
import { DEFAULT_AGENT_SETTINGS } from "./agent";

// Guards the default agent configuration: the "SmartFileScan" operator type must be
// part of the operators allowed by default.
describe("DEFAULT_AGENT_SETTINGS", () => {
  test("allows the smart source operator by default", () => {
    const { allowedOperatorTypes } = DEFAULT_AGENT_SETTINGS;

    expect(allowedOperatorTypes).toContain("SmartFileScan");
  });
});
1 change: 1 addition & 0 deletions agent-service/src/types/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export const DEFAULT_AGENT_SETTINGS: Omit<AgentSettings, "systemPrompt"> = {
executionTimeoutMs: 240000,
maxSteps: 100,
allowedOperatorTypes: [
"SmartFileScan",
"CSVFileScan",
"Filter",
"Projection",
Expand Down
4 changes: 4 additions & 0 deletions amber/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ libraryDependencies ++= Seq(
// For ScalaPB 0.11.x:
libraryDependencies += "com.thesamet.scalapb" %% "scalapb-json4s" % "0.12.0"

// Used by LLMSourceResource to extract text from PDF samples before prompting the LLM.
// Without this the LLM only sees raw PDF bytes and produces generic catch-all schemas.
libraryDependencies += "org.apache.pdfbox" % "pdfbox" % "3.0.3"

// enable protobuf compilation in Test
Test / PB.protoSources += PB.externalSourcePath.value

Expand Down
9 changes: 9 additions & 0 deletions amber/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,12 @@ SQLAlchemy==2.0.37
pg8000==1.31.5
pympler==1.1
boto3==1.40.53
# Libraries the LLM File Source operator's generated parsers commonly reach for.
# Without these the worker fails with "No module named 'pdfplumber'" the moment
# the LLM emits PDF / HTML / Excel-handling code.
pdfplumber==0.11.9
pypdf==6.11.0
openpyxl==3.1.5
lxml==5.3.0
beautifulsoup4==4.13.4
chardet==5.2.0
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ import org.apache.texera.amber.engine.common.ambermessage._
import org.apache.texera.amber.engine.common.{CheckpointState, Utils}
import org.apache.texera.amber.config.PythonUtils

import java.nio.file.Path
import java.io.{FileOutputStream, PrintStream}
import java.nio.file.{Files, Path, Paths}
import java.util.concurrent.{ExecutorService, Executors}
import scala.sys.process.{BasicIO, Process}
import scala.sys.process.{Process, ProcessLogger}

object PythonWorkflowWorker {
def props(workerConfig: WorkerConfig): Props = Props(new PythonWorkflowWorker(workerConfig))
Expand Down Expand Up @@ -171,6 +172,16 @@ class PythonWorkflowWorker(
// Set the Iceberg related arguments based on the catalog type.
val isPostgres = StorageConfig.icebergCatalogType == "postgres"
val isRest = StorageConfig.icebergCatalogType == "rest"
// Redirect the Python subprocess's stdout/stderr to a per-worker log file so that Python-side
// exceptions (e.g., from user UDFs or generated LLM parsers) can be inspected after the fact.
// Previously this output went to the JVM's own stdout/stderr, which deploy-daemon.sh redirects
// to /dev/null, so a Python crash left no trace on disk.
val workerLogPath = pythonWorkerLogPath(workerConfig.workerId.name)
val workerLog = new PrintStream(new FileOutputStream(workerLogPath.toFile, true), true)
val logger = ProcessLogger(
line => workerLog.println(line),
line => workerLog.println(line)
)
pythonServerProcess = Process(
Seq(
PythonUtils.getPythonExecutable,
Expand All @@ -194,7 +205,16 @@ class PythonWorkflowWorker(
StorageConfig.s3Username,
StorageConfig.s3Password
)
).run(BasicIO.standard(false))
).run(logger)
}

/**
  * Choose a stable on-disk path for this worker's stdout/stderr capture.
  *
  * Creates the `logs/python-workers` directory (a relative path, so it is resolved against
  * the JVM's working directory) if it does not exist yet, and returns the path of
  * `<sanitized workerId>.log` inside it. The log file itself is not created here.
  *
  * @param workerId worker name; may contain path separators or other characters that are
  *                 not valid inside a single file name
  * @return the log file path for this worker, always a direct child of the logs directory
  */
private def pythonWorkerLogPath(workerId: String): Path = {
  val logsDir = Paths.get("logs", "python-workers")
  Files.createDirectories(logsDir)
  // Worker IDs contain '/' and ':' separators that would otherwise create subdirectories or
  // be rejected by the filesystem. Replace every path separator, every character reserved in
  // file names on common filesystems, and every control character, so that any ID maps to a
  // single flat file name. IDs that only contained '/' and ':' map to the same name as before.
  val safe = workerId.replaceAll("""[\\/:*?"<>|\p{Cntrl}]""", "_")
  logsDir.resolve(s"$safe.log")
}

override def loadFromCheckpoint(chkpt: CheckpointState): Unit = ???
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ class TexeraWebApplication
environment.servlets.setSessionHandler(new SessionHandler)

environment.jersey.register(classOf[SystemMetadataResource])
environment.jersey.register(classOf[SmartFileInferenceResource])
environment.jersey.register(classOf[LLMSourceResource])
// environment.jersey().register(classOf[MockKillWorkerResource])

environment.jersey.register(classOf[HealthCheckResource])
Expand Down
Loading
Loading