"""Gradio + FastAPI app: image upscaling, face restoration and film-grain overlay.

On import the module downloads the texture set, builds preview thumbnails,
assembles the UI and starts a uvicorn server on port 7860.
"""
import asyncio
import json
import os
import shutil
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from html.parser import HTMLParser
from unicodedata import normalize
from urllib.parse import quote

import gradio as gr
import numpy as np
import requests
import uvicorn
from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
from gradio_client import Client
from PIL import Image
from starlette.responses import Response

try:
    import cv2
except ImportError:
    # Install with the pip that belongs to the running interpreter; a bare
    # `pip` on PATH may point at a different environment.
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'opencv-python'], check=False)
    import cv2

# Upper bound (seconds) for every single HTTP request made by this module.
REQUEST_TIMEOUT = 60

root = os.path.dirname(os.path.abspath(__file__))
textures_folder = os.path.join(root, 'textures')
os.makedirs(textures_folder, exist_ok=True)
valid_extensions = ['.jpeg', '.jpg', '.png']
textures_repo = "https://huggingface.co/datasets/2ch/textures/resolve/main/"
textures_for_download = [
    f"{textures_repo}гауссовский_шум_и_мелкое_зерно.png?download=true",
    f"{textures_repo}грязная_матрица.png?download=true",
    f"{textures_repo}для_ночных_и_тёмных_кадров_сильный_шум_и_пыль.png?download=true",
    f"{textures_repo}для_ночных_и_тёмных_кадров_царапины_шум_пыль_дымка.png?download=true",
    f"{textures_repo}для_светлых_и_солнечных_ярких_фото_мелкое_констрастное_зерно.png?download=true",
    f"{textures_repo}зернистость_плёнки.png?download=true",
    f"{textures_repo}зернистость_плёнки_с_грязью.png?download=true",
    f"{textures_repo}испорченная_ворсом_плёнка.png?download=true",
    f"{textures_repo}мелкий_цветной_шум.png?download=true",
    f"{textures_repo}мелкое_контрастное_зерно_и_средний_цветвой_шум.png?download=true",
    f"{textures_repo}очень_мелкое_зерно.png?download=true",
    f"{textures_repo}пыльная_плёнка.png?download=true",
    f"{textures_repo}сильный_цветовой_шум_для_ночных_фото.png?download=true",
    f"{textures_repo}слабый_естественный_шум_матрицы_смартфона.png?download=true",
    f"{textures_repo}среднее_зерно.png?download=true",
    f"{textures_repo}среднее_монохромное_зерно_пыль_и_ворсинки.png?download=true",
    f"{textures_repo}средний_цветной_шум.png?download=true",
    f"{textures_repo}старая_матрица.png?download=true",
    f"{textures_repo}старая_потёртая_плёнка.png?download=true",
    f"{textures_repo}цветной_шум_матрицы.png?download=true",
    f"{textures_repo}цветной_шум_на_плёнке.png?download=true",
    f"{textures_repo}шумная_матрица.png?download=true",
]


