| |
| import asyncio |
| import json |
| from typing import List, Dict, Any |
|
|
| import aiohttp |
| from fastapi import FastAPI, HTTPException, Request |
| from fastapi.responses import StreamingResponse, JSONResponse |
| from pydantic import BaseModel |
| from starlette.middleware.cors import CORSMiddleware |
| import asyncio |
| import aiohttp |
| import urllib.parse |
| from bs4 import BeautifulSoup as bs |
| import json |
| import random |
| import html |
| from bs4 import NavigableString, Tag |
| from rich import print |
|
|
class BingScraper:
    """Asynchronous Bing scraper.

    Provides text (web + news), video, and image search by fetching and
    parsing Bing's public HTML result pages, plus a Wikipedia helper that
    locates a wikipedia.org URL via Bing and extracts the page's infobox
    and first substantial paragraph.

    All HTTP traffic funnels through ``fetch_html``, which applies the
    rotating User-Agent / proxy selection and converts every failure into
    a logged ``None`` so callers can degrade gracefully instead of raising.
    """

    # Rotated per request so outgoing traffic does not present a single
    # browser fingerprint.
    USER_AGENTS = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:104.0) Gecko/20100101 Firefox/104.0",
        "Mozilla/5.0 (Linux; Android 10; SM-G975F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Mobile Safari/537.36"
    ]

    # Optional proxy pool; empty means every request goes out directly.
    PROXIES = [
    ]

    def __init__(self):
        # The scraper is stateless; all configuration lives in class attributes.
        pass

    @classmethod
    def get_random_headers(cls):
        """Return HTTP headers with a random User-Agent.

        NOTE(review): the Cookie below is a captured Bing session (it also
        carries ADLT=OFF, which disables SafeSearch filtering). Sessions
        expire — refresh this value if Bing starts returning empty result
        pages or consent interstitials.
        """
        return {
            "User-Agent": random.choice(cls.USER_AGENTS),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
            "Accept-Language": "en-GB,en-US;q=0.9,en;q=0.8",
            "Cache-Control": "no-cache",
            "Pragma": "no-cache",
            # Split into adjacent literals purely for readability; the
            # runtime value is byte-identical to the previous one-line string.
            "Cookie": (
                "MUID=12164D9908FD6B8C103F5871090F6AAA; _EDGE_V=1; MUIDB=12164D9908FD6B8C103F5871090F6AAA; "
                "SRCHD=AF=NOFORM; SRCHUID=V=2&GUID=26D35EBC20D44E418642BA10D7C14F42&dmnchg=1; "
                "ak_bmsc=17AB29B1FF4DE4D83B080A7E5CD24350~000000000000000000000000000000~YAAQNtjIFzUD3a2WAQAAnKvFuRtoko96GJe/vh01R588ZQYnYquFWtB0CzXeN5JGXZWgz7CwJqtckHuj3Z70qUhOcji4vkzhCMc/u91gnAIA0zCu7FcDeEJQgRx6n9MxhjrDAel2IIezGUgh+5ktFvDgUIO05s06PqDAtIUzuc9yTrbdAJi3iZvxFFKdGnbQkJ5krI9w3auWhY6i7JvcUPiDsbzzv0Chj1MxzRT1zdkP1B/JFtz+s5d8rUfagFpQporeRG/9gdid4qUPWvPHD6k98AdCTBYOysMHH2z9ErrD5PCO2mLK/RPrJSoqqN4d2mtnWeHNeF897PioJk0nOJw/IrseF0EgdsscKs7NVg/e3Mp27FTEIBduBRa93vvaabLMxg38; "
                "_UR=QS=0&TQS=0&Pn=0; BFBUSR=BFBHP=0; "
                "_HPVN=CS=eyJQbiI6eyJDbiI6MSwiU3QiOjAsIlFzIjowLCJQcm9kIjoiUCJ9LCJTYyI6eyJDbiI6MSwiU3QiOjAsIlFzIjowLCJQcm9kIjoiSCJ9LCJReiI6eyJDbiI6MSwiU3QiOjAsIlFzIjowLCJQcm9kIjoiVCJ9LCJBcCI6dHJ1ZSwiTXV0ZSI6dHJ1ZSwiTGFkIjoiMjAyNS0wNS0xMFQwMDowMDowMFoiLCJJb3RkIjowLCJHd2IiOjAsIlRucyI6MCwiRGZ0IjpudWxsLCJNdnMiOjAsIkZsdCI6MCwiSW1wIjoyLCJUb2JuIjowfQ==; "
                "_Rwho=u=d&ts=2025-05-10; ipv6=hit=1746877082438&t=4; "
                "_EDGE_S=F=1&SID=159E288249896C4D3A4D3D6A487B6D96&mkt=en-in; "
                "USRLOC=HS=1&ELOC=LAT=28.648426055908203|LON=77.1643295288086|N=Delhi%2C%20Delhi|ELT=1|; "
                "SRCHUSR=DOB=20250510&DS=1; MMCASM=ID=94A07BAF860D449DA2A4ABC8CAAC2538; "
                "_RwBf=r=0&ilt=9&ihpd=1&ispd=1&rc=24&rb=0&rg=200&pc=21&mtu=0&rbb=0&clo=0&v=9&l=2025-05-10T07:00:00.0000000Z&lft=0001-01-01T00:00:00.0000000&aof=0&ard=0001-01-01T00:00:00.0000000&rwdbt=0&rwflt=0&rwaul2=0&g=&o=2&p=&c=&t=0&s=0001-01-01T00:00:00.0000000+00:00&ts=2025-05-10T11:00:39.6623875+00:00&rwred=0&wls=&wlb=&wle=&ccp=&cpt=&lka=0&lkt=0&aad=0&TH=&cid=0&gb=; "
                "_SS=SID=159E288249896C4D3A4D3D6A487B6D96&R=24&RB=0&GB=0&RG=200&RP=21; "
                "SRCHHPGUSR=SRCHLANG=en&IG=00DEBA4A2B754BC983011302535541CA&PV=10.0.0&DM=1&BRW=S&BRH=M&CW=1094&CH=738&SCW=1177&SCH=2902&DPR=1.3&UTC=330&WTS=63882470261&PRVCW=1094&PRVCH=738&EXLTT=1&HV=1746874838&HVE=CfDJ8GtUudZcSi1Enm88WwQKtCf-s_3m7rtBIR85jW2Uv01W2IjDUasKRdncp2MkJ7Bl7PxVetzuZETt8bkyd54iRcMP8SVUsClaL2I5uvRiGiSldOKFjy7i69jYPS-egJOhCGf717H5WHFvCI4UwespMZgxZkdo8SVoBwOlx_yQKpA2qtpqV7t6wYd7etwY1FpUaA&BCML=0&BCSRLANG=&ADLT=OFF"
            ),
        }

    @classmethod
    def get_random_proxy(cls):
        """Return a random proxy from PROXIES, or None when the pool is empty."""
        if cls.PROXIES:
            return random.choice(cls.PROXIES)
        return None

    async def fetch_html(self, url, session):
        """Fetch HTML content asynchronously with minimal overhead.

        Returns the response body as text, or None on any non-200 status or
        network error (failures are logged, never raised, so callers can
        degrade gracefully).
        """
        headers = self.get_random_headers()
        proxy = self.get_random_proxy()
        try:
            # Explicit ClientTimeout: passing a bare number for `timeout`
            # is deprecated in aiohttp 3.x (it was interpreted as `total`).
            timeout = aiohttp.ClientTimeout(total=30)
            async with session.get(url, headers=headers, proxy=proxy, timeout=timeout) as response:
                if response.status != 200:
                    print(f"Failed to fetch {url}: HTTP {response.status}")
                    return None
                return await response.text()
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None

    async def fetch_bing_text(self, query, session):
        """Fetch Bing search results HTML for the given query."""
        encoded_query = urllib.parse.quote_plus(query)
        url = f'https://www.bing.com/search?q={encoded_query}'
        return await self.fetch_html(url, session)

    def parse_text_results(self, html):
        """
        Parse the Bing HTML using BeautifulSoup to extract, per result:
            - icon (if available)
            - URL (results without a link are skipped entirely)
            - Title
            - Abstract (or fallback 'b_algoSlug' snippet text)
            - Additional sub-columns (if available)
        """
        soup = bs(html, 'lxml')
        results = []
        result_no = 0
        main_containers = soup.find_all(id="b_results")
        for container in main_containers:
            list_results = container.find_all(class_='b_algo')
            for cont in list_results:
                result_no += 1
                try:
                    # Icon URLs are protocol-relative ('//...'), hence the prefix.
                    icon = 'https:' + cont.find_all(class_='rms_iac')[0].get('data-src')
                except Exception:
                    icon = None
                try:
                    URL = cont.find_all(class_='tilk')[0].get('href')
                except Exception:
                    # A result without a destination URL is useless; drop it.
                    continue
                try:
                    Title = cont.find_all('h2')[0].get_text(strip=True)
                except Exception:
                    Title = "No Title"
                try:
                    abstract_elem = cont.find_all(class_='b_caption')
                    if abstract_elem and abstract_elem[0].get_text(strip=True):
                        Abstract = abstract_elem[0].get_text(strip=True)
                    else:
                        # Some layouts put the snippet in 'b_algoSlug' instead.
                        Abstract = cont.find_all(class_='b_algoSlug')[0].get_text(strip=True)
                except Exception:
                    Abstract = "No Abstract"
                # Optional rich-card sub-columns (e.g. sitelinks with blurbs).
                other = []
                try:
                    for column in cont.find_all(class_='b_rc_gb_sub_column'):
                        for div in column.find_all('div'):
                            try:
                                sub_title = div.find_all(class_='b_rc_gb_sub_title')[0].get_text(strip=True)
                            except Exception:
                                sub_title = "No Sub-title"
                            try:
                                sub_description = div.find_all(class_='b_rc_gb_text_wrapper')[0].get_text(strip=True)
                            except Exception:
                                sub_description = ""
                            if sub_description:
                                other.append({'Title': sub_title, 'Description': sub_description})
                except Exception:
                    other = None
                results.append({
                    'no': result_no,
                    'icon': icon,
                    'URL': URL,
                    'Title': Title,
                    'Abstract': Abstract,
                    'other': other
                })
        return results

    async def search_text(self, query, session):
        """Perform a text search (regular and news) and return parsed results.

        Both pages are fetched concurrently; if either fetch fails the whole
        search returns None.
        """
        real_query = query
        news_query = query + " news"

        real_html_task = asyncio.create_task(self.fetch_bing_text(real_query, session))
        news_html_task = asyncio.create_task(self.fetch_bing_text(news_query, session))
        real_html, news_html = await asyncio.gather(real_html_task, news_html_task)
        if not real_html or not news_html:
            print("Failed to retrieve one or both text searches.")
            return None
        real_results = self.parse_text_results(real_html)
        news_results = self.parse_text_results(news_html)
        return {
            "query_results": real_results,
            "news_results": news_results
        }

    async def fetch_bing_video(self, query, session):
        """
        Fetch Bing video search results HTML for the given query.
        The URL below targets Bing's video search endpoint (adlt=off disables
        the adult-content filter, matching the session cookie).
        """
        encoded_query = urllib.parse.quote_plus(query)
        url = f'https://www.bing.com/videos/search?q={encoded_query}&adlt=off'
        return await self.fetch_html(url, session)

    def parse_video_results(self, html_content):
        """
        Parse the Bing video search HTML to extract:
            - Video URL (from the mmeta JSON)
            - Title (from an element with class "mc_vtvc_title")
            - Thumbnail URL (from the mmeta JSON)
            - Duration (if available)
            - Source (if available)

        Each video card carries an ``mmeta`` attribute containing
        HTML-escaped JSON metadata; that decoded dict is returned verbatim
        under "Meta".
        """
        soup = bs(html_content, 'lxml')
        video_results = []
        result_no = 0

        video_divs = soup.find_all("div", class_="mc_vtvc", attrs={"mmeta": True})
        for div in video_divs:
            result_no += 1
            mmeta_raw = div.get("mmeta")
            try:
                # The attribute is HTML-escaped JSON; unescape before parsing.
                mmeta_json = json.loads(html.unescape(mmeta_raw))
            except Exception as e:
                mmeta_json = {}
                print(f"Error parsing mmeta JSON: {e}")

            # 'murl' is the direct media URL; 'pgurl' the hosting page.
            video_url = mmeta_json.get("murl") or mmeta_json.get("pgurl")
            thumbnail = mmeta_json.get("turl")

            title_elem = div.find(class_="mc_vtvc_title")
            title = title_elem.get_text(strip=True) if title_elem else "No Title"

            duration_elem = div.find(class_="mc_bc_rc")
            duration = duration_elem.get_text(strip=True) if duration_elem else None

            source_elem = div.find(class_="mc_vtvc_meta_source")
            source = source_elem.get_text(strip=True) if source_elem else None

            video_results.append({
                "no": result_no,
                "Video URL": video_url,
                "Title": title,
                "Thumbnail": thumbnail,
                "Duration": duration,
                "Source": source,
                "Meta": mmeta_json
            })
        return video_results

    async def search_video(self, query, session):
        """Perform a video search and return parsed results (None on failure)."""
        html_content = await self.fetch_bing_video(query, session)
        if not html_content:
            print("Failed to retrieve video search results.")
            return None
        video_results = self.parse_video_results(html_content)
        return {
            "query": query,
            "video_results": video_results
        }

    async def fetch_bing_image(self, query, session):
        """Fetch Bing images search results HTML for the given query."""
        encoded_query = urllib.parse.quote_plus(query)
        url = f'https://www.bing.com/images/search?q={encoded_query}'
        return await self.fetch_html(url, session)

    def parse_image_results(self, html):
        """
        Parse the Bing images HTML using BeautifulSoup to extract detailed image data.
        For each image result (<li> inside a <ul class="dgControl_list">), extract:
            - JSON data from the 'm' attribute in <a class="iusc">
            - Image resolution from the <span class="nowrap"> tag inside <div class="img_info">
            - The hosting domain from the <div class="lnkw"> tag.
        Results missing the 'iusc' anchor or its JSON payload are skipped.
        """
        soup = bs(html, 'lxml')
        image_results = []

        ul_lists = soup.find_all("ul", class_="dgControl_list")
        for ul in ul_lists:
            li_elements = ul.find_all("li")
            for li in li_elements:
                image_data = {}
                try:
                    a_tag = li.find("a", class_="iusc")
                    if not a_tag:
                        continue
                    m_json_str = a_tag.get("m")
                    if not m_json_str:
                        continue
                    try:
                        details = json.loads(m_json_str)
                    except Exception as e:
                        print(f"JSON parsing error: {e}")
                        continue
                    # The 'm' payload already carries murl/turl/title etc.;
                    # merge it wholesale and enrich with resolution/domain.
                    image_data.update(details)

                    img_info = li.find("div", class_="img_info")
                    if img_info:
                        resolution_span = img_info.find("span", class_="nowrap")
                        if resolution_span:
                            image_data["resolution"] = resolution_span.get_text(strip=True)

                    lnkw_div = li.find("div", class_="lnkw")
                    if lnkw_div:
                        domain_a = lnkw_div.find("a")
                        if domain_a:
                            image_data["domain"] = domain_a.get_text(strip=True)
                    image_results.append(image_data)
                except Exception as e:
                    print(f"Error parsing an image result: {e}")
                    continue
        return image_results

    async def search_image(self, query, session):
        """Perform an image search and return parsed results (None on failure)."""
        image_html = await self.fetch_bing_image(query, session)
        if not image_html:
            print("Failed to retrieve image search results.")
            return None
        image_results = self.parse_image_results(image_html)
        return {
            "query": query,
            "image_results": image_results
        }

    @staticmethod
    def format_anchor(tag):
        """
        Format an <a> tag into the desired output format:
            (Link Text)[Full_URL]
        Relative and fragment hrefs are resolved against en.wikipedia.org,
        since this helper is used on Wikipedia pages only.
        """
        href = tag.get("href", "")
        if href.startswith("/") or href.startswith("#"):
            href = "https://en.wikipedia.org" + href
        text = tag.get_text(strip=True)
        return f"({text})[{href}]"

    @classmethod
    def extract_text(cls, element):
        """
        Recursively extract text from an element.
        - For <a> tags, return the formatted version and do not process its children.
        - For <style>/<script>/<noscript>, return nothing.
        - For other tags, process only their immediate children.
        """
        if isinstance(element, NavigableString):
            return element.strip()
        if isinstance(element, Tag):
            if element.name in ["style", "script", "noscript"]:
                return ""
            if element.name == "a":
                return cls.format_anchor(element)
            texts = []
            for child in element.children:
                t = cls.extract_text(child)
                if t:
                    texts.append(t)
            return " ".join(texts).strip()
        return ""

    @classmethod
    def process_infobox(cls, table):
        """
        Process the infobox table row by row using recursive extraction.
        Returns a list of strings where each line represents a row, formatted
        as "header: data" when both cells are present.
        """
        lines = []
        for row in table.find_all("tr"):
            header_cell = row.find("th")
            data_cell = row.find("td")
            header_text = cls.extract_text(header_cell) if header_cell else ""
            data_text = cls.extract_text(data_cell) if data_cell else ""
            if header_text and data_text:
                lines.append(f"{header_text}: {data_text}")
            elif header_text:
                lines.append(header_text)
            elif data_text:
                lines.append(data_text)
        return lines

    @classmethod
    def process_paragraph(cls, p_tag):
        """Process a paragraph (<p> tag) using recursive extraction."""
        return cls.extract_text(p_tag)

    async def extract_page_content(self, url, session):
        """
        Fetch the Wikipedia page HTML once and extract both the infobox and
        the first paragraph with at least five words.

        Returns (infobox_lines, paragraph_text); either element may be None.
        """
        html_content = await self.fetch_html(url, session)
        if not html_content:
            print("Failed to fetch the page.")
            return None, None

        soup = bs(html_content, "lxml")
        table = soup.find("table", class_="infobox")
        infobox_lines = self.process_infobox(table) if table else None

        content_div = soup.find("div", class_="mw-content-ltr")
        paragraph_text = None
        if content_div:
            paragraphs = content_div.find_all("p")
            for p in paragraphs:
                text = self.process_paragraph(p)
                # Skip stub/empty lead paragraphs (fewer than five words).
                if len(text.split()) >= 5:
                    paragraph_text = text
                    break

        return infobox_lines, paragraph_text

    async def search_wikipedia_url(self, query, session):
        """
        Perform a Bing search for the query and return the first
        wikipedia.org result URL, or None if none is found.
        """
        encoded_query = urllib.parse.quote_plus(query + " wikipedia.org")
        bing_url = f"https://www.bing.com/search?q={encoded_query}"
        html_content = await self.fetch_html(bing_url, session)
        if not html_content:
            return None
        soup = bs(html_content, "lxml")
        # Bing has used both <li> and <div> for result containers.
        results = soup.find_all("li", class_="b_algo") or soup.find_all("div", class_="b_algo")
        for result in results:
            a_tag = result.find("a", href=True)
            if a_tag and "wikipedia.org" in a_tag["href"]:
                return a_tag["href"]
        return None

    async def get_wikipedia_url_concurrently(self, query, session, num_attempts=10):
        """
        Launch num_attempts concurrent search tasks for a Wikipedia URL.
        Return the first non-None Wikipedia URL and cancel the remaining tasks.

        Fix vs. previous revision: this used to give up as soon as the first
        task completed, even when that task returned None while other attempts
        might still have succeeded. Now it keeps waiting until a non-None URL
        arrives or every attempt has finished.
        """
        tasks = [asyncio.create_task(self.search_wikipedia_url(query, session)) for _ in range(num_attempts)]
        pending = set(tasks)
        wiki_url = None
        try:
            while pending and not wiki_url:
                done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
                for task in done:
                    try:
                        candidate = task.result()
                    except Exception:
                        # A failed attempt is not fatal; other attempts may succeed.
                        continue
                    if candidate:
                        wiki_url = candidate
                        break
        finally:
            # Cancel whatever is still running once we have (or gave up on) a URL.
            for task in pending:
                task.cancel()
        return wiki_url

    async def search_wikipedia(self, query, session):
        """
        Search Bing for a Wikipedia URL and extract its infobox and a paragraph.
        Returns None when no Wikipedia URL could be found.
        """
        wiki_url = await self.get_wikipedia_url_concurrently(query, session, num_attempts=10)
        if not wiki_url:
            print("Could not find a Wikipedia URL for the query.")
            return None
        print(f"\nFound Wikipedia URL: {wiki_url}\n")
        infobox_lines, paragraph_text = await self.extract_page_content(wiki_url, session)
        return {
            "wikipedia_url": wiki_url,
            "infobox": infobox_lines,
            "paragraph": paragraph_text
        }
