pymllm.orchestrator.shared_memory_queue¶
Shared memory and queue utilities for fast IPC between tokenizer and scheduler.
This module implements the shared-queue fast path to avoid expensive ZMQ serialization of large multimodal tensors.
## Design
**Metadata lane:** Small tokenized objects are written to a POSIX shared memory segment keyed by the request ID (`rid`). The scheduler reads and immediately unlinks the segment.

**Tensor lane:** Large tensors can be transported in one of three modes, controlled by `TensorTransportMode` (passed at queue construction time):

- `"default"` – CPU tensors only. GPU tensors are moved to POSIX shared memory via `tensor.share_memory_()` (or left on CPU if already there). This is the original behaviour and requires no CUDA support.
- `"cuda_ipc"` – GPU tensors stay on GPU and are wrapped in `TransportProxyTensor`. On the receiver side, the proxy's `__setstate__` automatically reconstructs the tensor from the CUDA IPC handle during unpickling. CPU tensors are handled as in `"default"` mode. Caveat: GPU memory is not freed until the sender process exits (a PyTorch limitation); prefer `"cuda_ipc_pool"` for services.
- `"cuda_ipc_pool"` – GPU tensors are copied into a pre-allocated `MmItemMemoryPool` workspace and wrapped in `CudaIpcTensorTransportProxy`. After the receiver copies the data, it increments a sync flag and the sender's recycler thread returns the chunk to the pool. This avoids GPU memory leaks. CPU tensors are handled as in `"default"` mode.
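The per-mode behaviour above can be sketched as a simple dispatch. This is an illustrative stand-in, not the module's actual code: `wrap_for_transport` is a hypothetical helper, and a device string stands in for a real `torch.Tensor`.

```python
# Illustrative sketch of per-mode tensor wrapping (NOT the real implementation).
# A device string ("cpu" / "cuda") stands in for an actual torch.Tensor.

def wrap_for_transport(tensor_device: str, mode: str) -> str:
    """Return a label for how a tensor on the given device would travel."""
    if tensor_device == "cpu":
        # CPU tensors always go through share_memory_() regardless of mode.
        return "cpu_shared_memory"
    if mode == "default":
        # GPU -> CPU copy, then POSIX shared memory.
        return "moved_to_cpu_shared_memory"
    if mode == "cuda_ipc":
        # Zero-copy: wrap in a proxy carrying a CUDA IPC handle.
        # GPU memory stays allocated until the sender process exits.
        return "transport_proxy_tensor"
    if mode == "cuda_ipc_pool":
        # Copy into a pre-allocated pool chunk; a recycler thread
        # returns the chunk to the pool once the receiver has copied it.
        return "cuda_ipc_pool_proxy"
    raise ValueError(f"unknown transport mode: {mode}")
```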
## Key relationship with CUDA IPC
"default" and "cuda_ipc*" modes are mutually exclusive for GPU tensors:
In
"default"mode, GPU tensors that need to cross process boundaries must first be moved to CPU (share_memory_()). This incurs a GPU→CPU copy.In
"cuda_ipc*"modes, GPU tensors are shared as-is via CUDA IPC handles; no copy to CPU is needed.
CPU tensors are always handled via share_memory_() regardless of the mode.
Attributes¶

- `logger`

Classes¶

- `SharedMemoryManager` – Manages shared memory segments for passing metadata between processes.
- `TensorQueue` – Queue for passing large tensors between processes.
Module Contents¶
- pymllm.orchestrator.shared_memory_queue.logger¶
- class pymllm.orchestrator.shared_memory_queue.SharedMemoryManager¶
Manages shared memory segments for passing metadata between processes.
Each tokenized request’s metadata is written to a unique shared memory segment keyed by its request ID (rid). The scheduler reads and immediately unlinks the segment to prevent memory leaks.
- static write_metadata(rid, metadata)¶
Write metadata to shared memory and return the segment name.
- Parameters:
rid (str) – Request ID (used as part of the shared memory name)
metadata (Any) – Serializable metadata object
- Returns:
The shared memory segment name
- Return type:
str
- static read_metadata(shm_name, unlink=True)¶
Read metadata from shared memory and optionally unlink it.
- Parameters:
shm_name (str) – The shared memory segment name
unlink (bool) – If True, immediately unlink the segment after reading
- Returns:
The deserialized metadata object
- Return type:
Any
- static cleanup(shm_name)¶
Manually cleanup a shared memory segment (for error recovery).
- Parameters:
shm_name (str)
- Return type:
None
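The write/read/unlink cycle described above can be approximated with the standard library alone. The sketch below is a minimal stand-in built on `multiprocessing.shared_memory` and `pickle`; the real `SharedMemoryManager` may differ in segment naming, serialization, and error handling.

```python
import pickle
from multiprocessing import shared_memory

def write_metadata(rid: str, metadata) -> str:
    """Pickle metadata into a fresh shared memory segment named after rid."""
    payload = pickle.dumps(metadata)
    shm = shared_memory.SharedMemory(
        create=True, size=len(payload), name=f"meta_{rid}"
    )
    shm.buf[: len(payload)] = payload
    shm.close()  # detach this handle; the segment itself stays alive
    return shm.name

def read_metadata(shm_name: str, unlink: bool = True):
    """Read and unpickle metadata, optionally unlinking to prevent leaks."""
    shm = shared_memory.SharedMemory(name=shm_name)
    try:
        # pickle.loads ignores any zero padding after the STOP opcode,
        # so the (possibly page-rounded) buffer can be read whole.
        metadata = pickle.loads(bytes(shm.buf))
    finally:
        shm.close()
        if unlink:
            shm.unlink()  # reader frees the segment, as the scheduler does
    return metadata
```

Having the *reader* unlink immediately mirrors the design above: once the scheduler has the metadata, the segment has served its purpose and holding it any longer would leak memory.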
- class pymllm.orchestrator.shared_memory_queue.TensorQueue(maxsize=0, transport_mode='default', pool=None)¶
Queue for passing large tensors between processes.
Depending on `transport_mode`, GPU tensors are either moved to CPU shared memory (`"default"`) or kept on GPU and shared via CUDA IPC handles (`"cuda_ipc"` / `"cuda_ipc_pool"`).
- Parameters:
maxsize (int) – Maximum queue size (0 for unlimited).
transport_mode (pymllm.orchestrator.cuda_ipc_transport.TensorTransportMode) – Controls how GPU tensors are transported.
pool (Optional[pymllm.orchestrator.cuda_ipc_transport.MmItemMemoryPool]) – Required when `transport_mode == "cuda_ipc_pool"`.
- put(rid, shm_name, mm_inputs)¶
Put a request into the queue.
GPU tensors inside `mm_inputs` are wrapped according to `transport_mode` before being placed into the underlying `multiprocessing.Queue`.
- Parameters:
rid (str) – Request ID.
shm_name (str) – Shared memory segment name for the tokenized metadata.
mm_inputs (Optional[Dict[str, Any]]) – Multimodal inputs dict (may contain CUDA tensors).
- Return type:
None
- get(timeout=None)¶
Get a request from the queue.
GPU tensors wrapped as IPC proxies are not automatically reconstructed here; the caller (the scheduler) must call `unwrap_mm_inputs_from_ipc()` after retrieval.
- Parameters:
timeout (Optional[float]) – Timeout in seconds (None for blocking).
- Returns:
Tuple of `(rid, shm_name, mm_inputs)`.
- Return type:
tuple[str, str, Optional[Dict[str, Any]]]
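The put/get tuple contract can be illustrated with a toy, single-process version of the queue. `MiniTensorQueue` below is a hypothetical stand-in over `multiprocessing.Queue`; the real `TensorQueue` additionally wraps GPU tensors per `transport_mode`.

```python
import multiprocessing as mp

class MiniTensorQueue:
    """Toy stand-in for TensorQueue: forwards (rid, shm_name, mm_inputs)."""

    def __init__(self, maxsize: int = 0):
        self._q = mp.Queue(maxsize)  # 0 means unlimited, as in TensorQueue

    def put(self, rid, shm_name, mm_inputs=None):
        # The real implementation wraps any CUDA tensors in mm_inputs
        # according to transport_mode before enqueueing.
        self._q.put((rid, shm_name, mm_inputs))

    def get(self, timeout=None):
        # Proxies in mm_inputs are NOT unwrapped here; the scheduler
        # calls unwrap_mm_inputs_from_ipc() after retrieval.
        return self._q.get(timeout=timeout)
```

Note that the tensor lane (`mm_inputs`) travels alongside the metadata lane's segment name (`shm_name`), so one `get()` hands the scheduler everything it needs to reassemble the request.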
- empty()¶
- Return type:
bool
- qsize()¶
- Return type:
int
- close()¶
- Return type:
None