Spaces:
Sleeping
Sleeping
# Standard library
import concurrent.futures
import logging
import os
import sys
import time
from datetime import datetime
from typing import Optional

# Third-party
import docling
import dotenv
import fastapi
import openai
import pytz
import requests
import uvicorn
from docling.document_converter import DocumentConverter
from duckduckgo_search import DDGS
from fastapi import FastAPI, HTTPException, Path, Query, Header
from googleapiclient.discovery import build
from pydantic import BaseModel, ConfigDict
# Load environment variables from a local .env file (no-op if the file is absent).
dotenv.load_dotenv()

# Which backend to use for web search: "duckduckgo" (default) or "google".
SEARCH_ENGINE = os.getenv("SEARCH_ENGINE", "duckduckgo")  # Default to DuckDuckGo
# Credentials for the Google Custom Search JSON API; required only when
# SEARCH_ENGINE is "google" (validated in GoogleSearchEngine.__init__).
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_CSE_ID = os.getenv("GOOGLE_CSE_ID")
# OpenAI API configuration.
# NOTE(review): openai_api_key is read but never passed anywhere — openai.Client()
# reads OPENAI_API_KEY from the environment on its own; confirm this variable is needed.
openai_api_key = os.getenv("OPENAI_API_KEY")
# Shared secret the endpoints expect in the "Auth" request header.
api_key = os.getenv("API_KEY")
# Module-level OpenAI client reused by every request.
client = openai.Client()
# FastAPI application object. The Markdown description below is rendered in the
# auto-generated OpenAPI/Swagger documentation.
app = FastAPI(
    title="Search & Summary API",
    description="""
This API provides an enhanced search service that combines DuckDuckGo search results with AI-powered summaries.
## Features
- **Web Search**: Utilizes DuckDuckGo's search engine to find relevant web pages
- **Content Processing**: Extracts and converts web content to markdown format
- **AI Summaries**: Generates intelligent summaries of search results using OpenAI's GPT model
- **Concurrent Processing**: Handles multiple results simultaneously for better performance
## Main Endpoints
### /search
The main search endpoint accepts the following parameters:
- `query` (required): The search term or phrase
- `timelimit` (optional): Time range for results ('d' for day, 'w' for week, 'm' for month, 'y' for year)
- `region` (optional): Geographic region for results (e.g., 'us-en', 'es-es')
- `max_results` (optional): Maximum number of results to return (default: 3)
### /fetch
The fetch endpoint retrieves and summarizes content from a specific URL:
- `url` (required): The URL to fetch and summarize
## Response Format
The API returns:
- Search results with URLs and snippets
- AI-generated summaries for each result
- Processing time information
""",
    version="1.0.0",
    contact={
        "name": "API Support",
        "email": "your-email@example.com",
    },
    license_info={
        "name": "MIT",
    }
)
# Response models
class SearchResult(BaseModel):
    """One search hit: its URL, snippet, AI summary, and token usage."""

    url: str  # page URL as returned by the search engine
    #url_md: str
    url_snippet: str  # short snippet/description from the search engine
    url_summary: str  # AI-generated summary, or a "Summary error: ..." message
    input_tokens: int  # prompt tokens spent summarizing this URL (0 on failure)
    completion_tokens: int  # completion tokens produced for this URL (0 on failure)
class SearchResponse(BaseModel):
    """Aggregate payload returned by the /search endpoint."""

    query: str  # the query that was searched
    results: list[SearchResult]  # per-URL results (may be empty)
    processing_time: float  # total wall-clock time in seconds
    timestamp: str  # request timestamp, Europe/Madrid local time
    total_input_tokens: int  # sum of prompt tokens across all summaries
    total_completion_tokens: int  # sum of completion tokens across all summaries
    total_url_summaries_ok: int  # number of URLs successfully summarized
    total_url_summaries_nok: int  # number of URLs that failed conversion/summarization
class VersionInfo(BaseModel):
    """Versions of the major dependencies, as reported by get_version_info().

    The class-level defaults are placeholder descriptions; get_version_info()
    overwrites every field with "<version> - <description>" strings.
    """

    python: str = "Python Runtime"
    openai: str = "OpenAI Language Model API"
    fastapi: str = "API Server Framework"
    model_config = ConfigDict(
        title="Version Information",
        description="Current versions of all major dependencies",
        json_schema_extra={
            "example": {
                "python": "3.11.4 [CPython] - Python Runtime",
                "openai": "1.12.0 - AI Language Model API",
                "fastapi": "0.109.0 - API Server Framework"
            }
        }
    )
