KB-VQA-E / my_model /utilities.py
m7mdal7aj's picture
Update my_model/utilities.py
31229df verified
raw
history blame
10.2 kB
import pandas as pd
from collections import Counter
import json
import os
from PIL import Image
import numpy as np
import torch
import matplotlib.pyplot as plt
from IPython import get_ipython
import sys
import gc
import streamlit as st
class VQADataProcessor:
"""
A class to process OKVQA dataset.
Attributes:
questions_file_path (str): The file path for the questions JSON file.
annotations_file_path (str): The file path for the annotations JSON file.
questions (list): List of questions extracted from the JSON file.
annotations (list): List of annotations extracted from the JSON file.
df_questions (DataFrame): DataFrame created from the questions list.
df_answers (DataFrame): DataFrame created from the annotations list.
merged_df (DataFrame): DataFrame resulting from merging questions and answers.
"""
def __init__(self, questions_file_path, annotations_file_path):
"""
Initializes the VQADataProcessor with file paths for questions and annotations.
Parameters:
questions_file_path (str): The file path for the questions JSON file.
annotations_file_path (str): The file path for the annotations JSON file.
"""
self.questions_file_path = questions_file_path
self.annotations_file_path = annotations_file_path
self.questions, self.annotations = self.read_json_files()
self.df_questions = pd.DataFrame(self.questions)
self.df_answers = pd.DataFrame(self.annotations)
self.merged_df = None
def read_json_files(self):
"""
Reads the JSON files for questions and annotations.
Returns:
tuple: A tuple containing two lists: questions and annotations.
"""
with open(self.questions_file_path, 'r') as file:
data = json.load(file)
questions = data['questions']
with open(self.annotations_file_path, 'r') as file:
data = json.load(file)
annotations = data['annotations']
return questions, annotations
@staticmethod
def find_most_frequent(my_list):
"""
Finds the most frequent item in a list.
Parameters:
my_list (list): A list of items.
Returns:
The most frequent item in the list. Returns None if the list is empty.
"""
if not my_list:
return None
counter = Counter(my_list)
most_common = counter.most_common(1)
return most_common[0][0]
def merge_dataframes(self):
"""
Merges the questions and answers DataFrames on 'question_id' and 'image_id'.
"""
self.merged_df = pd.merge(self.df_questions, self.df_answers, on=['question_id', 'image_id'])
def join_words_with_hyphen(self, sentence):
return '-'.join(sentence.split())
def process_answers(self):
"""
Processes the answers by extracting raw and processed answers and finding the most frequent ones.
"""
if self.merged_df is not None:
self.merged_df['raw_answers'] = self.merged_df['answers'].apply(lambda x: [ans['raw_answer'] for ans in x])
self.merged_df['processed_answers'] = self.merged_df['answers'].apply(
lambda x: [ans['answer'] for ans in x])
self.merged_df['most_frequent_raw_answer'] = self.merged_df['raw_answers'].apply(self.find_most_frequent)
self.merged_df['most_frequent_processed_answer'] = self.merged_df['processed_answers'].apply(
self.find_most_frequent)
self.merged_df.drop(columns=['answers'], inplace=True)
else:
print("DataFrames have not been merged yet.")
# Apply the function to the 'most_frequent_processed_answer' column
self.merged_df['single_word_answers'] = self.merged_df['most_frequent_processed_answer'].apply(
self.join_words_with_hyphen)
def get_processed_data(self):
"""
Retrieves the processed DataFrame.
Returns:
DataFrame: The processed DataFrame. Returns None if the DataFrame is empty or not processed.
"""
if self.merged_df is not None:
return self.merged_df
else:
print("DataFrame is empty or not processed yet.")
return None
def save_to_csv(self, df, saved_file_name):
if saved_file_name is not None:
if ".csv" not in saved_file_name:
df.to_csv(os.path.join(saved_file_name, ".csv"), index=None)
else:
df.to_csv(saved_file_name, index=None)
else:
df.to_csv("data.csv", index=None)
def display_dataframe(self):
"""
Displays the processed DataFrame.
"""
if self.merged_df is not None:
print(self.merged_df)
else:
print("DataFrame is empty.")
def process_okvqa_dataset(questions_file_path, annotations_file_path, save_to_csv=False, saved_file_name=None):
"""
Processes the OK-VQA dataset given the file paths for questions and annotations.
Parameters:
questions_file_path (str): The file path for the questions JSON file.
annotations_file_path (str): The file path for the annotations JSON file.
Returns:
DataFrame: The processed DataFrame containing merged and processed VQA data.
"""
# Create an instance of the class
processor = VQADataProcessor(questions_file_path, annotations_file_path)
# Process the data
processor.merge_dataframes()
processor.process_answers()
# Retrieve the processed DataFrame
processed_data = processor.get_processed_data()
if save_to_csv:
processor.save_to_csv(processed_data, saved_file_name)
return processed_data
def show_image(image):
"""
Display an image in various environments (Jupyter, PyCharm, Hugging Face Spaces).
Handles different types of image inputs (file path, PIL Image, numpy array, OpenCV, PyTorch tensor).
Args:
image (str or PIL.Image or numpy.ndarray or torch.Tensor): The image to display.
"""
in_jupyter = is_jupyter_notebook()
in_colab = is_google_colab()
# Convert image to PIL Image if it's a file path, numpy array, or PyTorch tensor
if isinstance(image, str):
if os.path.isfile(image):
image = Image.open(image)
else:
raise ValueError("File path provided does not exist.")
elif isinstance(image, np.ndarray):
if image.ndim == 3 and image.shape[2] in [3, 4]:
image = Image.fromarray(image[..., ::-1] if image.shape[2] == 3 else image)
else:
image = Image.fromarray(image)
elif torch.is_tensor(image):
image = Image.fromarray(image.permute(1, 2, 0).numpy().astype(np.uint8))
# Display the image
if in_jupyter or in_colab:
from IPython.display import display
display(image)
else:
image.show()
def show_image_with_matplotlib(image):
if isinstance(image, str):
image = Image.open(image)
elif isinstance(image, np.ndarray):
image = Image.fromarray(image)
elif torch.is_tensor(image):
image = Image.fromarray(image.permute(1, 2, 0).numpy().astype(np.uint8))
plt.imshow(image)
plt.axis('off') # Turn off axis numbers
plt.show()
def is_jupyter_notebook():
"""
Check if the code is running in a Jupyter notebook.
Returns:
bool: True if running in a Jupyter notebook, False otherwise.
"""
try:
from IPython import get_ipython
if 'IPKernelApp' not in get_ipython().config:
return False
if 'ipykernel' in str(type(get_ipython())):
return True # Running in Jupyter Notebook
except (NameError, AttributeError):
return False # Not running in Jupyter Notebook
return False # Default to False if none of the above conditions are met
def is_pycharm():
return 'PYCHARM_HOSTED' in os.environ
def is_google_colab():
return 'COLAB_GPU' in os.environ or 'google.colab' in sys.modules
def get_image_path(name, path_type):
"""
Generates a path for models, images, or data based on the specified type.
Args:
name (str): The name of the model, image, or data folder/file.
path_type (str): The type of path needed ('models', 'images', or 'data').
Returns:
str: The full path to the specified resource.
"""
# Get the current working directory (assumed to be inside 'code' folder)
current_dir = os.getcwd()
# Get the directory one level up (the parent directory)
parent_dir = os.path.dirname(current_dir)
# Construct the path to the specified folder
folder_path = os.path.join(parent_dir, path_type)
# Construct the full path to the specific resource
full_path = os.path.join(folder_path, name)
return full_path
def get_model_path(model_name):
"""
Get the path to the 'model1' folder.
Returns:
str: Absolute path to the 'model1' folder.
"""
# Directory of the current script
current_script_dir = os.path.dirname(os.path.abspath(__file__))
# Directory of the 'my_project' folder (parent of the 'my_model' folder)
project_dir = os.path.dirname(current_script_dir)
# Path to the 'models/{model_name}' folder
model_path = os.path.join(project_dir, "models", model_name)
return model_path
def free_gpu_resources():
"""
Clears GPU memory.
"""
try:
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.empty_cache()
gc.collect()
gc.collect()
st.write("GPU memory has been cleared.")
if __name__ == "__main__":
pass
#val_data = process_okvqa_dataset('OpenEnded_mscoco_val2014_questions.json', 'mscoco_val2014_annotations.json', save_to_csv=True, saved_file_name="okvqa_val.csv")
#train_data = process_okvqa_dataset('OpenEnded_mscoco_train2014_questions.json', 'mscoco_train2014_annotations.json', save_to_csv=True, saved_file_name="okvqa_train.csv")