|
from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom
|
|
from core.app.entities.app_invoke_entities import InvokeFrom
|
|
from core.app.entities.queue_entities import (
|
|
AppQueueEvent,
|
|
QueueErrorEvent,
|
|
QueueMessageEndEvent,
|
|
QueueStopEvent,
|
|
QueueWorkflowFailedEvent,
|
|
QueueWorkflowSucceededEvent,
|
|
WorkflowQueueMessage,
|
|
)
|
|
|
|
|
|
class WorkflowAppQueueManager(AppQueueManager):
|
|
def __init__(self, task_id: str,
|
|
user_id: str,
|
|
invoke_from: InvokeFrom,
|
|
app_mode: str) -> None:
|
|
super().__init__(task_id, user_id, invoke_from)
|
|
|
|
self._app_mode = app_mode
|
|
|
|
def _publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None:
|
|
"""
|
|
Publish event to queue
|
|
:param event:
|
|
:param pub_from:
|
|
:return:
|
|
"""
|
|
message = WorkflowQueueMessage(
|
|
task_id=self._task_id,
|
|
app_mode=self._app_mode,
|
|
event=event
|
|
)
|
|
|
|
self._q.put(message)
|
|
|
|
if isinstance(event, QueueStopEvent
|
|
| QueueErrorEvent
|
|
| QueueMessageEndEvent
|
|
| QueueWorkflowSucceededEvent
|
|
| QueueWorkflowFailedEvent):
|
|
self.stop_listen()
|
|
|
|
if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped():
|
|
raise GenerateTaskStoppedException()
|
|
|