import asyncio
import os
import re
from typing import List, Dict
from urllib.parse import urlparse

import httpx
from bs4 import BeautifulSoup
from dotenv import load_dotenv

load_dotenv()

# Import our multi-LLM handler
from LLM.llm_handler import llm_handler

# URL extraction pattern (same as ShastraDocs)
URL_PATTERN = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
def extract_urls_from_text(text: str) -> List[str]:
    """Extract, validate, and deduplicate URLs found in free text."""
    urls = URL_PATTERN.findall(text or "")
    seen = set()
    clean_urls = []
    for url in urls:
        clean_url = url.rstrip('.,;:!?)')
        if clean_url and clean_url not in seen and validate_url(clean_url):
            seen.add(clean_url)
            clean_urls.append(clean_url)
    return clean_urls
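
# Quick sanity check (hypothetical input, no network needed):
#   extract_urls_from_text("Compare https://example.com/a. and https://example.com/a.")
#   -> ["https://example.com/a"]   # deduplicated, trailing punctuation stripped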
def validate_url(url: str) -> bool:
    """Return True only if the URL has both a scheme and a network location."""
    try:
        result = urlparse(url)
        return bool(result.scheme and result.netloc)
    except Exception:
        return False
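
# Examples:
#   validate_url("https://example.com/page")  # True
#   validate_url("example.com")               # False (no scheme)
#   validate_url("mailto:")                   # False (no netloc)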
async def scrape_url(url: str, max_chars: int = 4000) -> Dict[str, str]:
    """Async URL scraping using httpx + BeautifulSoup (FastAPI-friendly)."""
    try:
        timeout = httpx.Timeout(20.0)
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        async with httpx.AsyncClient(timeout=timeout, headers=headers, follow_redirects=True) as client:
            resp = await client.get(url)
            resp.raise_for_status()

            soup = BeautifulSoup(resp.content, 'html.parser')
            for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
                tag.decompose()

            text_content = soup.get_text(separator=' ', strip=True)
            cleaned = ' '.join(text_content.split())
            if len(cleaned) > max_chars:
                cleaned = cleaned[:max_chars] + "..."

            return {
                'url': url,
                'content': cleaned,
                'status': 'success',
                'length': len(cleaned),
                'title': soup.title.string if soup.title else 'No title'
            }
    except httpx.TimeoutException:
        return {'url': url, 'content': 'Timeout error', 'status': 'timeout', 'length': 0, 'title': 'Timeout'}
    except Exception as e:
        return {'url': url, 'content': f'Error: {str(e)[:100]}', 'status': 'error', 'length': 0, 'title': 'Error'}
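
# Minimal standalone usage sketch (assumes network access; the URL is a placeholder):
#
#   import asyncio
#   result = asyncio.run(scrape_url("https://example.com", max_chars=500))
#   print(result["status"], result["title"], result["length"])
#
# Note that failures do not raise: timeouts and HTTP/parse errors come back as dicts
# with status 'timeout' or 'error', so batch callers can keep any partial results.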
async def scrape_urls(urls: List[str], max_chars: int = 4000) -> List[Dict[str, str]]:
    """Scrape multiple URLs concurrently, returning one result dict per URL."""
    if not urls:
        return []

    sem = asyncio.Semaphore(5)

    async def _scrape(u):
        async with sem:
            return await scrape_url(u, max_chars)

    results = await asyncio.gather(*[_scrape(u) for u in urls], return_exceptions=True)

    final = []
    for i, r in enumerate(results):
        if isinstance(r, Exception):
            final.append({'url': urls[i], 'content': f'Exception: {str(r)[:100]}', 'status': 'exception', 'length': 0, 'title': 'Exception'})
        else:
            final.append(r)
    return final
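
# The Semaphore(5) caps concurrent fetches so a long URL list does not open too many
# sockets at once, and return_exceptions=True keeps a single bad URL from cancelling
# the whole gather; each raised exception is folded into a per-URL error dict above.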
def build_additional_content(scrapes: List[Dict[str, str]]) -> str:
    """Format successful scrape results into a single labelled context block."""
    parts = []
    for r in scrapes:
        if r.get('status') == 'success' and r.get('length', 0) > 50:
            parts.append("\n" + "=" * 50)
            parts.append("SOURCE: Additional Source")
            parts.append(f"URL: {r.get('url', '')}")
            parts.append(f"TITLE: {r.get('title', 'No title')}")
            parts.append("-" * 30 + " CONTENT " + "-" * 30)
            parts.append(r.get('content', ''))
            parts.append("=" * 50)
    return "\n".join(parts)
def parse_numbered_answers(text: str, expected_count: int) -> List[str]:
    """Parse numbered answers, with sane fallbacks."""
    pattern = re.compile(r'^\s*(\d+)[\).\-]\s*(.+)$', re.MULTILINE)
    matches = pattern.findall(text or "")

    result: Dict[int, str] = {}
    for num_str, answer in matches:
        try:
            num = int(num_str)
            if 1 <= num <= expected_count:
                clean_answer = re.sub(r'\s+', ' ', answer).strip()
                if clean_answer:
                    result[num] = clean_answer
        except Exception:
            continue

    answers: List[str] = []
    for i in range(1, expected_count + 1):
        answers.append(result.get(i, f"Unable to find answer for question {i}"))
    return answers
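
# Example (deterministic, no LLM involved):
#   parse_numbered_answers("1. Paris\n2) 42 is the answer", expected_count=3)
#   -> ["Paris", "42 is the answer", "Unable to find answer for question 3"]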
def parse_answers_from_json(raw: str, expected_count: int) -> List[str]:
    """Parse an LLM reply shaped like {"answers": [...]}, with numbered-text fallback."""
    import json

    # Try direct JSON
    try:
        obj = json.loads(raw)
        if isinstance(obj, dict) and isinstance(obj.get('answers'), list):
            out = [str(x).strip() for x in obj['answers']][:expected_count]
            while len(out) < expected_count:
                out.append(f"Unable to find answer for question {len(out) + 1}")
            return out
    except Exception:
        pass

    # Try to extract a JSON fragment
    m = re.search(r'\{[^\{\}]*"answers"[^\{\}]*\}', raw or "", re.DOTALL)
    if m:
        try:
            obj = json.loads(m.group(0))
            if isinstance(obj, dict) and isinstance(obj.get('answers'), list):
                out = [str(x).strip() for x in obj['answers']][:expected_count]
                while len(out) < expected_count:
                    out.append(f"Unable to find answer for question {len(out) + 1}")
                return out
        except Exception:
            pass

    # Fall back to numbered parsing
    return parse_numbered_answers(raw or "", expected_count)
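
# Example with a noisy LLM reply (hypothetical):
#   parse_answers_from_json('Sure! {"answers": ["Paris", "Berlin"]}', expected_count=3)
#   -> ["Paris", "Berlin", "Unable to find answer for question 3"]
# Direct json.loads fails on the leading chatter, the regex pulls out the
# '{"answers": ...}' fragment, and any missing slots are padded with placeholders.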
async def get_oneshot_answer(content: str, questions: List[str]) -> List[str]:
    """
    Enhanced oneshot QA flow with ShastraDocs-style URL extraction and scraping.
    - Extract URLs from the content and questions
    - Scrape the relevant pages
    - Merge the additional content and feed it to the LLM
    - Return per-question answers
    """
    if not questions:
        return []

    try:
        # Build numbered questions
        numbered_questions = "\n".join([f"{i+1}. {q}" for i, q in enumerate(questions)])

        # Find URLs in the content and questions
        combined = (content or "") + "\n" + "\n".join(questions or [])
        found_urls = extract_urls_from_text(combined)

        # Special case: content starts with a URL marker
        if (content or "").startswith("URL for Context:"):
            only_url = content.replace("URL for Context:", "").strip()
            if validate_url(only_url):
                if only_url not in found_urls:
                    found_urls.insert(0, only_url)

        # Scrape URLs if any were found
        additional_content = ""
        if found_urls:
            print(f"Scraping {len(found_urls)} URL(s) for additional context...")
            scrape_results = await scrape_urls(found_urls, max_chars=4000)
            additional_content = build_additional_content(scrape_results)
            print(f"Additional content length: {len(additional_content)}")

        # Merge the final context
        if additional_content:
            final_context = (content or "") + "\n\nADDITIONAL INFORMATION FROM SCRAPED SOURCES:\n" + additional_content
        else:
            final_context = content or ""
        print(f"Final context length: {len(final_context)}")

        # Prompts (ask for JSON answers to improve parsing)
        system_prompt = (
            "You are an expert assistant. Read ALL provided context (including any 'ADDITIONAL INFORMATION "
            "FROM SCRAPED SOURCES') and answer the questions comprehensively. If info is missing, say so."
        )
        user_prompt = f"""FULL CONTEXT:
{final_context[:8000]}{"..." if len(final_context) > 8000 else ""}

QUESTIONS:
{numbered_questions}

Respond in this EXACT JSON format:
{{
  "answers": [
    "<Answer to question 1>",
    "<Answer to question 2>",
    "<Answer to question 3>"
  ]
}}"""

        print(f"Using {llm_handler.provider.upper()} model: {llm_handler.model_name}")
        raw = await llm_handler.generate_text(
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            temperature=0.4,
            max_tokens=1800
        )
        print(f"LLM response length: {len(raw) if raw else 0}")

        answers = parse_answers_from_json(raw, len(questions))
        print(f"Parsed {len(answers)} answers")
        return answers

    except Exception as e:
        print(f"Error in oneshot answer generation: {str(e)}")
        return [f"Error processing question: {str(e)}" for _ in questions]