import re

import usaddress
import pandas as pd
from scourgify import normalize_address_record


# TODO: refactor these state parsers into a dispatch class (see sketch at the
# bottom of this file).
def nevada(input_path):
    table_id = "ctl04_mobjContributions_dgContributions"
    nv_df = pd.read_html(input_path, attrs={"id": table_id}, header=0)[0]
    contributor_col = (
        "NAME AND ADDRESS OF PERSON, GROUP OR ORGANIZATION WHO MADE CONTRIBUTION"
    )
    try:
        # The first two whitespace-separated tokens are treated as the name;
        # the remainder is the address.
        nv_df["full_name"] = nv_df[contributor_col].apply(
            lambda x: " ".join(x.split()[:2])
        )
    except AttributeError:
        raise RuntimeError(
            "File had no valid data when attempting to read. "
            "Please check that you are not passing an empty form!"
        )
    nv_df["full_address"] = nv_df[contributor_col].apply(
        lambda x: " ".join(x.split()[2:]).lower()
    )
    nv_df = nv_df.rename(
        columns={
            "AMOUNT OF CONTRIBUTION": "donation_amount",
            "DATE OF CONTRIBUTION": "donation_date",
        }
    )
    nv_df.drop(
        columns=[
            contributor_col,
            "CHECK HERE IF LOAN",
            "NAME AND ADDRESS OF 3rd PARTY IF LOAN GUARANTEED BY 3rd PARTY",
            "NAME AND ADDRESS OF PERSON, GROUP OR ORGANIZATION WHO FORGAVE THE LOAN, IF DIFFERENT THAN CONTRIBUTOR",
        ],
        inplace=True,
    )
    # Keep only the date portion (first ten characters) of the timestamp.
    nv_df["donation_date"] = nv_df["donation_date"].apply(lambda x: x[:10])
    # Strip the dollar sign, cents, and thousands separators from the amount.
    nv_df["donation_amount"] = nv_df["donation_amount"].apply(
        lambda x: x.split("$")[-1].split(".")[0].replace(",", "")
    )
    # Optional filter for names containing punctuation or digits (disabled):
    # nv_df = nv_df[~nv_df["full_name"].str.contains(r"[,.$&'\d]", regex=True)]
    nv_df["full_address"] = nv_df["full_address"].str.replace(
        r"[^A-Za-z0-9\s]", "", regex=True
    )
    nv_df.drop_duplicates("full_name", inplace=True)
    return nv_df
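
# Example usage (hedged; the path is hypothetical, any saved copy of the
# Aurora contribution-report HTML should work):
# nv_donors = nevada("reports/nevada_contributions.html")
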
def philadelphia(candidate):
    import requests

    good_names = [
        "donation_date",
        "donation_amount",
        "full_name",
        "addr1",
        "city",
        "state",
        "zip",
        "full_address",
        "first_name",
        "middle_name",
        "last_name",
        "addr2",
        "phone1",
        "phone2",
        "email1",
        "email2",
    ]
    response = requests.get(
        f"https://cf-api.phila.gov/services/campaign-finance/search/searchtransactions?query=&limit=10000&ascending=1&page=1&byColumn=0&orderBy=transactionDate&searchString={candidate}&amount=&isSearchByFilingEntity=true&year=&cycleCode=&isSearchOnExpenditure=false"
    )
    j = response.json()
    df = pd.json_normalize(j["transactionList"])
    df = df.rename(
        columns={
            "transactionEntityName": "full_name",
            "transactionEntityAddress": "full_address",
            "EntityCity": "city",
            "transactionDate": "donation_date",
            "transactionAmount": "donation_amount",
        }
    )
    # Drop any columns outside the canonical schema, then add empty columns
    # for any canonical fields the API response did not include.
    df = df.drop(columns=[col for col in df.columns if col not in good_names])
    for col in good_names:
        if col not in df.columns:
            df[col] = ""
    # Keep only the date portion (first ten characters) of the timestamp.
    df["donation_date"] = df["donation_date"].astype(str).str[0:10]
    return df
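
# Example usage (hedged; any filing-entity name accepted by the search API):
# phila_donors = philadelphia("Example Candidate")
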
def virginia(input_xml):
    from lxml import etree, objectify

    parser = etree.XMLParser(remove_blank_text=True)
    tree = etree.parse(input_xml, parser)
    root = tree.getroot()
    # Strip namespace prefixes from every tag so pandas can address elements
    # with plain xpath expressions.
    for elem in root.iter():
        if not hasattr(elem.tag, "find"):
            continue  # guard for Comment tags
        i = elem.tag.find("}")
        if i >= 0:
            elem.tag = elem.tag[i + 1 :]
    objectify.deannotate(root, cleanup_namespaces=True)
    cleaned = f"{input_xml}_cleaned.xml"
    tree.write(
        cleaned,
        pretty_print=True,
        xml_declaration=True,
        encoding="UTF-8",
    )
    try:
        df = pd.read_xml(cleaned, xpath="//LiA")
        df = df.drop("Contributor", axis=1)
        df_2 = pd.read_xml(cleaned, xpath="//Contributor")
        df_3 = pd.read_xml(cleaned, xpath="//ScheduleA//LiA//Contributor//Address")
        final_df = pd.concat(
            [df, df_2.loc[:, ["FirstName", "LastName", "IsIndividual"]], df_3],
            axis="columns",
        )
        final_df.columns = [col.lower() for col in final_df.columns]
        return final_df
    except ValueError as err:
        raise RuntimeError("No valid ScheduleA donations.") from err
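
# Example usage (hedged; hypothetical path to a ScheduleA XML report):
# va_donors = virginia("reports/va_schedule_a.xml")
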
def missouri(input_path):
    with open(input_path) as html:
        # The report page has a single table of donations; take the first
        # table pandas finds.
        df = pd.read_html(html)[0]
    return df
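
# Example usage (hedged; hypothetical path to a saved report page):
# mo_donors = missouri("reports/mo_report.html")
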
def kansas(input_path):
    from bs4 import BeautifulSoup, SoupStrainer

    # Pre-compile regular expressions for the span ids that carry each field.
    match_patterns = {
        "full_name": re.compile(r"lblContributor.*"),
        "addr1": re.compile(r"lblAddress_.*"),
        "addr2": re.compile(r"lblAddress2_.*"),
        "city": re.compile(r"lblCity.*"),
        "state": re.compile(r"lblState.*"),
        "zip": re.compile(r"lblZip.*"),
        "donation_date": re.compile(r"lblDate.*"),
        "donation_amount": re.compile(r"lblAmount.*"),
    }
    # Only parse spans whose id matches one of the field patterns; everything
    # else in the page is skipped at parse time.
    strainer = SoupStrainer("span", attrs={"id": list(match_patterns.values())})
    with open(input_path) as html:
        soup = BeautifulSoup(html, "lxml", parse_only=strainer)
    parsed_data = {key: [] for key in match_patterns}
    for span in soup.find_all("span"):
        for key, pattern in match_patterns.items():
            if pattern.match(span["id"]):
                parsed_data[key].append(span.text)
                break
    # Each report row emits exactly one span per field, so the per-field lists
    # stay the same length and line up column-wise.
    donors_df = pd.DataFrame(parsed_data)
    # Vectorized operation to strip the dollar sign.
    donors_df["donation_amount"] = donors_df["donation_amount"].str.lstrip("$")
    return donors_df
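
# Example usage (hedged; hypothetical path to a saved contributions page):
# ks_donors = kansas("reports/ks_contributions.html")
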
def address_parse_attempt(address):
    date_regexp = r"(\d+[/-]\d+[/-]\d{2,4})"
    if not address:
        return None
    has_date = re.search(date_regexp, address)
    if has_date:
        # Drop everything through the date, then look for the start of the
        # street address (a PO box marker or the first digit).
        after_date_address = address[has_date.end() + 1 :]
        has_pobox = re.search("po box", after_date_address) or re.search(
            "p o box", after_date_address
        )
        has_digit = re.search(r"\d", after_date_address)
        if has_pobox:
            final_address = after_date_address[has_pobox.start() :]
        elif has_digit:
            final_address = after_date_address[has_digit.start() :]
        else:
            final_address = "not_able_to_parse"
    else:
        has_pobox = re.search("po box", address) or re.search("p o box", address)
        has_digit = re.search(r"\d", address)
        if has_pobox:
            final_address = address[has_pobox.start() :]
        elif has_digit:
            final_address = address[has_digit.start() :]
        else:
            final_address = None
    return final_address
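
# Example (hedged): a leading date and name tokens are stripped before the
# street number, e.g.
# address_parse_attempt("1/2/2023 jane doe 123 main st las vegas nv 89101")
# -> "123 main st las vegas nv 89101"
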
def parse_pobox_address(address):
    if not address:
        return None
    try:
        if "po box" in address or "p o box" in address:
            # usaddress.parse returns (token, label) pairs; pick out the
            # labeled pieces we need.
            parsed_address = usaddress.parse(address)
            address_dict = {}
            address_dict["address_line_1"] = (
                "po box " + [x for x in parsed_address if x[1] == "USPSBoxID"][0][0]
            )
            address_dict["address_line_2"] = None
            address_dict["city"] = " ".join(
                [x[0] for x in parsed_address if x[1] == "PlaceName"]
            )
            address_dict["state"] = [
                x for x in parsed_address if x[1] == "StateName"
            ][0][0]
            address_dict["postal_code"] = [
                x for x in parsed_address if x[1] == "ZipCode"
            ][0][0]
            return dict(
                (k.lower() if k else None, v.lower() if v else None)
                for k, v in address_dict.items()
            )
    except Exception:
        # Missing labels raise IndexError above; treat any failure as
        # unparseable rather than letting it propagate.
        return None
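
# Example (hedged): usaddress labels the box number USPSBoxID, so a typical
# result looks roughly like
# parse_pobox_address("po box 123 reno nv 89501")
# -> {"address_line_1": "po box 123", "address_line_2": None,
#     "city": "reno", "state": "nv", "postal_code": "89501"}
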
def normal_address(new_address):
    if not new_address:
        return None
    try:
        parsed_address = normalize_address_record(new_address)
        return dict(
            (k.lower() if k else None, v.lower() if v else None)
            for k, v in parsed_address.items()
        )
    except Exception:
        # scourgify could not normalize the address; fall back to the
        # PO box parser.
        return parse_pobox_address(new_address)
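

# One hedged sketch of the "special class" refactor flagged at the top of this
# file: a registry that maps jurisdiction names to their parser functions so
# callers can dispatch by name. All names here are illustrative, not an
# existing API.
class DonorParsers:
    """Dispatch table over the per-jurisdiction parser functions above."""

    _registry = {
        "nevada": nevada,
        "philadelphia": philadelphia,
        "virginia": virginia,
        "missouri": missouri,
        "kansas": kansas,
    }

    @classmethod
    def parse(cls, jurisdiction, source):
        # `source` is a file path for the file-based parsers and a candidate
        # name for philadelphia.
        try:
            parser = cls._registry[jurisdiction]
        except KeyError:
            raise ValueError(f"no parser registered for {jurisdiction!r}")
        return parser(source)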