Add Serving V2: multiprocess worker + dynamic batching#274
Conversation
Implement the serving V2 architecture with a dedicated worker process for NPU inference, scheduler-driven batching, and async streaming API. Key components: - llm/core/worker.py: NPU worker process (model load + batch execution) - llm/core/async_engine.py: AsyncLLMEngine with scheduler loop - llm/core/block_pool.py: logical page management for scheduler - llm/core/server.py: FastAPI /v1/completions endpoint - llm/cli/main.py: `pypto serve` CLI command Also documents a pre-existing kernel bug (max_batch_size must be >= 16 due to BATCH_TILE mismatch in qwen3_14b_decode_full.py) and applies the workaround in all configs and tests. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
📝 WalkthroughWalkthroughThis PR introduces a complete async HTTP serving pipeline for LLM inference. It replaces placeholder components with a production-ready system: BlockPool manages KV cache with LRU eviction and prefix caching, Scheduler implements continuous batching with request preemption, Worker executes model inference in a separate process, AsyncEngine orchestrates requests through the pipeline, and ServingServer exposes OpenAI-compatible HTTP endpoints. The CLI gains a ChangesAsync Serving Infrastructure
Sequence Diagram(s)sequenceDiagram
participant Client
participant Server as FastAPI Server
participant Engine as AsyncLLMEngine
participant Sched as Scheduler
participant Worker
Client->>Server: POST /v1/completions
Server->>Engine: add_request(prompt, config)
Engine->>Engine: Tokenize
Engine->>Sched: add_request(Request)
activate Engine
Engine->>Engine: _engine_loop spawned
loop per step
Engine->>Sched: schedule()
Sched-->>Engine: SchedulerOutput
Engine->>Worker: WorkerCommand(step)
Worker->>Worker: Prefill+Decode execution
Worker-->>Engine: StepOutput(tokens)
Engine->>Engine: Decode text, update Scheduler
Engine->>Engine: Enqueue TokenOutput to request queue
end
deactivate Engine
Engine-->>Server: TokenOutput stream
Server->>Server: Aggregate or stream SSE chunks
Server-->>Client: JSON or event stream
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 16
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
llm/cli/main.py (1)
150-156:⚠️ Potential issue | 🟠 Major | ⚡ Quick win
max_batch_sizeguard is narrower than the documented NaN workaround.Current check only enforces 16 when
npu.l3_modeis enabled. That allowsmax_batch_size < 16on NPU in other configs, which conflicts with the stated Serving V2 safety constraint.Suggested fix
- max_batch_size_default = _L3_BATCH_TILE if backend == "npu" and npu_l3_mode else 1 + max_batch_size_default = _L3_BATCH_TILE if backend == "npu" else 1 max_batch_size = _get_int(runtime_section, "max_batch_size", max_batch_size_default) - if backend == "npu" and npu_l3_mode and max_batch_size != _L3_BATCH_TILE: + if backend == "npu" and max_batch_size < _L3_BATCH_TILE: raise ValueError( - f"npu.l3 requires runtime.max_batch_size={_L3_BATCH_TILE}; " + f"runtime.max_batch_size must be >= {_L3_BATCH_TILE} on NPU; " f"got {max_batch_size}." )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@llm/cli/main.py` around lines 150 - 156, The current guard only enforces max_batch_size == _L3_BATCH_TILE when npu_l3_mode is true, but the Serving V2 safety constraint requires preventing max_batch_size < _L3_BATCH_TILE for any NPU backend; update the check around max_batch_size (and the error) so that whenever backend == "npu" and max_batch_size < _L3_BATCH_TILE it raises a ValueError, referencing the same symbols (max_batch_size, _L3_BATCH_TILE, backend, npu_l3_mode, and _get_int) and preserving the existing default computation logic.
🧹 Nitpick comments (2)
llm/tests/test_serving_e2e.py (1)
19-19: 💤 Low valueConsider using a relative path or environment variable for portability.
The hardcoded absolute path
/data/linyifan/models/Qwen3-14Bis not portable across development environments or CI systems. Consider using an environment variable or documenting that users must override this with--model-dir.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@llm/tests/test_serving_e2e.py` at line 19, The test uses a hardcoded absolute path in the parser argument (parser.add_argument("--model-dir", type=str, default="/data/linyifan/models/Qwen3-14B")), which is not portable; change it to read from an environment variable or a relative default by replacing the literal default with a call to the environment (e.g., use os.environ.get("MODEL_DIR", "./models/Qwen3-14B")) and ensure os is imported, and update any test documentation or comments to instruct overriding via --model-dir if needed.serving_config.json (1)
2-5: 💤 Low valueConfiguration contains environment-specific paths.
The
model_diris an absolute path specific to a development environment. Users will need to update this value for their environment. Consider documenting this requirement or providing a template with placeholder values.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@serving_config.json` around lines 2 - 5, The configuration currently hardcodes an environment-specific absolute path in the model object (model.model_dir = "/data/linyifan/models/Qwen3-14B"); update it to a non-specific placeholder and document that users must set model_dir for their environment (e.g., replace the path with "<MODEL_DIR_PATH>" or "./models/<MODEL_ID>") and add a short note near model_id/model_dir explaining how to point to a local or mounted model directory; ensure references to model_id ("qwen3-14b") remain unchanged so the loader can still resolve the model name.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@llm/cli/main.py`:
- Around line 72-73: Change the default bind address from "0.0.0.0" to the
loopback "127.0.0.1" for the CLI host arguments to avoid accidental public
exposure: update the parser.add_argument call that defines "--host" (the
occurrences where parser.add_argument("--host", default="0.0.0.0", ... )
appears, also the repeat around lines 277-278) to use default="127.0.0.1" and
adjust the help text to indicate it binds to localhost by default and that users
can opt into public binding by specifying 0.0.0.0 explicitly. Ensure both
occurrences are changed consistently.
In `@llm/core/async_engine.py`:
- Around line 236-243: The await asyncio.to_thread(self._output_queue.get,
timeout=300) can raise (e.g., asyncio.TimeoutError or queue.Empty) and kill the
engine loop; wrap that await in a try/except, catch asyncio.TimeoutError and
relevant queue exceptions (or Exception as a fallback), log the exception with
details via logger.error, and then call
self._handle_step_error(scheduler_output) (or synthesize a StepOutput with an
error) and continue the loop so engine state remains consistent; update
references around StepOutput, self._output_queue.get, logger.error, and
self._handle_step_error to implement this safe error handling.
- Around line 170-174: The code assumes self.tokenizer exists before calling
self.tokenizer.encode; add an explicit guard at the start of that block (in the
method containing prompt_token_ids logic) that checks if self.tokenizer is None
and immediately raises a clear ValueError (or TypeError) like "Tokenizer is not
configured on AsyncEngine" so callers fail fast; then proceed with the existing
encode, bos_token_id fallback and the existing ValueError for empty tokens.
- Around line 137-146: The shutdown ordering in async_engine.stop is unsafe:
stop currently awaits self._loop_task before sending
WorkerCommand(type="shutdown"), which can deadlock if _engine_loop is blocked
waiting on worker output; modify stop to first send the shutdown command to
self._input_queue (if not None) and then await or cancel self._loop_task (if not
None), or alternatively cancel the loop task prior to awaiting to ensure
_engine_loop is unblocked; update references to stop, self._input_queue,
WorkerCommand("shutdown"), and self._loop_task accordingly so the shutdown
signal is delivered before waiting for task completion.
In `@llm/core/scheduler.py`:
- Around line 329-335: Preemption frees blocks and resets num_computed_tokens
but leaves decode state (output_token_ids), causing resumed runs to miss KV for
already-emitted tokens; update the preemption path (where
_free_request_blocks(victim) is called and victim.status is set to
RequestStatus.PREEMPTED) to also clear decode-related state by resetting
victim.output_token_ids (and any decode/KV cache fields if present), and ensure
cached_block_ids and allocated_block_ids are cleared consistently before
re-queuing the victim into waiting via waiting.appendleft.
- Around line 150-158: The scheduler currently only uses max_seq_len as advisory
so requests near the context limit can be scheduled extra tokens; fix by
clamping any computed num_new (e.g., where num_new =
request.num_new_tokens_needed and later min(...) is applied) to the remaining
context capacity computed from self.config.max_seq_len minus the request's
current sequence length/token count (use the request field that tracks tokens
produced so far), and if remaining <= 0 move the request to running_to_keep/skip
scheduling. Apply the same clamp in the other scheduling phase blocks you
referenced (the other num_new computations around the 201-205 and 293-301
regions) and update _check_finish() to treat requests with current_seq_len >=
self.config.max_seq_len as finished (stop further scheduling and
eviction/cleanup as appropriate) so nothing can overrun the context/KV capacity.
- Around line 285-289: The scheduler currently frees blocks and removes finished
Request objects from self.running but leaves their entries in self.requests;
after calling self._free_request_blocks(request) for each req_id in
finished_ids, remove the corresponding registry entry from self.requests (e.g.,
call self.requests.pop(req_id, None)) so finished Request objects and their
token buffers are dropped; ensure you still guard on request is not None before
freeing and popping to avoid KeyError.
- Around line 196-208: The scheduler currently requeues requests when num_new
(calculated from request.num_new_tokens_needed) is <= 0, which causes fully
block-aligned cache hits to never reach their first decode; to fix, detect the
full-cache case (e.g., cached_blocks returned by
self.block_pool.get_computed_blocks and request.num_computed_tokens >=
request.num_prompt_tokens or request.num_computed_tokens ==
len(request.prompt_token_ids)) and force at least one new token to be scheduled
instead of requeuing: set num_new = max(1, num_new) (or explicitly num_new = 1)
before the "if num_new <= 0" check for requests with
cached_block_ids/request.num_computed_tokens covering the prompt so the first
decode step runs; update uses of
request.num_new_tokens_needed/request.num_computed_tokens/request.cached_block_ids
accordingly and keep remaining_waiting logic unchanged for true zero-work cases.
In `@llm/core/worker.py`:
- Around line 168-174: _batch_prefill is sending full request.prompt_token_ids
and sizing allocations from the full prompt, which re-computes tokens already
marked done by the scheduler; change it to use the scheduler-provided chunk
slice for each scheduled entry (e.g., use sr.chunk_token_ids if available, or
slice request.prompt_token_ids using sr.start/sr.end offsets) when building
token_ids_list and seq_lens and when calling _get_or_create_allocation so the
allocation length equals the chunk length; make the identical change in the
other prefill loop (the block around the later 184-199 region) so both places
respect scheduler chunk boundaries and keep worker KV/state consistent with
scheduler accounting.
In `@llm/tests/bench_serving.py`:
- Line 117: The code creates asyncio.Semaphore(args.concurrency) and uses other
CLI numeric args (requests/tokens) without validation, which allows zero or
negative values and can deadlock or produce invalid runs; fix by validating and
coercing these CLI values after parsing (e.g., ensure args.concurrency,
args.requests, args.tokens are ints > 0), raise a clear
argparse.ArgumentTypeError or ValueError (or default to 1) when they are
non-positive, and then use the validated values when creating the semaphore (sem
= asyncio.Semaphore(validated_concurrency)) and elsewhere; reference the
identifiers args.concurrency, sem, args.requests and args.tokens when making the
checks.
- Line 110: Several print statements in llm/tests/bench_serving.py use f-strings
without interpolation (e.g., print(f"=== PyPTO Serving Benchmark ==="), and the
static-print lines containing "requests/sec", "avg latency", "p99 latency", "p50
latency"); remove the redundant f-prefixes so they are plain string literals
(e.g., print("=== PyPTO Serving Benchmark ===")) to satisfy Ruff F541. Locate
the print(...) calls with those exact static messages and replace f"... " with
"..." in their respective calls.
- Around line 24-35: send_request_streaming and send_request_non_streaming
currently parse response bodies without validating HTTP status; add an explicit
check on resp.status after the async with session.post(...) and before any body
parsing: if resp.status is not in the 200-299 range, read the response text/json
(for streaming, drain remaining content safely), record/log the error (or raise
an exception) and skip counting this request in benchmark metrics. Specifically
modify the streaming block around the async for line in resp.content to first
test resp.status and handle non-2xx responses, and likewise add a resp.status
check in send_request_non_streaming before calling resp.json() so error payloads
are not treated as successful responses.
In `@llm/tests/test_baseline_generate.py`:
- Line 27: The print call uses an unnecessary f-string (no placeholders) —
remove the leading "f" from the string literal in the print statement that
outputs "=== Existing Engine Baseline Test ===" (in the
test_baseline_generate.py test) so it's a regular string literal instead of an
f-string.
In `@llm/tests/test_serving_e2e.py`:
- Line 35: The print statement uses an unnecessary f-string prefix with no
placeholders; update the print call (the line containing print(f"=== PyPTO
Serving V2 E2E Verification ===")) to remove the leading 'f' so it becomes a
plain string literal (print("=== PyPTO Serving V2 E2E Verification ===")).
- Around line 87-119: The test currently calls await engine.start() and later
await engine.stop() but if engine.start() or the request loop raises an
exception the stop call is skipped; wrap the startup, request loop (including
the async for engine.add_request(...)) and assertions in a try/finally so
cleanup always runs, e.g. set a local started flag after await engine.start()
and in the finally await engine.stop() only if started is True (or check engine
is not None), then re-raise the exception if needed so failures still surface;
ensure you reference the existing engine.start(),
engine.add_request("e2e-req-1", ...), and engine.stop() calls when applying the
change.
In `@llm/tests/test_serving_integration.py`:
- Around line 180-185: The test currently only logs batch_sizes_seen but doesn't
assert that batching occurred; after computing max_batch (and/or using
batch_sizes_seen) add an assertion to fail the test if no batching
happened—e.g., replace or augment the prints with assert max_batch > 1 or assert
any(bs > 1 for bs in batch_sizes_seen) so the test will fail when batch_size
never exceeds 1; reference the variables batch_sizes_seen and max_batch in the
assertion.
---
Outside diff comments:
In `@llm/cli/main.py`:
- Around line 150-156: The current guard only enforces max_batch_size ==
_L3_BATCH_TILE when npu_l3_mode is true, but the Serving V2 safety constraint
requires preventing max_batch_size < _L3_BATCH_TILE for any NPU backend; update
the check around max_batch_size (and the error) so that whenever backend ==
"npu" and max_batch_size < _L3_BATCH_TILE it raises a ValueError, referencing
the same symbols (max_batch_size, _L3_BATCH_TILE, backend, npu_l3_mode, and
_get_int) and preserving the existing default computation logic.
---
Nitpick comments:
In `@llm/tests/test_serving_e2e.py`:
- Line 19: The test uses a hardcoded absolute path in the parser argument
(parser.add_argument("--model-dir", type=str,
default="/data/linyifan/models/Qwen3-14B")), which is not portable; change it to
read from an environment variable or a relative default by replacing the literal
default with a call to the environment (e.g., use os.environ.get("MODEL_DIR",
"./models/Qwen3-14B")) and ensure os is imported, and update any test
documentation or comments to instruct overriding via --model-dir if needed.
In `@serving_config.json`:
- Around line 2-5: The configuration currently hardcodes an environment-specific
absolute path in the model object (model.model_dir =
"/data/linyifan/models/Qwen3-14B"); update it to a non-specific placeholder and
document that users must set model_dir for their environment (e.g., replace the
path with "<MODEL_DIR_PATH>" or "./models/<MODEL_ID>") and add a short note near
model_id/model_dir explaining how to point to a local or mounted model
directory; ensure references to model_id ("qwen3-14b") remain unchanged so the
loader can still resolve the model name.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 3a41a3df-386c-4ae3-a44a-501de4cbff58
📒 Files selected for processing (15)
llm/cli/main.pyllm/core/__init__.pyllm/core/async_engine.pyllm/core/block_pool.pyllm/core/pypto_executor.pyllm/core/scheduler.pyllm/core/server.pyllm/core/types.pyllm/core/worker.pyllm/tests/bench_serving.pyllm/tests/test_baseline_generate.pyllm/tests/test_serving.pyllm/tests/test_serving_e2e.pyllm/tests/test_serving_integration.pyserving_config.json
| parser.add_argument("--host", default="0.0.0.0", help="Host to bind the serving server (default: 0.0.0.0).") | ||
| parser.add_argument("--port", type=int, default=8000, help="Port for the serving server (default: 8000).") |
There was a problem hiding this comment.
Default host exposes the service on all interfaces.
Using 0.0.0.0 by default makes accidental network exposure easy in local/dev runs. Prefer loopback default and let users opt in to public bind.
Suggested fix
- parser.add_argument("--host", default="0.0.0.0", help="Host to bind the serving server (default: 0.0.0.0).")
+ parser.add_argument("--host", default="127.0.0.1", help="Host to bind the serving server (default: 127.0.0.1).")
...
- host: str = "0.0.0.0",
+ host: str = "127.0.0.1",Also applies to: 277-278
🧰 Tools
🪛 Ruff (0.15.12)
[error] 72-72: Possible binding to all interfaces
(S104)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@llm/cli/main.py` around lines 72 - 73, Change the default bind address from
"0.0.0.0" to the loopback "127.0.0.1" for the CLI host arguments to avoid
accidental public exposure: update the parser.add_argument call that defines
"--host" (the occurrences where parser.add_argument("--host", default="0.0.0.0",
... ) appears, also the repeat around lines 277-278) to use default="127.0.0.1"
and adjust the help text to indicate it binds to localhost by default and that
users can opt into public binding by specifying 0.0.0.0 explicitly. Ensure both
occurrences are changed consistently.
| async def stop(self) -> None: | ||
| """Stop engine loop and worker process.""" | ||
| self._running = False | ||
| if self._loop_task is not None: | ||
| await self._loop_task | ||
| self._loop_task = None | ||
|
|
||
| if self._input_queue is not None: | ||
| self._input_queue.put(WorkerCommand(type="shutdown")) | ||
|
|
There was a problem hiding this comment.
Shutdown sequence can stall for minutes or deadlock.
Line 141 waits for _loop_task before Line 145 sends shutdown, but _engine_loop may be blocked waiting on worker output. Send shutdown first (or cancel loop task) before awaiting task completion.
Suggested fix
async def stop(self) -> None:
"""Stop engine loop and worker process."""
self._running = False
+ if self._input_queue is not None:
+ self._input_queue.put(WorkerCommand(type="shutdown"))
+
if self._loop_task is not None:
- await self._loop_task
+ try:
+ await asyncio.wait_for(self._loop_task, timeout=5)
+ except asyncio.TimeoutError:
+ self._loop_task.cancel()
+ with contextlib.suppress(asyncio.CancelledError):
+ await self._loop_task
self._loop_task = None
-
- if self._input_queue is not None:
- self._input_queue.put(WorkerCommand(type="shutdown"))🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@llm/core/async_engine.py` around lines 137 - 146, The shutdown ordering in
async_engine.stop is unsafe: stop currently awaits self._loop_task before
sending WorkerCommand(type="shutdown"), which can deadlock if _engine_loop is
blocked waiting on worker output; modify stop to first send the shutdown command
to self._input_queue (if not None) and then await or cancel self._loop_task (if
not None), or alternatively cancel the loop task prior to awaiting to ensure
_engine_loop is unblocked; update references to stop, self._input_queue,
WorkerCommand("shutdown"), and self._loop_task accordingly so the shutdown
signal is delivered before waiting for task completion.
| prompt_token_ids = self.tokenizer.encode(prompt) | ||
| if not prompt_token_ids and self.bos_token_id is not None: | ||
| prompt_token_ids = [self.bos_token_id] | ||
| if not prompt_token_ids: | ||
| raise ValueError("Prompt tokenization produced no tokens.") |
There was a problem hiding this comment.
Fail fast when tokenizer is missing.
Line 170 assumes self.tokenizer is always set. Add an explicit guard to return a clear error instead of an AttributeError.
Suggested fix
"""Add a request and yield token outputs as they are generated."""
+ if self.tokenizer is None:
+ raise RuntimeError("AsyncLLMEngine tokenizer is required before add_request().")
prompt_token_ids = self.tokenizer.encode(prompt)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| prompt_token_ids = self.tokenizer.encode(prompt) | |
| if not prompt_token_ids and self.bos_token_id is not None: | |
| prompt_token_ids = [self.bos_token_id] | |
| if not prompt_token_ids: | |
| raise ValueError("Prompt tokenization produced no tokens.") | |
| if self.tokenizer is None: | |
| raise RuntimeError("AsyncLLMEngine tokenizer is required before add_request().") | |
| prompt_token_ids = self.tokenizer.encode(prompt) | |
| if not prompt_token_ids and self.bos_token_id is not None: | |
| prompt_token_ids = [self.bos_token_id] | |
| if not prompt_token_ids: | |
| raise ValueError("Prompt tokenization produced no tokens.") |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@llm/core/async_engine.py` around lines 170 - 174, The code assumes
self.tokenizer exists before calling self.tokenizer.encode; add an explicit
guard at the start of that block (in the method containing prompt_token_ids
logic) that checks if self.tokenizer is None and immediately raises a clear
ValueError (or TypeError) like "Tokenizer is not configured on AsyncEngine" so
callers fail fast; then proceed with the existing encode, bos_token_id fallback
and the existing ValueError for empty tokens.
| step_output: StepOutput = await asyncio.to_thread( | ||
| self._output_queue.get, timeout=300 | ||
| ) | ||
|
|
||
| if step_output.error: | ||
| logger.error(f"Worker returned error: {step_output.error}") | ||
| self._handle_step_error(scheduler_output) | ||
| continue |
There was a problem hiding this comment.
Unhandled worker queue timeout/error can kill engine loop.
If worker output is delayed/crashed, the get(..., timeout=300) path can raise and terminate the loop task. Handle timeout/queue exceptions explicitly and keep engine state consistent.
Suggested fix
+import queue
...
- step_output: StepOutput = await asyncio.to_thread(
- self._output_queue.get, timeout=300
- )
+ try:
+ step_output: StepOutput = await asyncio.to_thread(
+ self._output_queue.get, timeout=300
+ )
+ except queue.Empty:
+ logger.error("Worker step timed out; aborting scheduled batch")
+ self._handle_step_error(scheduler_output)
+ continue
+ except Exception as exc:
+ logger.exception(f"Worker output queue failure: {exc}")
+ self._handle_step_error(scheduler_output)
+ continue📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| step_output: StepOutput = await asyncio.to_thread( | |
| self._output_queue.get, timeout=300 | |
| ) | |
| if step_output.error: | |
| logger.error(f"Worker returned error: {step_output.error}") | |
| self._handle_step_error(scheduler_output) | |
| continue | |
| try: | |
| step_output: StepOutput = await asyncio.to_thread( | |
| self._output_queue.get, timeout=300 | |
| ) | |
| except queue.Empty: | |
| logger.error("Worker step timed out; aborting scheduled batch") | |
| self._handle_step_error(scheduler_output) | |
| continue | |
| except Exception as exc: | |
| logger.exception(f"Worker output queue failure: {exc}") | |
| self._handle_step_error(scheduler_output) | |
| continue | |
| if step_output.error: | |
| logger.error(f"Worker returned error: {step_output.error}") | |
| self._handle_step_error(scheduler_output) | |
| continue |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@llm/core/async_engine.py` around lines 236 - 243, The await
asyncio.to_thread(self._output_queue.get, timeout=300) can raise (e.g.,
asyncio.TimeoutError or queue.Empty) and kill the engine loop; wrap that await
in a try/except, catch asyncio.TimeoutError and relevant queue exceptions (or
Exception as a fallback), log the exception with details via logger.error, and
then call self._handle_step_error(scheduler_output) (or synthesize a StepOutput
with an error) and continue the loop so engine state remains consistent; update
references around StepOutput, self._output_queue.get, logger.error, and
self._handle_step_error to implement this safe error handling.
| num_new = request.num_new_tokens_needed | ||
| if num_new <= 0: | ||
| running_to_keep.append(request) | ||
| continue | ||
|
|
||
| if self.config.long_prefill_token_threshold > 0: | ||
| num_new = min(num_new, self.config.long_prefill_token_threshold) | ||
| num_new = min(num_new, token_budget) | ||
|
|
There was a problem hiding this comment.
max_seq_len is currently advisory only.
Neither scheduling phase clamps work against the remaining context window, and _check_finish() never stops a request that has already reached self.config.max_seq_len. Prompts near the limit can still be scheduled for extra tokens and overrun context/KV capacity.
Also applies to: 201-205, 293-301
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@llm/core/scheduler.py` around lines 150 - 158, The scheduler currently only
uses max_seq_len as advisory so requests near the context limit can be scheduled
extra tokens; fix by clamping any computed num_new (e.g., where num_new =
request.num_new_tokens_needed and later min(...) is applied) to the remaining
context capacity computed from self.config.max_seq_len minus the request's
current sequence length/token count (use the request field that tracks tokens
produced so far), and if remaining <= 0 move the request to running_to_keep/skip
scheduling. Apply the same clamp in the other scheduling phase blocks you
referenced (the other num_new computations around the 201-205 and 293-301
regions) and update _check_finish() to treat requests with current_seq_len >=
self.config.max_seq_len as finished (stop further scheduling and
eviction/cleanup as appropriate) so nothing can overrun the context/KV capacity.
| print(f"Mode: {'streaming (TTFT + decode)' if args.stream else 'non-streaming (e2e only)'}") | ||
| print() | ||
|
|
||
| sem = asyncio.Semaphore(args.concurrency) |
There was a problem hiding this comment.
Validate numeric CLI arguments to prevent deadlock/invalid runs.
Line 117 uses args.concurrency directly. With --concurrency 0, requests block indefinitely; non-positive request/token values also produce invalid runs.
Proposed fix
def main():
@@
parser.add_argument("--stream", action="store_true", help="Use streaming to measure TTFT and decode latency")
args = parser.parse_args()
+ if args.num_requests <= 0:
+ parser.error("--num-requests must be > 0")
+ if args.concurrency <= 0:
+ parser.error("--concurrency must be > 0")
+ if args.max_tokens <= 0:
+ parser.error("--max-tokens must be > 0")
asyncio.run(run_bench(args))Also applies to: 188-194
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@llm/tests/bench_serving.py` at line 117, The code creates
asyncio.Semaphore(args.concurrency) and uses other CLI numeric args
(requests/tokens) without validation, which allows zero or negative values and
can deadlock or produce invalid runs; fix by validating and coercing these CLI
values after parsing (e.g., ensure args.concurrency, args.requests, args.tokens
are ints > 0), raise a clear argparse.ArgumentTypeError or ValueError (or
default to 1) when they are non-positive, and then use the validated values when
creating the semaphore (sem = asyncio.Semaphore(validated_concurrency)) and
elsewhere; reference the identifiers args.concurrency, sem, args.requests and
args.tokens when making the checks.
| print(f"ERROR: Model directory not found: {model_dir}") | ||
| sys.exit(1) | ||
|
|
||
| print(f"=== PyPTO Serving V2 E2E Verification ===") |
There was a problem hiding this comment.
Remove unnecessary f-string prefix.
The f-string contains no placeholders.
📝 Proposed fix
- print(f"=== PyPTO Serving V2 E2E Verification ===")
+ print("=== PyPTO Serving V2 E2E Verification ===")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| print(f"=== PyPTO Serving V2 E2E Verification ===") | |
| print("=== PyPTO Serving V2 E2E Verification ===") |
🧰 Tools
🪛 Ruff (0.15.12)
[error] 35-35: f-string without any placeholders
Remove extraneous f prefix
(F541)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@llm/tests/test_serving_e2e.py` at line 35, The print statement uses an
unnecessary f-string prefix with no placeholders; update the print call (the
line containing print(f"=== PyPTO Serving V2 E2E Verification ===")) to remove
the leading 'f' so it becomes a plain string literal (print("=== PyPTO Serving
V2 E2E Verification ===")).
| await engine.start() | ||
| print(f" Engine started in {time.time() - t1:.1f}s") | ||
|
|
||
| # --- Test: Single request --- | ||
| print(f"[3/3] Testing single request (max_new_tokens={args.max_new_tokens})...") | ||
| config = GenerateConfig( | ||
| max_new_tokens=args.max_new_tokens, | ||
| temperature=0.0, | ||
| ) | ||
|
|
||
| t2 = time.time() | ||
| full_text = "" | ||
| finish_reason = "" | ||
| token_count = 0 | ||
| async for output in engine.add_request("e2e-req-1", "What is 1+1?", config): | ||
| if output.text: | ||
| full_text = output.text | ||
| if output.token_id is not None: | ||
| token_count += 1 | ||
| if output.finished: | ||
| finish_reason = output.finish_reason | ||
| break | ||
| elapsed = time.time() - t2 | ||
|
|
||
| print(f" Response: {full_text[:100]}...") | ||
| print(f" Tokens: {token_count}, Time: {elapsed:.2f}s") | ||
| print(f" Finish reason: {finish_reason}") | ||
| if token_count > 0: | ||
| print(f" Speed: {token_count/elapsed:.1f} tok/s") | ||
| assert len(full_text) > 0 or token_count > 0, "No output generated" | ||
|
|
||
| # --- Cleanup --- | ||
| await engine.stop() |
There was a problem hiding this comment.
Add error handling to ensure cleanup on failure.
If engine.start() or test execution raises an exception, engine.stop() will not be called, potentially leaving worker processes running. Wrap the test logic in a try-finally block.
🛡️ Proposed fix to add error handling
await engine.start()
print(f" Engine started in {time.time() - t1:.1f}s")
- # --- Test: Single request ---
- print(f"[3/3] Testing single request (max_new_tokens={args.max_new_tokens})...")
- config = GenerateConfig(
- max_new_tokens=args.max_new_tokens,
- temperature=0.0,
- )
-
- t2 = time.time()
- full_text = ""
- finish_reason = ""
- token_count = 0
- async for output in engine.add_request("e2e-req-1", "What is 1+1?", config):
- if output.text:
- full_text = output.text
- if output.token_id is not None:
- token_count += 1
- if output.finished:
- finish_reason = output.finish_reason
- break
- elapsed = time.time() - t2
-
- print(f" Response: {full_text[:100]}...")
- print(f" Tokens: {token_count}, Time: {elapsed:.2f}s")
- print(f" Finish reason: {finish_reason}")
- if token_count > 0:
- print(f" Speed: {token_count/elapsed:.1f} tok/s")
- assert len(full_text) > 0 or token_count > 0, "No output generated"
-
- # --- Cleanup ---
- await engine.stop()
+ try:
+ # --- Test: Single request ---
+ print(f"[3/3] Testing single request (max_new_tokens={args.max_new_tokens})...")
+ config = GenerateConfig(
+ max_new_tokens=args.max_new_tokens,
+ temperature=0.0,
+ )
+
+ t2 = time.time()
+ full_text = ""
+ finish_reason = ""
+ token_count = 0
+ async for output in engine.add_request("e2e-req-1", "What is 1+1?", config):
+ if output.text:
+ full_text = output.text
+ if output.token_id is not None:
+ token_count += 1
+ if output.finished:
+ finish_reason = output.finish_reason
+ break
+ elapsed = time.time() - t2
+
+ print(f" Response: {full_text[:100]}...")
+ print(f" Tokens: {token_count}, Time: {elapsed:.2f}s")
+ print(f" Finish reason: {finish_reason}")
+ if token_count > 0:
+ print(f" Speed: {token_count/elapsed:.1f} tok/s")
+ assert len(full_text) > 0 or token_count > 0, "No output generated"
+ finally:
+ # --- Cleanup ---
+ await engine.stop()🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@llm/tests/test_serving_e2e.py` around lines 87 - 119, The test currently
calls await engine.start() and later await engine.stop() but if engine.start()
or the request loop raises an exception the stop call is skipped; wrap the
startup, request loop (including the async for engine.add_request(...)) and
assertions in a try/finally so cleanup always runs, e.g. set a local started
flag after await engine.start() and in the finally await engine.stop() only if
started is True (or check engine is not None), then re-raise the exception if
needed so failures still surface; ensure you reference the existing
engine.start(), engine.add_request("e2e-req-1", ...), and engine.stop() calls
when applying the change.
| # Check that batching occurred (some steps should have batch_size > 1) | ||
| max_batch = max(batch_sizes_seen) if batch_sizes_seen else 0 | ||
| print(f" Batch sizes seen: {batch_sizes_seen}") | ||
| print(f" Max batch size: {max_batch}") | ||
| print(f" All 3 requests completed successfully") | ||
| return True |
There was a problem hiding this comment.
Dynamic batching test does not assert batching behavior.
This currently logs batch sizes but never verifies batch_size > 1, so the test passes even when batching regresses.
Suggested fix
max_batch = max(batch_sizes_seen) if batch_sizes_seen else 0
print(f" Batch sizes seen: {batch_sizes_seen}")
print(f" Max batch size: {max_batch}")
+ assert max_batch > 1, "Expected at least one co-batched step (batch_size > 1)"
print(f" All 3 requests completed successfully")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Check that batching occurred (some steps should have batch_size > 1) | |
| max_batch = max(batch_sizes_seen) if batch_sizes_seen else 0 | |
| print(f" Batch sizes seen: {batch_sizes_seen}") | |
| print(f" Max batch size: {max_batch}") | |
| print(f" All 3 requests completed successfully") | |
| return True | |
| # Check that batching occurred (some steps should have batch_size > 1) | |
| max_batch = max(batch_sizes_seen) if batch_sizes_seen else 0 | |
| print(f" Batch sizes seen: {batch_sizes_seen}") | |
| print(f" Max batch size: {max_batch}") | |
| assert max_batch > 1, "Expected at least one co-batched step (batch_size > 1)" | |
| print(f" All 3 requests completed successfully") | |
| return True |
🧰 Tools
🪛 Ruff (0.15.12)
[error] 184-184: f-string without any placeholders
Remove extraneous f prefix
(F541)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@llm/tests/test_serving_integration.py` around lines 180 - 185, The test
currently only logs batch_sizes_seen but doesn't assert that batching occurred;
after computing max_batch (and/or using batch_sizes_seen) add an assertion to
fail the test if no batching happened—e.g., replace or augment the prints with
assert max_batch > 1 or assert any(bs > 1 for bs in batch_sizes_seen) so the
test will fail when batch_size never exceeds 1; reference the variables
batch_sizes_seen and max_batch in the assertion.
There was a problem hiding this comment.
Code Review
This pull request introduces a robust serving architecture featuring an OpenAI-compatible API, an asynchronous engine for managing concurrent requests, and a multiprocess worker for NPU-based inference. Key components include a continuous batching scheduler with chunked prefill and preemption capabilities, alongside a block-based KV cache pool supporting prefix caching. Review feedback identifies critical issues regarding state consistency in the preemption logic, missing error handling for worker communication timeouts, and potential memory leaks in request management. Furthermore, improvements are suggested for incremental decoding performance, better encapsulation of request termination logic, and the adoption of flexible chat templates.
| for request in self.running: | ||
| num_new = request.num_new_tokens_needed | ||
| if num_new <= 0: | ||
| running_to_keep.append(request) | ||
| continue | ||
|
|
||
| if self.config.long_prefill_token_threshold > 0: | ||
| num_new = min(num_new, self.config.long_prefill_token_threshold) | ||
| num_new = min(num_new, token_budget) | ||
|
|
||
| if num_new <= 0: | ||
| running_to_keep.append(request) | ||
| continue | ||
|
|
||
| num_blocks_needed = self._blocks_needed(request, num_new) | ||
| if not self._try_allocate_blocks(request, num_blocks_needed): | ||
| preempted = self._preempt_lowest_priority(request) | ||
| if preempted is None: | ||
| running_to_keep.append(request) | ||
| continue | ||
| output.preempted_requests.append(preempted) | ||
| if not self._try_allocate_blocks(request, num_blocks_needed): | ||
| running_to_keep.append(request) | ||
| continue | ||
|
|
||
| is_prefill = request.is_prefill | ||
| output.scheduled_requests.append( | ||
| ScheduledRequest(request=request, num_new_tokens=num_new, is_prefill=is_prefill) | ||
| ) | ||
| if is_prefill: | ||
| output.num_prefill_tokens += num_new | ||
| else: | ||
| output.num_decode_tokens += num_new | ||
| token_budget -= num_new | ||
| running_to_keep.append(request) | ||
|
|
||
| self.running = running_to_keep |
There was a problem hiding this comment.
| step_output: StepOutput = await asyncio.to_thread( | ||
| self._output_queue.get, timeout=300 | ||
| ) |
| for req_id in finished_ids: | ||
| request = self.requests.get(req_id) | ||
| if request is not None: | ||
| self._free_request_blocks(request) | ||
| self.running = [r for r in self.running if r.request_id != req_id] | ||
|
|
There was a problem hiding this comment.
在 update_from_output 中,当请求完成时(FINISHED_EOS 或 FINISHED_LENGTH),仅将其从 self.running 中移除,但没有从 self.requests 字典中删除。这会导致 self.requests 随时间无限增长,造成内存泄漏。建议在移除时同步清理 self.requests。
| for req_id in finished_ids: | |
| request = self.requests.get(req_id) | |
| if request is not None: | |
| self._free_request_blocks(request) | |
| self.running = [r for r in self.running if r.request_id != req_id] | |
| # Remove finished requests from running | |
| for req_id in finished_ids: | |
| request = self.requests.pop(req_id, None) | |
| if request is not None: | |
| self._free_request_blocks(request) | |
| self.running = [r for r in self.running if r.request_id != req_id] |
| return None | ||
| if block.block_hash is not None: | ||
| del self.hash_to_block[block.block_hash] |
There was a problem hiding this comment.
在 allocate 方法中,删除哈希映射时没有检查当前块是否仍然是该哈希的拥有者。如果由于哈希冲突或 cache_block 中的覆盖逻辑,self.hash_to_block[block.block_hash] 已经指向了另一个块,这里的 del 操作会错误地删除新块的映射,破坏 Prefix Caching 的正确性。
| return None | |
| if block.block_hash is not None: | |
| del self.hash_to_block[block.block_hash] | |
| if block.block_hash is not None: | |
| if self.hash_to_block.get(block.block_hash) is block: | |
| del self.hash_to_block[block.block_hash] | |
| block.block_hash = None |
| # Decode current output text | ||
| text = "" | ||
| if ctx.request.output_token_ids: | ||
| text = self.tokenizer.decode(ctx.request.output_token_ids) |
| self.scheduler._free_request_blocks(ctx.request) | ||
| self.scheduler.running = [ | ||
| r | ||
| for r in self.scheduler.running | ||
| if r.request_id != req_output.request_id | ||
| ] | ||
| break |
| def _apply_chat_template(self, messages: list[ChatMessage]) -> str: | ||
| """Simple chat template — can be replaced with tokenizer's chat_template.""" | ||
| parts = [] | ||
| for msg in messages: | ||
| if msg.role == "system": | ||
| parts.append(f"<|system|>\n{msg.content}") | ||
| elif msg.role == "user": | ||
| parts.append(f"<|user|>\n{msg.content}") | ||
| elif msg.role == "assistant": | ||
| parts.append(f"<|assistant|>\n{msg.content}") | ||
| parts.append("<|assistant|>\n") | ||
| return "\n".join(parts) |
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
修改点
新增
llm/core/worker.py): 独立进程持有 NPU,执行 batch prefill/decodellm/core/async_engine.py): scheduler-driven 主循环,Queue 通信,token-by-token streamingllm/core/scheduler.py): 两阶段调度(RUNNING decode + WAITING prefill),动态 batchingllm/core/server.py): OpenAI 兼容 API(/v1/completions, /v1/chat/completions),支持 SSE streamingllm/cli/main.py):pypto serve命令llm/tests/bench_serving.py): 支持 TTFT / decode interval / throughput 测量用法
启动服务
请求示例
性能测试