diff --git a/.dockerignore b/.dockerignore index 0a608d39bbd..4e66adc6b91 100644 --- a/.dockerignore +++ b/.dockerignore @@ -119,6 +119,9 @@ metals.sbt # Ignore Angular build output **/dist/ +# ...but the webserver FE overlay (bin/k8s/utils/webserver-fe-overlay.dockerfile) +# needs the frontend bundle as build input. +!frontend/dist **/.angular/cache/ **/.nx/cache/ diff --git a/agent-service/src/agent/prompts.ts b/agent-service/src/agent/prompts.ts index 064eed2e3e5..16f324edc65 100644 --- a/agent-service/src/agent/prompts.ts +++ b/agent-service/src/agent/prompts.ts @@ -22,6 +22,293 @@ import { WorkflowSystemMetadata } from "./util/workflow-system-metadata"; const PYTHON_UDF_OPERATOR_TYPES = ["PythonUDFV2"]; const R_UDF_OPERATOR_TYPES = ["RUDF"]; +const MACHINE_TOOLS_INSTRUCTIONS = `## Machine & dataset tools (\`runOnMachine\`, \`runPythonOnMachine\`, \`listDatasets\`, \`getDatasetFile\`, \`uploadFileToDataset\`) and the \`MachineUDF\` operator + +When the user wants Texera to interact with files **on their own host** (read a local file, write output files on their laptop, run a local command), use these tools and the \`MachineUDF\` operator instead of Python UDF on a computing unit. + +### ALWAYS build a Texera workflow, never a bare Python script + +The whole point of this product is to showcase Texera as a workflow engine. EVERY user task that involves their data goes through a workflow — operators in the canvas connected by edges — not through one-shot Python. + +\`runPythonOnMachine\` exists ONLY for quick **diagnostics** (e.g. "does sklearn import on this machine?", "what version of pandas is installed?"). Never use it to satisfy a data-analysis request. Build the workflow instead. + +For analysis on a local file: +1. \`runOnMachine\` (cheap) to verify the file exists and to capture its header line. +2. \`listDatasets\` (once) → \`uploadFileToDataset\` to push the file into a Texera dataset. +3. \`addOperator\` \`CSVFileScan\` with \`fileName\` = the canonical path from step 2. +4. \`addOperator\` \`MachineUDF\` (in **batch mode** for whole-table work like training models, plotting, reporting) and wire it to the scan. +5. Run the workflow. Report the metrics rows it emits. + +### Tool-call plan for a typical "use my machine" request +Follow this order — do NOT loop on a single tool. After each tool result, **move to the next step**. + +1. **\`runOnMachine\`** — verify the input file exists and inspect it. + - Example command: \`test -f /home/ali/Downloads/foo.csv && head -3 /home/ali/Downloads/foo.csv && wc -l /home/ali/Downloads/foo.csv\`. + - If \`exit_code == 0\`, the file exists and you have enough to proceed. **Do not call this tool again for the same path.** +2. **\`listDatasets\`** — call this **once** to resolve a dataset name (e.g. "test-4") to its numeric \`did\`. Cache the result in memory; do not re-list on subsequent turns. +3. **\`uploadFileToDataset\`** — push the local file into the dataset, creating a new dataset version. Args: \`{ machineId, localPath, datasetName, datasetFilePath }\`. Pass the dataset's *name* (e.g. \`"test-4"\`) as \`datasetName\` — the tool resolves it to a \`did\` internally. (You may pass \`datasetId\` if you already know it, but **never guess** a number from the name.) \`datasetFilePath\` is the path *inside the dataset* (usually just the file name). +4. **Build the workflow** — typically \`CSVFileScan\` (or \`TableFileScan\`) reading the dataset file, then a \`MachineUDF\` to do the per-tuple work on the user's machine. 
Make sure to call \`runOnMachine\` to \`mkdir -p\` any output directory the workflow needs **before** running it. + +### \`MachineUDF\` operator — two modes + +\`MachineUDF\` is Python-only and runs the script on a *registered machine* (Machines tab) via that host's machine-manager — not on a computing unit. The host has sklearn, pandas, matplotlib, numpy available, so the script can do real ML/IO work and save artifacts back to the user's laptop. + +It has two modes, picked via the \`batchMode\` property: + +**Per-tuple mode (\`batchMode: false\`, default).** The script runs once per input row. \`tuple_in\` is a single dict. The last JSON line on stdout becomes the output tuple. Use this for row-by-row side effects (e.g. "write each row to its own JSONL file"). + +**Batch mode (\`batchMode: true\`).** The script runs ONCE after upstream finishes. \`tuple_in\` is a list of dicts (all rows). The script can \`print(json.dumps({...}))\` multiple JSON object lines on stdout and each becomes an output row, projected onto the declared \`outputColumns\`. Use this for whole-table analyses: training models, building plots, writing reports. **This is the right mode for the regression / "train N models and save plots" demo.** + +Required properties (both modes): +- \`machineUrl\`: full URL of the target machine-manager, e.g. \`http://localhost:5555\`. Read this from \`runOnMachine\`'s result \`machine.url\` field. +- \`machineToken\`: empty unless the user set one. +- \`code\`: the Python script (see mode-specific notes above). +- \`timeoutSeconds\`: total seconds the script may run. For batch ML scripts use ~300. +- \`outputColumns\`: the columns the script emits. Mandatory in batch mode. +- \`retainInputColumns\`: only relevant in per-tuple mode; ignored in batch mode. 
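+
+For concreteness, a minimal batch-mode property payload might look like this sketch (illustrative values only — \`machineUrl\` must be copied from \`runOnMachine\`'s result, and \`code\` holds the batch script):
+\`\`\`python
+# Hypothetical addOperator properties for a batch-mode MachineUDF (values are examples).
+machine_udf_properties = {
+    "machineUrl": "http://localhost:5555",  # copy verbatim from machine.url
+    "machineToken": "",                     # empty unless the user configured one
+    "batchMode": True,                      # whole-table work runs once, not per row
+    "timeoutSeconds": 300,
+    "retainInputColumns": False,            # ignored in batch mode anyway
+    "outputColumns": [{"name": "model", "type": "STRING"}, {"name": "r2", "type": "DOUBLE"}],
+    "code": "...",                          # the batch script (see the examples below)
+}
+\`\`\`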
+ +Per-tuple example — "write each row as a JSONL file": +\`\`\`python +import json, os +out_dir = "/home/ali/Downloads/tmp" +os.makedirs(out_dir, exist_ok=True) +fname = f"row-{tuple_in.get('id', 'unknown')}.jsonl" +path = os.path.join(out_dir, fname) +with open(path, "w") as f: + f.write(json.dumps(tuple_in) + "\\n") +print(json.dumps({**tuple_in, "written_to": path})) +\`\`\` + +Batch example — "train 3 regression models, save plots + report to a local folder, emit a metrics row per model": +\`\`\`python +import json, traceback +try: + import pandas as pd + import matplotlib + matplotlib.use("Agg") + import matplotlib.pyplot as plt + from sklearn.linear_model import LinearRegression, Ridge + from sklearn.svm import SVR + from sklearn.model_selection import train_test_split + from sklearn.metrics import r2_score, mean_squared_error + from pathlib import Path + + out_dir = Path("/home/ali/UCI/hackathon") + out_dir.mkdir(parents=True, exist_ok=True) + df = pd.DataFrame(tuple_in) # tuple_in is a list of row dicts + y = df["target"] + X = df.drop(columns=["target"]) + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) + + models = { + "LinearRegression": LinearRegression(), + "Ridge": Ridge(alpha=1.0), + "SVR": SVR(kernel="rbf"), + } + rows = [] + for name, model in models.items(): + model.fit(X_train, y_train) + y_pred = model.predict(X_test) + r2 = float(r2_score(y_test, y_pred)) + mse = float(mean_squared_error(y_test, y_pred)) + plot_path = out_dir / f"{name}_prediction.png" + plt.figure(figsize=(6, 6)) + plt.scatter(y_test, y_pred, alpha=0.6) + lo, hi = float(y.min()), float(y.max()) + plt.plot([lo, hi], [lo, hi], "r--") + plt.xlabel("Actual"); plt.ylabel("Predicted") + plt.title(f"{name}: R²={r2:.3f}, MSE={mse:.2f}") + plt.tight_layout(); plt.savefig(plot_path, dpi=120); plt.close() + rows.append({"model": name, "r2": r2, "mse": mse, "plot": str(plot_path)}) + + report = out_dir / "report.md" + with report.open("w") as f: + f.write("# Regression report\\n\\n") + f.write(f"Rows: {len(df)}, features: {X.shape[1]}\\n\\n") + f.write("| Model | R² | MSE | Plot |\\n|---|---|---|---|\\n") + for r in rows: + f.write(f"| {r['model']} | {r['r2']:.4f} | {r['mse']:.2f} | \`{r['plot']}\` |\\n") + + # Emit one JSON object per output row. Each becomes a tuple, projected onto outputColumns. + for r in rows: + print(json.dumps(r)) +except Exception as e: + print(json.dumps({"model": "ERROR", "r2": None, "mse": None, "plot": str(e)[:300]})) + print(traceback.format_exc(), flush=True) +\`\`\` +With \`outputColumns: [{"name":"model","type":"STRING"}, {"name":"r2","type":"DOUBLE"}, {"name":"mse","type":"DOUBLE"}, {"name":"plot","type":"STRING"}]\` and \`retainInputColumns: false\`. The result table on workflow completion has one row per model. + +### When NOT to use these tools +- The file is already inside a Texera dataset → just use \`CSVFileScan\` / \`TableFileScan\` directly. No \`runOnMachine\`, no \`uploadFileToDataset\`. +- The Python logic doesn't touch the user's machine → use the regular \`PythonUDFV2\` operator on a computing unit, not \`MachineUDF\`. + +### Hard rules (these prevent agent loops — follow them strictly) + +1. **NEVER guess a \`datasetId\` from the dataset name.** The number in a name like \`test-4\` is **not** the \`did\`. The \`did\` is whatever integer \`listDatasets\` returns for that name. 
Skipping \`listDatasets\` and passing the wrong \`did\` causes a 403 Forbidden — at which point you must call \`listDatasets\`, not retry the upload with another guessed number.
+2. **One tool call per distinct purpose.** After a tool succeeds, never call the same tool with the same arguments again. After it fails, fix the args or pick a different tool — do not retry identically.
+3. **Plan first, then execute the plan in order.** Before the first tool call, write a numbered plan in your thought. Then check each step off as the tool results come back. Do not re-plan from scratch every turn.
+4. **Use prior tool results.** If \`listDatasets\` already returned \`[{"did":2,"name":"test-4",...}]\` in this conversation, you have the \`did\` (2). Do not call \`listDatasets\` again, and pass \`datasetId: 2\` (not 4).
+5. **If a tool result already proves a precondition, do not re-verify.** Example: \`runOnMachine\` returned \`exit_code: 0\` for \`test -f /path/to.csv\` → the file exists, move on. Do **not** run another \`ls\` / \`stat\` / \`cat\` on the same path.
+6. **If two consecutive turns produce no progress, switch strategy or stop and ask the user.** Don't burn steps repeating yourself.
+7. **NEVER fabricate the \`machineUrl\` for \`MachineUDF\`.** The only valid source is the \`machine.url\` field in the result of a \`runOnMachine\` call (or the Machines tab row). Do **not** invent \`http://ali:5555\`, \`http://<machine-name>:5555\`, or any other URL guessed from the machine's display name (\`ali\`). If the value returned was \`http://localhost:5555\`, the correct \`machineUrl\` is \`http://localhost:5555\` — full stop. A 5xx / connection error from \`MachineUDF\` at runtime is a **script bug** or an **upstream data shape problem**, not a URL problem; investigate the script's stderr/exit_code before touching \`machineUrl\`.
+8. **Workflow already succeeded? Don't re-run it.** If a workflow execution returned \`status: COMPLETED\` with result rows, you are done. Report the result to the user and stop. Do NOT modify the operators, change properties, or re-execute "to be sure".
+9. **Python script strings: ALWAYS use triple-quoted strings for multi-line content.** The rule applies EQUALLY to plain strings AND f-strings — \`f"..."\` has the same line-ending restriction as \`"..."\`. Never embed a raw newline inside any of \`'...'\`, \`"..."\`, \`f'...'\`, \`f"..."\` — that's a \`SyntaxError: unterminated f-string literal\` / \`unterminated string literal\`, and the whole script fails to even load.
+   - **WRONG** (this is the recurring bug that keeps killing the regression demo):
+     \`\`\`python
+     report_text += f"| {r['model']} | {r['r2']:.4f} | \`{r['plot']}\` |
+     "
+     \`\`\`
+   - **RIGHT** — pick one of:
+     \`\`\`python
+     # (a) escape the newline inside the f-string
+     report_text += f"| {r['model']} | {r['r2']:.4f} | \`{r['plot']}\` |\\n"
+     # (b) build the line and append the newline separately
+     report_text += f"| {r['model']} | {r['r2']:.4f} | \`{r['plot']}\` |" + "\\n"
+     # (c) use a triple-quoted f-string (raw newlines are OK inside f""" ... """)
+     report_text += f"""| {r['model']} | {r['r2']:.4f} | \`{r['plot']}\` |
+     """
+     \`\`\`
+   When in doubt, write the whole multi-line block once with \`Path.write_text(f"""...lines...""")\` — that pattern never breaks.
+10. **\`batchMode: true\` is MANDATORY for any MachineUDF that does whole-table work.** This includes: training ML models, computing aggregate metrics, generating plots from the full dataset, writing summary reports. \`batchMode\` defaults to \`false\` (per-tuple) — if you forget to set it, the script runs ONCE PER ROW with \`tuple_in\` as a single dict, your \`pd.DataFrame(tuple_in)\` silently produces a malformed frame, \`train_test_split\` fails, and the workflow either stalls or emits nonsense. Sanity check: if your script does \`pd.DataFrame(tuple_in)\`, \`train_test_split\`, \`model.fit\`, or any aggregate/global compute → \`batchMode\` MUST be \`true\`. Only set \`batchMode: false\` when the script genuinely processes one row at a time (e.g. "write each row as a JSONL file"). A defensive normalization sketch follows this list.
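+
+As a safety net for rule 10, a script can normalize \`tuple_in\` before touching it. This is a minimal sketch, not a substitute for setting \`batchMode\` correctly — with \`batchMode: false\` the operator still runs once per row:
+\`\`\`python
+import pandas as pd
+
+# tuple_in is a single dict in per-tuple mode, a list of dicts in batch mode.
+rows = tuple_in if isinstance(tuple_in, list) else [tuple_in]
+df = pd.DataFrame(rows)  # well-formed either way; per-tuple yields a 1-row frame
+\`\`\`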
+
+### How to reference any dataset file in \`CSVFileScan\`/\`TableFileScan\`
+
+The scan operator's \`fileName\` property must be the **canonical dataset path** in this exact form:
+
+\`\`\`
+/<ownerEmail>/<datasetName>/latest/<fileRelativePath>
+\`\`\`
+
+- The leading slash is required.
+- The literal segment **\`latest\`** auto-resolves to the dataset's newest version. Use it instead of guessing \`v1\`/\`v2\`/\`v3\` — your guess will be wrong as soon as someone uploads again.
+- \`<fileRelativePath>\` is the path *inside* the dataset (just the filename for files at the root).
+
+**Two tools give you this string verbatim — pick one and copy the result:**
+
+1. \`uploadFileToDataset\` — after a successful upload, the result has a \`fileName_for_scan_operator\` field with the exact canonical path. Use this when you just uploaded the file.
+2. \`getDatasetFile({ datasetName, filename })\` — looks up an *existing* dataset file and returns its \`fileName_for_scan_operator\`. Use this when the file is already in the dataset and you didn't upload it this turn. If you don't know the filename, call with just \`datasetName\` to list the files in the latest version.
+
+**Common wrong moves to avoid:**
+- Using an absolute filesystem path like \`/home/ali/Downloads/tmp/customers-test.csv\` — that's the user's laptop, not the Texera dataset.
+- Using just the bare filename like \`customers-test.csv\` or \`test-4/customers-test.csv\`.
+- Hard-coding a specific version (\`v1\`, \`v2\`, ...) — always use \`latest\` so the path keeps working after the next upload.
+- **NEVER substitute your own guess for the \`ownerEmail\` segment.** It may look unusual (\`texera@texera.local\`, \`alice\`, \`user-42@internal\`) — that does NOT mean it's wrong. Always copy the email **byte-for-byte** from \`fileName_for_scan_operator\` in the tool result. Specifically:
+  - Do NOT replace it with the OS username (\`ali\`, \`alice\`, ...) just because the local user's name is in the conversation.
+  - Do NOT replace it with \`ali@localhost\`, \`@localhost\`, \`@\`, or any other "looks like an email" pattern.
+  - If \`fileName_for_scan_operator\` is the string \`/texera@texera.local/hackathon/latest/diabetes.csv\`, then \`fileName\` in CSVFileScan must be EXACTLY \`/texera@texera.local/hackathon/latest/diabetes.csv\`. Not \`/ali@localhost/hackathon/...\`. Not \`/texera/hackathon/...\`. The string returned by the tool is the truth.
+
+If a scan operator already exists with a wrong \`fileName\`, call \`modifyOperator\` and set \`fileName\` to the canonical path from \`uploadFileToDataset\` / \`getDatasetFile\`. Do not invent a path of your own.
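+
+To make the composition rule concrete, this sketch shows how the canonical path lines up with the tool output (values are illustrative — in practice always copy \`fileName_for_scan_operator\` verbatim instead of assembling it yourself):
+\`\`\`python
+owner_email = "texera@texera.local"   # from the tool result, byte-for-byte
+dataset_name = "hackathon"
+file_rel_path = "diabetes.csv"        # path inside the dataset
+file_name = f"/{owner_email}/{dataset_name}/latest/{file_rel_path}"
+# -> "/texera@texera.local/hackathon/latest/diabetes.csv"
+\`\`\`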
+
+### Worked example — full demo end-to-end
+
+User request: *"use machine 1 to read /home/me/data.csv, upload it to dataset 'sales', then create a workflow that for every row writes /home/me/out/row-{id}.jsonl on my machine."*
+
+Plan:
+1. \`runOnMachine({ machineId: 1, command: "test -f /home/me/data.csv && head -3 /home/me/data.csv && wc -l /home/me/data.csv" })\` — verify the file + capture column names.
+2. \`listDatasets()\` — find the \`did\` for \`sales\`. Suppose the result includes \`{"did": 7, "name": "sales"}\`.
+3. \`uploadFileToDataset({ machineId: 1, localPath: "/home/me/data.csv", datasetId: 7, datasetFilePath: "data.csv" })\` — pushes the file as a new dataset version.
+4. \`runOnMachine({ machineId: 1, command: "mkdir -p /home/me/out" })\` — make sure the output directory exists on the user's machine.
+5. \`addOperator\` \`CSVFileScan\` and set its \`fileName\` to the \`fileName_for_scan_operator\` value returned by \`uploadFileToDataset\` (do not retype or guess the path).
+6. \`addOperator\` \`MachineUDF\` connected to the scan: properties \`{ "machineUrl": "http://localhost:5555", "code": "<the per-tuple JSONL script shown above>", "retainInputColumns": true, "outputColumns": [{ "name": "written_to", "type": "STRING" }] }\` — with \`machineUrl\` copied verbatim from step 1's \`machine.url\`.
+7. Done — respond to the user with the dataset version uploaded, the workflow built, and what they should click to run it.
+
+That is **7 tool calls maximum** for this kind of request, not 30.
+
+### Worked example — local ML / regression (showcase Texera workflow with Sklearn operators)
+
+User request: *"read /home/ali/UCI/hackathon/diabetes.csv on machine 'ali', train 3 regression models predicting 'target', save a prediction-vs-actual plot per model to /home/ali/UCI/hackathon/, and write a markdown report there."*
+
+**Use Texera's Sklearn operators for the actual ML work** — that's the whole point. Don't bury the training inside a single fat \`MachineUDF\` script when there are first-class operators for it. \`MachineUDF\` (batch mode) is only used at the END to write artifacts (plots, report) to the user's laptop disk.
+
+The pipeline (11 operators, all visible in the canvas — three parallel ML branches that each end in their own per-model MachineUDF writer):
+
+\`\`\`
+CSVFileScan ── Split ─┬─► SklearnTrainingLinearRegression ─► SklearnPrediction (out: prediction) ─► MachineUDF (LR — plot)
+                      ├─► SklearnTrainingRidge ─► SklearnPrediction (out: prediction) ─► MachineUDF (Ridge — plot)
+                      └─► SVRTrainer ─► SklearnPrediction (out: prediction) ─► MachineUDF (SVR — plot + report.md)
+\`\`\`
+
+The train port (Split:0) fans out to all three trainers; the test port (Split:1) fans out to all three SklearnPrediction operators.
+
+Plan (12 steps, roughly 15 tool calls — each \`addOperator\` counts as one, and steps 9–10 add three operators each):
+1. \`runOnMachine({ machineId: 1, command: "test -f /home/ali/UCI/hackathon/diabetes.csv && head -1 /home/ali/UCI/hackathon/diabetes.csv" })\` — verify file + columns. **Capture \`machine.url\` from the response** — that exact string is your \`machineUrl\` later; do not modify it.
+2. \`listDatasets()\` → find the \`did\` for "hackathon".
+3. \`uploadFileToDataset({ machineId: 1, localPath: "/home/ali/UCI/hackathon/diabetes.csv", datasetName: "hackathon", datasetFilePath: "diabetes.csv" })\` — note \`fileName_for_scan_operator\` from the response (copy verbatim).
+4. \`addOperator\` \`CSVFileScan\` with \`fileName\` = the canonical path from step 3.
+5. \`addOperator\` \`Split\` with \`{ "k": 80, "random": true, "seed": 42 }\`. Split has two output ports: port 0 = train, port 1 = test.
+6. \`addOperator\` \`SklearnTrainingLinearRegression\` with \`{ "target": "target", "countVectorizer": false, "tfidfTransformer": false }\`. Wire Split:0 → its \`training\` input.
+7. \`addOperator\` \`SklearnTrainingRidge\` with the same property shape. 
Wire Split:0 → its \`training\` input. +8. \`addOperator\` \`SVRTrainer\` with \`{ "groundTruthAttribute": "target", "Selected Features": [all feature columns], "paraList": [{"kernel":"rbf","C":1.0,"epsilon":0.1}] }\` (one parameter set). Wire Split:0 → its \`training\` input. +9. \`addOperator\` three \`SklearnPrediction\` instances, each wired \`model\` ← one trainer's output and the data port ← Split:1 (test set). Set \`Output Attribute Name\` to **\`prediction\`** for ALL THREE (so each predictor's downstream schema is identical: original test columns + \`prediction\`) and \`Ground Truth Attribute Name to Ignore\` = \`target\`. +10. \`addOperator\` THREE \`MachineUDF\` operators — one per branch. (MachineUDF has only ONE input port; do NOT try to wire three SklearnPrediction outputs into a single MachineUDF.) For each: + - \`batchMode: true\` + - \`machineUrl\`: **EXACTLY** the value of \`machine.url\` from step 1 (typically \`http://localhost:5555\` — do not invent a hostname) + - \`code\`: the per-branch script (template below), with \`MODEL_NAME\` set to \`LinearRegression\` / \`Ridge\` / \`SVR\`. Pick ONE branch (e.g. the SVR one) to additionally write \`report.md\` by setting \`WRITE_REPORT = True\`. + - \`outputColumns\`: \`model: STRING, r2: DOUBLE, mse: DOUBLE, plot: STRING\` + - \`retainInputColumns: false\`, \`timeoutSeconds: 300\` + Wire each SklearnPrediction output → its own MachineUDF input. +11. Run the workflow. Three MachineUDFs each emit one row (total 3 result rows across the three output streams). +12. Report metrics + artifact paths to the user. Done. + +**Per-branch MachineUDF script template** — each branch sets its own \`MODEL_NAME\` and only ONE branch sets \`WRITE_REPORT = True\`: +\`\`\`python +import json, traceback +MODEL_NAME = "LinearRegression" # change per branch: LinearRegression / Ridge / SVR +WRITE_REPORT = False # set True on exactly ONE branch (typically the last one wired) +try: + import pandas as pd + import matplotlib + matplotlib.use("Agg") + import matplotlib.pyplot as plt + from sklearn.metrics import r2_score, mean_squared_error + from pathlib import Path + + out_dir = Path("/home/ali/UCI/hackathon") + out_dir.mkdir(parents=True, exist_ok=True) + df = pd.DataFrame(tuple_in) + y_true = df["target"] + y_pred = df["prediction"] + r2 = float(r2_score(y_true, y_pred)) + mse = float(mean_squared_error(y_true, y_pred)) + plot = out_dir / f"{MODEL_NAME}_prediction.png" + plt.figure(figsize=(6, 6)) + plt.scatter(y_true, y_pred, alpha=0.6) + lo, hi = float(y_true.min()), float(y_true.max()) + plt.plot([lo, hi], [lo, hi], "r--") + plt.xlabel("Actual"); plt.ylabel("Predicted") + plt.title(f"{MODEL_NAME}: R²={r2:.3f}, MSE={mse:.2f}") + plt.tight_layout(); plt.savefig(plot, dpi=120); plt.close() + + row = {"model": MODEL_NAME, "r2": r2, "mse": mse, "plot": str(plot)} + + if WRITE_REPORT: + # Index page that points to all three plots (paths follow MODEL_NAME convention). + # Use a TRIPLE-QUOTED string so multi-line content cannot accidentally introduce + # a SyntaxError. Do NOT embed raw newlines inside single- or double-quoted strings. + report = out_dir / "report.md" + rows_md = "\\n".join( + f"| {name} | \`{out_dir / f'{name}_prediction.png'}\` |" + for name in ("LinearRegression", "Ridge", "SVR") + ) + report.write_text(f"""# Regression report + +Generated by Texera workflow (CSVFileScan → Split → 3× Sklearn trainer → 3× SklearnPrediction → 3× MachineUDF). 
+ +| Model | Plot | +|---|---| +{rows_md} +""") + row["report"] = str(report) + + print(json.dumps(row)) +except Exception as e: + print(json.dumps({"model": MODEL_NAME, "r2": None, "mse": None, "plot": str(e)[:300]})) + print(traceback.format_exc(), flush=True) +\`\`\` + +Why three branches with three MachineUDFs and not "one fat MachineUDF doing everything": the user wants to **see** the workflow showcase Texera. A canvas with \`CSVFileScan → Split → 3 trainers → 3 predictors → 3 reporters\` demonstrates the platform; wrapping all of that into one Python blob hides it. Each MachineUDF in this design is a tiny per-model writer, not a giant ML pipeline. +`; + const PYTHON_UDF_INSTRUCTIONS = `## Python UDF Guide Python UDF operators run user-defined Python code. There are 2 APIs to process data: @@ -91,7 +378,21 @@ class ProcessTableOperator(UDFTableOperator): - Keep each UDF focused on one task. - Only change the python code property, not other properties. - If adding extra columns, specify them in the Extra Output Columns property. -- Prefer native operators over Python UDF when possible.`; +- Prefer native operators over Python UDF when possible. +- **NEVER embed a raw newline inside a single- or double-quoted string** (\`'...'\` or \`"..."\`) — that is a \`SyntaxError: unterminated string literal\` and the entire UDF fails to load. For multi-line text (markdown blocks, multi-line error messages, file content) use triple-quoted strings (\`"""..."""\`) or build with explicit \`"\\n"\` escapes. Example of the **wrong** pattern that keeps breaking PythonUDF + MachineUDF runs: + \`\`\`python + # WRONG — raw newline inside single quotes + msg = 'line one + line two' + \`\`\` + Fix: + \`\`\`python + # RIGHT — triple-quoted, raw newlines allowed + msg = """line one + line two""" + # or equivalently: + msg = "line one\\nline two" + \`\`\``; const R_UDF_INSTRUCTIONS = `## R UDF Guide @@ -290,6 +591,7 @@ export function buildSystemPrompt(metadataStore: WorkflowSystemMetadata, allowed const extraSections: string[] = []; if (pythonAllowed) extraSections.push(PYTHON_UDF_INSTRUCTIONS); if (rAllowed) extraSections.push(R_UDF_INSTRUCTIONS); + extraSections.push(MACHINE_TOOLS_INSTRUCTIONS); const base = SYSTEM_PROMPT_TEMPLATE.replace("{{OPERATOR_SCHEMA}}", operatorSchemas); return extraSections.length > 0 ? `${base}\n${extraSections.join("\n\n")}\n` : base; diff --git a/agent-service/src/agent/texera-agent.ts b/agent-service/src/agent/texera-agent.ts index 37eb12d8688..b2374abb6c9 100644 --- a/agent-service/src/agent/texera-agent.ts +++ b/agent-service/src/agent/texera-agent.ts @@ -47,6 +47,18 @@ import { TOOL_NAME_EXECUTE_OPERATOR, type ExecutionConfig, } from "./tools/workflow-execution-tools"; +import { + createRunOnMachineTool, + createRunPythonOnMachineTool, + createListDatasetsTool, + createUploadFileToDatasetTool, + createGetDatasetFileTool, + TOOL_NAME_RUN_ON_MACHINE, + TOOL_NAME_RUN_PYTHON_ON_MACHINE, + TOOL_NAME_LIST_DATASETS, + TOOL_NAME_UPLOAD_FILE_TO_DATASET, + TOOL_NAME_GET_DATASET_FILE, +} from "./tools/machine-tools"; import { assembleContext } from "./util/context-utils"; import { compileWorkflowAsync, type WorkflowCompilationResponse } from "../api/compile-api"; import { createLogger } from "../logger"; @@ -228,6 +240,19 @@ export class TexeraAgent { ); } + if (this.delegateConfig?.userToken) { + const userToken = this.delegateConfig.userToken; + tools[TOOL_NAME_RUN_ON_MACHINE] = createRunOnMachineTool(userToken); + // `runPythonOnMachine` is intentionally NOT registered. 
The product showcases + // Texera as a workflow engine — every data-analysis path must end in a + // workflow on the canvas, not a side-channel script. Keep the helper in + // machine-tools.ts as dead code for now in case we need it for diagnostics + // later, but do not expose it to the LLM. + tools[TOOL_NAME_LIST_DATASETS] = createListDatasetsTool(userToken); + tools[TOOL_NAME_UPLOAD_FILE_TO_DATASET] = createUploadFileToDatasetTool(userToken); + tools[TOOL_NAME_GET_DATASET_FILE] = createGetDatasetFileTool(userToken); + } + return tools; } diff --git a/agent-service/src/agent/tools/index.ts b/agent-service/src/agent/tools/index.ts index 7e2d9570703..4fd5edfec7c 100644 --- a/agent-service/src/agent/tools/index.ts +++ b/agent-service/src/agent/tools/index.ts @@ -20,3 +20,4 @@ export * from "./tools-utility"; export * from "./workflow-crud-tools"; export * from "./workflow-execution-tools"; +export * from "./machine-tools"; diff --git a/agent-service/src/agent/tools/machine-tools.ts b/agent-service/src/agent/tools/machine-tools.ts new file mode 100644 index 00000000000..1c301751cb1 --- /dev/null +++ b/agent-service/src/agent/tools/machine-tools.ts @@ -0,0 +1,511 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+import { z } from "zod";
+import { tool } from "ai";
+import { env } from "../../config/env";
+
+export const TOOL_NAME_RUN_ON_MACHINE = "runOnMachine";
+export const TOOL_NAME_RUN_PYTHON_ON_MACHINE = "runPythonOnMachine";
+export const TOOL_NAME_LIST_DATASETS = "listDatasets";
+export const TOOL_NAME_UPLOAD_FILE_TO_DATASET = "uploadFileToDataset";
+export const TOOL_NAME_GET_DATASET_FILE = "getDatasetFile";
+
+interface MachineRecord {
+  mid: number;
+  name: string;
+  url: string;
+  token?: string | null;
+}
+
+async function lookupMachine(userToken: string, mid: number): Promise<MachineRecord> {
+  const res = await fetch(`${env.TEXERA_DASHBOARD_SERVICE_ENDPOINT}/api/machines/${mid}`, {
+    headers: { Authorization: `Bearer ${userToken}` },
+  });
+  if (!res.ok) {
+    throw new Error(`Failed to look up machine ${mid}: HTTP ${res.status} ${await res.text()}`);
+  }
+  return (await res.json()) as MachineRecord;
+}
+
+interface DatasetSummary {
+  did: number;
+  name: string;
+  ownerEmail?: string;
+  isPublic?: boolean;
+}
+
+async function fetchDatasetList(userToken: string): Promise<DatasetSummary[]> {
+  const res = await fetch(`${env.FILE_SERVICE_ENDPOINT}/api/dataset/list`, {
+    headers: { Authorization: `Bearer ${userToken}` },
+  });
+  if (!res.ok) {
+    throw new Error(`Failed to list datasets: HTTP ${res.status} ${await res.text()}`);
+  }
+  const body = (await res.json()) as Array<{
+    dataset: { did: number; name: string; isPublic?: boolean };
+    ownerEmail?: string;
+  }>;
+  return body.map(d => ({
+    did: d.dataset.did,
+    name: d.dataset.name,
+    ownerEmail: d.ownerEmail,
+    isPublic: d.dataset.isPublic,
+  }));
+}
+
+interface DatasetFileNode {
+  name: string;
+  type?: string;
+  parentDir?: string;
+  size?: number;
+  children?: DatasetFileNode[];
+}
+
+/**
+ * Walk file-service's nested file-node tree and yield flat (relativePath, size) entries
+ * for everything of type "file".
+ */
+function flattenFileNodes(
+  node: DatasetFileNode,
+  acc: { relativePath: string; size?: number }[] = []
+): { relativePath: string; size?: number }[] {
+  if (node.type === "file") {
+    // parentDir comes from LakeFS as e.g. "/texera/<ownerEmail>/<datasetName>/<subdirs>";
+    // strip the first three segments so we get just "<subdirs>/<name>" relative to
+    // the dataset version root.
+    const parent = (node.parentDir ?? "").split("/").filter(Boolean).slice(3);
+    const relative = [...parent, node.name].join("/");
+    acc.push({ relativePath: relative, size: node.size });
+  }
+  for (const c of node.children ?? []) flattenFileNodes(c, acc);
+  return acc;
+}
+
+/**
+ * `runOnMachine`: lets the agent execute a shell command on one of the user's
+ * registered machines (Machines tab) via the remote machine-manager service.
+ */
+export function createRunOnMachineTool(userToken: string) {
+  return tool({
+    description:
+      "Run a shell command on one of the user's registered machines (Machines tab) " +
+      "by hitting that host's machine-manager service. Use this to inspect or prepare " +
+      "the target environment (list files, check paths, install deps) before building " +
+      "a workflow that uses the Machine UDF operator on the same host.",
+    inputSchema: z.object({
+      machineId: z
+        .number()
+        .int()
+        .describe("The numeric machine id (`mid`) from the Machines tab."),
+      command: z
+        .string()
+        .describe("The shell command to run on the target machine. Runs via `bash -c`."),
+      cwd: z
+        .string()
+        .optional()
+        .describe("Working directory on the target machine. Defaults to the user's home."),
+      timeoutSeconds: z
+        .number()
+        .min(1)
+        .max(600)
+        .default(60)
+        .describe("How long to wait for the command before timing out."),
+    }),
+    execute: async args => {
+      try {
+        const machine = await lookupMachine(userToken, args.machineId);
+        const headers: Record<string, string> = { "Content-Type": "application/json" };
+        if (machine.token && machine.token.trim().length > 0) {
+          headers["Authorization"] = `Bearer ${machine.token.trim()}`;
+        }
+        const res = await fetch(`${machine.url.replace(/\/$/, "")}/exec`, {
+          method: "POST",
+          headers,
+          body: JSON.stringify({
+            cmd: args.command,
+            cwd: args.cwd ?? null,
+            timeout_seconds: args.timeoutSeconds,
+          }),
+        });
+        const bodyText = await res.text();
+        if (!res.ok) {
+          return {
+            success: false,
+            error: `machine-manager returned HTTP ${res.status}: ${bodyText}`,
+            machine: { mid: machine.mid, name: machine.name, url: machine.url },
+          };
+        }
+        const body = JSON.parse(bodyText) as {
+          exit_code: number;
+          stdout: string;
+          stderr: string;
+        };
+        return {
+          success: body.exit_code === 0,
+          machine: { mid: machine.mid, name: machine.name, url: machine.url },
+          exit_code: body.exit_code,
+          stdout: body.stdout,
+          stderr: body.stderr,
+        };
+      } catch (e) {
+        return {
+          success: false,
+          error: e instanceof Error ? e.message : String(e),
+        };
+      }
+    },
+  });
+}
+
+/**
+ * `runPythonOnMachine`: run a small, self-contained Python diagnostic script on the
+ * user's machine via machine-manager's `/python` endpoint, which executes it under a
+ * data-science Python (sklearn, pandas, matplotlib, numpy, ...).
+ *
+ * This is intentionally scoped to environment diagnostics (e.g. checking that a
+ * package imports, reading a version number). Actual data analysis must go through
+ * a Texera workflow with the MachineUDF operator — see the description below and
+ * the registration comment in texera-agent.ts explaining why this tool is not
+ * exposed to the LLM.
+ *
+ * The script can `print(json.dumps({...}))` on its last line to return a
+ * structured result the agent can then read.
+ */
+export function createRunPythonOnMachineTool(userToken: string) {
+  return tool({
+    description:
+      "DIAGNOSTICS ONLY. Run a tiny Python snippet on the user's machine to check the environment " +
+      "(e.g. `import sklearn; print(sklearn.__version__)`). DO NOT use this to actually do the " +
+      "user's data analysis — that always goes in a Texera workflow with the `MachineUDF` operator " +
+      "in batch mode. The script's last `print(json.dumps({...}))` line is returned as `result`.",
+    inputSchema: z.object({
+      machineId: z
+        .number()
+        .int()
+        .describe("Numeric machine id (`mid`) from the Machines tab."),
+      code: z
+        .string()
+        .describe(
+          "Self-contained Python source. The script's global scope already has `tuple_in` (None " +
+            "for this use). Print a JSON object on the last line to return a structured result."
+        ),
+      timeoutSeconds: z
+        .number()
+        .min(1)
+        .max(600)
+        .default(120)
+        .describe("Seconds before the script is killed. Default 120."),
+    }),
+    execute: async args => {
+      try {
+        const machine = await lookupMachine(userToken, args.machineId);
+        const headers: Record<string, string> = { "Content-Type": "application/json" };
+        if (machine.token && machine.token.trim().length > 0) {
+          headers["Authorization"] = `Bearer ${machine.token.trim()}`;
+        }
+        const res = await fetch(`${machine.url.replace(/\/$/, "")}/python`, {
+          method: "POST",
+          headers,
+          body: JSON.stringify({
+            code: args.code,
+            tuple_in: null,
+            timeout_seconds: args.timeoutSeconds,
+          }),
+        });
+        const bodyText = await res.text();
+        if (!res.ok) {
+          return {
+            success: false,
+            error: `machine-manager returned HTTP ${res.status}: ${bodyText}`,
+            machine: { mid: machine.mid, name: machine.name, url: machine.url },
+          };
+        }
+        const body = JSON.parse(bodyText) as {
+          exit_code: number;
+          stdout: string;
+          stderr: string;
+          result: unknown;
+        };
+        return {
+          success: body.exit_code === 0,
+          machine: { mid: machine.mid, name: machine.name, url: machine.url },
+          exit_code: body.exit_code,
+          stdout: body.stdout,
+          stderr: body.stderr,
+          result: body.result,
+        };
+      } catch (e) {
+        return {
+          success: false,
+          error: e instanceof Error ? e.message : String(e),
+        };
+      }
+    },
+  });
+}
+
+/**
+ * `getDatasetFile`: resolve a (datasetName, filename) pair to the canonical
+ * scan-operator fileName the workflow needs, using whatever version is the latest.
+ * Returns the exact string the agent should put in CSVFileScan.fileName.
+ */
+export function createGetDatasetFileTool(userToken: string) {
+  return tool({
+    description:
+      "Resolve a Texera dataset file to the exact fileName string that CSVFileScan / TableFileScan operators expect. " +
+      "Pass the dataset's human name (e.g. 'test-4') and the file path inside the dataset (e.g. 'customers-test.csv'). " +
+      "Returns the canonical path /<ownerEmail>/<datasetName>/latest/<fileRelativePath> — copy that verbatim into the scan operator's fileName property.",
+    inputSchema: z.object({
+      datasetName: z.string().describe("Dataset name as shown in the Datasets tab, e.g. 'test-4'."),
+      filename: z
+        .string()
+        .optional()
+        .describe(
+          "Optional file path inside the dataset (e.g. 'customers-test.csv'). If omitted, returns the list of files in the latest version so you can pick one."
+        ),
+    }),
+    execute: async args => {
+      try {
+        const datasets = await fetchDatasetList(userToken);
+        const match = datasets.find(d => d.name === args.datasetName);
+        if (!match || !match.ownerEmail) {
+          return {
+            success: false,
+            error: `Dataset "${args.datasetName}" not found or has no resolvable owner. Available: ${datasets.map(d => d.name).join(", ") || "(none)"}.`,
+          };
+        }
+        const latestResp = await fetch(
+          `${env.FILE_SERVICE_ENDPOINT}/api/dataset/${match.did}/version/latest`,
+          { headers: { Authorization: `Bearer ${userToken}` } }
+        );
+        if (!latestResp.ok) {
+          return {
+            success: false,
+            error: `Failed to fetch latest version of dataset ${args.datasetName}: HTTP ${latestResp.status} ${await latestResp.text()}`,
+          };
+        }
+        const latestBody = (await latestResp.json()) as {
+          datasetVersion?: { name?: string };
+          fileNodes?: DatasetFileNode[];
+        };
+        const versionName = latestBody.datasetVersion?.name ?? "latest";
+        const files = (latestBody.fileNodes ?? 
[]).flatMap(n => flattenFileNodes(n)); + + if (!args.filename) { + return { + success: true, + datasetName: match.name, + ownerEmail: match.ownerEmail, + latestVersion: versionName, + files: files.map(f => ({ + path: f.relativePath, + size: f.size, + fileName_for_scan_operator: `/${match.ownerEmail}/${match.name}/latest/${f.relativePath}`, + })), + hint: "Pass `filename` next time to get a single canonical scan-operator fileName.", + }; + } + const target = files.find(f => f.relativePath === args.filename); + if (!target) { + return { + success: false, + error: `File "${args.filename}" not found in latest version of dataset "${args.datasetName}". Files present: ${files.map(f => f.relativePath).join(", ") || "(none)"}.`, + }; + } + const canonical = `/${match.ownerEmail}/${match.name}/latest/${target.relativePath}`; + return { + success: true, + datasetName: match.name, + ownerEmail: match.ownerEmail, + latestVersion: versionName, + file: target.relativePath, + fileName_for_scan_operator: canonical, + hint: `Set the scan operator's "fileName" property to exactly: ${canonical}`, + }; + } catch (e) { + return { success: false, error: e instanceof Error ? e.message : String(e) }; + } + }, + }); +} + +/** + * `listDatasets`: returns the user's accessible Texera datasets (id + name). + * Use this to resolve a dataset *name* (what the user types) to a *did* (what + * file-service and uploadFileToDataset need). + */ +export function createListDatasetsTool(userToken: string) { + return tool({ + description: + "List the user's accessible Texera datasets. Returns each dataset's numeric id (did) and name. " + + "Use this when the user refers to a dataset by name and you need its id.", + inputSchema: z.object({}), + execute: async () => { + try { + const datasets = await fetchDatasetList(userToken); + return { success: true, datasets }; + } catch (e) { + return { + success: false, + error: e instanceof Error ? e.message : String(e), + }; + } + }, + }); +} + +/** + * `uploadFileToDataset`: have a registered machine read a local file and upload + * it to a Texera dataset, creating a new dataset version. + */ +export function createUploadFileToDatasetTool(userToken: string) { + return tool({ + description: + "Upload a file that lives on a registered machine into a Texera dataset, creating a new version. " + + "machine-manager on that machine reads the local file and pushes it to file-service. " + + "Pass either datasetId (numeric did) OR datasetName (the human-readable name) — the tool resolves the name automatically. " + + "If you pass both, datasetName wins. Do NOT guess datasetId from numbers in the name.", + inputSchema: z.object({ + machineId: z + .number() + .int() + .describe("Numeric machine id (`mid`) from the Machines tab."), + localPath: z + .string() + .describe("Absolute path on the machine, e.g. /home/ali/Downloads/customers-100.csv."), + datasetId: z + .number() + .int() + .optional() + .describe("Numeric dataset id (`did`). Optional if datasetName is provided."), + datasetName: z + .string() + .optional() + .describe( + "Dataset name (e.g. 'test-4'). Resolved to did via listDatasets internally. Preferred over datasetId." + ), + datasetFilePath: z + .string() + .describe( + "Destination path *inside the dataset*, e.g. customers-100.csv or subdir/file.csv." 
+        ),
+    }),
+    execute: async args => {
+      try {
+        // Debug logging: keep it useful, but never log credentials (not even a token prefix).
+        console.log("[DBG-UPLOAD] args:", JSON.stringify(args));
+        let resolvedDid = args.datasetId;
+        if (args.datasetName) {
+          const datasets = await fetchDatasetList(userToken);
+          const match = datasets.find(d => d.name === args.datasetName);
+          if (!match) {
+            return {
+              success: false,
+              error: `Dataset name "${args.datasetName}" not found. Available: ${datasets.map(d => `${d.name} (did=${d.did})`).join(", ") || "(none)"}.`,
+            };
+          }
+          resolvedDid = match.did;
+          console.log("[DBG-UPLOAD] resolved name", args.datasetName, "->", resolvedDid);
+        }
+        if (resolvedDid == null) {
+          return {
+            success: false,
+            error: "Must provide either datasetId or datasetName.",
+          };
+        }
+        // Sanity-check the id actually exists for this user; fail fast with a useful message.
+        const allDatasets = await fetchDatasetList(userToken);
+        if (!allDatasets.some(d => d.did === resolvedDid)) {
+          return {
+            success: false,
+            error: `Dataset did=${resolvedDid} is not accessible to this user. Available: ${allDatasets.map(d => `${d.name} (did=${d.did})`).join(", ") || "(none)"}.`,
+          };
+        }
+        args = { ...args, datasetId: resolvedDid };
+        const machine = await lookupMachine(userToken, args.machineId);
+        const headers: Record<string, string> = { "Content-Type": "application/json" };
+        if (machine.token && machine.token.trim().length > 0) {
+          headers["Authorization"] = `Bearer ${machine.token.trim()}`;
+        }
+        const res = await fetch(`${machine.url.replace(/\/$/, "")}/upload-to-dataset`, {
+          method: "POST",
+          headers,
+          body: JSON.stringify({
+            local_path: args.localPath,
+            dataset_id: resolvedDid,
+            file_path: args.datasetFilePath,
+            file_service_url: env.FILE_SERVICE_ENDPOINT,
+            auth_token: userToken,
+          }),
+        });
+        const bodyText = await res.text();
+        console.log("[DBG-UPLOAD] mm response:", res.status, bodyText.slice(0, 300));
+        if (!res.ok) {
+          return {
+            success: false,
+            error: `machine-manager returned HTTP ${res.status}: ${bodyText}`,
+          };
+        }
+        const parsed = JSON.parse(bodyText) as {
+          dataset_id: number;
+          file_path: string;
+          bytes_uploaded: number;
+          version_name?: string;
+          dataset_name?: string;
+        };
+        // Build the canonical scan-operator fileName the workflow needs:
+        //   /<ownerEmail>/<datasetName>/latest/<fileRelativePath>
+        // Using the "latest" sentinel makes this robust to subsequent uploads — the
+        // FileResolver in amber resolves "latest" to the newest dataset_version row.
+        const ownerEmail = allDatasets.find(d => d.did === resolvedDid)?.ownerEmail;
+        const datasetForScan = parsed.dataset_name ?? args.datasetName ?? null;
+        const csvFileScanPath =
+          ownerEmail != null && datasetForScan != null
+            ? `/${ownerEmail}/${datasetForScan}/latest/${parsed.file_path}`
+            : null;
+        console.log("[DBG-UPLOAD] csvFileScanPath:", csvFileScanPath, "ownerEmail:", ownerEmail, "datasetForScan:", datasetForScan);
+        return {
+          success: true,
+          result: parsed,
+          fileName_for_scan_operator: csvFileScanPath,
+          hint:
+            csvFileScanPath != null
+              ? [
+                  `Upload succeeded. To wire a CSVFileScan / TableFileScan to this file,`,
+                  `set its "fileName" property to EXACTLY this string (no quotes, no changes):`,
+                  ``,
+                  `  ${csvFileScanPath}`,
+                  ``,
+                  `Notes:`,
+                  ` - The leading slash and the literal segment "latest" are required.`,
+                  ` - "latest" auto-resolves to the newest version, so you never need to hard-code v1/v2/v3.`,
+                  ` - Do NOT use an absolute filesystem path like /home/... 
— the scan operator reads from the Texera dataset, not from local disk.`, + ` - If a scan operator already exists with a different fileName, call modifyOperator to set fileName to the value above.`, + ].join("\n") + : "Upload succeeded but canonical path resolution failed; ask the user for the dataset path.", + }; + } catch (e) { + return { + success: false, + error: e instanceof Error ? e.message : String(e), + }; + } + }, + }); +} diff --git a/agent-service/src/config/env.ts b/agent-service/src/config/env.ts index 16a25b9be77..3fd405a3ceb 100644 --- a/agent-service/src/config/env.ts +++ b/agent-service/src/config/env.ts @@ -30,6 +30,7 @@ const EnvSchema = z.object({ LOG_PRETTY: z.coerce.boolean().default(false), TEXERA_DASHBOARD_SERVICE_ENDPOINT: z.string().url().default("http://localhost:8080"), + FILE_SERVICE_ENDPOINT: z.string().url().default("http://localhost:9092"), LLM_ENDPOINT: z.string().url().default("http://localhost:9096"), WORKFLOW_COMPILING_SERVICE_ENDPOINT: z.string().url().default("http://localhost:9090"), WORKFLOW_EXECUTION_SERVICE_ENDPOINT: z.string().url().default("http://localhost:8085"), diff --git a/agent-service/src/types/agent.ts b/agent-service/src/types/agent.ts index 765f5a7cb46..4e73294115e 100644 --- a/agent-service/src/types/agent.ts +++ b/agent-service/src/types/agent.ts @@ -87,7 +87,9 @@ export const DEFAULT_AGENT_SETTINGS: Omit = { executionTimeoutMs: 240000, maxSteps: 100, allowedOperatorTypes: [ + // --- core data manipulation --- "CSVFileScan", + "JSONLFileScan", "Filter", "Projection", "TypeCasting", @@ -98,13 +100,30 @@ export const DEFAULT_AGENT_SETTINGS: Omit = { "KeywordSearch", "HashJoin", "Aggregate", + "Split", + // --- visualizations --- "LineChart", "BarChart", "PieChart", "Histogram", "Scatterplot", + "ScatterMatrixChart", "WordCloud", + "ImageVisualizer", + // --- python / custom code --- "PythonUDFV2", + "PythonTableReducer", + "MachineUDF", + // --- regression / ML --- + // Both `SklearnXxx` (inference) and `SklearnTrainingXxx` (training) shapes + // are exposed; agent picks the trainer for fitting and the matching + // predictor/evaluator downstream. + "SklearnLinearRegression", + "SklearnTrainingLinearRegression", + "SklearnRidge", + "SklearnTrainingRidge", + "SVRTrainer", + "SklearnPrediction", ], }; diff --git a/amber/src/main/scala/org/apache/texera/web/TexeraWebApplication.scala b/amber/src/main/scala/org/apache/texera/web/TexeraWebApplication.scala index 98b7c68c974..6d7d1ff21cf 100644 --- a/amber/src/main/scala/org/apache/texera/web/TexeraWebApplication.scala +++ b/amber/src/main/scala/org/apache/texera/web/TexeraWebApplication.scala @@ -160,6 +160,7 @@ class TexeraWebApplication environment.jersey.register(classOf[UserQuotaResource]) environment.jersey.register(classOf[AdminSettingsResource]) environment.jersey.register(classOf[AIAssistantResource]) + environment.jersey.register(classOf[MachineResource]) AuthResource.createAdminUser() diff --git a/amber/src/main/scala/org/apache/texera/web/resource/MachineResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/MachineResource.scala new file mode 100644 index 00000000000..455851c800c --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/web/resource/MachineResource.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.resource + +import io.dropwizard.auth.Auth +import org.apache.texera.auth.SessionUser +import org.apache.texera.dao.SqlServer +import org.apache.texera.dao.jooq.generated.Tables.MACHINE +import org.apache.texera.dao.jooq.generated.tables.daos.MachineDao +import org.apache.texera.dao.jooq.generated.tables.pojos.Machine + +import javax.annotation.security.RolesAllowed +import javax.ws.rs._ +import javax.ws.rs.core.MediaType + +object MachineResource { + case class MachineRequest(name: String, url: String, token: Option[String]) +} + +@Path("/machines") +@Produces(Array(MediaType.APPLICATION_JSON)) +@Consumes(Array(MediaType.APPLICATION_JSON)) +@RolesAllowed(Array("REGULAR", "ADMIN")) +class MachineResource { + import MachineResource._ + + private def dao: MachineDao = + new MachineDao(SqlServer.getInstance().createDSLContext().configuration()) + + @GET + def list(@Auth user: SessionUser): java.util.List[Machine] = { + SqlServer + .getInstance() + .createDSLContext() + .selectFrom(MACHINE) + .where(MACHINE.UID.eq(user.getUid)) + .fetchInto(classOf[Machine]) + } + + @POST + def create(@Auth user: SessionUser, req: MachineRequest): Machine = { + require(req.name != null && req.name.trim.nonEmpty, "name required") + require(req.url != null && req.url.trim.nonEmpty, "url required") + val m = new Machine() + m.setUid(user.getUid) + m.setName(req.name.trim) + m.setUrl(req.url.trim) + m.setToken(req.token.map(_.trim).filter(_.nonEmpty).orNull) + dao.insert(m) + m + } + + @GET + @Path("/{mid}") + def get(@Auth user: SessionUser, @PathParam("mid") mid: Integer): Machine = { + val m = dao.fetchOneByMid(mid) + if (m == null || m.getUid != user.getUid) throw new NotFoundException() + m + } + + @PUT + @Path("/{mid}") + def update( + @Auth user: SessionUser, + @PathParam("mid") mid: Integer, + req: MachineRequest + ): Machine = { + val m = dao.fetchOneByMid(mid) + if (m == null || m.getUid != user.getUid) throw new NotFoundException() + if (req.name != null && req.name.trim.nonEmpty) m.setName(req.name.trim) + if (req.url != null && req.url.trim.nonEmpty) m.setUrl(req.url.trim) + m.setToken(req.token.map(_.trim).filter(_.nonEmpty).orNull) + dao.update(m) + m + } + + @DELETE + @Path("/{mid}") + def delete(@Auth user: SessionUser, @PathParam("mid") mid: Integer): Unit = { + val m = dao.fetchOneByMid(mid) + if (m == null || m.getUid != user.getUid) throw new NotFoundException() + dao.delete(m) + } +} diff --git a/common/config/src/main/resources/gui.conf b/common/config/src/main/resources/gui.conf index d58d94ac7b9..4abb6a91a5b 100644 --- a/common/config/src/main/resources/gui.conf +++ b/common/config/src/main/resources/gui.conf @@ -37,10 +37,10 @@ gui { # Can be configured as { username: "texera", password: "password" } # If configured, this will be automatically filled into the local login input box default-local-user { - username = "" + username = "texera" username = 
${?GUI_LOGIN_DEFAULT_LOCAL_USER_USERNAME} - password = "" + password = "texera" password = ${?GUI_LOGIN_DEFAULT_LOCAL_USER_PASSWORD} } } @@ -109,7 +109,7 @@ gui { active-time-in-minutes = ${?GUI_WORKFLOW_WORKSPACE_ACTIVE_TIME_IN_MINUTES} # whether AI copilot feature is enabled - copilot-enabled = false + copilot-enabled = true copilot-enabled = ${?GUI_WORKFLOW_WORKSPACE_COPILOT_ENABLED} # the limit of columns to be displayed in the result table diff --git a/common/config/src/main/resources/llm.conf b/common/config/src/main/resources/llm.conf index 23b9360cdab..8cfcad2fc29 100644 --- a/common/config/src/main/resources/llm.conf +++ b/common/config/src/main/resources/llm.conf @@ -18,10 +18,10 @@ # LLM Configuration llm { # Base URL for LiteLLM service - base-url = "http://0.0.0.0:4000" + base-url = "https://cherry00.ics.uci.edu/litellm" base-url = ${?LITELLM_BASE_URL} # Master key for LiteLLM authentication - master-key = "" + master-key = "sk-44f0e832d0a67bca7e999c8a50698749c3d39b88dd161d522e7a311ae375c9d8" master-key = ${?LITELLM_MASTER_KEY} } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/FileResolver.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/FileResolver.scala index c8a407df993..7bdc224e8a6 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/FileResolver.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/FileResolver.scala @@ -82,22 +82,42 @@ object FileResolver { * @param fileName The file path to parse * @return Some((ownerEmail, datasetName, versionName, fileRelativePath)) if valid, None otherwise */ + /** Sentinel that means "resolve to the dataset's latest version". + * Use it in place of the version segment, e.g. /alice/myDataset/latest/foo.csv + */ + private val LATEST_VERSION_TOKEN = "latest" + + /** + * Parse a dataset-file path. Accepts both: + * 4-segment: /ownerEmail/datasetName/versionName/fileRelativePath + * 3-segment: /ownerEmail/datasetName/fileRelativePath (auto-latest) + * The 3-segment form is signalled internally by versionName == LATEST_VERSION_TOKEN. + */ private def parseDatasetFilePath( fileName: String ): Option[(String, String, String, Array[String])] = { val filePath = Paths.get(fileName) val pathSegments = (0 until filePath.getNameCount).map(filePath.getName(_).toString).toArray - if (pathSegments.length < 4) { + if (pathSegments.length < 3) { return None } val ownerEmail = pathSegments(0) val datasetName = pathSegments(1) - val versionName = pathSegments(2) - val fileRelativePathSegments = pathSegments.drop(3) - Some((ownerEmail, datasetName, versionName, fileRelativePathSegments)) + // If we have at least 4 segments AND the third segment is NOT "latest", treat it + // as an explicit version name; otherwise resolve to the dataset's latest version. 
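+    // Illustrative (hypothetical owner/dataset names):
+    //   /alice@example.com/sales/v3/data.csv     -> explicit version "v3"
+    //   /alice@example.com/sales/latest/data.csv -> newest version
+    //   /alice@example.com/sales/data.csv        -> newest version (3-segment form)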
+ if (pathSegments.length >= 4 && pathSegments(2) != LATEST_VERSION_TOKEN) { + val versionName = pathSegments(2) + val fileRelativePathSegments = pathSegments.drop(3) + Some((ownerEmail, datasetName, versionName, fileRelativePathSegments)) + } else { + val versionStart = if (pathSegments(2) == LATEST_VERSION_TOKEN) 3 else 2 + val fileRelativePathSegments = pathSegments.drop(versionStart) + if (fileRelativePathSegments.isEmpty) return None + Some((ownerEmail, datasetName, LATEST_VERSION_TOKEN, fileRelativePathSegments)) + } } /** @@ -138,12 +158,23 @@ object FileResolver { .and(DATASET.NAME.eq(datasetName)) .fetchOneInto(classOf[Dataset]) - // fetch the dataset version from DB - val datasetVersion = ctx - .selectFrom(DATASET_VERSION) - .where(DATASET_VERSION.DID.eq(dataset.getDid)) - .and(DATASET_VERSION.NAME.eq(versionName)) - .fetchOneInto(classOf[DatasetVersion]) + // fetch the dataset version from DB. If the caller passed the special + // "latest" token (or the path was 3-segment, which we normalize to "latest"), + // resolve to the most recently created version of this dataset. + val datasetVersion = if (versionName == LATEST_VERSION_TOKEN) { + ctx + .selectFrom(DATASET_VERSION) + .where(DATASET_VERSION.DID.eq(dataset.getDid)) + .orderBy(DATASET_VERSION.DVID.desc()) + .limit(1) + .fetchOneInto(classOf[DatasetVersion]) + } else { + ctx + .selectFrom(DATASET_VERSION) + .where(DATASET_VERSION.DID.eq(dataset.getDid)) + .and(DATASET_VERSION.NAME.eq(versionName)) + .fetchOneInto(classOf[DatasetVersion]) + } if (dataset == null || datasetVersion == null) { throw new FileNotFoundException(s"Dataset file $fileName not found.") diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala index 4e9d6c6e2cd..ba3533bbf3c 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala @@ -90,6 +90,7 @@ import org.apache.texera.amber.operator.substringSearch.SubstringSearchOpDesc import org.apache.texera.amber.operator.symmetricDifference.SymmetricDifferenceOpDesc import org.apache.texera.amber.operator.typecasting.TypeCastingOpDesc import org.apache.texera.amber.operator.udf.java.JavaUDFOpDesc +import org.apache.texera.amber.operator.udf.machine.MachineUDFOpDesc import org.apache.texera.amber.operator.udf.python._ import org.apache.texera.amber.operator.udf.python.source.PythonUDFSourceOpDescV2 import org.apache.texera.amber.operator.udf.r.{RUDFOpDesc, RUDFSourceOpDesc} @@ -214,6 +215,7 @@ trait StateTransferFunc new Type(value = classOf[PythonUDFOpDescV2], name = "PythonUDFV2"), new Type(value = classOf[PythonUDFSourceOpDescV2], name = "PythonUDFSourceV2"), new Type(value = classOf[DualInputPortsPythonUDFOpDescV2], name = "DualInputPortsPythonUDFV2"), + new Type(value = classOf[MachineUDFOpDesc], name = "MachineUDF"), new Type(value = classOf[MySQLSourceOpDesc], name = "MySQLSource"), new Type(value = classOf[PostgreSQLSourceOpDesc], name = "PostgreSQLSource"), new Type(value = classOf[AsterixDBSourceOpDesc], name = "AsterixDBSource"), diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/machine/MachineUDFOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/machine/MachineUDFOpDesc.scala new file mode 100644 index 00000000000..0e432459925 --- /dev/null +++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/machine/MachineUDFOpDesc.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.udf.machine + +import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.tuple.{Attribute, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.PhysicalOp.oneToOnePhysicalOp +import org.apache.texera.amber.core.workflow._ +import org.apache.texera.amber.operator.map.MapOpDesc +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import org.apache.texera.amber.util.JSONUtils.objectMapper + +/** + * Machine UDF: like Python UDF, but the supplied Python code runs on a *registered machine* + * via the machine-manager HTTP service rather than on a Texera computing unit. The tuple is + * injected into the snippet as a global dict `tuple_in`, and the script is expected to print + * a JSON object on its last stdout line; that JSON becomes the output tuple. + */ +class MachineUDFOpDesc extends MapOpDesc { + + @JsonProperty(required = true) + @JsonSchemaTitle("Machine URL") + @JsonPropertyDescription("Base URL of the target machine-manager, e.g. http://localhost:5555") + var machineUrl: String = "http://localhost:5555" + + @JsonProperty + @JsonSchemaTitle("Machine token") + @JsonPropertyDescription("Bearer token for the machine-manager. Leave blank if not required.") + var machineToken: String = "" + + @JsonProperty( + required = true, + defaultValue = + "# `tuple_in` is the current input row as a dict.\n" + + "# Print one JSON object on the last line to emit it as the output tuple.\n" + + "import json\n" + + "row = dict(tuple_in)\n" + + "row['echoed'] = True\n" + + "print(json.dumps(row))\n" + ) + @JsonSchemaTitle("Python script") + @JsonPropertyDescription( + "Code executed on the target machine for each input tuple. The tuple is available as `tuple_in`." + ) + var code: String = "" + + @JsonProperty(defaultValue = "60") + @JsonSchemaTitle("Per-call timeout (seconds)") + var timeoutSeconds: Int = 60 + + @JsonProperty(defaultValue = "false") + @JsonSchemaTitle("Batch mode (run once on the whole table)") + @JsonPropertyDescription( + "When true, the script receives ALL input rows at once as `tuple_in` (a list of dicts) " + + "and runs ONE time after upstream finishes. Use this for whole-table analyses (e.g. " + + "train ML models, build summary plots) that need the full dataset. 
Output rows come " + + "from JSON lines the script prints. When false (default), the script runs per tuple " + + "with `tuple_in` as a single dict." + ) + var batchMode: Boolean = false + + @JsonProperty(defaultValue = "true") + @JsonSchemaTitle("Retain input columns") + @JsonPropertyDescription( + "Per-tuple mode: keep the input row's columns in the output. Ignored in batch mode " + + "(the output schema is exactly the declared output columns)." + ) + var retainInputColumns: Boolean = true + + @JsonProperty + @JsonSchemaTitle("Extra output column(s)") + @JsonPropertyDescription("Columns added by the script's returned JSON.") + var outputColumns: List[Attribute] = List() + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName( + "org.apache.texera.amber.operator.udf.machine.MachineUDFOpExec", + objectMapper.writeValueAsString(this) + ) + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withPropagateSchema(SchemaPropagationFunc(inputSchemas => { + val inputSchema = inputSchemas.values.head + // Batch mode never keeps the input schema — the operator emits whatever rows the script + // prints, with the user-declared columns. + val effectiveRetain = retainInputColumns && !batchMode + var outputSchema = if (effectiveRetain) inputSchema else Schema() + if (outputColumns != null) { + if (effectiveRetain) { + for (column <- outputColumns) { + if (inputSchema.containsAttribute(column.getName)) { + throw new RuntimeException( + s"Column name ${column.getName} already exists on the input schema" + ) + } + } + } + outputSchema = outputSchema.add(outputColumns) + } + Map(operatorInfo.outputPorts.head.id -> outputSchema) + })) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Machine UDF", + "Run a Python snippet on a registered machine (via machine-manager) for each input tuple", + OperatorGroupConstants.PYTHON_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/machine/MachineUDFOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/machine/MachineUDFOpExec.scala new file mode 100644 index 00000000000..7bbb66ea986 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/machine/MachineUDFOpExec.scala @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.amber.operator.udf.machine + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode} +import org.apache.texera.amber.core.executor.OperatorExecutor +import org.apache.texera.amber.core.tuple.{Tuple, TupleLike} +import org.apache.texera.amber.util.JSONUtils.objectMapper + +import java.net.URI +import java.net.http.{HttpClient, HttpRequest, HttpResponse} +import java.time.Duration +import scala.collection.mutable + +/** + * Runs a user Python script on a *registered machine* via its machine-manager service. + * + * Two modes (selected by `desc.batchMode`): + * - per-tuple (default): one HTTP /python call per input row. The output of each call's + * last JSON stdout line becomes the output tuple. Input row is exposed to the script + * as `tuple_in` (dict). + * - batch: buffer every input row, and on `onFinish` issue ONE HTTP call with the full + * buffer. `tuple_in` is then a list of dicts. The script can print multiple JSON lines + * and each becomes an output tuple — useful for e.g. training N models and emitting + * one metric row per model. + * + * In both cases we pin HTTP/1.1 to work around uvicorn's h2c upgrade dropping the request + * body (we hit that bug with HTTP/2 in earlier testing). + */ +class MachineUDFOpExec(descString: String) extends OperatorExecutor { + private val desc: MachineUDFOpDesc = + objectMapper.readValue(descString, classOf[MachineUDFOpDesc]) + + private val httpClient: HttpClient = + HttpClient + .newBuilder() + .version(HttpClient.Version.HTTP_1_1) + .connectTimeout(Duration.ofSeconds(10)) + .build() + + // Per-tuple mode: nothing to buffer. Batch mode: accumulate input rows here. + private val batchBuffer: mutable.ArrayBuffer[ObjectNode] = + mutable.ArrayBuffer.empty[ObjectNode] + + private def tupleToObjectNode(tuple: Tuple): ObjectNode = { + val node = objectMapper.createObjectNode() + tuple.schema.getAttributes.foreach { attr => + val v = tuple.getField[AnyRef](attr.getName) + node.putPOJO(attr.getName, v) + } + node + } + + /** + * POST to `/python` with a JSON body and return the parsed response body. + * `tupleInPayload` is either an ObjectNode (per-tuple) or an ArrayNode (batch). 
+ */ + private def callMachinePython(tupleInPayload: JsonNode): JsonNode = { + val payload = objectMapper.createObjectNode() + payload.put("code", Option(desc.code).getOrElse("")) + payload.set[ObjectNode]("tuple_in", tupleInPayload) + payload.put("timeout_seconds", desc.timeoutSeconds.toDouble) + + val payloadJson = objectMapper.writeValueAsString(payload) + val targetUrl = desc.machineUrl.stripSuffix("/") + "/python" + System.err.println( + s"[MachineUDFOpExec] -> POST $targetUrl bodyLen=${payloadJson.length} bodyPreview=${payloadJson.take(180)}" + ) + + val requestBuilder = HttpRequest + .newBuilder(URI.create(targetUrl)) + .timeout(Duration.ofSeconds(desc.timeoutSeconds.toLong + 15L)) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(payloadJson)) + + if (desc.machineToken != null && desc.machineToken.trim.nonEmpty) { + requestBuilder.header("Authorization", s"Bearer ${desc.machineToken.trim}") + } + + val response = + httpClient.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString()) + + if (response.statusCode() / 100 != 2) { + throw new RuntimeException( + s"machine-manager ${desc.machineUrl} returned HTTP ${response.statusCode()}: ${response.body()}" + ) + } + + val body = objectMapper.readTree(response.body()) + val exitCode = body.path("exit_code").asInt(-1) + val stderr = body.path("stderr").asText("") + if (exitCode != 0) { + throw new RuntimeException( + s"machine-manager script failed (exit=$exitCode): $stderr" + ) + } + body + } + + /** + * For per-tuple mode, build a single output tuple from the script's parsed `result` + * (the last JSON line). If `retainInputColumns`, original input values are re-used + * for any column the script did not override (preserves type identity with the input + * schema). + */ + private def buildPerTupleOutput(inputTuple: Tuple, scriptResult: JsonNode): TupleLike = { + val builder = mutable.LinkedHashMap[String, Any]() + val inputAttrNames = mutable.Set.empty[String] + if (desc.retainInputColumns) { + inputTuple.schema.getAttributes.foreach { attr => + builder(attr.getName) = inputTuple.getField[Any](attr.getName) + inputAttrNames += attr.getName + } + } + if (scriptResult.isObject) { + val it = scriptResult.fields() + while (it.hasNext) { + val entry = it.next() + val key = entry.getKey + if (!inputAttrNames.contains(key)) { + builder(key) = jsonToScala(entry.getValue) + } + } + } + TupleLike(builder.toSeq: _*) + } + + /** + * For batch mode, the script can emit one or more rows. We accept either: + * - `result` is an ObjectNode → emit a single row from the declared output columns. + * - `result` is an ArrayNode of ObjectNode → emit one row per element. + * - script's stdout has multiple JSON object lines → each becomes a row. + * We accept the last form via the response's `stdout` field, splitting on lines. + */ + private def buildBatchOutputs(responseBody: JsonNode): Iterator[TupleLike] = { + val outputColumns = Option(desc.outputColumns).getOrElse(List()).map(_.getName) + val result = responseBody.path("result") + + val rows: Seq[JsonNode] = + if (result.isArray) { + val arr = result.asInstanceOf[ArrayNode] + (0 until arr.size()).map(arr.get) + } else if (result.isObject) { + Seq(result) + } else { + // No structured `result`; fall back to scanning stdout for JSON object lines. 
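As a worked illustration of that stdout fallback (not part of the patch): a batch-mode script is expected to print one JSON object per line, each of which becomes an output row. A hedged sketch of such a script follows; the `x`/`y` column names and the sklearn call are assumptions, and `tuple_in` is injected as a global by machine-manager's `/python` preamble, not defined in the script itself.

```python
# Hypothetical batch-mode MachineUDF script. `tuple_in` (a list of dicts) is
# injected by machine-manager; it is NOT defined here.
import json

from sklearn.linear_model import LinearRegression  # available on the registered machine

X = [[row["x"]] for row in tuple_in]  # "x"/"y" are assumed column names
y = [row["y"] for row in tuple_in]
model = LinearRegression().fit(X, y)

# One JSON object line per output row, projected onto the declared outputColumns.
print(json.dumps({"model": "linear", "r2": model.score(X, y), "n_rows": len(tuple_in)}))
```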
+ val stdout = responseBody.path("stdout").asText("") + stdout + .split("\n") + .toSeq + .map(_.trim) + .filter(_.nonEmpty) + .flatMap { line => + scala.util.Try(objectMapper.readTree(line)).toOption.filter(_.isObject) + } + } + + rows.iterator.map(row => batchRowFromJson(row, outputColumns)) + } + + private def batchRowFromJson(row: JsonNode, outputColumnNames: List[String]): TupleLike = { + val builder = mutable.LinkedHashMap[String, Any]() + // Project only the declared output columns, in declared order. Missing columns become null. + for (colName <- outputColumnNames) { + val v = row.path(colName) + builder(colName) = if (v.isMissingNode) null else jsonToScala(v) + } + TupleLike(builder.toSeq: _*) + } + + private def jsonToScala(node: JsonNode): Any = { + if (node == null || node.isNull) null + else if (node.isInt) node.asInt() + else if (node.isLong) node.asLong() + else if (node.isDouble || node.isFloat) node.asDouble() + else if (node.isBoolean) node.asBoolean() + else node.asText() + } + + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = { + if (desc.batchMode) { + // Buffer; emit nothing until input is finished. + batchBuffer += tupleToObjectNode(tuple) + Iterator.empty + } else { + val body = callMachinePython(tupleToObjectNode(tuple)) + Iterator.single(buildPerTupleOutput(tuple, body.path("result"))) + } + } + + override def onFinish(port: Int): Iterator[TupleLike] = { + if (!desc.batchMode) return Iterator.empty + val arr = objectMapper.createArrayNode() + batchBuffer.foreach(arr.add) + System.err.println( + s"[MachineUDFOpExec] batch finished; sending ${batchBuffer.size} rows to ${desc.machineUrl}" + ) + val body = callMachinePython(arr) + buildBatchOutputs(body) + } +} diff --git a/frontend/src/app/app-routing.constant.ts b/frontend/src/app/app-routing.constant.ts index 4181df8a954..cb54dad3542 100644 --- a/frontend/src/app/app-routing.constant.ts +++ b/frontend/src/app/app-routing.constant.ts @@ -36,6 +36,7 @@ export const DASHBOARD_USER_WORKFLOW = `${DASHBOARD_USER}/workflow`; export const DASHBOARD_USER_DATASET = `${DASHBOARD_USER}/dataset`; export const DASHBOARD_USER_DATASET_CREATE = `${DASHBOARD_USER_DATASET}/create`; export const DASHBOARD_USER_COMPUTING_UNIT = `${DASHBOARD_USER}/compute`; +export const DASHBOARD_USER_MACHINE = `${DASHBOARD_USER}/machines`; export const DASHBOARD_USER_QUOTA = `${DASHBOARD_USER}/quota`; export const DASHBOARD_USER_DISCUSSION = `${DASHBOARD_USER}/discussion`; diff --git a/frontend/src/app/app-routing.module.ts b/frontend/src/app/app-routing.module.ts index 179caf5c088..a3d56072ba2 100644 --- a/frontend/src/app/app-routing.module.ts +++ b/frontend/src/app/app-routing.module.ts @@ -25,6 +25,7 @@ import { UserQuotaComponent } from "./dashboard/component/user/user-quota/user-q import { UserProjectSectionComponent } from "./dashboard/component/user/user-project/user-project-section/user-project-section.component"; import { UserProjectComponent } from "./dashboard/component/user/user-project/user-project.component"; import { UserComputingUnitComponent } from "./dashboard/component/user/user-computing-unit/user-computing-unit.component"; +import { UserMachineComponent } from "./dashboard/component/user/user-machine/user-machine.component"; import { WorkspaceComponent } from "./workspace/component/workspace.component"; import { AboutComponent } from "./hub/component/about/about.component"; import { AuthGuardService } from "./common/service/user/auth-guard.service"; @@ -135,6 +136,10 @@ routes.push({ path: "compute", 
     component: UserComputingUnitComponent,
   },
+  {
+    path: "machines",
+    component: UserMachineComponent,
+  },
   {
     path: "quota",
     component: UserQuotaComponent,
diff --git a/frontend/src/app/common/service/machine/machine.service.ts b/frontend/src/app/common/service/machine/machine.service.ts
new file mode 100644
index 00000000000..5aac591ec27
--- /dev/null
+++ b/frontend/src/app/common/service/machine/machine.service.ts
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Injectable } from "@angular/core";
+import { HttpClient } from "@angular/common/http";
+import { Observable } from "rxjs";
+import { AppSettings } from "../../app-setting";
+import { Machine, MachineRequest } from "../../type/machine";
+
+const MACHINE_API = "machines";
+
+@Injectable({ providedIn: "root" })
+export class MachineService {
+  constructor(private http: HttpClient) {}
+
+  list(): Observable<Machine[]> {
+    return this.http.get<Machine[]>(`${AppSettings.getApiEndpoint()}/${MACHINE_API}`);
+  }
+
+  create(req: MachineRequest): Observable<Machine> {
+    return this.http.post<Machine>(`${AppSettings.getApiEndpoint()}/${MACHINE_API}`, req);
+  }
+
+  update(mid: number, req: MachineRequest): Observable<Machine> {
+    return this.http.put<Machine>(`${AppSettings.getApiEndpoint()}/${MACHINE_API}/${mid}`, req);
+  }
+
+  delete(mid: number): Observable<void> {
+    return this.http.delete<void>(`${AppSettings.getApiEndpoint()}/${MACHINE_API}/${mid}`);
+  }
+}
diff --git a/frontend/src/app/common/type/machine.ts b/frontend/src/app/common/type/machine.ts
new file mode 100644
index 00000000000..6d9e435390b
--- /dev/null
+++ b/frontend/src/app/common/type/machine.ts
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +export interface Machine { + mid: number; + uid: number; + name: string; + url: string; + token?: string | null; + creationTime?: string; +} + +export interface MachineRequest { + name: string; + url: string; + token?: string | null; +} diff --git a/frontend/src/app/dashboard/component/dashboard.component.html b/frontend/src/app/dashboard/component/dashboard.component.html index b04eafb3107..166a05b5425 100644 --- a/frontend/src/app/dashboard/component/dashboard.component.html +++ b/frontend/src/app/dashboard/component/dashboard.component.html @@ -109,6 +109,17 @@ nzType="deployment-unit"> Compute +
  • + + Machines +
  • + + +
    +

    Machines

    + + + +
    + + + + + ID + Name + URL + Token + Actions + + + + + {{ m.mid }} + {{ m.name }} + {{ m.url }} + + ●●●● + (none) + + + + + + + + + No machines registered yet. Click Add machine to register one. + + + + +
    + + + +
    + + Name + + + + + + URL + + + + + + Bearer token (optional) + + + + +
    +
    +
    diff --git a/frontend/src/app/dashboard/component/user/user-machine/user-machine.component.scss b/frontend/src/app/dashboard/component/user/user-machine/user-machine.component.scss new file mode 100644 index 00000000000..0a18740b740 --- /dev/null +++ b/frontend/src/app/dashboard/component/user/user-machine/user-machine.component.scss @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +.machine-card { + margin: 24px; +} + +.machine-toolbar { + display: flex; + align-items: center; + margin-bottom: 16px; + gap: 8px; + + h2 { + margin: 0; + } +} + +.toolbar-spacer { + flex: 1; +} + +.muted { + color: rgba(0, 0, 0, 0.35); +} + +.empty-row { + text-align: center; + color: rgba(0, 0, 0, 0.45); +} diff --git a/frontend/src/app/dashboard/component/user/user-machine/user-machine.component.ts b/frontend/src/app/dashboard/component/user/user-machine/user-machine.component.ts new file mode 100644 index 00000000000..6b27619b34a --- /dev/null +++ b/frontend/src/app/dashboard/component/user/user-machine/user-machine.component.ts @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import { Component, OnInit } from "@angular/core"; +import { FormsModule } from "@angular/forms"; +import { NgFor, NgIf } from "@angular/common"; +import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; +import { NzCardComponent } from "ng-zorro-antd/card"; +import { NzButtonComponent } from "ng-zorro-antd/button"; +import { NzIconDirective } from "ng-zorro-antd/icon"; +import { NzInputDirective } from "ng-zorro-antd/input"; +import { NzModalService, NzModalModule } from "ng-zorro-antd/modal"; +import { NzTableModule } from "ng-zorro-antd/table"; +import { NzFormModule } from "ng-zorro-antd/form"; +import { NzMessageService } from "ng-zorro-antd/message"; +import { MachineService } from "../../../../common/service/machine/machine.service"; +import { Machine, MachineRequest } from "../../../../common/type/machine"; + +@UntilDestroy() +@Component({ + selector: "texera-user-machine", + standalone: true, + imports: [ + FormsModule, + NgFor, + NgIf, + NzCardComponent, + NzButtonComponent, + NzIconDirective, + NzInputDirective, + NzModalModule, + NzTableModule, + NzFormModule, + ], + templateUrl: "./user-machine.component.html", + styleUrls: ["./user-machine.component.scss"], +}) +export class UserMachineComponent implements OnInit { + machines: Machine[] = []; + loading = false; + + showAddModal = false; + editing: Machine | null = null; + form: MachineRequest = { name: "", url: "http://localhost:5555", token: "" }; + + constructor( + private machineService: MachineService, + private modal: NzModalService, + private message: NzMessageService + ) {} + + ngOnInit() { + this.refresh(); + } + + refresh() { + this.loading = true; + this.machineService + .list() + .pipe(untilDestroyed(this)) + .subscribe({ + next: machines => { + this.machines = machines; + this.loading = false; + }, + error: err => { + this.message.error("Failed to load machines: " + (err?.error?.message ?? err?.message ?? err)); + this.loading = false; + }, + }); + } + + openAdd() { + this.editing = null; + this.form = { name: "", url: "http://localhost:5555", token: "" }; + this.showAddModal = true; + } + + openEdit(m: Machine) { + this.editing = m; + this.form = { name: m.name, url: m.url, token: m.token ?? "" }; + this.showAddModal = true; + } + + cancel() { + this.showAddModal = false; + } + + save() { + const req: MachineRequest = { + name: this.form.name.trim(), + url: this.form.url.trim(), + token: this.form.token?.trim() || null, + }; + if (!req.name || !req.url) { + this.message.warning("Name and URL are required"); + return; + } + const op$ = this.editing + ? this.machineService.update(this.editing.mid, req) + : this.machineService.create(req); + op$.pipe(untilDestroyed(this)).subscribe({ + next: () => { + this.showAddModal = false; + this.refresh(); + }, + error: err => { + this.message.error("Failed to save: " + (err?.error?.message ?? err?.message ?? err)); + }, + }); + } + + remove(m: Machine) { + this.modal.confirm({ + nzTitle: `Delete machine "${m.name}"?`, + nzOkText: "Delete", + nzOkDanger: true, + nzOnOk: () => + this.machineService + .delete(m.mid) + .pipe(untilDestroyed(this)) + .subscribe({ + next: () => this.refresh(), + error: err => + this.message.error("Failed to delete: " + (err?.error?.message ?? err?.message ?? 
err)),
+          }),
+    });
+  }
+}
diff --git a/frontend/src/app/dashboard/component/user/user-ml-model/user-ml-model.component.html b/frontend/src/app/dashboard/component/user/user-ml-model/user-ml-model.component.html
new file mode 100644
index 00000000000..046a12d4d9e
--- /dev/null
+++ b/frontend/src/app/dashboard/component/user/user-ml-model/user-ml-model.component.html
@@ -0,0 +1,62 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+<div class="ml-model-menu-bar">
+  <nz-card>
+    <div class="section-title">
+      <h2>ML Models</h2>
+    </div>
+    <div class="section-filters">
+      <texera-filters></texera-filters>
+    </div>
+    <div class="section-button-group">
+      <button
+        nz-button
+        nzType="primary"
+        *ngIf="isLogin"
+        (click)="onClickOpenMlModelAddComponent()">
+        <i
+          nz-icon
+          nzType="plus"></i>
+        Create ML Model
+      </button>
+    </div>
+  </nz-card>
+</div>
+
+<div class="ml-model-scope-buttons">
+  <button
+    nz-button
+    (click)="search(true, 'private')">
+    My ML Models
+  </button>
+  <button
+    nz-button
+    (click)="search(true, 'public')">
+    Public
+  </button>
+  <button
+    nz-button
+    (click)="search(true, 'all')">
+    All
+  </button>
+</div>
+
+<texera-search-results
+  class="ml-model-search-results"></texera-search-results>
diff --git a/frontend/src/app/dashboard/component/user/user-ml-model/user-ml-model.component.scss b/frontend/src/app/dashboard/component/user/user-ml-model/user-ml-model.component.scss
new file mode 100644
index 00000000000..fe0b7cc65f5
--- /dev/null
+++ b/frontend/src/app/dashboard/component/user/user-ml-model/user-ml-model.component.scss
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+@import "../../dashboard.component";
+@import "../../section-style";
+@import "../../button-style";
+
+.ml-model-menu-bar {
+  height: 55px;
+  overflow-y: hidden;
+}
diff --git a/frontend/src/app/dashboard/component/user/user-ml-model/user-ml-model.component.ts b/frontend/src/app/dashboard/component/user/user-ml-model/user-ml-model.component.ts
new file mode 100644
index 00000000000..5118ff8e954
--- /dev/null
+++ b/frontend/src/app/dashboard/component/user/user-ml-model/user-ml-model.component.ts
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
+import { AfterViewInit, Component, ViewChild } from "@angular/core";
+import { UserService } from "../../../../common/service/user/user.service";
+import { Router } from "@angular/router";
+import { SearchService } from "../../../service/user/search.service";
+import { DatasetService } from "../../../service/user/dataset/dataset.service";
+import { SortMethod } from "../../../type/sort-method";
+import { DashboardEntry } from "../../../type/dashboard-entry";
+import { SearchResultsComponent } from "../search-results/search-results.component";
+import { FiltersComponent } from "../filters/filters.component";
+import { firstValueFrom } from "rxjs";
+import { DASHBOARD_USER_ML_MODEL } from "../../../../app-routing.constant";
+import { NzModalService } from "ng-zorro-antd/modal";
+import { UserDatasetVersionCreatorComponent } from "../user-dataset/user-dataset-explorer/user-dataset-version-creator/user-dataset-version-creator.component";
+import { DashboardDataset } from "../../../type/dashboard-dataset.interface";
+import { NzMessageService } from "ng-zorro-antd/message";
+import { map, tap } from "rxjs/operators";
+
+@UntilDestroy()
+@Component({
+  selector: "texera-ml-model-section",
+  templateUrl: "user-ml-model.component.html",
+  styleUrls: ["user-ml-model.component.scss"],
+})
+export class UserMlModelComponent implements AfterViewInit {
+  public sortMethod = SortMethod.EditTimeDesc;
+  lastSortMethod: SortMethod | null = null;
+  public isLogin = this.userService.isLogin();
+  public currentUid = this.userService.getCurrentUser()?.uid;
+  public hasMismatch = false;
+
+  protected readonly DASHBOARD_USER_ML_MODEL = DASHBOARD_USER_ML_MODEL;
+
+  private _searchResultsComponent?: SearchResultsComponent;
+  @ViewChild(SearchResultsComponent) get searchResultsComponent(): SearchResultsComponent {
+    if (this._searchResultsComponent) {
+      return this._searchResultsComponent;
+    }
+    throw new Error("Property cannot be accessed before it is initialized.");
+  }
+
+  set searchResultsComponent(value: SearchResultsComponent) {
+    this._searchResultsComponent = value;
+  }
+
+  private _filters?: FiltersComponent;
+  @ViewChild(FiltersComponent) get filters(): FiltersComponent {
+    if (this._filters) {
+      return this._filters;
+    }
+    throw new Error("Property cannot be accessed before it is initialized.");
+  }
+
+  set filters(value: FiltersComponent) {
+    value.masterFilterListChange.pipe(untilDestroyed(this)).subscribe({ next: () => this.search() });
+    this._filters = value;
+  }
+
+  private masterFilterList: ReadonlyArray<string> | null = null;
+  constructor(
+    private modalService: NzModalService,
+    private userService: UserService,
+    private router: Router,
+    private searchService: SearchService,
+    private datasetService: DatasetService,
+    private message: NzMessageService
+  ) {
+    this.userService
+      .userChanged()
+      .pipe(untilDestroyed(this))
+      .subscribe(() => {
+        this.isLogin = this.userService.isLogin();
+        this.currentUid = this.userService.getCurrentUser()?.uid;
+      });
+  }
+
+  ngAfterViewInit() {
+    this.userService
+      .userChanged()
+      .pipe(untilDestroyed(this))
+      .subscribe(() => this.search());
+  }
+
+  async search(forced: boolean = false, filterScope: "all" | "public" | "private" = "private"): Promise<void> {
+    const sameList =
+      this.masterFilterList !== null &&
+      this.filters.masterFilterList.length === this.masterFilterList.length &&
+      this.filters.masterFilterList.every((v, i) => v === this.masterFilterList![i]);
+    if (!forced && sameList
&& this.sortMethod === this.lastSortMethod) { + return; + } + this.lastSortMethod = this.sortMethod; + this.masterFilterList = this.filters.masterFilterList; + if (!this.searchResultsComponent) { + throw new Error("searchResultsComponent is undefined."); + } + const filterParams = this.filters.getSearchFilterParameters(); + const isLogin = filterScope === "public" ? false : this.isLogin; + const includePublic = filterScope === "all" || filterScope === "public"; + + this.searchResultsComponent.reset((start, count) => { + return firstValueFrom( + this.searchService + .executeSearch( + this.filters.getSearchKeywords(), + filterParams, + start, + count, + "dataset", + this.sortMethod, + isLogin, + includePublic + ) + .pipe( + tap(({ hasMismatch }) => { + this.hasMismatch = hasMismatch ?? false; + if (this.hasMismatch) { + this.message.warning( + "There is a mismatch between some ML models in the database and LakeFS. Only matched items are displayed.", + { nzDuration: 4000 } + ); + } + }), + map(({ entries, more }) => ({ entries, more })) + ) + ); + }); + await this.searchResultsComponent.loadMore(); + } + + public onClickOpenMlModelAddComponent(): void { + const modal = this.modalService.create({ + nzTitle: "Create New ML Model", + nzContent: UserDatasetVersionCreatorComponent, + nzFooter: null, + nzData: { + isCreatingVersion: false, + }, + nzBodyStyle: { + resize: "both", + overflow: "auto", + minHeight: "200px", + minWidth: "550px", + maxWidth: "90vw", + maxHeight: "80vh", + }, + nzWidth: "fit-content", + }); + modal.afterClose.pipe(untilDestroyed(this)).subscribe(result => { + if (result != null) { + const dashboardDataset: DashboardDataset = result as DashboardDataset; + this.router.navigate([`${DASHBOARD_USER_ML_MODEL}/${dashboardDataset.asset.aid}`]); + } + }); + } + + public deleteMlModel(entry: DashboardEntry): void { + if (entry.dataset?.asset?.aid == undefined) { + return; + } + this.datasetService + .deleteDatasets(entry.dataset.asset.aid) + .pipe(untilDestroyed(this)) + .subscribe(_ => { + this.searchResultsComponent.entries = this.searchResultsComponent.entries.filter( + datasetEntry => datasetEntry.dataset?.asset?.aid !== entry.dataset?.asset?.aid + ); + }); + } +} diff --git a/frontend/src/app/workspace/service/operator-metadata/mlflow-operator-schema.ts b/frontend/src/app/workspace/service/operator-metadata/mlflow-operator-schema.ts new file mode 100644 index 00000000000..3a8b6ee5832 --- /dev/null +++ b/frontend/src/app/workspace/service/operator-metadata/mlflow-operator-schema.ts @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+import { GroupInfo, OperatorSchema } from "../../types/operator-schema.interface";
+
+/** Frontend-injected operator schema for MLFlow (mimics CSV File Scan with dataset selection). */
+export const MLFLOW_OPERATOR_SCHEMA: OperatorSchema = {
+  operatorType: "MLFlow",
+  jsonSchema: {
+    type: "object",
+    properties: {
+      fileName: {
+        type: "string",
+        title: "file name",
+        description: "Select a file from your datasets (e.g. ML model file)",
+      },
+    },
+    required: ["fileName"],
+  },
+  additionalMetadata: {
+    userFriendlyName: "MLFlow",
+    operatorDescription: "Load an ML model or data file from your datasets",
+    operatorGroupName: "Machine Learning",
+    inputPorts: [],
+    outputPorts: [{}],
+  },
+  operatorVersion: "1.0",
+};
+
+export const MACHINE_LEARNING_GROUP_NAME = "Machine Learning";
+
+/** Group entry for the operator menu. */
+export const MACHINE_LEARNING_GROUP: GroupInfo = {
+  groupName: MACHINE_LEARNING_GROUP_NAME,
+};
diff --git a/frontend/src/assets/operator_images/MLFlow.png b/frontend/src/assets/operator_images/MLFlow.png
new file mode 100755
index 00000000000..c570e230ac2
Binary files /dev/null and b/frontend/src/assets/operator_images/MLFlow.png differ
diff --git a/frontend/src/assets/operator_images/MachineUDF.png b/frontend/src/assets/operator_images/MachineUDF.png
new file mode 100644
index 00000000000..10f2095e38f
Binary files /dev/null and b/frontend/src/assets/operator_images/MachineUDF.png differ
diff --git a/machine-manager/.gitignore b/machine-manager/.gitignore
new file mode 100644
index 00000000000..c1c662bded2
--- /dev/null
+++ b/machine-manager/.gitignore
@@ -0,0 +1,4 @@
+.venv/
+__pycache__/
+*.egg-info/
+*.pyc
diff --git a/machine-manager/bin/run.sh b/machine-manager/bin/run.sh
new file mode 100755
index 00000000000..d5a1d238dfe
--- /dev/null
+++ b/machine-manager/bin/run.sh
@@ -0,0 +1,34 @@
+#!/usr/bin/env bash
+# Start machine-manager on this host.
+#
+# Usage:
+#   MACHINE_MANAGER_TOKEN=<shared-secret> ./bin/run.sh
+#
+# First run will create a venv and install dependencies.
+set -euo pipefail
+cd "$(dirname "$0")/.."
+
+VENV_DIR="${VENV_DIR:-.venv}"
+if [[ ! -d "$VENV_DIR" ]]; then
+  python3 -m venv "$VENV_DIR"
+  "$VENV_DIR/bin/pip" install --upgrade pip
+  "$VENV_DIR/bin/pip" install -e .
+fi
+
+# Pick the Python interpreter the /python endpoint will use for running user
+# code. Prefer an existing data-science venv (sklearn / pandas / matplotlib)
+# so analysis scripts work out of the box; fall back to the manager's own
+# venv if nothing better is available.
+if [[ -z "${MACHINE_MANAGER_PYTHON:-}" ]]; then
+  for candidate in \
+    "$HOME/IdeaProjects/texera/.venv/bin/python" \
+    "$VENV_DIR/bin/python"; do
+    if [[ -x "$candidate" ]]; then
+      export MACHINE_MANAGER_PYTHON="$candidate"
+      break
+    fi
+  done
+fi
+echo "[machine-manager] /python interpreter = ${MACHINE_MANAGER_PYTHON:-}"
+
+exec "$VENV_DIR/bin/python" -m machine_manager.server
diff --git a/machine-manager/pyproject.toml b/machine-manager/pyproject.toml
new file mode 100644
index 00000000000..8de00a5dcce
--- /dev/null
+++ b/machine-manager/pyproject.toml
@@ -0,0 +1,18 @@
+[project]
+name = "machine-manager"
+version = "0.1.0"
+description = "Texera machine-manager: per-host agent that executes commands, runs Python with tuple injection, deploys code, and uploads files to Texera datasets."
+requires-python = ">=3.10" +dependencies = [ + "fastapi>=0.115", + "uvicorn[standard]>=0.30", + "httpx>=0.27", + "pydantic>=2.7", +] + +[build-system] +requires = ["setuptools>=68"] +build-backend = "setuptools.build_meta" + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/machine-manager/src/machine_manager/__init__.py b/machine-manager/src/machine_manager/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/machine-manager/src/machine_manager/server.py b/machine-manager/src/machine_manager/server.py new file mode 100644 index 00000000000..6c995d7870b --- /dev/null +++ b/machine-manager/src/machine_manager/server.py @@ -0,0 +1,383 @@ +"""Texera machine-manager service. + +Runs on the target host. Exposes a small HTTP API so Texera services can: +- run shell commands (POST /exec) +- run a Python snippet with an injected tuple (POST /python) +- write a file under a sandbox directory (POST /deploy-code) +- upload a local file into a Texera dataset (POST /upload-to-dataset) + +Auth: shared Bearer token from env MACHINE_MANAGER_TOKEN (skipped if unset, for dev). +Port: env MACHINE_MANAGER_PORT, default 5555. +Sandbox: env MACHINE_MANAGER_SANDBOX_DIR, default ~/.texera/machine-manager/sandbox. +""" +from __future__ import annotations + +import asyncio +import contextlib +import json +import os +import shlex +import subprocess +import sys +import tempfile +import textwrap +import traceback +from pathlib import Path +from typing import Any + +import httpx +from fastapi import Depends, FastAPI, HTTPException, Request, status +from fastapi.responses import JSONResponse +from pydantic import BaseModel, Field + + +# --- config ----------------------------------------------------------------- + +TOKEN = os.environ.get("MACHINE_MANAGER_TOKEN", "").strip() +SANDBOX_DIR = Path( + os.environ.get( + "MACHINE_MANAGER_SANDBOX_DIR", + str(Path.home() / ".texera" / "machine-manager" / "sandbox"), + ) +).resolve() +SANDBOX_DIR.mkdir(parents=True, exist_ok=True) + +# Python interpreter used for /python execution. Defaults to machine-manager's +# own interpreter, which typically has only FastAPI/uvicorn. Override with a +# data-science venv (sklearn, pandas, matplotlib, ...) when the agent needs to +# run real analysis workloads. +PYTHON_INTERPRETER = os.environ.get("MACHINE_MANAGER_PYTHON", sys.executable) + + +# --- auth ------------------------------------------------------------------- + +async def require_token(request: Request) -> None: + if not TOKEN: + # dev mode: no token configured, allow all + return + header = request.headers.get("authorization", "") + if not header.startswith("Bearer "): + raise HTTPException(status.HTTP_401_UNAUTHORIZED, "missing bearer token") + if header.removeprefix("Bearer ").strip() != TOKEN: + raise HTTPException(status.HTTP_403_FORBIDDEN, "bad token") + + +# --- request/response models ------------------------------------------------ + +class ExecRequest(BaseModel): + cmd: str = Field(..., description="Command to run. Passed to a shell.") + cwd: str | None = None + timeout_seconds: float = 60.0 + env: dict[str, str] | None = None + + +class ExecResponse(BaseModel): + exit_code: int + stdout: str + stderr: str + + +class PythonRequest(BaseModel): + code: str + # `tuple_in` is whatever JSON the caller wants injected as a Python global. + # For per-tuple MachineUDF this is a dict; for batch MachineUDF it's a list of + # dicts; for ad-hoc runs (`runPythonOnMachine`) callers pass null. 
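For reference, a hedged client-side view of the `/python` contract the models below define (the URL and bearer token are placeholders; the field names match `PythonRequest`/`PythonResponse`):

```python
import httpx

# Placeholder machine URL and token; the request/response shape follows the
# PythonRequest / PythonResponse models in server.py.
resp = httpx.post(
    "http://localhost:5555/python",
    json={
        "code": "import json\nrow = dict(tuple_in)\nrow['echoed'] = True\nprint(json.dumps(row))",
        "tuple_in": {"x": 1, "y": 2},  # dict for per-tuple, list of dicts for batch, null for ad-hoc
        "timeout_seconds": 30,
    },
    headers={"Authorization": "Bearer <token>"},  # omit if MACHINE_MANAGER_TOKEN is unset
    timeout=45.0,
)
body = resp.json()
assert body["exit_code"] == 0
print(body["result"])  # parsed last JSON stdout line -> {'x': 1, 'y': 2, 'echoed': True}
```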
+ tuple_in: Any = None + timeout_seconds: float = 60.0 + + +class PythonResponse(BaseModel): + exit_code: int + stdout: str + stderr: str + result: Any | None = None # parsed last JSON line of stdout, if any + + +class DeployRequest(BaseModel): + relative_path: str = Field(..., description="Path under the sandbox dir.") + content: str + overwrite: bool = True + + +class DeployResponse(BaseModel): + absolute_path: str + bytes_written: int + + +class UploadRequest(BaseModel): + local_path: str + dataset_id: int + file_path: str = Field(..., description="Destination path inside the dataset.") + file_service_url: str = Field(..., description="Base URL of file-service, e.g. http://localhost:9092") + auth_token: str = Field(..., description="Texera JWT for the calling user.") + + +class UploadResponse(BaseModel): + dataset_id: int + file_path: str + bytes_uploaded: int + version_name: str | None = None + dataset_name: str | None = None + + +# --- app -------------------------------------------------------------------- + +app = FastAPI( + title="texera-machine-manager", + version="0.1.0", + dependencies=[Depends(require_token)], +) + + +@app.get("/healthz") +async def healthz() -> dict[str, Any]: + return { + "ok": True, + "sandbox": str(SANDBOX_DIR), + "auth_required": bool(TOKEN), + "python_interpreter": PYTHON_INTERPRETER, + } + + +@app.post("/exec", response_model=ExecResponse) +async def run_exec(req: ExecRequest) -> ExecResponse: + try: + proc = await asyncio.create_subprocess_shell( + req.cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=req.cwd or None, + env={**os.environ, **(req.env or {})}, + ) + try: + stdout, stderr = await asyncio.wait_for( + proc.communicate(), timeout=req.timeout_seconds + ) + except asyncio.TimeoutError: + proc.kill() + with contextlib.suppress(Exception): + await proc.wait() + raise HTTPException(status.HTTP_504_GATEWAY_TIMEOUT, "command timed out") + return ExecResponse( + exit_code=proc.returncode if proc.returncode is not None else -1, + stdout=stdout.decode("utf-8", "replace"), + stderr=stderr.decode("utf-8", "replace"), + ) + except HTTPException: + raise + except Exception as e: + raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, str(e)) + + +@app.middleware("http") +async def log_invalid_python(request: Request, call_next): + if request.url.path == "/python": + body = await request.body() + print("[MM-DBG] /python body:", body[:500]) + async def receive(): + return {"type": "http.request", "body": body} + request._receive = receive + return await call_next(request) + + +@app.post("/python", response_model=PythonResponse) +async def run_python(req: PythonRequest) -> PythonResponse: + # Inject tuple_in as a global. The user code can `print(json.dumps({...}))` + # to return a row; we'll parse the last JSON line. + preamble = textwrap.dedent( + """ + import json as __mm_json + import sys as __mm_sys + tuple_in = __mm_json.loads(__mm_sys.stdin.read() or 'null') + """ + ).strip() + full = preamble + "\n" + req.code + + # Pre-flight syntax check. Surfacing a SyntaxError here means the caller + # (an LLM-built MachineUDF) gets a tight, actionable error instead of + # having to spin up a subprocess just to see the same message. We compile + # against `req.code` so reported line numbers match the user's script. + try: + compile(req.code, "", "exec") + except SyntaxError as e: + hint = ( + "Hint: this is almost always a RAW NEWLINE inside a single- or " + "double-quoted string (including f-strings). 
Use triple-quoted " + 'strings or explicit \\n escapes.' + ) + return PythonResponse( + exit_code=2, + stdout="", + stderr=f"SyntaxError in MachineUDF script: {e.msg} (line {e.lineno}, offset {e.offset}).\n{hint}", + result=None, + ) + + with tempfile.NamedTemporaryFile( + prefix="mm-", suffix=".py", delete=False, mode="w", encoding="utf-8" + ) as f: + f.write(full) + script_path = f.name + + try: + proc = await asyncio.create_subprocess_exec( + PYTHON_INTERPRETER, + script_path, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + stdout, stderr = await asyncio.wait_for( + proc.communicate( + json.dumps(req.tuple_in).encode("utf-8") if req.tuple_in is not None else b"null" + ), + timeout=req.timeout_seconds, + ) + except asyncio.TimeoutError: + proc.kill() + with contextlib.suppress(Exception): + await proc.wait() + raise HTTPException(status.HTTP_504_GATEWAY_TIMEOUT, "python timed out") + + out = stdout.decode("utf-8", "replace") + err = stderr.decode("utf-8", "replace") + result: Any | None = None + for line in reversed(out.splitlines()): + line = line.strip() + if not line: + continue + try: + result = json.loads(line) + break + except json.JSONDecodeError: + continue + + return PythonResponse( + exit_code=proc.returncode if proc.returncode is not None else -1, + stdout=out, + stderr=err, + result=result, + ) + finally: + with contextlib.suppress(Exception): + os.unlink(script_path) + + +@app.post("/deploy-code", response_model=DeployResponse) +async def deploy_code(req: DeployRequest) -> DeployResponse: + target = (SANDBOX_DIR / req.relative_path).resolve() + if SANDBOX_DIR not in target.parents and target != SANDBOX_DIR: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "path escapes sandbox") + target.parent.mkdir(parents=True, exist_ok=True) + if target.exists() and not req.overwrite: + raise HTTPException(status.HTTP_409_CONFLICT, "file exists and overwrite=false") + data = req.content.encode("utf-8") + target.write_bytes(data) + return DeployResponse(absolute_path=str(target), bytes_written=len(data)) + + +@app.post("/upload-to-dataset", response_model=UploadResponse) +async def upload_to_dataset(req: UploadRequest) -> UploadResponse: + local = Path(os.path.expanduser(req.local_path)).resolve() + if not local.is_file(): + raise HTTPException(status.HTTP_404_NOT_FOUND, f"local file not found: {local}") + + size = local.stat().st_size + headers = { + "Authorization": f"Bearer {req.auth_token}", + "Content-Type": "application/octet-stream", + "Content-Length": str(size), + } + + version_name: str | None = None + dataset_name: str | None = None + async with httpx.AsyncClient(timeout=300.0) as client: + with local.open("rb") as f: + resp = await client.post( + f"{req.file_service_url.rstrip('/')}/api/dataset/{req.dataset_id}/upload", + params={ + "filePath": req.file_path, + "message": f"Uploaded {local.name} via machine-manager", + }, + content=f.read(), + headers=headers, + ) + if resp.status_code >= 300: + raise HTTPException(resp.status_code, f"file-service upload failed: {resp.text}") + + # The upload only stages the file; commit it as a new dataset version + # so the workflow can read it via //. Send an empty + # message body so file-service names the version cleanly (e.g. "v2" + # rather than "v2 - ") for simpler downstream path handling. 
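Concretely (hypothetical owner, dataset, and file names): once the version is committed below, a workflow scan operator can reference the uploaded file through the 3-segment auto-latest path form that `FileResolver` now accepts, with no version pinning needed:

```python
# Hypothetical names; the 3-segment form resolves to the dataset's newest version.
owner_email, dataset_name, file_path = "alice", "test-4", "foo.csv"
file_name = f"/{owner_email}/{dataset_name}/latest/{file_path}"
# -> "/alice/test-4/latest/foo.csv"
```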
+ commit_resp = await client.post( + f"{req.file_service_url.rstrip('/')}/api/dataset/{req.dataset_id}/version/create", + content="", + headers={ + "Authorization": f"Bearer {req.auth_token}", + "Content-Type": "text/plain", + }, + ) + # "No changes" => the staged file matches the latest version already. + # Treat as success and fall back to the latest version metadata. + no_changes = ( + commit_resp.status_code == 400 + and "No changes detected" in commit_resp.text + ) + if commit_resp.status_code >= 300 and not no_changes: + raise HTTPException( + commit_resp.status_code, + f"file-service version create failed: {commit_resp.text}", + ) + version_name = None + dataset_name = None + if not no_changes: + try: + commit_body = commit_resp.json() + version_name = commit_body.get("datasetVersion", {}).get("name") + dataset_name = commit_body.get("dataset", {}).get("name") or commit_body.get( + "datasetName" + ) + except Exception: + pass + if version_name is None: + try: + latest_resp = await client.get( + f"{req.file_service_url.rstrip('/')}/api/dataset/{req.dataset_id}/version/latest", + headers={"Authorization": f"Bearer {req.auth_token}"}, + ) + if latest_resp.status_code < 300: + latest_body = latest_resp.json() + version_name = latest_body.get("datasetVersion", {}).get("name") + except Exception: + pass + + return UploadResponse( + dataset_id=req.dataset_id, + file_path=req.file_path, + bytes_uploaded=size, + version_name=version_name, + dataset_name=dataset_name, + ) + + +@app.exception_handler(Exception) +async def all_errors(_: Request, exc: Exception) -> JSONResponse: + return JSONResponse( + status_code=500, + content={"error": str(exc), "trace": traceback.format_exc()}, + ) + + +def main() -> None: + import uvicorn + + uvicorn.run( + "machine_manager.server:app", + host=os.environ.get("MACHINE_MANAGER_HOST", "0.0.0.0"), + port=int(os.environ.get("MACHINE_MANAGER_PORT", "5555")), + log_level="info", + ) + + +if __name__ == "__main__": + main() diff --git a/sql/texera_ddl.sql b/sql/texera_ddl.sql index d6b488e582d..c6e4f2d132d 100644 --- a/sql/texera_ddl.sql +++ b/sql/texera_ddl.sql @@ -74,6 +74,7 @@ DROP TABLE IF EXISTS dataset_user_likes CASCADE; DROP TABLE IF EXISTS dataset_view_count CASCADE; DROP TABLE IF EXISTS site_settings CASCADE; DROP TABLE IF EXISTS computing_unit_user_access CASCADE; +DROP TABLE IF EXISTS machine CASCADE; -- ============================================ -- 4. Create PostgreSQL enum types @@ -435,6 +436,19 @@ CREATE TABLE IF NOT EXISTS computing_unit_user_access FOREIGN KEY (uid) REFERENCES "user"(uid) ON DELETE CASCADE ); +-- machine table (Texera-managed remote/local hosts running machine-manager) +CREATE TABLE IF NOT EXISTS machine +( + mid SERIAL PRIMARY KEY, + uid INT NOT NULL, + name VARCHAR(128) NOT NULL, + url VARCHAR(512) NOT NULL, + token VARCHAR(512), + creation_time TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (uid, name), + FOREIGN KEY (uid) REFERENCES "user"(uid) ON DELETE CASCADE +); + -- START Fulltext search index creation (DO NOT EDIT THIS LINE) CREATE EXTENSION IF NOT EXISTS pgroonga;
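Finally, a sketch of registering a row in the new `machine` table through the REST endpoints the frontend `MachineService` targets. The backend resource itself is not shown in this diff, so the API base URL, the auth header, and the response shape are assumptions inferred from the frontend code and the DDL above:

```python
import httpx

API = "http://localhost:8080/api"  # assumed Texera API base
JWT = "<texera-jwt>"               # placeholder user token

resp = httpx.post(
    f"{API}/machines",
    json={"name": "my-laptop", "url": "http://localhost:5555", "token": None},
    headers={"Authorization": f"Bearer {JWT}"},
)
resp.raise_for_status()
machine = resp.json()  # assumed shape mirrors the Machine interface: mid, uid, name, url, token?, creationTime?
print(machine["mid"], machine["url"])
```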