pymllm.orchestrator.scheduler_process
=====================================

.. py:module:: pymllm.orchestrator.scheduler_process

.. autoapi-nested-parse::

   SchedulerProcess -- the central scheduling and inference hub.

   Receives tokenized requests from the TokenizerProcess, organises them into
   batches, runs model forward passes via the **in-process** model runner, and
   streams finished token IDs to the DetokenizerProcess.

   Architecture: the scheduler owns the :class:`ModelRunnerProcess` directly
   (same process, direct function calls). GPU resources (KV cache, req pool
   slots) are freed immediately when requests finish; no cross-process
   communication is needed.

   Request ingestion supports two modes:

   1. ZMQ path: receive ``TokenizedGenerateReqInput`` via ZMQ ``recv_pyobj``.
   2. Shared-queue fast path: read from shared memory plus a multiprocessing
      queue.

   The main ``event_loop``::

       while True:
           recv_requests()
           process_input_requests()
           batch = get_next_batch_to_run()  # also frees finished GPU resources
           if batch:
               result = run_batch(batch)    # direct call to model runner
               process_batch_result(batch, result)
           else:
               idle_sleeper.sleep()         # block until ZMQ data or timeout
           stream_output()

Attributes
----------

.. autoapisummary::

   pymllm.orchestrator.scheduler_process.logger

Classes
-------

.. autoapisummary::

   pymllm.orchestrator.scheduler_process.IdleSleeper
   pymllm.orchestrator.scheduler_process.Req
   pymllm.orchestrator.scheduler_process.ScheduleBatch
   pymllm.orchestrator.scheduler_process.SchedulerProcess

Functions
---------

.. autoapisummary::

   pymllm.orchestrator.scheduler_process.run_scheduler_process

Module Contents
---------------

.. py:data:: logger

.. py:class:: IdleSleeper(sockets, poll_timeout_ms=_DEFAULT_IDLE_POLL_TIMEOUT_MS)

   Block the scheduler thread when idle using a ZMQ ``Poller``.

   Avoids 100% CPU spinning when no requests are pending. The poller wakes
   immediately when data arrives on any registered socket, so request latency
   is not affected.

   .. py:attribute:: poller

   .. py:attribute:: poll_timeout_ms
      :value: 1000

   .. py:method:: sleep()

      Block until data arrives on any registered socket, or the timeout
      expires.

.. py:class:: Req(rid, input_ids, input_text='', sampling_params=None, mm_inputs=None, stream=False, return_logprob=False, logprob_start_len=-1, top_logprobs_num=0)

   Tracks a single request through its lifecycle (prefill -> decode -> finish).

   Created by :meth:`SchedulerProcess.process_input_requests` from a
   :class:`~pymllm.engine.io_struct.TokenizedGenerateReqInput`.

   .. py:attribute:: __slots__
      :value: ('rid', 'input_ids', 'input_text', 'sampling_params', 'mm_inputs', 'stream', 'return_logprob',...

   .. py:attribute:: rid

   .. py:attribute:: input_ids

   .. py:attribute:: input_text
      :value: ''

   .. py:attribute:: mm_inputs
      :value: None

   .. py:attribute:: stream
      :value: False

   .. py:attribute:: return_logprob
      :value: False

   .. py:attribute:: logprob_start_len
      :value: -1

   .. py:attribute:: top_logprobs_num
      :value: 0

   .. py:attribute:: sampling_params

   .. py:attribute:: max_new_tokens
      :type: int

   .. py:attribute:: temperature
      :type: float

   .. py:attribute:: top_p
      :type: float

   .. py:attribute:: top_k
      :type: int

   .. py:attribute:: stop_token_ids
      :type: List[int]

   .. py:attribute:: req_pool_idx
      :type: int
      :value: -1

   .. py:attribute:: seq_len
      :type: int

   .. py:attribute:: prefix_len
      :type: int
      :value: 0

   .. py:attribute:: output_ids
      :type: List[int]
      :value: []

   .. py:attribute:: finished_reason
      :type: Optional[str]
      :value: None

   .. py:attribute:: is_prefilled
      :type: bool
      :value: False

   .. py:attribute:: read_offset
      :type: int
      :value: 0

   .. py:attribute:: prompt_len
      :type: int

   .. py:method:: check_finished()

      Check whether this request has reached a finish condition.

      Sets ``finished_reason`` and returns ``True`` if finished. Checks:

      1. Stop token (EOS tokens are merged into ``stop_token_ids`` during
         :meth:`SchedulerProcess.process_input_requests`).
      2. ``max_new_tokens`` reached.

   .. py:property:: is_finished
      :type: bool

   .. py:method:: abort()

      Mark this request as aborted.
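The finish conditions above can be sketched in plain Python. The field names
mirror the documented :class:`Req` attributes, but the method bodies are an
assumption for illustration, not the actual pymllm implementation:

```python
# Minimal sketch of the documented finish check; the real implementation
# may differ in details.
from typing import List, Optional


class Req:
    """Tracks one request (only the fields the finish check needs)."""

    def __init__(self, rid: str, input_ids: List[int],
                 max_new_tokens: int, stop_token_ids: List[int]):
        self.rid = rid
        self.input_ids = input_ids
        self.max_new_tokens = max_new_tokens
        # EOS ids are merged into stop_token_ids by the scheduler.
        self.stop_token_ids = stop_token_ids
        self.output_ids: List[int] = []
        self.finished_reason: Optional[str] = None

    def check_finished(self) -> bool:
        """Set ``finished_reason`` and return True once a condition holds."""
        if self.finished_reason is not None:
            return True
        # 1. Stop token: the last generated token matches a stop/EOS id.
        if self.output_ids and self.output_ids[-1] in self.stop_token_ids:
            self.finished_reason = "stop"
            return True
        # 2. Generation budget exhausted.
        if len(self.output_ids) >= self.max_new_tokens:
            self.finished_reason = "length"
            return True
        return False

    @property
    def is_finished(self) -> bool:
        return self.finished_reason is not None
```

Note that the stop-token check only inspects the *last* generated token, which
is sufficient because the check runs after every decode step.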
   .. py:method:: __repr__()

.. py:class:: ScheduleBatch(reqs, forward_mode)

   Wraps a list of :class:`Req` objects for a single forward pass.

   Provides helpers to assemble the batch dict sent to the ModelRunnerProcess
   in the format expected by
   :class:`~pymllm.engine.forward_batch.ForwardBatch`.

   .. py:attribute:: reqs

   .. py:attribute:: forward_mode

   .. py:property:: batch_size
      :type: int

   .. py:method:: prepare_for_extend()

      Assemble a batch dict for a prefill / extend forward pass.

      Returns a dict with flattened ``input_ids``, per-request ``positions``,
      ``req_pool_indices``, ``seq_lens``, ``extend_seq_lens``,
      ``extend_prefix_lens``, and request metadata.

      Note: the scheduler sends the **full** ``input_ids`` (no prefix
      trimming). The ModelRunnerProcess performs radix-cache prefix matching
      and rebuilds the tensors with actual prefix lengths before the forward
      pass. The ``extend_prefix_lens`` here are always 0 from the scheduler;
      they serve as placeholders.

   .. py:method:: prepare_for_decode()

      Assemble a batch dict for a decode forward pass (one token per request).

      Returns a dict with one input token per request (the last generated
      token), positions at ``seq_len``, and request metadata.

   .. py:method:: to_batch_dict()

      Build the batch dict appropriate for the current forward mode.

   .. py:method:: __repr__()

.. py:class:: SchedulerProcess(recv_from_tokenizer_addr, send_to_detokenizer_addr, server_config=None, model_config=None, gpu_id=0, shared_queue=None, enable_shared_queue=False, tensor_transport_mode='default', max_running_requests=_DEFAULT_MAX_RUNNING_REQUESTS, max_prefill_tokens=_DEFAULT_MAX_PREFILL_TOKENS, max_total_tokens=_DEFAULT_MAX_TOTAL_TOKENS, eos_token_ids=None, default_max_new_tokens=_DEFAULT_MAX_NEW_TOKENS)

   Runs inside a subprocess. Central hub that drives the inference loop.

   .. py:method:: init_sockets()

   .. py:method:: init_model()

      Create and initialise the in-process model runner.
      Must be called after ``init_sockets`` and inside the subprocess (after
      spawn), since it performs CUDA initialisation.

   .. py:method:: event_loop()

      Infinite scheduling loop.

   .. py:method:: recv_requests()

      Non-blocking receive of tokenized requests from the TokenizerProcess.

      Supports two modes:

      1. Legacy ZMQ: uses ``zmq.Poller`` with a short timeout.
      2. Shared queue: non-blocking get from a ``multiprocessing.Queue``.

      Messages are either:

      * A :class:`~pymllm.engine.io_struct.TokenizedGenerateReqInput`
        dataclass, appended to ``_waiting_queue``.
      * A plain abort sentinel dict ``{"rid": ..., "abort": True}``, handled
        inline by removing the matching rid from the waiting queue.

   .. py:method:: process_input_requests()

      Convert raw :class:`TokenizedGenerateReqInput` objects in
      ``_waiting_queue`` into :class:`Req` objects and move them to
      ``_pending_queue``.

      For each request:

      1. Parse sampling params (``max_new_tokens``, ``temperature``,
         ``top_p``, ``top_k``, ``stop_token_ids`` with defaults from the EOS
         token).
      2. Create a :class:`Req` object.
      3. Move it from ``_waiting_queue`` to ``_pending_queue``.

   .. py:method:: get_next_batch_to_run()

      Implement continuous batching in three phases.

      1. **Filter finished**: remove finished requests from ``_running_batch``
         and free their token budget.
      2. **Schedule new prefills**: from ``_pending_queue``, admit requests
         that fit within the token budget and ``max_running_requests``.
      3. **Build batch**:

         - If new prefill requests exist -> EXTEND batch.
         - Else if running decode requests exist -> DECODE batch.
         - Else -> ``None`` (idle).

      Note on the prefix cache: the actual prefix matching is done by the
      ModelRunnerProcess (which owns the RadixCache). The scheduler uses
      ``input_len`` as a conservative budget estimate. The model runner
      reports back the actual ``prefix_len`` in results, and the scheduler
      adjusts ``_used_tokens`` accordingly in ``process_batch_result``.

   .. py:method:: run_batch(batch)

      Execute the batch via the in-process model runner.

      Direct function call; no ZMQ serialisation overhead.

   .. py:method:: process_batch_result(batch, result)

      Handle the result returned by the ModelRunnerProcess.

      For each request in the result:

      1. Update ``prefix_len`` from the model runner's radix-cache hit.
      2. Adjust ``_used_tokens`` if a prefix-cache hit was found (the
         scheduler over-reserved during scheduling).
      3. Append new token(s) to ``req.output_ids``.
      4. Increment ``req.seq_len``.
      5. Call ``req.check_finished()`` (EOS token, ``max_new_tokens``).
      6. If a prefill request: mark ``req.is_prefilled = True`` and move it to
         the running batch for decode.
      7. If finished: collect it for output and free the KV-cache budget.

   .. py:method:: stream_output()

      Send finished/streaming outputs to the DetokenizerProcess.

      Produces :class:`~pymllm.engine.io_struct.BatchTokenIDOutput`-compatible
      dicts. For streaming requests, intermediate tokens are also sent.

   .. py:method:: shutdown()

.. py:function:: run_scheduler_process(recv_from_tokenizer_addr, send_to_detokenizer_addr, pipe_writer, shared_queue=None, enable_shared_queue=False, tensor_transport_mode='default', log_level='info', default_max_new_tokens=_DEFAULT_MAX_NEW_TOKENS, eos_token_ids=None, server_config=None, model_config=None, gpu_id=0)

   Entry point for ``torch.multiprocessing.Process(target=...)``.

   The scheduler process also owns the model runner, so model initialisation
   happens here.
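The admission logic behind ``get_next_batch_to_run`` can be sketched as a
pure function. The names (``_pending_queue`` contents, ``input_len``,
``max_total_tokens``) mirror the documentation above, but the concrete policy
shown here is an illustrative assumption, not the pymllm source:

```python
# Sketch of token-budget admission for continuous batching, assuming each
# pending request is a dict with "input_len" and "max_new_tokens".
from typing import List, Optional, Tuple


def schedule(pending: List[dict], running: List[dict], used_tokens: int,
             max_total_tokens: int, max_running_requests: int
             ) -> Tuple[Optional[str], List[dict], int]:
    """Return (forward_mode, batch, used_tokens) for one scheduling step."""
    admitted: List[dict] = []
    # Admit prefills that fit the token budget and the request cap.
    # input_len is a conservative estimate: a radix-cache prefix hit,
    # reported back later by the model runner, would shrink the real cost.
    while pending and len(running) + len(admitted) < max_running_requests:
        req = pending[0]
        cost = req["input_len"] + req["max_new_tokens"]
        if used_tokens + cost > max_total_tokens:
            break
        used_tokens += cost
        admitted.append(pending.pop(0))
    if admitted:
        return "EXTEND", admitted, used_tokens  # prefill new requests first
    if running:
        return "DECODE", running, used_tokens   # one token per running request
    return None, [], used_tokens                # idle: IdleSleeper takes over
```

Prioritising EXTEND over DECODE matches the batch-building order documented
above; when neither queue yields work the loop falls through to
``idle_sleeper.sleep()``.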