Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions docs/agents/issue.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,15 +253,19 @@ INFO Tier anthropic message succeeded (took over from failed tier: zhipu)

`is_semantic_rejection` 检测到 zhipu 返回 `invalid_request_error + 1210` 含「API 调用参数有误」中文标记,判定为语义拒绝,跳过下一层 tier。1210 是智谱官方错误码,[官方文档](https://docs.bigmodel.cn/cn/api/api-code) 定义为「参数格式/类型不符规范」(区别于 1213「必需字段缺失」、1214「字段参数非法」)。

**根因(仍在收集证据)**
**根因(已定位,修复中)**

PR #244 的初版诊断字段仅覆盖 `thinking / thinking_blocks / cache_control / model / messages`,但 2026-05-25 17:26 后的诊断日志显示失败请求**均不含**上述任何字段。说明真正祸根在更细粒度的参数(system / tools / max_tokens / sampling / metadata / content_types / body_size 等)
PR #247 (Step 1 v2) 部署后,2026-05-26 16:30–16:31 的诊断日志显示 8 次连续拒绝**全部携带 `thinking={"type": "adaptive"}`**(Anthropic Claude 4.x 新增的参数类型),而同一时段其他会话的请求持续成功。之前 curl 测试仅验证了 `{"type": "enabled"}`,未覆盖 `adaptive` 类型。GLM 可能不支持此特定类型值,导致 [1210] 参数校验失败

**处理方式(分阶段)**

- **Step 1(PR #244,已合并)**:在 `executor.py::_build_semantic_rejection_diagnostic` 中输出 thinking / cache_control 相关字段 — 但证据反转,覆盖不足以定位真因。
- **Step 1 v2(本次)**:扩展诊断函数覆盖 `system_kind|blocks(+cc)` / `tools` / `tool_choice` / 采样参数 / `stream` / `metadata_keys` / `content_types` / `body_bytes` 等维度。所有项「仅存在时输出」以控制日志噪声。配套 14 个单元测试(`TestBuildSemanticRejectionDiagnostic`)覆盖各字段组合。
- **Step 2(待定)**:依据扩展诊断日志的新证据,定位具体祸根参数后再施修复(候选路径:`ZhipuVendor._prepare_request` 参数剥离 / 调用现有 `normalize_for_zhipu` / pre-validation 警告)。
- **Step 1 v2(PR #247,已合并)**:扩展诊断函数覆盖 `system_kind|blocks(+cc)` / `tools` / `tool_choice` / 采样参数 / `stream` / `metadata_keys` / `content_types` / `body_bytes` 等维度。所有项「仅存在时输出」以控制日志噪声。配套 14 个单元测试(`TestBuildSemanticRejectionDiagnostic`)覆盖各字段组合。
- **Step 2(进行中)**:基于 Step 1 v2 的日志证据,在 `ZhipuVendor._prepare_request` 中实现 **兼容转换**(而非移除):
- `thinking.type="adaptive"` → `{"type": "enabled", "budget_tokens": 16000}`(保留 thinking 能力)
- 新增 `_build_zhipu_request_snapshot` 诊断快照,同时覆盖成功/失败请求,建立可对比证据链
- 扩展语义拒绝日志的错误体截断限制(200 → 500 字符),保留完整字段级诊断
- `metadata` 暂不处理(待进一步诊断确认兼容性)

**后续防范**

Expand Down
6 changes: 4 additions & 2 deletions src/coding/proxy/convert/vendor_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,10 @@ def _strip_cache_control(body: dict[str, Any]) -> int:

# ── zhipu 共享清洗函数 ──────────────────────────────────────────

# 跨供应商转换时主动剥离的顶层参数(首选 tier 场景由 _prepare_request 原样透传,
# GLM 原生支持 thinking / 静默忽略 cache_control 和 reasoning_effort,不会触发 400)。
# 跨供应商转换时主动剥离的顶层参数。
# 首选 tier 场景的 thinking.type=adaptive 兼容转换由
# ZhipuVendor._prepare_request 处理(转换为 enabled + budget,保留功能),
# 此处仅负责 failover 路径的全量剥离(跨供应商 thinking signature 失效)。
_ZHIPU_UNSUPPORTED_PARAMS: frozenset[str] = frozenset(
{"thinking", "extended_thinking", "reasoning_effort"}
)
Expand Down
127 changes: 99 additions & 28 deletions src/coding/proxy/routing/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,50 +129,116 @@ def _build_semantic_rejection_diagnostic(body: dict[str, Any]) -> str:

在 semantic rejection 日志中附加请求体的可疑参数快照,
用于定位供应商参数校验失败的具体祸根参数。

覆盖范围:
* 模型 / messages 数(baseline)
* thinking 系列顶层参数 + history thinking_blocks 数
* system 形态(string / blocks,含 cache_control 计数)
* tools 数量 + tool_choice 形态
* 采样参数(max_tokens / temperature / top_p / top_k / stop_sequences)
* stream / metadata 形态
* cache_control 存在性
* messages.content 类型分布
* 请求体大小估算(json.dumps 字节数)
"""
parts: list[str] = []
# 顶层不兼容参数

# ── 模型 + 消息数(baseline,始终输出)──
parts.append(f"model={body.get('model', 'N/A')}")
parts.append(f"messages={len(body.get('messages', []))}")

# ── 顶层 thinking 系列参数 ──
for key in ("thinking", "extended_thinking", "reasoning_effort"):
if key in body:
val = body[key]
parts.append(f"{key}={val!r:.80}")
# 会话历史中的 thinking blocks

# ── system 形态 ──
system = body.get("system")
if isinstance(system, str):
parts.append(f"system_kind=string(len={len(system)})")
elif isinstance(system, list):
cc_count = sum(
1 for item in system if isinstance(item, dict) and "cache_control" in item
)
if cc_count:
parts.append(f"system_blocks={len(system)},cc={cc_count}")
else:
parts.append(f"system_blocks={len(system)}")

# ── tools 与 tool_choice ──
tools = body.get("tools")
if isinstance(tools, list):
parts.append(f"tools={len(tools)}")
tool_choice = body.get("tool_choice")
if tool_choice is not None:
parts.append(f"tool_choice={tool_choice!r:.60}")

# ── 采样参数(仅存在时输出)──
for key in ("max_tokens", "temperature", "top_p", "top_k"):
if key in body:
parts.append(f"{key}={body[key]!r:.40}")
stop_sequences = body.get("stop_sequences")
if isinstance(stop_sequences, list) and stop_sequences:
parts.append(f"stop_sequences={len(stop_sequences)}")

# ── stream / metadata ──
if "stream" in body:
parts.append(f"stream={body['stream']}")
metadata = body.get("metadata")
if isinstance(metadata, dict) and metadata:
parts.append(f"metadata_keys={len(metadata)}")

# ── 会话历史中的 thinking blocks 与 content_types 分布 ──
thinking_count = 0
content_type_counts: dict[str, int] = {}
for msg in body.get("messages", []):
content = msg.get("content")
if isinstance(content, str):
content_type_counts["string"] = content_type_counts.get("string", 0) + 1
continue
if not isinstance(content, list):
continue
for block in content:
if isinstance(block, dict) and block.get("type") in (
"thinking",
"redacted_thinking",
):
if not isinstance(block, dict):
continue
btype = block.get("type")
if isinstance(btype, str):
content_type_counts[btype] = content_type_counts.get(btype, 0) + 1
if btype in ("thinking", "redacted_thinking"):
thinking_count += 1
if thinking_count:
parts.append(f"thinking_blocks_in_history={thinking_count}")
# cache_control 存在检测
if content_type_counts:
type_repr = ",".join(f"{k}:{v}" for k, v in sorted(content_type_counts.items()))
parts.append(f"content_types={{{type_repr}}}")

# ── cache_control 存在检测(messages / tools,不含 system 因已单独统计)──
has_cc = False
for section in (
body.get("system", []) if isinstance(body.get("system"), list) else [],
*(
m.get("content", [])
for m in body.get("messages", [])
if isinstance(m.get("content"), list)
),
body.get("tools", []),
):
if isinstance(section, list):
for item in section:
if isinstance(item, dict) and "cache_control" in item:
has_cc = True
break
sections: list[Any] = []
for m in body.get("messages", []):
if isinstance(m.get("content"), list):
sections.append(m["content"])
if isinstance(body.get("tools"), list):
sections.append(body["tools"])
for section in sections:
for item in section:
if isinstance(item, dict) and "cache_control" in item:
has_cc = True
break
if has_cc:
break
if has_cc:
parts.append("cache_control_fields=present")
# 模型 + 消息数
parts.append(f"model={body.get('model', 'N/A')}")
parts.append(f"messages={len(body.get('messages', []))}")

# ── 请求体大小估算 ──
try:
body_bytes = len(json.dumps(body, ensure_ascii=False).encode("utf-8"))
parts.append(f"body_bytes={body_bytes}")
except (TypeError, ValueError):
# 极少数情况下 body 含非可序列化对象,跳过
pass

return f" [{', '.join(parts)}]" if parts else ""


Expand Down Expand Up @@ -860,12 +926,15 @@ async def execute_message(

if not is_last and is_semantic:
diagnostic = _build_semantic_rejection_diagnostic(body)
# zhipu 等供应商的错误体含字段级诊断(如 [1210] 错误码 + request_id),
# 500 字符足以覆盖完整错误体,避免截断丢失关键细节
err_msg = (resp.error_message or "N/A")[:500]
logger.warning(
"Tier %s semantic rejection (type=%s, msg=%s)%s, "
"trying next tier without recording failure",
tier.name,
resp.error_type or resp.status_code,
(resp.error_message or "N/A")[:200],
err_msg,
diagnostic,
)
failed_tier_name = tier.name
Expand Down Expand Up @@ -1100,14 +1169,16 @@ async def _handle_http_error(
if semantic_rejection and not is_last:
if request_body is not None:
diagnostic = _build_semantic_rejection_diagnostic(request_body)
stream_err_msg = (
error.get("message") if isinstance(error, dict) else "N/A"
)
# 扩展至 500 字符以保留完整字段级诊断信息
logger.warning(
"Tier %s stream semantic rejection (type=%s, msg=%s)%s, "
"trying next tier without recording failure",
tier.name,
error.get("type") if isinstance(error, dict) else None,
(error.get("message") if isinstance(error, dict) else "N/A")[
:200
],
stream_err_msg[:500],
diagnostic,
)
return True, tier.name, exc
Expand Down
94 changes: 88 additions & 6 deletions src/coding/proxy/vendors/zhipu.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
"""智谱 GLM 供应商 — 原生 Anthropic 兼容端点薄透传代理.
"""智谱 GLM 供应商 — 原生 Anthropic 兼容端点代理(兼容转换 + 429 重试).

官方端点 (https://open.bigmodel.cn/api/anthropic) 已完整支持
Anthropic Messages API 协议,本模块仅做两项最小适配
官方端点 (https://open.bigmodel.cn/api/anthropic) 支持大部分
Anthropic Messages API 协议,本模块做以下适配
1. 模型名映射(Claude -> GLM)
2. 认证头替换(x-api-key)
3. 首选 tier 参数兼容转换(_prepare_request)

注意:实测验证 GLM 的 Anthropic 兼容端点对以下参数的处理方式:
- thinking 参数:原生支持(GLM 有自己的 thinking 机制)
实测验证 GLM 对 Anthropic 扩展参数的处理方式:
- thinking.type="enabled":原生支持(GLM 有自己的 thinking 机制)
- thinking.type="adaptive":不支持,触发 [1210] 参数错误 → 转换为 enabled + budget
- cache_control 字段:静默忽略(GLM 使用隐式自动缓存)
- reasoning_effort 参数:静默忽略
以上参数均不会导致 400 错误,因此不需要在 _prepare_request 中剥离。
- metadata 字段:暂不处理(待进一步诊断确认兼容性)

额外提供 429 Rate Limit 专用重试挽回机制:
- max_attempt = 5(1 初始 + 4 重试)
Expand All @@ -20,6 +22,7 @@
from __future__ import annotations

import asyncio
import json
import logging
from collections.abc import AsyncIterator
from typing import Any
Expand Down Expand Up @@ -76,6 +79,49 @@ def __init__(
else None
)

# ── 首选 tier 参数兼容转换 ────────────────────────────────

# adaptive thinking → enabled 的默认预算(Anthropic 推荐的 adaptive 等价值)
_ADAPTIVE_THINKING_BUDGET = 16000

async def _prepare_request(
self,
request_body: dict[str, Any],
headers: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, str]]:
"""深拷贝 + 模型映射 + 认证头替换 + GLM 兼容转换.

当 zhipu 作为首选 tier 时(source_vendor=None),请求体来自原始客户端,
不经过跨供应商转换通道。此处对已知的 GLM 不兼容参数做兼容转换(而非移除),
保留完整的 CC (Claude Code) 功能特性。
"""
body, new_headers = await super()._prepare_request(request_body, headers)

adaptations: list[str] = []

# thinking.type="adaptive" 是 Anthropic Claude 4.x 新增的类型,
# GLM 不支持此类型值,会触发 [1210] 参数错误。
# 转换为 enabled + budget 保留 thinking 能力。
thinking = body.get("thinking")
if isinstance(thinking, dict) and thinking.get("type") == "adaptive":
body["thinking"] = {
"type": "enabled",
"budget_tokens": self._ADAPTIVE_THINKING_BUDGET,
}
adaptations.append(
f"converted_thinking_adaptive→enabled"
f"(budget={self._ADAPTIVE_THINKING_BUDGET})"
)

if adaptations:
logger.debug(
"ZhipuVendor first-tier compat: %s%s",
", ".join(adaptations),
_build_zhipu_request_snapshot(body),
)

return body, new_headers

# ── 非流式:429 重试 ────────────────────────────────────

async def send_message(
Expand Down Expand Up @@ -239,3 +285,39 @@ def _compute_retry_delay_from_response(

# 向后兼容别名
ZhipuBackend = ZhipuVendor


def _build_zhipu_request_snapshot(body: dict[str, Any]) -> str:
"""构建发往 zhipu 请求的轻量参数快照,用于诊断日志.

输出格式与 executor._build_semantic_rejection_diagnostic 一致,
使成功请求和失败请求的日志可直接 diff 对比,定位差异维度。

仅在转换发生时输出(DEBUG 级别),避免常态化日志噪声。
"""
parts: list[str] = []
parts.append(f"messages={len(body.get('messages', []))}")

thinking = body.get("thinking")
if isinstance(thinking, dict):
parts.append(f"thinking_type={thinking.get('type', 'unknown')}")

metadata = body.get("metadata")
if isinstance(metadata, dict) and metadata:
parts.append(f"metadata_keys={len(metadata)}")

tools = body.get("tools")
if isinstance(tools, list):
parts.append(f"tools={len(tools)}")

system = body.get("system")
if isinstance(system, list):
parts.append(f"system_blocks={len(system)}")

try:
body_bytes = len(json.dumps(body, ensure_ascii=False).encode("utf-8"))
parts.append(f"body_bytes={body_bytes}")
except (TypeError, ValueError):
pass

return f" [{', '.join(parts)}]" if parts else ""
27 changes: 25 additions & 2 deletions tests/test_vendors.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ async def test_zhipu_prepare_request_preserves_metadata():

@pytest.mark.asyncio
async def test_zhipu_prepare_request_preserves_thinking():
"""ZhipuVendor._prepare_request 应原样保留 thinking 字段(GLM 原生支持)."""
"""ZhipuVendor._prepare_request 应原样保留 thinking.type=enabled(GLM 原生支持)."""
mapper = ModelMapper([])
zhipu_vendor = ZhipuVendor(ZhipuConfig(api_key="sk-test"), mapper)
body = {
Expand All @@ -405,12 +405,35 @@ async def test_zhipu_prepare_request_preserves_thinking():
"thinking": {"type": "enabled", "budget_tokens": 10000},
}
prepared_body, _ = await zhipu_vendor._prepare_request(body, {})
# thinking 原样透传(GLM 原生支持 thinking
# thinking.type=enabled 原样透传(GLM 原生支持)
assert prepared_body["thinking"] == {"type": "enabled", "budget_tokens": 10000}
# 原始 body 不应被修改
assert body["thinking"]["budget_tokens"] == 10000


@pytest.mark.asyncio
async def test_zhipu_prepare_request_converts_thinking_adaptive():
"""ZhipuVendor._prepare_request 应将 thinking.type=adaptive 转换为 enabled+budget.

GLM 不支持 adaptive 类型,转换为已确认安全的 enabled + budget_tokens 格式,
保留 thinking 能力不被阉割。
"""
mapper = ModelMapper([])
zhipu_vendor = ZhipuVendor(ZhipuConfig(api_key="sk-test"), mapper)
body = {
"model": "claude-opus-4-7",
"messages": [],
"thinking": {"type": "adaptive"},
}
prepared_body, _ = await zhipu_vendor._prepare_request(body, {})

# adaptive 应被转换为 enabled + budget
assert prepared_body["thinking"]["type"] == "enabled"
assert prepared_body["thinking"]["budget_tokens"] == 16000
# 原始 body 不应被修改
assert body["thinking"] == {"type": "adaptive"}


@pytest.mark.asyncio
async def test_zhipu_prepare_request_preserves_anthropic_beta_header():
zhipu_vendor = ZhipuVendor(ZhipuConfig(api_key="sk-test"), ModelMapper([]))
Expand Down
Loading
Loading