# -*- coding: utf-8 -*- from __future__ import print_function, division import glob import json import uuid from copy import deepcopy from collections import defaultdict, OrderedDict import numpy as np from torchmoji.filter_utils import is_special_token from torchmoji.word_generator import WordGenerator from torchmoji.global_variables import SPECIAL_TOKENS, VOCAB_PATH class VocabBuilder(): """ Create vocabulary with words extracted from sentences as fed from a word generator. """ def __init__(self, word_gen): # initialize any new key with value of 0 self.word_counts = defaultdict(lambda: 0, {}) self.word_length_limit=30 for token in SPECIAL_TOKENS: assert len(token) < self.word_length_limit self.word_counts[token] = 0 self.word_gen = word_gen def count_words_in_sentence(self, words): """ Generates word counts for all tokens in the given sentence. # Arguments: words: Tokenized sentence whose words should be counted. """ for word in words: if 0 < len(word) and len(word) <= self.word_length_limit: try: self.word_counts[word] += 1 except KeyError: self.word_counts[word] = 1 def save_vocab(self, path=None): """ Saves the vocabulary into a file. # Arguments: path: Where the vocabulary should be saved. If not specified, a randomly generated filename is used instead. """ dtype = ([('word','|S{}'.format(self.word_length_limit)),('count','int')]) np_dict = np.array(self.word_counts.items(), dtype=dtype) # sort from highest to lowest frequency np_dict[::-1].sort(order='count') data = np_dict if path is None: path = str(uuid.uuid4()) np.savez_compressed(path, data=data) print("Saved dict to {}".format(path)) def get_next_word(self): """ Returns next tokenized sentence from the word geneerator. # Returns: List of strings, representing the next tokenized sentence. """ return self.word_gen.__iter__().next() def count_all_words(self): """ Generates word counts for all words in all sentences of the word generator. """ for words, _ in self.word_gen: self.count_words_in_sentence(words) class MasterVocab(): """ Combines vocabularies. """ def __init__(self): # initialize custom tokens self.master_vocab = {} def populate_master_vocab(self, vocab_path, min_words=1, force_appearance=None): """ Populates the master vocabulary using all vocabularies found in the given path. Vocabularies should be named *.npz. Expects the vocabularies to be numpy arrays with counts. Normalizes the counts and combines them. # Arguments: vocab_path: Path containing vocabularies to be combined. min_words: Minimum amount of occurences a word must have in order to be included in the master vocabulary. force_appearance: Optional vocabulary filename that will be added to the master vocabulary no matter what. This vocabulary must be present in vocab_path. """ paths = glob.glob(vocab_path + '*.npz') sizes = {path: 0 for path in paths} dicts = {path: {} for path in paths} # set up and get sizes of individual dictionaries for path in paths: np_data = np.load(path)['data'] for entry in np_data: word, count = entry if count < min_words: continue if is_special_token(word): continue dicts[path][word] = count sizes[path] = sum(dicts[path].values()) print('Overall word count for {} -> {}'.format(path, sizes[path])) print('Overall word number for {} -> {}'.format(path, len(dicts[path]))) vocab_of_max_size = max(sizes, key=sizes.get) max_size = sizes[vocab_of_max_size] print('Min: {}, {}, {}'.format(sizes, vocab_of_max_size, max_size)) # can force one vocabulary to always be present if force_appearance is not None: force_appearance_path = [p for p in paths if force_appearance in p][0] force_appearance_vocab = deepcopy(dicts[force_appearance_path]) print(force_appearance_path) else: force_appearance_path, force_appearance_vocab = None, None # normalize word counts before inserting into master dict for path in paths: normalization_factor = max_size / sizes[path] print('Norm factor for path {} -> {}'.format(path, normalization_factor)) for word in dicts[path]: if is_special_token(word): print("SPECIAL - ", word) continue normalized_count = dicts[path][word] * normalization_factor # can force one vocabulary to always be present if force_appearance_vocab is not None: try: force_word_count = force_appearance_vocab[word] except KeyError: continue #if force_word_count < 5: #continue if word in self.master_vocab: self.master_vocab[word] += normalized_count else: self.master_vocab[word] = normalized_count print('Size of master_dict {}'.format(len(self.master_vocab))) print("Hashes for master dict: {}".format( len([w for w in self.master_vocab if '#' in w[0]]))) def save_vocab(self, path_count, path_vocab, word_limit=100000): """ Saves the master vocabulary into a file. """ # reserve space for 10 special tokens words = OrderedDict() for token in SPECIAL_TOKENS: # store -1 instead of np.inf, which can overflow words[token] = -1 # sort words by frequency desc_order = OrderedDict(sorted(self.master_vocab.items(), key=lambda kv: kv[1], reverse=True)) words.update(desc_order) # use encoding of up to 30 characters (no token conversions) # use float to store large numbers (we don't care about precision loss) np_vocab = np.array(words.items(), dtype=([('word','|S30'),('count','float')])) # output count for debugging counts = np_vocab[:word_limit] np.savez_compressed(path_count, counts=counts) # output the index of each word for easy lookup final_words = OrderedDict() for i, w in enumerate(words.keys()[:word_limit]): final_words.update({w:i}) with open(path_vocab, 'w') as f: f.write(json.dumps(final_words, indent=4, separators=(',', ': '))) def all_words_in_sentences(sentences): """ Extracts all unique words from a given list of sentences. # Arguments: sentences: List or word generator of sentences to be processed. # Returns: List of all unique words contained in the given sentences. """ vocab = [] if isinstance(sentences, WordGenerator): sentences = [s for s, _ in sentences] for sentence in sentences: for word in sentence: if word not in vocab: vocab.append(word) return vocab def extend_vocab_in_file(vocab, max_tokens=10000, vocab_path=VOCAB_PATH): """ Extends JSON-formatted vocabulary with words from vocab that are not present in the current vocabulary. Adds up to max_tokens words. Overwrites file in vocab_path. # Arguments: new_vocab: Vocabulary to be added. MUST have word_counts populated, i.e. must have run count_all_words() previously. max_tokens: Maximum number of words to be added. vocab_path: Path to the vocabulary json which is to be extended. """ try: with open(vocab_path, 'r') as f: current_vocab = json.load(f) except IOError: print('Vocabulary file not found, expected at ' + vocab_path) return extend_vocab(current_vocab, vocab, max_tokens) # Save back to file with open(vocab_path, 'w') as f: json.dump(current_vocab, f, sort_keys=True, indent=4, separators=(',',': ')) def extend_vocab(current_vocab, new_vocab, max_tokens=10000): """ Extends current vocabulary with words from vocab that are not present in the current vocabulary. Adds up to max_tokens words. # Arguments: current_vocab: Current dictionary of tokens. new_vocab: Vocabulary to be added. MUST have word_counts populated, i.e. must have run count_all_words() previously. max_tokens: Maximum number of words to be added. # Returns: How many new tokens have been added. """ if max_tokens < 0: max_tokens = 10000 words = OrderedDict() # sort words by frequency desc_order = OrderedDict(sorted(new_vocab.word_counts.items(), key=lambda kv: kv[1], reverse=True)) words.update(desc_order) base_index = len(current_vocab.keys()) added = 0 for word in words: if added >= max_tokens: break if word not in current_vocab.keys(): current_vocab[word] = base_index + added added += 1 return added