Add server-side events and html UI [PoC]#171
Add server-side events and html UI [PoC]#171joostjager wants to merge 3 commits intolightningdevkit:mainfrom
Conversation
Protobuf added complexity without much benefit for our use case — the
binary encoding is opaque, hard to debug with standard HTTP tools, and
requires proto toolchain maintenance. JSON is human-readable, widely
supported, and sufficient for our throughput needs.
This removes prost and all .proto files entirely, renaming the
ldk-server-protos crate to ldk-server-json-models. Types are rewritten
as hand-written Rust structs and enums with serde derives rather than
prost-generated code. Fixed-size byte fields (hashes, channel IDs,
public keys) use [u8; 32] and [u8; 33] with hex serde instead of
String, giving type safety at the model layer.
Several proto-era patterns are cleaned up: wrapper structs that only
existed because protobuf wraps oneof in a message are removed, fields
that were Option only because proto message fields are nullable are
made required where the server always provides them, and the
EventEnvelope wrapper is dropped in favor of using Event directly.
Storage namespaces are changed from ("payments", "") to
("ldk-server", "payments") so existing protobuf-encoded data is
silently ignored rather than failing to deserialize, avoiding the
need for migration code or manual database wipes.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove the RabbitMQ-based EventPublisher trait and lapin dependency in favor of a built-in SSE streaming endpoint. Events are now delivered directly to clients over HTTP/TLS via a /Subscribe endpoint, eliminating the need for an external message broker. The server uses a tokio broadcast channel internally. The new SseBody type implements hyper::body::Body to stream events as JSON in SSE format. Event publishing becomes synchronous and fire-and-forget when no subscribers are connected. Add a subscribe command to the CLI that connects to the SSE endpoint and prints each event as a JSON line to stdout. The client library exposes a typed async event stream via LdkServerClient::subscribe(). E2E tests use CliEventConsumer which spawns the CLI subscribe command as a child process, replacing the previous raw TLS/SSE consumer and RabbitMQ consumer. AI tools were used in preparing this commit.
Add CORS headers to all server responses and handle OPTIONS preflight requests, enabling browser-based clients to connect directly to ldk-server. Include a single-file web UI (index.html) that demonstrates connecting to the JSON API and SSE event stream from the browser using @microsoft/fetch-event-source. The UI shows node info, lists peers with keysend buttons, supports BOLT11 payments, and displays the live event stream. AI tools were used in preparing this commit.
|
👋 Hi! I see this is a draft PR. |
| Full::new(Bytes::new()).boxed() | ||
| } | ||
|
|
||
| fn with_cors_headers(mut response: ServiceResponse) -> ServiceResponse { |
There was a problem hiding this comment.
we probably want this configurable
benthecarman
left a comment
There was a problem hiding this comment.
this is great overall!
| [dependencies] | ||
| ldk-server-json-models = { path = "../ldk-server-json-models" } | ||
| reqwest = { version = "0.11.13", default-features = false, features = ["rustls-tls"] } | ||
| reqwest = { version = "0.11.13", default-features = false, features = ["rustls-tls", "stream"] } |
There was a problem hiding this comment.
I know we are planning on moving to bitreq, is this something that it supports?
| let forwarded_payment_creation_time = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs() as i64; | ||
|
|
||
| match event_publisher.publish( | ||
| let _ = event_sender.send( |
There was a problem hiding this comment.
we should still handle the error properly
| } | ||
| } | ||
|
|
||
| fn error_to_response(e: LdkServerError) -> ServiceResponse { |
There was a problem hiding this comment.
Wouldn't this be better as just a From impl? Then we can just do ?
| pub const GRAPH_GET_CHANNEL_PATH: &str = "GraphGetChannel"; | ||
| pub const GRAPH_LIST_NODES_PATH: &str = "GraphListNodes"; | ||
| pub const GRAPH_GET_NODE_PATH: &str = "GraphGetNode"; | ||
| pub const SUBSCRIBE_PATH: &str = "Subscribe"; |
There was a problem hiding this comment.
Curious on your guy's thoughts on adding something like Subscribe/Payment/<id> and then you just get the events for that id.
Could see this nice for being able to permission a client to only certain events. Also makes it so you can just have a single stream for a payment instead of needing to filter between all events.
| use lapin::types::FieldTable; | ||
| use lapin::{ConnectionProperties, ExchangeKind}; | ||
| impl CliEventConsumer { | ||
| /// Start the CLI subscribe command and begin receiving events in the background. |
There was a problem hiding this comment.
using the cli here seems a little overly complex but i guess thats more e2e so maybe that's correct.
| let payload = response.bytes().await.map_err(|e| { | ||
| LdkServerError::new(InternalError, format!("Failed to read response body: {}", e)) | ||
| })?; | ||
| let error_response = | ||
| serde_json::from_slice::<ErrorResponse>(&payload).map_err(|e| { | ||
| LdkServerError::new( | ||
| JsonParseError, | ||
| format!("Failed to decode error response (status {}): {}", status, e), | ||
| ) | ||
| })?; |
There was a problem hiding this comment.
You should be able to do let error_response: ErrorResponse = response.json() to clean this up a bunch
| Ok(c) => c, | ||
| Err(_) => break, | ||
| }; | ||
| buffer.push_str(&String::from_utf8_lossy(&chunk)); |
There was a problem hiding this comment.
Is there any potential concern here with multiple byte characters being split between multiple chunks and causing errors here?
| Err(_) => break, | ||
| }; | ||
| buffer.push_str(&String::from_utf8_lossy(&chunk)); | ||
| while let Some(pos) = buffer.find("\n\n") { |
There was a problem hiding this comment.
I know this is handling for the SSE format but would be good to have some comments here explaining
| > { | ||
| use futures_util::StreamExt; | ||
| let url = format!("https://{}/{SUBSCRIBE_PATH}", self.base_url); | ||
| let auth_header = self.compute_auth_header(&[]); |
There was a problem hiding this comment.
unrelated to this PR but realizing the auth header should probably commit to the endpoint too
Based on #168