|
|
|
from __future__ import print_function, division |
|
|
|
import glob |
|
import json |
|
import uuid |
|
from copy import deepcopy |
|
from collections import defaultdict, OrderedDict |
|
import numpy as np |
|
|
|
from torchmoji.filter_utils import is_special_token |
|
from torchmoji.word_generator import WordGenerator |
|
from torchmoji.global_variables import SPECIAL_TOKENS, VOCAB_PATH |
|
|
|
class VocabBuilder(): |
|
""" Create vocabulary with words extracted from sentences as fed from a |
|
word generator. |
|
""" |
|
def __init__(self, word_gen): |
|
|
|
        self.word_counts = defaultdict(int)

        self.word_length_limit = 30
|
|
|
for token in SPECIAL_TOKENS: |
|
assert len(token) < self.word_length_limit |
|
self.word_counts[token] = 0 |
|
self.word_gen = word_gen |
|
|
|
def count_words_in_sentence(self, words): |
|
""" Generates word counts for all tokens in the given sentence. |
|
|
|
# Arguments: |
|
words: Tokenized sentence whose words should be counted. |
|
""" |
|
for word in words: |
|
            if 0 < len(word) <= self.word_length_limit:

                # word_counts is a defaultdict, so missing words start at 0.
                self.word_counts[word] += 1
|
|
|
def save_vocab(self, path=None): |
|
""" Saves the vocabulary into a file. |
|
|
|
# Arguments: |
|
path: Where the vocabulary should be saved. If not specified, a |
|
randomly generated filename is used instead. |
|
""" |
|
        dtype = [('word', '|S{}'.format(self.word_length_limit)), ('count', 'int')]

        np_dict = np.array(list(self.word_counts.items()), dtype=dtype)
|
|
|
|
|
        # Sort by count, highest first (in-place sort of the reversed view).
        np_dict[::-1].sort(order='count')
|
data = np_dict |
|
|
|
if path is None: |
|
path = str(uuid.uuid4()) |
|
|
|
np.savez_compressed(path, data=data) |
|
print("Saved dict to {}".format(path)) |
|
|
|
def get_next_word(self): |
|
""" Returns next tokenized sentence from the word geneerator. |
|
|
|
# Returns: |
|
List of strings, representing the next tokenized sentence. |
|
""" |
|
return self.word_gen.__iter__().next() |
|
|
|
def count_all_words(self): |
|
""" Generates word counts for all words in all sentences of the word |
|
generator. |
|
""" |
|
for words, _ in self.word_gen: |
|
self.count_words_in_sentence(words) |
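
# Illustrative usage sketch (not part of the original module): building and
# saving a vocabulary from a stream of raw sentences. The filename
# 'sentences.txt' and the output name 'my_vocab' are hypothetical placeholders;
# WordGenerator is assumed to accept an iterable of sentences, as it is used
# elsewhere in this file.
#
#     from torchmoji.word_generator import WordGenerator
#
#     with open('sentences.txt') as stream:
#         vb = VocabBuilder(WordGenerator(stream))
#         vb.count_all_words()
#         vb.save_vocab('my_vocab')                  # writes my_vocab.npz
#
#     saved = np.load('my_vocab.npz')['data']        # structured (word, count) array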
|
|
|
class MasterVocab(): |
|
""" Combines vocabularies. |
|
""" |
|
def __init__(self): |
|
|
|
|
|
self.master_vocab = {} |
|
|
|
def populate_master_vocab(self, vocab_path, min_words=1, force_appearance=None): |
|
""" Populates the master vocabulary using all vocabularies found in the |
|
given path. Vocabularies should be named *.npz. Expects the |
|
vocabularies to be numpy arrays with counts. Normalizes the counts |
|
and combines them. |
|
|
|
# Arguments: |
|
vocab_path: Path containing vocabularies to be combined. |
|
            min_words: Minimum number of occurrences a word must have in order
|
to be included in the master vocabulary. |
|
force_appearance: Optional vocabulary filename that will be added |
|
to the master vocabulary no matter what. This vocabulary must |
|
be present in vocab_path. |
|
""" |
|
|
|
paths = glob.glob(vocab_path + '*.npz') |
|
sizes = {path: 0 for path in paths} |
|
dicts = {path: {} for path in paths} |
|
|
|
|
|
for path in paths: |
|
np_data = np.load(path)['data'] |
|
|
|
            for entry in np_data:

                word, count = entry

                # '|S' fields load as bytes on Python 3; decode to str for dict keys.
                word = word.decode('utf-8') if isinstance(word, bytes) else word
|
if count < min_words: |
|
continue |
|
if is_special_token(word): |
|
continue |
|
dicts[path][word] = count |
|
|
|
sizes[path] = sum(dicts[path].values()) |
|
print('Overall word count for {} -> {}'.format(path, sizes[path])) |
|
print('Overall word number for {} -> {}'.format(path, len(dicts[path]))) |
|
|
|
vocab_of_max_size = max(sizes, key=sizes.get) |
|
max_size = sizes[vocab_of_max_size] |
|
            print('Sizes: {}, largest vocabulary: {}, size: {}'.format(sizes, vocab_of_max_size, max_size))
|
|
|
|
|
if force_appearance is not None: |
|
force_appearance_path = [p for p in paths if force_appearance in p][0] |
|
force_appearance_vocab = deepcopy(dicts[force_appearance_path]) |
|
print(force_appearance_path) |
|
else: |
|
force_appearance_path, force_appearance_vocab = None, None |
|
|
|
|
|
for path in paths: |
|
normalization_factor = max_size / sizes[path] |
|
print('Norm factor for path {} -> {}'.format(path, normalization_factor)) |
|
|
|
for word in dicts[path]: |
|
if is_special_token(word): |
|
print("SPECIAL - ", word) |
|
continue |
|
normalized_count = dicts[path][word] * normalization_factor |
|
|
|
|
|
                # Only keep words that also occur in the forced vocabulary.
                if force_appearance_vocab is not None and word not in force_appearance_vocab:

                    continue
|
|
|
|
|
|
|
if word in self.master_vocab: |
|
self.master_vocab[word] += normalized_count |
|
else: |
|
self.master_vocab[word] = normalized_count |
|
|
|
print('Size of master_dict {}'.format(len(self.master_vocab))) |
|
print("Hashes for master dict: {}".format( |
|
len([w for w in self.master_vocab if '#' in w[0]]))) |
|
|
|
def save_vocab(self, path_count, path_vocab, word_limit=100000): |
|
""" Saves the master vocabulary into a file. |
|
""" |
|
|
|
|
|
words = OrderedDict() |
|
for token in SPECIAL_TOKENS: |
|
|
|
words[token] = -1 |
|
|
|
|
|
desc_order = OrderedDict(sorted(self.master_vocab.items(), |
|
key=lambda kv: kv[1], reverse=True)) |
|
words.update(desc_order) |
|
|
|
|
|
|
|
        np_vocab = np.array(list(words.items()),

                            dtype=([('word', '|S30'), ('count', 'float')]))
|
|
|
|
|
counts = np_vocab[:word_limit] |
|
np.savez_compressed(path_count, counts=counts) |
|
|
|
|
|
final_words = OrderedDict() |
|
        for i, w in enumerate(list(words)[:word_limit]):

            final_words[w] = i
|
with open(path_vocab, 'w') as f: |
|
f.write(json.dumps(final_words, indent=4, separators=(',', ': '))) |
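
# Illustrative usage sketch (not part of the original module): combining all
# vocabularies saved under a directory into a master vocabulary. The directory
# 'vocabs/' and the output filenames are hypothetical placeholders.
#
#     mv = MasterVocab()
#     mv.populate_master_vocab('vocabs/', min_words=5)
#     mv.save_vocab('master_counts', 'master_vocab.json', word_limit=50000)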
|
|
|
|
|
def all_words_in_sentences(sentences): |
|
""" Extracts all unique words from a given list of sentences. |
|
|
|
# Arguments: |
|
sentences: List or word generator of sentences to be processed. |
|
|
|
# Returns: |
|
List of all unique words contained in the given sentences. |
|
""" |
|
vocab = [] |
|
if isinstance(sentences, WordGenerator): |
|
sentences = [s for s, _ in sentences] |
|
|
|
for sentence in sentences: |
|
for word in sentence: |
|
if word not in vocab: |
|
vocab.append(word) |
|
|
|
return vocab |
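
# Illustrative example (hypothetical input): the function preserves the order
# in which words are first seen.
#
#     >>> all_words_in_sentences([['hello', 'world'], ['hello', 'again']])
#     ['hello', 'world', 'again']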
|
|
|
|
|
def extend_vocab_in_file(vocab, max_tokens=10000, vocab_path=VOCAB_PATH): |
|
""" Extends JSON-formatted vocabulary with words from vocab that are not |
|
present in the current vocabulary. Adds up to max_tokens words. |
|
Overwrites file in vocab_path. |
|
|
|
# Arguments: |
|
        vocab: Vocabulary to be added. MUST have word_counts populated, i.e.
            must have run count_all_words() previously.
|
max_tokens: Maximum number of words to be added. |
|
vocab_path: Path to the vocabulary json which is to be extended. |
|
""" |
|
try: |
|
with open(vocab_path, 'r') as f: |
|
current_vocab = json.load(f) |
|
except IOError: |
|
print('Vocabulary file not found, expected at ' + vocab_path) |
|
return |
|
|
|
extend_vocab(current_vocab, vocab, max_tokens) |
|
|
|
|
|
with open(vocab_path, 'w') as f: |
|
json.dump(current_vocab, f, sort_keys=True, indent=4, separators=(',',': ')) |
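
# Illustrative usage sketch (not part of the original module): extending the
# vocabulary stored at VOCAB_PATH with words counted from a new corpus, using
# the VocabBuilder class defined above. 'new_corpus.txt' is a hypothetical
# placeholder.
#
#     with open('new_corpus.txt') as stream:
#         vb = VocabBuilder(WordGenerator(stream))
#         vb.count_all_words()
#         extend_vocab_in_file(vb, max_tokens=1000)    # overwrites VOCAB_PATH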
|
|
|
|
|
def extend_vocab(current_vocab, new_vocab, max_tokens=10000): |
|
""" Extends current vocabulary with words from vocab that are not |
|
present in the current vocabulary. Adds up to max_tokens words. |
|
|
|
# Arguments: |
|
current_vocab: Current dictionary of tokens. |
|
new_vocab: Vocabulary to be added. MUST have word_counts populated, i.e. |
|
must have run count_all_words() previously. |
|
max_tokens: Maximum number of words to be added. |
|
|
|
# Returns: |
|
How many new tokens have been added. |
|
""" |
|
if max_tokens < 0: |
|
max_tokens = 10000 |
|
|
|
words = OrderedDict() |
|
|
|
|
|
desc_order = OrderedDict(sorted(new_vocab.word_counts.items(), |
|
key=lambda kv: kv[1], reverse=True)) |
|
words.update(desc_order) |
|
|
|
    base_index = len(current_vocab)
|
added = 0 |
|
for word in words: |
|
if added >= max_tokens: |
|
break |
|
        if word not in current_vocab:
|
current_vocab[word] = base_index + added |
|
added += 1 |
|
|
|
return added |
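
# Illustrative example (hypothetical data): `vb` is assumed to be a VocabBuilder
# whose count_all_words() has already been run.
#
#     current = {'hello': 0, 'world': 1}
#     added = extend_vocab(current, vb, max_tokens=2)
#     # `current` now also maps the two most frequent unseen words to 2 and 3.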
|
|