pymllm.orchestrator.tokenizer_process¶
TokenizerProcess – subprocess that tokenizes incoming raw requests.
Receives raw requests from RequestResponseProcess via ZMQ, tokenizes them, and forwards the tokenized payloads to the SchedulerProcess.
Supports two transport modes (controlled by enable_shared_queue and
tensor_transport_mode in the tokenizer config):
- Legacy ZMQ path (enable_shared_queue=False): Tokenized objects are sent directly via ZMQ send_pyobj (pickle). This is simple but slow for large multimodal tensors.
- Shared queue fast path (enable_shared_queue=True): Metadata is written to POSIX shared memory and the queue carries a lightweight (rid, shm_name, mm_inputs) tuple. The GPU tensors inside mm_inputs are transported differently depending on tensor_transport_mode:
  - "default" – GPU tensors are moved to CPU first (GPU→CPU copy), then placed in POSIX shared memory.
  - "cuda_ipc" – GPU tensors stay on GPU; they are wrapped in a TransportProxyTensor whose pickle uses CUDA IPC handles. Simple, but may leak GPU memory.
  - "cuda_ipc_pool" – GPU tensors are copied into a pre-allocated MmItemMemoryPool workspace and shared via pool-chunk IPC handles. Chunks are recycled; no GPU memory is leaked.
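The metadata half of the fast path can be sketched with the standard library's POSIX shared-memory wrapper. This is a minimal illustration, not the pymllm implementation: `publish_metadata` and `consume_metadata` are hypothetical names, and the `mm_inputs` stub stands in for the real multimodal-tensor container.

```python
# Sketch of the shared-queue fast path's metadata transport, assuming
# hypothetical helpers (publish_metadata / consume_metadata are
# illustrative names, not pymllm API).
import pickle
from multiprocessing import shared_memory


def publish_metadata(rid, metadata):
    """Pickle metadata into POSIX shared memory and return the
    lightweight (rid, shm_name, mm_inputs) tuple for the queue."""
    payload = pickle.dumps(metadata)
    shm = shared_memory.SharedMemory(create=True, size=len(payload))
    shm.buf[: len(payload)] = payload
    shm.close()  # the segment stays alive; the consumer re-attaches by name
    return (rid, shm.name, {"num_tensors": 0})  # mm_inputs stub


def consume_metadata(rid, shm_name, mm_inputs):
    """Attach to the named segment, unpickle, then free it."""
    shm = shared_memory.SharedMemory(name=shm_name)
    metadata = pickle.loads(bytes(shm.buf))  # trailing padding is ignored
    shm.close()
    shm.unlink()  # consumer owns cleanup in this sketch
    return metadata
```

Only the small tuple crosses the queue; the bulk payload travels through the named segment, which is what makes this path cheaper than pickling everything over ZMQ.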
Attributes¶
logger
Classes¶
TokenizerProcess – Runs inside a subprocess spawned by torch.multiprocessing.
Functions¶
run_tokenizer_process – Entry point for torch.multiprocessing.Process(target=...).
Module Contents¶
- pymllm.orchestrator.tokenizer_process.logger¶
- class pymllm.orchestrator.tokenizer_process.TokenizerProcess(recv_from_rr_addr, send_to_scheduler_addr, tokenizer_cfg, shared_queue=None)¶
Runs inside a subprocess spawned by torch.multiprocessing.
- Parameters:
recv_from_rr_addr (str)
send_to_scheduler_addr (str)
tokenizer_cfg (Dict[str, Any])
shared_queue (Optional[pymllm.orchestrator.shared_memory_queue.TensorQueue])
- init_sockets()¶
- Return type:
None
- event_loop()¶
Infinite loop: recv raw request -> tokenize -> send to scheduler.
- Return type:
None
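The recv → tokenize → send cycle can be sketched without ZMQ. In this illustration plain queues stand in for the sockets and `str.split` for the real tokenizer; a `None` sentinel replaces the real shutdown signal. All names here are assumptions, not pymllm API.

```python
# Minimal sketch of the event_loop pattern (recv -> tokenize -> send).
# Plain queues stand in for the ZMQ sockets; str.split can stand in
# for the tokenizer. Illustrative only, not the pymllm implementation.
import queue


def event_loop(recv_q, send_q, tokenize):
    """Drain raw requests until a None sentinel arrives."""
    while True:
        raw = recv_q.get()                  # recv raw request (blocking)
        if raw is None:                     # shutdown sentinel
            return
        rid, text = raw
        send_q.put((rid, tokenize(text)))   # forward tokenized payload
```

The real loop additionally handles the shared-queue fast path and error reporting, but the control flow is the same single-consumer blocking cycle.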
- shutdown()¶
- Return type:
None
- pymllm.orchestrator.tokenizer_process.run_tokenizer_process(recv_from_rr_addr, send_to_scheduler_addr, pipe_writer, tokenizer_cfg, shared_queue=None)¶
Entry point for torch.multiprocessing.Process(target=...).
- Parameters:
recv_from_rr_addr (str)
send_to_scheduler_addr (str)
pipe_writer (multiprocessing.connection.Connection)
tokenizer_cfg (Dict[str, Any])
shared_queue (Optional[pymllm.orchestrator.shared_memory_queue.TensorQueue])
- Return type:
None
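A common reason entry points like this take a pipe_writer is a readiness handshake: the parent blocks until the child reports that its sockets are bound. The sketch below assumes that pattern; the status-dict payload is an assumption, and threading stands in for torch.multiprocessing to keep the example dependency-free.

```python
# Sketch of a readiness handshake over pipe_writer, assuming the child
# sends a status message once init is done. The payload shape and the
# stub function are assumptions, not pymllm API; threading replaces
# torch.multiprocessing so the sketch runs anywhere.
import threading
from multiprocessing import Pipe


def run_tokenizer_process_stub(pipe_writer):
    # init_sockets() and tokenizer warm-up would happen here
    pipe_writer.send({"status": "ready"})   # unblock the waiting parent
    pipe_writer.close()


parent_end, child_end = Pipe()
worker = threading.Thread(target=run_tokenizer_process_stub, args=(child_end,))
worker.start()
status = parent_end.recv()   # parent blocks until the worker reports in
worker.join()
```

With a real subprocess the parent would pass the child's Connection as pipe_writer and only start routing requests after receiving the ready message.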