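# Web file-viewer header captured together with the source:
#   Author: Sadjad Alikhani
#   Commit: c6aa746 (verified) - "Update app.py"
#   Page links: raw, history, blame
#   File size: 15.4 kB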
import io
import os
import pickle
import subprocess
import sys

import gradio as gr
import h5py
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from PIL import Image
from sklearn.metrics import confusion_matrix
# Folders that hold the predefined result images (raw channels / embeddings)
RAW_PATH, EMBEDDINGS_PATH = (
    os.path.join("images", subfolder) for subfolder in ("raw", "embeddings")
)
# Selectable training-set sizes in percent: 10, 20, ..., 90
percentage_values = np.arange(10, 100, 10)
def _render_average_confusion_matrix(cm, title, save_path):
    """Draw the confusion matrix *cm* as a heat map, save it and return it.

    The figure is written to *save_path* and read back as a PIL image that is
    fully loaded in memory, so no file handle stays open afterwards.
    """
    fig, ax = plt.subplots(figsize=(5, 5))
    try:
        heatmap = ax.imshow(cm, cmap='Blues')
        ax.set_title(title)
        ax.set_xlabel('Predicted')
        ax.set_ylabel('Actual')
        fig.colorbar(heatmap, ax=ax)
        fig.tight_layout()
        fig.savefig(save_path)
    finally:
        # Always release the figure; pyplot keeps every open figure alive.
        plt.close(fig)
    with Image.open(save_path) as saved:
        return saved.copy()


def _confusion_matrix_image(folder, image_name, title):
    """Return the averaged confusion-matrix image for *folder*, or None.

    None is returned when *folder* does not exist or when
    compute_average_confusion_matrix finds nothing to average in it.
    """
    if not os.path.isdir(folder):
        return None
    cm = compute_average_confusion_matrix(folder)
    if cm is None:
        return None
    return _render_average_confusion_matrix(cm, title, os.path.join(folder, image_name))


def beam_prediction_task(data_percentage, task_complexity):
    """Build the raw and embeddings confusion-matrix images for one setting.

    Results are looked up in ``images/raw_<fraction>_<beams>`` and
    ``images/embeddings_<fraction>_<beams>``, where ``<fraction>`` is
    ``data_percentage / 100`` with one decimal and ``<beams>`` is
    ``task_complexity``.

    Args:
        data_percentage: Share of the data used for training, in percent.
        task_complexity: Number of beams.

    Returns:
        ``(raw_img, embeddings_img)``; each entry is a PIL image, or None when
        no results are available for that input type.
    """
    # UI widgets may hand over 16.0 instead of 16; the folder names use the
    # integer form, so normalise integral floats before formatting.
    if isinstance(task_complexity, float) and task_complexity.is_integer():
        task_complexity = int(task_complexity)

    fraction = f"{data_percentage / 100:.1f}"
    setting = f"({data_percentage}% data, {task_complexity} beams)"

    raw_img = _confusion_matrix_image(
        f"images/raw_{fraction}_{task_complexity}",
        "confusion_matrix_raw.png",
        f"Raw Confusion Matrix {setting}",
    )
    embeddings_img = _confusion_matrix_image(
        f"images/embeddings_{fraction}_{task_complexity}",
        "confusion_matrix_embeddings.png",
        f"Embeddings Confusion Matrix {setting}",
    )
    return raw_img, embeddings_img
# Function to compute the average confusion matrix across CSV files in a folder
def compute_average_confusion_matrix(folder):
    """Average the confusion matrices of all ``.csv`` files in *folder*.

    Every CSV must provide a ``Target`` and a ``Top-1 Prediction`` column.
    All files are evaluated against the same label range
    ``0 .. max(Target)`` (maximum taken over every file), so the per-file
    matrices share one shape and can be averaged element-wise.

    Args:
        folder: Directory that contains the result CSV files.

    Returns:
        The element-wise mean confusion matrix as a float array, or None when
        *folder* does not exist or holds no non-empty CSV file.
    """
    if not os.path.isdir(folder):
        return None

    results = []
    for file_name in sorted(os.listdir(folder)):
        if not file_name.endswith(".csv"):
            continue
        data = pd.read_csv(os.path.join(folder, file_name))
        if data.empty:
            # Nothing to count; an empty file would also break max() below.
            continue
        results.append((data["Target"].to_numpy(), data["Top-1 Prediction"].to_numpy()))

    if not results:
        return None

    # One shared label range keeps every matrix the same shape, even when a
    # file happens not to contain every class.
    num_labels = int(max(y_true.max() for y_true, _ in results)) + 1
    labels = np.arange(num_labels)
    confusion_matrices = [
        confusion_matrix(y_true, y_pred, labels=labels) for y_true, y_pred in results
    ]
    return np.mean(confusion_matrices, axis=0)
# Text stream used as a stand-in for sys.stdout to collect printed output
class PrintCapture(io.StringIO):
    """StringIO that additionally records each written chunk in ``output``."""

    def __init__(self):
        io.StringIO.__init__(self)
        # Chunks in the order they were written
        self.output = list()

    def write(self, txt):
        """Record *txt*, then forward it to the underlying StringIO buffer."""
        self.output += [txt]
        io.StringIO.write(self, txt)

    def get_output(self):
        """Return all recorded chunks concatenated into a single string."""
        collected = ""
        return collected.join(self.output)
# Look up the predefined raw/embeddings images for the selected percentage
def display_predefined_images(percentage_idx):
    """Return the predefined ``(raw, embeddings)`` images for one percentage.

    *percentage_idx* indexes ``percentage_values``. Both images share the file
    name ``percentage_<value>_complexity_16.png`` and are read from RAW_PATH
    and EMBEDDINGS_PATH. A missing file is replaced by a random image from
    create_random_image.
    """
    file_name = f"percentage_{percentage_values[percentage_idx]}_complexity_16.png"
    loaded = []
    for base_dir in (RAW_PATH, EMBEDDINGS_PATH):
        candidate = os.path.join(base_dir, file_name)
        if not os.path.exists(candidate):
            loaded.append(create_random_image())  # fallback random image
            continue
        loaded.append(Image.open(candidate))
    raw_image, embeddings_image = loaded
    return raw_image, embeddings_image
# Entry point of the LoS/NLoS tab: run the pipeline or show predefined images
def los_nlos_classification(file, percentage_idx):
    """Return ``(raw image, embeddings image, console text)`` for the tab.

    Without an uploaded *file* the predefined images for *percentage_idx* are
    shown together with a fixed notice; otherwise the three results of
    process_hdf5_file are passed through.
    """
    if file is None:
        fallback_raw, fallback_emb = display_predefined_images(percentage_idx)
        return fallback_raw, fallback_emb, "No file uploaded. Displaying predefined images."

    raw_cm_image, emb_cm_image, console_output = process_hdf5_file(file, percentage_idx)
    return raw_cm_image, emb_cm_image, console_output
# Placeholder image used when a predefined result image is missing
def create_random_image(size=(300, 300)):
    """Return an RGB PIL image of uniform random noise.

    *size* gives the first two array dimensions; a third dimension of 3
    colour channels is appended.
    """
    noise = np.random.rand(*size, 3)
    pixels = (noise * 255).astype('uint8')
    return Image.fromarray(pixels)
# Build the LWM model from the importable lwm_model module
def load_custom_model():
    """Instantiate ``lwm_model.LWM`` with default arguments in eval mode.

    The import happens inside the function, so ``lwm_model`` only has to be
    importable at call time.
    """
    from lwm_model import LWM  # deferred: module is not available at import time

    network = LWM()  # adjust here if the model needs constructor arguments
    network.eval()
    return network
import importlib.util
# Function to dynamically load a Python module from a given file path
def load_module_from_path(module_name, file_path):
    """Load and execute the Python source at *file_path* as *module_name*.

    The module is not registered in ``sys.modules``; the freshly executed
    module object is simply returned.

    Args:
        module_name: Name given to the loaded module.
        file_path: Location of the module's source file.

    Returns:
        The executed module object.

    Raises:
        ImportError: If no import spec or loader can be derived from
            *file_path* (for example an unsupported file extension).
    """
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    # spec_from_file_location returns None when it cannot pick a loader;
    # fail with a clear message instead of an AttributeError further down.
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load module {module_name!r} from {file_path!r}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
# Randomly split channels/labels into train and test parts
def split_dataset(channels, labels, percentage_idx):
    """Shuffle the samples and split them by the selected training percentage.

    ``percentage_values[percentage_idx]`` percent of the samples (rounded
    down) form the training part, the rest the test part. The same shuffled
    indices are applied to *channels* and *labels*.

    Returns:
        ``(train_data, test_data, train_labels, test_labels)``.
    """
    train_fraction = percentage_values[percentage_idx] / 100
    total = channels.shape[0]
    train_size = int(total * train_fraction)
    print(f'Number of Training Samples: {train_size}')

    order = np.arange(total)
    np.random.shuffle(order)
    train_idx = order[:train_size]
    test_idx = order[train_size:]

    return channels[train_idx], channels[test_idx], labels[train_idx], labels[test_idx]
# Euclidean distance between a point and a centroid
def euclidean_distance(x, centroid):
    """Return the norm of ``x - centroid`` as computed by ``np.linalg.norm``."""
    offset = x - centroid
    return np.linalg.norm(offset)
import torch
def _distance_to_centroid(points, centroid):
    """Return the Euclidean distance of every row of *points* to *centroid*."""
    diff = points - centroid
    if diff.dim() < 2:
        # Scalar features: the norm of a single number is its magnitude.
        return diff.abs()
    return torch.linalg.vector_norm(diff, dim=tuple(range(1, diff.dim())))


def classify_based_on_distance(train_data, train_labels, test_data):
    """Nearest-centroid classification for the two classes 0 and 1.

    The centroid of each class is the mean of the training samples carrying
    that label. A test sample is assigned class 0 only when it is strictly
    closer to the class-0 centroid; ties, and comparisons involving NaN
    (e.g. a class without training samples), yield class 1.

    All distances are computed in one vectorised torch operation on the
    device of *train_data*, so the function also works for non-CPU tensors.

    Args:
        train_data: Training samples, one per row.
        train_labels: Labels of the training samples (tensor or array);
            flattened to one dimension before use.
        test_data: Samples to classify, one per row.

    Returns:
        A CPU ``torch.int64`` tensor with one prediction per test sample.
    """
    train_data = torch.as_tensor(train_data)
    test_data = torch.as_tensor(test_data, device=train_data.device)
    train_labels = torch.as_tensor(train_labels, device=train_data.device).reshape(-1)

    # Compute the centroids for the two classes
    centroid_0 = train_data[train_labels == 0].mean(dim=0)
    centroid_1 = train_data[train_labels == 1].mean(dim=0)

    dist_0 = _distance_to_centroid(test_data, centroid_0)
    dist_1 = _distance_to_centroid(test_data, centroid_1)

    # Class 0 only on a strict win, mirroring "0 if dist_0 < dist_1 else 1".
    return (~(dist_0 < dist_1)).to(torch.int64).cpu()
# Function to generate confusion matrix plot
def plot_confusion_matrix(y_true, y_pred, title):
    """Plot the confusion matrix of *y_pred* against *y_true*.

    The plot is saved as ``"<title>.png"`` in the current working directory
    and returned as a PIL image that is fully loaded in memory.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        title: Plot title; also used as the stem of the output file name.

    Returns:
        The rendered confusion matrix as a PIL image.
    """
    # Classes are the sorted union of observed labels, so the axes are
    # labelled correctly for any number of classes, not only for {0, 1}.
    class_labels = np.unique(
        np.concatenate((np.asarray(y_true).ravel(), np.asarray(y_pred).ravel()))
    )
    cm = confusion_matrix(y_true, y_pred, labels=class_labels)

    image_path = f"{title}.png"
    fig, ax = plt.subplots(figsize=(5, 5))
    try:
        heatmap = ax.imshow(cm, cmap='Blues')
        ax.set_title(title)
        ax.set_xlabel('Predicted')
        ax.set_ylabel('Actual')
        fig.colorbar(heatmap, ax=ax)

        # One tick per class, labelled with the class value
        tick_positions = np.arange(len(class_labels))
        ax.set_xticks(tick_positions)
        ax.set_xticklabels(class_labels)
        ax.set_yticks(tick_positions)
        ax.set_yticklabels(class_labels)

        # Annotate each cell; white text on dark cells, black on light ones
        thresh = cm.max() / 2
        for (row, col), count in np.ndenumerate(cm):
            ax.text(col, row, format(count, 'd'),
                    ha="center", va="center",
                    color="white" if count > thresh else "black")

        fig.tight_layout()
        fig.savefig(image_path)
    finally:
        # pyplot keeps every figure alive until closed; without this each
        # call would leak one figure.
        plt.close(fig)

    # Copy into memory so no handle to the PNG stays open.
    with Image.open(image_path) as saved:
        return saved.copy()
def identical_train_test_split(output_emb, output_raw, labels, percentage_idx):
    """Split embeddings, raw data and labels with one shared random permutation.

    ``percentage_values[percentage_idx]`` percent of the samples (rounded
    down) go to the training part, the remainder to the test part. Using the
    same permutation for all three inputs keeps corresponding samples aligned.

    Returns:
        ``(train_emb, test_emb, train_raw, test_raw, train_labels, test_labels)``.
    """
    sample_count = output_emb.shape[0]
    shuffled = torch.randperm(sample_count)

    # Number of training samples for the selected percentage
    cut = int(sample_count * percentage_values[percentage_idx]/100)
    print(f'Training Size: {cut}')

    train_sel, test_sel = shuffled[:cut], shuffled[cut:]

    # Apply the identical selection to every input, train part first
    parts = []
    for source in (output_emb, output_raw, labels):
        parts.append(source[train_sel])
        parts.append(source[test_sel])
    train_emb, test_emb, train_raw, test_raw, train_labels, test_labels = parts
    return train_emb, test_emb, train_raw, test_raw, train_labels, test_labels
# Store the original working directory when the app starts
original_dir = os.getcwd()


def process_hdf5_file(uploaded_file, percentage_idx):
    """Run the LoS/NLoS comparison on an uploaded HDF5 dataset.

    Steps: clone the LWM repository if ``./LWM`` is missing, load its
    ``lwm_model``, ``input_preprocess`` and ``inference`` modules, read the
    ``channels`` and ``labels`` datasets from the file, compute embeddings and
    the raw representation, split both identically, classify each with
    classify_based_on_distance and plot the two confusion matrices.

    While running, ``sys.stdout`` is redirected so printed output can be
    returned, and the process working directory is switched to the cloned
    repository. Both are restored on exit. NOTE(review): these are
    process-wide changes, so concurrent calls would interfere with each other.

    Args:
        uploaded_file: Path of the HDF5 file, or an object whose ``name``
            attribute holds that path.
        percentage_idx: Index into ``percentage_values`` selecting the
            training share.

    Returns:
        ``(raw_cm_image, emb_cm_image, console_output)``. On any failure both
        images are None and the error message is appended to the console
        output, so callers can always unpack three values.
    """
    capture = PrintCapture()
    previous_stdout = sys.stdout  # restore exactly what was active before
    sys.stdout = capture  # Redirect print statements to capture
    try:
        model_repo_url = "https://huggingface.co/sadjadalikhani/LWM"
        model_repo_dir = "./LWM"

        # Step 1: Clone the repository if not already done
        if not os.path.exists(model_repo_dir):
            print(f"Cloning model repository from {model_repo_url}...")
            subprocess.run(["git", "clone", model_repo_url, model_repo_dir], check=True)

        # Step 2: Verify the repository was cloned and change the working directory
        repo_work_dir = os.path.join(original_dir, model_repo_dir)
        if not os.path.exists(repo_work_dir):
            print(f"Directory {repo_work_dir} does not exist.")
            return None, None, capture.get_output()
        os.chdir(repo_work_dir)
        print(f"Changed working directory to {os.getcwd()}")
        print(f"Directory content: {os.listdir(os.getcwd())}")  # Debugging: Check repo content

        # Step 3: Dynamically load lwm_model.py, input_preprocess.py, and inference.py
        lwm_model_path = os.path.join(os.getcwd(), 'lwm_model.py')
        input_preprocess_path = os.path.join(os.getcwd(), 'input_preprocess.py')
        inference_path = os.path.join(os.getcwd(), 'inference.py')
        lwm_model = load_module_from_path("lwm_model", lwm_model_path)
        input_preprocess = load_module_from_path("input_preprocess", input_preprocess_path)
        inference = load_module_from_path("inference", inference_path)

        # Step 4: Load the model from lwm_model module
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        print(f"Loading the LWM model on {device}...")
        model = lwm_model.LWM.from_pretrained(device=device).to(torch.float32)

        # Step 5: Load the HDF5 file and extract the channels and labels
        # Accept both a plain path and a file object exposing ``.name``.
        dataset_path = getattr(uploaded_file, "name", uploaded_file)
        with h5py.File(dataset_path, 'r') as f:
            channels = np.array(f['channels'])  # Assuming 'channels' dataset in the HDF5 file
            labels = np.array(f['labels'])  # Assuming 'labels' dataset in the HDF5 file
        print(f"Loaded dataset with {channels.shape[0]} samples.")

        # Step 6: Tokenize the data using the tokenizer from input_preprocess
        preprocessed_chs = input_preprocess.tokenizer(manual_data=channels)

        # Step 7: Perform inference using the functions from inference.py
        output_emb = inference.lwm_inference(preprocessed_chs, 'cls_emb', model)
        output_raw = inference.create_raw_dataset(preprocessed_chs, device)
        print(f"Output Embeddings Shape: {output_emb.shape}")
        print(f"Output Raw Shape: {output_raw.shape}")
        print(f'percentage_idx: {percentage_idx}')
        print(f'percentage_value: {percentage_values[percentage_idx]}')

        # Flatten every sample to one feature vector before splitting
        (train_data_emb, test_data_emb,
         train_data_raw, test_data_raw,
         train_labels, test_labels) = identical_train_test_split(
            output_emb.view(len(output_emb), -1),
            output_raw.view(len(output_raw), -1),
            labels,
            percentage_idx,
        )

        # Step 8: Perform classification using the Euclidean distance for both raw and embeddings
        print(f'train_data_emb: {train_data_emb.shape}')
        print(f'train_labels: {train_labels.shape}')
        print(f'test_data_emb: {test_data_emb.shape}')
        pred_raw = classify_based_on_distance(train_data_raw, train_labels, test_data_raw)
        pred_emb = classify_based_on_distance(train_data_emb, train_labels, test_data_emb)

        # Step 9: Generate confusion matrices for both raw and embeddings
        raw_cm_image = plot_confusion_matrix(test_labels, pred_raw, title="Confusion Matrix (Raw Channels)")
        emb_cm_image = plot_confusion_matrix(test_labels, pred_emb, title="Confusion Matrix (Embeddings)")
        return raw_cm_image, emb_cm_image, capture.get_output()
    except Exception as e:
        # Top-level boundary for the UI callback: report the failure in the
        # console output instead of handing an error string to image outputs.
        print(f"Error: {e}")
        return None, None, capture.get_output()
    finally:
        # Always return to the original working directory after processing
        os.chdir(original_dir)
        sys.stdout = previous_stdout  # Reset print statements
# Define the Gradio interface
with gr.Blocks(css="""
.slider-container {
display: inline-block;
margin-right: 50px;
text-align: center;
}
""") as demo:
    # Tab for Beam Prediction Task
    with gr.Tab("Beam Prediction Task"):
        gr.Markdown("### Beam Prediction Task")
        with gr.Row():
            with gr.Column():
                data_percentage_slider = gr.Slider(
                    label="Data Percentage for Training",
                    minimum=10, maximum=100, step=10, value=10,
                )
                # The beam count is one of a fixed set of values. gr.Slider
                # has no `choices` parameter, so a dropdown is used to
                # restrict the selection to exactly these options.
                task_complexity_dropdown = gr.Dropdown(
                    label="Task Complexity (Number of Beams)",
                    choices=[16, 32, 64, 128, 256], value=16,
                )
        with gr.Row():
            raw_img_bp = gr.Image(label="Raw Channels", type="pil", width=300, height=300)
            embeddings_img_bp = gr.Image(label="Embeddings", type="pil", width=300, height=300)
        # Update the confusion matrices whenever either control changes
        beam_inputs = [data_percentage_slider, task_complexity_dropdown]
        beam_outputs = [raw_img_bp, embeddings_img_bp]
        data_percentage_slider.change(fn=beam_prediction_task, inputs=beam_inputs, outputs=beam_outputs)
        task_complexity_dropdown.change(fn=beam_prediction_task, inputs=beam_inputs, outputs=beam_outputs)

    # Separate Tab for LoS/NLoS Classification Task
    with gr.Tab("LoS/NLoS Classification Task"):
        gr.Markdown("### LoS/NLoS Classification Task")
        file_input = gr.File(label="Upload HDF5 Dataset", file_types=[".h5"])
        with gr.Row():
            # NOTE(review): the choices are indices into percentage_values
            # (0 -> 10%, ..., 8 -> 90%), not percentages themselves.
            percentage_dropdown_los = gr.Dropdown(
                choices=[0, 1, 2, 3, 4, 5, 6, 7, 8], value=0,
                label="Percentage of Data for Training",
            )
        with gr.Row():
            raw_img_los = gr.Image(label="Raw Channels", type="pil", width=300, height=300)
            embeddings_img_los = gr.Image(label="Embeddings", type="pil", width=300, height=300)
        output_textbox = gr.Textbox(label="Console Output", lines=10)
        # Re-run the classification when the file OR the percentage changes;
        # otherwise a new percentage only takes effect after a re-upload.
        los_inputs = [file_input, percentage_dropdown_los]
        los_outputs = [raw_img_los, embeddings_img_los, output_textbox]
        file_input.change(fn=los_nlos_classification, inputs=los_inputs, outputs=los_outputs)
        percentage_dropdown_los.change(fn=los_nlos_classification, inputs=los_inputs, outputs=los_outputs)

# Launch the app
if __name__ == "__main__":
    demo.launch()