Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
550 changes: 550 additions & 0 deletions README_DataGuard.md

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions agent-service/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Local-only demo datasets for DataGuard. Kept on disk for hand-testing the
# auto-trigger flow against single-category CSVs, but not committed —
# `agent-service/demo/diabetes_messy.csv` and its single-category siblings
# can be re-generated or re-downloaded; they shouldn't live in upstream.
demo/
136 changes: 134 additions & 2 deletions agent-service/src/agent/texera-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ import { assembleContext } from "./util/context-utils";
import { compileWorkflowAsync, type WorkflowCompilationResponse } from "../api/compile-api";
import { createLogger } from "../logger";
import type { Logger } from "pino";
import type { FixProposal, IssueType, PermissionDecision } from "../types/dataguard";
import { DataGuardSession } from "./tools/dataguard/dataguard-session";
import type { ApprovalGateway } from "./tools/dataguard/with-approval";
import type { LlmCallFn } from "./tools/dataguard/suggest-fix";
import { createDataGuardTools } from "./tools/dataguard/dataguard-tools";
import type { DatasetView } from "./tools/dataguard/dataset";

const PERSIST_DEBOUNCE_MS = 500;

Expand Down Expand Up @@ -80,8 +86,12 @@ type ReActStepCallback = (step: ReActStep) => void;
* (`WorkflowResultState`), and the tool surface exposed to the LLM. Each call
* to `sendMessage` drives one multi-step generation via the Vercel AI SDK,
* streaming step updates to subscribed websockets.
*
* Also implements `ApprovalGateway` — DataGuard's mutating tools call
* `requestApproval(this, …)` to pause the ReAct loop until the user clicks
* Allow / Deny / Modify in the chat panel.
*/
export class TexeraAgent {
export class TexeraAgent implements ApprovalGateway {
readonly agentId: string;
readonly agentName: string;
readonly modelType: string;
Expand Down Expand Up @@ -125,6 +135,11 @@ export class TexeraAgent {

private log: Logger;

// DataGuard state — see agent/tools/dataguard/
// Session object handed to the DataGuard tools; this class also uses it for
// the dataset, proposal lookup and auto-allow rules.
private readonly dataGuardSession = new DataGuardSession();
// Resolvers of approvals that are currently being awaited, keyed by step id
// (registered in awaitDecision, consumed in resolveDecision).
private pendingDecisions: Map<string, (d: PermissionDecision) => void> = new Map();
// Decisions that arrived before anything awaited them, keyed by step id
// (stored in resolveDecision, consumed in awaitDecision).
private decidedBuffer: Map<string, PermissionDecision> = new Map();

constructor(config: TexeraAgentConfig) {
this.agentId = config.agentId;
this.agentName = config.agentName || `Agent-${config.agentId}`;
Expand Down Expand Up @@ -228,13 +243,46 @@ export class TexeraAgent {
);
}

// DataGuard tools — read-only profile/suggest plus permission-gated apply.
// The tools only need a "prompt in, text out" function. Delegate to callLlm
// instead of repeating the generateText call here, so the model and sampling
// settings used for DataGuard are defined in exactly one place.
const llmCall: LlmCallFn = (prompt: string) => this.callLlm(prompt);
Object.assign(
tools,
createDataGuardTools({
session: this.dataGuardSession,
gateway: this,
llmCall,
})
);

return tools;
}

getState(): AgentStateEnum {
return this.state;
}

/**
 * One-shot text generation against this agent's configured model.
 *
 * Does not go through the ReAct loop: no tools are offered and no step is
 * recorded. DataGuard's server-driven /scan endpoint uses it to produce
 * FixProposals without involving the chat panel.
 *
 * @param prompt - Text sent to the model as-is.
 * @returns The text of the model's response.
 */
public async callLlm(prompt: string): Promise<string> {
  const { text: reply } = await generateText({
    prompt,
    model: this.model,
    temperature: 0.2,
  });
  return reply;
}

getWorkflowState(): WorkflowState {
return this.workflowState;
}
Expand Down Expand Up @@ -312,7 +360,9 @@ export class TexeraAgent {
this.stepCallback = callback;
}

private generateStepId(): string {
/**
 * Mints a new step id of the form `step-<agentId>-<counter>-<epoch millis>`.
 *
 * Exposed publicly so DataGuard's permission-gating layer can obtain an id
 * for a pending-approval step before the AI SDK has produced any step.
 * Each call advances the agent's step counter by one.
 */
public generateStepId(): string {
  this.stepCounter += 1;
  return ["step", this.agentId, this.stepCounter, Date.now()].join("-");
}

Expand Down Expand Up @@ -823,6 +873,83 @@ export class TexeraAgent {
return relevantSteps;
}

// ============================================================
// DataGuard / ApprovalGateway
// ============================================================

/** Returns the DataGuard session owned by this agent (the same instance on every call). */
public getDataGuardSession(): DataGuardSession {
  const session = this.dataGuardSession;
  return session;
}

/**
 * Hands `dataset` to this agent's DataGuard session.
 *
 * @param dataset - Dataset view forwarded unchanged to the session's `setDataset`.
 */
public setDataGuardDataset(dataset: DatasetView): void {
  const session = this.dataGuardSession;
  session.setDataset(dataset);
}

/**
 * ApprovalGateway: reports whether a standing "remember" rule exists for
 * `issueType`. The answer comes straight from the DataGuard session.
 */
public matchesAutoAllowRule(issueType: IssueType): boolean {
  const hasStandingRule = this.dataGuardSession.matchesAutoAllowRule(issueType);
  return hasStandingRule;
}

/**
 * ApprovalGateway: records a pending-approval step for `proposal` so the chat
 * panel can show the approval prompt.
 *
 * The step is chained onto the current head, handed to `addStep` (which is
 * expected to store it and notify the step callback), and then becomes the
 * new head. The workflow is not modified here, so the same workflow content
 * is used for both the "before" and "after" snapshots.
 *
 * @param stepId - Id for the new step, normally obtained from `generateStepId`.
 * @param proposal - Fix awaiting the user's decision.
 */
public emitPendingApproval(stepId: string, proposal: FixProposal): void {
  // Outside of a running message there is no message id; use a placeholder.
  const owningMessageId = this.currentMessageId ?? "<no-message>";
  const workflowSnapshot = this.workflowState.getWorkflowContent();
  const approvalRequest = {
    toolName: "apply_fix",
    proposal,
    riskTier: proposal.riskTier,
  };

  const approvalStep: ReActStep = {
    id: stepId,
    parentId: this.head,
    messageId: owningMessageId,
    stepId: -1, // not an AI SDK step, so it carries no real step index
    timestamp: Date.now(),
    role: "agent",
    content: proposal.action,
    isBegin: false,
    isEnd: false,
    pendingApproval: approvalRequest,
    beforeWorkflowContent: workflowSnapshot,
    afterWorkflowContent: workflowSnapshot,
  };

  this.addStep(approvalStep);
  this.head = stepId;
}

/**
 * ApprovalGateway: waits for the user's decision on the step `stepId`.
 *
 * If `resolveDecision` already stored a decision for this step, that decision
 * is taken out of the buffer and returned immediately. Otherwise the promise
 * stays pending until `resolveDecision` is called for the same step id (the
 * server does so on a WS {type:"decision", stepId, …} message).
 *
 * @param stepId - Id of the pending-approval step.
 * @returns A promise that resolves with the user's decision.
 */
public awaitDecision(stepId: string): Promise<PermissionDecision> {
  const early = this.decidedBuffer.get(stepId);
  if (!early) {
    // Nothing buffered yet: park the resolver until the decision comes in.
    return new Promise<PermissionDecision>(settle => {
      this.pendingDecisions.set(stepId, settle);
    });
  }
  this.decidedBuffer.delete(stepId);
  return Promise.resolve(early);
}

/**
 * Delivers the user's decision for the step `stepId`. Called from the WS
 * handler when the user clicks Allow / Deny / Modify.
 *
 * When the decision has `remember` set, an auto-allow rule is first added for
 * the issue type of the proposal attached to that step (if the proposal can
 * be found). NOTE(review): the rule is written whenever `remember` is set,
 * with no check of the verdict — confirm that a deny + remember decision can
 * never reach this method.
 *
 * @param stepId - Id of the pending-approval step being answered.
 * @param decision - The user's decision.
 * @returns `true` if a waiting `awaitDecision` call was unblocked, `false` if
 *          nothing was waiting and the decision was buffered for later.
 */
public resolveDecision(stepId: string, decision: PermissionDecision): boolean {
  if (decision.remember) {
    // The "Allow & don't ask again" verdict also writes an auto-allow rule.
    const issueId = extractIssueIdFromStep(this.stepsById, stepId);
    const remembered = this.dataGuardSession.getProposal(issueId);
    if (remembered) {
      this.dataGuardSession.addAutoAllowRule(remembered.issueType);
    }
  }

  const waiter = this.pendingDecisions.get(stepId);
  if (!waiter) {
    // The decision beat the tool to it: keep it until awaitDecision asks.
    this.decidedBuffer.set(stepId, decision);
    return false;
  }

  this.pendingDecisions.delete(stepId);
  waiter(decision);
  return true;
}

destroy(): void {
if (this.workflowChangeSubscription) {
this.workflowChangeSubscription.unsubscribe();
Expand All @@ -838,3 +965,8 @@ export class TexeraAgent {
this.currentMessageId = undefined;
}
}

/**
 * Looks up the issue id of the proposal attached to a pending-approval step.
 *
 * @param steps - Steps indexed by step id.
 * @param stepId - Id of the step to inspect.
 * @returns The proposal's issue id, or "" when the step is unknown, has no
 *          pending approval, or its proposal has no issue id.
 */
function extractIssueIdFromStep(steps: Map<string, ReActStep>, stepId: string): string {
  const approval = steps.get(stepId)?.pendingApproval;
  if (approval == null) {
    return "";
  }
  return approval.proposal.issueId ?? "";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

// Contract tests for cutting the Modify verdict (task #11a / #15) and for
// the remember-flag scope rule (task #12). The HTTP body schema on
// POST /api/agents/:id/dataguard/apply-batch must:
//
// • reject verdict: "modify"
// • reject the modifiedAction field (no longer part of the contract)
// • reject { verdict: "deny", remember: true } — remember is only meaningful for "allow"
// • still accept { verdict: "allow", remember: true } and { verdict: "deny" }
//
// All assertions are at the schema layer (Elysia body validation runs before
// the handler), so we don't need a real loaded dataset or LLM-derived
// proposals to exercise them.

import { beforeEach, describe, expect, test } from "bun:test";
import { buildApp, _resetAgentStoreForTests } from "../../../../server";
import { env } from "../../../../config/env";

// Route prefix taken from the environment config; every path below is built on it.
const API = env.API_PREFIX;
// One app instance shared by all tests; requests are dispatched in-process via app.handle.
const app = buildApp();

/** Turns a request path into an absolute URL on `http://localhost`. */
function url(path: string): string {
  const origin = "http://localhost";
  return origin + path;
}

/**
 * Sends `body` as a JSON POST to `path` on the shared app instance.
 *
 * @param path - Request path; it is expanded to a full URL by `url`.
 * @param body - Value serialized with `JSON.stringify` as the request body.
 * @returns The response produced by `app.handle`.
 */
async function postJson(path: string, body: unknown): Promise<Response> {
  const request = new Request(url(path), {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(body),
  });
  return app.handle(request);
}

/**
 * Parses a response body as JSON and types the result as `T`.
 * The cast is unchecked: the caller is trusted to name the right shape.
 */
async function readJson<T = unknown>(res: Response): Promise<T> {
  const parsed = await res.json();
  return parsed as T;
}

/**
 * Creates an agent through the HTTP API and returns its id.
 *
 * Fails fast when creation does not succeed. Without these checks a failed
 * creation would pass `undefined` on as the id, every request in the test
 * would target `/agents/undefined/…`, and the failure would surface later as
 * a confusing status mismatch rather than as a setup error.
 *
 * @returns The id of the newly created agent.
 * @throws Error if the create request is not answered with a 2xx status or
 *         the response body carries no `id`.
 */
async function createAgent(): Promise<string> {
  const res = await postJson(`${API}/agents`, { modelType: "test-model" });
  if (!res.ok) {
    throw new Error(`createAgent: POST ${API}/agents failed with status ${res.status}`);
  }
  const body = await readJson<{ id?: string } | null>(res);
  const id = body?.id;
  if (!id) {
    throw new Error(`createAgent: POST ${API}/agents returned no agent id`);
  }
  return id;
}

// Runs before every test. Presumably clears the in-memory agent store so
// agents created by one test do not leak into the next — inferred from the
// helper's name; TODO confirm against server.ts.
beforeEach(() => {
_resetAgentStoreForTests();
});

describe(`POST ${API}/agents/:id/dataguard/apply-batch — Modify verdict cut (#11a)`, () => {
  // Creates a fresh agent, then posts `decisions` to its apply-batch route.
  const submit = async (decisions: unknown[]): Promise<Response> => {
    const agentId = await createAgent();
    return postJson(`${API}/agents/${agentId}/dataguard/apply-batch`, { decisions });
  };

  // Any 4xx status counts as a rejection of the request.
  const expectClientError = (res: Response): void => {
    expect(res.status).toBeGreaterThanOrEqual(400);
    expect(res.status).toBeLessThan(500);
  };

  test('rejects verdict: "modify" with a 4xx body-schema error', async () => {
    const res = await submit([{ issueId: "iss-1", verdict: "modify" }]);
    expectClientError(res);
  });

  test("rejects unknown field `modifiedAction` on a decision entry", async () => {
    const res = await submit([
      { issueId: "iss-1", verdict: "allow", modifiedAction: "Flag instead of replace" },
    ]);
    expectClientError(res);
  });

  test('still accepts verdict: "allow" (baseline — parity check that the cut didn\'t over-reach)', async () => {
    const res = await submit([{ issueId: "iss-not-loaded", verdict: "allow" }]);
    // There is no recorded proposal for this issueId, so the handler answers
    // 200 with a per-result error string. What matters here is only that the
    // schema let the body through.
    expect(res.status).toBe(200);
  });

  test('still accepts verdict: "deny" (baseline)', async () => {
    const res = await submit([{ issueId: "iss-not-loaded", verdict: "deny" }]);
    expect(res.status).toBe(200);
  });

  test('rejects a mixed batch where ANY decision uses verdict: "modify"', async () => {
    const res = await submit([
      { issueId: "iss-1", verdict: "allow" },
      { issueId: "iss-2", verdict: "modify" },
      { issueId: "iss-3", verdict: "deny" },
    ]);
    expectClientError(res);
  });
});

describe(`POST ${API}/agents/:id/dataguard/apply-batch — remember flag scope (#12)`, () => {
  // Creates a fresh agent and posts a single-decision batch to its apply-batch route.
  const submitOne = async (decision: Record<string, unknown>): Promise<Response> => {
    const agentId = await createAgent();
    return postJson(`${API}/agents/${agentId}/dataguard/apply-batch`, {
      decisions: [decision],
    });
  };

  test('rejects { verdict: "deny", remember: true } — remember only applies to allow', async () => {
    const res = await submitOne({ issueId: "iss-1", verdict: "deny", remember: true });
    expect(res.status).toBeGreaterThanOrEqual(400);
    expect(res.status).toBeLessThan(500);
  });

  test('accepts { verdict: "allow", remember: true } (baseline)', async () => {
    const res = await submitOne({ issueId: "iss-not-loaded", verdict: "allow", remember: true });
    // The handler cannot find a proposal for this issueId, but a 200 shows
    // that the schema accepted the body.
    expect(res.status).toBe(200);
  });

  test('accepts { verdict: "deny", remember: false } — only `remember: true` + deny is the forbidden combo', async () => {
    const res = await submitOne({ issueId: "iss-not-loaded", verdict: "deny", remember: false });
    expect(res.status).toBe(200);
  });

  test('accepts { verdict: "deny" } with `remember` omitted entirely', async () => {
    const res = await submitOne({ issueId: "iss-not-loaded", verdict: "deny" });
    expect(res.status).toBe(200);
  });
});
Loading
Loading