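# NOTE: the next three lines are page residue from the Hugging Face file viewer,
# kept verbatim as comments; they are not part of the program.
# PCNUSMSE's picture
# Upload app.py with huggingface_hub
# 9b2a62d verified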
"""音频转文字服务主应用程序
基于Gradio的音频转文字Web服务应用程序入口。
"""
import asyncio
import sys
import signal
import time
from pathlib import Path
from typing import Optional
# Put the directory containing this file at the front of sys.path so the
# `src.*` imports below resolve no matter which working directory the
# script is started from.
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
# Load environment variables from the `.env` file next to this script.
# This runs before the `src.*` imports below are executed.
from dotenv import load_dotenv
load_dotenv(project_root / ".env")
from src.core.config import get_config, reload_config
from src.utils.logger import get_logger
from src.api.gradio_interface import get_gradio_interface
from src.core.task_manager import get_task_manager, TaskStatus
class TranscriptServiceApp:
    """Audio-to-text service application built on a Gradio interface.

    The class loads configuration, creates the logger and the Gradio
    interface, registers a ``/health`` endpoint when the interface exposes
    an ``app`` attribute, and owns the start-up / shutdown sequence.
    """

    # Endpoint used only to probe OSS reachability from the health check.
    _OSS_PROBE_ENDPOINT = "https://oss-cn-beijing.aliyuncs.com"

    # Lower-cased fragments of OSError messages that mean "port is busy".
    # The second entry is the message Gradio raises when the requested
    # port cannot be bound.
    _PORT_BUSY_MARKERS = ("address already in use", "cannot find empty port")

    def __init__(self, environment: Optional[str] = None):
        """Initialise configuration, logging and the web interface.

        Args:
            environment: Runtime environment name (development/production).
                When falsy, the already-loaded configuration is used.
        """
        if environment:
            self.config = reload_config(environment)
        else:
            self.config = get_config()

        self.logger = get_logger("transcript_service.app")
        self.gradio_interface = get_gradio_interface()

        # State is set before the health endpoint is registered so the
        # handler never observes a half-initialised instance.
        self.is_running = False
        self._start_time: Optional[float] = None  # set by run()

        self._setup_health_endpoint()
        self.logger.info(f"应用程序初始化完成 - 环境: {self.config.environment}")

    def _build_health_data(self) -> dict:
        """Return the JSON-serialisable payload served by ``/health``.

        ``status`` is always ``"healthy"``; per-dependency results are
        reported under ``services``. ``uptime`` is 0.0 until run() has
        recorded a start time.
        """
        started = getattr(self, "_start_time", None)
        uptime = time.time() - started if started is not None else 0.0
        return {
            "status": "healthy",
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "environment": self.config.environment,
            "version": self.config.app.version,
            "uptime": uptime,
            "services": {
                "oss": self._check_oss_connection(),
                "dashscope": self._check_dashscope_connection(),
            },
        }

    def _setup_health_endpoint(self):
        """Register ``GET /health`` on the interface's ``app``, if present.

        Best effort: any failure is logged as a warning and the
        application continues without the endpoint.
        """
        try:
            if not hasattr(self.gradio_interface, 'app'):
                return

            from fastapi.responses import JSONResponse

            # Deliberately a plain ``def``: the OSS probe does blocking
            # network I/O, and FastAPI runs non-async handlers in a worker
            # thread instead of on the event loop.
            @self.gradio_interface.app.get("/health")
            def health_endpoint():
                return JSONResponse(content=self._build_health_data())
        except Exception as e:
            self.logger.warning(f"设置健康检查端点失败: {e}")

    def _check_oss_connection(self) -> bool:
        """Return True when OSS credentials are set and a probe call succeeds.

        Any error (missing SDK, network failure, bad credentials) yields
        False; this is a deliberate best-effort check.
        """
        try:
            oss_cfg = self.config.oss
            if not (oss_cfg.access_key_id and oss_cfg.access_key_secret):
                return False

            import oss2

            auth = oss2.Auth(oss_cfg.access_key_id, oss_cfg.access_key_secret)
            service = oss2.Service(auth, self._OSS_PROBE_ENDPOINT)
            # The call itself performs the request. Its result object
            # exposes ``.buckets`` and is not iterable, so wrapping it in
            # list() raised TypeError and made this probe always fail.
            service.list_buckets(max_keys=1)
            return True
        except Exception:
            return False

    def _check_dashscope_connection(self) -> bool:
        """Return True when a DashScope API key is set and starts with ``sk-``.

        This is only a format check; no request is sent.
        """
        try:
            api_key = self.config.dashscope.api_key
            return bool(api_key) and api_key.startswith("sk-")
        except Exception:
            return False

    def setup_signal_handlers(self):
        """Intentionally a no-op.

        Graceful-shutdown signal handling was removed so the process can be
        terminated directly. The method is kept so callers still work.
        """

    def validate_environment(self) -> bool:
        """Check required credentials and that working directories are writable.

        Returns:
            True when all required settings are present and both the logs
            and temp directories exist (created if needed) and accept
            writes; False otherwise. Failures are logged, never raised.
        """
        try:
            required = (
                ("OSS_ACCESS_KEY_ID", self.config.oss.access_key_id),
                ("OSS_ACCESS_KEY_SECRET", self.config.oss.access_key_secret),
                ("DASHSCOPE_API_KEY", self.config.dashscope.api_key),
            )
            missing_vars = [name for name, value in required if not value]
            if missing_vars:
                self.logger.error(f"缺少必要的环境变量: {missing_vars}")
                return False

            directories = (self.config.get_logs_dir(), self.config.get_temp_dir())
            for directory in directories:
                probe = directory / ".write_test"
                try:
                    directory.mkdir(parents=True, exist_ok=True)
                    # Write and remove a marker file to prove write access.
                    probe.write_text("test")
                    probe.unlink()
                except Exception as e:
                    self.logger.error(f"目录权限检查失败 {directory}: {str(e)}")
                    return False

            self.logger.info("环境验证通过")
            return True
        except Exception as e:
            self.logger.exception(f"环境验证失败: {str(e)}")
            return False

    @staticmethod
    def _is_port_in_use_error(error: OSError) -> bool:
        """Return True when *error* indicates the requested port is busy."""
        import errno

        if error.errno == errno.EADDRINUSE:
            return True
        message = str(error).lower()
        return any(
            marker in message
            for marker in TranscriptServiceApp._PORT_BUSY_MARKERS
        )

    def _launch_with_port_fallback(self, launch_kwargs: dict) -> None:
        """Launch the interface, retrying once on an auto-selected port.

        Exits the process with status 1 on any other OSError, or when the
        retry fails as well. KeyboardInterrupt is left to the caller.
        """
        try:
            self.gradio_interface.launch(**launch_kwargs)
            return
        except OSError as e:
            if not self._is_port_in_use_error(e):
                self.logger.exception(f"启动时发生未处理的网络错误: {str(e)}")
                sys.exit(1)
            port = launch_kwargs.get('server_port', self.config.app.port)
            self.logger.warning(f"端口 {port} 已被占用。正在尝试使用一个可用的随机端口...")

        # The retry happens outside the ``except`` block so an interrupt
        # raised during it still reaches run()'s KeyboardInterrupt handler.
        # server_port=None lets Gradio choose a free port by itself.
        retry_kwargs = {**launch_kwargs, 'server_port': None}
        try:
            self.gradio_interface.launch(**retry_kwargs)
        except Exception as final_e:
            self.logger.exception(f"尝试使用随机端口后,应用程序启动仍然失败: {str(final_e)}")
            sys.exit(1)

    def run(self, **launch_kwargs):
        """Validate the environment and start the Gradio interface.

        Args:
            **launch_kwargs: Passed through to the interface's ``launch``.

        Exits the process with status 1 when validation or start-up fails.
        A keyboard interrupt triggers :meth:`shutdown`.
        """
        try:
            self.setup_signal_handlers()

            if not self.validate_environment():
                self.logger.error("环境验证失败,应用程序无法启动")
                sys.exit(1)

            self.is_running = True
            self._start_time = time.time()  # basis for the reported uptime
            self.logger.info("正在启动音频转文字服务...")

            self._launch_with_port_fallback(launch_kwargs)
        except KeyboardInterrupt:
            self.logger.info("接收到键盘中断信号")
            self.shutdown()
        except Exception as e:
            self.logger.exception(f"应用程序启动失败: {str(e)}")
            sys.exit(1)

    def _cancel_pending_tasks(self, task_manager, pending_tasks) -> None:
        """Cancel *pending_tasks* one by one via ``task_manager.cancel_task``.

        A failure on one task is logged and does not stop the others.
        Without a running event loop the cancellations run to completion
        in a single temporary loop; inside a running loop they are
        scheduled as one background task.
        """
        async def cancel_all():
            for task in pending_tasks:
                try:
                    await task_manager.cancel_task(task.id)
                except Exception as e:
                    self.logger.warning(f"取消任务 {task.id} 失败: {e}")

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:  # no running loop in this thread
            asyncio.run(cancel_all())
        else:
            # Keep a reference: the loop only holds weak references to
            # tasks, so an unreferenced task may be collected early.
            self._cancel_task_handle = loop.create_task(cancel_all())

    def shutdown(self):
        """Cancel pending tasks, report active ones and clean temp files.

        Does nothing when the application is not running. Active tasks are
        only counted and logged; this method does not wait for them.
        Temp-file cleanup runs even when the task-manager step fails.
        """
        if not self.is_running:
            return

        self.logger.info("开始关闭应用程序...")
        self.is_running = False

        succeeded = True
        try:
            task_manager = get_task_manager()

            pending_tasks = task_manager.get_tasks_by_status(TaskStatus.PENDING)
            if pending_tasks:
                self._cancel_pending_tasks(task_manager, pending_tasks)

            active_tasks = [
                task
                for status in (
                    TaskStatus.VALIDATING,
                    TaskStatus.UPLOADING,
                    TaskStatus.TRANSCRIBING,
                )
                for task in task_manager.get_tasks_by_status(status)
            ]
            if active_tasks:
                self.logger.info(f"等待 {len(active_tasks)} 个活跃任务完成...")
        except Exception as e:
            succeeded = False
            self.logger.exception(f"关闭应用程序时发生错误: {str(e)}")

        self.cleanup_temp_files()
        if succeeded:
            self.logger.info("应用程序已安全关闭")

    def cleanup_temp_files(self):
        """Delete regular files directly inside the temp directory.

        Sub-directories are left untouched. A file that cannot be removed
        is logged and skipped so the remaining files are still cleaned.
        """
        try:
            temp_dir = self.config.get_temp_dir()
            failures = 0
            if temp_dir.exists():
                for file_path in temp_dir.glob("*"):
                    if not file_path.is_file():
                        continue
                    try:
                        file_path.unlink()
                    except OSError as e:
                        failures += 1
                        self.logger.warning(f"清理临时文件失败: {file_path}: {e}")
            if not failures:
                self.logger.info("临时文件清理完成")
        except Exception as e:
            self.logger.warning(f"清理临时文件失败: {str(e)}")

    def get_app_info(self) -> dict:
        """Return basic application metadata and the current running state.

        Returns:
            A dict with the keys name, version, environment, debug, host,
            port and is_running.
        """
        app_cfg = self.config.app
        return {
            "name": app_cfg.name,
            "version": app_cfg.version,
            "environment": self.config.environment,
            "debug": app_cfg.debug,
            "host": app_cfg.host,
            "port": app_cfg.port,
            "is_running": self.is_running,
        }
def create_app(environment: Optional[str] = None) -> TranscriptServiceApp:
    """Factory for the service application.

    Args:
        environment: Runtime environment name forwarded to
            ``TranscriptServiceApp``; ``None`` keeps the default handling.

    Returns:
        A newly constructed ``TranscriptServiceApp`` instance.
    """
    application = TranscriptServiceApp(environment)
    return application
def main(argv: Optional[list] = None):
    """Command-line entry point: parse options, build the app and run it.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. The
            default ``None`` keeps the normal command-line behaviour.

    The environment comes from ``--env``, falling back to the
    ``ENVIRONMENT`` variable and finally to ``"production"``. Launch
    defaults target Hugging Face Spaces (all interfaces, port 7860) and
    can be overridden with ``--host``, ``--port``, ``--share`` and
    ``--debug``.
    """
    import argparse
    import os

    parser = argparse.ArgumentParser(description="音频转文字服务")
    parser.add_argument(
        "--env",
        choices=["development", "production"],
        default=None,  # None means: read it from the environment variable
        help="运行环境",
    )
    parser.add_argument(
        "--host",
        default=None,
        help="服务主机地址",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=None,
        help="服务端口",
    )
    parser.add_argument(
        "--share",
        action="store_true",
        help="启用Gradio分享链接",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="启用调试模式",
    )
    args = parser.parse_args(argv)

    # The command-line flag wins over the ENVIRONMENT variable.
    environment = args.env or os.getenv('ENVIRONMENT', 'production')

    app = create_app(environment)

    launch_kwargs = {
        'share': False,  # sharing stays off unless requested explicitly
        'server_name': '0.0.0.0',  # Hugging Face Spaces needs all interfaces
        'server_port': 7860,  # default port on Hugging Face Spaces
    }

    # Explicit command-line options override the defaults above.
    if args.host:
        launch_kwargs['server_name'] = args.host
    # Compare against None so an explicit ``--port 0`` is not silently
    # replaced by the default port.
    if args.port is not None:
        launch_kwargs['server_port'] = args.port
    if args.share:
        launch_kwargs['share'] = True
    if args.debug:
        launch_kwargs['debug'] = True

    app.run(**launch_kwargs)
# Start the service only when this file is executed as a script;
# importing the module does not call main().
if __name__ == "__main__":
    main()