pymllm.orchestrator.request_response_process
============================================

.. py:module:: pymllm.orchestrator.request_response_process

.. autoapi-nested-parse::

   RequestResponseProcess -- the main-process entry point for user requests.

   This process is **not** a subprocess; it lives in the engine's main
   process. Incoming requests are placed into an ``asyncio.Queue`` and
   forwarded to the TokenizerProcess via ZMQ. Decoded results arrive back
   from the DetokenizerProcess and are dispatched to the waiting callers.

   Request tracking follows the ``ReqState`` pattern: each request gets an
   ``asyncio.Event`` plus an output list, so that both streaming responses
   (multiple incremental chunks) and one-shot responses are supported.

Attributes
----------

.. autoapisummary::

   pymllm.orchestrator.request_response_process.logger

Classes
-------

.. autoapisummary::

   pymllm.orchestrator.request_response_process.ReqState
   pymllm.orchestrator.request_response_process.RequestResponseProcess

Module Contents
---------------

.. py:data:: logger

.. py:class:: ReqState

   Per-request state that supports both streaming and one-shot responses.

   ``ReqState`` pairs an event with an output list: the recv loop appends
   results to *out_list* and signals *event*; callers ``await event.wait()``
   in a loop, consuming results until *finished* is ``True``.

   .. py:attribute:: out_list
      :type: List[Dict[str, Any]]
      :value: []

   .. py:attribute:: finished
      :type: bool
      :value: False

   .. py:attribute:: event
      :type: asyncio.Event

   .. py:attribute:: created_at
      :type: float

.. py:class:: RequestResponseProcess(send_to_tokenizer_addr, recv_from_detokenizer_addr)

   Sits in the main process; bridges the user-facing API and the subprocess
   pipeline.

   .. py:method:: start()

      Bind ZMQ sockets. Background tasks are started lazily by :meth:`listen`
      on the first :meth:`add_request` call, so they always run on the
      correct event loop regardless of whether the caller is uvicorn,
      ``loop.run_until_complete``, or anything else.

   .. py:method:: listen()

      Start the send/recv background tasks on the **current** running event
      loop. Idempotent -- subsequent calls are no-ops while the tasks are
      still alive. Called automatically by :meth:`add_request`, so callers
      never need to invoke this directly.

   .. py:method:: add_request(request, max_queued = None)
      :async:

      Enqueue request(s) and return the corresponding :class:`ReqState`\ (s).

      * **Single request** (``request.is_single is True``): behaves exactly
        as before -- registers one ``ReqState`` and enqueues one message.
      * **Batch request** (``request.is_single is False``): splits the batch
        into *N* individual sub-requests, registers a ``ReqState`` per rid,
        and enqueues each sub-request separately so the downstream pipeline
        sees independent messages. Returns a ``List[ReqState]`` in the same
        order as the input rids.

      :param max_queued: If set, raise ``RuntimeError`` when the queue
         already holds this many items (back-pressure / overload
         protection).

   .. py:method:: remove_state(rid)

      Remove the ``ReqState`` for *rid* (called by the caller once it is
      done).

   .. py:method:: abort_request(rid)
      :async:

      Cancel a pending request and notify downstream processes.

   .. py:method:: shutdown()
      :async:
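The Event-plus-out_list consumption pattern described for ``ReqState`` can be sketched as follows. This is a minimal stand-in, not the real implementation: ``fake_recv_loop`` plays the role of the recv loop that normally feeds decoded results in from ZMQ, and the ``ReqState`` definition simply mirrors the attributes listed above.

```python
# Sketch of the documented ReqState pattern (asyncio.Event + out_list).
# fake_recv_loop stands in for the real recv loop fed by the
# DetokenizerProcess; consume() is the caller side of the contract.
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ReqState:
    out_list: List[Dict[str, Any]] = field(default_factory=list)
    finished: bool = False
    event: asyncio.Event = field(default_factory=asyncio.Event)
    created_at: float = field(default_factory=time.time)


async def fake_recv_loop(state: ReqState) -> None:
    # Stand-in producer: append a chunk, mark the last one, signal the waiter.
    for i in range(3):
        await asyncio.sleep(0)
        state.out_list.append({"text": f"chunk-{i}"})
        if i == 2:
            state.finished = True
        state.event.set()


async def consume(state: ReqState) -> List[Dict[str, Any]]:
    # Caller side: wait, drain everything appended so far, repeat until done.
    results: List[Dict[str, Any]] = []
    while True:
        await state.event.wait()
        state.event.clear()
        results.extend(state.out_list)
        state.out_list.clear()
        if state.finished:
            return results


async def main() -> List[Dict[str, Any]]:
    state = ReqState()
    asyncio.get_running_loop().create_task(fake_recv_loop(state))
    return await consume(state)


print(asyncio.run(main()))
```

Because the consumer drains *out_list* completely on every wake-up and only returns once *finished* is set, the pattern is robust to the producer setting the event several times before the consumer is scheduled: no chunk is lost and ordering is preserved.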
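The batch branch of :meth:`add_request` can be illustrated with the following sketch. The helper name ``register_batch`` and the ``(rid, payload)`` message shape are hypothetical; the sketch only shows the documented behaviour: one ``ReqState`` registered per rid, one queued message per sub-request, and the states returned in input-rid order.

```python
# Illustrative fan-out of a batch request into N independent sub-requests.
# register_batch and the (rid, payload) tuple format are assumptions made
# for this sketch, not the module's real API.
import asyncio
from typing import Any, Dict, List, Tuple


class ReqState:
    # Minimal stand-in for the documented ReqState.
    def __init__(self) -> None:
        self.out_list: List[Dict[str, Any]] = []
        self.finished = False


def register_batch(sub_requests: List[Tuple[str, Dict[str, Any]]],
                   rid_to_state: Dict[str, ReqState],
                   queue: asyncio.Queue) -> List[ReqState]:
    states: List[ReqState] = []
    for rid, payload in sub_requests:
        state = ReqState()
        rid_to_state[rid] = state          # one ReqState per rid
        queue.put_nowait((rid, payload))   # one independent message per rid
        states.append(state)               # preserves input-rid order
    return states


async def demo() -> Dict[str, int]:
    registry: Dict[str, ReqState] = {}
    q: asyncio.Queue = asyncio.Queue()
    states = register_batch(
        [("r1", {"prompt": "a"}), ("r2", {"prompt": "b"}),
         ("r3", {"prompt": "c"})],
        registry, q)
    return {"states": len(states), "queued": q.qsize()}


print(asyncio.run(demo()))
```

Splitting the batch up front means the tokenizer and detokenizer stages never need a batch code path: every message they see is a single request with its own rid.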
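The ``max_queued`` back-pressure check can be sketched like this. The function name ``enqueue_with_backpressure`` is invented for illustration; the behaviour shown is the documented one: if the queue already holds ``max_queued`` items, raise ``RuntimeError`` instead of enqueueing.

```python
# Hedged sketch of the max_queued overload check described for add_request().
# enqueue_with_backpressure is a hypothetical helper, not the real method.
import asyncio
from typing import Optional


async def enqueue_with_backpressure(queue: asyncio.Queue,
                                    item: object,
                                    max_queued: Optional[int] = None) -> None:
    # Reject before enqueueing when the queue is already at capacity.
    if max_queued is not None and queue.qsize() >= max_queued:
        raise RuntimeError(
            f"request queue overloaded: {queue.qsize()} >= {max_queued}")
    queue.put_nowait(item)


async def demo() -> str:
    q: asyncio.Queue = asyncio.Queue()
    await enqueue_with_backpressure(q, "req-1", max_queued=2)
    await enqueue_with_backpressure(q, "req-2", max_queued=2)
    try:
        await enqueue_with_backpressure(q, "req-3", max_queued=2)
    except RuntimeError:
        return "rejected"
    return "accepted"


print(asyncio.run(demo()))
```

Rejecting at admission time keeps overload handling in the main process, before any work reaches the tokenizer pipeline, so callers get an immediate error rather than a silently growing queue.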