#!/usr/bin/env python
# coding: utf-8

# In[1]:

from fastai.data.all import *
from fastai.vision.all import *
import cv2
import os
from pathlib import Path
import pandas as pd

# Load the CSV file into a pandas DataFrame
path_csv_combined = Path('D:\\Documents\\Machine Learning - Glaucoma\\combined_csv.csv')

# Define the image path with images from all three databases combined
path_image_combined = Path('D:\\Documents\\Machine Learning - Glaucoma\\combined images')

# Load the DataFrame
combined_df = pd.read_csv(path_csv_combined)
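
# Optional sanity check (a minimal sketch, assuming the 'id_code' column holds image
# filenames relative to path_image_combined, as the get_x function defined later expects):
# confirm every referenced image file exists before building any dataloaders.
missing = [f for f in combined_df['id_code'] if not (path_image_combined / f).exists()]
print(f"Missing image files: {len(missing)}")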

# In[2]:

from sklearn.model_selection import train_test_split

# Hold out 15% of the data as a test set, then carve a validation set out of the remainder;
# both splits are stratified on the label to preserve class proportions
train_df, test_df = train_test_split(combined_df, test_size=0.15, random_state=42, stratify=combined_df['label'])
train_df, val_df = train_test_split(train_df, test_size=0.15, random_state=42, stratify=train_df['label'])

# Display the sizes of the datasets
print(f"Training set size: {len(train_df)} samples")
print(f"Validation set size: {len(val_df)} samples")
print(f"Test set size: {len(test_df)} samples")

# In[3]:

print(combined_df['label'].value_counts())

# In[4]:

import matplotlib.pyplot as plt

combined_df['label'].value_counts().plot(kind='bar')
plt.title('Class distribution')
plt.xlabel('Class')
plt.ylabel('Count')
plt.show()

# In[5]:

print(train_df['label'].value_counts())

# In[6]:

import matplotlib.pyplot as plt

train_df['label'].value_counts().plot(kind='bar')
plt.title('Class distribution (training set)')
plt.xlabel('Class')
plt.ylabel('Count')
plt.show()

# In[7]:

import cv2
import numpy as np
from skimage import filters
from PIL import Image

# Define how to get the labels
def get_y(row):
    return row['label']  # adjust this depending on how your CSV is structured

# Define the preprocessing applied to every image
def custom_transform(image_path):
    image = cv2.imread(str(image_path))  # Read the image file
    if image is None:
        return None
    # Convert the image from BGR to RGB
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # Apply filters and transformations
    # Gaussian filter
    image = cv2.GaussianBlur(image, (5, 5), 0)
    # Histogram equalization on the luminance channel
    img_yuv = cv2.cvtColor(image, cv2.COLOR_RGB2YUV)
    img_yuv[:, :, 0] = cv2.equalizeHist(img_yuv[:, :, 0])
    image = cv2.cvtColor(img_yuv, cv2.COLOR_YUV2RGB)
    # Median filter
    image = cv2.medianBlur(image, 3)
    # Bypass filter (leaving the image unchanged; add a specific implementation here if needed)
    # Sharpening filter
    kernel = np.array([[0, -1, 0],
                       [-1, 5, -1],
                       [0, -1, 0]])
    image = cv2.filter2D(image, -1, kernel)
    # Resize the image to the target size of 224x224 pixels
    image = cv2.resize(image, (224, 224))
    return image
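
# Quick visual check of the preprocessing pipeline: run custom_transform on the first
# image listed in the CSV and display it (a minimal sketch, assuming 'id_code' holds
# the image filename relative to path_image_combined).
sample_path = path_image_combined / combined_df['id_code'].iloc[0]
sample_img = custom_transform(sample_path)
if sample_img is not None:
    plt.imshow(sample_img)
    plt.axis('off')
    plt.show()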

from albumentations import Compose, Rotate, RandomBrightnessContrast, OpticalDistortion

def additional_augmentations(image):
    transform = Compose([
        Rotate(limit=10, p=0.75),  # max_rotate=10.0
        RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.75),  # max_lighting=0.2, p_lighting=0.75
        OpticalDistortion(distort_limit=0.2, shift_limit=0.2, p=0.75),  # max_warp=0.2, p_affine=0.75
        # No flipping is performed as do_flip and flip_vert are both set to False
    ], p=1)  # p=1 ensures that the augmentation pipeline is always applied
    augmented_image = transform(image=image)['image']
    return augmented_image
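
# Sanity check for the augmentation pipeline (a small sketch reusing sample_img from the
# preprocessing check above): augment one preprocessed image and display the result.
if sample_img is not None:
    plt.imshow(additional_augmentations(sample_img))
    plt.axis('off')
    plt.show()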

def get_x(row, is_test=False):
    image_path = path_image_combined / row['id_code']
    transformed_image = custom_transform(image_path)
    if transformed_image is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")
    # Check the label of the current image and apply augmentations if it belongs to the minority class
    if not is_test and row['label'] == 1:
        transformed_image = additional_augmentations(transformed_image)
    return Image.fromarray(transformed_image)

# Define a DataBlock
dblock = DataBlock(
    blocks=(ImageBlock(cls=PILImage), CategoryBlock),
    get_x=get_x,
    get_y=get_y,
    item_tfms=None,
    batch_tfms=None)

# Create a DataLoaders object for the training data
# Note: no splitter is passed, so the DataBlock falls back to its default RandomSplitter
# (a random 80/20 split of train_df); the val_df created earlier is not used here
dls = dblock.dataloaders(train_df, bs=128)

# In[8]:

# Print the first few rows of the 'train_df' DataFrame
print(train_df.head())

# In[9]:

# Extract all the rows from the training dataset where the 'label' column has a value of 0,
# which represents the majority class in this context.
majority_class = train_df[train_df['label'] == 0]

# Extract all the rows from the training dataset where the 'label' column has a value of 1,
# which represents the minority class in this context.
minority_class = train_df[train_df['label'] == 1]

# In[10]:

# Oversample the minority class to have the same number of samples as the majority class
oversampled_minority_class = minority_class.sample(n=len(majority_class), replace=True, random_state=42)

# Concatenate the oversampled minority class with the majority class to create a balanced dataset
oversampled_train_df = pd.concat([majority_class, oversampled_minority_class], axis=0)

# Shuffle the oversampled DataFrame to ensure a random distribution of classes
oversampled_train_df = oversampled_train_df.sample(frac=1, random_state=42).reset_index(drop=True)

# Create a DataLoaders object using the balanced DataFrame and a batch size of 128
dls = dblock.dataloaders(oversampled_train_df, bs=128)
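
# Verify that the oversampling produced a balanced training DataFrame
# (a minimal check; both classes should now have the same count).
print(oversampled_train_df['label'].value_counts())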

# In[11]:

# Display a batch of data from the training dataloader
dls.show_batch()

# In[12]:

from fastai.metrics import AccumMetric
from sklearn.metrics import roc_auc_score

def custom_roc_auc_score(preds, targs):
    # preds come from a binary classification model with n_out=2; convert the raw outputs
    # to probabilities and take the probability of the positive class (the second column)
    probs = preds.softmax(dim=-1)[:, 1]
    return roc_auc_score(targs, probs)

# Now use this custom metric in the learner
# (cnn_learner is the older name; recent fastai versions also expose it as vision_learner)
learn = cnn_learner(dls, resnet50,
                    n_out=2,  # For binary classification
                    loss_func=CrossEntropyLossFlat(),
                    metrics=[
                        accuracy,
                        Precision(average='binary'),
                        Recall(average='binary'),
                        F1Score(average='binary'),
                        AccumMetric(custom_roc_auc_score, flatten=False)  # Custom ROC AUC
                    ],
                    cbs=[
                        EarlyStoppingCallback(monitor='valid_loss', patience=3),
                        SaveModelCallback(monitor='valid_loss', fname='best_model')
                    ]
                    )
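
# Optional: run fastai's learning-rate finder to sanity-check the learning rate used in
# the next cell (a minimal sketch; lr_find restores the model's weights afterwards).
learn.lr_find()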

# In[13]:

# Train the model
# Monitor the loss during training; it should typically decrease over epochs
learn.fit_one_cycle(10, 5e-02)

# In[14]:

interp = ClassificationInterpretation.from_learner(learn)
interp.plot_confusion_matrix(figsize=(8, 8))
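
# Inspect the validation images the model gets most wrong (a small sketch using
# fastai's built-in plot_top_losses on the same interpretation object).
interp.plot_top_losses(9)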

# In[15]:

from functools import partial
from sklearn.metrics import accuracy_score, precision_score, classification_report

# Build a test DataLoader from the held-out test DataFrame
test_dl = dls.test_dl(test_df, with_labels=True)

# Point the DataLoader's get_x at the test-time variant so the minority-class
# augmentations are not applied during evaluation
test_dl.dataset.get_x = partial(get_x, is_test=True)

# Get predictions and targets
preds, targs = learn.get_preds(dl=test_dl)

# Get the predicted class indices
preds_argmax = preds.argmax(dim=-1)

# Calculate and print accuracy, precision, and a full classification report
accuracy = accuracy_score(targs.numpy(), preds_argmax.numpy())
print(f'Accuracy: {accuracy * 100:.2f}%')

precision = precision_score(targs.numpy(), preds_argmax.numpy())
print(f'Precision: {precision * 100:.2f}%')

report = classification_report(targs.numpy(), preds_argmax.numpy())
print(report)
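
# Also report ROC AUC on the test set, using the predicted probability of the positive
# class (with CrossEntropyLossFlat, get_preds returns softmax probabilities).
print(f'ROC AUC: {roc_auc_score(targs.numpy(), preds[:, 1].numpy()):.4f}')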

# In[16]:

import os
from fastai.vision.all import *

# Export the trained Learner to disk
model_export_path = 'D:/Documents/Machine Learning - Glaucoma/your_model.pkl'
learn.export(model_export_path)
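
# Sketch of loading the exported model back for inference (an assumption-laden example:
# load_learner needs get_x, get_y, custom_transform and additional_augmentations to be
# defined in the loading environment, since the export pickles references to them;
# the sample item simply reuses the first row of the held-out test DataFrame).
learn_inf = load_learner(model_export_path)
sample_row = test_df.iloc[0]
pred_class, pred_idx, probs = learn_inf.predict(sample_row)
print(pred_class, probs)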