Spaces:
Running
on
Zero
Running
on
Zero
from adaface.adaface_wrapper import AdaFaceWrapper | |
import torch | |
#import torch.nn.functional as F | |
from PIL import Image | |
import numpy as np | |
import os, argparse, glob, re, shutil | |
def str2bool(v): | |
if isinstance(v, bool): | |
return v | |
if v.lower() in ("yes", "true", "t", "y", "1"): | |
return True | |
elif v.lower() in ("no", "false", "f", "n", "0"): | |
return False | |
else: | |
raise argparse.ArgumentTypeError("Boolean value expected.") | |
def seed_everything(seed): | |
np.random.seed(seed) | |
torch.manual_seed(seed) | |
torch.cuda.manual_seed_all(seed) | |
torch.backends.cudnn.deterministic = True | |
torch.backends.cudnn.benchmark = False | |
os.environ["PL_GLOBAL_SEED"] = str(seed) | |
def parse_args(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument("--base_model_path", type=str, default='models/realisticvision/realisticVisionV40_v40VAE.safetensors', | |
help="Path to the UNet checkpoint (default: RealisticVision 4.0)") | |
parser.add_argument("--embman_ckpt", type=str, required=True, | |
help="Path to the checkpoint of the embedding manager") | |
parser.add_argument("--in_folder", type=str, required=True, help="Path to the folder containing input images") | |
# If True, the input folder contains images of mixed subjects. | |
# If False, the input folder contains multiple subfolders, each of which contains images of the same subject. | |
parser.add_argument("--is_mix_subj_folder", type=str2bool, const=True, default=False, nargs="?", | |
help="Whether the input folder contains images of mixed subjects") | |
parser.add_argument("--max_images_per_subject", type=int, default=5, help="Number of example images used per subject") | |
parser.add_argument("--trans_subject_count", type=int, default=-1, help="Number of example images to be translated") | |
parser.add_argument("--out_folder", type=str, required=True, help="Path to the folder saving output images") | |
parser.add_argument("--out_count_per_input_image", type=int, default=1, help="Number of output images to generate per input image") | |
parser.add_argument("--copy_masks", action="store_true", help="Copy the mask images to the output folder") | |
parser.add_argument("--noise", dest='noise_level', type=float, default=0) | |
parser.add_argument("--scale", dest='guidance_scale', type=float, default=4, | |
help="Guidance scale for the diffusion model") | |
parser.add_argument("--ref_img_strength", type=float, default=0.8, | |
help="Strength of the reference image in the output image.") | |
parser.add_argument("--subject_string", | |
type=str, default="z", | |
help="Subject placeholder string used in prompts to denote the concept.") | |
parser.add_argument("--num_vectors", type=int, default=16, | |
help="Number of vectors used to represent the subject.") | |
parser.add_argument("--prompt", type=str, default="a person z") | |
parser.add_argument("--num_images_per_row", type=int, default=4, | |
help="Number of images to display in a row in the output grid image.") | |
parser.add_argument("--num_inference_steps", type=int, default=50, | |
help="Number of DDIM inference steps") | |
parser.add_argument("--num_gpus", type=int, default=1, help="Number of GPUs to use. If num_gpus > 1, use accelerate for distributed execution.") | |
parser.add_argument("--device", type=str, default="cuda", help="Device to run the model on") | |
parser.add_argument("--seed", type=int, default=42, | |
help="the seed (for reproducible sampling). Set to -1 to disable.") | |
args = parser.parse_args() | |
return args | |
if __name__ == "__main__": | |
args = parse_args() | |
if args.seed != -1: | |
seed_everything(args.seed) | |
# screen -dm -L -Logfile trans_rv4-2.txt accelerate launch --multi_gpu --num_processes=2 scripts/adaface-translate.py | |
# --embman_ckpt logs/subjects-celebrity2024-05-16T17-22-46_zero3-ada/checkpoints/embeddings_gs-30000.pt | |
# --base_model_path models/realisticvision/realisticVisionV40_v40VAE.safetensors --in_folder /data/shaohua/VGGface2_HQ_masks/ | |
# --is_mix_subj_folder 0 --out_folder /data/shaohua/VGGface2_HQ_masks_rv4a --copy_masks --num_gpus 2 | |
if args.num_gpus > 1: | |
from accelerate import PartialState | |
distributed_state = PartialState() | |
args.device = distributed_state.device | |
process_index = distributed_state.process_index | |
elif re.match(r"^\d+$", args.device): | |
args.device = f"cuda:{args.device}" | |
distributed_state = None | |
process_index = 0 | |
adaface = AdaFaceWrapper("img2img", args.base_model_path, args.embman_ckpt, args.device, | |
args.subject_string, args.num_vectors, args.num_inference_steps) | |
in_folder = args.in_folder | |
if os.path.isfile(in_folder): | |
subject_folders = [ os.path.dirname(in_folder) ] | |
images_by_subject = [[in_folder]] | |
else: | |
if not args.is_mix_subj_folder: | |
in_folders = [in_folder] | |
else: | |
in_folders = [ os.path.join(in_folder, subfolder) for subfolder in sorted(os.listdir(in_folder)) ] | |
images_by_subject = [] | |
subject_folders = [] | |
for in_folder in in_folders: | |
image_types = ["*.jpg", "*.png", "*.jpeg"] | |
alltype_image_paths = [] | |
for image_type in image_types: | |
# glob returns the full path. | |
image_paths = glob.glob(os.path.join(in_folder, image_type)) | |
if len(image_paths) > 0: | |
alltype_image_paths.extend(image_paths) | |
# Filter out images of "*_mask.png" | |
alltype_image_paths = [image_path for image_path in alltype_image_paths if "_mask.png" not in image_path] | |
alltype_image_paths = sorted(alltype_image_paths) | |
if not args.is_mix_subj_folder: | |
# image_paths contain at most args.max_images_per_subject full image paths. | |
if args.max_images_per_subject > 0: | |
image_paths = alltype_image_paths[:args.max_images_per_subject] | |
else: | |
image_paths = alltype_image_paths | |
images_by_subject.append(image_paths) | |
subject_folders.append(in_folder) | |
else: | |
# Each image in the folder is treated as an individual subject. | |
images_by_subject.extend([[image_path] for image_path in alltype_image_paths]) | |
subject_folders.extend([in_folder] * len(alltype_image_paths)) | |
if args.trans_subject_count > 0 and len(subject_folders) >= args.trans_subject_count: | |
break | |
if args.trans_subject_count > 0: | |
images_by_subject = images_by_subject[:args.trans_subject_count] | |
subject_folders = subject_folders[:args.trans_subject_count] | |
out_image_count = 0 | |
out_mask_count = 0 | |
if not args.out_folder.endswith("/"): | |
args.out_folder += "/" | |
if args.num_gpus > 1: | |
# Split the subjects across the GPUs. | |
subject_folders = subject_folders[process_index::args.num_gpus] | |
images_by_subject = images_by_subject[process_index::args.num_gpus] | |
#subject_folders, images_by_subject = distributed_state.split_between_processes(zip(subject_folders, images_by_subject)) | |
for (subject_folder, image_paths) in zip(subject_folders, images_by_subject): | |
# If is_mix_subj_folder, then image_paths only contains 1 image, and we use the file name as the signature of the image. | |
# Otherwise, we use the folder name as the signature of the images. | |
images_sig = subject_folder if not args.is_mix_subj_folder else os.path.basename(image_paths[0]) | |
print(f"Translating {images_sig}...") | |
with torch.no_grad(): | |
adaface_subj_embs = adaface.generate_adaface_embeddings(image_paths, subject_folder, None, False, | |
out_id_embs_scale=1, noise_level=args.noise_level, | |
update_text_encoder=True) | |
# Replace the first occurrence of "in_folder" with "out_folder" in the path of the subject_folder. | |
subject_out_folder = subject_folder.replace(args.in_folder, args.out_folder, 1) | |
if not os.path.exists(subject_out_folder): | |
os.makedirs(subject_out_folder) | |
print(f"Output images will be saved to {subject_out_folder}") | |
in_images = [] | |
for image_path in image_paths: | |
image = Image.open(image_path).convert("RGB").resize((512, 512)) | |
# [512, 512, 3] -> [3, 512, 512]. | |
image = np.array(image).transpose(2, 0, 1) | |
# Convert the image to a tensor of shape (1, 3, 512, 512) and move it to the GPU. | |
image = torch.tensor(image).unsqueeze(0).float().cuda() | |
in_images.append(image) | |
# Put all input images of the subject into a batch. This assumes max_images_per_subject is small. | |
# NOTE: For simplicity, we do not check overly large batch sizes. | |
in_images = torch.cat(in_images, dim=0) | |
# in_images: [5, 3, 512, 512]. | |
# Normalize the pixel values to [0, 1]. | |
in_images = in_images / 255.0 | |
num_out_images = len(in_images) * args.out_count_per_input_image | |
with torch.no_grad(): | |
# args.noise_level: the *relative* std of the noise added to the face embeddings. | |
# A noise level of 0.08 could change gender, but 0.06 is usually safe. | |
# The returned adaface_subj_embs are already incorporated in the text encoder, and not used explicitly. | |
# NOTE: We assume out_count_per_input_image == 1, so that the output images are of the same number as the input images. | |
out_images = adaface(in_images, args.prompt, args.guidance_scale, num_out_images, ref_img_strength=args.ref_img_strength) | |
for img_i, img in enumerate(out_images): | |
# out_images: subj_1, subj_2, ..., subj_n, subj_1, subj_2, ..., subj_n, ... | |
subj_i = img_i % len(in_images) | |
copy_i = img_i // len(in_images) | |
image_filename_stem, image_fileext = os.path.splitext(os.path.basename(image_paths[subj_i])) | |
if copy_i == 0: | |
img.save(os.path.join(subject_out_folder, f"{image_filename_stem}{image_fileext}")) | |
else: | |
img.save(os.path.join(subject_out_folder, f"{image_filename_stem}_{copy_i}{image_fileext}")) | |
if args.copy_masks: | |
mask_path = image_paths[subj_i].replace(image_fileext, "_mask.png") | |
if os.path.exists(mask_path): | |
if copy_i == 0: | |
shutil.copy(mask_path, subject_out_folder) | |
else: | |
mask_filename_stem = image_filename_stem | |
shutil.copy(mask_path, os.path.join(subject_out_folder, f"{mask_filename_stem}_{copy_i}_mask.png")) | |
out_mask_count += 1 | |
out_image_count += len(out_images) | |
print(f"{out_image_count} output images and {out_mask_count} masks saved to {args.out_folder}") | |