|
|
| |
class BingScraperAPI:
    """
    A unified API that exposes methods as:
        - search(query)
        - video(query)
        - images(query)
        - wikipedia(query)
    In addition, the asynchronous generator 'fetch' runs multiple providers
    concurrently and yields results as soon as they become available.

    Each public method opens its own aiohttp session, so instances are cheap
    and safe to create per request.
    """

    def __init__(self):
        self.scraper = BingScraper()

    async def search(self, query):
        """Perform a regular and news text search."""
        async with aiohttp.ClientSession() as session:
            return await self.scraper.search_text(query, session)

    async def video(self, query):
        """Perform a video search."""
        async with aiohttp.ClientSession() as session:
            return await self.scraper.search_video(query, session)

    async def images(self, query):
        """Perform an image search."""
        async with aiohttp.ClientSession() as session:
            return await self.scraper.search_image(query, session)

    async def wikipedia(self, query):
        """Perform a Wikipedia search and extract infobox and content."""
        async with aiohttp.ClientSession() as session:
            return await self.scraper.search_wikipedia(query, session)

    async def fetch(self, providers, param):
        """
        Accepts:
            - providers: a list of provider names (e.g., ["search", "video", "images"])
            - param: a list of query parameters.
              If a single parameter is provided, it is used for all providers.
              Otherwise, the number of parameters must match the number of providers.

        Launches tasks concurrently and yields (provider_name, result) as soon
        as each becomes available. A provider that raises or exceeds the
        60-second window yields (provider_name, {"error": ...}) instead.

        Fix vs. previous revision: results used to be awaited in submission
        order, so a slow first provider blocked faster ones despite the
        documented contract; they are now yielded in completion order.
        """
        provider_map = {
            "search": self.search,
            "video": self.video,
            "images": self.images,
            "wikipedia": self.wikipedia
        }

        if len(param) == 1:
            params = [param[0]] * len(providers)
        elif len(param) == len(providers):
            params = param
        else:
            raise ValueError("The number of query parameters must be either 1 or equal to the number of providers.")

        # Validate everything up front so a bad name cannot leave already
        # created tasks orphaned.
        for prov in providers:
            if prov not in provider_map:
                raise ValueError(f"Unknown provider: {prov}")

        task_to_provider = {
            asyncio.create_task(provider_map[prov](q)): prov
            for prov, q in zip(providers, params)
        }

        pending = set(task_to_provider)
        while pending:
            # 60-second budget per wait round, mirroring the old per-task timeout.
            done, pending = await asyncio.wait(pending, timeout=60, return_when=asyncio.FIRST_COMPLETED)
            if not done:
                # Nothing finished inside the window: time out the remainder.
                for task in pending:
                    task.cancel()
                for task in pending:
                    yield task_to_provider[task], {"error": "timed out"}
                return
            for task in done:
                prov = task_to_provider[task]
                try:
                    result = task.result()
                except Exception as e:
                    yield prov, {"error": str(e)}
                else:
                    yield prov, result
