| | |
"""Compare static vs vehicular spectrograms with identical parameters.

This script finds a static spectrogram and its corresponding vehicular version
(with all other parameters identical), then displays them side by side along
with their difference in a 3-subplot layout.
"""
| |
|
| | from __future__ import annotations |
| |
|
| | import argparse |
| | import sys |
| | from pathlib import Path |
| | import pickle |
| | import glob |
| |
|
| | import matplotlib.pyplot as plt |
| | import numpy as np |
| | from fractions import Fraction |
| |
|
| | try: |
| | from core.paths import get_spectrogram_base_dir |
| | except Exception: |
| | get_spectrogram_base_dir = None |
| |
|
# Absolute directory that contains this script; the first two candidates
# below are expressed relative to it.
SCRIPT_DIR = Path(__file__).resolve().parent

# Candidate spectrogram roots, listed in the order they are tried:
# next to the script, one level above it, relative to the current working
# directory, and finally two machine-specific data locations.
DEFAULT_BASE_CANDIDATES = [
    SCRIPT_DIR.joinpath('spectrograms'),
    SCRIPT_DIR.parent.joinpath('spectrograms'),
    Path('spectrograms'),
    Path('D:/Namhyun/lwm_data'),
    Path('/mnt/d/Namhyun/lwm_data'),
]
| |
|
| |
|
def resolve_base_dir() -> Path:
    """Return the first existing directory that may hold the spectrograms.

    The directory reported by ``get_spectrogram_base_dir`` is tried first
    (only when that helper could be imported), followed by each entry of
    ``DEFAULT_BASE_CANDIDATES`` in order.  When none of them exists, the
    current working directory is returned.
    """
    probes = []
    if get_spectrogram_base_dir is not None:
        probes.append(Path(get_spectrogram_base_dir()))
    probes.extend(DEFAULT_BASE_CANDIDATES)

    for probe in probes:
        if probe.exists():
            return probe

    # Nothing matched: fall back to wherever the script was launched from.
    return Path.cwd()
| |
|
| |
|
def load_spectrogram(path: Path, index: int = 0) -> tuple[np.ndarray, dict]:
    """Load one spectrogram and its configuration from a pickle file.

    The pickle must contain a mapping with a ``'spectrograms'`` entry that is
    indexable along its first axis, and may contain a ``'configuration'``
    entry.

    Args:
        path: Pickle file to read.
        index: Position of the wanted spectrogram along the first axis.
            Negative indices are rejected.

    Returns:
        A ``(image, configuration)`` tuple.  ``configuration`` is always a
        dict; a missing or ``None`` entry is returned as ``{}``.

    Raises:
        KeyError: The payload has no ``'spectrograms'`` entry.
        IndexError: The file stores no spectrograms, or ``index`` is outside
            ``0..count-1``.
    """
    # SECURITY: pickle.load can execute arbitrary code embedded in the file.
    # Only open pickles that come from a trusted source.
    with path.open('rb') as f:
        payload = pickle.load(f)

    specs = np.asarray(payload['spectrograms'])
    # A stored ``None`` would otherwise reach callers that call ``.get`` on
    # the configuration.
    cfg = payload.get('configuration') or {}

    # A 0-d array has no first axis at all; treat it like an empty container.
    count = specs.shape[0] if specs.ndim else 0
    if count == 0:
        raise IndexError(f'No spectrograms stored in {path}')
    if not (0 <= index < count):
        raise IndexError(f'Index {index} out of range (0..{count - 1})')

    return specs[index], cfg
