diff --git a/packages/shared/src/relayClient.test.ts b/packages/shared/src/relayClient.test.ts index 7e552194dae..d8bab980890 100644 --- a/packages/shared/src/relayClient.test.ts +++ b/packages/shared/src/relayClient.test.ts @@ -1,13 +1,16 @@ import { sha256 } from "@noble/hashes/sha2"; import * as NodeServices from "@effect/platform-node/NodeServices"; -import { describe, expect, it } from "@effect/vitest"; +import { assert, describe, it } from "@effect/vitest"; import * as ConfigProvider from "effect/ConfigProvider"; +import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; import * as Encoding from "effect/Encoding"; import * as FileSystem from "effect/FileSystem"; import * as Layer from "effect/Layer"; +import * as Path from "effect/Path"; import * as Sink from "effect/Sink"; import * as Stream from "effect/Stream"; +import * as TestClock from "effect/testing/TestClock"; import { HttpClient, HttpClientResponse } from "effect/unstable/http"; import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; @@ -47,15 +50,13 @@ const makeHttpClientLayer = (bytes: Uint8Array) => ); const makeSpawnerLayer = (commands: Array) => - Layer.succeed( - ChildProcessSpawner.ChildProcessSpawner, - ChildProcessSpawner.make((command) => + Layer.mock(ChildProcessSpawner.ChildProcessSpawner, { + spawn: (command) => Effect.sync(() => { commands.push(ChildProcess.isStandardCommand(command) ? command.command : "piped-command"); return makeHandle(); }), - ), - ); + }); describe("RelayClient", () => { it.effect("resolves explicit overrides before managed and PATH executables", () => @@ -80,7 +81,7 @@ describe("RelayClient", () => { }), }); - expect(yield* manager.resolve).toEqual({ + assert.deepStrictEqual(yield* manager.resolve, { status: "available", executablePath: overridePath, source: "override", @@ -130,16 +131,17 @@ describe("RelayClient", () => { platform: "linux", arch: "x64", }); - expect(installed).toEqual({ + assert.deepStrictEqual(installed, { status: "available", executablePath: managedPath, source: "managed", version: CLOUDFLARED_VERSION, }); - expect(new TextDecoder().decode(yield* fileSystem.readFile(managedPath))).toBe( + assert.equal( + new TextDecoder().decode(yield* fileSystem.readFile(managedPath)), "test-cloudflared-binary", ); - expect(progress).toEqual([ + assert.deepStrictEqual(progress, [ "checking", "waiting_for_lock", "downloading", @@ -148,7 +150,7 @@ describe("RelayClient", () => { "validating", "activating", ]); - expect(yield* manager.resolve).toEqual(installed); + assert.deepStrictEqual(yield* manager.resolve, installed); }).pipe( Effect.scoped, Effect.provide( @@ -180,8 +182,8 @@ describe("RelayClient", () => { }); const error = yield* manager.install.pipe(Effect.flip); - expect(error).toBeInstanceOf(RelayClientInstallError); - expect(error.reason).toBe("invalid_checksum"); + assert.ok(error instanceof RelayClientInstallError); + assert.equal(error.reason, "invalid_checksum"); }).pipe( Effect.scoped, Effect.provide( @@ -217,12 +219,17 @@ describe("RelayClient", () => { const [first, second] = yield* Effect.all([manager.install, manager.install], { concurrency: "unbounded", }); - expect(second).toEqual(first); - expect(commands).toHaveLength(1); + assert.deepStrictEqual(second, first); + assert.equal(commands.length, 1); }).pipe( Effect.scoped, Effect.provide( - Layer.mergeAll(NodeServices.layer, makeHttpClientLayer(bytes), makeSpawnerLayer(commands)), + Layer.mergeAll( + TestClock.layer(), + NodeServices.layer, + makeHttpClientLayer(bytes), + makeSpawnerLayer(commands), + ), ), ); }); @@ -243,7 +250,7 @@ describe("RelayClient", () => { configProvider: () => ConfigProvider.fromEnv({ env: { PATH: path } }), }); - expect(yield* manager.resolve).toEqual({ + assert.deepStrictEqual(yield* manager.resolve, { status: "missing", version: CLOUDFLARED_VERSION, }); @@ -253,7 +260,7 @@ describe("RelayClient", () => { yield* fileSystem.chmod(executablePath, 0o755); path = binDir; - expect(yield* manager.resolve).toEqual({ + assert.deepStrictEqual(yield* manager.resolve, { status: "available", executablePath, source: "path", @@ -270,4 +277,54 @@ describe("RelayClient", () => { ), ), ); + + it.effect("removes stale install locks before installing", () => { + const commands: Array = []; + const bytes = new TextEncoder().encode("test-cloudflared-binary"); + return Effect.gen(function* () { + const fileSystem = yield* FileSystem.FileSystem; + const path = yield* Path.Path; + const baseDir = yield* fileSystem.makeTempDirectoryScoped({ + prefix: "t3-cloudflared-test-", + }); + const managedPath = resolveManagedCloudflaredPath({ + baseDir, + platform: "linux", + arch: "x64", + }); + const lockPath = `${managedPath}.lock`; + yield* fileSystem.makeDirectory(path.dirname(lockPath), { recursive: true }); + yield* fileSystem.writeFileString(lockPath, "stale"); + yield* fileSystem.utimes(lockPath, 0, 0); + yield* TestClock.adjust(Duration.minutes(6)); + + const manager = yield* makeCloudflaredRelayClient({ + baseDir, + platform: "linux", + arch: "x64", + releaseAsset: { + url: "https://example.test/cloudflared", + sha256: Encoding.encodeHex(sha256(bytes)), + archive: "binary", + }, + configProvider: emptyConfigProvider, + }); + + const installed = yield* manager.install; + + assert.deepStrictEqual(installed, { + status: "available", + executablePath: managedPath, + source: "managed", + version: CLOUDFLARED_VERSION, + }); + assert.equal(yield* fileSystem.exists(lockPath), false); + assert.equal(commands.length, 1); + }).pipe( + Effect.scoped, + Effect.provide( + Layer.mergeAll(NodeServices.layer, makeHttpClientLayer(bytes), makeSpawnerLayer(commands)), + ), + ); + }); }); diff --git a/packages/shared/src/relayClient.ts b/packages/shared/src/relayClient.ts index 35d002466e9..3bc6e8f000b 100644 --- a/packages/shared/src/relayClient.ts +++ b/packages/shared/src/relayClient.ts @@ -1,4 +1,3 @@ -import * as Clock from "effect/Clock"; import type { RelayClientInstallProgressEvent, RelayClientInstallProgressStage, @@ -8,6 +7,8 @@ import * as ConfigProvider from "effect/ConfigProvider"; import * as Context from "effect/Context"; import * as Crypto from "effect/Crypto"; import * as Data from "effect/Data"; +import * as DateTime from "effect/DateTime"; +import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; import * as Encoding from "effect/Encoding"; import * as FileSystem from "effect/FileSystem"; @@ -15,6 +16,7 @@ import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; import * as Path from "effect/Path"; import * as PlatformError from "effect/PlatformError"; +import * as Schedule from "effect/Schedule"; import * as Semaphore from "effect/Semaphore"; import { HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"; import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; @@ -99,8 +101,18 @@ const CLOUDFLARED_RELEASE_ASSETS: Readonly< }; const INSTALL_LOCK_RETRY_COUNT = 100; -const INSTALL_LOCK_RETRY_DELAY = "100 millis"; -const INSTALL_LOCK_STALE_MS = 5 * 60 * 1_000; +const INSTALL_LOCK_RETRY_DELAY = Duration.millis(100); +const INSTALL_LOCK_STALE_AGE = Duration.minutes(5); + +class RelayClientInstallLockBusy extends Data.TaggedError("RelayClientInstallLockBusy")<{ + readonly lockPath: string; +}> {} + +const retryWhileInstallLockBusy = Schedule.spaced(INSTALL_LOCK_RETRY_DELAY).pipe( + Schedule.both(Schedule.recurs(INSTALL_LOCK_RETRY_COUNT - 1)), + Schedule.setInputType(), + Schedule.while(({ input }) => input._tag === "RelayClientInstallLockBusy"), +); const trimmedString = (name: string) => Config.string(name).pipe( @@ -161,8 +173,8 @@ export function resolveManagedCloudflaredPath(input: { function resolveReleaseAsset( platform: NodeJS.Platform, arch: string, -): CloudflaredReleaseAsset | null { - return CLOUDFLARED_RELEASE_ASSETS[`${platform}-${arch}`] ?? null; +): Option.Option { + return Option.fromUndefinedOr(CLOUDFLARED_RELEASE_ASSETS[`${platform}-${arch}`]); } function isAlreadyExists(error: PlatformError.PlatformError): boolean { @@ -207,7 +219,9 @@ export const makeCloudflaredRelayClient = Effect.fn("cloudflared.make")(function const installSemaphore = yield* Semaphore.make(1); const platform = options.platform ?? process.platform; const arch = options.arch ?? process.arch; - const releaseAsset = options.releaseAsset ?? resolveReleaseAsset(platform, arch); + const releaseAsset = Option.fromUndefinedOr(options.releaseAsset).pipe( + Option.orElse(() => resolveReleaseAsset(platform, arch)), + ); const loadCloudflaredConfig = Effect.suspend(() => CloudflaredConfig.pipe( Effect.provideService( @@ -236,15 +250,15 @@ export const makeCloudflaredRelayClient = Effect.fn("cloudflared.make")(function const resolvePathExecutable = Effect.gen(function* () { const config = yield* loadCloudflaredConfig; const pathValue = Option.getOrUndefined(config.path); - if (!pathValue) return null; + if (!pathValue) return Option.none(); const delimiter = platform === "win32" ? ";" : ":"; for (const directory of pathValue.split(delimiter)) { const trimmed = directory.trim().replace(/^"|"$/gu, ""); if (trimmed.length === 0) continue; const candidate = path.join(trimmed, executableFileName(platform)); - if (yield* isExecutableFile(candidate)) return candidate; + if (yield* isExecutableFile(candidate)) return Option.some(candidate); } - return null; + return Option.none(); }); const resolve: RelayClientShape["resolve"] = Effect.gen(function* () { @@ -268,15 +282,15 @@ export const makeCloudflaredRelayClient = Effect.fn("cloudflared.make")(function }; } const pathExecutable = yield* resolvePathExecutable; - if (pathExecutable) { + if (Option.isSome(pathExecutable)) { return { status: "available", - executablePath: pathExecutable, + executablePath: pathExecutable.value, source: "path", version: CLOUDFLARED_VERSION, }; } - return releaseAsset + return Option.isSome(releaseAsset) ? { status: "missing", version: CLOUDFLARED_VERSION } : { status: "unsupported", @@ -351,36 +365,56 @@ export const makeCloudflaredRelayClient = Effect.fn("cloudflared.make")(function return bytes; }); + const isInstallLockStale = Effect.fn("cloudflared.isInstallLockStale")(function* ( + lockPath: string, + ): Effect.fn.Return { + const lockInfo = yield* fileSystem.stat(lockPath).pipe(Effect.option); + const lockModifiedAt = Option.flatMap(lockInfo, (info) => info.mtime); + if (Option.isNone(lockModifiedAt)) return false; + + const now = yield* DateTime.now; + const lockAge = DateTime.distance(DateTime.makeUnsafe(lockModifiedAt.value), now); + return Duration.isGreaterThan(lockAge, INSTALL_LOCK_STALE_AGE); + }); + + const attemptAcquireInstallLock = Effect.fn("cloudflared.attemptAcquireInstallLock")(function* ( + lockPath: string, + ): Effect.fn.Return { + const acquired = yield* fileSystem.writeFileString(lockPath, "", { flag: "wx" }).pipe( + Effect.as(true), + Effect.catch((error) => + isAlreadyExists(error) ? Effect.succeed(false) : Effect.fail(error), + ), + ); + if (acquired) return; + + if (yield* isInstallLockStale(lockPath)) { + yield* fileSystem.remove(lockPath, { force: true }); + return yield* attemptAcquireInstallLock(lockPath); + } + + return yield* new RelayClientInstallLockBusy({ lockPath }); + }); + const acquireInstallLock = Effect.fn("cloudflared.acquireInstallLock")(function* ( lockPath: string, - ) { - for (let attempt = 0; attempt < INSTALL_LOCK_RETRY_COUNT; attempt += 1) { - const acquired = yield* fileSystem.writeFileString(lockPath, "", { flag: "wx" }).pipe( - Effect.as(true), - Effect.catch((error) => - isAlreadyExists(error) ? Effect.succeed(false) : Effect.fail(error), + ): Effect.fn.Return { + return yield* attemptAcquireInstallLock(lockPath).pipe( + Effect.retry(retryWhileInstallLockBusy), + Effect.catchTag("RelayClientInstallLockBusy", () => + Effect.fail( + new RelayClientInstallError({ + reason: "install_locked", + message: "Another relay client installation is still in progress.", + }), ), - ); - if (acquired) return; - - const now = yield* Clock.currentTimeMillis; - const lockInfo = yield* fileSystem.stat(lockPath).pipe(Effect.option); - const mtime = Option.flatMap(lockInfo, (info) => info.mtime); - if (Option.isSome(mtime) && now - mtime.value.getTime() > INSTALL_LOCK_STALE_MS) { - yield* fileSystem.remove(lockPath, { force: true }); - continue; - } - yield* Effect.sleep(INSTALL_LOCK_RETRY_DELAY); - } - return yield* new RelayClientInstallError({ - reason: "install_locked", - message: "Another relay client installation is still in progress.", - }); + ), + ); }); const installUnlocked = Effect.fn("cloudflared.installUnlocked")(function* ( report: (stage: RelayClientInstallProgressStage) => Effect.Effect, - ) { + ): Effect.fn.Return { yield* report("checking"); const existing = yield* resolve; if (existing.status === "available") return existing; @@ -391,7 +425,7 @@ export const makeCloudflaredRelayClient = Effect.fn("cloudflared.make")(function message: `${CLOUDFLARED_PATH_ENV_NAME} does not point to an executable file.`, }); } - if (!releaseAsset) { + if (Option.isNone(releaseAsset)) { return yield* new RelayClientInstallError({ reason: "unsupported_platform", message: `T3 Code does not provide a managed relay client binary for ${platform}-${arch}.`, @@ -427,16 +461,16 @@ export const makeCloudflaredRelayClient = Effect.fn("cloudflared.make")(function }); const archivePath = path.join( tempDirectory, - releaseAsset.archive === "tgz" ? "cloudflared.tgz" : executableFileName(platform), + releaseAsset.value.archive === "tgz" ? "cloudflared.tgz" : executableFileName(platform), ); - const download = yield* downloadAsset(releaseAsset, report); + const download = yield* downloadAsset(releaseAsset.value, report); yield* report("installing"); yield* fileSystem .writeFile(archivePath, download) .pipe(wrapInstallFailure("write_failed", "Could not write the relay client download.")); const executablePath = path.join(tempDirectory, executableFileName(platform)); - if (releaseAsset.archive === "tgz") { + if (releaseAsset.value.archive === "tgz") { yield* runCommand("tar", ["-xzf", archivePath, "-C", tempDirectory]).pipe( wrapInstallFailure("write_failed", "Could not extract the relay client."), );