import re
import warnings
from typing import Dict, List, Optional

import pandas as pd
import requests
import usaddress
from bs4 import BeautifulSoup, SoupStrainer
from bs4.element import Tag
from requests import Response
from scourgify import normalize_address_record

# Install the filter before importing xmltodict/tqdm (preserves the original
# filter-before-import ordering, in case those imports emit DeprecationWarnings).
warnings.filterwarnings(category=DeprecationWarning, action="ignore")

import xmltodict  # noqa: E402
from tqdm import tqdm  # noqa: E402


# TODO(review): these state-specific parsers should be refactored into a class.


def nevada(input_path):
    """Parse a Nevada contributions report (HTML) into a tidy DataFrame.

    Args:
        input_path: path / URL / file-like object accepted by ``pd.read_html``
            pointing at the Nevada campaign-finance HTML form.

    Returns:
        DataFrame with columns ``full_name``, ``full_address``,
        ``donation_amount`` and ``donation_date``, de-duplicated on name.
    """
    contributor_col = (
        "NAME AND ADDRESS OF PERSON, GROUP OR ORGANIZATION WHO MADE CONTRIBUTION"
    )
    table_id = "ctl04_mobjContributions_dgContributions"
    nv_df = pd.read_html(input_path, attrs={"id": table_id}, header=0)[0]

    # The first two whitespace-separated tokens are treated as the donor name;
    # everything after them is the address.
    nv_df["full_name"] = nv_df[contributor_col].apply(
        lambda x: " ".join(x.split()[:2])
    )
    nv_df["full_address"] = nv_df[contributor_col].apply(
        lambda x: " ".join(x.split()[2:]).lower()
    )

    nv_df = nv_df.rename(
        columns={
            "AMOUNT OF CONTRIBUTION": "donation_amount",
            "DATE OF CONTRIBUTION": "donation_date",
        }
    )
    nv_df.drop(
        columns=[
            contributor_col,
            "CHECK HERE IF LOAN",
            "NAME AND ADDRESS OF 3rd PARTY IF LOAN GUARANTEED BY 3rd PARTY",
            "NAME AND ADDRESS OF PERSON, GROUP OR ORGANIZATION WHO FORGAVE THE LOAN, IF DIFFERENT THAN CONTRIBUTOR",
        ],
        inplace=True,
    )

    # Keep only the leading date portion (first 10 characters).
    nv_df["donation_date"] = nv_df["donation_date"].apply(lambda x: x[:10])
    # Strip the dollar sign, then drop cents and thousands separators.
    nv_df["donation_amount"] = nv_df["donation_amount"].apply(
        lambda x: x.split("$")[-1].split(".")[0].replace(",", "")
    )

    # Remove punctuation from the address (keeps alphanumerics and whitespace).
    nv_df["full_address"] = nv_df["full_address"].str.replace(
        r"[^A-Za-z0-9\s+]", "", regex=True
    )
    nv_df.drop_duplicates("full_name", inplace=True)
    return nv_df


#### virginia parsers
def get_downloadable_reports(link: str) -> List[str]:
    """Scrape a VA candidate page and return an XML download URL per scheduled report."""
    va_base_url = (
        "https://cfreports.elections.virginia.gov/Report/ReportXML/{report_id}"
    )
    resp = requests.get(link)
    parsed = BeautifulSoup(resp.content, "html.parser").find(
        "div", {"id": "ScheduledReports"}
    )
    ids = [
        extract_va_report_id(anchor)
        for anchor in parsed.find_all("a", {"title": "Click to view report"})
    ]
    return [va_base_url.format(report_id=rep_id) for rep_id in ids]


def extract_va_report_id(tag: Tag) -> str:
    """Return the report id, i.e. the last path segment of the anchor's href."""
    return str(tag["href"]).split("/")[-1]


def get_contributions_from_report(report: Response) -> Optional[List[Dict]]:
    """Extract the Schedule A line items (contributions) from one XML report.

    Returns:
        A list of line-item dicts, or None when the report has no ScheduleA
        section.
    """
    content = report.content
    try:
        parsed = xmltodict.parse(content)["Report"]
    except KeyError:
        # Some filings use a different root element.
        parsed = xmltodict.parse(content)["FinalReport"]
    if parsed.get("ScheduleA", None):
        line_items = parsed["ScheduleA"]["LiA"]
        # FIX: xmltodict returns a bare dict (not a one-element list) when the
        # schedule contains a single contribution; normalize to a list so
        # virginia()'s ``contributions.extend(...)`` doesn't extend with keys.
        return line_items if isinstance(line_items, list) else [line_items]
    return None


def make_all_requests(report_urls: List[str]) -> list:
    """Download every report URL; shows a tqdm progress bar (was debug prints)."""
    return [requests.get(url) for url in tqdm(report_urls)]


def parse_va_xml(xml_dict: Dict[str, str]) -> Optional[Dict]:
    """Flatten one Schedule A line item into a donor record.

    Returns:
        A flat dict of name/address/donation fields for individual
        contributors, or None for non-individual (org/PAC) or malformed items.
    """
    if xml_dict is not None and isinstance(xml_dict, dict):
        if xml_dict["Contributor"]["@IsIndividual"] == "true":
            contributor = xml_dict["Contributor"]
            address = contributor["Address"]
            return {
                "first_name": contributor.get("FirstName", None),
                "last_name": contributor.get("LastName", None),
                "addr1": address.get("Line1", None),
                "addr2": address.get("Line2", None),
                "city": address.get("City", None),
                "state": address.get("State", None),
                "zip": address.get("ZipCode", None),
                "donation_amount": xml_dict.get("Amount", None),
                "donation_date": xml_dict.get("TransactionDate"),
            }
    return None


