Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 69 additions & 59 deletions apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import * as ManagedRuntime from "effect/ManagedRuntime";
import * as PubSub from "effect/PubSub";
import * as Scope from "effect/Scope";
import * as Stream from "effect/Stream";
import { it as effectIt } from "@effect/vitest";
import { afterEach, describe, expect, it, vi } from "vite-plus/test";

import { deriveServerPaths, ServerConfig } from "../../config.ts";
Expand Down Expand Up @@ -890,70 +891,79 @@ describe("ProviderCommandReactor", () => {
});
});

it("rejects changing models after start when the provider requires a new thread", async () => {
const harness = await createHarness({ requiresNewThreadForModelChange: true });
const now = "2026-01-01T00:00:00.000Z";
effectIt.effect(
"rejects changing models after start when the provider requires a new thread",
() =>
Effect.gen(function* () {
const harness = yield* Effect.promise(() =>
createHarness({ requiresNewThreadForModelChange: true }),
);
const now = "2026-01-01T00:00:00.000Z";

await Effect.runPromise(
harness.engine.dispatch({
type: "thread.turn.start",
commandId: CommandId.make("cmd-turn-start-restricted-1"),
threadId: ThreadId.make("thread-1"),
message: {
messageId: asMessageId("user-message-restricted-1"),
role: "user",
text: "first",
attachments: [],
},
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
createdAt: now,
}),
);
yield* harness.engine.dispatch({
type: "thread.turn.start",
commandId: CommandId.make("cmd-turn-start-restricted-1"),
threadId: ThreadId.make("thread-1"),
message: {
messageId: asMessageId("user-message-restricted-1"),
role: "user",
text: "first",
attachments: [],
},
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
createdAt: now,
});

await waitFor(() => harness.sendTurn.mock.calls.length === 1);
yield* Effect.promise(() => waitFor(() => harness.sendTurn.mock.calls.length === 1));

await Effect.runPromise(
harness.engine.dispatch({
type: "thread.turn.start",
commandId: CommandId.make("cmd-turn-start-restricted-2"),
threadId: ThreadId.make("thread-1"),
message: {
messageId: asMessageId("user-message-restricted-2"),
role: "user",
text: "second",
attachments: [],
},
modelSelection: {
instanceId: ProviderInstanceId.make("codex"),
model: "gpt-5.1-codex",
},
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
createdAt: now,
}),
);
yield* harness.engine.dispatch({
type: "thread.turn.start",
commandId: CommandId.make("cmd-turn-start-restricted-2"),
threadId: ThreadId.make("thread-1"),
message: {
messageId: asMessageId("user-message-restricted-2"),
role: "user",
text: "second",
attachments: [],
},
modelSelection: {
instanceId: ProviderInstanceId.make("codex"),
model: "gpt-5.1-codex",
},
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
createdAt: now,
});

await waitFor(async () => {
const readModel = await harness.readModel();
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
return (
thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed") ??
false
);
});
yield* Effect.promise(() =>
waitFor(async () => {
const readModel = await harness.readModel();
const thread = readModel.threads.find(
(entry) => entry.id === ThreadId.make("thread-1"),
);
return (
thread?.activities.some(
(activity) => activity.kind === "provider.turn.start.failed",
) ?? false
);
}),
);

expect(harness.sendTurn).toHaveBeenCalledTimes(1);
const readModel = await harness.readModel();
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
expect(
thread?.activities.find((activity) => activity.kind === "provider.turn.start.failed"),
).toMatchObject({
payload: {
detail: expect.stringContaining("cannot switch models after the conversation has started"),
},
});
});
expect(harness.sendTurn).toHaveBeenCalledTimes(1);
const readModel = yield* Effect.promise(() => harness.readModel());
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
expect(
thread?.activities.find((activity) => activity.kind === "provider.turn.start.failed"),
).toMatchObject({
payload: {
detail: expect.stringContaining(
"cannot switch models after the conversation has started",
),
},
});
}),
);