| |
|
| |
|
def find_matching_spectrograms(base_dir: Path, route_tokens: list[str],
                               static_mobility: str = "static",
                               vehicular_mobility: str = "vehicular",
                               city: str = "city_1_losangeles") -> tuple[Path, Path]:
    """Find matching static and vehicular spectrograms with identical other parameters.

    Both searches are rooted at
    ``base_dir / city / COMM / MODULATION / RATE / SNR / <mobility>`` and look
    for ``*.pkl`` files recursively.

    Args:
        base_dir: Root directory of the spectrogram tree.
        route_tokens: At least four path fragments, in the order
            COMM, MODULATION, rate, SNR.  Extra tokens are ignored.
        static_mobility: Directory name of the static variant.
        vehicular_mobility: Directory name of the vehicular variant.
        city: Name of the city directory directly below ``base_dir``.

    Returns:
        ``(static_file, vehicular_file)``.  When both trees contain a file at
        the same relative path, the first such pair (in sorted order) is
        returned; otherwise the first file of each tree in sorted order.

    Raises:
        ValueError: Fewer than four route tokens were given.
        FileNotFoundError: One of the two trees contains no ``.pkl`` file.
    """
    if len(route_tokens) < 4:
        raise ValueError(f"Route tokens should have at least 4 parts: COMM MODULATION rateX-Y SNR, got: {route_tokens}")

    comm, modulation, rate, snr = route_tokens[:4]

    shared_root = base_dir / city / comm / modulation / rate / snr
    static_path = shared_root / static_mobility
    vehicular_path = shared_root / vehicular_mobility

    # glob yields entries in filesystem order; sort so that the selection is
    # reproducible across runs and platforms.
    static_pkl_files = sorted(static_path.glob("**/*.pkl"))
    vehicular_pkl_files = sorted(vehicular_path.glob("**/*.pkl"))

    if not static_pkl_files:
        raise FileNotFoundError(f"No static spectrogram found at: {static_path}")

    if not vehicular_pkl_files:
        raise FileNotFoundError(f"No vehicular spectrogram found at: {vehicular_path}")

    # Prefer a pair that sits at the same relative location in both trees, so
    # the two files differ only in the mobility directory.
    vehicular_by_rel = {p.relative_to(vehicular_path): p for p in vehicular_pkl_files}
    for static_candidate in static_pkl_files:
        partner = vehicular_by_rel.get(static_candidate.relative_to(static_path))
        if partner is not None:
            return static_candidate, partner

    # No common relative path: fall back to the first file of each tree.
    return static_pkl_files[0], vehicular_pkl_files[0]
| |
|
| |
|
def format_metadata(meta: dict, include_mobility: bool = False) -> str:
    """Format metadata into a readable string.

    Args:
        meta: Mapping that may contain ``'standard'``, ``'modulation'``,
            ``'code_rate'``, ``'snr'``, ``'speed'`` and ``'speed_name'``.
            Missing or falsy entries are skipped.
        include_mobility: When true, append ``'speed'`` (or, failing that,
            ``'speed_name'``) to the result.

    Returns:
        The available fields joined by ``' | '``, or ``'Spectrogram'`` when
        no field produced any text.
    """
    title_tokens = [str(meta[key]) for key in ('standard', 'modulation') if meta.get(key)]

    code_rate = meta.get('code_rate')
    if isinstance(code_rate, (int, float)):
        try:
            frac = Fraction(code_rate).limit_denominator(16)
        except (ValueError, OverflowError):
            # Fraction rejects NaN (ValueError) and infinity (OverflowError);
            # show the raw value instead.
            title_tokens.append(f'rate {code_rate}')
        else:
            title_tokens.append(f'rate {frac.numerator}/{frac.denominator}')

    snr = meta.get('snr')
    if isinstance(snr, (int, float)):
        # round() raises on NaN and infinity, so only attempt the integer
        # shortcut for finite values.
        is_finite = snr == snr and abs(snr) != float('inf')
        if is_finite and abs(snr - round(snr)) < 1e-6:
            snr_display = int(round(snr))
        else:
            snr_display = snr
        title_tokens.append(f'SNR {snr_display} dB')

    if include_mobility:
        speed = meta.get('speed') or meta.get('speed_name')
        if speed:
            title_tokens.append(str(speed))

    return ' | '.join(title_tokens) if title_tokens else 'Spectrogram'
