Skip to content

Commit 58835a6

Browse files
os-zhuangCopilot
andcommitted
feat(metadata-core): LayeredRepository (ADR-0008 M0 PR-5)
- src/layered-repository.ts: composes N child repos into a read-through stack. Reads walk top-to-bottom; writes go to the topmost writable layer; list() deduplicates by refKey; history() and watch() merge events from all layers, tagging each event source as <layer>:<original-source>. Multiplexed watch correctly cancels all child iterators via return(). - test/layered-repository.test.ts: 8 tests cover shadowing, fall-through, write routing, no-writable-layer error, list dedup, source tagging, child-iterator cancellation, and history merge. 79/79 metadata-core tests pass. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 32ce912 commit 58835a6

4 files changed

Lines changed: 392 additions & 0 deletions

File tree

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
---
2+
'@objectstack/metadata-core': minor
3+
---
4+
5+
Add `LayeredRepository` — composes N `MetadataRepository`s into a
6+
read-through stack. Reads walk top-to-bottom; writes route to the
7+
topmost writable layer; `list()` deduplicates by `refKey` preferring
8+
the top; `history()` and `watch()` merge events from all layers,
9+
tagging each event's `source` with `<layer>:<original-source>`. The
10+
multiplexed `watch()` correctly cancels all child iterators when the
11+
consumer calls `return()`.
12+
13+
Enables the canonical "system built-ins under user overlay" pattern
14+
described in ADR-0008.
15+
16+
See ADR-0008 §10 PR-5.

