Spaces:
Sleeping
Sleeping
from flask import Flask, request, jsonify, render_template, send_from_directory | |
import requests | |
import time | |
import os | |
import random | |
import json | |
# Application object shared by every view in this module; it is configured
# with the 'static' folder for static assets and the 'templates' folder for
# templates.
app = Flask(
    __name__,
    template_folder='templates',
    static_folder='static',
)
# Image generation API functions
def get_common_headers():
    """Build the HTTP headers attached to outgoing JSON API requests.

    The values describe a desktop Chrome browser issuing an XMLHttpRequest
    with a JSON body.

    Returns:
        dict: A freshly created header mapping on every call, so callers may
        modify the result without affecting later requests.
    """
    browser_user_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/133.0.0.0 Safari/537.36"
    )
    header_pairs = [
        ("Accept", "application/json, text/javascript, */*; q=0.01"),
        ("Content-Type", "application/json; charset=UTF-8"),
        ("User-Agent", browser_user_agent),
        ("X-Requested-With", "XMLHttpRequest"),
    ]
    return dict(header_pairs)
def get_upgraded_prompt(prompt, timeout=30):
    """Ask the remote ``upgrade_prompt`` endpoint for an improved prompt.

    This is a best-effort call: any failure (network error, timeout, HTTP
    error status, unparsable response) is printed and the original prompt is
    returned unchanged, so the function never raises.

    Args:
        prompt: The prompt text to send to the service.
        timeout: Seconds to wait for the service before giving up. Without a
            timeout ``requests`` waits indefinitely, which would stall the
            calling request handler.

    Returns:
        The ``upgraded_prompt`` field of the response if present and truthy,
        otherwise the response's ``prompt`` field, otherwise the ``prompt``
        argument itself.
    """
    url = "https://editor.imagelabs.net/upgrade_prompt"
    payload = {"prompt": prompt}
    headers = get_common_headers()
    try:
        response = requests.post(url, json=payload, headers=headers, timeout=timeout)
        response.raise_for_status()
        data = response.json()
        return data.get("upgraded_prompt") or data.get("prompt") or prompt
    except Exception as e:
        # Deliberately broad: upgrading the prompt is optional, so every
        # failure falls back to the caller's original text.
        print(f"Error upgrading prompt: {e}")
        return prompt
def generate_image(prompt, seed, subseed, attention, width, height, tiling, negative_prompt, reference_image, reference_image_type, reference_strength, timeout=30):
    """Submit a text-to-image job to the remote ``txt2img`` endpoint.

    All generation arguments are forwarded verbatim as the JSON payload under
    keys of the same name.

    Args:
        prompt: Text prompt for the image.
        seed: Value sent as ``seed``.
        subseed: Value sent as ``subseed``.
        attention: Value sent as ``attention``.
        width: Value sent as ``width``.
        height: Value sent as ``height``.
        tiling: Value sent as ``tiling``.
        negative_prompt: Value sent as ``negative_prompt``.
        reference_image: Value sent as ``reference_image``.
        reference_image_type: Value sent as ``reference_image_type``.
        reference_strength: Value sent as ``reference_strength``.
        timeout: Seconds to wait for the service before giving up. Without a
            timeout ``requests`` waits indefinitely.

    Returns:
        The ``task_id`` field of the JSON response, or ``None`` when the
        response does not contain one.

    Raises:
        Exception: Any error raised while sending the request, checking the
            HTTP status or decoding the response is printed and re-raised.
    """
    url = "https://editor.imagelabs.net/txt2img"
    payload = {
        "prompt": prompt,
        "seed": seed,
        "subseed": subseed,
        "attention": attention,
        "width": width,
        "height": height,
        "tiling": tiling,
        "negative_prompt": negative_prompt,
        "reference_image": reference_image,
        "reference_image_type": reference_image_type,
        "reference_strength": reference_strength,
    }
    headers = get_common_headers()
    try:
        response = requests.post(url, json=payload, headers=headers, timeout=timeout)
        response.raise_for_status()
        data = response.json()
        return data.get("task_id")
    except Exception as e:
        print(f"Error generating image: {e}")
        raise
