| from flask import Flask, request, jsonify, render_template |
| from datetime import datetime |
| from flask_cors import CORS |
| from TTS.api import TTS |
| import os |
| import base64 |
| import logging |
| import threading |
| import tempfile |
| import shutil |
| import textwrap |
| import torch |
| from pydub import AudioSegment |
| import psutil |
| import warnings |
|
|
| from helper import ( |
| save_audio, |
| generate_random_filename, |
| save_to_dataset_repo, |
| video_to_audio, |
| validate_audio_file, |
| ensure_wav_format, |
| ) |
|
|
| |
# --- Logging ---------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("app")

# Quieten the transformers package: ignore its UserWarnings and only let
# ERROR-level (or worse) records through its logger.
warnings.filterwarnings("ignore", category=UserWarning, module="transformers")
logging.getLogger("transformers").setLevel(logging.ERROR)

# --- Flask application -----------------------------------------------------
app = Flask(__name__)
CORS(app)

# Must be set before the TTS model is constructed below.
# NOTE(review): presumably accepts the Coqui terms of service
# non-interactively -- confirm against the TTS package documentation.
os.environ["COQUI_TOS_AGREED"] = "1"

# --- Configuration ---------------------------------------------------------
device: str = "cpu"
MODEL_NAME: str = "tts_models/multilingual/multi-dataset/xtts_v2"
MAX_AUDIO_SIZE_MB: int = 15  # limit handed to validate_audio_file()
MAX_TEXT_LEN: int = 150  # wrap width used when clone() chunks the text

# --- Model -----------------------------------------------------------------
# The model is loaded once at import time; a failure is logged with its
# traceback and re-raised so the application does not start without it.
tts = None
try:
    log.info(f"⬇️ Initializing XTTS from {MODEL_NAME}...")
    tts = TTS(model_name=MODEL_NAME).to(device)
except Exception as exc:
    log.exception("Fatal: TTS init failed: %s", exc)
    raise
else:
    log.info("✅ TTS ready (direct init).")

# --- Task registry ---------------------------------------------------------
# In-memory map of task_id -> task record (status, timestamps, result/error).
active_tasks: dict = {}
|
|
|
|
@app.route("/")
def greet_html():
    """Serve the site root by rendering the ``home.html`` template."""
    landing_page = render_template("home.html")
    return landing_page
|
|
|
|
@app.route("/sign-in")
def sign_in():
    """Serve ``/sign-in`` by rendering the ``sign_in.html`` template."""
    sign_in_page = render_template("sign_in.html")
    return sign_in_page
|
|
|
|
@app.route("/user_dash")
def user_dash():
    """Render the dashboard for the user given in the ``user_id`` query arg.

    Returns:
        The rendered ``u_dash.html`` template (with ``user_id`` passed to
        it), or a JSON error with status 400 when ``user_id`` is missing or
        empty.
    """
    uid = request.args.get("user_id")
    # Guard clause: reject the request before touching the template.
    if not uid:
        return jsonify({"error": "Missing user_id"}), 400
    return render_template("u_dash.html", user_id=uid)
