diff --git a/packages/shared/src/relayClient.test.ts b/packages/shared/src/relayClient.test.ts index 7e552194dae..ed081017544 100644 --- a/packages/shared/src/relayClient.test.ts +++ b/packages/shared/src/relayClient.test.ts @@ -1,13 +1,19 @@ import { sha256 } from "@noble/hashes/sha2"; import * as NodeServices from "@effect/platform-node/NodeServices"; -import { describe, expect, it } from "@effect/vitest"; +import { assert, describe, expect, it } from "@effect/vitest"; import * as ConfigProvider from "effect/ConfigProvider"; +import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; import * as Encoding from "effect/Encoding"; +import * as Fiber from "effect/Fiber"; import * as FileSystem from "effect/FileSystem"; import * as Layer from "effect/Layer"; +import * as Option from "effect/Option"; +import * as Path from "effect/Path"; +import * as PlatformError from "effect/PlatformError"; import * as Sink from "effect/Sink"; import * as Stream from "effect/Stream"; +import * as TestClock from "effect/testing/TestClock"; import { HttpClient, HttpClientResponse } from "effect/unstable/http"; import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; @@ -57,6 +63,47 @@ const makeSpawnerLayer = (commands: Array) => ), ); +const lockedFileInfo: FileSystem.File.Info = { + type: "File", + mtime: Option.none(), + atime: Option.none(), + birthtime: Option.none(), + dev: 0, + ino: Option.none(), + mode: 0o644, + nlink: Option.none(), + uid: Option.none(), + gid: Option.none(), + rdev: Option.none(), + size: FileSystem.Size(0), + blksize: Option.none(), + blocks: Option.none(), +}; + +const fileSystemError = (tag: "AlreadyExists" | "NotFound", method: string, path: string) => + PlatformError.systemError({ + _tag: tag, + module: "FileSystem", + method, + pathOrDescriptor: path, + description: tag === "AlreadyExists" ? "File already exists" : "No such file or directory", + }); + +const makeLockedFileSystemLayer = (attempts: { count: number }) => + FileSystem.layerNoop({ + makeDirectory: () => Effect.void, + remove: () => Effect.void, + stat: (path) => + path.endsWith(".lock") + ? Effect.succeed(lockedFileInfo) + : Effect.fail(fileSystemError("NotFound", "stat", path)), + writeFileString: (path) => + Effect.gen(function* () { + attempts.count += 1; + return yield* fileSystemError("AlreadyExists", "writeFileString", path); + }), + }); + describe("RelayClient", () => { it.effect("resolves explicit overrides before managed and PATH executables", () => Effect.gen(function* () { @@ -227,6 +274,100 @@ describe("RelayClient", () => { ); }); + it.effect("fails with install_locked after the lock retry schedule is exhausted", () => { + const commands: Array = []; + const bytes = new TextEncoder().encode("test-cloudflared-binary"); + const attempts = { count: 0 }; + return Effect.gen(function* () { + const baseDir = "/tmp/t3-cloudflared-test-locked"; + const manager = yield* makeCloudflaredRelayClient({ + baseDir, + platform: "linux", + arch: "x64", + releaseAsset: { + url: "https://example.test/cloudflared", + sha256: Encoding.encodeHex(sha256(bytes)), + archive: "binary", + }, + configProvider: emptyConfigProvider, + }); + + const install = yield* manager.install.pipe(Effect.flip, Effect.forkScoped); + yield* Effect.yieldNow; + yield* TestClock.adjust(Duration.seconds(10)); + const error = yield* Fiber.join(install); + + assert.ok(error instanceof RelayClientInstallError); + assert.equal(error.reason, "install_locked"); + assert.equal(attempts.count, 100); + assert.equal(commands.length, 0); + }).pipe( + Effect.scoped, + Effect.provide( + Layer.mergeAll( + NodeServices.layer, + makeLockedFileSystemLayer(attempts), + TestClock.layer(), + makeHttpClientLayer(bytes), + makeSpawnerLayer(commands), + ), + ), + ); + }); + + it.effect("removes stale install locks before downloading the managed executable", () => { + const commands: Array = []; + const bytes = new TextEncoder().encode("test-cloudflared-binary"); + return Effect.gen(function* () { + const fileSystem = yield* FileSystem.FileSystem; + const baseDir = yield* fileSystem.makeTempDirectoryScoped({ + prefix: "t3-cloudflared-test-stale-lock-", + }); + const managedPath = resolveManagedCloudflaredPath({ + baseDir, + platform: "linux", + arch: "x64", + }); + const lockPath = `${managedPath}.lock`; + const path = yield* Path.Path; + yield* fileSystem.makeDirectory(path.dirname(managedPath), { recursive: true }); + yield* fileSystem.writeFileString(lockPath, "stale"); + yield* fileSystem.utimes(lockPath, 0, 0); + const manager = yield* makeCloudflaredRelayClient({ + baseDir, + platform: "linux", + arch: "x64", + releaseAsset: { + url: "https://example.test/cloudflared", + sha256: Encoding.encodeHex(sha256(bytes)), + archive: "binary", + }, + configProvider: emptyConfigProvider, + }); + + yield* TestClock.adjust(Duration.minutes(6)); + const installed = yield* manager.install; + + assert.deepStrictEqual(installed, { + status: "available", + executablePath: managedPath, + source: "managed", + version: CLOUDFLARED_VERSION, + }); + assert.equal(commands.length, 1); + }).pipe( + Effect.scoped, + Effect.provide( + Layer.mergeAll( + NodeServices.layer, + TestClock.layer(), + makeHttpClientLayer(bytes), + makeSpawnerLayer(commands), + ), + ), + ); + }); + it.effect("observes PATH changes after the manager has been constructed", () => Effect.gen(function* () { const fileSystem = yield* FileSystem.FileSystem; diff --git a/packages/shared/src/relayClient.ts b/packages/shared/src/relayClient.ts index 35d002466e9..0e6f41f7a81 100644 --- a/packages/shared/src/relayClient.ts +++ b/packages/shared/src/relayClient.ts @@ -1,4 +1,3 @@ -import * as Clock from "effect/Clock"; import type { RelayClientInstallProgressEvent, RelayClientInstallProgressStage, @@ -8,6 +7,8 @@ import * as ConfigProvider from "effect/ConfigProvider"; import * as Context from "effect/Context"; import * as Crypto from "effect/Crypto"; import * as Data from "effect/Data"; +import * as DateTime from "effect/DateTime"; +import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; import * as Encoding from "effect/Encoding"; import * as FileSystem from "effect/FileSystem"; @@ -15,6 +16,7 @@ import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; import * as Path from "effect/Path"; import * as PlatformError from "effect/PlatformError"; +import * as Schedule from "effect/Schedule"; import * as Semaphore from "effect/Semaphore"; import { HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"; import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; @@ -62,6 +64,12 @@ class CloudflaredCommandError extends Data.TaggedError("CloudflaredCommandError" readonly exitCode: number; }> {} +class RelayClientInstallLockUnavailable extends Data.TaggedError( + "RelayClientInstallLockUnavailable", +)<{ + readonly lockPath: string; +}> {} + export interface CloudflaredReleaseAsset { readonly url: string; readonly sha256: string; @@ -99,8 +107,11 @@ const CLOUDFLARED_RELEASE_ASSETS: Readonly< }; const INSTALL_LOCK_RETRY_COUNT = 100; -const INSTALL_LOCK_RETRY_DELAY = "100 millis"; -const INSTALL_LOCK_STALE_MS = 5 * 60 * 1_000; +const INSTALL_LOCK_RETRY_DELAY = Duration.millis(100); +const INSTALL_LOCK_STALE_AFTER = Duration.minutes(5); +const INSTALL_LOCK_RETRY_SCHEDULE = Schedule.spaced(INSTALL_LOCK_RETRY_DELAY).pipe( + Schedule.both(Schedule.recurs(INSTALL_LOCK_RETRY_COUNT - 1)), +); const trimmedString = (name: string) => Config.string(name).pipe( @@ -354,28 +365,45 @@ export const makeCloudflaredRelayClient = Effect.fn("cloudflared.make")(function const acquireInstallLock = Effect.fn("cloudflared.acquireInstallLock")(function* ( lockPath: string, ) { - for (let attempt = 0; attempt < INSTALL_LOCK_RETRY_COUNT; attempt += 1) { - const acquired = yield* fileSystem.writeFileString(lockPath, "", { flag: "wx" }).pipe( - Effect.as(true), - Effect.catch((error) => - isAlreadyExists(error) ? Effect.succeed(false) : Effect.fail(error), - ), - ); - if (acquired) return; - - const now = yield* Clock.currentTimeMillis; - const lockInfo = yield* fileSystem.stat(lockPath).pipe(Effect.option); - const mtime = Option.flatMap(lockInfo, (info) => info.mtime); - if (Option.isSome(mtime) && now - mtime.value.getTime() > INSTALL_LOCK_STALE_MS) { - yield* fileSystem.remove(lockPath, { force: true }); - continue; + const acquireInstallLockOnce = Effect.fn("cloudflared.acquireInstallLockOnce")(function* () { + while (true) { + const acquired = yield* fileSystem.writeFileString(lockPath, "", { flag: "wx" }).pipe( + Effect.as(true), + Effect.catch((error) => + isAlreadyExists(error) ? Effect.succeed(false) : Effect.fail(error), + ), + ); + if (acquired) return; + + const now = yield* DateTime.now; + const lockInfo = yield* fileSystem.stat(lockPath).pipe(Effect.option); + const mtime = Option.flatMap(lockInfo, (info) => info.mtime); + const staleBefore = DateTime.subtractDuration(now, INSTALL_LOCK_STALE_AFTER); + if ( + Option.isSome(mtime) && + DateTime.Order(DateTime.fromDateUnsafe(mtime.value), staleBefore) < 0 + ) { + yield* fileSystem.remove(lockPath, { force: true }); + continue; + } + return yield* new RelayClientInstallLockUnavailable({ lockPath }); } - yield* Effect.sleep(INSTALL_LOCK_RETRY_DELAY); - } - return yield* new RelayClientInstallError({ - reason: "install_locked", - message: "Another relay client installation is still in progress.", }); + + return yield* acquireInstallLockOnce().pipe( + Effect.retry({ + schedule: INSTALL_LOCK_RETRY_SCHEDULE, + while: (error) => error instanceof RelayClientInstallLockUnavailable, + }), + Effect.catchTag("RelayClientInstallLockUnavailable", () => + Effect.fail( + new RelayClientInstallError({ + reason: "install_locked", + message: "Another relay client installation is still in progress.", + }), + ), + ), + ); }); const installUnlocked = Effect.fn("cloudflared.installUnlocked")(function* (