File size: 12,678 Bytes
8f8b054
 
 
 
cacf045
2a77201
 
340b448
 
d1b5811
 
 
8f8b054
 
 
 
 
6d6f3c6
469d918
8f8b054
2a77201
 
 
 
 
 
 
 
 
 
 
 
 
8f8b054
6d6f3c6
8f8b054
6d6f3c6
 
8f8b054
 
 
 
 
e14ecb7
6d6f3c6
 
 
 
 
 
 
 
 
 
 
 
 
 
a392854
 
 
 
 
 
 
d1b5811
 
 
 
 
469d918
 
d1b5811
 
 
 
469d918
d1b5811
 
 
 
 
6d6f3c6
 
 
 
 
 
469d918
6d6f3c6
 
 
469d918
 
 
6d6f3c6
 
 
469d918
 
6d6f3c6
469d918
d1b5811
 
 
469d918
d1b5811
 
 
 
 
 
 
 
 
 
 
6d6f3c6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
df7ce72
 
 
798b3c0
2a77201
469d918
d2d9264
0176215
2587718
 
469d918
 
2587718
6d6f3c6
2587718
469d918
6d6f3c6
df7ce72
 
6d6f3c6
 
 
469d918
 
 
6d6f3c6
 
a392854
 
 
 
6d6f3c6
d1b5811
6d6f3c6
 
d1b5811
6d6f3c6
 
d1b5811
a392854
6d6f3c6
06b54b2
6d6f3c6
a392854
 
6d6f3c6
d1b5811
6d6f3c6
 
 
a392854
6d6f3c6
d1b5811
469d918
6d6f3c6
bef8ac3
 
 
6d6f3c6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7e7ba0a
0176215
2a77201
 
 
6d6f3c6
df7ce72
6d6f3c6
8f8b054
469d918
d1b5811
8f8b054
d1b5811
8f8b054
d1b5811
8f8b054
6d6f3c6
8f8b054
6d6f3c6
 
 
 
 
 
8f8b054
6d6f3c6
 
8f8b054
 
 
 
 
bc8a6bd
 
 
 
 
 
 
 
8f8b054
bc8a6bd
8f8b054
 
 
 
469d918
8f8b054
 
469d918
6d6f3c6
469d918
6d6f3c6
 
 
 
 
8f8b054
 
 
469d918
d1b5811
469d918
8f8b054
 
469d918
6d6f3c6
469d918
6d6f3c6
 
 
 
469d918
d1b5811
 
8f8b054
 
 
6d6f3c6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
import gradio as gr
import os
from PIL import Image
import numpy as np
import pickle
import io
import sys
import torch
import subprocess
import h5py
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt

# Paths to the predefined images folder
RAW_PATH = os.path.join("images", "raw")
EMBEDDINGS_PATH = os.path.join("images", "embeddings")

# Specific values for percentage of data for training
percentage_values = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]

# Custom class to capture print output
class PrintCapture(io.StringIO):
    def __init__(self):
        super().__init__()
        self.output = []

    def write(self, txt):
        self.output.append(txt)
        super().write(txt)

    def get_output(self):
        return ''.join(self.output)

# Function to load and display predefined images based on user selection
def display_predefined_images(percentage_idx):
    percentage = percentage_values[percentage_idx]
    raw_image_path = os.path.join(RAW_PATH, f"percentage_{percentage}_complexity_16.png")  # Assume complexity 16 for simplicity
    embeddings_image_path = os.path.join(EMBEDDINGS_PATH, f"percentage_{percentage}_complexity_16.png")
    
    raw_image = Image.open(raw_image_path)
    embeddings_image = Image.open(embeddings_image_path)
    
    return raw_image, embeddings_image

# Function to create random images for LoS/NLoS classification results
def create_random_image(size=(300, 300)):
    random_image = np.random.rand(*size, 3) * 255
    return Image.fromarray(random_image.astype('uint8'))

# Function to load the pre-trained model from your cloned repository
def load_custom_model():
    from lwm_model import LWM  # Assuming the model is defined in lwm_model.py
    model = LWM()  # Modify this according to your model initialization
    model.eval()
    return model

import importlib.util

# Function to dynamically load a Python module from a given file path
def load_module_from_path(module_name, file_path):
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module

# Function to split dataset into training and test sets based on user selection
def split_dataset(channels, labels, percentage_idx):
    percentage = percentage_values[percentage_idx] / 100
    num_samples = channels.shape[0]
    train_size = int(num_samples * percentage)
    print(f'Number of Training Samples: {train_size}')
    
    indices = np.arange(num_samples)
    np.random.shuffle(indices)
    
    train_idx, test_idx = indices[:train_size], indices[train_size:]
    
    train_data, test_data = channels[train_idx], channels[test_idx]
    train_labels, test_labels = labels[train_idx], labels[test_idx]
    
    return train_data, test_data, train_labels, test_labels

