Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 125 additions & 1 deletion src/coding/proxy/convert/vendor_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,117 @@ def get_transition_channel(
# ── 共享辅助函数 ──────────────────────────────────────────────


def _dump_message_digest(
messages: list[dict[str, Any]],
*,
max_messages: int = 10,
label: str = "",
) -> None:
"""输出前 N 条消息的结构摘要(role + content_type_counts),用于过渡管线诊断.

仅在 DEBUG 级别输出,且仅在消息数 > 0 时才输出,避免噪声。
"""
if not messages or not logger.isEnabledFor(logging.DEBUG):
return
parts: list[str] = [f"[{label}]" if label else ""]
limit = min(len(messages), max_messages)
for idx in range(limit):
msg = messages[idx]
role = msg.get("role", "?") if isinstance(msg, dict) else "?"
content = msg.get("content") if isinstance(msg, dict) else None
if isinstance(content, list):
type_counts: dict[str, int] = {}
for b in content:
if isinstance(b, dict):
t = b.get("type", "?")
type_counts[t] = type_counts.get(t, 0) + 1
else:
type_counts["raw"] = type_counts.get("raw", 0) + 1
counts_str = ",".join(f"{t}:{c}" for t, c in sorted(type_counts.items()))
elif isinstance(content, str):
counts_str = f"str({len(content)})"
else:
counts_str = "empty"
parts.append(f"{idx}:{role}[{counts_str}]")
if len(messages) > max_messages:
parts.append(f"...+{len(messages) - max_messages}more")
logger.debug("Transition digest %s", " ".join(parts))


def _validate_anthropic_pairing(
messages: list[dict[str, Any]],
*,
context: str = "",
) -> list[str]:
"""独立的 Anthropic tool_use/tool_result 配对自检(过渡管线末端执行).

与 ``_enforce_pairing_sanity_pass`` 不同,此函数:
- 不修改消息列表(纯检测)
- 针对每个 assistant + tool_use,精确记录下一条 user 消息中匹配/缺失的 ID
- 发现不一致时输出 WARNING 级别日志含 message index 与具体 ID

Returns:
检测到的问题描述列表(空列表表示全部通过)。
"""
issues: list[str] = []
for i, msg in enumerate(messages):
if not isinstance(msg, dict) or msg.get("role") != "assistant":
continue
content = msg.get("content")
if not isinstance(content, list):
continue
tool_use_ids = [
b["id"]
for b in content
if isinstance(b, dict) and b.get("type") == "tool_use" and b.get("id")
]
if not tool_use_ids:
continue

next_idx = i + 1
if next_idx >= len(messages):
issues.append(f"messages[{i}]: assistant with tool_uses at end of list")
continue

next_msg = messages[next_idx]
if not isinstance(next_msg, dict) or next_msg.get("role") != "user":
issues.append(
f"messages[{i}]: next messages[{next_idx}] is not user "
f"(role={next_msg.get('role') if isinstance(next_msg, dict) else '?'})"
)
continue

user_content = next_msg.get("content")
if not isinstance(user_content, list):
user_content = []

result_ids = {
b["tool_use_id"]
for b in user_content
if isinstance(b, dict)
and b.get("type") == "tool_result"
and isinstance(b.get("tool_use_id"), str)
}

missing = [uid for uid in tool_use_ids if uid not in result_ids]
if missing:
issue = (
f"messages[{i}]: {len(missing)}/{len(tool_use_ids)} tool_use(s) "
f"without tool_result in messages[{next_idx}]: {missing[:5]}"
)
issues.append(issue)

if issues:
prefix = f"[{context}] " if context else ""
logger.warning(
"Anthropic pairing validation: %s%d issue(s) found: %s",
prefix,
len(issues),
"; ".join(issues),
)
return issues


def strip_thinking_blocks(body: dict[str, Any]) -> int:
"""从 assistant 消息中移除 thinking/redacted_thinking 块(就地).

Expand Down Expand Up @@ -678,6 +789,7 @@ def prepare_zhipu_to_anthropic(
2. 改写 ``srvtoolu_*`` ID 与 ``server_tool_use`` 类型为标准 Anthropic 形式
3. 强制 tool_use/tool_result 配对(单遍正向扫描)
4. 剥离 thinking blocks(signature 无效)
5. 独立的 Anthropic 兼容性自检(纯检测,不修改,定位 enforce/sanity 未覆盖的边界 case)

所有变换均为幂等操作,安全地在已清理的请求体上重复执行。

Expand All @@ -686,6 +798,10 @@ def prepare_zhipu_to_anthropic(
"""
prepared = copy.deepcopy(body)
adaptations: list[str] = []
msgs = prepared.get("messages", [])

# ── 过渡管线诊断:变换前快照 ──
_dump_message_digest(msgs, label="zhipu→anthropic.before")

# Step 1: 剥离 zhipu 私有 content block 类型(如 server_tool_use_delta)
removed_vendor_blocks = _remove_vendor_blocks(prepared, _ZHIPU_VENDOR_BLOCK_TYPES)
Expand All @@ -696,16 +812,24 @@ def prepare_zhipu_to_anthropic(
rewritten, _ = _rewrite_srvtoolu_ids(prepared)
if rewritten:
adaptations.append(f"rewritten_{rewritten}_srvtoolu_ids")
_dump_message_digest(msgs, label="zhipu→anthropic.after_rewrite")

# Step 3: 强制 tool_use/tool_result 配对
pairing_fixes = enforce_anthropic_tool_pairing(prepared.get("messages", []))
pairing_fixes = enforce_anthropic_tool_pairing(msgs)
if pairing_fixes:
adaptations.extend(pairing_fixes)
_dump_message_digest(msgs, label="zhipu→anthropic.after_enforce")

# Step 4: 剥离 thinking blocks(zhipu signature 无效)
stripped = strip_thinking_blocks(prepared)
if stripped:
adaptations.append(f"stripped_{stripped}_thinking_blocks")
_dump_message_digest(msgs, label="zhipu→anthropic.after_strip")

# Step 5: 独立的 Anthropic 兼容性自检(纯检测,不修改)
validation_issues = _validate_anthropic_pairing(msgs, context="zhipu→anthropic")
if validation_issues:
adaptations.append("anthropic_pairing_validation_issues")

return prepared, adaptations

Expand Down
218 changes: 218 additions & 0 deletions tests/test_vendor_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -2225,3 +2225,221 @@ def test_preserves_supported_params(self):
assert result["stream"] is True
assert result["metadata"] == {"user_id": "test"}
assert adaptations == []


class TestDumpMessageDigest:
"""``_dump_message_digest`` 诊断快照函数测试."""

def test_outputs_nothing_on_empty_messages(self, caplog):
import logging

from coding.proxy.convert.vendor_channels import _dump_message_digest

with caplog.at_level(
logging.DEBUG, logger="coding.proxy.convert.vendor_channels"
):
_dump_message_digest([], label="test")
assert "Transition digest" not in caplog.text

def test_outputs_structure_for_first_n_messages(self, caplog):
import logging

from coding.proxy.convert.vendor_channels import _dump_message_digest

messages = [
{"role": "user", "content": [{"type": "text", "text": "hi"}]},
{
"role": "assistant",
"content": [
{"type": "tool_use", "id": "toolu_1", "name": "bash", "input": {}},
],
},
{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "toolu_1", "content": "ok"},
],
},
]
with caplog.at_level(
logging.DEBUG, logger="coding.proxy.convert.vendor_channels"
):
_dump_message_digest(messages, label="test")
assert "test" in caplog.text
assert "0:user" in caplog.text
assert "1:assistant" in caplog.text
assert "tool_use:1" in caplog.text