def dl_textures(texture_url: str) -> None:
    """Download one texture into ``textures_folder``.

    The URL is NFD-normalized and percent-encoded before the request; the
    local file name is taken from the last path segment of the given URL.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    texture_for_download = quote(normalize('NFD', texture_url), safe='/?:=')
    filename = texture_url.split('/')[-1].split('?')[0]
    file_path = os.path.join(textures_folder, filename)
    # The context manager returns the streamed connection to the pool.
    with requests.get(texture_for_download, stream=True, timeout=REQUEST_TIMEOUT) as response:
        response.raise_for_status()
        with open(file_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)


def create_texture_preview(texture_folder: str, output_folder: str, size: tuple[int, int] = (246, 246)) -> None:
    """Write a randomly positioned crop of every texture into ``output_folder``.

    Args:
        texture_folder: directory holding the full-size textures.
        output_folder: directory for the crops (created if missing).
        size: crop size as ``(height, width)``.

    Files without an image extension and files OpenCV cannot decode are
    skipped. Images smaller than ``size`` are cropped from the top-left corner.
    """
    os.makedirs(output_folder, exist_ok=True)
    for texture in os.listdir(texture_folder):
        if os.path.splitext(texture)[1].lower() not in valid_extensions:
            continue
        img = cv2.imread(os.path.join(texture_folder, texture), cv2.IMREAD_UNCHANGED)
        if img is None:
            print(f'не удалось прочитать текстуру: {texture}')
            continue
        # `+ 1` keeps randint's exclusive upper bound above zero when the
        # texture is exactly as large as the crop.
        start_x = np.random.randint(0, max(img.shape[1] - size[1], 0) + 1)
        start_y = np.random.randint(0, max(img.shape[0] - size[0], 0) + 1)
        img = img[start_y:start_y + size[0], start_x:start_x + size[1]]
        cv2.imwrite(os.path.join(output_folder, texture), img)


def prepare_textures(texture_folder: str, output_folder: str) -> None:
    """Download all textures in parallel, then build their previews.

    Any download error is re-raised from the worker that hit it.
    """
    if textures_for_download:
        with ThreadPoolExecutor(max_workers=len(textures_for_download)) as executor:
            futures = [executor.submit(dl_textures, texture_url) for texture_url in textures_for_download]
            for future in as_completed(futures):
                future.result()
    create_texture_preview(texture_folder, output_folder, size=(246, 246))


prepare_textures(textures_folder, os.path.join(root, 'preview'))

# One sorted listing drives both the preview CSS and the radio choices, so the
# "NN" labels always refer to the same files and are stable across restarts.
texture_files = [
    (f"{i:02d}", texture)
    for i, texture in enumerate(
        sorted(name for name in os.listdir(textures_folder)
               if os.path.splitext(name)[1].lower() in valid_extensions),
        start=1,
    )
]

preview_css = "".join(
    f'[data-testid="{label}-radio-label"]::before {{ background-color: transparent !important; background-image: \n'
    f'url("./preview/{texture}") !important; }}\n'
    for label, texture in texture_files
)

radio_css = """ html, body { background: var(--body-background-fill); } .gradio-container { max-width: 1396px !important; } #textures label { position: relative; width: 256px; height: 256px; display: flex; flex-direction: row; align-items: flex-end; background: none !important; padding: 4px !important; transition: .3s; } #textures label::before { width: 246px; height: 246px; border-radius: 8px; display: block; content: ""; transition: .3s; background: red; position: relative; top: 0px; } #textures label:hover::before, #textures label:active::before, #textures label.selected::before { mix-blend-mode: soft-light; transition: .3s } #textures span:not([data-testid="block-info"]), #textures input { position: absolute; z-index: 999; } #textures input { position: absolute; z-index: 999; bottom: 9px; left: 9px; } #textures span:not([data-testid="block-info"]) { left: 21px; padding: 2px 8px; background: rgba(0, 0, 0, .57); backdrop-filter: blur(3px) } #textures { background-color: hsla(0, 0%, 50%, 1); } .built-with, .show-api, footer .svelte-mpyp5e { display: none !important; } footer:after { content: "ну пролапс, ну и что?"; } #zoom { position: absolute; top: 50%; left: 50%; width: 250px; height: 250px; background-repeat: no-repeat; box-shadow: 0px 0px 10px 5px rgba(0, 0, 0, .2); border-radius: 50%; cursor: none; pointer-events: none; z-index: 999; opacity: 0; transform: scale(0); transition: opacity 500ms, transform 500ms; } #textures_tab .image-button { cursor: none; } #textured_result-download-link, #restored_image-download-link, #upscaled_image-download-link { position: absolute; z-index: 9999; padding: 2px 4px; margin: 0 7px; background: black; bottom: 0; right: 0; font-size: 20px; transition: 300ms } #textured_result-download-link:hover, #restored_image-download-link:hover, #upscaled_image-download-link:hover { color: #99f7a8 } #restored_images.disabled, #upscaled_images.disabled { height: 0px !important; opacity: 0; 
transition: 300ms } #restored_images.enabled, #upscaled_images.enabled { transition: 300ms } """ + preview_css

