Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 142 additions & 1 deletion packages/shared/src/relayClient.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import { sha256 } from "@noble/hashes/sha2";
import * as NodeServices from "@effect/platform-node/NodeServices";
import { describe, expect, it } from "@effect/vitest";
import { assert, describe, expect, it } from "@effect/vitest";
import * as ConfigProvider from "effect/ConfigProvider";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Encoding from "effect/Encoding";
import * as Fiber from "effect/Fiber";
import * as FileSystem from "effect/FileSystem";
import * as Layer from "effect/Layer";
import * as Option from "effect/Option";
import * as Path from "effect/Path";
import * as PlatformError from "effect/PlatformError";
import * as Sink from "effect/Sink";
import * as Stream from "effect/Stream";
import * as TestClock from "effect/testing/TestClock";
import { HttpClient, HttpClientResponse } from "effect/unstable/http";
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process";

Expand Down Expand Up @@ -57,6 +63,47 @@ const makeSpawnerLayer = (commands: Array<string>) =>
),
);

const lockedFileInfo: FileSystem.File.Info = {
type: "File",
mtime: Option.none(),
atime: Option.none(),
birthtime: Option.none(),
dev: 0,
ino: Option.none(),
mode: 0o644,
nlink: Option.none(),
uid: Option.none(),
gid: Option.none(),
rdev: Option.none(),
size: FileSystem.Size(0),
blksize: Option.none(),
blocks: Option.none(),
};

const fileSystemError = (tag: "AlreadyExists" | "NotFound", method: string, path: string) =>
PlatformError.systemError({
_tag: tag,
module: "FileSystem",
method,
pathOrDescriptor: path,
description: tag === "AlreadyExists" ? "File already exists" : "No such file or directory",
});

const makeLockedFileSystemLayer = (attempts: { count: number }) =>
FileSystem.layerNoop({
makeDirectory: () => Effect.void,
remove: () => Effect.void,
stat: (path) =>
path.endsWith(".lock")
? Effect.succeed(lockedFileInfo)
: Effect.fail(fileSystemError("NotFound", "stat", path)),
writeFileString: (path) =>
Effect.gen(function* () {
attempts.count += 1;
return yield* fileSystemError("AlreadyExists", "writeFileString", path);
}),
});

