Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

## [Unreleased]

- feat(zhipu): 新增每模型并发限制(默认 3,可通过 `vendors[zhipu].concurrency` 配置),基于 `asyncio.Semaphore` 实现 FIFO 公平排队,流式与非流式共用同一槽位,与 429 重试机制兼容。

## [v0.4.0](https://github.com/ThreeFish-AI/coding-proxy/releases/tag/v0.4.0) — 2026-05-01

> [!IMPORTANT]
Expand Down
40 changes: 34 additions & 6 deletions docs/arch/config-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,13 @@ flowchart TD

## 5. VendorConfig 弹性字段

| 字段 | 类型 | 默认值 | 说明 |
| -------------------- | -------------- | -------------------- | --------------------------- |
| `circuit_breaker` | config \| None | `None` | 熔断器配置(None = 终端层) |
| `retry` | config | `RetryConfig()` | 重试策略配置 |
| `quota_guard` | config | `QuotaGuardConfig()` | 日度配额守卫配置 |
| `weekly_quota_guard` | config | `QuotaGuardConfig()` | 周度配额守卫配置 |
| 字段 | 类型 | 默认值 | 说明 |
| -------------------- | -------------- | -------------------- | ----------------------------------- |
| `circuit_breaker` | config \| None | `None` | 熔断器配置(None = 终端层) |
| `retry` | config | `RetryConfig()` | 重试策略配置 |
| `quota_guard` | config | `QuotaGuardConfig()` | 日度配额守卫配置 |
| `weekly_quota_guard` | config | `QuotaGuardConfig()` | 周度配额守卫配置 |
| `concurrency` | config \| None | `None` | `[zhipu]` 每模型并发限制(详见 5.5) |

<a id="elastic-params"></a>

Expand Down Expand Up @@ -143,6 +144,33 @@ flowchart TD
| `error_types` | list[str] | `["rate_limit_error", "overloaded_error", "api_error"]` |
| `error_message_patterns` | list[str] | `["quota", "limit exceeded", "usage cap", "capacity", "internal network failure"]` |

### 5.5 ZhipuConcurrencyConfig — Zhipu 每模型并发参数

仅对 `vendor: zhipu` 生效,基于 `asyncio.Semaphore` 实现 FIFO 公平排队。

| 字段 | 类型 | 默认值 | 说明 |
| --------- | -------------- | ------ | -------------------------------------------------------------------------------- |
| `default` | int | `3` | 全局默认并行度(适用于所有未在 `models` 中显式覆盖的模型);取值范围 `[1, 20]` |
| `models` | map[str → int] | `{}` | 按映射后模型名(如 `glm-5v-turbo` / `glm-5.1` / `glm-4.5-air`)自定义并行度上限 |

YAML 示例:

```yaml
- vendor: zhipu
concurrency:
default: 3
models:
glm-5v-turbo: 5
glm-5.1: 2
```

行为语义:

- 信号量按**映射后模型名**键控,与上游真实承载模型对齐;流式与非流式请求共用同一槽位。
- 槽位满时新请求按 FIFO 顺序排队,直到任一在途请求释放槽位才被唤醒。
- 429 重试期间持续占用槽位(重试视为同一请求的延续)。
- 顶层 `concurrency` 字段缺省为 `None` → 转发至 `ZhipuConfig` 时回退默认值 `default=3`;如需完全关闭限流,可在 `ZhipuConfig` 构造层显式置 `null`(一般无需操作)。

---

## 6. 供应商专属字段
Expand Down
8 changes: 8 additions & 0 deletions src/coding/proxy/config/config.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ vendors:
window_hours: 24.0
threshold_percent: 95.0
probe_interval_seconds: 300
# 每模型并发限制:默认 3 个并行请求;超出则按 FIFO 排队等待
# 可通过 models 字段覆盖单个模型的限制(如 glm-5.1: 5)
concurrency:
default: 3
# models:
# glm-5v-turbo: 3
# glm-5.1: 3
# glm-4.5-air: 3

