pymllm.orchestrator.scheduler_process

SchedulerProcess – the central scheduling and inference hub.

Receives tokenized requests from the TokenizerProcess, organises them into batches, runs model forward passes via the in-process model runner, and streams finished token IDs to the DetokenizerProcess.

Architecture: the scheduler owns the ModelRunnerProcess directly (same process, direct function calls). GPU resources (KV cache, req pool slots) are freed immediately when requests finish — no cross-process communication needed.

Request ingestion supports two modes:
  1. ZMQ path: Receive TokenizedGenerateReqInput via ZMQ recv_pyobj

  2. Shared queue fast path: Read from shared memory + multiprocessing queue

The main event_loop:

while True:
    recv_requests()
    process_input_requests()
    batch = get_next_batch_to_run()   # also frees finished GPU resources
    if batch:
        result = run_batch(batch)      # direct call to model runner
        process_batch_result(batch, result)
    else:
        idle_sleeper.sleep()           # block until ZMQ data or timeout
    stream_output()

Attributes

Classes

IdleSleeper

Block the scheduler thread when idle using ZMQ Poller.

Req

Tracks a single request through its lifecycle (prefill -> decode -> finish).

ScheduleBatch

Wraps a list of Req objects for a single forward pass.

SchedulerProcess

Runs inside a subprocess. Central hub that drives the inference loop.

Functions

run_scheduler_process(recv_from_tokenizer_addr, ...[, ...])

Entry point for torch.multiprocessing.Process(target=...).

Module Contents

pymllm.orchestrator.scheduler_process.logger
class pymllm.orchestrator.scheduler_process.IdleSleeper(sockets, poll_timeout_ms=_DEFAULT_IDLE_POLL_TIMEOUT_MS)

Block the scheduler thread when idle using ZMQ Poller.

Avoids 100% CPU spinning when no requests are pending. The poller wakes immediately when data arrives on any registered socket, so request latency is not affected.

Parameters:
  • sockets (list)

  • poll_timeout_ms (int)

poller
poll_timeout_ms = 1000
sleep()

Block until data arrives on any registered socket, or timeout.

Return type:

None
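The idle-sleep pattern above can be sketched with pyzmq's Poller. This is a minimal re-creation for illustration, not the module's actual implementation, and it assumes pyzmq is installed:

```python
import zmq

class IdleSleeper:
    """Sketch: block the calling thread when idle using zmq.Poller."""

    def __init__(self, sockets, poll_timeout_ms=1000):
        self.poller = zmq.Poller()
        self.poll_timeout_ms = poll_timeout_ms
        for sock in sockets:
            # POLLIN: wake as soon as any registered socket has readable data
            self.poller.register(sock, zmq.POLLIN)

    def sleep(self):
        # Returns immediately if data is already pending; otherwise blocks
        # until data arrives or poll_timeout_ms elapses.
        self.poller.poll(self.poll_timeout_ms)
```

Because `poll` wakes the moment any registered socket becomes readable, idling costs no CPU but adds no latency to incoming requests.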

class pymllm.orchestrator.scheduler_process.Req(rid, input_ids, input_text='', sampling_params=None, mm_inputs=None, stream=False, return_logprob=False, logprob_start_len=-1, top_logprobs_num=0)

Tracks a single request through its lifecycle (prefill -> decode -> finish).

Created by SchedulerProcess.process_input_requests() from a TokenizedGenerateReqInput.

Parameters:
  • rid (str)

  • input_ids (List[int])

  • input_text (str)

  • sampling_params (Optional[Dict[str, Any]])

  • mm_inputs (Optional[Dict[str, Any]])

  • stream (bool)

  • return_logprob (bool)

  • logprob_start_len (int)

  • top_logprobs_num (int)

__slots__ = ('rid', 'input_ids', 'input_text', 'sampling_params', 'mm_inputs', 'stream', 'return_logprob',...
rid
input_ids
input_text = ''
mm_inputs = None
stream = False
return_logprob = False
logprob_start_len = -1
top_logprobs_num = 0
sampling_params
max_new_tokens: int
temperature: float
top_p: float
top_k: int
stop_token_ids: List[int]
req_pool_idx: int = -1
seq_len: int
prefix_len: int = 0
output_ids: List[int] = []
finished_reason: str | None = None
is_prefilled: bool = False
read_offset: int = 0
prompt_len: int
check_finished()

Check if this request has reached a finish condition.

Sets finished_reason and returns True if finished. Checks:

  1. Stop token (EOS token IDs are merged into stop_token_ids during sampling-param parsing)

  2. max_new_tokens reached

Return type:

bool
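The finish conditions can be illustrated as a standalone function. The `finished_reason` labels here are hypothetical; the real logic lives on Req:

```python
from typing import List, Optional

def check_finished(output_ids: List[int],
                   stop_token_ids: List[int],
                   max_new_tokens: int) -> Optional[str]:
    # Sketch of the finish conditions described above; returns a
    # finished_reason, or None if the request should keep decoding.
    if output_ids and output_ids[-1] in stop_token_ids:
        return "stop_token"          # includes EOS, merged into stop_token_ids
    if len(output_ids) >= max_new_tokens:
        return "max_new_tokens"
    return None
```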

property is_finished: bool
Return type:

bool

abort()

Mark this request as aborted.

Return type:

None

__repr__()
Return type:

str

class pymllm.orchestrator.scheduler_process.ScheduleBatch(reqs, forward_mode)

Wraps a list of Req objects for a single forward pass.

Provides helpers to assemble the batch dict sent to the ModelRunnerProcess in the format expected by ForwardBatch.

Parameters:
  • reqs (List[Req])

  • forward_mode
property batch_size: int
Return type:

int

prepare_for_extend()

Assemble a batch dict for prefill / extend forward pass.

Returns a dict with flattened input_ids, per-request positions, req_pool_indices, seq_lens, extend_seq_lens, extend_prefix_lens, and request metadata.

Note: The scheduler sends the full input_ids (no prefix trimming). The ModelRunnerProcess performs radix cache prefix matching and rebuilds the tensors with actual prefix lengths before the forward pass. The extend_prefix_lens here are always 0 from the scheduler; they serve as placeholders.

Return type:

Dict[str, Any]
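A sketch of the extend batch-dict assembly described above. Key names follow the docstring; the exact schema expected by ForwardBatch, and the attributes assumed on each request, are assumptions:

```python
from typing import Any, Dict, List

def prepare_for_extend(reqs: List[Any]) -> Dict[str, Any]:
    # Each req is assumed to expose input_ids and req_pool_idx.
    input_ids: List[int] = []
    positions: List[int] = []
    for req in reqs:
        input_ids.extend(req.input_ids)              # full prompt, no prefix trim
        positions.extend(range(len(req.input_ids)))  # 0..len-1 per request
    return {
        "input_ids": input_ids,
        "positions": positions,
        "req_pool_indices": [r.req_pool_idx for r in reqs],
        "seq_lens": [len(r.input_ids) for r in reqs],
        "extend_seq_lens": [len(r.input_ids) for r in reqs],
        # Always 0 here: the model runner rebuilds these after radix matching.
        "extend_prefix_lens": [0] * len(reqs),
    }
```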

prepare_for_decode()

Assemble a batch dict for decode forward pass (one token per request).

Returns a dict with one input token per request (the last generated token), positions at seq_len, and request metadata.

Return type:

Dict[str, Any]
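The decode case is simpler: one token per request. A hypothetical sketch (whether seq_lens counts the token being decoded is an assumption):

```python
from typing import Any, Dict, List

def prepare_for_decode(reqs: List[Any]) -> Dict[str, Any]:
    # One input token per request (its last generated token),
    # positioned at the current seq_len, as described above.
    return {
        "input_ids": [r.output_ids[-1] for r in reqs],
        "positions": [r.seq_len for r in reqs],
        "req_pool_indices": [r.req_pool_idx for r in reqs],
        "seq_lens": [r.seq_len + 1 for r in reqs],  # assumption: counts the new token
    }
```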

to_batch_dict()

Build the batch dict appropriate for the current forward mode.

Return type:

Dict[str, Any]

__repr__()
Return type:

str

class pymllm.orchestrator.scheduler_process.SchedulerProcess(recv_from_tokenizer_addr, send_to_detokenizer_addr, server_config=None, model_config=None, gpu_id=0, shared_queue=None, enable_shared_queue=False, tensor_transport_mode='default', max_running_requests=_DEFAULT_MAX_RUNNING_REQUESTS, max_prefill_tokens=_DEFAULT_MAX_PREFILL_TOKENS, max_total_tokens=_DEFAULT_MAX_TOTAL_TOKENS, eos_token_ids=None, default_max_new_tokens=_DEFAULT_MAX_NEW_TOKENS)

Runs inside a subprocess. Central hub that drives the inference loop.

Parameters:
  • recv_from_tokenizer_addr (str)

  • send_to_detokenizer_addr (str)

  • server_config (Optional[Any])

  • model_config (Optional[Any])

  • gpu_id (int)

  • shared_queue (Optional[pymllm.orchestrator.shared_memory_queue.TensorQueue])

  • enable_shared_queue (bool)

  • tensor_transport_mode (pymllm.orchestrator.cuda_ipc_transport.TensorTransportMode)

  • max_running_requests (int)

  • max_prefill_tokens (int)

  • max_total_tokens (int)

  • eos_token_ids (Optional[List[int]])

  • default_max_new_tokens (int)

init_sockets()
Return type:

None

init_model()

Create and initialise the in-process model runner.

Must be called after init_sockets and inside the subprocess (after spawn) since it performs CUDA initialisation.

Return type:

None

event_loop()

Infinite scheduling loop.

Return type:

None

recv_requests()

Non-blocking receive of tokenized requests from TokenizerProcess.

Supports two modes:
  1. Legacy ZMQ: uses zmq.Poller with a short timeout

  2. Shared queue: non-blocking get from multiprocessing.Queue

Messages are either:
  • A TokenizedGenerateReqInput dataclass – appended to _waiting_queue.

  • A plain abort sentinel dict {"rid": ..., "abort": True} – handled inline by removing the matching rid from the waiting queue.

Return type:

None
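The legacy ZMQ path can be sketched as a drain loop with abort handling inline. This is an illustrative stand-in, assuming pyzmq and a waiting queue of objects with a `rid` attribute:

```python
import zmq

def recv_requests(socket: "zmq.Socket", waiting_queue: list) -> None:
    # Drain every pending message without blocking.
    while True:
        try:
            msg = socket.recv_pyobj(zmq.NOBLOCK)
        except zmq.Again:
            return  # nothing left to read
        if isinstance(msg, dict) and msg.get("abort"):
            # Abort sentinel: drop the matching request from the waiting queue.
            waiting_queue[:] = [m for m in waiting_queue if m.rid != msg["rid"]]
        else:
            waiting_queue.append(msg)  # TokenizedGenerateReqInput dataclass
```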

process_input_requests()

Convert raw TokenizedGenerateReqInput in _waiting_queue into Req objects and move them to _pending_queue.

For each request:
  1. Parse sampling params (max_new_tokens, temperature, top_p, top_k, stop_token_ids with defaults from the EOS token).

  2. Create a Req object.

  3. Move from _waiting_queue to _pending_queue.

Return type:

None

get_next_batch_to_run()

Implements continuous batching in three steps.

  1. Filter finished: Remove finished requests from _running_batch and free their token budget.

  2. Schedule new prefills: From _pending_queue, admit requests that fit within the token budget and max_running_requests.

  3. Build batch:
     - If new prefill requests exist -> EXTEND batch
     - Else if running decode requests exist -> DECODE batch
     - Else -> None (idle)

Note on prefix cache: The actual prefix matching is done by the ModelRunnerProcess (which owns the RadixCache). The scheduler uses input_len as a conservative budget estimate. The model runner reports back actual prefix_len in results, and the scheduler adjusts _used_tokens accordingly in process_batch_result.

Return type:

Optional[ScheduleBatch]
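The admission policy can be sketched as a plain function over lists. The token accounting (charging the full input length, freeing seq_len on finish) follows the conservative-budget note above; the tuple return shape is a stand-in for ScheduleBatch:

```python
from typing import Any, List, Optional, Tuple

def get_next_batch(running: List[Any], pending: List[Any],
                   used_tokens: int, max_running: int,
                   max_total_tokens: int) -> Tuple[Optional[Tuple[str, List[Any]]], int]:
    # 1. Filter finished requests and reclaim their token budget.
    still = []
    for req in running:
        if req.is_finished:
            used_tokens -= req.seq_len       # free this request's budget
        else:
            still.append(req)
    running[:] = still
    # 2. Admit pending prefills that fit the budget and the concurrency cap.
    prefills = []
    while pending:
        req = pending[0]
        cost = len(req.input_ids)            # conservative: assumes no prefix hit
        if (len(running) + len(prefills) >= max_running
                or used_tokens + cost > max_total_tokens):
            break
        used_tokens += cost
        prefills.append(pending.pop(0))
    # 3. Build the batch: prefills win, then decodes, else idle.
    if prefills:
        return ("extend", prefills), used_tokens
    if running:
        return ("decode", running), used_tokens
    return None, used_tokens
```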

run_batch(batch)

Execute the batch via the in-process model runner.

Direct function call — no ZMQ serialisation overhead.

Parameters:

batch (ScheduleBatch)

Return type:

Dict[str, Any]

process_batch_result(batch, result)

Handle the result returned by the ModelRunnerProcess.

For each request in the result:
  1. Update prefix_len from the model runner’s radix cache hit.

  2. Adjust _used_tokens if a prefix cache hit was found (the scheduler over-reserved during scheduling).

  3. Append new token(s) to req.output_ids.

  4. Increment req.seq_len.

  5. Call req.check_finished() (EOS token, max_new_tokens).

  6. If prefill request: mark req.is_prefilled = True, move to running batch for decode.

  7. If finished: collect for output, free KV-cache budget.

Parameters:
  • batch (ScheduleBatch)

  • result (Dict[str, Any])

Return type:

None
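The per-request bookkeeping can be sketched as follows. The result layout (`{"prefix_lens": [...], "next_tokens": [...]}`) is an assumed stand-in for whatever the model runner actually returns:

```python
from typing import Any, Dict, List, Tuple

def process_batch_result(reqs: List[Any], result: Dict[str, Any],
                         used_tokens: int) -> Tuple[List[Any], int]:
    finished = []
    for i, req in enumerate(reqs):
        hit = result["prefix_lens"][i]
        if hit > req.prefix_len:
            used_tokens -= hit - req.prefix_len   # return the over-reservation
            req.prefix_len = hit
        req.output_ids.append(result["next_tokens"][i])
        req.seq_len += 1
        req.is_prefilled = True                   # now eligible for decode
        if req.check_finished():
            finished.append(req)                  # caller frees its KV budget
    return finished, used_tokens
```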

stream_output()

Send finished/streaming outputs to the DetokenizerProcess.

Produces BatchTokenIDOutput-compatible dicts. For streaming requests, intermediate tokens are also sent.

Return type:

None

shutdown()
Return type:

None

pymllm.orchestrator.scheduler_process.run_scheduler_process(recv_from_tokenizer_addr, send_to_detokenizer_addr, pipe_writer, shared_queue=None, enable_shared_queue=False, tensor_transport_mode='default', log_level='info', default_max_new_tokens=_DEFAULT_MAX_NEW_TOKENS, eos_token_ids=None, server_config=None, model_config=None, gpu_id=0)

Entry point for torch.multiprocessing.Process(target=...).

The scheduler process now also owns the model runner, so model initialisation happens here.

Parameters:
  • recv_from_tokenizer_addr (str)

  • send_to_detokenizer_addr (str)

  • pipe_writer (multiprocessing.connection.Connection)

  • shared_queue (Optional[pymllm.orchestrator.shared_memory_queue.TensorQueue])

  • enable_shared_queue (bool)

  • tensor_transport_mode (pymllm.orchestrator.cuda_ipc_transport.TensorTransportMode)

  • log_level (str)

  • default_max_new_tokens (int)

  • eos_token_ids (Optional[List[int]])

  • server_config (Optional[Any])

  • model_config (Optional[Any])

  • gpu_id (int)

Return type:

None