# Function to calculate Euclidean distance between a point and a centroid
def euclidean_distance(x, centroid):
    return np.linalg.norm(x - centroid)

import torch

def classify_based_on_distance(train_data, train_labels, test_data):
    # Compute the centroids for the two classes
    centroid_0 = train_data[train_labels == 0].mean(dim=0)  # Use torch.mean
    centroid_1 = train_data[train_labels == 1].mean(dim=0)  # Use torch.mean
    
    predictions = []
    for test_point in test_data:
        # Compute Euclidean distance between the test point and each centroid
        dist_0 = euclidean_distance(test_point, centroid_0)
        dist_1 = euclidean_distance(test_point, centroid_1)
        predictions.append(0 if dist_0 < dist_1 else 1)
    
    return torch.tensor(predictions)  # Return predictions as a PyTorch tensor

# Function to generate confusion matrix plot
def plot_confusion_matrix(y_true, y_pred, title):
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(5, 5))
    plt.imshow(cm, cmap='Blues')
    plt.title(title)
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.colorbar()
    plt.xticks([0, 1], labels=[0, 1])
    plt.yticks([0, 1], labels=[0, 1])
    plt.tight_layout()
    plt.savefig(f"{title}.png")
    return Image.open(f"{title}.png")

def identical_train_test_split(output_emb, output_raw, labels, percentage):
    N = output_emb.shape[0]  # Get the total number of samples
    
    # Generate the indices for shuffling and splitting
    indices = torch.randperm(N)  # Randomly shuffle the indices
    
    # Calculate the split index
    split_index = int(N * percentage)
    
    # Split indices into train and test
    train_indices = indices[:split_index]  # First 80% for training
    test_indices = indices[split_index:]   # Remaining 20% for testing
    
    # Select the same indices from both output_emb and output_raw
    train_emb = output_emb[train_indices]
    test_emb = output_emb[test_indices]
    
    train_raw = output_raw[train_indices]
    test_raw = output_raw[test_indices]

    train_labels = labels[train_indices]
    test_labels = labels[test_indices]

    return train_emb, test_emb, train_raw, test_raw, train_labels, test_labels

# Store the original working directory when the app starts
original_dir = os.getcwd()

def process_hdf5_file(uploaded_file, percentage_idx):
    capture = PrintCapture()
    sys.stdout = capture  # Redirect print statements to capture
    
    try:
        model_repo_url = "https://huggingface.co/sadjadalikhani/LWM"
        model_repo_dir = "./LWM"

        # Step 1: Clone the repository if not already done
        if not os.path.exists(model_repo_dir):
            print(f"Cloning model repository from {model_repo_url}...")
            subprocess.run(["git", "clone", model_repo_url, model_repo_dir], check=True)

        # Step 2: Verify the repository was cloned and change the working directory
        repo_work_dir = os.path.join(original_dir, model_repo_dir)
        if os.path.exists(repo_work_dir):
            os.chdir(repo_work_dir)  # Change the working directory only once
            print(f"Changed working directory to {os.getcwd()}")
            print(f"Directory content: {os.listdir(os.getcwd())}")  # Debugging: Check repo content
        else:
            print(f"Directory {repo_work_dir} does not exist.")
            return
            
        # Step 3: Dynamically load lwm_model.py, input_preprocess.py, and inference.py
        lwm_model_path = os.path.join(os.getcwd(), 'lwm_model.py')
        input_preprocess_path = os.path.join(os.getcwd(), 'input_preprocess.py')
        inference_path = os.path.join(os.getcwd(), 'inference.py')

        # Load lwm_model
        lwm_model = load_module_from_path("lwm_model", lwm_model_path)

        # Load input_preprocess
        input_preprocess = load_module_from_path("input_preprocess", input_preprocess_path)

        # Load inference
        inference = load_module_from_path("inference", inference_path)

        # Step 4: Load the model from lwm_model module
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        print(f"Loading the LWM model on {device}...")
        model = lwm_model.LWM.from_pretrained(device=device)

        # Step 5: Load the HDF5 file and extract the channels and labels
        with h5py.File(uploaded_file.name, 'r') as f:
            channels = np.array(f['channels'])  # Assuming 'channels' dataset in the HDF5 file
            labels = np.array(f['labels'])  # Assuming 'labels' dataset in the HDF5 file
        print(f"Loaded dataset with {channels.shape[0]} samples.")

        # Step 7: Tokenize the data using the tokenizer from input_preprocess
        preprocessed_chs = input_preprocess.tokenizer(manual_data=channels)

        # Step 7: Perform inference using the functions from inference.py
        output_emb = inference.lwm_inference(preprocessed_chs, 'channel_emb', model)
        output_raw = inference.create_raw_dataset(preprocessed_chs, device)

        print(f"Output Embeddings Shape: {output_emb.shape}")
        print(f"Output Raw Shape: {output_raw.shape}")

        train_data_emb, test_data_emb, train_data_raw, test_data_raw, train_labels, test_labels = identical_train_test_split(output_emb.view(len(output_emb),-1),
                                                                                                                             output_raw.view(len(output_raw),-1),
                                                                                                                             labels,
                                                                                                                             percentage_idx)
        
        # Step 8: Perform classification using the Euclidean distance for both raw and embeddings
        pred_raw = classify_based_on_distance(train_data_raw, train_labels, test_data_raw)
        pred_emb = classify_based_on_distance(train_data_emb, train_labels, test_data_emb)

        # Step 9: Generate confusion matrices for both raw and embeddings
        raw_cm_image = plot_confusion_matrix(test_labels, pred_raw, title="Confusion Matrix (Raw Channels)")
        emb_cm_image = plot_confusion_matrix(test_labels, pred_emb, title="Confusion Matrix (Embeddings)")

        return raw_cm_image, emb_cm_image, capture.get_output()

    except Exception as e:
        return str(e), str(e), capture.get_output()

    finally:
        # Always return to the original working directory after processing
        os.chdir(original_dir)
        sys.stdout = sys.__stdout__  # Reset print statements

