| from __future__ import annotations |
|
|
| import logging |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any, Dict, Iterable, List, Optional, Sequence, Union |
|
|
| import numpy as np |
|
|
_LOGGER = logging.getLogger(__name__)


# DeepMIMO v4 is an optional dependency.  The catch is deliberately broad: any
# failure raised while importing the package (not only ImportError) disables
# the v4 loader instead of breaking the import of this module.  The cause is
# logged at DEBUG level so a missing or broken installation can be diagnosed.
try:
    import deepmimo
    from deepmimo import config as deepmimo_config
except Exception:
    _LOGGER.debug(
        "DeepMIMO v4 could not be imported; the v4 loader is disabled",
        exc_info=True,
    )
    deepmimo = None
    deepmimo_config = None
    _HAS_DEEPMIMO_V4 = False
else:
    _HAS_DEEPMIMO_V4 = True


# Optional legacy generator.  ``_legacy_data_gen`` is ``None`` when the import
# fails for any reason; the failure is logged at DEBUG level as above.
try:
    from input_preprocess import DeepMIMO_data_gen as _legacy_data_gen
except Exception:
    _LOGGER.debug(
        "Legacy DeepMIMO_data_gen could not be imported",
        exc_info=True,
    )
    _legacy_data_gen = None


# Annotation-only alias for array inputs.  The second member is a string
# forward reference, so ``numpy.typing`` is not needed at import time.
ArrayLike = Union[np.ndarray, "np.typing.NDArray[np.floating[Any]]"]
|
|
|
|
@dataclass
class _PathTable:
    """Bundle of the arrays extracted from one loaded scenario.

    The per-path arrays are read elsewhere in this module as
    ``array[user, :num_paths[user]]``, i.e. they are expected to be
    two-dimensional with users along the first axis.  The dataclass itself
    performs no validation of shapes or dtypes.
    """

    # --- per-path quantities, indexed as [user, path] ---
    power: np.ndarray  # path power on a linear scale (the loader converts from dB)
    phase: np.ndarray  # path phase
    delay: np.ndarray  # path delay (exposed to callers under the key "ToA")
    aoa_az: np.ndarray  # angle of arrival, azimuth (exposed as "DoA_phi")
    aoa_el: np.ndarray  # angle of arrival, elevation (exposed as "DoA_theta")
    aod_az: np.ndarray  # angle of departure, azimuth (exposed as "DoD_phi")
    aod_el: np.ndarray  # angle of departure, elevation (exposed as "DoD_theta")
    interactions: np.ndarray  # interaction code per path; 0 is reported as LoS

    # --- per-user quantities ---
    num_paths: np.ndarray  # number of valid leading entries in each per-path row
    los_user: np.ndarray  # per-user line-of-sight indicator, flattened to 1-D
    locations: np.ndarray  # per-user positions, reshaped by the loader to 2-D
