Skip to content

Commit 753a659

Browse files
hyperpolymathclaude
andcommitted
feat: add Groove feedback-o-tron and inter-service health mesh
Add three new endpoints to PanLL's groove server: - POST /.well-known/groove/feedback — accepts and routes feedback events through the Groove mesh to any connected service - GET /.well-known/groove/mesh — returns cached health view of all groove peers discovered via periodic probing - GET /.well-known/groove/status — health status endpoint for mesh probing Mesh monitor runs on a background thread, probing known groove ports (6473, 8080, 8081, 8091-8093) every 30 seconds. Feedback routing looks up the target service in the mesh and forwards via raw HTTP. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 88928a9 commit 753a659

1 file changed

Lines changed: 336 additions & 0 deletions

File tree

src-gossamer/src/groove.rs

Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,30 @@
3535
3636
use std::io::{Read, Write};
3737
use std::net::{SocketAddr, TcpListener, TcpStream};
38+
use std::sync::Mutex;
39+
use std::time::{SystemTime, UNIX_EPOCH};
40+
41+
use once_cell::sync::Lazy;
42+
43+
/// Known groove peers for health mesh probing.
44+
/// PanLL probes these ports to discover neighbouring services.
45+
const MESH_PROBE_PORTS: &[u16] = &[6473, 8080, 8081, 8091, 8092, 8093];
46+
47+
/// Cached mesh health state: peer service_id -> status.
48+
static MESH_STATE: Lazy<Mutex<Vec<PeerHealth>>> = Lazy::new(|| Mutex::new(Vec::new()));
49+
50+
/// Health status of a single peer in the mesh.
51+
#[derive(Clone)]
52+
struct PeerHealth {
53+
/// Peer's self-reported service ID (or "unknown" if probe failed).
54+
service_id: String,
55+
/// Port the peer was discovered on.
56+
port: u16,
57+
/// "up", "degraded", or "down".
58+
status: String,
59+
/// Unix timestamp (milliseconds) of last successful probe.
60+
last_seen_ms: u64,
61+
}
3862

3963
/// The groove capability manifest for PanLL.
4064
/// Matches Groove.idr panllManifest: offers [PanelUI (websocket, /ws)].
@@ -50,6 +74,22 @@ const MANIFEST: &str = r#"{
5074
"endpoint": "/ws",
5175
"requires_auth": false,
5276
"panel_compatible": true
77+
},
78+
"feedback": {
79+
"type": "feedback",
80+
"description": "Groove-routed feedback collector — accepts and routes feedback to any mesh service",
81+
"protocol": "http",
82+
"endpoint": "/.well-known/groove/feedback",
83+
"requires_auth": false,
84+
"panel_compatible": true
85+
},
86+
"health_mesh": {
87+
"type": "health-mesh",
88+
"description": "Inter-service health mesh — monitors peer status via groove probing",
89+
"protocol": "http",
90+
"endpoint": "/.well-known/groove/mesh",
91+
"requires_auth": false,
92+
"panel_compatible": true
5393
}
5494
},
5595
"consumes": ["voice", "text", "presence", "integrity", "octad-storage", "scanning"],
@@ -82,6 +122,9 @@ pub fn spawn() {
82122
}
83123
})
84124
.expect("Failed to spawn groove server thread");
125+
126+
// Start the mesh health monitor (probes peers every 30s).
127+
spawn_mesh_monitor();
85128
}
86129

87130
/// Run the groove discovery HTTP server (blocking).
@@ -128,6 +171,34 @@ fn handle_request(stream: &mut TcpStream) -> Result<(), Box<dyn std::error::Erro
128171
send_response(stream, 200, "application/json", MANIFEST)?;
129172
}
130173