# Function to handle logic based on whether a file is uploaded or not
def los_nlos_classification(file, percentage_idx):
    if file is not None:
        return process_hdf5_file(file, percentage_idx)
    else:
        return display_predefined_images(percentage_idx), None

# Define the Gradio interface
with gr.Blocks(css="""
    .vertical-slider input[type=range] {
        writing-mode: bt-lr; /* IE */
        -webkit-appearance: slider-vertical; /* WebKit */
        width: 8px;
        height: 200px;
    }
    .slider-container {
        display: inline-block;
        margin-right: 50px;
        text-align: center;
    }
""") as demo:
    
    # Contact Section
    gr.Markdown("""
        <div style="text-align: center;">
            <a target="_blank" href="https://www.wi-lab.net">
                <img src="https://www.wi-lab.net/wp-content/uploads/2021/08/WI-name.png" alt="Wireless Model" style="height: 30px;">
            </a>
            <a target="_blank" href="mailto:alikhani@asu.edu" style="margin-left: 10px;">
                <img src="https://img.shields.io/badge/email-alikhani@asu.edu-blue.svg?logo=gmail" alt="Email">
            </a>
        </div>
    """)
    
    # Tabs for Beam Prediction and LoS/NLoS Classification
    with gr.Tab("Beam Prediction Task"):
        gr.Markdown("### Beam Prediction Task")
        
        with gr.Row():
            with gr.Column(elem_id="slider-container"):
                gr.Markdown("Percentage of Data for Training")
                percentage_slider_bp = gr.Slider(minimum=0, maximum=4, step=1, value=0, interactive=True, elem_id="vertical-slider")

        with gr.Row():
            raw_img_bp = gr.Image(label="Raw Channels", type="pil", width=300, height=300, interactive=False)
            embeddings_img_bp = gr.Image(label="Embeddings", type="pil", width=300, height=300, interactive=False)

        percentage_slider_bp.change(fn=display_predefined_images, inputs=[percentage_slider_bp], outputs=[raw_img_bp, embeddings_img_bp])

    with gr.Tab("LoS/NLoS Classification Task"):
        gr.Markdown("### LoS/NLoS Classification Task")
        
        file_input = gr.File(label="Upload HDF5 Dataset", file_types=[".h5"])

        with gr.Row():
            with gr.Column(elem_id="slider-container"):
                gr.Markdown("Percentage of Data for Training")
                percentage_slider_los = gr.Slider(minimum=0, maximum=4, step=1, value=0, interactive=True, elem_id="vertical-slider")

        with gr.Row():
            raw_img_los = gr.Image(label="Raw Channels", type="pil", width=300, height=300, interactive=False)
            embeddings_img_los = gr.Image(label="Embeddings", type="pil", width=300, height=300, interactive=False)
            output_textbox = gr.Textbox(label="Console Output", lines=10)

        file_input.change(fn=los_nlos_classification, inputs=[file_input, percentage_slider_los], outputs=[raw_img_los, embeddings_img_los, output_textbox])
        percentage_slider_los.change(fn=los_nlos_classification, inputs=[file_input, percentage_slider_los], outputs=[raw_img_los, embeddings_img_los, output_textbox])

# Launch the app
if __name__ == "__main__":
    demo.launch()