"""Gradio app that turns a text prompt or a still image into a short video.

Text prompts are first rendered to an image through a websocket endpoint
(``url_sd3``, falling back to ``url_sd4``), and images are animated by
uploading them to stable-video-diffusion.com and polling for the result.
Endpoint URLs that are not hard-coded are read from environment variables.
"""

import base64
import io
import json
import os
import random
import re
import tempfile
import time
from contextlib import closing, suppress
from typing import Optional

import gradio as gr
import moviepy.editor as mp
import requests
from deep_translator import GoogleTranslator
from gradio_client import Client
from langdetect import detect
from PIL import Image
from websocket import create_connection

_SVD_MODEL = "Stable Video Diffusion"
_ANIMATEDIFF_MODEL = "AnimateDiff"

_SVD_UPLOAD_URL = "https://stable-video-diffusion.com/api/upload"
_SVD_RESULT_URL = "https://stable-video-diffusion.com/result"
_SVD_STORAGE_URL = "https://storage.stable-video-diffusion.com"
_SVD_IN_PROGRESS = "Generation has been in progress for"
_SVD_VIDEO_RE = re.compile(
    r'https://storage.stable-video-diffusion.com/([a-f0-9]{32})\.mp4'
)

_NEGATIVE_PROMPT = (
    "[deformed | disfigured], poorly drawn, [bad : wrong] anatomy, "
    "[extra | missing | floating | disconnected] limb, "
    "(mutated hands and fingers), blurry"
)

# UI label (Russian) -> camera-motion keyword sent to the video API.
_CAMERA_MOTIONS = {
    "Приближение →←": "zoom in",
    "Отдаление ←→": "zoom out",
    "Вверх ↑": "up",
    "Вниз ↓": "down",
    "Влево ←": "left",
    "Вправо →": "right",
    "По часовой стрелке ⟳": "rotate cw",
    "Против часовой стрелки ⟲": "rotate ccw",
}