|
|
|
|
@app.route("/generate_voice", methods=["POST"])
def generate_voice():
    """Validate a voice-generation request and start it in the background.

    Expects a JSON object with the keys ``user_id``, ``text`` and
    ``task_id`` (all required) plus the optional ``audio`` and ``video``
    values, which are forwarded unchanged to ``process_vox``.

    The task is registered in ``active_tasks`` with status ``"Processing"``
    and the actual work runs on a daemon thread, so the 202 response is sent
    as soon as processing has *started*; progress is read from
    ``/task_status``.

    Returns:
        202 with ``{"message", "task_id"}`` when the task was started;
        400 for a missing/invalid JSON body, missing ``text`` or ``task_id``;
        401 when ``user_id`` is missing;
        409 when ``task_id`` is already registered;
        500 with the error text for any unexpected failure.
    """
    try:
        # silent=True returns None for a missing or malformed JSON body
        # instead of raising an HTTP error that the blanket handler below
        # would report as a 500.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({"error": "No JSON body"}), 400
        if not isinstance(data, dict):
            # A JSON array/string/number has no .get(); reject it cleanly.
            return jsonify({"error": "JSON body must be an object"}), 400

        video = data.get("video")
        text = data.get("text")
        audio_base64 = data.get("audio")
        task_id = data.get("task_id")
        user_id = data.get("user_id")

        if not user_id:
            return jsonify({"error": "You must sign in before using this AI"}), 401
        if not text:
            return jsonify({"error": "Please input a prompt"}), 400
        if not task_id:
            return jsonify({"error": "task_id is required"}), 400
        if task_id in active_tasks:
            return jsonify({"error": f"There is already an active task for {task_id}"}), 409

        active_tasks[task_id] = {
            "user_id": user_id,
            "status": "Processing",
            "created_at": datetime.now(),
        }

        # Run the synthesis off the request thread: calling process_vox()
        # inline would hold the HTTP request open until the audio is done,
        # contradicting the 202 "Processing started" contract.
        try:
            worker = threading.Thread(
                target=process_vox,
                args=(user_id, text, video, audio_base64, task_id),
                name=f"vox-{task_id}",
                daemon=True,
            )
            worker.start()
        except Exception:
            # The worker never ran, so nothing will ever finish this task;
            # drop the registration to keep the task_id reusable.
            active_tasks.pop(task_id, None)
            raise

        return jsonify({"message": "Processing started", "task_id": task_id}), 202

    except Exception as e:
        log.exception("generate_voice error: %s", e)
        return jsonify({"error": str(e)}), 500
|
|
|
|
def process_vox(user_id, text, video, audio_base64, task_id):
    """Run one voice-cloning task end to end and record its outcome.

    Steps: check available RAM, materialise the reference audio (decoded
    from ``audio_base64`` or obtained via ``video_to_audio``), pass it
    through ``ensure_wav_format`` and ``validate_audio_file``, synthesise
    with ``clone``, copy the result into ``user_audios/``, upload it with
    ``save_to_dataset_repo``, store the record with ``save_audio`` and
    finally update ``active_tasks[task_id]``.

    Args:
        user_id: Forwarded to ``save_audio``.
        text: Text to synthesise; its first 20 characters become the title.
        video: Reference video handed to ``video_to_audio`` when no audio
            is supplied.
        audio_base64: Base64 reference audio, optionally prefixed with a
            ``data:audio/...,`` URI header.
        task_id: Key of the task record in ``active_tasks``.

    This function does not raise: every failure is logged and written to the
    task record as status ``"failed"`` with the error text. Finished tasks
    are scheduled for removal (300 s after success, 60 s after failure).
    """
    # Every temporary file this call may own; all are removed in `finally`.
    temp_paths = []
    try:
        ram_gb = psutil.virtual_memory().available / (1024 ** 3)
        log.info(f"Available RAM: {ram_gb:.1f} GB")
        if ram_gb < 1.5:
            raise RuntimeError("Low RAM: Please try a shorter text or later.")

        # --- Reference audio ------------------------------------------------
        if audio_base64:
            payload = audio_base64
            if payload.startswith("data:audio/"):
                _, comma, payload = payload.partition(",")
                if not comma:
                    raise ValueError("Malformed audio data URI: missing ',' separator")
            # mkstemp picks a unique name itself; the client-supplied
            # task_id must not be interpolated into a filesystem path.
            fd, ref_path = tempfile.mkstemp(prefix="temp_ref_", suffix=".wav")
            temp_paths.append(ref_path)
            with os.fdopen(fd, "wb") as ref_file:
                ref_file.write(base64.b64decode(payload))
        elif video:
            ref_path = video_to_audio(video, output_path=None)
            temp_paths.append(ref_path)
        else:
            # Fail with a clear message instead of passing None downstream.
            raise ValueError("No reference audio or video was provided")

        wav_path = ensure_wav_format(ref_path)
        if wav_path != ref_path:
            # The helper produced a new file; clean up both.
            temp_paths.append(wav_path)

        valid, msg = validate_audio_file(wav_path, MAX_AUDIO_SIZE_MB)
        if not valid:
            raise ValueError(f"Invalid audio file: {msg}")

        # --- Synthesis ------------------------------------------------------
        output_path = clone(text, wav_path)
        temp_paths.append(output_path)

        # --- Local copy -----------------------------------------------------
        out_dir = "user_audios"
        os.makedirs(out_dir, exist_ok=True)
        file_name = generate_random_filename("mp3")
        file_path = os.path.join(out_dir, file_name)
        # NOTE(review): the bytes copied here are clone()'s WAV output even
        # though the name is requested with "mp3"; the wave module below
        # relies on the content being WAV. Confirm whether consumers expect
        # real MP3 data before changing either side.
        shutil.copyfile(output_path, file_path)

        import wave  # local import: only needed to read the duration

        with wave.open(file_path, "rb") as wf:
            dura = wf.getnframes() / float(wf.getframerate())
        duration = f"{dura:.2f}"
        title = text[:20]

        # --- Publish --------------------------------------------------------
        audio_url = save_to_dataset_repo(file_path, f"user/data/audios/{file_name}", file_name)
        # Persist first, then flip the status: a poller must never observe
        # "completed" for a task whose record failed to save.
        save_audio(user_id, audio_url, title or "Audio", text, duration)
        active_tasks.setdefault(task_id, {}).update(
            {
                "status": "completed",
                "audio_url": audio_url,
                "completion_time": datetime.now(),
            }
        )

    except Exception as e:
        log.exception("process_vox failed: %s", e)
        # Update in place so created_at / user_id survive on failed tasks.
        active_tasks.setdefault(task_id, {}).update(
            {
                "status": "failed",
                "error": str(e),
                "completion_time": datetime.now(),
            }
        )

    finally:
        for path in temp_paths:
            if path and os.path.exists(path):
                try:
                    os.remove(path)
                except OSError as cleanup_err:
                    # Best effort: a leftover temp file must not fail the task.
                    log.warning("Could not remove temp file %s: %s", path, cleanup_err)

        task = active_tasks.get(task_id)
        status = task.get("status") if task else None
        if status == "completed":
            remove_task_after_delay(task_id, delay_seconds=300)
        elif status == "failed":
            # Failed tasks are kept only briefly so the id can be retried.
            remove_task_after_delay(task_id, delay_seconds=60)
