from grewtse.preprocessing.grew_dependencies import match_dependencies
from grewtse.preprocessing.reconstruction import (
    perform_token_surgery,
    recursive_match_token,
)
from conllu import parse_incr, Token
from typing import Any
import pandas as pd
import numpy as np
import logging


class ConlluParser:
    """
    A class designed to parse .conllu files, the standard format for UD
    treebanks, for Grew-TSE.
    """

    def __init__(self) -> None:
        self.lexicon: pd.DataFrame | None = None

    def build_lexicon(self, filepaths: list[str] | str) -> pd.DataFrame:
        """
        Create a DataFrame that contains the set of all words with their
        features as generated from a UD treebank. This is essential for the
        subsequent generation of minimal pairs.

        This was not designed to handle treebanks that assign differing names
        to features, so please ensure multiple treebank files all come from
        the same treebank or treebank schema.

        :param filepaths: a list of strings corresponding to the UD treebank files,
            e.g. ["german_treebank_part_A.conllu", "german_treebank_part_B.conllu"].
        :return: a DataFrame with all words and their features.
        """
        rows = []

        if isinstance(filepaths, str):
            filepaths = [filepaths]  # wrap a single path in a list

        for conllu_path in filepaths:
            with open(conllu_path, "r", encoding="utf-8") as f:
                for tokenlist in parse_incr(f):
                    # get the sentence ID in the dataset
                    sent_id = tokenlist.metadata["sent_id"]

                    # iterate over each token
                    for token in tokenlist:
                        # check whether it's worth saving to our lexical item dataset
                        is_valid_token = is_valid_for_lexicon(token)
                        if not is_valid_token:
                            continue

                        # from the token object, create a dict and append it
                        row = build_token_row(token, sent_id)
                        rows.append(row)

        lexicon_df = pd.DataFrame(rows)

        # make sure our nan values are interpreted as such
        lexicon_df.replace("nan", np.nan, inplace=True)

        # create the (Sentence ID, Token ID) primary key
        lexicon_df.set_index(["sentence_id", "token_id"], inplace=True)

        self.lexicon = lexicon_df
        return lexicon_df

    def to_syntactic_feature(
        self,
        sentence_id: str,
        token_id: str,
        token: str,
        alt_morph_constraints: dict,
        alt_universal_constraints: dict,
    ) -> str | None:
        """
        The most important function for finding minimal pairs. Converts a
        given lexical item taken from a UD treebank sentence to another
        lexical item with the same lemma but with the specified differing
        feature(s).

        :param sentence_id: the ID in the treebank of the sentence.
        :param token_id: the token index in the list of tokens corresponding
            to the isolated target word.
        :param token: the token string itself that is the isolated target word.
        :param alt_morph_constraints: the alternative morphological feature(s)
            for the target word.
        :param alt_universal_constraints: the alternative UPOS feature(s) for
            the target word.
        :return: a string representing the converted target word.
""" # distinguish morphological from universal features # todo: find a better way to do this prefix = "feats__" # prefix = '' alt_morph_constraints = { prefix + key: value for key, value in alt_morph_constraints.items() } token_features = self.get_features(sentence_id, token_id) token_features.update(alt_morph_constraints) token_features.update(alt_universal_constraints) lexical_items = self.lexicon # get only those items which are the same lemma lemma = self.get_lemma(sentence_id, token_id) lemma_mask = lexical_items["lemma"] == lemma lexical_items = lexical_items[lemma_mask] lexical_items = construct_candidate_set(lexical_items, token_features) # ensure that it doesn't allow minimal pairs with different start cases e.g business, Business filtered = lexical_items[ lexical_items["form"].apply(lambda w: is_same_start_case(w, token)) ] if not filtered.empty: return filtered["form"].iloc[0] else: return None def get_lexicon(self) -> pd.DataFrame: return self.lexicon # this shouldn't be hard coded def get_feature_names(self) -> list: return self.lexicon.columns[4:].to_list() # todo: add more safety def get_features(self, sentence_id: str, token_id: int) -> dict: return self.lexicon.loc[(sentence_id, token_id)][ self.get_feature_names() ].to_dict() def get_lemma(self, sentence_id: str, token_id: str) -> str: return self.lexicon.loc[(sentence_id, token_id)]["lemma"] def get_candidate_set( self, universal_constraints: dict, morph_constraints: dict ) -> pd.DataFrame: has_parsed_conllu = self.lexicon is not None if not has_parsed_conllu: raise ValueError("Please parse a ConLLU file first.") morph_constraints = {f"feats__{k}": v for k, v in morph_constraints.items()} are_morph_features_valid = all( f in self.lexicon.columns for f in morph_constraints.keys() ) are_universal_features_valid = all( f in self.lexicon.columns for f in universal_constraints.keys() ) if not are_morph_features_valid or not are_universal_features_valid: raise KeyError( "Features provided for candidate set are not valid features in the dataset." 
            )

        all_constraints = {**universal_constraints, **morph_constraints}
        candidate_set = construct_candidate_set(self.lexicon, all_constraints)

        return candidate_set

    def build_prompt_dataset(
        self,
        filepaths: list[str],
        grew_query: str,
        dependency_node: str,
        encoding: str = "utf-8",
    ):
        prompt_cutoff_token = "[PROMPT_CUTOFF]"
        results = self.build_masked_dataset(
            filepaths, grew_query, dependency_node, prompt_cutoff_token, encoding
        )
        prompt_dataset = results["masked"]

        def substring_up_to_token(s: str, token: str) -> str:
            idx = s.find(token)
            return s[:idx].strip() if idx != -1 else s.strip()

        prompt_dataset["prompt_text"] = prompt_dataset["masked_text"].apply(
            lambda x: substring_up_to_token(x, prompt_cutoff_token)
        )
        prompt_dataset = prompt_dataset.drop(["masked_text"], axis=1)

        return prompt_dataset

    def build_masked_dataset(
        self,
        filepaths: list[str],
        grew_query: str,
        dependency_node: str,
        mask_token: str,
        encoding: str = "utf-8",
    ):
        masked_dataset = []
        exception_dataset = []

        try:
            for filepath in filepaths:
                get_tokens_to_mask = match_dependencies(
                    filepath, grew_query, dependency_node
                )

                with open(filepath, "r", encoding=encoding) as data_file:
                    for sentence in parse_incr(data_file):
                        sentence_id = sentence.metadata["sent_id"]
                        sentence_text = sentence.metadata["text"]

                        if sentence_id in get_tokens_to_mask:
                            for i in range(len(sentence)):
                                sentence[i]["index"] = i

                            token_to_mask_id = get_tokens_to_mask[sentence_id]

                            try:
                                t_match = [
                                    tok
                                    for tok in sentence
                                    if tok.get("id") == token_to_mask_id
                                ][0]
                                t_match_form = t_match["form"]
                                t_match_index = t_match["index"]
                                sentence_as_str_list = [t["form"] for t in sentence]
                            except (KeyError, IndexError):
                                logging.info(
                                    "There was a mismatch between the Grew-based ID and the CoNLL-U ID."
                                )
                                exception_dataset.append(
                                    {
                                        "sentence_id": sentence_id,
                                        "match_id": None,
                                        "all_tokens": None,
                                        "match_token": None,
                                        "original_text": sentence_text,
                                    }
                                )
                                continue

                            try:
                                matched_token_start_index = recursive_match_token(
                                    sentence_text,  # the original string
                                    sentence_as_str_list.copy(),  # the sentence as a list of tokens
                                    t_match_index,  # the index of the token to be replaced
                                    ["_", " "],
                                    # todo: skip lines containing tokens we haven't accounted for
                                )
                            except ValueError:
                                exception_dataset.append(
                                    {
                                        "sentence_id": sentence_id,
                                        "match_id": token_to_mask_id,
                                        "all_tokens": sentence_as_str_list,
                                        "match_token": t_match_form,
                                        "original_text": sentence_text,
                                    }
                                )
                                continue

                            # replace the matched token with a mask token
                            masked_sentence = perform_token_surgery(
                                sentence_text,
                                t_match_form,
                                mask_token,
                                matched_token_start_index,
                            )

                            # the sentence ID and match ID together form a primary key
                            masked_dataset.append(
                                {
                                    "sentence_id": sentence_id,
                                    "match_id": token_to_mask_id,
                                    "match_token": t_match_form,
                                    "original_text": sentence_text,
                                    "masked_text": masked_sentence,
                                }
                            )
        except Exception as e:
            logging.error(f"Issue building dataset: {e}")

        masked_dataset_df = pd.DataFrame(masked_dataset)
        exception_dataset_df = pd.DataFrame(exception_dataset)

        return {"masked": masked_dataset_df, "exception": exception_dataset_df}


