import gradio as gr
import requests
import time
import json
from contextlib import closing
from websocket import create_connection
from deep_translator import GoogleTranslator
from langdetect import detect
import os
from PIL import Image
import io
import base64
import random
import tempfile
import re
from gradio_client import Client
import moviepy.editor as mp
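
# Gradio app: text-to-video generation and image animation. Backend
# endpoints are supplied via environment variables (url_sd3, url_sd4,
# url_video_g, url_video_c, url_hg1, url_hg2).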
def animate_img(encoded_string, model):
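    """Animate a still image with the stable-video-diffusion.com API.

    `encoded_string` is the path to the source image file (despite the name).
    Uploads the image, then polls the result endpoint until a generated .mp4
    URL appears. Returns the video URL, or None on failure.
    """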
url_hg1 = os.getenv("url_hg1")
url_hg2 = os.getenv("url_hg2")
    if model == "Stable Video Diffusion":
        try:
            # Upload the source image; the API responds with a job hash.
            with open(encoded_string, 'rb') as f:
                r = requests.post("https://stable-video-diffusion.com/api/upload", files={"file": f})
            hash_ = r.json()['hash']
            time.sleep(10)
            c = 0
            while c < 10:
                r2 = requests.get(f"https://stable-video-diffusion.com/result?hash={hash_}")
                source_string = r2.text
                if "Generation has been in progress for" in source_string:
                    # Still rendering; wait and poll again (up to 10 attempts).
                    time.sleep(15)
                    c += 1
                    continue
                # Finished: extract the generated .mp4 URL from the result page.
                pattern = r'https://storage.stable-video-diffusion.com/([a-f0-9]{32})\.mp4'
                matches = re.findall(pattern, source_string)
                sd_video = [f"https://storage.stable-video-diffusion.com/{match}.mp4" for match in matches]
                if sd_video:
                    print("s_1")
                    return sd_video[0]
                # No URL found: treat as a failure and fall through to except.
                raise ValueError("no video URL found in result page")
        except Exception:
            return None
#print("2")
#client1 = Client(url_hg1)
#result1 = client1.predict(encoded_string, api_name="/resize_image")
#client = Client(url_hg1)
#result = client.predict(result1, 0, True, 1, 15, api_name="/video")
#res = result[0]['video']
#print("s_2")
#return res
#if model == "AnimateDiff":
# client = Client(url_hg2)
# result = client.predict(encoded_string, "zoom-out", api_name="/predict")
# return result
def create_video(prompt, model):
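    """Generate a video from a text prompt.

    For "Stable Video Diffusion": render a still image from the prompt via a
    websocket text-to-image backend (with a fallback backend), upload it to
    stable-video-diffusion.com, and poll for the resulting .mp4 URL.
    For "AnimateDiff": submit the prompt to the sd.cuilutech.com txt2gif API
    and convert the returned GIF to .mp4. Returns a URL or a local file path,
    or None on failure.
    """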
url_sd3 = os.getenv("url_sd3")
url_sd4 = os.getenv("url_sd4")
    if model == "Stable Video Diffusion":
        try:
            # First backend: request a still image over the websocket queue API.
            with closing(create_connection(f"{url_sd3}", timeout=120)) as conn:
                conn.send('{"fn_index":3,"session_hash":""}')
                conn.send(f'{{"data":["{prompt}","[deformed | disfigured], poorly drawn, [bad : wrong] anatomy, [extra | missing | floating | disconnected] limb, (mutated hands and fingers), blurry",7.5,"(No style)"],"event_data":null,"fn_index":3,"session_hash":""}}')
                # Wait (up to 60 s) for the queue to start processing the job.
                c = 0
                while c < 60:
                    status = json.loads(conn.recv())['msg']
                    if status == 'estimation':
                        c += 1
                        time.sleep(1)
                        continue
                    if status == 'process_starts':
                        break
                photo = json.loads(conn.recv())['output']['data'][0][0]
            # Strip the data-URI prefix and decode the base64 image payload.
            base64_string = photo.replace('data:image/jpeg;base64,', '').replace('data:image/png;base64,', '')
            image_bytes = base64.b64decode(base64_string)
#with tempfile.NamedTemporaryFile(delete=False) as temp:
# temp.write(image_bytes)
# temp_file_path = temp.name
# print("cs_1")
        except Exception:
            try:
                print("c_2")
                # Fallback backend: same flow with a different model and parameters.
                with closing(create_connection(f"{url_sd4}", timeout=120)) as conn:
                    conn.send('{"fn_index":0,"session_hash":""}')
                    conn.send(f'{{"data":["{prompt}","[deformed | disfigured], poorly drawn, [bad : wrong] anatomy, [extra | missing | floating | disconnected] limb, (mutated hands and fingers), blurry","dreamshaperXL10_alpha2.safetensors [c8afe2ef]",30,"DPM++ 2M Karras",7,1024,1024,-1],"event_data":null,"fn_index":0,"session_hash":""}}')
                    # Skip four queue status frames; the fifth message carries the image.
                    for _ in range(4):
                        conn.recv()
                    photo = json.loads(conn.recv())['output']['data'][0]
                base64_string = photo.replace('data:image/jpeg;base64,', '').replace('data:image/png;base64,', '')
                image_bytes = base64.b64decode(base64_string)
#with tempfile.NamedTemporaryFile(delete=False) as temp:
# temp.write(image_bytes)
# temp_file_path = temp.name
# print("cs_2")
            except Exception:
                return None
        try:
            # Upload the rendered image; the API responds with a job hash.
            r = requests.post("https://stable-video-diffusion.com/api/upload", files={"file": image_bytes})
            print(r.text)
            hash_ = r.json()['hash']
            time.sleep(10)
            c = 0
            while c < 10:
                r2 = requests.get(f"https://stable-video-diffusion.com/result?hash={hash_}")
                source_string = r2.text
                if "Generation has been in progress for" in source_string:
                    # Still rendering; wait and poll again (up to 10 attempts).
                    time.sleep(15)
                    c += 1
                    continue
                # Finished: extract the generated .mp4 URL from the result page.
                pattern = r'https://storage.stable-video-diffusion.com/([a-f0-9]{32})\.mp4'
                matches = re.findall(pattern, source_string)
                sd_video = [f"https://storage.stable-video-diffusion.com/{match}.mp4" for match in matches]
                if sd_video:
                    print(sd_video[0])
                    return sd_video[0]
                # No URL found: treat as a failure and fall through to except.
                raise ValueError("no video URL found in result page")
        except Exception:
            return None
#client1 = Client("https://emmadrex-stable-video-diffusion.hf.space")
#result1 = client1.predict(encoded_string, api_name="/resize_image")
#client = Client("https://emmadrex-stable-video-diffusion.hf.space")
#result = client.predict(result1, 0, True, 1, 15, api_name="/video")
#return result[0]['video']
if model == "AnimateDiff":
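        # Submit the prompt to the txt2gif endpoint, poll the task until an
        # image URL is available, then convert the resulting GIF to .mp4.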
data = {"prompt": prompt, "negative_prompt": "EasyNegative"}
r = requests.post("https://sd.cuilutech.com/sdapi/async/txt2gif", json=data)
c = 0
while c < 60:
r2 = requests.post("https://sd.cuilutech.com/sdapi/get_task_info", json={'task_id': r.json()['data']['task_id']})
time.sleep(2)
if r2.json()['data']:
photo = r2.json()['data']['image_urls'][0]
break
c += 1
image = base64.b64encode(requests.get(photo).content).decode("utf-8")
with tempfile.NamedTemporaryFile(delete=False) as temp:
temp.write(base64.decodebytes(bytes(image, "utf-8")))
temp_file_path = temp.name
clip = mp.VideoFileClip(temp_file_path)
n_im2 = f"{time.time()}"
temp_file2 = tempfile.NamedTemporaryFile(prefix=f'aaafff{n_im2}', suffix='.mp4', delete=False)
clip.write_videofile(temp_file2.name)
return temp_file2.name
def flip_text1(prompt, motion):
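    """Text-to-video helper: translate Russian prompts to English, map the UI
    camera-motion label to the backend keyword, submit the generation task,
    and poll until the finished .mp4 can be downloaded. Returns the path to a
    temporary .mp4 file, or None on failure.
    """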
    # Translate Russian prompts to English; fall back to a generic prompt
    # if language detection or translation fails.
    try:
        language = detect(prompt)
        if language == 'ru':
            prompt = GoogleTranslator(source='ru', target='en').translate(prompt)
            print(prompt)
    except Exception:
        prompt = 'video'
url_video_g = os.getenv("url_video_g")
url_video_c = os.getenv("url_video_c")
if motion == "Приближение →←":
motion = 'zoom in'
if motion == "Отдаление ←→":
motion = 'zoom out'
if motion == "Вверх ↑":
motion = 'up'
if motion == "Вниз ↓":
motion = 'down'
if motion == "Влево ←":
motion = 'left'
if motion == "Вправо →":
motion = 'right'
if motion == "По часовой стрелке ⟳":
motion = 'rotate cw'
if motion == "Против часовой стрелки ⟲":
motion = 'rotate ccw'
data = {"prompt": f"{prompt}","image": "null", "denoise": 0.75,"motion": motion}
r = requests.post(f"{url_video_g}", json=data)
    # Poll the task status; once the response is the video itself (no JSON
    # 'status' field), stream it to a temporary .mp4 file.
    while True:
        data2 = {"task_id": f"{r.json()['task_id']}"}
        r2 = requests.post(f"{url_video_c}", json=data2)
        time.sleep(3)
        try:
            if r2.json()['status'] in ("QUEUED", "PROCESSING"):
                continue
        except Exception:
            try:
                n_im2 = f"{time.time()}"
                with tempfile.NamedTemporaryFile(prefix=f'aaafff{n_im2}', suffix='.mp4', delete=False) as file:
                    for chunk in r2.iter_content(chunk_size=1024):
                        if chunk:
                            file.write(chunk)
                return file.name
            except Exception as e:
                print(e)
                break
def flip_text2(encoded_string, prompt, motion):
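    """Image-to-video helper: like flip_text1, but also base64-encodes the
    source image at the path `encoded_string` and sends it with the request.
    Returns the path to a temporary .mp4 file, or None on failure.
    """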
url_video_g = os.getenv("url_video_g")
url_video_c = os.getenv("url_video_c")
    # Translate Russian prompts to English; keep the original on failure.
    try:
        language = detect(prompt)
        if language == 'ru':
            prompt = GoogleTranslator(source='ru', target='en').translate(prompt)
            print(prompt)
    except Exception:
        pass
if motion == "Приближение →←":
motion = 'zoom in'
if motion == "Отдаление ←→":
motion = 'zoom out'
if motion == "Вверх ↑":
motion = 'up'
if motion == "Вниз ↓":
motion = 'down'
if motion == "Влево ←":
motion = 'left'
if motion == "Вправо →":
motion = 'right'
if motion == "По часовой стрелке ⟳":
motion = 'rotate cw'
if motion == "Против часовой стрелки ⟲":
motion = 'rotate ccw'
with open(encoded_string, "rb") as image_file:
encoded_string2 = base64.b64encode(image_file.read())
encoded_string2 = str(encoded_string2).replace("b'", '')
data = {"prompt": f"{prompt}","image": f"{encoded_string2}","denoise":0.75,"motion": motion}
r = requests.post(f"{url_video_g}", json=data)
    # Poll the task status; once the response is the video itself (no JSON
    # 'status' field), stream it to a temporary .mp4 file.
    while True:
        data2 = {"task_id": f"{r.json()['task_id']}"}
        r2 = requests.post(f"{url_video_c}", json=data2)
        time.sleep(3)
        try:
            if r2.json()['status'] in ("QUEUED", "PROCESSING"):
                continue
        except Exception:
            try:
                n_im2 = f"{time.time()}"
                with tempfile.NamedTemporaryFile(prefix=f'aaafff{n_im2}', suffix='.mp4', delete=False) as file:
                    for chunk in r2.iter_content(chunk_size=1024):
                        if chunk:
                            file.write(chunk)
                return file.name
            except Exception as e:
                print(e)
                break
css = """
#generate {
width: 100%;
background: #e253dd !important;
border: none;
border-radius: 50px;
outline: none !important;
color: white;
}
#generate:hover {
background: #de6bda !important;
outline: none !important;
color: #fff;
}
footer {visibility: hidden !important;}
"""
with gr.Blocks(css=css) as demo:
with gr.Tab("Сгенерировать видео"):
with gr.Column():
prompt = gr.Textbox(placeholder="Введите описание видео...", show_label=True, label='Описание:', lines=3)
# motion1 = gr.Dropdown(value="Приближение →←", interactive=True, show_label=True, label="Движение камеры:", choices=["Приближение →←", "Отдаление ←→", "Вверх ↑", "Вниз ↓", "Влево ←", "Вправо →", "По часовой стрелке ⟳", "Против часовой стрелки ⟲"])
model = gr.Radio(interactive=True, value="Stable Video Diffusion", show_label=True,
label="Модель нейросети:", choices=['Stable Video Diffusion'])
with gr.Column():
text_button = gr.Button("Сгенерировать видео", variant='primary', elem_id="generate")
with gr.Column():
video_output = gr.Video(show_label=True, label='Результат:', type="file")
text_button.click(create_video, inputs=[prompt, model], outputs=video_output)
with gr.Tab("Анимировать изображение"):
with gr.Column():
prompt2 = gr.Image(show_label=True, interactive=True, type='filepath', label='Исходное изображение:')
# prompt12 = gr.Textbox(placeholder="Введите описание видео...", show_label=True, label='Описание видео (опционально):', lines=3)
# motion2 = gr.Dropdown(value="Приближение →←", interactive=True, show_label=True, label="Движение камеры:", choices=["Приближение →←", "Отдаление ←→", "Вверх ↑", "Вниз ↓", "Влево ←", "Вправо →", "По часовой стрелке ⟳", "Против часовой стрелки ⟲"])
model2 = gr.Radio(interactive=True, value="Stable Video Diffusion", show_label=True,
label="Модель нейросети:", choices=['Stable Video Diffusion'])
with gr.Column():
text_button2 = gr.Button("Анимировать изображение", variant='primary', elem_id="generate")
with gr.Column():
video_output2 = gr.Video(show_label=True, label='Результат:', type="file")
text_button2.click(animate_img, inputs=[prompt2, model2], outputs=video_output2)
demo.queue(concurrency_count=24)
demo.launch()