def _dump_message_digest(
    messages: list[dict[str, Any]],
    *,
    max_messages: int = 10,
    label: str = "",
) -> None:
    """Log a structural digest of the leading messages for pipeline diagnostics.

    Each digested message is rendered as ``index:role[summary]`` where the
    summary is:

    - ``type:count`` pairs (sorted by type) when ``content`` is a list; blocks
      that are not dicts are counted under ``raw``,
    - ``str(<length>)`` when ``content`` is a string,
    - ``empty`` for any other content, including a missing ``content`` key.

    Nothing is emitted when ``messages`` is empty or DEBUG logging is disabled.
    The function is purely diagnostic and never modifies ``messages``.

    Args:
        messages: Conversation messages to summarize.
        max_messages: Maximum number of leading messages to render; the rest
            are reported as a ``...+Nmore`` suffix. Negative values are
            treated as ``0``.
        label: Optional tag rendered as ``[label]`` in front of the digest.
    """
    if not messages or not logger.isEnabledFor(logging.DEBUG):
        return

    # Clamp so a negative limit cannot inflate the "...+Nmore" remainder.
    shown = max(max_messages, 0)
    # Only add the label token when there is one; an empty placeholder would
    # leave a stray double space in the joined log line.
    parts: list[str] = [f"[{label}]"] if label else []

    for idx, msg in enumerate(messages[:shown]):
        if isinstance(msg, dict):
            role = msg.get("role", "?")
            content = msg.get("content")
        else:
            role, content = "?", None

        if isinstance(content, list):
            type_counts: dict[str, int] = {}
            for block in content:
                # Coerce the type to ``str``: malformed payloads may carry
                # ``None`` or an unhashable value here, and a debug helper must
                # never raise (unhashable dict key / unorderable sort keys).
                kind = str(block.get("type", "?")) if isinstance(block, dict) else "raw"
                type_counts[kind] = type_counts.get(kind, 0) + 1
            summary = ",".join(
                f"{kind}:{count}" for kind, count in sorted(type_counts.items())
            )
        elif isinstance(content, str):
            summary = f"str({len(content)})"
        else:
            summary = "empty"
        parts.append(f"{idx}:{role}[{summary}]")

    remaining = len(messages) - shown
    if remaining > 0:
        parts.append(f"...+{remaining}more")
    logger.debug("Transition digest %s", " ".join(parts))


def _validate_anthropic_pairing(
    messages: list[dict[str, Any]],
    *,
    context: str = "",
) -> list[str]:
    """Check tool_use/tool_result pairing without modifying the messages.

    Intended to run at the end of the transition pipeline. Unlike
    ``_enforce_pairing_sanity_pass``, this function is detection-only:

    - the message list is never modified,
    - for every assistant message containing ``tool_use`` blocks with a truthy
      ``id``, only the immediately following message is inspected,
    - every inconsistency is recorded with the message index and (up to five)
      offending IDs, and a single WARNING is logged if anything was found.

    Args:
        messages: Conversation messages to inspect. ``None`` or an empty list
            yields no issues.
        context: Optional tag rendered as ``[context] `` in the warning.

    Returns:
        Human-readable descriptions of the problems found; an empty list means
        every inspected assistant message is correctly paired.
    """
    issues: list[str] = []
    for i, msg in enumerate(messages or ()):
        if not isinstance(msg, dict) or msg.get("role") != "assistant":
            continue
        content = msg.get("content")
        if not isinstance(content, list):
            continue
        tool_use_ids = [
            block["id"]
            for block in content
            if isinstance(block, dict)
            and block.get("type") == "tool_use"
            and block.get("id")
        ]
        if not tool_use_ids:
            continue

        next_idx = i + 1
        if next_idx >= len(messages):
            issues.append(f"messages[{i}]: assistant with tool_uses at end of list")
            continue

        next_msg = messages[next_idx]
        if not isinstance(next_msg, dict) or next_msg.get("role") != "user":
            issues.append(
                f"messages[{i}]: next messages[{next_idx}] is not user "
                f"(role={next_msg.get('role') if isinstance(next_msg, dict) else '?'})"
            )
            continue

        user_content = next_msg.get("content")
        if not isinstance(user_content, list):
            # String or missing content cannot carry tool_result blocks.
            user_content = []

        result_ids = {
            block["tool_use_id"]
            for block in user_content
            if isinstance(block, dict)
            and block.get("type") == "tool_result"
            and isinstance(block.get("tool_use_id"), str)
        }

        # ``result_ids`` only holds strings, so a non-string ID can never be
        # matched. Short-circuiting on the type also keeps unhashable IDs
        # (list/dict) from raising ``TypeError`` in the set membership test.
        missing = [
            uid
            for uid in tool_use_ids
            if not isinstance(uid, str) or uid not in result_ids
        ]
        if missing:
            issues.append(
                f"messages[{i}]: {len(missing)}/{len(tool_use_ids)} tool_use(s) "
                f"without tool_result in messages[{next_idx}]: {missing[:5]}"
            )

    if issues:
        prefix = f"[{context}] " if context else ""
        logger.warning(
            "Anthropic pairing validation: %s%d issue(s) found: %s",
            prefix,
            len(issues),
            "; ".join(issues),
        )
    return issues
