# NCTC/models/research/qa_kg/util/data_reader.py
# Repository page metadata: NCTCMumbai, "Upload 2571 files", commit 0b8359d,
# 7.58 kB.
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
from collections import namedtuple
try:
from queue import Queue # Python 3
except ImportError:
from Queue import Queue # Python 2
import re
import threading
import numpy as np
import tensorflow as tf
# One data split as four parallel lists, filled by SampleBuilder.build_samples:
#   X       - question as a list of integer indices (one entry per sample)
#   Y       - a single answer index for the sample
#   MultiYs - set of all answer indices of the question the sample came from
#   qid     - position of that question within its data file
Data = namedtuple('Data', ['X', 'Y', 'MultiYs', 'qid'])
class SampleBuilder:
    """Reads the KB and question files and converts them to integer indices.

    The constructor does all the work; afterwards the results are available
    as attributes:

      kb_raw:           list of (subject, relation, object) string triples.
      data_raw:         dict mapping split name -> list of
                        (question, answers) string pairs.
      dict_all:         dict mapping every entity, relation and normal word
                        to an integer index.
      reverse_dict_all: inverse of dict_all (index -> string).
      kb:               kb_raw with every string replaced by its index.
      data_all:         dict mapping split name -> Data tuple.

    Reads config.KB_file and config.data_files (split name -> path), and may
    increase config.T_encoder (see build_samples).
    """

    # Matches a bracketed entity mention such as "[some entity]".
    _ENTITY_PATTERN = re.compile(r'\[[^\]]+\]')

    def __init__(self, config):
        self.config = config

        self.kb_raw = self.read_kb()
        self.data_raw = self.read_raw_data()

        # dictionary of entities, normal words, and relations
        self.dict_all = self.gen_dict()
        self.reverse_dict_all = dict(
            zip(self.dict_all.values(), self.dict_all.keys()))

        tf.logging.info('size of dict: %d' % len(self.dict_all))

        self.kb = self.build_kb()
        self.data_all = self.build_samples()

    def read_kb(self):
        """Returns the KB as a list of (subject, relation, object) strings.

        Each line of config.KB_file must contain exactly three '|'-separated
        fields; any other shape raises ValueError from the tuple unpacking.
        """
        kb_raw = []
        # Use a context manager so the file handle is closed deterministically.
        with open(self.config.KB_file) as kb_file:
            for line in kb_file:
                sub, rel, obj = line.strip().split('|')
                kb_raw.append((sub, rel, obj))
        tf.logging.info('# of KB records: %d' % len(kb_raw))
        return kb_raw

    def read_raw_data(self):
        """Returns a dict: split name -> list of (question, answers) strings.

        Each line of a data file must be 'question<TAB>answers'; any other
        number of tab-separated fields raises ValueError.
        """
        data = dict()
        for name in self.config.data_files:
            path = self.config.data_files[name]
            tf.logging.info('Reading data file {}'.format(path))
            raw = []
            with open(path) as data_file:
                for line in data_file:
                    question, answers = line.strip().split('\t')
                    # ignore ',' in the template
                    question = question.replace('],', ']')
                    raw.append((question, answers))
            data[name] = raw
        return data

    def build_kb(self):
        """Returns the KB triples with every string mapped through dict_all."""
        tf.logging.info('Indexing KB...')
        lookup = self.dict_all
        return [[lookup[sub], lookup[rel], lookup[obj]]
                for sub, rel, obj in self.kb_raw]

    def gen_dict(self):
        """Builds the string -> index vocabulary.

        The vocabulary holds every subject, relation and object of the KB
        plus every whitespace-separated word of the questions that lies
        outside a bracketed entity mention.
        """
        vocab = set()
        for triple in self.kb_raw:
            vocab.update(triple)
        for name in self.data_raw:
            for question, _ in self.data_raw[name]:
                # Splitting on the entity pattern leaves only the text
                # between mentions.
                for phrase in self._ENTITY_PATTERN.split(question):
                    vocab.update(phrase.split())
        # NOTE(review): indices follow set iteration order, which Python does
        # not guarantee to be stable between interpreter runs -- confirm that
        # nothing persisted (e.g. checkpoints) relies on a fixed mapping.
        return {token: idx for idx, token in enumerate(vocab)}

    def _map_entity_idx(self, text):
        """Replaces each '[entity]' mention in text by '@<index of entity>'."""
        for mention in self._ENTITY_PATTERN.findall(text):
            entity = mention[1:-1]  # strip the surrounding brackets
            index = self.dict_all[entity]
            text = text.replace('[%s]' % entity, '@%d' % index)
        return text

    def build_samples(self):
        """Converts the raw questions/answers of every split to a Data tuple.

        A question with k answers yields k samples that share the same index
        sequence, answer set and question id but differ in the label Y.

        Side effect: config.T_encoder is raised to the length of the longest
        question encountered, if that exceeds its current value.

        Raises:
          KeyError: if an entity, word or answer is missing from dict_all.
        """
        data_all = dict()
        for name in self.data_raw:
            X, Y, MultiYs, qid = [], [], [], []
            for i, (question, answers) in enumerate(self.data_raw[name]):
                qdata = []
                for word in self._map_entity_idx(question).split():
                    if word[0] == '@':
                        # Entity placeholder: already carries its index.
                        qdata.append(int(word[1:]))
                    else:
                        qdata.append(self.dict_all[word])
                labels = [self.dict_all[answer]
                          for answer in answers.split('|')]
                if len(qdata) > self.config.T_encoder:
                    self.config.T_encoder = len(qdata)
                for label in labels:
                    X.append(qdata)
                    Y.append(label)
                    MultiYs.append(set(labels))
                    qid.append(i)
            data_all[name] = Data(X=X, Y=Y, MultiYs=MultiYs, qid=qid)
        return data_all
