Spaces:
Sleeping
Sleeping
| """音频转文字服务主应用程序 | |
| 基于Gradio的音频转文字Web服务应用程序入口。 | |
| """ | |
| import asyncio | |
| import sys | |
| import signal | |
| import time | |
| from pathlib import Path | |
| from typing import Optional | |
| # 添加项目根目录到Python路径 | |
| project_root = Path(__file__).parent | |
| sys.path.insert(0, str(project_root)) | |
| # 加载环境变量 | |
| from dotenv import load_dotenv | |
| load_dotenv(project_root / ".env") | |
| from src.core.config import get_config, reload_config | |
| from src.utils.logger import get_logger | |
| from src.api.gradio_interface import get_gradio_interface | |
| from src.core.task_manager import get_task_manager, TaskStatus | |
| class TranscriptServiceApp: | |
| """音频转文字服务应用程序""" | |
| def __init__(self, environment: Optional[str] = None): | |
| """初始化应用程序 | |
| Args: | |
| environment: 运行环境 (development/production) | |
| """ | |
| # 加载配置 | |
| if environment: | |
| self.config = reload_config(environment) | |
| else: | |
| self.config = get_config() | |
| # 初始化日志 | |
| self.logger = get_logger("transcript_service.app") | |
| # 初始化界面 | |
| self.gradio_interface = get_gradio_interface() | |
| # 添加健康检查端点 | |
| self._setup_health_endpoint() | |
| # 运行状态 | |
| self.is_running = False | |
| self.logger.info(f"应用程序初始化完成 - 环境: {self.config.environment}") | |
| def _setup_health_endpoint(self): | |
| """设置健康检查端点""" | |
| try: | |
| import gradio as gr | |
| def health_check(): | |
| """健康检查函数""" | |
| import json | |
| import time | |
| health_data = { | |
| "status": "healthy", | |
| "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), | |
| "environment": self.config.environment, | |
| "version": self.config.app.version, | |
| "uptime": time.time() - getattr(self, '_start_time', time.time()), | |
| "services": { | |
| "oss": self._check_oss_connection(), | |
| "dashscope": self._check_dashscope_connection() | |
| } | |
| } | |
| return json.dumps(health_data, indent=2, ensure_ascii=False) | |
| # 在Gradio应用中添加健康检查端点 | |
| if hasattr(self.gradio_interface, 'app'): | |
| from fastapi.responses import JSONResponse | |
| async def health_endpoint(): | |
| health_data = { | |
| "status": "healthy", | |
| "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), | |
| "environment": self.config.environment, | |
| "version": self.config.app.version, | |
| "uptime": time.time() - getattr(self, '_start_time', time.time()), | |
| "services": { | |
| "oss": self._check_oss_connection(), | |
| "dashscope": self._check_dashscope_connection() | |
| } | |
| } | |
| return JSONResponse(content=health_data) | |
| except Exception as e: | |
| self.logger.warning(f"设置健康检查端点失败: {e}") | |
| def _check_oss_connection(self) -> bool: | |
| """检查OSS连接""" | |
| try: | |
| if not (self.config.oss.access_key_id and self.config.oss.access_key_secret): | |
| return False | |
| import oss2 | |
| auth = oss2.Auth(self.config.oss.access_key_id, self.config.oss.access_key_secret) | |
| service = oss2.Service(auth, "https://oss-cn-beijing.aliyuncs.com") | |
| # 简单的连接测试 | |
| list(service.list_buckets(max_keys=1)) | |
| return True | |
| except Exception: | |
| return False | |
| def _check_dashscope_connection(self) -> bool: | |
| """检查DashScope连接""" | |
| try: | |
| if not self.config.dashscope.api_key: | |
| return False | |
| # 简单的API key格式检查 | |
| return self.config.dashscope.api_key.startswith("sk-") | |
| except Exception: | |
| return False | |
| def setup_signal_handlers(self): | |
| """设置信号处理器""" | |
| # 移除优雅关闭功能,允许应用直接终止 | |
| pass | |
| def validate_environment(self) -> bool: | |
| """验证运行环境 | |
| Returns: | |
| 环境是否有效 | |
| """ | |
| try: | |
| # 检查必要的环境变量 | |
| missing_vars = [] | |
| if not self.config.oss.access_key_id: | |
| missing_vars.append("OSS_ACCESS_KEY_ID") | |
| if not self.config.oss.access_key_secret: | |
| missing_vars.append("OSS_ACCESS_KEY_SECRET") | |
| if not self.config.dashscope.api_key: | |
| missing_vars.append("DASHSCOPE_API_KEY") | |
| if missing_vars: | |
| self.logger.error(f"缺少必要的环境变量: {missing_vars}") | |
| return False | |
| # 检查目录权限 | |
| logs_dir = self.config.get_logs_dir() | |
| temp_dir = self.config.get_temp_dir() | |
| for directory in [logs_dir, temp_dir]: | |
| if not directory.exists(): | |
| directory.mkdir(parents=True, exist_ok=True) | |
| # 测试写权限 | |
| test_file = directory / ".write_test" | |
| try: | |
| test_file.write_text("test") | |
| test_file.unlink() | |
| except Exception as e: | |
| self.logger.error(f"目录权限检查失败 {directory}: {str(e)}") | |
| return False | |
| self.logger.info("环境验证通过") | |
| return True | |
| except Exception as e: | |
| self.logger.exception(f"环境验证失败: {str(e)}") | |
| return False | |
| def run(self, **launch_kwargs): | |
| """启动应用程序 | |
| Args: | |
| **launch_kwargs: Gradio启动参数 | |
| """ | |
| try: | |
| # 设置信号处理器 | |
| self.setup_signal_handlers() | |
| # 验证环境 | |
| if not self.validate_environment(): | |
| self.logger.error("环境验证失败,应用程序无法启动") | |
| sys.exit(1) | |
| # 启动应用 | |
| self.is_running = True | |
| self._start_time = time.time() # 记录启动时间 | |
| self.logger.info("正在启动音频转文字服务...") | |
| # 启动Gradio界面 | |
| self.gradio_interface.launch(**launch_kwargs) | |
| except OSError as e: | |
| if "address already in use" in str(e).lower(): | |
| port = launch_kwargs.get('server_port', self.config.app.port) | |
| self.logger.warning(f"端口 {port} 已被占用。正在尝试使用一个可用的随机端口...") | |
| # 显式设置 server_port=None 来让 Gradio 自动查找可用端口 | |
| launch_kwargs['server_port'] = None | |
| try: | |
| # 再次尝试启动 | |
| self.gradio_interface.launch(**launch_kwargs) | |
| except Exception as final_e: | |
| self.logger.exception(f"尝试使用随机端口后,应用程序启动仍然失败: {str(final_e)}") | |
| sys.exit(1) | |
| else: | |
| self.logger.exception(f"启动时发生未处理的网络错误: {str(e)}") | |
| sys.exit(1) | |
| except KeyboardInterrupt: | |
| self.logger.info("接收到键盘中断信号") | |
| self.shutdown() | |
| except Exception as e: | |
| self.logger.exception(f"应用程序启动失败: {str(e)}") | |
| sys.exit(1) | |
| def shutdown(self): | |
| """关闭应用程序""" | |
| if not self.is_running: | |
| return | |
| self.logger.info("开始关闭应用程序...") | |
| self.is_running = False | |
| try: | |
| # 清理任务管理器 | |
| task_manager = get_task_manager() | |
| # 取消所有待处理的任务 | |
| pending_tasks = task_manager.get_tasks_by_status(TaskStatus.PENDING) | |
| for task in pending_tasks: | |
| try: | |
| loop = asyncio.get_running_loop() | |
| asyncio.create_task(task_manager.cancel_task(task.id)) | |
| except RuntimeError: # No running loop | |
| asyncio.run(task_manager.cancel_task(task.id)) | |
| # 等待正在处理的任务完成(最多等待30秒) | |
| active_tasks = ( | |
| task_manager.get_tasks_by_status(TaskStatus.VALIDATING) + | |
| task_manager.get_tasks_by_status(TaskStatus.UPLOADING) + | |
| task_manager.get_tasks_by_status(TaskStatus.TRANSCRIBING) | |
| ) | |
| if active_tasks: | |
| self.logger.info(f"等待 {len(active_tasks)} 个活跃任务完成...") | |
| # 这里可以添加更复杂的等待逻辑, 但为简单起见, 我们直接继续 | |
| # 清理临时文件 | |
| self.cleanup_temp_files() | |
| self.logger.info("应用程序已安全关闭") | |
| except Exception as e: | |
| self.logger.exception(f"关闭应用程序时发生错误: {str(e)}") | |
| def cleanup_temp_files(self): | |
| """清理临时文件""" | |
| try: | |
| temp_dir = self.config.get_temp_dir() | |
| if temp_dir.exists(): | |
| for file_path in temp_dir.glob("*"): | |
| if file_path.is_file(): | |
| file_path.unlink() | |
| self.logger.info("临时文件清理完成") | |
| except Exception as e: | |
| self.logger.warning(f"清理临时文件失败: {str(e)}") | |
| def get_app_info(self) -> dict: | |
| """获取应用程序信息 | |
| Returns: | |
| 应用程序信息字典 | |
| """ | |
| return { | |
| "name": self.config.app.name, | |
| "version": self.config.app.version, | |
| "environment": self.config.environment, | |
| "debug": self.config.app.debug, | |
| "host": self.config.app.host, | |
| "port": self.config.app.port, | |
| "is_running": self.is_running | |
| } | |
| def create_app(environment: Optional[str] = None) -> TranscriptServiceApp: | |
| """创建应用程序实例 | |
| Args: | |
| environment: 运行环境 | |
| Returns: | |
| 应用程序实例 | |
| """ | |
| return TranscriptServiceApp(environment) | |
| def main(): | |
| """主函数入口""" | |
| import argparse | |
| import os | |
| parser = argparse.ArgumentParser(description="音频转文字服务") | |
| parser.add_argument( | |
| "--env", | |
| choices=["development", "production"], | |
| default=None, # 改为None,从环境变量读取 | |
| help="运行环境" | |
| ) | |
| parser.add_argument( | |
| "--host", | |
| default=None, | |
| help="服务主机地址" | |
| ) | |
| parser.add_argument( | |
| "--port", | |
| type=int, | |
| default=None, | |
| help="服务端口" | |
| ) | |
| parser.add_argument( | |
| "--share", | |
| action="store_true", | |
| help="启用Gradio分享链接" | |
| ) | |
| parser.add_argument( | |
| "--debug", | |
| action="store_true", | |
| help="启用调试模式" | |
| ) | |
| args = parser.parse_args() | |
| # 从环境变量或命令行参数确定运行环境 | |
| environment = args.env or os.getenv('ENVIRONMENT', 'production') | |
| # 创建应用 | |
| app = create_app(environment) | |
| # 准备启动参数 | |
| launch_kwargs = { | |
| 'share': False, # 生产环境禁用share | |
| 'server_name': '0.0.0.0', # Hugging Face Spaces 需要监听所有接口 | |
| 'server_port': 7860 # Hugging Face Spaces 默认端口 | |
| } | |
| # 命令行参数可以覆盖默认值 | |
| if args.host: | |
| launch_kwargs['server_name'] = args.host | |
| if args.port: | |
| launch_kwargs['server_port'] = args.port | |
| if args.share: | |
| launch_kwargs['share'] = True # 如果用户明确要求share | |
| if args.debug: | |
| launch_kwargs['debug'] = True | |
| # 启动应用 | |
| app.run(**launch_kwargs) | |
| if __name__ == "__main__": | |
| main() | |