from prefect import flow, unmapped

from src.config import settings
from src.infrastructure.supabase.init_session import init_engine
from src.models.article_models import FeedItem
from src.models.sql_models import FeedArticle
from src.pipelines.tasks.fetch_rss import fetch_rss_entries
from src.pipelines.tasks.ingest_rss import ingest_from_rss
from src.utils.logger_util import setup_logging
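
# Note: this flow assumes a config shape roughly like the sketch below. The
# names here are hypothetical, inferred from how settings.rss.feeds and
# FeedItem are used in this module; see src/config.py and
# src/models/article_models.py for the actual definitions.
#
#     class RssFeedConfig(BaseModel):
#         name: str
#         author: str
#         url: str
#
#     class RssSettings(BaseModel):
#         feeds: list[RssFeedConfig]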


@flow
def rss_ingest_flow(article_model: type[FeedArticle] = FeedArticle) -> None:
| """Fetch and ingest articles from configured RSS feeds concurrently. | |
| Each feed is fetched in parallel and ingested into the database | |
| with error handling at each stage. Ensures the database engine is disposed | |
| after completion. | |
| Args: | |
| article_model (type[FeedArticle]): SQLAlchemy model for storing articles. | |
| Returns: | |
| None | |
| Raises: | |
| RuntimeError: If ingestion fails for all feeds. | |
| Exception: For unexpected errors during execution. | |
| """ | |
    logger = setup_logging()
    engine = init_engine()
    errors = []

    # Tracking counters
    per_feed_counts: dict[str, int] = {}
    total_ingested = 0
    try:
        if not settings.rss.feeds:
            logger.warning("No feeds found in configuration.")
            return

        feeds = [FeedItem(name=f.name, author=f.author, url=f.url) for f in settings.rss.feeds]
        logger.info(f"Processing {len(feeds)} feeds concurrently...")

        # 1. Fetch articles concurrently
        fetched_articles_futures = fetch_rss_entries.map(
            feeds,
            engine=unmapped(engine),
            article_model=unmapped(article_model),
        )
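        # Note on the concurrency pattern (standard Prefect semantics):
        # Task.map submits one task run per element of `feeds` to the flow's
        # task runner and returns futures in the same order, while
        # unmapped(...) marks engine and article_model as constants shared by
        # every run rather than iterated over. The ingest_from_rss.submit(...)
        # calls below likewise return futures; nothing blocks until .result()
        # is called on a future.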

        # 2. Ingest concurrently per feed
        results = []
        for feed, fetched_future in zip(feeds, fetched_articles_futures, strict=False):
            try:
                fetched_articles = fetched_future.result()
            except Exception as e:
                logger.error(f"Error fetching articles for feed '{feed.name}': {e}")
                errors.append(f"Fetch error: {feed.name}")
                continue

            if not fetched_articles:
                logger.info(f"No new articles for feed '{feed.name}'")
                per_feed_counts[feed.name] = 0
                continue
            try:
                count = len(fetched_articles)
                per_feed_counts[feed.name] = count
                total_ingested += count
                logger.info(f"Feed '{feed.name}': {count} articles ready for ingestion")
                task_result = ingest_from_rss.submit(
                    fetched_articles,
                    feed,
                    article_model=article_model,
                    engine=engine,
                )
                results.append(task_result)
            except Exception as e:
                logger.error(f"Error submitting ingest_from_rss for feed '{feed.name}': {e}")
                errors.append(f"Ingest error: {feed.name}")

        # 3. Wait for all ingestion tasks
        for r in results:
            try:
                r.result()
            except Exception as e:
                logger.error(f"Error in ingest_from_rss task: {e}")
                errors.append("Task failure")

        # ---- Summary logging ----
        logger.info("Ingestion summary per feed:")
        for feed_name, count in per_feed_counts.items():
            logger.info(f"  • {feed_name}: {count} article(s) fetched and queued for ingestion")
        logger.info(f"Total articles queued across all feeds: {total_ingested}")

        if errors:
            raise RuntimeError(f"Flow completed with errors: {errors}")
    except Exception as e:
        logger.error(f"Unexpected error in rss_ingest_flow: {e}")
        raise
    finally:
        engine.dispose()
        logger.info("Database engine disposed.")


if __name__ == "__main__":
    rss_ingest_flow(article_model=FeedArticle)
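
    # A sketch of serving this flow on a schedule instead of a one-off run,
    # using Prefect's flow.serve API; the deployment name and cron string are
    # illustrative assumptions, not part of this project:
    #
    #     rss_ingest_flow.serve(
    #         name="rss-ingest-hourly",
    #         cron="0 * * * *",
    #     )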