Spaces:
Running
Running
| """ | |
| Jina AI Reader Adapter | |
| Extracts clean, full article content from URLs using Jina AI Reader API. | |
| Removes ads, navigation, boilerplate, and returns markdown-formatted text. | |
| Features: | |
| - Async execution with timeout | |
| - Parallel extraction for multiple URLs | |
| - Graceful fallback to snippets on failure | |
- Optional API key (free tier available; many sites return HTTP 401 without one)
| - 71x more content than snippets (14,000 vs 200 chars) | |
| Integration: | |
| - Enhances DuckDuckGo live search results | |
| - Replaces 200-char snippets with full articles | |
| - Improves LLM context quality dramatically | |
| """ | |
import asyncio
import logging
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import httpx
| logger = logging.getLogger(__name__) | |
class JinaReaderAdapter:
    """
    Adapter for the Jina AI Reader API.

    Extracts full, markdown-formatted article content from URLs to enhance
    RAG context quality. Navigation/footer boilerplate is stripped and the
    body is capped to protect the LLM context window.
    """

    def __init__(
        self,
        timeout: float = 10.0,
        max_concurrent: int = 5,
        base_url: str = "https://r.jina.ai"
    ):
        """
        Initialize Jina Reader adapter.

        Args:
            timeout: Maximum time to wait per article (seconds)
            max_concurrent: Maximum parallel extractions
            base_url: Jina Reader API base URL
        """
        self.base_url = base_url
        self.timeout = timeout
        self.max_concurrent = max_concurrent
        self.client = None  # lazily created httpx.AsyncClient (see _ensure_client)
        logger.info(
            f"Jina Reader initialized: timeout={timeout}s, "
            f"max_concurrent={max_concurrent}"
        )

    async def _ensure_client(self):
        """Lazily create the shared HTTP client with optional API-key auth."""
        if self.client is not None:
            return
        headers = {
            "User-Agent": "ARKI-AI-RAG/2.4 (Ethiopia News Assistant)",
            "Accept": "text/plain, text/markdown",
        }
        # Attach the Jina API key if configured (required for most sites).
        try:
            from src.core.config import settings
            jina_key = getattr(settings, "JINA_API_KEY", "")
            if jina_key and jina_key not in ("", "your-jina-api-key-here"):
                headers["Authorization"] = f"Bearer {jina_key}"
                logger.info("Jina Reader: using API key authentication")
            else:
                logger.warning("Jina Reader: no API key set — most sites will return 401. Get free key at https://jina.ai")
        except Exception:
            # Settings module unavailable (e.g. standalone use) — proceed unauthenticated.
            pass
        self.client = httpx.AsyncClient(
            timeout=self.timeout,
            follow_redirects=True,
            headers=headers
        )

    @staticmethod
    def _parse_markdown(content: str) -> tuple:
        """
        Split a Jina markdown response into (title, body).

        The first line usually carries the title ("# ..." or "Title: ...");
        the body starts at the first non-empty line after it.
        """
        lines = content.split('\n')
        title = ""
        if lines:
            title = lines[0]
            # Strip leading title markers only (the original str.replace
            # removed these sequences anywhere in the line).
            for prefix in ('# ', 'Title: '):
                if title.startswith(prefix):
                    title = title[len(prefix):]
            title = title.strip()
        body_lines = []
        for i, line in enumerate(lines):
            if i == 0:  # Skip title line
                continue
            if line.strip():  # First non-empty line starts the body
                body_lines = lines[i:]
                break
        return title, '\n'.join(body_lines).strip()

    async def extract_article(self, url: str) -> Dict[str, Any]:
        """
        Extract clean article content from a single URL.

        Args:
            url: Article URL to extract

        Returns:
            Dict with:
                - success: bool
                - url: str
                - title: str (if success)
                - content: str (if success)
                - length: int (if success)
                - extracted_at: str ISO timestamp (if success)
                - error: str (if failure)
        """
        await self._ensure_client()
        logger.debug(f"Extracting article: {url[:80]}")
        try:
            # Jina Reader API: GET https://r.jina.ai/{url}
            response = await self.client.get(f"{self.base_url}/{url}")
            if response.status_code == 200:
                title, body = self._parse_markdown(response.text)
                # Jina returns the full page markdown including nav/footer;
                # cut at the first boilerplate marker and cap the length.
                body = self._strip_boilerplate(body)
                # Validate content — short bodies are usually error pages.
                if not body or len(body) < 100:
                    logger.warning(
                        f"Jina returned insufficient content for {url[:50]} "
                        f"({len(body)} chars)"
                    )
                    return {
                        "success": False,
                        "url": url,
                        "error": "Insufficient content extracted"
                    }
                logger.info(
                    f"✅ Jina extracted {len(body):,} chars from {url[:50]}"
                )
                return {
                    "success": True,
                    "url": url,
                    "title": title or "Untitled",
                    "content": body,
                    "length": len(body),
                    # Timezone-aware UTC; datetime.utcnow() is deprecated.
                    "extracted_at": datetime.now(timezone.utc).isoformat()
                }
            if response.status_code == 451:
                # 451 Unavailable For Legal Reasons (geo-blocking)
                logger.debug(f"Jina: 451 geo-blocked for {url[:50]}")
                return {
                    "success": False,
                    "url": url,
                    "error": "Content geo-blocked"
                }
            if response.status_code == 404:
                logger.debug(f"Jina: 404 not found for {url[:50]}")
                return {
                    "success": False,
                    "url": url,
                    "error": "Article not found"
                }
            logger.debug(
                f"Jina returned status {response.status_code} for {url[:50]}"
            )
            return {
                "success": False,
                "url": url,
                "error": f"HTTP {response.status_code}"
            }
        except (httpx.TimeoutException, asyncio.TimeoutError):
            # BUG FIX: httpx raises httpx.TimeoutException, not
            # asyncio.TimeoutError, so the original timeout branch was dead
            # and timeouts fell through to the generic handler.
            logger.debug(f"Jina timeout ({self.timeout}s) for {url[:50]}")
            return {
                "success": False,
                "url": url,
                "error": "Extraction timeout"
            }
        except Exception as e:
            logger.debug(f"Jina extraction error for {url[:50]}: {e}")
            return {
                "success": False,
                "url": url,
                "error": str(e)
            }

    async def extract_multiple(
        self,
        urls: List[str],
        max_articles: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Extract content from multiple URLs in parallel.

        Args:
            urls: List of article URLs
            max_articles: Maximum articles to extract (default: max_concurrent)

        Returns:
            List of extraction results (same order as input URLs)
        """
        if not urls:
            return []
        # Limit number of articles
        max_articles = max_articles or self.max_concurrent
        urls_to_extract = urls[:max_articles]
        logger.info(
            f"Extracting {len(urls_to_extract)} articles in parallel "
            f"(max_concurrent={self.max_concurrent})"
        )
        # Semaphore limits how many extractions run concurrently.
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def bounded_extract(url: str) -> Dict[str, Any]:
            async with semaphore:
                return await self.extract_article(url)

        results = await asyncio.gather(
            *(bounded_extract(url) for url in urls_to_extract),
            return_exceptions=True
        )
        # Convert raised exceptions into failure dicts so callers get a
        # uniform result shape.
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logger.error(f"Extraction failed for {urls_to_extract[i][:50]}: {result}")
                processed_results.append({
                    "success": False,
                    "url": urls_to_extract[i],
                    "error": str(result)
                })
            else:
                processed_results.append(result)
        # Log summary
        successful = sum(1 for r in processed_results if r.get("success"))
        total_chars = sum(r.get("length", 0) for r in processed_results if r.get("success"))
        logger.info(
            f"Jina extraction complete: {successful}/{len(processed_results)} successful, "
            f"{total_chars:,} total chars"
        )
        return processed_results

    async def enhance_search_results(
        self,
        search_results: List[Dict[str, Any]],
        fallback_to_snippet: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Enhance search results by replacing snippets with full articles.

        Args:
            search_results: List of search results with URLs and snippets
            fallback_to_snippet: Keep original snippet if extraction fails

        Returns:
            Enhanced search results with full article content
        """
        if not search_results:
            return []
        # BUG FIX: the original collected URLs only from results that HAVE a
        # url but merged extractions back by raw index, misaligning whenever
        # any result lacked a url (and could read a stale `extraction` in the
        # overflow branch). Track the original index of each extracted URL.
        indexed_urls = [(i, r["url"]) for i, r in enumerate(search_results) if r.get("url")]
        if not indexed_urls:
            logger.warning("No URLs found in search results")
            return search_results
        # Extract full articles (extract_multiple may cap the list).
        extractions = await self.extract_multiple([u for _, u in indexed_urls])
        extraction_for = {
            idx: extractions[j]
            for j, (idx, _) in enumerate(indexed_urls)
            if j < len(extractions)
        }
        enhanced_results = []
        for i, result in enumerate(search_results):
            enhanced = dict(result)  # Copy original
            extraction = extraction_for.get(i)
            if extraction is not None:
                if extraction.get("success"):
                    # Replace snippet with full article
                    enhanced["content"] = extraction["content"]
                    enhanced["full_article"] = True
                    enhanced["content_length"] = extraction["length"]
                    enhanced["jina_title"] = extraction.get("title")
                    enhanced["extracted_at"] = extraction.get("extracted_at")
                    logger.debug(
                        f"Enhanced result {i+1}: {extraction['length']:,} chars "
                        f"(was {len(result.get('content', ''))} chars)"
                    )
                else:
                    # Extraction failed
                    enhanced["full_article"] = False
                    enhanced["jina_error"] = extraction.get("error")
                    if not fallback_to_snippet:
                        # Remove result entirely if fallback disabled
                        logger.debug(
                            f"Skipping result {i+1}: Jina failed and fallback disabled"
                        )
                        continue
                    logger.debug(
                        f"Keeping snippet for result {i+1}: {extraction.get('error')}"
                    )
            enhanced_results.append(enhanced)
        # Log enhancement summary
        full_articles = sum(1 for r in enhanced_results if r.get("full_article"))
        snippets = len(enhanced_results) - full_articles
        logger.info(
            f"Enhanced {len(enhanced_results)} results: "
            f"{full_articles} full articles, {snippets} snippets"
        )
        return enhanced_results

    async def close(self):
        """Close the HTTP client and release its connections."""
        if self.client:
            await self.client.aclose()
            self.client = None
            logger.debug("Jina Reader client closed")

    def _strip_boilerplate(self, content: str, max_chars: int = 8000) -> str:
        """
        Strip navigation, footer, archives and other boilerplate from
        Jina-extracted markdown. Keeps only the article body.

        Strategy:
            1. Cut at the first common boilerplate section marker.
            2. Hard cap at max_chars (preferring a paragraph boundary) to
               avoid sending huge pages to the LLM.
        """
        # Markers that indicate end of article content —
        # everything after these is navigation/footer/boilerplate.
        cutoff_patterns = [
            r'\n## (Post navigation|Archives|Categories|Recent Posts|Search|Newsletter|Socials|Tags|Related)',
            r'\n### (Post navigation|Archives|Categories|Recent Posts|Related)',
            r'\n\* \[Home\]\(',      # Navigation list starting with Home
            r'\n\* \[Facebook\]\(',  # Social links
            r'\nCopyright ©',
            r'\n---\n.*\n---',       # Horizontal rules often mark footer
            r'\nShare on (Facebook|Twitter|X|LinkedIn)',
            r'\n## Search\n',
            r'\n## Newsletter\n',
            r'\n## Socials\n',
        ]
        for pattern in cutoff_patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                content = content[:match.start()].strip()
                break
        # Hard cap — LLM context window protection
        if len(content) > max_chars:
            # Try to cut at a paragraph boundary near the cap
            cutoff = content[:max_chars].rfind('\n\n')
            if cutoff > max_chars * 0.7:
                content = content[:cutoff].strip()
            else:
                content = content[:max_chars].strip()
        return content

    def is_available(self) -> bool:
        """Report whether the adapter can be used."""
        # The Reader endpoint needs no setup here; an API key, if configured,
        # is attached lazily by _ensure_client (many sites 401 without one).
        return True
# Process-wide singleton so callers can share one adapter instance
_default_adapter = None


def get_jina_reader_adapter(
    timeout: float = 10.0,
    max_concurrent: int = 5
) -> JinaReaderAdapter:
    """
    Return the shared Jina Reader adapter, creating it on first use.

    Note: the arguments only take effect on the call that creates the
    singleton; later calls return the existing instance unchanged.

    Args:
        timeout: Extraction timeout in seconds
        max_concurrent: Maximum parallel extractions

    Returns:
        JinaReaderAdapter instance
    """
    global _default_adapter
    if _default_adapter is not None:
        return _default_adapter
    _default_adapter = JinaReaderAdapter(
        timeout=timeout,
        max_concurrent=max_concurrent
    )
    return _default_adapter