Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/arch/design-patterns.md
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,8 @@ $$

> **参数默认值**:参见 [配置参考 -- RetryConfig](./config-reference.md#elastic-params)

**Vendor 级应用(Zhipu 429/529 内部退避重试)**:[`vendors/zhipu.py`](../../src/coding/proxy/vendors/zhipu.py) 复用 `RetryConfig` / `calculate_delay()`,在 `send_message` / `send_message_stream` 内对 **429 Rate Limit(限流)** 与 **529 Overloaded(并发过载)** 两类服务端瞬态过载信号做就地指数退避重试(共用 `_BACKOFF_RETRY_STATUS = {429, 529}` 作单一事实源,max=5、1s→2s→4s→8s、Full Jitter,优先尊重 server `retry-after`)。耗尽重试后将状态码原样返回,交由上层 `should_trigger_failover`(§3.6 VendorTier)与 [CircuitBreaker](#circuit-breaker) 处理,从而降低 failover 频率。

---

## 3.13 Rate Limit Deadline Tracking(速率限制截止追踪)
Expand Down
76 changes: 42 additions & 34 deletions src/coding/proxy/vendors/zhipu.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""智谱 GLM 供应商 — 原生 Anthropic 兼容端点代理(兼容转换 + 429 重试).
"""智谱 GLM 供应商 — 原生 Anthropic 兼容端点代理(兼容转换 + 429/529 重试).

官方端点 (https://open.bigmodel.cn/api/anthropic) 支持大部分
Anthropic Messages API 协议,本模块做以下适配:
Expand All @@ -13,7 +13,8 @@
- reasoning_effort 参数:静默忽略
- metadata 字段:暂不处理(待进一步诊断确认兼容性)

额外提供 429 Rate Limit 专用重试挽回机制:
额外提供 429/529 专用重试挽回机制:
- 429 Rate Limit(限流)与 529 Overloaded(并发过载)共用同一退避策略
- max_attempt = 5(1 初始 + 4 重试)
- 指数退避 + Full Jitter(1s → 2s → 4s → 8s)
- 优先尊重 server retry-after header
Expand Down Expand Up @@ -56,14 +57,20 @@
jitter=True,
)

# 触发指数退避重试的瞬态状态码(单一事实源)
# 429 Rate Limit(限流);529 Overloaded(并发过载)
# 二者均为服务端瞬态过载信号,适用同一退避挽回策略。
_BACKOFF_RETRY_STATUS = frozenset({429, 529})


class ZhipuVendor(NativeAnthropicVendor):
"""智谱 GLM 原生 Anthropic 兼容端点供应商(薄透传 + 429 重试挽回).
"""智谱 GLM 原生 Anthropic 兼容端点供应商(薄透传 + 429/529 重试挽回).

通过官方 /api/anthropic 端点转发请求,
仅替换模型名和认证头,其余原样透传。

429 Rate Limit 时自动重试(指数退避),降低 failover 频率。
429 Rate Limit(限流)/ 529 Overloaded(并发过载)时自动重试
(指数退避),降低 failover 频率。
并发限流由 BaseVendor._concurrency_controller 统一管控。
"""

Expand Down Expand Up @@ -124,24 +131,25 @@ async def _prepare_request(

return body, new_headers

# ── 非流式:429 重试 ────────────────────────────────────
# ── 非流式:429/529 重试 ────────────────────────────────

async def send_message(
self,
request_body: dict[str, Any],
headers: dict[str, str],
) -> VendorResponse:
"""非流式请求,429 时自动重试."""
"""非流式请求,429/529 时自动重试."""
max_attempts = self._rl_retry.max_attempts

for attempt in range(max_attempts):
resp = await super().send_message(request_body, headers)
if resp.status_code != 429:
if resp.status_code not in _BACKOFF_RETRY_STATUS:
return resp

if attempt == max_attempts - 1:
logger.warning(
"Zhipu 429 rate limit exhausted after %d attempts",
"Zhipu %d overload exhausted after %d attempts",
resp.status_code,
max_attempts,
)
return resp
Expand All @@ -150,7 +158,8 @@ async def send_message(
resp.response_headers, attempt
)
logger.info(
"Zhipu 429 rate limit, retry %d/%d in %.1fms",
"Zhipu %d overload, retry %d/%d in %.1fms",
resp.status_code,
attempt + 1,
max_attempts - 1,
delay,
Expand All @@ -159,42 +168,50 @@ async def send_message(

return resp # pragma: no cover

# ── 流式:429 重试 ──────────────────────────────────────
# ── 流式:429/529 重试 ──────────────────────────────────

async def send_message_stream(
self,
request_body: dict[str, Any],
headers: dict[str, str],
) -> AsyncIterator[bytes]:
"""流式请求,429 时自动重试.
"""流式请求,429/529 时自动重试.

安全性:429 在 BaseVendor.send_message_stream 中于
安全性:429/529 在 BaseVendor.send_message_stream 中于
status code 检查阶段即 raise(在任何 chunk yield 之前),
因此重试不会导致已发出数据不一致。
"""
max_attempts = self._rl_retry.max_attempts

for attempt in range(max_attempts):
try:
# 429 在 status code 检查阶段即 raise(在任何 chunk 之前),
# 429/529 在 status code 检查阶段即 raise(在任何 chunk 之前),
# 因此 __anext__ 安全:要么拿到首个 chunk,要么抛异常。
ait = super().send_message_stream(request_body, headers)
head = await ait.__anext__()
except StopAsyncIteration:
return
except httpx.HTTPStatusError as exc:
if exc.response is None or exc.response.status_code != 429:
if (
exc.response is None
or exc.response.status_code not in _BACKOFF_RETRY_STATUS
):
raise
status_code = exc.response.status_code
if attempt == max_attempts - 1:
logger.warning(
"Zhipu 429 stream rate limit exhausted after %d attempts",
"Zhipu %d stream overload exhausted after %d attempts",
status_code,
max_attempts,
)
raise

delay = self._compute_retry_delay_from_response(exc.response, attempt)
delay = self._compute_retry_delay_from_headers(
exc.response.headers, attempt
)
logger.info(
"Zhipu 429 stream rate limit, retry %d/%d in %.1fms",
"Zhipu %d stream overload, retry %d/%d in %.1fms",
status_code,
attempt + 1,
max_attempts - 1,
delay,
Expand All @@ -215,24 +232,15 @@ def _compute_retry_delay_from_headers(
headers: dict[str, str] | None,
attempt: int,
) -> float:
"""计算重试延迟(毫秒),优先使用 server retry-after."""
rl_info = parse_rate_limit_headers(headers, 429, None)
server_delay_s = compute_effective_retry_seconds(rl_info)
if server_delay_s is not None:
return min(server_delay_s * 1000, self._rl_retry.max_delay_ms)
return calculate_delay(attempt, self._rl_retry)
"""计算重试延迟(毫秒),优先使用 server retry-after.

def _compute_retry_delay_from_response(
self,
response: httpx.Response,
attempt: int,
) -> float:
"""计算重试延迟(毫秒),从 httpx.Response 提取 header."""
rl_info = parse_rate_limit_headers(
response.headers,
response.status_code,
response.text[:500] if response.text else None,
)
非流式与流式 429/529 重试共用此方法(单一延迟逻辑)。
以"限流退避"语义(429)解析 header:``parse_rate_limit_headers``
仅对 429/403 解析 retry-after,故此处固定传 429,
使 529 也能尊重 server retry-after,与 429 行为一致。
无 server 信号时回退到指数退避 + Full Jitter。
"""
rl_info = parse_rate_limit_headers(headers, 429, None)
server_delay_s = compute_effective_retry_seconds(rl_info)
if server_delay_s is not None:
return min(server_delay_s * 1000, self._rl_retry.max_delay_ms)
Expand Down
181 changes: 181 additions & 0 deletions tests/test_zhipu.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,20 @@ def _make_429_response(
)


def _make_529_response(
    headers: dict[str, str] | None = None,
) -> httpx.Response:
    """Build a canned HTTP 529 (Overloaded) response.

    ``headers`` are used as the response headers; when ``None`` (or empty)
    an empty mapping is passed instead. The response is bound to a POST
    request against the Zhipu Anthropic-compatible messages endpoint.
    """
    upstream_request = httpx.Request(
        "POST", "https://open.bigmodel.cn/api/anthropic/v1/messages"
    )
    payload = b'{"error":{"type":"overloaded_error","message":"Overloaded"}}'
    response_headers = headers if headers else {}
    return httpx.Response(
        status_code=529,
        content=payload,
        headers=response_headers,
        request=upstream_request,
    )


def _make_200_response() -> httpx.Response:
"""构造 200 HTTP 响应."""
body = json.dumps(
Expand Down Expand Up @@ -666,3 +680,170 @@ async def test_missing_api_key_skips_retry(self):
{},
)
assert resp.status_code == 401

# ── 529 Overloaded(并发过载)重试,行为与 429 一致 ──────

@pytest.mark.asyncio
async def test_nonstream_529_retries_and_succeeds(self):
    """Non-streaming: two 529 replies followed by a 200 end in success."""
    posts_seen = 0

    async def fake_post(*args, **kwargs):
        nonlocal posts_seen
        posts_seen += 1
        # Only the first two POSTs are answered with 529.
        if posts_seen > 2:
            return _make_200_response()
        return _make_529_response()

    zhipu = _make_zhipu_vendor()
    payload = {"model": "claude-sonnet-4-20250514", "messages": []}

    with (
        patch.object(zhipu, "_get_client") as get_client,
        patch("asyncio.sleep", new_callable=AsyncMock),
    ):
        http_client = AsyncMock()
        http_client.post = fake_post
        get_client.return_value = http_client

        resp = await zhipu.send_message(payload, {})

    assert resp.status_code == 200
    assert posts_seen == 3

@pytest.mark.asyncio
async def test_nonstream_529_exhausted_retries(self):
    """Non-streaming: five consecutive 529 replies exhaust the retries.

    The final 529 response is expected to be returned to the caller.
    """
    posts_seen = 0

    async def always_overloaded(*args, **kwargs):
        nonlocal posts_seen
        posts_seen += 1
        return _make_529_response()

    zhipu = _make_zhipu_vendor()
    payload = {"model": "claude-sonnet-4-20250514", "messages": []}

    with (
        patch.object(zhipu, "_get_client") as get_client,
        patch("asyncio.sleep", new_callable=AsyncMock),
    ):
        http_client = AsyncMock()
        http_client.post = always_overloaded
        get_client.return_value = http_client

        resp = await zhipu.send_message(payload, {})

    assert resp.status_code == 529
    assert posts_seen == 5

@pytest.mark.asyncio
async def test_stream_529_retries_and_succeeds(self):
    """Streaming: two 529 errors, then the third attempt streams its chunks."""
    opens = 0

    async def fake_stream(self, body, headers):
        nonlocal opens
        opens += 1
        if opens <= 2:
            # The first two attempts fail before any chunk is yielded.
            overloaded = _make_529_response()
            raise httpx.HTTPStatusError(
                "529",
                request=overloaded.request,
                response=overloaded,
            )
        for frame in (
            b'data: {"type":"content_block_start"}\n\n',
            b'data: {"type":"content_block_delta"}\n\n',
        ):
            yield frame

    zhipu = _make_zhipu_vendor()
    payload = {"model": "claude-sonnet-4-20250514", "messages": []}

    with (
        patch.object(NativeAnthropicVendor, "send_message_stream", fake_stream),
        patch("asyncio.sleep", new_callable=AsyncMock),
    ):
        received = [
            chunk async for chunk in zhipu.send_message_stream(payload, {})
        ]

    assert len(received) == 2
    assert opens == 3

@pytest.mark.asyncio
async def test_stream_529_exhausted_retries_raises(self):
    """Streaming: repeated 529 errors exhaust the retries and the error is raised."""
    opens = 0

    async def fake_stream(self, body, headers):
        nonlocal opens
        opens += 1
        overloaded = _make_529_response()
        raise httpx.HTTPStatusError(
            "529",
            request=overloaded.request,
            response=overloaded,
        )
        yield  # unreachable; only makes this function an async generator

    zhipu = _make_zhipu_vendor()
    payload = {"model": "claude-sonnet-4-20250514", "messages": []}

    with (
        patch.object(NativeAnthropicVendor, "send_message_stream", fake_stream),
        patch("asyncio.sleep", new_callable=AsyncMock),
        pytest.raises(httpx.HTTPStatusError) as exc_info,
    ):
        async for _chunk in zhipu.send_message_stream(payload, {}):
            pass

    raised = exc_info.value
    assert raised.response.status_code == 529
    assert opens == 5

@pytest.mark.asyncio
async def test_stream_529_respects_retry_after(self):
    """Streaming: a 529 carrying retry-after uses the server-suggested delay.

    Regression guard: before the fix, the streaming delay computation passed
    the real status code 529 to parse_rate_limit_headers (which only parses
    retry-after for 429/403), so 529 ignored retry-after and fell back to
    exponential backoff (at most 1s on the first retry). After the fix the
    headers are always parsed with 429 semantics, so 529 honors the server
    retry-after just like 429.
    """
    opens = 0
    recorded_sleeps = []

    async def fake_stream(self, body, headers):
        nonlocal opens
        opens += 1
        if opens == 1:
            # Only the first attempt fails, and it carries a retry-after hint.
            overloaded = _make_529_response(headers={"retry-after": "2"})
            raise httpx.HTTPStatusError(
                "529",
                request=overloaded.request,
                response=overloaded,
            )
        yield b'data: {"type":"content_block_start"}\n\n'

    async def record_sleep(delay):
        recorded_sleeps.append(delay)

    zhipu = _make_zhipu_vendor()
    payload = {"model": "claude-sonnet-4-20250514", "messages": []}

    with (
        patch.object(NativeAnthropicVendor, "send_message_stream", fake_stream),
        patch("asyncio.sleep", side_effect=record_sleep),
    ):
        received = [
            chunk async for chunk in zhipu.send_message_stream(payload, {})
        ]

    assert len(received) == 1
    assert opens == 2
    assert len(recorded_sleeps) == 1
    # Per the original note: retry-after=2 maps to at most 2 * 1.1 = 2.2s,
    # which is above the 1s first exponential-backoff step, so a delay in
    # this range shows the server signal was used.
    first_sleep = recorded_sleeps[0]
    assert 2.0 <= first_sleep <= 2.2
Loading