| | from __future__ import annotations |
| |
|
| | from typing import TYPE_CHECKING, Any |
| |
|
| | from asgiref.sync import async_to_sync |
| | from celery.exceptions import SoftTimeLimitExceeded |
| |
|
| | from langflow.core.celery_app import celery_app |
| |
|
| | if TYPE_CHECKING: |
| | from langflow.graph.vertex.base import Vertex |
| |
|
| |
|
| | @celery_app.task(acks_late=True) |
| | def test_celery(word: str) -> str: |
| | return f"test task return {word}" |
| |
|
| |
|
| | @celery_app.task(bind=True, soft_time_limit=30, max_retries=3) |
| | def build_vertex(self, vertex: Vertex) -> Vertex: |
| | """Build a vertex.""" |
| | try: |
| | vertex.task_id = self.request.id |
| | async_to_sync(vertex.build)() |
| | except SoftTimeLimitExceeded as e: |
| | raise self.retry(exc=SoftTimeLimitExceeded("Task took too long"), countdown=2) from e |
| | return vertex |
| |
|
| |
|
| | @celery_app.task(acks_late=True) |
| | def process_graph_cached_task() -> dict[str, Any]: |
| | msg = "This task is not implemented yet" |
| | raise NotImplementedError(msg) |
| |
|