import subprocess
import os
import shutil

import numpy as np
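
# Note: this script shells out to git and git-lfs, so both must be installed
# and on the PATH for the downloads below to succeed.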


def clone_dataset_scenario(repo_url, model_repo_dir="./LWM", scenarios_dir="scenarios"):
    """
    Clones all scenarios from a repository, ensuring all files (small and large) are downloaded.

    Args:
        repo_url (str): URL of the Git repository.
        model_repo_dir (str): Path to the model repository.
        scenarios_dir (str): Directory name for storing scenarios.
    """
    current_dir = os.path.basename(os.getcwd())
    if current_dir == "LWM":
        model_repo_dir = "."

    scenarios_path = os.path.join(model_repo_dir, scenarios_dir)
    os.makedirs(scenarios_path, exist_ok=True)

    original_dir = os.getcwd()

    try:
        # Remove any stale copy so the clone starts from a clean directory.
        if os.path.exists(scenarios_path):
            shutil.rmtree(scenarios_path)

        print(f"Cloning scenarios repository into {scenarios_path} ...")
        subprocess.run([
            "git", "clone",
            repo_url,
            scenarios_path
        ], check=True)

        os.chdir(scenarios_path)

        print("Pulling all files using Git LFS ...")
        subprocess.run(["git", "lfs", "install"], check=True)
        subprocess.run(["git", "lfs", "pull"], check=True)

        print(f"Successfully cloned all scenarios into {scenarios_path}")

    except subprocess.CalledProcessError as e:
        print(f"Error cloning scenarios: {e}")
    finally:
        # Do not delete scenarios_path here: it holds the scenario data that
        # was just downloaded. Only restore the original working directory.
        os.chdir(original_dir)
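

# Download the LWM model repository (code and pretrained weights) if it is
# not already present.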
model_repo_url = "https://huggingface.co/wi-lab/lwm"
model_repo_dir = "./LWM"

if not os.path.exists(model_repo_dir):
    print(f"Cloning model repository from {model_repo_url}...")
    subprocess.run(["git", "clone", model_repo_url, model_repo_dir], check=True)
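
# Download the scenario dataset into <model_repo_dir>/scenarios.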
dataset_repo_url = "https://huggingface.co/datasets/wi-lab/lwm"
clone_dataset_scenario(dataset_repo_url, model_repo_dir)

if os.path.exists(model_repo_dir):
    os.chdir(model_repo_dir)
    print(f"Changed working directory to {os.getcwd()}")
else:
    print(f"Directory {model_repo_dir} does not exist. Please check if the repository is cloned properly.")
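
# These modules ship inside the cloned LWM repository, so they can only be
# imported after changing into model_repo_dir.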
from input_preprocess import tokenizer
from lwm_model import lwm
import torch

scenario_names = np.array([
    "city_18_denver", "city_15_indianapolis", "city_19_oklahoma",
    "city_12_fortworth", "city_11_santaclara", "city_7_sandiego"
])
# Index 3 picks the single scenario "city_12_fortworth"; index with a list of
# positions instead to select several scenarios at once.
scenario_idxs = np.array([0, 1, 2, 3, 4, 5])[3]
selected_scenario_names = scenario_names[scenario_idxs]

# None keeps the generated channels noiseless; set an SNR value in dB to
# produce noisy channel realizations instead.
snr_db = None

preprocessed_chs = tokenizer(
    selected_scenario_names=selected_scenario_names,
    manual_data=None,
    gen_raw=True,
    snr_db=snr_db
)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Loading the LWM model on {device} ...")
model = lwm.from_pretrained(device=device)
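
# Build the input dataset: LWM embeddings ('cls_emb' or 'channel_emb') run the
# preprocessed channels through the model, while 'raw' bypasses it.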
from inference import lwm_inference, create_raw_dataset

input_types = ['cls_emb', 'channel_emb', 'raw']
selected_input_type = input_types[1]  # 'channel_emb'

if selected_input_type in ['cls_emb', 'channel_emb']:
    dataset = lwm_inference(preprocessed_chs, selected_input_type, model, device)
else:
    dataset = create_raw_dataset(preprocessed_chs, device)

from input_preprocess import create_labels

n_beams = 16
tasks = ['LoS/NLoS Classification', 'Beam Prediction']
task = tasks[0]
labels = create_labels(task, selected_scenario_names, n_beams=n_beams)
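
# Visualize how well each input type separates the labels of both tasks,
# using dimensionality-reduction plots.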
from utils import plot_dimensionality_reduction

for task in tasks:
    labels = create_labels(task, selected_scenario_names, n_beams=n_beams)

    for input_type_idx, input_type in enumerate(input_types):
        selected_input_type = input_types[input_type_idx]

        if selected_input_type in ['cls_emb', 'channel_emb']:
            dataset = lwm_inference(
                preprocessed_chs,
                selected_input_type,
                model,
                device
            )
        else:
            dataset = create_raw_dataset(preprocessed_chs, device)

        plot_dimensionality_reduction(
            dataset,
            method='all',
            labels=labels,
            task=task,
            input_type=input_type
        )
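
# Downstream benchmark: train a small fully connected classifier (FCN) on each
# input type across a sweep of train/test split ratios.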
task = ['LoS/NLoS Classification', 'Beam Prediction'][0]
n_trials = 1
num_classes = 2 if task == 'LoS/NLoS Classification' else n_beams
input_types = ['raw', 'cls_emb']
split_ratios = np.array([.005, .0075, .01, .015, .02, .03,
                         .05, .1, .25, .5, .8])
f1_scores = np.zeros((n_trials, len(input_types), len(split_ratios)))
labels = create_labels(task, selected_scenario_names, n_beams=n_beams)

from utils import get_data_loaders, FCN, train_model, plot_metrics
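
# For reference, a minimal sketch of what a small fully connected classifier
# of this kind can look like. This is an illustrative stand-in with an assumed
# hidden width, not the exact architecture of utils.FCN:
import torch.nn as nn

class _FCNSketch(nn.Module):
    def __init__(self, input_dim, num_classes, hidden_dim=256):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),    # flattened features -> hidden layer
            nn.ReLU(),
            nn.Linear(hidden_dim, num_classes),  # hidden layer -> class logits
        )

    def forward(self, x):
        return self.net(x)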

for input_type_idx, input_type in enumerate(input_types):

    if input_type in ['cls_emb', 'channel_emb']:
        dataset = lwm_inference(preprocessed_chs, input_type, model, device)
    else:
        dataset = create_raw_dataset(preprocessed_chs, device)

    # Flatten each sample into a single feature vector for the classifier.
    dataset = dataset.view(dataset.size(0), -1)
    input_dim = dataset.shape[-1]

    for split_ratio_idx, split_ratio in enumerate(split_ratios):

        n_train = int(split_ratio * dataset.shape[0])

        for trial in range(n_trials):

            print(f"\ninput type: {input_type}, \nnumber of training samples: {n_train}, \ntrial: {trial}\n")

            torch.manual_seed(trial)

            # With noise enabled, draw a fresh noisy realization for each
            # trial and recompute the corresponding dataset.
            if snr_db is not None:
                preprocessed_chs = tokenizer(
                    selected_scenario_names=selected_scenario_names,
                    manual_data=None,
                    gen_raw=True,
                    snr_db=snr_db
                )
                if input_type in ['cls_emb', 'channel_emb']:
                    dataset = lwm_inference(preprocessed_chs, input_type, model, device)
                else:
                    dataset = create_raw_dataset(preprocessed_chs, device)
                dataset = dataset.view(dataset.size(0), -1)

            train_loader, test_loader = get_data_loaders(
                dataset,
                labels,
                batch_size=128,
                split_ratio=split_ratio
            )

            FCN_model = FCN(input_dim=input_dim, num_classes=num_classes)

            # Raw channels get a smaller learning rate than LWM embeddings.
            train_losses, test_f1_scores = train_model(
                FCN_model,
                train_loader,
                test_loader,
                epochs=120,
                lr=0.0001 if input_type == "raw" else 0.001,
                device=device,
                decay_step=30,
                decay_rate=0.5
            )

            # Record the final-epoch test F1 score for this configuration.
            f1_scores[trial, input_type_idx, split_ratio_idx] = test_f1_scores[0, -1]

# Plot the mean F1 score against the number of training samples per input type.
plot_metrics(
    np.mean(f1_scores, axis=0),
    input_types,
    np.asarray(split_ratios * dataset.shape[0], dtype=int),
    flag=1
)
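
# Training-free baseline: classify test samples by Euclidean distance in the
# same feature spaces, across the same split ratios.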
f1_scores_knn = np.zeros((n_trials, len(input_types), len(split_ratios)))

from utils import classify_by_euclidean_distance
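
# For intuition, a minimal sketch of a nearest-neighbor classifier based on
# Euclidean distance. This is an illustrative stand-in, not necessarily how
# utils.classify_by_euclidean_distance is implemented:
def _euclidean_classifier_sketch(X_train, y_train, X_test):
    dists = torch.cdist(X_test, X_train)  # (n_test, n_train) pairwise distances
    nearest = dists.argmin(dim=1)         # index of the closest training sample
    return y_train[nearest]               # predicted label per test sample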

for input_type_idx, input_type in enumerate(input_types):

    if input_type in ['cls_emb', 'channel_emb']:
        dataset = lwm_inference(preprocessed_chs, input_type, model, device)
    else:
        dataset = create_raw_dataset(preprocessed_chs, device)

    dataset = dataset.view(dataset.size(0), -1)
    input_dim = dataset.shape[-1]

    for split_ratio_idx, split_ratio in enumerate(split_ratios):

        n_train = int(split_ratio * dataset.shape[0])

        for trial in range(n_trials):

            torch.manual_seed(trial)

            if snr_db is not None:
                preprocessed_chs = tokenizer(
                    selected_scenario_names=selected_scenario_names,
                    manual_data=None,
                    gen_raw=True,
                    snr_db=snr_db
                )
                if input_type in ['cls_emb', 'channel_emb']:
                    dataset = lwm_inference(preprocessed_chs, input_type, model, device)
                else:
                    dataset = create_raw_dataset(preprocessed_chs, device)
                dataset = dataset.view(dataset.size(0), -1)

            train_loader, test_loader = get_data_loaders(
                dataset,
                labels,
                batch_size=128,
                split_ratio=split_ratio
            )

            f1 = classify_by_euclidean_distance(
                train_loader,
                test_loader,
                device="cpu"
            )

            f1_scores_knn[trial, input_type_idx, split_ratio_idx] = f1

plot_metrics(
    np.mean(f1_scores_knn, axis=0),
    input_types,
    np.asarray(split_ratios * dataset.shape[0], dtype=int),
    flag=1
)