File size: 15,437 Bytes
8f8b054
 
 
 
cacf045
2a77201
 
340b448
 
d1b5811
 
 
8f8b054
 
 
 
 
6d6f3c6
4de2908
8f8b054
c6aa746
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2a77201
 
 
 
 
 
 
 
 
 
 
 
 
11dd9d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6d6f3c6
 
 
 
 
11dd9d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d1b5811
11dd9d5
 
 
 
 
469d918
11dd9d5
 
d1b5811
11dd9d5
469d918
11dd9d5
 
ef6f553
11dd9d5
d1b5811
11dd9d5
 
 
6d6f3c6
11dd9d5
6d6f3c6
469d918
11dd9d5
 
 
469d918
 
 
11dd9d5
 
 
469d918
 
11dd9d5
469d918
d1b5811
 
 
469d918
d1b5811
 
 
 
 
b837f81
 
d1b5811
 
b837f81
 
 
 
 
 
 
 
 
d1b5811
 
 
 
b837f81
11dd9d5
 
 
 
 
 
 
9e4473c
11dd9d5
 
 
 
 
 
 
 
 
 
 
 
ef6f553
11dd9d5
 
 
 
ef6f553
11dd9d5
 
b75d54e
11dd9d5
 
 
 
 
2587718
 
469d918
11dd9d5
2587718
6d6f3c6
2587718
469d918
11dd9d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ef6f553
11dd9d5
 
d1b5811
11dd9d5
 
d1b5811
a392854
11dd9d5
06b54b2
11dd9d5
ca2a3f1
a392854
11dd9d5
 
 
 
 
 
 
d1b5811
17eb0ae
11dd9d5
 
512ef82
bef8ac3
 
6d6f3c6
 
 
7d365eb
9e4473c
11dd9d5
 
 
 
6d6f3c6
11dd9d5
 
 
 
6d6f3c6
 
b837f81
 
11dd9d5
6d6f3c6
 
 
11dd9d5
 
 
 
 
 
 
 
 
8f8b054
6d6f3c6
8f8b054
 
6d6f3c6
 
8f8b054
 
 
11dd9d5
c6aa746
11dd9d5
 
 
 
c6aa746
 
 
11dd9d5
 
c6aa746
 
11dd9d5
c6aa746
 
 
11dd9d5
c6aa746
8f8b054
 
d1b5811
469d918
8f8b054
c6aa746
6d6f3c6
c6aa746
 
6d6f3c6
469d918
c6aa746
11dd9d5
8f8b054
 
 
c6aa746
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
import io
import os
import pickle
import subprocess
import sys

import gradio as gr
import h5py
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from PIL import Image
from sklearn.metrics import confusion_matrix

# Directories that hold the pre-rendered result images, one per input type
RAW_PATH, EMBEDDINGS_PATH = (
    os.path.join("images", kind) for kind in ("raw", "embeddings")
)

# Selectable training-set sizes in percent: 10, 20, ..., 90 (indexed 0..8)
percentage_values = np.arange(10, 100, 10)




def _render_average_confusion_matrix(avg_cm, title, save_path):
    """Draw an averaged confusion matrix, save it to *save_path*, return it as a PIL image."""
    fig, ax = plt.subplots(figsize=(5, 5))
    try:
        heatmap = ax.imshow(avg_cm, cmap='Blues')
        ax.set_title(title)
        ax.set_xlabel('Predicted')
        ax.set_ylabel('Actual')
        fig.colorbar(heatmap, ax=ax)
        fig.tight_layout()
        fig.savefig(save_path)
    finally:
        # Figures stay registered with pyplot until closed; release it even on failure.
        plt.close(fig)

    image = Image.open(save_path)
    image.load()  # read the pixels now so the image no longer depends on the open file
    return image


def _beam_confusion_image(folder, title, file_name):
    """Return the averaged confusion-matrix image for *folder*, or None if there is nothing to plot."""
    # No folder means no stored results for this slider combination.
    if not os.path.isdir(folder):
        return None

    avg_cm = compute_average_confusion_matrix(folder)
    if avg_cm is None:
        return None

    return _render_average_confusion_matrix(avg_cm, title, os.path.join(folder, file_name))


