pymllm.orchestrator.scheduler_process¶
SchedulerProcess – the central scheduling and inference hub.
Receives tokenized requests from the TokenizerProcess, organises them into batches, runs model forward passes via the in-process model runner, and streams finished token IDs to the DetokenizerProcess.
Architecture: the scheduler owns the ModelRunnerProcess directly
(same process, direct function calls). GPU resources (KV cache, req pool
slots) are freed immediately when requests finish — no cross-process
communication needed.
- Request ingestion supports two modes:
  - ZMQ path: receive TokenizedGenerateReqInput via ZMQ recv_pyobj
  - Shared queue fast path: read from shared memory + a multiprocessing queue
The main event_loop:

    while True:
        recv_requests()
        process_input_requests()
        batch = get_next_batch_to_run()  # also frees finished GPU resources
        if batch:
            result = run_batch(batch)    # direct call to model runner
            process_batch_result(batch, result)
        else:
            idle_sleeper.sleep()         # block until ZMQ data or timeout
        stream_output()
Attributes¶
- logger

Classes¶
- IdleSleeper – Block the scheduler thread when idle using ZMQ Poller.
- Req – Tracks a single request through its lifecycle (prefill -> decode -> finish).
- ScheduleBatch – Wraps a list of Req objects for a single forward pass.
- SchedulerProcess – Runs inside a subprocess. Central hub that drives the inference loop.

Functions¶
- run_scheduler_process – Entry point for torch.multiprocessing.Process(target=...).
Module Contents¶
- pymllm.orchestrator.scheduler_process.logger¶
- class pymllm.orchestrator.scheduler_process.IdleSleeper(sockets, poll_timeout_ms=_DEFAULT_IDLE_POLL_TIMEOUT_MS)¶
Block the scheduler thread when idle using ZMQ Poller.
Avoids 100% CPU spinning when no requests are pending. The poller wakes immediately when data arrives on any registered socket, so request latency is not affected.
- Parameters:
sockets (list)
poll_timeout_ms (int)
- poller¶
- poll_timeout_ms = 1000¶
- sleep()¶
Block until data arrives on any registered socket, or timeout.
- Return type:
None
- class pymllm.orchestrator.scheduler_process.Req(rid, input_ids, input_text='', sampling_params=None, mm_inputs=None, stream=False, return_logprob=False, logprob_start_len=-1, top_logprobs_num=0)¶
Tracks a single request through its lifecycle (prefill -> decode -> finish).
Created by SchedulerProcess.process_input_requests() from a TokenizedGenerateReqInput.
- Parameters:
rid (str)
input_ids (List[int])
input_text (str)
sampling_params (Optional[Dict[str, Any]])
mm_inputs (Optional[Dict[str, Any]])
stream (bool)
return_logprob (bool)
logprob_start_len (int)
top_logprobs_num (int)
- __slots__ = ('rid', 'input_ids', 'input_text', 'sampling_params', 'mm_inputs', 'stream', 'return_logprob',...¶
- rid¶
- input_ids¶
- input_text = ''¶
- mm_inputs = None¶
- stream = False¶
- return_logprob = False¶
- logprob_start_len = -1¶
- top_logprobs_num = 0¶
- sampling_params¶
- max_new_tokens: int¶
- temperature: float¶
- top_p: float¶
- top_k: int¶
- stop_token_ids: List[int]¶
- req_pool_idx: int = -1¶
- seq_len: int¶
- prefix_len: int = 0¶
- output_ids: List[int] = []¶
- finished_reason: str | None = None¶
- is_prefilled: bool = False¶
- read_offset: int = 0¶
- prompt_len: int¶
- check_finished()¶
Check if this request has reached a finish condition.
Sets finished_reason and returns True if finished. Checks:
1. Stop token hit (EOS token IDs are merged into stop_token_ids)
2. max_new_tokens reached
- Return type:
bool
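The two finish conditions can be sketched as a free function; the reason strings and parameter names are illustrative, not the actual values stored in finished_reason:

```python
from typing import List, Optional

def check_finished(output_ids: List[int],
                   stop_token_ids: List[int],
                   max_new_tokens: int) -> Optional[str]:
    # Hypothetical standalone version of Req.check_finished(): returns
    # a finish reason, or None while generation should continue.
    if output_ids and output_ids[-1] in stop_token_ids:
        return "stop"    # EOS is merged into stop_token_ids, so one check covers both
    if len(output_ids) >= max_new_tokens:
        return "length"  # generation budget exhausted
    return None
```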
- property is_finished: bool¶
- Return type:
bool
- abort()¶
Mark this request as aborted.
- Return type:
None
- __repr__()¶
- Return type:
str
- class pymllm.orchestrator.scheduler_process.ScheduleBatch(reqs, forward_mode)¶
Wraps a list of Req objects for a single forward pass.
Provides helpers to assemble the batch dict sent to the ModelRunnerProcess in the format expected by ForwardBatch.
- Parameters:
reqs (List[Req])
forward_mode (pymllm.engine.forward_batch.ForwardMode)
- reqs¶
- forward_mode¶
- property batch_size: int¶
- Return type:
int
- prepare_for_extend()¶
Assemble a batch dict for prefill / extend forward pass.
Returns a dict with flattened input_ids, per-request positions, req_pool_indices, seq_lens, extend_seq_lens, extend_prefix_lens, and request metadata.
Note: The scheduler sends the full input_ids (no prefix trimming). The ModelRunnerProcess performs radix cache prefix matching and rebuilds the tensors with actual prefix lengths before the forward pass. The extend_prefix_lens here are always 0 from the scheduler; they serve as placeholders.
- Return type:
Dict[str, Any]
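A minimal sketch of the extend-batch assembly, with requests modelled as plain dicts; the field names follow the docstring above, but the exact schema is an assumption:

```python
from typing import Any, Dict, List

def prepare_for_extend_sketch(reqs: List[Dict[str, Any]]) -> Dict[str, Any]:
    # Flatten all prompts into one token stream; per-request lengths
    # let the model runner split it back apart. extend_prefix_lens is
    # always 0 here -- the runner substitutes real radix-cache hits.
    input_ids: List[int] = []
    positions: List[int] = []
    seq_lens: List[int] = []
    extend_seq_lens: List[int] = []
    extend_prefix_lens: List[int] = []
    for req in reqs:
        ids = req["input_ids"]
        input_ids.extend(ids)
        positions.extend(range(len(ids)))
        seq_lens.append(len(ids))
        extend_seq_lens.append(len(ids))
        extend_prefix_lens.append(0)  # placeholder, see note above
    return {
        "input_ids": input_ids,
        "positions": positions,
        "seq_lens": seq_lens,
        "extend_seq_lens": extend_seq_lens,
        "extend_prefix_lens": extend_prefix_lens,
    }
```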
- prepare_for_decode()¶
Assemble a batch dict for decode forward pass (one token per request).
Returns a dict with one input token per request (the last generated token), positions at seq_len, and request metadata.
- Return type:
Dict[str, Any]
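The decode counterpart is much simpler, one token per request; again the dict keys are an assumption based on the docstring:

```python
from typing import Any, Dict, List

def prepare_for_decode_sketch(reqs: List[Dict[str, Any]]) -> Dict[str, Any]:
    # Each request contributes exactly one input token (the last one it
    # generated), positioned at its current seq_len.
    return {
        "input_ids": [r["output_ids"][-1] for r in reqs],
        "positions": [r["seq_len"] for r in reqs],
        "seq_lens": [r["seq_len"] for r in reqs],
    }
```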
- to_batch_dict()¶
Build the batch dict appropriate for the current forward mode.
- Return type:
Dict[str, Any]
- __repr__()¶
- Return type:
str
- class pymllm.orchestrator.scheduler_process.SchedulerProcess(recv_from_tokenizer_addr, send_to_detokenizer_addr, server_config=None, model_config=None, gpu_id=0, shared_queue=None, enable_shared_queue=False, tensor_transport_mode='default', max_running_requests=_DEFAULT_MAX_RUNNING_REQUESTS, max_prefill_tokens=_DEFAULT_MAX_PREFILL_TOKENS, max_total_tokens=_DEFAULT_MAX_TOTAL_TOKENS, eos_token_ids=None, default_max_new_tokens=_DEFAULT_MAX_NEW_TOKENS)¶
Runs inside a subprocess. Central hub that drives the inference loop.
- Parameters:
recv_from_tokenizer_addr (str)
send_to_detokenizer_addr (str)
server_config (Optional[Any])
model_config (Optional[Any])
gpu_id (int)
shared_queue (Optional[pymllm.orchestrator.shared_memory_queue.TensorQueue])
enable_shared_queue (bool)
tensor_transport_mode (pymllm.orchestrator.cuda_ipc_transport.TensorTransportMode)
max_running_requests (int)
max_prefill_tokens (int)
max_total_tokens (int)
eos_token_ids (Optional[List[int]])
default_max_new_tokens (int)
- init_sockets()¶
- Return type:
None
- init_model()¶
Create and initialise the in-process model runner.
Must be called after init_sockets and inside the subprocess (after spawn), since it performs CUDA initialisation.
- Return type:
None
- event_loop()¶
Infinite scheduling loop.
- Return type:
None
- recv_requests()¶
Non-blocking receive of tokenized requests from TokenizerProcess.
Supports two modes:
1. Legacy ZMQ: uses zmq.Poller with a short timeout
2. Shared queue: non-blocking get from multiprocessing.Queue

Messages are either:
- A TokenizedGenerateReqInput dataclass – appended to _waiting_queue.
- A plain abort sentinel dict {"rid": ..., "abort": True} – handled inline by removing the matching rid from the waiting queue.
- Return type:
None
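The two message shapes can be dispatched as sketched below; real requests are TokenizedGenerateReqInput dataclasses, so the plain dicts with a "rid" key here are stand-ins:

```python
from typing import Any, Dict, List

def dispatch_message(msg: Any, waiting_queue: List[Dict[str, Any]]) -> None:
    # Illustrative handling of one received message: abort sentinels
    # are acted on inline; everything else waits to be batched.
    if isinstance(msg, dict) and msg.get("abort"):
        waiting_queue[:] = [m for m in waiting_queue if m["rid"] != msg["rid"]]
    else:
        waiting_queue.append(msg)
```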
- process_input_requests()¶
Convert raw TokenizedGenerateReqInput in _waiting_queue into Req objects and move them to _pending_queue.
For each request:
1. Parse sampling params (max_new_tokens, temperature, top_p, top_k, stop_token_ids with defaults from the EOS token).
2. Create a Req object.
3. Move from _waiting_queue to _pending_queue.
- Return type:
None
- get_next_batch_to_run()¶
Implements continuous batching:
1. Filter finished: remove finished requests from _running_batch and free their token budget.
2. Schedule new prefills: from _pending_queue, admit requests that fit within the token budget and max_running_requests.
3. Build batch: if new prefill requests exist -> EXTEND batch; else if running decode requests exist -> DECODE batch; else -> None (idle).

Note on prefix cache: the actual prefix matching is done by the ModelRunnerProcess (which owns the RadixCache). The scheduler uses input_len as a conservative budget estimate. The model runner reports back the actual prefix_len in results, and the scheduler adjusts _used_tokens accordingly in process_batch_result.
- Return type:
Optional[ScheduleBatch]
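The admission step can be sketched as greedy selection under a token budget; field names and the exact cost formula are assumptions, and the real scheduler also enforces limits such as max_prefill_tokens:

```python
from typing import Any, Dict, List, Tuple

def admit_prefills(pending: List[Dict[str, Any]], used_tokens: int,
                   max_total_tokens: int, running: int,
                   max_running_requests: int) -> Tuple[List[Dict[str, Any]], int]:
    # Conservative reservation: without prefix trimming, each request
    # may need its full prompt plus its full generation budget.
    admitted: List[Dict[str, Any]] = []
    for req in list(pending):
        cost = req["input_len"] + req["max_new_tokens"]
        if (used_tokens + cost <= max_total_tokens
                and running + len(admitted) < max_running_requests):
            pending.remove(req)
            admitted.append(req)
            used_tokens += cost
    return admitted, used_tokens
```

Over-reservation is corrected later: once the model runner reports a prefix cache hit, the surplus is refunded in process_batch_result.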
- run_batch(batch)¶
Execute the batch via the in-process model runner.
Direct function call — no ZMQ serialisation overhead.
- Parameters:
batch (ScheduleBatch)
- Return type:
Dict[str, Any]
- process_batch_result(batch, result)¶
Handle the result returned by the ModelRunnerProcess.
For each request in the result:
1. Update prefix_len from the model runner's radix cache hit.
2. Adjust _used_tokens if a prefix cache hit was found (the scheduler over-reserved during scheduling).
3. Append new token(s) to req.output_ids.
4. Increment req.seq_len.
5. Call req.check_finished() (EOS token, max_new_tokens).
6. If it is a prefill request: mark req.is_prefilled = True and move it to the running batch for decode.
7. If finished: collect for output and free KV-cache budget.
- Parameters:
batch (ScheduleBatch)
result (Dict[str, Any])
- Return type:
None
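Steps 1-4 above, for a single request, might look like the sketch below; all field names are assumptions, not the actual result wire format:

```python
from typing import Any, Dict

def apply_result_to_req(req: Dict[str, Any], res: Dict[str, Any],
                        used_tokens: int) -> int:
    # A radix-cache prefix hit means the scheduler over-reserved during
    # admission: refund the newly-discovered cached tokens.
    new_prefix = res.get("prefix_len", 0)
    used_tokens -= new_prefix - req["prefix_len"]
    req["prefix_len"] = new_prefix
    # Append the newly generated token and advance the sequence.
    req["output_ids"].append(res["next_token_id"])
    req["seq_len"] += 1
    return used_tokens
```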
- stream_output()¶
Send finished/streaming outputs to the DetokenizerProcess.
Produces BatchTokenIDOutput-compatible dicts. For streaming requests, intermediate tokens are also sent.
- Return type:
None
- shutdown()¶
- Return type:
None
- pymllm.orchestrator.scheduler_process.run_scheduler_process(recv_from_tokenizer_addr, send_to_detokenizer_addr, pipe_writer, shared_queue=None, enable_shared_queue=False, tensor_transport_mode='default', log_level='info', default_max_new_tokens=_DEFAULT_MAX_NEW_TOKENS, eos_token_ids=None, server_config=None, model_config=None, gpu_id=0)¶
Entry point for torch.multiprocessing.Process(target=...).
The scheduler process now also owns the model runner, so model initialisation happens here.
- Parameters:
- Parameters:
recv_from_tokenizer_addr (str)
send_to_detokenizer_addr (str)
pipe_writer (multiprocessing.connection.Connection)
shared_queue (Optional[pymllm.orchestrator.shared_memory_queue.TensorQueue])
enable_shared_queue (bool)
tensor_transport_mode (pymllm.orchestrator.cuda_ipc_transport.TensorTransportMode)
log_level (str)
default_max_new_tokens (int)
eos_token_ids (Optional[List[int]])
server_config (Optional[Any])
model_config (Optional[Any])
gpu_id (int)
- Return type:
None