Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 112 additions & 1 deletion src/coding/proxy/server/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,43 @@ def _build_favicon() -> bytes:
padding: 1px 6px;
border-radius: 3px;
}
/* Limit value inside the concurrency badge; the dashed underline and pointer
   cursor mark it as clickable (the click handler looks up this class). */
.mc-limit-editable {
cursor: pointer;
border-bottom: 1px dashed rgba(74,222,128,.4);
transition: border-color .2s, color .2s;
}
/* Hover feedback: solid green underline and green text. */
.mc-limit-editable:hover {
border-bottom-color: #4ade80;
color: #4ade80;
}
/* Compact numeric input that temporarily replaces the limit value while editing. */
.mc-limit-input {
width: 36px;
background: var(--bg-primary);
border: 1px solid var(--accent-blue);
border-radius: 3px;
color: var(--text-primary);
font-size: 10px;
font-family: 'JetBrains Mono', monospace;
text-align: center;
padding: 0 2px;
outline: none;
/* Firefox: render the number input as a plain text field (no spinner). */
-moz-appearance: textfield;
}
/* WebKit/Blink: hide the number input's spin buttons. */
.mc-limit-input::-webkit-outer-spin-button,
.mc-limit-input::-webkit-inner-spin-button {
-webkit-appearance: none;
margin: 0;
}
/* One-shot colour flashes reporting the outcome of an edit. The .6s duration
   matches the 600 ms after which the script removes the class again. */
