Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions agent-service/src/agent/prompts.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { describe, expect, test } from "bun:test";
import { buildSystemPrompt } from "./prompts";
import { WorkflowSystemMetadata } from "./util/workflow-system-metadata";

/**
 * Build a brand-new, empty metadata store for prompt-envelope assertions.
 * With nothing registered, `buildAllowedOperatorSchemas` yields
 * "No operators available.", which is fine — these tests only assert on the
 * envelope sections (profiler guide, key principles, UDF guides), never on
 * the operator-schema block.
 */
function emptyMetadata(): WorkflowSystemMetadata {
  // NOTE(review): the `any` cast suggests the constructor is not publicly
  // callable — confirm against WorkflowSystemMetadata's declaration.
  const Ctor = WorkflowSystemMetadata as any;
  return new Ctor();
}

describe("buildSystemPrompt — base content (regression guard)", () => {
  test("includes the dataflow primer and key principles", () => {
    const prompt = buildSystemPrompt(emptyMetadata());
    // Each anchor is a stable heading in the base template.
    const anchors = ["What is Dataflow?", "Key Principles", "Available Operators"];
    for (const anchor of anchors) {
      expect(prompt).toContain(anchor);
    }
  });

  test("renders 'No operators available.' when metadata is empty", () => {
    expect(buildSystemPrompt(emptyMetadata())).toContain("No operators available.");
  });
});

describe("buildSystemPrompt — UDF gating", () => {
  // An omitted allow-list is the permissive default; a list without the UDF
  // operator types must suppress the corresponding guide.
  test("includes Python UDF guide when Python is allowed (default = all)", () => {
    expect(buildSystemPrompt(emptyMetadata())).toContain("## Python UDF Guide");
  });

  test("includes R UDF guide when R is allowed (default = all)", () => {
    expect(buildSystemPrompt(emptyMetadata())).toContain("## R UDF Guide");
  });

  test("omits Python UDF guide when the allowed list excludes PythonUDFV2", () => {
    const restricted = buildSystemPrompt(emptyMetadata(), ["CSVFileScan"]);
    expect(restricted).not.toContain("## Python UDF Guide");
  });

  test("omits R UDF guide when the allowed list excludes RUDF", () => {
    const restricted = buildSystemPrompt(emptyMetadata(), ["CSVFileScan"]);
    expect(restricted).not.toContain("## R UDF Guide");
  });
});

