Serving V2: multiprocess NPU worker + dynamic batching + prefix cache #8
superxf wants to merge 4 commits into
Migrate serving architecture from pypto-lib with a dedicated worker process for NPU inference, scheduler-driven continuous batching, and async streaming.

Key components:
- python/core/serving_worker.py: NPU worker process (model load + batch execution)
- python/core/async_engine.py: AsyncLLMEngine with scheduler loop
- python/core/block_pool.py: logical page management with prefix caching
- python/core/scheduler.py: continuous batching with chunked prefill and preemption
- python/core/server.py: FastAPI /v1/completions and /v1/chat/completions endpoints
- python/cli/main.py: --serve mode for HTTP serving

Also fixes compile_and_assemble 3-value unpack and upgrades L2 worker to L3 for register() compatibility.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add tokenizer None check in AsyncLLMEngine.add_request
- Abort scheduler request on client disconnect to prevent resource leaks
- Catch queue.Empty timeout in engine loop to avoid crash on worker hang
- Add Scheduler.finish_request() public API, remove private access from engine
- Remove unused engine parameter from run_serve

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
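The `queue.Empty` handling listed above can be pictured with a minimal sketch. This is not the PR's code: the helper name `poll_worker_result`, the one-second default timeout, and the shape of the queued payload are all hypothetical.

```python
import queue


def poll_worker_result(result_queue, timeout_s=1.0):
    """Return the next worker result, or None if nothing arrived in time.

    Works with queue.Queue and multiprocessing.Queue: both raise
    queue.Empty when get() times out. (Hypothetical helper.)
    """
    try:
        return result_queue.get(timeout=timeout_s)
    except queue.Empty:
        # The worker is slow or hung. Returning None lets the engine loop
        # keep running and decide whether to retry or abort requests.
        return None
```

Returning a sentinel instead of letting the exception propagate keeps the engine loop alive while the worker is unresponsive, which appears to be the intent of the fix.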
The scheduler's BlockPool and the worker's KvCacheManager were completely disconnected: the worker always processed full prompts regardless of scheduler decisions. This change bridges them so prefix cache hits skip recomputation and chunk prefill actually splits long prompts across steps.

Key changes:
- ScheduledRequest carries num_computed_tokens and block_ids to the worker
- Worker _batch_prefill only processes the token chunk [num_computed, num_computed+num_new)
- KvCacheManager gains allocate_with_page_ids() for scheduler-assigned blocks
- Runner _prepare_prefill_inputs uses precomputed block_table/slot_mapping
- Scheduler truncates prompts exceeding max_seq_len
- block_size default aligned to page_size (64)

Verified on NPU: serving e2e, chunk prefill (177 tokens split into 2 steps), and prefix cache all pass correctly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
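To make the chunk window and the block-based slot mapping concrete, here is a minimal sketch. It is an illustration only: the function names, the per-step budget of 128 tokens, and the `block_id * block_size + offset` slot layout are assumptions, since the PR text states neither the budget nor the layout.

```python
def chunk_ranges(num_computed, prompt_len, max_tokens_per_step):
    """Yield half-open (start, end) token ranges, one per scheduler step."""
    start = num_computed
    while start < prompt_len:
        end = min(start + max_tokens_per_step, prompt_len)
        yield start, end
        start = end


def slot_mapping(block_ids, start, end, block_size=64):
    """Map token positions [start, end) to physical KV-cache slots.

    Assumes slot = physical_block_id * block_size + offset_in_block.
    """
    return [
        block_ids[pos // block_size] * block_size + pos % block_size
        for pos in range(start, end)
    ]


# A 177-token prompt with a hypothetical budget of 128 tokens per step.
print(list(chunk_ranges(0, 177, 128)))    # [(0, 128), (128, 177)]
# Same prompt when the first 64-token block was a prefix-cache hit.
print(list(chunk_ranges(64, 177, 128)))   # [(64, 177)]
# Positions 126..129 cross from logical block 1 into logical block 2.
print(slot_mapping([7, 2, 5], 126, 130))  # [190, 191, 320, 321]
```

Under these assumptions, a prefix cache hit only moves the starting offset, so the worker needs nothing beyond `num_computed_tokens` and the scheduler-assigned `block_ids` to know which tokens to compute and where to write them.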
Code Review
This pull request implements a new serving architecture featuring a multiprocess worker for NPU execution, a continuous batching scheduler with chunked prefill and preemption, and an OpenAI-compatible HTTP API. It also introduces a hash-based prefix cache for efficient KV cache reuse. Review feedback identifies a critical merge conflict in the documentation, suggests optimizing token decoding to avoid redundant work, and recommends using the tokenizer's native chat template methods for better model portability.
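For readers unfamiliar with hash-based prefix caching, the idea can be sketched as follows. This is a generic illustration, not the PR's `block_pool.py`: the function names, the use of SHA-256, and the dict-based cache are assumptions.

```python
import hashlib


def block_hashes(token_ids, block_size=64):
    """Chain-hash each full block so a hash identifies the entire prefix up to it."""
    hashes, parent = [], b""
    full = len(token_ids) - len(token_ids) % block_size
    for i in range(0, full, block_size):
        block = token_ids[i:i + block_size]
        payload = parent + b"|" + ",".join(map(str, block)).encode()
        parent = hashlib.sha256(payload).digest()
        hashes.append(parent)
    return hashes


def cached_prefix_len(token_ids, cache, block_size=64):
    """Count leading tokens whose blocks are already in `cache` (hash -> block id)."""
    n = 0
    for h in block_hashes(token_ids, block_size):
        if h not in cache:
            break
        n += block_size
    return n


# Usage: cache the blocks of one prompt, then look up a prompt sharing 128 tokens.
first = list(range(200))
cache = {h: block_id for block_id, h in enumerate(block_hashes(first))}
second = first[:128] + [999] * 50
print(cached_prefix_len(second, cache))  # 128
```

Because each hash folds in its parent's hash, two blocks with identical tokens but different preceding context get different keys, so a hit always implies that the whole prefix matches.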
<<<<<<< HEAD
tests/ CLI and batching tests
=======
src/ PyPTO kernel/program builders
tests/ CLI, batching, E2E serving, and benchmark tests
>>>>>>> b2fca02 (Add serving V2: multiprocess worker + dynamic batching + HTTP API)
This file contains merge conflict markers (<<<<<<<, =======, >>>>>>>). Please resolve the conflict before merging.
Suggested change:
- <<<<<<< HEAD
- tests/ CLI and batching tests
- =======
- src/ PyPTO kernel/program builders
- tests/ CLI, batching, E2E serving, and benchmark tests
- >>>>>>> b2fca02 (Add serving V2: multiprocess worker + dynamic batching + HTTP API)
+ src/ PyPTO kernel/program builders
+ tests/ CLI, batching, E2E serving, and benchmark tests
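Leftover conflict markers like these are easy to catch automatically before review. The following is a minimal sketch of such a check; the helper name `find_conflict_markers` is hypothetical, and `git diff --check` offers a comparable built-in warning.

```python
import re

# A marker is exactly seven '<', '=' or '>' characters at the start of a line,
# followed by a space or the end of the line.
CONFLICT_RE = re.compile(r"^(<{7}|={7}|>{7})( |$)")


def find_conflict_markers(text):
    """Return 1-based line numbers that look like Git conflict markers."""
    return [
        n
        for n, line in enumerate(text.splitlines(), start=1)
        if CONFLICT_RE.match(line)
    ]
```

A line of exactly seven `=` characters used as a heading underline would be reported as a false positive, so the result is best treated as a hint rather than a hard failure.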
if ctx.request.output_token_ids:
    text = self.tokenizer.decode(ctx.request.output_token_ids)
The current implementation re-decodes the entire sequence of output tokens (ctx.request.output_token_ids) on every step to generate the output text. For long sequences, this can become a performance bottleneck due to redundant work.
Consider optimizing this by decoding only the new token and appending it to a text buffer. If the tokenizer library supports it, using a streaming decoder would be ideal. This would avoid repeated work and improve performance for requests generating many tokens.
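One way to realize this suggestion is windowed incremental decoding, sketched below. It is an illustration under assumptions: `ByteTokenizer` is a toy stand-in with one token per UTF-8 byte, and `IncrementalDecoder` is a hypothetical helper, not part of the PR or of any tokenizer library. Decoding a single new token in isolation can break when one character spans several tokens, so the sketch decodes a small trailing window and withholds text while that window still ends in an incomplete character.

```python
class ByteTokenizer:
    """Toy stand-in tokenizer: one token id per UTF-8 byte (hypothetical)."""

    def encode(self, text):
        return list(text.encode("utf-8"))

    def decode(self, ids):
        return bytes(ids).decode("utf-8", errors="replace")


class IncrementalDecoder:
    """Emit only newly completed text without re-decoding the whole output."""

    def __init__(self, tokenizer):
        self.tokenizer = tokenizer
        self.token_ids = []
        self.prefix_offset = 0  # start of the decode window
        self.read_offset = 0    # tokens before this index are already emitted

    def push(self, token_id):
        self.token_ids.append(token_id)
        window = self.token_ids[self.prefix_offset:]
        emitted = self.tokenizer.decode(
            self.token_ids[self.prefix_offset:self.read_offset]
        )
        current = self.tokenizer.decode(window)
        # U+FFFD at the end means the last character is still incomplete.
        if len(current) > len(emitted) and not current.endswith("\ufffd"):
            self.prefix_offset = self.read_offset
            self.read_offset = len(self.token_ids)
            return current[len(emitted):]
        return ""


tok = ByteTokenizer()
dec = IncrementalDecoder(tok)
pieces = [dec.push(t) for t in tok.encode("héllo 世界")]
print("".join(pieces))  # héllo 世界
```

Each step decodes only the tokens in the trailing window, so the cost per step stays roughly constant instead of growing with the length of the generated sequence.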
def _apply_chat_template(self, messages: list[ChatMessage]) -> str:
    """Simple chat template - can be replaced with tokenizer's chat_template."""
    parts = []
    for msg in messages:
        if msg.role == "system":
            parts.append(f"<|system|>\n{msg.content}")
        elif msg.role == "user":
            parts.append(f"<|user|>\n{msg.content}")
        elif msg.role == "assistant":
            parts.append(f"<|assistant|>\n{msg.content}")
    parts.append("<|assistant|>\n")
    return "\n".join(parts)
The chat template is hardcoded here. This implementation is specific to a certain model family (like Qwen) and makes the server less portable to other models. The transformers library provides a tokenizer.apply_chat_template() method which is the standard way to handle this, using the template defined in the model's tokenizer_config.json.
I suggest replacing this hardcoded logic with a call to the tokenizer's apply_chat_template method to make the server more general and easier to maintain. You'll likely need to expose this method through your TransformersTokenizerAdapter.
def _apply_chat_template(self, messages: list[ChatMessage]) -> str:
    """Applies the model's chat template to a list of messages."""
    # Assumes the tokenizer adapter exposes `apply_chat_template` from the underlying transformer tokenizer.
    if hasattr(self.engine.tokenizer, "apply_chat_template") and callable(getattr(self.engine.tokenizer, "apply_chat_template")):
        chat = [msg.model_dump() for msg in messages]
        return self.engine.tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
    # Fallback to the simple template if the tokenizer doesn't support it.
    parts = []
    for msg in messages:
        if msg.role == "system":
            parts.append(f"<|system|>\n{msg.content}")
        elif msg.role == "user":
            parts.append(f"<|user|>\n{msg.content}")
        elif msg.role == "assistant":
            parts.append(f"<|assistant|>\n{msg.content}")
    parts.append("<|assistant|>\n")
    return "\n".join(parts)
Summary
Complete serving architecture with multiprocess NPU worker, continuous batching scheduler, and OpenAI-compatible HTTP API. Includes prefix caching and chunked prefill for efficient long-prompt handling.
Key Changes
Core Architecture (commit 1)
- /v1/completions and /v1/chat/completions endpoints
- --serve mode for HTTP serving
Robustness Fixes (commit 2)
- Scheduler.finish_request() public API
Unified Block Pool + KV Cache (commit 3)
- num_computed_tokens and block_ids to the worker
Testing
- tests/test_serving_e2e.py: End-to-end streaming completions test
- tests/test_prefix_cache_and_chunk_prefill.py: Prefix cache and chunk prefill unit tests
- tests/test_npu_prefix_chunk.py: NPU-level prefix + chunk integration test
- tests/bench_serving.py: Throughput benchmark script
Stats
14 files changed, +2376 / -21 lines