Skip to content

Commit ee14f54

Browse files
committed
feat: add FilesystemSink for telemetry audit mode
1 parent 13b34a3 commit ee14f54

3 files changed

Lines changed: 234 additions & 0 deletions

File tree

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
import { resolveResourceAttributes } from '../config';
2+
import type { ResourceAttributes } from '../schemas/common-attributes';
3+
import { FilesystemSink } from '../sinks/filesystem-sink';
4+
import { mkdir, readFile, rm } from 'fs/promises';
5+
import { randomUUID } from 'node:crypto';
6+
import { tmpdir } from 'node:os';
7+
import { join } from 'node:path';
8+
import { afterAll, beforeAll, beforeEach, describe, expect, it } from 'vitest';
9+
10+
const testDir = join(tmpdir(), `agentcore-fs-sink-${randomUUID()}`);
11+
const outputDir = join(testDir, 'telemetry');
12+
13+
let resource: ResourceAttributes;
14+
const ORIGINAL_CONFIG_DIR = process.env.AGENTCORE_CONFIG_DIR;
15+
16+
beforeAll(async () => {
17+
process.env.AGENTCORE_CONFIG_DIR = join(testDir, '.agentcore');
18+
resource = await resolveResourceAttributes('cli');
19+
});
20+
21+
afterAll(() => {
22+
if (ORIGINAL_CONFIG_DIR === undefined) {
23+
delete process.env.AGENTCORE_CONFIG_DIR;
24+
} else {
25+
process.env.AGENTCORE_CONFIG_DIR = ORIGINAL_CONFIG_DIR;
26+
}
27+
});
28+
29+
function createSink(opts: { dir?: string; log?: (msg: string) => void } = {}) {
30+
return new FilesystemSink({ outputDir: opts.dir ?? outputDir, resource, log: opts.log });
31+
}
32+
33+
function expectFilePath(sink: FilesystemSink): string {
34+
const p = sink.filePath;
35+
if (p === undefined) throw new Error('expected filePath to be defined');
36+
return p;
37+
}
38+
39+
describe('FilesystemSink', () => {
40+
beforeEach(async () => {
41+
await rm(testDir, { recursive: true, force: true });
42+
await mkdir(testDir, { recursive: true });
43+
});
44+
afterAll(() => rm(testDir, { recursive: true, force: true }));
45+
46+
it('filePath is undefined before any records', () => {
47+
const sink = createSink();
48+
expect(sink.filePath).toBeUndefined();
49+
});
50+
51+
it('derives file name from first record command_group and session id', () => {
52+
const sink = createSink();
53+
sink.record(100, { command_group: 'deploy', command: 'deploy' });
54+
expect(sink.filePath).toBe(join(outputDir, `deploy-${resource['agentcore-cli.session_id']}.json`));
55+
});
56+
57+
it('uses "unknown" as entrypoint when command_group is absent', () => {
58+
const sink = createSink();
59+
sink.record(100, { command: 'orphan' });
60+
expect(sink.filePath).toBe(join(outputDir, `unknown-${resource['agentcore-cli.session_id']}.json`));
61+
});
62+
63+
it('entrypoint stays fixed after first record', () => {
64+
const sink = createSink();
65+
sink.record(100, { command_group: 'deploy', command: 'deploy' });
66+
sink.record(200, { command_group: 'status', command: 'status' });
67+
expect(sink.filePath).toContain('deploy-');
68+
});
69+
70+
it('flush writes buffered entries as a JSON array', async () => {
71+
const sink = createSink();
72+
sink.record(42, { command_group: 'deploy', command: 'deploy', exit_reason: 'success' });
73+
74+
await sink.flush();
75+
76+
const path = expectFilePath(sink);
77+
const written = JSON.parse(await readFile(path, 'utf-8'));
78+
expect(written).toHaveLength(1);
79+
expect(written[0]).toMatchObject({
80+
metric_name: 'cli.command_run',
81+
resource,
82+
attributes: {
83+
command_group: 'deploy',
84+
command: 'deploy',
85+
exit_reason: 'success',
86+
duration_ms: 42,
87+
},
88+
});
89+
});
90+
91+
it('accumulates multiple records into a single file', async () => {
92+
const sink = createSink();
93+
sink.record(10, { command_group: 'add', command: 'add.agent' });
94+
sink.record(20, { command_group: 'add', command: 'add.memory' });
95+
96+
await sink.flush();
97+
98+
const path = expectFilePath(sink);
99+
const written = JSON.parse(await readFile(path, 'utf-8'));
100+
expect(written).toHaveLength(2);
101+
expect(written[0].attributes.duration_ms).toBe(10);
102+
expect(written[1].attributes.duration_ms).toBe(20);
103+
});
104+
105+
it('creates output directory if it does not exist', async () => {
106+
const nested = join(testDir, 'deep', 'nested', 'telemetry');
107+
const sink = createSink({ dir: nested });
108+
sink.record(1, { command_group: 'status', command: 'status' });
109+
110+
await sink.flush();
111+
112+
const path = expectFilePath(sink);
113+
const written = JSON.parse(await readFile(path, 'utf-8'));
114+
expect(written).toHaveLength(1);
115+
});
116+
117+
it('flush is a no-op when no records exist', async () => {
118+
const sink = createSink();
119+
await expect(sink.flush()).resolves.toBeUndefined();
120+
});
121+
122+
it('shutdown writes entries and logs audit message', async () => {
123+
const logged: string[] = [];
124+
const sink = createSink({ log: msg => logged.push(msg) });
125+
sink.record(99, { command_group: 'invoke', command: 'invoke' });
126+
127+
await sink.shutdown();
128+
129+
const path = expectFilePath(sink);
130+
const written = JSON.parse(await readFile(path, 'utf-8'));
131+
expect(written).toHaveLength(1);
132+
expect(logged).toHaveLength(1);
133+
expect(logged[0]).toContain('[audit mode]');
134+
expect(logged[0]).toContain(path);
135+
});
136+
137+
it('shutdown does not log when no records were written', async () => {
138+
const logged: string[] = [];
139+
const sink = createSink({ log: msg => logged.push(msg) });
140+
141+
await sink.shutdown();
142+
143+
expect(logged).toHaveLength(0);
144+
});
145+
146+
it('shutdown after flush with no new records still logs', async () => {
147+
const logged: string[] = [];
148+
const sink = createSink({ log: msg => logged.push(msg) });
149+
sink.record(1, { command_group: 'deploy', command: 'deploy' });
150+
await sink.flush();
151+
152+
await sink.shutdown();
153+
154+
expect(logged).toHaveLength(1);
155+
expect(logged[0]).toContain('[audit mode]');
156+
});
157+
158+
it('subsequent flushes include all records', async () => {
159+
const sink = createSink();
160+
sink.record(1, { command_group: 'deploy', command: 'deploy' });
161+
await sink.flush();
162+
163+
sink.record(2, { command_group: 'deploy', command: 'deploy' });
164+
await sink.flush();
165+
166+
const path = expectFilePath(sink);
167+
const written = JSON.parse(await readFile(path, 'utf-8'));
168+
expect(written).toHaveLength(2);
169+
});
170+
});

