# Utility helpers for parsing LLM JSON output and merging/formatting
# classification result dataframes.
import itertools
import json
import math
import os
import re
from typing import Callable, List

import joblib
import pandas as pd
from loguru import logger
def parse_json_garbage(s, start="{", end="}"):
    """Extract and parse the first JSON object embedded in a noisy string.

    Optionally drops a leading markdown-style "json" tag, trims everything
    before the first character found in `start` and after the first character
    found in `end`, then strips line comments before parsing.

    Argument
        s: str, raw text containing a JSON object (e.g. an LLM response)
        start: str, characters that may open the object (default "{")
        end: str, characters that may close the object (default "}")
    Return
        json_obj: dict
    Raises
        StopIteration: if no `start`/`end` character occurs in `s`
        json.JSONDecodeError: if the trimmed text is not valid JSON
    """
    # BUGFIX: strip the markdown code-fence language tag BEFORE locating the
    # braces. The old code checked startswith("json") AFTER trimming the
    # string to begin at "{", so the branch could never fire.
    s = s.strip()
    if s.startswith("json"):
        s = s[4:]
    # Keep only the span from the first opening char to the first closing char.
    s = s[next(idx for idx, c in enumerate(s) if c in start):]
    s = s[:next(idx for idx, c in enumerate(s) if c in end) + 1]
    try:
        # Drop "//" and "#" line comments (everything from the first '/' or
        # '#' on each line). NOTE: this also clobbers '/' or '#' inside
        # string values -- acceptable for the noisy inputs this targets.
        return json.loads(re.sub(r"[/#].*", "", s, flags=re.MULTILINE))
    except json.JSONDecodeError as e:
        logger.warning(f"Error parsing JSON (trying another regex...): {e}")
        # Fallback: only strip comments that start at the beginning of a line.
        return json.loads(re.sub(r"^[/#].*", "", s, flags=re.MULTILINE))
def merge_results( results: list, dataframe_columns: list, list_columns: list):
    """Merge per-process result dicts into a single dict.

    Argument
        results: a list of dicts, each holding a dataframe / list under
                 the same keys
        dataframe_columns: list of keys whose values are dataframes
        list_columns: list of keys whose values are lists
    Return
        merged_results: dict mapping each key to its concatenated value
    """
    assert len(results) > 0, "No results to merge"
    merged_results = {}
    # BUGFIX: the old code wrapped these loops in `for result in results:`,
    # redoing the full concatenation once per result (O(n^2) work) and
    # discarding all but the last pass. One pass over the keys suffices.
    for key in dataframe_columns:
        merged_results[key] = pd.concat([r[key] for r in results], ignore_index=True)
    for key in list_columns:
        merged_results[key] = list(itertools.chain.from_iterable(r[key] for r in results))
    return merged_results
def split_dataframe( df: pd.DataFrame, n_processes: int = 4) -> list:
    """Split `df` row-wise into up to `n_processes` equally sized chunks.

    Argument
        df: pd.DataFrame
        n_processes: int, target number of chunks
    Return
        list of pd.DataFrame slices covering all rows in order; the last
        chunk may be smaller, and an empty frame yields an empty list
    """
    total_rows = df.shape[0]
    chunk_size = max(math.ceil(total_rows / n_processes), 1)
    starts = range(0, total_rows, chunk_size)
    return [df.iloc[begin:begin + chunk_size] for begin in starts]
def combine_results( results: pd.DataFrame, combined_results_path: str, src_column: str = 'classified_category', tgt_column: str = 'category', strategy: str = 'replace'):
    """Fold classified categories into the target column and cache to disk.

    If `combined_results_path` already exists, the cached result is loaded
    and returned instead of recomputing.

    Argument
        results: dataframe holding `src_column` and `tgt_column`
        combined_results_path: path of the joblib cache file
        src_column: str, column with newly classified values
        tgt_column: str, column to update in the copy
        strategy: str, 'replace' (overwrite wherever values differ or the
                  target is empty) or 'patch' (fill only empty targets)
    Return
        combined_results: dataframe (a copy; `results` is not mutated)
    """
    # Cache hit: trust the previously combined result.
    if os.path.exists(combined_results_path):
        with open( combined_results_path, "rb") as f:
            return joblib.load(f)
    combined_results = results.copy()
    if strategy == 'replace':
        mask = (combined_results[tgt_column]=='') | (combined_results[src_column]!=combined_results[tgt_column])
    elif strategy == 'patch':
        mask = (combined_results[tgt_column]=='')
    else:
        raise Exception(f"Strategy {strategy} not implemented")
    combined_results.loc[mask, tgt_column] = combined_results[mask][src_column].values
    with open( combined_results_path, "wb") as f:
        joblib.dump( combined_results, f)
    return combined_results
