Spaces:
Running
Running
File size: 15,437 Bytes
8f8b054 cacf045 2a77201 340b448 d1b5811 8f8b054 6d6f3c6 4de2908 8f8b054 c6aa746 2a77201 11dd9d5 6d6f3c6 11dd9d5 d1b5811 11dd9d5 469d918 11dd9d5 d1b5811 11dd9d5 469d918 11dd9d5 ef6f553 11dd9d5 d1b5811 11dd9d5 6d6f3c6 11dd9d5 6d6f3c6 469d918 11dd9d5 469d918 11dd9d5 469d918 11dd9d5 469d918 d1b5811 469d918 d1b5811 b837f81 d1b5811 b837f81 d1b5811 b837f81 11dd9d5 9e4473c 11dd9d5 ef6f553 11dd9d5 ef6f553 11dd9d5 b75d54e 11dd9d5 2587718 469d918 11dd9d5 2587718 6d6f3c6 2587718 469d918 11dd9d5 ef6f553 11dd9d5 d1b5811 11dd9d5 d1b5811 a392854 11dd9d5 06b54b2 11dd9d5 ca2a3f1 a392854 11dd9d5 d1b5811 17eb0ae 11dd9d5 512ef82 bef8ac3 6d6f3c6 7d365eb 9e4473c 11dd9d5 6d6f3c6 11dd9d5 6d6f3c6 b837f81 11dd9d5 6d6f3c6 11dd9d5 8f8b054 6d6f3c6 8f8b054 6d6f3c6 8f8b054 11dd9d5 c6aa746 11dd9d5 c6aa746 11dd9d5 c6aa746 11dd9d5 c6aa746 11dd9d5 c6aa746 8f8b054 d1b5811 469d918 8f8b054 c6aa746 6d6f3c6 c6aa746 6d6f3c6 469d918 c6aa746 11dd9d5 8f8b054 c6aa746 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 |
import io
import os
import pickle
import subprocess
import sys

import gradio as gr
import h5py
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from PIL import Image
from sklearn.metrics import confusion_matrix
# Folders that hold the predefined result images
RAW_PATH = os.path.join("images", "raw")
EMBEDDINGS_PATH = os.path.join("images", "embeddings")
# Selectable training-data percentages: 10, 20, ..., 90
percentage_values = np.arange(10, 100, 10)
def beam_prediction_task(data_percentage, task_complexity):
    """Build the averaged confusion-matrix images for the beam prediction tab.

    Result CSVs are looked up in ``images/raw_<fraction>_<beams>`` and
    ``images/embeddings_<fraction>_<beams>``, where ``<fraction>`` is
    ``data_percentage / 100`` with one decimal and ``<beams>`` is
    ``task_complexity``.

    Args:
        data_percentage: Percentage of the data used for training.
        task_complexity: Number of beams.

    Returns:
        A ``(raw_img, embeddings_img)`` tuple. An entry is ``None`` when its
        folder is missing or contains no CSV results.
    """
    # A slider may deliver an integral value as a float (e.g. 16.0), which
    # would otherwise leak into the folder name as "..._16.0".
    if float(task_complexity).is_integer():
        task_complexity = int(task_complexity)

    # Folder naming convention based on data_percentage and task_complexity
    suffix = f"{data_percentage/100:.1f}_{task_complexity}"
    raw_img = _render_average_confusion_matrix(
        folder=f"images/raw_{suffix}",
        file_name="confusion_matrix_raw.png",
        title=f"Raw Confusion Matrix ({data_percentage}% data, {task_complexity} beams)",
    )
    embeddings_img = _render_average_confusion_matrix(
        folder=f"images/embeddings_{suffix}",
        file_name="confusion_matrix_embeddings.png",
        title=f"Embeddings Confusion Matrix ({data_percentage}% data, {task_complexity} beams)",
    )
    return raw_img, embeddings_img


