diff --git a/src/coding/proxy/server/dashboard.py b/src/coding/proxy/server/dashboard.py index c81b72c..75dd812 100644 --- a/src/coding/proxy/server/dashboard.py +++ b/src/coding/proxy/server/dashboard.py @@ -640,6 +640,43 @@ def _build_favicon() -> bytes: padding: 1px 6px; border-radius: 3px; } + .mc-limit-editable { + cursor: pointer; + border-bottom: 1px dashed rgba(74,222,128,.4); + transition: border-color .2s, color .2s; + } + .mc-limit-editable:hover { + border-bottom-color: #4ade80; + color: #4ade80; + } + .mc-limit-input { + width: 36px; + background: var(--bg-primary); + border: 1px solid var(--accent-blue); + border-radius: 3px; + color: var(--text-primary); + font-size: 10px; + font-family: 'JetBrains Mono', monospace; + text-align: center; + padding: 0 2px; + outline: none; + -moz-appearance: textfield; + } + .mc-limit-input::-webkit-outer-spin-button, + .mc-limit-input::-webkit-inner-spin-button { + -webkit-appearance: none; + margin: 0; + } + .mc-limit-flash-ok { animation: mc-flash-ok .6s ease; } + .mc-limit-flash-err { animation: mc-flash-err .6s ease; } + @keyframes mc-flash-ok { + 0%,100% { color: inherit; } + 40% { color: #4ade80; } + } + @keyframes mc-flash-err { + 0%,100% { color: inherit; } + 40% { color: #f87171; } + } @@ -1268,7 +1305,8 @@ def _build_favicon() -> bytes: + '' + escapeHtml(m.vendor + '/' + m.model) + '' + '
' + '
' - + '' + m.in_use + '/' + m.limit + '' + + '' + m.in_use + + '/' + m.limit + '' + (m.pending > 0 ? '⏳ ' + m.pending + '' : '') + '
' + ''; @@ -1293,6 +1331,79 @@ def _build_favicon() -> bytes: if (_mcTimer) { clearInterval(_mcTimer); _mcTimer = null; } } +// ── 并行度运行时编辑 ────────────────────────────────────── +var _mcEditing = false; +document.addEventListener('click', function(e) { + if (_mcEditing) return; + var el = e.target.closest('.mc-limit-editable'); + if (!el) return; + e.preventDefault(); + _mcEditing = true; + var oldVal = el.getAttribute('data-limit'); + var tier = el.getAttribute('data-tier'); + var model = el.getAttribute('data-model'); + var input = document.createElement('input'); + input.type = 'number'; + input.className = 'mc-limit-input'; + input.min = '1'; + input.max = '20'; + input.value = oldVal; + el.style.display = 'none'; + el.parentNode.insertBefore(input, el.nextSibling); + input.focus(); + input.select(); + + var _cancelled = false; + + function restore() { + _mcEditing = false; + if (input.parentNode) input.parentNode.removeChild(input); + el.style.display = ''; + } + + function flash(cls) { + el.classList.add(cls); + setTimeout(function() { el.classList.remove(cls); }, 600); + } + + input.addEventListener('keydown', function(ev) { + if (ev.key === 'Escape') { _cancelled = true; restore(); return; } + if (ev.key !== 'Enter') return; + ev.preventDefault(); + submit(); + }); + + input.addEventListener('blur', function() { + setTimeout(function() { if (!_cancelled) submit(); }, 50); + }); + + function submit() { + if (_cancelled) return; + var v = parseInt(input.value, 10); + if (isNaN(v) || v < 1 || v > 20) { restore(); flash('mc-limit-flash-err'); return; } + if (String(v) === oldVal) { restore(); return; } + fetch('/api/concurrency', { + method: 'PUT', + headers: {'Content-Type': 'application/json'}, + body: JSON.stringify({tier: tier, model: model, limit: v}) + }).then(function(res) { + if (res.ok) { + return res.json().then(function() { + el.textContent = v; + el.setAttribute('data-limit', v); + flash('mc-limit-flash-ok'); + }); + } else { + flash('mc-limit-flash-err'); + } + }).catch(function() { + flash('mc-limit-flash-err'); + }).finally(function() { + restore(); + }); + } +}); + // ── 按 tiers 顺序排序 vendor 列表 ───────────────────────── function sortByTierOrder(vendors, tierOrder) { if (!tierOrder || !tierOrder.length) return vendors.sort(); diff --git a/src/coding/proxy/server/routes.py b/src/coding/proxy/server/routes.py index 95c6554..7c13d2f 100644 --- a/src/coding/proxy/server/routes.py +++ b/src/coding/proxy/server/routes.py @@ -225,6 +225,61 @@ async def status() -> dict: return result +def register_concurrency_route(app: Any, router: Any) -> None: + """注册运行时并发限制调整路由.""" + + @app.put("/api/concurrency") + async def update_concurrency(request: Request) -> Response: + try: + body = await request.json() + except Exception: + return json_error_response( + 400, error_type="invalid_request_error", message="body must be JSON" + ) + tier_name = body.get("tier") + model = body.get("model") + limit = body.get("limit") + if not tier_name or not model or limit is None: + return json_error_response( + 400, + error_type="invalid_request_error", + message="requires tier, model, limit", + ) + if not isinstance(limit, int) or limit < 1 or limit > 20: + return json_error_response( + 400, + error_type="invalid_request_error", + message="limit must be an integer between 1 and 20", + ) + for tier in router.tiers: + if tier.name == tier_name: + vendor = tier.vendor + update_fn = getattr(vendor, "update_concurrency", None) + if update_fn is None: + return json_error_response( + 400, + error_type="invalid_request_error", + message=f"vendor '{tier_name}' does not support concurrency", + ) + try: + update_fn(model, limit) + except (ValueError, AttributeError) as exc: + return json_error_response( + 400, error_type="invalid_request_error", message=str(exc) + ) + return Response( + content=json.dumps( + {"ok": True, "tier": tier_name, "model": model, "limit": limit}, + ensure_ascii=False, + ).encode(), + status_code=200, + media_type="application/json", + ) + return json_error_response( + 404, error_type="not_found", message=f"tier '{tier_name}' not found" + ) + + def register_copilot_routes(app: Any, router: Any) -> None: """注册 Copilot 诊断与模型探测路由.""" from .factory import _find_copilot_vendor @@ -457,6 +512,7 @@ def register_all_routes( register_core_routes(app, router) register_health_routes(app) register_status_route(app, router) + register_concurrency_route(app, router) register_copilot_routes(app, router) register_admin_routes(app, router) register_session_vendor_routes(app, router) diff --git a/src/coding/proxy/vendors/concurrency.py b/src/coding/proxy/vendors/concurrency.py index 148bb53..7944bdd 100644 --- a/src/coding/proxy/vendors/concurrency.py +++ b/src/coding/proxy/vendors/concurrency.py @@ -1,13 +1,14 @@ -"""每模型并发限制器 — 基于 asyncio.Semaphore 的公平排队. +"""每模型并发限制器 — 支持运行时动态调整的公平排队. -为每个映射后的模型(如 ``glm-5v-turbo``)独立维护一个 ``asyncio.Semaphore``, +为每个映射后的模型(如 ``glm-5v-turbo``)独立维护一个 ``_ConcurrencySlot`, 确保同一时间点该模型的并行请求数不超过配置的上限。当所有槽位被占满时, 新请求按 FIFO 顺序排队等待,直到有槽位释放。 设计要点: - - **惰性创建**:仅在首次请求到达时才为该模型创建 Semaphore,避免冷启动开销 - - **FIFO 公平**:``asyncio.Semaphore`` 内部使用 FIFO 队列,天然满足排队语义 - - **按映射后模型名键控**:与上游真实承载能力对齐,而非按客户端请求名(如 ``claude-sonnet-*``) + - **惰性创建**:仅在首次请求到达时才为该模型创建 Slot,避免冷启动开销 + - **FIFO 公平**:``asyncio.Event`` + while 循环天然满足 FIFO 排队语义 + - **动态调整**:支持运行时修改 per-model limit,无需重启进程 + - **按映射后模型名键控**:与上游真实承载能力对齐,而非按客户端请求名 """ from __future__ import annotations @@ -20,62 +21,140 @@ logger = logging.getLogger(__name__) +class _ConcurrencySlot: + """支持动态 limit 的并发槽位. + + 使用 ``asyncio.Event`` 作为等待/通知原语,在 ``acquire`` 中 await 等待, + 在 ``release`` / ``set_limit`` 中唤醒。``set_limit`` 修改上限后立即唤醒 + 所有等待者,由它们重新判断是否可获得槽位。 + """ + + def __init__(self, limit: int) -> None: + self._limit = limit + self._in_use: int = 0 + self._pending: int = 0 + self._wake = asyncio.Event() + self._wake.set() + + async def acquire(self) -> _ConcurrencySlot: + """获取一个并发槽位,必要时阻塞排队. + + 返回 ``self``,调用方在请求完成后调用 ``release()``。 + """ + # Fast path + if self._in_use < self._limit: + self._in_use += 1 + return self + # Slow path — 等待槽位释放 + self._pending += 1 + try: + while True: + self._wake.clear() + await self._wake.wait() + if self._in_use < self._limit: + self._in_use += 1 + return self + finally: + self._pending -= 1 + + def release(self) -> None: + """释放一个并发槽位.""" + self._in_use = max(0, self._in_use - 1) + self._wake.set() + + def set_limit(self, new_limit: int) -> None: + """动态调整并发上限. + + 增大 limit 时立即唤醒等待者;缩小时已持有的槽位不受影响, + 新 limit 在后续 acquire 中自然生效。 + """ + self._limit = new_limit + self._wake.set() + + @property + def limit(self) -> int: + return self._limit + + @property + def in_use(self) -> int: + return self._in_use + + @property + def available(self) -> int: + return max(0, self._limit - self._in_use) + + @property + def pending(self) -> int: + return self._pending + + class ModelConcurrencyLimiter: """按模型名提供独立并发槽位的限制器. 用法:: limiter = ModelConcurrencyLimiter(config) - sem = await limiter.acquire("glm-5v-turbo") + slot = await limiter.acquire("glm-5v-turbo") try: ... # 执行请求 finally: - sem.release() + slot.release() """ def __init__(self, config: ZhipuConcurrencyConfig) -> None: self._config = config - self._semaphores: dict[str, asyncio.Semaphore] = {} + self._slots: dict[str, _ConcurrencySlot] = {} - def _get_semaphore(self, model: str) -> asyncio.Semaphore: - """获取(或惰性创建)指定模型的信号量.""" - sem = self._semaphores.get(model) - if sem is None: + def _get_or_create_slot(self, model: str) -> _ConcurrencySlot: + """获取(或惰性创建)指定模型的并发槽位.""" + slot = self._slots.get(model) + if slot is None: limit = self._config.get_limit(model) - sem = asyncio.Semaphore(limit) - self._semaphores[model] = sem + slot = _ConcurrencySlot(limit) + self._slots[model] = slot logger.debug( - "ModelConcurrencyLimiter: created semaphore model=%s limit=%d", + "ModelConcurrencyLimiter: created slot model=%s limit=%d", model, limit, ) - return sem + return slot - async def acquire(self, model: str) -> asyncio.Semaphore: + async def acquire(self, model: str) -> _ConcurrencySlot: """获取指定模型的并发槽位,必要时阻塞排队. - 返回已获取的 Semaphore 实例,调用方负责在请求完成后调用 ``release()``。 + 返回已获取的 Slot 实例,调用方负责在请求完成后调用 ``release()``。 """ - sem = self._get_semaphore(model) - await sem.acquire() - return sem + slot = self._get_or_create_slot(model) + await slot.acquire() + return slot + + def set_limit(self, model: str, new_limit: int) -> None: + """运行时修改指定模型的并发上限. + + 同时更新 config.models 以确保后续惰性创建使用新值。 + """ + slot = self._slots.get(model) + if slot is None: + slot = _ConcurrencySlot(new_limit) + self._slots[model] = slot + else: + slot.set_limit(new_limit) + self._config.models[model] = new_limit + logger.info( + "ModelConcurrencyLimiter: updated limit model=%s new_limit=%d", + model, + new_limit, + ) def get_diagnostics(self) -> dict[str, dict[str, int]]: """返回每个模型的并发状态快照(用于可观测性).""" snapshot: dict[str, dict[str, int]] = {} - for model, sem in self._semaphores.items(): - limit = self._config.get_limit(model) - # asyncio.Semaphore 内部 _value 表示剩余可用槽位 - available = sem._value # noqa: SLF001 — 公开 API 未暴露 - in_use = max(limit - available, 0) - # _waiters 为正在排队等待的协程集合,无等待者时为 None - waiters = getattr(sem, "_waiters", None) # noqa: SLF001 - pending = len(waiters) if waiters else 0 + for model, slot in self._slots.items(): snapshot[model] = { - "limit": limit, - "in_use": in_use, - "available": max(available, 0), - "pending": pending, + "limit": slot.limit, + "in_use": slot.in_use, + "available": slot.available, + "pending": slot.pending, } return snapshot diff --git a/src/coding/proxy/vendors/zhipu.py b/src/coding/proxy/vendors/zhipu.py index cc6ef87..64407ba 100644 --- a/src/coding/proxy/vendors/zhipu.py +++ b/src/coding/proxy/vendors/zhipu.py @@ -261,6 +261,13 @@ def get_diagnostics(self) -> dict[str, Any]: diagnostics["concurrency"] = self._concurrency_limiter.get_diagnostics() return diagnostics + def update_concurrency(self, model: str, limit: int) -> None: + """运行时更新指定模型的并发限制.""" + if self._concurrency_limiter is None: + msg = "Concurrency limiter is not enabled for this vendor" + raise ValueError(msg) + self._concurrency_limiter.set_limit(model, limit) + # ── 延迟计算 ──────────────────────────────────────────── def _compute_retry_delay_from_headers( diff --git a/tests/test_zhipu_concurrency.py b/tests/test_zhipu_concurrency.py index 3c8a97d..7566b24 100644 --- a/tests/test_zhipu_concurrency.py +++ b/tests/test_zhipu_concurrency.py @@ -141,12 +141,12 @@ class TestModelConcurrencyLimiter: @pytest.mark.asyncio async def test_lazy_semaphore_creation(self) -> None: limiter = ModelConcurrencyLimiter(ZhipuConcurrencyConfig(default=2)) - sem_a = limiter._get_semaphore("model-a") - sem_b = limiter._get_semaphore("model-b") - # 不同模型独立 semaphore - assert sem_a is not sem_b - # 相同模型复用 semaphore - assert limiter._get_semaphore("model-a") is sem_a + slot_a = limiter._get_or_create_slot("model-a") + slot_b = limiter._get_or_create_slot("model-b") + # 不同模型独立 slot + assert slot_a is not slot_b + # 相同模型复用 slot + assert limiter._get_or_create_slot("model-a") is slot_a @pytest.mark.asyncio async def test_acquire_blocks_when_full(self) -> None: @@ -184,8 +184,8 @@ async def test_per_model_independent(self) -> None: def test_diagnostics_snapshot(self) -> None: limiter = ModelConcurrencyLimiter(ZhipuConcurrencyConfig(default=3)) - # 触发 semaphore 创建 - limiter._get_semaphore("glm-5.1") + # 触发 slot 创建 + limiter._get_or_create_slot("glm-5.1") snap = limiter.get_diagnostics() assert "glm-5.1" in snap assert snap["glm-5.1"]["limit"] == 3 @@ -459,10 +459,10 @@ async def fake_stream(self, _body, _headers): # noqa: ARG001 chunks.append(chunk) assert len(chunks) == 2 - # 确认 semaphore 当前完全可用 + # 确认 slot 当前完全可用 assert vendor._concurrency_limiter is not None - sem = vendor._concurrency_limiter._get_semaphore("glm-5.1") - assert sem._value == 1 # noqa: SLF001 + slot = vendor._concurrency_limiter._get_or_create_slot("glm-5.1") + assert slot.available == 1 @pytest.mark.asyncio async def test_stream_releases_slot_on_error(self) -> None: