|
import os |
|
import pandas as pd |
|
import gzip |
|
import pickle |
|
import numpy as np |
|
|
|
def get_file_path_end(file_path):
    """Return the last component of ``file_path`` without its final extension."""
    # splitext only removes the last suffix, so "data.csv.gz" yields "data.csv".
    return os.path.splitext(os.path.basename(file_path))[0]
|
|
|
def get_file_path_end_with_ext(file_path):
    """Return the last component of ``file_path``, extension included."""
    _directory, final_component = os.path.split(file_path)
    return final_component
|
|
|
def ensure_output_folder_exists(folder_name: str = "output/"):
    """Check whether ``folder_name`` exists and create it if it does not.

    Args:
        folder_name: Path of the folder to check for and, if missing, create
            (including any missing parent folders). Defaults to ``"output/"``.

    A short status message is printed in either case.
    """
    if os.path.exists(folder_name):
        print(f"The '{folder_name}' folder already exists.")
        return

    # exist_ok covers the case where something else creates the folder between
    # the existence check above and this call.
    os.makedirs(folder_name, exist_ok=True)
    print(f"Created the '{folder_name}' folder.")
|
|
|
def detect_file_type(filename):
    """Detect the file type based on its extension.

    Matching is case-insensitive, so ``"DATA.CSV"`` is treated like
    ``"data.csv"``.

    Args:
        filename: File name or path whose suffix is inspected.

    Returns:
        One of ``'csv'``, ``'xlsx'``, ``'parquet'``, ``'pkl.gz'``, ``'pkl'``
        or ``'npz'``. Names ending in ``.csv.gz`` or ``.zip`` are reported
        as ``'csv'``.

    Raises:
        ValueError: If the suffix matches none of the supported types.
    """
    lowered = filename.lower()

    # (accepted suffixes, reported type); str.endswith accepts a tuple.
    suffix_table = (
        (('.csv', '.csv.gz', '.zip'), 'csv'),
        (('.xlsx',), 'xlsx'),
        (('.parquet',), 'parquet'),
        (('.pkl.gz',), 'pkl.gz'),
        (('.pkl',), 'pkl'),
        (('.npz',), 'npz'),
    )
    for suffixes, file_type in suffix_table:
        if lowered.endswith(suffixes):
            return file_type

    raise ValueError("Unsupported file type.")
|
|
|
|
|
def read_file(filename, headers=0):
    """Read the file based on its detected type.

    Args:
        filename: Path of the file to load. Its extension selects the loader
            via ``detect_file_type``.
        headers: Passed as ``header`` to ``pd.read_csv`` / ``pd.read_excel``.
            Not used for the other file types.

    Returns:
        A DataFrame for csv / xlsx / parquet input, the unpickled object for
        pkl / pkl.gz input, or the ``'arr_0'`` array for npz input.

    Raises:
        ValueError: If the file type is not supported.
    """
    file_type = detect_file_type(filename)

    print("Loading in file")

    if file_type == 'csv':
        file = pd.read_csv(filename, low_memory=False, header=headers)
    elif file_type == 'xlsx':
        file = pd.read_excel(filename, header=headers)
    elif file_type == 'parquet':
        # read_parquet has no ``header`` option (column names come from the
        # file's schema); passing one is forwarded to the engine and rejected.
        file = pd.read_parquet(filename)
    elif file_type == 'pkl.gz':
        # NOTE(security): unpickling can execute arbitrary code - only load
        # pickle files from sources you trust.
        with gzip.open(filename, 'rb') as handle:
            file = pickle.load(handle)
    elif file_type == 'pkl':
        # Same trust caveat as above applies to uncompressed pickles.
        with open(filename, 'rb') as handle:
            file = pickle.load(handle)
    elif file_type == 'npz':
        # Close the archive once the array has been read out of it.
        with np.load(filename) as archive:
            file = archive['arr_0']

        # NOTE(review): files with "compress" in the name are presumably
        # arrays that were multiplied by 100 before saving - confirm against
        # the code that writes them. Not done in place so that integer
        # arrays can be rescaled too.
        if "compress" in filename:
            file = file / 100
    else:
        raise ValueError("Unsupported file type.")

    print("File load complete")

    return file
|
|
|
|
|
def add_folder_to_path(folder_path: str):
    '''
    Check if a folder exists on your system. If so, get the absolute path and then add it to the front of the PATH environment variable if it isn't already listed there.

    Only os.environ of the current process is modified. If folder_path is not an existing directory, PATH is left untouched. A status message is printed in every case.
    '''
    # isdir is already False for paths that do not exist.
    if not os.path.isdir(folder_path):
        print(f"Folder not found at {folder_path} - not added to PATH")
        return

    print(folder_path, "folder exists.")

    absolute_path = os.path.abspath(folder_path)

    # PATH may be unset or empty; treat both as "no entries" instead of raising.
    current_path = os.environ.get('PATH', '')
    existing_entries = current_path.split(os.pathsep) if current_path else []

    if absolute_path in existing_entries:
        print(f"Directory {folder_path} already exists in PATH.")
        return

    # Avoid a dangling separator when there was no previous PATH value.
    if current_path:
        full_path_extension = absolute_path + os.pathsep + current_path
    else:
        full_path_extension = absolute_path

    os.environ['PATH'] = full_path_extension
    print("Updated PATH with: ", full_path_extension)
|
|
|
def custom_regex_load(in_file, headers = None):
    '''
    Load a custom regex table from the first uploaded file whose name contains "csv".

    in_file: iterable of objects exposing a .name attribute that holds the file path; None or an empty iterable is treated as "no file provided".
    headers: forwarded to read_file as its headers argument.

    Returns a (message, data) tuple. data is whatever read_file returns for the chosen file, or an empty DataFrame when no matching file was supplied.
    '''
    custom_regex = pd.DataFrame()

    # Guard against None so that a missing upload is reported, not raised.
    file_list = [string.name for string in in_file] if in_file else []

    # The match is a case-insensitive substring test on the whole name, not
    # an extension check; the first matching name wins.
    regex_file_name = next(
        (name for name in file_list if "csv" in name.lower()), None
    )

    if regex_file_name is None:
        error = "No regex file provided."
        print(error)
        return error, custom_regex

    custom_regex = read_file(regex_file_name, headers)

    output_text = "Data file loaded."
    print(output_text)

    return output_text, custom_regex
|
|