|
|
|
|
class _LazyPathAccessor:
    """Lazy view over per-user path dictionaries compatible with v3 interface.

    Nothing is materialised up front: each lookup slices the first
    ``num_paths[user]`` entries out of the wrapped table's per-path arrays and
    packs them into a dictionary with the v3-style keys ``num_paths``,
    ``DoD_theta``, ``DoD_phi``, ``DoA_theta``, ``DoA_phi``, ``phase``, ``ToA``,
    ``power`` and ``LoS``.

    Supported index types:

    * an integer (Python or NumPy scalar, or a 0-d array) -> one dictionary;
    * a ``slice`` -> list of dictionaries;
    * a non-string sequence of integers -> list of dictionaries;
    * a one-dimensional NumPy integer array -> list of dictionaries;
    * a one-dimensional NumPy boolean mask with one entry per user -> list of
      dictionaries for the users where the mask is true.
    """

    def __init__(self, data: _PathTable) -> None:
        self._data = data

    def __len__(self) -> int:
        """Return the number of users (length of the ``num_paths`` array)."""
        return int(self._data.num_paths.shape[0])

    def __getitem__(
        self, index: Union[int, slice, Sequence[int], np.ndarray]
    ) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
        """Return the path dictionary (or list of dictionaries) for *index*.

        Raises:
            IndexError: If an integer index is out of range, or a boolean mask
                does not have exactly one entry per user.
            TypeError: If a NumPy index array has more than one dimension.
        """
        if isinstance(index, slice):
            return [self._user_paths(i) for i in range(*index.indices(len(self)))]

        # ndarray is not a ``Sequence``, so index arrays need their own branch;
        # otherwise they would fall through to ``int(index)`` and fail.
        # 0-d arrays are left to the scalar path below.
        if isinstance(index, np.ndarray) and index.ndim > 0:
            if index.ndim != 1:
                raise TypeError("index arrays must be one-dimensional")
            if index.dtype == np.bool_:
                if index.shape[0] != len(self):
                    raise IndexError(
                        "boolean index has %d entries but there are %d users"
                        % (index.shape[0], len(self))
                    )
                index = np.flatnonzero(index)
            return [self._user_paths(int(i)) for i in index]

        if isinstance(index, Sequence) and not isinstance(index, (str, bytes)):
            return [self._user_paths(int(i)) for i in index]

        return self._user_paths(int(index))

    def _user_paths(self, idx: int) -> Dict[str, Any]:
        """Build the v3-style dictionary for the single user at position *idx*."""
        table = self._data
        count = int(table.num_paths[idx])
        if count <= 0:
            # User without any path: every per-path entry is an empty array.
            empty = np.empty((0,), dtype=np.float32)
            return {
                "num_paths": 0,
                "DoD_theta": empty,
                "DoD_phi": empty,
                "DoA_theta": empty,
                "DoA_phi": empty,
                "phase": empty,
                "ToA": empty,
                "power": empty,
                "LoS": np.empty((0,), dtype=np.int32),
            }

        head = slice(0, count)
        interactions = np.asarray(table.interactions[idx, head])
        return {
            "num_paths": count,
            "DoD_theta": np.asarray(table.aod_el[idx, head]),
            "DoD_phi": np.asarray(table.aod_az[idx, head]),
            "DoA_theta": np.asarray(table.aoa_el[idx, head]),
            "DoA_phi": np.asarray(table.aoa_az[idx, head]),
            "phase": np.asarray(table.phase[idx, head]),
            "ToA": np.asarray(table.delay[idx, head]),
            "power": np.asarray(table.power[idx, head]),
            # A path counts as LoS when its interaction code is exactly 0.
            # NaN entries compare unequal to 0 and therefore map to 0 as well.
            "LoS": (interactions == 0).astype(np.int32),
        }
|
|
|
|
def _cast(array: ArrayLike, dtype: np.dtype[Any]) -> np.ndarray:
    """Return *array* as an ``ndarray`` whose dtype is *dtype*.

    The input is first passed through ``np.asarray``.  If the result already
    has the requested dtype it is returned as is; otherwise a converted copy
    is returned.
    """
    converted = np.asarray(array)
    return converted if converted.dtype == dtype else converted.astype(dtype, copy=True)
