import nltk |
from nltk.tag import PerceptronTagger |
from stable_whisper.result import WordTiming |
import numpy as np |
import torch |
def bind_wordtimings_to_tags(wt: list[WordTiming]): |
raw_words = [w.word for w in wt] |
tokenized_raw_words = [] |
tokens_wordtiming_map = [] |
for word in raw_words: |
tokens_word = nltk.word_tokenize(word) |
tokenized_raw_words.extend(tokens_word) |
tokens_wordtiming_map.append(len(tokens_word)) |
tagged_words = nltk.pos_tag(tokenized_raw_words) |
grouped_tags = [] |
for k in tokens_wordtiming_map: |
grouped_tags.append(tagged_words[:k]) |
tagged_words = tagged_words[k:] |
tags_only = [tuple([w[1] for w in t]) for t in grouped_tags] |
wordtimings_with_tags = zip(wt, tags_only) |
return list(wordtimings_with_tags) |
def embed_tag_list(tags: list[str]): |
tags_dict = get_upenn_tags_dict() |
eye = np.eye(len(tags_dict)) |
return eye[np.array([tags_dict[tag] for tag in tags])] |
def lookup_tag_list(tags: list[str]): |
tags_dict = get_upenn_tags_dict() |
return np.array([tags_dict[tag] for tag in tags], dtype=int) |
def tag_training_data(filename: str): |
with open(filename, "r") as f: |
segmented_lines = f.readlines() |
segmented_lines = [s.strip() for s in segmented_lines if s.strip() != ""] |
full_text = " ".join(segmented_lines) |
tokenized_full_text = nltk.word_tokenize(full_text) |
tagged_full_text = nltk.pos_tag(tokenized_full_text) |
tagged_full_text_copy = tagged_full_text |
reconstructed_tags = [] |
for line in segmented_lines: |
line_nospace = line.replace(r" ", "") |
found = False |
for i in range(len(tagged_full_text_copy)+1): |
rejoined = "".join([x[0] for x in tagged_full_text_copy[:i]]) |
if line_nospace == rejoined: |
found = True |
reconstructed_tags.append(tagged_full_text_copy[:i]) |
tagged_full_text_copy = tagged_full_text_copy[i:] |
continue; |
if found == False: |
print("Panic. Cannot match further.") |
print(f"Was trying to match: {line}") |
print(tagged_full_text_copy) |
return reconstructed_tags |
def get_upenn_tags_dict(): |
tagger = PerceptronTagger() |
tags = list(tagger.tagdict.values()) |
tags.extend(["CC", "CD", "DT", "EX", "FW", "IN", "JJ", "JJR", "JJS", "LS", "MD", "NN", "NNS", "NNP", "NNPS", "PDT", "POS", "PRP", "PRP$", "RB", "RBR", "RBS", "RP", "SYM", "TO", "UH", "VB", "VBD", "VBG", "VBN", "VBP", "VBZ", "WDT", "WP", "WP$", "WRB"]) |
tags = list(set(tags)) |
tags.sort() |
tags.append("BREAK") |
tags_dict = dict() |
for index, tag in enumerate(tags): |
tags_dict[tag] = index |
return tags_dict |
def parse_tags(reconstructed_tags): |
""" |
Parse reconstructed tags into input/tag datapoint. |
In the original plan, this type of output is suitable for bidirectional LSTM. |
Input: |
reconstured_tags: |
Tagged segments, from tag_training_data() |
Example: [ |
[('You', 'PRP'), ("'re", 'VBP'), ('back', 'RB'), ('again', 'RB'), ('?', '.')], |
[('You', 'PRP'),("'ve", 'VBP'), ('been', 'VBN'), ('consuming', 'VBG'), ('a', 'DT'), ('lot', 'NN'), ('of', 'IN'), ('tech', 'JJ'), ('news', 'NN'), ('lately', 'RB'), ('.', '.')] |
... |
] |
Output: |
(input_tokens, output_tag) |
input_tokens: |
A sequence of tokens, each number corresponds to a type of word. |
Example: [25, 38, 27, 27, 6, 25, 38, 37, 36, 10, 19, 13, 14, 19, 27, 6] |
output_tags: |
A sequence of 0 and 1, indicating whether a break should be inserted AFTER each location. |
Example: [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1] |
""" |
tags_dict = get_upenn_tags_dict() |
all_tags_sequence = [[y[1] for y in segments] + ['BREAK'] for segments in reconstructed_tags] |
all_tags_sequence = [tag for tags in all_tags_sequence for tag in tags] |
input_tokens = [] |
output_tag = [] |
for token in all_tags_sequence: |
if token != 'BREAK': |
input_tokens.append(tags_dict[token]) |
output_tag.append(0) |
else: |
output_tag[-1] = 1 |
return input_tokens, output_tag |
def embed_segments(tagged_segments): |
tags, tags_dict = get_upenn_tags_dict() |
for index, tag in enumerate(tags): |
tags_dict[tag] = index |
result_embedding = [] |
classes = len(tags) |
eye = np.eye(classes) |
for segment in tagged_segments: |
targets = np.array([tags_dict[tag] for word, tag in segment]) |
segment_embedding = eye[targets] |
result_embedding.append(segment_embedding) |
result_embedding.append(np.array([eye[tags_dict["BREAK"]]])) |
result_embedding = np.concatenate(result_embedding) |
return result_embedding, tags_dict |
def window_embedded_segments_rnn(embeddings, tags_dict): |
datapoints = [] |
eye = np.eye(len(tags_dict)) |
break_vector = eye[tags_dict["BREAK"]] |
for i in range(1, embeddings.shape[0]): |
if (embeddings[i] == break_vector).all(): |
continue |
else: |
prev_sequence = embeddings[:i] |
if (prev_sequence[-1] == break_vector).all(): |
prev_sequence = prev_sequence[:-1] |
tag = 1 |
else: |
tag = 0 |
entire_sequence = np.concatenate((prev_sequence, np.array([embeddings[i]]))) |
datapoints.append((entire_sequence, tag)) |
return datapoints |
def print_dataset(datapoints, tags_dict, tokenized_full_text): |
eye = np.eye(len(tags_dict)) |
break_vector = eye[tags_dict["BREAK"]] |
for input, tag in datapoints: |
if tag == 1: |
print("[1] ", end='') |
else: |
print("[0] ", end='') |
count = 0 |
for v in input: |
if not (v == break_vector).all(): |
count += 1 |
segment = tokenized_full_text[:count] |
print(segment) |
from stable_whisper.result import Segment |
def get_indicies(segment: Segment, model, device, threshold): |
word_list = segment.words |
tagged_wordtiming = bind_wordtimings_to_tags(word_list) |
tag_list = [tag for twt in tagged_wordtiming for tag in twt[1]] |
tag_per_word = [len(twt[1]) for twt in tagged_wordtiming] |
embedded_tags = embed_tag_list(tag_list) |
embedded_tags = torch.from_numpy(embedded_tags).float() |
output = model(embedded_tags[None, :].to(device)) |
list_output = output.detach().cpu().numpy().tolist()[0] |
current_index = 0 |
cut_indicies = [] |
for index, tags_count in enumerate(tag_per_word): |
tags = list_output[current_index:current_index+tags_count] |
if max(tags) > threshold: |
cut_indicies.append(index) |
current_index += tags_count |
return cut_indicies |
def get_indicies_autoembed(segment: Segment, model, device, threshold): |
word_list = segment.words |
tagged_wordtiming = bind_wordtimings_to_tags(word_list) |
tag_list = [tag for twt in tagged_wordtiming for tag in twt[1]] |
tag_per_word = [len(twt[1]) for twt in tagged_wordtiming] |
embedded_tags = lookup_tag_list(tag_list) |
embedded_tags = torch.from_numpy(embedded_tags).int().to(device) |
output = model(embedded_tags[None, :].to(device)) |
list_output = output.detach().cpu().numpy().tolist()[0] |
current_index = 0 |
cut_indicies = [] |
for index, tags_count in enumerate(tag_per_word): |
tags = list_output[current_index:current_index+tags_count] |
if max(tags) > threshold: |
cut_indicies.append(index) |
current_index += tags_count |
return cut_indicies |