pymllm.orchestrator.request_response_process

RequestResponseProcess – the main-process entry point for user requests.

This process is not a subprocess; it lives in the engine’s main process. Incoming requests are placed into an asyncio.Queue and forwarded to the TokenizerProcess via ZMQ. Decoded results arrive back from the DetokenizerProcess and are dispatched to the waiting callers.

The request-tracking model uses the ReqState pattern: each request gets an asyncio.Event and an output list, so both streaming (multiple incremental chunks) and one-shot responses are supported.

Attributes

logger

Classes

ReqState

Per-request state that supports both streaming and one-shot responses.

RequestResponseProcess

Sits in the main process; bridges user-facing API and subprocess pipeline.

Module Contents

pymllm.orchestrator.request_response_process.logger
class pymllm.orchestrator.request_response_process.ReqState

Per-request state that supports both streaming and one-shot responses.

A ReqState pairs an asyncio.Event (event) with an output list (out_list).

The recv loop appends results to out_list and signals event; callers await event.wait() in a loop, consuming results until finished is True.

out_list: List[Dict[str, Any]] = []
finished: bool = False
event: asyncio.Event
created_at: float
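
The wait-and-drain loop described above can be sketched as follows. This is a minimal illustration, not the library's code: ReqStateSketch, fake_recv_loop, and consume are hypothetical names, the field defaults are assumptions, and the chunk dictionaries are made up for the demo.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ReqStateSketch:
    """Illustrative stand-in for ReqState; not the library's definition."""
    out_list: List[Dict[str, Any]] = field(default_factory=list)
    finished: bool = False
    event: asyncio.Event = field(default_factory=asyncio.Event)
    created_at: float = field(default_factory=time.time)


async def fake_recv_loop(state: ReqStateSketch, chunks: List[str]) -> None:
    """Plays the role of the recv loop: append a result, then signal."""
    for i, text in enumerate(chunks):
        state.out_list.append({"text": text})
        state.finished = i == len(chunks) - 1
        state.event.set()
        await asyncio.sleep(0)  # yield so the consumer gets a chance to run


async def consume(state: ReqStateSketch) -> List[Dict[str, Any]]:
    """Caller side: wait, drain out_list, stop once finished is True."""
    received: List[Dict[str, Any]] = []
    while True:
        await state.event.wait()
        state.event.clear()
        received.extend(state.out_list)
        state.out_list.clear()
        if state.finished:
            return received


async def demo() -> List[Dict[str, Any]]:
    state = ReqStateSketch()  # created inside the running loop
    producer = asyncio.create_task(fake_recv_loop(state, ["Hel", "lo", "!"]))
    result = await consume(state)
    await producer
    return result


chunks = asyncio.run(demo())
```

A one-shot response is the same loop with a single chunk: the first wake-up already finds finished set to True.
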
class pymllm.orchestrator.request_response_process.RequestResponseProcess(send_to_tokenizer_addr, recv_from_detokenizer_addr)

Sits in the main process; bridges user-facing API and subprocess pipeline.

Parameters:
  • send_to_tokenizer_addr (str)

  • recv_from_detokenizer_addr (str)

start()

Bind ZMQ sockets. Background tasks are started lazily by listen() on the first add_request() call, so they always run on the correct event loop regardless of whether the caller is uvicorn, loop.run_until_complete, or anything else.

Return type:

None

listen()

Start the send/recv background tasks on the current running event loop. Idempotent — subsequent calls are no-ops while the tasks are still alive.

Called automatically by add_request(), so callers never need to invoke this directly.

Return type:

None
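
The lazy, idempotent start can be pictured with a small sketch. The LazyTasks class and its starts counter are hypothetical and exist only for this illustration; the real send/recv loops are replaced by a placeholder that sleeps.

```python
import asyncio
from typing import List


class LazyTasks:
    """Hypothetical helper illustrating an idempotent lazy start."""

    def __init__(self) -> None:
        self._tasks: List[asyncio.Task] = []
        self.starts = 0  # how many times tasks were actually created

    async def _loop(self) -> None:
        # Placeholder for a send or recv loop.
        while True:
            await asyncio.sleep(3600)

    def listen(self) -> None:
        # No-op while every previously started task is still alive.
        if self._tasks and all(not t.done() for t in self._tasks):
            return
        loop = asyncio.get_running_loop()  # bind to the caller's loop
        self._tasks = [loop.create_task(self._loop()) for _ in range(2)]
        self.starts += 1

    async def add_request(self, item: str) -> str:
        self.listen()  # lazy start on first use
        return item

    async def shutdown(self) -> None:
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks = []


async def demo() -> int:
    proc = LazyTasks()
    await proc.add_request("a")
    await proc.add_request("b")  # listen() is a no-op this time
    starts = proc.starts
    await proc.shutdown()
    return starts


start_count = asyncio.run(demo())
```

Because the tasks are created through asyncio.get_running_loop(), they attach to whichever loop is executing the first add_request() call, which is the property the description above relies on.
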

async add_request(request, max_queued=None)

Enqueue request(s) and return the corresponding ReqState(s).

  • Single request (request.is_single is True): registers one ReqState and enqueues one message. Returns a single ReqState.

  • Batch request (request.is_single is False): splits the batch into N individual sub-requests, registers a ReqState per rid, and enqueues each sub-request separately so the downstream pipeline sees independent messages. Returns a List[ReqState] in the same order as the input rids.

Parameters:
  • request

  • max_queued (defaults to None)

Return type:

Union[ReqState, List[ReqState]]
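
The single-versus-batch behaviour can be sketched as below. The Request shape (rid and text as a string or as parallel lists), the State stand-in, and the Frontend class are assumptions made for this example, and the max_queued limit is left out because its semantics are not described here.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List, Union


@dataclass
class State:
    """Minimal stand-in for ReqState."""
    out_list: List[Dict[str, Any]] = field(default_factory=list)
    finished: bool = False


@dataclass
class Request:
    """Hypothetical request: rid and text are a str or parallel lists."""
    rid: Union[str, List[str]]
    text: Union[str, List[str]]

    @property
    def is_single(self) -> bool:
        return isinstance(self.rid, str)


class Frontend:
    def __init__(self) -> None:
        self.states: Dict[str, State] = {}
        self.queue: asyncio.Queue = asyncio.Queue()

    async def add_request(self, request: Request) -> Union[State, List[State]]:
        if request.is_single:
            return await self._add_one(request)
        # Split the batch so downstream sees independent messages.
        subs = [Request(rid=r, text=t) for r, t in zip(request.rid, request.text)]
        return [await self._add_one(sub) for sub in subs]

    async def _add_one(self, request: Request) -> State:
        state = State()
        self.states[request.rid] = state  # register before enqueueing
        await self.queue.put(request)
        return state


async def demo():
    fe = Frontend()  # created inside the running loop
    single = await fe.add_request(Request(rid="r0", text="hi"))
    batch = await fe.add_request(Request(rid=["r1", "r2"], text=["a", "b"]))
    return fe, single, batch


frontend, single_state, batch_states = asyncio.run(demo())
```

In this sketch each state is registered before its message is enqueued, so a fast reply can never arrive for an rid that has no state yet.
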

remove_state(rid)

Remove the ReqState for rid (called by the caller once done).

Parameters:

rid (str)

Return type:

None

async abort_request(rid)

Cancel a pending request and notify downstream processes.

Parameters:

rid (str)

Return type:

None
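
One plausible shape for an abort is sketched below. Everything here is an assumption for illustration: the State and Frontend classes, the abort message format, and the use of an asyncio.Queue in place of the ZMQ send path are not taken from the library.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class State:
    """Minimal stand-in for ReqState."""
    out_list: List[Dict[str, Any]] = field(default_factory=list)
    finished: bool = False
    event: asyncio.Event = field(default_factory=asyncio.Event)


class Frontend:
    def __init__(self) -> None:
        self.states: Dict[str, State] = {}
        # Stands in for the ZMQ path towards downstream processes.
        self.outbox: asyncio.Queue = asyncio.Queue()

    async def abort_request(self, rid: str) -> None:
        state = self.states.pop(rid, None)
        if state is None:
            return  # unknown or already removed request
        state.finished = True
        state.out_list.append({"rid": rid, "aborted": True})
        state.event.set()  # wake any caller blocked in event.wait()
        await self.outbox.put({"type": "abort", "rid": rid})


async def demo():
    fe = Frontend()  # created inside the running loop
    state = State()
    fe.states["r1"] = state
    await fe.abort_request("r1")
    await fe.abort_request("r1")  # second call finds nothing to abort
    return state, fe.outbox.qsize(), dict(fe.states)


aborted_state, sent_messages, remaining = asyncio.run(demo())
```

Setting the event after marking the state finished lets a waiting caller leave its loop instead of blocking forever on a request that will never complete.
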

async shutdown()

Return type:

None