# Vendor 4: MiniMax(默认禁用,需手动启用并添加到 tiers)
- vendor: minimax
Expand Down
13 changes: 10 additions & 3 deletions src/coding/proxy/config/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pydantic import BaseModel, BeforeValidator, Field, PrivateAttr, model_validator

from .resiliency import CircuitBreakerConfig, QuotaGuardConfig, RetryConfig
from .vendors import ZhipuConcurrencyConfig

# ── 价格字段解析($ / ¥ 前缀支持) ──────────────────────────

Expand Down Expand Up @@ -64,13 +65,13 @@ def _detect_currency(v: Any) -> str | None:
"api_key",
}
)
# 向后兼容别名
_ZHIPU_FIELDS = _NATIVE_ANTHROPIC_FIELDS
# Zhipu 独占字段:在通用 api_key 基础上增加每模型并发限制
_ZHIPU_FIELDS: frozenset[str] = _NATIVE_ANTHROPIC_FIELDS | frozenset({"concurrency"})

_VENDOR_EXCLUSIVE_FIELDS: dict[str, frozenset[str]] = {
"copilot": _COPILOT_FIELDS,
"antigravity": _ANTIGRAVITY_FIELDS,
"zhipu": _NATIVE_ANTHROPIC_FIELDS,
"zhipu": _ZHIPU_FIELDS,
"minimax": _NATIVE_ANTHROPIC_FIELDS,
"kimi": _NATIVE_ANTHROPIC_FIELDS,
"doubao": _NATIVE_ANTHROPIC_FIELDS,
Expand Down Expand Up @@ -285,6 +286,12 @@ class VendorConfig(BaseModel):
quota_guard: QuotaGuardConfig = Field(default_factory=QuotaGuardConfig)
weekly_quota_guard: QuotaGuardConfig = Field(default_factory=QuotaGuardConfig)

# ── Zhipu 专属:每模型并发限制 ───────────────────────────
concurrency: ZhipuConcurrencyConfig | None = Field(
default=None,
description="[zhipu] 每模型并发限制;None 表示不限并发",
)

@model_validator(mode="after")
def _warn_irrelevant_fields(self) -> VendorConfig:
"""对非当前 vendor 类型的非空专属字段发出 warning."""
Expand Down
2 changes: 2 additions & 0 deletions src/coding/proxy/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
KimiConfig,
MinimaxConfig,
XiaomiConfig,
ZhipuConcurrencyConfig,
ZhipuConfig,
)

Expand Down Expand Up @@ -318,6 +319,7 @@ def compat_state_path(self) -> Path:
"CopilotConfig",
"AntigravityConfig",
"ZhipuConfig",
"ZhipuConcurrencyConfig",
# resiliency
"CircuitBreakerConfig",
"RetryConfig",
Expand Down
18 changes: 17 additions & 1 deletion src/coding/proxy/config/vendors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,21 @@

from __future__ import annotations

from pydantic import BaseModel
from pydantic import BaseModel, Field


class ZhipuConcurrencyConfig(BaseModel):
"""Zhipu 每模型并发限制配置."""

default: int = Field(default=3, ge=1, le=20, description="全局默认并行度")
models: dict[str, int] = Field(
default_factory=dict,
description="按映射后模型名自定义并行度(覆盖 default)",
)

def get_limit(self, model: str) -> int:
"""获取指定模型的并行度限制."""
return self.models.get(model, self.default)


class AnthropicConfig(BaseModel):
Expand Down Expand Up @@ -48,6 +62,7 @@ class ZhipuConfig(BaseModel):
base_url: str = "https://open.bigmodel.cn/api/anthropic"
api_key: str = ""
timeout_ms: int = 3000000
concurrency: ZhipuConcurrencyConfig = Field(default_factory=ZhipuConcurrencyConfig)