# Page script injected by the HTTP middleware below. The `;` before
# `function checkImagesAndSetClass` is required: without a line break there,
# the preceding call expression and the declaration do not parse.
custom_js = """ const PageLoadObserver = new MutationObserver((mutationsList, observer) => { for (let mutation of mutationsList) { if (mutation.type === 'childList') { const tabsDiv = document.querySelector('div.tab-nav'); if (tabsDiv) { observer.disconnect(); document.querySelector('#textures_tab-button').addEventListener('click', () => { setTimeout(() => { let labels = document.querySelectorAll('label[data-testid]'); labels.forEach((label) => { let input = label.querySelector('input[type="radio"]'); if (input) { let title = input.value.split('.')[0].replace(/_/g, ' '); label.title = title; } }); document.querySelector("label[data-testid='05-radio-label']").click() }, 150); }); function checkImagesAndSetClass(galleryElement) { const firstDiv = galleryElement.querySelector('div:first-child'); const hasChildElements = firstDiv && firstDiv.children.length > 0; const hasImages = galleryElement.querySelectorAll('img').length > 0; if (hasChildElements || hasImages) { galleryElement.classList.add('enabled'); galleryElement.classList.remove('disabled'); } else { galleryElement.classList.add('disabled'); galleryElement.classList.remove('enabled'); } } function setupGalleryObserver(galleryId) { let gallery = document.getElementById(galleryId); const observer = new MutationObserver(() => { checkImagesAndSetClass(gallery); }); observer.observe(gallery, { childList: true, subtree: true }); checkImagesAndSetClass(gallery); } setupGalleryObserver('restored_images'); setupGalleryObserver('upscaled_images'); function magnify(imgID, zoom) { var img, glass, w, h, bw; img = document.querySelector(imgID); glass = document.createElement("DIV"); glass.setAttribute("id", "zoom"); img.parentElement.insertBefore(glass, img); glass.style.backgroundImage = "url('" + img.src + "')"; glass.style.backgroundRepeat = "no-repeat"; glass.style.backgroundSize = (img.width * zoom) + "px " + 
(img.height * zoom) + "px"; bw = 3; w = glass.offsetWidth / 2; h = glass.offsetHeight / 2; glass.addEventListener("mousemove", moveMagnifier); img.addEventListener("mousemove", moveMagnifier); glass.addEventListener("touchmove", moveMagnifier); img.addEventListener("touchmove", moveMagnifier); function moveMagnifier(e) { var pos, x, y; e.preventDefault(); pos = getCursorPos(e); x = pos.x; y = pos.y; if (x > img.width - (w / zoom)) { x = img.width - (w / zoom); } if (x < w / zoom) { x = w / zoom; } if (y > img.height - (h / zoom)) { y = img.height - (h / zoom); } if (y < h / zoom) { y = h / zoom; } glass.style.left = (x - w) + "px"; glass.style.top = (y - h) + "px"; glass.style.backgroundPosition = "-" + ((x * zoom) - w + bw) + "px -" + ((y * zoom) - h) + "px"; glass.style.backgroundImage = "url('" + img.src + "')"; } function getCursorPos(e) { var a, x = 0, y = 0; e = e || window.event; a = img.getBoundingClientRect(); x = e.pageX - a.left; y = e.pageY - a.top; x = x - window.scrollX; y = y - window.scrollY; return { x: x, y: y }; } img.addEventListener("mouseover", function () { glass.style.opacity = "1"; glass.style.transform = "scale(1)"; }); img.addEventListener("mouseout", function () { glass.style.opacity = "0"; glass.style.transform = "scale(0)"; }); } function setupDownloadLink(imgSelector, linkSelector, linkId, magnifyImage) { const imgElement = document.querySelector(imgSelector); if (imgElement && imgElement.src) { let downloadLink = document.querySelector(linkSelector); if (!downloadLink) { if (magnifyImage) { magnify(magnifyImage, 3); } downloadLink = document.createElement('a'); downloadLink.id = linkId; downloadLink.innerText = 'скачать'; imgElement.after(downloadLink); } downloadLink.href = imgElement.src; downloadLink.download = ''; } } const DownloadLinkObserverCallback = (mutationsList, observer, imgSelector, linkSelector, linkId, magnifyImage) => { setupDownloadLink(imgSelector, linkSelector, linkId, magnifyImage); }; const 
DownloadLinkObserverOptions = { childList: true, subtree: true, attributes: true, attributeFilter: ['src'] }; const ImageTexturedObserver = new MutationObserver((mutationsList, observer) => { DownloadLinkObserverCallback(mutationsList, observer, '#textured_result img[data-testid="detailed-image"]', '#textured_result-download-link', 'textured_result-download-link', "#textured_result .image-button img"); }); ImageTexturedObserver.observe(document, DownloadLinkObserverOptions); const ImageRestoredObserver = new MutationObserver((mutationsList, observer) => { DownloadLinkObserverCallback(mutationsList, observer, '#restored_images img[data-testid="detailed-image"]', '#restored_image-download-link', 'restored_image-download-link'); }); ImageRestoredObserver.observe(document, DownloadLinkObserverOptions); const ImageUpscaledObserver = new MutationObserver((mutationsList, observer) => { DownloadLinkObserverCallback(mutationsList, observer, '#upscaled_images img[data-testid="detailed-image"]', '#upscaled_image-download-link', 'upscaled_image-download-link'); }); ImageUpscaledObserver.observe(document, DownloadLinkObserverOptions); } } } }); PageLoadObserver.observe(document, { childList: true, subtree: true }); """


