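"""Compute similarity features for pairs of MARC records.

Loads MARC records, compares the record pairs listed in an index file,
and streams one row of similarity features per pair to a CSV file.

Example invocation (script and file names are illustrative):

    python compare.py -i records.mrc -p pair_indices.csv -o features.csv -P 4
"""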
import argparse
import concurrent.futures
import csv
import time
from multiprocessing import get_context

import numpy as np
import pandas as pd
from more_itertools import chunked

import marcai.processing.comparisons as comps
import marcai.processing.normalizations as norms
from marcai.utils.parsing import load_records, record_dict


def multiprocess_pairs(
    records_df,
    pair_indices,
    chunksize=50000,
    processes=1,
):
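    """Yield one features DataFrame per chunk of record pairs.

    Splits ``pair_indices`` into chunks of ``chunksize`` index pairs and
    submits each chunk to a process pool, keeping up to two jobs queued
    per worker. Chunks are yielded as their jobs complete, so output
    order may differ from input order.
    """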
    pairs_chunked = chunked(pair_indices, chunksize)

    # Keep up to two jobs in flight per worker so the pool stays busy.
    max_jobs = processes * 2

    # Use the "fork" start method (POSIX-only).
    context = get_context("fork")

    with concurrent.futures.ProcessPoolExecutor(
        max_workers=processes, mp_context=context
    ) as executor:
        futures = set()
        done = set()
        first_spawn = True

        while futures or first_spawn:
            if first_spawn:
                spawn_count = max_jobs
                first_spawn = False
            else:
                # Block until at least one job finishes, then top the
                # queue back up to max_jobs.
                done, futures = concurrent.futures.wait(
                    futures, return_when=concurrent.futures.FIRST_COMPLETED
                )
                spawn_count = max_jobs - len(futures)

            for future in done:
                yield future.result()

            for _ in range(spawn_count):
                pairs_chunk = next(pairs_chunked, None)
                if pairs_chunk is None:
                    break

                indices = np.array(pairs_chunk).astype(int)
                left_indices = indices[:, 0]
                right_indices = indices[:, 1]

                left_records = records_df.iloc[left_indices].reset_index(drop=True)
                right_records = records_df.iloc[right_indices].reset_index(drop=True)

                futures.add(executor.submit(process, left_records, right_records))


def process(df0, df1):
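    """Compute a similarity-feature DataFrame for aligned record pairs.

    ``df0`` and ``df1`` must be index-aligned: row i of each frame is one
    comparison pair. Text fields are normalized in place before the
    feature columns are computed.
    """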
    normalize_fields = [
        "author_names",
        "corporate_names",
        "meeting_names",
        "publisher",
        "title",
        "title_a",
        "title_b",
        "title_c",
        "title_p",
    ]

    # Normalize both sides identically: lowercase, strip punctuation and
    # diacritics, collapse whitespace.
    for field in normalize_fields:
        df0[field] = norms.lowercase(df0[field])
        df1[field] = norms.lowercase(df1[field])

        df0[field] = norms.remove_punctuation(df0[field])
        df1[field] = norms.remove_punctuation(df1[field])

        df0[field] = norms.remove_diacritics(df0[field])
        df1[field] = norms.remove_diacritics(df1[field])

        df0[field] = norms.normalize_whitespace(df0[field])
        df1[field] = norms.normalize_whitespace(df1[field])

    result_df = pd.DataFrame()

    result_df["id_0"] = df0["id"]
    result_df["id_1"] = df1["id"]

    result_df["raw_tokenset"] = comps.token_set_similarity(
        df0["raw"], df1["raw"], null_value=0.5
    )

    result_df["publisher"] = comps.token_sort_similarity(
        df0["publisher"], df1["publisher"], null_value=0.5
    )

    # Author similarity: compare personal, corporate, and meeting names
    # separately, then take the per-pair maximum across the three.
    author_names = comps.token_sort_similarity(
        df0["author_names"], df1["author_names"], null_value=np.nan
    )
    corporate_names = comps.token_sort_similarity(
        df0["corporate_names"], df1["corporate_names"], null_value=np.nan
    )
    meeting_names = comps.token_sort_similarity(
        df0["meeting_names"], df1["meeting_names"], null_value=np.nan
    )
    authors = pd.concat([author_names, corporate_names, meeting_names], axis=1)

    result_df["author"] = comps.maximum(authors, null_value=0.5)

    # Weighted aggregate over selected title subfields ("raw" carries
    # zero weight, so it does not affect the score).
    weights = {
        "title_a": 1,
        "raw": 0,
        "title_p": 1,
    }

    result_df["title_agg"] = comps.column_aggregate_similarity(
        df0[list(weights.keys())],
        df1[list(weights.keys())],
        list(weights.values()),
        null_value=0,
    )

    # Several string-similarity views of the full title.
    result_df["title_length"] = comps.length_similarity(
        df0["title"], df1["title"], null_value=0.5
    )

    result_df["title_tokenset"] = comps.token_set_similarity(
        df0["title"], df1["title"], null_value=0
    )

    result_df["title_tokensort"] = comps.token_sort_similarity(
        df0["title"], df1["title"], null_value=0
    )

    result_df["title_levenshtein"] = comps.levenshtein_similarity(
        df0["title"], df1["title"], null_value=0
    )

    result_df["title_jaro"] = comps.jaro_similarity(
        df0["title"], df1["title"], null_value=0
    )

    result_df["title_jaro_winkler"] = comps.jaro_winkler_similarity(
        df0["title"], df1["title"], null_value=0
    )

    result_df["pagination"] = comps.pagination_match(
        df0["pagination"], df1["pagination"], null_value=0.5
    )

    result_df["pub_date"] = comps.year_similarity(
        df0["pub_date"], df1["pub_date"], null_value=0.5, exp_coeff=0.15
    )

    result_df["pub_place"] = comps.equal(
        df0["pub_place"], df1["pub_place"], null_value=0.5
    )

    result_df["cid"] = comps.equal(df0["cid"], df1["cid"], null_value=0.5)

    return result_df


def parse_args():
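    """Parse command-line arguments."""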
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )

    required = parser.add_argument_group("required arguments")
    required.add_argument("-i", "--inputs", nargs="+", help="MARC files", required=True)
    required.add_argument("-o", "--output", help="Output file", required=True)
    # The index file is opened unconditionally in main(), so require it here.
    required.add_argument(
        "-p",
        "--pair-indices",
        help="CSV file of record index pairs to compare",
        required=True,
    )

    parser.add_argument(
        "-C",
        "--chunksize",
        type=int,
        help="Number of comparisons per job",
        default=50000,
    )
    parser.add_argument(
        "-P",
        "--processes",
        type=int,
        help="Number of processes to run in parallel",
        default=1,
    )

    return parser.parse_args()


def main():
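    """Load records, compare the requested pairs, and write features to CSV."""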
    start = time.time()
    args = parse_args()

    print("Loading records...")
    records = []
    for path in args.inputs:
        records.extend([record_dict(r) for r in load_records(path)])

    records_df = pd.DataFrame(records)

    print(f"Loaded {len(records)} records.")

    print("Processing records...")

    # Stream results to disk: write the header with the first completed
    # chunk, then append the rest.
    written = False
    with open(args.pair_indices, "r") as indices_file:
        reader = csv.reader(indices_file)

        for df in multiprocess_pairs(
            records_df, reader, args.chunksize, args.processes
        ):
            if not written:
                df.to_csv(args.output, mode="w", header=True, index=False)
                written = True
            else:
                df.to_csv(args.output, mode="a", header=False, index=False)

    end = time.time()
    print(f"Processed {len(records)} records.")
    print(f"Time elapsed: {end - start:.2f} seconds.")


if __name__ == "__main__":
    main()