def _render_average_confusion_matrix(folder, file_name, title):
    """Plot the averaged confusion matrix of *folder*; return None if there is none."""
    if not os.path.isdir(folder):
        return None
    avg_cm = compute_average_confusion_matrix(folder)
    if avg_cm is None:
        return None

    save_path = os.path.join(folder, file_name)
    # The matrix is precomputed and holds float averages, so it is drawn
    # directly; the axes already number the classes 0..n-1.
    fig, ax = plt.subplots(figsize=(6, 6))
    try:
        heatmap = ax.imshow(avg_cm, cmap='Blues')
        ax.set_title(title)
        ax.set_xlabel('Predicted')
        ax.set_ylabel('Actual')
        fig.colorbar(heatmap, ax=ax)
        fig.tight_layout()
        fig.savefig(save_path)
    finally:
        # Figures are never garbage-collected by pyplot unless closed.
        plt.close(fig)

    # Copy the pixels so the file handle is released before returning.
    with Image.open(save_path) as saved:
        return saved.copy()
# Function to compute the average confusion matrix across CSV files in a folder
def compute_average_confusion_matrix(folder):
    """Average the confusion matrices of every ``.csv`` file in *folder*.

    Each CSV must provide a ``"Target"`` column (true labels) and a
    ``"Top-1 Prediction"`` column (predicted labels).

    Args:
        folder: Directory containing the result CSV files.

    Returns:
        The element-wise mean of the per-file confusion matrices, or ``None``
        when *folder* does not exist or holds no CSV file.
    """
    if not os.path.isdir(folder):
        return None

    results = []
    for file in os.listdir(folder):
        if file.endswith(".csv"):
            data = pd.read_csv(os.path.join(folder, file))
            results.append((data["Target"], data["Top-1 Prediction"]))
    if not results:
        return None

    # Use one label range for all files so every matrix has the same shape
    # and can be averaged even when a file is missing some target classes.
    num_labels = max(len(np.unique(y_true)) for y_true, _ in results)
    labels = np.arange(num_labels)
    confusion_matrices = [
        confusion_matrix(y_true, y_pred, labels=labels)
        for y_true, y_pred in results
    ]
    return np.mean(confusion_matrices, axis=0)
# Custom class to capture print output
class PrintCapture(io.StringIO):
    """In-memory text stream that also keeps every written chunk in a list.

    Intended to be assigned to ``sys.stdout`` so that ``print`` output can be
    collected and shown elsewhere.
    """

    def __init__(self):
        super().__init__()
        # Chunks in the order they were written.
        self.output = []

    def write(self, txt):
        """Record *txt* and return the number of characters written.

        Returning the count follows the text-stream ``write`` contract.
        """
        self.output.append(txt)
        return super().write(txt)

    def get_output(self):
        """Return everything written so far as a single string."""
        return ''.join(self.output)
# Function to load and display predefined images based on user selection
def display_predefined_images(percentage_idx):
    """Return the predefined (raw, embeddings) images for a percentage index.

    The index selects an entry of ``percentage_values``. Each image is read
    from ``RAW_PATH`` / ``EMBEDDINGS_PATH``; when the file is absent a random
    image is used in its place.
    """
    chosen = percentage_values[percentage_idx]
    file_name = f"percentage_{chosen}_complexity_16.png"

    loaded = []
    for base_dir in (RAW_PATH, EMBEDDINGS_PATH):
        candidate = os.path.join(base_dir, file_name)
        if not os.path.exists(candidate):
            # Use a fallback random image
            loaded.append(create_random_image())
            continue
        loaded.append(Image.open(candidate))

    raw_image, embeddings_image = loaded
    return raw_image, embeddings_image