|
|
|
|
# FastAPI application exposing the scraper over HTTP.
app = FastAPI(title="BingScraper API (FastAPI wrapper)")


# Wide-open CORS: any origin, method, and header may call this API.
# NOTE(review): tighten allow_origins before exposing this service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
class FetchRequest(BaseModel):
    """Request body for POST /v1/agents/websearch."""

    # Provider names to run; each must be a key of ALLOWED_PROVIDERS.
    providers: List[str]
    # Query strings: either a single entry (reused for every provider) or
    # exactly one entry per provider.
    param: List[str]
|
|
|
|
| |
# Maps public provider names (as used in API requests) to the corresponding
# BingScraper coroutine method names. Doubles as the request whitelist:
# any provider not listed here is rejected with HTTP 400.
ALLOWED_PROVIDERS = {
    "search": "search_text",
    "video": "search_video",
    "images": "search_image",
    "wikipedia": "search_wikipedia"
}
|
|
|
|
async def _run_provider(scraper: BingScraperAPI, provider: str, query: str, session: aiohttp.ClientSession, timeout: float = 30.0) -> Dict[str, Any]:
    """
    Execute one provider against the shared aiohttp session.

    Resolves `provider` through ALLOWED_PROVIDERS to the matching low-level
    BingScraper coroutine and awaits it with a timeout. Runtime failures
    (including the timeout) are captured and reported in the return value
    rather than raised, so one bad provider cannot sink a whole batch.

    Returns {"provider": ..., "result": ...} on success or
    {"provider": ..., "error": ...} on failure.
    """
    method_name = ALLOWED_PROVIDERS.get(provider)
    if not method_name:
        raise ValueError(f"Unknown provider: {provider}")

    bound = getattr(scraper.scraper, method_name, None)
    if not bound or not asyncio.iscoroutinefunction(bound):
        raise RuntimeError(f"Provider method not found or not async: {method_name}")

    try:
        outcome = await asyncio.wait_for(bound(query, session), timeout=timeout)
    except asyncio.CancelledError:
        # Propagate cancellation so callers can tear the batch down cleanly.
        raise
    except Exception as exc:
        return {"provider": provider, "error": str(exc)}
    return {"provider": provider, "result": outcome}
