from pathlib import Path
from csv import QUOTE_ALL, Sniffer
import re
from typing import Optional

import chardet
import pandas as pd
from bs4 import BeautifulSoup
from janitor import clean_names as pj_clean_names
from pandas.errors import EmptyDataError, ParserError

from src.add_funcs import virginia, nevada, kansas, missouri
from src.constants import COLUMN_KEY_DICTIONARY, PREFERRED_HEADERS


class ColumnCoder:
    """Renames recognized input columns to their canonical names."""

    def __init__(self, input_df: Optional[pd.DataFrame] = None):
        # A None default avoids sharing one mutable DataFrame across instances.
        self.set_input_data(input_df if input_df is not None else pd.DataFrame())

    @property
    def column_matches(self) -> dict:
        """Mapping from recognized input columns to canonical names."""
        if self.input_data.empty:
            return {}
        return {
            k: COLUMN_KEY_DICTIONARY[k]
            for k in self.input_data.columns
            if k in COLUMN_KEY_DICTIONARY
        }

    def set_input_data(self, df: pd.DataFrame) -> "ColumnCoder":
        if isinstance(df, pd.DataFrame):
            self.input_data = pj_clean_names(df) if not df.empty else df
        else:
            self.input_data = pd.DataFrame()
        return self

    def apply_recodes(self) -> pd.DataFrame:
        # An empty mapping means no input column was recognized; raising keeps
        # the return type honest instead of handing back a message string.
        if not self.column_matches:
            raise ValueError("No column mappings have been identified.")
        return self.input_data.rename(errors="raise", columns=self.column_matches)


class ColumnSelector:
    """Pads and reorders a DataFrame to exactly the preferred output headers."""

    def __init__(self, input_df: pd.DataFrame, source: str):
        self.source = source
        self.data_to_reduce = input_df

    @property
    def preferred_headers(self) -> list:
        # Build a new list for TN: list.append() returns None and would
        # mutate the shared PREFERRED_HEADERS constant on every access.
        if self.source == "TN":
            return PREFERRED_HEADERS + ["recipient_name"]
        return PREFERRED_HEADERS

    @property
    def data_to_reduce(self) -> pd.DataFrame:
        return self._data_to_reduce

    @data_to_reduce.setter
    def data_to_reduce(self, value: pd.DataFrame):
        if not any(col in self.preferred_headers for col in value.columns):
            raise KeyError(
                "Passed pd.DataFrame had none of the expected output columns. "
                "Please check that you are passing data from the correct stage."
            )
        self._data_to_reduce = value

    def reduce_columns(self) -> pd.DataFrame:
        # Create any missing preferred columns as empty strings, then keep
        # only the preferred columns, in canonical order.
        col_counter = 0
        for preferred_col in self.preferred_headers:
            if preferred_col not in self.data_to_reduce.columns:
                self.data_to_reduce[preferred_col] = ""
                col_counter += 1
        print(f"Added {col_counter} column(s)")
        print("Selecting only preferred columns...")
        return self.data_to_reduce[self.preferred_headers]


class Ingester:
    """Loads a source file (or, for Virginia, a URL) into a raw DataFrame."""

    def __init__(self, path: str, source: str):
        self.source = source
        # Virginia inputs are URLs, so leave them as plain strings.
        self.path = path if source == "VA" else Path(path)
        self.encoding = self._get_encoding()
        self.delimiter = self._detect_delimiter()

    @property
    def file_type(self) -> str:
        if isinstance(self.path, Path):
            return self.path.suffix
        return "url"

    def ingest(self) -> pd.DataFrame:
        if self.file_type == "url":
            return self._parse_xml()
        elif self.file_type in [".csv", ".txt"]:
            return self._parse_csv()
        elif self.file_type == ".html":
            return self._parse_html()
        elif self.file_type in [".xls", ".xlsx"]:
            return self._parse_excel()
        else:
            raise OSError(f"File extension {self.file_type} not supported.")

    def _get_encoding(self) -> Optional[str]:
        if self.file_type == "url":
            return None
        with open(self.path, "rb") as f:
            to_detect = f.read()
        return chardet.detect(to_detect)["encoding"]

    def _detect_delimiter(self) -> Optional[str]:
        if self.file_type in [".xlsx", ".xls", "url"]:
            return None
        if self.file_type == ".txt":
            # .txt exports are tab-separated.
            return "\t"
        sniffer = Sniffer()
        with open(self.path, newline="", encoding=self.encoding) as csvfile:
            to_detect = csvfile.read(3000)
        try:
            return sniffer.sniff(to_detect).delimiter
        except Exception:
            # Sniffing failed; fall back to a comma.
            return ","

    def _detect_row_skip(self) -> int:
        # Probe up to 20 leading rows for the real header line, skipping
        # candidates that parse as mostly unnamed or suspiciously narrow.
        for rs in range(0, 21):
            col_check = pd.read_csv(
                self.path,
                sep=self.delimiter,
                skiprows=rs,
                nrows=1,
                encoding=self.encoding,
            ).columns
            if len([col for col in col_check if "Unnamed" in col]) > 2:
                continue
            elif len(col_check) < 4:
                continue
            else:
                return rs
        return 0

    def _detect_row_skip_excel(self) -> int:
        # Same header probe as _detect_row_skip, but for Excel workbooks.
        for rs in range(0, 21):
            col_check = pd.read_excel(
                self.path,
                skiprows=rs,
                nrows=1,
            ).columns
            if len([col for col in col_check if "Unnamed" in col]) > 2:
                continue
            elif len(col_check) < 4:
                continue
            else:
                return rs
        return 0

    def get_base_csv_params(self) -> dict:
        try:
            header_params = {"skiprows": self._detect_row_skip()}
        except EmptyDataError:
            header_params = {"skiprows": 0, "header": None}
        return {
            "filepath_or_buffer": self.path,
            "sep": self.delimiter,
            "encoding": self.encoding,
            "skip_blank_lines": True,
            "index_col": False,
            **header_params,
        }

    def _parse_csv_encoding_errors(self) -> pd.DataFrame:
        return pd.read_csv(**self.get_base_csv_params(), encoding_errors="ignore")

    def _parse_csv_parser_error(self) -> pd.DataFrame:
        # Last-resort parse: permissive encoding, python engine, bad lines skipped.
        return pd.read_csv(
            self.path,
            index_col=False,
            encoding="unicode_escape",
            encoding_errors="ignore",
            quoting=QUOTE_ALL,
            engine="python",
            quotechar='"',
            on_bad_lines="skip",
        )

    def _parse_csv(self) -> pd.DataFrame:
        if self.source == "PA":
            tmp_file = pd.read_csv(self.path, on_bad_lines="skip", header=None)
        else:
            try:
                tmp_file = pd.read_csv(**self.get_base_csv_params())
            except UnicodeDecodeError:
                tmp_file = self._parse_csv_encoding_errors()
            except ParserError:
                tmp_file = self._parse_csv_parser_error()
        return tmp_file

    def _parse_excel(self) -> pd.DataFrame:
        if self.source == "MNSTP_OLD":
            # TODO: fix this cleanly later
            return pd.read_excel(
                self.path,
                skiprows=1,
                sheet_name="#3 Cash contributions > $100",
            )
        row_skip = self._detect_row_skip_excel()
        return pd.read_excel(self.path, skiprows=row_skip)

    def _parse_xml(self) -> pd.DataFrame:
        return virginia(self.path)

    def _parse_html(self) -> pd.DataFrame:
        if self.source == "KS":
            with open(self.path, "r") as file:
                html = file.read()
            # Byte-size ceiling used as a proxy for the stated record limit.
            if len(html) > 46091493:
                raise MemoryError(
                    "Please submit a file with fewer than 25,000 records!"
                )
            tmp_file = kansas(self.path)
        elif self.source == "MO":
            tmp_file = missouri(self.path)
        elif self.source == "NV":
            tmp_file = nevada(self.path)
        elif self.source == "SA":
            self.path = self.path.rename(self.path.with_suffix(".html"))
            people = []
            headers = ["full_name", "addr1", "city_state_zip"]
            with open(self.path) as f:
                soup = BeautifulSoup(f, "html.parser")
            tmp_dict = {}
            for i in soup.select("table")[0].find_all("tr"):
                for idx, j in enumerate(i.find_all("td")):
                    if j.find("br"):
                        # Cells with <br> tags hold the stacked name/address
                        # lines; strip everything but word characters, commas,
                        # and spaces.
                        tmp_dict = dict(
                            zip(
                                headers,
                                [re.sub(r"[^A-Za-z0-9, ]", "", k) for k in j.strings],
                            )
                        )
                    if idx == 4:
                        tmp_dict["donation_amount"] = j.text
                    if idx == 13:
                        tmp_dict["donation_date"] = j.text
                people.append(tmp_dict)
            tmp_file = pd.DataFrame(people)
        return tmp_file


class Polisher:
    """Normalizes, deduplicates, and filters the standardized DataFrame."""

    def __init__(self):
        self.dedupe_cols = [
            "first_name",
            "last_name",
            "full_name",
            "full_address",
            "addr1",
            "addr2",
            "city",
            "state",
        ]
        self.lower_cols = [
            "first_name",
            "last_name",
            "full_name",
            "full_address",
            "addr1",
            "addr2",
            "city",
        ]

    def _na_to_empty_str(self, df: pd.DataFrame) -> pd.DataFrame:
        return df.fillna("")

    def _format_donations_col(self, df: pd.DataFrame) -> pd.DataFrame:
        # Drop cents, currency symbols, and separators, coerce to int, and
        # mask out rows whose amount is missing or non-positive.
        df["donation_amount"] = (
            df["donation_amount"]
            .astype(str)
            .str.replace(r"\.\d+", "", regex=True)
            .replace(r",|\$", "", regex=True)
            .replace(r"[^0-9]", "", regex=True)
            .replace("", 0)
            .astype("int")
        )
        return df.where((~df.donation_amount.isna()) & (df.donation_amount > 0))

    def _parse_zips(self, input_df: pd.DataFrame) -> pd.DataFrame:
        # Keep only the five-digit ZIP prefix.
        input_df["zip"] = input_df["zip"].astype(str).str[:5]
        input_df["zip"] = input_df["zip"].str.replace(r"[^0-9]", "", regex=True)
        return input_df

    def _name_formats(self, input_df: pd.DataFrame) -> pd.DataFrame:
        def name_col_decider(row) -> str:
            # full_name is redundant once both name parts are present;
            # otherwise keep it as the only usable name.
            if row["first_name"] != "" and row["last_name"] != "":
                return ""
            return row["full_name"]

        input_df["full_name"] = input_df.apply(name_col_decider, axis=1)
        return input_df

    def _format_str_cols(self, input_df: pd.DataFrame) -> pd.DataFrame:
        for col in input_df.columns:
            if col in self.lower_cols:
                input_df[col] = (
                    input_df[col].astype(str).map(lambda x: x.lower().strip())
                )
                input_df[col] = input_df[col].str.replace(
                    r"[^a-z0-9 ]", "", regex=True
                )
            elif col == "state":
                input_df[col] = (
                    input_df[col].astype(str).map(lambda x: x.upper().strip())
                )
        return input_df

    def _dedupe(self, input_df: pd.DataFrame) -> pd.DataFrame:
        return input_df.drop_duplicates(
            subset=self.dedupe_cols,
            keep="first",
        )

    def _filter_unusable(self, input_df: pd.DataFrame) -> pd.DataFrame:
        # Keep rows that carry some name plus either a full address or
        # enough of a (zip or city) + state combination to locate them.
        return input_df.query(
            "(full_address != '' or ((zip != '' or city != '') and state != ''))"
            " and (first_name != '' or last_name != '' or full_name != '')"
        )

    def data_cleaner(self, input_df: pd.DataFrame) -> pd.DataFrame:
        # Fill NAs first so the empty-string checks in _name_formats hold.
        stage0 = self._na_to_empty_str(input_df)
        stage1 = self._name_formats(stage0)
        stage2 = self._format_donations_col(stage1)
        stage3 = self._parse_zips(stage2)
        stage4 = self._format_str_cols(stage3)
        return self._dedupe(self._filter_unusable(stage4))
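

if __name__ == "__main__":
    # Minimal usage sketch, not part of the pipeline itself: chains the four
    # stages in their intended order. The path "data/donations.csv" and the
    # source code "XX" are illustrative assumptions, not values this repo
    # defines; any source string without special handling simply falls
    # through to the generic file-type dispatch in Ingester.ingest().
    raw = Ingester("data/donations.csv", source="XX").ingest()
    recoded = ColumnCoder(raw).apply_recodes()
    reduced = ColumnSelector(recoded, source="XX").reduce_columns()
    cleaned = Polisher().data_cleaner(reduced)
    print(f"{len(cleaned)} usable rows")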