def beam_prediction_task(data_percentage, task_complexity):
    """Build the raw-channel and embedding confusion-matrix images for the beam prediction tab.

    Results are looked up in ``images/raw_<fraction>_<beams>`` and
    ``images/embeddings_<fraction>_<beams>``, where ``<fraction>`` is
    ``data_percentage / 100`` formatted with one decimal. The rendered PNGs are
    written back into those folders.

    Args:
        data_percentage: Training data percentage selected in the UI.
        task_complexity: Number of beams selected in the UI.

    Returns:
        A ``(raw_img, embeddings_img)`` tuple of PIL images. An entry is None
        when its folder is missing or contains no CSV results.
    """
    # Folder naming convention based on input type, data_percentage, and task_complexity
    raw_folder = f"images/raw_{data_percentage/100:.1f}_{task_complexity}"
    embeddings_folder = f"images/embeddings_{data_percentage/100:.1f}_{task_complexity}"

    # The averaged matrix is float-valued and has one row per beam, so it is
    # drawn by a dedicated helper rather than the two-class
    # plot_confusion_matrix(y_true, y_pred, title), which takes label vectors.
    raw_img = _beam_confusion_image(
        raw_folder,
        f"Raw Confusion Matrix ({data_percentage}% data, {task_complexity} beams)",
        "confusion_matrix_raw.png",
    )
    embeddings_img = _beam_confusion_image(
        embeddings_folder,
        f"Embeddings Confusion Matrix ({data_percentage}% data, {task_complexity} beams)",
        "confusion_matrix_embeddings.png",
    )

    return raw_img, embeddings_img


# Function to compute the average confusion matrix across CSV files in a folder
def compute_average_confusion_matrix(folder):
    """Average the confusion matrices of every CSV result file in *folder*.

    Each ``.csv`` file must provide a ``Target`` column (true labels) and a
    ``Top-1 Prediction`` column (predicted labels). One confusion matrix is
    computed per file and the element-wise mean is returned.

    Args:
        folder: Directory containing the CSV result files.

    Returns:
        The mean confusion matrix as a float array, or None when *folder* does
        not exist or holds no CSV files.
    """
    # Not every parameter combination has stored results; report "no data"
    # instead of letting os.listdir raise.
    if not os.path.isdir(folder):
        return None

    confusion_matrices = []
    for file in os.listdir(folder):
        if not file.endswith(".csv"):
            continue
        data = pd.read_csv(os.path.join(folder, file))
        y_true = data["Target"]
        y_pred = data["Top-1 Prediction"]
        # NOTE(review): the class count is inferred per file from the distinct
        # targets and labels are taken to be 0..num_labels-1. Files covering
        # different numbers of classes would produce differently sized
        # matrices - confirm this cannot happen for the stored results.
        num_labels = len(np.unique(y_true))
        confusion_matrices.append(
            confusion_matrix(y_true, y_pred, labels=np.arange(num_labels))
        )

    if not confusion_matrices:
        return None
    return np.mean(confusion_matrices, axis=0)





# Custom class to capture print output
class PrintCapture(io.StringIO):
    """In-memory text stream that also records every chunk written to it.

    Intended as a temporary replacement for ``sys.stdout`` so that printed
    output can be shown in the UI afterwards.
    """

    def __init__(self):
        super().__init__()
        # Chunks in the order they were written; get_output() joins them.
        self.output = []

    def write(self, txt):
        """Record *txt*, write it to the underlying buffer, and return the character count."""
        self.output.append(txt)
        # Text streams report how many characters were written; pass that through.
        return super().write(txt)

    def get_output(self):
        """Return everything written so far as one string."""
        return ''.join(self.output)