class FetchResult(BaseModel):
    """Payload returned by the /fetch endpoint for a single URL."""

    url: str  # the URL that was fetched
    url_md: str  # full markdown conversion of the page ("" if conversion failed)
    content_summary: str  # AI summary, or an error message on failure
    processing_time: float  # wall-clock time in seconds
    timestamp: str  # request timestamp, Europe/Madrid local time
    input_tokens: int  # prompt tokens spent (0 on failure)
    completion_tokens: int  # completion tokens produced (0 on failure)
    total_url_summaries_ok: int  # 1 on success, 0 on failure
    total_url_summaries_nok: int  # 1 on failure, 0 on success
class SearchEngineFactory:
    """Factory that selects the search backend from the SEARCH_ENGINE env var."""

    # Fix: the original def had no decorator and no `self`, so it only worked
    # when called via the class object; @staticmethod makes the intent explicit
    # and keeps it working on instances too. Callers are unchanged.
    @staticmethod
    def create_search_engine() -> "SearchEngineBase":
        """Return GoogleSearchEngine when SEARCH_ENGINE == "google" (case-insensitive),
        otherwise the default DuckDuckGoSearchEngine."""
        if SEARCH_ENGINE.lower() == "google":
            return GoogleSearchEngine()
        return DuckDuckGoSearchEngine()
class SearchEngineBase:
    """Abstract interface shared by the search backends.

    Subclasses return a list of dicts shaped {"href": <url>, "body": <snippet>}.
    """

    async def search(self, query: str, max_results: int, **kwargs) -> list:
        """Abstract search hook; concrete engines must override this.

        Fix: the original body was `pass`, so calling the base class silently
        returned None where callers expect a list. Raising makes a missing
        override an explicit error instead of a downstream TypeError.
        """
        raise NotImplementedError("search() must be implemented by a subclass")