describe("RelayClient", () => {
it.effect("resolves explicit overrides before managed and PATH executables", () =>
Effect.gen(function* () {
Expand Down Expand Up @@ -227,6 +274,100 @@ describe("RelayClient", () => {
);
});

it.effect("fails with install_locked after the lock retry schedule is exhausted", () => {
const commands: Array<string> = [];
const bytes = new TextEncoder().encode("test-cloudflared-binary");
const attempts = { count: 0 };
return Effect.gen(function* () {
const baseDir = "/tmp/t3-cloudflared-test-locked";
const manager = yield* makeCloudflaredRelayClient({
baseDir,
platform: "linux",
arch: "x64",
releaseAsset: {
url: "https://example.test/cloudflared",
sha256: Encoding.encodeHex(sha256(bytes)),
archive: "binary",
},
configProvider: emptyConfigProvider,
});

const install = yield* manager.install.pipe(Effect.flip, Effect.forkScoped);
yield* Effect.yieldNow;
yield* TestClock.adjust(Duration.seconds(10));
const error = yield* Fiber.join(install);

assert.ok(error instanceof RelayClientInstallError);
assert.equal(error.reason, "install_locked");
assert.equal(attempts.count, 100);
assert.equal(commands.length, 0);
}).pipe(
Effect.scoped,
Effect.provide(
Layer.mergeAll(
NodeServices.layer,
makeLockedFileSystemLayer(attempts),
TestClock.layer(),
makeHttpClientLayer(bytes),
makeSpawnerLayer(commands),
),
),
);
});

it.effect("removes stale install locks before downloading the managed executable", () => {
const commands: Array<string> = [];
const bytes = new TextEncoder().encode("test-cloudflared-binary");
return Effect.gen(function* () {
const fileSystem = yield* FileSystem.FileSystem;
const baseDir = yield* fileSystem.makeTempDirectoryScoped({
prefix: "t3-cloudflared-test-stale-lock-",
});
const managedPath = resolveManagedCloudflaredPath({
baseDir,
platform: "linux",
arch: "x64",
});
const lockPath = `${managedPath}.lock`;
const path = yield* Path.Path;
yield* fileSystem.makeDirectory(path.dirname(managedPath), { recursive: true });
yield* fileSystem.writeFileString(lockPath, "stale");
yield* fileSystem.utimes(lockPath, 0, 0);
const manager = yield* makeCloudflaredRelayClient({
baseDir,
platform: "linux",
arch: "x64",
releaseAsset: {
url: "https://example.test/cloudflared",
sha256: Encoding.encodeHex(sha256(bytes)),
archive: "binary",
},
configProvider: emptyConfigProvider,
});

yield* TestClock.adjust(Duration.minutes(6));
const installed = yield* manager.install;

assert.deepStrictEqual(installed, {
status: "available",
executablePath: managedPath,
source: "managed",
version: CLOUDFLARED_VERSION,
});
assert.equal(commands.length, 1);
}).pipe(
Effect.scoped,
Effect.provide(
Layer.mergeAll(
NodeServices.layer,
TestClock.layer(),
makeHttpClientLayer(bytes),
makeSpawnerLayer(commands),
),
),
);
});

it.effect("observes PATH changes after the manager has been constructed", () =>
Effect.gen(function* () {
const fileSystem = yield* FileSystem.FileSystem;
Expand Down
74 changes: 51 additions & 23 deletions packages/shared/src/relayClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import * as Clock from "effect/Clock";
import type {
RelayClientInstallProgressEvent,
RelayClientInstallProgressStage,
Expand All @@ -8,13 +7,16 @@ import * as ConfigProvider from "effect/ConfigProvider";
import * as Context from "effect/Context";
import * as Crypto from "effect/Crypto";
import * as Data from "effect/Data";
import * as DateTime from "effect/DateTime";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Encoding from "effect/Encoding";
import * as FileSystem from "effect/FileSystem";
import * as Layer from "effect/Layer";
import * as Option from "effect/Option";
import * as Path from "effect/Path";
import * as PlatformError from "effect/PlatformError";
import * as Schedule from "effect/Schedule";
import * as Semaphore from "effect/Semaphore";
import { HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http";
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process";
Expand Down Expand Up @@ -62,6 +64,12 @@ class CloudflaredCommandError extends Data.TaggedError("CloudflaredCommandError"
readonly exitCode: number;
}> {}

class RelayClientInstallLockUnavailable extends Data.TaggedError(
"RelayClientInstallLockUnavailable",
)<{
readonly lockPath: string;
}> {}

export interface CloudflaredReleaseAsset {
readonly url: string;
readonly sha256: string;
Expand Down Expand Up @@ -99,8 +107,11 @@ const CLOUDFLARED_RELEASE_ASSETS: Readonly<
};

const INSTALL_LOCK_RETRY_COUNT = 100;
const INSTALL_LOCK_RETRY_DELAY = "100 millis";
const INSTALL_LOCK_STALE_MS = 5 * 60 * 1_000;
const INSTALL_LOCK_RETRY_DELAY = Duration.millis(100);
const INSTALL_LOCK_STALE_AFTER = Duration.minutes(5);
const INSTALL_LOCK_RETRY_SCHEDULE = Schedule.spaced(INSTALL_LOCK_RETRY_DELAY).pipe(
Schedule.both(Schedule.recurs(INSTALL_LOCK_RETRY_COUNT - 1)),
);

const trimmedString = (name: string) =>
Config.string(name).pipe(
Expand Down Expand Up @@ -354,28 +365,45 @@ export const makeCloudflaredRelayClient = Effect.fn("cloudflared.make")(function
const acquireInstallLock = Effect.fn("cloudflared.acquireInstallLock")(function* (
lockPath: string,
) {
for (let attempt = 0; attempt < INSTALL_LOCK_RETRY_COUNT; attempt += 1) {
const acquired = yield* fileSystem.writeFileString(lockPath, "", { flag: "wx" }).pipe(
Effect.as(true),
Effect.catch((error) =>
isAlreadyExists(error) ? Effect.succeed(false) : Effect.fail(error),
),
);
if (acquired) return;

const now = yield* Clock.currentTimeMillis;
const lockInfo = yield* fileSystem.stat(lockPath).pipe(Effect.option);
const mtime = Option.flatMap(lockInfo, (info) => info.mtime);
if (Option.isSome(mtime) && now - mtime.value.getTime() > INSTALL_LOCK_STALE_MS) {
yield* fileSystem.remove(lockPath, { force: true });
continue;
const acquireInstallLockOnce = Effect.fn("cloudflared.acquireInstallLockOnce")(function* () {
while (true) {
const acquired = yield* fileSystem.writeFileString(lockPath, "", { flag: "wx" }).pipe(
Effect.as(true),
Effect.catch((error) =>
isAlreadyExists(error) ? Effect.succeed(false) : Effect.fail(error),
),
);
if (acquired) return;

const now = yield* DateTime.now;
const lockInfo = yield* fileSystem.stat(lockPath).pipe(Effect.option);
const mtime = Option.flatMap(lockInfo, (info) => info.mtime);
const staleBefore = DateTime.subtractDuration(now, INSTALL_LOCK_STALE_AFTER);
if (
Option.isSome(mtime) &&
DateTime.Order(DateTime.fromDateUnsafe(mtime.value), staleBefore) < 0
) {
yield* fileSystem.remove(lockPath, { force: true });
continue;
}
return yield* new RelayClientInstallLockUnavailable({ lockPath });
}
yield* Effect.sleep(INSTALL_LOCK_RETRY_DELAY);
}
return yield* new RelayClientInstallError({
reason: "install_locked",
message: "Another relay client installation is still in progress.",
});

return yield* acquireInstallLockOnce().pipe(
Effect.retry({
schedule: INSTALL_LOCK_RETRY_SCHEDULE,
while: (error) => error instanceof RelayClientInstallLockUnavailable,
}),
Effect.catchTag("RelayClientInstallLockUnavailable", () =>
Effect.fail(
new RelayClientInstallError({
reason: "install_locked",
message: "Another relay client installation is still in progress.",
}),
),
),
);
});

const installUnlocked = Effect.fn("cloudflared.installUnlocked")(function* (
Expand Down
Loading