import time | |
from config import * | |
import cv2 | |
import glob | |
import numpy as np | |
import os | |
from basicsr.utils import imwrite | |
from pathos.pools import ParallelPool | |
import subprocess | |
import platform | |
from mutagen.wave import WAVE | |
import tqdm | |
from p_tqdm import * | |
import torch | |
from PIL import Image | |
from RealESRGAN import RealESRGAN | |
def vid2frames(vidPath, framesOutPath): | |
print(vidPath) | |
print(framesOutPath) | |
vidcap = cv2.VideoCapture(vidPath) | |
success,image = vidcap.read() | |
frame = 1 | |
while success: | |
cv2.imwrite(os.path.join(framesOutPath, str(frame).zfill(5) + '.png'), image) | |
success,image = vidcap.read() | |
frame += 1 | |
def restore_frames(audiofilePath, videoOutPath, improveOutputPath): | |
no_of_frames = count_files(improveOutputPath) | |
audio_duration = get_audio_duration(audiofilePath) | |
framesPath = improveOutputPath + "/%5d.png" | |
fps = no_of_frames/audio_duration | |
command = f"ffmpeg -y -r {fps} -f image2 -i {framesPath} -i {audiofilePath} -vcodec mpeg4 -b:v 20000k {videoOutPath}" | |
print(command) | |
subprocess.call(command, shell=platform.system() != 'Windows') | |
def get_audio_duration(audioPath): | |
audio = WAVE(audioPath) | |
duration = audio.info.length | |
return duration | |
def count_files(directory): | |
return len([name for name in os.listdir(directory) if os.path.isfile(os.path.join(directory, name))]) | |
def improve(disassembledPath, improvedPath): | |
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
model = RealESRGAN(device, scale=4) | |
model.load_weights('weights/RealESRGAN_x4.pth', download=True) | |
files = glob.glob(os.path.join(disassembledPath,"*.png")) | |
# pool = ParallelPool(nodes=20) | |
# results = pool.amap(real_esrgan, files, [model]*len(files), [improvedPath] * len(files)) | |
results = t_map(real_esrgan, files, [model]*len(files), [improvedPath] * len(files)) | |
def real_esrgan(img_path, model, improvedPath): | |
image = Image.open(img_path).convert('RGB') | |
sr_image = model.predict(image) | |
img_name = os.path.basename(img_path) | |
sr_image.save(os.path.join(improvedPath, img_name)) | |
# def process(img_path, improveOutputPath): | |
# only_center_face=True | |
# aligned=True | |
# ext='auto' | |
# weight=0.5 | |
# upscale=1 | |
# arch = 'clean' | |
# channel_multiplier = 2 | |
# model_name = 'GFPGANv1.3' | |
# url = 'https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.3.pth' | |
# # determine model paths | |
# model_path = os.path.join('gfpgan_models', model_name + '.pth') | |
# if not os.path.isfile(model_path): | |
# model_path = os.path.join('gfpgan/weights', model_name + '.pth') | |
# if not os.path.isfile(model_path): | |
# # download pre-trained models from url | |
# model_path = url | |
# restorer = GFPGANer( | |
# model_path=model_path, | |
# upscale=upscale, | |
# arch=arch, | |
# channel_multiplier=channel_multiplier, | |
# bg_upsampler=None) | |
# # read image | |
# img_name = os.path.basename(img_path) | |
# basename, ext = os.path.splitext(img_name) | |
# input_img = cv2.imread(img_path, cv2.IMREAD_COLOR) | |
# # restore faces and background if necessary | |
# cropped_faces, restored_faces, restored_img = restorer.enhance( | |
# input_img, | |
# has_aligned=aligned, | |
# only_center_face=only_center_face, | |
# paste_back=True, | |
# weight=weight) | |
# # save faces | |
# for idx, (cropped_face, restored_face) in enumerate(zip(cropped_faces, restored_faces)): | |
# # save cropped face | |
# save_crop_path = os.path.join(improveOutputPath, 'cropped_faces', f'{basename}.png') | |
# imwrite(cropped_face, save_crop_path) | |
# # save restored face | |
# save_face_name = f'{basename}.png' | |
# save_restore_path = os.path.join(improveOutputPath, 'restored_faces', save_face_name) | |
# imwrite(restored_face, save_restore_path) | |
# # save comparison image | |
# cmp_img = np.concatenate((cropped_face, restored_face), axis=1) | |
# imwrite(cmp_img, os.path.join(improveOutputPath, 'cmp', f'{basename}.png')) | |
# # save restored img | |
# if restored_img is not None: | |
# if ext == 'auto': | |
# extension = ext[1:] | |
# else: | |
# extension = ext | |
# save_restore_path = os.path.join(improveOutputPath, 'restored_imgs', f'{basename}.{extension}') | |
# imwrite(restored_img, save_restore_path) | |
# print(f'Processed {img_name} ...') | |
# def improve_faces(improveInputPath, improveOutputPath): | |
# if improveInputPath.endswith('/'): | |
# improveInputPath = improveInputPath[:-1] | |
# if os.path.isfile(improveInputPath): | |
# img_list = [improveInputPath] | |
# else: | |
# img_list = sorted(glob.glob(os.path.join(improveInputPath, '*'))) | |
# os.makedirs(improveInputPath, exist_ok=True) | |
# os.makedirs(improveOutputPath, exist_ok=True) | |
# pool = ParallelPool(nodes=10) | |
# results = pool.amap(process, img_list, [improveOutputPath] * len(img_list)) | |
# while not results.ready(): | |
# time.sleep(5); print(".", end=' ') | |