"""One-shot QA helpers: URL extraction, async scraping, and LLM answer parsing."""

import re
import json
import asyncio
import os
from typing import List, Dict
from urllib.parse import urlparse

import httpx
from bs4 import BeautifulSoup
from dotenv import load_dotenv

load_dotenv()

# Import our multi-LLM handler
from LLM.llm_handler import llm_handler

# URL extraction pattern (same as ShastraDocs)
URL_PATTERN = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
def extract_urls_from_text(text: str) -> List[str]:
    """Extract, de-duplicate, and validate URLs found in free text."""
    urls = URL_PATTERN.findall(text or "")
    seen = set()
    clean_urls = []
    for url in urls:
        # Strip trailing punctuation that commonly clings to URLs in prose
        clean_url = url.rstrip('.,;:!?)')
        if clean_url and clean_url not in seen and validate_url(clean_url):
            seen.add(clean_url)
            clean_urls.append(clean_url)
    return clean_urls
def validate_url(url: str) -> bool:
    """Return True if the URL has both a scheme and a network location."""
    try:
        result = urlparse(url)
        return bool(result.scheme and result.netloc)
    except Exception:
        return False
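
# Illustrative usage of the two helpers above (the URL is a placeholder, not one
# referenced by this project):
#
#   >>> extract_urls_from_text("Docs live at https://example.com/guide. See also https://example.com/guide.")
#   ['https://example.com/guide']
#   >>> validate_url("not-a-url")
#   False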
async def scrape_url(url: str, max_chars: int = 4000) -> Dict[str, str]:
    """Async URL scraping using httpx + BeautifulSoup (FastAPI-friendly)."""
    try:
        timeout = httpx.Timeout(20.0)
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        async with httpx.AsyncClient(timeout=timeout, headers=headers, follow_redirects=True) as client:
            resp = await client.get(url)
            resp.raise_for_status()

            soup = BeautifulSoup(resp.content, 'html.parser')

            # Drop non-content elements before extracting text
            for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
                tag.decompose()

            text_content = soup.get_text(separator=' ', strip=True)
            cleaned = ' '.join(text_content.split())
            if len(cleaned) > max_chars:
                cleaned = cleaned[:max_chars] + "..."

            return {
                'url': url,
                'content': cleaned,
                'status': 'success',
                'length': len(cleaned),
                'title': soup.title.string if soup.title else 'No title'
            }
    except httpx.TimeoutException:
        return {'url': url, 'content': 'Timeout error', 'status': 'timeout', 'length': 0, 'title': 'Timeout'}
    except Exception as e:
        return {'url': url, 'content': f'Error: {str(e)[:100]}', 'status': 'error', 'length': 0, 'title': 'Error'}
async def scrape_urls(urls: List[str], max_chars: int = 4000) -> List[Dict[str, str]]:
    """Scrape multiple URLs concurrently, bounded by a semaphore of 5."""
    if not urls:
        return []

    sem = asyncio.Semaphore(5)

    async def _scrape(u):
        async with sem:
            return await scrape_url(u, max_chars)

    results = await asyncio.gather(*[_scrape(u) for u in urls], return_exceptions=True)

    final = []
    for i, r in enumerate(results):
        if isinstance(r, Exception):
            final.append({'url': urls[i], 'content': f'Exception: {str(r)[:100]}', 'status': 'exception', 'length': 0, 'title': 'Exception'})
        else:
            final.append(r)
    return final
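
# Illustrative call pattern (shown as a comment sketch since it performs network
# I/O; the URL is a placeholder):
#
#   results = asyncio.run(scrape_urls(["https://example.com"]))
#   for page in results:
#       print(page['status'], page['length'], page['title'])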
def build_additional_content(scrapes: List[Dict[str, str]]) -> str:
    """Format successful scrape results into a single context block for the LLM."""
    parts = []
    for r in scrapes:
        # Skip failed scrapes and near-empty pages
        if r.get('status') == 'success' and r.get('length', 0) > 50:
            parts.append("\n" + "=" * 50)
            parts.append("SOURCE: Additional Source")
            parts.append(f"URL: {r.get('url', '')}")
            parts.append(f"TITLE: {r.get('title', 'No title')}")
            parts.append("-" * 30 + " CONTENT " + "-" * 30)
            parts.append(r.get('content', ''))
            parts.append("=" * 50)
    return "\n".join(parts)
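
# Example of the block emitted per successful scrape (illustrative values,
# separator widths shortened here for readability):
#
#   ==========
#   SOURCE: Additional Source
#   URL: https://example.com
#   TITLE: Example Domain
#   ----- CONTENT -----
#   Example Domain. This domain is for use in illustrative examples...
#   ==========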
def parse_numbered_answers(text: str, expected_count: int) -> List[str]:
    """Parse numbered answers, with sane fallbacks."""
    # Matches lines like "1. answer", "2) answer", or "3- answer"
    pattern = re.compile(r'^\s*(\d+)[\).\-]\s*(.+)$', re.MULTILINE)
    matches = pattern.findall(text or "")

    result: Dict[int, str] = {}
    for num_str, answer in matches:
        try:
            num = int(num_str)
            if 1 <= num <= expected_count:
                clean_answer = re.sub(r'\s+', ' ', answer).strip()
                if clean_answer:
                    result[num] = clean_answer
        except Exception:
            continue

    answers: List[str] = []
    for i in range(1, expected_count + 1):
        answers.append(result.get(i, f"Unable to find answer for question {i}"))
    return answers
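
# Illustrative parse (a missing answer falls back to a placeholder):
#
#   >>> parse_numbered_answers("1. Paris\n3) 42", 3)
#   ['Paris', 'Unable to find answer for question 2', '42']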
def parse_answers_from_json(raw: str, expected_count: int) -> List[str]:
    """Parse an LLM response shaped like {"answers": [...]}, falling back to numbered parsing."""
    # Try direct JSON
    try:
        obj = json.loads(raw)
        if isinstance(obj, dict) and isinstance(obj.get('answers'), list):
            out = [str(x).strip() for x in obj['answers']][:expected_count]
            while len(out) < expected_count:
                out.append(f"Unable to find answer for question {len(out) + 1}")
            return out
    except Exception:
        pass

    # Try to extract a JSON fragment from surrounding text (e.g. prose or markdown fences)
    m = re.search(r'\{[^\{\}]*"answers"[^\{\}]*\}', raw or "", re.DOTALL)
    if m:
        try:
            obj = json.loads(m.group(0))
            if isinstance(obj, dict) and isinstance(obj.get('answers'), list):
                out = [str(x).strip() for x in obj['answers']][:expected_count]
                while len(out) < expected_count:
                    out.append(f"Unable to find answer for question {len(out) + 1}")
                return out
        except Exception:
            pass

    # Fallback to numbered parsing
    return parse_numbered_answers(raw or "", expected_count)
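
# Illustrative parse of a response with surrounding prose:
#
#   >>> parse_answers_from_json('Sure! {"answers": ["Paris", "42"]}', 2)
#   ['Paris', '42']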
async def get_oneshot_answer(content: str, questions: List[str]) -> List[str]:
    """
    Enhanced oneshot QA flow with ShastraDocs-style URL extraction and scraping.
    - Extract URLs from content and questions
    - Scrape relevant pages
    - Merge additional content and feed to LLM
    - Return per-question answers
    """
    if not questions:
        return []

    try:
        # Build numbered questions
        numbered_questions = "\n".join([f"{i+1}. {q}" for i, q in enumerate(questions)])

        # Find URLs from content and questions
        combined = (content or "") + "\n" + "\n".join(questions or [])
        found_urls = extract_urls_from_text(combined)

        # Special case: content starts with URL marker
        if (content or "").startswith("URL for Context:"):
            only_url = content.replace("URL for Context:", "").strip()
            if validate_url(only_url) and only_url not in found_urls:
                found_urls.insert(0, only_url)

        # Scrape URLs if any
        additional_content = ""
        if found_urls:
            print(f"Scraping {len(found_urls)} URL(s) for additional context...")
            scrape_results = await scrape_urls(found_urls, max_chars=4000)
            additional_content = build_additional_content(scrape_results)
            print(f"Additional content length: {len(additional_content)}")

        # Merge final context
        if additional_content:
            final_context = (content or "") + "\n\nADDITIONAL INFORMATION FROM SCRAPED SOURCES:\n" + additional_content
        else:
            final_context = content or ""
        print(f"Final context length: {len(final_context)}")

        # Prompts (ask for JSON answers to improve parsing)
        system_prompt = (
            "You are an expert assistant. Read ALL provided context (including any "
            "'ADDITIONAL INFORMATION FROM SCRAPED SOURCES') and answer the questions "
            "comprehensively. If info is missing, say so."
        )
        user_prompt = f"""FULL CONTEXT:
{final_context[:8000]}{"..." if len(final_context) > 8000 else ""}

QUESTIONS:
{numbered_questions}

Respond in this EXACT JSON format:
{{
  "answers": [
    "<Answer to question 1>",
    "<Answer to question 2>",
    "<Answer to question 3>"
  ]
}}"""

        print(f"Using {llm_handler.provider.upper()} model: {llm_handler.model_name}")
        raw = await llm_handler.generate_text(
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            temperature=0.4,
            max_tokens=1800
        )
        print(f"LLM response length: {len(raw) if raw else 0}")

        answers = parse_answers_from_json(raw, len(questions))
        print(f"Parsed {len(answers)} answers")
        return answers
    except Exception as e:
        print(f"Error in oneshot answer generation: {str(e)}")
        return [f"Error processing question: {str(e)}" for _ in questions]
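
if __name__ == "__main__":
    # Minimal smoke test for the oneshot flow. This is an illustrative sketch:
    # it assumes the LLM provider credentials expected by LLM.llm_handler are
    # available (e.g. via the .env file loaded above), and the context and
    # question below are placeholder values, not project fixtures.
    demo_content = "URL for Context: https://example.com"
    demo_questions = ["What is this page about?"]
    demo_answers = asyncio.run(get_oneshot_answer(demo_content, demo_questions))
    for idx, ans in enumerate(demo_answers, start=1):
        print(f"{idx}. {ans}")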