import csv
import pandas as pd
from sklearn.model_selection import train_test_split
from weakly_supervised_parser.utils.process_ptb import punctuation_words, currency_tags_words
from weakly_supervised_parser.utils.distant_supervision import RuleBasedHeuristic
# Punctuation and currency tokens to strip from sentences during preprocessing;
# commas, semicolons, dashes, and PTB-style quote tokens are kept in the sentences
filterchars = punctuation_words + currency_tags_words
filterchars = [char for char in filterchars if char not in {",", ";", "-", "`", "``", "'", "''"}]
class NGramify:
    """Enumerates contiguous spans (n-grams) of a space-tokenized sentence."""

    def __init__(self, sentence):
        self.sentence = sentence.split()
        self.sentence_length = len(self.sentence)
        self.ngrams = []

    def generate_ngrams(self, single_span=True, whole_span=True):
        # A sentence of N tokens has N * (N + 1) / 2 contiguous spans; the flags
        # optionally exclude spans of length 1 and the length-N whole-sentence span
        if single_span:
            start = 1
        else:
            start = 2
        if whole_span:
            end = self.sentence_length + 1
        else:
            end = self.sentence_length
        for n in range(start, end):
            for i in range(self.sentence_length - n + 1):
                self.ngrams.append(((i, i + n), self.sentence[i : i + n]))
        return self.ngrams
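    # Illustrative example: NGramify("a b c").generate_ngrams() returns
    #   [((0, 1), ["a"]), ((1, 2), ["b"]), ((2, 3), ["c"]),
    #    ((0, 2), ["a", "b"]), ((1, 3), ["b", "c"]), ((0, 3), ["a", "b", "c"])]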
    def generate_all_possible_spans(self):
        # Spans of length 2 to N-1; each entry holds the span indices, the span text,
        # and the full sentence with the span parenthesized in its outside context
        for n in range(2, self.sentence_length):
            for i in range(self.sentence_length - n + 1):
                left = " ".join(self.sentence[0:i])
                span = " ".join(self.sentence[i : i + n])
                right = " ".join(self.sentence[i + n : self.sentence_length])
                if i > 0 and (i + n) < self.sentence_length:
                    context = left + " (" + span + ") " + right
                elif i == 0:
                    context = "(" + span + ") " + right
                else:  # (i + n) == self.sentence_length
                    context = left + " (" + span + ")"
                self.ngrams.append(((i, i + n), span, context))
        return self.ngrams
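# Illustrative example of the parenthesized contexts:
#   NGramify("the cat sat down").generate_all_possible_spans() yields entries such as
#   ((0, 2), "the cat", "(the cat) sat down")
#   ((1, 3), "cat sat", "the (cat sat) down")
#   ((2, 4), "sat down", "the cat (sat down)")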
class DataLoaderHelper:
    """Thin I/O helper: reads newline-delimited text files and writes tab-separated rows."""

    def __init__(self, input_file_object=None, output_file_object=None):
        self.input_file_object = input_file_object
        self.output_file_object = output_file_object

    def read_lines(self):
        with open(self.input_file_object, "r") as f:
            lines = f.read().splitlines()
        return lines

    def __getitem__(self, index):
        # Note: re-reads the whole file on every access
        return self.read_lines()[index]

    def write_lines(self, keys, values):
        with open(self.output_file_object, "w", newline="\n") as output_file:
            dict_writer = csv.DictWriter(output_file, keys, delimiter="\t")
            dict_writer.writeheader()
            dict_writer.writerows(values)
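# Illustrative usage (paths are hypothetical):
#   lines = DataLoaderHelper(input_file_object="data/sentences.txt").read_lines()
#   DataLoaderHelper(output_file_object="spans.tsv").write_lines(
#       keys=["sentence", "label"], values=[{"sentence": "the cat sat", "label": 1}]
#   )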
class PTBDataset:
    def __init__(self, data_path):
        # One sentence per line; read as a single-column, header-less frame
        self.data = pd.read_csv(data_path, sep="\t", header=None, names=["sentence"])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data["sentence"].loc[index]

    def retrieve_all_sentences(self, N=None):
        if N:
            return self.data["sentence"].iloc[:N].tolist()
        return self.data["sentence"].tolist()

    def preprocess(self):
        # Remove the filtered punctuation/currency tokens from every sentence
        self.data["sentence"] = self.data["sentence"].apply(
            lambda row: " ".join([token for token in row.split() if token not in filterchars])
        )
        return self.data
    def seed_bootstrap_constituent(self):
        # Positive (constituent) seeds: whole sentences plus contiguous runs of
        # titlecase words (e.g. proper names) found by the rule-based heuristic
        whole_span_slice = self.data["sentence"]
        func = lambda tokens: RuleBasedHeuristic().add_contiguous_titlecase_words(
            row=[(index, token) for index, token in enumerate(tokens) if token.istitle() or "'" in token]
        )
        titlecase_matches = [
            item
            for sublist in self.data["sentence"].str.split().apply(func).tolist()
            for item in sublist
            if len(item.split()) > 1
        ]
        titlecase_matches_df = pd.Series(titlecase_matches)
        # Drop matches that begin with an apostrophe token and strip PTB-style double quotes
        titlecase_matches_df = titlecase_matches_df[~titlecase_matches_df.str.split().str[0].str.contains("'")].str.replace("''", "")
        most_frequent_start_token = RuleBasedHeuristic(corpus=self.retrieve_all_sentences()).augment_using_most_frequent_starting_token(N=1)[0][0]
        most_frequent_start_token_df = titlecase_matches_df[titlecase_matches_df.str.startswith(most_frequent_start_token)].str.lower()
        constituent_samples = pd.DataFrame(dict(sentence=pd.concat([whole_span_slice, titlecase_matches_df, most_frequent_start_token_df]), label=1))
        return constituent_samples
    def seed_bootstrap_distituent(self):
        # Negative (distituent) seeds: sentences with their last k tokens removed are
        # unlikely to be constituents; longer sentences contribute deeper truncations
        avg_sent_len = int(self.data["sentence"].str.split().str.len().mean())
        sentence_lengths = self.data["sentence"].str.split().str.len()
        # Drop the last token from every sentence, then the last k tokens (k = 2..6)
        # from sentences longer than avg_sent_len + 10 * (k - 1)
        slices = [self.data["sentence"].str.split().str[:-1].str.join(" ")]
        for k in range(2, 7):
            subset = self.data[sentence_lengths > avg_sent_len + 10 * (k - 1)]["sentence"]
            slices.append(subset.str.split().str[:-k].str.join(" "))
        distituent_samples = pd.DataFrame(dict(sentence=pd.concat(slices), label=0))
        return distituent_samples
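    # Illustrative example (hypothetical sentence): truncating
    # "investors are wary of new issues" by one token yields the distituent
    # "investors are wary of new", which is stored with label 0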
    def train_validation_split(self, seed, test_size=0.5, shuffle=True):
        self.preprocess()
        bootstrap_constituent_samples = self.seed_bootstrap_constituent()
        bootstrap_distituent_samples = self.seed_bootstrap_distituent()
        df = pd.concat([bootstrap_constituent_samples, bootstrap_distituent_samples], ignore_index=True)
        # De-duplicate, drop empties, and keep only multi-token spans
        df = df.drop_duplicates(subset=["sentence"]).dropna(subset=["sentence"])
        df["sentence"] = df["sentence"].str.strip()
        df = df[df["sentence"].str.split().str.len() > 1]
        train, validation = train_test_split(df, test_size=test_size, random_state=seed, shuffle=shuffle)
        # Cap the bootstrap sets at 8,000 training and 2,000 validation examples
        return train.head(8000), validation.head(2000)
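# Illustrative end-to-end usage (the data path is hypothetical):
if __name__ == "__main__":
    ptb = PTBDataset(data_path="data/ptb-train-sentences.txt")
    train_df, validation_df = ptb.train_validation_split(seed=42)
    print(train_df["label"].value_counts())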