from fastapi import FastAPI, Query import uvicorn import requests import os from pydub import AudioSegment from uuid import uuid4 app = FastAPI() # ✅ Create directory to store audio files AUDIO_DIR = "audio_files" os.makedirs(AUDIO_DIR, exist_ok=True) # ✅ Replace with your actual TTS API URL TTS_API_URL = "https://hivecorp-s8test.hf.space/tts" # Change this # 🔹 Function: Splitting long text into smaller chunks def split_text(text, max_length=500): """Splits text into chunks of max_length characters""" words = text.split() chunks = [] current_chunk = [] current_length = 0 for word in words: if current_length + len(word) + 1 > max_length: chunks.append(" ".join(current_chunk)) current_chunk = [] current_length = 0 current_chunk.append(word) current_length += len(word) + 1 if current_chunk: chunks.append(" ".join(current_chunk)) return chunks # 🔹 Function: Call external TTS API def call_tts_api(text, voice): """Calls external TTS API and returns the saved MP3 filename.""" try: response = requests.post( TTS_API_URL, json={"text": text, "voice": voice}, headers={"Content-Type": "application/json"} ) data = response.json() # ✅ Log response for debugging print("TTS API Response:", data) if "audio_url" in data: audio_url = data["audio_url"] filename = f"audio_{uuid4().hex}.mp3" audio_path = os.path.join(AUDIO_DIR, filename) # ✅ Download the MP3 file with requests.get(audio_url, stream=True) as r: r.raise_for_status() with open(audio_path, "wb") as f: for chunk in r.iter_content(chunk_size=8192): f.write(chunk) return audio_path # ✅ Return saved file path else: print("Error: Invalid TTS API response", data) return None except Exception as e: print("TTS API Error:", e) return None # 🔹 Function: Merge multiple MP3s into one def merge_audio(audio_files): """Merges multiple MP3 files into one final MP3.""" if not audio_files: return None final_audio = AudioSegment.empty() for file in audio_files: final_audio += AudioSegment.from_mp3(file) final_filename = f"{AUDIO_DIR}/final_{uuid4().hex}.mp3" final_audio.export(final_filename, format="mp3") return final_filename # ✅ API Endpoint: Generate Audio from Text @app.post("/generate-audio") def generate_audio(text: str = Query(...), voice: str = Query(...)): """Splits text, calls TTS API, merges audio, and returns the final MP3.""" chunks = split_text(text) print("Total Chunks:", len(chunks)) audio_files = [] for chunk in chunks: filename = call_tts_api(chunk, voice) if filename: audio_files.append(filename) if not audio_files: return {"error": "TTS API failed for all parts"} final_audio = merge_audio(audio_files) if final_audio: return {"success": True, "audio_url": f"/download/{os.path.basename(final_audio)}"} else: return {"error": "Failed to merge audio files"} # ✅ API Endpoint: Download the generated MP3 @app.get("/download/{filename}") def download_file(filename: str): """Allows users to download the generated MP3 file.""" file_path = os.path.join(AUDIO_DIR, filename) if os.path.exists(file_path): return {"download_url": f"https://hivecorp-manager1.hf.space/audio_files/{filename}"} return {"error": "File not found"} # ✅ Ensure the app starts when running in Hugging Face Spaces if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)