class MinimaxConfig(BaseModel):
Expand Down Expand Up @@ -100,6 +115,7 @@ class AlibabaConfig(BaseModel):
"CopilotConfig",
"AntigravityConfig",
"ZhipuConfig",
"ZhipuConcurrencyConfig",
"MinimaxConfig",
"KimiConfig",
"DoubaoConfig",
Expand Down
16 changes: 10 additions & 6 deletions src/coding/proxy/server/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,17 @@ def _create_vendor_from_config(
cfg = _resolve_antigravity_credentials(cfg, token_store)
return AntigravityVendor(cfg, failover_cfg, mapper)
case "zhipu":
cfg = ZhipuConfig(
enabled=vendor_cfg.enabled,
base_url=vendor_cfg.base_url
zhipu_kwargs: dict[str, Any] = {
"enabled": vendor_cfg.enabled,
"base_url": vendor_cfg.base_url
or "https://open.bigmodel.cn/api/anthropic",
api_key=vendor_cfg.api_key,
timeout_ms=vendor_cfg.timeout_ms,
)
"api_key": vendor_cfg.api_key,
"timeout_ms": vendor_cfg.timeout_ms,
}
# 仅当显式配置了 concurrency 时转发,否则使用 ZhipuConfig 默认值
if vendor_cfg.concurrency is not None:
zhipu_kwargs["concurrency"] = vendor_cfg.concurrency
cfg = ZhipuConfig(**zhipu_kwargs)
return ZhipuVendor(cfg, mapper, failover_cfg)
case "minimax":
cfg = MinimaxConfig(
Expand Down
78 changes: 78 additions & 0 deletions src/coding/proxy/vendors/concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""每模型并发限制器 — 基于 asyncio.Semaphore 的公平排队.

为每个映射后的模型(如 ``glm-5v-turbo``)独立维护一个 ``asyncio.Semaphore``,
确保同一时间点该模型的并行请求数不超过配置的上限。当所有槽位被占满时,
新请求按 FIFO 顺序排队等待,直到有槽位释放。

设计要点:
- **惰性创建**:仅在首次请求到达时才为该模型创建 Semaphore,避免冷启动开销
- **FIFO 公平**:``asyncio.Semaphore`` 内部使用 FIFO 队列,天然满足排队语义
- **按映射后模型名键控**:与上游真实承载能力对齐,而非按客户端请求名(如 ``claude-sonnet-*``)
"""

from __future__ import annotations

import asyncio
import logging

from ..config.vendors import ZhipuConcurrencyConfig

logger = logging.getLogger(__name__)


class ModelConcurrencyLimiter:
"""按模型名提供独立并发槽位的限制器.

用法::

limiter = ModelConcurrencyLimiter(config)
sem = await limiter.acquire("glm-5v-turbo")
try:
... # 执行请求
finally:
sem.release()
"""

def __init__(self, config: ZhipuConcurrencyConfig) -> None:
self._config = config
self._semaphores: dict[str, asyncio.Semaphore] = {}

def _get_semaphore(self, model: str) -> asyncio.Semaphore:
"""获取(或惰性创建)指定模型的信号量."""
sem = self._semaphores.get(model)
if sem is None:
limit = self._config.get_limit(model)
sem = asyncio.Semaphore(limit)
self._semaphores[model] = sem
logger.debug(
"ModelConcurrencyLimiter: created semaphore model=%s limit=%d",
model,
limit,
)
return sem

async def acquire(self, model: str) -> asyncio.Semaphore:
"""获取指定模型的并发槽位,必要时阻塞排队.

返回已获取的 Semaphore 实例,调用方负责在请求完成后调用 ``release()``。
"""
sem = self._get_semaphore(model)
await sem.acquire()
return sem

def get_diagnostics(self) -> dict[str, dict[str, int]]:
"""返回每个模型的并发状态快照(用于可观测性)."""
snapshot: dict[str, dict[str, int]] = {}
for model, sem in self._semaphores.items():
limit = self._config.get_limit(model)
# asyncio.Semaphore 内部 _value 表示剩余可用槽位
available = sem._value # noqa: SLF001 — 公开 API 未暴露
snapshot[model] = {
"limit": limit,
"in_use": max(limit - available, 0),
"available": max(available, 0),
}
return snapshot


__all__ = ["ModelConcurrencyLimiter"]
Loading
Loading