Skip to content

Commit b3ef42d

Browse files
authored
Merge pull request #9 from ThinkOffApp/codexmb/dm-poller-support
Add DM inbox polling to the room watcher
2 parents 3a46eae + d8eeeb0 commit b3ef42d

6 files changed

Lines changed: 283 additions & 23 deletions

File tree

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,12 @@ For Codex Desktop GUI (non-tmux) use command-mode nudging:
130130
"nudge_mode": "command",
131131
"nudge_command": "/ABSOLUTE/PATH/ide-agent-kit/tools/codex_gui_nudge.sh"
132132
},
133+
"dm_poller": {
134+
"enabled": true,
135+
"seen_file": "/tmp/codex-dm-seen.txt",
136+
"limit": 100,
137+
"human_only": false
138+
},
133139
"tmux": {
134140
"ide_session": "codex",
135141
"nudge_text": "check room and respond only if you have something relevant to say [codex]"
@@ -143,6 +149,8 @@ Run:
143149
node bin/cli.mjs rooms watch --config /ABSOLUTE/PATH/ide-agent-kit-codex.json
144150
```
145151

152+
When `dm_poller.enabled` is set, the same watcher also polls `/api/v1/messages?limit=100`, keeps a separate DM seen-state file, and nudges on new `type: "dm"` rows addressed to your configured handle. DM notifications are appended to the normal notification file so existing `rooms check` and GUI-nudge flows continue to work.
153+
146154
There is also a ready-to-copy example at:
147155

148156
```bash

config/codex.desktop.example.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,11 @@
3939
"nudge_command": "/ABSOLUTE/PATH/ide-agent-kit/tools/codex_gui_nudge.sh",
4040
"notification_file": "/tmp/codex-room-notifications.txt",
4141
"seen_file": "/tmp/codex-room-seen.txt"
42+
},
43+
"dm_poller": {
44+
"enabled": true,
45+
"seen_file": "/tmp/codex-dm-seen.txt",
46+
"limit": 100,
47+
"human_only": false
4248
}
4349
}

src/config.mjs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@ const DEFAULT_CONFIG = {
1717
nudge_mode: 'tmux',
1818
nudge_command: ''
1919
},
20+
dm_poller: {
21+
enabled: false,
22+
handle: '',
23+
interval_sec: 30,
24+
seen_file: '/tmp/iak-dm-seen-ids.txt',
25+
api_key: '',
26+
human_only: false,
27+
limit: 100
28+
},
2029
github: { webhook_secret: '', event_kinds: ['pull_request', 'issue_comment', 'check_suite', 'workflow_run'] },
2130
outbound: { default_webhook_url: '' },
2231
rate_limit: { message_interval_sec: 30 },
@@ -63,6 +72,7 @@ export function loadConfig(configPath) {
6372
receipts: { ...DEFAULT_CONFIG.receipts, ...raw.receipts },
6473
tmux: { ...DEFAULT_CONFIG.tmux, ...raw.tmux },
6574
poller: { ...DEFAULT_CONFIG.poller, ...raw.poller },
75+
dm_poller: { ...DEFAULT_CONFIG.dm_poller, ...raw.dm_poller },
6676
github: { ...DEFAULT_CONFIG.github, ...raw.github },
6777
outbound: { ...DEFAULT_CONFIG.outbound, ...raw.outbound },
6878
rate_limit: { ...DEFAULT_CONFIG.rate_limit, ...raw.rate_limit },
@@ -75,7 +85,6 @@ export function loadConfig(configPath) {
7585
},
7686
discord: { ...DEFAULT_CONFIG.discord, ...raw.discord },
7787
acp: { ...DEFAULT_CONFIG.acp, ...raw.acp },
78-
openclaw: raw.openclaw || {},
79-
poller: raw.poller || {}
88+
openclaw: raw.openclaw || {}
8089
};
8190
}

src/team-relay/room-poller.mjs

Lines changed: 164 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,47 @@ function saveSeenIds(path, ids) {
4040
writeFileSync(path, arr.join('\n') + '\n');
4141
}
4242