# LoS/NLoS classification entry point; tolerates missing or failed outputs
def los_nlos_classification(file, percentage_idx):
    """Run the LoS/NLoS classification for an uploaded file.

    Args:
        file: Uploaded HDF5 file, or ``None`` when nothing was uploaded.
        percentage_idx: Index into ``percentage_values``.

    Returns:
        A ``(raw_image, embeddings_image, console_text)`` tuple. Without a
        file the predefined images are returned. When processing fails both
        images are ``None`` and the reason is placed in ``console_text``.
    """
    if file is None:
        raw_image, embeddings_image = display_predefined_images(percentage_idx)
        return raw_image, embeddings_image, "No file uploaded. Displaying predefined images."

    result = process_hdf5_file(file, percentage_idx)
    if result is None:
        # The processing step may bail out without producing a result tuple.
        return None, None, "Processing did not produce any output."

    raw_cm_image, emb_cm_image, console_output = result
    # A string in an image slot is an error message, not an image; handing
    # it to an image component would be interpreted as a file path.
    for candidate in (raw_cm_image, emb_cm_image):
        if isinstance(candidate, str):
            return None, None, f"{console_output}\nError: {candidate}"
    return raw_cm_image, emb_cm_image, console_output
# Function to create random images for LoS/NLoS classification results
def create_random_image(size=(300, 300)):
    """Return a PIL image of uniform random noise with three channels.

    ``size`` gives the first two dimensions of the generated pixel array.
    """
    noise = np.random.rand(*size, 3)
    pixels = (noise * 255).astype('uint8')
    return Image.fromarray(pixels)
# Function to load the pre-trained model from your cloned repository
def load_custom_model():
    """Instantiate ``LWM`` from ``lwm_model`` and switch it to eval mode.

    ``lwm_model`` is imported lazily, so it must be importable at call time.
    """
    from lwm_model import LWM  # Assuming the model is defined in lwm_model.py

    network = LWM()  # Modify this according to your model initialization
    network.eval()
    return network
import importlib.util
# Function to dynamically load a Python module from a given file path
def load_module_from_path(module_name, file_path):
    """Load and execute the source file at *file_path* as *module_name*.

    The module is not registered in ``sys.modules``.

    Args:
        module_name: Name to give the loaded module.
        file_path: Path of the Python source file.

    Returns:
        The executed module object.

    Raises:
        ImportError: If no import spec or loader can be created for
            *file_path* (for example an unsupported file extension).
    """
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    # spec_from_file_location returns None instead of raising when it cannot
    # work out how to load the file; fail with a clear error in that case.
    if spec is None or spec.loader is None:
        raise ImportError(
            f"Cannot load module {module_name!r} from {file_path!r}"
        )
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
# Function to split dataset into training and test sets based on user selection
def split_dataset(channels, labels, percentage_idx):
    """Randomly split *channels* and *labels* into training and test parts.

    The training share is ``percentage_values[percentage_idx]`` percent of
    the samples (rounded down); the rest becomes the test set. The same
    shuffled order is applied to both arrays.

    Returns:
        ``(train_data, test_data, train_labels, test_labels)``.
    """
    fraction = percentage_values[percentage_idx] / 100
    total = channels.shape[0]
    n_train = int(total * fraction)
    print(f'Number of Training Samples: {n_train}')

    order = np.arange(total)
    np.random.shuffle(order)
    train_idx = order[:n_train]
    test_idx = order[n_train:]

    return (
        channels[train_idx],
        channels[test_idx],
        labels[train_idx],
        labels[test_idx],
    )
# Function to calculate Euclidean distance between a point and a centroid
def euclidean_distance(x, centroid):
    """Return the norm of ``x - centroid`` as computed by ``np.linalg.norm``."""
    offset = x - centroid
    return np.linalg.norm(offset)
