Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

## [Unreleased]

- fix(zhipu): 将 429/529 兜底退避抖动从 Full Jitter 改为 Equal Jitter(`[0, ceiling]` → `[ceiling/2, ceiling]`),修复 529 过载重试延迟非单调(实测 418.8→1857.7→961.6→3769.7ms)问题,重试延迟呈单调非递减指数形态;429 与 529 共用退避路径同步受益,server `retry-after` 优先级不变;
- feat(dashboard): Model Calling 实时监控扩展至全 vendor / 全 model(仅 CC 场景),其他 vendor 在 monitor 模式下仅计数不限流,Zhipu 保留 limited 模式 + FIFO 排队;
- feat(concurrency): 新增 `peak_pending_recent` 最近 10s 排队峰值追踪,瞬时排队释放后前端仍可见"曾排队 N" 余晖徽章;
- perf(dashboard): Model Calling 轮询间隔由 5000ms 缩短至 1500ms,提升瞬时排队可观测性;
Expand Down
43 changes: 43 additions & 0 deletions docs/.agents/issue.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,49 @@

---

## Zhipu 529 过载重试退避非单调(对齐 429 指数退避语义)

**问题描述**

cc 调用 zhipu 返回 529 过载触发重试时,延迟序列呈非单调形态(实测 `418.8ms → 1857.7ms → 961.6ms → 3769.7ms`,第 3 次反而短于第 2 次),不像 429 那样呈现干净的指数退避。

**表因**

`src/coding/proxy/routing/retry.py::calculate_delay` 的兜底抖动为 **Full Jitter**:`delay = random.uniform(0, ceiling)`,每次延迟是 `[0, ceiling]` 区间均匀随机值,本质非单调。实测值逐项精确匹配 `attempt 0/1/2/3` 的 `uniform(0,1000)/(0,2000)/(0,4000)/(0,8000)`。

**根因**

关键认知修正:429 与 529 在代码层面**早已共用同一退避路径** `ZhipuVendor._compute_retry_delay_from_headers`(`vendors/zhipu.py:230-247`),并非"两套规则"。感知差异来自两个因素叠加:

1. **服务端响应头不对称**:429(限流)响应通常携带 `Retry-After` 头 → 走确定性 server-guided 路径(`retry_after * 1.1`),看起来"干净递增";529(过载)响应通常**不携带**该头 → 落入 Full Jitter 兜底分支。
2. **Full Jitter 本身非单调**:即使 429 也一样,只是 429 多数情况有 `Retry-After` 遮蔽了该问题。

故真正修复对象是共享的抖动策略,而非给 529 单独加逻辑。

**处理方式**

将 `calculate_delay` 从 Full Jitter 改为 **Equal Jitter**(AWS M. Brooker, "Exponential Backoff And Jitter," 2015):

```
temp = min(initial * backoff^attempt, max)
delay = temp/2 + random.uniform(0, temp/2) # [temp/2, temp]
```

Zhipu 配置下区间变为 `[500,1000] → [1000,2000] → [2000,4000] → [4000,8000]`,相邻区间仅边界相切,延迟几乎必然单调非递减;保留抖动以防惊群;429/529 同步受益;`retry-after` 优先级链不动。同步更新 `retry.py` / `zhipu.py` 共 4 处 "Full Jitter" docstring,新增 `tests/test_retry.py` 独立单元测试与 `test_zhipu.py::test_529_equal_jitter_delay_in_expected_band`。

**后续防范**

- **单调性是配置相关的弱保证**:依赖 `backoff_multiplier >= 2.0` 且未触及 `max_delay_ms` 封顶。封顶后(`initial*2^attempt >= max`)各 attempt 区间退化为同一 `[max/2, max]`,单调性丧失。当前 Zhipu `max_retries=4` 触及不到,但未来调参须注意。函数 docstring 已写明此契约边界。
- **诚实权衡(Brooker 反方观点)**:Brooker 2015 分布式吞吐基准显示 Full Jitter 优于 Equal Jitter。但本场景是单代理(CC)低并发过载恢复,诉求是可解释性(日志递增可预测)而非分布式吞吐最优,CC 并发量级远未达"海量",差异在噪声内。若未来观测到并发过载下的惊群回归,应优先要求 server 提供 `retry-after` 或评估 Decorrelated Jitter,而非回退 Full Jitter。

**同类问题影响与处理注意事项**

