feat(websocket): add websocket cpb for a2a-rs#63
feat(websocket): add websocket cpb for a2a-rs#63hackeramitkumar wants to merge 1 commit intoa2aproject:mainfrom
Conversation
Signed-off-by: amitami2 <amitami2@cisco.com>
There was a problem hiding this comment.
Code Review
This pull request introduces the a2a-websocket crate, which implements the A2A v1 WebSocket protocol binding for both clients and servers. Key additions include a WebSocketTransport for multiplexed client requests and an axum::Router builder for serving A2A operations over persistent WebSocket connections. The reviewer identified several areas for improvement, including adding a timeout to connection attempts, implementing backpressure by replacing unbounded channels with bounded ones, and removing redundant streamEnd frames in the stream cancellation logic.
| let endpoint = endpoint.into(); | ||
| let parsed = parse_endpoint(&endpoint)?; | ||
|
|
||
| let stream = TcpStream::connect((parsed.host.as_str(), parsed.port)) |
There was a problem hiding this comment.
| )); | ||
| } | ||
|
|
||
| let (outbound_tx, outbound_rx) = mpsc::unbounded_channel::<OutboundClient>(); |
| ws.set_auto_pong(true); | ||
| let mut ws = FragmentCollector::new(ws); | ||
|
|
||
| let (out_tx, mut out_rx) = mpsc::unbounded_channel::<OutboundMessage>(); |
There was a problem hiding this comment.
| if envelope.cancel_stream.unwrap_or(false) { | ||
| let id = envelope.id.clone(); | ||
| let streams = streams.clone(); | ||
| let out_tx = out_tx.clone(); | ||
| tokio::spawn(async move { | ||
| let canceled = { | ||
| let mut map = streams.lock().await; | ||
| map.remove(&id) | ||
| }; | ||
| if let Some(tx) = canceled { | ||
| let _ = tx.send(()); | ||
| } | ||
| send_outbound( | ||
| &out_tx, | ||
| OutboundMessage::Frame(serialize_response( | ||
| WsResponseEnvelope::stream_end(id.clone()), | ||
| )), | ||
| ); | ||
| }); | ||
| return true; | ||
| } |
There was a problem hiding this comment.
The cancelStream handler here sends a streamEnd frame immediately, but the run_streaming_request task (at line 575) also sends a streamEnd frame when it receives the cancellation signal. This results in a redundant frame for active streams.
Furthermore, sending streamEnd here for a request ID that is not a streaming request (or doesn't exist) is incorrect according to the protocol logic. It is better to let the run_streaming_request task handle the streamEnd response exclusively.
if envelope.cancel_stream.unwrap_or(false) {
let id = envelope.id.clone();
let streams = streams.clone();
tokio::spawn(async move {
let mut map = streams.lock().await;
if let Some(tx) = map.remove(&id) {
let _ = tx.send(());
}
});
return true;
}
No description provided.