|
|
|
|
def _detect_language(text):
    """Pick the language code passed to the TTS model from *text*'s characters.

    Returns ``"hi"`` if any character is in U+0900..U+097F, ``"de"`` if a
    German umlaut or eszett (either case) is present, otherwise ``"en"``.
    """
    if any("\u0900" <= ch <= "\u097f" for ch in text):
        return "hi"
    if any(ch in "äöüßÄÖÜẞ" for ch in text):
        return "de"
    return "en"


def clone(text, audio):
    """
    Generate cloned audio; chunk long text to avoid OOM.

    The text is wrapped into pieces of at most ``MAX_TEXT_LEN`` characters
    (words are never broken), each piece is synthesised to its own WAV file
    with ``audio`` as the speaker reference, and the pieces are concatenated.

    Args:
        text: Text to synthesise.
        audio: Path of the reference speaker WAV.

    Returns:
        Path to the (possibly concatenated) output WAV. The caller owns the
        file and is responsible for deleting it.

    Raises:
        ValueError: If ``text`` contains nothing but whitespace.
        Exception: Whatever the TTS model or the audio library raises.
    """
    lang = _detect_language(text)
    log.info(f"Cloning with lang: {lang}, text len: {len(text)}")

    wrapped = textwrap.wrap(text, width=MAX_TEXT_LEN, break_long_words=False)
    # A text that fits in one line is passed through as-is.
    pieces = wrapped if len(wrapped) > 1 else [text]
    chunks = [piece.strip() for piece in pieces if piece.strip()]
    if not chunks:
        # Checked up front so no temporary file is created for empty input.
        raise ValueError("No chunks generated—check text input.")
    log.info(f"Split into {len(chunks)} chunks")

    chunk_files = []
    out_path = None
    try:
        for i, chunk in enumerate(chunks):
            # mkstemp creates the file atomically; the deprecated mktemp only
            # returns a name that another process could claim first.
            fd, chunk_out = tempfile.mkstemp(suffix=f"_chunk{i}.wav")
            os.close(fd)
            chunk_files.append(chunk_out)
            with torch.no_grad():
                tts.tts_to_file(
                    text=chunk,
                    speaker_wav=audio,
                    language=lang,
                    file_path=chunk_out,
                    split_sentences=True,
                )

        combined = AudioSegment.empty()
        for chunk_path in chunk_files:
            combined += AudioSegment.from_wav(chunk_path)

        fd, out_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)
        # export() returns the file object it wrote to, still open; close it
        # so the handle is not leaked.
        combined.export(out_path, format="wav").close()
    except Exception:
        # Do not leave a half-written result behind on failure.
        if out_path and os.path.exists(out_path):
            try:
                os.remove(out_path)
            except OSError as cleanup_err:
                log.warning("Could not remove %s: %s", out_path, cleanup_err)
        raise
    finally:
        # Chunk files are intermediates: remove them on success and failure.
        for chunk_path in chunk_files:
            try:
                os.remove(chunk_path)
            except OSError as cleanup_err:
                log.warning("Could not remove chunk file %s: %s", chunk_path, cleanup_err)
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    log.info("Clone complete.")
    return out_path