import torch
def classify_based_on_distance(train_data, train_labels, test_data):
    """Nearest-centroid classifier for the two classes ``0`` and ``1``.

    A centroid is computed per class from the training samples, and each test
    sample is assigned to the class whose centroid is closer in Euclidean
    distance. Ties go to class ``1``.

    Args:
        train_data: Training samples, one per row (tensor or array-like).
        train_labels: Class label (0 or 1) of each training sample.
        test_data: Samples to classify, one per row.

    Returns:
        A CPU ``int64`` tensor with one prediction per test sample.
    """
    train_data = torch.as_tensor(train_data)
    device = train_data.device
    test_data = torch.as_tensor(test_data, device=device)
    train_labels = torch.as_tensor(train_labels, device=device)

    num_test = test_data.shape[0]
    if num_test == 0:
        return torch.empty(0, dtype=torch.long)

    # One row per test sample, so all distances come from a single norm call.
    flat_test = test_data.reshape(num_test, -1)

    def distances_to_class(label):
        mask = train_labels == label
        if not bool(mask.any()):
            # A class with no training sample must never be the nearest one;
            # the mean of an empty selection would otherwise be NaN.
            return torch.full((num_test,), float("inf"), device=device)
        centroid = train_data[mask].mean(dim=0).reshape(-1)
        return torch.linalg.norm(flat_test - centroid, dim=1)

    dist_0 = distances_to_class(0)
    dist_1 = distances_to_class(1)
    # Predict 0 only when strictly closer to centroid 0, otherwise 1.
    return (~(dist_0 < dist_1)).long().cpu()
# Function to generate confusion matrix plot
def plot_confusion_matrix(y_true, y_pred, title):
    """Plot the confusion matrix of *y_true* vs *y_pred* and return the image.

    The plot is written to ``"<title>.png"`` in the current working
    directory and then loaded back.

    Args:
        y_true: True class labels.
        y_pred: Predicted class labels.
        title: Plot title; also used as the output file name stem.

    Returns:
        A PIL image of the rendered confusion matrix.
    """
    # Sorted union of the labels that occur, so tick labels always match the
    # rows/columns of the matrix (also when only one class is present).
    classes = np.unique(
        np.concatenate([np.asarray(y_true).ravel(), np.asarray(y_pred).ravel()])
    )
    cm = confusion_matrix(y_true, y_pred, labels=classes)

    image_path = f"{title}.png"
    fig, ax = plt.subplots(figsize=(5, 5))
    try:
        heatmap = ax.imshow(cm, cmap='Blues')
        ax.set_title(title)
        ax.set_xlabel('Predicted')
        ax.set_ylabel('Actual')
        fig.colorbar(heatmap, ax=ax)

        # Add labels for x and y ticks (Actual/Predicted class labels)
        ticks = np.arange(len(classes))
        ax.set_xticks(ticks)
        ax.set_xticklabels(classes)
        ax.set_yticks(ticks)
        ax.set_yticklabels(classes)

        # Annotate the confusion matrix; white text on dark cells.
        thresh = cm.max() / 2
        for i in range(cm.shape[0]):
            for j in range(cm.shape[1]):
                ax.text(j, i, format(cm[i, j], 'd'),
                        ha="center", va="center",
                        color="white" if cm[i, j] > thresh else "black")

        fig.tight_layout()
        fig.savefig(image_path)
    finally:
        # pyplot keeps every figure alive until it is closed explicitly.
        plt.close(fig)

    # Copy the pixels so the file handle is released before returning.
    with Image.open(image_path) as saved:
        return saved.copy()
def identical_train_test_split(output_emb, output_raw, labels, percentage_idx):
    """Split embeddings, raw data and labels with one shared random permutation.

    The first ``percentage_values[percentage_idx]`` percent of the shuffled
    indices (rounded down) form the training set; the remainder is the test
    set. The same indices are used for all three inputs.

    Returns:
        ``(train_emb, test_emb, train_raw, test_raw, train_labels, test_labels)``.
    """
    sample_count = output_emb.shape[0]
    # One permutation shared by every input keeps the samples aligned.
    shuffled = torch.randperm(sample_count)

    cut = int(sample_count * percentage_values[percentage_idx]/100)
    print(f'Training Size: {cut}')
    train_part = shuffled[:cut]
    test_part = shuffled[cut:]

    train_emb, test_emb = output_emb[train_part], output_emb[test_part]
    train_raw, test_raw = output_raw[train_part], output_raw[test_part]
    train_labels, test_labels = labels[train_part], labels[test_part]
    return train_emb, test_emb, train_raw, test_raw, train_labels, test_labels
