# rag-bajaj/LLM/one_shotter.py
import re
import json
import asyncio
from typing import List, Dict
from urllib.parse import urlparse

import httpx
from bs4 import BeautifulSoup
import os
from dotenv import load_dotenv

load_dotenv()

# Import our multi-LLM handler
from LLM.llm_handler import llm_handler

# URL extraction pattern (same as ShastraDocs)
URL_PATTERN = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
def extract_urls_from_text(text: str) -> List[str]:
    """Extract unique, validated URLs from free text, stripping trailing punctuation."""
    urls = URL_PATTERN.findall(text or "")
    seen = set()
    clean_urls = []
    for url in urls:
        clean_url = url.rstrip('.,;:!?)')
        if clean_url and clean_url not in seen and validate_url(clean_url):
            seen.add(clean_url)
            clean_urls.append(clean_url)
    return clean_urls
def validate_url(url: str) -> bool:
    """Return True if the URL has both a scheme and a network location."""
    try:
        result = urlparse(url)
        return bool(result.scheme and result.netloc)
    except Exception:
        return False
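
# A quick illustrative sketch of the two helpers above (kept as comments so the
# module stays side-effect free on import; the URL is hypothetical):
#   extract_urls_from_text("See https://example.com/docs, then reply.")
#   -> ["https://example.com/docs"]       # trailing comma stripped
#   validate_url("example.com")           # -> False (no scheme)
#   validate_url("https://example.com")   # -> True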
async def scrape_url(url: str, max_chars: int = 4000) -> Dict[str, str]:
    """Async URL scraping using httpx + BeautifulSoup (FastAPI-friendly)."""
    try:
        timeout = httpx.Timeout(20.0)
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        async with httpx.AsyncClient(timeout=timeout, headers=headers, follow_redirects=True) as client:
            resp = await client.get(url)
            resp.raise_for_status()

            soup = BeautifulSoup(resp.content, 'html.parser')
            for tag in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
                tag.decompose()

            text_content = soup.get_text(separator=' ', strip=True)
            cleaned = ' '.join(text_content.split())
            if len(cleaned) > max_chars:
                cleaned = cleaned[:max_chars] + "..."

            return {
                'url': url,
                'content': cleaned,
                'status': 'success',
                'length': len(cleaned),
                'title': soup.title.string if soup.title else 'No title'
            }
    except httpx.TimeoutException:
        return {'url': url, 'content': 'Timeout error', 'status': 'timeout', 'length': 0, 'title': 'Timeout'}
    except Exception as e:
        return {'url': url, 'content': f'Error: {str(e)[:100]}', 'status': 'error', 'length': 0, 'title': 'Error'}
async def scrape_urls(urls: List[str], max_chars: int = 4000) -> List[Dict[str, str]]:
    """Scrape multiple URLs concurrently, at most 5 at a time."""
    if not urls:
        return []
    sem = asyncio.Semaphore(5)

    async def _scrape(u):
        async with sem:
            return await scrape_url(u, max_chars)

    results = await asyncio.gather(*[_scrape(u) for u in urls], return_exceptions=True)
    final = []
    for i, r in enumerate(results):
        if isinstance(r, Exception):
            final.append({'url': urls[i], 'content': f'Exception: {str(r)[:100]}', 'status': 'exception', 'length': 0, 'title': 'Exception'})
        else:
            final.append(r)
    return final
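
# Usage sketch (assumes no event loop is already running and the host is reachable;
# the URL is purely illustrative):
#   results = asyncio.run(scrape_urls(["https://example.com"]))
#   for r in results:
#       print(r['status'], r['length'], r['title'])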
def build_additional_content(scrapes: List[Dict[str, str]]) -> str:
    """Format successful scrape results into a single context block for the LLM."""
    parts = []
    for r in scrapes:
        if r.get('status') == 'success' and r.get('length', 0) > 50:
            parts.append("\n" + "=" * 50)
            parts.append("SOURCE: Additional Source")
            parts.append(f"URL: {r.get('url', '')}")
            parts.append(f"TITLE: {r.get('title', 'No title')}")
            parts.append("-" * 30 + " CONTENT " + "-" * 30)
            parts.append(r.get('content', ''))
            parts.append("=" * 50)
    return "\n".join(parts)
def parse_numbered_answers(text: str, expected_count: int) -> List[str]:
    """Parse numbered answers, with sane fallbacks."""
    pattern = re.compile(r'^\s*(\d+)[\).\-]\s*(.+)$', re.MULTILINE)
    matches = pattern.findall(text or "")
    result: Dict[int, str] = {}
    for num_str, answer in matches:
        try:
            num = int(num_str)
            if 1 <= num <= expected_count:
                clean_answer = re.sub(r'\s+', ' ', answer).strip()
                if clean_answer:
                    result[num] = clean_answer
        except Exception:
            continue
    answers: List[str] = []
    for i in range(1, expected_count + 1):
        answers.append(result.get(i, f"Unable to find answer for question {i}"))
    return answers
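
# Illustrative behaviour of the numbered-answer fallback (hypothetical LLM output):
#   parse_numbered_answers("1. Paris\n2) Berlin", expected_count=3)
#   -> ["Paris", "Berlin", "Unable to find answer for question 3"]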
def parse_answers_from_json(raw: str, expected_count: int) -> List[str]:
    """Parse an {"answers": [...]} JSON response, falling back to numbered parsing."""
    # Try direct JSON
    try:
        obj = json.loads(raw)
        if isinstance(obj, dict) and isinstance(obj.get('answers'), list):
            out = [str(x).strip() for x in obj['answers']][:expected_count]
            while len(out) < expected_count:
                out.append(f"Unable to find answer for question {len(out)+1}")
            return out
    except Exception:
        pass
    # Try to extract a JSON fragment
    m = re.search(r'\{[^\{\}]*"answers"[^\{\}]*\}', raw or "", re.DOTALL)
    if m:
        try:
            obj = json.loads(m.group(0))
            if isinstance(obj, dict) and isinstance(obj.get('answers'), list):
                out = [str(x).strip() for x in obj['answers']][:expected_count]
                while len(out) < expected_count:
                    out.append(f"Unable to find answer for question {len(out)+1}")
                return out
        except Exception:
            pass
    # Fallback to numbered parsing
    return parse_numbered_answers(raw or "", expected_count)
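
# Illustrative behaviour (hypothetical LLM outputs): direct JSON is preferred,
# otherwise a JSON fragment is extracted and the list is padded to expected_count.
#   parse_answers_from_json('{"answers": ["42", "blue"]}', expected_count=2)
#   -> ["42", "blue"]
#   parse_answers_from_json('Sure! {"answers": ["42"]}', expected_count=2)
#   -> ["42", "Unable to find answer for question 2"]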
async def get_oneshot_answer(content: str, questions: List[str]) -> List[str]:
    """
    Enhanced oneshot QA flow with ShastraDocs-style URL extraction and scraping.
    - Extract URLs from content and questions
    - Scrape relevant pages
    - Merge additional content and feed to LLM
    - Return per-question answers
    """
    if not questions:
        return []
    try:
        # Build numbered questions
        numbered_questions = "\n".join([f"{i+1}. {q}" for i, q in enumerate(questions)])

        # Find URLs from content and questions
        combined = (content or "") + "\n" + "\n".join(questions or [])
        found_urls = extract_urls_from_text(combined)

        # Special case: content starts with URL marker
        if (content or "").startswith("URL for Context:"):
            only_url = content.replace("URL for Context:", "").strip()
            if validate_url(only_url) and only_url not in found_urls:
                found_urls.insert(0, only_url)

        # Scrape URLs if any
        additional_content = ""
        if found_urls:
            print(f"🚀 Scraping {len(found_urls)} URL(s) for additional context...")
            scrape_results = await scrape_urls(found_urls, max_chars=4000)
            additional_content = build_additional_content(scrape_results)
            print(f"📄 Additional content length: {len(additional_content)}")

        # Merge final context
        if additional_content:
            final_context = (content or "") + "\n\nADDITIONAL INFORMATION FROM SCRAPED SOURCES:\n" + additional_content
        else:
            final_context = content or ""
        print(f"📊 Final context length: {len(final_context)}")

        # Prompts (ask for JSON answers to improve parsing)
        system_prompt = (
            "You are an expert assistant. Read ALL provided context (including any 'ADDITIONAL INFORMATION FROM "
            "SCRAPED SOURCES') and answer the questions comprehensively. If info is missing, say so."
        )
        user_prompt = f"""FULL CONTEXT:
{final_context[:8000]}{"..." if len(final_context) > 8000 else ""}

QUESTIONS:
{numbered_questions}

Respond in this EXACT JSON format:
{{
    "answers": [
        "<Answer to question 1>",
        "<Answer to question 2>",
        "<Answer to question 3>"
    ]
}}"""

        print(f"🤖 Using {llm_handler.provider.upper()} model: {llm_handler.model_name}")
        raw = await llm_handler.generate_text(
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            temperature=0.4,
            max_tokens=1800
        )
        print(f"🔄 LLM response length: {len(raw) if raw else 0}")

        answers = parse_answers_from_json(raw, len(questions))
        print(f"✅ Parsed {len(answers)} answers")
        return answers
    except Exception as e:
        print(f"❌ Error in oneshot answer generation: {str(e)}")
        return [f"Error processing question: {str(e)}" for _ in questions]