pymllm.orchestrator.shared_memory_queue
=======================================

.. py:module:: pymllm.orchestrator.shared_memory_queue

.. autoapi-nested-parse::

   Shared memory and queue utilities for fast IPC between the tokenizer and scheduler.

   This module implements the shared-queue fast path that avoids expensive ZMQ
   serialization of large multimodal tensors.

   ## Design

   - **Metadata lane**: Small tokenized objects are written to a POSIX shared
     memory segment keyed by the request ID (``rid``). The scheduler reads and
     immediately unlinks the segment.
   - **Tensor lane**: Large tensors can be transported in one of three modes,
     controlled by ``TensorTransportMode`` (passed at queue construction time):

     * ``"default"`` – CPU tensors only. GPU tensors are moved to POSIX shared
       memory via ``tensor.share_memory_()`` (or left on CPU if already there).
       This is the original behaviour and requires no CUDA support.
     * ``"cuda_ipc"`` – GPU tensors stay on the GPU and are wrapped in
       :class:`~pymllm.orchestrator.cuda_ipc_transport.TransportProxyTensor`.
       On the receiver side the proxy's ``__setstate__`` automatically
       reconstructs the tensor from the CUDA IPC handle during unpickling.
       CPU tensors are handled as in ``"default"`` mode. **Caveat**: GPU memory
       is not freed until the sender process exits (a PyTorch limitation).
       Prefer ``"cuda_ipc_pool"`` for long-running services.
     * ``"cuda_ipc_pool"`` – GPU tensors are copied into a pre-allocated
       :class:`~pymllm.orchestrator.cuda_ipc_transport.MmItemMemoryPool`
       workspace and wrapped in
       :class:`~pymllm.orchestrator.cuda_ipc_transport.CudaIpcTensorTransportProxy`.
       After the receiver copies the data it increments a sync flag, and the
       sender's recycler thread returns the chunk to the pool. This avoids GPU
       memory leaks. CPU tensors are handled as in ``"default"`` mode.
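   The per-mode dispatch above can be sketched as follows. This is a minimal,
   illustrative stand-in: ``wrap_for_transport`` and ``FakeTensor`` are
   hypothetical names invented for this example; the real module operates on
   ``torch.Tensor`` objects and the proxy classes from
   ``pymllm.orchestrator.cuda_ipc_transport``.

   ```python
   from dataclasses import dataclass

   VALID_MODES = ("default", "cuda_ipc", "cuda_ipc_pool")

   @dataclass
   class FakeTensor:
       device: str           # "cpu" or "cuda"
       shared: bool = False  # stands in for torch's tensor.is_shared()

   def wrap_for_transport(t, mode):
       """Return (lane, tensor) according to the transport-mode rules above."""
       if mode not in VALID_MODES:
           raise ValueError(f"unknown transport mode: {mode!r}")
       if t.device == "cpu" or mode == "default":
           # CPU lane: place the tensor in POSIX shared memory
           # (tensor.share_memory_() in the real implementation).
           t.shared = True
           return ("shm", t)
       if mode == "cuda_ipc":
           # GPU tensor stays on the GPU; the receiver reconstructs it
           # from a CUDA IPC handle during unpickling.
           return ("cuda_ipc_handle", t)
       # "cuda_ipc_pool": copy into a pre-allocated pool chunk so the
       # sender's recycler thread can reclaim the memory afterwards.
       return ("pool_chunk", t)
   ```

   Note that a CPU tensor always takes the shared-memory lane, whatever the
   mode; only GPU tensors behave differently across the three modes.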
   ## Key relationship with CUDA IPC

   ``"default"`` and ``"cuda_ipc*"`` modes are **mutually exclusive for GPU
   tensors**:

   - In ``"default"`` mode, GPU tensors that need to cross process boundaries
     must first be moved to CPU (``share_memory_()``). This incurs a GPU→CPU
     copy.
   - In ``"cuda_ipc*"`` modes, GPU tensors are shared as-is via CUDA IPC
     handles; no copy to CPU is needed.

   CPU tensors are always handled via ``share_memory_()`` regardless of the
   mode.

Attributes
----------

.. autoapisummary::

   pymllm.orchestrator.shared_memory_queue.logger

Classes
-------

.. autoapisummary::

   pymllm.orchestrator.shared_memory_queue.SharedMemoryManager
   pymllm.orchestrator.shared_memory_queue.TensorQueue

Module Contents
---------------

.. py:data:: logger

.. py:class:: SharedMemoryManager

   Manages shared memory segments for passing metadata between processes.

   Each tokenized request's metadata is written to a unique shared memory
   segment keyed by its request ID (``rid``). The scheduler reads and
   immediately unlinks the segment to prevent memory leaks.

   .. py:method:: write_metadata(rid, metadata)
      :staticmethod:

      Write metadata to shared memory and return the segment name.

      :param rid: Request ID (used as part of the shared memory name)
      :param metadata: Serializable metadata object
      :returns: The shared memory segment name
      :rtype: str

   .. py:method:: read_metadata(shm_name, unlink = True)
      :staticmethod:

      Read metadata from shared memory and optionally unlink it.

      :param shm_name: The shared memory segment name
      :param unlink: If True, immediately unlink the segment after reading
      :returns: The deserialized metadata object

   .. py:method:: cleanup(shm_name)
      :staticmethod:

      Manually clean up a shared memory segment (for error recovery).

.. py:class:: TensorQueue(maxsize = 0, transport_mode = 'default', pool = None)

   Queue for passing large tensors between processes.
   Depending on ``transport_mode``, GPU tensors are either moved to CPU shared
   memory (``"default"``) or kept on GPU and shared via CUDA IPC handles
   (``"cuda_ipc"`` / ``"cuda_ipc_pool"``).

   :param maxsize: Maximum queue size (0 for unlimited).
   :param transport_mode: Controls how GPU tensors are transported.
   :param pool: Required when ``transport_mode == "cuda_ipc_pool"``.

   .. py:method:: put(rid, shm_name, mm_inputs)

      Put a request into the queue.

      GPU tensors inside *mm_inputs* are wrapped according to
      ``transport_mode`` before being placed into the underlying
      ``multiprocessing.Queue``.

      :param rid: Request ID.
      :param shm_name: Shared memory segment name for the tokenized metadata.
      :param mm_inputs: Multimodal inputs dict (may contain CUDA tensors).

   .. py:method:: get(timeout = None)

      Get a request from the queue.

      GPU tensors wrapped as IPC proxies are **not** automatically
      reconstructed here – the caller (scheduler) must call
      :func:`~pymllm.orchestrator.cuda_ipc_transport.unwrap_mm_inputs_from_ipc`
      after retrieval.

      :param timeout: Timeout in seconds (None for blocking).
      :returns: Tuple of ``(rid, shm_name, mm_inputs)``.

   .. py:method:: empty()

   .. py:method:: qsize()

   .. py:method:: close()
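The metadata lane (write, read, immediately unlink) can be sketched with only
the standard library. This is an illustrative sketch, not the module's actual
implementation: the segment-naming scheme (``mm_meta_{rid}``) and the use of
``pickle`` are assumptions made for this example.

```python
import pickle
from multiprocessing import shared_memory

def write_metadata(rid, metadata):
    """Pickle metadata into a fresh shared memory segment; return its name."""
    payload = pickle.dumps(metadata)
    shm = shared_memory.SharedMemory(
        name=f"mm_meta_{rid}",  # hypothetical naming scheme
        create=True,
        size=len(payload),
    )
    shm.buf[: len(payload)] = payload
    shm.close()  # the segment survives close(); only unlink() removes it
    return shm.name

def read_metadata(shm_name, unlink=True):
    """Read metadata back and (by default) unlink the segment."""
    shm = shared_memory.SharedMemory(name=shm_name)
    # pickle stops at its STOP opcode, so any page-size padding in the
    # segment after the payload is ignored.
    metadata = pickle.loads(bytes(shm.buf))
    shm.close()
    if unlink:
        shm.unlink()  # read-then-unlink is what prevents the memory leak
    return metadata
```

Reading with ``unlink=True`` mirrors the scheduler's behaviour: the segment is
gone after the first read, so a second read of the same name fails. Large
tensors would travel separately through ``TensorQueue.put`` / ``get``, with the
segment name carried alongside as ``shm_name``.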