From c814503e0b056bd16bccab83d177e2850bdba65e Mon Sep 17 00:00:00 2001 From: iamninihuang Date: Wed, 6 May 2026 20:28:47 +0800 Subject: [PATCH 1/6] feat(gateway)!: implement multimodal (image/document/audio) support for LINE and Telegram --- docs/line.md | 1 + docs/telegram.md | 5 + gateway/Cargo.lock | 2 +- gateway/src/adapters/feishu.rs | 370 +++++++++++++++++++++-------- gateway/src/adapters/googlechat.rs | 112 +++++---- gateway/src/adapters/line.rs | 150 +++++++++--- gateway/src/adapters/teams.rs | 16 +- gateway/src/adapters/telegram.rs | 220 ++++++++++++++++- gateway/src/main.rs | 26 +- gateway/src/media.rs | 33 +++ src/gateway.rs | 3 +- 11 files changed, 744 insertions(+), 194 deletions(-) create mode 100644 gateway/src/media.rs diff --git a/docs/line.md b/docs/line.md index a7fe4f8d..810264ac 100644 --- a/docs/line.md +++ b/docs/line.md @@ -83,6 +83,7 @@ In the LINE Developers Console → **Messaging API** tab → scan the QR code wi - **1:1 chat** — send a message to the bot, get an AI agent response - **Group chat** — add the bot to a group, it responds to all messages +- **Images** — send image messages to the bot (automatically compressed and resized) - **Webhook signature validation** — HMAC-SHA256 via `LINE_CHANNEL_SECRET` ### Not Supported (LINE API limitations) diff --git a/docs/telegram.md b/docs/telegram.md index d7dd9ae0..590589cd 100644 --- a/docs/telegram.md +++ b/docs/telegram.md @@ -168,6 +168,11 @@ explain VPC peering ← ignored in groups DMs and replies within forum topics always trigger the agent (no @mention needed). +### File Attachments + +- **Images** — send photos (compressed/resized automatically). +- **Documents** — send text-based files (e.g. `.txt`, `.csv`, `.rs`, `.py`) up to 512KB. They are passed directly to the agent as text. + ### Emoji reactions The bot shows status reactions on your message as the agent works: diff --git a/gateway/Cargo.lock b/gateway/Cargo.lock index b0fa728b..8b5ba6be 100644 --- a/gateway/Cargo.lock +++ b/gateway/Cargo.lock @@ -1112,7 +1112,7 @@ checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" [[package]] name = "openab-gateway" -version = "0.1.0" +version = "0.4.0" dependencies = [ "aes", "anyhow", diff --git a/gateway/src/adapters/feishu.rs b/gateway/src/adapters/feishu.rs index 922fae34..27b306d3 100644 --- a/gateway/src/adapters/feishu.rs +++ b/gateway/src/adapters/feishu.rs @@ -106,8 +106,8 @@ impl FeishuConfig { "webhook" => ConnectionMode::Webhook, _ => ConnectionMode::Websocket, }; - let webhook_path = std::env::var("FEISHU_WEBHOOK_PATH") - .unwrap_or_else(|_| "/webhook/feishu".into()); + let webhook_path = + std::env::var("FEISHU_WEBHOOK_PATH").unwrap_or_else(|_| "/webhook/feishu".into()); let verification_token = std::env::var("FEISHU_VERIFICATION_TOKEN").ok(); let encrypt_key = std::env::var("FEISHU_ENCRYPT_KEY").ok(); let allowed_groups = parse_csv("FEISHU_ALLOWED_GROUPS"); @@ -308,7 +308,9 @@ mod event_types { return None; } - let content_json: serde_json::Value = msg.content.as_deref() + let content_json: serde_json::Value = msg + .content + .as_deref() .and_then(|s| serde_json::from_str(s).ok())?; let message_id = msg.message_id.as_deref()?; @@ -317,9 +319,8 @@ mod event_types { let (clean_text, mention_ids, media_refs) = match msg_type { "image" => { let image_key = content_json.get("image_key")?.as_str()?; - let mentions = extract_mentions( - "", msg.mentions.as_deref().unwrap_or(&[]), bot_open_id, - ); + let mentions = + extract_mentions("", msg.mentions.as_deref().unwrap_or(&[]), bot_open_id); let refs = vec![MediaRef::Image { message_id: message_id.to_string(), image_key: image_key.to_string(), @@ -328,12 +329,12 @@ mod event_types { } "file" => { let file_key = content_json.get("file_key")?.as_str()?; - let file_name = content_json.get("file_name") + let file_name = content_json + .get("file_name") .and_then(|v| v.as_str()) .unwrap_or("unknown"); - let mentions = extract_mentions( - "", msg.mentions.as_deref().unwrap_or(&[]), bot_open_id, - ); + let mentions = + extract_mentions("", msg.mentions.as_deref().unwrap_or(&[]), bot_open_id); let refs = vec![MediaRef::File { message_id: message_id.to_string(), file_key: file_key.to_string(), @@ -356,7 +357,9 @@ mod event_types { } } Some("img") => { - if let Some(key) = el.get("image_key").and_then(|v| v.as_str()) { + if let Some(key) = + el.get("image_key").and_then(|v| v.as_str()) + { refs.push(MediaRef::Image { message_id: message_id.to_string(), image_key: key.to_string(), @@ -387,7 +390,10 @@ mod event_types { } _ => { // text - let raw_text = content_json.get("text").and_then(|v| v.as_str()).unwrap_or(""); + let raw_text = content_json + .get("text") + .and_then(|v| v.as_str()) + .unwrap_or(""); if raw_text.trim().is_empty() { return None; } @@ -605,7 +611,8 @@ impl FeishuTokenCache { let expire = body.get("expire").and_then(|v| v.as_u64()).unwrap_or(7200); - let token = body.get("tenant_access_token") + let token = body + .get("tenant_access_token") .and_then(|v| v.as_str()) .map(|s| s.to_string()) .ok_or_else(|| anyhow::anyhow!("feishu token refresh: missing tenant_access_token"))?; @@ -697,7 +704,10 @@ async fn get_ws_endpoint( let body: serde_json::Value = resp.json().await?; let code = body.get("code").and_then(|v| v.as_i64()).unwrap_or(-1); if code != 0 { - let msg = body.get("msg").and_then(|v| v.as_str()).unwrap_or("unknown"); + let msg = body + .get("msg") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); anyhow::bail!("feishu ws endpoint error: code={code} msg={msg}"); } body.get("data") @@ -914,7 +924,9 @@ async fn handle_ws_message( let bot_id = bot_open_id_store.read().await; let bot_id_ref = bot_id.as_deref(); - if let Some((mut gateway_event, media_refs)) = parse_message_event(&envelope, bot_id_ref, config) { + if let Some((mut gateway_event, media_refs)) = + parse_message_event(&envelope, bot_id_ref, config) + { // Also dedupe by message_id if dedupe.is_duplicate(&gateway_event.message_id) { return; @@ -944,8 +956,13 @@ async fn handle_ws_message( // Resolve sender display name (lazy, cached) let name = resolve_user_name( - &gateway_event.sender.id, name_cache, token_cache, client, &config.api_base(), - ).await; + &gateway_event.sender.id, + name_cache, + token_cache, + client, + &config.api_base(), + ) + .await; gateway_event.sender.name = name.clone(); gateway_event.sender.display_name = name; @@ -955,11 +972,22 @@ async fn handle_ws_message( let api_base = config.api_base(); for media_ref in &media_refs { let attachment = match media_ref { - MediaRef::Image { message_id, image_key } => { - download_feishu_image(client, &api_base, &token, message_id, image_key).await + MediaRef::Image { + message_id, + image_key, + } => { + download_feishu_image(client, &api_base, &token, message_id, image_key) + .await } - MediaRef::File { message_id, file_key, file_name } => { - download_feishu_file(client, &api_base, &token, message_id, file_key, file_name).await + MediaRef::File { + message_id, + file_key, + file_name, + } => { + download_feishu_file( + client, &api_base, &token, message_id, file_key, file_name, + ) + .await } }; if let Some(att) = attachment { @@ -970,7 +998,9 @@ async fn handle_ws_message( } // Skip if no text and no attachments (e.g. unsupported file type) - if gateway_event.content.text.trim().is_empty() && gateway_event.content.attachments.is_empty() { + if gateway_event.content.text.trim().is_empty() + && gateway_event.content.attachments.is_empty() + { return; } @@ -1045,9 +1075,14 @@ async fn edit_feishu_message(adapter: &FeishuAdapter, message_id: &str, text: &s "msg_type": "post", "content": post_content.to_string(), }); - match adapter.client.put(&url).bearer_auth(&token) + match adapter + .client + .put(&url) + .bearer_auth(&token) .header("Content-Type", "application/json; charset=utf-8") - .json(&body).send().await + .json(&body) + .send() + .await { Ok(resp) if resp.status().is_success() => { tracing::trace!(message_id = %message_id, "feishu message edited"); @@ -1264,42 +1299,18 @@ fn try_parse_link(chars: &[char], start: usize) -> Option<(String, String, usize /// Reference to a media resource that needs async download after parse_message_event. pub enum MediaRef { - Image { message_id: String, image_key: String }, - File { message_id: String, file_key: String, file_name: String }, + Image { + message_id: String, + image_key: String, + }, + File { + message_id: String, + file_key: String, + file_name: String, + }, } -const IMAGE_MAX_DIMENSION_PX: u32 = 1200; -const IMAGE_JPEG_QUALITY: u8 = 75; -const IMAGE_MAX_DOWNLOAD: u64 = 10 * 1024 * 1024; // 10 MB -const FILE_MAX_DOWNLOAD: u64 = 512 * 1024; // 512 KB - -/// Resize image so longest side <= 1200px, then encode as JPEG. -/// GIFs are passed through unchanged to preserve animation. -fn resize_and_compress(raw: &[u8]) -> Result<(Vec, String), image::ImageError> { - use image::ImageReader; - use std::io::Cursor; - - let reader = ImageReader::new(Cursor::new(raw)).with_guessed_format()?; - let format = reader.format(); - if format == Some(image::ImageFormat::Gif) { - return Ok((raw.to_vec(), "image/gif".to_string())); - } - let img = reader.decode()?; - let (w, h) = (img.width(), img.height()); - let img = if w > IMAGE_MAX_DIMENSION_PX || h > IMAGE_MAX_DIMENSION_PX { - let max_side = std::cmp::max(w, h); - let ratio = f64::from(IMAGE_MAX_DIMENSION_PX) / f64::from(max_side); - let new_w = (f64::from(w) * ratio) as u32; - let new_h = (f64::from(h) * ratio) as u32; - img.resize(new_w, new_h, image::imageops::FilterType::Lanczos3) - } else { - img - }; - let mut buf = Cursor::new(Vec::new()); - let encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf, IMAGE_JPEG_QUALITY); - img.write_with_encoder(encoder)?; - Ok((buf.into_inner(), "image/jpeg".to_string())) -} +use crate::media::{resize_and_compress, FILE_MAX_DOWNLOAD, IMAGE_MAX_DOWNLOAD}; /// Download a Feishu image by message_id + image_key → resize/compress → base64 Attachment. pub async fn download_feishu_image( @@ -1328,7 +1339,11 @@ pub async fn download_feishu_image( if let Some(cl) = resp.headers().get(reqwest::header::CONTENT_LENGTH) { if let Ok(size) = cl.to_str().unwrap_or("0").parse::() { if size > IMAGE_MAX_DOWNLOAD { - tracing::warn!(image_key, size, "feishu image Content-Length exceeds 10MB limit, skipping download"); + tracing::warn!( + image_key, + size, + "feishu image Content-Length exceeds 10MB limit, skipping download" + ); return None; } } @@ -1336,7 +1351,11 @@ pub async fn download_feishu_image( let bytes = resp.bytes().await.ok()?; // Fallback check (Content-Length may be absent or misreported) if bytes.len() as u64 > IMAGE_MAX_DOWNLOAD { - tracing::warn!(image_key, size = bytes.len(), "feishu image exceeds 10MB limit"); + tracing::warn!( + image_key, + size = bytes.len(), + "feishu image exceeds 10MB limit" + ); return None; } let (compressed, mime) = match resize_and_compress(&bytes) { @@ -1370,9 +1389,9 @@ pub async fn download_feishu_file( // Only download text-like files let ext = file_name.rsplit('.').next().unwrap_or("").to_lowercase(); const TEXT_EXTS: &[&str] = &[ - "txt", "csv", "log", "md", "json", "jsonl", "yaml", "yml", "toml", "xml", - "rs", "py", "js", "ts", "jsx", "tsx", "go", "java", "c", "cpp", "h", "hpp", - "rb", "sh", "bash", "sql", "html", "css", "ini", "cfg", "conf", "env", + "txt", "csv", "log", "md", "json", "jsonl", "yaml", "yml", "toml", "xml", "rs", "py", "js", + "ts", "jsx", "tsx", "go", "java", "c", "cpp", "h", "hpp", "rb", "sh", "bash", "sql", + "html", "css", "ini", "cfg", "conf", "env", ]; if !TEXT_EXTS.contains(&ext.as_str()) { tracing::debug!(file_name, "skipping non-text file attachment"); @@ -1397,7 +1416,11 @@ pub async fn download_feishu_file( if let Some(cl) = resp.headers().get(reqwest::header::CONTENT_LENGTH) { if let Ok(size) = cl.to_str().unwrap_or("0").parse::() { if size > FILE_MAX_DOWNLOAD { - tracing::warn!(file_name, size, "feishu file Content-Length exceeds 512KB limit, skipping download"); + tracing::warn!( + file_name, + size, + "feishu file Content-Length exceeds 512KB limit, skipping download" + ); return None; } } @@ -1405,7 +1428,11 @@ pub async fn download_feishu_file( let bytes = resp.bytes().await.ok()?; // Fallback check (Content-Length may be absent or misreported) if bytes.len() as u64 > FILE_MAX_DOWNLOAD { - tracing::warn!(file_name, size = bytes.len(), "feishu file exceeds 512KB limit"); + tracing::warn!( + file_name, + size = bytes.len(), + "feishu file exceeds 512KB limit" + ); return None; } let text = String::from_utf8_lossy(&bytes); @@ -1442,7 +1469,10 @@ pub async fn send_post_message( ) } else { ( - format!("{}/open-apis/im/v1/messages?receive_id_type=chat_id", api_base), + format!( + "{}/open-apis/im/v1/messages?receive_id_type=chat_id", + api_base + ), serde_json::json!({ "receive_id": chat_id, "msg_type": "post", @@ -1577,13 +1607,18 @@ async fn add_reaction(adapter: &FeishuAdapter, message_id: &str, emoji: &str) { }; let token = match adapter.token_cache.get_token(&adapter.client).await { Ok(t) => t, - Err(e) => { tracing::error!(err = %e, "feishu: cannot get token for reaction"); return; } + Err(e) => { + tracing::error!(err = %e, "feishu: cannot get token for reaction"); + return; + } }; let url = format!( "{}/open-apis/im/v1/messages/{}/reactions", - adapter.config.api_base(), message_id + adapter.config.api_base(), + message_id ); - let _ = adapter.client + let _ = adapter + .client .post(&url) .bearer_auth(&token) .json(&serde_json::json!({"reaction_type": {"emoji_type": reaction_type}})) @@ -1599,15 +1634,26 @@ async fn remove_reaction(adapter: &FeishuAdapter, message_id: &str, emoji: &str) }; let token = match adapter.token_cache.get_token(&adapter.client).await { Ok(t) => t, - Err(e) => { tracing::error!(err = %e, "feishu: cannot get token for reaction"); return; } + Err(e) => { + tracing::error!(err = %e, "feishu: cannot get token for reaction"); + return; + } }; // Feishu remove reaction needs reaction_id. Simpler approach: delete by type. // GET reactions, find matching, DELETE by id. let list_url = format!( "{}/open-apis/im/v1/messages/{}/reactions?reaction_type={}", - adapter.config.api_base(), message_id, reaction_type + adapter.config.api_base(), + message_id, + reaction_type ); - let resp = match adapter.client.get(&list_url).bearer_auth(&token).send().await { + let resp = match adapter + .client + .get(&list_url) + .bearer_auth(&token) + .send() + .await + { Ok(r) => r, Err(_) => return, }; @@ -1619,15 +1665,24 @@ async fn remove_reaction(adapter: &FeishuAdapter, message_id: &str, emoji: &str) if let Some(items) = body.pointer("/data/items").and_then(|v| v.as_array()) { let bot_id = adapter.bot_open_id.read().await; for item in items { - let is_ours = item.pointer("/operator/operator_id/open_id") - .and_then(|v| v.as_str()) == bot_id.as_deref(); + let is_ours = item + .pointer("/operator/operator_id/open_id") + .and_then(|v| v.as_str()) + == bot_id.as_deref(); if is_ours { if let Some(reaction_id) = item.get("reaction_id").and_then(|v| v.as_str()) { let del_url = format!( "{}/open-apis/im/v1/messages/{}/reactions/{}", - adapter.config.api_base(), message_id, reaction_id + adapter.config.api_base(), + message_id, + reaction_id ); - let _ = adapter.client.delete(&del_url).bearer_auth(&token).send().await; + let _ = adapter + .client + .delete(&del_url) + .bearer_auth(&token) + .send() + .await; return; } } @@ -1698,7 +1753,16 @@ pub async fn handle_reply( // Use post (rich text) format for markdown rendering. // When in a thread (thread_id present), use reply API to stay in the same thread. if text.len() <= limit { - match send_post_message(&adapter.client, &api_base, &token, &reply.channel.id, thread_id, text).await { + match send_post_message( + &adapter.client, + &api_base, + &token, + &reply.channel.id, + thread_id, + text, + ) + .await + { Some(msg_id) => { adapter.dedupe.is_duplicate(&msg_id); // Send response with message_id back to OAB core (for streaming edit) @@ -1735,7 +1799,16 @@ pub async fn handle_reply( } } else { for chunk in split_text(text, limit) { - if let Some(msg_id) = send_post_message(&adapter.client, &api_base, &token, &reply.channel.id, thread_id, chunk).await { + if let Some(msg_id) = send_post_message( + &adapter.client, + &api_base, + &token, + &reply.channel.id, + thread_id, + chunk, + ) + .await + { adapter.dedupe.is_duplicate(&msg_id); } } @@ -1839,11 +1912,9 @@ fn verify_signature( fn decrypt_event(encrypt_key: &str, encrypted: &str) -> anyhow::Result { use sha2::{Digest, Sha256}; let key = Sha256::digest(encrypt_key.as_bytes()); - let cipher_bytes = base64::Engine::decode( - &base64::engine::general_purpose::STANDARD, - encrypted, - ) - .map_err(|e| anyhow::anyhow!("base64 decode failed: {e}"))?; + let cipher_bytes = + base64::Engine::decode(&base64::engine::general_purpose::STANDARD, encrypted) + .map_err(|e| anyhow::anyhow!("base64 decode failed: {e}"))?; if cipher_bytes.len() < 16 { anyhow::bail!("encrypted data too short"); @@ -1892,7 +1963,10 @@ pub async fn webhook( .and_then(|v| v.to_str().ok()) .unwrap_or("unknown"); if feishu.rate_limiter.check(ip) { - return (axum::http::StatusCode::TOO_MANY_REQUESTS, "rate limit exceeded") + return ( + axum::http::StatusCode::TOO_MANY_REQUESTS, + "rate limit exceeded", + ) .into_response(); } @@ -2012,12 +2086,18 @@ pub async fn webhook( let bot_id = feishu.bot_open_id.read().await; let bot_id_ref = bot_id.as_deref(); - if let Some((mut gateway_event, media_refs)) = parse_message_event(&envelope, bot_id_ref, &feishu.config) { + if let Some((mut gateway_event, media_refs)) = + parse_message_event(&envelope, bot_id_ref, &feishu.config) + { if !feishu.dedupe.is_duplicate(&gateway_event.message_id) { let name = resolve_user_name( - &gateway_event.sender.id, &feishu.name_cache, &feishu.token_cache, - &feishu.client, &feishu.config.api_base(), - ).await; + &gateway_event.sender.id, + &feishu.name_cache, + &feishu.token_cache, + &feishu.client, + &feishu.config.api_base(), + ) + .await; gateway_event.sender.name = name.clone(); gateway_event.sender.display_name = name; @@ -2027,11 +2107,33 @@ pub async fn webhook( let api_base = feishu.config.api_base(); for media_ref in &media_refs { let attachment = match media_ref { - MediaRef::Image { message_id, image_key } => { - download_feishu_image(&feishu.client, &api_base, &token, message_id, image_key).await + MediaRef::Image { + message_id, + image_key, + } => { + download_feishu_image( + &feishu.client, + &api_base, + &token, + message_id, + image_key, + ) + .await } - MediaRef::File { message_id, file_key, file_name } => { - download_feishu_file(&feishu.client, &api_base, &token, message_id, file_key, file_name).await + MediaRef::File { + message_id, + file_key, + file_name, + } => { + download_feishu_file( + &feishu.client, + &api_base, + &token, + message_id, + file_key, + file_name, + ) + .await } }; if let Some(att) = attachment { @@ -2042,7 +2144,9 @@ pub async fn webhook( } // Skip if no text and no attachments (e.g. unsupported file type) - if gateway_event.content.text.trim().is_empty() && gateway_event.content.attachments.is_empty() { + if gateway_event.content.text.trim().is_empty() + && gateway_event.content.attachments.is_empty() + { return axum::http::StatusCode::OK.into_response(); } @@ -2334,7 +2438,7 @@ mod tests { fn parse_group_without_mention_filtered() { let env = make_envelope("group", "just chatting", "ou_user1", None); let cfg = test_config(); // require_mention = true - // Gateway-side mention gating: group message without bot mention is filtered + // Gateway-side mention gating: group message without bot mention is filtered assert!(parse_message_event(&env, Some("ou_bot"), &cfg).is_none()); } @@ -2350,7 +2454,13 @@ mod tests { #[test] fn parse_skips_bot_sender() { let mut env = make_envelope("p2p", "hello", "ou_bot", None); - env.event.as_mut().unwrap().sender.as_mut().unwrap().sender_type = Some("bot".into()); + env.event + .as_mut() + .unwrap() + .sender + .as_mut() + .unwrap() + .sender_type = Some("bot".into()); let cfg = test_config(); assert!(parse_message_event(&env, Some("ou_bot"), &cfg).is_none()); } @@ -2365,7 +2475,13 @@ mod tests { #[test] fn parse_skips_non_text_message() { let mut env = make_envelope("p2p", "hello", "ou_user1", None); - env.event.as_mut().unwrap().message.as_mut().unwrap().message_type = Some("sticker".into()); + env.event + .as_mut() + .unwrap() + .message + .as_mut() + .unwrap() + .message_type = Some("sticker".into()); let cfg = test_config(); assert!(parse_message_event(&env, Some("ou_bot"), &cfg).is_none()); } @@ -2459,11 +2575,25 @@ mod tests { let name_cache = Arc::new(std::sync::Mutex::new(HashMap::new())); let client = reqwest::Client::new(); - let name = resolve_user_name("ou_user1", &name_cache, &token_cache, &client, &server.uri()).await; + let name = resolve_user_name( + "ou_user1", + &name_cache, + &token_cache, + &client, + &server.uri(), + ) + .await; assert_eq!(name, "Alice"); // Second call should use cache (expect(1) above ensures no second API call) - let name2 = resolve_user_name("ou_user1", &name_cache, &token_cache, &client, &server.uri()).await; + let name2 = resolve_user_name( + "ou_user1", + &name_cache, + &token_cache, + &client, + &server.uri(), + ) + .await; assert_eq!(name2, "Alice"); } @@ -2490,7 +2620,14 @@ mod tests { let name_cache = Arc::new(std::sync::Mutex::new(HashMap::new())); let client = reqwest::Client::new(); - let name = resolve_user_name("ou_unknown", &name_cache, &token_cache, &client, &server.uri()).await; + let name = resolve_user_name( + "ou_unknown", + &name_cache, + &token_cache, + &client, + &server.uri(), + ) + .await; assert_eq!(name, "ou_unknown"); } @@ -2501,10 +2638,17 @@ mod tests { // If mention key appears in normal text too, only the first occurrence is removed let mentions = vec![FeishuMention { key: Some("@_user_1".into()), - id: Some(FeishuMentionId { open_id: Some("ou_bot".into()) }), + id: Some(FeishuMentionId { + open_id: Some("ou_bot".into()), + }), name: Some("Bot".into()), }]; - let env = make_envelope("group", "@_user_1 tell me about @_user_1 patterns", "ou_user1", Some(mentions)); + let env = make_envelope( + "group", + "@_user_1 tell me about @_user_1 patterns", + "ou_user1", + Some(mentions), + ); let cfg = test_config(); let (evt, _media) = parse_message_event(&env, Some("ou_bot"), &cfg).unwrap(); // Only first @_user_1 removed, second preserved @@ -2535,7 +2679,9 @@ mod tests { fn parse_allowed_groups_blocks_unlisted() { let mentions = vec![FeishuMention { key: Some("@_user_1".into()), - id: Some(FeishuMentionId { open_id: Some("ou_bot".into()) }), + id: Some(FeishuMentionId { + open_id: Some("ou_bot".into()), + }), name: Some("Bot".into()), }]; let env = make_envelope("group", "@_user_1 hello", "ou_user1", Some(mentions)); @@ -2548,7 +2694,9 @@ mod tests { fn parse_allowed_groups_permits_listed() { let mentions = vec![FeishuMention { key: Some("@_user_1".into()), - id: Some(FeishuMentionId { open_id: Some("ou_bot".into()) }), + id: Some(FeishuMentionId { + open_id: Some("ou_bot".into()), + }), name: Some("Bot".into()), }]; let env = make_envelope("group", "@_user_1 hello", "ou_user1", Some(mentions)); @@ -2609,7 +2757,13 @@ mod tests { #[test] fn parse_thread_id_from_root_id() { let mut env = make_envelope("p2p", "reply", "ou_user1", None); - env.event.as_mut().unwrap().message.as_mut().unwrap().root_id = Some("om_root".into()); + env.event + .as_mut() + .unwrap() + .message + .as_mut() + .unwrap() + .root_id = Some("om_root".into()); let cfg = test_config(); let (evt, _media) = parse_message_event(&env, Some("ou_bot"), &cfg).unwrap(); assert_eq!(evt.channel.thread_id, Some("om_root".into())); @@ -2618,7 +2772,13 @@ mod tests { #[test] fn parse_thread_id_from_parent_id() { let mut env = make_envelope("p2p", "reply", "ou_user1", None); - env.event.as_mut().unwrap().message.as_mut().unwrap().parent_id = Some("om_parent".into()); + env.event + .as_mut() + .unwrap() + .message + .as_mut() + .unwrap() + .parent_id = Some("om_parent".into()); let cfg = test_config(); let (evt, _media) = parse_message_event(&env, Some("ou_bot"), &cfg).unwrap(); assert_eq!(evt.channel.thread_id, Some("om_parent".into())); diff --git a/gateway/src/adapters/googlechat.rs b/gateway/src/adapters/googlechat.rs index 68759e02..ed3b07d1 100644 --- a/gateway/src/adapters/googlechat.rs +++ b/gateway/src/adapters/googlechat.rs @@ -231,13 +231,17 @@ impl GoogleChatAdapter { }; let formatted = markdown_to_gchat(text); - let url = format!( - "{}/{}?updateMask=text", - self.api_base, message_name - ); + let url = format!("{}/{}?updateMask=text", self.api_base, message_name); let body = serde_json::json!({ "text": formatted }); - match self.client.patch(&url).bearer_auth(&token).json(&body).send().await { + match self + .client + .patch(&url) + .bearer_auth(&token) + .json(&body) + .send() + .await + { Ok(r) if r.status().is_success() => { tracing::trace!(message_name = %message_name, "googlechat message edited"); } @@ -261,7 +265,8 @@ impl GoogleChatAdapter { match reply.command.as_deref() { Some("add_reaction") | Some("remove_reaction") | Some("create_topic") => return, Some("edit_message") => { - self.edit_message(&reply.reply_to, &reply.content.text).await; + self.edit_message(&reply.reply_to, &reply.content.text) + .await; return; } _ => {} @@ -397,10 +402,7 @@ pub async fn webhook( if let Some(ref adapter) = state.google_chat { if let Some(ref verifier) = adapter.jwt_verifier { - let auth_header = match headers - .get("authorization") - .and_then(|v| v.to_str().ok()) - { + let auth_header = match headers.get("authorization").and_then(|v| v.to_str().ok()) { Some(h) => h, None => { warn!("googlechat webhook: missing authorization header"); @@ -466,12 +468,7 @@ pub async fn webhook( let thread_id = msg.thread.as_ref().map(|t| t.name.clone()); - let message_id = msg - .name - .rsplit('/') - .next() - .unwrap_or(&msg.name) - .to_string(); + let message_id = msg.name.rsplit('/').next().unwrap_or(&msg.name).to_string(); let gw_event = GatewayEvent::new( "googlechat", @@ -559,7 +556,9 @@ impl GoogleChatTokenCache { } async fn refresh(&self, client: &reqwest::Client) -> Result<(String, u64), String> { - let jwt = self.build_jwt().map_err(|e| format!("JWT build error: {e}"))?; + let jwt = self + .build_jwt() + .map_err(|e| format!("JWT build error: {e}"))?; let resp = client .post("https://oauth2.googleapis.com/token") .form(&[ @@ -612,8 +611,7 @@ impl GoogleChatTokenCache { let key = jsonwebtoken::EncodingKey::from_rsa_pem(self.private_key.as_bytes()) .map_err(|e| format!("RSA key parse error: {e}"))?; let header = jsonwebtoken::Header::new(jsonwebtoken::Algorithm::RS256); - jsonwebtoken::encode(&header, &claims, &key) - .map_err(|e| format!("JWT encode error: {e}")) + jsonwebtoken::encode(&header, &claims, &key).map_err(|e| format!("JWT encode error: {e}")) } } @@ -981,7 +979,10 @@ mod tests { let msg = payload.message.as_ref().unwrap(); assert_eq!(msg.argument_text.as_deref(), Some("hi")); assert_eq!(msg.thread.as_ref().unwrap().name, "spaces/SP/threads/t1"); - assert_eq!(payload.space.as_ref().unwrap().space_type.as_deref(), Some("ROOM")); + assert_eq!( + payload.space.as_ref().unwrap().space_type.as_deref(), + Some("ROOM") + ); } #[test] @@ -1344,15 +1345,16 @@ mod tests { #[tokio::test] async fn handle_reply_sends_gateway_response_success() { - use wiremock::{Mock, MockServer, ResponseTemplate}; use wiremock::matchers::{method, path_regex}; + use wiremock::{Mock, MockServer, ResponseTemplate}; let mock_server = MockServer::start().await; Mock::given(method("POST")) .and(path_regex("/spaces/.*/messages")) - .respond_with(ResponseTemplate::new(200).set_body_json( - serde_json::json!({"name": "spaces/TEST/messages/msg_abc"}), - )) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(serde_json::json!({"name": "spaces/TEST/messages/msg_abc"})), + ) .mount(&mock_server) .await; @@ -1371,6 +1373,7 @@ mod tests { content: Content { content_type: "text".into(), text: "hello".into(), + attachments: Vec::new(), }, command: None, request_id: Some("req_123".into()), @@ -1388,8 +1391,8 @@ mod tests { #[tokio::test] async fn handle_reply_sends_failure_response_on_api_error() { - use wiremock::{Mock, MockServer, ResponseTemplate}; use wiremock::matchers::{method, path_regex}; + use wiremock::{Mock, MockServer, ResponseTemplate}; let mock_server = MockServer::start().await; Mock::given(method("POST")) @@ -1413,6 +1416,7 @@ mod tests { content: Content { content_type: "text".into(), text: "hello".into(), + attachments: Vec::new(), }, command: None, request_id: Some("req_fail".into()), @@ -1427,13 +1431,17 @@ mod tests { assert!(!resp.success); assert!(resp.message_id.is_none()); let err = resp.error.expect("error should be set on send failure"); - assert!(err.contains("500"), "error should include status code, got: {}", err); + assert!( + err.contains("500"), + "error should include status code, got: {}", + err + ); } #[tokio::test] async fn handle_reply_empty_message_short_circuits() { - use wiremock::{Mock, MockServer, ResponseTemplate}; use wiremock::matchers::{method, path_regex}; + use wiremock::{Mock, MockServer, ResponseTemplate}; let mock_server = MockServer::start().await; // Mount a mock that would fail the test if called @@ -1459,6 +1467,7 @@ mod tests { content: Content { content_type: "text".into(), text: "".into(), + attachments: Vec::new(), }, command: None, request_id: Some("req_empty".into()), @@ -1467,7 +1476,10 @@ mod tests { adapter.handle_reply(&reply, &event_tx).await; let received = event_rx.try_recv(); - assert!(received.is_ok(), "expected failure GatewayResponse for empty message"); + assert!( + received.is_ok(), + "expected failure GatewayResponse for empty message" + ); let resp: GatewayResponse = serde_json::from_str(&received.unwrap()).unwrap(); assert_eq!(resp.request_id, "req_empty"); assert!(!resp.success); @@ -1476,8 +1488,8 @@ mod tests { #[tokio::test] async fn handle_reply_multi_chunk_failure_includes_error() { - use wiremock::{Mock, MockServer, ResponseTemplate}; use wiremock::matchers::{method, path_regex}; + use wiremock::{Mock, MockServer, ResponseTemplate}; let mock_server = MockServer::start().await; Mock::given(method("POST")) @@ -1502,6 +1514,7 @@ mod tests { content: Content { content_type: "text".into(), text: long_text, + attachments: Vec::new(), }, command: None, request_id: Some("req_multi_fail".into()), @@ -1535,6 +1548,7 @@ mod tests { content: Content { content_type: "text".into(), text: "hello".into(), + attachments: Vec::new(), }, command: None, request_id: Some("req_notoken".into()), @@ -1552,15 +1566,16 @@ mod tests { #[tokio::test] async fn handle_reply_edit_message_does_not_send_response() { - use wiremock::{Mock, MockServer, ResponseTemplate}; use wiremock::matchers::{method, path_regex}; + use wiremock::{Mock, MockServer, ResponseTemplate}; let mock_server = MockServer::start().await; Mock::given(method("PATCH")) .and(path_regex("/spaces/.*/messages/.*")) - .respond_with(ResponseTemplate::new(200).set_body_json( - serde_json::json!({"name": "spaces/SP/messages/msg1"}), - )) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(serde_json::json!({"name": "spaces/SP/messages/msg1"})), + ) .mount(&mock_server) .await; @@ -1579,6 +1594,7 @@ mod tests { content: Content { content_type: "text".into(), text: "updated text".into(), + attachments: Vec::new(), }, command: Some("edit_message".into()), request_id: None, @@ -1592,15 +1608,16 @@ mod tests { #[tokio::test] async fn handle_reply_multi_chunk_sends_gateway_response() { - use wiremock::{Mock, MockServer, ResponseTemplate}; use wiremock::matchers::{method, path_regex}; + use wiremock::{Mock, MockServer, ResponseTemplate}; let mock_server = MockServer::start().await; Mock::given(method("POST")) .and(path_regex("/spaces/.*/messages")) - .respond_with(ResponseTemplate::new(200).set_body_json( - serde_json::json!({"name": "spaces/TEST/messages/first_chunk"}), - )) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(serde_json::json!({"name": "spaces/TEST/messages/first_chunk"})), + ) .mount(&mock_server) .await; @@ -1620,6 +1637,7 @@ mod tests { content: Content { content_type: "text".into(), text: long_text, + attachments: Vec::new(), }, command: None, request_id: Some("req_multi".into()), @@ -1632,7 +1650,10 @@ mod tests { let resp: GatewayResponse = serde_json::from_str(&received.unwrap()).unwrap(); assert_eq!(resp.request_id, "req_multi"); assert!(resp.success); - assert_eq!(resp.message_id, Some("spaces/TEST/messages/first_chunk".into())); + assert_eq!( + resp.message_id, + Some("spaces/TEST/messages/first_chunk".into()) + ); } #[tokio::test] @@ -1640,16 +1661,17 @@ mod tests { // Mixed success/failure: chunk 1 succeeds, subsequent chunks fail. // Expect success=false (any chunk failure marks overall as failed), // but message_id is still set so core has a reference. - use wiremock::{Mock, MockServer, ResponseTemplate}; use wiremock::matchers::{method, path_regex}; + use wiremock::{Mock, MockServer, ResponseTemplate}; let mock_server = MockServer::start().await; // First request: 200 OK with message name Mock::given(method("POST")) .and(path_regex("/spaces/.*/messages")) - .respond_with(ResponseTemplate::new(200).set_body_json( - serde_json::json!({"name": "spaces/TEST/messages/first_chunk"}), - )) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(serde_json::json!({"name": "spaces/TEST/messages/first_chunk"})), + ) .up_to_n_times(1) .mount(&mock_server) .await; @@ -1676,6 +1698,7 @@ mod tests { content: Content { content_type: "text".into(), text: long_text, + attachments: Vec::new(), }, command: None, request_id: Some("req_partial".into()), @@ -1688,7 +1711,10 @@ mod tests { let resp: GatewayResponse = serde_json::from_str(&received.unwrap()).unwrap(); assert_eq!(resp.request_id, "req_partial"); assert!(!resp.success, "partial failure must report success=false"); - assert_eq!(resp.message_id, Some("spaces/TEST/messages/first_chunk".into())); + assert_eq!( + resp.message_id, + Some("spaces/TEST/messages/first_chunk".into()) + ); let err = resp.error.expect("partial failure should set error"); assert!(err.contains("500")); } diff --git a/gateway/src/adapters/line.rs b/gateway/src/adapters/line.rs index 3bfc36d0..89f3e3f6 100644 --- a/gateway/src/adapters/line.rs +++ b/gateway/src/adapters/line.rs @@ -1,3 +1,4 @@ +use crate::media::{resize_and_compress, IMAGE_MAX_DOWNLOAD}; use crate::schema::*; use axum::extract::State; use serde::Deserialize; @@ -90,45 +91,54 @@ pub async fn webhook( let Some(ref msg) = event.message else { continue; }; - if msg.message_type != "text" { + let is_text = msg.message_type == "text"; + let is_audio = msg.message_type == "audio"; + + if !is_text && !is_image && !is_audio { continue; } - let Some(ref text) = msg.text else { - continue; - }; - if text.trim().is_empty() { + + let mut text = msg.text.clone().unwrap_or_default(); + if is_text && text.trim().is_empty() { continue; } + let mut attachments = Vec::new(); + if is_image || is_audio { + if let Some(ref access_token) = state.line_access_token { + let client = reqwest::Client::new(); + let att_type = if is_image { "image" } else { "audio" }; + if let Some(att) = download_line_media(&client, access_token, &msg.id, att_type).await { + attachments.push(att); + } + } else { + warn!("LINE media received but LINE_CHANNEL_ACCESS_TOKEN not set"); + } + } + let source = event.source.as_ref(); let (channel_id, channel_type) = match source { - Some(s) if s.source_type == "group" => { - match s.group_id.as_deref() { - Some(id) if !id.is_empty() => (id.to_string(), "group".to_string()), - _ => { - warn!("LINE group event missing groupId, skipping"); - continue; - } + Some(s) if s.source_type == "group" => match s.group_id.as_deref() { + Some(id) if !id.is_empty() => (id.to_string(), "group".to_string()), + _ => { + warn!("LINE group event missing groupId, skipping"); + continue; } - } - Some(s) if s.source_type == "room" => { - match s.room_id.as_deref() { - Some(id) if !id.is_empty() => (id.to_string(), "room".to_string()), - _ => { - warn!("LINE room event missing roomId, skipping"); - continue; - } + }, + Some(s) if s.source_type == "room" => match s.room_id.as_deref() { + Some(id) if !id.is_empty() => (id.to_string(), "room".to_string()), + _ => { + warn!("LINE room event missing roomId, skipping"); + continue; } - } - Some(s) => { - match s.user_id.as_deref() { - Some(id) if !id.is_empty() => (id.to_string(), "user".to_string()), - _ => { - warn!("LINE user event missing userId, skipping"); - continue; - } + }, + Some(s) => match s.user_id.as_deref() { + Some(id) if !id.is_empty() => (id.to_string(), "user".to_string()), + _ => { + warn!("LINE user event missing userId, skipping"); + continue; } - } + }, None => { warn!("LINE event missing source, skipping"); continue; @@ -138,7 +148,7 @@ pub async fn webhook( .and_then(|s| s.user_id.as_deref()) .unwrap_or("unknown"); - let gateway_event = GatewayEvent::new( + let mut gateway_event = GatewayEvent::new( "line", ChannelInfo { id: channel_id.clone(), @@ -151,10 +161,11 @@ pub async fn webhook( display_name: user_id.into(), is_bot: false, }, - text, + &text, &msg.id, vec![], ); + gateway_event.content.attachments = attachments; // Cache the reply token for hybrid Reply/Push dispatch if let Some(ref reply_token) = event.reply_token { @@ -266,3 +277,80 @@ pub async fn dispatch_line_reply( used_reply } + +/// Download media content from LINE Messaging API. +async fn download_line_media( + client: &reqwest::Client, + access_token: &str, + message_id: &str, + attachment_type: &str, +) -> Option { + let url = format!( + "https://api-data.line.me/v2/bot/message/{}/content", + message_id + ); + let resp = client + .get(url) + .bearer_auth(access_token) + .send() + .await + .ok()?; + + if !resp.status().is_success() { + error!(status = %resp.status(), "LINE media download failed"); + return None; + } + + let max_size = if attachment_type == "image" { + IMAGE_MAX_DOWNLOAD + } else { + AUDIO_MAX_DOWNLOAD + }; + + if let Some(cl) = resp.headers().get(reqwest::header::CONTENT_LENGTH) { + if let Ok(size) = cl.to_str().unwrap_or("0").parse::() { + if size > max_size { + warn!(message_id, size, "LINE {} Content-Length exceeds limit", attachment_type); + return None; + } + } + } + + let bytes = resp.bytes().await.ok()?; + if bytes.len() as u64 > max_size { + warn!(message_id, size = bytes.len(), "LINE {} exceeds limit", attachment_type); + return None; + } + + let (data_bytes, mime, filename) = if attachment_type == "image" { + match resize_and_compress(&bytes) { + Ok((c, m)) => (c, m, format!("{}.jpg", message_id)), + Err(e) => { + error!(err = %e, "LINE image processing failed"); + return None; + } + } + } else { + // For audio, we don't process, just send as is. + // LINE audio is usually m4a. + let mime = resp + .headers() + .get(reqwest::header::CONTENT_TYPE) + .and_then(|h| h.to_str().ok()) + .unwrap_or("audio/x-m4a") + .to_string(); + let ext = if mime.contains("mp3") { "mp3" } else { "m4a" }; + (bytes.to_vec(), mime, format!("{}.{}", message_id, ext)) + }; + + use base64::Engine; + let b64_data = base64::engine::general_purpose::STANDARD.encode(&data_bytes); + + Some(Attachment { + attachment_type: attachment_type.into(), + filename, + mime_type: mime, + data: b64_data, + size: data_bytes.len() as u64, + }) +} diff --git a/gateway/src/adapters/teams.rs b/gateway/src/adapters/teams.rs index d7b5433e..fdbcae67 100644 --- a/gateway/src/adapters/teams.rs +++ b/gateway/src/adapters/teams.rs @@ -275,7 +275,9 @@ impl TeamsAdapter { } // B2: Validate channel endorsements — key must endorse the activity's channelId - let channel_id = activity.channel_id.as_deref() + let channel_id = activity + .channel_id + .as_deref() .ok_or_else(|| anyhow::anyhow!("activity missing channelId"))?; if key.endorsements.is_empty() { anyhow::bail!("JWK has no endorsements — cannot verify channelId={channel_id}"); @@ -301,9 +303,13 @@ impl TeamsAdapter { let token_data = decode::(token, &decoding_key, &validation)?; // B1: Validate serviceUrl claim matches activity's serviceUrl - let activity_service_url = activity.service_url.as_deref() + let activity_service_url = activity + .service_url + .as_deref() .ok_or_else(|| anyhow::anyhow!("activity missing serviceUrl"))?; - let token_service_url = token_data.claims.get("serviceurl") + let token_service_url = token_data + .claims + .get("serviceurl") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("JWT missing serviceurl claim"))?; if token_service_url != activity_service_url { @@ -743,7 +749,9 @@ mod tests { async fn jwt_rejects_garbage_token() { let adapter = TeamsAdapter::new(make_config(vec![])); let activity = make_activity_with_tenant(Some("t1")); - let result = adapter.validate_jwt("Bearer not.a.valid.jwt", &activity).await; + let result = adapter + .validate_jwt("Bearer not.a.valid.jwt", &activity) + .await; assert!(result.is_err()); } diff --git a/gateway/src/adapters/telegram.rs b/gateway/src/adapters/telegram.rs index 6ae01624..900d5c1b 100644 --- a/gateway/src/adapters/telegram.rs +++ b/gateway/src/adapters/telegram.rs @@ -1,3 +1,4 @@ +use crate::media::{resize_and_compress, FILE_MAX_DOWNLOAD, IMAGE_MAX_DOWNLOAD}; use crate::schema::*; use axum::extract::State; use axum::Json; @@ -25,8 +26,25 @@ struct TelegramMessage { chat: TelegramChat, from: Option, text: Option, + caption: Option, #[serde(default)] entities: Vec, + #[serde(default)] + photo: Vec, + document: Option, +} + +#[derive(Debug, Deserialize)] +struct TelegramPhoto { + file_id: String, + width: u32, + height: u32, +} + +#[derive(Debug, Deserialize)] +struct TelegramDocument { + file_id: String, + file_name: Option, } #[derive(Debug, Deserialize)] @@ -75,13 +93,46 @@ pub async fn webhook( let Some(msg) = update.message else { return axum::http::StatusCode::OK; }; - let Some(text) = msg.text.as_deref() else { - return axum::http::StatusCode::OK; - }; - if text.trim().is_empty() { + let is_voice = msg.voice.is_some(); + let is_audio = msg.audio.is_some(); + let text = msg.text.as_deref().or(msg.caption.as_deref()).unwrap_or(""); + + if text.trim().is_empty() && !is_photo && !is_document && !is_voice && !is_audio { return axum::http::StatusCode::OK; } + let mut attachments = Vec::new(); + if is_photo || is_document || is_voice || is_audio { + if let Some(ref token) = state.telegram_bot_token { + let client = reqwest::Client::new(); + if is_photo { + // Take the largest photo + if let Some(largest) = msg.photo.iter().max_by_key(|p| p.width * p.height) { + if let Some(att) = + download_telegram_media(&client, token, &largest.file_id, "image").await + { + attachments.push(att); + } + } + } else if let Some(doc) = msg.document { + let file_name = doc.file_name.unwrap_or_else(|| "unknown.txt".to_string()); + if let Some(att) = + download_telegram_document(&client, token, &doc.file_id, &file_name).await + { + attachments.push(att); + } + } else if let Some(voice) = msg.voice { + if let Some(att) = download_telegram_media(&client, token, &voice.file_id, "audio").await { + attachments.push(att); + } + } else if let Some(audio) = msg.audio { + if let Some(att) = download_telegram_media(&client, token, &audio.file_id, "audio").await { + attachments.push(att); + } + } + } + } + let from = msg.from.as_ref(); let sender_name = from .and_then(|u| u.username.as_deref()) @@ -107,7 +158,7 @@ pub async fn webhook( }) .collect(); - let event = GatewayEvent::new( + let mut event = GatewayEvent::new( "telegram", ChannelInfo { id: msg.chat.id.to_string(), @@ -124,6 +175,7 @@ pub async fn webhook( &msg.message_id.to_string(), mentions, ); + event.content.attachments = attachments; let json = serde_json::to_string(&event).unwrap(); info!(chat_id = %msg.chat.id, sender = %sender_name, "telegram → gateway"); @@ -262,3 +314,161 @@ pub async fn handle_reply( .await .map_err(|e| error!("telegram send error: {e}")); } + +/// Download photo from Telegram via getFile + download URL. +async fn download_telegram_media( + client: &reqwest::Client, + bot_token: &str, + file_id: &str, + attachment_type: &str, +) -> Option { + // 1. Get file path + let get_file_url = format!("{TELEGRAM_API_BASE}/bot{}/getFile", bot_token); + let resp = client + .get(get_file_url) + .query(&[("file_id", file_id)]) + .send() + .await + .ok()?; + + let body: serde_json::Value = resp.json().await.ok()?; + let file_path = body["result"]["file_path"].as_str()?; + + // 2. Download file + let download_url = format!("{TELEGRAM_API_BASE}/file/bot{}/{}", bot_token, file_path); + let resp = client.get(download_url).send().await.ok()?; + if !resp.status().is_success() { + return None; + } + + let max_size = if attachment_type == "image" { + IMAGE_MAX_DOWNLOAD + } else { + AUDIO_MAX_DOWNLOAD + }; + + if let Some(cl) = resp.headers().get(reqwest::header::CONTENT_LENGTH) { + if let Ok(size) = cl.to_str().unwrap_or("0").parse::() { + if size > max_size { + warn!(file_id, size, "Telegram {} Content-Length exceeds limit", attachment_type); + return None; + } + } + } + + let bytes = resp.bytes().await.ok()?; + if bytes.len() as u64 > max_size { + warn!(file_id, size = bytes.len(), "Telegram {} exceeds limit", attachment_type); + return None; + } + + let (data_bytes, mime, filename) = if attachment_type == "image" { + match resize_and_compress(&bytes) { + Ok((c, m)) => (c, m, format!("{}.jpg", file_id)), + Err(e) => { + error!(err = %e, "Telegram image processing failed"); + return None; + } + } + } else { + // For audio/voice, we don't process. + let mime = resp + .headers() + .get(reqwest::header::CONTENT_TYPE) + .and_then(|h| h.to_str().ok()) + .unwrap_or("audio/ogg") // Default for Telegram voice + .to_string(); + let ext = if mime.contains("mpeg") || mime.contains("mp3") { + "mp3" + } else if mime.contains("m4a") { + "m4a" + } else { + "ogg" + }; + (bytes.to_vec(), mime, format!("{}.{}", file_id, ext)) + }; + + use base64::Engine; + let b64_data = base64::engine::general_purpose::STANDARD.encode(&data_bytes); + + Some(Attachment { + attachment_type: attachment_type.into(), + filename, + mime_type: mime, + data: b64_data, + size: data_bytes.len() as u64, + }) +} + +/// Download document from Telegram via getFile + download URL (text files only). +async fn download_telegram_document( + client: &reqwest::Client, + bot_token: &str, + file_id: &str, + file_name: &str, +) -> Option { + // Only download text-like files + let ext = file_name.rsplit('.').next().unwrap_or("").to_lowercase(); + const TEXT_EXTS: &[&str] = &[ + "txt", "csv", "log", "md", "json", "jsonl", "yaml", "yml", "toml", "xml", "rs", "py", "js", + "ts", "jsx", "tsx", "go", "java", "c", "cpp", "h", "hpp", "rb", "sh", "bash", "sql", + "html", "css", "ini", "cfg", "conf", "env", + ]; + if !TEXT_EXTS.contains(&ext.as_str()) { + tracing::debug!(file_name, "skipping non-text file attachment"); + return None; + } + + // 1. Get file path + let get_file_url = format!("{TELEGRAM_API_BASE}/bot{}/getFile", bot_token); + let resp = client + .get(get_file_url) + .query(&[("file_id", file_id)]) + .send() + .await + .ok()?; + + let body: serde_json::Value = resp.json().await.ok()?; + let file_path = body["result"]["file_path"].as_str()?; + + // 2. Download file + let download_url = format!("{TELEGRAM_API_BASE}/file/bot{}/{}", bot_token, file_path); + let resp = client.get(download_url).send().await.ok()?; + if !resp.status().is_success() { + return None; + } + + if let Some(cl) = resp.headers().get(reqwest::header::CONTENT_LENGTH) { + if let Ok(size) = cl.to_str().unwrap_or("0").parse::() { + if size > FILE_MAX_DOWNLOAD { + warn!( + file_id, + size, "Telegram document Content-Length exceeds limit" + ); + return None; + } + } + } + + let bytes = resp.bytes().await.ok()?; + if bytes.len() as u64 > FILE_MAX_DOWNLOAD { + warn!( + file_id, + size = bytes.len(), + "Telegram document exceeds limit" + ); + return None; + } + + let text = String::from_utf8_lossy(&bytes); + use base64::Engine; + let data = base64::engine::general_purpose::STANDARD.encode(text.as_bytes()); + + Some(Attachment { + attachment_type: "text_file".into(), + filename: file_name.to_string(), + mime_type: "text/plain".into(), + data, + size: bytes.len() as u64, + }) +} diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 3df4ab1a..3113c061 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -1,4 +1,5 @@ mod adapters; +mod media; mod schema; use anyhow::Result; @@ -159,7 +160,12 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: } "feishu" => { if let Some(ref feishu) = state_for_recv.feishu { - adapters::feishu::handle_reply(&reply, feishu, &state_for_recv.event_tx).await; + adapters::feishu::handle_reply( + &reply, + feishu, + &state_for_recv.event_tx, + ) + .await; } else { warn!("reply for feishu but adapter not configured"); } @@ -306,10 +312,16 @@ async fn main() -> Result<()> { warn!("GOOGLE_CHAT_ACCESS_TOKEN / GOOGLE_CHAT_SA_KEY_JSON not set — replies will be logged but not sent"); } if jwt_verifier.is_none() { - warn!("GOOGLE_CHAT_AUDIENCE not set — webhook requests are NOT authenticated (insecure)"); + warn!( + "GOOGLE_CHAT_AUDIENCE not set — webhook requests are NOT authenticated (insecure)" + ); } - Some(adapters::googlechat::GoogleChatAdapter::new(token_cache, access_token, jwt_verifier)) + Some(adapters::googlechat::GoogleChatAdapter::new( + token_cache, + access_token, + jwt_verifier, + )) } else { None }; @@ -390,7 +402,13 @@ async fn main() -> Result<()> { let (feishu_shutdown_tx, feishu_shutdown_rx) = tokio::sync::watch::channel(false); if feishu_ws_mode { if let Some(ref feishu) = state.feishu { - match adapters::feishu::start_websocket(feishu, state.event_tx.clone(), feishu_shutdown_rx).await { + match adapters::feishu::start_websocket( + feishu, + state.event_tx.clone(), + feishu_shutdown_rx, + ) + .await + { Ok(_handle) => info!("feishu websocket task spawned"), Err(e) => tracing::error!(err = %e, "feishu websocket startup failed"), } diff --git a/gateway/src/media.rs b/gateway/src/media.rs new file mode 100644 index 00000000..af3cd345 --- /dev/null +++ b/gateway/src/media.rs @@ -0,0 +1,33 @@ +use image::ImageReader; +use std::io::Cursor; + +pub const IMAGE_MAX_DIMENSION_PX: u32 = 1200; +pub const IMAGE_JPEG_QUALITY: u8 = 75; +pub const IMAGE_MAX_DOWNLOAD: u64 = 10 * 1024 * 1024; // 10 MB +pub const FILE_MAX_DOWNLOAD: u64 = 512 * 1024; // 512 KB +pub const AUDIO_MAX_DOWNLOAD: u64 = 20 * 1024 * 1024; // 20 MB + +/// Resize image so longest side <= 1200px, then encode as JPEG. +/// GIFs are passed through unchanged to preserve animation. +pub fn resize_and_compress(raw: &[u8]) -> Result<(Vec, String), image::ImageError> { + let reader = ImageReader::new(Cursor::new(raw)).with_guessed_format()?; + let format = reader.format(); + if format == Some(image::ImageFormat::Gif) { + return Ok((raw.to_vec(), "image/gif".to_string())); + } + let img = reader.decode()?; + let (w, h) = (img.width(), img.height()); + let img = if w > IMAGE_MAX_DIMENSION_PX || h > IMAGE_MAX_DIMENSION_PX { + let max_side = std::cmp::max(w, h); + let ratio = f64::from(IMAGE_MAX_DIMENSION_PX) / f64::from(max_side); + let new_w = (f64::from(w) * ratio) as u32; + let new_h = (f64::from(h) * ratio) as u32; + img.resize(new_w, new_h, image::imageops::FilterType::Lanczos3) + } else { + img + }; + let mut buf = Cursor::new(Vec::new()); + let encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf, IMAGE_JPEG_QUALITY); + img.write_with_encoder(encoder)?; + Ok((buf.into_inner(), "image/jpeg".to_string())) +} diff --git a/src/gateway.rs b/src/gateway.rs index 8aed6aab..229a5bdd 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -487,6 +487,7 @@ pub struct GatewayParams { pub allow_all_users: bool, pub allowed_users: Vec, pub streaming: bool, + pub stt: crate::config::SttConfig, } pub async fn run_gateway_adapter( @@ -502,9 +503,9 @@ pub async fn run_gateway_adapter( let bot_username = params.bot_username; let allow_all_channels = params.allow_all_channels; let allowed_channels = params.allowed_channels; - let allow_all_users = params.allow_all_users; let allowed_users = params.allowed_users; let streaming = params.streaming; + let stt = params.stt; let connect_url = match ¶ms.token { Some(token) => { From 2d251abaa68f9a8be4d42a5a62e4b43c68e6b89d Mon Sep 17 00:00:00 2001 From: iamninihuang Date: Wed, 6 May 2026 20:33:39 +0800 Subject: [PATCH 2/6] feat(audio): implement inbound audio/voice support with STT for LINE and Telegram --- src/gateway.rs | 20 ++++++++++++++++++++ src/main.rs | 1 + 2 files changed, 21 insertions(+) diff --git a/src/gateway.rs b/src/gateway.rs index 229a5bdd..8b4de409 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -503,6 +503,7 @@ pub async fn run_gateway_adapter( let bot_username = params.bot_username; let allow_all_channels = params.allow_all_channels; let allowed_channels = params.allowed_channels; + let allow_all_users = params.allow_all_users; let allowed_users = params.allowed_users; let streaming = params.streaming; let stt = params.stt; @@ -672,6 +673,25 @@ pub async fn run_gateway_adapter( }); } } + "audio" => { + if stt.enabled { + use base64::Engine; + if let Ok(bytes) = base64::engine::general_purpose::STANDARD.decode(&att.data) { + let client = reqwest::Client::new(); + if let Some(text) = crate::stt::transcribe( + &client, + &stt, + bytes, + att.filename.clone(), + &att.mime_type + ).await { + extra_blocks.push(ContentBlock::Text { + text: format!("[Audio: {}]", text), + }); + } + } + } + } _ => {} } } diff --git a/src/main.rs b/src/main.rs index 04a0937f..b512cbe0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -269,6 +269,7 @@ async fn main() -> anyhow::Result<()> { allow_all_users: config::resolve_allow_all(gw_cfg.allow_all_users, &gw_cfg.allowed_users), allowed_users: gw_cfg.allowed_users, streaming: gw_cfg.streaming, + stt: cfg.stt.clone(), }; let gw_router = router.clone(); Some(tokio::spawn(async move { From 5d76f5b2dd39f2a4035c71b50ae3b94d4a5c2481 Mon Sep 17 00:00:00 2001 From: iamninihuang Date: Wed, 6 May 2026 20:36:00 +0800 Subject: [PATCH 3/6] docs: document Audio/Voice support for LINE and Telegram --- docs/line.md | 1 + docs/telegram.md | 1 + 2 files changed, 2 insertions(+) diff --git a/docs/line.md b/docs/line.md index 810264ac..2618cb0d 100644 --- a/docs/line.md +++ b/docs/line.md @@ -84,6 +84,7 @@ In the LINE Developers Console → **Messaging API** tab → scan the QR code wi - **1:1 chat** — send a message to the bot, get an AI agent response - **Group chat** — add the bot to a group, it responds to all messages - **Images** — send image messages to the bot (automatically compressed and resized) +- **Audio** — send audio messages (e.g. voice notes). They are automatically transcribed (if STT is enabled in Core) and passed to the agent as text. - **Webhook signature validation** — HMAC-SHA256 via `LINE_CHANNEL_SECRET` ### Not Supported (LINE API limitations) diff --git a/docs/telegram.md b/docs/telegram.md index 590589cd..70a4f19a 100644 --- a/docs/telegram.md +++ b/docs/telegram.md @@ -172,6 +172,7 @@ DMs and replies within forum topics always trigger the agent (no @mention needed - **Images** — send photos (compressed/resized automatically). - **Documents** — send text-based files (e.g. `.txt`, `.csv`, `.rs`, `.py`) up to 512KB. They are passed directly to the agent as text. +- **Audio/Voice** — send voice notes or audio files. They are automatically transcribed (if STT is enabled in Core) and passed to the agent as text. ### Emoji reactions From cf2279659124ab3d80eea3fad2c4c0b4c36b318b Mon Sep 17 00:00:00 2001 From: iamninihuang Date: Wed, 6 May 2026 21:09:55 +0800 Subject: [PATCH 4/6] fix: resolve clippy warnings and missing fields in gateway adapters --- gateway/src/adapters/line.rs | 23 +++++++++--------- gateway/src/adapters/telegram.rs | 41 +++++++++++++++++++++++--------- src/gateway.rs | 30 +++++++++++------------ 3 files changed, 56 insertions(+), 38 deletions(-) diff --git a/gateway/src/adapters/line.rs b/gateway/src/adapters/line.rs index 89f3e3f6..b3c9cd80 100644 --- a/gateway/src/adapters/line.rs +++ b/gateway/src/adapters/line.rs @@ -1,4 +1,4 @@ -use crate::media::{resize_and_compress, IMAGE_MAX_DOWNLOAD}; +use crate::media::{resize_and_compress, AUDIO_MAX_DOWNLOAD, IMAGE_MAX_DOWNLOAD}; use crate::schema::*; use axum::extract::State; use serde::Deserialize; @@ -92,13 +92,14 @@ pub async fn webhook( continue; }; let is_text = msg.message_type == "text"; + let is_image = msg.message_type == "image"; let is_audio = msg.message_type == "audio"; if !is_text && !is_image && !is_audio { continue; } - let mut text = msg.text.clone().unwrap_or_default(); + let text = msg.text.clone().unwrap_or_default(); if is_text && text.trim().is_empty() { continue; } @@ -316,6 +317,13 @@ async fn download_line_media( } } + let content_type = resp + .headers() + .get(reqwest::header::CONTENT_TYPE) + .and_then(|h| h.to_str().ok()) + .unwrap_or(if attachment_type == "image" { "image/jpeg" } else { "audio/x-m4a" }) + .to_string(); + let bytes = resp.bytes().await.ok()?; if bytes.len() as u64 > max_size { warn!(message_id, size = bytes.len(), "LINE {} exceeds limit", attachment_type); @@ -324,7 +332,7 @@ async fn download_line_media( let (data_bytes, mime, filename) = if attachment_type == "image" { match resize_and_compress(&bytes) { - Ok((c, m)) => (c, m, format!("{}.jpg", message_id)), + Ok((c, _m)) => (c, content_type, format!("{}.jpg", message_id)), Err(e) => { error!(err = %e, "LINE image processing failed"); return None; @@ -333,14 +341,7 @@ async fn download_line_media( } else { // For audio, we don't process, just send as is. // LINE audio is usually m4a. - let mime = resp - .headers() - .get(reqwest::header::CONTENT_TYPE) - .and_then(|h| h.to_str().ok()) - .unwrap_or("audio/x-m4a") - .to_string(); - let ext = if mime.contains("mp3") { "mp3" } else { "m4a" }; - (bytes.to_vec(), mime, format!("{}.{}", message_id, ext)) + (bytes.to_vec(), content_type, format!("{}.m4a", message_id)) }; use base64::Engine; diff --git a/gateway/src/adapters/telegram.rs b/gateway/src/adapters/telegram.rs index 900d5c1b..5d2a1d1b 100644 --- a/gateway/src/adapters/telegram.rs +++ b/gateway/src/adapters/telegram.rs @@ -1,4 +1,4 @@ -use crate::media::{resize_and_compress, FILE_MAX_DOWNLOAD, IMAGE_MAX_DOWNLOAD}; +use crate::media::{resize_and_compress, AUDIO_MAX_DOWNLOAD, FILE_MAX_DOWNLOAD, IMAGE_MAX_DOWNLOAD}; use crate::schema::*; use axum::extract::State; use axum::Json; @@ -32,6 +32,8 @@ struct TelegramMessage { #[serde(default)] photo: Vec, document: Option, + voice: Option, + audio: Option, } #[derive(Debug, Deserialize)] @@ -45,6 +47,20 @@ struct TelegramPhoto { struct TelegramDocument { file_id: String, file_name: Option, + mime_type: Option, +} + +#[derive(Debug, Deserialize)] +struct TelegramVoice { + file_id: String, + mime_type: Option, +} + +#[derive(Debug, Deserialize)] +struct TelegramAudio { + file_id: String, + file_name: Option, + mime_type: Option, } #[derive(Debug, Deserialize)] @@ -93,6 +109,8 @@ pub async fn webhook( let Some(msg) = update.message else { return axum::http::StatusCode::OK; }; + let is_photo = !msg.photo.is_empty(); + let is_document = msg.document.is_some(); let is_voice = msg.voice.is_some(); let is_audio = msg.audio.is_some(); let text = msg.text.as_deref().or(msg.caption.as_deref()).unwrap_or(""); @@ -356,6 +374,13 @@ async fn download_telegram_media( } } + let content_type = resp + .headers() + .get(reqwest::header::CONTENT_TYPE) + .and_then(|h| h.to_str().ok()) + .unwrap_or(if attachment_type == "image" { "image/jpeg" } else { "audio/ogg" }) + .to_string(); + let bytes = resp.bytes().await.ok()?; if bytes.len() as u64 > max_size { warn!(file_id, size = bytes.len(), "Telegram {} exceeds limit", attachment_type); @@ -364,7 +389,7 @@ async fn download_telegram_media( let (data_bytes, mime, filename) = if attachment_type == "image" { match resize_and_compress(&bytes) { - Ok((c, m)) => (c, m, format!("{}.jpg", file_id)), + Ok((c, _m)) => (c, content_type, format!("{}.jpg", file_id)), Err(e) => { error!(err = %e, "Telegram image processing failed"); return None; @@ -372,20 +397,14 @@ async fn download_telegram_media( } } else { // For audio/voice, we don't process. - let mime = resp - .headers() - .get(reqwest::header::CONTENT_TYPE) - .and_then(|h| h.to_str().ok()) - .unwrap_or("audio/ogg") // Default for Telegram voice - .to_string(); - let ext = if mime.contains("mpeg") || mime.contains("mp3") { + let ext = if content_type.contains("mpeg") || content_type.contains("mp3") { "mp3" - } else if mime.contains("m4a") { + } else if content_type.contains("m4a") { "m4a" } else { "ogg" }; - (bytes.to_vec(), mime, format!("{}.{}", file_id, ext)) + (bytes.to_vec(), content_type, format!("{}.{}", file_id, ext)) }; use base64::Engine; diff --git a/src/gateway.rs b/src/gateway.rs index 8b4de409..5cc50d22 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -673,22 +673,20 @@ pub async fn run_gateway_adapter( }); } } - "audio" => { - if stt.enabled { - use base64::Engine; - if let Ok(bytes) = base64::engine::general_purpose::STANDARD.decode(&att.data) { - let client = reqwest::Client::new(); - if let Some(text) = crate::stt::transcribe( - &client, - &stt, - bytes, - att.filename.clone(), - &att.mime_type - ).await { - extra_blocks.push(ContentBlock::Text { - text: format!("[Audio: {}]", text), - }); - } + "audio" if stt.enabled => { + use base64::Engine; + if let Ok(bytes) = base64::engine::general_purpose::STANDARD.decode(&att.data) { + let client = reqwest::Client::new(); + if let Some(text) = crate::stt::transcribe( + &client, + &stt, + bytes, + att.filename.clone(), + &att.mime_type + ).await { + extra_blocks.push(ContentBlock::Text { + text: format!("[Audio: {}]", text), + }); } } } From e5dd849e5e4094c7e81197a3816c6e53d7b6e9c1 Mon Sep 17 00:00:00 2001 From: iamninihuang Date: Wed, 6 May 2026 23:17:56 +0800 Subject: [PATCH 5/6] feat(gateway): address review comments for multimodal inbound --- gateway/src/adapters/line.rs | 7 ++++--- gateway/src/adapters/telegram.rs | 12 +++++++----- gateway/src/main.rs | 8 ++++++++ 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/gateway/src/adapters/line.rs b/gateway/src/adapters/line.rs index b3c9cd80..f4268c3f 100644 --- a/gateway/src/adapters/line.rs +++ b/gateway/src/adapters/line.rs @@ -107,9 +107,9 @@ pub async fn webhook( let mut attachments = Vec::new(); if is_image || is_audio { if let Some(ref access_token) = state.line_access_token { - let client = reqwest::Client::new(); + let client = &state.client; let att_type = if is_image { "image" } else { "audio" }; - if let Some(att) = download_line_media(&client, access_token, &msg.id, att_type).await { + if let Some(att) = download_line_media(client, access_token, &msg.id, att_type).await { attachments.push(att); } } else { @@ -321,7 +321,7 @@ async fn download_line_media( .headers() .get(reqwest::header::CONTENT_TYPE) .and_then(|h| h.to_str().ok()) - .unwrap_or(if attachment_type == "image" { "image/jpeg" } else { "audio/x-m4a" }) + .unwrap_or(if attachment_type == "image" { "image/jpeg" } else { "audio/mp4" }) .to_string(); let bytes = resp.bytes().await.ok()?; @@ -346,6 +346,7 @@ async fn download_line_media( use base64::Engine; let b64_data = base64::engine::general_purpose::STANDARD.encode(&data_bytes); + info!(message_id, size = data_bytes.len(), "LINE {} download successful", attachment_type); Some(Attachment { attachment_type: attachment_type.into(), diff --git a/gateway/src/adapters/telegram.rs b/gateway/src/adapters/telegram.rs index 5d2a1d1b..19daf12a 100644 --- a/gateway/src/adapters/telegram.rs +++ b/gateway/src/adapters/telegram.rs @@ -122,12 +122,12 @@ pub async fn webhook( let mut attachments = Vec::new(); if is_photo || is_document || is_voice || is_audio { if let Some(ref token) = state.telegram_bot_token { - let client = reqwest::Client::new(); + let client = &state.client; if is_photo { // Take the largest photo if let Some(largest) = msg.photo.iter().max_by_key(|p| p.width * p.height) { if let Some(att) = - download_telegram_media(&client, token, &largest.file_id, "image").await + download_telegram_media(client, token, &largest.file_id, "image").await { attachments.push(att); } @@ -135,16 +135,16 @@ pub async fn webhook( } else if let Some(doc) = msg.document { let file_name = doc.file_name.unwrap_or_else(|| "unknown.txt".to_string()); if let Some(att) = - download_telegram_document(&client, token, &doc.file_id, &file_name).await + download_telegram_document(client, token, &doc.file_id, &file_name).await { attachments.push(att); } } else if let Some(voice) = msg.voice { - if let Some(att) = download_telegram_media(&client, token, &voice.file_id, "audio").await { + if let Some(att) = download_telegram_media(client, token, &voice.file_id, "audio").await { attachments.push(att); } } else if let Some(audio) = msg.audio { - if let Some(att) = download_telegram_media(&client, token, &audio.file_id, "audio").await { + if let Some(att) = download_telegram_media(client, token, &audio.file_id, "audio").await { attachments.push(att); } } @@ -409,6 +409,7 @@ async fn download_telegram_media( use base64::Engine; let b64_data = base64::engine::general_purpose::STANDARD.encode(&data_bytes); + info!(file_id, size = data_bytes.len(), "Telegram {} download successful", attachment_type); Some(Attachment { attachment_type: attachment_type.into(), @@ -482,6 +483,7 @@ async fn download_telegram_document( let text = String::from_utf8_lossy(&bytes); use base64::Engine; let data = base64::engine::general_purpose::STANDARD.encode(text.as_bytes()); + info!(file_id, file_name, size = bytes.len(), "Telegram document download successful"); Some(Attachment { attachment_type: "text_file".into(), diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 3113c061..bbbfa0c8 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -60,8 +60,11 @@ pub struct AppState { /// the first client to `remove()` a token wins the free Reply API call; /// other clients for the same event naturally fall back to Push API. pub reply_token_cache: ReplyTokenCache, + /// Shared HTTP client for media downloads and API calls + pub client: reqwest::Client, } + // --- WebSocket handler (OAB connects here) --- async fn ws_handler( @@ -335,6 +338,10 @@ async fn main() -> Result<()> { warn!("no adapters configured — set TELEGRAM_BOT_TOKEN, LINE_CHANNEL_ACCESS_TOKEN, TEAMS_APP_ID + TEAMS_APP_SECRET, FEISHU_APP_ID + FEISHU_APP_SECRET, and/or GOOGLE_CHAT_ENABLED=true"); } + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build()?; + let state = Arc::new(AppState { telegram_bot_token, telegram_secret_token, @@ -347,6 +354,7 @@ async fn main() -> Result<()> { ws_token, event_tx, reply_token_cache, + client, }); // Background task: sweep expired reply tokens every REPLY_TOKEN_TTL_SECS From c25349045a8e067805ec61f5f49864f8fc0613fe Mon Sep 17 00:00:00 2001 From: iamninihuang Date: Thu, 7 May 2026 13:15:10 +0800 Subject: [PATCH 6/6] chore: address PR 757 reviewer feedback - Shared reqwest::Client from AppState in gateway main loop. - Refactored Feishu adapter to use shared media utility module. - Cleaned up formatting noise in feishu, googlechat, and teams adapters. - Fixed Google Chat test fixtures for schema changes. --- gateway/src/adapters/feishu.rs | 339 +++++++---------------------- gateway/src/adapters/googlechat.rs | 104 ++++----- gateway/src/adapters/teams.rs | 16 +- gateway/src/main.rs | 2 +- 4 files changed, 122 insertions(+), 339 deletions(-) diff --git a/gateway/src/adapters/feishu.rs b/gateway/src/adapters/feishu.rs index 27b306d3..fa3b8406 100644 --- a/gateway/src/adapters/feishu.rs +++ b/gateway/src/adapters/feishu.rs @@ -1,3 +1,4 @@ +use crate::media::{resize_and_compress, FILE_MAX_DOWNLOAD, IMAGE_MAX_DOWNLOAD}; use crate::schema::*; use axum::extract::State; use prost::Message as ProstMessage; @@ -106,8 +107,8 @@ impl FeishuConfig { "webhook" => ConnectionMode::Webhook, _ => ConnectionMode::Websocket, }; - let webhook_path = - std::env::var("FEISHU_WEBHOOK_PATH").unwrap_or_else(|_| "/webhook/feishu".into()); + let webhook_path = std::env::var("FEISHU_WEBHOOK_PATH") + .unwrap_or_else(|_| "/webhook/feishu".into()); let verification_token = std::env::var("FEISHU_VERIFICATION_TOKEN").ok(); let encrypt_key = std::env::var("FEISHU_ENCRYPT_KEY").ok(); let allowed_groups = parse_csv("FEISHU_ALLOWED_GROUPS"); @@ -308,9 +309,7 @@ mod event_types { return None; } - let content_json: serde_json::Value = msg - .content - .as_deref() + let content_json: serde_json::Value = msg.content.as_deref() .and_then(|s| serde_json::from_str(s).ok())?; let message_id = msg.message_id.as_deref()?; @@ -319,8 +318,9 @@ mod event_types { let (clean_text, mention_ids, media_refs) = match msg_type { "image" => { let image_key = content_json.get("image_key")?.as_str()?; - let mentions = - extract_mentions("", msg.mentions.as_deref().unwrap_or(&[]), bot_open_id); + let mentions = extract_mentions( + "", msg.mentions.as_deref().unwrap_or(&[]), bot_open_id, + ); let refs = vec![MediaRef::Image { message_id: message_id.to_string(), image_key: image_key.to_string(), @@ -329,12 +329,12 @@ mod event_types { } "file" => { let file_key = content_json.get("file_key")?.as_str()?; - let file_name = content_json - .get("file_name") + let file_name = content_json.get("file_name") .and_then(|v| v.as_str()) .unwrap_or("unknown"); - let mentions = - extract_mentions("", msg.mentions.as_deref().unwrap_or(&[]), bot_open_id); + let mentions = extract_mentions( + "", msg.mentions.as_deref().unwrap_or(&[]), bot_open_id, + ); let refs = vec![MediaRef::File { message_id: message_id.to_string(), file_key: file_key.to_string(), @@ -357,9 +357,7 @@ mod event_types { } } Some("img") => { - if let Some(key) = - el.get("image_key").and_then(|v| v.as_str()) - { + if let Some(key) = el.get("image_key").and_then(|v| v.as_str()) { refs.push(MediaRef::Image { message_id: message_id.to_string(), image_key: key.to_string(), @@ -390,10 +388,7 @@ mod event_types { } _ => { // text - let raw_text = content_json - .get("text") - .and_then(|v| v.as_str()) - .unwrap_or(""); + let raw_text = content_json.get("text").and_then(|v| v.as_str()).unwrap_or(""); if raw_text.trim().is_empty() { return None; } @@ -611,8 +606,7 @@ impl FeishuTokenCache { let expire = body.get("expire").and_then(|v| v.as_u64()).unwrap_or(7200); - let token = body - .get("tenant_access_token") + let token = body.get("tenant_access_token") .and_then(|v| v.as_str()) .map(|s| s.to_string()) .ok_or_else(|| anyhow::anyhow!("feishu token refresh: missing tenant_access_token"))?; @@ -704,10 +698,7 @@ async fn get_ws_endpoint( let body: serde_json::Value = resp.json().await?; let code = body.get("code").and_then(|v| v.as_i64()).unwrap_or(-1); if code != 0 { - let msg = body - .get("msg") - .and_then(|v| v.as_str()) - .unwrap_or("unknown"); + let msg = body.get("msg").and_then(|v| v.as_str()).unwrap_or("unknown"); anyhow::bail!("feishu ws endpoint error: code={code} msg={msg}"); } body.get("data") @@ -924,9 +915,7 @@ async fn handle_ws_message( let bot_id = bot_open_id_store.read().await; let bot_id_ref = bot_id.as_deref(); - if let Some((mut gateway_event, media_refs)) = - parse_message_event(&envelope, bot_id_ref, config) - { + if let Some((mut gateway_event, media_refs)) = parse_message_event(&envelope, bot_id_ref, config) { // Also dedupe by message_id if dedupe.is_duplicate(&gateway_event.message_id) { return; @@ -956,13 +945,8 @@ async fn handle_ws_message( // Resolve sender display name (lazy, cached) let name = resolve_user_name( - &gateway_event.sender.id, - name_cache, - token_cache, - client, - &config.api_base(), - ) - .await; + &gateway_event.sender.id, name_cache, token_cache, client, &config.api_base(), + ).await; gateway_event.sender.name = name.clone(); gateway_event.sender.display_name = name; @@ -972,22 +956,11 @@ async fn handle_ws_message( let api_base = config.api_base(); for media_ref in &media_refs { let attachment = match media_ref { - MediaRef::Image { - message_id, - image_key, - } => { - download_feishu_image(client, &api_base, &token, message_id, image_key) - .await + MediaRef::Image { message_id, image_key } => { + download_feishu_image(client, &api_base, &token, message_id, image_key).await } - MediaRef::File { - message_id, - file_key, - file_name, - } => { - download_feishu_file( - client, &api_base, &token, message_id, file_key, file_name, - ) - .await + MediaRef::File { message_id, file_key, file_name } => { + download_feishu_file(client, &api_base, &token, message_id, file_key, file_name).await } }; if let Some(att) = attachment { @@ -998,9 +971,7 @@ async fn handle_ws_message( } // Skip if no text and no attachments (e.g. unsupported file type) - if gateway_event.content.text.trim().is_empty() - && gateway_event.content.attachments.is_empty() - { + if gateway_event.content.text.trim().is_empty() && gateway_event.content.attachments.is_empty() { return; } @@ -1075,14 +1046,9 @@ async fn edit_feishu_message(adapter: &FeishuAdapter, message_id: &str, text: &s "msg_type": "post", "content": post_content.to_string(), }); - match adapter - .client - .put(&url) - .bearer_auth(&token) + match adapter.client.put(&url).bearer_auth(&token) .header("Content-Type", "application/json; charset=utf-8") - .json(&body) - .send() - .await + .json(&body).send().await { Ok(resp) if resp.status().is_success() => { tracing::trace!(message_id = %message_id, "feishu message edited"); @@ -1299,18 +1265,10 @@ fn try_parse_link(chars: &[char], start: usize) -> Option<(String, String, usize /// Reference to a media resource that needs async download after parse_message_event. pub enum MediaRef { - Image { - message_id: String, - image_key: String, - }, - File { - message_id: String, - file_key: String, - file_name: String, - }, + Image { message_id: String, image_key: String }, + File { message_id: String, file_key: String, file_name: String }, } -use crate::media::{resize_and_compress, FILE_MAX_DOWNLOAD, IMAGE_MAX_DOWNLOAD}; /// Download a Feishu image by message_id + image_key → resize/compress → base64 Attachment. pub async fn download_feishu_image( @@ -1339,11 +1297,7 @@ pub async fn download_feishu_image( if let Some(cl) = resp.headers().get(reqwest::header::CONTENT_LENGTH) { if let Ok(size) = cl.to_str().unwrap_or("0").parse::() { if size > IMAGE_MAX_DOWNLOAD { - tracing::warn!( - image_key, - size, - "feishu image Content-Length exceeds 10MB limit, skipping download" - ); + tracing::warn!(image_key, size, "feishu image Content-Length exceeds 10MB limit, skipping download"); return None; } } @@ -1351,11 +1305,7 @@ pub async fn download_feishu_image( let bytes = resp.bytes().await.ok()?; // Fallback check (Content-Length may be absent or misreported) if bytes.len() as u64 > IMAGE_MAX_DOWNLOAD { - tracing::warn!( - image_key, - size = bytes.len(), - "feishu image exceeds 10MB limit" - ); + tracing::warn!(image_key, size = bytes.len(), "feishu image exceeds 10MB limit"); return None; } let (compressed, mime) = match resize_and_compress(&bytes) { @@ -1389,9 +1339,9 @@ pub async fn download_feishu_file( // Only download text-like files let ext = file_name.rsplit('.').next().unwrap_or("").to_lowercase(); const TEXT_EXTS: &[&str] = &[ - "txt", "csv", "log", "md", "json", "jsonl", "yaml", "yml", "toml", "xml", "rs", "py", "js", - "ts", "jsx", "tsx", "go", "java", "c", "cpp", "h", "hpp", "rb", "sh", "bash", "sql", - "html", "css", "ini", "cfg", "conf", "env", + "txt", "csv", "log", "md", "json", "jsonl", "yaml", "yml", "toml", "xml", + "rs", "py", "js", "ts", "jsx", "tsx", "go", "java", "c", "cpp", "h", "hpp", + "rb", "sh", "bash", "sql", "html", "css", "ini", "cfg", "conf", "env", ]; if !TEXT_EXTS.contains(&ext.as_str()) { tracing::debug!(file_name, "skipping non-text file attachment"); @@ -1416,11 +1366,7 @@ pub async fn download_feishu_file( if let Some(cl) = resp.headers().get(reqwest::header::CONTENT_LENGTH) { if let Ok(size) = cl.to_str().unwrap_or("0").parse::() { if size > FILE_MAX_DOWNLOAD { - tracing::warn!( - file_name, - size, - "feishu file Content-Length exceeds 512KB limit, skipping download" - ); + tracing::warn!(file_name, size, "feishu file Content-Length exceeds 512KB limit, skipping download"); return None; } } @@ -1428,11 +1374,7 @@ pub async fn download_feishu_file( let bytes = resp.bytes().await.ok()?; // Fallback check (Content-Length may be absent or misreported) if bytes.len() as u64 > FILE_MAX_DOWNLOAD { - tracing::warn!( - file_name, - size = bytes.len(), - "feishu file exceeds 512KB limit" - ); + tracing::warn!(file_name, size = bytes.len(), "feishu file exceeds 512KB limit"); return None; } let text = String::from_utf8_lossy(&bytes); @@ -1469,10 +1411,7 @@ pub async fn send_post_message( ) } else { ( - format!( - "{}/open-apis/im/v1/messages?receive_id_type=chat_id", - api_base - ), + format!("{}/open-apis/im/v1/messages?receive_id_type=chat_id", api_base), serde_json::json!({ "receive_id": chat_id, "msg_type": "post", @@ -1607,18 +1546,13 @@ async fn add_reaction(adapter: &FeishuAdapter, message_id: &str, emoji: &str) { }; let token = match adapter.token_cache.get_token(&adapter.client).await { Ok(t) => t, - Err(e) => { - tracing::error!(err = %e, "feishu: cannot get token for reaction"); - return; - } + Err(e) => { tracing::error!(err = %e, "feishu: cannot get token for reaction"); return; } }; let url = format!( "{}/open-apis/im/v1/messages/{}/reactions", - adapter.config.api_base(), - message_id + adapter.config.api_base(), message_id ); - let _ = adapter - .client + let _ = adapter.client .post(&url) .bearer_auth(&token) .json(&serde_json::json!({"reaction_type": {"emoji_type": reaction_type}})) @@ -1634,26 +1568,15 @@ async fn remove_reaction(adapter: &FeishuAdapter, message_id: &str, emoji: &str) }; let token = match adapter.token_cache.get_token(&adapter.client).await { Ok(t) => t, - Err(e) => { - tracing::error!(err = %e, "feishu: cannot get token for reaction"); - return; - } + Err(e) => { tracing::error!(err = %e, "feishu: cannot get token for reaction"); return; } }; // Feishu remove reaction needs reaction_id. Simpler approach: delete by type. // GET reactions, find matching, DELETE by id. let list_url = format!( "{}/open-apis/im/v1/messages/{}/reactions?reaction_type={}", - adapter.config.api_base(), - message_id, - reaction_type + adapter.config.api_base(), message_id, reaction_type ); - let resp = match adapter - .client - .get(&list_url) - .bearer_auth(&token) - .send() - .await - { + let resp = match adapter.client.get(&list_url).bearer_auth(&token).send().await { Ok(r) => r, Err(_) => return, }; @@ -1665,24 +1588,15 @@ async fn remove_reaction(adapter: &FeishuAdapter, message_id: &str, emoji: &str) if let Some(items) = body.pointer("/data/items").and_then(|v| v.as_array()) { let bot_id = adapter.bot_open_id.read().await; for item in items { - let is_ours = item - .pointer("/operator/operator_id/open_id") - .and_then(|v| v.as_str()) - == bot_id.as_deref(); + let is_ours = item.pointer("/operator/operator_id/open_id") + .and_then(|v| v.as_str()) == bot_id.as_deref(); if is_ours { if let Some(reaction_id) = item.get("reaction_id").and_then(|v| v.as_str()) { let del_url = format!( "{}/open-apis/im/v1/messages/{}/reactions/{}", - adapter.config.api_base(), - message_id, - reaction_id + adapter.config.api_base(), message_id, reaction_id ); - let _ = adapter - .client - .delete(&del_url) - .bearer_auth(&token) - .send() - .await; + let _ = adapter.client.delete(&del_url).bearer_auth(&token).send().await; return; } } @@ -1753,16 +1667,7 @@ pub async fn handle_reply( // Use post (rich text) format for markdown rendering. // When in a thread (thread_id present), use reply API to stay in the same thread. if text.len() <= limit { - match send_post_message( - &adapter.client, - &api_base, - &token, - &reply.channel.id, - thread_id, - text, - ) - .await - { + match send_post_message(&adapter.client, &api_base, &token, &reply.channel.id, thread_id, text).await { Some(msg_id) => { adapter.dedupe.is_duplicate(&msg_id); // Send response with message_id back to OAB core (for streaming edit) @@ -1799,16 +1704,7 @@ pub async fn handle_reply( } } else { for chunk in split_text(text, limit) { - if let Some(msg_id) = send_post_message( - &adapter.client, - &api_base, - &token, - &reply.channel.id, - thread_id, - chunk, - ) - .await - { + if let Some(msg_id) = send_post_message(&adapter.client, &api_base, &token, &reply.channel.id, thread_id, chunk).await { adapter.dedupe.is_duplicate(&msg_id); } } @@ -1912,9 +1808,11 @@ fn verify_signature( fn decrypt_event(encrypt_key: &str, encrypted: &str) -> anyhow::Result { use sha2::{Digest, Sha256}; let key = Sha256::digest(encrypt_key.as_bytes()); - let cipher_bytes = - base64::Engine::decode(&base64::engine::general_purpose::STANDARD, encrypted) - .map_err(|e| anyhow::anyhow!("base64 decode failed: {e}"))?; + let cipher_bytes = base64::Engine::decode( + &base64::engine::general_purpose::STANDARD, + encrypted, + ) + .map_err(|e| anyhow::anyhow!("base64 decode failed: {e}"))?; if cipher_bytes.len() < 16 { anyhow::bail!("encrypted data too short"); @@ -1963,10 +1861,7 @@ pub async fn webhook( .and_then(|v| v.to_str().ok()) .unwrap_or("unknown"); if feishu.rate_limiter.check(ip) { - return ( - axum::http::StatusCode::TOO_MANY_REQUESTS, - "rate limit exceeded", - ) + return (axum::http::StatusCode::TOO_MANY_REQUESTS, "rate limit exceeded") .into_response(); } @@ -2086,18 +1981,12 @@ pub async fn webhook( let bot_id = feishu.bot_open_id.read().await; let bot_id_ref = bot_id.as_deref(); - if let Some((mut gateway_event, media_refs)) = - parse_message_event(&envelope, bot_id_ref, &feishu.config) - { + if let Some((mut gateway_event, media_refs)) = parse_message_event(&envelope, bot_id_ref, &feishu.config) { if !feishu.dedupe.is_duplicate(&gateway_event.message_id) { let name = resolve_user_name( - &gateway_event.sender.id, - &feishu.name_cache, - &feishu.token_cache, - &feishu.client, - &feishu.config.api_base(), - ) - .await; + &gateway_event.sender.id, &feishu.name_cache, &feishu.token_cache, + &feishu.client, &feishu.config.api_base(), + ).await; gateway_event.sender.name = name.clone(); gateway_event.sender.display_name = name; @@ -2107,33 +1996,11 @@ pub async fn webhook( let api_base = feishu.config.api_base(); for media_ref in &media_refs { let attachment = match media_ref { - MediaRef::Image { - message_id, - image_key, - } => { - download_feishu_image( - &feishu.client, - &api_base, - &token, - message_id, - image_key, - ) - .await + MediaRef::Image { message_id, image_key } => { + download_feishu_image(&feishu.client, &api_base, &token, message_id, image_key).await } - MediaRef::File { - message_id, - file_key, - file_name, - } => { - download_feishu_file( - &feishu.client, - &api_base, - &token, - message_id, - file_key, - file_name, - ) - .await + MediaRef::File { message_id, file_key, file_name } => { + download_feishu_file(&feishu.client, &api_base, &token, message_id, file_key, file_name).await } }; if let Some(att) = attachment { @@ -2144,9 +2011,7 @@ pub async fn webhook( } // Skip if no text and no attachments (e.g. unsupported file type) - if gateway_event.content.text.trim().is_empty() - && gateway_event.content.attachments.is_empty() - { + if gateway_event.content.text.trim().is_empty() && gateway_event.content.attachments.is_empty() { return axum::http::StatusCode::OK.into_response(); } @@ -2438,7 +2303,7 @@ mod tests { fn parse_group_without_mention_filtered() { let env = make_envelope("group", "just chatting", "ou_user1", None); let cfg = test_config(); // require_mention = true - // Gateway-side mention gating: group message without bot mention is filtered + // Gateway-side mention gating: group message without bot mention is filtered assert!(parse_message_event(&env, Some("ou_bot"), &cfg).is_none()); } @@ -2454,13 +2319,7 @@ mod tests { #[test] fn parse_skips_bot_sender() { let mut env = make_envelope("p2p", "hello", "ou_bot", None); - env.event - .as_mut() - .unwrap() - .sender - .as_mut() - .unwrap() - .sender_type = Some("bot".into()); + env.event.as_mut().unwrap().sender.as_mut().unwrap().sender_type = Some("bot".into()); let cfg = test_config(); assert!(parse_message_event(&env, Some("ou_bot"), &cfg).is_none()); } @@ -2475,13 +2334,7 @@ mod tests { #[test] fn parse_skips_non_text_message() { let mut env = make_envelope("p2p", "hello", "ou_user1", None); - env.event - .as_mut() - .unwrap() - .message - .as_mut() - .unwrap() - .message_type = Some("sticker".into()); + env.event.as_mut().unwrap().message.as_mut().unwrap().message_type = Some("sticker".into()); let cfg = test_config(); assert!(parse_message_event(&env, Some("ou_bot"), &cfg).is_none()); } @@ -2575,25 +2428,11 @@ mod tests { let name_cache = Arc::new(std::sync::Mutex::new(HashMap::new())); let client = reqwest::Client::new(); - let name = resolve_user_name( - "ou_user1", - &name_cache, - &token_cache, - &client, - &server.uri(), - ) - .await; + let name = resolve_user_name("ou_user1", &name_cache, &token_cache, &client, &server.uri()).await; assert_eq!(name, "Alice"); // Second call should use cache (expect(1) above ensures no second API call) - let name2 = resolve_user_name( - "ou_user1", - &name_cache, - &token_cache, - &client, - &server.uri(), - ) - .await; + let name2 = resolve_user_name("ou_user1", &name_cache, &token_cache, &client, &server.uri()).await; assert_eq!(name2, "Alice"); } @@ -2620,14 +2459,7 @@ mod tests { let name_cache = Arc::new(std::sync::Mutex::new(HashMap::new())); let client = reqwest::Client::new(); - let name = resolve_user_name( - "ou_unknown", - &name_cache, - &token_cache, - &client, - &server.uri(), - ) - .await; + let name = resolve_user_name("ou_unknown", &name_cache, &token_cache, &client, &server.uri()).await; assert_eq!(name, "ou_unknown"); } @@ -2638,17 +2470,10 @@ mod tests { // If mention key appears in normal text too, only the first occurrence is removed let mentions = vec![FeishuMention { key: Some("@_user_1".into()), - id: Some(FeishuMentionId { - open_id: Some("ou_bot".into()), - }), + id: Some(FeishuMentionId { open_id: Some("ou_bot".into()) }), name: Some("Bot".into()), }]; - let env = make_envelope( - "group", - "@_user_1 tell me about @_user_1 patterns", - "ou_user1", - Some(mentions), - ); + let env = make_envelope("group", "@_user_1 tell me about @_user_1 patterns", "ou_user1", Some(mentions)); let cfg = test_config(); let (evt, _media) = parse_message_event(&env, Some("ou_bot"), &cfg).unwrap(); // Only first @_user_1 removed, second preserved @@ -2679,9 +2504,7 @@ mod tests { fn parse_allowed_groups_blocks_unlisted() { let mentions = vec![FeishuMention { key: Some("@_user_1".into()), - id: Some(FeishuMentionId { - open_id: Some("ou_bot".into()), - }), + id: Some(FeishuMentionId { open_id: Some("ou_bot".into()) }), name: Some("Bot".into()), }]; let env = make_envelope("group", "@_user_1 hello", "ou_user1", Some(mentions)); @@ -2694,9 +2517,7 @@ mod tests { fn parse_allowed_groups_permits_listed() { let mentions = vec![FeishuMention { key: Some("@_user_1".into()), - id: Some(FeishuMentionId { - open_id: Some("ou_bot".into()), - }), + id: Some(FeishuMentionId { open_id: Some("ou_bot".into()) }), name: Some("Bot".into()), }]; let env = make_envelope("group", "@_user_1 hello", "ou_user1", Some(mentions)); @@ -2757,13 +2578,7 @@ mod tests { #[test] fn parse_thread_id_from_root_id() { let mut env = make_envelope("p2p", "reply", "ou_user1", None); - env.event - .as_mut() - .unwrap() - .message - .as_mut() - .unwrap() - .root_id = Some("om_root".into()); + env.event.as_mut().unwrap().message.as_mut().unwrap().root_id = Some("om_root".into()); let cfg = test_config(); let (evt, _media) = parse_message_event(&env, Some("ou_bot"), &cfg).unwrap(); assert_eq!(evt.channel.thread_id, Some("om_root".into())); @@ -2772,13 +2587,7 @@ mod tests { #[test] fn parse_thread_id_from_parent_id() { let mut env = make_envelope("p2p", "reply", "ou_user1", None); - env.event - .as_mut() - .unwrap() - .message - .as_mut() - .unwrap() - .parent_id = Some("om_parent".into()); + env.event.as_mut().unwrap().message.as_mut().unwrap().parent_id = Some("om_parent".into()); let cfg = test_config(); let (evt, _media) = parse_message_event(&env, Some("ou_bot"), &cfg).unwrap(); assert_eq!(evt.channel.thread_id, Some("om_parent".into())); diff --git a/gateway/src/adapters/googlechat.rs b/gateway/src/adapters/googlechat.rs index ed3b07d1..13a97993 100644 --- a/gateway/src/adapters/googlechat.rs +++ b/gateway/src/adapters/googlechat.rs @@ -231,17 +231,13 @@ impl GoogleChatAdapter { }; let formatted = markdown_to_gchat(text); - let url = format!("{}/{}?updateMask=text", self.api_base, message_name); + let url = format!( + "{}/{}?updateMask=text", + self.api_base, message_name + ); let body = serde_json::json!({ "text": formatted }); - match self - .client - .patch(&url) - .bearer_auth(&token) - .json(&body) - .send() - .await - { + match self.client.patch(&url).bearer_auth(&token).json(&body).send().await { Ok(r) if r.status().is_success() => { tracing::trace!(message_name = %message_name, "googlechat message edited"); } @@ -265,8 +261,7 @@ impl GoogleChatAdapter { match reply.command.as_deref() { Some("add_reaction") | Some("remove_reaction") | Some("create_topic") => return, Some("edit_message") => { - self.edit_message(&reply.reply_to, &reply.content.text) - .await; + self.edit_message(&reply.reply_to, &reply.content.text).await; return; } _ => {} @@ -402,7 +397,10 @@ pub async fn webhook( if let Some(ref adapter) = state.google_chat { if let Some(ref verifier) = adapter.jwt_verifier { - let auth_header = match headers.get("authorization").and_then(|v| v.to_str().ok()) { + let auth_header = match headers + .get("authorization") + .and_then(|v| v.to_str().ok()) + { Some(h) => h, None => { warn!("googlechat webhook: missing authorization header"); @@ -468,7 +466,12 @@ pub async fn webhook( let thread_id = msg.thread.as_ref().map(|t| t.name.clone()); - let message_id = msg.name.rsplit('/').next().unwrap_or(&msg.name).to_string(); + let message_id = msg + .name + .rsplit('/') + .next() + .unwrap_or(&msg.name) + .to_string(); let gw_event = GatewayEvent::new( "googlechat", @@ -556,9 +559,7 @@ impl GoogleChatTokenCache { } async fn refresh(&self, client: &reqwest::Client) -> Result<(String, u64), String> { - let jwt = self - .build_jwt() - .map_err(|e| format!("JWT build error: {e}"))?; + let jwt = self.build_jwt().map_err(|e| format!("JWT build error: {e}"))?; let resp = client .post("https://oauth2.googleapis.com/token") .form(&[ @@ -611,7 +612,8 @@ impl GoogleChatTokenCache { let key = jsonwebtoken::EncodingKey::from_rsa_pem(self.private_key.as_bytes()) .map_err(|e| format!("RSA key parse error: {e}"))?; let header = jsonwebtoken::Header::new(jsonwebtoken::Algorithm::RS256); - jsonwebtoken::encode(&header, &claims, &key).map_err(|e| format!("JWT encode error: {e}")) + jsonwebtoken::encode(&header, &claims, &key) + .map_err(|e| format!("JWT encode error: {e}")) } } @@ -979,10 +981,7 @@ mod tests { let msg = payload.message.as_ref().unwrap(); assert_eq!(msg.argument_text.as_deref(), Some("hi")); assert_eq!(msg.thread.as_ref().unwrap().name, "spaces/SP/threads/t1"); - assert_eq!( - payload.space.as_ref().unwrap().space_type.as_deref(), - Some("ROOM") - ); + assert_eq!(payload.space.as_ref().unwrap().space_type.as_deref(), Some("ROOM")); } #[test] @@ -1345,16 +1344,15 @@ mod tests { #[tokio::test] async fn handle_reply_sends_gateway_response_success() { - use wiremock::matchers::{method, path_regex}; use wiremock::{Mock, MockServer, ResponseTemplate}; + use wiremock::matchers::{method, path_regex}; let mock_server = MockServer::start().await; Mock::given(method("POST")) .and(path_regex("/spaces/.*/messages")) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(serde_json::json!({"name": "spaces/TEST/messages/msg_abc"})), - ) + .respond_with(ResponseTemplate::new(200).set_body_json( + serde_json::json!({"name": "spaces/TEST/messages/msg_abc"}), + )) .mount(&mock_server) .await; @@ -1391,8 +1389,8 @@ mod tests { #[tokio::test] async fn handle_reply_sends_failure_response_on_api_error() { - use wiremock::matchers::{method, path_regex}; use wiremock::{Mock, MockServer, ResponseTemplate}; + use wiremock::matchers::{method, path_regex}; let mock_server = MockServer::start().await; Mock::given(method("POST")) @@ -1431,17 +1429,13 @@ mod tests { assert!(!resp.success); assert!(resp.message_id.is_none()); let err = resp.error.expect("error should be set on send failure"); - assert!( - err.contains("500"), - "error should include status code, got: {}", - err - ); + assert!(err.contains("500"), "error should include status code, got: {}", err); } #[tokio::test] async fn handle_reply_empty_message_short_circuits() { - use wiremock::matchers::{method, path_regex}; use wiremock::{Mock, MockServer, ResponseTemplate}; + use wiremock::matchers::{method, path_regex}; let mock_server = MockServer::start().await; // Mount a mock that would fail the test if called @@ -1476,10 +1470,7 @@ mod tests { adapter.handle_reply(&reply, &event_tx).await; let received = event_rx.try_recv(); - assert!( - received.is_ok(), - "expected failure GatewayResponse for empty message" - ); + assert!(received.is_ok(), "expected failure GatewayResponse for empty message"); let resp: GatewayResponse = serde_json::from_str(&received.unwrap()).unwrap(); assert_eq!(resp.request_id, "req_empty"); assert!(!resp.success); @@ -1488,8 +1479,8 @@ mod tests { #[tokio::test] async fn handle_reply_multi_chunk_failure_includes_error() { - use wiremock::matchers::{method, path_regex}; use wiremock::{Mock, MockServer, ResponseTemplate}; + use wiremock::matchers::{method, path_regex}; let mock_server = MockServer::start().await; Mock::given(method("POST")) @@ -1566,16 +1557,15 @@ mod tests { #[tokio::test] async fn handle_reply_edit_message_does_not_send_response() { - use wiremock::matchers::{method, path_regex}; use wiremock::{Mock, MockServer, ResponseTemplate}; + use wiremock::matchers::{method, path_regex}; let mock_server = MockServer::start().await; Mock::given(method("PATCH")) .and(path_regex("/spaces/.*/messages/.*")) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(serde_json::json!({"name": "spaces/SP/messages/msg1"})), - ) + .respond_with(ResponseTemplate::new(200).set_body_json( + serde_json::json!({"name": "spaces/SP/messages/msg1"}), + )) .mount(&mock_server) .await; @@ -1608,16 +1598,15 @@ mod tests { #[tokio::test] async fn handle_reply_multi_chunk_sends_gateway_response() { - use wiremock::matchers::{method, path_regex}; use wiremock::{Mock, MockServer, ResponseTemplate}; + use wiremock::matchers::{method, path_regex}; let mock_server = MockServer::start().await; Mock::given(method("POST")) .and(path_regex("/spaces/.*/messages")) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(serde_json::json!({"name": "spaces/TEST/messages/first_chunk"})), - ) + .respond_with(ResponseTemplate::new(200).set_body_json( + serde_json::json!({"name": "spaces/TEST/messages/first_chunk"}), + )) .mount(&mock_server) .await; @@ -1650,10 +1639,7 @@ mod tests { let resp: GatewayResponse = serde_json::from_str(&received.unwrap()).unwrap(); assert_eq!(resp.request_id, "req_multi"); assert!(resp.success); - assert_eq!( - resp.message_id, - Some("spaces/TEST/messages/first_chunk".into()) - ); + assert_eq!(resp.message_id, Some("spaces/TEST/messages/first_chunk".into())); } #[tokio::test] @@ -1661,17 +1647,16 @@ mod tests { // Mixed success/failure: chunk 1 succeeds, subsequent chunks fail. // Expect success=false (any chunk failure marks overall as failed), // but message_id is still set so core has a reference. - use wiremock::matchers::{method, path_regex}; use wiremock::{Mock, MockServer, ResponseTemplate}; + use wiremock::matchers::{method, path_regex}; let mock_server = MockServer::start().await; // First request: 200 OK with message name Mock::given(method("POST")) .and(path_regex("/spaces/.*/messages")) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(serde_json::json!({"name": "spaces/TEST/messages/first_chunk"})), - ) + .respond_with(ResponseTemplate::new(200).set_body_json( + serde_json::json!({"name": "spaces/TEST/messages/first_chunk"}), + )) .up_to_n_times(1) .mount(&mock_server) .await; @@ -1711,10 +1696,7 @@ mod tests { let resp: GatewayResponse = serde_json::from_str(&received.unwrap()).unwrap(); assert_eq!(resp.request_id, "req_partial"); assert!(!resp.success, "partial failure must report success=false"); - assert_eq!( - resp.message_id, - Some("spaces/TEST/messages/first_chunk".into()) - ); + assert_eq!(resp.message_id, Some("spaces/TEST/messages/first_chunk".into())); let err = resp.error.expect("partial failure should set error"); assert!(err.contains("500")); } diff --git a/gateway/src/adapters/teams.rs b/gateway/src/adapters/teams.rs index fdbcae67..d7b5433e 100644 --- a/gateway/src/adapters/teams.rs +++ b/gateway/src/adapters/teams.rs @@ -275,9 +275,7 @@ impl TeamsAdapter { } // B2: Validate channel endorsements — key must endorse the activity's channelId - let channel_id = activity - .channel_id - .as_deref() + let channel_id = activity.channel_id.as_deref() .ok_or_else(|| anyhow::anyhow!("activity missing channelId"))?; if key.endorsements.is_empty() { anyhow::bail!("JWK has no endorsements — cannot verify channelId={channel_id}"); @@ -303,13 +301,9 @@ impl TeamsAdapter { let token_data = decode::(token, &decoding_key, &validation)?; // B1: Validate serviceUrl claim matches activity's serviceUrl - let activity_service_url = activity - .service_url - .as_deref() + let activity_service_url = activity.service_url.as_deref() .ok_or_else(|| anyhow::anyhow!("activity missing serviceUrl"))?; - let token_service_url = token_data - .claims - .get("serviceurl") + let token_service_url = token_data.claims.get("serviceurl") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("JWT missing serviceurl claim"))?; if token_service_url != activity_service_url { @@ -749,9 +743,7 @@ mod tests { async fn jwt_rejects_garbage_token() { let adapter = TeamsAdapter::new(make_config(vec![])); let activity = make_activity_with_tenant(Some("t1")); - let result = adapter - .validate_jwt("Bearer not.a.valid.jwt", &activity) - .await; + let result = adapter.validate_jwt("Bearer not.a.valid.jwt", &activity).await; assert!(result.is_err()); } diff --git a/gateway/src/main.rs b/gateway/src/main.rs index bbbfa0c8..1e253977 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -109,7 +109,7 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: let reaction_state: Arc>>> = Arc::new(Mutex::new(HashMap::new())); let recv_task = tokio::spawn(async move { - let client = reqwest::Client::new(); + let client = &state_for_recv.client; while let Some(Ok(msg)) = ws_rx.next().await { if let Message::Text(text) = msg { match serde_json::from_str::(&*text) {