# Store the original working directory when the app starts
original_dir = os.getcwd()
def process_hdf5_file(uploaded_file, percentage_idx):
    """Classify the samples of an uploaded HDF5 file with raw and LWM features.

    The model repository is cloned on first use, its ``lwm_model``,
    ``input_preprocess`` and ``inference`` modules are loaded from the clone,
    and the file's ``channels``/``labels`` datasets are classified with a
    nearest-centroid rule on both raw channels and model embeddings.

    While running, ``sys.stdout`` is redirected and the working directory is
    switched to the clone; both are restored before returning.

    Args:
        uploaded_file: Uploaded file object exposing ``.name``, or a path.
        percentage_idx: Index into ``percentage_values`` (training share).

    Returns:
        ``(raw_cm_image, emb_cm_image, console_output)``. On failure both
        images are ``None`` and the error is appended to ``console_output``.
    """
    capture = PrintCapture()
    previous_stdout = sys.stdout
    sys.stdout = capture  # Redirect print statements to capture
    try:
        model_repo_url = "https://huggingface.co/sadjadalikhani/LWM"
        model_repo_dir = "./LWM"
        # Resolve against the start-up directory so the check, the clone and
        # the chdir all refer to the same location.
        repo_work_dir = os.path.join(original_dir, model_repo_dir)

        # Step 1: Clone the repository if not already done
        if not os.path.exists(repo_work_dir):
            print(f"Cloning model repository from {model_repo_url}...")
            subprocess.run(["git", "clone", model_repo_url, repo_work_dir], check=True)

        # Step 2: Verify the repository was cloned and change the working directory
        if not os.path.exists(repo_work_dir):
            print(f"Directory {repo_work_dir} does not exist.")
            return None, None, capture.get_output()
        os.chdir(repo_work_dir)
        print(f"Changed working directory to {os.getcwd()}")
        print(f"Directory content: {os.listdir(os.getcwd())}")  # Debugging: Check repo content

        # Step 3: Dynamically load lwm_model.py, input_preprocess.py, and inference.py
        lwm_model = load_module_from_path("lwm_model", os.path.join(os.getcwd(), 'lwm_model.py'))
        input_preprocess = load_module_from_path("input_preprocess", os.path.join(os.getcwd(), 'input_preprocess.py'))
        inference = load_module_from_path("inference", os.path.join(os.getcwd(), 'inference.py'))

        # Step 4: Load the model from lwm_model module
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        print(f"Loading the LWM model on {device}...")
        model = lwm_model.LWM.from_pretrained(device=device).to(torch.float32)

        # Step 5: Load the HDF5 file and extract the channels and labels
        # The upload may arrive as an object with .name or as a plain path.
        file_path = getattr(uploaded_file, "name", uploaded_file)
        with h5py.File(file_path, 'r') as f:
            channels = np.array(f['channels'])  # Assuming 'channels' dataset in the HDF5 file
            labels = np.array(f['labels'])  # Assuming 'labels' dataset in the HDF5 file
        print(f"Loaded dataset with {channels.shape[0]} samples.")

        # Step 6: Tokenize the data using the tokenizer from input_preprocess
        preprocessed_chs = input_preprocess.tokenizer(manual_data=channels)

        # Step 7: Perform inference using the functions from inference.py
        output_emb = inference.lwm_inference(preprocessed_chs, 'cls_emb', model)
        output_raw = inference.create_raw_dataset(preprocessed_chs, device)
        print(f"Output Embeddings Shape: {output_emb.shape}")
        print(f"Output Raw Shape: {output_raw.shape}")
        print(f'percentage_idx: {percentage_idx}')
        print(f'percentage_value: {percentage_values[percentage_idx]}')

        # Flatten each sample to one row and split both feature sets with
        # the same indices so the two classifiers see identical samples.
        (train_data_emb, test_data_emb,
         train_data_raw, test_data_raw,
         train_labels, test_labels) = identical_train_test_split(
            output_emb.view(len(output_emb), -1),
            output_raw.view(len(output_raw), -1),
            labels,
            percentage_idx)

        # Step 8: Perform classification using the Euclidean distance for both raw and embeddings
        print(f'train_data_emb: {train_data_emb.shape}')
        print(f'train_labels: {train_labels.shape}')
        print(f'test_data_emb: {test_data_emb.shape}')
        pred_raw = classify_based_on_distance(train_data_raw, train_labels, test_data_raw)
        pred_emb = classify_based_on_distance(train_data_emb, train_labels, test_data_emb)

        # Step 9: Generate confusion matrices for both raw and embeddings
        raw_cm_image = plot_confusion_matrix(test_labels, pred_raw, title="Confusion Matrix (Raw Channels)")
        emb_cm_image = plot_confusion_matrix(test_labels, pred_emb, title="Confusion Matrix (Embeddings)")
        return raw_cm_image, emb_cm_image, capture.get_output()
    except Exception as e:
        # UI boundary: report the failure in the console text instead of
        # placing an error string where an image is expected.
        print(f"Error: {e}")
        return None, None, capture.get_output()
    finally:
        # Always return to the original working directory after processing
        os.chdir(original_dir)
        # Restore whatever stream was active before, not just the default.
        sys.stdout = previous_stdout
