# MoYoYo.tts / api_server / app / services / file_service.py
# liumaolin
# feat(api): implement local training MVP with adapter pattern
# e054d0c
"""
文件管理服务
处理文件上传、下载和管理的业务逻辑
"""
from datetime import datetime
from typing import List, Optional, Tuple
from ..core.adapters import get_database_adapter, get_storage_adapter
from ..models.schemas.file import (
FileMetadata,
FileUploadResponse,
FileListResponse,
FileDeleteResponse,
)
class FileService:
    """
    File management service.

    Covers the whole file lifecycle by coordinating a storage adapter
    (file contents and storage-side metadata) with a database adapter
    (file records):

    - upload a file
    - download a file
    - fetch metadata
    - list files
    - delete a file

    Example:
        >>> service = FileService()
        >>> result = await service.upload_file(data, "audio.wav", "audio/wav", "training")
        >>> content = await service.download_file(result.file.id)
        >>> await service.delete_file(result.file.id)
    """

    def __init__(self):
        """Create the service; both adapters are resolved lazily on first use."""
        self._db = None
        self._storage = None

    @property
    def db(self):
        """Return the database adapter, fetching it on first access."""
        if self._db is None:
            self._db = get_database_adapter()
        return self._db

    @property
    def storage(self):
        """Return the storage adapter, fetching it on first access."""
        if self._storage is None:
            self._storage = get_storage_adapter()
        return self._storage

    async def upload_file(
        self,
        file_data: bytes,
        filename: str,
        content_type: Optional[str] = None,
        purpose: str = "training",
    ) -> "FileUploadResponse":
        """
        Store a file and register it in the database.

        Args:
            file_data: Raw file contents.
            filename: Original file name.
            content_type: MIME type, if known.
            purpose: What the file is for (training, reference, output).

        Returns:
            FileUploadResponse describing the stored file.
        """
        size_bytes = len(file_data)

        # Metadata handed to the storage adapter together with the payload.
        storage_metadata = {
            "content_type": content_type,
            "purpose": purpose,
            "size_bytes": size_bytes,
        }
        file_id = await self.storage.upload_file(file_data, filename, storage_metadata)

        # Re-read the metadata from storage to pick up the audio fields
        # (duration / sample rate); it may come back empty.
        full_metadata = (await self.storage.get_file_metadata(file_id)) or {}
        duration_seconds = full_metadata.get("duration_seconds")
        sample_rate = full_metadata.get("sample_rate")

        # Take the timestamp once so the persisted record and the response
        # report exactly the same upload time.
        uploaded_at = datetime.utcnow()

        file_record = {
            "id": file_id,
            "filename": filename,
            "content_type": content_type,
            "size_bytes": size_bytes,
            "purpose": purpose,
            "duration_seconds": duration_seconds,
            "sample_rate": sample_rate,
            "uploaded_at": uploaded_at.isoformat(),
        }
        await self.db.create_file_record(file_record)

        file_metadata = FileMetadata(
            id=file_id,
            filename=filename,
            content_type=content_type,
            size_bytes=size_bytes,
            purpose=purpose,
            duration_seconds=duration_seconds,
            sample_rate=sample_rate,
            uploaded_at=uploaded_at,
        )
        return FileUploadResponse(
            success=True,
            message="文件上传成功",
            file=file_metadata,
        )

    async def download_file(self, file_id: str) -> Optional[Tuple[bytes, str, str]]:
        """
        Fetch a file's contents together with its name and MIME type.

        Args:
            file_id: File identifier.

        Returns:
            ``(data, filename, content_type)``, or ``None`` when the file does
            not exist in storage or has no metadata. A missing or empty
            filename falls back to ``"file"`` and a missing or empty content
            type to ``"application/octet-stream"``.
        """
        if not await self.storage.file_exists(file_id):
            return None

        metadata = await self.storage.get_file_metadata(file_id)
        if not metadata:
            return None

        file_data = await self.storage.download_file(file_id)

        # ``dict.get(key, default)`` only covers a missing key. upload_file
        # stores ``content_type: None`` when the caller omits it, so fall back
        # on falsy values as well to honour the declared ``str`` elements.
        filename = metadata.get("filename") or "file"
        content_type = metadata.get("content_type") or "application/octet-stream"
        return (file_data, filename, content_type)

    async def get_file(self, file_id: str) -> Optional["FileMetadata"]:
        """
        Look up a file's metadata.

        The database record is preferred; storage metadata is used as a
        fallback when no record exists.

        Args:
            file_id: File identifier.

        Returns:
            FileMetadata, or ``None`` when neither source knows the file.
        """
        record = await self.db.get_file_record(file_id)
        if record:
            return self._record_to_metadata(record)

        metadata = await self.storage.get_file_metadata(file_id)
        if metadata:
            return self._storage_metadata_to_file_metadata(metadata)

        return None

    async def list_files(
        self,
        purpose: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> "FileListResponse":
        """
        List file records from the database.

        Args:
            purpose: Optional filter on the file purpose.
            limit: Page size.
            offset: Number of records to skip.

        Returns:
            FileListResponse with the page of items and the total count for
            the same ``purpose`` filter.
        """
        records = await self.db.list_file_records(
            purpose=purpose, limit=limit, offset=offset
        )
        total = await self.db.count_file_records(purpose=purpose)
        items = [self._record_to_metadata(record) for record in records]
        return FileListResponse(
            items=items,
            total=total,
            limit=limit,
            offset=offset,
        )

    async def delete_file(self, file_id: str) -> "FileDeleteResponse":
        """
        Delete a file from storage and remove its database record.

        Both deletions are always attempted, storage first.

        Args:
            file_id: File identifier.

        Returns:
            FileDeleteResponse whose ``success`` is true when at least one of
            the two deletions reported that it removed something.
        """
        storage_deleted = await self.storage.delete_file(file_id)
        db_deleted = await self.db.delete_file_record(file_id)

        if storage_deleted or db_deleted:
            return FileDeleteResponse(
                success=True,
                message="文件删除成功",
                file_id=file_id,
            )
        return FileDeleteResponse(
            success=False,
            message="文件不存在或已删除",
            file_id=file_id,
        )

    async def file_exists(self, file_id: str) -> bool:
        """
        Report whether the storage adapter has the file.

        Args:
            file_id: File identifier.

        Returns:
            The storage adapter's answer.
        """
        return await self.storage.file_exists(file_id)

    @staticmethod
    def _parse_uploaded_at(value):
        """
        Normalise an ``uploaded_at`` value taken from a record or metadata dict.

        ``None`` becomes the current UTC time, strings are parsed as ISO 8601,
        and any other value (typically an existing ``datetime``) is returned
        unchanged.
        """
        if value is None:
            return datetime.utcnow()
        if isinstance(value, str):
            # datetime.fromisoformat() only accepts a trailing "Z" from
            # Python 3.11 on; rewrite it as an explicit UTC offset so older
            # interpreters parse it to the same aware datetime.
            if value.endswith("Z"):
                value = value[:-1] + "+00:00"
            return datetime.fromisoformat(value)
        return value

    def _record_to_metadata(self, record: dict) -> "FileMetadata":
        """Convert a database record into FileMetadata (``id`` and ``filename`` are required keys)."""
        return FileMetadata(
            id=record["id"],
            filename=record["filename"],
            content_type=record.get("content_type"),
            size_bytes=record.get("size_bytes", 0),
            purpose=record.get("purpose", "training"),
            duration_seconds=record.get("duration_seconds"),
            sample_rate=record.get("sample_rate"),
            uploaded_at=self._parse_uploaded_at(record.get("uploaded_at")),
        )

    def _storage_metadata_to_file_metadata(self, metadata: dict) -> "FileMetadata":
        """Convert storage metadata into FileMetadata; a missing ``id`` or ``filename`` becomes an empty string."""
        return FileMetadata(
            id=metadata.get("id", ""),
            filename=metadata.get("filename", ""),
            content_type=metadata.get("content_type"),
            size_bytes=metadata.get("size_bytes", 0),
            purpose=metadata.get("purpose", "training"),
            duration_seconds=metadata.get("duration_seconds"),
            sample_rate=metadata.get("sample_rate"),
            uploaded_at=self._parse_uploaded_at(metadata.get("uploaded_at")),
        )