|
|
|
|
def _load_v4_dataset(
    scenario: str,
    *,
    scenarios_dir: Optional[Path],
    load_params: Optional[Dict[str, Any]],
    max_paths: Optional[int],
    array_dtype: np.dtype[Any],
    logger: Optional[logging.Logger],
) -> Dict[str, Any]:
    """Load *scenario* with DeepMIMO v4 and repackage it in a v3-like layout.

    Args:
        scenario: Scenario name handed to ``deepmimo.load``.
        scenarios_dir: When given, stored as the ``"scenarios_folder"`` entry
            of the DeepMIMO configuration before loading.  The setting is not
            restored afterwards.
        load_params: Extra keyword arguments for ``deepmimo.load``.  The
            mapping is copied, never modified in place.
        max_paths: Used as the ``max_paths`` load parameter unless
            *load_params* already contains that key.
        array_dtype: dtype applied to the per-path arrays.
        logger: Logger for the summary message; the module logger is used
            when ``None``.

    Returns:
        A dictionary with three entries: ``"user"`` (a dict holding
        ``"paths"``, a ``_LazyPathAccessor``; ``"LoS"``, the flattened
        per-user LoS array; and ``"location"``, the 2-D location array),
        ``"_path_data"`` (the underlying ``_PathTable``) and ``"_source"``
        (the string ``"deepmimo_v4"``).

    Raises:
        RuntimeError: If the DeepMIMO v4 package could not be imported.
    """
    if not _HAS_DEEPMIMO_V4:
        raise RuntimeError("DeepMIMO v4 package is not available in the current environment")

    if logger is None:
        logger = _LOGGER

    if scenarios_dir is not None:
        deepmimo_config.set("scenarios_folder", str(scenarios_dir))

    params = dict(load_params or {})
    if max_paths is not None:
        # An explicit "max_paths" inside load_params takes precedence.
        params.setdefault("max_paths", int(max_paths))

    dataset = deepmimo.load(scenario, **params)
    logger.info(
        "Loaded DeepMIMO v4 scenario '%s' with %s users and %s max paths",
        scenario,
        getattr(dataset, "n_ue", "unknown"),
        params.get("max_paths", "default"),
    )

    # When ``num_paths`` has more than one dimension, its leading axis is
    # reduced to a single entry so that the remaining arrays are indexed by
    # user first.  NOTE(review): the leading axis is presumed to enumerate
    # transmitters - confirm against the DeepMIMO dataset layout.
    num_paths_raw = np.asarray(dataset.num_paths)
    tx_index: Optional[int] = None
    if num_paths_raw.ndim > 1:
        if num_paths_raw.shape[0] > 1:
            # Keep the leading entry that carries the largest total path count.
            other_axes = tuple(range(1, num_paths_raw.ndim))
            tx_index = int(np.argmax(num_paths_raw.sum(axis=other_axes)))
        elif num_paths_raw.shape[0] == 1:
            # A leading axis of length 1 must be dropped as well; otherwise
            # every array keeps an extra dimension and the per-user
            # ``[user, :count]`` slicing would address the wrong axis.
            tx_index = 0

    def _select_tx(arr: Any, dtype: Optional[np.dtype[Any]] = None) -> np.ndarray:
        """Convert *arr* to an array, drop the leading axis if selected, cast."""
        out = np.asarray(arr)
        if out.dtype == object:
            # Object arrays are stacked into one regular array.
            out = np.stack([np.asarray(v) for v in out], axis=0)
        # Only arrays whose leading length matches that of ``num_paths`` are
        # reduced; anything else is passed through untouched.
        if tx_index is not None and out.ndim >= 1 and out.shape[0] == num_paths_raw.shape[0]:
            out = out[tx_index]
        if dtype is not None:
            out = out.astype(dtype, copy=False)
        return out

    num_paths = _select_tx(num_paths_raw, dtype=np.int32).reshape(-1)

    # The dataset's power values are treated as decibels and converted to a
    # linear scale.
    power_db = _select_tx(dataset.power, dtype=array_dtype)
    power = np.power(10.0, power_db / 10.0, dtype=array_dtype, casting="unsafe")

    phase = _select_tx(dataset.phase, dtype=array_dtype)
    delay = _select_tx(dataset.delay, dtype=array_dtype)
    aoa_az = _select_tx(dataset.aoa_az, dtype=array_dtype)
    aoa_el = _select_tx(dataset.aoa_el, dtype=array_dtype)
    aod_az = _select_tx(dataset.aod_az, dtype=array_dtype)
    aod_el = _select_tx(dataset.aod_el, dtype=array_dtype)
    interactions = _select_tx(dataset.inter, dtype=array_dtype)

    # The per-user LoS indicator is optional; default to all zeros.
    los_raw = getattr(dataset, "los", None)
    if los_raw is None:
        los_selected = np.zeros_like(num_paths, dtype=np.int8)
    else:
        los_selected = _select_tx(los_raw, dtype=np.int8)

    # Normalise locations to a 2-D array with one row per user.
    locations = _select_tx(dataset.rx_pos, dtype=np.float32)
    if locations.ndim == 1:
        if locations.size % 3 == 0:
            locations = locations.reshape(-1, 3)
        else:
            locations = locations.reshape(-1, 1)
    if locations.ndim > 2:
        locations = locations.reshape(locations.shape[0], -1)

    path_table = _PathTable(
        power=power,
        phase=phase,
        delay=delay,
        aoa_az=aoa_az,
        aoa_el=aoa_el,
        aod_az=aod_az,
        aod_el=aod_el,
        interactions=interactions,
        num_paths=num_paths,
        los_user=los_selected.reshape(-1),
        locations=locations,
    )

    # Drop the local reference to the loaded dataset.  Arrays converted with
    # ``copy=False`` may still share memory with it.
    del dataset

    user_payload = {
        "paths": _LazyPathAccessor(path_table),
        "LoS": path_table.los_user,
        "location": path_table.locations,
    }

    return {
        "user": user_payload,
        "_path_data": path_table,
        "_source": "deepmimo_v4",
    }