|
|
|
|
@app.post("/v1/agents/websearch")
async def fetch_post(payload: FetchRequest):
    """
    Accepts JSON:
        {
            "providers": ["search", "images"],
            "param": ["who is elon musk", "elon musk"]
        }
    Runs every provider concurrently over one shared HTTP session and
    returns a JSON object mapping provider name -> result (or an error
    object for providers that failed).
    """
    providers = payload.providers
    param = payload.param

    if not providers:
        raise HTTPException(status_code=400, detail="providers must be a non-empty list")
    if not param:
        raise HTTPException(status_code=400, detail="param must be a non-empty list")

    # A single query fans out to every provider; otherwise the lists must align.
    if len(param) == 1:
        params = param * len(providers)
    elif len(param) == len(providers):
        params = param
    else:
        raise HTTPException(status_code=400, detail="param length must be 1 or equal to length of providers")

    unknown = [p for p in providers if p not in ALLOWED_PROVIDERS]
    if unknown:
        raise HTTPException(status_code=400, detail=f"Unsupported provider: {unknown[0]}")

    scraper_api = BingScraperAPI()
    results = {}

    async with aiohttp.ClientSession() as session:
        in_flight = [
            asyncio.create_task(_run_provider(scraper_api, name, query, session))
            for name, query in zip(providers, params)
        ]

        # Collect in completion order; the response itself is a single JSON body.
        for finished in asyncio.as_completed(in_flight):
            outcome = await finished
            name = outcome.get("provider")
            if "result" in outcome:
                results[name] = outcome["result"]
            else:
                results[name] = {"error": outcome.get("error")}

    return JSONResponse(results)
