pymllm.orchestrator.request_response_process¶
RequestResponseProcess – the main-process entry point for user requests.
This process is not a subprocess; it lives in the engine’s main process.
Incoming requests are placed into an asyncio.Queue and forwarded to the
TokenizerProcess via ZMQ. Decoded results arrive back from the
DetokenizerProcess and are dispatched to the waiting callers.
The request-tracking model uses the ReqState pattern: each request
gets an asyncio.Event + output list so that streaming (multiple incremental
chunks) and one-shot responses are both supported.
Attributes¶
Classes¶
| ReqState | Per-request state that supports both streaming and one-shot responses. |
| RequestResponseProcess | Sits in the main process; bridges user-facing API and subprocess pipeline. |
Module Contents¶
- pymllm.orchestrator.request_response_process.logger¶
- class pymllm.orchestrator.request_response_process.ReqState¶
Per-request state that supports both streaming and one-shot responses.
ReqState(event + out_list). The recv loop appends results to out_list and signals event; callers await event.wait() in a loop, consuming results until finished is True.
- out_list: List[Dict[str, Any]] = []¶
- finished: bool = False¶
- event: asyncio.Event¶
- created_at: float¶
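The producer/consumer handshake described above can be sketched as a self-contained example. Note this is a minimal re-creation of the pattern for illustration, not the actual pymllm class; the `producer` coroutine stands in for the recv loop, and `consumer` plays the role of a waiting caller.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List


# Minimal stand-in for ReqState: an event plus an output list.
@dataclass
class ReqState:
    out_list: List[Dict[str, Any]] = field(default_factory=list)
    finished: bool = False
    event: asyncio.Event = field(default_factory=asyncio.Event)


async def producer(state: ReqState) -> None:
    # Simulates the recv loop: append each chunk, then signal the event.
    for i in range(3):
        state.out_list.append({"chunk": i})
        state.event.set()
        await asyncio.sleep(0)  # yield so the consumer can drain
    state.finished = True
    state.event.set()


async def consumer(state: ReqState) -> List[Dict[str, Any]]:
    # Callers await the event in a loop, consuming results until finished.
    results: List[Dict[str, Any]] = []
    while True:
        await state.event.wait()
        state.event.clear()
        results.extend(state.out_list)
        state.out_list.clear()
        if state.finished:
            return results


async def main() -> List[Dict[str, Any]]:
    state = ReqState()
    task = asyncio.create_task(producer(state))
    results = await consumer(state)
    await task
    return results


results = asyncio.run(main())
```

Because the same loop serves both streaming (many chunks before `finished`) and one-shot responses (a single chunk with `finished` already set), callers do not need two code paths.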
- class pymllm.orchestrator.request_response_process.RequestResponseProcess(send_to_tokenizer_addr, recv_from_detokenizer_addr)¶
Sits in the main process; bridges user-facing API and subprocess pipeline.
- Parameters:
send_to_tokenizer_addr (str)
recv_from_detokenizer_addr (str)
- start()¶
Bind ZMQ sockets. Background tasks are started lazily by listen() on the first add_request() call, so they always run on the correct event loop regardless of whether the caller is uvicorn, loop.run_until_complete, or anything else.
- Return type:
None
- listen()¶
Start the send/recv background tasks on the current running event loop. Idempotent — subsequent calls are no-ops while the tasks are still alive.
Called automatically by add_request(), so callers never need to invoke this directly.
- Return type:
None
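The lazy, idempotent task-start behavior described above can be sketched in isolation. This is an illustrative mock, not the real RequestResponseProcess; `LazyListener`, `_send_loop`, and the attribute names are assumptions made for the example.

```python
import asyncio
from typing import Any, Optional


class LazyListener:
    """Mock of the lazy listen() pattern: start background tasks on first use."""

    def __init__(self) -> None:
        self._task: Optional[asyncio.Task] = None
        self.queue: asyncio.Queue = asyncio.Queue()

    def listen(self) -> None:
        # Idempotent: a no-op while the background task is still alive.
        # Creating the task inside a running coroutine binds it to the
        # caller's event loop, whatever framework started that loop.
        if self._task is None or self._task.done():
            self._task = asyncio.get_running_loop().create_task(self._send_loop())

    async def _send_loop(self) -> None:
        while True:
            item = await self.queue.get()
            ...  # in the real class: forward the message downstream

    async def add_request(self, item: Any) -> None:
        self.listen()  # callers never invoke listen() directly
        await self.queue.put(item)


async def demo() -> bool:
    listener = LazyListener()
    await listener.add_request("req-1")
    first = listener._task
    listener.listen()  # second call is a no-op: same task survives
    return first is listener._task


same_task = asyncio.run(demo())
```

Deferring task creation to the first `add_request()` is what makes the class agnostic to how the event loop was started: the tasks are always created from inside an already-running loop.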
- async add_request(request, max_queued=None)¶
Enqueue request(s) and return the corresponding ReqState(s).
Single request (request.is_single is True): behaves exactly as before – registers one ReqState and enqueues one message.
Batch request (request.is_single is False): splits the batch into N individual sub-requests, registers a ReqState per rid, and enqueues each sub-request separately so the downstream pipeline sees independent messages. Returns a List[ReqState] in the same order as the input rids.
- Parameters:
max_queued (Optional[int]) – If set, raise RuntimeError when the queue already has this many items (back-pressure / overload protection).
request (pymllm.engine.io_struct.GenerateReqInput)
- Return type:
ReqState | List[ReqState]
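The max_queued back-pressure check can be sketched as follows. This is an assumed reconstruction of the described behavior, not the library's actual implementation; `enqueue_with_backpressure` is a hypothetical helper named for this example.

```python
import asyncio
from typing import Any, Optional


async def enqueue_with_backpressure(
    queue: asyncio.Queue, item: Any, max_queued: Optional[int] = None
) -> None:
    # Reject the request before enqueueing if the queue is already full,
    # mirroring the overload protection described for add_request().
    if max_queued is not None and queue.qsize() >= max_queued:
        raise RuntimeError(f"queue overloaded: {queue.qsize()} >= {max_queued}")
    await queue.put(item)


async def demo() -> bool:
    q: asyncio.Queue = asyncio.Queue()
    await enqueue_with_backpressure(q, "req-1", max_queued=2)
    await enqueue_with_backpressure(q, "req-2", max_queued=2)
    try:
        # Third request exceeds the limit and is rejected.
        await enqueue_with_backpressure(q, "req-3", max_queued=2)
        return False
    except RuntimeError:
        return True


rejected = asyncio.run(demo())
```

Raising immediately (rather than blocking on a bounded queue) surfaces overload to the caller, which can then return an HTTP 503 or retry, instead of silently stalling.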
- remove_state(rid)¶
Remove the ReqState for rid (called by the caller once done).
- Parameters:
rid (str)
- Return type:
None
- async abort_request(rid)¶
Cancel a pending request and notify downstream processes.
- Parameters:
rid (str)
- Return type:
None
- async shutdown()¶
- Return type:
None