Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions apps/server/src/vcs/VcsStatusBroadcaster.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -417,4 +417,69 @@ describe("VcsStatusBroadcaster", () => {
assert.isTrue(Option.isSome(yield* Deferred.poll(remoteInterrupted)));
}).pipe(Effect.provide(testLayer));
});

// `it.live`, not `it.effect`: this exercises a real `fs.watch` plus
// `Stream.debounce`, which need the live clock (TestClock would freeze them).
it.live(
"pushes a local status update when .git/HEAD changes on disk",
() => {
const state = {
currentLocalStatus: baseLocalStatus,
currentRemoteStatus: baseRemoteStatus,
localStatusCalls: 0,
remoteStatusCalls: 0,
localInvalidationCalls: 0,
remoteInvalidationCalls: 0,
};

return Effect.gen(function* () {
const fileSystem = yield* FileSystem.FileSystem;
const path = yield* Path.Path;
const repoDir = yield* fileSystem.makeTempDirectoryScoped({ prefix: "t3-vcs-watch-" });
const gitDir = path.join(repoDir, ".git");
yield* fileSystem.makeDirectory(gitDir, { recursive: true });
const headPath = path.join(gitDir, "HEAD");
yield* fileSystem.writeFileString(headPath, "ref: refs/heads/main\n");

const broadcaster = yield* VcsStatusBroadcaster.VcsStatusBroadcaster;
const snapshot = yield* Deferred.make<VcsStatusStreamEvent>();
const localUpdated = yield* Deferred.make<VcsStatusStreamEvent>();
yield* Stream.runForEach(broadcaster.streamStatus({ cwd: repoDir }), (event) => {
if (event._tag === "snapshot") {
return Deferred.succeed(snapshot, event).pipe(Effect.ignore);
}
if (event._tag === "localUpdated") {
return Deferred.succeed(localUpdated, event).pipe(Effect.ignore);
}
return Effect.void;
}).pipe(Effect.forkScoped);

// Wait for the initial snapshot so the cached local status is the base ref
// before we change what the workflow will report on the next refresh.
yield* Deferred.await(snapshot);
state.currentLocalStatus = { ...baseLocalStatus, refName: "feature/switched-in-terminal" };

// Simulate an external `git switch` by rewriting HEAD. The watcher starts
// asynchronously, so rewrite on an interval (spaced past the debounce)
// until it observes a change — keeps the test deterministic, not racy.
yield* Effect.forkScoped(
Effect.gen(function* () {
let revision = 0;
while (true) {
revision += 1;
yield* fileSystem.writeFileString(headPath, `ref: refs/heads/branch-${revision}\n`);
yield* Effect.sleep(Duration.millis(250));
}
}),
);

const event = yield* Deferred.await(localUpdated);
assert.deepStrictEqual(event, {
_tag: "localUpdated",
local: state.currentLocalStatus,
} satisfies VcsStatusStreamEvent);
}).pipe(Effect.provide(makeTestLayer(state)));
},
20_000,
);
});
63 changes: 58 additions & 5 deletions apps/server/src/vcs/VcsStatusBroadcaster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as Exit from "effect/Exit";
import * as Fiber from "effect/Fiber";
import * as FileSystem from "effect/FileSystem";
import * as Layer from "effect/Layer";
import * as Path from "effect/Path";
import * as PubSub from "effect/PubSub";
import * as Ref from "effect/Ref";
import * as Schedule from "effect/Schedule";
Expand All @@ -24,6 +25,9 @@ import { mergeGitStatusParts } from "@t3tools/shared/git";
import * as GitWorkflowService from "../git/GitWorkflowService.ts";

const DEFAULT_VCS_STATUS_REFRESH_INTERVAL = Duration.seconds(30);
// Git writes several files per operation (HEAD, then logs, index, etc.); debounce
// so one checkout triggers a single local-status refresh rather than a burst.
const GIT_WATCH_DEBOUNCE = Duration.millis(150);
const VCS_STATUS_REFRESH_FAILURE_BASE_DELAY = Duration.seconds(30);
const VCS_STATUS_REFRESH_FAILURE_MAX_DELAY = Duration.minutes(15);