def construct_candidate_set(
    lexicon: pd.DataFrame, target_features: dict
) -> pd.DataFrame:
    """
    Construct the subset of lexical items which have the same feature set as
    the target features passed as an argument. The resulting words are termed
    'candidates'.

    :param lexicon: the DataFrame consisting of all lexical items and their features.
    :param target_features: the features that the candidates must match.
    :return: a DataFrame containing the candidate subset of the lexicon.
""" # optionally restrict search to a certain type of lexical item subset = lexicon # continuously filter the dataframe so as to be left # only with those lexical items which match the target # features # this includes cases for feat, value in target_features.items(): # ensure feature is a valid feature in feature set if feat not in subset.columns: raise KeyError("Invalid feature provided to confound set: {}".format(feat)) # slim the mask down using each feature # interesting edge case: np.nan == np.nan returns false! mask = (subset[feat] == value) | (subset[feat].isna() & pd.isna(value)) subset = subset[mask] return subset def is_same_start_case(s1, s2): if not s1 or not s2: return False return s1[0].isupper() == s2[0].isupper() def is_valid_for_lexicon(token: Token) -> bool: punctuation = [".", ",", "!", "?", "*"] # skip multiword tokens, malformed entries and punctuation is_punctuation = token.get("form") in punctuation is_valid_type = isinstance(token, dict) has_valid_id = isinstance(token.get("id"), int) return is_valid_type and has_valid_id and not is_punctuation def build_token_row(token: Token, sentence_id: str) -> dict[str, Any]: # get all token features such as Person, Mood, etc feats = token.get("feats") or {} row = { "sentence_id": sentence_id, "token_id": token.get("id") - 1, # IDs are reduced by one to start at 0 "form": token.get("form"), "lemma": token.get("lemma"), "upos": token.get("upos"), "xpos": token.get("xpos"), } # add each morphological feature as a column for feat_name, feat_value in feats.items(): row["feats__" + feat_name.lower()] = feat_value return row