Spaces:
Paused
Paused
| # secure_tts_app_huggingface.py | |
| # -*- coding: utf-8 -*- | |
| import sys, os, re, time, random, json, base64, hashlib | |
| import requests | |
| import warnings | |
| from typing import List, Dict, Optional, Tuple | |
| import concurrent.futures | |
| import urllib3 | |
| import gradio as gr | |
| import tempfile | |
| import shutil | |
| from pathlib import Path | |
| import asyncio | |
| import aiohttp | |
| from dataclasses import dataclass | |
| import threading | |
| import queue | |
| import logging | |
| warnings.filterwarnings("ignore") | |
| urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
| # ==================== CẤU HÌNH LOGGING ==================== | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # ==================== HÀM ẨN THÔNG TIN MÁY TÍNH ==================== | |
| class PrivacyProtector: | |
| """Lớp bảo vệ quyền riêng tư và ẩn thông tin máy tính""" | |
| def get_random_user_agent() -> str: | |
| """Tạo User-Agent ngẫu nhiên""" | |
| browsers = [ | |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", | |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0", | |
| "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15", | |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0", | |
| "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" | |
| ] | |
| return random.choice(browsers) | |
| def get_obfuscated_headers(api_key: str) -> Dict: | |
| """Tạo headers với thông tin bị ẩn""" | |
| return { | |
| "xi-api-key": api_key, | |
| "Content-Type": "application/json", | |
| "User-Agent": PrivacyProtector.get_random_user_agent(), | |
| "Accept": "application/json", | |
| "Accept-Language": "en-US,en;q=0.9", | |
| "Accept-Encoding": "gzip, deflate, br", | |
| "Connection": "keep-alive", | |
| "Cache-Control": "no-cache", | |
| "Pragma": "no-cache", | |
| "Sec-Fetch-Dest": "empty", | |
| "Sec-Fetch-Mode": "cors", | |
| "Sec-Fetch-Site": "cross-site", | |
| "DNT": "1", | |
| "Sec-GPC": "1" | |
| } | |
| # ==================== HÀM KIỂM TRA API KEY NÂNG CAO ==================== | |
| def check_api_key_advanced(api_key: str, timeout=15) -> Dict: | |
| """Kiểm tra API key với retry mechanism và xử lý lỗi chi tiết""" | |
| max_retries = 2 | |
| for attempt in range(max_retries + 1): | |
| try: | |
| # Tạo session | |
| session = requests.Session() | |
| session.timeout = timeout | |
| # Headers | |
| headers = { | |
| "xi-api-key": api_key, | |
| "Content-Type": "application/json", | |
| "User-Agent": PrivacyProtector.get_random_user_agent(), | |
| "Accept": "application/json" | |
| } | |
| # Độ trễ ngẫu nhiên | |
| time.sleep(random.uniform(1, 3)) | |
| # Gửi request | |
| response = session.get( | |
| "https://api.elevenlabs.io/v1/user", | |
| headers=headers, | |
| timeout=timeout, | |
| verify=False | |
| ) | |
| # Xử lý response | |
| if response.status_code == 200: | |
| data = response.json() | |
| subscription = data.get("subscription", {}) | |
| character_limit = subscription.get("character_limit", 0) | |
| character_count = subscription.get("character_count", 0) | |
| remaining = max(0, character_limit - character_count) | |
| return { | |
| "valid": True, | |
| "remaining": remaining, | |
| "total": character_limit, | |
| "used": character_count, | |
| "tier": subscription.get("tier", "free"), | |
| "is_fresh": remaining >= 1000, | |
| "attempt": attempt + 1 | |
| } | |
| elif response.status_code == 401: | |
| return { | |
| "valid": False, | |
| "error": "Invalid API key (Unauthorized)", | |
| "status_code": 401 | |
| } | |
| elif response.status_code == 429: | |
| if attempt < max_retries: | |
| wait_time = 5 * (attempt + 1) | |
| time.sleep(wait_time) | |
| continue | |
| return { | |
| "valid": False, | |
| "error": "Rate limited", | |
| "status_code": 429 | |
| } | |
| else: | |
| return { | |
| "valid": False, | |
| "error": f"HTTP Error {response.status_code}", | |
| "status_code": response.status_code | |
| } | |
| except requests.exceptions.Timeout: | |
| if attempt < max_retries: | |
| time.sleep(3) | |
| continue | |
| return { | |
| "valid": False, | |
| "error": "Connection timeout", | |
| "status": "timeout" | |
| } | |
| except requests.exceptions.ConnectionError: | |
| if attempt < max_retries: | |
| time.sleep(3) | |
| continue | |
| return { | |
| "valid": False, | |
| "error": "Connection failed", | |
| "status": "connection_error" | |
| } | |
| except Exception as e: | |
| if attempt < max_retries: | |
| time.sleep(3) | |
| continue | |
| return { | |
| "valid": False, | |
| "error": f"Unexpected error: {str(e)}", | |
| "status": "error" | |
| } | |
| return { | |
| "valid": False, | |
| "error": "Max retries exceeded", | |
| "status": "max_retries" | |
| } | |
| # ==================== HÀM TẠO VOICE NÂNG CAO ==================== | |
| def generate_voice_advanced(text: str, api_key: str, voice_id: str, model_id: str, | |
| stability=0.7, similarity=0.8, style=0.0, speed=0.75, | |
| speaker_boost=True) -> Optional[bytes]: | |
| """Tạo giọng nói với retry mechanism nâng cao""" | |
| max_retries = 3 | |
| url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}" | |
| payload = { | |
| "text": text, | |
| "model_id": model_id, | |
| "voice_settings": { | |
| "stability": stability, | |
| "similarity_boost": similarity, | |
| "style": style, | |
| "speed": speed, | |
| "use_speaker_boost": speaker_boost | |
| } | |
| } | |
| for attempt in range(max_retries + 1): | |
| try: | |
| # Tạo session mới mỗi lần retry | |
| session = requests.Session() | |
| session.timeout = 45 | |
| # Headers | |
| headers = { | |
| "xi-api-key": api_key, | |
| "Content-Type": "application/json", | |
| "User-Agent": PrivacyProtector.get_random_user_agent(), | |
| "Accept": "audio/mpeg" | |
| } | |
| # Độ trễ trước khi request | |
| delay = random.uniform(2, 5) if attempt == 0 else random.uniform(10, 20) | |
| time.sleep(delay) | |
| # Gửi request | |
| response = session.post( | |
| url, | |
| headers=headers, | |
| json=payload, | |
| timeout=45, | |
| verify=False | |
| ) | |
| # Xử lý response | |
| if response.status_code == 200: | |
| audio_content = response.content | |
| # Kiểm tra kích thước file | |
| if len(audio_content) > 2048: # File hợp lệ phải > 2KB | |
| return audio_content | |
| else: | |
| # File quá nhỏ, có thể bị chặn | |
| logger.warning(f"Audio too small: {len(audio_content)} bytes") | |
| if attempt < max_retries: | |
| time.sleep(10) | |
| continue | |
| elif response.status_code == 429: | |
| # Rate limit | |
| wait_time = 30 + (attempt * 10) | |
| logger.warning(f"Rate limited, waiting {wait_time}s...") | |
| time.sleep(wait_time) | |
| continue | |
| elif response.status_code == 403: | |
| # Bị chặn | |
| logger.warning(f"Blocked (403)") | |
| if attempt < max_retries: | |
| time.sleep(15) | |
| continue | |
| else: | |
| logger.warning(f"Error {response.status_code}: {response.text[:100]}") | |
| if attempt < max_retries: | |
| time.sleep(10) | |
| continue | |
| except requests.exceptions.Timeout: | |
| logger.warning(f"Timeout on attempt {attempt + 1}") | |
| if attempt < max_retries: | |
| time.sleep(10) | |
| continue | |
| except requests.exceptions.ConnectionError: | |
| logger.warning(f"Connection error") | |
| if attempt < max_retries: | |
| time.sleep(10) | |
| continue | |
| except Exception as e: | |
| logger.error(f"Error: {str(e)}") | |
| if attempt < max_retries: | |
| time.sleep(10) | |
| continue | |
| logger.error(f"All attempts failed for text: {text[:50]}...") | |
| return None | |
| # ==================== XỬ LÝ VĂN BẢN ==================== | |
| def parse_text_blocks(raw_text, max_length=250): | |
| """Phân chia văn bản thành các block""" | |
| blocks = [] | |
| sentences = re.split(r'(?<=[.!?])\s+', raw_text) | |
| current_block = "" | |
| for sentence in sentences: | |
| if len(current_block) + len(sentence) <= max_length: | |
| current_block += " " + sentence if current_block else sentence | |
| else: | |
| if current_block: | |
| blocks.append(current_block.strip()) | |
| current_block = sentence | |
| if current_block: | |
| blocks.append(current_block.strip()) | |
| return blocks | |
| def estimate_credit(text): | |
| """Ước tính credit cần thiết""" | |
| return len(text) + 50 | |
| # ==================== QUẢN LÝ TÁC VỤ BẤT ĐỒNG BỘ ==================== | |
| class TTSRequest: | |
| text: str | |
| api_key: str | |
| voice_id: str | |
| model_id: str | |
| settings: dict | |
| file_index: int | |
| output_dir: str | |
| format: str | |
| class TTSWorker: | |
| def __init__(self): | |
| self.task_queue = queue.Queue() | |
| self.results = {} | |
| self.is_running = False | |
| self.worker_thread = None | |
| def start(self): | |
| self.is_running = True | |
| self.worker_thread = threading.Thread(target=self._process_queue) | |
| self.worker_thread.daemon = True | |
| self.worker_thread.start() | |
| def stop(self): | |
| self.is_running = False | |
| if self.worker_thread: | |
| self.worker_thread.join(timeout=5) | |
| def add_task(self, request: TTSRequest): | |
| self.task_queue.put(request) | |
| def _process_queue(self): | |
| while self.is_running: | |
| try: | |
| request = self.task_queue.get(timeout=1) | |
| try: | |
| audio = generate_voice_advanced( | |
| request.text, | |
| request.api_key, | |
| request.voice_id, | |
| request.model_id, | |
| **request.settings | |
| ) | |
| if audio: | |
| filename = os.path.join(request.output_dir, f"voice_{request.file_index:03d}.{request.format}") | |
| with open(filename, "wb") as f: | |
| f.write(audio) | |
| self.results[request.file_index] = filename | |
| else: | |
| self.results[request.file_index] = None | |
| except Exception as e: | |
| logger.error(f"Error processing task {request.file_index}: {str(e)}") | |
| self.results[request.file_index] = None | |
| finally: | |
| self.task_queue.task_done() | |
| except queue.Empty: | |
| continue | |
| def wait_completion(self, timeout=None): | |
| self.task_queue.join() | |
| return self.results | |
| # ==================== GIAO DIỆN GRADIO ==================== | |
| class SecureTTSApp: | |
| def __init__(self): | |
| self.output_dir = tempfile.mkdtemp(prefix="tts_output_") | |
| self.tts_worker = TTSWorker() | |
| self.tts_worker.start() | |
| def cleanup(self): | |
| """Dọn dẹp thư mục tạm""" | |
| self.tts_worker.stop() | |
| try: | |
| shutil.rmtree(self.output_dir) | |
| except: | |
| pass | |
| def create_interface(self): | |
| with gr.Blocks(title="🎤 ElevenLabs TTS Pro - Secure Edition", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# 🎤 ElevenLabs TTS Pro - Secure Edition") | |
| gr.Markdown("Chuyển văn bản thành giọng nói với bảo mật nâng cao") | |
| with gr.Tabs(): | |
| # Tab 1: Cấu hình chính | |
| with gr.Tab("⚙️ Cấu hình"): | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| api_keys = gr.Textbox( | |
| label="🔑 API Keys", | |
| placeholder="Nhập API keys (mỗi key một dòng)\nsk_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", | |
| lines=4, | |
| info="Có thể nhập nhiều API keys để tự động chuyển đổi" | |
| ) | |
| voice_id = gr.Textbox( | |
| label="🎤 Voice ID", | |
| placeholder="21m00Tcm4TlvDq8ikWAM", | |
| value="21m00Tcm4TlvDq8ikWAM" | |
| ) | |
| with gr.Row(): | |
| model_id = gr.Dropdown( | |
| label="🤖 Model", | |
| choices=["eleven_multilingual_v2", "eleven_turbo_v2", "eleven_monolingual_v1"], | |
| value="eleven_multilingual_v2" | |
| ) | |
| output_format = gr.Dropdown( | |
| label="📁 Format", | |
| choices=["mp3", "wav"], | |
| value="mp3" | |
| ) | |
| with gr.Column(scale=1): | |
| gr.Markdown("### Voice Parameters") | |
| stability = gr.Slider( | |
| label="Stability", | |
| minimum=0.0, | |
| maximum=1.0, | |
| value=0.7, | |
| step=0.01 | |
| ) | |
| similarity = gr.Slider( | |
| label="Similarity Boost", | |
| minimum=0.0, | |
| maximum=1.0, | |
| value=0.8, | |
| step=0.01 | |
| ) | |
| speed = gr.Slider( | |
| label="Speed", | |
| minimum=0.5, | |
| maximum=1.5, | |
| value=1.0, | |
| step=0.05 | |
| ) | |
| speaker_boost = gr.Checkbox( | |
| label="Speaker Boost", | |
| value=True | |
| ) | |
| # Tab 2: Nhập văn bản | |
| with gr.Tab("📝 Văn bản"): | |
| text_input = gr.Textbox( | |
| label="Văn bản cần chuyển thành giọng nói", | |
| placeholder="Nhập văn bản tại đây...", | |
| lines=10 | |
| ) | |
| with gr.Row(): | |
| text_stats = gr.Markdown("**Thống kê:** 0 ký tự | 0 đoạn | Ước tính: 0 credits") | |
| with gr.Row(): | |
| gr.Examples( | |
| examples=[ | |
| ["Xin chào! Đây là ví dụ về giọng nói được tạo bởi ElevenLabs."], | |
| ["Tôi yêu công nghệ và đang tạo ra những sản phẩm tuyệt vời với trí tuệ nhân tạo."], | |
| ["Đây là một đoạn văn bản dài hơn để kiểm tra khả năng xử lý của hệ thống."] | |
| ], | |
| inputs=text_input, | |
| label="Ví dụ" | |
| ) | |
| # Tab 3: Kiểm tra API | |
| with gr.Tab("🔍 Kiểm tra API"): | |
| check_api_btn = gr.Button("Kiểm tra API Keys", variant="primary") | |
| api_status = gr.JSON(label="Kết quả kiểm tra") | |
| def check_api(keys): | |
| if not keys.strip(): | |
| return {"error": "Vui lòng nhập API keys"} | |
| keys_list = [k.strip() for k in keys.splitlines() if k.strip()] | |
| results = [] | |
| for i, key in enumerate(keys_list[:5]): # Giới hạn 5 keys | |
| info = check_api_key_advanced(key) | |
| results.append({ | |
| "key": f"Key {i+1}", | |
| "valid": info.get("valid", False), | |
| "remaining": info.get("remaining", 0), | |
| "error": info.get("error", ""), | |
| "tier": info.get("tier", "unknown") | |
| }) | |
| return {"total_keys": len(keys_list), "checked": len(results), "results": results} | |
| # Tab 4: Output | |
| with gr.Tab("📊 Output"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| progress_bar = gr.Progress(label="Tiến trình") | |
| status_output = gr.Textbox(label="Trạng thái", lines=3) | |
| start_btn = gr.Button( | |
| "🚀 Bắt đầu tạo giọng nói", | |
| variant="primary", | |
| scale=1 | |
| ) | |
| stop_btn = gr.Button("⏹️ Dừng", variant="stop", scale=1) | |
| with gr.Column(): | |
| audio_output = gr.Audio(label="Âm thanh đã tạo") | |
| download_btn = gr.File(label="Tải file") | |
| with gr.Row(): | |
| gr.Markdown("### Logs") | |
| log_output = gr.Textbox(label="Logs", lines=10, interactive=False) | |
| # Xử lý sự kiện nhập văn bản | |
| def update_text_stats(text): | |
| if not text.strip(): | |
| return "**Thống kê:** 0 ký tự | 0 đoạn | Ước tính: 0 credits" | |
| char_count = len(text) | |
| blocks = parse_text_blocks(text) | |
| block_count = len(blocks) | |
| credit_estimate = char_count + (block_count * 50) | |
| return f"**Thống kê:** {char_count:,} ký tự | {block_count} đoạn | Ước tính: {credit_estimate:,} credits" | |
| text_input.change( | |
| fn=update_text_stats, | |
| inputs=[text_input], | |
| outputs=[text_stats] | |
| ) | |
| # Xử lý nút bắt đầu | |
| def start_tts_generation(api_keys_text, voice_id_text, model, format, | |
| stability_val, similarity_val, speed_val, | |
| speaker_boost_val, text): | |
| # Kiểm tra đầu vào | |
| if not api_keys_text.strip(): | |
| yield "❌ Vui lòng nhập API keys", None, None, "" | |
| return | |
| if not voice_id_text.strip(): | |
| yield "❌ Vui lòng nhập Voice ID", None, None, "" | |
| return | |
| if not text.strip(): | |
| yield "❌ Vui lòng nhập văn bản", None, None, "" | |
| return | |
| # Parse API keys | |
| api_keys_list = [k.strip() for k in api_keys_text.splitlines() if k.strip()] | |
| # Parse văn bản | |
| text_blocks = parse_text_blocks(text) | |
| # Cấu hình | |
| settings = { | |
| "stability": stability_val, | |
| "similarity": similarity_val, | |
| "style": 0.0, | |
| "speed": speed_val, | |
| "speaker_boost": speaker_boost_val | |
| } | |
| # Tạo thư mục output | |
| output_dir = tempfile.mkdtemp(dir=self.output_dir) | |
| logs = [] | |
| logs.append("="*60) | |
| logs.append("🚀 BẮT ĐẦU TẠO GIỌNG NÓI") | |
| logs.append(f"🔑 API keys: {len(api_keys_list)}") | |
| logs.append(f"🎤 Voice ID: {voice_id_text}") | |
| logs.append(f"🤖 Model: {model}") | |
| logs.append(f"📁 Format: {format}") | |
| logs.append(f"📝 Số đoạn văn bản: {len(text_blocks)}") | |
| logs.append("="*60) | |
| yield "\n".join(logs), 0, None, "" | |
| # Kiểm tra API keys | |
| valid_keys = [] | |
| for i, key in enumerate(api_keys_list[:3], 1): # Giới hạn 3 keys | |
| info = check_api_key_advanced(key) | |
| if info.get("valid") and info.get("remaining", 0) > 100: | |
| valid_keys.append({ | |
| "key": key, | |
| "remaining": info.get("remaining", 0) | |
| }) | |
| logs.append(f"✅ Key {i}: {info['remaining']:,} ký tự còn lại") | |
| else: | |
| logs.append(f"❌ Key {i}: {info.get('error', 'Invalid')}") | |
| yield "\n".join(logs), 0, None, "" | |
| time.sleep(1) | |
| if not valid_keys: | |
| logs.append("❌ Không có API key hợp lệ") | |
| yield "\n".join(logs), 0, None, "" | |
| return | |
| logs.append(f"✅ Đã tìm thấy {len(valid_keys)} API keys hợp lệ") | |
| yield "\n".join(logs), 0, None, "" | |
| # Tạo âm thanh cho từng block | |
| created_files = [] | |
| for i, text_block in enumerate(text_blocks, 1): | |
| # Chọn API key theo round-robin | |
| key_idx = (i - 1) % len(valid_keys) | |
| key_info = valid_keys[key_idx] | |
| logs.append(f"🎤 Đang tạo block {i}/{len(text_blocks)}...") | |
| yield "\n".join(logs), i/len(text_blocks), None, "" | |
| # Tạo request | |
| request = TTSRequest( | |
| text=text_block, | |
| api_key=key_info["key"], | |
| voice_id=voice_id_text, | |
| model_id=model, | |
| settings=settings, | |
| file_index=i, | |
| output_dir=output_dir, | |
| format=format | |
| ) | |
| self.tts_worker.add_task(request) | |
| # Đợi hoàn thành | |
| results = self.tts_worker.wait_completion(timeout=300) | |
| # Lấy danh sách file đã tạo | |
| created_files = [results[i] for i in range(1, len(text_blocks) + 1) if results.get(i)] | |
| if len(created_files) == len(text_blocks): | |
| logs.append("✅ Đã tạo tất cả các block") | |
| # Nếu chỉ có 1 file, trả về luôn | |
| if len(created_files) == 1: | |
| audio_file = created_files[0] | |
| logs.append(f"🎉 Hoàn thành! File: {os.path.basename(audio_file)}") | |
| yield "\n".join(logs), 1.0, audio_file, audio_file | |
| else: | |
| # Merge các file nếu có nhiều block | |
| logs.append("🔗 Đang merge các file...") | |
| yield "\n".join(logs), 0.9, None, "" | |
| # Tạo file tổng hợp (đơn giản: chỉ trả về file đầu tiên cho demo) | |
| audio_file = created_files[0] if created_files else None | |
| logs.append(f"🎉 Hoàn thành! Đã tạo {len(created_files)} files") | |
| yield "\n".join(logs), 1.0, audio_file, audio_file | |
| else: | |
| created_count = len([f for f in created_files if f]) | |
| logs.append(f"⚠️ Đã tạo {created_count}/{len(text_blocks)} blocks") | |
| if created_files: | |
| audio_file = created_files[0] | |
| yield "\n".join(logs), created_count/len(text_blocks), audio_file, audio_file | |
| else: | |
| logs.append("❌ Không thể tạo bất kỳ file nào") | |
| yield "\n".join(logs), 0, None, "" | |
| # Kết nối các nút | |
| start_btn.click( | |
| fn=start_tts_generation, | |
| inputs=[ | |
| api_keys, voice_id, model_id, output_format, | |
| stability, similarity, speed, speaker_boost, text_input | |
| ], | |
| outputs=[status_output, progress_bar, audio_output, download_btn] | |
| ) | |
| # Xử lý nút dừng | |
| def stop_generation(): | |
| self.tts_worker.stop() | |
| self.tts_worker = TTSWorker() | |
| self.tts_worker.start() | |
| return "⏹️ Đã dừng quá trình xử lý" | |
| stop_btn.click( | |
| fn=stop_generation, | |
| outputs=[status_output] | |
| ) | |
| # Xử lý khi đóng | |
| demo.load( | |
| fn=lambda: f"Ứng dụng đã sẵn sàng. Thư mục output: {self.output_dir}", | |
| outputs=[status_output] | |
| ) | |
| return demo | |
| # ==================== MAIN ==================== | |
| def main(): | |
| app = SecureTTSApp() | |
| try: | |
| demo = app.create_interface() | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, | |
| debug=False, | |
| show_error=True | |
| ) | |
| except KeyboardInterrupt: | |
| print("\nĐang dọn dẹp...") | |
| finally: | |
| app.cleanup() | |
| if __name__ == "__main__": | |
| main() |