import io
import json
import os
import sys
import argparse
import re
import tarfile
from collections import defaultdict
import dataclasses
from datetime import datetime
from typing import Any, Dict, List, Tuple, Optional

import pandas as pd
import spacy
from nltk.corpus import framenet as fn
from nltk.corpus.reader.framenet import FramenetError
from spacy.tokens import Token

from sociofillmore.crashes.utils import is_a_dutch_text
ITALIAN_ACTIVE_AUX = ["avere", "ha", "ho", "hai", "avete", "hanno", "abbiamo"]
DUTCH_ACTIVE_AUX = ["heb", "hebben", "heeft"]

active_frames_df = pd.read_csv("resources/active_frames_full.csv")
ACTIVE_FRAMES = active_frames_df[active_frames_df["active"]]["frame"].tolist()

IGNORE_DEP_LABELS = ["punct"]

DEEP_FRAMES = [
    "Transitive_action",
    "Causation",
    "Transition_to_a_state",
    "Event",
    "State",
]

# SYNTAX_ANALYSIS_CACHE_FILES = {
#     "femicides/rai": "resources/rai_syntax_analysis_cache.json",
#     "femicides/rai_main": "resources/rai_main_syntax_analysis_cache.json",
#     "femicides/olv": "resources/olv_syntax_analysis_cache.json",
#     "crashes/thecrashes": "resources/thecrashes_syntax_analysis_cache.json",
#     "migration/pavia": "resources/migration_pavia_syntax_analysis_cache.json"
# }

SYNTAX_ANALYSIS_CACHE_FILES = {
    "femicides/rai": "output/femicides/syntax_cache/rai_ALL",
    "femicides/rai_main": "output/femicides/syntax_cache/rai_main",
    "femicides/rai_ALL": "output/femicides/syntax_cache/rai_ALL",
    "femicides/olv": "output/femicides/syntax_cache/olv",
    "crashes/thecrashes": "output/crashes/syntax_cache/thecrashes",
    "migration/pavia": "output/migration/syntax_cache/pavia",
}
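# Each cache directory above holds one JSON file per two-character document-ID prefix
# (e.g. "12.json"), mapping doc_id -> list of per-sentence syntax analyses; see
# make_syntax_cache() and process_prediction_file() below for how these files are
# written and read.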
DEEP_FRAMES_CACHE_FILE = "resources/deep_frame_cache.json"
DEP_LABEL_CACHE_FILE = "resources/dep_labels.txt"

POSSIBLE_CONSTRUCTIONS = [
    "nonverbal",
    "verbal:active",
    "verbal:impersonal",
    "verbal:reflexive",
    "verbal:passive",
    "verbal:unaccusative",
    "other",
]


def load_deep_frames_cache():
    if os.path.isfile(DEEP_FRAMES_CACHE_FILE):
        print("Loading deep frame cache...")
        with open(DEEP_FRAMES_CACHE_FILE, encoding="utf-8") as f:
            deep_frames_cache = json.load(f)
    else:
        deep_frames_cache = {}
    return deep_frames_cache


# make spacy work with google app engine
# (see https://stackoverflow.com/questions/55228492/spacy-on-gae-standard-second-python-exceeds-memory-of-largest-instance)
# nlp = spacy.load("it_core_news_md")
nlp = None
@dataclasses.dataclass
class AnnotationSpan:
    tokens_idx: List[int]
    tokens_str: List[str]


@dataclasses.dataclass
class FrameStructure:
    frame: str
    deep_frame: str
    target: Optional[AnnotationSpan]
    roles: List[Tuple[str, AnnotationSpan]]
    deep_roles: List[Tuple[str, AnnotationSpan]]
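# Illustrative sketch (hypothetical values, not taken from the data): for a sentence such
# as "Maria è stata uccisa", one parsed structure might look roughly like
#   FrameStructure(frame="Killing", deep_frame="Transitive_action",
#                  target=AnnotationSpan(tokens_idx=[3], tokens_str=["uccisa"]),
#                  roles=[("Victim", AnnotationSpan([0], ["Maria"]))],
#                  deep_roles=[("Patient", AnnotationSpan([0], ["Maria"]))])
# The actual frame and role names depend on the LOME predictions and on the FrameNet
# version in use.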
def make_syntax_cache(dataset, skip_fn=None):
    print(f"make_syntax_cache({dataset})")
    if dataset == "femicides/rai":
        corpus_tarball = "output/femicides/lome/lome_0shot/multilabel_rai_ALL_blocks"
        corpus = "rai"
        spacy_model = "it_core_news_md"
    elif dataset == "femicides/rai_main":
        corpus_tarball = "output/femicides/lome/lome_0shot/multilabel_rai_main_blocks"
        corpus = "rai_main"
        spacy_model = "it_core_news_md"
    elif dataset == "femicides/rai_ALL":
        corpus_tarball = "output/femicides/lome/lome_0shot/multilabel_rai_ALL_blocks"
        corpus = "rai_ALL"
        spacy_model = "it_core_news_md"
    elif dataset == "femicides/olv":
        corpus_tarball = "output/femicides/lome/lome_0shot/multilabel_olv_blocks"
        corpus = "olv"
        spacy_model = "it_core_news_md"
    elif dataset == "crashes/thecrashes":
        corpus_tarball = "output/crashes/lome/lome_0shot/multilabel_thecrashes_blocks"
        corpus = "thecrashes"
        spacy_model = "nl_core_news_md"
    elif dataset == "migration/pavia":
        corpus_tarball = "output/migration/lome/lome_0shot/multilabel_pavia_blocks"
        # corpus_tarball = "output/migration/lome/lome_zs-tgt_ev-frm/multilabel_pavia.tar.gz"
        corpus = "pavia"
        spacy_model = "it_core_news_md"
    else:
        raise ValueError("Unsupported dataset!")

    print("params:")
    print(f"\tcorpus_tarball: {corpus_tarball}")
    print(f"\tcorpus: {corpus}")
    print(f"\tspacy: {spacy_model}")

    print("processing files...")
    for block in os.listdir(corpus_tarball):
        print(block)
        with tarfile.open(os.path.join(corpus_tarball, block)) as tar_in:
            # make sure the output cache directory exists
            cache_location = SYNTAX_ANALYSIS_CACHE_FILES[dataset]
            if not os.path.isdir(cache_location):
                os.makedirs(cache_location)

            lome_files = [f for f in tar_in.getmembers() if f.name.endswith(".comm.json")]
            lome_files.sort(key=lambda file: file.name)
            for file in lome_files:
                print(f"\tprocessing file {file}")
                doc_id = re.search(r"lome_(\d+)\.comm\.json", file.name).group(1)

                skipped = False
                if skip_fn is not None:
                    if skip_fn(doc_id):
                        print(f"\t\tskip_fn: skipping file {file}")
                        skipped = True

                if skipped:
                    syntax_analyses = None
                else:
                    file_obj = io.TextIOWrapper(tar_in.extractfile(file))
                    annotations = json.load(file_obj)
                    syntax_analyses = []
                    for sentence in annotations:
                        syntax_analyses.append(syntax_analyze(sentence, spacy_model))

                # use the first two characters of the doc ID as the cache key
                file_key = doc_id[:2]
                cache_file = f"{cache_location}/{file_key}.json"
                if os.path.isfile(cache_file):
                    with open(cache_file, encoding="utf-8") as f:
                        key_cache = json.load(f)
                else:
                    key_cache = {}
                key_cache[doc_id] = syntax_analyses
                with open(cache_file, "w", encoding="utf-8") as f:
                    json.dump(key_cache, f)
def make_syntax_cache_key(filename):
    doc_id = re.search(r"/\d+/lome_(\d+)\.comm\.json", filename).group(1)
    return doc_id


def clean_sentence_(sentence):
    idx_to_remove = []
    for i, tok in enumerate(sentence["tokens"]):
        # remove whitespace tokens
        if not tok.strip():
            idx_to_remove.append(i)
    idx_to_remove.reverse()
    for idx in idx_to_remove:
        for annotation_list in sentence.values():
            annotation_list.pop(idx)
def process_prediction_file(
    filename: str,
    dataset_name: str,
    syntax_cache: str,
    deep_frames_cache: dict,
    tmp_cache: Optional[dict] = None,
    file_obj: Optional[io.TextIOBase] = None,
    syntax_cache_key: Optional[str] = None,
    deep_frames_list: Optional[List[str]] = None,
    spacy_model: str = "it_core_news_md",
    spacy_model_obj=None,
) -> Tuple[List, ...]:
    """
    Process a predictions JSON file.

    :param filename: path to the JSON file
    :param syntax_cache: see `make_syntax_cache()`
    :param spacy_model: spaCy model to be used for syntactic analysis
    :param file_obj: already opened file object corresponding to `filename`. If given, `file_obj` will be read
        instead of loading the file from `filename`. This is useful when reading the entire corpus from a tarball
        (which is what the SocioFillmore webapp does)
    :return: a tuple (sentences, frame structures, syntax analyses, role analyses)
    """
print("Processing", filename) | |
if file_obj is not None: | |
annotations = json.load(file_obj) | |
else: | |
with open(filename, encoding="utf-8") as f: | |
annotations = json.load(f) | |
if syntax_cache is None: | |
syntax_analyses = [] | |
for sentence in annotations: | |
syntax_analyses.append(syntax_analyze(sentence, spacy_model, spacy_model_obj)) | |
else: | |
if syntax_cache_key is None: | |
syntax_cache_key = make_syntax_cache_key(filename) | |
if tmp_cache is not None and syntax_cache_key in tmp_cache: | |
syntax_analyses = tmp_cache[syntax_cache_key] | |
else: | |
with open(f"{syntax_cache}/{syntax_cache_key[:2]}.json", encoding="utf-8") as cache_file: | |
grouped_analyses = json.load(cache_file) | |
syntax_analyses = grouped_analyses[syntax_cache_key] | |
if tmp_cache is not None: | |
tmp_cache.clear() | |
tmp_cache.update(grouped_analyses) | |
fn_structures: List[Dict[int, FrameStructure]] = [] | |
sentences: List[List[str]] = [] | |
role_analyses: List[Dict[int, Dict[str, str]]] = [] | |
for sent_idx, sentence in enumerate(annotations): | |
clean_sentence_(sentence) | |
try: | |
sent_structures = process_fn_sentence( | |
sentence, deep_frames_cache, deep_frames_list=deep_frames_list | |
) | |
# seems to occur for one specific file in the migration set, TODO find out what happens | |
except AttributeError: | |
print("Error processing FN annotations") | |
sent_structures = {} | |
syntax = syntax_analyses[sent_idx] | |
# disambiguate syntactic constructions | |
for fs in sent_structures.values(): | |
target_idx = str(fs.target.tokens_idx[0]) | |
if target_idx not in syntax: | |
print( | |
f"Prediction file {filename}: Cannot find syntactic information for target at idx={target_idx}") | |
continue | |
fs_syn = syntax[target_idx][-1] | |
disambiguate_cxs_(fs, fs_syn) | |
roles = process_syn_sem_roles(sent_structures, syntax) | |
role_analyses.append(roles) | |
sentences.append(sentence["tokens"]) | |
fn_structures.append(sent_structures) | |
return sentences, fn_structures, syntax_analyses, role_analyses | |
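# Minimal usage sketch (hypothetical paths; assumes the syntax cache for the dataset has
# already been built with make_syntax_cache() and that the prediction file follows the
# ".../<event_id>/lome_<doc_id>.comm.json" naming scheme expected by make_syntax_cache_key()):
#
#   deep_frames_cache = load_deep_frames_cache()
#   sents, structures, syntax, roles = process_prediction_file(
#       filename="output/femicides/lome/lome_0shot/multilabel/rai_ALL/42/lome_1234.comm.json",
#       dataset_name="femicides/rai",
#       syntax_cache=SYNTAX_ANALYSIS_CACHE_FILES["femicides/rai"],
#       deep_frames_cache=deep_frames_cache,
#   )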
def disambiguate_cxs_(struct: FrameStructure, tgt_syntax):
    # no "_" at the beginning: no disambiguation needed
    cx = tgt_syntax["syn_construction"]
    if not cx.startswith("_"):
        return

    # print(struct.frame, struct.deep_frame)

    # NB works only for the selected relevant frames! if any other frames are added, make sure to update this
    if struct.deep_frame in ["Transitive_action", "Causation", "Emotion_directed", "Quarreling", "Impact", "Committing_crime"]:
        frame_agentivity_type = "active"
    elif struct.frame in ACTIVE_FRAMES:
        frame_agentivity_type = "active"
    elif struct.frame == "Event":
        frame_agentivity_type = "impersonal"
    else:
        frame_agentivity_type = "unaccusative"

    if cx == "_verbal:ACTIVE":
        new_cx = f"verbal:{frame_agentivity_type}"
    elif cx in ["_verbal:ADPOS", "_verbal:OTH_PART"]:
        if frame_agentivity_type == "active":
            new_cx = "verbal:passive"
        else:
            new_cx = f"verbal:{frame_agentivity_type}"
    else:
        raise ValueError(f"Unknown construction placeholder {cx}")

    tgt_syntax["syn_construction"] = new_cx
def find_governed_roles(
    syn_self: Dict[str, Any],
    syn_children: List[Dict[str, Any]],
    roles: List[Tuple[str, AnnotationSpan]],
) -> Dict[str, str]:
    roles_found = {}

    # find roles that are governed by the predicate
    for node in [syn_self] + syn_children:
        for role_name, role_span in roles:
            if node["lome_idx"] in role_span.tokens_idx:
                dep_label = node["dependency"]
                if role_name not in roles_found and dep_label not in IGNORE_DEP_LABELS:
                    if node == syn_self:
                        roles_found[role_name] = None
                    else:
                        roles_found[role_name] = dep_label + "↓"
    return roles_found


def analyze_role_dependencies(
    fn_struct,
    syntax,
    role_analysis=None,
    tgt_idx=None,
    min_depth=-10,
    max_depth=10,
    depth=0,
    label_prefix="",
):
    if role_analysis is None:
        role_analysis = {}

    if tgt_idx is None:
        tgt_idx = fn_struct.target.tokens_idx[0]

    if depth > max_depth:
        return role_analysis
    if depth < min_depth:
        return role_analysis

    new_analysis = {}
    new_analysis.update(role_analysis)

    token_syntax = syntax[str(tgt_idx)][0]

    def update_analysis(mapping):
        for role, dep in mapping.items():
            if role not in new_analysis:
                if label_prefix:
                    if dep is None:
                        label = label_prefix
                        depth_label = depth
                    else:
                        label = label_prefix + "--" + dep
                        depth_label = depth + 1 if depth > 0 else depth - 1
                else:
                    if dep is None:
                        label = "⋆"
                        depth_label = depth
                    else:
                        label = dep
                        depth_label = depth + 1 if depth > 0 else depth - 1
                new_analysis[role] = label, depth_label

    update_analysis(
        find_governed_roles(token_syntax, token_syntax["children"], fn_struct.roles)
    )

    # from the initial predicate: first try the children
    if depth <= 0:
        for child in token_syntax["children"]:
            child_analysis = analyze_role_dependencies(
                fn_struct,
                syntax,
                role_analysis=new_analysis,
                tgt_idx=child["lome_idx"],
                max_depth=max_depth,
                min_depth=min_depth,
                depth=depth - 1,
                label_prefix=child["dependency"] + "↓",
            )
            new_analysis.update(child_analysis)

    # ... then try the ancestors
    if depth >= 0:
        if not token_syntax["ancestors"]:
            return new_analysis
        first_ancestor = token_syntax["ancestors"][0]
        return analyze_role_dependencies(
            fn_struct,
            syntax,
            role_analysis=new_analysis,
            tgt_idx=first_ancestor["lome_idx"],
            max_depth=max_depth,
            min_depth=min_depth,
            depth=depth + 1,
            label_prefix=token_syntax["dependency"] + "↑",
        )
    else:
        return new_analysis
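# Note on the traversal above: starting from the frame target, roles are first looked for
# on the target itself and its direct dependents, then in the subtrees of its children
# (depth going negative), and finally up the ancestor chain (depth going positive). The
# label records the path taken: "⋆" for a role on the target itself, e.g. "nsubj↓" for a
# role on a direct dependent, and chained segments such as "advcl↑--nsubj↓" when an arc up
# is followed by an arc down; clean_role_deps() later compresses all but the last segment
# to its arrow character.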
def process_syn_sem_roles(
    sent_structures: Dict[int, FrameStructure], syntax: Dict[str, List[Dict[str, Any]]]
) -> Dict[int, Dict[str, str]]:
    analyses = defaultdict(dict)

    # go through all frame targets
    for struct in sent_structures.values():
        tgt_idx = struct.target.tokens_idx[0]
        role_deps = analyze_role_dependencies(struct, syntax, max_depth=10)
        analyses[tgt_idx] = clean_role_deps(role_deps)
    return analyses


def clean_role_deps(role_deps):
    res = {}
    for role, (dep_str, depth) in role_deps.items():
        dep_parts = dep_str.split("--")
        if len(dep_parts) == 1:
            res[role] = dep_str, depth
        else:
            res[role] = "--".join([dp[-1] for dp in dep_parts[:-1]] + [dep_parts[-1]]), depth
    return res
def map_or_lookup_deep_frame(
    frame: str, deep_frames_cache, save_modified_cache=False, deep_frames_list=None
) -> Tuple[str, Dict[str, str]]:
    if frame in deep_frames_cache:
        return deep_frames_cache[frame]
    else:
        deep_frame, mapping = map_to_deep_frame(
            frame, deep_frames_list=deep_frames_list
        )
        deep_frames_cache[frame] = [deep_frame, mapping]
        if save_modified_cache:
            with open(DEEP_FRAMES_CACHE_FILE, "w", encoding="utf-8") as f:
                json.dump(deep_frames_cache, f)
    return deep_frames_cache[frame]


def map_to_deep_frame(
    frame: str,
    target: Optional[str] = None,
    mapping: Optional[Dict[str, str]] = None,
    self_mapping: Optional[Dict[str, str]] = None,
    deep_frames_list: Optional[List[str]] = None,
) -> Tuple[str, Dict[str, str]]:
    if deep_frames_list is None:
        deep_frames_list = DEEP_FRAMES

    # look up in FrameNet
    try:
        fn_entry = fn.frame(frame)
    except FramenetError:
        return frame, {}
    except LookupError:
        return frame, {}

    # initial call: `target` == `frame`, mapping maps to self
    if target is None:
        target = frame
    if mapping is None or self_mapping is None:
        mapping = self_mapping = {role: role for role in fn_entry.FE.keys()}

    # base case: our frame is a deep frame
    if frame in deep_frames_list:
        return frame, mapping

    # otherwise, look at parents
    inh_relations = [
        fr
        for fr in fn_entry.frameRelations
        if fr.type.name == "Inheritance" and fr.Child == fn_entry
    ]
    parents = [fr.Parent for fr in inh_relations]

    # no parents --> failure, return original frame
    if not inh_relations:
        return target, self_mapping

    # one parent: follow that parent
    if len(inh_relations) == 1:
        parent_rel = inh_relations[0]
        parent = parents[0]
        new_mapping = define_fe_mapping(mapping, parent_rel)
        return map_to_deep_frame(
            parent.name, target, new_mapping, self_mapping, deep_frames_list
        )

    # more parents: check if any of them leads to a deep frame
    deep_frames = []
    deep_mappings = []
    for parent_rel, parent in zip(inh_relations, parents):
        new_mapping = define_fe_mapping(mapping, parent_rel)
        final_frame, final_mapping = map_to_deep_frame(
            parent.name, target, new_mapping, self_mapping, deep_frames_list
        )
        if final_frame in deep_frames_list:
            deep_frames.append(final_frame)
            deep_mappings.append(final_mapping)
    for deep_frame in deep_frames_list:
        if deep_frame in deep_frames:
            idx = deep_frames.index(deep_frame)
            return deep_frame, deep_mappings[idx]

    # nothing found, return original frame
    return target, self_mapping
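# Illustrative example (the exact inheritance path depends on the FrameNet version loaded
# by NLTK): a frame such as "Killing" would be expected to reach the deep frame
# "Transitive_action" via the Inheritance relation, with frame elements mapped along the
# way (e.g. Killer -> Agent, Victim -> Patient). If no Inheritance path leads to any frame
# in `deep_frames_list`, the original frame is returned together with an identity mapping
# over its own frame elements.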
def define_fe_mapping(mapping, parent_rel):
    child_to_parent_mapping = {
        fer.subFEName: fer.superFEName for fer in parent_rel.feRelations
    }
    target_to_parent_mapping = {
        role: child_to_parent_mapping[mapping[role]]
        for role in mapping
        if mapping[role] in child_to_parent_mapping
    }
    return target_to_parent_mapping


def is_at_root(syntax_info):
    # you should either be the actual root...
    if syntax_info["dependency"] == "ROOT":
        return True
    # ... or be the subject of the root
    if syntax_info["dependency"] == "nsubj" and syntax_info["ancestors"][0]["dependency"] == "ROOT":
        return True
    return False


def get_tarball_blocks(dataset, lome_model="lome_0shot"):
    if dataset == "femicides/rai":
        return f"output/femicides/lome/{lome_model}/multilabel_rai_ALL_blocks"
    elif dataset == "femicides/rai_main":
        return f"output/femicides/lome/{lome_model}/multilabel_rai_main_blocks"
    elif dataset == "femicides/olv":
        return f"output/femicides/lome/{lome_model}/multilabel_olv_blocks"
    elif dataset == "crashes/thecrashes":
        return f"output/crashes/lome/{lome_model}/multilabel_thecrashes_blocks"
    elif dataset == "migration/pavia":
        return f"output/migration/lome/{lome_model}/multilabel_pavia_blocks"
    else:
        raise ValueError("Unsupported dataset!")


def analyze_single_document(doc_id, event_id, lome_model, dataset, texts_df, deep_frames_cache):
    data_domain, data_corpus = dataset.split("/")
    syntax_cache = SYNTAX_ANALYSIS_CACHE_FILES[dataset]
    print(dataset)

    if dataset == "migration/pavia":  # this is a hack, fix it!
        pred_file_path = f"output/migration/lome/multilabel/{lome_model}/pavia/{event_id}/lome_{doc_id}.comm.json"
    elif dataset == "femicides/olv":
        pred_file_path = f"output/femicides/lome/lome_0shot/multilabel/olv/{event_id}/lome_{doc_id}.comm.json"
    elif dataset == "femicides/rai":
        pred_file_path = f"output/{data_domain}/lome/lome_0shot/multilabel/rai_ALL/{event_id}/lome_{doc_id}.comm.json"
    else:
        pred_file_path = f"output/{data_domain}/lome/lome_0shot/multilabel/{data_corpus}/{event_id}/lome_{doc_id}.comm.json"
    print(f"Analyzing file {pred_file_path}")

    doc_id = os.path.basename(pred_file_path).split(".")[0].split("_")[1]
    doc_key = doc_id[:2]
    tarball = get_tarball_blocks(dataset, lome_model) + f"/block_{doc_key}.tar"
    with tarfile.open(tarball, "r") as tar_f:
        pred_file = io.TextIOWrapper(tar_f.extractfile(pred_file_path))
        (
            sents,
            pred_structures,
            syntax_analyses,
            role_analyses,
        ) = process_prediction_file(
            filename=pred_file_path,
            dataset_name=dataset,
            file_obj=pred_file,
            syntax_cache=syntax_cache,
            deep_frames_cache=deep_frames_cache,
        )

    output = []
    for sent, structs, syntax, roles in zip(
        sents, pred_structures, syntax_analyses, role_analyses
    ):
        output.append(
            {
                "sentence": sent,
                "fn_structures": [
                    dataclasses.asdict(fs) for fs in structs.values()
                ],
                "syntax": syntax,
                "roles": roles,
                "meta": {
                    "event_id": event_id,
                    "doc_id": doc_id,
                    "text_meta": get_text_meta(doc_id, texts_df),
                },
            }
        )
    return output


def get_text_meta(doc_id, texts_df):
    row = texts_df[texts_df["text_id"] == int(doc_id)].iloc[0]
    if "pubdate" in row:
        pubdate = row["pubdate"] if not pd.isna(row["pubdate"]) else None
    elif "pubyear" in row:
        pubdate = int(row["pubyear"])
    else:
        pubdate = None
    return {
        "url": row["url"] if "url" in row else None,
        "pubdate": pubdate,
        "provider": row["provider"],
        "title": row["title"] if not pd.isna(row["title"]) else None,
        "days_after_event": int(row["days_after_event"]) if "days_after_event" in row and not pd.isna(row["days_after_event"]) else 0,
    }
def process_fn_sentence(
    sentence, deep_frames_cache, post_process=True, deep_frames_list=None
):
    # frame structures in the sentence
    sent_structures: Dict[int, FrameStructure] = {}

    # role spans currently being built up (per structure + role name)
    cur_spans: Dict[Tuple[int, str], AnnotationSpan] = {}
    for token_idx, (token_str, frame_annos) in enumerate(
        zip(sentence["tokens"], sentence["frame_list"])
    ):
        for fa in frame_annos:
            # remove "virtual root" nonsense token
            if "@@VIRTUAL_ROOT@@" in fa:
                continue

            fa = fa.split("@@")[0]  # remove confidence score if it's there
            anno, struct_id_str = fa.split("@")
            struct_id = int(struct_id_str)
            frame_name = anno.split(":")[1]
            deep_frame, deep_frame_mapping = map_or_lookup_deep_frame(
                frame_name, deep_frames_cache, deep_frames_list=deep_frames_list
            )
            if struct_id not in sent_structures:
                sent_structures[struct_id] = FrameStructure(
                    frame=frame_name,
                    deep_frame=deep_frame,
                    target=None,
                    roles=[],
                    deep_roles=[],
                )
            cur_struct = sent_structures[struct_id]

            # TODO: get rid of this hack
            anno = anno.replace("I::", "I:")
            anno = anno.replace("B::", "B:")

            if anno.split(":")[0] == "T":
                if cur_struct.target is None:
                    cur_struct.target = AnnotationSpan([token_idx], [token_str])
                else:
                    cur_struct.target.tokens_idx.append(token_idx)
                    cur_struct.target.tokens_str.append(token_str)
            elif anno.split(":")[0] == "B":
                role_name = anno.split(":")[2]
                role_span = AnnotationSpan([token_idx], [token_str])
                cur_struct.roles.append((role_name, role_span))
                if role_name in deep_frame_mapping:
                    cur_struct.deep_roles.append(
                        (deep_frame_mapping[role_name], role_span)
                    )
                cur_spans[(struct_id, role_name)] = role_span
            elif anno.split(":")[0] == "I":
                role_name = anno.split(":")[2]
                role_span = cur_spans[(struct_id, role_name)]
                role_span.tokens_str.append(token_str)
                role_span.tokens_idx.append(token_idx)

    # post-process: remove punctuation in targets
    if post_process:
        for fs in sent_structures.values():
            if len(fs.target.tokens_str) > 1:
                target_tok_str_to_remove = []
                target_tok_idx_to_remove = []
                for tok_str, tok_idx in zip(fs.target.tokens_str, fs.target.tokens_idx):
                    if tok_str in ["``", "''", "`", "'", ".", ",", ";", ":"]:
                        target_tok_str_to_remove.append(tok_str)
                        target_tok_idx_to_remove.append(tok_idx)
                for tok_str, tok_idx in zip(
                    target_tok_str_to_remove, target_tok_idx_to_remove
                ):
                    fs.target.tokens_str.remove(tok_str)
                    fs.target.tokens_idx.remove(tok_idx)

    return sent_structures
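# The parsing above assumes LOME-style token annotations in sentence["frame_list"]: each
# token carries zero or more strings of the form "T:<Frame>@<struct_id>" (frame target),
# "B:<Frame>:<Role>@<struct_id>" (beginning of a role span) or "I:<Frame>:<Role>@<struct_id>"
# (continuation of a role span), optionally followed by "@@<confidence>"; entries containing
# "@@VIRTUAL_ROOT@@" are placeholders and are skipped.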
def map_back_spacy_lome_tokens(spacy_doc, lome_tokens):
    if len(lome_tokens) > len(spacy_doc):
        raise ValueError(
            f"Cannot re-tokenize (#lome={len(lome_tokens)} // #spacy={len(spacy_doc)})"
        )

    spacy_to_lome = {}
    lome_idx = 0
    for spacy_idx, spacy_token in enumerate(spacy_doc):
        spacy_to_lome[spacy_idx] = lome_idx
        # whitespace after token: tokens correspond
        if spacy_token.whitespace_:
            lome_idx += 1
    return spacy_to_lome
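# Alignment idea: LOME tokens are whitespace-delimited, so each spaCy token is assigned the
# current LOME index, and the index only advances when the spaCy token is followed by
# whitespace. For example, with the Italian model a LOME token like "dell'auto" is typically
# split by spaCy into "dell'" + "auto"; both spaCy tokens map to the same LOME index because
# "dell'" has no trailing whitespace.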
def get_syn_category(spacy_token):
    if spacy_token.pos_ == "NOUN":
        return "n"
    if spacy_token.pos_ == "ADJ":
        return "adj"
    if spacy_token.pos_ == "ADV":
        return "adv"
    if spacy_token.pos_ == "ADP":
        return "p"
    if spacy_token.pos_ == "VERB":
        if spacy_token.morph.get("VerbForm") == ["Fin"]:
            return "v:fin"
        if spacy_token.morph.get("VerbForm") == ["Part"]:
            return "v:part"
        if spacy_token.morph.get("VerbForm") == ["Ger"]:
            return "v:ger"
        if spacy_token.morph.get("VerbForm") == ["Inf"]:
            return "v:inf"
    return "other"
def syntax_analyze(sentence, spacy_model_name, spacy_model_obj=None) -> Dict[str, List[Dict[str, Any]]]:
lome_tokens = sentence["tokens"] | |
# load spacy model locally (so that it works in GAE) | |
# global nlp | |
if spacy_model_obj is not None: | |
nlp = spacy_model_obj | |
else: | |
nlp = spacy.load(spacy_model_name) | |
spacy_doc = nlp(" ".join(lome_tokens)) | |
analysis = defaultdict(list) | |
spacy_to_lome_tokens = map_back_spacy_lome_tokens(spacy_doc, lome_tokens) | |
for spacy_idx, token in enumerate(spacy_doc): | |
lome_idx = spacy_to_lome_tokens[spacy_idx] | |
syn_category = get_syn_category(token) | |
syn_construction = get_syn_construction(token, syn_category) | |
children = [] | |
for c in token.children: | |
children.append( | |
{ | |
"token": c.text, | |
"spacy_idx": c.i, | |
"lome_idx": spacy_to_lome_tokens[c.i], | |
"syn_category": get_syn_category(c), | |
"dependency": c.dep_, | |
} | |
) | |
ancestors = [] | |
for a in token.ancestors: | |
ancestors.append( | |
{ | |
"token": a.text, | |
"spacy_idx": a.i, | |
"lome_idx": spacy_to_lome_tokens[a.i], | |
"syn_category": get_syn_category(a), | |
"dependency": a.dep_, | |
} | |
) | |
# str key so that it doesn't change when converting to JSON | |
lome_key = str(lome_idx) | |
analysis[lome_key].append( | |
{ | |
"token": token.text, | |
"dependency": token.dep_, | |
"spacy_idx": spacy_idx, | |
"lome_idx": lome_idx, | |
"syn_category": syn_category, | |
"syn_construction": syn_construction, | |
"children": children, | |
"ancestors": ancestors, | |
} | |
) | |
return analysis | |
def get_syn_construction(token: Token, syn_category: str) -> str:
    if syn_category in ["n", "adj", "adv", "p"]:
        return "nonverbal"

    if syn_category.startswith("v:"):
        # find reflexives
        for c in token.children:
            if c.lemma_.lower() in ["si", "zich", "zichzelf"]:
                return "verbal:reflexive"

        # find impersonal constructions
        for c in token.children:
            if c.dep_ == "expl":
                return "verbal:impersonal"

        # all other finite verbs/gerunds/infinitives -> active construction
        if syn_category in ["v:fin", "v:ger", "v:inf"]:
            return "_verbal:ACTIVE"

        if syn_category == "v:part":
            if token.dep_ == "acl":
                return "_verbal:ADPOS"
            for c in token.children:
                # passive subj or auxiliary present: it's a passive
                if c.dep_ in ["nsubj:pass", "aux:pass"]:
                    return "verbal:passive"
                # auxiliary "HAVE" (avere/hebben) present: it's an active
                if (
                    c.dep_ == "aux"
                    and c.lemma_.lower() in ITALIAN_ACTIVE_AUX + DUTCH_ACTIVE_AUX
                ):
                    return "verbal:active"
            return "_verbal:OTH_PART"

    return "other"
def get_syntax_info(struct: FrameStructure, syntax: Dict) -> Dict:
    target_idx = str(struct.target.tokens_idx[0])
    # print(target_idx, syntax)
    syntax_for_target = syntax[target_idx]
    return syntax_for_target[-1]
def enrich_texts_df(texts_df: pd.DataFrame, events_df: pd.DataFrame):
    time_delta_rows: List[Optional[int]] = []
    for idx, text_row in texts_df.iterrows():
        try:
            event_row = events_df[events_df["event:id"] == text_row["event_id"]].iloc[0]
        except IndexError:
            # no matching event: record a missing value and skip this row
            print(f"Skipping {idx} (IndexError)")
            time_delta_rows.append(None)
            continue

        if "pubdate" not in text_row or pd.isna(text_row["pubdate"]) or pd.isna(event_row["event:date"]):
            time_delta_rows.append(None)
        else:
            try:
                pub_date = datetime.strptime(text_row["pubdate"], "%Y-%m-%d %H:%M:%S")
                event_date = datetime.strptime(event_row["event:date"], "%Y-%m-%d")
                time_delta = pub_date - event_date
                time_delta_days = time_delta.days
                time_delta_rows.append(time_delta_days)
            except ValueError as e:
                print(f"\t\terror parsing dates, see below for more info:\n\t\t{e}")
                time_delta_rows.append(None)
    return texts_df.assign(days_after_event=time_delta_rows)
def read_frames_of_interest(dataset) -> List[str]:
    if dataset in ["femicides/rai", "femicides/olv"]:
        file = "resources/femicide_frame_list.txt"
    elif dataset == "crashes/thecrashes":
        file = "resources/crashes_frame_list.txt"
    elif dataset == "migration/pavia":
        file = "resources/migration_frame_list.txt"
    else:
        raise ValueError("Unsupported dataset")

    frames = set()
    with open(file, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line.startswith("#") or not line:
                continue
            frames.add(line[0].upper() + line[1:].lower())
    return sorted(frames)
def make_dep_label_cache():
    labels = set()
    for dataset in ["femicides/rai", "crashes/thecrashes", "migration/pavia"]:
        tarball = (
            "output/femicides/lome/lome_0shot/multilabel_rai.tar.gz"
            if dataset == "femicides/rai"
            else "output/crashes/lome/lome_0shot/multilabel_thecrashes.tar.gz"
            if dataset == "crashes/thecrashes"
            else "output/migration/lome/lome_0shot/multilabel_pavia.tar.gz"
        )
        spacy_model = (
            "it_core_news_md"
            if dataset in ["femicides/rai", "migration/pavia"]
            else "nl_core_news_md"
        )
        deep_frames_cache = load_deep_frames_cache()
        syntax_cache = SYNTAX_ANALYSIS_CACHE_FILES[dataset]
        with tarfile.open(tarball, "r:gz") as tar_f:
            for mem in [
                m.name for m in tar_f.getmembers() if m.name.endswith(".comm.json")
            ]:
                if mem is None:
                    continue
                print(mem)
                mem_obj = io.TextIOWrapper(tar_f.extractfile(mem))
                (_, _, _, role_analyses,) = process_prediction_file(
                    filename=mem,
                    dataset_name=dataset,
                    file_obj=mem_obj,
                    syntax_cache=syntax_cache,
                    deep_frames_cache=deep_frames_cache,
                    spacy_model=spacy_model,
                )
                if role_analyses is None:
                    print(f"\tSkipping file {mem}, no role analyses found")
                    continue
                for sent_ra in role_analyses:
                    for ra in sent_ra.values():
                        for dep, _ in ra.values():
                            labels.add(dep)

    with open(DEP_LABEL_CACHE_FILE, "w", encoding="utf-8") as f_out:
        for label in sorted(labels):
            f_out.write(label + os.linesep)
def analyze_external_file(file_in, file_out, spacy_model):
    deep_frames_cache = load_deep_frames_cache()
    (
        sents,
        pred_structures,
        syntax_analyses,
        role_analyses,
    ) = process_prediction_file(file_in, "", None, deep_frames_cache, spacy_model_obj=spacy_model)
    output = []
    for sent, structs, syntax, roles in zip(
        sents, pred_structures, syntax_analyses, role_analyses
    ):
        output.append(
            {
                "sentence": sent,
                "fn_structures": [
                    dataclasses.asdict(fs) for fs in structs.values()
                ],
                "syntax": syntax,
                "roles": roles,
            }
        )
    with open(file_out, "w", encoding="utf-8") as f_out:
        json.dump(output, f_out, indent=4)
if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    ap.add_argument("command", choices=[
        "make_syntax_cache", "make_dep_label_cache", "analyze_file"
    ])
    ap.add_argument("dataset", choices=["femicides/rai", "femicides/rai_main", "femicides/rai_ALL",
                                        "femicides/olv", "crashes/thecrashes", "migration/pavia", "*"])
    ap.add_argument("--input_file", type=str, default="")
    ap.add_argument("--output_file", type=str, default="")
    args = ap.parse_args()

    if args.command == "make_syntax_cache":
        if args.dataset == "*":
            raise ValueError(
                "Please specify a dataset for `make_syntax_cache`")
        if args.dataset == "crashes/thecrashes":
            make_syntax_cache(
                "crashes/thecrashes", skip_fn=lambda f: not is_a_dutch_text(f)
            )
        elif args.dataset == "femicides/rai":
            make_syntax_cache("femicides/rai")
        elif args.dataset == "femicides/rai_main":
            make_syntax_cache("femicides/rai_main")
        elif args.dataset == "femicides/rai_ALL":
            make_syntax_cache("femicides/rai_ALL")
        elif args.dataset == "femicides/olv":
            make_syntax_cache("femicides/olv")
        else:
            make_syntax_cache("migration/pavia")
    elif args.command == "make_dep_label_cache":
        make_dep_label_cache()
elif args.command == "analyze_file": | |
analyze_external_file(args.input_file, args.output_file) | |