|
|
|
|
@app.route("/task_status")
def task_status():
    """Report the state of the task named by the ``task_id`` query argument.

    Returns:
        400 when ``task_id`` is missing; 404 with ``{"status": "not found"}``
        when no such task is registered; otherwise 200 with ``status`` and
        ``start_time`` (ISO 8601 or ``None``), plus ``audio_url`` and
        ``completion_time`` for completed tasks or ``error`` and
        ``completion_time`` for failed ones.
    """
    task_id = request.args.get("task_id")
    if not task_id:
        return jsonify({"error": "task_id parameter is required"}), 400

    # One lookup instead of "in" followed by indexing: cleanup timers delete
    # entries from other threads, and a deletion between the two steps would
    # surface as a KeyError (HTTP 500) instead of a 404.
    task = active_tasks.get(task_id)
    if task is None:
        return jsonify({"status": "not found"}), 404

    def iso_or_none(key):
        """Return ``task[key]`` as an ISO 8601 string, or None when unset."""
        moment = task.get(key)
        return moment.isoformat() if moment else None

    status = task["status"]
    response_data = {
        "status": status,
        "start_time": iso_or_none("created_at"),
    }

    if status == "completed":
        response_data["audio_url"] = task.get("audio_url")
        response_data["completion_time"] = iso_or_none("completion_time")
    elif status == "failed":
        response_data["error"] = task.get("error")
        response_data["completion_time"] = iso_or_none("completion_time")

    return jsonify(response_data)
|
|
|
|
def remove_task_after_delay(task_id, delay_seconds=300):
    """Schedule removal of ``task_id`` from ``active_tasks``.

    Args:
        task_id: Key to remove from ``active_tasks``.
        delay_seconds: Seconds to wait before removing it (default 300).

    The removal runs on a daemon ``threading.Timer`` so that pending
    cleanups never keep the interpreter alive at shutdown. Removing a task
    that is already gone is a silent no-op.
    """
    def remove_task():
        # pop() with a default removes the entry in one step, so there is no
        # window between a membership check and the deletion.
        if active_tasks.pop(task_id, None) is not None:
            log.info(f"Task {task_id} auto-deleted after {delay_seconds} seconds.")

    timer = threading.Timer(delay_seconds, remove_task)
    timer.daemon = True
    timer.start()
|
|
|
|
if __name__ == "__main__":
    # Debug mode turns on the Werkzeug interactive debugger, which lets
    # anyone who can reach the port execute code, and the auto-reloader,
    # which re-imports this module in a child process and so loads the TTS
    # model a second time. Because the server binds to 0.0.0.0, debug is
    # off unless explicitly requested with FLASK_DEBUG=1/true/yes/on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(debug=debug_enabled, host="0.0.0.0", port=7860)