diff --git a/docs/feishu.md b/docs/feishu.md index a4a494f9..b1c70eb2 100644 --- a/docs/feishu.md +++ b/docs/feishu.md @@ -80,6 +80,7 @@ https://your-gateway-host/webhook/feishu | — | `FEISHU_ALLOW_BOTS` | `off` | Bot message handling: `off` / `mentions` / `all` | | — | `FEISHU_TRUSTED_BOT_IDS` | — | Comma-separated open_id list of known bots | | — | `FEISHU_MAX_BOT_TURNS` | `20` | Max consecutive bot replies per channel before suppression | +| — | `FEISHU_SESSION_TTL_HOURS` | `24` | How long the bot remembers thread participation (hours). After expiry, @mention is required again. | | `gateway.botUsername` | — | — | Set to bot's `open_id` for @mention gating | | `gateway.streaming` | — | `false` | Enable streaming (typewriter) mode | @@ -95,6 +96,14 @@ In group chats, the bot only responds when @mentioned (default). To find your bo To disable mention gating: `feishu.requireMention: false`. +### Thread Participation (Involved Mode) + +Once the bot replies in a thread (topic), it remembers that thread and responds to subsequent messages **without requiring @mention** — similar to Discord's `allow_user_messages: "involved"` mode. + +- Only applies to threads (messages with `root_id`). Main channel messages always require @mention. +- Participation is stored in memory. Gateway restart clears the cache; users need to @mention once to re-engage. +- TTL controlled by `FEISHU_SESSION_TTL_HOURS` (default 24h). After expiry, @mention is required again. + ## Security Notes - `appSecret`, `verificationToken`, and `encryptKey` are stored in a Kubernetes Secret, not in ConfigMap. diff --git a/gateway/src/adapters/feishu.rs b/gateway/src/adapters/feishu.rs index 922fae34..286fa32b 100644 --- a/gateway/src/adapters/feishu.rs +++ b/gateway/src/adapters/feishu.rs @@ -85,6 +85,11 @@ pub struct FeishuConfig { pub max_bot_turns: u32, pub dedupe_ttl_secs: u64, pub message_limit: usize, + /// TTL for participated-thread cache entries (seconds). Threads older than + /// this are forgotten and require a fresh @mention to re-engage. + /// Set to 0 (via FEISHU_SESSION_TTL_HOURS=0) to disable participation + /// tracking entirely — all messages will require @mention. + pub session_ttl_secs: u64, } impl FeishuConfig { @@ -137,6 +142,11 @@ impl FeishuConfig { .ok() .and_then(|v| v.parse().ok()) .unwrap_or(4000); + let session_ttl_secs = std::env::var("FEISHU_SESSION_TTL_HOURS") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(24) + * 3600; Some(Self { app_id, @@ -154,6 +164,7 @@ impl FeishuConfig { max_bot_turns, dedupe_ttl_secs, message_limit, + session_ttl_secs, }) } @@ -246,6 +257,7 @@ mod event_types { envelope: &FeishuEventEnvelope, bot_open_id: Option<&str>, config: &FeishuConfig, + is_thread_participated: bool, ) -> Option<(GatewayEvent, Vec)> { let _header = envelope.header.as_ref()?; let event = envelope.event.as_ref()?; @@ -412,11 +424,16 @@ mod event_types { // Gateway-side mention gating: in groups, skip if require_mention // is true and bot is not mentioned (for human senders). + // Bypass: if bot has previously replied in this thread (participated), + // no @mention needed (like Discord's "involved" mode). + let in_thread = thread_id.is_some(); if channel_type == "group" && !is_bot_sender && config.require_mention { - if let Some(bot_id) = bot_open_id { - let bot_mentioned = mention_ids.iter().any(|id| id == bot_id); - if !bot_mentioned { - return None; + if !(in_thread && is_thread_participated) { + if let Some(bot_id) = bot_open_id { + let bot_mentioned = mention_ids.iter().any(|id| id == bot_id); + if !bot_mentioned { + return None; + } } } } @@ -627,7 +644,11 @@ pub struct FeishuAdapter { pub name_cache: Arc>>, /// Per-channel bot turn counter. Key = chat_id, Value = (count, last_reset). /// Human message resets count to 0. Prevents runaway bot-to-bot loops. - pub bot_turns: Arc>>, // TODO: add TTL eviction for long-running deploys + pub bot_turns: Arc>>, // eviction: human msg resets; follow-up can add TTL like participated_threads + /// Positive-only cache: thread_id (root_id) → last_replied_at. + /// When bot has replied in a thread, subsequent messages in that thread + /// bypass @mention gating (like Discord's "involved" mode). + pub participated_threads: Arc>>, pub client: reqwest::Client, } @@ -644,6 +665,7 @@ impl FeishuAdapter { bot_open_id: Arc::new(RwLock::new(None)), name_cache: Arc::new(std::sync::Mutex::new(HashMap::new())), bot_turns: Arc::new(std::sync::Mutex::new(HashMap::new())), + participated_threads: Arc::new(std::sync::Mutex::new(HashMap::new())), client: reqwest::Client::new(), } } @@ -737,6 +759,7 @@ pub async fn start_websocket( let client = adapter.client.clone(); let name_cache = adapter.name_cache.clone(); let bot_turns = adapter.bot_turns.clone(); + let participated_threads = adapter.participated_threads.clone(); let handle = tokio::spawn(async move { let mut backoff_secs = 1u64; @@ -751,6 +774,7 @@ pub async fn start_websocket( &mut shutdown_rx, &name_cache, &bot_turns, + &participated_threads, ) .await; @@ -791,6 +815,7 @@ async fn ws_connect_loop( shutdown_rx: &mut watch::Receiver, name_cache: &Arc>>, bot_turns: &Arc>>, + participated_threads: &Arc>>, ) -> anyhow::Result<()> { let api_base = config.api_base(); @@ -818,7 +843,7 @@ async fn ws_connect_loop( Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) => { handle_ws_message( &text, bot_open_id_store, dedupe, config, event_tx, - name_cache, token_cache, client, bot_turns, + name_cache, token_cache, client, bot_turns, participated_threads, ).await; } Some(Ok(tokio_tungstenite::tungstenite::Message::Ping(data))) => { @@ -839,7 +864,7 @@ async fn ws_connect_loop( if let Ok(text) = String::from_utf8(payload.clone()) { handle_ws_message( &text, bot_open_id_store, dedupe, config, event_tx, - name_cache, token_cache, client, bot_turns, + name_cache, token_cache, client, bot_turns, participated_threads, ).await; } } @@ -879,6 +904,7 @@ async fn handle_ws_message( token_cache: &Arc, client: &reqwest::Client, bot_turns: &Arc>>, + participated_threads: &Arc>>, ) { let envelope: FeishuEventEnvelope = match serde_json::from_str(text) { Ok(e) => e, @@ -914,7 +940,12 @@ async fn handle_ws_message( let bot_id = bot_open_id_store.read().await; let bot_id_ref = bot_id.as_deref(); - if let Some((mut gateway_event, media_refs)) = parse_message_event(&envelope, bot_id_ref, config) { + // Check if the message is in a thread where bot has previously replied + let is_thread_participated = check_thread_participated( + &envelope, participated_threads, config.session_ttl_secs, + ); + + if let Some((mut gateway_event, media_refs)) = parse_message_event(&envelope, bot_id_ref, config, is_thread_participated) { // Also dedupe by message_id if dedupe.is_duplicate(&gateway_event.message_id) { return; @@ -1639,6 +1670,59 @@ async fn remove_reaction(adapter: &FeishuAdapter, message_id: &str, emoji: &str) // Reply handler // --------------------------------------------------------------------------- +/// Check if the bot has participated in the thread referenced by this envelope. +/// Returns `true` if the message is in a thread and that thread has a valid +/// (non-expired) participation entry in the cache. +fn check_thread_participated( + envelope: &FeishuEventEnvelope, + cache: &Arc>>, + session_ttl_secs: u64, +) -> bool { + envelope + .event + .as_ref() + .and_then(|e| e.message.as_ref()) + .and_then(|m| m.root_id.as_deref().or(m.parent_id.as_deref())) + .map(|tid| { + // Intentionally recover from poisoned mutex — cache data loss is acceptable + // and preferable to panicking the gateway. + let c = cache.lock().unwrap_or_else(|e| e.into_inner()); + c.get(tid).is_some_and(|ts| ts.elapsed().as_secs() < session_ttl_secs) + }) + .unwrap_or(false) +} + +/// Max entries in the participated_threads cache before eviction. +const PARTICIPATION_CACHE_MAX: usize = 1000; + +/// Record that the bot has participated in a thread. Evicts oldest entries +/// when the cache exceeds PARTICIPATION_CACHE_MAX. +fn record_participation( + cache: &Arc>>, + thread_id: &str, + session_ttl_secs: u64, +) { + if session_ttl_secs == 0 { + return; // Participation tracking disabled + } + // Intentionally recover from poisoned mutex — cache data loss is acceptable + // and preferable to panicking the gateway. + let mut map = cache.lock().unwrap_or_else(|e| e.into_inner()); + map.insert(thread_id.to_string(), Instant::now()); + // Evict if over capacity: first drop expired entries, then oldest half if still over + if map.len() > PARTICIPATION_CACHE_MAX { + map.retain(|_, ts| ts.elapsed().as_secs() < session_ttl_secs); + if map.len() > PARTICIPATION_CACHE_MAX { + let mut entries: Vec<_> = map.iter().map(|(k, v)| (k.clone(), *v)).collect(); + entries.sort_by_key(|(_, ts)| *ts); + let evict_count = entries.len() / 2; + for (k, _) in entries.into_iter().take(evict_count) { + map.remove(&k); + } + } + } +} + pub async fn handle_reply( reply: &GatewayReply, adapter: &FeishuAdapter, @@ -1701,6 +1785,10 @@ pub async fn handle_reply( match send_post_message(&adapter.client, &api_base, &token, &reply.channel.id, thread_id, text).await { Some(msg_id) => { adapter.dedupe.is_duplicate(&msg_id); + // Record thread participation for mention bypass + if let Some(tid) = thread_id { + record_participation(&adapter.participated_threads, tid, adapter.config.session_ttl_secs); + } // Send response with message_id back to OAB core (for streaming edit) if let Some(ref req_id) = reply.request_id { let resp = crate::schema::GatewayResponse { @@ -1734,9 +1822,16 @@ pub async fn handle_reply( } } } else { + let mut sent_any = false; for chunk in split_text(text, limit) { if let Some(msg_id) = send_post_message(&adapter.client, &api_base, &token, &reply.channel.id, thread_id, chunk).await { adapter.dedupe.is_duplicate(&msg_id); + sent_any = true; + } + } + if sent_any { + if let Some(tid) = thread_id { + record_participation(&adapter.participated_threads, tid, adapter.config.session_ttl_secs); } } } @@ -2012,7 +2107,12 @@ pub async fn webhook( let bot_id = feishu.bot_open_id.read().await; let bot_id_ref = bot_id.as_deref(); - if let Some((mut gateway_event, media_refs)) = parse_message_event(&envelope, bot_id_ref, &feishu.config) { + // Check participated threads for mention bypass + let is_thread_participated = check_thread_participated( + &envelope, &feishu.participated_threads, feishu.config.session_ttl_secs, + ); + + if let Some((mut gateway_event, media_refs)) = parse_message_event(&envelope, bot_id_ref, &feishu.config, is_thread_participated) { if !feishu.dedupe.is_duplicate(&gateway_event.message_id) { let name = resolve_user_name( &gateway_event.sender.id, &feishu.name_cache, &feishu.token_cache, @@ -2086,6 +2186,7 @@ mod tests { max_bot_turns: 20, dedupe_ttl_secs: 300, message_limit: 4000, + session_ttl_secs: 86400, } } @@ -2304,7 +2405,7 @@ mod tests { fn parse_dm_text() { let env = make_envelope("p2p", "hello", "ou_user1", None); let cfg = test_config(); - let (evt, _media) = parse_message_event(&env, Some("ou_bot"), &cfg).unwrap(); + let (evt, _media) = parse_message_event(&env, Some("ou_bot"), &cfg, false).unwrap(); assert_eq!(evt.platform, "feishu"); assert_eq!(evt.channel.channel_type, "direct"); assert_eq!(evt.channel.id, "oc_chat1"); @@ -2324,7 +2425,7 @@ mod tests { }]; let env = make_envelope("group", "@_user_1 explain VPC", "ou_user1", Some(mentions)); let cfg = test_config(); - let (evt, _media) = parse_message_event(&env, Some("ou_bot"), &cfg).unwrap(); + let (evt, _media) = parse_message_event(&env, Some("ou_bot"), &cfg, false).unwrap(); assert_eq!(evt.channel.channel_type, "group"); assert_eq!(evt.content.text, "explain VPC"); assert_eq!(evt.mentions, vec!["ou_bot"]); @@ -2335,7 +2436,7 @@ mod tests { let env = make_envelope("group", "just chatting", "ou_user1", None); let cfg = test_config(); // require_mention = true // Gateway-side mention gating: group message without bot mention is filtered - assert!(parse_message_event(&env, Some("ou_bot"), &cfg).is_none()); + assert!(parse_message_event(&env, Some("ou_bot"), &cfg, false).is_none()); } #[test] @@ -2343,7 +2444,7 @@ mod tests { let env = make_envelope("group", "just chatting", "ou_user1", None); let mut cfg = test_config(); cfg.require_mention = false; - let evt = parse_message_event(&env, Some("ou_bot"), &cfg); + let evt = parse_message_event(&env, Some("ou_bot"), &cfg, false); assert!(evt.is_some()); } @@ -2352,14 +2453,14 @@ mod tests { let mut env = make_envelope("p2p", "hello", "ou_bot", None); env.event.as_mut().unwrap().sender.as_mut().unwrap().sender_type = Some("bot".into()); let cfg = test_config(); - assert!(parse_message_event(&env, Some("ou_bot"), &cfg).is_none()); + assert!(parse_message_event(&env, Some("ou_bot"), &cfg, false).is_none()); } #[test] fn parse_skips_empty_text() { let env = make_envelope("p2p", " ", "ou_user1", None); let cfg = test_config(); - assert!(parse_message_event(&env, Some("ou_bot"), &cfg).is_none()); + assert!(parse_message_event(&env, Some("ou_bot"), &cfg, false).is_none()); } #[test] @@ -2367,14 +2468,14 @@ mod tests { let mut env = make_envelope("p2p", "hello", "ou_user1", None); env.event.as_mut().unwrap().message.as_mut().unwrap().message_type = Some("sticker".into()); let cfg = test_config(); - assert!(parse_message_event(&env, Some("ou_bot"), &cfg).is_none()); + assert!(parse_message_event(&env, Some("ou_bot"), &cfg, false).is_none()); } #[test] fn parse_skips_self_message() { let env = make_envelope("p2p", "hello", "ou_bot", None); let cfg = test_config(); - assert!(parse_message_event(&env, Some("ou_bot"), &cfg).is_none()); + assert!(parse_message_event(&env, Some("ou_bot"), &cfg, false).is_none()); } // --- Dedupe tests --- @@ -2506,7 +2607,7 @@ mod tests { }]; let env = make_envelope("group", "@_user_1 tell me about @_user_1 patterns", "ou_user1", Some(mentions)); let cfg = test_config(); - let (evt, _media) = parse_message_event(&env, Some("ou_bot"), &cfg).unwrap(); + let (evt, _media) = parse_message_event(&env, Some("ou_bot"), &cfg, false).unwrap(); // Only first @_user_1 removed, second preserved assert!(evt.content.text.contains("@_user_1")); } @@ -2518,7 +2619,7 @@ mod tests { let env = make_envelope("p2p", "hello", "ou_stranger", None); let mut cfg = test_config(); cfg.allowed_users = vec!["ou_vip".into()]; - assert!(parse_message_event(&env, Some("ou_bot"), &cfg).is_none()); + assert!(parse_message_event(&env, Some("ou_bot"), &cfg, false).is_none()); } #[test] @@ -2526,7 +2627,7 @@ mod tests { let env = make_envelope("p2p", "hello", "ou_vip", None); let mut cfg = test_config(); cfg.allowed_users = vec!["ou_vip".into()]; - assert!(parse_message_event(&env, Some("ou_bot"), &cfg).is_some()); + assert!(parse_message_event(&env, Some("ou_bot"), &cfg, false).is_some()); } // --- allowed_groups filtering --- @@ -2541,7 +2642,7 @@ mod tests { let env = make_envelope("group", "@_user_1 hello", "ou_user1", Some(mentions)); let mut cfg = test_config(); cfg.allowed_groups = vec!["oc_other".into()]; // oc_chat1 not in list - assert!(parse_message_event(&env, Some("ou_bot"), &cfg).is_none()); + assert!(parse_message_event(&env, Some("ou_bot"), &cfg, false).is_none()); } #[test] @@ -2554,7 +2655,7 @@ mod tests { let env = make_envelope("group", "@_user_1 hello", "ou_user1", Some(mentions)); let mut cfg = test_config(); cfg.allowed_groups = vec!["oc_chat1".into()]; - assert!(parse_message_event(&env, Some("ou_bot"), &cfg).is_some()); + assert!(parse_message_event(&env, Some("ou_bot"), &cfg, false).is_some()); } // --- Token TTL from API response --- @@ -2611,7 +2712,7 @@ mod tests { let mut env = make_envelope("p2p", "reply", "ou_user1", None); env.event.as_mut().unwrap().message.as_mut().unwrap().root_id = Some("om_root".into()); let cfg = test_config(); - let (evt, _media) = parse_message_event(&env, Some("ou_bot"), &cfg).unwrap(); + let (evt, _media) = parse_message_event(&env, Some("ou_bot"), &cfg, false).unwrap(); assert_eq!(evt.channel.thread_id, Some("om_root".into())); } @@ -2620,7 +2721,7 @@ mod tests { let mut env = make_envelope("p2p", "reply", "ou_user1", None); env.event.as_mut().unwrap().message.as_mut().unwrap().parent_id = Some("om_parent".into()); let cfg = test_config(); - let (evt, _media) = parse_message_event(&env, Some("ou_bot"), &cfg).unwrap(); + let (evt, _media) = parse_message_event(&env, Some("ou_bot"), &cfg, false).unwrap(); assert_eq!(evt.channel.thread_id, Some("om_parent".into())); } @@ -2637,4 +2738,43 @@ mod tests { fn emoji_mapping_unknown() { assert_eq!(emoji_to_feishu_reaction("🎉"), None); } + + // --- Participated thread tests --- + + #[test] + fn participated_thread_bypasses_mention_gating() { + let cfg = test_config(); // require_mention = true + // Build envelope with root_id (in a thread) + let mut env = make_envelope("group", "Hello", "ou_user1", None); + env.event.as_mut().unwrap().message.as_mut().unwrap().root_id = Some("root_123".into()); + // Without participation: no @mention → None + assert!(parse_message_event(&env, Some("ou_bot"), &cfg, false).is_none()); + // With participation: no @mention → Some (bypass) + let result = parse_message_event(&env, Some("ou_bot"), &cfg, true); + assert!(result.is_some()); + let (evt, _) = result.unwrap(); + assert_eq!(evt.channel.thread_id.as_deref(), Some("root_123")); + } + + #[test] + fn participated_no_effect_without_thread() { + let cfg = test_config(); // require_mention = true + // Message in main channel (no thread_id) — participated flag doesn't help + let env = make_envelope("group", "Hello", "ou_user1", None); + assert!(parse_message_event(&env, Some("ou_bot"), &cfg, true).is_none()); + } + + #[test] + fn record_participation_and_eviction() { + let cache = Arc::new(std::sync::Mutex::new(HashMap::new())); + // Record a thread + record_participation(&cache, "thread_1", 86400); + assert_eq!(cache.lock().unwrap().len(), 1); + // Fill beyond PARTICIPATION_CACHE_MAX + for i in 0..PARTICIPATION_CACHE_MAX + 10 { + record_participation(&cache, &format!("thread_{i}"), 86400); + } + // After eviction, should be roughly half + assert!(cache.lock().unwrap().len() <= PARTICIPATION_CACHE_MAX); + } }