def poll_progress(task_id, max_attempts=15, interval=2, timeout=10):
    """Poll the remote ``progress`` endpoint until a task finishes.

    Args:
        task_id: Identifier sent as ``task_id`` in each poll request.
        max_attempts: Maximum number of poll requests to send.
        interval: Seconds to sleep after each poll that is neither finished
            nor failed.
        timeout: Seconds to wait for each individual poll request. Without a
            timeout ``requests`` waits indefinitely.

    Returns:
        The response's ``final_image_url`` as soon as a poll returns a truthy
        one, or ``None`` when the service reports status ``done``
        (case-insensitive) without providing a URL.

    Raises:
        Exception: If ``max_attempts`` polls complete without a result, or if
            any poll request fails (the error is printed and re-raised).
    """
    url = "https://editor.imagelabs.net/progress"
    payload = {"task_id": task_id}
    headers = get_common_headers()
    for _ in range(max_attempts):
        try:
            response = requests.post(url, json=payload, headers=headers, timeout=timeout)
            response.raise_for_status()
            data = response.json()
            final_image_url = data.get("final_image_url")
            # A missing or null status is treated as an empty string.
            status = (data.get("status") or "").lower()
            if final_image_url:
                return final_image_url
            if status == "done":
                return None
            # Wait before next poll
            time.sleep(interval)
        except Exception as e:
            print(f"Error polling progress: {e}")
            raise
    # With the defaults (15 attempts x 2 s) this reports 30 seconds; the
    # figure counts only the sleeps, not the time spent in the requests.
    raise Exception(f"Polling timed out after {max_attempts * interval} seconds")
# Routes
def index():
    """Render the ``index.html`` template and return the result."""
    # NOTE(review): no @app.route decorator is visible on this view in the
    # reviewed text; confirm how it is registered with the application.
    rendered_page = render_template('index.html')
    return rendered_page
def healthcheck():
    """Return a fixed plain-text message confirming the app is responding."""
    # NOTE(review): no @app.route decorator is visible on this view in the
    # reviewed text; confirm how it is registered with the application.
    status_message = "Flask application is running!"
    return status_message
def upgrade_prompt():
    """Handle a request to improve a prompt.

    Expects a JSON object body with a non-empty ``prompt`` value.

    Returns:
        ``{"upgradedPrompt": ...}`` on success; ``{"error": ...}`` with
        status 400 when the body is missing, is not a JSON object, or has no
        usable ``prompt``; ``{"error": ...}`` with status 500 if the upgrade
        call raises.
    """
    # NOTE(review): no @app.route decorator is visible on this view in the
    # reviewed text; confirm how it is registered with the application.
    # silent=True yields None for a missing or malformed JSON body, so the
    # client receives this handler's JSON error rather than a framework error
    # page.
    data = request.get_json(silent=True)
    # The isinstance check rejects JSON arrays, strings and numbers, which
    # would otherwise raise TypeError on data['prompt'].
    if not isinstance(data, dict) or not data.get('prompt'):
        return jsonify({"error": "Invalid prompt. Please provide a text prompt."}), 400
    try:
        upgraded_prompt = get_upgraded_prompt(data['prompt'])
        return jsonify({"upgradedPrompt": upgraded_prompt})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
def api_generate_image():
    """Handle a request to start an image generation task.

    Expects a JSON object body with a non-empty ``prompt``. Optional keys and
    the defaults used when a key is absent:

    - ``seed``: random integer between 0 and 999999 inclusive
    - ``subseed``: 0
    - ``attention``: 0.5
    - ``width`` / ``height``: 512
    - ``tiling``: False
    - ``negative_prompt``: ""
    - ``reference_image`` / ``reference_image_type``: None
    - ``reference_strength``: 0.5

    Returns:
        ``{"taskId": ..., "status": "processing"}`` on success;
        ``{"error": ...}`` with status 400 for a missing, non-object or
        prompt-less body; ``{"error": ...}`` with status 500 if submitting
        the job raises.
    """
    # NOTE(review): no @app.route decorator is visible on this view in the
    # reviewed text; confirm how it is registered with the application.
    # silent=True yields None for a missing or malformed JSON body, so the
    # client receives this handler's JSON error rather than a framework error
    # page.
    data = request.get_json(silent=True)
    # The isinstance check rejects JSON arrays, strings and numbers, which
    # would otherwise raise TypeError on data['prompt'].
    if not isinstance(data, dict) or not data.get('prompt'):
        return jsonify({"error": "Invalid prompt. Please provide a text prompt."}), 400
    try:
        prompt = data['prompt']
        seed = data.get('seed', random.randint(0, 999999))
        subseed = data.get('subseed', 0)
        attention = data.get('attention', 0.5)
        width = data.get('width', 512)
        height = data.get('height', 512)
        tiling = data.get('tiling', False)
        negative_prompt = data.get('negative_prompt', "")
        reference_image = data.get('reference_image')
        reference_image_type = data.get('reference_image_type')
        reference_strength = data.get('reference_strength', 0.5)
        task_id = generate_image(
            prompt, seed, subseed, attention, width, height, tiling,
            negative_prompt, reference_image, reference_image_type, reference_strength
        )
        return jsonify({"taskId": task_id, "status": "processing"})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
