#!/usr/bin/env python from __future__ import annotations import argparse import functools import os import pathlib import subprocess import sys import urllib import zipfile from typing import Callable # workaround for https://github.com/gradio-app/gradio/issues/483 command = 'pip install -U gradio==2.7.0' subprocess.call(command.split()) command = 'pip install -r DeepDanbooru/requirements.txt' subprocess.call(command.split()) sys.path.insert(0, 'DeepDanbooru') import deepdanbooru as dd import gradio as gr import huggingface_hub import numpy as np import PIL.Image import tensorflow as tf TOKEN = os.environ['TOKEN'] ZIP_PATH = 'data.zip' TAG_PATH = 'tags.txt' MODEL_PATH = 'model-resnet_custom_v3.h5' def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser() parser.add_argument('--score-slider-step', type=float, default=0.05) parser.add_argument('--score-threshold', type=float, default=0.5) parser.add_argument('--theme', type=str, default='dark-grass') parser.add_argument('--live', action='store_true') parser.add_argument('--share', action='store_true') parser.add_argument('--port', type=int) parser.add_argument('--disable-queue', dest='enable_queue', action='store_false') parser.add_argument('--allow-flagging', type=str, default='never') parser.add_argument('--allow-screenshot', action='store_true') return parser.parse_args() def download_sample_images() -> list[pathlib.Path]: image_dir = pathlib.Path('samples') image_dir.mkdir(exist_ok=True) dataset_repo = 'hysts/sample-images-TADNE' n_images = 36 paths = [] for index in range(n_images): path = huggingface_hub.hf_hub_download(dataset_repo, f'{index:02d}.jpg', repo_type='dataset', cache_dir=image_dir.as_posix(), use_auth_token=TOKEN) paths.append(pathlib.Path(path)) return paths def download_model_data() -> None: url = 'https://github.com/KichangKim/DeepDanbooru/releases/download/v3-20200915-sgd-e30/deepdanbooru-v3-20200915-sgd-e30.zip' urllib.request.urlretrieve(url, ZIP_PATH) with zipfile.ZipFile(ZIP_PATH) as f: f.extract(TAG_PATH) f.extract(MODEL_PATH) def predict(image: PIL.Image.Image, score_threshold: float, model, labels: list[str]) -> dict[str, float]: _, height, width, _ = model.input_shape image = np.asarray(image) image = tf.image.resize(image, size=(height, width), method=tf.image.ResizeMethod.AREA, preserve_aspect_ratio=True) image = image.numpy() image = dd.image.transform_and_pad_image(image, width, height) image = image / 255. probs = model.predict(image[None, ...])[0] probs = probs.astype(float) res = dict() for prob, label in zip(probs, labels): if prob < score_threshold: continue res[label] = prob return res def main(): gr.close_all() args = parse_args() image_paths = download_sample_images() examples = [[path.as_posix(), args.score_threshold] for path in image_paths] zip_path = pathlib.Path(ZIP_PATH) if not zip_path.exists(): download_model_data() model = tf.keras.models.load_model(MODEL_PATH) with open(TAG_PATH) as f: labels = [line.strip() for line in f.readlines()] func = functools.partial(predict, model=model, labels=labels) func = functools.update_wrapper(func, predict) repo_url = 'https://github.com/KichangKim/DeepDanbooru' title = 'KichangKim/DeepDanbooru' description = f'A demo for {repo_url}' article = None gr.Interface( func, [ gr.inputs.Image(type='pil', label='Input'), gr.inputs.Slider(0, 1, step=args.score_slider_step, default=args.score_threshold, label='Score Threshold'), ], gr.outputs.Label(label='Output'), theme=args.theme, title=title, description=description, article=article, examples=examples, allow_screenshot=args.allow_screenshot, allow_flagging=args.allow_flagging, live=args.live, ).launch( enable_queue=args.enable_queue, server_port=args.port, share=args.share, ) if __name__ == '__main__': main()