diff --git a/docs/arch/design-patterns.md b/docs/arch/design-patterns.md index 0b8c31b..25b7c20 100644 --- a/docs/arch/design-patterns.md +++ b/docs/arch/design-patterns.md @@ -541,6 +541,8 @@ $$ > **参数默认值**:参见 [配置参考 -- RetryConfig](./config-reference.md#elastic-params) +**Vendor 级应用(Zhipu 429/529 内部退避重试)**:[`vendors/zhipu.py`](../../src/coding/proxy/vendors/zhipu.py) 复用 `RetryConfig` / `calculate_delay()`,在 `send_message` / `send_message_stream` 内对 **429 Rate Limit(限流)** 与 **529 Overloaded(并发过载)** 两类服务端瞬态过载信号做就地指数退避重试(共用 `_BACKOFF_RETRY_STATUS = {429, 529}` 作单一事实源,max=5、1s→2s→4s→8s、Full Jitter,优先尊重 server `retry-after`)。耗尽重试后将状态码原样返回,交由上层 `should_trigger_failover`(§3.6 VendorTier)与 [CircuitBreaker](#circuit-breaker) 处理,从而降低 failover 频率。 + --- ## 3.13 Rate Limit Deadline Tracking(速率限制截止追踪) diff --git a/src/coding/proxy/vendors/zhipu.py b/src/coding/proxy/vendors/zhipu.py index ef36f57..feb8c4b 100644 --- a/src/coding/proxy/vendors/zhipu.py +++ b/src/coding/proxy/vendors/zhipu.py @@ -1,4 +1,4 @@ -"""智谱 GLM 供应商 — 原生 Anthropic 兼容端点代理(兼容转换 + 429 重试). +"""智谱 GLM 供应商 — 原生 Anthropic 兼容端点代理(兼容转换 + 429/529 重试). 官方端点 (https://open.bigmodel.cn/api/anthropic) 支持大部分 Anthropic Messages API 协议,本模块做以下适配: @@ -13,7 +13,8 @@ - reasoning_effort 参数:静默忽略 - metadata 字段:暂不处理(待进一步诊断确认兼容性) -额外提供 429 Rate Limit 专用重试挽回机制: +额外提供 429/529 专用重试挽回机制: + - 429 Rate Limit(限流)与 529 Overloaded(并发过载)共用同一退避策略 - max_attempt = 5(1 初始 + 4 重试) - 指数退避 + Full Jitter(1s → 2s → 4s → 8s) - 优先尊重 server retry-after header @@ -56,14 +57,20 @@ jitter=True, ) +# 触发指数退避重试的瞬态状态码(单一事实源) +# 429 Rate Limit(限流);529 Overloaded(并发过载) +# 二者均为服务端瞬态过载信号,适用同一退避挽回策略。 +_BACKOFF_RETRY_STATUS = frozenset({429, 529}) + class ZhipuVendor(NativeAnthropicVendor): - """智谱 GLM 原生 Anthropic 兼容端点供应商(薄透传 + 429 重试挽回). + """智谱 GLM 原生 Anthropic 兼容端点供应商(薄透传 + 429/529 重试挽回). 通过官方 /api/anthropic 端点转发请求, 仅替换模型名和认证头,其余原样透传。 - 429 Rate Limit 时自动重试(指数退避),降低 failover 频率。 + 429 Rate Limit(限流)/ 529 Overloaded(并发过载)时自动重试 + (指数退避),降低 failover 频率。 并发限流由 BaseVendor._concurrency_controller 统一管控。 """ @@ -124,24 +131,25 @@ async def _prepare_request( return body, new_headers - # ── 非流式:429 重试 ──────────────────────────────────── + # ── 非流式:429/529 重试 ──────────────────────────────── async def send_message( self, request_body: dict[str, Any], headers: dict[str, str], ) -> VendorResponse: - """非流式请求,429 时自动重试.""" + """非流式请求,429/529 时自动重试.""" max_attempts = self._rl_retry.max_attempts for attempt in range(max_attempts): resp = await super().send_message(request_body, headers) - if resp.status_code != 429: + if resp.status_code not in _BACKOFF_RETRY_STATUS: return resp if attempt == max_attempts - 1: logger.warning( - "Zhipu 429 rate limit exhausted after %d attempts", + "Zhipu %d overload exhausted after %d attempts", + resp.status_code, max_attempts, ) return resp @@ -150,7 +158,8 @@ async def send_message( resp.response_headers, attempt ) logger.info( - "Zhipu 429 rate limit, retry %d/%d in %.1fms", + "Zhipu %d overload, retry %d/%d in %.1fms", + resp.status_code, attempt + 1, max_attempts - 1, delay, @@ -159,16 +168,16 @@ async def send_message( return resp # pragma: no cover - # ── 流式:429 重试 ────────────────────────────────────── + # ── 流式:429/529 重试 ────────────────────────────────── async def send_message_stream( self, request_body: dict[str, Any], headers: dict[str, str], ) -> AsyncIterator[bytes]: - """流式请求,429 时自动重试. + """流式请求,429/529 时自动重试. - 安全性:429 在 BaseVendor.send_message_stream 中于 + 安全性:429/529 在 BaseVendor.send_message_stream 中于 status code 检查阶段即 raise(在任何 chunk yield 之前), 因此重试不会导致已发出数据不一致。 """ @@ -176,25 +185,33 @@ async def send_message_stream( for attempt in range(max_attempts): try: - # 429 在 status code 检查阶段即 raise(在任何 chunk 之前), + # 429/529 在 status code 检查阶段即 raise(在任何 chunk 之前), # 因此 __anext__ 安全:要么拿到首个 chunk,要么抛异常。 ait = super().send_message_stream(request_body, headers) head = await ait.__anext__() except StopAsyncIteration: return except httpx.HTTPStatusError as exc: - if exc.response is None or exc.response.status_code != 429: + if ( + exc.response is None + or exc.response.status_code not in _BACKOFF_RETRY_STATUS + ): raise + status_code = exc.response.status_code if attempt == max_attempts - 1: logger.warning( - "Zhipu 429 stream rate limit exhausted after %d attempts", + "Zhipu %d stream overload exhausted after %d attempts", + status_code, max_attempts, ) raise - delay = self._compute_retry_delay_from_response(exc.response, attempt) + delay = self._compute_retry_delay_from_headers( + exc.response.headers, attempt + ) logger.info( - "Zhipu 429 stream rate limit, retry %d/%d in %.1fms", + "Zhipu %d stream overload, retry %d/%d in %.1fms", + status_code, attempt + 1, max_attempts - 1, delay, @@ -215,24 +232,15 @@ def _compute_retry_delay_from_headers( headers: dict[str, str] | None, attempt: int, ) -> float: - """计算重试延迟(毫秒),优先使用 server retry-after.""" - rl_info = parse_rate_limit_headers(headers, 429, None) - server_delay_s = compute_effective_retry_seconds(rl_info) - if server_delay_s is not None: - return min(server_delay_s * 1000, self._rl_retry.max_delay_ms) - return calculate_delay(attempt, self._rl_retry) + """计算重试延迟(毫秒),优先使用 server retry-after. - def _compute_retry_delay_from_response( - self, - response: httpx.Response, - attempt: int, - ) -> float: - """计算重试延迟(毫秒),从 httpx.Response 提取 header.""" - rl_info = parse_rate_limit_headers( - response.headers, - response.status_code, - response.text[:500] if response.text else None, - ) + 非流式与流式 429/529 重试共用此方法(单一延迟逻辑)。 + 以"限流退避"语义(429)解析 header:``parse_rate_limit_headers`` + 仅对 429/403 解析 retry-after,故此处固定传 429, + 使 529 也能尊重 server retry-after,与 429 行为一致。 + 无 server 信号时回退到指数退避 + Full Jitter。 + """ + rl_info = parse_rate_limit_headers(headers, 429, None) server_delay_s = compute_effective_retry_seconds(rl_info) if server_delay_s is not None: return min(server_delay_s * 1000, self._rl_retry.max_delay_ms) diff --git a/tests/test_zhipu.py b/tests/test_zhipu.py index aa05b21..9e4523d 100644 --- a/tests/test_zhipu.py +++ b/tests/test_zhipu.py @@ -356,6 +356,20 @@ def _make_429_response( ) +def _make_529_response( + headers: dict[str, str] | None = None, +) -> httpx.Response: + """构造 529 HTTP 响应(Overloaded / 并发过载).""" + return httpx.Response( + status_code=529, + content=b'{"error":{"type":"overloaded_error","message":"Overloaded"}}', + headers=headers or {}, + request=httpx.Request( + "POST", "https://open.bigmodel.cn/api/anthropic/v1/messages" + ), + ) + + def _make_200_response() -> httpx.Response: """构造 200 HTTP 响应.""" body = json.dumps( @@ -666,3 +680,170 @@ async def test_missing_api_key_skips_retry(self): {}, ) assert resp.status_code == 401 + + # ── 529 Overloaded(并发过载)重试,行为与 429 一致 ────── + + @pytest.mark.asyncio + async def test_nonstream_529_retries_and_succeeds(self): + """非流式 529 两次后 200,重试成功.""" + vendor = _make_zhipu_vendor() + call_count = 0 + + async def mock_post(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count <= 2: + return _make_529_response() + return _make_200_response() + + with ( + patch.object(vendor, "_get_client") as mock_client, + patch("asyncio.sleep", new_callable=AsyncMock), + ): + client = AsyncMock() + client.post = mock_post + mock_client.return_value = client + + resp = await vendor.send_message( + {"model": "claude-sonnet-4-20250514", "messages": []}, + {}, + ) + + assert resp.status_code == 200 + assert call_count == 3 + + @pytest.mark.asyncio + async def test_nonstream_529_exhausted_retries(self): + """非流式连续 5 次 529,耗尽重试后返回 529.""" + vendor = _make_zhipu_vendor() + call_count = 0 + + async def mock_post(*args, **kwargs): + nonlocal call_count + call_count += 1 + return _make_529_response() + + with ( + patch.object(vendor, "_get_client") as mock_client, + patch("asyncio.sleep", new_callable=AsyncMock), + ): + client = AsyncMock() + client.post = mock_post + mock_client.return_value = client + + resp = await vendor.send_message( + {"model": "claude-sonnet-4-20250514", "messages": []}, + {}, + ) + + assert resp.status_code == 529 + assert call_count == 5 + + @pytest.mark.asyncio + async def test_stream_529_retries_and_succeeds(self): + """流式 529 两次后成功.""" + call_count = 0 + + async def fake_stream(self, body, headers): + nonlocal call_count + call_count += 1 + if call_count <= 2: + resp = _make_529_response() + raise httpx.HTTPStatusError( + "529", + request=resp.request, + response=resp, + ) + yield b'data: {"type":"content_block_start"}\n\n' + yield b'data: {"type":"content_block_delta"}\n\n' + + vendor = _make_zhipu_vendor() + chunks = [] + with ( + patch.object(NativeAnthropicVendor, "send_message_stream", fake_stream), + patch("asyncio.sleep", new_callable=AsyncMock), + ): + async for chunk in vendor.send_message_stream( + {"model": "claude-sonnet-4-20250514", "messages": []}, + {}, + ): + chunks.append(chunk) + + assert len(chunks) == 2 + assert call_count == 3 + + @pytest.mark.asyncio + async def test_stream_529_exhausted_retries_raises(self): + """流式连续 529,耗尽重试后 raise.""" + call_count = 0 + + async def fake_stream(self, body, headers): + nonlocal call_count + call_count += 1 + resp = _make_529_response() + raise httpx.HTTPStatusError( + "529", + request=resp.request, + response=resp, + ) + yield # 使函数成为 async generator(不可达,仅影响类型) + + vendor = _make_zhipu_vendor() + with ( + patch.object(NativeAnthropicVendor, "send_message_stream", fake_stream), + patch("asyncio.sleep", new_callable=AsyncMock), + pytest.raises(httpx.HTTPStatusError) as exc_info, + ): + async for _ in vendor.send_message_stream( + {"model": "claude-sonnet-4-20250514", "messages": []}, + {}, + ): + pass + + assert exc_info.value.response.status_code == 529 + assert call_count == 5 + + @pytest.mark.asyncio + async def test_stream_529_respects_retry_after(self): + """流式 529 响应含 retry-after 时使用 server 建议延迟. + + 回归保障:修复前流式延迟计算把真实状态码 529 传给 + parse_rate_limit_headers(仅对 429/403 解析),导致 529 忽略 + retry-after 而回退指数退避(首次最多 1s)。修复后固定按 429 + 语义解析,529 与 429 一样尊重 server retry-after。 + """ + call_count = 0 + sleep_delays = [] + + async def fake_stream(self, body, headers): + nonlocal call_count + call_count += 1 + if call_count == 1: + resp = _make_529_response(headers={"retry-after": "2"}) + raise httpx.HTTPStatusError( + "529", + request=resp.request, + response=resp, + ) + yield b'data: {"type":"content_block_start"}\n\n' + + async def mock_sleep(delay): + sleep_delays.append(delay) + + vendor = _make_zhipu_vendor() + chunks = [] + with ( + patch.object(NativeAnthropicVendor, "send_message_stream", fake_stream), + patch("asyncio.sleep", side_effect=mock_sleep), + ): + async for chunk in vendor.send_message_stream( + {"model": "claude-sonnet-4-20250514", "messages": []}, + {}, + ): + chunks.append(chunk) + + assert len(chunks) == 1 + assert call_count == 2 + assert len(sleep_delays) == 1 + # retry-after=2 → 2 * 1.1 = 2.2s(>1s 指数退避首跳,证明用了 server 信号) + assert 2.0 <= sleep_delays[0] <= 2.2