|
from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom
|
|
from core.app.entities.app_invoke_entities import InvokeFrom
|
|
from core.app.entities.queue_entities import (
|
|
AppQueueEvent,
|
|
MessageQueueMessage,
|
|
QueueAdvancedChatMessageEndEvent,
|
|
QueueErrorEvent,
|
|
QueueMessage,
|
|
QueueMessageEndEvent,
|
|
QueueStopEvent,
|
|
)
|
|
|
|
|
|
class MessageBasedAppQueueManager(AppQueueManager):
|
|
def __init__(self, task_id: str,
|
|
user_id: str,
|
|
invoke_from: InvokeFrom,
|
|
conversation_id: str,
|
|
app_mode: str,
|
|
message_id: str) -> None:
|
|
super().__init__(task_id, user_id, invoke_from)
|
|
|
|
self._conversation_id = str(conversation_id)
|
|
self._app_mode = app_mode
|
|
self._message_id = str(message_id)
|
|
|
|
def construct_queue_message(self, event: AppQueueEvent) -> QueueMessage:
|
|
return MessageQueueMessage(
|
|
task_id=self._task_id,
|
|
message_id=self._message_id,
|
|
conversation_id=self._conversation_id,
|
|
app_mode=self._app_mode,
|
|
event=event
|
|
)
|
|
|
|
def _publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None:
|
|
"""
|
|
Publish event to queue
|
|
:param event:
|
|
:param pub_from:
|
|
:return:
|
|
"""
|
|
message = MessageQueueMessage(
|
|
task_id=self._task_id,
|
|
message_id=self._message_id,
|
|
conversation_id=self._conversation_id,
|
|
app_mode=self._app_mode,
|
|
event=event
|
|
)
|
|
|
|
self._q.put(message)
|
|
|
|
if isinstance(event, QueueStopEvent
|
|
| QueueErrorEvent
|
|
| QueueMessageEndEvent
|
|
| QueueAdvancedChatMessageEndEvent):
|
|
self.stop_listen()
|
|
|
|
if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped():
|
|
raise GenerateTaskStoppedException()
|
|
|
|
|