# Function to load and display predefined images based on user selection
def display_predefined_images(percentage_idx):
    """Return the pre-rendered (raw, embeddings) images for a training percentage.

    ``percentage_idx`` indexes ``percentage_values``. Both images share the
    file name ``percentage_<value>_complexity_16.png`` and are looked up under
    ``RAW_PATH`` and ``EMBEDDINGS_PATH``. A missing file is replaced by a
    random image.
    """
    file_name = f"percentage_{percentage_values[percentage_idx]}_complexity_16.png"

    pictures = []
    for directory in (RAW_PATH, EMBEDDINGS_PATH):
        candidate = os.path.join(directory, file_name)
        if os.path.exists(candidate):
            pictures.append(Image.open(candidate))
        else:
            # Nothing stored for this selection: fall back to random noise
            pictures.append(create_random_image())

    raw_image, embeddings_image = pictures
    return raw_image, embeddings_image

# Updated los_nlos_classification to handle missing outputs properly
def los_nlos_classification(file, percentage_idx):
    """Produce the (raw image, embeddings image, console text) triple for the LoS/NLoS tab.

    Without an upload the predefined images are shown together with a notice.
    Otherwise the uploaded HDF5 file is processed and its results returned.
    """
    if file is None:
        fallback_raw, fallback_emb = display_predefined_images(percentage_idx)
        return fallback_raw, fallback_emb, "No file uploaded. Displaying predefined images."

    raw_cm_image, emb_cm_image, console_output = process_hdf5_file(file, percentage_idx)
    return raw_cm_image, emb_cm_image, console_output

# Function to create random images for LoS/NLoS classification results
def create_random_image(size=(300, 300)):
    """Return a PIL image built from uniform random noise.

    The pixel array has shape ``(*size, 3)`` and is converted to ``uint8``
    after scaling the random values from [0, 1) to [0, 255).
    """
    noise = np.random.rand(*size, 3)
    pixels = (noise * 255).astype('uint8')
    return Image.fromarray(pixels)

# Function to load the pre-trained model from your cloned repository
def load_custom_model():
    """Instantiate ``LWM`` from the ``lwm_model`` module, call ``eval()`` on it, and return it."""
    # Imported inside the function so that importing this file does not
    # itself require lwm_model to be available.
    import lwm_model

    net = lwm_model.LWM()  # Adjust here if the model needs constructor arguments
    net.eval()
    return net

import importlib.util

# Function to dynamically load a Python module from a given file path
def load_module_from_path(module_name, file_path):
    """Load and execute the Python source file at *file_path* as module *module_name*.

    Args:
        module_name: Name to give the loaded module.
        file_path: Path of the source file to execute.

    Returns:
        The executed module object.

    Raises:
        ImportError: If no import spec or loader can be created for *file_path*
            (for example, an unsupported file extension).
    """
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    # spec_from_file_location returns None when it cannot pick a loader for the
    # path; fail with a clear ImportError instead of an AttributeError later.
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load module {module_name!r} from {file_path!r}")

    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module

# Function to split dataset into training and test sets based on user selection
def split_dataset(channels, labels, percentage_idx):
    """Randomly split channels and labels into training and test parts.

    ``percentage_values[percentage_idx]`` percent of the samples (rounded
    down) go to the training part, the rest to the test part. The training
    size is printed.

    Returns:
        ``(train_data, test_data, train_labels, test_labels)``.
    """
    fraction = percentage_values[percentage_idx] / 100
    total = channels.shape[0]
    train_size = int(total * fraction)
    print(f'Number of Training Samples: {train_size}')

    # Shuffle the sample positions once, then cut at the training size
    order = np.arange(total)
    np.random.shuffle(order)
    train_idx = order[:train_size]
    test_idx = order[train_size:]

    return channels[train_idx], channels[test_idx], labels[train_idx], labels[test_idx]

# Function to calculate Euclidean distance between a point and a centroid
def euclidean_distance(x, centroid):
    """Return ``np.linalg.norm`` of the difference between *x* and *centroid*."""
    offset = x - centroid
    return np.linalg.norm(offset)

import torch