packages/metadata-core/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ export * from './canonicalize.js';
1212
export * from './repository.js';
1313
export * from './in-memory-repository.js';
1414
export * from './cache.js';
15+
export * from './layered-repository.js';
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.
2+
3+
/**
4+
* `LayeredRepository` — composes N child `MetadataRepository`s into a
5+
* single read-through stack. See ADR-0008 §10 PR-5.
6+
*
7+
* Read semantics
8+
* ──────────────
9+
* - `get(ref)` walks the layers top-to-bottom; first non-null wins.
10+
* - `list()` deduplicates by `refKey(ref)`, preferring the top layer.
11+
* - `history()` and `watch()` merge events from all layers, each
12+
* tagged with the source layer label in `evt.source`
13+
* (`<label>:<original-source>`).
14+
*
15+
* Write semantics
16+
* ───────────────
17+
* - `put` / `delete` are routed to the topmost writable layer.
18+
* - A layer is writable unless `readOnly: true` in its config.
19+
* - The write target's existing HEAD is used for the parent check —
20+
* i.e. if a key exists only in a lower layer, writing produces a
21+
* create on the top layer (an "overlay" of the lower one).
22+
*
23+
* Event tagging
24+
* ─────────────
25+
* - Each emitted event carries `source` rewritten as
26+
* `<layer.label>:<source>` so subscribers can tell which layer
27+
* produced the change. Original event content is otherwise
28+
* unchanged.
29+
*
30+
* This implementation never re-orders events relative to seq within a
31+
* single layer, but seqs across layers may interleave. Downstream
32+
* consumers (Cache, SchemaRegistry) only need monotonicity within a
33+
* (layer, branch) tuple — they treat each (layer.label, ref) as a
34+
* separate stream.
35+
*/
36+
37+
import {
38+
type MetadataRepository,
39+
type MetaRef,
40+
type MetadataItem,
41+
type MetadataItemHeader,
42+
type MetadataEvent,
43+
type PutOptions,
44+
type PutResult,
45+
type DeleteOptions,
46+
type DeleteResult,
47+
type ListFilter,
48+
type WatchFilter,
49+
type HistoryOptions,
50+
refKey,
51+
} from './index.js';
52+
53+
export interface LayerConfig {
54+
/** Stable identifier for this layer (e.g. "user", "team", "system"). */
55+
label: string;
56+
/** The backing repository. */
57+
repo: MetadataRepository;
58+
/** When true, this layer rejects writes (used for built-ins). */
59+
readOnly?: boolean;
60+
}
61+
62+
export interface LayeredRepositoryOptions {
63+
/**
64+
* Layers in priority order, highest first. The first writable layer
65+
* receives all writes.
66+
*/
67+
layers: LayerConfig[];
68+
}
69+
70+
const tagSource = (label: string, evt: MetadataEvent): MetadataEvent => ({
71+
...evt,
72+
source: `${label}:${evt.source}`,
73+
});
74+
75+
export class LayeredRepository implements MetadataRepository {
76+
private readonly layers: LayerConfig[];
77+
/** Index of the first writable layer; -1 if none. */
78+
private readonly writableIdx: number;
79+
80+
constructor(opts: LayeredRepositoryOptions) {
81+
if (!opts.layers.length) {
82+
throw new Error('LayeredRepository requires at least one layer');
83+
}
84+
this.layers = opts.layers;
85+
this.writableIdx = opts.layers.findIndex((l) => !l.readOnly);
86+
}
87+
88+
async get(ref: MetaRef): Promise<MetadataItem | null> {
89+
for (const layer of this.layers) {
90+
const item = await layer.repo.get(ref);
91+
if (item) return item;
92+
}
93+
return null;
94+
}
95+
96+
async put(ref: MetaRef, spec: unknown, opts: PutOptions): Promise<PutResult> {
97+
if (this.writableIdx < 0) {
98+
throw new Error('LayeredRepository: no writable layer configured');
99+
}
100+
return this.layers[this.writableIdx]!.repo.put(ref, spec, opts);
101+
}
102+
103+
async delete(ref: MetaRef, opts: DeleteOptions): Promise<DeleteResult> {
104+
if (this.writableIdx < 0) {
105+
throw new Error('LayeredRepository: no writable layer configured');
106+
}
107+
return this.layers[this.writableIdx]!.repo.delete(ref, opts);
108+
}
109+
110+
async *list(filter: ListFilter): AsyncIterable<MetadataItemHeader> {
111+
// Yield headers from top→bottom, deduplicating by refKey.
112+
const seen = new Set<string>();
113+
const limit = filter.limit ?? Infinity;
114+
let yielded = 0;
115+
for (const layer of this.layers) {
116+
for await (const header of layer.repo.list(filter)) {
117+
const key = refKey(header.ref);
118+
if (seen.has(key)) continue;
119+
seen.add(key);
120+
yield header;
121+
if (++yielded >= limit) return;
122+
}
123+
}
124+
}
125+
126+
async *history(ref: MetaRef, opts: HistoryOptions = {}): AsyncIterable<MetadataEvent> {
127+
// Merge histories from all layers, tagged. Order preserves each
128+
// layer's monotonic seq, but events across layers may interleave.
129+
// We collect everything then sort by ts as a best-effort total order.
130+
const events: MetadataEvent[] = [];
131+
for (const layer of this.layers) {
132+
for await (const evt of layer.repo.history(ref, opts)) {
133+
events.push(tagSource(layer.label, evt));
134+
}
135+
}
136+
events.sort((a, b) => (a.ts < b.ts ? -1 : a.ts > b.ts ? 1 : 0));
137+
const limit = opts.limit ?? Infinity;
138+
let yielded = 0;
139+
for (const evt of events) {
140+
yield evt;
141+
if (++yielded >= limit) return;
142+
}
143+
}
144+
145+
watch(filter: WatchFilter, since?: number): AsyncIterable<MetadataEvent> {
146+
// Fan out to all child watchers; multiplex into a single iterator.
147+
return multiplexWatch(this.layers, filter, since);
148+
}
149+
}
150+
151+
/**
152+
* Multiplex N async iterables of MetadataEvent into one, tagging each
153+
* event's `source` with its layer label. Implemented as a manual
154+
* AsyncIterator so we can correctly cancel all child iterators when the
155+
* consumer breaks out.
156+
*/
157+
function multiplexWatch(
158+
layers: LayerConfig[],
159+
filter: WatchFilter,
160+
since: number | undefined,
161+
): AsyncIterable<MetadataEvent> {
162+
return {
163+
[Symbol.asyncIterator]() {
164+
const children = layers.map((layer) => ({
165+
label: layer.label,
166+
iter: layer.repo.watch(filter, since)[Symbol.asyncIterator](),
167+
pending: null as Promise<{ label: string; result: IteratorResult<MetadataEvent> }> | null,
168+
done: false,
169+
}));
170+
let closed = false;
171+
172+
const pumpAll = () => {
173+
for (const c of children) {
174+
if (c.done || c.pending) continue;
175+
c.pending = c.iter.next().then((result) => ({ label: c.label, result }));
176+
}
177+
};
178+
179+
const closeAll = async () => {
180+
if (closed) return;
181+
closed = true;
182+
await Promise.all(
183+
children.map(async (c) => {
184+
try { await c.iter.return?.(undefined); } catch { /* ignore */ }
185+
}),
186+
);
187+
};
188+
189+
return {
190+
async next(): Promise<IteratorResult<MetadataEvent>> {
191+
while (!closed) {
192+
pumpAll();
193+
const inflight = children.filter((c) => c.pending);
194+
if (!inflight.length) return { value: undefined, done: true };
195+
const winner = await Promise.race(inflight.map((c) => c.pending!));
196+
const target = children.find((c) => c.label === winner.label)!;
197+
target.pending = null;
198+
if (winner.result.done) {
199+
target.done = true;
200+
continue;
201+
}
202+
return {
203+
value: tagSource(winner.label, winner.result.value as MetadataEvent),
204+
done: false,
205+
};
206+
}
207+
return { value: undefined, done: true };
208+
},
209+
async return() {
210+
await closeAll();
211+
return { value: undefined, done: true };
212+
},
213+
async throw(err) {
214+
await closeAll();
215+
throw err;
216+
},
217+
} as AsyncIterator<MetadataEvent>;
218+
},
219+
};
220+
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.
2+
3+
import { describe, it, expect } from 'vitest';
4+
import { InMemoryRepository } from '../src/in-memory-repository.js';
5+
import { LayeredRepository } from '../src/layered-repository.js';
6+
import type { MetaRef, MetadataEvent } from '../src/types.js';
7+
8+
const ref = (name: string): MetaRef => ({
9+
org: 'system',
10+
project: 'test',
11+
branch: 'main',
12+
type: 'view',
13+
name,
14+
});
15+
16+
const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));
17+
18+
describe('LayeredRepository', () => {
19+
it('get: top layer shadows lower layers', async () => {
20+
const builtins = new InMemoryRepository();
21+
const userspace = new InMemoryRepository();
22+
await builtins.put(ref('a'), { from: 'builtin' }, { parentVersion: null, actor: 't' });
23+
await userspace.put(ref('a'), { from: 'user' }, { parentVersion: null, actor: 't' });
24+
25+
const layered = new LayeredRepository({
26+
layers: [
27+
{ label: 'user', repo: userspace },
28+
{ label: 'system', repo: builtins, readOnly: true },
29+
],
30+
});
31+
const got = await layered.get(ref('a'));
32+
expect(got?.body).toEqual({ from: 'user' });
33+
});
34+
35+
it('get: falls through to lower layers when top is empty', async () => {
36+
const builtins = new InMemoryRepository();
37+
const userspace = new InMemoryRepository();
38+
await builtins.put(ref('only_in_builtin'), { v: 1 }, { parentVersion: null, actor: 't' });
39+
40+
const layered = new LayeredRepository({
41+
layers: [
42+
{ label: 'user', repo: userspace },
43+
{ label: 'system', repo: builtins, readOnly: true },
44+
],
45+
});
46+
const got = await layered.get(ref('only_in_builtin'));
47+
expect(got?.body).toEqual({ v: 1 });
48+
});
49+
50+
it('put: routes to the topmost writable layer', async () => {
51+
const builtins = new InMemoryRepository();
52+
const userspace = new InMemoryRepository();
53+
const layered = new LayeredRepository({
54+
layers: [
55+
{ label: 'user', repo: userspace },
56+
{ label: 'system', repo: builtins, readOnly: true },
57+
],
58+
});
59+
await layered.put(ref('written'), { v: 1 }, { parentVersion: null, actor: 't' });
60+
expect(await userspace.get(ref('written'))).not.toBeNull();
61+
expect(await builtins.get(ref('written'))).toBeNull();
62+
});
63+
64+
it('put: throws if no writable layer exists', async () => {
65+
const ro = new InMemoryRepository();
66+
const layered = new LayeredRepository({
67+
layers: [{ label: 'system', repo: ro, readOnly: true }],
68+
});
69+
await expect(
70+
layered.put(ref('x'), { v: 1 }, { parentVersion: null, actor: 't' }),
71+
).rejects.toThrow(/no writable layer/);
72+
});
73+
74+
it('list: deduplicates by refKey, preferring the top layer', async () => {
75+
const builtins = new InMemoryRepository();
76+
const userspace = new InMemoryRepository();
77+
await builtins.put(ref('shared'), { from: 'builtin' }, { parentVersion: null, actor: 't' });
78+
await builtins.put(ref('only_builtin'), { v: 1 }, { parentVersion: null, actor: 't' });
79+
await userspace.put(ref('shared'), { from: 'user' }, { parentVersion: null, actor: 't' });
80+
await userspace.put(ref('only_user'), { v: 1 }, { parentVersion: null, actor: 't' });
81+
82+
const layered = new LayeredRepository({
83+
layers: [
84+
{ label: 'user', repo: userspace },
85+
{ label: 'system', repo: builtins, readOnly: true },
86+
],
87+
});
88+
const names: string[] = [];
89+
for await (const header of layered.list({ type: 'view' })) {
90+
names.push(header.ref.name);
91+
}
92+
expect(new Set(names)).toEqual(new Set(['shared', 'only_user', 'only_builtin']));
93+
// 'shared' must come from user (top) → headers should be unique.
94+
expect(names.length).toBe(3);
95+
});
96+
97+
it('watch: tags events with <layer>:<source>', async () => {
98+
const a = new InMemoryRepository();
99+
const b = new InMemoryRepository();
100+
const layered = new LayeredRepository({
101+
layers: [
102+
{ label: 'top', repo: a },
103+
{ label: 'bottom', repo: b },
104+
],
105+
});
106+
const iter = layered.watch({ org: 'system', project: 'test', branch: 'main' })[Symbol.asyncIterator]();
107+
const collected: MetadataEvent[] = [];
108+
const done = (async () => {
109+
for (let i = 0; i < 2; i++) {
110+
const r = await iter.next();
111+
if (r.done) return;
112+
collected.push(r.value as MetadataEvent);
113+
}
114+
})();
115+
await sleep(10);
116+
await a.put(ref('x'), { from: 'top' }, { parentVersion: null, actor: 't' });
117+
await b.put(ref('y'), { from: 'bottom' }, { parentVersion: null, actor: 't' });
118+
await Promise.race([done, sleep(1000)]);
119+
await iter.return?.(undefined);
120+
expect(collected).toHaveLength(2);
121+
const sources = collected.map((e) => e.source).sort();
122+
expect(sources).toEqual(['bottom:in-memory', 'top:in-memory']);
123+
});
124+
125+
it('watch: closing the layered iterator closes all child iterators', async () => {
126+
const a = new InMemoryRepository();
127+
const b = new InMemoryRepository();
128+
const layered = new LayeredRepository({
129+
layers: [{ label: 'a', repo: a }, { label: 'b', repo: b }],
130+
});
131+
const iter = layered.watch({ org: 'system', project: 'test', branch: 'main' })[Symbol.asyncIterator]();
132+
// Schedule a next() that will park (no events yet).
133+
const pending = iter.next();
134+
await sleep(10);
135+
// Closing must unblock pending; otherwise this test will time out.
136+
await iter.return?.(undefined);
137+
const result = await pending;
138+
expect(result.done).toBe(true);
139+
});
140+
141+
it('history: merges from all layers, sorted by ts', async () => {
142+
const a = new InMemoryRepository({ now: () => new Date('2025-01-01T00:00:00Z') });
143+
const b = new InMemoryRepository({ now: () => new Date('2025-01-02T00:00:00Z') });
144+
await a.put(ref('item'), { who: 'a' }, { parentVersion: null, actor: 't' });
145+
await b.put(ref('item'), { who: 'b' }, { parentVersion: null, actor: 't' });
146+
const layered = new LayeredRepository({
147+
layers: [{ label: 'a', repo: a }, { label: 'b', repo: b }],
148+
});
149+
const events: MetadataEvent[] = [];
150+
for await (const evt of layered.history(ref('item'))) events.push(evt);
151+
expect(events).toHaveLength(2);
152+
expect(events[0]!.source.startsWith('a:')).toBe(true);
153+
expect(events[1]!.source.startsWith('b:')).toBe(true);
154+
});
155+
});

0 commit comments

Comments
 (0)