describe("buildSystemPrompt — Profiler Guide (Phase 2)", () => {
  // Fresh prompt per call, matching the per-test construction the rest of the
  // file uses.
  const fullPrompt = () => buildSystemPrompt(emptyMetadata());

  test("always includes the Profiler Guide section", () => {
    expect(fullPrompt()).toContain("## Profiler Guide");
  });

  test("Profiler Guide is included even when the allowed-operator list is restrictive", () => {
    // Profiler tools are always registered regardless of which builder operators
    // are exposed, so the guide must not be gated on the allow-list.
    const restricted = buildSystemPrompt(emptyMetadata(), ["CSVFileScan"]);
    expect(restricted).toContain("## Profiler Guide");
  });

  test("names every Phase-1 read-only profiler tool", () => {
    const toolNames = [
      "getProfilerSummary",
      "listHotOperators",
      "getOperatorMetrics",
      "getOptimizationHints",
      "compareToBaseline",
    ];
    const prompt = fullPrompt();
    for (const toolName of toolNames) {
      expect(prompt).toContain(toolName);
    }
  });

  test("names every rule id surfaced by the rule engine", () => {
    const ruleIds = [
      "SCAN_FULL_TABLE_NO_FILTER",
      "UPSTREAM_OVERPRODUCTION",
      "JOIN_HIGH_FANIN_LOW_FANOUT",
      "RUNTIME_OUTLIER",
      "IDLE_HEAVY",
      "LOW_PARALLELISM_HOT_OP",
    ];
    const prompt = fullPrompt();
    for (const ruleId of ruleIds) {
      expect(prompt).toContain(ruleId);
    }
  });

  test("instructs the agent to use the profiler proactively for performance questions", () => {
    const prompt = fullPrompt();
    expect(prompt).toContain("Proactively");
    // Heuristic anchor: the guide must talk about slowness / bottlenecks
    const lowered = prompt.toLowerCase();
    expect(lowered).toContain("slowness");
    expect(lowered).toContain("bottleneck");
  });

  test("instructs the agent to surface proposals via proposeOperatorChange, not modifyOperator", () => {
    // The load-bearing rule of Phase 3 — proposals go through the structured tool,
    // not direct mutation.
    const anchors = [
      "proposeOperatorChange",
      "Never call `modifyOperator`",
      "Apply / Reject card",
    ];
    const prompt = fullPrompt();
    for (const anchor of anchors) {
      expect(prompt).toContain(anchor);
    }
  });

  test("documents the required arguments for proposeOperatorChange", () => {
    const requiredAnchors = [
      "proposeOperatorChange — required arguments",
      "operatorId",
      "propertyChanges",
      "reasoning",
      "expectedImpact",
      "firingHints",
    ];
    const prompt = fullPrompt();
    for (const anchor of requiredAnchors) {
      expect(prompt).toContain(anchor);
    }
  });

  test("teaches the proactive-call-out example using proposeOperatorChange", () => {
    const prompt = fullPrompt();
    expect(prompt).toContain("Example — proactive bottleneck call-out (Phase 3 flow)");
    expect(prompt).toContain("Apply or Reject");
  });

  test("permits multiple independent proposals in a single turn", () => {
    const anchors = [
      "Example — multiple independent suggestions",
      "more than once",
      "Do NOT bundle",
    ];
    const prompt = fullPrompt();
    for (const anchor of anchors) {
      expect(prompt).toContain(anchor);
    }
  });

  test("teaches proposeOptimizationPlan for related multi-step optimizations (Phase 4)", () => {
    const anchors = [
      "Multi-step optimization plans",
      "proposeOptimizationPlan",
      "Apply All",
    ];
    const prompt = fullPrompt();
    for (const anchor of anchors) {
      expect(prompt).toContain(anchor);
    }
  });

  test("explains when to use a plan vs single proposals vs multiple proposals", () => {
    const prompt = fullPrompt();
    expect(prompt).toContain("When to use a plan vs single proposals");
    // The three branches must each be named so the model can choose correctly.
    const branches = ["RELATED and ORDERED", "INDEPENDENT", "exactly one change"];
    for (const branch of branches) {
      expect(prompt).toContain(branch);
    }
  });

  test("documents the required arguments for proposeOptimizationPlan", () => {
    const planAnchors = [
      "proposeOptimizationPlan — required arguments",
      "planTitle",
      "planRationale",
      "steps",
      "2–10",
    ];
    const prompt = fullPrompt();
    for (const anchor of planAnchors) {
      expect(prompt).toContain(anchor);
    }
  });

  test("includes a Phase 4 multi-step plan example", () => {
    expect(fullPrompt()).toContain("Example — multi-step plan (Phase 4 flow)");
  });

  test("distinguishes direct user requests (use modifyOperator) from agent-initiated proposals", () => {
    const prompt = fullPrompt();
    expect(prompt).toContain("Direct user requests are different");
    expect(prompt).toContain("Example — direct user request");
  });

  test("teaches the no-bottleneck case via a final example", () => {
    expect(fullPrompt()).toContain("Example — no bottleneck found");
  });

  test("tells the agent how to react to NO_DATA from the snapshot tools", () => {
    expect(fullPrompt()).toContain("No profiler data available");
  });
});

describe("buildSystemPrompt — ordering invariants", () => {
  test("operator schemas appear before the appended guide sections", () => {
    const prompt = buildSystemPrompt(emptyMetadata());
    const operatorsIdx = prompt.indexOf("## Available Operators");
    const profilerIdx = prompt.indexOf("## Profiler Guide");
    // operatorsIdx >= 0 doubles as a presence check; profilerIdx > operatorsIdx
    // then implies the profiler guide is present AND comes later.
    expect(operatorsIdx).toBeGreaterThanOrEqual(0);
    expect(profilerIdx).toBeGreaterThan(operatorsIdx);
  });

  test("Profiler Guide comes after the UDF guides when both are present", () => {
    const prompt = buildSystemPrompt(emptyMetadata());
    const pyIdx = prompt.indexOf("## Python UDF Guide");
    const rIdx = prompt.indexOf("## R UDF Guide");
    const profilerIdx = prompt.indexOf("## Profiler Guide");
    // Guard against a vacuous pass: indexOf returns -1 for a missing section,
    // and `profilerIdx > -1` would hold even if a UDF guide were absent.
    // The test name promises "when both are present", so assert presence first.
    expect(pyIdx).toBeGreaterThanOrEqual(0);
    expect(rIdx).toBeGreaterThanOrEqual(0);
    expect(profilerIdx).toBeGreaterThan(pyIdx);
    expect(profilerIdx).toBeGreaterThan(rIdx);
  });
});
101 changes: 101 additions & 0 deletions agent-service/src/agent/prompts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,103 @@ import { WorkflowSystemMetadata } from "./util/workflow-system-metadata";
// Operator types that carry user-defined Python code. buildSystemPrompt checks
// the allow-list against these to decide whether to append the Python UDF
// guide (see the `pythonAllowed` gate below).
const PYTHON_UDF_OPERATOR_TYPES: string[] = ["PythonUDFV2"];
// Operator types that carry user-defined R code; same gating for the R guide.
const R_UDF_OPERATOR_TYPES: string[] = ["RUDF"];