def _run_prefetch(prefetch_queue, batch_loader, data, shuffle, one_pass,
                  config):
    """Endlessly loads batches and pushes them onto prefetch_queue.

    Intended to run as the target of a background thread; it never returns.

    Args:
      prefetch_queue: queue receiving the batches; put() blocks when full.
      batch_loader: object whose load_one_batch(sample_ids) builds a batch.
      data: Data tuple; its four lists must have equal length.
      shuffle: if true, samples are visited in a fresh random order on every
        pass; otherwise they are visited in their stored order.
      one_pass: if true, a None sentinel is queued after each complete pass.
        Loading then continues with the next pass.
      config: provides batch_size and rng (used for the permutations).
    """
    assert len(data.X) == len(data.Y) == len(data.MultiYs) == len(data.qid)
    num_samples = len(data.X)
    batch_size = config.batch_size

    n_sample = 0
    # Honour shuffle=False on the first pass as well: only permute on request.
    if shuffle:
        fetch_order = config.rng.permutation(num_samples)
    else:
        fetch_order = np.arange(num_samples)

    while True:
        # The last batch of a pass may be shorter than batch_size.
        sample_ids = fetch_order[n_sample:n_sample + batch_size]
        batch = batch_loader.load_one_batch(sample_ids)
        prefetch_queue.put(batch, block=True)

        n_sample += len(sample_ids)
        if n_sample >= num_samples:
            # End of a pass: signal it if requested, then start over.
            if one_pass:
                prefetch_queue.put(None, block=True)
            n_sample = 0
            if shuffle:
                fetch_order = config.rng.permutation(num_samples)
