pymllm.orchestrator.cuda_ipc_transport
======================================

.. py:module:: pymllm.orchestrator.cuda_ipc_transport

.. autoapi-nested-parse::

   CUDA IPC Transport for zero-copy GPU tensor sharing between processes.

   ## Background

   When sharing CUDA tensors between processes, there are two fundamentally
   different paths:

   1. **CPU shared memory path** (``enable_shared_queue=True, enable_cuda_ipc=False``):
      GPU tensors are moved to CPU / POSIX shared memory via
      ``tensor.share_memory_()``. This is safe but incurs a GPU→CPU copy,
      which is expensive for large vision features.
   2. **CUDA IPC path** (``enable_cuda_ipc=True``): GPU tensors stay on GPU.
      PyTorch's ``storage._share_cuda_()`` yields a serialisable IPC handle;
      the receiver calls ``UntypedStorage._new_shared_cuda(*handle)`` to map
      the same physical GPU memory without any copy.

   These two paths are **mutually exclusive for GPU tensors**.
   ``enable_cuda_ipc`` takes priority; when active, the CPU-copy step in
   ``TensorQueue._make_tensors_shareable`` is skipped.

   ## CUDA IPC memory-leak problem and its fix

   PyTorch never releases the GPU allocation backing an IPC-exported tensor
   until the *sending* process exits. If we export raw model tensors, we
   permanently leak GPU memory.

   **Solution** (pool-based recycling via ``MmItemMemoryPool``):

   * Allocate a single, fixed-size GPU workspace (``MmItemMemoryPool``).
   * For each outgoing GPU tensor, copy it into a chunk of the workspace and
     export the *chunk* via IPC (the workspace is never freed; its chunks are
     recycled).
   * After the receiving process has finished with the data, it writes a sync
     flag (``ShmSyncBuffer``) to signal that the chunk may be reused.
   * A background recycler thread in the sender walks ``occupied_chunks`` and
     returns chunks whose sync flag has been incremented back to
     ``available_chunks``.

   ## Transport modes

   ``TensorTransportMode``:

   * ``"default"`` – CPU/shared-memory path; no CUDA IPC.
   * ``"cuda_ipc"`` – Simple CUDA IPC: wraps GPU tensors in
     ``TransportProxyTensor`` (a ``torch.Tensor`` subclass whose
     ``__getstate__``/``__setstate__`` use ``_share_cuda_``). Suitable for
     single-process-group scenarios; incurs the PyTorch memory leak noted
     above.
   * ``"cuda_ipc_pool"`` – Pool-based CUDA IPC: copies GPU tensors into a
     pre-allocated ``MmItemMemoryPool`` and wraps the slice in
     ``CudaIpcTensorTransportProxy``. The pool is recycled, so there is no
     memory leak.

Attributes
----------

.. autoapisummary::

   pymllm.orchestrator.cuda_ipc_transport.logger
   pymllm.orchestrator.cuda_ipc_transport.TensorTransportMode

Classes
-------

.. autoapisummary::

   pymllm.orchestrator.cuda_ipc_transport.ShmSyncBuffer
   pymllm.orchestrator.cuda_ipc_transport.MmItemMemoryChunk
   pymllm.orchestrator.cuda_ipc_transport.MmItemMemoryPool
   pymllm.orchestrator.cuda_ipc_transport.CudaIpcTensorTransportProxy
   pymllm.orchestrator.cuda_ipc_transport.TransportProxyTensor

Functions
---------

.. autoapisummary::

   pymllm.orchestrator.cuda_ipc_transport.wrap_mm_inputs_for_ipc
   pymllm.orchestrator.cuda_ipc_transport.unwrap_mm_inputs_from_ipc

Module Contents
---------------

.. py:data:: logger

.. py:data:: TensorTransportMode

.. py:class:: ShmSyncBuffer(byte_size = 4)

   A single float32 in POSIX shared memory used as a sync counter.

   The sender resets it to 0 before exporting a chunk. The receiver increments
   it (atomically under a file lock) once it has finished copying data out of
   the chunk. When the value reaches the number of consumers (``tp_size``) the
   sender recycles the chunk.

   .. py:attribute:: buffer

   .. py:attribute:: meta_data
      :type: Dict[str, Any]

   .. py:method:: open(meta_data)
      :staticmethod:

      Open an existing ShmSyncBuffer from the metadata dict.

   .. py:method:: __del__()

.. py:class:: MmItemMemoryChunk(area, sync_flag)

   A contiguous slice of the ``MmItemMemoryPool`` workspace tensor.

   .. py:attribute:: area

   .. py:attribute:: sync_flag

   .. py:property:: mem_size
      :type: int

   .. py:property:: start
      :type: int

   .. py:property:: end
      :type: int

   .. py:method:: try_to_recycle(num_consumers = 1)

      Return True if all consumers have finished and the chunk can be reused.

.. py:class:: MmItemMemoryPool(memory_size, recycle_interval = 0.1, num_consumers = 1, device = 0)

   Pre-allocated GPU memory pool for CUDA IPC tensor transport.

   Chunks are allocated from a contiguous ``torch.int8`` tensor on GPU. A
   background thread periodically recycles chunks whose sync flags show that
   all consumers have finished reading.

   :param memory_size: Pool size in **bytes**.
   :param recycle_interval: How often (seconds) the recycler thread runs.
   :param num_consumers: Number of consumer processes (tp_size). Each consumer
       must increment the sync flag once before a chunk is recycled.
   :param device: CUDA device index.

   .. py:attribute:: num_consumers
      :value: 1

   .. py:attribute:: available_chunks
      :type: List[MmItemMemoryChunk]

   .. py:attribute:: occupied_chunks
      :type: List[MmItemMemoryChunk]
      :value: []

   .. py:method:: get_slice_with_flag(src)

      Allocate a pool slice for *src* and return ``(sync_flag_meta, slice_tensor)``.

      Thread-safe. Returns ``(None, None)`` if the pool is full.

   .. py:method:: shutdown()

.. py:class:: CudaIpcTensorTransportProxy(data, info_data, sync_buffer_meta)

   Proxy that carries a CUDA IPC handle for a pool-slice tensor.

   The *sender* process:

   1. Copies the source tensor into a ``MmItemMemoryPool`` slice (int8 view).
   2. Wraps the slice in this proxy, which captures the CUDA IPC handle via
      ``storage._share_cuda_()``.
   3. Sends the proxy through ``multiprocessing.Queue`` (pickle).

   The *receiver* process:

   1. Calls :meth:`reconstruct_on_device` to map the IPC memory and copy it
      into a fresh local tensor.
   2. The copy increments the sync flag, allowing the sender's recycler to
      reclaim the pool slice.

   Fallback: if ``_share_cuda_()`` fails (e.g. TP ranks), ``tensor_data``
   holds the raw tensor, which will be pickled the normal way, incurring
   serialisation cost.

   .. py:attribute:: sync_data_meta

   .. py:method:: reconstruct_on_device(device_index = None)

      Map IPC memory and copy into a new local tensor.

      This **must** be called from the *receiver* process. After the copy the
      sync flag is incremented so the sender can recycle the pool chunk.

.. py:class:: TransportProxyTensor

   Bases: :py:obj:`torch.Tensor`

   A ``torch.Tensor`` subclass whose pickle uses CUDA IPC handles.

   When ``transport_mode == "cuda_ipc"`` and the tensor is on CUDA,
   ``__getstate__`` exports the tensor via ``storage._share_cuda_()`` instead
   of serialising the raw data. ``__setstate__`` reconstructs it in the
   receiving process via ``UntypedStorage._new_shared_cuda``.

   Caveat: the underlying GPU allocation is never freed until the *sender*
   process exits (a PyTorch limitation). Prefer ``"cuda_ipc_pool"`` mode for
   long-running services to avoid GPU memory leaks.

   When the tensor is on CPU or ``transport_mode == "default"``, the tensor
   is serialised normally (pickle of raw data).

   .. py:method:: __getstate__()

   .. py:method:: __setstate__(state)

   .. py:property:: transport_mode
      :type: TensorTransportMode

.. py:function:: wrap_mm_inputs_for_ipc(mm_inputs, transport_mode, pool = None)

   Recursively wrap CUDA tensors in *mm_inputs* for IPC transport.

   :param mm_inputs: Nested dict/list of tensors and other data.
   :param transport_mode: One of ``"default"``, ``"cuda_ipc"``, ``"cuda_ipc_pool"``.
   :param pool: Required when ``transport_mode == "cuda_ipc_pool"``.
   :returns: A new data structure with CUDA tensors replaced by IPC proxies.
       CPU tensors are left unchanged (they will be shared via
       ``share_memory_()`` or normal pickling downstream).

.. py:function:: unwrap_mm_inputs_from_ipc(mm_inputs, device_index = None)

   Recursively reconstruct tensors from IPC proxy objects.

   Call this in the *receiver* process after getting data from the queue.

   :param mm_inputs: Data structure possibly containing IPC proxy objects.
   :param device_index: If not None, move reconstructed tensors to this device.
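To make the pool-recycling protocol above concrete, here is a minimal,
CPU-only sketch of the chunk lifecycle. It is **not** the module's
implementation: the ``SyncFlag``, ``Chunk``, and ``Pool`` classes below are
simplified stand-ins for ``ShmSyncBuffer``, ``MmItemMemoryChunk``, and
``MmItemMemoryPool`` (no GPU workspace, no shared memory, no background
thread), and the methods ``get_slice``/``recycle_once`` are hypothetical
simplifications of ``get_slice_with_flag`` and one pass of the recycler loop.

```python
import threading


class SyncFlag:
    """Stand-in for ShmSyncBuffer: a counter each consumer increments."""

    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def reset(self):
        with self._lock:
            self.value = 0

    def increment(self):
        with self._lock:
            self.value += 1


class Chunk:
    """Stand-in for MmItemMemoryChunk: a [start, end) slice of the pool."""

    def __init__(self, start, end):
        self.start, self.end = start, end
        self.sync_flag = SyncFlag()

    @property
    def mem_size(self):
        return self.end - self.start

    def try_to_recycle(self, num_consumers=1):
        # Recyclable once every consumer has signalled completion.
        return self.sync_flag.value >= num_consumers


class Pool:
    """Stand-in for MmItemMemoryPool, minus the GPU tensor and thread."""

    def __init__(self, memory_size, num_consumers=1):
        self.num_consumers = num_consumers
        self.available_chunks = [Chunk(0, memory_size)]
        self.occupied_chunks = []
        self._lock = threading.Lock()

    def get_slice(self, nbytes):
        """Carve nbytes off an available chunk; None if the pool is full."""
        with self._lock:
            for i, chunk in enumerate(self.available_chunks):
                if chunk.mem_size >= nbytes:
                    self.available_chunks.pop(i)
                    piece = Chunk(chunk.start, chunk.start + nbytes)
                    if chunk.start + nbytes < chunk.end:
                        # Return the unused remainder to the free list.
                        self.available_chunks.append(
                            Chunk(chunk.start + nbytes, chunk.end))
                    piece.sync_flag.reset()  # sender resets before export
                    self.occupied_chunks.append(piece)
                    return piece
            return None

    def recycle_once(self):
        """One pass of what the background recycler thread does."""
        with self._lock:
            done = [c for c in self.occupied_chunks
                    if c.try_to_recycle(self.num_consumers)]
            for c in done:
                self.occupied_chunks.remove(c)
                self.available_chunks.append(c)
```

In the real transport, the sender would copy the tensor into the returned
slice and export it via a CUDA IPC handle, and each of the ``num_consumers``
receivers increments the shared sync flag after copying the data out; the
recycler pass then moves the chunk back to ``available_chunks``.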