def extract_path_from_result(predict_answer: str | list[str] | tuple[str]) -> str:
    """Return the result path from a ``Client.predict`` answer.

    For a list/tuple answer the first element is the result; the directory of
    the second element (when present) is removed. A plain string is returned
    unchanged.
    """
    if not isinstance(predict_answer, (tuple, list)):
        return predict_answer
    result = predict_answer[0]
    if len(predict_answer) > 1 and isinstance(predict_answer[1], str):
        shutil.rmtree(os.path.dirname(predict_answer[1]), ignore_errors=True)
    return result


def restore_face_common(img_path: str, predict_answer: str, model: str) -> None:
    """Replace the uploaded image with the restored one.

    The result is moved next to ``img_path`` as ``<name>_<model><ext>``; the
    source image and the directory the result came from are deleted. Nothing
    happens if the result file does not exist.
    """
    result = extract_path_from_result(predict_answer)
    if not os.path.exists(result):
        return
    # Two restorers may finish for the same source file at nearly the same
    # time; the loser of that race must not fail (and be retried) just
    # because the original is already gone.
    try:
        os.unlink(img_path)
    except FileNotFoundError:
        pass
    new_extension = os.path.splitext(result)[1]
    old_filename = os.path.basename(os.path.splitext(img_path)[0])
    new_location = os.path.join(os.path.dirname(img_path), f"{old_filename}_{model}{new_extension}")
    shutil.move(result, new_location)
    shutil.rmtree(os.path.dirname(result), ignore_errors=True)


def restore_face_gfpgan(img_path: str) -> None:
    """Restore faces in ``img_path`` through the GFPGAN Hugging Face space."""
    client = Client(src="https://xintao-gfpgan.hf.space/", verbose=False)
    result = client.predict(img_path, "v1.4", 4, api_name="/predict")
    restore_face_common(img_path, result, "gfpgan")


def restore_face_codeformer(img_path: str) -> None:
    """Restore faces in ``img_path`` through the CodeFormer Hugging Face space."""
    client = Client(src="https://sczhou-codeformer.hf.space/", verbose=False)
    result = client.predict(img_path, True, True, True, 2, 0, api_name="/predict")
    restore_face_common(img_path, result, "codeformer")


async def restore_faces_one_image(img_path: str, func_list: list) -> bool:
    """Run every restorer in ``func_list`` on one image, each in its own thread.

    Each restorer gets up to three attempts. Returns True if at least one
    restorer succeeded.
    """
    def run_func(func) -> bool:
        for _ in range(3):
            try:
                func(img_path)
                return True
            except Exception as e:
                print(f"ошибка в {func.__name__}: {e}")
        return False

    if not func_list:
        return False
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=len(func_list)) as executor:
        futures = [loop.run_in_executor(executor, run_func, func) for func in func_list]
        results = await asyncio.gather(*futures)
    return any(results)


async def restore_faces_batch(input_images: list[str], func_list: list, batch_size: int = 3) -> bool:
    """Restore faces in all ``input_images``, ``batch_size`` images at a time.

    Returns True if restoration succeeded for at least one image in any
    batch. Unexpected errors are printed and end processing early.
    """
    restored_any = False
    step = max(1, batch_size)
    try:
        for start in range(0, len(input_images), step):
            batch = input_images[start:start + step]
            outcomes = await asyncio.gather(
                *(restore_faces_one_image(img_path, func_list) for img_path in batch)
            )
            # Accumulate instead of returning, so every batch is processed.
            restored_any = restored_any or any(outcomes)
    except Exception as error:
        print(error)
    return restored_any


