|
import csv |
|
import torch |
|
from sklearn.metrics import accuracy_score, precision_score, recall_score, classification_report, confusion_matrix |
|
import shutil |
|
import numpy as np |
|
|
|
|
|
class AverageMeter(object):
    """Keep the latest sample of a metric together with its running mean.

    Attributes (all reset to 0 by ``reset``):
        val: value given to the most recent ``update`` call.
        sum: running total of ``val * n`` over every update.
        count: running total of ``n`` over every update.
        avg: ``sum / count`` as of the most recent update.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Set every tracked statistic back to zero."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` with weight ``n`` and recompute the running mean.

        Args:
            val: the newly observed value.
            n: how many observations ``val`` stands for (e.g. a batch size).
        """
        weighted = val * n
        self.val = val
        self.sum += weighted
        self.count += n
        self.avg = self.sum / self.count
|
|
|
|
|
class Logger(object):
    """Tab-separated log writer with a fixed set of columns.

    The header row is written when the logger is created. Each ``log`` call
    appends one row whose cells follow the header's column order and then
    flushes the file.
    """

    def __init__(self, path, header):
        """Open ``path`` for writing (truncating it) and emit the header row.

        Args:
            path: destination file path.
            header: sequence of column names; also fixes the cell order
                used by ``log``.
        """
        # newline='' hands line-ending control to the csv module; without it,
        # platforms that translate '\n' on write produce '\r\r\n' row endings.
        self.log_file = open(path, 'w', newline='')
        self.logger = csv.writer(self.log_file, delimiter='\t')

        self.logger.writerow(header)
        self.header = header

    def close(self):
        """Close the underlying file. Safe to call more than once."""
        # getattr guard: if open() raised in __init__, the attribute is absent.
        log_file = getattr(self, 'log_file', None)
        if log_file is not None and not log_file.closed:
            log_file.close()

    def __del__(self):
        # Finalizer: release the file handle when the logger is collected.
        self.close()

    def log(self, values):
        """Append one row and flush it to disk.

        Args:
            values: mapping from column name to cell value. It must contain
                every column named in the header; extra keys are ignored.

        Raises:
            AssertionError: if a header column is missing from ``values``
                (only while assertions are enabled).
        """
        write_values = []
        for col in self.header:
            assert col in values, 'missing log column: {!r}'.format(col)
            write_values.append(values[col])

        self.logger.writerow(write_values)
        self.log_file.flush()
|
|
|
|
|
class Queue:
    """Newest-first list of score rows with rolling statistics.

    The list starts with ``max_size`` all-zero rows of width ``n_classes``.
    ``enqueue`` inserts at index 0 and then refreshes ``median``, ``ma`` and
    ``ewma``, each computed column-wise over the first ``max_size`` rows.

    NOTE(review): ``enqueue`` never discards old rows, so the list grows by
    one row per call unless the caller also calls ``dequeue``.
    """

    def __init__(self, max_size, n_classes):
        zero_rows = np.zeros((max_size, n_classes), dtype=float)
        self.queue = zero_rows.tolist()
        self.max_size = max_size
        # Statistics stay None until the first enqueue.
        self.median = None
        self.ma = None
        self.ewma = None

    def enqueue(self, data):
        """Insert ``data`` as the newest row, refresh statistics, return True."""
        self.queue.insert(0, data)
        self.median = self._median()
        self.ma = self._ma()
        self.ewma = self._ewma()
        return True

    def dequeue(self):
        """Remove and return the last (oldest) row.

        Returns the string "Queue Empty!" when there is nothing to remove.
        """
        if not self.queue:
            return ("Queue Empty!")
        return self.queue.pop()

    def size(self):
        """Return the number of rows currently stored."""
        return len(self.queue)

    def printQueue(self):
        """Return the underlying list itself (not a copy)."""
        return self.queue

    def _window(self):
        # The ``max_size`` most recently inserted rows, as an array.
        return np.array(self.queue[:self.max_size])

    def _ma(self):
        # Column-wise mean over the window.
        return self._window().mean(axis=0)

    def _median(self):
        # Column-wise median over the window.
        return np.median(self._window(), axis=0)

    def _ewma(self):
        # Weights rise from exp(-1) at index 0 to exp(0) at the last index,
        # then are normalised to sum to 1.
        # NOTE(review): index 0 is the newest row, so the oldest row in the
        # window gets the largest weight -- confirm this is intended.
        weights = np.exp(np.linspace(-1., 0., self.max_size))
        weights /= weights.sum()
        row_weights = weights.reshape(1, self.max_size)
        average = row_weights.dot(self._window())
        return average.reshape(average.shape[1], )
|
|
|
|
|
def LevenshteinDistance(a, b):
    """Return the Levenshtein (edit) distance between ``a`` and ``b``.

    The distance is the minimum number of single-element insertions,
    deletions and substitutions needed to turn one sequence into the other.

    Args:
        a: an indexable sequence with a length (string, list, ...).
        b: a second sequence of the same kind.

    Returns:
        The edit distance as a non-negative int.
    """
    n, m = len(a), len(b)
    if n > m:
        # Keep ``a`` the shorter sequence so each row needs only
        # min(n, m) + 1 cells.
        a, b = b, a
        n, m = m, n

    # Row 0: distance from the empty prefix of ``b`` to each prefix of ``a``.
    current = list(range(n + 1))
    for i in range(1, m + 1):
        previous, current = current, [i] + [0] * n
        for j in range(1, n + 1):
            add = previous[j] + 1
            delete = current[j - 1] + 1
            change = previous[j - 1]
            if a[j - 1] != b[i - 1]:
                change += 1
            current[j] = min(add, delete, change)

    # Every cell is a count of edits and therefore never negative.
    return current[n]
