Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 75 additions & 18 deletions packages/shared/src/relayClient.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import { sha256 } from "@noble/hashes/sha2";
import * as NodeServices from "@effect/platform-node/NodeServices";
import { describe, expect, it } from "@effect/vitest";
import { assert, describe, it } from "@effect/vitest";
import * as ConfigProvider from "effect/ConfigProvider";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Encoding from "effect/Encoding";
import * as FileSystem from "effect/FileSystem";
import * as Layer from "effect/Layer";
import * as Path from "effect/Path";
import * as Sink from "effect/Sink";
import * as Stream from "effect/Stream";
import * as TestClock from "effect/testing/TestClock";
import { HttpClient, HttpClientResponse } from "effect/unstable/http";
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process";

Expand Down Expand Up @@ -47,15 +50,13 @@ const makeHttpClientLayer = (bytes: Uint8Array) =>
);

const makeSpawnerLayer = (commands: Array<string>) =>
Layer.succeed(
ChildProcessSpawner.ChildProcessSpawner,
ChildProcessSpawner.make((command) =>
Layer.mock(ChildProcessSpawner.ChildProcessSpawner, {
spawn: (command) =>
Effect.sync(() => {
commands.push(ChildProcess.isStandardCommand(command) ? command.command : "piped-command");
return makeHandle();
}),
),
);
});