def get_file_paths(input_path: str | list[str], extensions_list: list[str]) -> list[str]:
    """List absolute paths of files with a matching extension.

    Args:
        input_path: a directory, or a list of file paths whose parent
            directories are scanned.
        extensions_list: lower-case extensions including the dot.

    Each directory is scanned once and each file is reported once.
    """
    files: list[str] = []
    scanned: set[str] = set()

    def add_files_from_directory(directory: str) -> None:
        directory = os.path.abspath(directory)
        if directory in scanned:
            return
        scanned.add(directory)
        for file_name in os.listdir(directory):
            if os.path.splitext(file_name)[1].lower() in extensions_list:
                files.append(os.path.join(directory, file_name))

    if isinstance(input_path, list):
        for file_path in input_path:
            add_files_from_directory(os.path.dirname(file_path))
    else:
        add_files_from_directory(input_path)
    return files


async def restore_upscale(files: tuple, restore_method: str) -> list[str]:
    """Gradio handler: restore faces in the uploaded files.

    ``restore_method`` selects ``'codeformer'``, ``'gfpgan'`` or (any other
    value) both. Returns the paths of the restored images, or a placeholder
    image URL when nothing could be restored.
    """
    # Uploads arrive as objects exposing `.name`; plain path strings are
    # accepted too.
    file_paths = [getattr(file, 'name', file) for file in (files or [])]
    if restore_method == 'codeformer':
        func_list = [restore_face_codeformer]
    elif restore_method == 'gfpgan':
        func_list = [restore_face_gfpgan]
    else:
        func_list = [restore_face_codeformer, restore_face_gfpgan]
    results = await restore_faces_batch(file_paths, func_list, batch_size=3)
    if results:
        return get_file_paths(file_paths, valid_extensions)
    return ['https://iili.io/JzrxjDP.png']