def virginia(report_url: str) -> pd.DataFrame:
    """Scrape and parse all contribution reports linked from a VA candidate page.

    Raises:
        ValueError: if no records could be extracted from any report.
    """
    report_links = get_downloadable_reports(report_url)
    # FIX: local variable renamed from ``requests`` -- it shadowed the
    # requests module within this function.
    responses = make_all_requests(report_links)
    contributions = []
    for resp in responses:
        line_items = get_contributions_from_report(resp)
        if line_items:
            contributions.extend(line_items)
    parsed_dicts = [parse_va_xml(item) for item in contributions if item]
    if not parsed_dicts:
        raise ValueError("No valid records for this candidate.")
    return pd.DataFrame.from_records([d for d in parsed_dicts if d])


### html parsers for Kansas and Missouri
def missouri(input_path):
    """Read the first HTML table found in a Missouri report file."""
    # (Dropped the redundant function-local ``import pandas`` -- pd is module-level.)
    with open(input_path) as html:
        return pd.read_html(html)[0]


def kansas(input_path):
    """Parse a Kansas contributions HTML report into a DataFrame.

    The report renders each field as a <span> whose id encodes the column
    (e.g. ``lblContributor...``); spans are matched by id pattern and
    collected column-wise.
    """
    # Pre-compile one regex per output column.
    match_patterns = {
        "full_name": re.compile(r"lblContributor.*"),
        "addr1": re.compile(r"lblAddress_.*"),
        "addr2": re.compile(r"lblAddress2_.*"),
        "city": re.compile(r"lblCity.*"),
        "state": re.compile(r"lblState.*"),
        "zip": re.compile(r"lblZip.*"),
        "donation_date": re.compile(r"lblDate.*"),
        "donation_amount": re.compile(r"lblAmount.*"),
    }
    # Only parse the spans we care about -- faster on large reports.
    strainer = SoupStrainer("span", attrs={"id": list(match_patterns.values())})
    with open(input_path) as html:
        soup = BeautifulSoup(html, parse_only=strainer, features="html.parser")

    parsed_data = {key: [] for key in match_patterns}
    for span in soup.find_all("span"):
        matched = False
        for key, pattern in match_patterns.items():
            if pattern.match(span["id"]):
                parsed_data[key].append(span.text)
                matched = True
                break
        if not matched:
            # Keep all columns the same length when a span matches no pattern.
            for key in match_patterns:
                parsed_data[key].append(None)

    donors_df = pd.DataFrame(parsed_data)
    # Vectorized strip of the leading dollar sign.
    donors_df["donation_amount"] = donors_df["donation_amount"].str.lstrip("$")
    return donors_df


def address_parse_attempt(address):
    """Heuristically trim a raw string down to its street-address portion.

    If the string contains a date, only the text after the date is considered.
    Within the candidate text, a "po box"/"p o box" marker wins over the first
    digit as the address start.

    Returns:
        The trimmed address; ``"not_able_to_parse"`` when a date was found but
        nothing address-like follows it; or None when nothing usable was found.
    """
    # FIX: was unbound (UnboundLocalError on return) when ``address`` is falsy.
    final_address = None
    date_regexp = r"(\d+(/|-){1}\d+(/|-){1}\d{2,4})"
    if address:
        has_date = re.search(date_regexp, address)
        if has_date:
            after_date_address = address[has_date.end() + 1:]
            has_pobox = re.search("po box", after_date_address) or re.search(
                "p o box", after_date_address
            )
            has_digit = re.search(r"\d", after_date_address)
            if has_pobox:
                final_address = after_date_address[has_pobox.start():]
            elif has_digit:
                final_address = after_date_address[has_digit.start():]
            else:
                final_address = "not_able_to_parse"
        else:
            has_pobox = re.search("po box", address) or re.search("p o box", address)
            has_digit = re.search(r"\d", address)
            if has_pobox:
                final_address = address[has_pobox.start():]
            elif has_digit:
                final_address = address[has_digit.start():]
    return final_address


def parse_pobox_address(address):
    """Parse a PO-box style address into its components via ``usaddress``.

    Returns:
        A dict with lowercase keys and values (``address_line_1``,
        ``address_line_2``, ``city``, ``state``, ``postal_code``), or None
        when the input is not a PO box or cannot be parsed.
    """
    if address:
        try:
            if "po box" in address or "p o box" in address:
                parsed_address = usaddress.parse(address)
                address_dict = {
                    "address_line_1": "po box "
                    + [x for x in parsed_address if x[1] == "USPSBoxID"][0][0],
                    "address_line_2": None,
                    "city": " ".join(
                        [x[0] for x in parsed_address if x[1] == "PlaceName"]
                    ),
                    "state": [x for x in parsed_address if x[1] == "StateName"][0][0],
                    "postal_code": [
                        x for x in parsed_address if x[1] == "ZipCode"
                    ][0][0],
                }
                return dict(
                    (k.lower() if k else None, v.lower() if v else None)
                    for k, v in address_dict.items()
                )
        # FIX: was a bare ``except:`` (also swallowed SystemExit /
        # KeyboardInterrupt); IndexError fires when a component is missing.
        except Exception:
            return None
    return None


def normal_address(new_address):
    """Normalize an address with usaddress-scourgify; fall back to the PO-box parser.

    Returns:
        A dict with lowercase keys and values, or None if both strategies fail.
    """
    if new_address:
        try:
            parsed_address = normalize_address_record(new_address)
            return dict(
                (k.lower() if k else None, v.lower() if v else None)
                for k, v in parsed_address.items()
            )
        # FIX: was a bare ``except:``; scourgify raises on unparseable input,
        # in which case we retry with the PO-box heuristic.
        except Exception:
            return parse_pobox_address(new_address)
    return None