def _require_env(name: str) -> str:
    """Return environment variable *name*, raising RuntimeError if unset."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"environment variable {name!r} is not set")
    return value


def _decode_data_uri(photo: str) -> bytes:
    """Strip a JPEG/PNG data-URI prefix from *photo* and base64-decode it."""
    base64_string = photo.replace('data:image/jpeg;base64,', '').replace(
        'data:image/png;base64,', ''
    )
    return base64.b64decode(base64_string)


def _svd_image_to_video(image_path, *, log_upload: bool = False) -> Optional[str]:
    """Upload the image at *image_path* and poll for the generated video URL.

    Returns the ``.mp4`` URL, or ``None`` when the result page contains no
    video link or generation is still in progress after the last poll.
    Network, JSON and file errors propagate to the caller.
    """
    # The context manager closes the upload handle even if the request fails.
    with open(image_path, 'rb') as image_file:
        upload = requests.post(_SVD_UPLOAD_URL, files={"file": image_file})
    if log_upload:
        print(upload.text)
    job_hash = upload.json()['hash']

    time.sleep(10)
    for _ in range(10):
        page = requests.get(f"{_SVD_RESULT_URL}?hash={job_hash}").text
        if _SVD_IN_PROGRESS in page:
            time.sleep(15)
            continue
        found = _SVD_VIDEO_RE.search(page)
        if found is None:
            return None
        # Rebuild the URL from the captured hash rather than echoing page text.
        return f"{_SVD_STORAGE_URL}/{found.group(1)}.mp4"
    return None


def animate_img(encoded_string, model):
    """Animate the image file at path *encoded_string* with *model*.

    Only "Stable Video Diffusion" is handled. Returns the video URL, or
    ``None`` on any failure or for an unsupported model.
    """
    if model != _SVD_MODEL:
        return None
    try:
        video_url = _svd_image_to_video(encoded_string)
    except Exception as exc:  # boundary: the UI just shows an empty result
        print(exc)
        return None
    print("s_1" if video_url else "f_1")
    # NOTE: a fallback through a Hugging Face Space ("/resize_image" and
    # "/video" endpoints, URL from env "url_hg1") is currently disabled.
    return video_url


def _generate_image_primary(prompt) -> bytes:
    """Render *prompt* to image bytes through the ``url_sd3`` websocket."""
    with closing(create_connection(_require_env("url_sd3"), timeout=120)) as conn:
        conn.send('{"fn_index":3,"session_hash":""}')
        # json.dumps escapes quotes/backslashes in the user's prompt, which
        # string interpolation into a JSON template would not.
        conn.send(json.dumps({
            "data": [prompt, _NEGATIVE_PROMPT, 7.5, "(No style)"],
            "event_data": None,
            "fn_index": 3,
            "session_hash": "",
        }))
        estimations = 0
        while estimations < 60:
            status = json.loads(conn.recv())['msg']
            if status == 'estimation':
                estimations += 1
                time.sleep(1)
            elif status == 'process_starts':
                break
        photo = json.loads(conn.recv())['output']['data'][0][0]
    return _decode_data_uri(photo)


def _generate_image_fallback(prompt) -> bytes:
    """Render *prompt* to image bytes through the ``url_sd4`` websocket."""
    with closing(create_connection(_require_env("url_sd4"), timeout=120)) as conn:
        conn.send('{"fn_index":0,"session_hash":""}')
        conn.send(json.dumps({
            "data": [
                prompt,
                _NEGATIVE_PROMPT,
                "dreamshaperXL10_alpha2.safetensors [c8afe2ef]",
                30,
                "DPM++ 2M Karras",
                7,
                1024,
                1024,
                -1,
            ],
            "event_data": None,
            "fn_index": 0,
            "session_hash": "",
        }))
        # Four messages are discarded before the one carrying the output.
        for _ in range(4):
            conn.recv()
        photo = json.loads(conn.recv())['output']['data'][0]
    return _decode_data_uri(photo)


def _svd_video_from_bytes(image_bytes: bytes) -> Optional[str]:
    """Animate in-memory image bytes; return the video URL or ``None``."""
    # delete=False keeps the file on disk after the handle is closed so it
    # can be reopened for the upload; it is removed explicitly afterwards.
    with tempfile.NamedTemporaryFile(delete=False) as temp:
        temp.write(image_bytes)
        temp_file_path = temp.name
    try:
        video_url = _svd_image_to_video(temp_file_path, log_upload=True)
    except Exception as exc:  # boundary: the UI just shows an empty result
        print(exc)
        return None
    finally:
        with suppress(OSError):
            os.remove(temp_file_path)
    if video_url:
        print(video_url)
    return video_url


def _animatediff_video(prompt) -> Optional[str]:
    """Generate a GIF for *prompt* and convert it to a local ``.mp4`` file.

    Returns the path of the written file, or ``None`` if the remote task
    reports no data within 60 polls.
    """
    data = {"prompt": prompt, "negative_prompt": "EasyNegative"}
    r = requests.post("https://sd.cuilutech.com/sdapi/async/txt2gif", json=data)
    task_id = r.json()['data']['task_id']

    gif_url = None
    for _ in range(60):
        info = requests.post(
            "https://sd.cuilutech.com/sdapi/get_task_info",
            json={'task_id': task_id},
        ).json()['data']
        if info:
            gif_url = info['image_urls'][0]
            break
        time.sleep(2)
    if gif_url is None:
        return None

    with tempfile.NamedTemporaryFile(delete=False) as gif_file:
        gif_file.write(requests.get(gif_url).content)
        gif_path = gif_file.name
    # Reserve the output name; the handle is closed before moviepy writes.
    with tempfile.NamedTemporaryFile(
        prefix=f'aaafff{time.time()}', suffix='.mp4', delete=False
    ) as mp4_file:
        mp4_path = mp4_file.name
    try:
        clip = mp.VideoFileClip(gif_path)
        try:
            clip.write_videofile(mp4_path)
        finally:
            clip.close()
    finally:
        with suppress(OSError):
            os.remove(gif_path)
    return mp4_path


def create_video(prompt, model):
    """Generate a video for text *prompt* with *model*.

    "Stable Video Diffusion" renders an image (primary endpoint, then the
    fallback endpoint) and animates it, returning a video URL or ``None``.
    "AnimateDiff" returns a local ``.mp4`` path or ``None``. Any other model
    returns ``None``. Errors from the fallback image endpoint propagate.
    """
    if model == _SVD_MODEL:
        try:
            image_bytes = _generate_image_primary(prompt)
            print("cs_1")
        except Exception as exc:
            print(exc)
            print("c_2")
            image_bytes = _generate_image_fallback(prompt)
            print("cs_2")
        return _svd_video_from_bytes(image_bytes)
    if model == _ANIMATEDIFF_MODEL:
        return _animatediff_video(prompt)
    return None


def _to_english(prompt, fallback=None):
    """Translate a Russian *prompt* to English; leave other languages alone.

    If detection or translation fails, return *fallback* when given,
    otherwise the untouched prompt.
    """
    try:
        if detect(prompt) == 'ru':
            prompt = GoogleTranslator(source='ru', target='en').translate(prompt)
            print(prompt)
    except Exception:
        return prompt if fallback is None else fallback
    return prompt


def _run_motion_video_task(payload) -> Optional[str]:
    """Submit *payload* to the video API and save the finished video locally.

    The submit URL comes from env ``url_video_g`` and the status URL from
    ``url_video_c``. A status response that is not JSON is taken to be the
    video itself. Returns the saved ``.mp4`` path, or ``None`` if the status
    JSON is malformed or the file cannot be written.
    """
    submit_url = _require_env("url_video_g")
    status_url = _require_env("url_video_c")
    r = requests.post(submit_url, json=payload)
    task_id = f"{r.json()['task_id']}"

    while True:
        r2 = requests.post(status_url, json={"task_id": task_id})
        time.sleep(3)
        try:
            r2.json()['status']
        except ValueError:
            break  # body is not JSON: the video bytes have arrived
        except (KeyError, TypeError) as exc:
            # JSON without a status field is an API error, not a video.
            print(exc)
            return None
        # NOTE(review): every JSON status ("QUEUED", "PROCESSING" or anything
        # else) polls again, so a terminal failure status would loop forever.
        # Confirm the API's status values and add an explicit exit.

    try:
        with tempfile.NamedTemporaryFile(
            prefix=f'aaafff{time.time()}', suffix='.mp4', delete=False
        ) as file:
            file.write(r2.content)
            return file.name
    except OSError as e:
        print(e)
        return None


def flip_text1(prompt, motion):
    """Generate a camera-motion video from text *prompt*.

    *motion* is a UI label (mapped to the API keyword) or an API keyword
    passed through unchanged. Returns the saved ``.mp4`` path or ``None``.
    """
    prompt = _to_english(prompt, fallback='video')
    data = {
        "prompt": f"{prompt}",
        "image": "null",
        "denoise": 0.75,
        "motion": _CAMERA_MOTIONS.get(motion, motion),
    }
    return _run_motion_video_task(data)


def flip_text2(encoded_string, prompt, motion):
    """Generate a camera-motion video from the image at path *encoded_string*.

    *prompt* and *motion* are handled as in ``flip_text1``, except that a
    prompt which cannot be translated is sent unchanged. Returns the saved
    ``.mp4`` path or ``None``.
    """
    prompt = _to_english(prompt)
    with open(encoded_string, "rb") as image_file:
        # decode() yields clean base64 text; str(bytes) would wrap it in b'...'.
        encoded_string2 = base64.b64encode(image_file.read()).decode("ascii")
    data = {
        "prompt": f"{prompt}",
        "image": f"{encoded_string2}",
        "denoise": 0.75,
        "motion": _CAMERA_MOTIONS.get(motion, motion),
    }
    return _run_motion_video_task(data)


css = """
#generate {
    width: 100%;
    background: #e253dd !important;
    border: none;
    border-radius: 50px;
    outline: none !important;
    color: white;
}
#generate:hover {
    background: #de6bda !important;
    outline: none !important;
    color: #fff;
}
footer {visibility: hidden !important;}
"""

with gr.Blocks(css=css) as demo:
    with gr.Tab("Сгенерировать видео"):
        with gr.Column():
            prompt = gr.Textbox(placeholder="Введите описание видео...", show_label=True, label='Описание:', lines=3)
            # The camera-motion dropdown (flip_text1 backend) is not wired in.
            model = gr.Radio(interactive=True, value="Stable Video Diffusion", show_label=True, label="Модель нейросети:", choices=['Stable Video Diffusion'])
        with gr.Column():
            text_button = gr.Button("Сгенерировать видео", variant='primary', elem_id="generate")
        with gr.Column():
            video_output = gr.Video(show_label=True, label='Результат:', type="file")
        text_button.click(create_video, inputs=[prompt, model], outputs=video_output)
    with gr.Tab("Анимировать изображение"):
        with gr.Column():
            prompt2 = gr.Image(show_label=True, interactive=True, type='filepath', label='Исходное изображение:')
            # The optional description box and camera-motion dropdown
            # (flip_text2 backend) are not wired in.
            model2 = gr.Radio(interactive=True, value="Stable Video Diffusion", show_label=True, label="Модель нейросети:", choices=['Stable Video Diffusion'])
        with gr.Column():
            text_button2 = gr.Button("Анимировать изображение", variant='primary', elem_id="generate")
        with gr.Column():
            video_output2 = gr.Video(show_label=True, label='Результат:', type="file")
        text_button2.click(animate_img, inputs=[prompt2, model2], outputs=video_output2)

demo.queue(concurrency_count=12)
demo.launch()