| |
|
| |
|
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the comparison script."""
    parser = argparse.ArgumentParser(
        description='Compare static vs vehicular spectrograms with identical parameters.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python compare_mobility_spectrograms.py LTE QAM16 rate3-4 SNR10dB
  python compare_mobility_spectrograms.py LTE QPSK rate1-2 SNR5dB --index 2
  python compare_mobility_spectrograms.py WiFi BPSK rate1-2 SNR0dB --save comparison.png
  python compare_mobility_spectrograms.py 5G QAM64 rate2-3 SNR15dB
        """
    )

    parser.add_argument('route', nargs='*',
                        help='Path fragments (e.g., LTE QAM16 rate3-4 SNR10dB) leading to the target spectrograms.')
    parser.add_argument('--index', type=int, default=0,
                        help='Sample index inside pickle (default: 0).')
    parser.add_argument('--save', type=Path,
                        help='Optional output path. Defaults to auto-generated filename.')
    parser.add_argument('--format', choices=('png', 'pdf'), default='png',
                        help='Output format when --save has no extension (default: png).')
    parser.add_argument('--dpi', type=int, default=600,
                        help='Resolution for rasterized content in the export (default: 600).')
    parser.add_argument('--no-show', action='store_true',
                        help='Skip opening an interactive window (image is still saved).')
    parser.add_argument('--base-dir', type=Path,
                        help='Base directory containing spectrograms (auto-detected if not specified).')
    return parser


def _patch_noise_floor(static_img: np.ndarray, vehicular_img: np.ndarray,
                       columns=range(95, 99), noise_threshold=-105) -> tuple:
    """Copy static values into the vehicular image where static is below the threshold.

    Only the given columns are touched, and columns beyond the image width are
    skipped.  Returns ``(patched_copy, number_of_overwritten_bins)``; the
    input arrays are not modified.
    """
    patched = vehicular_img.copy()
    fixed_count = 0
    for col in columns:
        if col >= patched.shape[1]:
            continue
        noise_mask = static_img[:, col] < noise_threshold
        if noise_mask.any():
            patched[noise_mask, col] = static_img[noise_mask, col]
            fixed_count += int(noise_mask.sum())
    return patched, fixed_count


def _configure_plot_style() -> None:
    """Apply the serif, publication-style matplotlib settings used by the figure."""
    plt.rcParams['text.usetex'] = False
    plt.rcParams['font.family'] = 'serif'
    plt.rcParams['font.serif'] = ['Times New Roman', 'Liberation Serif', 'DejaVu Serif']
    plt.rcParams['mathtext.fontset'] = 'stix'
    plt.rcParams['font.size'] = 11
    plt.rcParams['axes.labelsize'] = 12
    plt.rcParams['axes.titlesize'] = 13
    plt.rcParams['axes.titleweight'] = 'normal'
    plt.rcParams['xtick.labelsize'] = 11
    plt.rcParams['ytick.labelsize'] = 11
    plt.rcParams['legend.fontsize'] = 11
    plt.rcParams['axes.linewidth'] = 1.0
    plt.rcParams['grid.linewidth'] = 0.5
    # Font type 42 embeds TrueType fonts in PDF/PS output.
    plt.rcParams['pdf.fonttype'] = 42
    plt.rcParams['ps.fonttype'] = 42


