# Workflow-Engine / api / services / audio_service.py
# Severian's picture
# initial commit
# a8b3f00
import io
import logging
from typing import Optional
from werkzeug.datastructures import FileStorage
from core.model_manager import ModelManager
from core.model_runtime.entities.model_entities import ModelType
from models.model import App, AppMode, AppModelConfig, Message
from services.errors.audio import (
AudioTooLargeServiceError,
NoAudioUploadedServiceError,
ProviderNotSupportSpeechToTextServiceError,
ProviderNotSupportTextToSpeechServiceError,
UnsupportedAudioTypeServiceError,
)
# Upload size cap expressed in megabytes; also interpolated into the
# "Audio size larger than ..." error message.
FILE_SIZE = 30
# The same cap converted to bytes, for comparison against the raw payload length.
FILE_SIZE_LIMIT = FILE_SIZE * 1024**2
# Audio subtypes accepted for transcription; each one is matched against the
# upload's MIME type in the form "audio/<subtype>".
ALLOWED_EXTENSIONS = [
    "mp3",
    "mp4",
    "mpeg",
    "mpga",
    "m4a",
    "wav",
    "webm",
    "amr",
]

logger = logging.getLogger(__name__)
class AudioService:
    """Speech-to-text and text-to-speech operations for an app.

    Each method looks up the tenant's default model of the relevant type via
    ``ModelManager`` and delegates the actual audio work to that model instance.
    """

    @classmethod
    def transcript_asr(cls, app_model: App, file: FileStorage, end_user: Optional[str] = None):
        """Transcribe an uploaded audio file with the tenant's default speech-to-text model.

        Args:
            app_model: App the request belongs to. Its mode decides whether the
                feature flag is read from the workflow or from the app model config.
            file: Uploaded audio file.
            end_user: Optional user identifier forwarded to the model call.

        Returns:
            A dict ``{"text": ...}`` holding whatever ``invoke_speech2text`` returned.

        Raises:
            ValueError: Speech-to-text is not enabled for the app.
            NoAudioUploadedServiceError: ``file`` is ``None``.
            UnsupportedAudioTypeServiceError: The MIME type is not ``audio/<ext>``
                for any entry of ``ALLOWED_EXTENSIONS``.
            AudioTooLargeServiceError: The payload exceeds ``FILE_SIZE_LIMIT`` bytes.
            ProviderNotSupportSpeechToTextServiceError: No default speech-to-text
                model instance is available for the tenant.
        """
        if app_model.mode in {AppMode.ADVANCED_CHAT.value, AppMode.WORKFLOW.value}:
            workflow = app_model.workflow
            if workflow is None:
                raise ValueError("Speech to text is not enabled")

            stt_feature = workflow.features_dict.get("speech_to_text") or {}
            if not stt_feature.get("enabled"):
                raise ValueError("Speech to text is not enabled")
        else:
            app_model_config: AppModelConfig = app_model.app_model_config
            # A missing config or a missing "enabled" key both mean "disabled";
            # report them as such instead of failing with AttributeError/KeyError.
            if app_model_config is None or not app_model_config.speech_to_text_dict.get("enabled"):
                raise ValueError("Speech to text is not enabled")

        if file is None:
            raise NoAudioUploadedServiceError()

        allowed_mimetypes = {f"audio/{ext}" for ext in ALLOWED_EXTENSIONS}
        if file.mimetype not in allowed_mimetypes:
            raise UnsupportedAudioTypeServiceError()

        file_content = file.read()
        if len(file_content) > FILE_SIZE_LIMIT:
            raise AudioTooLargeServiceError(f"Audio size larger than {FILE_SIZE} mb")

        model_instance = ModelManager().get_default_model_instance(
            tenant_id=app_model.tenant_id, model_type=ModelType.SPEECH2TEXT
        )
        if model_instance is None:
            raise ProviderNotSupportSpeechToTextServiceError()

        buffer = io.BytesIO(file_content)
        # NOTE(review): the fixed name is presumably there so the provider can
        # infer a container format from a file-like object — confirm.
        buffer.name = "temp.mp3"
        return {"text": model_instance.invoke_speech2text(file=buffer, user=end_user)}

    @classmethod
    def transcript_tts(
        cls,
        app_model: App,
        text: Optional[str] = None,
        voice: Optional[str] = None,
        end_user: Optional[str] = None,
        message_id: Optional[str] = None,
    ):
        """Synthesize speech for a stored message's answer or for the given text.

        When ``message_id`` is truthy the text comes from that message's
        ``answer``; otherwise ``text`` is used.

        Args:
            app_model: App the request belongs to.
            text: Text to synthesize when no ``message_id`` is given.
            voice: Voice to use. Falls back to the voice configured for the app,
                then to the first voice the model reports.
            end_user: Optional user identifier forwarded to the model call.
            message_id: Id of a stored ``Message`` whose answer should be spoken.

        Returns:
            A Flask ``Response`` streaming ``audio/mpeg`` when the model returns
            a generator, otherwise the model's return value unchanged. ``None``
            when ``message_id`` matches no message, or matches a message whose
            answer is empty and whose status is ``"normal"``.

        Raises:
            ValueError: TTS is not enabled, no voice is available, or neither
                ``message_id`` nor ``text`` was supplied.
            ProviderNotSupportTextToSpeechServiceError: No default TTS model
                instance is available for the tenant.
        """
        from collections.abc import Generator

        from flask import Response, stream_with_context

        from app import app
        from extensions.ext_database import db

        def invoke_tts(text_content: str, app_model, voice: Optional[str] = None):
            """Resolve the voice and call the tenant's default TTS model inside an app context."""
            with app.app_context():
                if app_model.mode in {AppMode.ADVANCED_CHAT.value, AppMode.WORKFLOW.value}:
                    workflow = app_model.workflow
                    if workflow is None:
                        raise ValueError("TTS is not enabled")

                    tts_feature = workflow.features_dict.get("text_to_speech") or {}
                    if not tts_feature.get("enabled"):
                        raise ValueError("TTS is not enabled")

                    if voice is None:
                        voice = tts_feature.get("voice")
                else:
                    app_model_config = app_model.app_model_config
                    if app_model_config is None:
                        raise ValueError("TTS is not enabled")

                    text_to_speech_dict = app_model_config.text_to_speech_dict
                    if not text_to_speech_dict.get("enabled"):
                        raise ValueError("TTS is not enabled")

                    if voice is None:
                        voice = text_to_speech_dict.get("voice")

                model_instance = ModelManager().get_default_model_instance(
                    tenant_id=app_model.tenant_id, model_type=ModelType.TTS
                )
                # Same guard as transcript_tts_voices: fail with the service
                # error rather than an AttributeError on None.
                if model_instance is None:
                    raise ProviderNotSupportTextToSpeechServiceError()

                if not voice:
                    voices = model_instance.get_tts_voices()
                    if not voices:
                        raise ValueError("Sorry, no voice available.")
                    voice = voices[0].get("value")

                return model_instance.invoke_tts(
                    content_text=text_content.strip(), user=end_user, tenant_id=app_model.tenant_id, voice=voice
                )

        if message_id:
            message = db.session.query(Message).filter(Message.id == message_id).first()
            # An unknown message id yields nothing to speak.
            if message is None:
                return None
            if message.answer == "" and message.status == "normal":
                return None
            text_content = message.answer
        else:
            if text is None:
                raise ValueError("Text is required")
            text_content = text

        response = invoke_tts(text_content, app_model=app_model, voice=voice)
        if isinstance(response, Generator):
            return Response(stream_with_context(response), content_type="audio/mpeg")
        return response

    @classmethod
    def transcript_tts_voices(cls, tenant_id: str, language: str):
        """Return the voices the tenant's default TTS model reports for ``language``.

        Args:
            tenant_id: Tenant whose default TTS model is queried.
            language: Language passed through to ``get_tts_voices``.

        Returns:
            Whatever the model instance's ``get_tts_voices(language)`` returns.

        Raises:
            ProviderNotSupportTextToSpeechServiceError: No default TTS model
                instance is available for the tenant.
        """
        model_instance = ModelManager().get_default_model_instance(tenant_id=tenant_id, model_type=ModelType.TTS)
        if model_instance is None:
            raise ProviderNotSupportTextToSpeechServiceError()

        return model_instance.get_tts_voices(language)