- **双 `RetryConfig` 死代码(重要遗留债)**:仓库存在两个 `RetryConfig`——`routing/retry.py:25`(活跃 dataclass,仅 Zhipu 用)与 `config/resiliency.py:15`(Pydantic 死代码,`config/routing.py:285` 注册为 `retry` 字段但 `VendorTier.retry_config` 从未赋值,全仓库 `grep "retry_config="` 为空)。`docs/arch/routing.md:22` 与 `config-reference.md:95` 引用了不同的 `RetryConfig`,构成认知陷阱。本次修复**未触碰**死代码(超出范围),后续应单独清理。这也是本次"为何选方案 A(直改 `calculate_delay`)而非方案 B(加可配置 jitter_strategy 字段)"的决定性理由——方案 B 会扩大 schema 与运行时的裂缝。
- **`calculate_delay` 唯一消费者确证**:经全仓库 grep,仅 `vendors/zhipu.py:247` 调用(`routing/__init__.py` 仅 re-export,`tier.retry_config` 字段未激活)。修改其抖动策略爆炸半径仅限 Zhipu,安全。
- **测试缺口已补**:修复前 `calculate_delay` 无任何独立单元测试(仅通过 zhipu 集成测试间接覆盖,且那些测试要么禁用 jitter、要么走 retry-after 路径),现已新建 `tests/test_retry.py`。

---

## streaming usage parse failed: 'NoneType' object has no attribute 'get'

**问题描述**
Expand Down
17 changes: 13 additions & 4 deletions src/coding/proxy/routing/retry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""传输层重试策略 — 指数退避与 Full Jitter.
"""传输层重试策略 — 指数退避与 Equal Jitter.

与 Circuit Breaker 正交互:
- Retry 处理瞬态网络抖动(秒级恢复)
Expand Down Expand Up @@ -68,15 +68,24 @@ def is_retryable_status(status_code: int) -> bool:


def calculate_delay(attempt: int, cfg: RetryConfig) -> float:
"""计算第 N 次重试的延迟(毫秒),含指数退避和 Full Jitter.
"""计算第 N 次重试的延迟(毫秒),含指数退避和 Equal Jitter.

Equal Jitter 策略: temp = min(initial * backoff^attempt, max);
delay = temp/2 + random(0, temp/2),落在 [temp/2, temp]。
相较 Full Jitter (random(0, temp)),Equal Jitter 保留一半固定基线,
使相邻重试的延迟区间仅边界相切,呈现单调非递减的指数形态,
同时保留抖动以防惊群。

契约边界:单调非递减依赖 ``backoff_multiplier >= 2.0`` 且未触及
``max_delay_ms`` 封顶;封顶后各 attempt 区间退化为同一 [max/2, max]
(当前 Zhipu ``max_retries=4`` 触及不到该边界)。

Full Jitter 策略: delay = random(0, min(initial * backoff^attempt, max))
参考: AWS "Exponential Backoff And Jitter" (Marc Brooker, 2015)
"""
delay = cfg.initial_delay_ms * (cfg.backoff_multiplier**attempt)
delay = min(delay, cfg.max_delay_ms)

if cfg.jitter:
delay = random.uniform(0, delay)
delay = delay / 2 + random.uniform(0, delay / 2)

return delay
4 changes: 2 additions & 2 deletions src/coding/proxy/vendors/zhipu.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
额外提供 429/529 专用重试挽回机制:
- 429 Rate Limit(限流)与 529 Overloaded(并发过载)共用同一退避策略
- max_attempt = 5(1 初始 + 4 重试)
- 指数退避 + Full Jitter(1s → 2s → 4s → 8s)
- 指数退避 + Equal Jitter(区间 0.5–1s → 1–2s → 2–4s → 4–8s)
- 优先尊重 server retry-after header