def classify_based_on_distance(train_data, train_labels, test_data):
    """Label each test sample with the class (0 or 1) of its nearest training centroid.

    A centroid is the mean of the training rows carrying that label. Each test
    row gets the label of the centroid with the smaller Euclidean distance;
    ties go to class 1.

    Args:
        train_data: Training samples, one per row along dimension 0.
        train_labels: Label (0 or 1) of each training row; tensor or NumPy array.
        test_data: Test samples, one per row along dimension 0, at least 2-D.

    Returns:
        An int64 CPU tensor with one predicted label per test row.
    """
    train_data = torch.as_tensor(train_data)
    test_data = torch.as_tensor(test_data).to(train_data.device)
    labels = torch.as_tensor(train_labels).to(train_data.device)

    def _distances_to_centroid(class_id):
        # Distance from every test row to the centroid of one class.
        members = train_data[labels == class_id]
        if members.shape[0] == 0:
            # No training sample of this class: treat it as infinitely far away
            # so it can never win, instead of propagating a NaN centroid.
            return torch.full((test_data.shape[0],), float("inf"), device=test_data.device)
        centroid = members.mean(dim=0)
        return torch.linalg.norm((test_data - centroid).flatten(start_dim=1), dim=1)

    # Distances are computed in torch for all rows at once. Tensors that live
    # on a GPU or track gradients cannot be handed to NumPy.
    dist_0 = _distances_to_centroid(0)
    dist_1 = _distances_to_centroid(1)

    # Predict 0 only when class 0 is strictly closer; everything else is 1.
    return (~(dist_0 < dist_1)).to(torch.int64).cpu()

# Function to generate confusion matrix plot
def plot_confusion_matrix(y_true, y_pred, title):
    """Plot the confusion matrix of *y_pred* against *y_true* and return it as a PIL image.

    The figure is saved as ``"<title>.png"`` in the current working directory
    and then loaded back.

    Args:
        y_true: True labels.
        y_pred: Predicted labels.
        title: Plot title; also used as the PNG file name stem.

    Returns:
        The rendered plot as a fully loaded PIL image.
    """
    cm = confusion_matrix(y_true, y_pred)
    image_path = f"{title}.png"

    fig, ax = plt.subplots(figsize=(5, 5))
    try:
        heatmap = ax.imshow(cm, cmap='Blues')
        ax.set_title(title)
        ax.set_xlabel('Predicted')
        ax.set_ylabel('Actual')
        fig.colorbar(heatmap, ax=ax)

        # One tick per matrix row/column (0 and 1 in the two-class case)
        row_ticks = list(range(cm.shape[0]))
        col_ticks = list(range(cm.shape[1]))
        ax.set_xticks(col_ticks)
        ax.set_xticklabels(col_ticks)
        ax.set_yticks(row_ticks)
        ax.set_yticklabels(row_ticks)

        # Annotate each cell; use white text on dark cells for readability
        thresh = cm.max() / 2
        for i in row_ticks:
            for j in col_ticks:
                ax.text(j, i, format(cm[i, j], 'd'),
                        ha="center", va="center",
                        color="white" if cm[i, j] > thresh else "black")

        fig.tight_layout()
        fig.savefig(image_path)
    finally:
        # pyplot keeps every figure alive until it is closed; without this each
        # call would leak one figure.
        plt.close(fig)

    image = Image.open(image_path)
    image.load()  # read the pixels now so the image no longer depends on the open file
    return image


def identical_train_test_split(output_emb, output_raw, labels, percentage_idx):
    """Split embeddings, raw channels, and labels with one shared random permutation.

    The first ``percentage_values[percentage_idx]`` percent of the shuffled
    sample indices (rounded down) form the training part and the rest the test
    part, so all three inputs are split on the same samples. The training size
    is printed.

    Returns:
        ``(train_emb, test_emb, train_raw, test_raw, train_labels, test_labels)``.
    """
    total = output_emb.shape[0]

    # A single permutation drives every split below
    shuffled = torch.randperm(total)

    cut = int(total * percentage_values[percentage_idx]/100)
    print(f'Training Size: {cut}')

    train_indices, test_indices = shuffled[:cut], shuffled[cut:]

    def _take_both(source):
        # Training rows first, then test rows, from the same source
        return source[train_indices], source[test_indices]

    train_emb, test_emb = _take_both(output_emb)
    train_raw, test_raw = _take_both(output_raw)
    train_labels, test_labels = _take_both(labels)

    return train_emb, test_emb, train_raw, test_raw, train_labels, test_labels

