pymllm.orchestrator.shared_memory_queue

Shared memory and queue utilities for fast IPC between tokenizer and scheduler.

This module implements the shared-queue fast path to avoid expensive ZMQ serialization of large multimodal tensors.

## Design

  • Metadata lane: Small tokenized objects are written to a POSIX shared memory segment keyed by the request ID (rid). The scheduler reads and immediately unlinks the segment.

  • Tensor lane: Large tensors can be transported in one of three modes, controlled by TensorTransportMode (passed at queue construction time):

    • "default" – Everything travels as CPU shared memory. GPU tensors are first copied to CPU and then moved to POSIX shared memory via tensor.share_memory_(); tensors already on CPU are shared in place. This is the original behaviour and requires no CUDA support.

    • "cuda_ipc" – GPU tensors stay on GPU and are wrapped in TransportProxyTensor. On the receiver side the proxy’s __setstate__ automatically reconstructs the tensor from the CUDA IPC handle during unpickling. CPU tensors are handled as in "default" mode. Caveat: GPU memory is not freed until the sender process exits (PyTorch limitation). Prefer "cuda_ipc_pool" for services.

    • "cuda_ipc_pool" – GPU tensors are copied into a pre-allocated MmItemMemoryPool workspace and wrapped in CudaIpcTensorTransportProxy. After the receiver copies the data it increments a sync flag and the sender’s recycler thread returns the chunk to the pool. This avoids GPU memory leaks. CPU tensors are handled as in "default" mode.
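The "cuda_ipc_pool" chunk-recycling handshake can be sketched in plain Python. This is an illustrative model, not pymllm code: a threading.Event stands in for the receiver-side sync flag, and integer slot indices stand in for pool workspace chunks.

```python
import queue
import threading

class ChunkPool:
    """Stand-in for MmItemMemoryPool: a fixed set of reusable workspace slots."""
    def __init__(self, n_chunks: int):
        self._free = queue.Queue()
        for i in range(n_chunks):
            self._free.put(i)

    def acquire(self) -> int:
        return self._free.get()       # blocks until a chunk is recycled

    def release(self, chunk: int) -> None:
        self._free.put(chunk)

def send(pool: ChunkPool, done_flags: dict) -> int:
    """Sender: copy the tensor into a pool chunk and register its sync flag."""
    chunk = pool.acquire()
    done_flags[chunk] = threading.Event()
    # ...ship (chunk, ipc_handle) to the receiver here...
    return chunk

def recycle(pool: ChunkPool, done_flags: dict, chunk: int) -> None:
    """Recycler thread: wait until the receiver has copied out, then reuse."""
    done_flags[chunk].wait()          # receiver sets the flag after copying
    pool.release(chunk)               # chunk returns to the pool; no leak
```

Because every chunk eventually comes back to the pool, GPU memory usage stays bounded even in a long-running service, which is why this mode is preferred over plain "cuda_ipc".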

## Key relationship with CUDA IPC

The "default" and "cuda_ipc*" modes take mutually exclusive paths for GPU tensors:

  • In "default" mode, GPU tensors that need to cross process boundaries are first copied to CPU and then placed in POSIX shared memory via share_memory_(). This incurs a GPU→CPU copy.

  • In "cuda_ipc*" modes, GPU tensors are shared as-is via CUDA IPC handles; no copy to CPU is needed.

CPU tensors are always handled via share_memory_() regardless of the mode.
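The per-tensor rules above can be summarized as a small dispatch function. This is a sketch, not pymllm code; Tensor here is a minimal stand-in for torch.Tensor exposing only an is_cuda flag:

```python
from dataclasses import dataclass

@dataclass
class Tensor:
    """Minimal stand-in for torch.Tensor: only the device flag matters here."""
    is_cuda: bool

def transport_strategy(tensor: Tensor, mode: str) -> str:
    """Return how a tensor crosses the process boundary (sketch of the rules above)."""
    if not tensor.is_cuda:
        # CPU tensors: POSIX shared memory via share_memory_() in every mode.
        return "posix_shm"
    if mode == "default":
        return "copy_to_cpu_then_posix_shm"   # incurs a GPU->CPU copy
    if mode == "cuda_ipc":
        return "cuda_ipc_handle"              # zero-copy; freed at sender exit
    if mode == "cuda_ipc_pool":
        return "pooled_cuda_ipc"              # copied into a pool chunk, recycled
    raise ValueError(f"unknown transport mode: {mode}")
```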

## Attributes

  • logger

## Classes

  • SharedMemoryManager – Manages shared memory segments for passing metadata between processes.

  • TensorQueue – Queue for passing large tensors between processes.

## Module Contents

pymllm.orchestrator.shared_memory_queue.logger

class pymllm.orchestrator.shared_memory_queue.SharedMemoryManager

Manages shared memory segments for passing metadata between processes.

Each tokenized request’s metadata is written to a unique shared memory segment keyed by its request ID (rid). The scheduler reads and immediately unlinks the segment to prevent memory leaks.

static write_metadata(rid, metadata)

Write metadata to shared memory and return the segment name.

Parameters:
  • rid (str) – Request ID (used as part of the shared memory name)

  • metadata (Any) – Serializable metadata object

Returns:

The shared memory segment name

Return type:

str

static read_metadata(shm_name, unlink=True)

Read metadata from shared memory and optionally unlink it.

Parameters:
  • shm_name (str) – The shared memory segment name

  • unlink (bool) – If True, immediately unlink the segment after reading

Returns:

The deserialized metadata object

Return type:

Any

static cleanup(shm_name)

Manually clean up a shared memory segment (for error recovery).

Parameters:

shm_name (str)

Return type:

None
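The write/read/unlink pattern implemented by this class can be sketched with the standard library. This is an illustrative sketch, assuming pickle serialization and an ad-hoc "mm_meta_" naming scheme; pymllm's actual serialization format and segment naming may differ.

```python
import pickle
from multiprocessing import shared_memory

def write_metadata(rid, metadata):
    """Serialize metadata into a fresh POSIX shm segment keyed by rid."""
    payload = pickle.dumps(metadata)
    shm = shared_memory.SharedMemory(
        name=f"mm_meta_{rid}", create=True, size=len(payload)
    )
    shm.buf[: len(payload)] = payload
    shm.close()          # sender drops its handle; the receiver owns cleanup
    return shm.name

def read_metadata(shm_name, unlink=True):
    """Attach to the segment, deserialize, and (by default) unlink it."""
    shm = shared_memory.SharedMemory(name=shm_name)
    metadata = pickle.loads(bytes(shm.buf))  # trailing padding bytes are ignored
    shm.close()
    if unlink:
        shm.unlink()     # immediate unlink prevents segment leaks
    return metadata
```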

class pymllm.orchestrator.shared_memory_queue.TensorQueue(maxsize=0, transport_mode='default', pool=None)

Queue for passing large tensors between processes.

Depending on transport_mode, GPU tensors are either moved to CPU shared memory ("default") or kept on GPU and shared via CUDA IPC handles ("cuda_ipc" / "cuda_ipc_pool").

Parameters:
  • maxsize (int) – Maximum queue size (0 for unlimited).

  • transport_mode (pymllm.orchestrator.cuda_ipc_transport.TensorTransportMode) – Controls how GPU tensors are transported.

  • pool (Optional[pymllm.orchestrator.cuda_ipc_transport.MmItemMemoryPool]) – Required when transport_mode == "cuda_ipc_pool".

put(rid, shm_name, mm_inputs)

Put a request into the queue.

GPU tensors inside mm_inputs are wrapped according to transport_mode before being placed into the underlying multiprocessing.Queue.

Parameters:
  • rid (str) – Request ID.

  • shm_name (str) – Shared memory segment name for the tokenized metadata.

  • mm_inputs (Optional[Dict[str, Any]]) – Multimodal inputs dict (may contain CUDA tensors).

Return type:

None

get(timeout=None)

Get a request from the queue.

GPU tensors wrapped as IPC proxies are not automatically reconstructed here – the caller (scheduler) must call unwrap_mm_inputs_from_ipc() after retrieval.

Parameters:

timeout (Optional[float]) – Timeout in seconds (None for blocking).

Returns:

Tuple of (rid, shm_name, mm_inputs).

Return type:

tuple[str, str, Optional[Dict[str, Any]]]

empty()

Return True if the queue is empty, False otherwise.

Return type:

bool

qsize()

Return the approximate number of items currently in the queue.

Return type:

int

close()

Close the queue and release its underlying resources.

Return type:

None
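Putting the two lanes together, the producer/consumer flow can be sketched in a single process with the standard library. This is an illustrative sketch, not pymllm code: metadata travels through a shared memory segment, the (rid, shm_name, mm_inputs) triple through a multiprocessing queue, and mm_inputs passes through untouched (standing in for already-wrapped tensors).

```python
import pickle
from multiprocessing import Queue, shared_memory

def produce(q, rid, metadata, mm_inputs=None):
    """Tokenizer side: metadata to shm, the triple to the queue."""
    payload = pickle.dumps(metadata)
    shm = shared_memory.SharedMemory(create=True, size=len(payload))
    shm.buf[: len(payload)] = payload
    name = shm.name
    shm.close()                     # drop the local handle; the receiver unlinks
    q.put((rid, name, mm_inputs))

def consume(q, timeout=None):
    """Scheduler side: dequeue, read the metadata, unlink the segment at once."""
    rid, shm_name, mm_inputs = q.get(timeout=timeout)
    shm = shared_memory.SharedMemory(name=shm_name)
    metadata = pickle.loads(bytes(shm.buf))
    shm.close()
    shm.unlink()                    # immediate unlink prevents segment leaks
    return rid, metadata, mm_inputs
```

The immediate unlink on the consumer side mirrors the scheduler's behaviour documented above: once the metadata has been read, nothing keeps the segment alive, so a crashed or slow consumer is the only way a segment can leak.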