pymllm.orchestrator.cuda_ipc_transport

CUDA IPC Transport for zero-copy GPU tensor sharing between processes.

## Background

When sharing CUDA tensors between processes, there are two fundamentally different paths:

  1. CPU shared memory path (enable_shared_queue=True, enable_cuda_ipc=False): GPU tensors are moved to CPU / POSIX shared memory via tensor.share_memory_(). This is safe but incurs a GPU→CPU copy, which is expensive for large vision features.

  2. CUDA IPC path (enable_cuda_ipc=True): GPU tensors stay on GPU. PyTorch’s storage._share_cuda_() yields a serialisable IPC handle; the receiver calls UntypedStorage._new_shared_cuda(*handle) to map the same physical GPU memory without any copy.

These two paths are mutually exclusive for GPU tensors. enable_cuda_ipc takes priority; when it is active, the CPU-copy step in TensorQueue._make_tensors_shareable is skipped.

## CUDA IPC memory-leak problem and its fix

PyTorch never releases the GPU allocation backing an IPC-exported tensor until the sending process exits. If we export raw model tensors directly, we permanently leak GPU memory.

Solution (pool-based recycling via MmItemMemoryPool):

  • Allocate a single, fixed-size GPU workspace (MmItemMemoryPool).

  • For each outgoing GPU tensor, copy it into a chunk of the workspace and export the chunk via IPC (the workspace is never freed; its chunks are recycled).

  • After the receiving process has finished with the data it writes a sync flag (ShmSyncBuffer) to signal that the chunk may be reused.

  • A background recycler thread in the sender walks occupied_chunks and returns chunks whose sync flag has been incremented back to available_chunks.

## Transport modes

TensorTransportMode:

  • "default" – CPU/shared-memory path; no CUDA IPC.

  • "cuda_ipc" – Simple CUDA IPC: wraps GPU tensors in TransportProxyTensor (a torch.Tensor subclass whose __getstate__/__setstate__ use _share_cuda_). Suitable for single-process-group scenarios; incurs the PyTorch memory-leak noted above.

  • "cuda_ipc_pool" – Pool-based CUDA IPC: copies GPU tensors into a pre-allocated MmItemMemoryPool and wraps the slice in CudaIpcTensorTransportProxy. The pool is recycled, so there is no memory leak.
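A caller-side sketch of validating the chosen mode before wrapping tensors. The function name and validation logic are assumptions for illustration; only the three mode strings and the pool requirement come from this module.

```python
from typing import Optional

VALID_MODES = ("default", "cuda_ipc", "cuda_ipc_pool")

def validate_transport_mode(mode: str, pool: Optional[object]) -> str:
    """Check a TensorTransportMode value before calling wrap_mm_inputs_for_ipc."""
    if mode not in VALID_MODES:
        raise ValueError(f"unknown transport mode: {mode!r}")
    if mode == "cuda_ipc_pool" and pool is None:
        # Pool mode needs a pre-allocated MmItemMemoryPool to copy into.
        raise ValueError("cuda_ipc_pool mode requires an MmItemMemoryPool")
    return mode
```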

Attributes

logger
TensorTransportMode

Classes

ShmSyncBuffer

A single float32 in POSIX shared memory used as a sync counter.

MmItemMemoryChunk

A contiguous slice of the MmItemMemoryPool workspace tensor.

MmItemMemoryPool

Pre-allocated GPU memory pool for CUDA IPC tensor transport.

CudaIpcTensorTransportProxy

Proxy that carries a CUDA IPC handle for a pool-slice tensor.

TransportProxyTensor

A torch.Tensor subclass whose pickle uses CUDA IPC handles.

Functions

wrap_mm_inputs_for_ipc(mm_inputs, transport_mode[, pool])

Recursively wrap CUDA tensors in mm_inputs for IPC transport.

unwrap_mm_inputs_from_ipc(mm_inputs[, device_index])

Recursively reconstruct tensors from IPC proxy objects.

Module Contents

pymllm.orchestrator.cuda_ipc_transport.logger
pymllm.orchestrator.cuda_ipc_transport.TensorTransportMode
class pymllm.orchestrator.cuda_ipc_transport.ShmSyncBuffer(byte_size=4)

A single float32 in POSIX shared memory used as a sync counter.

The sender resets it to 0 before exporting a chunk. The receiver increments it (atomically under a file lock) once it has finished copying data out of the chunk. When the value reaches the number of consumers (tp_size) the sender recycles the chunk.
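A minimal stand-in for this counter using only the standard library. The real class stores a float32 guarded by a file lock; here a struct-packed float in multiprocessing.shared_memory stands in for both, and the class/method names are illustrative.

```python
import struct
from multiprocessing import shared_memory

class MiniSyncBuffer:
    """A single float32 counter in POSIX shared memory (illustrative sketch)."""
    def __init__(self, byte_size=4):
        self.shm = shared_memory.SharedMemory(create=True, size=byte_size)
        self.reset()
        # Enough metadata for another process to attach to the same block.
        self.meta_data = {"name": self.shm.name, "byte_size": byte_size}

    @staticmethod
    def open(meta_data):
        # Receiver side: attach to the sender's block by name.
        return shared_memory.SharedMemory(name=meta_data["name"])

    def reset(self):
        # Sender: flag := 0 before exporting a chunk.
        struct.pack_into("f", self.shm.buf, 0, 0.0)

    @staticmethod
    def increment(shm):
        # Receiver: bump the counter once it has copied the data out.
        value = struct.unpack_from("f", shm.buf, 0)[0]
        struct.pack_into("f", shm.buf, 0, value + 1.0)

    @staticmethod
    def value(shm):
        return struct.unpack_from("f", shm.buf, 0)[0]

    def close(self):
        self.shm.close()
        self.shm.unlink()
```

With tp_size consumers, the sender would treat the chunk as reusable once the counter reaches tp_size.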

Parameters:

byte_size (int)

buffer
meta_data: Dict[str, Any]
static open(meta_data)

Open an existing ShmSyncBuffer from the metadata dict.

Parameters:

meta_data (Dict[str, Any])

Return type:

Tuple[multiprocessing.shared_memory.SharedMemory, numpy.ndarray]

__del__()
Return type:

None

class pymllm.orchestrator.cuda_ipc_transport.MmItemMemoryChunk(area, sync_flag)

A contiguous slice of the MmItemMemoryPool workspace tensor.

Parameters:
area
sync_flag
property mem_size: int
Return type:

int

property start: int
Return type:

int

property end: int
Return type:

int

try_to_recycle(num_consumers=1)

Return True if all consumers have finished and the chunk can be reused.

Parameters:

num_consumers (int)

Return type:

bool

class pymllm.orchestrator.cuda_ipc_transport.MmItemMemoryPool(memory_size, recycle_interval=0.1, num_consumers=1, device=0)

Pre-allocated GPU memory pool for CUDA IPC tensor transport.

Chunks are allocated from a contiguous torch.int8 tensor on GPU. A background thread periodically recycles chunks whose sync flags show that all consumers have finished reading.

Parameters:
  • memory_size (int) – Pool size in bytes.

  • recycle_interval (float) – How often (seconds) the recycler thread runs.

  • num_consumers (int) – Number of consumer processes (tp_size). Each consumer must increment the sync flag once before a chunk is recycled.

  • device (int) – CUDA device index.

num_consumers = 1
available_chunks: List[MmItemMemoryChunk]
occupied_chunks: List[MmItemMemoryChunk] = []
get_slice_with_flag(src)

Allocate a pool slice for src and return (sync_flag_meta, slice_tensor).

Thread-safe. Returns (None, None) if the pool is full.

Parameters:

src (torch.Tensor)

Return type:

Tuple[Optional[Dict[str, Any]], Optional[torch.Tensor]]
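A simplified, CPU-only sketch of the allocation step. The real method carves an int8 slice of the GPU workspace and also returns the sync-flag metadata; the first-fit policy and all names below are assumptions for illustration.

```python
from typing import List, Optional

class Slice:
    """Stand-in for a pool slice: (start, size) into a flat byte buffer."""
    def __init__(self, start: int, size: int):
        self.start, self.size = start, size

def allocate_slice(available: List[Slice], occupied: List[Slice],
                   nbytes: int) -> Optional[Slice]:
    """First-fit allocation: claim a free slice large enough for nbytes.

    Returns None when no slice fits, mirroring the (None, None) return
    of the real method when the pool is full.
    """
    for i, free in enumerate(available):
        if free.size >= nbytes:
            available.pop(i)
            used = Slice(free.start, nbytes)
            if free.size > nbytes:
                # Return the unused tail to the free list.
                available.insert(i, Slice(free.start + nbytes,
                                          free.size - nbytes))
            occupied.append(used)
            return used
    return None
```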

shutdown()
Return type:

None

class pymllm.orchestrator.cuda_ipc_transport.CudaIpcTensorTransportProxy(data, info_data, sync_buffer_meta)

Proxy that carries a CUDA IPC handle for a pool-slice tensor.

The sender process:

  1. Copies the source tensor into a MmItemMemoryPool slice (int8 view).

  2. Wraps the slice in this proxy, which captures the CUDA IPC handle via storage._share_cuda_().

  3. Sends the proxy through multiprocessing.Queue (pickle).

The receiver process:

  1. Calls reconstruct_on_device() to map the IPC memory and copy it into a fresh local tensor.

  2. The copy increments the sync flag, allowing the sender’s recycler to reclaim the pool slice.

Fallback: if _share_cuda_() fails (e.g. TP ranks), tensor_data holds the raw tensor, which is pickled the normal way and incurs the full serialisation cost.

Parameters:
  • data (torch.Tensor)

  • info_data (torch.Tensor)

  • sync_buffer_meta (Dict[str, Any])

sync_data_meta
reconstruct_on_device(device_index=None)

Map IPC memory and copy into a new local tensor.

This must be called from the receiver process. After the copy the sync flag is incremented so the sender can recycle the pool chunk.

Parameters:

device_index (Optional[int])

Return type:

torch.Tensor

class pymllm.orchestrator.cuda_ipc_transport.TransportProxyTensor

Bases: torch.Tensor

A torch.Tensor subclass whose pickle uses CUDA IPC handles.

When transport_mode == "cuda_ipc" and the tensor is on CUDA, __getstate__ exports the tensor via storage._share_cuda_() instead of serialising the raw data. __setstate__ reconstructs it in the receiving process via UntypedStorage._new_shared_cuda.

Caveat: The underlying GPU allocation is never freed until the sender process exits (PyTorch limitation). Prefer "cuda_ipc_pool" mode for long-running services to avoid GPU memory leaks.

When the tensor is on CPU or transport_mode == "default", the tensor is serialised normally (pickle of raw data).
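The pickle-override pattern can be illustrated without CUDA: a class whose __getstate__ ships a compact handle instead of the payload, and whose __setstate__ rebuilds the payload from that handle. The handle here is a plain dict and only round-trips this toy uniform payload; the real class exports a CUDA IPC handle via storage._share_cuda_().

```python
import pickle

class ProxyPayload:
    """Sketch of a pickle-time proxy: ship a handle instead of raw data."""
    def __init__(self, data):
        self.data = data

    def __getstate__(self):
        # Instead of serialising self.data, export a small "handle"
        # describing how to rebuild it (stand-in for the IPC handle
        # returned by storage._share_cuda_()).
        return {"handle": {"length": len(self.data), "fill": self.data[0]}}

    def __setstate__(self, state):
        # Receiver side: rebuild from the handle (stand-in for
        # UntypedStorage._new_shared_cuda(*handle)).
        h = state["handle"]
        self.data = [h["fill"]] * h["length"]
```

Because pickling goes through __getstate__, anything that transports the object via pickle (e.g. multiprocessing.Queue) automatically uses the handle path.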

__getstate__()
Return type:

Dict[str, Any]

__setstate__(state)
Parameters:

state (Dict[str, Any])

Return type:

None

property transport_mode: TensorTransportMode
Return type:

TensorTransportMode

pymllm.orchestrator.cuda_ipc_transport.wrap_mm_inputs_for_ipc(mm_inputs, transport_mode, pool=None)

Recursively wrap CUDA tensors in mm_inputs for IPC transport.

Parameters:
  • mm_inputs (Optional[Dict[str, Any]]) – Nested dict/list of tensors and other data.

  • transport_mode (TensorTransportMode) – One of "default", "cuda_ipc", "cuda_ipc_pool".

  • pool (Optional[MmItemMemoryPool]) – Required when transport_mode == "cuda_ipc_pool".

Returns:

A new data structure with CUDA tensors replaced by IPC proxies. CPU tensors are left unchanged (they will be shared via share_memory_() or normal pickling downstream).

Return type:

Optional[Dict[str, Any]]

pymllm.orchestrator.cuda_ipc_transport.unwrap_mm_inputs_from_ipc(mm_inputs, device_index=None)

Recursively reconstruct tensors from IPC proxy objects.

Call this in the receiver process after getting data from the queue.

Parameters:
  • mm_inputs (Optional[Dict[str, Any]]) – Data structure possibly containing IPC proxy objects.

  • device_index (Optional[int]) – If not None, move reconstructed tensors to this device.

Return type:

Optional[Dict[str, Any]]
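The wrap/unwrap pair can be sketched over plain containers. `Proxy`, `wrap`, `unwrap`, and the leaf predicate are illustrative stand-ins; the real functions test for CUDA tensors and build IPC proxies instead.

```python
from typing import Any, Callable

class Proxy:
    """Illustrative stand-in for an IPC proxy wrapping one leaf value."""
    def __init__(self, value):
        self.value = value

def wrap(obj: Any, should_wrap: Callable[[Any], bool]) -> Any:
    """Recursively replace leaves for which should_wrap(leaf) is true."""
    if isinstance(obj, dict):
        return {k: wrap(v, should_wrap) for k, v in obj.items()}
    if isinstance(obj, list):
        return [wrap(v, should_wrap) for v in obj]
    return Proxy(obj) if should_wrap(obj) else obj

def unwrap(obj: Any) -> Any:
    """Receiver side: recursively reconstruct leaves from proxies."""
    if isinstance(obj, dict):
        return {k: unwrap(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [unwrap(v) for v in obj]
    return obj.value if isinstance(obj, Proxy) else obj
```

As in the real pair, non-matching leaves (here ints; there CPU tensors and plain data) pass through both directions unchanged.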