src/cli/telemetry/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ export type { TelemetryPreference } from './config.js';
33
export { TelemetryClient, CANCELLED } from './client.js';
44
export { type MetricSink, CompositeSink } from './sinks/metric-sink.js';
55
export { OtelMetricSink, type OtelMetricSinkConfig } from './sinks/otel-metric-sink.js';
6+
export { FilesystemSink, type FilesystemSinkConfig } from './sinks/filesystem-sink.js';
67
export { classifyError, isUserError } from './error-classification.js';
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import type { ResourceAttributes } from '../schemas/common-attributes.js';
2+
import type { MetricSink } from './metric-sink.js';
3+
import { mkdir, writeFile } from 'fs/promises';
4+
import { join } from 'path';
5+
6+
export interface FilesystemSinkConfig {
7+
outputDir: string;
8+
resource: ResourceAttributes;
9+
log?: (message: string) => void;
10+
}
11+
12+
interface MetricEntry {
13+
metric_name: 'cli.command_run';
14+
resource: ResourceAttributes;
15+
attributes: Record<string, string | number>;
16+
}
17+
18+
export class FilesystemSink implements MetricSink {
19+
private readonly entries: MetricEntry[] = [];
20+
private readonly outputDir: string;
21+
private readonly resource: ResourceAttributes;
22+
private readonly sessionId: string;
23+
private readonly log: (message: string) => void;
24+
private entrypoint: string | undefined;
25+
26+
constructor(config: FilesystemSinkConfig) {
27+
this.outputDir = config.outputDir;
28+
this.resource = config.resource;
29+
this.sessionId = config.resource['agentcore-cli.session_id'];
30+
this.log = config.log ?? (msg => console.error(msg));
31+
}
32+
33+
get filePath(): string | undefined {
34+
if (this.entrypoint === undefined) return undefined;
35+
return join(this.outputDir, `${this.entrypoint}-${this.sessionId}.json`);
36+
}
37+
38+
record(value: number, attrs: Record<string, string | number>): void {
39+
this.entrypoint ??= String(attrs.command_group ?? 'unknown');
40+
this.entries.push({
41+
metric_name: 'cli.command_run',
42+
resource: this.resource,
43+
attributes: { ...attrs, duration_ms: value },
44+
});
45+
}
46+
47+
async flush(): Promise<void> {
48+
await this.writeEntries();
49+
}
50+
51+
async shutdown(): Promise<void> {
52+
await this.writeEntries();
53+
if (this.filePath !== undefined) {
54+
this.log(`[audit mode] Telemetry written to ${this.filePath}`);
55+
}
56+
}
57+
58+
private async writeEntries(): Promise<void> {
59+
if (this.entries.length === 0 || this.filePath === undefined) return;
60+
await mkdir(this.outputDir, { recursive: true });
61+
await writeFile(this.filePath, JSON.stringify(this.entries, null, 2));
62+
}
63+
}

0 commit comments

Comments
 (0)