def api_poll_progress():
    """Handle a request for the result of an image generation task.

    Expects a JSON object body with a non-empty ``taskId``. The handler does
    not respond until ``poll_progress`` returns or raises.

    Returns:
        ``{"final_image_url": ..., "status": ...}`` where status is ``"done"``
        when a URL was obtained and ``"processing"`` otherwise;
        ``{"error": ...}`` with status 400 for a missing, non-object or
        taskId-less body; ``{"error": ...}`` with status 500 if polling
        raises.
    """
    # NOTE(review): no @app.route decorator is visible on this view in the
    # reviewed text; confirm how it is registered with the application.
    # silent=True yields None for a missing or malformed JSON body, so the
    # client receives this handler's JSON error rather than a framework error
    # page.
    data = request.get_json(silent=True)
    # The isinstance check rejects JSON arrays, strings and numbers, which
    # would otherwise raise TypeError on data['taskId'].
    if not isinstance(data, dict) or not data.get('taskId'):
        return jsonify({"error": "Invalid task ID. Please provide a valid task ID."}), 400
    try:
        task_id = data['taskId']
        image_url = poll_progress(task_id)
        # NOTE(review): a falsy image_url is reported as "processing" even
        # though poll_progress has already stopped polling by the time it
        # returns; confirm this is what the client expects.
        return jsonify({
            "final_image_url": image_url,
            "status": "done" if image_url else "processing"
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# Helper route to save image history
def save_history():
    """Prepend an image record to the JSON history file.

    Expects a non-empty JSON object body. The record gets an ``id`` (the
    current time in milliseconds, as a string) if it has none, is inserted at
    the front of the list stored in ``<static_folder>/data/history.json``,
    and the list is truncated to its first 20 entries before being written
    back.

    Returns:
        ``{"success": True}`` on success; ``{"error": "Invalid data format"}``
        with status 400 when the body is missing, empty or not a JSON object;
        ``{"error": ...}`` with status 500 on any other failure.
    """
    # NOTE(review): no @app.route decorator is visible on this view in the
    # reviewed text; confirm how it is registered with the application.
    # silent=True yields None for a missing or malformed JSON body instead of
    # aborting the request with a framework error page.
    data = request.get_json(silent=True)
    # Only a JSON object can be given an 'id'; arrays and scalars are
    # rejected up front instead of failing later with a TypeError.
    if not isinstance(data, dict) or not data:
        return jsonify({"error": "Invalid data format"}), 400
    try:
        history_file = os.path.join(app.static_folder, 'data', 'history.json')
        os.makedirs(os.path.dirname(history_file), exist_ok=True)
        # Read existing history
        history = []
        if os.path.exists(history_file):
            with open(history_file, 'r', encoding='utf-8') as f:
                try:
                    history = json.load(f)
                except ValueError:
                    # Covers json.JSONDecodeError and UnicodeDecodeError: an
                    # unreadable file is treated as empty history.
                    history = []
        # A file that parses but is not a list is also treated as empty, so
        # one bad file cannot block every later save.
        if not isinstance(history, list):
            history = []
        # Set an ID if none exists
        if 'id' not in data:
            data['id'] = str(int(time.time() * 1000))
        # Add to the beginning of the array
        history.insert(0, data)
        # Keep only the 20 most recent images
        history = history[:20]
        with open(history_file, 'w', encoding='utf-8') as f:
            json.dump(history, f)
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
def get_history():
    """Return the stored image history as JSON.

    Reads ``<static_folder>/data/history.json`` and responds with
    ``{"images": <file contents>}``. When the file does not exist the
    response is ``{"images": []}``. Any failure, including a file that cannot
    be parsed as JSON, produces ``{"error": ...}`` with status 500.
    """
    # NOTE(review): no @app.route decorator is visible on this view in the
    # reviewed text; confirm how it is registered with the application.
    try:
        history_path = os.path.join(app.static_folder, 'data', 'history.json')
        # Nothing saved yet: report an empty history.
        if not os.path.exists(history_path):
            return jsonify({"images": []})
        with open(history_path, 'r') as stored_file:
            stored_images = json.load(stored_file)
        return jsonify({"images": stored_images})
    except Exception as err:
        return jsonify({"error": str(err)}), 500
if __name__ == '__main__':
    # Create data directory if it doesn't exist
    os.makedirs(os.path.join(app.static_folder, 'data'), exist_ok=True)
    # The server listens on all interfaces, and the interactive debugger lets
    # anyone who can reach it execute code, so debug mode is opt-in: set
    # FLASK_DEBUG=1 (or true/yes/on) for local development only.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=8080, debug=debug_enabled)