174+
// GET /.well-known/groove/status — Health status for mesh probing.
175+
("GET", "/.well-known/groove/status") => {
176+
send_response(
177+
stream,
178+
200,
179+
"application/json",
180+
r#"{"status":"ok","service":"panll","groove_version":"1"}"#,
181+
)?;
182+
}
183+
184+
// GET /.well-known/groove/mesh — Return the cached mesh health view.
185+
("GET", "/.well-known/groove/mesh") => {
186+
let mesh_json = build_mesh_json();
187+
send_response(stream, 200, "application/json", &mesh_json)?;
188+
}
189+
190+
// POST /.well-known/groove/feedback — Receive feedback routed via Groove.
191+
//
192+
// Schema: { "type": "feedback", "target_service": string,
193+
// "category": string, "message": string, "metadata": object }
194+
//
195+
// If target_service == "panll", saves locally via feedback::commands.
196+
// Otherwise, routes to the target service through the mesh.
197+
("POST", "/.well-known/groove/feedback") => {
198+
let body = extract_body(request);
199+
handle_groove_feedback(stream, &body)?;
200+
}
201+
131202
// GET /health — Simple health check.
132203
("GET", "/health") => {
133204
send_response(
@@ -147,6 +218,269 @@ fn handle_request(stream: &mut TcpStream) -> Result<(), Box<dyn std::error::Erro
147218
Ok(())
148219
}
149220

221+
/// Extract the HTTP body from a raw request string.
222+
///
223+
/// Finds the blank line separating headers from body and returns everything
224+
/// after it. Returns an empty string if no body is present.
225+
fn extract_body(request: &str) -> String {
226+
if let Some(idx) = request.find("\r\n\r\n") {
227+
request[idx + 4..].to_string()
228+
} else if let Some(idx) = request.find("\n\n") {
229+
request[idx + 2..].to_string()
230+
} else {
231+
String::new()
232+
}
233+
}
234+
235+
/// Handle a POST /.well-known/groove/feedback request.
236+
///
237+
/// Validates the feedback JSON, saves it locally if targeted at PanLL,
238+
/// or attempts to route it to the target service via the mesh.
239+
fn handle_groove_feedback(
240+
stream: &mut TcpStream,
241+
body: &str,
242+
) -> Result<(), Box<dyn std::error::Error>> {
243+
let parsed: Result<serde_json::Value, _> = serde_json::from_str(body);
244+
let feedback = match parsed {
245+
Ok(v) => v,
246+
Err(_) => {
247+
send_response(stream, 400, "application/json", r#"{"ok":false,"error":"invalid JSON"}"#)?;
248+
return Ok(());
249+
}
250+
};
251+
252+
let target = feedback
253+
.get("target_service")
254+
.and_then(|v| v.as_str())
255+
.unwrap_or("panll");
256+
257+
if target == "panll" {
258+
// Save locally — delegate to feedback module's storage.
259+
let dir = dirs::home_dir()
260+
.map(|h| h.join(".panll").join("feedback"))
261+
.ok_or("Cannot determine home directory")?;
262+
let _ = std::fs::create_dir_all(&dir);
263+
264+
let timestamp = SystemTime::now()
265+
.duration_since(UNIX_EPOCH)
266+
.unwrap_or_default()
267+
.as_millis();
268+
269+
let wrapped = serde_json::json!({
270+
"id": format!("groove-feedback-{timestamp}"),
271+
"timestamp": timestamp,
272+
"source": "groove",
273+
"report": feedback,
274+
});
275+
276+
let filepath = dir.join(format!("{timestamp}.json"));
277+
let content = serde_json::to_string_pretty(&wrapped).unwrap_or_default();
278+
let _ = std::fs::write(&filepath, content);
279+
280+
let resp = serde_json::json!({
281+
"ok": true,
282+
"routed_to": "panll",
283+
"id": format!("groove-feedback-{timestamp}"),
284+
});
285+
send_response(stream, 200, "application/json", &resp.to_string())?;
286+
} else {
287+
// Route to the target service via the mesh.
288+
match route_feedback_to_peer(target, body) {
289+
Ok(()) => {
290+
let resp = serde_json::json!({
291+
"ok": true,
292+
"routed_to": target,
293+
});
294+
send_response(stream, 200, "application/json", &resp.to_string())?;
295+
}
296+
Err(e) => {
297+
let resp = serde_json::json!({
298+
"ok": false,
299+
"error": format!("routing failed: {e}"),
300+
"target_service": target,
301+
});
302+
send_response(stream, 502, "application/json", &resp.to_string())?;
303+
}
304+
}
305+
}
306+
307+
Ok(())
308+
}
309+
310+
/// Attempt to route feedback to a peer service in the mesh.
311+
///
312+
/// Looks up the peer's port from the cached mesh state and POSTs the
313+
/// feedback to their `/.well-known/groove/feedback` endpoint.
314+
fn route_feedback_to_peer(target: &str, body: &str) -> Result<(), Box<dyn std::error::Error>> {
315+
let peers = MESH_STATE.lock().map_err(|_| "mesh lock poisoned")?;
316+
let peer = peers
317+
.iter()
318+
.find(|p| p.service_id == target && p.status == "up")
319+
.ok_or_else(|| format!("peer '{target}' not found or not up in mesh"))?;
320+
321+
let addr = format!("127.0.0.1:{}", peer.port);
322+
let mut stream = TcpStream::connect(&addr)?;
323+
stream.set_write_timeout(Some(std::time::Duration::from_secs(2)))?;
324+
stream.set_read_timeout(Some(std::time::Duration::from_secs(2)))?;
325+
326+
let request = format!(
327+
"POST /.well-known/groove/feedback HTTP/1.0\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
328+
addr,
329+
body.len(),
330+
body
331+
);
332+
stream.write_all(request.as_bytes())?;
333+
Ok(())
334+
}
335+
336+
/// Build a JSON string representing the current mesh health view.
337+
fn build_mesh_json() -> String {
338+
let peers = MESH_STATE.lock().unwrap_or_else(|e| e.into_inner());
339+
let now_ms = SystemTime::now()
340+
.duration_since(UNIX_EPOCH)
341+
.unwrap_or_default()
342+
.as_millis() as u64;
343+
344+
let peer_entries: Vec<String> = peers
345+
.iter()
346+
.map(|p| {
347+
format!(
348+
r#"{{"service_id":"{}","port":{},"status":"{}","last_seen_ms":{}}}"#,
349+
p.service_id, p.port, p.status, p.last_seen_ms
350+
)
351+
})
352+
.collect();
353+
354+
format!(
355+
r#"{{"service_id":"panll","timestamp_ms":{},"peers":[{}]}}"#,
356+
now_ms,
357+
peer_entries.join(",")
358+
)
359+
}
360+
361+
/// Probe known groove peers and update the cached mesh state.
362+
///
363+
/// Called periodically from the mesh monitor thread. For each port in
364+
/// `MESH_PROBE_PORTS`, sends `GET /.well-known/groove/status` and records
365+
/// the result (up/down + service_id).
366+
pub fn probe_mesh_peers() {
367+
let now_ms = SystemTime::now()
368+
.duration_since(UNIX_EPOCH)
369+
.unwrap_or_default()
370+
.as_millis() as u64;
371+
372+
let mut results = Vec::new();
373+
374+
for &port in MESH_PROBE_PORTS {
375+
// Skip our own port.
376+
if port == 8000 {
377+
continue;
378+
}
379+
380+
let addr = format!("127.0.0.1:{}", port);
381+
let status = match TcpStream::connect_timeout(
382+
&addr.parse().unwrap(),
383+
std::time::Duration::from_millis(500),
384+
) {
385+
Ok(mut stream) => {
386+
stream
387+
.set_read_timeout(Some(std::time::Duration::from_millis(500)))
388+
.ok();
389+
stream
390+
.set_write_timeout(Some(std::time::Duration::from_millis(500)))
391+
.ok();
392+
393+
let request = format!(
394+
"GET /.well-known/groove/status HTTP/1.0\r\nHost: {}\r\nConnection: close\r\n\r\n",
395+
addr
396+
);
397+
if stream.write_all(request.as_bytes()).is_ok() {
398+
let mut buf = vec![0u8; 4096];
399+
let service_id = match stream.read(&mut buf) {
400+
Ok(n) if n > 0 => {
401+
let resp = String::from_utf8_lossy(&buf[..n]);
402+
// Extract service_id from JSON body.
403+
extract_service_id_from_response(&resp)
404+
}
405+
_ => "unknown".to_string(),
406+
};
407+
PeerHealth {
408+
service_id,
409+
port,
410+
status: "up".to_string(),
411+
last_seen_ms: now_ms,
412+
}
413+
} else {
414+
PeerHealth {
415+
service_id: "unknown".to_string(),
416+
port,
417+
status: "down".to_string(),
418+
last_seen_ms: 0,
419+
}
420+
}
421+
}
422+
Err(_) => PeerHealth {
423+
service_id: "unknown".to_string(),
424+
port,
425+
status: "down".to_string(),
426+
last_seen_ms: 0,
427+
},
428+
};
429+
430+
// Only include peers that responded (up or previously known).
431+
if status.status == "up" {
432+
results.push(status);
433+
}
434+
}
435+
436+
if let Ok(mut state) = MESH_STATE.lock() {
437+
*state = results;
438+
}
439+
}
440+
441+
/// Extract the service_id field from a groove status JSON response.
442+
///
443+
/// Performs a simple substring search to avoid pulling in a full JSON parser
444+
/// for probe responses. Falls back to "unknown" if not found.
445+
fn extract_service_id_from_response(response: &str) -> String {
446+
// Find the JSON body (after headers).
447+
let body = if let Some(idx) = response.find("\r\n\r\n") {
448+
&response[idx + 4..]
449+
} else if let Some(idx) = response.find("\n\n") {
450+
&response[idx + 2..]
451+
} else {
452+
response
453+
};
454+
455+
// Try to parse as JSON for the service_id field.
456+
if let Ok(v) = serde_json::from_str::<serde_json::Value>(body) {
457+
if let Some(id) = v.get("service").and_then(|s| s.as_str()) {
458+
return id.to_string();
459+
}
460+
if let Some(id) = v.get("service_id").and_then(|s| s.as_str()) {
461+
return id.to_string();
462+
}
463+
}
464+
465+
"unknown".to_string()
466+
}
467+
468+
/// Spawn the mesh health monitor thread.
469+
///
470+
/// Probes known groove peers every 30 seconds and updates the cached mesh
471+
/// state. This runs alongside the groove HTTP server.
472+
pub fn spawn_mesh_monitor() {
473+
std::thread::Builder::new()
474+
.name("panll-mesh-monitor".into())
475+
.spawn(|| {
476+
loop {
477+
probe_mesh_peers();
478+
std::thread::sleep(std::time::Duration::from_secs(30));
479+
}
480+
})
481+
.expect("Failed to spawn mesh monitor thread");
482+
}
483+
150484
/// Send an HTTP response with the given content type and body.
151485
fn send_response(
152486
stream: &mut TcpStream,
@@ -156,8 +490,10 @@ fn send_response(
156490
) -> Result<(), Box<dyn std::error::Error>> {
157491
let status_text = match status {
158492
200 => "OK",
493+
204 => "No Content",
159494
400 => "Bad Request",
160495
404 => "Not Found",
496+
502 => "Bad Gateway",
161497
_ => "Unknown",
162498
};
163499
let response = format!(

0 commit comments

Comments
 (0)