| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
|
|
import os
import threading
import time
import uuid
from functools import wraps
from queue import Queue

from flask import Flask, request

from services.webhook import send_webhook
from version import BUILD_NUMBER
from app_utils import log_job_status, discover_and_register_blueprints
|
|
# Upper bound on the number of queued jobs, read from the environment.
# 0 (the default) or any non-positive value means "no limit". An empty or
# whitespace-only value is treated as unset instead of crashing at import.
MAX_QUEUE_LENGTH = int(os.environ.get('MAX_QUEUE_LENGTH', '').strip() or 0)
|
|
def create_app():
    """Build and configure the Flask application.

    Sets up the root route, one in-process job queue drained by a single
    daemon worker thread, the ``queue_task`` decorator (exposed as
    ``app.queue_task``), and then registers the project's routes.

    Returns:
        Flask: the configured application.
    """
    app = Flask(__name__)

    @app.route("/")
    def home():
        """Return a plain-text message for the root URL."""
        return "API server is running. Please use the specific endpoints to interact."

    task_queue = Queue()
    # Reported as "queue_id" in every status record and response body.
    queue_id = id(task_queue)

    def record_status(job_id, status, pid, response):
        """Write one job-status record via log_job_status."""
        log_job_status(job_id, {
            "job_status": status,
            "job_id": job_id,
            "queue_id": queue_id,
            "process_id": pid,
            "response": response,
        })

    def process_queue():
        """Worker loop: take jobs off the queue and run them one at a time.

        Each queue item is ``(job_id, data, task_func, queue_start_time)``.
        ``task_func`` is expected to return a 3-item sequence indexed as
        ``(payload, endpoint, status_code)``. The result is logged and, when
        ``data`` carries a non-empty ``webhook_url``, sent to that URL.
        """
        while True:
            job_id, data, task_func, queue_start_time = task_queue.get()
            try:
                queue_time = time.time() - queue_start_time
                run_start_time = time.time()
                pid = os.getpid()

                record_status(job_id, "running", pid, None)

                try:
                    response = task_func()
                except Exception as exc:
                    # A failing task must not kill the only worker thread.
                    # Report it through the normal path as a 500 result so
                    # the status record and webhook still go out.
                    app.logger.exception("Queued job %s raised an exception", job_id)
                    response = (str(exc), None, 500)

                run_time = time.time() - run_start_time
                total_time = time.time() - queue_start_time
                succeeded = response[2] == 200

                response_data = {
                    "endpoint": response[1],
                    "code": response[2],
                    "id": data.get("id"),
                    "job_id": job_id,
                    "response": response[0] if succeeded else None,
                    "message": "success" if succeeded else response[0],
                    "pid": pid,
                    "queue_id": queue_id,
                    "run_time": round(run_time, 3),
                    "queue_time": round(queue_time, 3),
                    "total_time": round(total_time, 3),
                    "queue_length": task_queue.qsize(),
                    "build_number": BUILD_NUMBER,
                }

                record_status(job_id, "done", pid, response_data)

                webhook_url = data.get("webhook_url")
                if webhook_url:
                    send_webhook(webhook_url, response_data)
            except Exception:
                # Status logging, webhook delivery or a malformed task result
                # failed; log it and keep the worker alive for the next job.
                app.logger.exception("Failed to finalize queued job %s", job_id)
            finally:
                # Always balance the get() above, even when the job failed.
                task_queue.task_done()

    threading.Thread(target=process_queue, daemon=True).start()

    def queue_task(bypass_queue=False):
        """Return a decorator that runs a view inline or defers it to the queue.

        The decorated function is called as ``f(*args, job_id=..., data=...,
        **kwargs)`` and is expected to return a 3-item sequence indexed as
        ``(payload, endpoint, status_code)``.

        Args:
            bypass_queue: when true, always run the function inside the
                request. Otherwise it is queued only if the JSON body
                contains a ``webhook_url`` key.

        Returns:
            A decorator whose wrapper returns ``(body_dict, status_code)``:
            the function's own status when run inline, 202 when queued, or
            429 when ``MAX_QUEUE_LENGTH`` is positive and already reached.
        """
        def decorator(f):
            @wraps(f)  # keep f's name/docstring instead of a generic "wrapper"
            def wrapper(*args, **kwargs):
                job_id = str(uuid.uuid4())
                data = request.json if request.is_json else {}
                pid = os.getpid()
                start_time = time.time()

                if bypass_queue or 'webhook_url' not in data:
                    # Run inside the request and return the result directly.
                    record_status(job_id, "running", pid, None)

                    response = f(*args, job_id=job_id, data=data, **kwargs)
                    run_time = time.time() - start_time
                    succeeded = response[2] == 200

                    response_obj = {
                        "code": response[2],
                        "id": data.get("id"),
                        "job_id": job_id,
                        "response": response[0] if succeeded else None,
                        "message": "success" if succeeded else response[0],
                        "run_time": round(run_time, 3),
                        "queue_time": 0,
                        "total_time": round(run_time, 3),
                        "pid": pid,
                        "queue_id": queue_id,
                        "queue_length": task_queue.qsize(),
                        "build_number": BUILD_NUMBER,
                    }

                    record_status(job_id, "done", pid, response_obj)

                    return response_obj, response[2]

                if MAX_QUEUE_LENGTH > 0 and task_queue.qsize() >= MAX_QUEUE_LENGTH:
                    # Queue is full: reject without enqueueing.
                    error_response = {
                        "code": 429,
                        "id": data.get("id"),
                        "job_id": job_id,
                        "message": f"MAX_QUEUE_LENGTH ({MAX_QUEUE_LENGTH}) reached",
                        "pid": pid,
                        "queue_id": queue_id,
                        "queue_length": task_queue.qsize(),
                        "build_number": BUILD_NUMBER,
                    }

                    record_status(job_id, "done", pid, error_response)

                    return error_response, 429

                record_status(job_id, "queued", pid, None)

                # The worker calls this later; start_time lets it compute
                # how long the job waited in the queue.
                task_queue.put((
                    job_id,
                    data,
                    lambda: f(*args, job_id=job_id, data=data, **kwargs),
                    start_time,
                ))

                return {
                    "code": 202,
                    "id": data.get("id"),
                    "job_id": job_id,
                    "message": "processing",
                    "pid": pid,
                    "queue_id": queue_id,
                    "max_queue_length": MAX_QUEUE_LENGTH if MAX_QUEUE_LENGTH > 0 else "unlimited",
                    "queue_length": task_queue.qsize(),
                    "build_number": BUILD_NUMBER,
                }, 202
            return wrapper
        return decorator

    # Expose the decorator on the app object for code that only has the app.
    app.queue_task = queue_task

    # Imported here, after app.queue_task is set, as in the original ordering.
    from routes.v1.media.feedback import create_root_next_routes
    create_root_next_routes(app)

    discover_and_register_blueprints(app)

    return app
|
|
# Module-level application instance, created at import time.
app = create_app()


if __name__ == '__main__':
    # Flask starts its built-in server with run(host=..., port=...).
    # launch(server_name=..., server_port=...) is not a Flask method and
    # raised AttributeError when the script was executed directly.
    app.run(host="0.0.0.0", port=7860)
|
|