class TestValidateAnthropicPairing:
"""``_validate_anthropic_pairing`` 独立配对自检测试."""

def test_no_issues_for_correct_pairing(self):
from coding.proxy.convert.vendor_channels import _validate_anthropic_pairing

messages = [
{"role": "user", "content": "go"},
{
"role": "assistant",
"content": [
{"type": "tool_use", "id": "toolu_1", "name": "bash", "input": {}},
],
},
{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "toolu_1", "content": "ok"},
],
},
]
issues = _validate_anthropic_pairing(messages)
assert issues == []

def test_detects_missing_tool_result(self):
from coding.proxy.convert.vendor_channels import _validate_anthropic_pairing

messages = [
{"role": "user", "content": "go"},
{
"role": "assistant",
"content": [
{"type": "tool_use", "id": "toolu_1", "name": "bash", "input": {}},
],
},
{"role": "user", "content": [{"type": "text", "text": "no result"}]},
]
issues = _validate_anthropic_pairing(messages)
assert len(issues) == 1
assert "toolu_1" in issues[0]
assert "messages[1]" in issues[0]

def test_detects_non_user_after_assistant_with_tool_use(self):
from coding.proxy.convert.vendor_channels import _validate_anthropic_pairing

messages = [
{"role": "user", "content": "go"},
{
"role": "assistant",
"content": [
{"type": "tool_use", "id": "toolu_1", "name": "bash", "input": {}},
],
},
{
"role": "assistant",
"content": [{"type": "text", "text": "another assistant"}],
},
]
issues = _validate_anthropic_pairing(messages)
assert len(issues) == 1
assert "not user" in issues[0]

