pymllm.orchestrator.cuda_ipc_transport¶
CUDA IPC Transport for zero-copy GPU tensor sharing between processes.
## Background
When sharing CUDA tensors between processes, there are two fundamentally different paths:

* **CPU shared-memory path** (`enable_shared_queue=True, enable_cuda_ipc=False`): GPU tensors are moved to CPU / POSIX shared memory via `tensor.share_memory_()`. This is safe but incurs a GPU→CPU copy, which is expensive for large vision features.
* **CUDA IPC path** (`enable_cuda_ipc=True`): GPU tensors stay on GPU. PyTorch's `storage._share_cuda_()` yields a serialisable IPC handle; the receiver calls `UntypedStorage._new_shared_cuda(*handle)` to map the same physical GPU memory without any copy.

These two paths are mutually exclusive for GPU tensors. `enable_cuda_ipc` takes priority; when active, the CPU-copy step in `TensorQueue._make_tensors_shareable` is skipped.
## CUDA IPC memory-leak problem and its fix
PyTorch never releases the GPU allocation backing an IPC-exported tensor until the sending process exits. If we exported raw model tensors directly, we would permanently leak GPU memory.

Solution (pool-based recycling via `MmItemMemoryPool`):

1. Allocate a single, fixed-size GPU workspace (`MmItemMemoryPool`).
2. For each outgoing GPU tensor, copy it into a chunk of the workspace and export the chunk via IPC (the workspace is never freed; its chunks are recycled).
3. After the receiving process has finished with the data, it writes a sync flag (`ShmSyncBuffer`) to signal that the chunk may be reused.
4. A background recycler thread in the sender walks `occupied_chunks` and returns chunks whose sync flag has been incremented back to `available_chunks`.
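The steps above can be sketched as a CPU-only simulation. The names here (`Chunk`, `ToyPool`, `export_chunk`, `recycle_once`) are hypothetical and do not match the real `MmItemMemoryPool` API; in the real module the chunks live in GPU memory and the sync flag is a `ShmSyncBuffer` in POSIX shared memory.

```python
# A CPU-only, illustrative simulation of the pool-recycling handshake.
from dataclasses import dataclass
from typing import List


@dataclass
class Chunk:
    chunk_id: int
    sync_flag: int = 0  # stands in for ShmSyncBuffer's float32 counter


class ToyPool:
    def __init__(self, num_chunks: int, num_consumers: int = 1):
        self.num_consumers = num_consumers
        self.available: List[Chunk] = [Chunk(i) for i in range(num_chunks)]
        self.occupied: List[Chunk] = []

    def export_chunk(self) -> Chunk:
        """Sender side: take a free chunk and reset its sync flag."""
        chunk = self.available.pop()
        chunk.sync_flag = 0          # sender resets the flag before exporting
        self.occupied.append(chunk)
        return chunk

    def recycle_once(self) -> int:
        """One pass of the background recycler thread."""
        done = [c for c in self.occupied
                if c.sync_flag >= self.num_consumers]
        for c in done:
            self.occupied.remove(c)
            self.available.append(c)
        return len(done)


pool = ToyPool(num_chunks=2, num_consumers=2)
chunk = pool.export_chunk()          # sender exports a chunk
chunk.sync_flag += 1                 # consumer rank 0 finished reading
assert pool.recycle_once() == 0      # only 1 of 2 consumers done
chunk.sync_flag += 1                 # consumer rank 1 finished reading
assert pool.recycle_once() == 1      # chunk returned to available_chunks
```

The key property the simulation shows: a chunk is only reused once every consumer has signalled completion, so the fixed workspace is safe to recycle indefinitely.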
## Transport modes
`TensorTransportMode`:

* `"default"` – CPU/shared-memory path; no CUDA IPC.
* `"cuda_ipc"` – Simple CUDA IPC: wraps GPU tensors in `TransportProxyTensor` (a `torch.Tensor` subclass whose `__getstate__`/`__setstate__` use `_share_cuda_()`). Suitable for single-process-group scenarios; incurs the PyTorch memory leak noted above.
* `"cuda_ipc_pool"` – Pool-based CUDA IPC: copies GPU tensors into a pre-allocated `MmItemMemoryPool` and wraps the slice in `CudaIpcTensorTransportProxy`. The pool is recycled, so there is no memory leak.
Attributes¶

| | |
|---|---|
| `logger` | |
| `TensorTransportMode` | |

Classes¶

| | |
|---|---|
| `ShmSyncBuffer` | A single float32 in POSIX shared memory used as a sync counter. |
| `MmItemMemoryChunk` | A contiguous slice of the `MmItemMemoryPool` workspace tensor. |
| `MmItemMemoryPool` | Pre-allocated GPU memory pool for CUDA IPC tensor transport. |
| `CudaIpcTensorTransportProxy` | Proxy that carries a CUDA IPC handle for a pool-slice tensor. |
| `TransportProxyTensor` | A `torch.Tensor` subclass whose pickle uses CUDA IPC handles. |

Functions¶

| | |
|---|---|
| `wrap_mm_inputs_for_ipc` | Recursively wrap CUDA tensors in mm_inputs for IPC transport. |
| `unwrap_mm_inputs_from_ipc` | Recursively reconstruct tensors from IPC proxy objects. |
Module Contents¶
- pymllm.orchestrator.cuda_ipc_transport.logger¶
- pymllm.orchestrator.cuda_ipc_transport.TensorTransportMode¶
- class pymllm.orchestrator.cuda_ipc_transport.ShmSyncBuffer(byte_size=4)¶
A single float32 in POSIX shared memory used as a sync counter.
The sender resets it to 0 before exporting a chunk. The receiver increments it (atomically under a file lock) once it has finished copying data out of the chunk. When the value reaches the number of consumers (`tp_size`), the sender recycles the chunk.
- Parameters:
byte_size (int)
- buffer¶
- meta_data: Dict[str, Any]¶
- static open(meta_data)¶
Open an existing ShmSyncBuffer from the metadata dict.
- Parameters:
meta_data (Dict[str, Any])
- Return type:
Tuple[multiprocessing.shared_memory.SharedMemory, numpy.ndarray]
- __del__()¶
- Return type:
None
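The idea behind `ShmSyncBuffer` can be illustrated with the standard library alone: a 4-byte float32 counter in POSIX shared memory that another process could attach to by name. This is a hedged sketch, not the real class; the real implementation also guards increments with a file lock, which is omitted here.

```python
# Stdlib-only sketch of a float32 sync counter in POSIX shared memory.
import struct
from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(create=True, size=4)
try:
    struct.pack_into("<f", shm.buf, 0, 0.0)        # sender resets to 0

    # A receiver would attach by name and increment after copying:
    peer = shared_memory.SharedMemory(name=shm.name)
    value = struct.unpack_from("<f", peer.buf, 0)[0]
    struct.pack_into("<f", peer.buf, 0, value + 1.0)
    peer.close()

    final = struct.unpack_from("<f", shm.buf, 0)[0]
    assert final == 1.0                            # one consumer finished
finally:
    shm.close()
    shm.unlink()
```

In the real module, the metadata dict returned by `meta_data` carries the segment name so that `ShmSyncBuffer.open(meta_data)` can attach from the receiver process, analogous to attaching by `shm.name` above.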
- class pymllm.orchestrator.cuda_ipc_transport.MmItemMemoryChunk(area, sync_flag)¶
A contiguous slice of the `MmItemMemoryPool` workspace tensor.
- Parameters:
area (Tuple[int, int])
sync_flag (ShmSyncBuffer)
- area¶
- sync_flag¶
- property mem_size: int¶
- Return type:
int
- property start: int¶
- Return type:
int
- property end: int¶
- Return type:
int
- try_to_recycle(num_consumers=1)¶
Return True if all consumers have finished and the chunk can be reused.
- Parameters:
num_consumers (int)
- Return type:
bool
- class pymllm.orchestrator.cuda_ipc_transport.MmItemMemoryPool(memory_size, recycle_interval=0.1, num_consumers=1, device=0)¶
Pre-allocated GPU memory pool for CUDA IPC tensor transport.
Chunks are allocated from a contiguous `torch.int8` tensor on GPU. A background thread periodically recycles chunks whose sync flags show that all consumers have finished reading.
- Parameters:
memory_size (int) – Pool size in bytes.
recycle_interval (float) – How often (seconds) the recycler thread runs.
num_consumers (int) – Number of consumer processes (tp_size). Each consumer must increment the sync flag once before a chunk is recycled.
device (int) – CUDA device index.
- num_consumers = 1¶
- available_chunks: List[MmItemMemoryChunk]¶
- occupied_chunks: List[MmItemMemoryChunk] = []¶
- get_slice_with_flag(src)¶
Allocate a pool slice for `src` and return `(sync_flag_meta, slice_tensor)`.
Thread-safe. Returns `(None, None)` if the pool is full.
- Parameters:
src (torch.Tensor)
- Return type:
Tuple[Optional[Dict[str, Any]], Optional[torch.Tensor]]
- shutdown()¶
- Return type:
None
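The contract of `get_slice_with_flag` — carve a slice out of a fixed workspace under a lock, or return `(None, None)` when no space is left — can be sketched on CPU. `ToySlicePool` and its bump-allocator bookkeeping are hypothetical stand-ins; the real pool slices a GPU `torch.int8` tensor, tracks free/occupied chunk lists, and returns `ShmSyncBuffer` metadata rather than the toy `meta` dict below.

```python
# CPU-only sketch of slice allocation from a fixed, pre-allocated workspace.
import threading
from typing import Optional, Tuple


class ToySlicePool:
    def __init__(self, memory_size: int):
        self.workspace = bytearray(memory_size)  # stands in for the GPU tensor
        self.offset = 0                          # bump-allocator watermark
        self.lock = threading.Lock()             # allocation is thread-safe

    def get_slice_with_flag(
        self, src: bytes
    ) -> Tuple[Optional[dict], Optional[memoryview]]:
        with self.lock:
            if self.offset + len(src) > len(self.workspace):
                return None, None                # pool is full
            start = self.offset
            self.offset += len(src)
            view = memoryview(self.workspace)[start:start + len(src)]
            view[:] = src                        # copy src into the workspace
            meta = {"start": start, "size": len(src)}  # stands in for sync-flag meta
            return meta, view


pool = ToySlicePool(memory_size=8)
meta, view = pool.get_slice_with_flag(b"abcd")
assert meta == {"start": 0, "size": 4} and bytes(view) == b"abcd"
meta2, _ = pool.get_slice_with_flag(b"efgh")
assert meta2 == {"start": 4, "size": 4}
assert pool.get_slice_with_flag(b"x") == (None, None)   # full
```

Note the copy-in step: the source data is duplicated into the workspace so the exported region belongs to the pool, never to the caller — which is what makes recycling (rather than freeing) possible.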
- class pymllm.orchestrator.cuda_ipc_transport.CudaIpcTensorTransportProxy(data, info_data, sync_buffer_meta)¶
Proxy that carries a CUDA IPC handle for a pool-slice tensor.
The sender process:
1. Copies the source tensor into a `MmItemMemoryPool` slice (int8 view).
2. Wraps the slice in this proxy, which captures the CUDA IPC handle via `storage._share_cuda_()`.
3. Sends the proxy through a `multiprocessing.Queue` (pickle).

The receiver process:
1. Calls `reconstruct_on_device()` to map the IPC memory and copy it into a fresh local tensor.

The copy increments the sync flag, allowing the sender's recycler to reclaim the pool slice.

Fallback: if `_share_cuda_()` fails (e.g. TP ranks), `tensor_data` holds the raw tensor (which will be pickled the normal way, incurring serialisation cost).
- Parameters:
data (torch.Tensor)
info_data (torch.Tensor)
sync_buffer_meta (Dict[str, Any])
- sync_data_meta¶
- reconstruct_on_device(device_index=None)¶
Map IPC memory and copy into a new local tensor.
This must be called from the receiver process. After the copy the sync flag is incremented so the sender can recycle the pool chunk.
- Parameters:
device_index (Optional[int])
- Return type:
torch.Tensor
- class pymllm.orchestrator.cuda_ipc_transport.TransportProxyTensor¶
Bases: `torch.Tensor`

A `torch.Tensor` subclass whose pickle uses CUDA IPC handles.

When `transport_mode == "cuda_ipc"` and the tensor is on CUDA, `__getstate__` exports the tensor via `storage._share_cuda_()` instead of serialising the raw data. `__setstate__` reconstructs it in the receiving process via `UntypedStorage._new_shared_cuda`.

Caveat: the underlying GPU allocation is never freed until the sender process exits (PyTorch limitation). Prefer `"cuda_ipc_pool"` mode for long-running services to avoid GPU memory leaks.

When the tensor is on CPU or `transport_mode == "default"`, the tensor is serialised normally (pickle of raw data).
- __getstate__()¶
- Return type:
Dict[str, Any]
- __setstate__(state)¶
- Parameters:
state (Dict[str, Any])
- Return type:
None
- property transport_mode: TensorTransportMode¶
- Return type:
TensorTransportMode
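The pickling pattern `TransportProxyTensor` relies on can be shown without torch or CUDA: `__getstate__` swaps the heavy payload for a lightweight handle, and `__setstate__` resolves the handle on the other side. `ProxyPayload` and the `SHARED` dict are hypothetical stand-ins (the dict plays the role of GPU memory reachable through a CUDA IPC handle), and this in-process demo only illustrates the mechanics — the real cross-process resolution is done by `UntypedStorage._new_shared_cuda`.

```python
# Torch-free sketch of the __getstate__/__setstate__ handle-passing pattern.
import pickle

SHARED = {}  # stands in for GPU memory reachable via a CUDA IPC handle


class ProxyPayload:
    def __init__(self, data):
        self.data = data

    def __getstate__(self):
        # Export a handle instead of serialising the raw data
        # (the real class calls storage._share_cuda_() here).
        handle = id(self)
        SHARED[handle] = self.data
        return {"handle": handle}

    def __setstate__(self, state):
        # Resolve the handle back to the shared data
        # (the real class calls UntypedStorage._new_shared_cuda here).
        self.data = SHARED[state["handle"]]


p = ProxyPayload([1, 2, 3])
restored = pickle.loads(pickle.dumps(p))
assert restored.data == [1, 2, 3]   # payload recovered via the handle
```

Because pickle drives `multiprocessing.Queue` transport, customising these two methods is enough to make queue sends zero-copy for CUDA tensors.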
- pymllm.orchestrator.cuda_ipc_transport.wrap_mm_inputs_for_ipc(mm_inputs, transport_mode, pool=None)¶
Recursively wrap CUDA tensors in mm_inputs for IPC transport.
- Parameters:
mm_inputs (Optional[Dict[str, Any]]) – Nested dict/list of tensors and other data.
transport_mode (TensorTransportMode) – One of `"default"`, `"cuda_ipc"`, `"cuda_ipc_pool"`.
pool (Optional[MmItemMemoryPool]) – Required when `transport_mode == "cuda_ipc_pool"`.
- Returns:
A new data structure with CUDA tensors replaced by IPC proxies. CPU tensors are left unchanged (they will be shared via `share_memory_()` or normal pickling downstream).
- Return type:
Optional[Dict[str, Any]]
- pymllm.orchestrator.cuda_ipc_transport.unwrap_mm_inputs_from_ipc(mm_inputs, device_index=None)¶
Recursively reconstruct tensors from IPC proxy objects.
Call this in the receiver process after getting data from the queue.
- Parameters:
mm_inputs (Optional[Dict[str, Any]]) – Data structure possibly containing IPC proxy objects.
device_index (Optional[int]) – If not None, move reconstructed tensors to this device.
- Return type:
Optional[Dict[str, Any]]
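The wrap/unwrap pair amounts to a recursive traversal that replaces leaves on the way out and restores them on the way back in. Below is a hedged sketch of that traversal with a hypothetical `FakeProxy` standing in for the IPC proxy classes and tagged strings standing in for CUDA tensors; the real functions instead dispatch on `torch.Tensor` leaves and the configured transport mode.

```python
# Illustrative recursive wrap/unwrap traversal over nested mm_inputs.
from typing import Any


class FakeProxy:
    def __init__(self, payload: Any):
        self.payload = payload


def wrap(obj: Any) -> Any:
    if isinstance(obj, str) and obj.startswith("cuda:"):
        return FakeProxy(obj)                 # "CUDA tensor" -> proxy
    if isinstance(obj, dict):
        return {k: wrap(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [wrap(v) for v in obj]
    return obj                                # CPU data passes through


def unwrap(obj: Any) -> Any:
    if isinstance(obj, FakeProxy):
        return obj.payload                    # proxy -> reconstructed tensor
    if isinstance(obj, dict):
        return {k: unwrap(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [unwrap(v) for v in obj]
    return obj


mm_inputs = {"pixel_values": "cuda:feat", "sizes": [1, 2], "meta": {"id": "x"}}
wrapped = wrap(mm_inputs)
assert isinstance(wrapped["pixel_values"], FakeProxy)
assert unwrap(wrapped) == mm_inputs           # round-trip restores the structure
```

As in the real API, the traversal returns a new structure rather than mutating in place, and non-tensor leaves flow through both directions untouched.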