class DuckDuckGoSearchEngine(SearchEngineBase):
    """Search backend built on the duckduckgo_search DDGS client."""

    async def search(self, query: str, max_results: int, timelimit: str = "m", region: str = "us-en") -> list:
        """Search DuckDuckGo and normalize hits to {"href", "body"} dicts.

        Raises:
            HTTPException 500 wrapping any DDGS failure.
        """
        try:
            with DDGS() as ddgs:
                results = list(ddgs.text(query, max_results=max_results, timelimit=timelimit, region=region))
                # Fix: DDGS.text() yields dicts keyed "title"/"href"/"body";
                # the original read r["link"], which raised KeyError on every
                # result (and surfaced as a 500). Fall back to "link" in case
                # an older library version used that key.
                return [
                    {"href": r.get("href", r.get("link", "")), "body": r.get("body", "")}
                    for r in results
                ] if results else []
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"DuckDuckGo search error: {str(e)}")
class GoogleSearchEngine(SearchEngineBase):
    """Search backend built on the Google Custom Search JSON API.

    Translates the DuckDuckGo-style parameters (timelimit, region) into their
    Google equivalents (dateRestrict, lr) so both engines share one calling
    convention.
    """

    def __init__(self):
        # Fail fast when the Google credentials are missing from the environment.
        if not GOOGLE_API_KEY or not GOOGLE_CSE_ID:
            raise ValueError("Google API key and Custom Search Engine ID are required")
        self.service = build("customsearch", "v1", developerKey=GOOGLE_API_KEY)

    def _convert_timelimit(self, timelimit: str) -> Optional[str]:
        """Convert DuckDuckGo timelimit format to Google dateRestrict format.

        e.g. "m" -> "m1", "d7" -> "d7". Returns None for empty or invalid input.
        """
        if not timelimit:
            return None
        # Map of time units (identical letters, kept for validation and clarity).
        time_map = {
            'd': 'd',  # days stay as days
            'w': 'w',  # weeks stay as weeks
            'm': 'm',  # months stay as months
            'y': 'y'   # years stay as years
        }
        # Extract unit and number in correct order.
        unit = timelimit[0].lower()  # First character is the unit
        number = timelimit[1:] if len(timelimit) > 1 else '1'  # Rest is the number; default 1
        print(f"unit: {unit}, number: {number}")  # Debug print in correct order
        if unit not in time_map:
            return None
        try:
            int(number)  # Validate that the remainder is numeric
            return f"{time_map[unit]}{number}"
        except ValueError:
            return None

    def _convert_region_to_lang(self, region: str) -> Optional[str]:
        """Convert DuckDuckGo region format to Google Search API language restriction.

        e.g. "us-en" -> "lang_en". Returns None when the region has no language
        part or the language is not in the supported map.
        """
        if not region:
            return None
        # Extract language code from region (e.g., 'us-en' -> 'en').
        try:
            lang_code = region.split('-')[1].lower()
        except IndexError:
            return None
        # Map of language codes to the Google Search API `lr` parameter values.
        lang_map = {
            'ar': 'lang_ar',  # Arabic
            'bg': 'lang_bg',  # Bulgarian
            'ca': 'lang_ca',  # Catalan
            'cs': 'lang_cs',  # Czech
            'da': 'lang_da',  # Danish
            'de': 'lang_de',  # German
            'el': 'lang_el',  # Greek
            'en': 'lang_en',  # English
            'es': 'lang_es',  # Spanish
            'et': 'lang_et',  # Estonian
            'fi': 'lang_fi',  # Finnish
            'fr': 'lang_fr',  # French
            'hr': 'lang_hr',  # Croatian
            'hu': 'lang_hu',  # Hungarian
            'id': 'lang_id',  # Indonesian
            'is': 'lang_is',  # Icelandic
            'it': 'lang_it',  # Italian
            'iw': 'lang_iw',  # Hebrew
            'ja': 'lang_ja',  # Japanese
            'ko': 'lang_ko',  # Korean
            'lt': 'lang_lt',  # Lithuanian
            'lv': 'lang_lv',  # Latvian
            'nl': 'lang_nl',  # Dutch
            'no': 'lang_no',  # Norwegian
            'pl': 'lang_pl',  # Polish
            'pt': 'lang_pt',  # Portuguese
            'ro': 'lang_ro',  # Romanian
            'ru': 'lang_ru',  # Russian
            'sk': 'lang_sk',  # Slovak
            'sl': 'lang_sl',  # Slovenian
            'sr': 'lang_sr',  # Serbian
            'sv': 'lang_sv',  # Swedish
            'tr': 'lang_tr',  # Turkish
            'zh': 'lang_zh-CN'  # Default Chinese to Simplified
        }
        print(f"Converting region {region} to language code {lang_code}")  # Debug print
        return lang_map.get(lang_code)

    async def search(self, query: str, max_results: int, timelimit: str = "m", region: str = "us-en", **kwargs) -> list:
        """Query the Custom Search API, paging 10 results at a time.

        Returns a list of {"href", "body"} dicts, truncated to max_results.

        Raises:
            HTTPException 404 when the API returns no items at all.
            HTTPException 500 wrapping any API/client failure.
        """
        try:
            results = []
            date_restrict = self._convert_timelimit(timelimit)
            language = self._convert_region_to_lang(region)
            # The API caps `num` at 10 per request, so page via `start`.
            for i in range(0, max_results, 10):
                search_params = {
                    'q': query,
                    'cx': GOOGLE_CSE_ID,
                    'start': i + 1,  # API uses 1-based result offsets
                    'num': min(10, max_results - i)
                }
                if date_restrict:
                    print(f"Adding dateRestrict: {date_restrict}")
                    search_params['dateRestrict'] = date_restrict
                if language:
                    print(f"Adding language restriction: {language}")
                    search_params['lr'] = language
                try:
                    response = self.service.cse().list(**search_params).execute()
                    if "items" not in response:
                        # Page came back empty; keep trying remaining pages.
                        print(f"No results found for query: {query}")
                        continue
                    results.extend([
                        {
                            "href": item["link"],
                            "body": item.get("snippet", "")
                        } for item in response["items"]
                    ])
                except Exception as search_error:
                    print(f"Error during Google search: {str(search_error)}")
                    raise HTTPException(status_code=500, detail=f"Google search error: {str(search_error)}")
            if not results:
                # Explicitly raise a 404 when the whole search produced nothing.
                raise HTTPException(
                    status_code=404,
                    detail=f"No results found for query: {query}"
                )
            return results[:max_results]
        except HTTPException as he:
            # Preserve deliberate HTTP errors (404/500 above) unchanged.
            raise he
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Google search error: {str(e)}")
async def get_version_info():
    """Report the running versions of Python, openai, and fastapi."""
    vi = sys.version_info
    runtime = f"{vi.major}.{vi.minor}.{vi.micro} [CPython]"
    # openai exposes __version__ on recent releases only, hence the getattr.
    openai_version = getattr(openai, '__version__', 'unknown')
    return VersionInfo(
        python=f"{runtime} - Python Runtime",
        openai=f"{openai_version} - OpenAI Language Model API",
        fastapi=f"{fastapi.__version__} - API Server Framework",
    )