|
|
|
|
@app.get("/v1/agents/websearch/stream")
async def stream(request: Request, providers: str, params: str):
    """
    Stream results as Server-Sent Events (SSE).

    Query string example:
        /stream?providers=search,images&params=who%20is%20elon%20musk,elon%20musk

    Both query params are comma-separated lists; a single params entry is
    reused for every provider. Each SSE event carries JSON of the form
        {"provider": "<provider>", "result": <result>} or
        {"provider": "<provider>", "error": <message>}
    and events are emitted in completion order.
    """
    prov_list = [chunk.strip() for chunk in providers.split(",") if chunk.strip()]
    param_list = [chunk.strip() for chunk in params.split(",") if chunk.strip()]

    if not prov_list:
        raise HTTPException(status_code=400, detail="providers query param required")
    if not param_list:
        raise HTTPException(status_code=400, detail="params query param required")

    if len(param_list) == 1:
        param_list = param_list * len(prov_list)
    elif len(param_list) != len(prov_list):
        raise HTTPException(status_code=400, detail="params must have length 1 or equal to providers length")

    for name in prov_list:
        if name not in ALLOWED_PROVIDERS:
            raise HTTPException(status_code=400, detail=f"Unsupported provider: {name}")

    scraper_api = BingScraperAPI()

    async def event_generator():
        # One shared session for the lifetime of the stream.
        async with aiohttp.ClientSession() as session:
            in_flight = [
                asyncio.create_task(_run_provider(scraper_api, name, query, session))
                for name, query in zip(prov_list, param_list)
            ]

            for next_done in asyncio.as_completed(in_flight):
                # Stop doing work the moment the client goes away.
                if await request.is_disconnected():
                    for task in in_flight:
                        task.cancel()
                    break
                outcome = await next_done
                # default=str keeps odd values (e.g. exceptions) serializable.
                yield f"data: {json.dumps(outcome, default=str)}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")
|
|
|
|
| |
@app.get("/")
async def root():
    """Landing endpoint: points clients at the real API routes."""
    # Fixed: the previous message referenced /fetch and /stream, which do not
    # exist — the actual routes live under /v1/agents/websearch.
    return {"msg": "BingScraper FastAPI wrapper. Use POST /v1/agents/websearch or GET /v1/agents/websearch/stream"}
|
|
|
|
| |
| |