iBrokeTheCode committed
Commit 43fe501 · 1 Parent(s): df5c96c

chore: Add test cases
tests/__init__.py ADDED
File without changes
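
Note for reviewers: the empty tests/__init__.py makes the tests directory a regular Python package, a common way to keep test module names unambiguous under pytest. With the files below in place, the suite can be run from the project root with the standard invocation:

    python -m pytest tests/ -v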
tests/test_classifiers_classic_ml.py ADDED
@@ -0,0 +1,106 @@
+ from unittest.mock import patch
+
+ import pytest
+ from sklearn.datasets import make_classification
+ from sklearn.decomposition import PCA
+ from sklearn.ensemble import RandomForestClassifier
+ from sklearn.linear_model import LogisticRegression
+ from sklearn.model_selection import train_test_split
+
+ from src.classifiers_classic_ml import train_and_evaluate_model, visualize_embeddings
+
+ ####################################################################################################
+ ################################### Test the Classical ML Models ###################################
+ ####################################################################################################
+
+
+ @pytest.fixture
+ def sample_embedding_data():
+     """
+     Fixture to create a mock dataset for testing dimensionality reduction and model training.
+     Returns:
+         X_train, X_test, y_train, y_test: Training and testing data along with labels.
+     """
+     # Create a synthetic dataset with 20 samples, 6 features, and 3 classes
+     X, y = make_classification(
+         n_samples=20, n_features=6, n_classes=3, random_state=42, n_informative=4
+     )
+
+     # Split the dataset into training and test sets (80% train, 20% test)
+     X_train, X_test, y_train, y_test = train_test_split(
+         X, y, test_size=0.2, random_state=42
+     )
+
+     return X_train, X_test, y_train, y_test
+
+
+ @pytest.mark.parametrize(
+     "method, plot_type",
+     [
+         ("PCA", "2D"),  # PCA reduction to 2D
+         ("PCA", "3D"),  # PCA reduction to 3D
+     ],
+ )
+ def test_visualize_embeddings(method, plot_type, sample_embedding_data):
+     """
+     Test the dimensionality reduction and embedding visualization.
+     This ensures that PCA can reduce embeddings correctly and produce visualizations.
+     """
+     X_train, X_test, y_train, y_test = sample_embedding_data
+
+     # Mock the plotly figures to avoid actual plotting in the test environment
+     with patch("plotly.graph_objs.Figure.show"):
+         # Test the visualize_embeddings function
+         model = visualize_embeddings(
+             X_train, X_test, y_train, y_test, plot_type=plot_type, method=method
+         )
+
+     # Check that the PCA model is an instance of the correct class and has the expected number of components
+     assert isinstance(model, PCA), "The model should be an instance of PCA"
+     if plot_type == "2D":
+         assert model.n_components_ == 2, "PCA should reduce data to 2 components"
+     elif plot_type == "3D":
+         assert model.n_components_ == 3, "PCA should reduce data to 3 components"
+
+
+ def test_train_and_evaluate_model(sample_embedding_data):
+     """
+     Test the training and evaluation of models (Logistic Regression, Random Forest).
+     Ensures that models are correctly trained and returned in the expected format.
+     """
+     X_train, X_test, y_train, y_test = sample_embedding_data
+
+     # Train and evaluate the models
+     trained_models = train_and_evaluate_model(
+         X_train, X_test, y_train, y_test, test=False
+     )
+
+     # Verify that trained_models is a list
+     assert isinstance(trained_models, list), (
+         "The output should be a list of trained models"
+     )
+
+     # Check that at least two models were trained (Logistic Regression, Random Forest)
+     assert len(trained_models) >= 2, "At least two models should be trained"
+
+     # Check that the models include Logistic Regression and Random Forest
+     models_instances = [model for _, model in trained_models]
+     assert any(isinstance(model, LogisticRegression) for model in models_instances), (
+         "Logistic Regression model not found"
+     )
+     assert any(
+         isinstance(model, RandomForestClassifier) for model in models_instances
+     ), "Random Forest model not found"
+
+     # Ensure that the trained models expose the scikit-learn estimator API
+     for name, model in trained_models:
+         assert hasattr(model, "fit"), f"{name} should have a fit method"
+         assert hasattr(model, "predict"), f"{name} should have a predict method"
+
+         # Check that the model is fitted by predicting on the test set
+         y_pred = model.predict(X_test)
+         assert y_pred is not None, f"{name} should have successfully made predictions"
+
+
+ if __name__ == "__main__":
+     pytest.main()
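
Note for reviewers: the tests above pin down the interface of src/classifiers_classic_ml.py without showing it. The following is a minimal sketch of what they assume (signatures and return values are inferred from the assertions; the repository's actual implementation may differ, e.g. in its plotting and metric reporting):

    # Hypothetical sketch inferred from the assertions above.
    from sklearn.decomposition import PCA
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.linear_model import LogisticRegression


    def visualize_embeddings(X_train, X_test, y_train, y_test, plot_type="2D", method="PCA"):
        # Reduce to 2 or 3 components depending on the requested plot type
        reducer = PCA(n_components=2 if plot_type == "2D" else 3)
        reducer.fit(X_train)
        # ... build and show the plotly figure (Figure.show is patched in the test) ...
        return reducer  # the test asserts on the returned, fitted reducer


    def train_and_evaluate_model(X_train, X_test, y_train, y_test, test=False):
        models = [
            ("Logistic Regression", LogisticRegression(max_iter=1000)),
            ("Random Forest", RandomForestClassifier(random_state=42)),
        ]
        for _, model in models:
            model.fit(X_train, y_train)
        # ... evaluation / reporting elided ...
        return models  # list of (name, fitted_model) tuples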
tests/test_classifiers_mlp.py ADDED
@@ -0,0 +1,626 @@
+ # import numpy as np
+ # from sklearn.decomposition import PCA
+ # from sklearn.manifold import TSNE
+ # from src.classifiers_classic_ml import visualize_embeddings, train_and_evaluate_model
+
+ import os
+
+ import pandas as pd
+ import pytest
+ from sklearn.datasets import make_classification
+ from sklearn.metrics import accuracy_score, f1_score
+ from sklearn.model_selection import train_test_split
+ from sklearn.preprocessing import LabelEncoder
+ from tensorflow.keras.layers import BatchNormalization, Concatenate, Dense, Dropout
+ from tensorflow.keras.losses import CategoricalCrossentropy
+ from tensorflow.keras.models import Model
+ from tensorflow.keras.optimizers import SGD, Adam
+
+ from src.classifiers_mlp import MultimodalDataset, create_early_fusion_model, train_mlp
+
+ ####################################################################################################
+ ##################################### Test the Keras MLP Models ####################################
+ ####################################################################################################
+
+
+ @pytest.fixture
+ def correlated_sample_data():
+     """
+     Fixture to create a correlated synthetic dataset using make_classification for testing.
+     It generates data with 4 text-like features and 4 image-like features.
+     Returns:
+         train_df (pd.DataFrame): DataFrame with train data.
+         test_df (pd.DataFrame): DataFrame with test data.
+     """
+     # Create synthetic multi-class data with 8 features (4 text-like, 4 image-like)
+     X, y = make_classification(
+         n_samples=20, n_features=8, n_informative=6, n_classes=3, random_state=42
+     )
+
+     # Rename features to simulate text and image columns
+     feature_names = [f"text_{i}" for i in range(4)] + [
+         f"image_{i}" for i in range(4, 8)
+     ]
+
+     # Create a DataFrame and assign class labels
+     df = pd.DataFrame(X, columns=feature_names)
+     df["class_id"] = y
+
+     # Split into train and test sets
+     train_df, test_df = train_test_split(df, test_size=0.3, random_state=42)
+
+     return train_df, test_df
+
+
+ @pytest.fixture
+ def label_encoder(correlated_sample_data):
+     """
+     Fixture to create a label encoder based on the training data.
+     """
+     train_df, _ = correlated_sample_data
+     label_encoder = LabelEncoder()
+     label_encoder.fit(train_df["class_id"])
+     return label_encoder
+
+
+ def test_multimodal_dataset_image_only(correlated_sample_data, label_encoder):
+     """
+     Test the MultimodalDataset class with only image data.
+     """
+     train_df, test_df = correlated_sample_data
+
+     # Image columns (the last 4 features)
+     image_columns = [f"image_{i}" for i in range(4, 8)]
+     label_column = "class_id"
+
+     # Create the dataset
+     train_dataset = MultimodalDataset(
+         train_df,
+         text_cols=None,
+         image_cols=image_columns,
+         label_col=label_column,
+         encoder=label_encoder,
+     )
+
+     # Check that the dataset is correctly instantiated
+     assert train_dataset.image_data is not None, "Image data should be instantiated"
+     assert train_dataset.text_data is None, "Text data should be None"
+
+     # Fetch a batch of data
+     (batch_inputs, batch_labels) = train_dataset[0]
+
+     assert "image" in batch_inputs, "Batch should contain image data"
+     assert "text" not in batch_inputs, "Batch should not contain text data"
+     assert batch_inputs["image"].shape[1] == len(image_columns), (
+         "Image data shape is incorrect"
+     )
+     assert batch_labels is not None, "Batch should contain labels"
+     assert batch_labels.shape[0] == batch_inputs["image"].shape[0], (
+         "Labels should match the batch size"
+     )
+
+
+ def test_multimodal_dataset_text_only(correlated_sample_data, label_encoder):
+     """
+     Test the MultimodalDataset class with only text data.
+     """
+     train_df, test_df = correlated_sample_data
+
+     # Text columns (the first 4 features)
+     text_columns = [f"text_{i}" for i in range(4)]
+     label_column = "class_id"
+
+     # Create the dataset
+     train_dataset = MultimodalDataset(
+         train_df,
+         text_cols=text_columns,
+         image_cols=None,
+         label_col=label_column,
+         encoder=label_encoder,
+     )
+
+     # Check that the dataset is correctly instantiated
+     assert train_dataset.text_data is not None, "Text data should be instantiated"
+     assert train_dataset.image_data is None, "Image data should be None"
+
+     # Fetch a batch of data
+     (batch_inputs, batch_labels) = train_dataset[0]
+
+     assert "text" in batch_inputs, "Batch should contain text data"
+     assert "image" not in batch_inputs, "Batch should not contain image data"
+     assert batch_inputs["text"].shape[1] == len(text_columns), (
+         "Text data shape is incorrect"
+     )
+     assert batch_labels is not None, "Batch should contain labels"
+     assert batch_labels.shape[0] == batch_inputs["text"].shape[0], (
+         "Labels should match the batch size"
+     )
+
+
+ def test_multimodal_dataset_multimodal(correlated_sample_data, label_encoder):
+     """
+     Test the MultimodalDataset class with both text and image data.
+     """
+     train_df, test_df = correlated_sample_data
+
+     # Text and image columns
+     text_columns = [f"text_{i}" for i in range(4)]
+     image_columns = [f"image_{i}" for i in range(4, 8)]
+     label_column = "class_id"
+
+     # Create the dataset
+     train_dataset = MultimodalDataset(
+         train_df,
+         text_cols=text_columns,
+         image_cols=image_columns,
+         label_col=label_column,
+         encoder=label_encoder,
+     )
+
+     # Check that the dataset is correctly instantiated
+     assert train_dataset.text_data is not None, "Text data should be instantiated"
+     assert train_dataset.image_data is not None, "Image data should be instantiated"
+
+     # Fetch a batch of data
+     (batch_inputs, batch_labels) = train_dataset[0]
+     assert "text" in batch_inputs, "Batch should contain text data"
+     assert "image" in batch_inputs, "Batch should contain image data"
+     assert batch_inputs["text"].shape[1] == len(text_columns), (
+         "Text data shape is incorrect"
+     )
+     assert batch_inputs["image"].shape[1] == len(image_columns), (
+         "Image data shape is incorrect"
+     )
+     assert batch_labels is not None, "Batch should contain labels"
+     assert (
+         batch_labels.shape[0]
+         == batch_inputs["text"].shape[0]
+         == batch_inputs["image"].shape[0]
+     ), "Labels should match the batch size"
+
+
+ def test_create_early_fusion_model_single_modality_image():
+     """
+     Test the model creation with only image input.
+     Ensure the architecture matches expectations.
+     """
+     text_input_size = None
+     image_input_size = 4
+     output_size = 3
+
+     # Create the model
+     model = create_early_fusion_model(
+         text_input_size, image_input_size, output_size, hidden=[128, 64], p=0.3
+     )
+
+     # Check that a Keras model was built
+     assert isinstance(model, Model), "Model should be a Keras Model instance"
+
+     # Check that the input and output shapes are consistent
+     assert model.input_shape == (None, image_input_size), (
+         "Input shape should match image input size"
+     )
+     assert model.output_shape == (None, output_size), (
+         "Output shape should match number of classes"
+     )
+
+     # Check that there are the correct numbers of Dense, Dropout, and BatchNormalization layers
+     dense_layers = [layer for layer in model.layers if isinstance(layer, Dense)]
+     dropout_layers = [layer for layer in model.layers if isinstance(layer, Dropout)]
+     batchnorm_layers = [
+         layer for layer in model.layers if isinstance(layer, BatchNormalization)
+     ]
+
+     assert len(dense_layers) == 3, (
+         "There should be 3 Dense layers (2 hidden + 1 output)"
+     )
+     assert len(dropout_layers) > 0, "There should be at least 1 Dropout layer"
+     assert len(batchnorm_layers) > 0, (
+         "There should be at least 1 BatchNormalization layer"
+     )
+
+
+ def test_create_early_fusion_model_single_modality_text():
+     """
+     Test the model creation with only text input.
+     Ensure the architecture matches expectations.
+     """
+     text_input_size = 4
+     image_input_size = None
+     output_size = 3
+
+     # Create the model
+     model = create_early_fusion_model(
+         text_input_size, image_input_size, output_size, hidden=[128, 64], p=0.3
+     )
+
+     # Check that a Keras model was built
+     assert isinstance(model, Model), "Model should be a Keras Model instance"
+
+     # Check that the input and output shapes are consistent
+     assert model.input_shape == (None, text_input_size), (
+         "Input shape should match text input size"
+     )
+     assert model.output_shape == (None, output_size), (
+         "Output shape should match number of classes"
+     )
+
+     # Check that there are the correct numbers of Dense, Dropout, and BatchNormalization layers
+     dense_layers = [layer for layer in model.layers if isinstance(layer, Dense)]
+     dropout_layers = [layer for layer in model.layers if isinstance(layer, Dropout)]
+     batchnorm_layers = [
+         layer for layer in model.layers if isinstance(layer, BatchNormalization)
+     ]
+
+     assert len(dense_layers) == 3, (
+         "There should be 3 Dense layers (2 hidden + 1 output)"
+     )
+     assert len(dropout_layers) > 0, "There should be at least 1 Dropout layer"
+     assert len(batchnorm_layers) > 0, (
+         "There should be at least 1 BatchNormalization layer"
+     )
+
+
+ def test_create_early_fusion_model_multimodal():
+     """
+     Test the model creation with both text and image input.
+     Ensure the architecture matches expectations.
+     """
+     text_input_size = 4
+     image_input_size = 4
+     output_size = 3
+
+     # Create the model
+     model = create_early_fusion_model(
+         text_input_size, image_input_size, output_size, hidden=[128, 64], p=0.3
+     )
+
+     # Check that a Keras model was built
+     assert isinstance(model, Model), "Model should be a Keras Model instance"
+
+     # Check that the input and output shapes are consistent
+     assert model.input_shape == [(None, text_input_size), (None, image_input_size)], (
+         "Input shape should match both text and image input sizes"
+     )
+     assert model.output_shape == (None, output_size), (
+         "Output shape should match number of classes"
+     )
+
+     # Check that the concatenation of text and image inputs is present
+     assert any(isinstance(layer, Concatenate) for layer in model.layers), (
+         "There should be a Concatenate layer for text and image inputs"
+     )
+
+     # Check that there are the correct numbers of Dense, Dropout, and BatchNormalization layers
+     dense_layers = [layer for layer in model.layers if isinstance(layer, Dense)]
+     dropout_layers = [layer for layer in model.layers if isinstance(layer, Dropout)]
+     batchnorm_layers = [
+         layer for layer in model.layers if isinstance(layer, BatchNormalization)
+     ]
+
+     assert len(dense_layers) == 3, (
+         "There should be 3 Dense layers (2 hidden + 1 output)"
+     )
+     assert len(dropout_layers) > 0, "There should be at least 1 Dropout layer"
+     assert len(batchnorm_layers) > 0, (
+         "There should be at least 1 BatchNormalization layer"
+     )
+
+
+ def test_train_mlp_single_modality_image(correlated_sample_data, label_encoder):
+     """
+     Test the train_mlp pipeline with only image data.
+     Ensure the model is built and compiled correctly.
+     """
+     train_df, test_df = correlated_sample_data
+
+     # Image columns (the last 4 features)
+     image_columns = [f"image_{i}" for i in range(4, 8)]
+     label_column = "class_id"
+
+     # Create datasets
+     train_dataset = MultimodalDataset(
+         train_df,
+         text_cols=None,
+         image_cols=image_columns,
+         label_col=label_column,
+         encoder=label_encoder,
+     )
+     test_dataset = MultimodalDataset(
+         test_df,
+         text_cols=None,
+         image_cols=image_columns,
+         label_col=label_column,
+         encoder=label_encoder,
+     )
+
+     image_input_size = len(image_columns)
+     output_size = len(label_encoder.classes_)
+
+     # Build and compile the model (train_model=False skips the actual fitting)
+     model, test_accuracy, f1, macro_auc = train_mlp(
+         train_loader=train_dataset,
+         test_loader=test_dataset,
+         text_input_size=None,
+         image_input_size=image_input_size,
+         output_size=output_size,
+         num_epochs=1,
+         set_weights=True,
+         adam=True,
+         patience=10,
+         save_results=False,
+         train_model=False,
+         test_mlp_model=False,
+     )
+
+     # Check model
+     assert model is not None, "Model should not be None after training."
+
+     # Ensure the model is compiled with the correct loss
+     assert (
+         isinstance(model.loss, CategoricalCrossentropy)
+         or model.loss == "categorical_crossentropy"
+     ), f"Loss function should be categorical crossentropy, but got {model.loss}"
+
+     # Check model input and output shapes
+     assert model.input_shape == (None, image_input_size), (
+         "Input shape should match image input size"
+     )
+     assert model.output_shape == (None, output_size), (
+         "Output shape should match number of classes"
+     )
+
+     # Check that the model is compiled with the correct optimizer
+     assert isinstance(model.optimizer, Adam) or isinstance(model.optimizer, SGD), (
+         f"Optimizer should be Adam or SGD, but got {model.optimizer}"
+     )
+
+
+ def test_train_mlp_single_modality_text(correlated_sample_data, label_encoder):
+     """
+     Test the train_mlp pipeline with only text data.
+     Ensure the model is built and compiled correctly.
+     """
+     train_df, test_df = correlated_sample_data
+
+     # Text columns (the first 4 features)
+     text_columns = [f"text_{i}" for i in range(4)]
+     label_column = "class_id"
+
+     # Create datasets
+     train_dataset = MultimodalDataset(
+         train_df,
+         text_cols=text_columns,
+         image_cols=None,
+         label_col=label_column,
+         encoder=label_encoder,
+     )
+     test_dataset = MultimodalDataset(
+         test_df,
+         text_cols=text_columns,
+         image_cols=None,
+         label_col=label_column,
+         encoder=label_encoder,
+     )
+
+     text_input_size = len(text_columns)
+     output_size = len(label_encoder.classes_)
+
+     # Build and compile the model (train_model=False skips the actual fitting)
+     model, test_accuracy, f1, macro_auc = train_mlp(
+         train_loader=train_dataset,
+         test_loader=test_dataset,
+         text_input_size=text_input_size,
+         image_input_size=None,
+         output_size=output_size,
+         num_epochs=1,
+         set_weights=True,
+         adam=True,
+         patience=10,
+         save_results=False,
+         train_model=False,
+         test_mlp_model=False,
+     )
+
+     # Check model
+     assert model is not None, "Model should not be None after training."
+
+     # Ensure the model is compiled with the correct loss
+     assert (
+         isinstance(model.loss, CategoricalCrossentropy)
+         or model.loss == "categorical_crossentropy"
+     ), f"Loss function should be categorical crossentropy, but got {model.loss}"
+
+     # Check model input and output shapes
+     assert model.input_shape == (None, text_input_size), (
+         "Input shape should match text input size"
+     )
+     assert model.output_shape == (None, output_size), (
+         "Output shape should match number of classes"
+     )
+
+     # Check that the model is compiled with the correct optimizer
+     assert isinstance(model.optimizer, Adam) or isinstance(model.optimizer, SGD), (
+         f"Optimizer should be Adam or SGD, but got {model.optimizer}"
+     )
+
+
+ def test_train_mlp_multimodal(correlated_sample_data, label_encoder):
+     """
+     Test the train_mlp pipeline with both text and image data, with class weights
+     and early stopping enabled. Ensure the multimodal model is built and compiled correctly.
+     """
+     train_df, test_df = correlated_sample_data
+
+     # Text and image columns
+     text_columns = [f"text_{i}" for i in range(4)]
+     image_columns = [f"image_{i}" for i in range(4, 8)]
+     label_column = "class_id"
+
+     # Create datasets
+     train_dataset = MultimodalDataset(
+         train_df,
+         text_cols=text_columns,
+         image_cols=image_columns,
+         label_col=label_column,
+         encoder=label_encoder,
+     )
+     test_dataset = MultimodalDataset(
+         test_df,
+         text_cols=text_columns,
+         image_cols=image_columns,
+         label_col=label_column,
+         encoder=label_encoder,
+     )
+
+     text_input_size = len(text_columns)
+     image_input_size = len(image_columns)
+     output_size = len(label_encoder.classes_)
+
+     # Build and compile the model (train_model=False skips the actual fitting)
+     model, test_accuracy, f1, macro_auc = train_mlp(
+         train_loader=train_dataset,
+         test_loader=test_dataset,
+         text_input_size=text_input_size,
+         image_input_size=image_input_size,
+         output_size=output_size,
+         num_epochs=1,
+         set_weights=True,
+         adam=True,
+         patience=10,
+         save_results=False,
+         train_model=False,
+         test_mlp_model=False,
+     )
+
+     # Check model
+     assert model is not None, "Model should not be None after training."
+
+     # Ensure the model is compiled with the correct loss
+     assert (
+         isinstance(model.loss, CategoricalCrossentropy)
+         or model.loss == "categorical_crossentropy"
+     ), f"Loss function should be categorical crossentropy, but got {model.loss}"
+
+     # Check model input and output shapes
+     assert model.input_shape == [(None, text_input_size), (None, image_input_size)], (
+         "Input shape should match both text and image input sizes"
+     )
+     assert model.output_shape == (None, output_size), (
+         "Output shape should match number of classes"
+     )
+
+     # Check that the model is compiled with the correct optimizer
+     assert isinstance(model.optimizer, Adam) or isinstance(model.optimizer, SGD), (
+         f"Optimizer should be Adam or SGD, but got {model.optimizer}"
+     )
+
+
+ # Check that the result files are correctly saved
+ def test_result_files():
+     """
+     Test if the result files are created for each modality and have the correct format.
+     """
+     # Get the absolute path of the directory where this test file is located
+     test_dir = os.path.dirname(os.path.abspath(__file__))
+
+     # Paths for result files relative to the test file location
+     multimodal_results_path = os.path.join(
+         test_dir, "../results/multimodal_results.csv"
+     )
+     text_results_path = os.path.join(test_dir, "../results/text_results.csv")
+     image_results_path = os.path.join(test_dir, "../results/image_results.csv")
+
+     # Check that the files exist
+     assert os.path.exists(multimodal_results_path), "Multimodal result file is missing!"
+     assert os.path.exists(text_results_path), "Text result file is missing!"
+     assert os.path.exists(image_results_path), "Image result file is missing!"
+
+     # Check that the files are not empty and have the expected CSV columns
+     for file_path in [multimodal_results_path, text_results_path, image_results_path]:
+         df = pd.read_csv(file_path)
+         assert not df.empty, f"{file_path} is empty!"
+         assert "Predictions" in df.columns and "True Labels" in df.columns, (
+             f"{file_path} is not in the correct format!"
+         )
+
+
+ # Check that the accuracy and F1 scores meet the specified thresholds
+ def test_model_performance():
+     """
+     Test if the accuracy and F1 score are above the required thresholds.
+     """
+     # Get the absolute path of the directory where this test file is located
+     test_dir = os.path.dirname(os.path.abspath(__file__))
+
+     # Paths for result files relative to the test file location
+     multimodal_results_path = os.path.join(
+         test_dir, "../results/multimodal_results.csv"
+     )
+     text_results_path = os.path.join(test_dir, "../results/text_results.csv")
+     image_results_path = os.path.join(test_dir, "../results/image_results.csv")
+
+     # Load the result files
+     multimodal_results = pd.read_csv(multimodal_results_path)
+     text_results = pd.read_csv(text_results_path)
+     image_results = pd.read_csv(image_results_path)
+
+     # Define the accuracy and F1-score thresholds
+     multimodal_accuracy_threshold = 0.85
+     multimodal_f1_threshold = 0.80
+     text_accuracy_threshold = 0.85
+     text_f1_threshold = 0.80
+     image_accuracy_threshold = 0.75
+     image_f1_threshold = 0.70
+
+     # Calculate accuracy and F1 score for multimodal results
+     multimodal_accuracy = accuracy_score(
+         multimodal_results["True Labels"], multimodal_results["Predictions"]
+     )
+     multimodal_f1 = f1_score(
+         multimodal_results["True Labels"],
+         multimodal_results["Predictions"],
+         average="macro",
+     )
+
+     # Calculate accuracy and F1 score for text results
+     text_accuracy = accuracy_score(
+         text_results["True Labels"], text_results["Predictions"]
+     )
+     text_f1 = f1_score(
+         text_results["True Labels"], text_results["Predictions"], average="macro"
+     )
+
+     # Calculate accuracy and F1 score for image results
+     image_accuracy = accuracy_score(
+         image_results["True Labels"], image_results["Predictions"]
+     )
+     image_f1 = f1_score(
+         image_results["True Labels"], image_results["Predictions"], average="macro"
+     )
+
+     # Check multimodal performance
+     assert multimodal_accuracy > multimodal_accuracy_threshold, (
+         f"Multimodal accuracy is below {multimodal_accuracy_threshold}"
+     )
+     assert multimodal_f1 > multimodal_f1_threshold, (
+         f"Multimodal F1 score is below {multimodal_f1_threshold}"
+     )
+
+     # Check text performance
+     assert text_accuracy > text_accuracy_threshold, (
+         f"Text accuracy is below {text_accuracy_threshold}"
+     )
+     assert text_f1 > text_f1_threshold, f"Text F1 score is below {text_f1_threshold}"
+
+     # Check image performance
+     assert image_accuracy > image_accuracy_threshold, (
+         f"Image accuracy is below {image_accuracy_threshold}"
+     )
+     assert image_f1 > image_f1_threshold, (
+         f"Image F1 score is below {image_f1_threshold}"
+     )
+
+
+ if __name__ == "__main__":
+     pytest.main()
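
Note for reviewers: the three create_early_fusion_model tests encode a specific architecture contract (one Dense/BatchNormalization/Dropout block per hidden size, a softmax output sized to the classes, and a Concatenate layer only in the multimodal case). Below is a minimal sketch that satisfies those assertions, assuming the Keras functional API; the real src/classifiers_mlp.py may differ in activations and layer ordering:

    # Hypothetical sketch consistent with the architecture assertions above.
    from tensorflow.keras.layers import (
        BatchNormalization, Concatenate, Dense, Dropout, Input,
    )
    from tensorflow.keras.models import Model


    def create_early_fusion_model(text_input_size, image_input_size, output_size,
                                  hidden=[128, 64], p=0.3):
        inputs = []
        if text_input_size:
            inputs.append(Input(shape=(text_input_size,), name="text"))
        if image_input_size:
            inputs.append(Input(shape=(image_input_size,), name="image"))

        # Early fusion: concatenate modality embeddings before the shared MLP head
        x = Concatenate()(inputs) if len(inputs) > 1 else inputs[0]

        # One Dense + BatchNormalization + Dropout block per hidden layer size
        for units in hidden:
            x = Dense(units, activation="relu")(x)
            x = BatchNormalization()(x)
            x = Dropout(p)(x)

        outputs = Dense(output_size, activation="softmax")(x)
        return Model(inputs=inputs if len(inputs) > 1 else inputs[0], outputs=outputs)

With hidden=[128, 64] this yields exactly 3 Dense layers (2 hidden + 1 output), which is what the tests check.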
tests/test_nlp_models.py ADDED
@@ -0,0 +1,82 @@
+ import numpy as np
+ import pandas as pd
+ import pytest
+ from transformers import AutoModel, AutoTokenizer
+
+ from src.nlp_models import HuggingFaceEmbeddings
+
+ # import torch
+ # import os
+
+ ####################################################################################################
+ ################################## Test the Text Embeddings Model ##################################
+ ####################################################################################################
+
+
+ @pytest.fixture
+ def mock_text_data(tmp_path):
+     """
+     Fixture to create a mock CSV file with text data for testing.
+     """
+     data = {"description": ["Product 1 description", "Product 2 description"]}
+     df = pd.DataFrame(data)
+     file_path = tmp_path / "test_text_data.csv"
+     df.to_csv(file_path, index=False)
+     return str(file_path)
+
+
+ @pytest.mark.parametrize(
+     "model_name, expected_hidden_size",
+     [
+         ("sentence-transformers/all-MiniLM-L6-v2", 384),  # MiniLM with 384 hidden units
+         # ('bert-base-uncased', 768),  # BERT base with 768 hidden units
+     ],
+ )
+ def test_huggingface_embeddings_generic(
+     model_name, expected_hidden_size, mock_text_data
+ ):
+     """
+     Generic test for loading a Hugging Face model and generating text embeddings.
+
+     This test ensures that:
+     - The model and tokenizer are properly loaded from Hugging Face.
+     - Embeddings are correctly generated for text descriptions.
+     - Embeddings have the expected dimensionality for the chosen model.
+
+     Parameters:
+     ----------
+     model_name : str
+         The name of the Hugging Face model to test.
+     expected_hidden_size : int
+         The expected hidden size (dimensionality) of the embeddings generated by the model.
+     mock_text_data : str
+         Path to the mock CSV file containing text descriptions.
+     """
+     # Initialize the HuggingFaceEmbeddings model with the provided model name
+     model = HuggingFaceEmbeddings(
+         model_name=model_name, path=mock_text_data, device="cpu"
+     )
+
+     # Check that the tokenizer and model were loaded correctly
+     assert isinstance(
+         model.tokenizer, type(AutoTokenizer.from_pretrained(model_name))
+     ), (
+         f"Tokenizer should be an instance of {type(AutoTokenizer.from_pretrained(model_name))}"
+     )
+     assert isinstance(model.model, type(AutoModel.from_pretrained(model_name))), (
+         f"Model should be an instance of {type(AutoModel.from_pretrained(model_name))}"
+     )
+
+     # Generate embeddings for a sample text
+     sample_text = "This is a test description."
+     embeddings = model.get_embedding(sample_text)
+
+     # Check that the embeddings are a NumPy array with the expected shape
+     assert isinstance(embeddings, np.ndarray), "Embeddings should be a NumPy array"
+     assert embeddings.shape == (expected_hidden_size,), (
+         f"Embeddings shape should be ({expected_hidden_size},), got {embeddings.shape}"
+     )
+
+
+ if __name__ == "__main__":
+     pytest.main()
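
Note for reviewers: the shape assertion (expected_hidden_size,) implies that HuggingFaceEmbeddings pools the token-level hidden states into a single vector. A minimal sketch under that assumption follows; mean pooling is a guess, and the actual src/nlp_models.py may use CLS pooling or another strategy:

    # Hypothetical sketch; the pooling strategy is an assumption.
    import torch
    from transformers import AutoModel, AutoTokenizer


    class HuggingFaceEmbeddings:
        def __init__(self, model_name, path, device="cpu"):
            self.path = path  # CSV with a 'description' column
            self.device = device
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModel.from_pretrained(model_name).to(device)

        def get_embedding(self, text):
            # Tokenize, run the encoder, and mean-pool the last hidden state
            inputs = self.tokenizer(text, return_tensors="pt", truncation=True)
            inputs = inputs.to(self.device)
            with torch.no_grad():
                output = self.model(**inputs)
            embedding = output.last_hidden_state.mean(dim=1).squeeze(0)
            return embedding.cpu().numpy()  # e.g. shape (384,) for MiniLM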
tests/test_utils.py ADDED
@@ -0,0 +1,93 @@
+ # import numpy as np
+ # import os
+
+ # from src.utils import preprocess_data
+ # from sklearn.model_selection import train_test_split
+
+
+ import numpy as np
+ import pandas as pd
+ import pytest
+
+ from src.utils import train_test_split_and_feature_extraction
+
+ ####################################################################################################
+ ######################### Test the Train-Test Split and variable selection #########################
+ ####################################################################################################
+
+
+ @pytest.fixture
+ def big_fake_data():
+     # Create a fake dataset with 100 rows
+     num_rows = 100
+     num_image_columns = 10
+     num_text_columns = 11
+
+     data = {
+         "id": np.arange(1, num_rows + 1),
+         "image": [f"path/{i}.jpg" for i in range(1, num_rows + 1)],
+     }
+
+     # Add image_0 to image_9 columns
+     for i in range(num_image_columns):
+         data[f"image_{i}"] = np.random.rand(num_rows)
+
+     # Add text_0 to text_10 columns
+     for i in range(num_text_columns):
+         data[f"text_{i}"] = np.random.rand(num_rows)
+
+     # Add a class_id column
+     data["class_id"] = np.random.choice(["label1", "label2", "label3"], size=num_rows)
+
+     return pd.DataFrame(data)
+
+
+ def test_train_test_split_and_feature_extraction(big_fake_data):
+     # Split the data and extract features and labels
+     train_df, test_df, text_columns, image_columns, label_columns = (
+         train_test_split_and_feature_extraction(
+             big_fake_data, test_size=0.3, random_state=42
+         )
+     )
+
+     # Check that the correct columns were identified
+     assert text_columns == [f"text_{i}" for i in range(11)], (
+         "The text embedding columns extraction is incorrect"
+     )
+     assert image_columns == [f"image_{i}" for i in range(10)], (
+         "The image embedding columns extraction is incorrect"
+     )
+     assert label_columns == ["class_id"], (
+         "The label column extraction is incorrect, should be 'class_id'"
+     )
+
+     # Check that the raw 'image' path column is excluded from the embedding columns
+     assert "image" not in image_columns, (
+         "'image' column should not be part of the embedding columns"
+     )
+
+     # Check the train-test split sizes (a 30% test split of 100 rows gives 70 train and 30 test rows)
+     assert len(train_df) == 70, f"Train size should be 70 rows, but got {len(train_df)}"
+     assert len(test_df) == 30, f"Test size should be 30 rows, but got {len(test_df)}"
+
+     # Check random state consistency by ensuring the split results are reproducible
+     expected_train_indices = train_df.index.tolist()
+     expected_test_indices = test_df.index.tolist()
+
+     # Re-run the function to check for consistency in the split
+     train_df_recheck, test_df_recheck, _, _, _ = (
+         train_test_split_and_feature_extraction(
+             big_fake_data, test_size=0.3, random_state=42
+         )
+     )
+
+     assert expected_train_indices == train_df_recheck.index.tolist(), (
+         "Train set indices are not consistent with the random state"
+     )
+     assert expected_test_indices == test_df_recheck.index.tolist(), (
+         "Test set indices are not consistent with the random state"
+     )
+
+
+ if __name__ == "__main__":
+     pytest.main()
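
Note for reviewers: the assertions above pin down a prefix-based column-selection contract. A minimal sketch of train_test_split_and_feature_extraction that satisfies them (hypothetical; the actual src/utils.py may select columns differently):

    # Hypothetical sketch matching the asserted behaviour.
    from sklearn.model_selection import train_test_split


    def train_test_split_and_feature_extraction(df, test_size=0.3, random_state=42):
        # Embedding columns follow the 'text_<i>' / 'image_<i>' naming convention;
        # the underscore in the prefix excludes the raw 'image' path column.
        text_columns = [c for c in df.columns if c.startswith("text_")]
        image_columns = [c for c in df.columns if c.startswith("image_")]
        label_columns = ["class_id"]

        train_df, test_df = train_test_split(
            df, test_size=test_size, random_state=random_state
        )
        return train_df, test_df, text_columns, image_columns, label_columns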
tests/test_vision_embeddings_tf.py ADDED
@@ -0,0 +1,110 @@
+ # import os
+ # import pandas as pd
+
+ # from src.vision_embeddings_tf import get_embeddings_df
+
+
+ import numpy as np
+ import pytest
+ from PIL import Image
+ from tensorflow.keras.applications import ResNet50
+ from transformers import TFConvNextV2Model
+
+ from src.vision_embeddings_tf import FoundationalCVModel, load_and_preprocess_image
+
+ # Run tests with CPU and not GPU (custom added)
+ # os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
+
+
+ ####################################################################################################
+ #################### Test the foundational CV model and image preprocessing ########################
+ ####################################################################################################
+ @pytest.fixture
+ def mock_image(tmp_path):
+     """
+     Fixture to create a mock image for testing.
+     """
+     img_path = tmp_path / "test_image.jpg"
+     img = Image.new("RGB", (300, 300), color="red")
+     img.save(img_path)
+     return str(img_path)
+
+
+ def test_load_and_preprocess_image(mock_image):
+     """
+     Test loading and preprocessing of an image.
+     """
+     # Test the load_and_preprocess_image function
+     img = load_and_preprocess_image(mock_image, target_size=(224, 224))
+
+     # Check that the output is a numpy array
+     assert isinstance(img, np.ndarray), "Output is not a numpy array"
+
+     # Check that the image has the correct shape
+     assert img.shape == (224, 224, 3), (
+         f"Image shape is {img.shape}, expected (224, 224, 3)"
+     )
+
+     # Check that the pixel values are in the range [0, 1]
+     assert img.min() >= 0 and img.max() <= 1, (
+         "Image pixel values are not in the range [0, 1]"
+     )
+
+
+ @pytest.mark.parametrize(
+     "backbone, expected_model_class, expected_output_shape",
+     [
+         ("resnet50", type(ResNet50()), (2048,)),  # Keras ResNet50 with 2048 features
+         (
+             "convnextv2_tiny",
+             TFConvNextV2Model,
+             (768,),
+         ),  # ConvNeXt V2 Tiny from Hugging Face with 768 features
+     ],
+ )
+ def test_foundational_cv_model_generic(
+     backbone, expected_model_class, expected_output_shape
+ ):
+     """
+     Generic test for loading a foundational CV model and checking its input and output shapes.
+
+     This test ensures that:
+     - The correct backbone model is loaded.
+     - The input shape matches the model's requirements (224x224x3).
+     - The output embedding shape matches the expected shape for the backbone.
+
+     Parameters:
+     ----------
+     backbone : str
+         The name of the model backbone to test.
+     expected_model_class : class
+         The expected class of the loaded backbone model (e.g., ResNet50 or TFConvNextV2Model).
+     expected_output_shape : tuple
+         The expected shape of the output embedding vector.
+     """
+     # Initialize the model with the provided backbone
+     model = FoundationalCVModel(backbone=backbone, mode="eval")
+
+     # Check that the backbone is an instance of the expected model class
+     assert isinstance(model.base_model, expected_model_class), (
+         f"Expected model class {expected_model_class}, got {type(model.base_model)}"
+     )
+
+     # A batch of random images (2 images of shape 224x224x3) in the expected input format
+     batch_images = np.random.rand(2, 224, 224, 3)
+
+     # Ensure that the input shape matches the model's input requirements
+     assert model.model.input_shape == (None, 224, 224, 3), (
+         f"Expected input shape (None, 224, 224, 3), got {model.model.input_shape}"
+     )
+
+     # Ensure that the output shape matches the expected shape without calling model.predict
+     output = model.get_output_shape()
+
+     assert output == (None, *expected_output_shape), (
+         f"Expected output shape {(None, *expected_output_shape)}, got {output}"
+     )
+
+
+ if __name__ == "__main__":
+     pytest.main()
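
Note for reviewers: test_load_and_preprocess_image fixes two properties of the preprocessing: the output is resized to target_size and scaled into [0, 1]. A minimal sketch that satisfies both (hypothetical; the real src/vision_embeddings_tf.py may add normalization or padding):

    # Hypothetical sketch matching the two asserted properties.
    import numpy as np
    from PIL import Image


    def load_and_preprocess_image(image_path, target_size=(224, 224)):
        img = Image.open(image_path).convert("RGB").resize(target_size)
        # Scale uint8 pixels from [0, 255] to float32 in [0, 1]
        return np.asarray(img, dtype=np.float32) / 255.0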