def _physical_axes(meta: dict, shape: tuple) -> tuple:
    """Derive the imshow extent and axis labels from spectrogram metadata.

    Returns ``(extent, xlabel, ylabel)``.  When ``nperseg``, ``noverlap``,
    ``sample_rate`` (positive) and ``freq_resolution_hz`` are all numeric,
    the extent is in microseconds and MHz; otherwise ``extent`` is ``None``
    and the labels refer to bins.
    """
    freq_res = meta.get('freq_resolution_hz')
    sample_rate = meta.get('sample_rate')
    nperseg = meta.get('nperseg')
    noverlap = meta.get('noverlap')

    numeric = (int, float)
    has_physical_units = (
        isinstance(nperseg, numeric) and isinstance(noverlap, numeric)
        and isinstance(sample_rate, numeric) and sample_rate > 0
        and isinstance(freq_res, numeric)
    )
    if not has_physical_units:
        return None, 'Time bins', 'Frequency bins'

    # Guard against a non-positive hop when noverlap >= nperseg.
    hop_samples = max(int(nperseg - noverlap), 1)
    hop = hop_samples / sample_rate
    height, width = shape
    times = [0, hop * width]
    # Frequency axis spans height bins, with height // 2 of them below zero.
    freqs = [-(height // 2) * freq_res, (height - height // 2) * freq_res]

    extent = [times[0] * 1e6, times[1] * 1e6, freqs[0] / 1e6, freqs[1] / 1e6]
    return extent, 'Time (µs)', 'Frequency (MHz)'


def _resolve_output_path(args: argparse.Namespace) -> tuple:
    """Return ``(output_path, output_format)`` for the exported figure.

    A ``.png``/``.pdf`` suffix on the path decides the format.  Otherwise
    ``args.format`` is used: it replaces the suffix of a user-supplied
    ``--save`` path, and is appended to an auto-generated name.
    """
    user_supplied = args.save is not None
    if user_supplied:
        out_path = args.save
    else:
        tokens = ['mobility_comparison']
        tokens.extend(str(tok).replace(' ', '_').replace('/', '_') for tok in args.route)
        tokens.append(f'index{args.index}')
        out_path = Path.cwd() / '_'.join(tokens)

    suffix = out_path.suffix.lower()
    if suffix in {'.png', '.pdf'}:
        return out_path, suffix.lstrip('.')

    output_format = args.format
    if user_supplied:
        return out_path.with_suffix(f'.{output_format}'), output_format
    # Auto-generated names can contain dots that come from route tokens
    # (e.g. "rate0.75"); with_suffix would cut the name at the last dot, so
    # append the extension instead.
    return out_path.with_name(f'{out_path.name}.{output_format}'), output_format


def main() -> None:
    """Command-line entry point: load, compare, plot and save two spectrograms.

    Steps: parse arguments, locate the base directory, find the static and
    vehicular pickle files for the requested route, load the sample selected
    by ``--index`` from each, overwrite selected noise-floor bins of the
    vehicular image with the static values, plot static / vehicular /
    difference side by side, save the figure, print summary statistics and,
    unless ``--no-show`` is given, open an interactive window.

    Exits with status 1 (after printing to stderr) when the base directory or
    route is missing, no matching files are found, loading fails, or the two
    spectrograms do not have the same 2-D shape.
    """
    args = _build_arg_parser().parse_args()

    if args.base_dir:
        base_dir = args.base_dir
        if not base_dir.exists():
            print(f"Error: Base directory not found: {base_dir}", file=sys.stderr)
            sys.exit(1)
    else:
        base_dir = resolve_base_dir()

    print(f"Using base directory: {base_dir}")

    if not args.route:
        print("Error: Route arguments are required (e.g., LTE QAM16 rate3-4 SNR10dB)", file=sys.stderr)
        sys.exit(1)

    try:
        static_path, vehicular_path = find_matching_spectrograms(base_dir, args.route)
    except (FileNotFoundError, ValueError) as err:
        # ValueError: the route had fewer than the four required tokens.
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
    print(f"Found static spectrogram: {static_path}")
    print(f"Found vehicular spectrogram: {vehicular_path}")

    try:
        static_img, static_meta = load_spectrogram(static_path, args.index)
        vehicular_img_orig, _vehicular_meta = load_spectrogram(vehicular_path, args.index)
    except (IndexError, KeyError, OSError, pickle.UnpicklingError) as err:
        print(f"Error loading spectrograms: {err}", file=sys.stderr)
        sys.exit(1)

    # Everything below indexes rows/columns and subtracts element-wise, so
    # both images must be 2-D and identically shaped.
    if static_img.ndim != 2 or static_img.shape != vehicular_img_orig.shape:
        print(f"Error: Spectrogram shapes are not comparable: "
              f"static {static_img.shape} vs vehicular {vehicular_img_orig.shape}",
              file=sys.stderr)
        sys.exit(1)

    # In columns 95-98, bins where the static image is below -105 are copied
    # from static into the vehicular image, so their difference is exactly 0.
    # NOTE(review): this alters the plotted vehicular data; the reason is not
    # stated in this file — confirm it is still wanted.
    vehicular_img, fixed_count = _patch_noise_floor(static_img, vehicular_img_orig)

    if fixed_count > 0:
        print(f"[BUGFIX] Set {fixed_count} noise floor bins to identical values in columns 95-98")

    difference_img = vehicular_img - static_img

    _configure_plot_style()
    extent, xlabel, ylabel = _physical_axes(static_meta, static_img.shape)

    from matplotlib.gridspec import GridSpec
    from mpl_toolkits.axes_grid1 import make_axes_locatable

    fig = plt.figure(figsize=(18, 6))
    gs = GridSpec(1, 3, figure=fig, wspace=0.25, hspace=0.1)

    axes = [fig.add_subplot(gs[0, 0]),
            fig.add_subplot(gs[0, 1]),
            fig.add_subplot(gs[0, 2])]

    gs.update(wspace=0.1)

    # Explicit positions: panels (a) and (b) sit closer together than (b) and (c).
    axes[0].set_position([0.05, 0.15, 0.26, 0.75])
    axes[1].set_position([0.33, 0.15, 0.26, 0.75])
    axes[2].set_position([0.65, 0.15, 0.26, 0.75])

    # Panel (a): static.
    # NOTE(review): (a) and (b) are normalised independently, yet only (b)
    # shows a colorbar — consider passing a shared vmin/vmax to both.
    axes[0].imshow(static_img, aspect='auto', origin='lower', cmap='viridis', extent=extent)
    axes[0].set_title('(a) Static', pad=6)
    axes[0].set_xlabel(xlabel)
    axes[0].set_ylabel(ylabel)

    # Reserve (but hide) a colorbar slot so panel (a) is laid out like (b).
    divider1 = make_axes_locatable(axes[0])
    cax1 = divider1.append_axes("right", size="5%", pad=0.1)
    cax1.axis('off')

    # Panel (b): vehicular (after the noise-floor overwrite).
    im2 = axes[1].imshow(vehicular_img, aspect='auto', origin='lower', cmap='viridis', extent=extent)
    axes[1].set_title('(b) Vehicular', pad=6)
    axes[1].set_xlabel(xlabel)

    divider2 = make_axes_locatable(axes[1])
    cax2 = divider2.append_axes("right", size="5%", pad=0.1)
    cbar2 = plt.colorbar(im2, cax=cax2)
    cbar2.set_label('Power (dBm)', rotation=270, labelpad=12)

    # Panel (c): difference, on a colour scale symmetric around zero.
    vmax = max(abs(difference_img.min()), abs(difference_img.max()))
    im3 = axes[2].imshow(difference_img, aspect='auto', origin='lower',
                         cmap='RdBu_r', vmin=-vmax, vmax=vmax, extent=extent)
    axes[2].set_title('(c) Difference', pad=6)
    axes[2].set_xlabel(xlabel)

    divider3 = make_axes_locatable(axes[2])
    cax3 = divider3.append_axes("right", size="5%", pad=0.1)
    cbar3 = plt.colorbar(im3, cax=cax3)
    # NOTE(review): a difference of two dBm values is normally quoted in dB —
    # confirm whether this label and the statistics below should say "dB".
    cbar3.set_label('Power Difference (dBm)', rotation=270, labelpad=12)

    out_path, output_format = _resolve_output_path(args)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    save_kwargs = {
        'bbox_inches': 'tight',
        'dpi': args.dpi,
        'format': output_format,
    }
    plt.savefig(out_path, **save_kwargs)
    print(f"Plot saved to: {out_path}")

    print("\nStatistics:")
    print(f"(a) Static - Mean: {static_img.mean():.2f} dBm, Std: {static_img.std():.2f} dBm")
    print(f"(b) Vehicular - Mean: {vehicular_img.mean():.2f} dBm, Std: {vehicular_img.std():.2f} dBm")
    print(f"(c) Difference - Mean: {difference_img.mean():.2f} dBm, Std: {difference_img.std():.2f} dBm")
    print(f" Range: [{difference_img.min():.2f}, {difference_img.max():.2f}] dBm")

    if not args.no_show:
        plt.show()
| |
|
| |
|
# Run the comparison only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
| |
|