def split_dict( information: dict | List[dict], keys1: List[str], keys2: List[str]): | |
"""[ { key1: value1, key2: value2}, { key1: value1, key2: value2}] -> [ {key1: value1}, {key1: value1}], [{key2: value2, key2: value2}] | |
Argument | |
information: dict | List[dict], dim -> N | |
keys1: List[str], dim -> K1 | |
keys2: List[str], dim -> K2 | |
Example: | |
>> split_dict( [ {"a": 1, "b":2, "c": 3}, {"a": 1, "b":2, "c": 3}, {"a": 1, "b":2, "c": 3}], ['a','b'], ['c']) | |
>> ( [{'a': 1, 'b': 2}, {'a': 1, 'b': 2}, {'a': 1, 'b': 2}], [{'c': 3}, {'c': 3}, {'c': 3}] ) | |
""" | |
assert len(keys1)>0 and len(keys2)>0 | |
results1, results2 = [], [] | |
if isinstance( information, dict): | |
information = [ information] | |
for info in information: # N | |
split_results1 = {} # K1 | |
for key in keys1: | |
if key in info: | |
split_results1[key] = info[key] | |
else: | |
split_results1[key] = None | |
results1.append( split_results1) | |
split_results2 = {} # K2 | |
for key in keys2: | |
if key in info: | |
split_results2[key] = info[key] | |
else: | |
split_results2[key] = None | |
results2.append( split_results2) | |
# results.append( [ split_results1, split_results2]) | |
assert len(results1)==len(results2) | |
if len(results1)==1: | |
return results1[0], results2[0] | |
return results1, results2 | |
def format_df( df: pd.DataFrame, input_column: str = 'evidence', output_column: str = 'formatted_evidence', format_func: Callable = lambda x: x):
    """Return a copy of `df` with `format_func` applied element-wise to
    `input_column`, stored under `output_column`.

    Argument
        df: dataframe containing `input_column`
        input_column: str, column to read
        output_column: str, column to write in the copy
        format_func: Callable, applied to each value (identity by default)
                     -- BUGFIX: was wrongly annotated as `str`
    Return
        formatted_df: copy of `df` with the extra `output_column`
    """
    formatted_df = df.copy()
    formatted_df[output_column] = formatted_df[input_column].apply(format_func)
    return formatted_df
def clean_quotes( text: str):
    """Strip surrounding whitespace, then delete every ideographic space,
    carriage return, and single/double quote from `text` in one pass."""
    unwanted = str.maketrans("", "", "\u3000\r\"'")
    return text.strip().translate(unwanted)
def compose_query( address, name, with_index: bool = True, exclude: str = "-inurl:twincn.com -inurl:findcompany.com.tw -inurl:iyp.com.tw -inurl:twypage.com -inurl:alltwcompany.com -inurl:zhupiter.com -inurl:twinc.com.tw", use_exclude: bool = True):
    """Build a search query from the city prefix of an address and a name.

    Argumemnt
        address: str, only the first 3 characters (the city) are used
        name: str, business name
        with_index: bool (currently unused)
        exclude: str, "-inurl:" terms appended when use_exclude is True
        use_exclude: bool
    Return
        query: `city` `name` [`exclude`]
    """
    base = f"{address[:3]} {name}"
    return f"{base} {exclude}" if use_exclude else base
def reverse_category2supercategory(category2supercategory):
    """Invert a category -> supercategory map into supercategory -> [categories].

    Argument
        category2supercategory: dict
    Return
        supercategory2category: dict of lists, preserving insertion order
    """
    supercategory2category = {}
    for category, supercategory in category2supercategory.items():
        supercategory2category.setdefault(supercategory, []).append(category)
    return supercategory2category
def concat_df( list_df: List[pd.DataFrame], axis: int = 0):
    """Concatenate dataframes along `axis`; a singleton list is returned
    as-is (no copy).

    Argument
        list_df: List[pd.DataFrame], must be non-empty
        axis: int
    Return
        df: pd.DataFrame
    """
    assert len(list_df)>0, "Empty list of dataframes"
    return list_df[0] if len(list_df)==1 else pd.concat( list_df, axis=axis)