class TestDumpMessageDigest:
    """Tests for the ``_dump_message_digest`` diagnostic snapshot helper."""

    def test_outputs_nothing_on_empty_messages(self, caplog):
        """An empty message list must not produce a digest log line."""
        import logging

        from coding.proxy.convert.vendor_channels import _dump_message_digest

        with caplog.at_level(
            logging.DEBUG, logger="coding.proxy.convert.vendor_channels"
        ):
            _dump_message_digest([], label="test")
        assert "Transition digest" not in caplog.text

    def test_outputs_structure_for_first_n_messages(self, caplog):
        """Each message is rendered as ``index:role[type:count]`` after the label."""
        import logging

        from coding.proxy.convert.vendor_channels import _dump_message_digest

        messages = [
            {"role": "user", "content": [{"type": "text", "text": "hi"}]},
            {
                "role": "assistant",
                "content": [
                    {"type": "tool_use", "id": "toolu_1", "name": "bash", "input": {}},
                ],
            },
            {
                "role": "user",
                "content": [
                    {"type": "tool_result", "tool_use_id": "toolu_1", "content": "ok"},
                ],
            },
        ]
        with caplog.at_level(
            logging.DEBUG, logger="coding.proxy.convert.vendor_channels"
        ):
            _dump_message_digest(messages, label="test")
        # Match the full tokens rather than bare substrings so the assertions
        # pin the digest format instead of passing on incidental text.
        assert "Transition digest" in caplog.text
        assert "[test]" in caplog.text
        assert "0:user[text:1]" in caplog.text
        assert "1:assistant[tool_use:1]" in caplog.text
        assert "2:user[tool_result:1]" in caplog.text