|
|
|
|
|
def load_value_file(file_path):
    """Read a text file containing a single number and return it as a float.

    Trailing newline and carriage-return characters are removed before the
    conversion.

    Raises:
        ValueError: if the remaining text is not a valid float literal.
    """
    with open(file_path, 'r') as handle:
        raw_text = handle.read()
    return float(raw_text.rstrip('\n\r'))
|
|
|
|
|
def calculate_accuracy(output, target, topk=(1,)):
    """Compute top-k accuracy, in percent, for each k in ``topk``.

    A sample counts as correct for a given k when its target label is among
    the k highest-scoring entries of its row in ``output``.

    Args:
        output: score tensor; the top-k search runs along dim 1, so this is
            assumed to be (batch, n_classes) -- TODO confirm with callers.
        target: label tensor whose first dimension is the batch size; it is
            flattened before comparison.
        topk: iterable of k values to evaluate.

    Returns:
        A list with one 0-dim float tensor per entry of ``topk``, each
        holding the percentage of correct samples.
    """
    maxk = max(topk)
    batch_size = target.size(0)

    # Indices of the maxk largest scores per row, sorted best-first.
    _, pred = output.topk(maxk, 1, True, True)
    pred = pred.t()
    correct = pred.eq(target.view(1, -1).expand_as(pred))

    res = []
    for k in topk:
        # reshape rather than view: ``correct`` can inherit the transposed
        # (non-contiguous) layout of ``pred``, in which case view(-1) raises
        # a RuntimeError for k > 1.
        correct_k = correct[:k].reshape(-1).float().sum(0)
        res.append(correct_k.mul_(100.0 / batch_size))
    return res
|
|
|
|
|
def calculate_precision(outputs, targets):
    """Return the macro-averaged precision of the top-1 predictions.

    Args:
        outputs: score tensor; the arg-max is taken along dim 1, so this is
            assumed to be (batch, n_classes) -- TODO confirm with callers.
        targets: tensor of ground-truth labels; it is flattened before
            scoring.

    Returns:
        The value of ``precision_score(..., average='macro')``.
    """
    _, pred = outputs.topk(1, 1, True)
    # scikit-learn needs host-side arrays; handing it tensors directly fails
    # when they live on a CUDA device, so convert explicitly.
    y_pred = pred.reshape(-1).detach().cpu().numpy()
    y_true = targets.reshape(-1).detach().cpu().numpy()
    return precision_score(y_true, y_pred, average='macro')
|
|
|
|
|
def calculate_recall(outputs, targets):
    """Return the macro-averaged recall of the top-1 predictions.

    Args:
        outputs: score tensor; the arg-max is taken along dim 1, so this is
            assumed to be (batch, n_classes) -- TODO confirm with callers.
        targets: tensor of ground-truth labels; it is flattened before
            scoring.

    Returns:
        The value of ``recall_score(..., average='macro')``.
    """
    _, pred = outputs.topk(1, 1, True)
    # scikit-learn needs host-side arrays; handing it tensors directly fails
    # when they live on a CUDA device, so convert explicitly.
    y_pred = pred.reshape(-1).detach().cpu().numpy()
    y_true = targets.reshape(-1).detach().cpu().numpy()
    return recall_score(y_true, y_pred, average='macro')
|
|
|
|
|
def save_checkpoint(state, is_best, checkpoint_path='./_checkpoint.pth',
                    best_path='./_best.pth'):
    """Write ``state`` to disk and optionally keep a copy as the best so far.

    Args:
        state: object to serialize with ``torch.save``.
        is_best: when true, the file just written is also copied to
            ``best_path``.
        checkpoint_path: where the checkpoint is written; an existing file
            is overwritten.
        best_path: destination of the copy made when ``is_best`` is true.
    """
    torch.save(state, checkpoint_path)
    if is_best:
        shutil.copyfile(checkpoint_path, best_path)
|
|
|
|
|
def adjust_learning_rate(optimizer, epoch,
                         lr_steps=(15, 25, 35, 45, 60, 50, 200, 250),
                         base_lr=0.1):
    """Apply a step-decay learning rate to every parameter group.

    The rate is ``base_lr`` multiplied by 0.1 once for each entry of
    ``lr_steps`` that ``epoch`` has reached (``epoch >= step``). The order of
    the entries does not matter, because only the number reached is counted.

    Args:
        optimizer: object exposing ``param_groups``, an iterable of dicts;
            each dict's ``'lr'`` entry is overwritten.
        epoch: current epoch number.
        lr_steps: epochs at which the rate drops by a factor of 10.
        base_lr: learning rate before any decay step has been reached.
    """
    n_decays = int(np.sum(epoch >= np.asarray(lr_steps)))
    # Computed from plain Python numbers so the optimizer holds a built-in
    # float rather than a NumPy scalar.
    lr_new = base_lr * (0.1 ** n_decays)
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr_new
|
|
|
|