import re
import usaddress
import pandas as pd
from scourgify import normalize_address_record
# TODO: refactor these into a special class
def nevada(input_path):
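    """Parse a Nevada contributions report (HTML) into a normalized DataFrame.

    Expects the "ctl04_mobjContributions_dgContributions" table and splits the
    combined name-and-address column into `full_name` and `full_address`.
    """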
table_id = "ctl04_mobjContributions_dgContributions"
nv_df = pd.read_html(input_path, attrs={"id": table_id}, header=0)[0]
    name_addr_col = (
        "NAME AND ADDRESS OF PERSON, GROUP OR ORGANIZATION WHO MADE CONTRIBUTION"
    )
    try:
        # The first two whitespace-separated tokens are the contributor's name.
        nv_df["full_name"] = nv_df[name_addr_col].apply(
            lambda x: " ".join(x.split()[:2])
        )
    except AttributeError:
        raise RuntimeError(
            "File had no valid data when attempting to read. "
            "Please check that you are not passing an empty form!"
        )
    # The remainder of the combined column is the mailing address.
    nv_df["full_address"] = nv_df[name_addr_col].apply(
        lambda x: " ".join(x.split()[2:]).lower()
    )
nv_df = nv_df.rename(
columns={
"AMOUNT OF CONTRIBUTION": "donation_amount",
"DATE OF CONTRIBUTION": "donation_date",
}
)
    nv_df.drop(
        columns=[
            name_addr_col,
            "CHECK HERE IF LOAN",
            "NAME AND ADDRESS OF 3rd PARTY IF LOAN GUARANTEED BY 3rd PARTY",
            "NAME AND ADDRESS OF PERSON, GROUP OR ORGANIZATION WHO FORGAVE THE LOAN, IF DIFFERENT THAN CONTRIBUTOR",
        ],
        inplace=True,
    )
nv_df["donation_date"] = nv_df["donation_date"].apply(lambda x: "".join(x[:10]))
nv_df["donation_amount"] = nv_df["donation_amount"].apply(
lambda x: "".join(x.split("$")[-1])
)
nv_df["donation_amount"] = nv_df["donation_amount"].apply(
lambda x: "".join(x.split(".")[0]).replace(",", "")
)
    # Optional filter for organization-style names; if re-enabled, negate the
    # mask with ~ (the original `is False` comparison never matched anything):
    # nv_df = nv_df[~nv_df["full_name"].str.contains(r",|\.|\$|\&|\'|\d+", regex=True)]
nv_df["full_address"] = nv_df["full_address"].str.replace(
r"[^A-Za-z0-9\s+]", "", regex=True
)
nv_df.drop_duplicates("full_name", inplace=True)
return nv_df
def philadelphia(candidate):
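    """Fetch contributions for `candidate` from the Philadelphia campaign
    finance API and return them with the standard column set."""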
    import requests
good_names = [
"donation_date",
"donation_amount",
"full_name",
"addr1",
"city",
"state",
"zip",
"full_address",
"first_name",
"middle_name",
"last_name",
"addr2",
"phone1",
"phone2",
"email1",
"email2",
]
    response = requests.get(
        f"https://cf-api.phila.gov/services/campaign-finance/search/searchtransactions?query=&limit=10000&ascending=1&page=1&byColumn=0&orderBy=transactionDate&searchString={candidate}&amount=&isSearchByFilingEntity=true&year=&cycleCode=&isSearchOnExpenditure=false"
    )
    response.raise_for_status()  # fail fast on HTTP errors
    j = response.json()
df = pd.json_normalize(j["transactionList"])
df = df.rename(
columns={
"transactionEntityName": "full_name",
"transactionEntityAddress": "full_address",
"EntityCity": "city",
"transactionDate": "donation_date",
"transactionAmount": "donation_amount",
}
)
    # Drop response columns we don't use, then add any expected column the API
    # didn't return (as an empty string) so downstream code sees one schema.
    for col in df.columns.values:
        if col not in good_names:
            df.drop(col, axis=1, inplace=True)
    for col in good_names:
        if col not in df.columns.values:
            df[col] = ""
df["donation_date"] = df["donation_date"].astype(str).str[0:10]
return df
def virginia(input_xml):
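    """Parse a Virginia ScheduleA XML report into a DataFrame.

    Writes a namespace-stripped copy of the input next to the original
    (suffix `_cleaned.xml`) and reads the donation rows from it.
    """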
    from lxml import etree, objectify

    parser = etree.XMLParser(remove_blank_text=True)
    tree = etree.parse(input_xml, parser)
    root = tree.getroot()
    # Strip namespace prefixes from every element tag so the bare-name XPath
    # queries below can match.
    for elem in root.iter():
        if not hasattr(elem.tag, "find"):
            continue  # guard for Comment tags
        i = elem.tag.find("}")
        if i >= 0:
            elem.tag = elem.tag[i + 1 :]
    objectify.deannotate(root, cleanup_namespaces=True)
    cleaned = f"{input_xml}_cleaned.xml"
    tree.write(
        cleaned,
        pretty_print=True,
        xml_declaration=True,
        encoding="UTF-8",
    )
try:
df = pd.read_xml(cleaned, xpath="//LiA")
df = df.drop("Contributor", axis=1)
df_2 = pd.read_xml(cleaned, xpath="//Contributor")
df_3 = pd.read_xml(cleaned, xpath="//ScheduleA//LiA//Contributor//Address")
final_df = pd.concat(
[df, df_2.loc[:, ["FirstName", "LastName", "IsIndividual"]], df_3],
axis="columns",
)
final_df.columns = [col.lower() for col in final_df.columns]
return final_df
    except ValueError:
        raise RuntimeError("No valid ScheduleA donations.")
def missouri(input_path):
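    """Read the first HTML table from a Missouri report file."""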
    with open(input_path) as html:
        df = pd.read_html(html)[0]
return df
def kansas(input_path):
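    """Scrape contributor spans from a Kansas report page into a DataFrame."""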
    from bs4 import BeautifulSoup, SoupStrainer
# Pre-compile regular expressions
match_patterns = {
"full_name": re.compile(r"lblContributor.*"),
"addr1": re.compile(r"lblAddress_.*"),
"addr2": re.compile(r"lblAddress2_.*"),
"city": re.compile(r"lblCity.*"),
"state": re.compile(r"lblState.*"),
"zip": re.compile(r"lblZip.*"),
"donation_date": re.compile(r"lblDate.*"),
"donation_amount": re.compile(r"lblAmount.*"),
}
strainer = SoupStrainer("span", attrs={"id": list(match_patterns.values())})
with open(input_path) as html:
soup = BeautifulSoup(html, "lxml", parse_only=strainer)
parsed_data = {key: [] for key in match_patterns}
for span in soup.find_all("span"):
matched = False
for key, pattern in match_patterns.items():
if pattern.match(span["id"]):
parsed_data[key].append(span.text)
matched = True
break
        if not matched:
            # Shouldn't happen given the strainer, but append a placeholder to
            # every column so the lists stay the same length.
            for key in match_patterns:
                parsed_data[key].append(None)
donors_df = pd.DataFrame(parsed_data)
# Vectorized operation to strip dollar sign
donors_df["donation_amount"] = donors_df["donation_amount"].str.lstrip("$")
return donors_df
def address_parse_attempt(address):
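    """Best-effort extraction of a street address from a combined string.

    If a date is embedded in the string, everything up to and including the
    date is discarded; the address is assumed to start at a PO-box marker or
    the first digit after that point.
    """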
    date_regexp = r"(\d+(/|-){1}\d+(/|-){1}\d{2,4})"
    final_address = None  # returned as-is when nothing parseable is found
    if address:
has_date = re.search(date_regexp, address)
if has_date:
after_date_address = address[has_date.end() + 1 :]
has_pobox = re.search("po box", after_date_address) or re.search(
"p o box", after_date_address
)
has_digit = re.search(r"\d", after_date_address)
if has_pobox:
final_address = after_date_address[has_pobox.start() :]
elif has_digit:
final_address = after_date_address[has_digit.start() :]
            else:
                # Note: this branch returns a sentinel string rather than the
                # None used by the no-date branch below.
                final_address = "not_able_to_parse"
else:
has_pobox = re.search("po box", address) or re.search("p o box", address)
has_digit = re.search(r"\d", address)
if has_pobox:
final_address = address[has_pobox.start() :]
elif has_digit:
final_address = address[has_digit.start() :]
else:
final_address = None
return final_address
def parse_pobox_address(address):
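    """Parse a PO-box address with usaddress into a scourgify-style dict.

    Returns None when the string is not a PO-box address or cannot be parsed.
    """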
if address:
try:
if "po box" in address or "p o box" in address:
parsed_address = usaddress.parse(address)
address_dict = {}
address_dict["address_line_1"] = (
"po box " + [x for x in parsed_address if x[1] == "USPSBoxID"][0][0]
)
address_dict["address_line_2"] = None
address_dict["city"] = " ".join(
[x[0] for x in parsed_address if x[1] == "PlaceName"]
)
address_dict["state"] = [
x for x in parsed_address if x[1] == "StateName"
][0][0]
address_dict["postal_code"] = [
x for x in parsed_address if x[1] == "ZipCode"
][0][0]
return dict(
(k.lower() if k else None, v.lower() if v else None)
for k, v in address_dict.items()
)
        except Exception:
            # The [0] lookups raise IndexError when usaddress finds no
            # matching component; treat any parse failure as unparseable.
            return None
def normal_address(new_address):
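    """Normalize an address with scourgify, falling back to the PO-box parser."""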
if new_address:
try:
parsed_address = normalize_address_record(new_address)
return dict(
(k.lower() if k else None, v.lower() if v else None)
for k, v in parsed_address.items()
)
        except Exception:
            # normalize_address_record raises on addresses it cannot parse;
            # fall back to the PO-box parser.
            return parse_pobox_address(new_address)
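

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original pipeline: the report
    # path below is hypothetical; point it at a real saved Nevada HTML
    # report before running.
    sample_report = "reports/nevada_contributions.html"  # hypothetical path
    donors = nevada(sample_report)
    # Best-effort address cleanup chaining the helpers above.
    donors["normalized"] = (
        donors["full_address"].apply(address_parse_attempt).apply(normal_address)
    )
    print(donors.head())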