Expand All @@ -44,6 +48,8 @@ interface CachedVcsStatus {

interface ActiveRemotePoller {
readonly fiber: Fiber.Fiber<void, never>;
/** Watches `.git` (HEAD/packed-refs) so external checkouts refresh local status. */
readonly watchFiber: Fiber.Fiber<void, never>;
readonly subscriberCount: number;
}

Expand Down Expand Up @@ -99,6 +105,7 @@ export const layer = Layer.effect(
Effect.gen(function* () {
const workflow = yield* GitWorkflowService.GitWorkflowService;
const fs = yield* FileSystem.FileSystem;
const path = yield* Path.Path;
const changesPubSub = yield* Effect.acquireRelease(
PubSub.unbounded<VcsStatusChange>(),
(pubsub) => PubSub.shutdown(pubsub),
Expand Down Expand Up @@ -300,6 +307,44 @@ export const layer = Layer.effect(
});
};

// Resolve the directory that holds this checkout's `HEAD`, so we can watch it.
const resolveGitWatchDir = Effect.fn("VcsStatusBroadcaster.resolveGitWatchDir")(function* (
cwd: string,
) {
const dotGit = path.join(cwd, ".git");
const type = yield* fs.stat(dotGit).pipe(
Effect.map((info) => info.type),
Effect.orElseSucceed(() => null),
);
// Standard repo: HEAD lives directly inside `<cwd>/.git`.
if (type === "Directory") return dotGit;
// Linked worktree: `.git` is a file `gitdir: <path>` and HEAD lives there.
if (type === "File") {
const content = yield* fs.readFileString(dotGit).pipe(Effect.orElseSucceed(() => ""));
const target = content.match(/^gitdir:\s*(.+?)\s*$/m)?.[1]?.trim();
if (!target) return null;
return path.isAbsolute(target) ? target : path.resolve(cwd, target);
}
return null;
});

// Push a fresh local status whenever HEAD/packed-refs change on disk — e.g. a
// `git switch`/`git checkout` run in a terminal outside the app. Without this
// the UI only reconciles on window refocus.
const makeLocalWatchLoop = (cwd: string) =>
Effect.gen(function* () {
const watchDir = yield* resolveGitWatchDir(cwd);
if (!watchDir) return;
yield* fs.watch(watchDir).pipe(
Stream.filter((event) => {
const base = path.basename(event.path);
return base === "HEAD" || base === "packed-refs";
}),
Stream.debounce(GIT_WATCH_DEBOUNCE),
Stream.runForEach(() => refreshLocalStatus(cwd).pipe(Effect.ignore)),
);
}).pipe(Effect.ignoreCause({ log: true }));

const retainRemotePoller = Effect.fn("VcsStatusBroadcaster.retainRemotePoller")(function* (
cwd: string,
automaticRemoteRefreshInterval: Effect.Effect<Duration.Duration, never>,
Expand All @@ -315,12 +360,17 @@ export const layer = Layer.effect(
return Effect.succeed([undefined, nextPollers] as const);
}

return makeRemoteRefreshLoop(cwd, automaticRemoteRefreshInterval).pipe(
Effect.forkIn(broadcasterScope),
Effect.map((fiber) => {
return Effect.all([
makeRemoteRefreshLoop(cwd, automaticRemoteRefreshInterval).pipe(
Effect.forkIn(broadcasterScope),
),
makeLocalWatchLoop(cwd).pipe(Effect.forkIn(broadcasterScope)),
]).pipe(
Effect.map(([fiber, watchFiber]) => {
const nextPollers = new Map(activePollers);
nextPollers.set(cwd, {
fiber,
watchFiber,
subscriberCount: 1,
});
return [undefined, nextPollers] as const;
Expand Down Expand Up @@ -349,11 +399,14 @@ export const layer = Layer.effect(

const nextPollers = new Map(activePollers);
nextPollers.delete(cwd);
return [existing.fiber, nextPollers] as const;
return [existing, nextPollers] as const;
});

if (pollerToInterrupt) {
yield* Fiber.interrupt(pollerToInterrupt).pipe(Effect.ignore);
yield* Effect.all(
[Fiber.interrupt(pollerToInterrupt.fiber), Fiber.interrupt(pollerToInterrupt.watchFiber)],
{ discard: true },
).pipe(Effect.ignore);
}
});

Expand Down
Loading