#! /usr/bin/env python # -*- coding: utf-8 -*- # Copyright 2020 Imperial College London (Pingchuan Ma) # Apache 2.0 (http://www.apache.org/licenses/LICENSE-2.0) """ TCN for lipreading""" import os import time import random import argparse # 명령행 인자를 파싱해주는 모듈 import numpy as np from tqdm import tqdm # 작업진행률 표시하는 라이브러리 import csv import torch # 파이토치 import torch.nn as nn # 클래스 # attribute 를 활용해 state 를 저장하고 활용 import torch.nn.functional as F # 함수 # 인스턴스화시킬 필요없이 사용 가능 from lipreading.utils import get_save_folder from lipreading.utils import load_json, save2npz from lipreading.utils import load_model, CheckpointSaver from lipreading.utils import get_logger, update_logger_batch from lipreading.utils import showLR, calculateNorm2, AverageMeter from lipreading.model import Lipreading from lipreading.mixup import mixup_data, mixup_criterion from lipreading.optim_utils import get_optimizer, CosineScheduler from lipreading.dataloaders import get_data_loaders, get_preprocessing_pipelines from pathlib import Path import wandb # 학습 관리 툴 (Loss, Acc 자동 저장) # 인자값을 받아서 처리하는 함수 def load_args(default_config=None): # 인자값을 받을 수 있는 인스턴스 생성 parser = argparse.ArgumentParser(description='Pytorch Lipreading ') # 입력받을 인자값 목록 # -- dataset config parser.add_argument('--dataset', default='lrw', help='dataset selection') parser.add_argument('--num-classes', type=int, default=30, help='Number of classes') parser.add_argument('--modality', default='video', choices=['video', 'raw_audio'], help='choose the modality') # -- directory parser.add_argument('--data-dir', default='./datasets/visual', help='Loaded data directory') parser.add_argument('--label-path', type=str, default='./labels/30VietnameseSort.txt', help='Path to txt file with labels') parser.add_argument('--annonation-direc', default=None, help='Loaded data directory') # -- model config parser.add_argument('--backbone-type', type=str, default='resnet', choices=['resnet', 'shufflenet'], help='Architecture used for backbone') parser.add_argument('--relu-type', type=str, default='relu', choices=['relu','prelu'], help='what relu to use' ) parser.add_argument('--width-mult', type=float, default=1.0, help='Width multiplier for mobilenets and shufflenets') # -- TCN config parser.add_argument('--tcn-kernel-size', type=int, nargs="+", help='Kernel to be used for the TCN module') parser.add_argument('--tcn-num-layers', type=int, default=4, help='Number of layers on the TCN module') parser.add_argument('--tcn-dropout', type=float, default=0.2, help='Dropout value for the TCN module') parser.add_argument('--tcn-dwpw', default=False, action='store_true', help='If True, use the depthwise seperable convolution in TCN architecture') parser.add_argument('--tcn-width-mult', type=int, default=1, help='TCN width multiplier') # -- train parser.add_argument('--training-mode', default='tcn', help='tcn') parser.add_argument('--batch-size', type=int, default=8, help='Mini-batch size') # dafault=32 에서 default=8 (OOM 방지) 로 변경 parser.add_argument('--optimizer',type=str, default='adamw', choices = ['adam','sgd','adamw']) parser.add_argument('--lr', default=3e-4, type=float, help='initial learning rate') parser.add_argument('--init-epoch', default=0, type=int, help='epoch to start at') parser.add_argument('--epochs', default=100, type=int, help='number of epochs') # dafault=80 에서 default=10 (테스트 용도) 로 변경 parser.add_argument('--test', default=False, action='store_true', help='training mode') parser.add_argument('--save-dir', type=Path, default=Path('/kaggle/working/result/')) # -- mixup parser.add_argument('--alpha', default=0.4, type=float, help='interpolation strength (uniform=1., ERM=0.)') # -- test parser.add_argument('--model-path', type=str, default=None, help='Pretrained model pathname') parser.add_argument('--allow-size-mismatch', default=False, action='store_true', help='If True, allows to init from model with mismatching weight tensors. Useful to init from model with diff. number of classes') # -- feature extractor parser.add_argument('--extract-feats', default=False, action='store_true', help='Feature extractor') parser.add_argument('--mouth-patch-path', type=str, default=None, help='Path to the mouth ROIs, assuming the file is saved as numpy.array') parser.add_argument('--mouth-embedding-out-path', type=str, default=None, help='Save mouth embeddings to a specificed path') # -- json pathname parser.add_argument('--config-path', type=str, default=None, help='Model configuration with json format') # -- other vars parser.add_argument('--interval', default=50, type=int, help='display interval') parser.add_argument('--workers', default=2, type=int, help='number of data loading workers') # dafault=8 에서 default=2 (GCP core 4개의 절반) 로 변경 # paths parser.add_argument('--logging-dir', type=str, default='/kaggle/working/train_logs', help = 'path to the directory in which to save the log file') # 입력받은 인자값을 args에 저장 (type: namespace) args = parser.parse_args() return args args = load_args() # args 파싱 및 로드 # 실험 재현을 위해서 난수 고정 torch.manual_seed(1) # 메인 프레임워크인 pytorch 에서 random seed 고정 np.random.seed(1) # numpy 에서 random seed 고정 random.seed(1) # python random 라이브러리에서 random seed 고정 # 참고: 실험 재현하려면 torch.backends.cudnn.deterministic = True, torch.backends.cudnn.benchmark = False 이어야 함 torch.backends.cudnn.benchmark = True # 내장된 cudnn 자동 튜너를 활성화하여, 하드웨어에 맞게 사용할 최상의 알고리즘(텐서 크기나 conv 연산에 맞게)을 찾음 # feature 추출 def extract_feats(model): """ :rtype: FloatTensor """ model.eval() # evaluation 과정에서 사용하지 않아야 하는 layer들을 알아서 off 시키도록 하는 함수 preprocessing_func = get_preprocessing_pipelines()['test'] # test 전처리 mouth_patch_path = args.mouth_patch_path.replace('.','') dir_name = os.path.dirname(os.path.abspath(__file__)) dir_name = dir_name + mouth_patch_path data_paths = [os.path.join(pth, f) for pth, dirs, files in os.walk(dir_name) for f in files] npz_files = np.load(data_paths[0])['data'] data = preprocessing_func(npz_files) # data: TxHxW # data = preprocessing_func(np.load(args.mouth_patch_path)['data']) # data: TxHxW return data_paths[0], model(torch.FloatTensor(data)[None, None, :, :, :].cuda(), lengths=[data.shape[0]]) # return model(torch.FloatTensor(data)[None, None, :, :, :].cuda(), lengths=[data.shape[0]]) # 평가 def evaluate(model, dset_loader, criterion, is_print=False): model.eval() # evaluation 과정에서 사용하지 않아야 하는 layer들을 알아서 off 시키도록 하는 함수 # running_loss = 0. # running_corrects = 0. prediction='' # evaluation/validation 과정에선 보통 model.eval()과 torch.no_grad()를 함께 사용함 with torch.no_grad(): inferences = [] for batch_idx, (input, lengths, labels) in enumerate(tqdm(dset_loader)): # 모델 생성 # input 텐서의 차원을 하나 더 늘리고 gpu 에 할당 logits = model(input.unsqueeze(1).cuda(), lengths=lengths) # _, preds = torch.max(F.softmax(logits, dim=1).data, dim=1) # softmax 적용 후 각 원소 중 최대값 가져오기 # running_corrects += preds.eq(labels.cuda().view_as(preds)).sum().item() # 정확도 계산 # loss = criterion(logits, labels.cuda()) # loss 계산 # running_loss += loss.item() * input.size(0) # loss.item(): loss 가 갖고 있는 scalar 값 # # ------------ Prediction, Confidence 출력 ------------ probs = torch.nn.functional.softmax(logits, dim=-1) probs = probs[0].detach().cpu().numpy() label_path = args.label_path with Path(label_path).open() as fp: vocab = fp.readlines() top = np.argmax(probs) prediction = vocab[top].strip() # confidence = np.round(probs[top], 3) # inferences.append({ # 'prediction': prediction, # 'confidence': confidence # }) f = open("/home/user/app/result/ho.txt", 'a') f.write(prediction+'\n') if is_print: print() print(f'Prediction: {prediction}') # print(f'Confidence: {confidence}') print() return prediction # ------------ Prediction, Confidence 텍스트 파일 저장 ------------ # txt_save_path = str(args.save_dir) + f'/predict.txt' # # 파일 없을 경우 # if not os.path.exists(os.path.dirname(txt_save_path)): # os.makedirs(os.path.dirname(txt_save_path)) # 디렉토리 생성 # with open(txt_save_path, 'w') as f: # for inference in inferences: # prediction = inference['prediction'] # confidence = inference['confidence'] # f.writelines(f'Prediction: {prediction}, Confidence: {confidence}\n') # print('Test Dataset {} In Total \t CR: {}'.format( len(dset_loader.dataset), running_corrects/len(dset_loader.dataset))) # 데이터개수, 정확도 출력 # return running_corrects/len(dset_loader.dataset), running_loss/len(dset_loader.dataset), inferences # 정확도, loss, inferences 반환 # 모델 학습 # def train(wandb, model, dset_loader, criterion, epoch, optimizer, logger): # data_time = AverageMeter() # 평균, 현재값 저장 # batch_time = AverageMeter() # 평균, 현재값 저장 # lr = showLR(optimizer) # LR 변화값 # # 로거 INFO 작성 # logger.info('-' * 10) # logger.info('Epoch {}/{}'.format(epoch, args.epochs - 1)) # epoch 작성 # logger.info('Current learning rate: {}'.format(lr)) # learning rate 작성 # model.train() # train mode # running_loss = 0. # running_corrects = 0. # running_all = 0. # end = time.time() # 현재 시각 # for batch_idx, (input, lengths, labels) in enumerate(dset_loader): # # measure data loading time # data_time.update(time.time() - end) # 평균, 현재값 업데이트 # # -- # # mixup augmentation 계산 # input, labels_a, labels_b, lam = mixup_data(input, labels, args.alpha) # labels_a, labels_b = labels_a.cuda(), labels_b.cuda() # tensor 를 gpu 에 할당 # # Pytorch에서는 gradients값들을 추후에 backward를 해줄때 계속 더해주기 때문 # optimizer.zero_grad() # 항상 backpropagation을 하기전에 gradients를 zero로 만들어주고 시작을 해야 함 # # 모델 생성 # # input 텐서의 차원을 하나 더 늘리고 gpu 에 할당 # logits = model(input.unsqueeze(1).cuda(), lengths=lengths) # loss_func = mixup_criterion(labels_a, labels_b, lam) # mixup 적용 # loss = loss_func(criterion, logits) # loss 계산 # loss.backward() # gradient 계산 # optimizer.step() # 저장된 gradient 값을 이용하여 파라미터를 업데이트 # # measure elapsed time # 경과 시간 측정 # batch_time.update(time.time() - end) # 평균, 현재값 업데이트 # end = time.time() # 현재 시각 # # -- compute running performance # 컴퓨팅 실행 성능 # _, predicted = torch.max(F.softmax(logits, dim=1).data, dim=1) # softmax 적용 후 각 원소 중 최대값 가져오기 # running_loss += loss.item()*input.size(0) # loss.item(): loss 가 갖고 있는 scalar 값 # running_corrects += lam * predicted.eq(labels_a.view_as(predicted)).sum().item() + (1 - lam) * predicted.eq(labels_b.view_as(predicted)).sum().item() # 정확도 계산 # running_all += input.size(0) # # ------------------ wandb 로그 입력 ------------------ # wandb.log({'loss': running_loss, 'acc': running_corrects}, step=epoch) # # -- log intermediate results # 중간 결과 기록 # if batch_idx % args.interval == 0 or (batch_idx == len(dset_loader)-1): # # 로거 INFO 작성 # update_logger_batch( args, logger, dset_loader, batch_idx, running_loss, running_corrects, running_all, batch_time, data_time ) # return model # 모델 반환 # model 설정에 대한 json 작성 def get_model_from_json(): # json 파일이 있는지 확인, 없으면 AssertionError 메시지를 띄움 assert args.config_path.endswith('.json') and os.path.isfile(args.config_path), \ "'.json' config path does not exist. Path input: {}".format(args.config_path) # 원하는 조건의 변수값을 보증하기 위해 사용 args_loaded = load_json( args.config_path) # json 읽어오기 args.backbone_type = args_loaded['backbone_type'] # json 에서 backbone_type 가져오기 args.width_mult = args_loaded['width_mult'] # json 에서 width_mult 가져오기 args.relu_type = args_loaded['relu_type'] # json 에서 relu_type 가져오기 # TCN 옵션 설정 tcn_options = { 'num_layers': args_loaded['tcn_num_layers'], 'kernel_size': args_loaded['tcn_kernel_size'], 'dropout': args_loaded['tcn_dropout'], 'dwpw': args_loaded['tcn_dwpw'], 'width_mult': args_loaded['tcn_width_mult'], } # 립리딩 모델 생성 model = Lipreading( modality=args.modality, num_classes=args.num_classes, tcn_options=tcn_options, backbone_type=args.backbone_type, relu_type=args.relu_type, width_mult=args.width_mult, extract_feats=args.extract_feats).cuda() calculateNorm2(model) # 모델 학습이 잘 진행되는지 확인 - 일반적으로 parameter norm(L2)은 학습이 진행될수록 커져야 함 return model # 모델 반환 # main() 함수 def main(): # wandb 연결 # wandb.init(project="Lipreading_using_TCN_running") # wandb.config = { # "learning_rate": args.lr, # "epochs": args.epochs, # "batch_size": args.batch_size # } # os.environ['CUDA_LAUNCH_BLOCKING']="1" # os.environ["CUDA_VISIBLE_DEVICES"]="0" # GPU 선택 코드 추가 # -- logging save_path = get_save_folder( args) # 저장 디렉토리 print("Model and log being saved in: {}".format(save_path)) # 저장 디렉토리 경로 출력 logger = get_logger(args, save_path) # 로거 생성 및 설정 ckpt_saver = CheckpointSaver(save_path) # 체크포인트 저장 설정 # -- get model model = get_model_from_json() # -- get dataset iterators dset_loaders = get_data_loaders(args) # -- get loss function criterion = nn.CrossEntropyLoss() # -- get optimizer optimizer = get_optimizer(args, optim_policies=model.parameters()) # -- get learning rate scheduler scheduler = CosineScheduler(args.lr, args.epochs) # 코사인 스케줄러 설정 if args.model_path: # tar 파일이 있는지 확인, 없으면 AssertionError 메시지를 띄움 assert args.model_path.endswith('.tar') and os.path.isfile(args.model_path), \ "'.tar' model path does not exist. Path input: {}".format(args.model_path) # 원하는 조건의 변수값을 보증하기 위해 사용 # resume from checkpoint if args.init_epoch > 0: model, optimizer, epoch_idx, ckpt_dict = load_model(args.model_path, model, optimizer) # 모델 불러오기 args.init_epoch = epoch_idx # epoch 설정 ckpt_saver.set_best_from_ckpt(ckpt_dict) # best 체크포인트 저장 logger.info('Model and states have been successfully loaded from {}'.format( args.model_path )) # 로거 INFO 작성 # init from trained model else: model = load_model(args.model_path, model, allow_size_mismatch=args.allow_size_mismatch) # 모델 불러오기 logger.info('Model has been successfully loaded from {}'.format( args.model_path )) # 로거 INFO 작성 # feature extraction if args.mouth_patch_path: filename, embeddings = extract_feats(model) filename = filename.split('/')[-1] save_npz_path = os.path.join(args.mouth_embedding_out_path, filename) # ExtractEmbedding 은 코드 수정이 필요함! save2npz(save_npz_path, data = embeddings.cpu().detach().numpy()) # npz 파일 저장 # save2npz( args.mouth_embedding_out_path, data = extract_feats(model).cpu().detach().numpy()) # npz 파일 저장 return # if test-time, performance on test partition and exit. Otherwise, performance on validation and continue (sanity check for reload) if args.test: predicthi = evaluate(model, dset_loaders['test'], criterion, is_print=False) # 모델 평가 # logging_sentence = 'Test-time performance on partition {}: Loss: {:.4f}\tAcc:{:.4f}'.format( 'test', loss_avg_test, acc_avg_test) # logger.info(logging_sentence) # 로거 INFO 작성 return predicthi # -- fix learning rate after loading the ckeckpoint (latency) if args.model_path and args.init_epoch > 0: scheduler.adjust_lr(optimizer, args.init_epoch-1) # learning rate 업데이트 epoch = args.init_epoch # epoch 초기화 while epoch < args.epochs: model = train(wandb, model, dset_loaders['train'], criterion, epoch, optimizer, logger) # 모델 학습 acc_avg_val, loss_avg_val, inferences = evaluate(model, dset_loaders['val'], criterion) # 모델 평가 logger.info('{} Epoch:\t{:2}\tLoss val: {:.4f}\tAcc val:{:.4f}, LR: {}'.format('val', epoch, loss_avg_val, acc_avg_val, showLR(optimizer))) # 로거 INFO 작성 # -- save checkpoint # 체크포인트 상태 기록 save_dict = { 'epoch_idx': epoch + 1, 'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict() } ckpt_saver.save(save_dict, acc_avg_val) # 체크포인트 저장 scheduler.adjust_lr(optimizer, epoch) # learning rate 업데이트 epoch += 1 # -- evaluate best-performing epoch on test partition # test 데이터로 best 성능의 epoch 평가 best_fp = os.path.join(ckpt_saver.save_dir, ckpt_saver.best_fn) # best 체크포인트 경로 _ = load_model(best_fp, model) # 모델 불러오기 acc_avg_test, loss_avg_test, inferences = evaluate(model, dset_loaders['test'], criterion) # 모델 평가 logger.info('Test time performance of best epoch: {} (loss: {})'.format(acc_avg_test, loss_avg_test)) # 로거 INFO 작성 torch.cuda.empty_cache() # GPU 캐시 데이터 삭제 # 해당 모듈이 임포트된 경우가 아니라 인터프리터에서 직접 실행된 경우에만, if문 이하의 코드를 돌리라는 명령 # => main.py 실행할 경우 제일 먼저 호출되는 부분 if __name__ == '__main__': # 현재 스크립트 파일이 실행되는 상태 파악 main() # main() 함수 호출