Spaces:
Sleeping
Sleeping
from flask import Flask, request, send_file, jsonify, render_template | |
import os | |
import requests | |
from pydub import AudioSegment | |
import io | |
import uuid | |
import tempfile | |
from flask_cors import CORS | |
import sqlite3 | |
from datetime import datetime | |
app = Flask(__name__, static_folder='static') | |
CORS(app) | |
# Configuration | |
class Config: | |
ELEVENLABS_API_KEY = os.getenv('ELEVENLABS_API_KEY', "<YOUR-API-KEY>") | |
VOICE_ID = "21m00Tcm4TlvDq8ikWAM" # Default voice (Rachel) | |
CHUNK_SIZE = 2000 # Characters per chunk | |
TEMP_DIR = "temp_audio" | |
DB_PATH = "data/audio_history.db" | |
def create_temp_dir(): | |
if not os.path.exists(Config.TEMP_DIR): | |
os.makedirs(Config.TEMP_DIR) | |
def init_db(): | |
conn = sqlite3.connect(Config.DB_PATH) | |
c = conn.cursor() | |
c.execute(''' | |
CREATE TABLE IF NOT EXISTS audio_generations | |
(id TEXT PRIMARY KEY, | |
timestamp DATETIME, | |
text TEXT, | |
file_path TEXT) | |
''') | |
conn.commit() | |
conn.close() | |
# Database operations | |
class DatabaseManager: | |
def add_generation(generation_id, text, file_path): | |
conn = sqlite3.connect(Config.DB_PATH) | |
c = conn.cursor() | |
c.execute( | |
"INSERT INTO audio_generations VALUES (?, ?, ?, ?)", | |
(generation_id, datetime.now(), text, file_path) | |
) | |
conn.commit() | |
conn.close() | |
def get_generations(): | |
conn = sqlite3.connect(Config.DB_PATH) | |
c = conn.cursor() | |
c.execute("SELECT * FROM audio_generations ORDER BY timestamp DESC") | |
generations = c.fetchall() | |
conn.close() | |
return generations | |
# Text processor class | |
class TextProcessor: | |
def split_text(text, chunk_size): | |
words = text.split() | |
chunks = [] | |
current_chunk = [] | |
current_size = 0 | |
for word in words: | |
if current_size + len(word) > chunk_size: | |
chunks.append(" ".join(current_chunk)) | |
current_chunk = [word] | |
current_size = len(word) | |
else: | |
current_chunk.append(word) | |
current_size += len(word) + 1 | |
if current_chunk: | |
chunks.append(" ".join(current_chunk)) | |
return chunks | |
# Audio Generator class | |
class AudioGenerator: | |
def __init__(self): | |
self.api_key = Config.ELEVENLABS_API_KEY | |
self.voice_id = Config.VOICE_ID | |
def generate_audio_chunk(self, text, previous_text=None, next_text=None, previous_request_ids=None): | |
url = f"https://api.elevenlabs.io/v1/text-to-speech/{self.voice_id}/stream" | |
headers = { | |
"xi-api-key": self.api_key | |
} | |
payload = { | |
"text": text, | |
"model_id": "eleven_multilingual_v2" | |
} | |
if previous_text is not None: | |
payload["previous_text"] = previous_text | |
if next_text is not None: | |
payload["next_text"] = next_text | |
if previous_request_ids: | |
payload["previous_request_ids"] = previous_request_ids[-3:] | |
response = requests.post(url, json=payload, headers=headers) | |
if response.status_code != 200: | |
raise Exception(f"API Error: {response.status_code} - {response.text}") | |
return response.content, response.headers.get("request-id") | |
# Routes | |
def index(): | |
return render_template('index.html') | |
def generate_audio(): | |
try: | |
data = request.get_json() | |
if not data or 'text' not in data: | |
return jsonify({'error': 'No text provided'}), 400 | |
text = data['text'] | |
context_before = data.get('context_before', '') | |
context_after = data.get('context_after', '') | |
processor = TextProcessor() | |
generator = AudioGenerator() | |
chunks = processor.split_text(text, Config.CHUNK_SIZE) | |
segments = [] | |
previous_request_ids = [] | |
for i, chunk in enumerate(chunks): | |
is_first = i == 0 | |
is_last = i == len(chunks) - 1 | |
prev_text = None if is_first else (context_before + " " + " ".join(chunks[:i])).strip() | |
next_text = None if is_last else (" ".join(chunks[i+1:]) + " " + context_after).strip() | |
audio_content, request_id = generator.generate_audio_chunk( | |
chunk, | |
previous_text=prev_text, | |
next_text=next_text, | |
previous_request_ids=previous_request_ids | |
) | |
segments.append(AudioSegment.from_mp3(io.BytesIO(audio_content))) | |
previous_request_ids.append(request_id) | |
final_audio = segments[0] | |
for segment in segments[1:]: | |
final_audio += segment | |
generation_id = str(uuid.uuid4()) | |
output_path = os.path.join(Config.TEMP_DIR, f"{generation_id}.wav") | |
final_audio.export(output_path, format="wav") | |
# Save to database | |
DatabaseManager.add_generation(generation_id, text, output_path) | |
return send_file( | |
output_path, | |
mimetype="audio/wav", | |
as_attachment=True, | |
download_name="generated_audio.wav" | |
) | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
def get_voices(): | |
try: | |
response = requests.get( | |
"https://api.elevenlabs.io/v1/voices", | |
headers={"xi-api-key": Config.ELEVENLABS_API_KEY} | |
) | |
return jsonify(response.json()) | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
def get_history(): | |
try: | |
generations = DatabaseManager.get_generations() | |
return jsonify([{ | |
'id': g[0], | |
'timestamp': g[1], | |
'text': g[2], | |
'file_path': g[3] | |
} for g in generations]) | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
# Error handlers | |
def not_found(error): | |
return jsonify({'error': 'Not found'}), 404 | |
def server_error(error): | |
return jsonify({'error': 'Server error'}), 500 | |
if __name__ == '__main__': | |
# Initialize directories and database | |
Config.create_temp_dir() | |
Config.init_db() | |
# Run the Flask app | |
port = int(os.getenv('PORT', 7860)) | |
app.run(host='0.0.0.0', port=port) |