class TestValidateAnthropicPairing:
    """Tests for the detection-only ``_validate_anthropic_pairing`` check."""

    def test_no_issues_for_correct_pairing(self):
        """A tool_use answered by a matching tool_result yields no issues."""
        from coding.proxy.convert.vendor_channels import _validate_anthropic_pairing

        messages = [
            {"role": "user", "content": "go"},
            {
                "role": "assistant",
                "content": [
                    {"type": "tool_use", "id": "toolu_1", "name": "bash", "input": {}},
                ],
            },
            {
                "role": "user",
                "content": [
                    {"type": "tool_result", "tool_use_id": "toolu_1", "content": "ok"},
                ],
            },
        ]
        issues = _validate_anthropic_pairing(messages)
        assert issues == []

    def test_detects_missing_tool_result(self):
        """A following user message without any tool_result is reported."""
        from coding.proxy.convert.vendor_channels import _validate_anthropic_pairing

        messages = [
            {"role": "user", "content": "go"},
            {
                "role": "assistant",
                "content": [
                    {"type": "tool_use", "id": "toolu_1", "name": "bash", "input": {}},
                ],
            },
            {"role": "user", "content": [{"type": "text", "text": "no result"}]},
        ]
        issues = _validate_anthropic_pairing(messages)
        assert len(issues) == 1
        assert "toolu_1" in issues[0]
        assert "messages[1]" in issues[0]

    def test_detects_non_user_after_assistant_with_tool_use(self):
        """A tool_use followed by a non-user message is reported."""
        from coding.proxy.convert.vendor_channels import _validate_anthropic_pairing

        messages = [
            {"role": "user", "content": "go"},
            {
                "role": "assistant",
                "content": [
                    {"type": "tool_use", "id": "toolu_1", "name": "bash", "input": {}},
                ],
            },
            {
                "role": "assistant",
                "content": [{"type": "text", "text": "another assistant"}],
            },
        ]
        issues = _validate_anthropic_pairing(messages)
        assert len(issues) == 1
        assert "not user" in issues[0]

    def test_detects_assistant_with_tool_use_at_end_of_list(self):
        """A trailing assistant message with a pending tool_use is reported."""
        from coding.proxy.convert.vendor_channels import _validate_anthropic_pairing

        messages = [
            {"role": "user", "content": "go"},
            {
                "role": "assistant",
                "content": [
                    {"type": "tool_use", "id": "toolu_1", "name": "bash", "input": {}},
                ],
            },
        ]
        issues = _validate_anthropic_pairing(messages)
        assert len(issues) == 1
        assert "end of list" in issues[0]

    def test_partial_missing_only_reports_missing_ids(self):
        """Only the unanswered IDs are listed, with a missing/total ratio."""
        from coding.proxy.convert.vendor_channels import _validate_anthropic_pairing

        messages = [
            {"role": "user", "content": "go"},
            {
                "role": "assistant",
                "content": [
                    {"type": "tool_use", "id": "toolu_1", "name": "bash", "input": {}},
                    {"type": "tool_use", "id": "toolu_2", "name": "read", "input": {}},
                ],
            },
            {
                "role": "user",
                "content": [
                    {"type": "tool_result", "tool_use_id": "toolu_1", "content": "ok"},
                ],
            },
        ]
        issues = _validate_anthropic_pairing(messages)
        assert len(issues) == 1
        assert "toolu_2" in issues[0]
        assert "1/2" in issues[0]

    def test_integration_with_zhipu_to_anthropic_channel(self):
        """A well-paired zhipu body must not get the validation-issues tag."""
        from coding.proxy.convert.vendor_channels import prepare_zhipu_to_anthropic

        body = {
            "model": "claude-opus-4-7",
            "messages": [
                {"role": "user", "content": "go"},
                {
                    "role": "assistant",
                    "content": [
                        {
                            "type": "server_tool_use",
                            "id": "srvtoolu_01",
                            "name": "bash",
                            "input": {},
                        },
                    ],
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": "srvtoolu_01",
                            "content": "ok",
                        },
                    ],
                },
            ],
        }
        _prepared, adaptations = prepare_zhipu_to_anthropic(body)
        # The self-check passes, so the adaptation list must not carry the tag.
        assert "anthropic_pairing_validation_issues" not in adaptations

    def test_integration_detects_enforce_missed_issue(self):
        """A tool_result with an unrelated ID must not satisfy the tool_use.

        The messages are built by hand to mimic a pairing defect that survives
        the enforce step: the following user message does contain a
        tool_result, but for a different ID than the pending tool_use.
        """
        from coding.proxy.convert.vendor_channels import (
            _validate_anthropic_pairing,
        )

        messages = [
            {"role": "user", "content": "go"},
            {
                "role": "assistant",
                "content": [
                    {"type": "tool_use", "id": "toolu_x", "name": "bash", "input": {}},
                ],
            },
            {
                "role": "user",
                "content": [
                    # No tool_result for toolu_x; only an unrelated one.
                    {
                        "type": "tool_result",
                        "tool_use_id": "toolu_other",
                        "content": "wrong",
                    },
                ],
            },
        ]
        issues = _validate_anthropic_pairing(messages)
        assert len(issues) == 1
        assert "toolu_x" in issues[0]