""" |
|
|
Zero-copy utilities for efficient tensor operations. |
|
|
|
|
|
Provides utilities to minimize memory copies between NumPy and PyTorch, |
|
|
especially for CPU→GPU transfers. |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import numpy as np |
|
|
import torch |
|
|
|
|
|
|
|
|
def numpy_to_torch_zerocopy(arr: np.ndarray, dtype: torch.dtype | None = None, device: str | torch.device = "cpu") -> torch.Tensor: |
|
|
""" |
|
|
Convert NumPy array to PyTorch tensor with zero-copy when possible. |
|
|
|
|
|
Zero-copy is possible when: |
|
|
1. Array is C-contiguous |
|
|
2. Target device is CPU |
|
|
3. dtype is compatible |
|
|
|
|
|
For GPU transfers, this still saves one copy (CPU→pinned→GPU vs CPU→CPU→GPU). |
|
|
|
|
|
Args: |
|
|
arr: Input NumPy array |
|
|
dtype: Target PyTorch dtype (if None, infer from numpy dtype) |
|
|
device: Target device ('cpu', 'cuda', 'mps') |
|
|
|
|
|
Returns: |
|
|
PyTorch tensor on specified device |
|
|
|
|
|
Example: |
|
|
>>> arr = np.random.rand(1000, 1000) |
|
|
>>> tensor = numpy_to_torch_zerocopy(arr, device='cuda') |
|
|
>>> # No intermediate copy on CPU if arr is C-contiguous |
|
|
""" |
|
|
|
|
|
    # torch.from_numpy requires a C-contiguous array; copy only if necessary.
    if not arr.flags['C_CONTIGUOUS']:
        arr = np.ascontiguousarray(arr)

    # Shares memory with arr (zero-copy) for dtypes that PyTorch supports.
    tensor = torch.from_numpy(arr)

    # A dtype conversion necessarily copies.
    if dtype is not None and tensor.dtype != dtype:
        tensor = tensor.to(dtype)

    # Device transfer; non_blocking is effectively a no-op for unpinned memory.
    if str(device) != "cpu":
        tensor = tensor.to(device, non_blocking=True)

    return tensor
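

# Illustrative sketch, not part of this module's public API: on CPU the tensor
# returned by numpy_to_torch_zerocopy shares memory with the source array, so
# in-place edits to the array remain visible through the tensor.
def _demo_zerocopy_view() -> None:
    arr = np.zeros((2, 2), dtype=np.float32)
    t = numpy_to_torch_zerocopy(arr)  # float32 is supported directly: no copy
    arr[0, 0] = 1.0
    assert t[0, 0].item() == 1.0  # the tensor sees the in-place update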


def ensure_pinned_memory(arr: np.ndarray) -> np.ndarray:
    """
    Return a copy of a NumPy array backed by pinned (page-locked) memory for
    faster GPU transfers.

    Pinned memory allows DMA (Direct Memory Access) for faster CPU→GPU
    transfers. Only beneficial for repeated transfers of the same data.

    Args:
        arr: Input NumPy array

    Returns:
        Array backed by pinned memory (or the original array if CUDA is
        unavailable)

    Note:
        Pinned memory is a limited resource. Only use it for frequently
        transferred data. CUDA only (no effect on MPS/CPU).
    """
    if not torch.cuda.is_available():
        return arr

    # pin_memory() copies the data into a page-locked buffer.
    tensor = torch.from_numpy(arr).pin_memory()

    # The returned array is a view of the pinned buffer and keeps it alive.
    return tensor.numpy()
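

# Illustrative sketch, not part of this module's public API: pinning once and
# reusing the pinned buffer across repeated host-to-device copies is where
# pin_memory pays off. Guarded so it is a no-op on machines without CUDA.
def _demo_pinned_transfer() -> None:
    if not torch.cuda.is_available():
        return
    arr = np.random.rand(1024, 1024).astype(np.float32)
    pinned = ensure_pinned_memory(arr)  # one-time copy into page-locked memory
    for _ in range(10):
        # Copy from the pinned buffer; non_blocking can overlap with GPU work.
        gpu = torch.from_numpy(pinned).to("cuda", non_blocking=True)
        _ = gpu.sum()  # stand-in for real work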


def stack_arrays_zerocopy(arrays: list[np.ndarray], dtype: np.dtype | None = None) -> np.ndarray:
    """
    Stack a list of arrays with minimal copying.

    Args:
        arrays: List of NumPy arrays to stack
        dtype: Target dtype (if None, use arrays[0].dtype)

    Returns:
        Stacked array

    Note:
        Stacking always writes one copy into the output buffer, but arrays
        that already have the target dtype and a C-contiguous layout are not
        copied an extra time before np.stack.
    """
    if not arrays:
        raise ValueError("Cannot stack empty list")

    if dtype is None:
        dtype = arrays[0].dtype

    # Convert only the arrays that need it; compliant arrays pass through.
    arrays_contig = []
    for arr in arrays:
        if arr.dtype != dtype or not arr.flags['C_CONTIGUOUS']:
            arr = np.ascontiguousarray(arr, dtype=dtype)
        arrays_contig.append(arr)

    return np.stack(arrays_contig, axis=0)
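

# Illustrative sketch, not part of this module's public API: stacking
# per-sample arrays into one contiguous batch and converting that batch once
# avoids a separate tensor copy per sample.
def _demo_stack_and_convert() -> None:
    samples = [np.random.rand(3, 32, 32) for _ in range(8)]
    batch = stack_arrays_zerocopy(samples, dtype=np.float32)  # one contiguous block
    tensor = numpy_to_torch_zerocopy(batch)  # a view of that block, no extra copy
    assert tensor.shape == (8, 3, 32, 32)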


def batch_to_device(
    tensors: list[torch.Tensor] | tuple[torch.Tensor, ...],
    device: str | torch.device,
    non_blocking: bool = True,
) -> list[torch.Tensor | None]:
    """
    Move multiple tensors to a device with optimal settings.

    Args:
        tensors: List/tuple of tensors to move (None entries pass through)
        device: Target device
        non_blocking: Use async transfer (default: True); only overlaps with
            computation when the source tensors are in pinned memory

    Returns:
        List of tensors on the target device

    Example:
        >>> tensors = [torch.rand(100), torch.rand(200)]
        >>> gpu_tensors = batch_to_device(tensors, 'cuda')
    """
    return [
        t.to(device, non_blocking=non_blocking) if t is not None else None
        for t in tensors
    ]
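

# Illustrative sketch, not part of this module's public API: moving a whole
# (inputs, targets) pair in one call keeps device-transfer logic in one place.
# Guarded so it is a no-op on machines without CUDA.
def _demo_batch_to_device() -> None:
    if not torch.cuda.is_available():
        return
    inputs, targets = torch.rand(32, 10), torch.randint(0, 2, (32,))
    inputs, targets = batch_to_device([inputs, targets], "cuda")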


def get_optimal_pin_memory() -> bool:
    """
    Determine whether pin_memory should be used for a DataLoader.

    Returns:
        True if CUDA is available (pinned memory only speeds up CPU→CUDA
        transfers; it has no benefit on MPS or CPU)

    Usage:
        >>> DataLoader(dataset, pin_memory=get_optimal_pin_memory())
    """
    return torch.cuda.is_available()
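

# Illustrative entry point (an assumption, not part of the original module):
# runs the sketches above when the file is executed directly.
if __name__ == "__main__":
    _demo_zerocopy_view()
    _demo_stack_and_convert()
    _demo_pinned_transfer()  # exits early without CUDA
    _demo_batch_to_device()  # exits early without CUDA
    print("pin_memory recommended:", get_optimal_pin_memory())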