# Define the Gradio interface: one tab per task, wired to the handlers above.
with gr.Blocks(css="""
.slider-container {
display: inline-block;
margin-right: 50px;
text-align: center;
}
""") as demo:
    # Tab for Beam Prediction Task
    with gr.Tab("Beam Prediction Task"):
        gr.Markdown("### Beam Prediction Task")
        with gr.Row():
            with gr.Column():
                data_percentage_slider = gr.Slider(label="Data Percentage for Training", minimum=10, maximum=100, step=10, value=10)
                # NOTE: a slider has no `choices` option, so that argument was
                # dropped; the intended values were 16, 32, 64, 128, 256.
                task_complexity_slider = gr.Slider(label="Task Complexity (Number of Beams)", minimum=16, maximum=256, value=16)
        with gr.Row():
            raw_img_bp = gr.Image(label="Raw Channels", type="pil", width=300, height=300)
            embeddings_img_bp = gr.Image(label="Embeddings", type="pil", width=300, height=300)
        # Update the confusion matrices whenever sliders change
        data_percentage_slider.change(fn=beam_prediction_task, inputs=[data_percentage_slider, task_complexity_slider], outputs=[raw_img_bp, embeddings_img_bp])
        task_complexity_slider.change(fn=beam_prediction_task, inputs=[data_percentage_slider, task_complexity_slider], outputs=[raw_img_bp, embeddings_img_bp])
    # Separate Tab for LoS/NLoS Classification Task
    with gr.Tab("LoS/NLoS Classification Task"):
        gr.Markdown("### LoS/NLoS Classification Task")
        file_input = gr.File(label="Upload HDF5 Dataset", file_types=[".h5"])
        with gr.Row():
            # Values are indices into percentage_values (0 -> 10%, ..., 8 -> 90%).
            percentage_dropdown_los = gr.Dropdown(choices=[0, 1, 2, 3, 4, 5, 6, 7, 8], value=0, label="Percentage of Data for Training")
        with gr.Row():
            raw_img_los = gr.Image(label="Raw Channels", type="pil", width=300, height=300)
            embeddings_img_los = gr.Image(label="Embeddings", type="pil", width=300, height=300)
        output_textbox = gr.Textbox(label="Console Output", lines=10)
        # Re-run the classification when either the file or the training
        # percentage changes, so the dropdown takes effect without a re-upload.
        file_input.change(fn=los_nlos_classification, inputs=[file_input, percentage_dropdown_los], outputs=[raw_img_los, embeddings_img_los, output_textbox])
        percentage_dropdown_los.change(fn=los_nlos_classification, inputs=[file_input, percentage_dropdown_los], outputs=[raw_img_los, embeddings_img_los, output_textbox])
# Launch the app
if __name__ == "__main__":
    demo.launch()
|