Animator / app.py
Opera8's picture
Update app.py
f2841bb verified
import os
import uuid
import requests
import threading
import time
from flask import Flask, request, jsonify, render_template
from werkzeug.utils import secure_filename
from itertools import cycle
from dotenv import load_dotenv
load_dotenv()
workers_env = os.getenv("WORKER_NODES", "")
if not workers_env:
raise ValueError("WORKER_NODES environment variable is not set")
WORKER_URLS = [url.strip() for url in workers_env.split(',') if url.strip()]
worker_pool = cycle(WORKER_URLS)
UPLOADER_API_URL = "https://hamed744-uploadfile.hf.space/upload"
HF_TOKEN = os.getenv("HF_TOKEN")
app = Flask(__name__, template_folder='templates')
jobs = {}
lock = threading.Lock()
def get_next_worker_url():
return next(worker_pool)
def get_permanent_link(job_id, temp_render_url, worker_url):
try:
with lock: jobs[job_id]["status"] = "در حال دائمی‌سازی لینک..."
if not HF_TOKEN: raise Exception("HF_TOKEN not found")
video_full_url = f"{worker_url}{temp_render_url}"
payload = {'url': video_full_url}
headers = {'Authorization': f'Bearer {HF_TOKEN}'}
response = requests.post(UPLOADER_API_URL, json=payload, headers=headers, timeout=600)
response.raise_for_status()
data = response.json()
final_url = data.get("hf_url") or data.get("url")
if not final_url: raise Exception("Invalid response from uploader service")
with lock:
jobs[job_id]["status"] = "completed"
jobs[job_id]["result"] = final_url
except Exception as e:
print(f"Error making link permanent for {job_id}: {e}")
with lock:
jobs[job_id]["status"] = "error"
jobs[job_id]["result"] = f"Error making link permanent. Temp link: {temp_render_url}"
def poll_worker_service(job_id, render_job_id, worker_url):
POLLING_INTERVAL = 15
MAX_POLLING_ERRORS = 20
error_count = 0
while True:
try:
response = requests.post(f"{worker_url}/check_job_status", json={"job_id": render_job_id}, timeout=45)
response.raise_for_status()
error_count = 0
data = response.json()
with lock:
current_job = jobs.get(job_id)
if current_job and current_job.get("status") != data.get("status"):
current_job["status"] = data.get("status")
if data.get("status") == "completed":
get_permanent_link(job_id, data.get("result"), worker_url)
return
elif data.get("status") == "error":
with lock:
current_job = jobs.get(job_id)
if current_job:
current_job["result"] = data.get("result", "Unknown worker error")
return
except requests.exceptions.RequestException as e:
error_count += 1
print(f"Connection error polling worker {worker_url} ({error_count}/{MAX_POLLING_ERRORS}): {e}")
if error_count >= MAX_POLLING_ERRORS:
with lock:
current_job = jobs.get(job_id)
if current_job:
current_job["status"] = "error"
current_job["result"] = f"Connection to worker ({worker_url}) lost."
return
time.sleep(POLLING_INTERVAL)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/submit_job', methods=['POST'])
def submit_job():
if 'image_file' not in request.files or 'video_file' not in request.files:
return jsonify({"error": "Image and video files are required"}), 400
image_file = request.files['image_file']
video_file = request.files['video_file']
motion = request.form.get('motion')
style = request.form.get('style')
image_bytes = image_file.read()
video_bytes = video_file.read()
MAX_RETRIES = 3
for attempt in range(MAX_RETRIES):
selected_worker_url = get_next_worker_url()
print(f"Attempt {attempt + 1}/{MAX_RETRIES}: Sending to worker: {selected_worker_url}")
try:
response = requests.post(
f"{selected_worker_url}/submit_new_job",
files={'image_file': (secure_filename(image_file.filename), image_bytes, image_file.mimetype),
'video_file': (secure_filename(video_file.filename), video_bytes, video_file.mimetype)},
data={'motion': motion, 'style': style},
timeout=600
)
response.raise_for_status()
render_data = response.json()
render_job_id = render_data.get("job_id")
if not render_job_id: raise Exception("Invalid response from worker")
internal_job_id = str(uuid.uuid4())
with lock: jobs[internal_job_id] = {"status": "Sending to worker...", "result": None}
thread = threading.Thread(target=poll_worker_service, args=(internal_job_id, render_job_id, selected_worker_url))
thread.start()
return jsonify({"job_id": internal_job_id})
except Exception as e:
print(f"Error on attempt {attempt + 1} for worker {selected_worker_url}: {e}")
time.sleep(2)
final_error_message = ("<strong>Servers are busy!</strong><br>"
"Please try again in a few minutes.")
return jsonify({"error": final_error_message}), 500
@app.route('/api/check_status', methods=['POST'])
def check_status():
data = request.get_json()
job_id = data.get('job_id')
if not job_id: return jsonify({"error": "Job ID is required"}), 400
with lock: job = jobs.get(job_id, {"status": "not_found", "result": None})
return jsonify({"status": job["status"], "result": job["result"]})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))