# Store the original working directory when the app starts
original_dir = os.getcwd()


def process_hdf5_file(uploaded_file, percentage_idx):
    """Run LoS/NLoS classification on an uploaded HDF5 dataset.

    Clones the LWM model repository if it is not present, loads its
    ``lwm_model``, ``input_preprocess`` and ``inference`` modules, reads the
    ``channels`` and ``labels`` datasets from the upload, and classifies both
    the raw channels and the LWM embeddings by nearest class centroid. Output
    printed during the run is captured and returned.

    Args:
        uploaded_file: The uploaded file, either an object with a ``name``
            attribute holding the path or the path itself.
        percentage_idx: Index into ``percentage_values`` selecting the training share.

    Returns:
        ``(raw_cm_image, emb_cm_image, console_output)``. On any failure both
        images are None and the error message is appended to ``console_output``.
    """
    capture = PrintCapture()
    # Remember the current stream so exactly that one is restored afterwards,
    # even if stdout was already redirected by someone else.
    previous_stdout = sys.stdout
    sys.stdout = capture  # Redirect print statements to capture

    try:
        model_repo_url = "https://huggingface.co/sadjadalikhani/LWM"
        model_repo_dir = "./LWM"

        # Step 1: Clone the repository if not already done
        if not os.path.exists(model_repo_dir):
            print(f"Cloning model repository from {model_repo_url}...")
            subprocess.run(["git", "clone", model_repo_url, model_repo_dir], check=True)

        # Step 2: Verify the repository was cloned and change the working directory
        repo_work_dir = os.path.join(original_dir, model_repo_dir)
        if os.path.exists(repo_work_dir):
            os.chdir(repo_work_dir)  # Change the working directory only once
            print(f"Changed working directory to {os.getcwd()}")
            print(f"Directory content: {os.listdir(os.getcwd())}")  # Debugging: Check repo content
        else:
            print(f"Directory {repo_work_dir} does not exist.")
            # The caller unpacks three values, so keep that shape on early exit.
            return None, None, capture.get_output()

        # Step 3: Dynamically load lwm_model.py, input_preprocess.py, and inference.py
        lwm_model_path = os.path.join(os.getcwd(), 'lwm_model.py')
        input_preprocess_path = os.path.join(os.getcwd(), 'input_preprocess.py')
        inference_path = os.path.join(os.getcwd(), 'inference.py')

        lwm_model = load_module_from_path("lwm_model", lwm_model_path)
        input_preprocess = load_module_from_path("input_preprocess", input_preprocess_path)
        inference = load_module_from_path("inference", inference_path)

        # Step 4: Load the model from lwm_model module
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        print(f"Loading the LWM model on {device}...")
        model = lwm_model.LWM.from_pretrained(device=device).to(torch.float32)

        # Step 5: Load the HDF5 file and extract the channels and labels
        # Accept either an upload object exposing .name or a plain path.
        dataset_path = getattr(uploaded_file, "name", uploaded_file)
        with h5py.File(dataset_path, 'r') as f:
            channels = np.array(f['channels'])  # Assuming 'channels' dataset in the HDF5 file
            labels = np.array(f['labels'])  # Assuming 'labels' dataset in the HDF5 file
        print(f"Loaded dataset with {channels.shape[0]} samples.")

        # Step 6: Tokenize the data using the tokenizer from input_preprocess
        preprocessed_chs = input_preprocess.tokenizer(manual_data=channels)

        # Step 7: Perform inference using the functions from inference.py
        output_emb = inference.lwm_inference(preprocessed_chs, 'cls_emb', model)
        output_raw = inference.create_raw_dataset(preprocessed_chs, device)

        print(f"Output Embeddings Shape: {output_emb.shape}")
        print(f"Output Raw Shape: {output_raw.shape}")

        print(f'percentage_idx: {percentage_idx}')
        print(f'percentage_value: {percentage_values[percentage_idx]}')
        # Flatten each sample to one row so both representations are split and
        # compared as plain feature vectors.
        (train_data_emb, test_data_emb,
         train_data_raw, test_data_raw,
         train_labels, test_labels) = identical_train_test_split(
            output_emb.view(len(output_emb), -1),
            output_raw.view(len(output_raw), -1),
            labels,
            percentage_idx)

        # Step 8: Perform classification using the Euclidean distance for both raw and embeddings
        print(f'train_data_emb: {train_data_emb.shape}')
        print(f'train_labels: {train_labels.shape}')
        print(f'test_data_emb: {test_data_emb.shape}')
        pred_raw = classify_based_on_distance(train_data_raw, train_labels, test_data_raw)
        pred_emb = classify_based_on_distance(train_data_emb, train_labels, test_data_emb)

        # Step 9: Generate confusion matrices for both raw and embeddings
        raw_cm_image = plot_confusion_matrix(test_labels, pred_raw, title="Confusion Matrix (Raw Channels)")
        emb_cm_image = plot_confusion_matrix(test_labels, pred_emb, title="Confusion Matrix (Embeddings)")

        return raw_cm_image, emb_cm_image, capture.get_output()

    except Exception as e:
        # UI callback boundary: report the failure in the console output. The
        # image slots must hold images or None, never an error string.
        print(f"Error: {e}")
        return None, None, capture.get_output()

    finally:
        # Always return to the original working directory after processing
        os.chdir(original_dir)
        sys.stdout = previous_stdout  # Restore whatever stream was active before