it("starts a first turn on the requested provider instance even when it differs from the thread model", async () => {
const harness = await createHarness({
Expand Down
25 changes: 13 additions & 12 deletions apps/server/src/provider/Layers/GrokAdapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,21 @@ exec ${JSON.stringify(mockAgentCommand)} ${JSON.stringify(mockAgentPath)} "$@"
return wrapperPath;
}

async function waitForFileContent(filePath: string, attempts = 40): Promise<string> {
const readAttempt = async (remainingAttempts: number): Promise<string> => {
if (remainingAttempts <= 0) {
throw new Error(`Timed out waiting for file content at ${filePath}`);
}
try {
const raw = await readFile(filePath, "utf8");
function waitForFileContent(filePath: string, attempts = 40): Effect.Effect<string> {
const readAttempt = (remainingAttempts: number): Effect.Effect<string> =>
Effect.gen(function* () {
if (remainingAttempts <= 0) {
return yield* Effect.die(new Error(`Timed out waiting for file content at ${filePath}`));
}
const raw = yield* Effect.tryPromise(() => readFile(filePath, "utf8")).pipe(
Effect.orElseSucceed(() => ""),
);
if (raw.trim().length > 0) {
return raw;
}
} catch {}
await Effect.runPromise(Effect.sleep("25 millis"));
return readAttempt(remainingAttempts - 1);
};
yield* Effect.sleep("25 millis");
return yield* readAttempt(remainingAttempts - 1);
});
return readAttempt(attempts);
}

Expand Down Expand Up @@ -169,7 +170,7 @@ it.layer(grokAdapterTestLayer)("GrokAdapterLive", (it) => {

yield* adapter.stopSession(threadId);

const exitLog = yield* Effect.promise(() => waitForFileContent(exitLogPath));
const exitLog = yield* waitForFileContent(exitLogPath);
assert.include(exitLog, "SIGTERM");
}),
);
Expand Down
115 changes: 54 additions & 61 deletions apps/server/src/provider/Layers/GrokProvider.test.ts
Original file line number Diff line number Diff line change
@@ -1,67 +1,60 @@
import * as NodeServices from "@effect/platform-node/NodeServices";
import { describe, expect, it } from "@effect/vitest";
import * as Effect from "effect/Effect";
import * as FileSystem from "effect/FileSystem";
import * as Path from "effect/Path";
import * as Schema from "effect/Schema";
import { describe, expect, it } from "vite-plus/test";
import { ChildProcessSpawner } from "effect/unstable/process";
import { GrokSettings } from "@t3tools/contracts";

import { buildInitialGrokProviderSnapshot, checkGrokProviderStatus } from "./GrokProvider.ts";

const decodeGrokSettings = Schema.decodeSync(GrokSettings);

const runNode = <A, E>(
effect: Effect.Effect<
A,
E,
FileSystem.FileSystem | Path.Path | ChildProcessSpawner.ChildProcessSpawner
>,
): Promise<A> => Effect.runPromise(effect.pipe(Effect.provide(NodeServices.layer)));

describe("buildInitialGrokProviderSnapshot", () => {
it("returns a disabled snapshot when settings.enabled is false", async () => {
const snapshot = await Effect.runPromise(
buildInitialGrokProviderSnapshot(decodeGrokSettings({ enabled: false })),
);
expect(snapshot.enabled).toBe(false);
expect(snapshot.status).toBe("disabled");
expect(snapshot.installed).toBe(false);
expect(snapshot.message).toContain("disabled");
});
it.effect("returns a disabled snapshot when settings.enabled is false", () =>
Effect.gen(function* () {
const snapshot = yield* buildInitialGrokProviderSnapshot(
decodeGrokSettings({ enabled: false }),
);
expect(snapshot.enabled).toBe(false);
expect(snapshot.status).toBe("disabled");
expect(snapshot.installed).toBe(false);
expect(snapshot.message).toContain("disabled");
}),
);

it("returns a pending snapshot by default", async () => {
const snapshot = await Effect.runPromise(
buildInitialGrokProviderSnapshot(decodeGrokSettings({})),
);
expect(snapshot.enabled).toBe(true);
expect(snapshot.installed).toBe(true);
expect(snapshot.status).toBe("warning");
expect(snapshot.version).toBeNull();
expect(snapshot.message).toContain("Checking Grok");
expect(snapshot.requiresNewThreadForModelChange).toBe(true);
});
it.effect("returns a pending snapshot by default", () =>
Effect.gen(function* () {
const snapshot = yield* buildInitialGrokProviderSnapshot(decodeGrokSettings({}));
expect(snapshot.enabled).toBe(true);
expect(snapshot.installed).toBe(true);
expect(snapshot.status).toBe("warning");
expect(snapshot.version).toBeNull();
expect(snapshot.message).toContain("Checking Grok");
expect(snapshot.requiresNewThreadForModelChange).toBe(true);
}),
);
});

describe("checkGrokProviderStatus", () => {
it("reports the binary as missing when the binary path does not resolve", async () => {
const snapshot = await runNode(
checkGrokProviderStatus(
it.layer(NodeServices.layer)("checkGrokProviderStatus", (it) => {
it.effect("reports the binary as missing when the binary path does not resolve", () =>
Effect.gen(function* () {
const snapshot = yield* checkGrokProviderStatus(
decodeGrokSettings({
enabled: true,
binaryPath: "/definitely/not/installed/grok-binary",
}),
),
);
expect(snapshot.enabled).toBe(true);
expect(snapshot.installed).toBe(false);
expect(snapshot.status).toBe("error");
expect(snapshot.message).toMatch(/not installed|not on PATH|Failed to execute/);
});
);
expect(snapshot.enabled).toBe(true);
expect(snapshot.installed).toBe(false);
expect(snapshot.status).toBe("error");
expect(snapshot.message).toMatch(/not installed|not on PATH|Failed to execute/);
}),
);

it("reports an installed CLI as unhealthy when --version exits non-zero", async () => {
const snapshot = await runNode(
Effect.scoped(
it.effect("reports an installed CLI as unhealthy when --version exits non-zero", () =>
Effect.gen(function* () {
const snapshot = yield* Effect.scoped(
Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem;
const path = yield* Path.Path;
Expand All @@ -77,18 +70,18 @@ describe("checkGrokProviderStatus", () => {
decodeGrokSettings({ enabled: true, binaryPath: grokPath }),
);
}),
),
);
);

expect(snapshot.enabled).toBe(true);
expect(snapshot.installed).toBe(true);
expect(snapshot.status).toBe("error");
expect(snapshot.message).toContain("broken grok install");
});
expect(snapshot.enabled).toBe(true);
expect(snapshot.installed).toBe(true);
expect(snapshot.status).toBe("error");
expect(snapshot.message).toContain("broken grok install");
}),
);

it("reports an error when ACP model discovery is unavailable", async () => {
const snapshot = await runNode(
Effect.scoped(
it.effect("reports an error when ACP model discovery is unavailable", () =>
Effect.gen(function* () {
const snapshot = yield* Effect.scoped(
Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem;
const path = yield* Path.Path;
Expand All @@ -104,12 +97,12 @@ describe("checkGrokProviderStatus", () => {
decodeGrokSettings({ enabled: true, binaryPath: grokPath }),
);
}),
),
);
);

expect(snapshot.status).toBe("error");
expect(snapshot.installed).toBe(true);
expect(snapshot.models.map((model) => model.slug)).toEqual(["grok-build"]);
expect(snapshot.message).toContain("ACP startup failed");
});
expect(snapshot.status).toBe("error");
expect(snapshot.installed).toBe(true);
expect(snapshot.models.map((model) => model.slug)).toEqual(["grok-build"]);
expect(snapshot.message).toContain("ACP startup failed");
}),
);
});
Loading
Loading