/**
 * Profiler guide appended to every system prompt by `buildSystemPrompt` —
 * unconditionally, unlike the UDF guides, which are gated on the
 * allowed-operator list (see the push at the bottom of `buildSystemPrompt`).
 *
 * NOTE(review): this is a runtime template literal — every character below is
 * prompt content the model receives, and prompts.test.ts pins exact substrings
 * of it (section headings, tool names, rule ids, example titles). Change
 * wording only in lockstep with those tests. Inner backticks are escaped
 * (\`...\`) so they render as Markdown inline code in the final prompt.
 */
const PROFILER_INSTRUCTIONS = `## Profiler Guide

When the user has run their workflow with the profiler heatmap enabled, a per-message profiler snapshot is attached and the read-only profiler tools become useful for diagnosing performance.

### Read-only profiler tools

- \`getProfilerSummary\` — top-level snapshot: view, hottest operator, hint count, baseline status. Always call this first when investigating performance.
- \`listHotOperators\` — top-N hottest operators with full per-operator metrics.
- \`getOperatorMetrics\` — full metrics for a single operator by id.
- \`getOptimizationHints\` — hints fired by the rule engine. Rule ids include: SCAN_FULL_TABLE_NO_FILTER, UPSTREAM_OVERPRODUCTION, JOIN_HIGH_FANIN_LOW_FANOUT, RUNTIME_OUTLIER, IDLE_HEAVY, LOW_PARALLELISM_HOT_OP.
- \`compareToBaseline\` — per-operator deltas (current run vs an uploaded baseline run).

If a tool returns a "No profiler data available" message, tell the user how to enable profiling (turn on the gauge icon in the run-bar and re-run the workflow) — do not guess about performance without data.

### When to use the profiler

- **Proactively**: whenever the user asks about slowness, performance, bottlenecks, optimization, why a workflow is taking long, or anything similar — start by calling \`getProfilerSummary\`.
- **On request**: questions about specific operator runtimes, fired hints, or run-vs-run comparisons.
- **Do NOT use**: for questions about building workflows, schema, data semantics, or operator behavior unrelated to performance.

### Canonical flow for performance questions

1. Call \`getProfilerSummary\` to confirm data exists and identify the hottest operator.
2. Call \`getOptimizationHints\` to see what the rule engine flagged.
3. Optionally call \`listHotOperators\` or \`getOperatorMetrics\` for more detail.
4. Summarize findings — cite the operator id, heat score, and the specific hint(s) that fired.
5. If a hint directly supports a mechanical change, call \`proposeOperatorChange\` to surface a structured proposal. The frontend will render an Apply / Reject card next to your message.
6. **Never call \`modifyOperator\` (or any other mutating tool) for profiler-driven suggestions.** \`proposeOperatorChange\` is the only correct channel — the UI's Apply button is the confirmation gate, and it invokes the mutation directly on the user's side without re-asking you.

### proposeOperatorChange — required arguments

Every call must provide:
- \`operatorId\`: the exact operator id (not the display name).
- \`propertyChanges\`: a sparse object containing ONLY the keys to change (merge-style; do not echo unchanged properties).
- \`reasoning\`: cites the firing hint(s) by ruleId (e.g. "RUNTIME_OUTLIER and LOW_PARALLELISM_HOT_OP on python-udf-1").
- \`expectedImpact\`: what the user should see after applying.
- \`firingHints\` (optional but recommended): the ruleIds (e.g. \`["RUNTIME_OUTLIER", "LOW_PARALLELISM_HOT_OP"]\`).

Only propose changes that fired hints directly support. Do not invent optimizations the rule engine didn't surface.

### Direct user requests are different

If the user explicitly says "set workers to 4 on python-udf-1" or similar imperative instruction, call \`modifyOperator\` directly — they have already approved the change in the request itself. \`proposeOperatorChange\` is for agent-initiated suggestions where the user has NOT yet expressed an intent to change anything.

### Example — proactive bottleneck call-out (Phase 3 flow)

User: "My workflow feels slow."

(You call \`getProfilerSummary\` → hottest is \`python-udf-1\` score 0.97; then \`getOptimizationHints\` → \`RUNTIME_OUTLIER\` and \`LOW_PARALLELISM_HOT_OP\` both fire on \`python-udf-1\`. Then you call \`proposeOperatorChange\` with operatorId=\`python-udf-1\`, propertyChanges=\`{ workers: 4 }\`, reasoning="RUNTIME_OUTLIER and LOW_PARALLELISM_HOT_OP both fired on python-udf-1.", expectedImpact="Should cut runtime via more parallelism.", firingHints=\`["RUNTIME_OUTLIER","LOW_PARALLELISM_HOT_OP"]\`.)

Response: "The Python UDF (\`python-udf-1\`) is your bottleneck — its runtime is far above the workflow median (RUNTIME_OUTLIER), and it's running with only 1 worker while being hot (LOW_PARALLELISM_HOT_OP). I've proposed increasing \`workers\` from 1 to 4 — you can Apply or Reject the proposal below."

### Example — multiple independent suggestions

You may call \`proposeOperatorChange\` more than once in a single turn when several distinct operators have independent hints. Each proposal renders its own Apply / Reject card. Do NOT bundle unrelated changes into a single proposal.

### Multi-step optimization plans (proposeOptimizationPlan)

When the optimization is a SEQUENCE of related changes that build on each other (e.g. "first push the Filter upstream, then bump the UDF workers, then re-shard"), use \`proposeOptimizationPlan\` instead of multiple \`proposeOperatorChange\` calls. The frontend renders the plan as one card with per-step Apply / Reject buttons plus an "Apply All" button.

### When to use a plan vs single proposals

- **Use \`proposeOptimizationPlan\`** when the steps are RELATED and ORDERED — fixing one issue depends on or interacts with another (e.g. push a Filter upstream, then retune the downstream operator's parameters). Minimum 2 steps; maximum 10.
- **Use multiple \`proposeOperatorChange\`** when the suggestions are INDEPENDENT — each operator has its own unrelated hint and the user might want to accept some and reject others without ordering implications.
- **Use a single \`proposeOperatorChange\`** when there is exactly one change to propose.

### proposeOptimizationPlan — required arguments

- \`planTitle\`: short title (e.g. "Optimize the Python UDF bottleneck").
- \`planRationale\`: why these steps together, citing the firing hints.
- \`firingHints\` (optional): ruleIds justifying the plan as a whole.
- \`steps\`: ordered array (length 2–10). Each step has \`operatorId\`, sparse \`propertyChanges\`, \`description\`, \`reasoning\`, \`expectedImpact\`.

### Example — multi-step plan (Phase 4 flow)

User: "What can we do to make this faster?"

(You investigate via \`getProfilerSummary\` and \`getOptimizationHints\` → \`SCAN_FULL_TABLE_NO_FILTER\` on \`csv-scan-1\` and \`LOW_PARALLELISM_HOT_OP\` on \`python-udf-1\`. The fixes are ordered — filtering upstream changes the data volume the UDF sees. You call \`proposeOptimizationPlan\` with planTitle="Reduce Python UDF load", planRationale="SCAN_FULL_TABLE_NO_FILTER and LOW_PARALLELISM_HOT_OP both feed into the same hot path; filtering upstream lowers the working set before we add parallelism.", steps=[{ operatorId: "filter-1", propertyChanges: { predicate: "is not null" }, description: "Add a Filter between csv-scan-1 and python-udf-1", reasoning: "SCAN_FULL_TABLE_NO_FILTER", expectedImpact: "Drops rows before the UDF" }, { operatorId: "python-udf-1", propertyChanges: { workers: 4 }, description: "Bump UDF workers to 4", reasoning: "LOW_PARALLELISM_HOT_OP", expectedImpact: "Parallelizes the remaining work" }].)

Response: "Two changes will compound here. I've proposed a 2-step plan — apply them in order via the card below."

### Example — direct user request (use modifyOperator, not propose)

User: "Set workers to 4 on python-udf-1."

(You call \`modifyOperator\` directly. No proposal — the user already approved by asking.)

Response: "Done — \`python-udf-1\` now has 4 workers."

### Example — no bottleneck found

User: "Anything slow?"

(You call \`getProfilerSummary\` → max score 0.34, 0 hints. You do NOT call \`proposeOperatorChange\` — there's nothing to propose.)

Response: "I checked the profiler — nothing's bottlenecked. The hottest operator (\`csv-scan-1\`) is at a heat score of only 0.34 and no optimization hints fired. The workflow looks healthy."`;

const PYTHON_UDF_INSTRUCTIONS = `## Python UDF Guide

Python UDF operators run user-defined Python code. There are 2 APIs to process data:
Expand Down Expand Up @@ -290,6 +387,10 @@ export function buildSystemPrompt(metadataStore: WorkflowSystemMetadata, allowed
const extraSections: string[] = [];
if (pythonAllowed) extraSections.push(PYTHON_UDF_INSTRUCTIONS);
if (rAllowed) extraSections.push(R_UDF_INSTRUCTIONS);
// Profiler instructions are not gated on allowed operator types — the read-only
// profiler tools are always available regardless of which builder operators
// are exposed to the agent.
extraSections.push(PROFILER_INSTRUCTIONS);

const base = SYSTEM_PROMPT_TEMPLATE.replace("{{OPERATOR_SCHEMA}}", operatorSchemas);
return extraSections.length > 0 ? `${base}\n${extraSections.join("\n\n")}\n` : base;
Expand Down
Loading
Loading