Search_Engine / src /pipelines /flows /embeddings_ingestion_flow.py
IndraneelKumar
Initial search engine commit
266d7bc
import argparse
import asyncio
from datetime import UTC, datetime, timedelta
from dateutil import parser
from prefect import flow, get_client
from prefect.client.schemas.filters import FlowFilter, FlowRunFilter
from prefect.client.schemas.sorting import FlowRunSort
from src.config import settings
from src.pipelines.tasks.ingest_embeddings import ingest_qdrant
from src.utils.logger_util import setup_logging
async def get_last_successful_run(flow_name: str) -> datetime | None:
    """Get the start time of the last successfully completed run for a given flow.

    Looks up the flow whose name is exactly `flow_name`, then asks the Prefect API
    for that flow's most recent COMPLETED runs (newest start time first) and
    returns the start time of the first one.

    Args:
        flow_name (str): Exact name of the Prefect flow.

    Returns:
        datetime | None: Start time of the last completed run, or None if the flow
            is unknown or has no completed runs.

    Raises:
        Exception: If Prefect API calls fail unexpectedly (logged, then re-raised).
    """
    logger = setup_logging()
    logger.info(f"Looking for last successful run of flow: {flow_name}")
    try:
        async with get_client() as client:
            # Step 1: get flows matching the name.
            # `any_` is the name-equality operator the filter schema defines; the
            # exact-match check below is kept as a defensive guard.
            flows = await client.read_flows(
                flow_filter=FlowFilter(name=dict(any_=[flow_name]))  # type: ignore
            )
            logger.debug(f"Flows returned by Prefect API: {flows}")
            exact_flow = next((f for f in flows if f.name == flow_name), None)
            if not exact_flow:
                logger.info(f"No flow found with exact name: {flow_name}")
                return None
            logger.info(f"Exact flow found: {exact_flow.id} ({exact_flow.name})")

            # Step 2: get recent completed runs *of this flow only*.
            # Scoping the query server-side matters: without the flow filter the
            # `limit` applies to completed runs of every flow, so a busy
            # workspace could push this flow's runs out of the result window.
            flow_runs = await client.read_flow_runs(
                flow_filter=FlowFilter(id=dict(any_=[exact_flow.id])),  # type: ignore
                flow_run_filter=FlowRunFilter(state=dict(type=dict(any_=["COMPLETED"]))),  # type: ignore
                sort=FlowRunSort.START_TIME_DESC,
                limit=10,
            )
            logger.debug(f"Recent completed runs fetched: {[r.id for r in flow_runs]}")

            # Step 3: defensive check that only runs for this flow remain
            flow_runs = [r for r in flow_runs if r.flow_id == exact_flow.id]
            logger.debug(f"Filtered runs for exact flow: {[r.id for r in flow_runs]}")
            if not flow_runs:
                logger.info(f"No completed runs found for flow: {flow_name}")
                return None

            # Results are sorted newest-first, so the first entry is the latest run.
            last_run_time = flow_runs[0].start_time
            logger.info(f"Last completed run for flow '{flow_name}' started at {last_run_time}")
            return last_run_time
    except Exception as e:
        logger.error(f"Error fetching last successful run for flow '{flow_name}': {e}")
        raise
@flow(
    name="qdrant_ingest_flow",
    flow_run_name="qdrant_ingest_flow_run",
    description="Orchestrates SQL → Qdrant ingestion",
    retries=2,
    retry_delay_seconds=120,
)
async def qdrant_ingest_flow(from_date: str | None = None) -> None:
    """Prefect Flow: Orchestrates ingestion of articles from SQL into Qdrant.

    Determines the starting cutoff date for ingestion and runs the Qdrant
    ingestion task. The cutoff is resolved in this order:

    1. the user-provided `from_date`;
    2. the start time of the last completed run of this flow;
    3. `settings.rss.default_start_date`, when it is set;
    4. 30 days before now (UTC).

    Args:
        from_date (str | None, optional): Start date in YYYY-MM-DD format. If None,
            falls back to last successful run or the configured default.

    Returns:
        None

    Raises:
        RuntimeError: If cutoff resolution or ingestion fails; the original
            exception is chained as the cause.
    """
    logger = setup_logging()
    rss = settings.rss

    def _as_utc(moment: datetime) -> datetime:
        """Return `moment` in UTC: naive values are assumed UTC, aware ones converted."""
        if moment.tzinfo is None or moment.utcoffset() is None:
            return moment.replace(tzinfo=UTC)
        # Convert instead of relabelling so an explicit offset keeps its instant.
        return moment.astimezone(UTC)

    try:
        if from_date:
            # Parse user-provided date; a bare date is taken as UTC midnight
            from_date_dt = _as_utc(parser.parse(from_date))
            logger.info(f"Using user-provided from_date: {from_date_dt}")
        else:
            # Fallback to last run date, default_start_date, or 30 days ago
            last_run_date = await get_last_successful_run("qdrant_ingest_flow")
            if last_run_date:
                from_date_dt = last_run_date
            elif rss.default_start_date:
                from_date_dt = _as_utc(parser.parse(rss.default_start_date))
            else:
                # Only reachable because the setting is checked before parsing:
                # parsing a missing value would raise rather than fall through.
                from_date_dt = datetime.now(UTC) - timedelta(days=30)
            logger.info(f"Using fallback from_date: {from_date_dt}")
        await ingest_qdrant(from_date=from_date_dt)
    except Exception as e:
        logger.error(f"Error during Qdrant ingestion flow: {e}")
        raise RuntimeError("Qdrant ingestion flow failed") from e
if __name__ == "__main__":
    # Script entry point: read the optional cutoff date from the command line
    # and run the ingestion flow to completion on a fresh event loop.
    cli = argparse.ArgumentParser()
    cli.add_argument("--from-date", type=str, default=None, help="From date in YYYY-MM-DD format")
    cli_options = cli.parse_args()
    requested_from_date = cli_options.from_date
    asyncio.run(qdrant_ingest_flow(from_date=requested_from_date))  # type: ignore[arg-type]