def test_detects_assistant_with_tool_use_at_end_of_list(self):
from coding.proxy.convert.vendor_channels import _validate_anthropic_pairing

messages = [
{"role": "user", "content": "go"},
{
"role": "assistant",
"content": [
{"type": "tool_use", "id": "toolu_1", "name": "bash", "input": {}},
],
},
]
issues = _validate_anthropic_pairing(messages)
assert len(issues) == 1
assert "end of list" in issues[0]

def test_partial_missing_only_reports_missing_ids(self):
from coding.proxy.convert.vendor_channels import _validate_anthropic_pairing

messages = [
{"role": "user", "content": "go"},
{
"role": "assistant",
"content": [
{"type": "tool_use", "id": "toolu_1", "name": "bash", "input": {}},
{"type": "tool_use", "id": "toolu_2", "name": "read", "input": {}},
],
},
{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "toolu_1", "content": "ok"},
],
},
]
issues = _validate_anthropic_pairing(messages)
assert len(issues) == 1
assert "toolu_2" in issues[0]
assert "1/2" in issues[0]

def test_integration_with_zhipu_to_anthropic_channel(self):
"""验证 prepare_zhipu_to_anthropic 在末端执行自检且 adaptations 包含标签."""
from coding.proxy.convert.vendor_channels import prepare_zhipu_to_anthropic

body = {
"model": "claude-opus-4-7",
"messages": [
{"role": "user", "content": "go"},
{
"role": "assistant",
"content": [
{
"type": "server_tool_use",
"id": "srvtoolu_01",
"name": "bash",
"input": {},
},
],
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "srvtoolu_01",
"content": "ok",
},
],
},
],
}
result, adaptations = prepare_zhipu_to_anthropic(body)
# 自检通过,不应包含 validation_issues 标签
assert "anthropic_pairing_validation_issues" not in adaptations

def test_integration_detects_enforce_missed_issue(self):
"""构造一个理论上 enforce 可能遗漏的场景,验证自检能捕获.

场景:两条连续 assistant 消息,第一条的 tool_result 被第二条的
existing_result_ids"冒领"(相同 ID 碰撞场景的模拟)。
虽然当前 enforce 实现下不太可能自然产生此场景,但自检应能捕获。
"""
from coding.proxy.convert.vendor_channels import (
_validate_anthropic_pairing,
)

# 手动构造一个 enforce 后仍存在配对缺陷的 body
messages = [
{"role": "user", "content": "go"},
{
"role": "assistant",
"content": [
{"type": "tool_use", "id": "toolu_x", "name": "bash", "input": {}},
],
},
{
"role": "user",
"content": [
# tool_result 缺失 tolu_x,但有不相关的 tool_result
{
"type": "tool_result",
"tool_use_id": "toolu_other",
"content": "wrong",
},
],
},
]
issues = _validate_anthropic_pairing(messages)
assert len(issues) == 1
assert "toolu_x" in issues[0]
Loading