43+
const DM_SEEN_FILE_DEFAULT = '/tmp/iak-dm-seen-ids.txt';
44+
45+
function normalizeHandle(handle) {
46+
if (typeof handle !== 'string') return '';
47+
const trimmed = handle.trim();
48+
if (!trimmed) return '';
49+
return trimmed.startsWith('@') ? trimmed : `@${trimmed}`;
50+
}
51+
52+
function parsePositiveInt(value, fallback) {
53+
const parsed = Number.parseInt(value, 10);
54+
return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
55+
}
56+
57+
function appendNotifications(path, lines) {
58+
if (lines.length === 0) return;
59+
appendFileSync(path, lines.join('\n') + '\n');
60+
}
61+
62+
function triggerNudge({ nudgeMode, nudgeCommandText, nudgeText, session }) {
63+
if (nudgeMode === 'command') {
64+
return nudgeCommand(nudgeCommandText, { text: nudgeText, session });
65+
}
66+
if (nudgeMode === 'none') {
67+
return true;
68+
}
69+
return nudgeTmux(session, nudgeText);
70+
}
71+
72+
export function isRelevantDirectMessage(message, { selfHandle, humanOnly = false } = {}) {
73+
if (!message || message.type !== 'dm' || !message.id) return false;
74+
const sender = normalizeHandle(message.from || message.sender);
75+
const recipient = normalizeHandle(message.to);
76+
const expectedRecipient = normalizeHandle(selfHandle);
77+
if (!sender || !recipient || !expectedRecipient) return false;
78+
if (recipient !== expectedRecipient) return false;
79+
if (sender === expectedRecipient) return false;
80+
if (humanOnly && !message?.metadata?.user) return false;
81+
return true;
82+
}
83+
4384
function nudgeTmux(session, text) {
4485
try {
4586
execSync(`tmux has-session -t ${JSON.stringify(session)} 2>/dev/null`);
@@ -71,6 +112,21 @@ async function fetchRoomMessages(room, apiKey, limit = 10) {
71112
}
72113
}
73114

115+
async function fetchDirectMessages(apiKey, limit = 100) {
116+
const url = `https://groupmind.one/api/v1/messages?limit=${limit}`;
117+
try {
118+
const result = execSync(
119+
`curl -sS -4 -H "X-API-Key: ${apiKey}" "${url}"`,
120+
{ encoding: 'utf8', timeout: 15000 }
121+
);
122+
const data = JSON.parse(result);
123+
return data.messages || (Array.isArray(data) ? data : []);
124+
} catch (e) {
125+
console.error(` fetch direct messages failed: ${e.message}`);
126+
return [];
127+
}
128+
}
129+
74130
/**
75131
* Read and clear the notification file. Returns array of message lines.
76132
* This is the primary way the IDE agent retrieves new messages.
@@ -96,8 +152,17 @@ export async function startRoomPoller({ rooms, apiKey, handle, interval, config
96152
const nudgeText = config?.tmux?.nudge_text || 'check rooms';
97153
const nudgeMode = config?.poller?.nudge_mode || 'tmux';
98154
const nudgeCommandText = config?.poller?.nudge_command || '';
99-
const pollInterval = interval || config?.poller?.interval_sec || 30;
100-
const selfHandle = handle || config?.poller?.handle || '@unknown';
155+
const pollInterval = parsePositiveInt(interval || config?.poller?.interval_sec, 30);
156+
const selfHandle = normalizeHandle(handle || config?.poller?.handle || '@unknown');
157+
const dmCfg = config?.dm_poller || {};
158+
const dmEnabled = dmCfg.enabled === true;
159+
const dmHandle = normalizeHandle(dmCfg.handle || selfHandle);
160+
const dmSeenFile = dmCfg.seen_file || DM_SEEN_FILE_DEFAULT;
161+
const dmNotifyFile = dmCfg.notification_file || notifyFile;
162+
const dmPollInterval = parsePositiveInt(dmCfg.interval_sec, pollInterval);
163+
const dmApiKey = dmCfg.api_key || dmCfg.apiKey || config?.poller?.api_key || config?.poller?.apiKey || apiKey;
164+
const dmHumanOnly = dmCfg.human_only === true;
165+
const dmLimit = parsePositiveInt(dmCfg.limit, 100);
101166

102167
console.log(`Room poller started`);
103168
console.log(` rooms: ${rooms.join(', ')}`);
@@ -111,9 +176,19 @@ export async function startRoomPoller({ rooms, apiKey, handle, interval, config
111176
console.log(` nudge command: ${nudgeCommandText || '(missing)'}`);
112177
}
113178
console.log(` seen file: ${seenFile}`);
179+
if (dmEnabled) {
180+
console.log(` direct messages: enabled`);
181+
console.log(` dm handle: ${dmHandle}`);
182+
console.log(` dm interval: ${dmPollInterval}s`);
183+
console.log(` dm seen file: ${dmSeenFile}`);
184+
console.log(` dm notify file: ${dmNotifyFile}`);
185+
console.log(` dm limit: ${dmLimit}`);
186+
console.log(` dm human only: ${dmHumanOnly}`);
187+
}
114188
console.log(` queue: ${queuePath}`);
115189

116190
const seen = loadSeenIds(seenFile);
191+
const dmSeen = dmEnabled ? loadSeenIds(dmSeenFile) : new Set();
117192

118193
// Seed: mark current messages as seen on first run
119194
if (seen.size === 0) {
@@ -128,8 +203,25 @@ export async function startRoomPoller({ rooms, apiKey, handle, interval, config
128203
console.log(` seeded ${seen.size} IDs`);
129204
}
130205

206+
if (dmEnabled && dmSeen.size === 0) {
207+
console.log(` seeding seen IDs from current direct messages...`);
208+
const directMessages = await fetchDirectMessages(dmApiKey, dmLimit);
209+
for (const message of directMessages) {
210+
if (isRelevantDirectMessage(message, { selfHandle: dmHandle, humanOnly: dmHumanOnly })) {
211+
dmSeen.add(message.id);
212+
}
213+
}
214+
saveSeenIds(dmSeenFile, dmSeen);
215+
console.log(` seeded ${dmSeen.size} DM IDs`);
216+
}
217+
218+
let roomPollInFlight = false;
219+
let dmPollInFlight = false;
131220

132-
async function poll() {
221+
async function pollRooms() {
222+
if (roomPollInFlight) return;
223+
roomPollInFlight = true;
224+
try {
133225
let newCount = 0;
134226
const newMessages = [];
135227
for (const room of rooms) {
@@ -140,8 +232,9 @@ export async function startRoomPoller({ rooms, apiKey, handle, interval, config
140232
seen.add(mid);
141233

142234
const sender = m.from || m.sender || '?';
235+
const normalizedSender = normalizeHandle(sender);
143236
// Skip own messages
144-
if (sender === selfHandle || sender === selfHandle.replace('@', '')) continue;
237+
if (normalizedSender === selfHandle) continue;
145238

146239
const body = (m.body || '').slice(0, 500);
147240
const ts = m.created_at || new Date().toISOString();
@@ -176,37 +269,88 @@ export async function startRoomPoller({ rooms, apiKey, handle, interval, config
176269

177270
if (newCount > 0) {
178271
// Primary: write to notification file (always works)
179-
appendFileSync(notifyFile, newMessages.join('\n') + '\n');
180-
181-
// Secondary: try configured nudge mode.
182-
let nudged = false;
183-
if (nudgeMode === 'command') {
184-
nudged = nudgeCommand(nudgeCommandText, { text: nudgeText, session });
185-
} else if (nudgeMode === 'none') {
186-
nudged = true;
187-
} else {
188-
nudged = nudgeTmux(session, nudgeText);
189-
}
272+
appendNotifications(notifyFile, newMessages);
273+
const nudged = triggerNudge({ nudgeMode, nudgeCommandText, nudgeText, session });
190274
console.log(` ${newCount} new message(s) → notified${nudged ? ' + nudge' : ''}`);
191275
}
276+
} finally {
277+
roomPollInFlight = false;
278+
}
279+
}
280+
281+
async function pollDirectMessages() {
282+
if (!dmEnabled || dmPollInFlight) return;
283+
dmPollInFlight = true;
284+
try {
285+
let newCount = 0;
286+
const newMessages = [];
287+
const directMessages = await fetchDirectMessages(dmApiKey, dmLimit);
288+
for (const message of directMessages) {
289+
if (!isRelevantDirectMessage(message, { selfHandle: dmHandle, humanOnly: dmHumanOnly })) continue;
290+
const mid = message.id;
291+
if (!mid || dmSeen.has(mid)) continue;
292+
dmSeen.add(mid);
293+
294+
const sender = normalizeHandle(message.from || message.sender) || '?';
295+
const recipient = normalizeHandle(message.to) || dmHandle;
296+
const body = (message.body || '').slice(0, 500);
297+
const ts = message.created_at || new Date().toISOString();
298+
299+
const rawEvent = {
300+
trace_id: randomUUID(),
301+
event_id: mid,
302+
source: 'antfarm',
303+
kind: 'antfarm.dm.created',
304+
timestamp: ts,
305+
room: null,
306+
actor: { login: sender },
307+
payload: { body, type: 'dm', to: recipient },
308+
intent: null,
309+
memory_context: null,
310+
enrichment_errors: []
311+
};
312+
const event = await enrichEvent(rawEvent, config);
313+
appendFileSync(queuePath, JSON.stringify(event) + '\n');
314+
315+
const line = `[${ts.slice(0, 19)}] [dm] ${sender} -> ${recipient}: ${body.replace(/\n/g, ' ').slice(0, 200)}`;
316+
newMessages.push(line);
317+
newCount++;
318+
319+
console.log(` [${ts.slice(0, 19)}] ${sender} DM -> ${recipient}: ${body.slice(0, 80)}...`);
320+
}
321+
322+
saveSeenIds(dmSeenFile, dmSeen);
323+
324+
if (newCount > 0) {
325+
appendNotifications(dmNotifyFile, newMessages);
326+
const nudged = triggerNudge({ nudgeMode, nudgeCommandText, nudgeText, session });
327+
console.log(` ${newCount} new direct message(s) → notified${nudged ? ' + nudge' : ''}`);
328+
}
329+
} finally {
330+
dmPollInFlight = false;
331+
}
192332
}
193333

194334
// Initial poll
195-
await poll();
335+
await pollRooms();
336+
await pollDirectMessages();
196337

197338
// Start interval
198-
const timer = setInterval(poll, pollInterval * 1000);
339+
const roomTimer = setInterval(pollRooms, pollInterval * 1000);
340+
const dmTimer = dmEnabled ? setInterval(pollDirectMessages, dmPollInterval * 1000) : null;
199341

200342
// Handle shutdown
201343
process.on('SIGINT', () => {
202344
console.log('\nPoller stopped.');
203-
clearInterval(timer);
345+
clearInterval(roomTimer);
346+
if (dmTimer) clearInterval(dmTimer);
204347
process.exit(0);
205348
});
206349
process.on('SIGTERM', () => {
207-
clearInterval(timer);
350+
clearInterval(roomTimer);
351+
if (dmTimer) clearInterval(dmTimer);
208352
process.exit(0);
209353
});
210354

211-
return timer;
355+
return { roomTimer, dmTimer };
212356
}

test/config.test.mjs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,29 @@
11
// SPDX-License-Identifier: AGPL-3.0-only
22

3-
import { describe, it } from 'node:test';
3+
import { describe, it, afterEach } from 'node:test';
44
import { strict as assert } from 'node:assert';
5+
import { mkdtempSync, rmSync, writeFileSync } from 'node:fs';
6+
import { join } from 'node:path';
7+
import { tmpdir } from 'node:os';
58
import { loadConfig } from '../src/config.mjs';
69

10+
const tempDirs = [];
11+
12+
afterEach(() => {
13+
while (tempDirs.length > 0) {
14+
rmSync(tempDirs.pop(), { recursive: true, force: true });
15+
}
16+
});
17+
718
describe('config', () => {
819
it('loadConfig returns defaults when no file exists', () => {
920
const cfg = loadConfig('/tmp/iak-nonexistent-config.json');
1021
assert.ok(cfg.listen);
1122
assert.ok(cfg.queue);
1223
assert.ok(cfg.receipts);
1324
assert.ok(cfg.tmux);
25+
assert.ok(cfg.poller);
26+
assert.ok(cfg.dm_poller);
1427
assert.ok(cfg.automation);
1528
assert.ok(cfg.comments);
1629
});
@@ -30,4 +43,30 @@ describe('config', () => {
3043
assert.ok(Array.isArray(cfg.comments.github.repos));
3144
assert.equal(cfg.comments.interval_sec, 120);
3245
});
46+
47+
it('merges partial poller and dm_poller config with defaults', () => {
48+
const dir = mkdtempSync(join(tmpdir(), 'iak-config-'));
49+
tempDirs.push(dir);
50+
const configPath = join(dir, 'config.json');
51+
writeFileSync(configPath, JSON.stringify({
52+
poller: {
53+
rooms: ['thinkoff-development'],
54+
handle: '@CodexMB'
55+
},
56+
dm_poller: {
57+
enabled: true
58+
}
59+
}));
60+
61+
const cfg = loadConfig(configPath);
62+
63+
assert.deepEqual(cfg.poller.rooms, ['thinkoff-development']);
64+
assert.equal(cfg.poller.handle, '@CodexMB');
65+
assert.equal(cfg.poller.interval_sec, 30);
66+
assert.equal(cfg.poller.seen_file, '/tmp/iak-seen-ids.txt');
67+
assert.equal(cfg.dm_poller.enabled, true);
68+
assert.equal(cfg.dm_poller.interval_sec, 30);
69+
assert.equal(cfg.dm_poller.seen_file, '/tmp/iak-dm-seen-ids.txt');
70+
assert.equal(cfg.dm_poller.limit, 100);
71+
});
3372
});

0 commit comments

Comments
 (0)