|
|
|
|
def load_deepmimo_user_data(
    scenario: str,
    *,
    scenarios_dir: Optional[Path] = None,
    load_params: Optional[Dict[str, Any]] = None,
    max_paths: Optional[int] = None,
    array_dtype: np.dtype[Any] = np.float32,
    logger: Optional[logging.Logger] = None,
) -> Dict[str, Any]:
    """Load DeepMIMO scenario data in a form compatible with legacy utilities.

    When the DeepMIMO v4 package was imported successfully, all arguments are
    forwarded unchanged to ``_load_v4_dataset`` and its result is returned: a
    dictionary with the keys ``"user"``, ``"_path_data"`` and ``"_source"``.

    There is no automatic fallback to the legacy generator.  Without
    DeepMIMO v4 this function always raises; the error message only differs
    depending on whether the legacy ``DeepMIMO_data_gen`` could be imported.

    Args:
        scenario: Name of the scenario to load.
        scenarios_dir: Optional folder containing the scenarios.
        load_params: Optional extra keyword arguments for the v4 loader.
        max_paths: Optional limit on the number of paths per user.
        array_dtype: dtype used for the per-path arrays.
        logger: Optional logger for progress messages.

    Raises:
        RuntimeError: If DeepMIMO v4 is not available.
    """
    if not _HAS_DEEPMIMO_V4:
        if _legacy_data_gen is None:
            message = (
                "Neither DeepMIMO v4 nor the legacy DeepMIMO_data_gen function is available. "
                "Please install the DeepMIMO package or provide the legacy generator."
            )
        else:
            message = (
                "DeepMIMO v4 is not installed. The repository still includes the legacy "
                "DeepMIMO_data_gen interface, but integration parameters must be provided "
                "explicitly. Please migrate to the official DeepMIMO package or invoke "
                "DeepMIMO_data_gen directly from your own tooling."
            )
        raise RuntimeError(message)

    loader_options = {
        "scenarios_dir": scenarios_dir,
        "load_params": load_params,
        "max_paths": max_paths,
        "array_dtype": array_dtype,
        "logger": logger,
    }
    return _load_v4_dataset(scenario, **loader_options)
|
|
|
|
# Public API of this module: only the loader entry point is exported.
__all__ = ["load_deepmimo_user_data"]
|
|