# NOTE(review): no @app.get("/search") decorator is visible in this chunk —
# confirm the route is registered elsewhere (decorator or app.add_api_route).
async def search(
    query: str = Query(..., description="The search query term or phrase"),
    timelimit: str = Query("m", description="Time range for results (DuckDuckGo only): 'd' (day), 'w' (week), 'm' (month), 'y' (year)"),
    region: str = Query("us-en", description="Geographic region for results (DuckDuckGo only, e.g., 'us-en', 'es-es')"),
    max_results: int = Query(3, description="Maximum number of results to return", ge=1, le=10),
    authorization: str = Header(..., description="API key for authorization", alias="Auth")
):
    """Search the web, summarize every hit with GPT, and return the aggregate.

    Flow: authorize via the "Auth" header -> run the configured search engine
    -> convert each hit to markdown (docling) -> summarize each document with
    gpt-4o-mini -> return a SearchResponse with per-URL summaries plus token
    and success/failure totals.

    Raises:
        HTTPException 401 when the Auth header does not match API_KEY.
        HTTPException 404/500 propagated from the search engine.
        HTTPException 500 for unexpected search failures.
    """
    if authorization != api_key:
        raise HTTPException(status_code=401, detail="Unauthorized")
    start_time = time.time()
    madrid_tz = pytz.timezone('Europe/Madrid')
    current_time = datetime.now(madrid_tz).strftime('%Y-%m-%d %H:%M:%S %Z')

    def empty_response() -> SearchResponse:
        # Shared "no results" payload; previously duplicated verbatim in two places.
        return SearchResponse(
            query=query,
            results=[],
            processing_time=round(time.time() - start_time, 2),
            timestamp=current_time,
            total_input_tokens=0,
            total_completion_tokens=0,
            total_url_summaries_ok=0,
            total_url_summaries_nok=0
        )

    # Success/failure counters, updated from worker threads below.
    total_url_summaries_ok = 0
    total_url_summaries_nok = 0
    # Create the search engine instance selected by SEARCH_ENGINE.
    print(f"Using search engine: {SEARCH_ENGINE}")
    search_engine = SearchEngineFactory.create_search_engine()
    try:
        # Perform search using the selected engine
        search_results = await search_engine.search(
            query=query,
            max_results=max_results,
            timelimit=timelimit,
            region=region
        )
        if not search_results:
            return empty_response()
    except HTTPException as he:
        # Re-raise HTTP exceptions (like 404) without wrapping
        raise he
    except Exception as e:
        # For other errors, check if it's a "no results" case
        if "No results found" in str(e):
            return empty_response()
        raise HTTPException(status_code=500, detail=f"Search error: {str(e)}")

    # Single converter instance shared by all worker threads.
    converter = DocumentConverter()

    def process_result(result):
        """Convert one hit to markdown and summarize it; returns a SearchResult."""
        # NOTE(review): these counters are bumped from multiple threads; CPython's
        # GIL makes the increments effectively safe here, but a lock would make
        # that explicit — confirm if this ever moves off CPython.
        nonlocal total_url_summaries_ok, total_url_summaries_nok
        url = result.get('href', '')
        try:
            content = converter.convert(url)
            url_md = content.document.export_to_markdown()
        except Exception as exc:
            print(f"Error converting {url}: {exc}")
            url_md = f"content error: {str(exc)}"
        # Build the summarization prompt.
        # Fix: the original literals used "\\n\\INSTRUCTIONS:" and "\\n\\n" —
        # escaped backslashes (and a dropped 'n') that injected literal
        # backslash text into the prompt instead of newlines, unlike the
        # /fetch endpoint. Also fixes the "CONTEXt" typo.
        prompt = (
            f"OBJECTIVE: "
            f"Create a full detailed summary of the provided markdown content, focusing on the topic <{query}>."
            f"Your task is to distill this information into a focused summary that emphasizes the aspects related to <{query}>."
            "\n\nINSTRUCTIONS:"
            f"Analyze the markdown content and extract the key points related to <{query}>. Your summary should capture the essential details and insights in a clear and verbose manner."
            "\n\nFormat: Provide the summary as a well-organized three paragraphs in markdown format. "
            f"CONTEXT: The markdown content provided below contains detailed information: "
            "\n\n<content>"
            f"{url_md}"
            "\n\n</content>"
            "\n\nSummary:"
        )
        if url_md.startswith("content error:"):
            # Conversion failed — skip the LLM call entirely.
            summary = f"Summary error: Failed to convert URL content - {url_md}"
            input_tokens = 0
            completion_tokens = 0
            total_url_summaries_nok += 1
        else:
            try:
                response = client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "developer", "content": "You are a technology expert."},
                        {"role": "user", "content": prompt}
                    ],
                    temperature=0.3,
                    max_completion_tokens=1000
                )
                # Get content and strip any markdown code-fence markers.
                summary = response.choices[0].message.content.strip()
                summary = summary.replace("```markdown", "").replace("```", "").strip()
                input_tokens = response.usage.prompt_tokens
                completion_tokens = response.usage.completion_tokens
                total_url_summaries_ok += 1
            except Exception as exc:
                print(f"Error generating summary for {url}: {exc}")
                summary = f"Summary error: Failed to generate summary - {str(exc)}"
                input_tokens = 0
                completion_tokens = 0
                total_url_summaries_nok += 1
        return SearchResult(
            url=url,
            #url_md=url_md,
            url_snippet=result.get('body', ''),
            url_summary=summary,
            input_tokens=input_tokens,
            completion_tokens=completion_tokens
        )

    # Process the results concurrently for speed.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = list(executor.map(process_result, search_results))
    processing_time = time.time() - start_time  # total elapsed time
    # Aggregate token usage across all summarized URLs.
    total_input_tokens = sum(r.input_tokens for r in results)
    total_completion_tokens = sum(r.completion_tokens for r in results)
    return SearchResponse(
        query=query,
        results=results,
        processing_time=round(processing_time, 2),
        timestamp=current_time,
        total_input_tokens=total_input_tokens,
        total_completion_tokens=total_completion_tokens,
        total_url_summaries_ok=total_url_summaries_ok,
        total_url_summaries_nok=total_url_summaries_nok
    )