# Define the Gradio interface: one tab per task, both showing a raw-channel
# result next to an embedding result.
with gr.Blocks(css="""
    .slider-container {
        display: inline-block;
        margin-right: 50px;
        text-align: center;
    }
""") as demo:
    
    # Tab for Beam Prediction Task
    with gr.Tab("Beam Prediction Task"):
        gr.Markdown("### Beam Prediction Task")
        
        with gr.Row():
            with gr.Column():
                data_percentage_slider = gr.Slider(label="Data Percentage for Training", minimum=10, maximum=100, step=10, value=10)
                # Only these five beam counts are valid. A slider takes no list
                # of choices, so a dropdown is used. The variable keeps its
                # original name so existing references still resolve.
                task_complexity_slider = gr.Dropdown(label="Task Complexity (Number of Beams)", choices=[16, 32, 64, 128, 256], value=16)

        with gr.Row():
            raw_img_bp = gr.Image(label="Raw Channels", type="pil", width=300, height=300)
            embeddings_img_bp = gr.Image(label="Embeddings", type="pil", width=300, height=300)

        # Update the confusion matrices whenever either control changes
        data_percentage_slider.change(fn=beam_prediction_task, inputs=[data_percentage_slider, task_complexity_slider], outputs=[raw_img_bp, embeddings_img_bp])
        task_complexity_slider.change(fn=beam_prediction_task, inputs=[data_percentage_slider, task_complexity_slider], outputs=[raw_img_bp, embeddings_img_bp])

    # Separate Tab for LoS/NLoS Classification Task
    with gr.Tab("LoS/NLoS Classification Task"):
        gr.Markdown("### LoS/NLoS Classification Task")
        file_input = gr.File(label="Upload HDF5 Dataset", file_types=[".h5"])

        with gr.Row():
            # The dropdown values are indices into percentage_values (0 -> 10%, ..., 8 -> 90%)
            percentage_dropdown_los = gr.Dropdown(choices=[0, 1, 2, 3, 4, 5, 6, 7, 8], value=0, label="Percentage of Data for Training")
        with gr.Row():
            raw_img_los = gr.Image(label="Raw Channels", type="pil", width=300, height=300)
            embeddings_img_los = gr.Image(label="Embeddings", type="pil", width=300, height=300)
            output_textbox = gr.Textbox(label="Console Output", lines=10)

        # Run the classification whenever a file is uploaded or cleared
        file_input.change(fn=los_nlos_classification, inputs=[file_input, percentage_dropdown_los], outputs=[raw_img_los, embeddings_img_los, output_textbox])

# Launch the app
if __name__ == "__main__":
    demo.launch()