def image_noise_softlight_layer_mix(img, texture, output: str = None, opacity: float = 0.7):
    """Blend a random crop of a texture over an image in soft-light mode.

    Args:
        img: RGB/RGBA/grayscale image as a ``PIL.Image`` or numpy array.
        texture: path of the texture file.
        output: unused; kept for interface compatibility.
        opacity: multiplier applied to the texture's alpha channel.

    Returns:
        The blended image as an ``(H, W, 3)`` uint8 RGB array.

    Raises:
        FileNotFoundError: if the texture cannot be read.
    """
    # Work in OpenCV's BGRA order throughout, because the texture is read in
    # that order; convert back to RGB only once at the end.
    img = np.asarray(img)
    if img.ndim == 2:
        img = cv2.cvtColor(img.astype(np.uint8), cv2.COLOR_GRAY2BGRA)
    elif img.shape[2] == 3:
        img = cv2.cvtColor(img.astype(np.uint8), cv2.COLOR_RGB2BGRA)
    else:
        img = cv2.cvtColor(img.astype(np.uint8), cv2.COLOR_RGBA2BGRA)
    height, width = img.shape[:2]

    overlay = cv2.imread(texture, cv2.IMREAD_UNCHANGED)
    if overlay is None:
        raise FileNotFoundError(f'cannot read texture: {texture}')
    if overlay.ndim == 2:
        overlay = cv2.cvtColor(overlay.astype(np.uint8), cv2.COLOR_GRAY2BGRA)
    elif overlay.shape[2] == 3:
        overlay = cv2.cvtColor(overlay.astype(np.uint8), cv2.COLOR_BGR2BGRA)

    # Tile the texture when the image is larger than it in either direction.
    reps_y = -(-height // overlay.shape[0])
    reps_x = -(-width // overlay.shape[1])
    if reps_y > 1 or reps_x > 1:
        overlay = np.tile(overlay, (reps_y, reps_x, 1))
    # `+ 1`: randint's upper bound is exclusive and must stay above zero when
    # the sizes are equal.
    start_x = np.random.randint(0, overlay.shape[1] - width + 1)
    start_y = np.random.randint(0, overlay.shape[0] - height + 1)
    overlay = overlay[start_y:start_y + height, start_x:start_x + width].astype(float)
    overlay[..., 3] *= opacity

    img_in_norm = img.astype(float) / 255.0
    img_layer_norm = overlay / 255.0
    base_rgb, base_alpha = img_in_norm[..., :3], img_in_norm[..., 3]
    layer_rgb, layer_alpha = img_layer_norm[..., :3], img_layer_norm[..., 3]

    comp_alpha = np.minimum(base_alpha, layer_alpha)
    new_alpha = base_alpha + (1.0 - base_alpha) * comp_alpha
    # Fully transparent pixels give 0/0; silence the warning locally instead
    # of changing numpy's global error state.
    with np.errstate(divide='ignore', invalid='ignore'):
        ratio = comp_alpha / new_alpha
    ratio[np.isnan(ratio)] = 0.0

    comp = (1.0 - base_rgb) * base_rgb * layer_rgb + base_rgb * (
        1.0 - (1.0 - base_rgb) * (1.0 - layer_rgb))
    ratio_rs = ratio[..., np.newaxis]
    img_out = comp * ratio_rs + base_rgb * (1.0 - ratio_rs)

    result = np.clip(np.nan_to_num(img_out) * 255.0, 0, 255).astype(np.uint8)
    return cv2.cvtColor(result, cv2.COLOR_BGR2RGB)


def apply_texture(input_image, textures_choice: str, opacity_slider: float):
    """Gradio handler: overlay the chosen texture on ``input_image``.

    Returns a one-element list for the result gallery.
    """
    if input_image is None or not textures_choice:
        raise gr.Error("выберите изображение и текстуру")
    result = image_noise_softlight_layer_mix(
        input_image, os.path.join(textures_folder, textures_choice), opacity=opacity_slider)
    return [result]


def temp_upload_file(file_path: str) -> str | None:
    """Upload a file to the first temporary file host that accepts it.

    Every host is tried in order, up to three rounds. Returns a direct
    download URL, or None if all attempts failed.
    """
    servers = [
        ('https://transfer.sh/', 'fileToUpload'),
        ('https://x0.at/', 'file'),
        ('https://tmpfiles.org/api/v1/upload', 'file'),
        ('https://uguu.se/upload.php', 'files[]'),
    ]
    for _ in range(3):
        for server, file_key in servers:
            try:
                with open(file_path, 'rb') as f:
                    response = requests.post(server, files={file_key: f}, timeout=REQUEST_TIMEOUT)
                if response.status_code != 200:
                    continue
                if server == 'https://transfer.sh/':
                    return response.text.replace("https://transfer.sh/", "https://transfer.sh/get/").replace("\n", "")
                elif server == 'https://tmpfiles.org/api/v1/upload':
                    response_json = response.json()
                    if response_json['status'] == 'success':
                        return response_json['data']['url'].replace("https://tmpfiles.org/", "https://tmpfiles.org/dl/")
                elif server == 'https://uguu.se/upload.php':
                    response_json = response.json()
                    if response_json['success']:
                        return response_json['files'][0]['url']
                else:
                    # The plain-text answer is the URL; drop the trailing newline.
                    return response.text.strip()
            except Exception as e:
                print(f'{server}: {e}')
    return None


def upload_image(image_path: str) -> str | None:
    """Upload an image to freeimage.host, falling back to ``temp_upload_file``.

    Returns the public URL of the image, or None if every host failed.
    """
    # NOTE(review): the API key is hard-coded in the source; it can be
    # overridden through the FREEIMAGE_API_KEY environment variable.
    data = {
        'key': os.environ.get('FREEIMAGE_API_KEY', '6d207e02198a847aa98d0a2a901485a5'),
        'action': 'upload',
        'format': 'json',
    }
    try:
        with open(image_path, "rb") as source:
            response = requests.post('https://freeimage.host/api/1/upload', files={'source': source},
                                     data=data, timeout=REQUEST_TIMEOUT)
        payload = response.json()
        if payload["status_code"] == 200:
            return payload["image"]["url"]
    except (requests.RequestException, ValueError, KeyError, TypeError) as e:
        # Network failure or an unexpected answer: use the mirrors instead.
        print(f'freeimage.host: {e}')
    return temp_upload_file(image_path)


def get_headers(url: str) -> dict:
    """Build browser-like request headers for replicate.com.

    Visits ``url`` anonymously and copies the ``csrftoken`` and
    ``replicate_anonymous_id`` cookies into the headers.

    Raises:
        KeyError: if the site did not set one of those cookies.
    """
    with requests.Session() as session:
        session.get(url, timeout=REQUEST_TIMEOUT)
        cookies = session.cookies.get_dict()
    return {
        'content-type': 'application/json',
        'cookie': f'csrftoken={cookies["csrftoken"]}; replicate_anonymous_id={cookies["replicate_anonymous_id"]};',
        'origin': 'https://replicate.com',
        'x-csrftoken': cookies['csrftoken'],
        'authority': 'replicate.com',
        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/jxl,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'accept-language': 'ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7',
        'cache-control': 'no-cache',
        'dnt': '1',
        'pragma': 'no-cache',
        'referer': f'{url}?input=http',
        'sec-ch-ua': '"Chromium";v="117", "Not;A=Brand";v="8"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'document',
        'sec-fetch-mode': 'navigate',
        'sec-fetch-site': 'same-origin',
        'sec-fetch-user': '?1',
        'upgrade-insecure-requests': '1',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36'
    }


def get_version(url: str) -> str:
    """Scrape a model version id from the model's ``/versions`` page.

    Returns the stripped text of the last text node found inside an ``<a>``
    whose ``href`` contains ``/versions/`` (empty string if there is none).
    """
    url = url.rstrip('/') + '/versions'
    response = requests.get(url, timeout=REQUEST_TIMEOUT)

    class Version(HTMLParser):
        def __init__(self):
            super().__init__()
            self.recording = 0
            self.data = ''

        def handle_starttag(self, tag, attrs):
            if tag == 'a':
                for name, value in attrs:
                    # `value` is None for attributes written without a value.
                    if name == 'href' and value and '/versions/' in value:
                        self.recording = 1

        def handle_endtag(self, tag):
            if tag == 'a' and self.recording:
                self.recording -= 1

        def handle_data(self, data):
            if self.recording:
                self.data = data

    parser = Version()
    parser.feed(response.text)
    return parser.data.strip()


def replicate_upscale(url: str, image_url: str, upscale: int = 2, timeout: float = 300.0) -> str:
    """Run an upscaling model on replicate.com and return its output.

    Args:
        url: model page on replicate.com.
        image_url: public URL of the image to upscale.
        upscale: scale factor passed to the model.
        timeout: maximum number of seconds to wait for the prediction.

    Raises:
        RuntimeError: if the prediction ends as failed or canceled.
        TimeoutError: if it does not finish within ``timeout`` seconds.
    """
    version = get_version(url)
    headers = get_headers(url)
    data = {
        "version": version,
        "input": {
            "img": image_url,
            "image": image_url,
            "upscale": upscale,
            "scale": upscale,
            "version": "General - RealESRGANplus",
        },
        "face_enhance": False,
        "is_training": False,
        "stream": False
    }
    with requests.Session() as session:
        session.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
        response = session.post('https://replicate.com/api/predictions', headers=headers,
                                data=json.dumps(data), timeout=REQUEST_TIMEOUT)
        prediction_id = response.json()['id']
        deadline = time.monotonic() + timeout
        while True:
            payload = session.get(f'https://replicate.com/api/predictions/{prediction_id}',
                                  headers=headers, timeout=REQUEST_TIMEOUT).json()
            status = payload.get('status', 'processing') if isinstance(payload, dict) else 'processing'
            if status == 'succeeded':
                return payload['output']
            # Terminal states other than success would otherwise be polled forever.
            if status in ('failed', 'canceled'):
                raise RuntimeError(f'{url}: prediction {status}')
            if time.monotonic() >= deadline:
                raise TimeoutError(f'{url}: prediction did not finish in {timeout} s')
            time.sleep(1)


def upscaler(img_url: str) -> list[str] | None:
    """Upscale an image with several Replicate models at once.

    Returns a one-element list holding the first successful output, or None
    if every model failed.
    """
    def run(url):
        try:
            return replicate_upscale(url, img_url)
        except Exception as e:
            print(e)
            return None

    urls = [
        'https://replicate.com/cjwbw/real-esrgan',
        'https://replicate.com/daanelson/real-esrgan-a100',
        'https://replicate.com/xinntao/realesrgan',
    ]
    executor = ThreadPoolExecutor(max_workers=len(urls))
    try:
        futures = [executor.submit(run, url) for url in urls]
        for future in as_completed(futures):
            result = future.result()
            if result is not None:
                return [result]
    finally:
        # Do not make the caller wait for the slower models once one is done.
        executor.shutdown(wait=False, cancel_futures=True)
    return None


def check_upscale_result(image: str) -> list[str]:
    """Gradio handler: upload ``image`` and upscale it, retrying up to 3 times.

    Returns the upscaler output, or a placeholder image URL on failure.
    """
    for _ in range(3):
        image_url = upload_image(image)
        if image_url is None:
            continue
        response = upscaler(image_url)
        if response:
            return response
    return ['https://iili.io/JzrxjDP.png']


# NOTE(review): the nesting of the layout below is reconstructed; confirm which
# components are meant to sit inside the Row of the textures tab.
with gr.Blocks(analytics_enabled=False, css=radio_css, theme='Taithrah/Minimal', title='апскейл') as demo:
    with gr.Tab(label="апскейл", elem_id="upscale_tab"):
        file_output = gr.Gallery(label="", container=True, object_fit="cover", columns=4, rows=4,
                                 allow_preview=True, preview=True, show_share_button=False,
                                 show_download_button=False, elem_id="upscaled_images")
        upload_button = gr.UploadButton("выбор одного изображения для обработки", file_types=["image"],
                                        file_count="single", variant="primary")
        upload_button.upload(fn=check_upscale_result, inputs=[upload_button], outputs=file_output,
                             api_name="upscale")
    with gr.Tab(label="восстановление лиц", id=1, elem_id="restore_tab"):
        restore_method = gr.Radio(["codeformer", "gfpgan", "оба"], value="codeformer", label="",
                                  interactive=True)
        restore_method.change(fn=lambda x: print(f"restore_method value = {x}"), inputs=restore_method,
                              api_name="show_selected_method")
        file_output = gr.Gallery(label="", container=True, object_fit="cover", columns=4, rows=4,
                                 allow_preview=True, preview=True, show_share_button=False,
                                 show_download_button=False, elem_id="restored_images")
        upload_button = gr.UploadButton("выбор нескольких изображений для обработки", file_types=["image"],
                                        file_count="multiple", variant="primary")
        upload_button.upload(fn=restore_upscale, inputs=[upload_button, restore_method], outputs=file_output,
                             api_name="face_restore")
    with gr.Tab(label="наложение зернистости пленки и шума", id=2, elem_id="textures_tab"):
        with gr.Row(variant="compact", elem_id="textures_tab_images"):
            input_image = gr.Image(label="исходник", sources=["upload", "clipboard"], type="numpy")
            result_image = gr.Gallery(label="результат", elem_id="textured_result", allow_preview=True,
                                      preview=True, show_share_button=False, show_download_button=False)
        opacity_slider = gr.Slider(minimum=0.1, maximum=1.0, value=0.7, step=0.1, label="видимость")
        apply_button = gr.Button(value="применить", variant="primary")
        textures_choice = gr.Radio(texture_files, show_label=False, interactive=True, elem_id="textures")
        apply_button.click(fn=apply_texture, inputs=[input_image, textures_choice, opacity_slider],
                           outputs=result_image, api_name="texturize")

app = FastAPI()


@app.middleware("http")
async def some_fastapi_middleware(request: Request, call_next):
    """Inject ``custom_js`` into the HTML served at ``/``.

    All other responses pass through untouched.
    """
    response = await call_next(request)
    if request.url.path != "/":
        return response

    # Join the raw chunks before decoding: a multi-byte character may be
    # split across two chunks.
    raw_body = b"".join([chunk async for chunk in response.body_iterator])
    response_body = raw_body.decode()
    # NOTE(review): the markup literals were empty in the reviewed source,
    # which inserted whitespace between every character of the page and never
    # used custom_js. Presumably the intent was to append the script before
    # the closing body tag — confirm.
    javascript = f"""<script>{custom_js}</script>"""
    head, closing_tag, tail = response_body.rpartition("</body>")
    if closing_tag:
        response_body = head + javascript + closing_tag + tail
    headers = dict(response.headers)
    # The body length changed; Response recomputes the header.
    headers.pop("content-length", None)
    return Response(
        content=response_body,
        status_code=response.status_code,
        headers=headers,
        media_type=response.media_type
    )


app.mount("/preview", StaticFiles(directory=os.path.join(root, 'preview')), name="preview")
gr.mount_gradio_app(app, demo, path="/")
uvicorn.run(app, host="0.0.0.0", port=7860)