.mc-limit-flash-ok { animation: mc-flash-ok .6s ease; }
.mc-limit-flash-err { animation: mc-flash-err .6s ease; }
/* Success flash: inherited colour -> green -> inherited colour. */
@keyframes mc-flash-ok {
0%,100% { color: inherit; }
40% { color: #4ade80; }
}
/* Failure flash: inherited colour -> red -> inherited colour. */
@keyframes mc-flash-err {
0%,100% { color: inherit; }
40% { color: #f87171; }
}
</style>
</head>
<body>
Expand Down Expand Up @@ -1268,7 +1305,8 @@ def _build_favicon() -> bytes:
+ '<span class="mc-model-name">' + escapeHtml(m.vendor + '/' + m.model) + '</span>'
+ '<div class="mc-bar-wrap"><div class="mc-bar-fill ' + barClass + '" style="width:' + pct + '%"></div></div>'
+ '<div class="mc-stats">'
+ '<span class="mc-badge mc-badge-active">' + m.in_use + '/' + m.limit + '</span>'
+ '<span class="mc-badge mc-badge-active">' + m.in_use
+ '/<span class="mc-limit-editable" data-tier="' + escapeHtml(m.vendor) + '" data-model="' + escapeHtml(m.model) + '" data-limit="' + m.limit + '" title="点击修改并行度">' + m.limit + '</span></span>'
+ (m.pending > 0 ? '<span class="mc-badge mc-badge-pending">⏳ ' + m.pending + '</span>' : '')
+ '</div>'
+ '</div>';
Expand All @@ -1293,6 +1331,79 @@ def _build_favicon() -> bytes:
if (_mcTimer) { clearInterval(_mcTimer); _mcTimer = null; }
}

// ── Runtime editing of the concurrency limit ─────────────────────────
// True while an inline editor is open, so that further clicks cannot open
// a second editor at the same time.
var _mcEditing = false;
document.addEventListener('click', function(e) {
  if (_mcEditing) return;
  var el = e.target.closest('.mc-limit-editable');
  if (!el) return;
  e.preventDefault();
  _mcEditing = true;
  var oldVal = el.getAttribute('data-limit');
  var tier = el.getAttribute('data-tier');
  var model = el.getAttribute('data-model');

  // Swap the static value for a number input pre-filled with the current limit.
  var input = document.createElement('input');
  input.type = 'number';
  input.className = 'mc-limit-input';
  input.min = '1';
  input.max = '20';
  input.value = oldVal;
  el.style.display = 'none';
  el.parentNode.insertBefore(input, el.nextSibling);
  input.focus();
  input.select();

  // Set by Escape: the edit was abandoned and must never be submitted.
  var _cancelled = false;
  // Set by the first submit(): the edit has been handled. Without this guard
  // a blur that arrives after Enter (the user clicks elsewhere while the
  // request is in flight, or the browser fires blur when restore() removes
  // the focused input) would run submit() a second time and send a
  // duplicate PUT, because oldVal still holds the pre-edit value.
  var _submitted = false;

  // Remove the editor, show the static value again and allow new edits.
  function restore() {
    _mcEditing = false;
    if (input.parentNode) input.parentNode.removeChild(input);
    el.style.display = '';
  }

  // Briefly apply a flash class to the value; 600 ms matches the CSS animation.
  function flash(cls) {
    el.classList.add(cls);
    setTimeout(function() { el.classList.remove(cls); }, 600);
  }

  input.addEventListener('keydown', function(ev) {
    if (ev.key === 'Escape') { _cancelled = true; restore(); return; }
    if (ev.key !== 'Enter') return;
    ev.preventDefault();
    submit();
  });

  // Leaving the field commits the edit. The short delay lets an Escape
  // keydown set _cancelled before the commit is attempted.
  input.addEventListener('blur', function() {
    setTimeout(function() { if (!_cancelled) submit(); }, 50);
  });

  // Validate the entered value and, if it changed, PUT it to the server.
  // Runs at most once per editing session.
  function submit() {
    if (_cancelled || _submitted) return;
    _submitted = true;
    var v = parseInt(input.value, 10);
    if (isNaN(v) || v < 1 || v > 20) { restore(); flash('mc-limit-flash-err'); return; }
    if (String(v) === oldVal) { restore(); return; }
    fetch('/api/concurrency', {
      method: 'PUT',
      headers: {'Content-Type': 'application/json'},
      body: JSON.stringify({tier: tier, model: model, limit: v})
    }).then(function(res) {
      if (res.ok) {
        return res.json().then(function() {
          // Reflect the accepted value locally without waiting for a refresh.
          el.textContent = v;
          el.setAttribute('data-limit', v);
          flash('mc-limit-flash-ok');
        });
      } else {
        flash('mc-limit-flash-err');
      }
    }).catch(function() {
      flash('mc-limit-flash-err');
    }).finally(function() {
      restore();
    });
  }
});

// ── 按 tiers 顺序排序 vendor 列表 ─────────────────────────
function sortByTierOrder(vendors, tierOrder) {
if (!tierOrder || !tierOrder.length) return vendors.sort();
Expand Down
56 changes: 56 additions & 0 deletions src/coding/proxy/server/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,61 @@ async def status() -> dict:
return result


# Inclusive bounds accepted for a per-model concurrency limit; the dashboard's
# inline editor restricts its input to the same range.
_MIN_CONCURRENCY_LIMIT = 1
_MAX_CONCURRENCY_LIMIT = 20


def _concurrency_payload_error(body: Any) -> "str | None":
    """Validate a decoded ``PUT /api/concurrency`` payload.

    Args:
        body: The decoded JSON request body (any JSON value).

    Returns:
        A human-readable error message when the payload is unusable, or
        ``None`` when ``tier``, ``model`` and ``limit`` are all acceptable.
    """
    # Valid JSON is not necessarily an object; a list or scalar has no .get().
    if not isinstance(body, dict):
        return "body must be a JSON object"
    tier_name = body.get("tier")
    model = body.get("model")
    limit = body.get("limit")
    if not tier_name or not model or limit is None:
        return "requires tier, model, limit"
    if not isinstance(tier_name, str) or not isinstance(model, str):
        return "tier and model must be strings"
    # bool is a subclass of int, so ``true`` would otherwise pass as limit 1.
    if (
        isinstance(limit, bool)
        or not isinstance(limit, int)
        or not _MIN_CONCURRENCY_LIMIT <= limit <= _MAX_CONCURRENCY_LIMIT
    ):
        return (
            f"limit must be an integer between {_MIN_CONCURRENCY_LIMIT}"
            f" and {_MAX_CONCURRENCY_LIMIT}"
        )
    return None


def register_concurrency_route(app: Any, router: Any) -> None:
    """Register the route that adjusts per-model concurrency limits at runtime.

    ``PUT /api/concurrency`` expects a JSON object with ``tier``, ``model``
    and ``limit``. It looks up the tier by name in ``router.tiers`` and calls
    the tier vendor's ``update_concurrency(model, limit)``.

    Responses:
        200: the limit was updated; the body echoes tier, model and limit.
        400: malformed body, invalid fields, a vendor without
            ``update_concurrency``, or the vendor rejected the update.
        404: no tier with the requested name exists.
    """

    @app.put("/api/concurrency")
    async def update_concurrency(request: Request) -> Response:
        try:
            body = await request.json()
        except Exception:
            # Request boundary: any failure to decode the body is a client error.
            return json_error_response(
                400, error_type="invalid_request_error", message="body must be JSON"
            )

        problem = _concurrency_payload_error(body)
        if problem is not None:
            return json_error_response(
                400, error_type="invalid_request_error", message=problem
            )
        tier_name = body["tier"]
        model = body["model"]
        limit = body["limit"]

        tier = next((t for t in router.tiers if t.name == tier_name), None)
        if tier is None:
            return json_error_response(
                404, error_type="not_found", message=f"tier '{tier_name}' not found"
            )

        update_fn = getattr(tier.vendor, "update_concurrency", None)
        if update_fn is None:
            return json_error_response(
                400,
                error_type="invalid_request_error",
                message=f"vendor '{tier_name}' does not support concurrency",
            )
        try:
            update_fn(model, limit)
        except (ValueError, AttributeError) as exc:
            return json_error_response(
                400, error_type="invalid_request_error", message=str(exc)
            )
        return Response(
            content=json.dumps(
                {"ok": True, "tier": tier_name, "model": model, "limit": limit},
                ensure_ascii=False,
            ).encode(),
            status_code=200,
            media_type="application/json",
        )


def register_copilot_routes(app: Any, router: Any) -> None:
"""注册 Copilot 诊断与模型探测路由."""
from .factory import _find_copilot_vendor
Expand Down Expand Up @@ -457,6 +512,7 @@ def register_all_routes(
register_core_routes(app, router)
register_health_routes(app)
register_status_route(app, router)
register_concurrency_route(app, router)
register_copilot_routes(app, router)
register_admin_routes(app, router)
register_session_vendor_routes(app, router)
Expand Down
145 changes: 112 additions & 33 deletions src/coding/proxy/vendors/concurrency.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
"""每模型并发限制器 — 基于 asyncio.Semaphore 的公平排队.
"""每模型并发限制器 — 支持运行时动态调整的公平排队.

为每个映射后的模型(如 ``glm-5v-turbo``)独立维护一个 ``asyncio.Semaphore``,
为每个映射后的模型(如 ``glm-5v-turbo``)独立维护一个 ``_ConcurrencySlot``,
确保同一时间点该模型的并行请求数不超过配置的上限。当所有槽位被占满时,
新请求按 FIFO 顺序排队等待,直到有槽位释放。

设计要点:
- **惰性创建**:仅在首次请求到达时才为该模型创建 Semaphore,避免冷启动开销
- **FIFO 公平**:``asyncio.Semaphore`` 内部使用 FIFO 队列,天然满足排队语义
- **按映射后模型名键控**:与上游真实承载能力对齐,而非按客户端请求名(如 ``claude-sonnet-*``)
- **惰性创建**:仅在首次请求到达时才为该模型创建 Slot,避免冷启动开销
- **FIFO 公平**:``asyncio.Event`` + while 循环天然满足 FIFO 排队语义
- **动态调整**:支持运行时修改 per-model limit,无需重启进程
- **按映射后模型名键控**:与上游真实承载能力对齐,而非按客户端请求名
"""

from __future__ import annotations
Expand All @@ -20,62 +21,140 @@
logger = logging.getLogger(__name__)


class _ConcurrencySlot:
    """Concurrency gate for one model whose limit can be changed at runtime.

    Acquirers that cannot get a slot immediately are queued as futures in
    arrival order. Whenever capacity appears (``release`` or a raised limit)
    free slots are handed directly to the earliest queued waiters, so a
    newcomer can never overtake a request that is already waiting and a
    release wakes only the waiters that actually receive a slot.

    Like the other asyncio primitives, instances are meant to be used from
    a single event loop and are not thread-safe.
    """

    def __init__(self, limit: int) -> None:
        self._limit = limit
        self._in_use: int = 0
        # Futures of queued acquirers, oldest first. A future gets its result
        # set at the moment a slot is handed to its owner; the owner removes
        # the future from the list when it resumes or is cancelled.
        self._waiters: list[asyncio.Future[None]] = []

    def _grant_waiters(self) -> None:
        """Hand free slots to queued waiters in arrival order."""
        for waiter in self._waiters:
            if self._in_use >= self._limit:
                break
            # Done futures were already granted or cancelled; skip them.
            if not waiter.done():
                # Count the slot on the waiter's behalf right away so nobody
                # else can take it before the waiter resumes.
                self._in_use += 1
                waiter.set_result(None)

    async def acquire(self) -> _ConcurrencySlot:
        """Acquire one slot, waiting in FIFO order if none is free.

        Returns:
            ``self``; the caller must call ``release()`` when done.

        Raises:
            asyncio.CancelledError: if the calling task is cancelled while
                waiting. A slot that had already been handed to the cancelled
                waiter is passed on to the next one.
        """
        # Fast path: a free slot and nobody queued ahead of us.
        if self._in_use < self._limit and self.pending == 0:
            self._in_use += 1
            return self

        # Slow path: queue up and wait until a slot is handed to us.
        waiter: asyncio.Future[None] = asyncio.get_running_loop().create_future()
        self._waiters.append(waiter)
        # Normally a no-op; guarantees that free capacity is never left
        # unused while requests are queued.
        self._grant_waiters()
        try:
            await waiter
        except asyncio.CancelledError:
            if waiter.done() and not waiter.cancelled():
                # The slot was granted but we were cancelled before resuming:
                # give it back so it is not leaked.
                self._in_use = max(0, self._in_use - 1)
            raise
        finally:
            self._waiters.remove(waiter)
            # Pass on any slot freed by the cancellation handling above.
            self._grant_waiters()
        return self

    def release(self) -> None:
        """Release one slot and hand it to the next queued waiter, if any."""
        # Clamp at zero so an unbalanced release cannot drive the count negative.
        self._in_use = max(0, self._in_use - 1)
        self._grant_waiters()

    def set_limit(self, new_limit: int) -> None:
        """Change the concurrency limit.

        Raising the limit immediately hands the new capacity to queued
        waiters. Lowering it does not revoke slots that are already held;
        the new limit takes effect as those slots are released.
        """
        self._limit = new_limit
        self._grant_waiters()

    @property
    def limit(self) -> int:
        """Current concurrency limit."""
        return self._limit

    @property
    def in_use(self) -> int:
        """Number of slots currently held."""
        return self._in_use

    @property
    def available(self) -> int:
        """Number of free slots (never negative, even after the limit is lowered)."""
        return max(0, self._limit - self._in_use)

    @property
    def pending(self) -> int:
        """Number of queued acquirers that have not been handed a slot yet."""
        return sum(1 for waiter in self._waiters if not waiter.done())


class ModelConcurrencyLimiter:
    """Limiter that keeps an independent concurrency slot per model name.

    Usage::

        limiter = ModelConcurrencyLimiter(config)
        slot = await limiter.acquire("glm-5v-turbo")
        try:
            ...  # perform the request
        finally:
            slot.release()
    """

    def __init__(self, config: ZhipuConcurrencyConfig) -> None:
        self._config = config
        # Slots are created lazily, keyed by model name.
        self._slots: dict[str, _ConcurrencySlot] = {}

    def _get_or_create_slot(self, model: str) -> _ConcurrencySlot:
        """Return the slot for ``model``, creating it on first use.

        A new slot takes its limit from ``config.get_limit(model)``.
        """
        slot = self._slots.get(model)
        if slot is None:
            limit = self._config.get_limit(model)
            slot = _ConcurrencySlot(limit)
            self._slots[model] = slot
            logger.debug(
                "ModelConcurrencyLimiter: created slot model=%s limit=%d",
                model,
                limit,
            )
        return slot

    async def acquire(self, model: str) -> _ConcurrencySlot:
        """Acquire a concurrency slot for ``model``, waiting if necessary.

        Returns:
            The acquired slot; the caller is responsible for calling
            ``release()`` on it once the request has finished.
        """
        slot = self._get_or_create_slot(model)
        await slot.acquire()
        return slot

    def set_limit(self, model: str, new_limit: int) -> None:
        """Change the concurrency limit of ``model`` at runtime.

        The new value is also written to ``config.models`` so that it is
        recorded in the configuration object as well.

        Raises:
            ValueError: if ``new_limit`` is not an integer >= 1. A limit
                below 1 would leave every later acquirer waiting forever.
        """
        # bool is a subclass of int and must not be accepted as a limit.
        if (
            isinstance(new_limit, bool)
            or not isinstance(new_limit, int)
            or new_limit < 1
        ):
            raise ValueError(
                f"concurrency limit must be an integer >= 1, got {new_limit!r}"
            )
        slot = self._slots.get(model)
        if slot is None:
            # No request has used this model yet: create its slot directly
            # with the requested limit.
            slot = _ConcurrencySlot(new_limit)
            self._slots[model] = slot
        else:
            slot.set_limit(new_limit)
        self._config.models[model] = new_limit
        logger.info(
            "ModelConcurrencyLimiter: updated limit model=%s new_limit=%d",
            model,
            new_limit,
        )

    def get_diagnostics(self) -> dict[str, dict[str, int]]:
        """Return a per-model snapshot of concurrency state (for observability).

        Only models whose slot already exists are included. Each entry has
        the keys ``limit``, ``in_use``, ``available`` and ``pending``.
        """
        return {
            model: {
                "limit": slot.limit,
                "in_use": slot.in_use,
                "available": slot.available,
                "pending": slot.pending,
            }
            for model, slot in self._slots.items()
        }

Expand Down
Loading
Loading