Spaces:
Sleeping
Sleeping
File size: 3,364 Bytes
97b9880 d29e442 97b9880 997417f 97b9880 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
import gradio as gr
import numpy as np
import random
import multiprocessing
import subprocess
import sys
import time
import signal
import json
import os
import requests
from loguru import logger
from decouple import config
URL = config('URL')
OUTPUT_DIR = config('OUTPUT_DIR')
COMF_PATH = config('COMF_PATH')
def get_latest_image(folder):
files = os.listdir(folder)
image_files = [f for f in files if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
image_files.sort(key=lambda x: os.path.getmtime(os.path.join(folder, x)))
latest_image = os.path.join(folder, image_files[-1]) if image_files else None
return latest_image
def start_queue(prompt_workflow):
p = {"prompt": prompt_workflow}
data = json.dumps(p).encode('utf-8')
requests.post(URL, data=data)
def generate_image(prompt_text):
with open("workflow_api.json", "r", encoding="utf-8") as file_json:
prompt = json.load(file_json)
prompt["6"]["inputs"]["text"] = f"digital artwork of a {prompt_text}"
previous_image = get_latest_image(OUTPUT_DIR)
start_queue(prompt)
while True:
latest_image = get_latest_image(OUTPUT_DIR)
if latest_image != previous_image:
return latest_image
time.sleep(1)
def run_main_script(exit_event):
process = subprocess.Popen([sys.executable, COMF_PATH, "--cpu"])
logger.info(f"Скрипт main.py запущен в отдельном процессе с PID: {process.pid}")
while not exit_event.is_set():
if process.poll() is not None:
logger.info("Процесс main.py завершился самостоятельно")
break
time.sleep(0.1)
if process.poll() is None:
logger.info("Завершаем процесс main.py")
process.terminate()
process.wait(timeout=5)
if process.poll() is None:
logger.info("Процесс main.py не завершился, применяем SIGKILL")
process.kill()
def signal_handler(signum, frame):
logger.info(f"Получен сигнал {signum}, завершаем работу")
exit_event.set()
if __name__ == "__main__":
exit_event = multiprocessing.Event()
# Устанавливаем обработчик сигналов
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Создаем и запускаем процесс
process = multiprocessing.Process(target=run_main_script, args=(exit_event,))
process.start()
try:
demo = gr.Interface(fn=generate_image, inputs=["text"], outputs=["image"])
demo.launch(share=True)
finally:
logger.info("Ожидаем завершения дочернего процесса")
exit_event.set()
process.join(timeout=10)
if process.is_alive():
logger.info("Дочерний процесс не завершился, применяем terminate()")
process.terminate()
process.join(timeout=5)
if process.is_alive():
logger.info("Дочерний процесс все еще жив, применяем kill()")
process.kill()
process.join()
logger.info("Основной скрипт завершил работу.")
|