describe("RelayClient", () => {
it.effect("resolves explicit overrides before managed and PATH executables", () =>
Expand All @@ -80,7 +81,7 @@ describe("RelayClient", () => {
}),
});

expect(yield* manager.resolve).toEqual({
assert.deepStrictEqual(yield* manager.resolve, {
status: "available",
executablePath: overridePath,
source: "override",
Expand Down Expand Up @@ -130,16 +131,17 @@ describe("RelayClient", () => {
platform: "linux",
arch: "x64",
});
expect(installed).toEqual({
assert.deepStrictEqual(installed, {
status: "available",
executablePath: managedPath,
source: "managed",
version: CLOUDFLARED_VERSION,
});
expect(new TextDecoder().decode(yield* fileSystem.readFile(managedPath))).toBe(
assert.equal(
new TextDecoder().decode(yield* fileSystem.readFile(managedPath)),
"test-cloudflared-binary",
);
expect(progress).toEqual([
assert.deepStrictEqual(progress, [
"checking",
"waiting_for_lock",
"downloading",
Expand All @@ -148,7 +150,7 @@ describe("RelayClient", () => {
"validating",
"activating",
]);
expect(yield* manager.resolve).toEqual(installed);
assert.deepStrictEqual(yield* manager.resolve, installed);
}).pipe(
Effect.scoped,
Effect.provide(
Expand Down Expand Up @@ -180,8 +182,8 @@ describe("RelayClient", () => {
});

const error = yield* manager.install.pipe(Effect.flip);
expect(error).toBeInstanceOf(RelayClientInstallError);
expect(error.reason).toBe("invalid_checksum");
assert.ok(error instanceof RelayClientInstallError);
assert.equal(error.reason, "invalid_checksum");
}).pipe(
Effect.scoped,
Effect.provide(
Expand Down Expand Up @@ -217,12 +219,17 @@ describe("RelayClient", () => {
const [first, second] = yield* Effect.all([manager.install, manager.install], {
concurrency: "unbounded",
});
expect(second).toEqual(first);
expect(commands).toHaveLength(1);
assert.deepStrictEqual(second, first);
assert.equal(commands.length, 1);
}).pipe(
Effect.scoped,
Effect.provide(
Layer.mergeAll(NodeServices.layer, makeHttpClientLayer(bytes), makeSpawnerLayer(commands)),
Layer.mergeAll(
TestClock.layer(),
NodeServices.layer,
makeHttpClientLayer(bytes),
makeSpawnerLayer(commands),
),
),
);
});
Expand All @@ -243,7 +250,7 @@ describe("RelayClient", () => {
configProvider: () => ConfigProvider.fromEnv({ env: { PATH: path } }),
});

expect(yield* manager.resolve).toEqual({
assert.deepStrictEqual(yield* manager.resolve, {
status: "missing",
version: CLOUDFLARED_VERSION,
});
Expand All @@ -253,7 +260,7 @@ describe("RelayClient", () => {
yield* fileSystem.chmod(executablePath, 0o755);
path = binDir;

expect(yield* manager.resolve).toEqual({
assert.deepStrictEqual(yield* manager.resolve, {
status: "available",
executablePath,
source: "path",
Expand All @@ -270,4 +277,54 @@ describe("RelayClient", () => {
),
),
);

it.effect("removes stale install locks before installing", () => {
const commands: Array<string> = [];
const bytes = new TextEncoder().encode("test-cloudflared-binary");
return Effect.gen(function* () {
const fileSystem = yield* FileSystem.FileSystem;
const path = yield* Path.Path;
const baseDir = yield* fileSystem.makeTempDirectoryScoped({
prefix: "t3-cloudflared-test-",
});
const managedPath = resolveManagedCloudflaredPath({
baseDir,
platform: "linux",
arch: "x64",
});
const lockPath = `${managedPath}.lock`;
yield* fileSystem.makeDirectory(path.dirname(lockPath), { recursive: true });
yield* fileSystem.writeFileString(lockPath, "stale");
yield* fileSystem.utimes(lockPath, 0, 0);
yield* TestClock.adjust(Duration.minutes(6));

const manager = yield* makeCloudflaredRelayClient({
baseDir,
platform: "linux",
arch: "x64",
releaseAsset: {
url: "https://example.test/cloudflared",
sha256: Encoding.encodeHex(sha256(bytes)),
archive: "binary",
},
configProvider: emptyConfigProvider,
});

const installed = yield* manager.install;

assert.deepStrictEqual(installed, {
status: "available",
executablePath: managedPath,
source: "managed",
version: CLOUDFLARED_VERSION,
});
assert.equal(yield* fileSystem.exists(lockPath), false);
assert.equal(commands.length, 1);
}).pipe(
Effect.scoped,
Effect.provide(
Layer.mergeAll(NodeServices.layer, makeHttpClientLayer(bytes), makeSpawnerLayer(commands)),
),
);
});
});
112 changes: 73 additions & 39 deletions packages/shared/src/relayClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import * as Clock from "effect/Clock";
import type {
RelayClientInstallProgressEvent,
RelayClientInstallProgressStage,
Expand All @@ -8,13 +7,16 @@ import * as ConfigProvider from "effect/ConfigProvider";
import * as Context from "effect/Context";
import * as Crypto from "effect/Crypto";
import * as Data from "effect/Data";
import * as DateTime from "effect/DateTime";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Encoding from "effect/Encoding";
import * as FileSystem from "effect/FileSystem";
import * as Layer from "effect/Layer";
import * as Option from "effect/Option";
import * as Path from "effect/Path";
import * as PlatformError from "effect/PlatformError";
import * as Schedule from "effect/Schedule";
import * as Semaphore from "effect/Semaphore";
import { HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http";
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process";
Expand Down Expand Up @@ -99,8 +101,18 @@ const CLOUDFLARED_RELEASE_ASSETS: Readonly<
};

const INSTALL_LOCK_RETRY_COUNT = 100;
const INSTALL_LOCK_RETRY_DELAY = "100 millis";
const INSTALL_LOCK_STALE_MS = 5 * 60 * 1_000;
const INSTALL_LOCK_RETRY_DELAY = Duration.millis(100);
const INSTALL_LOCK_STALE_AGE = Duration.minutes(5);

class RelayClientInstallLockBusy extends Data.TaggedError("RelayClientInstallLockBusy")<{
readonly lockPath: string;
}> {}

const retryWhileInstallLockBusy = Schedule.spaced(INSTALL_LOCK_RETRY_DELAY).pipe(
Schedule.both(Schedule.recurs(INSTALL_LOCK_RETRY_COUNT - 1)),
Schedule.setInputType<RelayClientInstallLockBusy | PlatformError.PlatformError>(),
Schedule.while(({ input }) => input._tag === "RelayClientInstallLockBusy"),
);

const trimmedString = (name: string) =>
Config.string(name).pipe(
Expand Down Expand Up @@ -161,8 +173,8 @@ export function resolveManagedCloudflaredPath(input: {
function resolveReleaseAsset(
platform: NodeJS.Platform,
arch: string,
): CloudflaredReleaseAsset | null {
return CLOUDFLARED_RELEASE_ASSETS[`${platform}-${arch}`] ?? null;
): Option.Option<CloudflaredReleaseAsset> {
return Option.fromUndefinedOr(CLOUDFLARED_RELEASE_ASSETS[`${platform}-${arch}`]);
}

function isAlreadyExists(error: PlatformError.PlatformError): boolean {
Expand Down Expand Up @@ -207,7 +219,9 @@ export const makeCloudflaredRelayClient = Effect.fn("cloudflared.make")(function
const installSemaphore = yield* Semaphore.make(1);
const platform = options.platform ?? process.platform;
const arch = options.arch ?? process.arch;
const releaseAsset = options.releaseAsset ?? resolveReleaseAsset(platform, arch);
const releaseAsset = Option.fromUndefinedOr(options.releaseAsset).pipe(
Option.orElse(() => resolveReleaseAsset(platform, arch)),
);
const loadCloudflaredConfig = Effect.suspend(() =>
CloudflaredConfig.pipe(
Effect.provideService(
Expand Down Expand Up @@ -236,15 +250,15 @@ export const makeCloudflaredRelayClient = Effect.fn("cloudflared.make")(function
const resolvePathExecutable = Effect.gen(function* () {
const config = yield* loadCloudflaredConfig;
const pathValue = Option.getOrUndefined(config.path);
if (!pathValue) return null;
if (!pathValue) return Option.none();
const delimiter = platform === "win32" ? ";" : ":";
for (const directory of pathValue.split(delimiter)) {
const trimmed = directory.trim().replace(/^"|"$/gu, "");
if (trimmed.length === 0) continue;
const candidate = path.join(trimmed, executableFileName(platform));
if (yield* isExecutableFile(candidate)) return candidate;
if (yield* isExecutableFile(candidate)) return Option.some(candidate);
}
return null;
return Option.none();
});

const resolve: RelayClientShape["resolve"] = Effect.gen(function* () {
Expand All @@ -268,15 +282,15 @@ export const makeCloudflaredRelayClient = Effect.fn("cloudflared.make")(function
};
}
const pathExecutable = yield* resolvePathExecutable;
if (pathExecutable) {
if (Option.isSome(pathExecutable)) {
return {
status: "available",
executablePath: pathExecutable,
executablePath: pathExecutable.value,
source: "path",
version: CLOUDFLARED_VERSION,
};
}
return releaseAsset
return Option.isSome(releaseAsset)
? { status: "missing", version: CLOUDFLARED_VERSION }
: {
status: "unsupported",
Expand Down Expand Up @@ -351,36 +365,56 @@ export const makeCloudflaredRelayClient = Effect.fn("cloudflared.make")(function
return bytes;
});

const isInstallLockStale = Effect.fn("cloudflared.isInstallLockStale")(function* (
lockPath: string,
): Effect.fn.Return<boolean> {
const lockInfo = yield* fileSystem.stat(lockPath).pipe(Effect.option);
const lockModifiedAt = Option.flatMap(lockInfo, (info) => info.mtime);
if (Option.isNone(lockModifiedAt)) return false;

const now = yield* DateTime.now;
const lockAge = DateTime.distance(DateTime.makeUnsafe(lockModifiedAt.value), now);
return Duration.isGreaterThan(lockAge, INSTALL_LOCK_STALE_AGE);
});

const attemptAcquireInstallLock = Effect.fn("cloudflared.attemptAcquireInstallLock")(function* (
lockPath: string,
): Effect.fn.Return<void, RelayClientInstallLockBusy | PlatformError.PlatformError> {
const acquired = yield* fileSystem.writeFileString(lockPath, "", { flag: "wx" }).pipe(
Effect.as(true),
Effect.catch((error) =>
isAlreadyExists(error) ? Effect.succeed(false) : Effect.fail(error),
),
);
if (acquired) return;

if (yield* isInstallLockStale(lockPath)) {
yield* fileSystem.remove(lockPath, { force: true });
return yield* attemptAcquireInstallLock(lockPath);
}

return yield* new RelayClientInstallLockBusy({ lockPath });
});

const acquireInstallLock = Effect.fn("cloudflared.acquireInstallLock")(function* (
lockPath: string,
) {
for (let attempt = 0; attempt < INSTALL_LOCK_RETRY_COUNT; attempt += 1) {
const acquired = yield* fileSystem.writeFileString(lockPath, "", { flag: "wx" }).pipe(
Effect.as(true),
Effect.catch((error) =>
isAlreadyExists(error) ? Effect.succeed(false) : Effect.fail(error),
): Effect.fn.Return<void, RelayClientInstallError | PlatformError.PlatformError> {
return yield* attemptAcquireInstallLock(lockPath).pipe(
Effect.retry(retryWhileInstallLockBusy),
Effect.catchTag("RelayClientInstallLockBusy", () =>
Effect.fail(
new RelayClientInstallError({
reason: "install_locked",
message: "Another relay client installation is still in progress.",
}),
),
);
if (acquired) return;

const now = yield* Clock.currentTimeMillis;
const lockInfo = yield* fileSystem.stat(lockPath).pipe(Effect.option);
const mtime = Option.flatMap(lockInfo, (info) => info.mtime);
if (Option.isSome(mtime) && now - mtime.value.getTime() > INSTALL_LOCK_STALE_MS) {
yield* fileSystem.remove(lockPath, { force: true });
continue;
}
yield* Effect.sleep(INSTALL_LOCK_RETRY_DELAY);
}
return yield* new RelayClientInstallError({
reason: "install_locked",
message: "Another relay client installation is still in progress.",
});
),
);
});

const installUnlocked = Effect.fn("cloudflared.installUnlocked")(function* (
report: (stage: RelayClientInstallProgressStage) => Effect.Effect<void>,
) {
): Effect.fn.Return<AvailableRelayClient, RelayClientInstallError> {
yield* report("checking");
const existing = yield* resolve;
if (existing.status === "available") return existing;
Expand All @@ -391,7 +425,7 @@ export const makeCloudflaredRelayClient = Effect.fn("cloudflared.make")(function
message: `${CLOUDFLARED_PATH_ENV_NAME} does not point to an executable file.`,
});
}
if (!releaseAsset) {
if (Option.isNone(releaseAsset)) {
return yield* new RelayClientInstallError({
reason: "unsupported_platform",
message: `T3 Code does not provide a managed relay client binary for ${platform}-${arch}.`,
Expand Down Expand Up @@ -427,16 +461,16 @@ export const makeCloudflaredRelayClient = Effect.fn("cloudflared.make")(function
});
const archivePath = path.join(
tempDirectory,
releaseAsset.archive === "tgz" ? "cloudflared.tgz" : executableFileName(platform),
releaseAsset.value.archive === "tgz" ? "cloudflared.tgz" : executableFileName(platform),
);
const download = yield* downloadAsset(releaseAsset, report);
const download = yield* downloadAsset(releaseAsset.value, report);
yield* report("installing");
yield* fileSystem
.writeFile(archivePath, download)
.pipe(wrapInstallFailure("write_failed", "Could not write the relay client download."));

const executablePath = path.join(tempDirectory, executableFileName(platform));
if (releaseAsset.archive === "tgz") {
if (releaseAsset.value.archive === "tgz") {
yield* runCommand("tar", ["-xzf", archivePath, "-C", tempDirectory]).pipe(
wrapInstallFailure("write_failed", "Could not extract the relay client."),
);
Expand Down
Loading