class DataReader:
    """Serves batches that a background thread prefetches into a queue.

    Constructing a reader immediately starts a daemon thread running
    _run_prefetch, which fills a bounded queue with batches built by a
    BatchLoader.
    """

    def __init__(self,
                 config,
                 data,
                 assembler,
                 shuffle=True,
                 one_pass=False,
                 prefetch_num=10):
        """Creates the batch loader and starts the prefetch thread.

        Args:
          config: configuration object passed on to the loader and thread.
          data: Data tuple of the split to read.
          assembler: passed on to BatchLoader.
          shuffle: forwarded to the prefetch thread.
          one_pass: if true, batches() stops after one pass over the data.
          prefetch_num: maximum number of batches held in the queue.
        """
        self.config = config

        self.data = data
        self.assembler = assembler
        self.batch_loader = BatchLoader(self.config,
                                        self.data, self.assembler)

        self.shuffle = shuffle
        self.one_pass = one_pass
        self.prefetch_queue = Queue(maxsize=prefetch_num)
        self.prefetch_thread = threading.Thread(
            target=_run_prefetch,
            args=(self.prefetch_queue,
                  self.batch_loader, self.data,
                  self.shuffle, self.one_pass,
                  self.config))
        # Daemon thread: it loops forever and must not block interpreter exit.
        self.prefetch_thread.daemon = True
        self.prefetch_thread.start()

    def batches(self):
        """Yields prefetched batches.

        The generator ends when the None end-of-pass sentinel is received,
        which is only expected when one_pass is true; otherwise it yields
        indefinitely.
        """
        while True:
            # Get a batch from the prefetching queue
            if self.prefetch_queue.empty():
                tf.logging.warning('Waiting for data loading (IO is slow)...')
            batch = self.prefetch_queue.get(block=True)
            if batch is None:
                assert self.one_pass
                tf.logging.info('One pass finished!')
                # End the generator with a plain return: under PEP 479 an
                # explicit `raise StopIteration` inside a generator is turned
                # into RuntimeError.
                return
            yield batch
class BatchLoader:
    """Turns lists of sample ids into dictionaries of numpy batch arrays."""

    def __init__(self, config,
                 data, assembler):
        """Stores the inputs and caches the sequence lengths from config.

        Args:
          config: provides T_encoder, T_decoder, batch_size and
            gt_layout_tokens.
          data: Data tuple the batches are drawn from.
          assembler: object providing module_list2tokens(tokens, length).
        """
        self.config = config

        self.data = data
        self.assembler = assembler

        self.T_encoder = config.T_encoder
        self.T_decoder = config.T_decoder

        log = tf.logging.info
        log('T_encoder: %d' % self.T_encoder)
        log('T_decoder: %d' % self.T_decoder)
        log('batch size: %d' % self.config.batch_size)

        self.gt_layout_tokens = config.gt_layout_tokens

    def load_one_batch(self, sample_ids):
        """Builds one batch for the given sample ids.

        Args:
          sample_ids: sequence of indices into the Data lists.

        Returns:
          A dict with int32 arrays 'input_seq_batch' (T_encoder x batch,
          zero padded), 'seq_len_batch', 'ans_label_batch' and
          'gt_layout_batch' (T_decoder x batch), plus the lists
          'ans_set_labels_list' and 'question_id_list'.
        """
        n = len(sample_ids)
        split = self.data

        questions = np.zeros((self.T_encoder, n), np.int32)
        lengths = np.zeros(n, np.int32)
        answers = np.zeros(n, np.int32)
        layouts = np.zeros((self.T_decoder, n), np.int32)
        answer_sets = []
        question_ids = []

        # Each sample fills one column of the 2-D arrays.
        for column, sample in enumerate(sample_ids):
            tokens = split.X[sample]
            length = len(tokens)
            lengths[column] = length
            questions[:length, column] = tokens
            answers[column] = split.Y[sample]
            answer_sets.append(split.MultiYs[sample])
            question_ids.append(split.qid[sample])
            layouts[:, column] = self.assembler.module_list2tokens(
                self.gt_layout_tokens, self.T_decoder)

        return {
            'input_seq_batch': questions,
            'seq_len_batch': lengths,
            'ans_label_batch': answers,
            'gt_layout_batch': layouts,
            'ans_set_labels_list': answer_sets,
            'question_id_list': question_ids,
        }