# NOTE(review): no @app.get("/fetch") decorator is visible in this chunk —
# confirm this handler is registered as a route elsewhere.
async def fetch(
    url: str = Query(..., description="The URL to crawl and summarize"),
    authorization: str = Header(..., description="API key for authorization", alias="Auth")
):
    """Fetch one URL, convert it to markdown with docling, and summarize it with GPT."""
    if authorization != api_key:
        raise HTTPException(status_code=401, detail="Unauthorized")
    started = time.time()
    stamp = datetime.now(pytz.timezone('Europe/Madrid')).strftime('%Y-%m-%d %H:%M:%S %Z')

    def failure(markdown: str, message: str) -> FetchResult:
        # Both failure modes (conversion vs. summarization) share this shape.
        return FetchResult(
            url=url,
            url_md=markdown,
            content_summary=message,
            processing_time=round(time.time() - started, 2),
            timestamp=stamp,
            input_tokens=0,
            completion_tokens=0,
            total_url_summaries_ok=0,
            total_url_summaries_nok=1
        )

    # Step 1: convert the remote document into markdown via docling.
    try:
        converted = DocumentConverter().convert(url)
        url_md = converted.document.export_to_markdown()
    except Exception as err:
        return failure("", f"Error converting URL content: {str(err)}")

    # Step 2: build the summarization prompt around the markdown payload.
    prompt = (
        f"OBJECTIVE: Create a comprehensive summary of the provided webpage content."
        f"Your task is to distill the information into a focused and detailed summary."
        "\n\nINSTRUCTIONS:"
        "Analyze the markdown content and extract the key points. Your summary should capture "
        "the essential details and insights in a clear and verbose manner."
        "\n\nFormat: Provide the summary as a well-organized three paragraphs in markdown format."
        f"\n\nCONTEXT: The markdown content provided below contains detailed information: "
        "\n\n<content>"
        f"{url_md}"
        "\n\n</content>"
        "\n\nSummary:"
    )

    # Step 3: ask the model for the summary and collect usage figures.
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "developer", "content": "You are a technology expert."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            max_completion_tokens=1000
        )
        summary_text = response.choices[0].message.content.strip()
        # Drop any markdown code-fence markers the model wrapped around the output.
        summary_text = summary_text.replace("```markdown", "").replace("```", "").strip()
        prompt_tokens = response.usage.prompt_tokens
        answer_tokens = response.usage.completion_tokens
    except Exception as err:
        return failure(url_md, f"Error generating summary: {str(err)}")

    return FetchResult(
        url=url,
        url_md=url_md,
        content_summary=summary_text,
        processing_time=round(time.time() - started, 2),
        timestamp=stamp,
        input_tokens=prompt_tokens,
        completion_tokens=answer_tokens,
        total_url_summaries_ok=1,
        total_url_summaries_nok=0
    )
if __name__ == "__main__":
    # Console logging for local development runs.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    # Development server: hot reload watches the current directory.
    server_options = dict(
        host="0.0.0.0",
        port=8000,
        reload=True,
        reload_dirs=["./"],
        log_level="info"
    )
    uvicorn.run("index:app", **server_options)