diff --git a/apps/server/src/vcs/VcsStatusBroadcaster.test.ts b/apps/server/src/vcs/VcsStatusBroadcaster.test.ts index d47dda29bf0..4ccf5d28039 100644 --- a/apps/server/src/vcs/VcsStatusBroadcaster.test.ts +++ b/apps/server/src/vcs/VcsStatusBroadcaster.test.ts @@ -417,4 +417,69 @@ describe("VcsStatusBroadcaster", () => { assert.isTrue(Option.isSome(yield* Deferred.poll(remoteInterrupted))); }).pipe(Effect.provide(testLayer)); }); + + // `it.live`, not `it.effect`: this exercises a real `fs.watch` plus + // `Stream.debounce`, which need the live clock (TestClock would freeze them). + it.live( + "pushes a local status update when .git/HEAD changes on disk", + () => { + const state = { + currentLocalStatus: baseLocalStatus, + currentRemoteStatus: baseRemoteStatus, + localStatusCalls: 0, + remoteStatusCalls: 0, + localInvalidationCalls: 0, + remoteInvalidationCalls: 0, + }; + + return Effect.gen(function* () { + const fileSystem = yield* FileSystem.FileSystem; + const path = yield* Path.Path; + const repoDir = yield* fileSystem.makeTempDirectoryScoped({ prefix: "t3-vcs-watch-" }); + const gitDir = path.join(repoDir, ".git"); + yield* fileSystem.makeDirectory(gitDir, { recursive: true }); + const headPath = path.join(gitDir, "HEAD"); + yield* fileSystem.writeFileString(headPath, "ref: refs/heads/main\n"); + + const broadcaster = yield* VcsStatusBroadcaster.VcsStatusBroadcaster; + const snapshot = yield* Deferred.make(); + const localUpdated = yield* Deferred.make(); + yield* Stream.runForEach(broadcaster.streamStatus({ cwd: repoDir }), (event) => { + if (event._tag === "snapshot") { + return Deferred.succeed(snapshot, event).pipe(Effect.ignore); + } + if (event._tag === "localUpdated") { + return Deferred.succeed(localUpdated, event).pipe(Effect.ignore); + } + return Effect.void; + }).pipe(Effect.forkScoped); + + // Wait for the initial snapshot so the cached local status is the base ref + // before we change what the workflow will report on the next refresh. + yield* Deferred.await(snapshot); + state.currentLocalStatus = { ...baseLocalStatus, refName: "feature/switched-in-terminal" }; + + // Simulate an external `git switch` by rewriting HEAD. The watcher starts + // asynchronously, so rewrite on an interval (spaced past the debounce) + // until it observes a change — keeps the test deterministic, not racy. + yield* Effect.forkScoped( + Effect.gen(function* () { + let revision = 0; + while (true) { + revision += 1; + yield* fileSystem.writeFileString(headPath, `ref: refs/heads/branch-${revision}\n`); + yield* Effect.sleep(Duration.millis(250)); + } + }), + ); + + const event = yield* Deferred.await(localUpdated); + assert.deepStrictEqual(event, { + _tag: "localUpdated", + local: state.currentLocalStatus, + } satisfies VcsStatusStreamEvent); + }).pipe(Effect.provide(makeTestLayer(state))); + }, + 20_000, + ); }); diff --git a/apps/server/src/vcs/VcsStatusBroadcaster.ts b/apps/server/src/vcs/VcsStatusBroadcaster.ts index 40cdcf2c809..a08f1ce698d 100644 --- a/apps/server/src/vcs/VcsStatusBroadcaster.ts +++ b/apps/server/src/vcs/VcsStatusBroadcaster.ts @@ -5,6 +5,7 @@ import * as Exit from "effect/Exit"; import * as Fiber from "effect/Fiber"; import * as FileSystem from "effect/FileSystem"; import * as Layer from "effect/Layer"; +import * as Path from "effect/Path"; import * as PubSub from "effect/PubSub"; import * as Ref from "effect/Ref"; import * as Schedule from "effect/Schedule"; @@ -24,6 +25,9 @@ import { mergeGitStatusParts } from "@t3tools/shared/git"; import * as GitWorkflowService from "../git/GitWorkflowService.ts"; const DEFAULT_VCS_STATUS_REFRESH_INTERVAL = Duration.seconds(30); +// Git writes several files per operation (HEAD, then logs, index, etc.); debounce +// so one checkout triggers a single local-status refresh rather than a burst. +const GIT_WATCH_DEBOUNCE = Duration.millis(150); const VCS_STATUS_REFRESH_FAILURE_BASE_DELAY = Duration.seconds(30); const VCS_STATUS_REFRESH_FAILURE_MAX_DELAY = Duration.minutes(15); @@ -44,6 +48,8 @@ interface CachedVcsStatus { interface ActiveRemotePoller { readonly fiber: Fiber.Fiber; + /** Watches `.git` (HEAD/packed-refs) so external checkouts refresh local status. */ + readonly watchFiber: Fiber.Fiber; readonly subscriberCount: number; } @@ -99,6 +105,7 @@ export const layer = Layer.effect( Effect.gen(function* () { const workflow = yield* GitWorkflowService.GitWorkflowService; const fs = yield* FileSystem.FileSystem; + const path = yield* Path.Path; const changesPubSub = yield* Effect.acquireRelease( PubSub.unbounded(), (pubsub) => PubSub.shutdown(pubsub), @@ -300,6 +307,44 @@ export const layer = Layer.effect( }); }; + // Resolve the directory that holds this checkout's `HEAD`, so we can watch it. + const resolveGitWatchDir = Effect.fn("VcsStatusBroadcaster.resolveGitWatchDir")(function* ( + cwd: string, + ) { + const dotGit = path.join(cwd, ".git"); + const type = yield* fs.stat(dotGit).pipe( + Effect.map((info) => info.type), + Effect.orElseSucceed(() => null), + ); + // Standard repo: HEAD lives directly inside `/.git`. + if (type === "Directory") return dotGit; + // Linked worktree: `.git` is a file `gitdir: ` and HEAD lives there. + if (type === "File") { + const content = yield* fs.readFileString(dotGit).pipe(Effect.orElseSucceed(() => "")); + const target = content.match(/^gitdir:\s*(.+?)\s*$/m)?.[1]?.trim(); + if (!target) return null; + return path.isAbsolute(target) ? target : path.resolve(cwd, target); + } + return null; + }); + + // Push a fresh local status whenever HEAD/packed-refs change on disk — e.g. a + // `git switch`/`git checkout` run in a terminal outside the app. Without this + // the UI only reconciles on window refocus. + const makeLocalWatchLoop = (cwd: string) => + Effect.gen(function* () { + const watchDir = yield* resolveGitWatchDir(cwd); + if (!watchDir) return; + yield* fs.watch(watchDir).pipe( + Stream.filter((event) => { + const base = path.basename(event.path); + return base === "HEAD" || base === "packed-refs"; + }), + Stream.debounce(GIT_WATCH_DEBOUNCE), + Stream.runForEach(() => refreshLocalStatus(cwd).pipe(Effect.ignore)), + ); + }).pipe(Effect.ignoreCause({ log: true })); + const retainRemotePoller = Effect.fn("VcsStatusBroadcaster.retainRemotePoller")(function* ( cwd: string, automaticRemoteRefreshInterval: Effect.Effect, @@ -315,12 +360,17 @@ export const layer = Layer.effect( return Effect.succeed([undefined, nextPollers] as const); } - return makeRemoteRefreshLoop(cwd, automaticRemoteRefreshInterval).pipe( - Effect.forkIn(broadcasterScope), - Effect.map((fiber) => { + return Effect.all([ + makeRemoteRefreshLoop(cwd, automaticRemoteRefreshInterval).pipe( + Effect.forkIn(broadcasterScope), + ), + makeLocalWatchLoop(cwd).pipe(Effect.forkIn(broadcasterScope)), + ]).pipe( + Effect.map(([fiber, watchFiber]) => { const nextPollers = new Map(activePollers); nextPollers.set(cwd, { fiber, + watchFiber, subscriberCount: 1, }); return [undefined, nextPollers] as const; @@ -349,11 +399,14 @@ export const layer = Layer.effect( const nextPollers = new Map(activePollers); nextPollers.delete(cwd); - return [existing.fiber, nextPollers] as const; + return [existing, nextPollers] as const; }); if (pollerToInterrupt) { - yield* Fiber.interrupt(pollerToInterrupt).pipe(Effect.ignore); + yield* Effect.all( + [Fiber.interrupt(pollerToInterrupt.fiber), Fiber.interrupt(pollerToInterrupt.watchFiber)], + { discard: true }, + ).pipe(Effect.ignore); } });