并发限流由 BaseVendor._concurrency_controller 统一管控
Expand Down Expand Up @@ -238,7 +238,7 @@ def _compute_retry_delay_from_headers(
以"限流退避"语义(429)解析 header:``parse_rate_limit_headers``
仅对 429/403 解析 retry-after,故此处固定传 429,
使 529 也能尊重 server retry-after,与 429 行为一致。
无 server 信号时回退到指数退避 + Full Jitter。
无 server 信号时回退到指数退避 + Equal Jitter。
"""
rl_info = parse_rate_limit_headers(headers, 429, None)
server_delay_s = compute_effective_retry_seconds(rl_info)
Expand Down
98 changes: 98 additions & 0 deletions tests/test_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""routing.retry 模块单元测试 — calculate_delay(指数退避 + Equal Jitter)."""

import random

from coding.proxy.routing.retry import RetryConfig, calculate_delay

# Equal Jitter 蒙特卡洛采样次数(纯计算,开销可忽略)
_SAMPLES = 500


def _cfg(**overrides) -> RetryConfig:
"""构造测试用 RetryConfig(Zhipu 实际配置为默认基线)."""
defaults = dict(
max_retries=4,
initial_delay_ms=1000,
max_delay_ms=30000,
backoff_multiplier=2.0,
jitter=True,
)
defaults.update(overrides)
return RetryConfig(**defaults)


# --- 无抖动:精确指数(回归基线)---


def test_calculate_delay_no_jitter_exact_exponential():
"""jitter=False 时延迟为纯指数 1000 → 2000 → 4000 → 8000 ms."""
cfg = _cfg(jitter=False)
assert calculate_delay(0, cfg) == 1000.0
assert calculate_delay(1, cfg) == 2000.0
assert calculate_delay(2, cfg) == 4000.0
assert calculate_delay(3, cfg) == 8000.0


# --- Equal Jitter:区间边界 ---


def test_calculate_delay_equal_jitter_bounds():
"""Equal Jitter 落在 [temp/2, temp]:[500,1000]/[1000,2000]/[2000,4000]/[4000,8000]."""
cfg = _cfg(jitter=True)
bounds = [(500.0, 1000.0), (1000.0, 2000.0), (2000.0, 4000.0), (4000.0, 8000.0)]
for attempt, (low, high) in enumerate(bounds):
for _ in range(_SAMPLES):
delay = calculate_delay(attempt, cfg)
assert low <= delay <= high, (
f"attempt={attempt} delay={delay} 越界 [{low}, {high}]"
)


def test_calculate_delay_capped_at_max():
"""触及 max_delay_ms 封顶后,区间退化为 [max/2, max] = [15000, 30000]."""
cfg = _cfg(jitter=True)
# attempt=5: 1000 * 2^5 = 32000 > max=30000 → temp=30000
for _ in range(_SAMPLES):
delay = calculate_delay(5, cfg)
assert 15000.0 <= delay <= 30000.0


# --- 单调非递减(multiplier=2.0 下数学保证)---


def test_calculate_delay_monotonic_non_decreasing():
"""相邻重试延迟单调非递减(multiplier>=2.0 且未封顶时成立).

数学保证:delay[i] ∈ [temp_i/2, temp_i],delay[i+1] ∈ [temp_i, 2·temp_i],
故 delay[i] <= temp_i <= delay[i+1] 恒成立(用 <= 而非 <,体现边界相切)。
"""
cfg = _cfg(jitter=True)
for _ in range(_SAMPLES):
delays = [calculate_delay(a, cfg) for a in range(4)]
for i in range(len(delays) - 1):
assert delays[i] <= delays[i + 1], (
f"非单调:delays={delays}(index {i} > {i + 1})"
)


# --- 健壮性:极小 initial 不报错 ---


def test_calculate_delay_tiny_initial_no_error():
"""initial_delay_ms 极小值时不抛异常,仍落在合法区间."""
cfg = _cfg(initial_delay_ms=1, jitter=True)
delay = calculate_delay(0, cfg) # temp=1 → [0.5, 1.0]
assert 0.5 <= delay <= 1.0


# --- 可复现性:固定 random seed ---


def test_calculate_delay_reproducible_with_seed():
"""固定 random seed 后延迟序列可复现(验证 jitter 可观测、可调试)."""
cfg = _cfg(jitter=True)
random.seed(42)
first = [calculate_delay(a, cfg) for a in range(4)]
random.seed(42)
second = [calculate_delay(a, cfg) for a in range(4)]
assert first == second
42 changes: 42 additions & 0 deletions tests/test_zhipu.py
Original file line number Diff line number Diff line change
Expand Up @@ -847,3 +847,45 @@ async def mock_sleep(delay):
assert len(sleep_delays) == 1
# retry-after=2 → 2 * 1.1 = 2.2s(>1s 指数退避首跳,证明用了 server 信号)
assert 2.0 <= sleep_delays[0] <= 2.2

@pytest.mark.asyncio
async def test_529_equal_jitter_delay_in_expected_band(self):
"""非流式 529 无 retry-after 时,Equal Jitter 首跳落在 [0.5, 1.0]s。

回归保障:Full Jitter 时首跳为 uniform(0, 1000ms),可能接近 0ms
(用户报告的 418.8ms 落在 (0, 1000] 全区间,且整体序列非单调)。
Equal Jitter 后区间收窄为 [500, 1000]ms,下界抬升至 500ms,
呈现单调非递减的指数退避形态,429/529 同步受益(共用 calculate_delay)。
"""
vendor = _make_zhipu_vendor()
sleep_delays = []

async def mock_sleep(delay):
sleep_delays.append(delay)

call_count = 0

async def mock_post(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 1:
return _make_529_response() # 无 retry-after
return _make_200_response()

with (
patch.object(vendor, "_get_client") as mock_client,
patch("asyncio.sleep", side_effect=mock_sleep),
):
client = AsyncMock()
client.post = mock_post
mock_client.return_value = client

resp = await vendor.send_message(
{"model": "claude-sonnet-4-20250514", "messages": []},
{},
)

assert resp.status_code == 200
assert len(sleep_delays) == 1
# Equal Jitter: attempt 0 → temp=1000ms → [500, 1000]ms → sleep([0.5, 1.0])
assert 0.5 <= sleep_delays[0] <= 1.0
Loading