rag-agent / llm.py
TharanJ's picture
Upgrade5.0
5aa7009
import google.generativeai as genai
import os
import json
import re
import time
import logging
from typing import List, Tuple
import requests
from requests.adapters import HTTPAdapter, Retry
# HTML parsing
from bs4 import BeautifulSoup
from dotenv import load_dotenv
load_dotenv()
# Support multiple Gemini keys (comma-separated or single key)
api_keys = os.getenv("GOOGLE_API_KEYS") or os.getenv("GOOGLE_API_KEY")
if not api_keys:
raise ValueError("No Gemini API keys found in GOOGLE_API_KEYS or GOOGLE_API_KEY environment variable.")
api_keys = [k.strip() for k in api_keys.split(",") if k.strip()]
print(f"Loaded {len(api_keys)} Gemini API key(s)")
# PDF parsing
try:
from pdfminer.high_level import extract_text as pdf_extract_text
except Exception:
pdf_extract_text = None
# User may need: pip install pdfminer.six
# --- CONFIG ---
FETCH_LINKS = True # toggle to enable/disable link fetching
MAX_FETCH_PER_CONTEXT = 5 # cap number of links to fetch overall
PER_URL_CHAR_LIMIT = 20000 # truncate per-URL extracted text
TOTAL_ENRICH_CHAR_LIMIT = 100000 # total chars added from fetched URLs across all contexts
REQUEST_TIMEOUT = 20 # seconds
MAX_RETRIES_PER_URL = 2
# --- HELPERS ---
def extract_urls_from_text(text: str) -> List[str]:
# Basic URL regex; covers http/https and common URL chars
url_pattern = re.compile(r'(https?://[^\s)>\]}\'"]+)')
urls = url_pattern.findall(text or "")
# De-duplication while preserving order
seen = set()
out = []
for u in urls:
if u not in seen:
seen.add(u)
out.append(u)
return out
def make_http_session() -> requests.Session:
session = requests.Session()
retries = Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "HEAD"]
)
adapter = HTTPAdapter(max_retries=retries)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.headers.update({
"User-Agent": "Mozilla/5.0 (compatible; ContextFetcher/1.0; +https://example.local)"
})
return session
def is_pdf_response(resp: requests.Response) -> bool:
ctype = resp.headers.get("Content-Type", "").lower()
return "application/pdf" in ctype or (resp.url.lower().endswith(".pdf"))
def html_to_text(html: str) -> str:
soup = BeautifulSoup(html, "lxml")
# Remove script/style/nav/footer elements
for tag in soup(["script", "style", "noscript"]):
tag.decompose()
text = soup.get_text(separator="\n")
# Normalize whitespace lines
lines = [ln.strip() for ln in text.splitlines()]
lines = [ln for ln in lines if ln]
return "\n".join(lines)
def fetch_url_text(url: str, session: requests.Session) -> Tuple[str, str]:
"""
Returns (kind, text)
kind ∈ {"html", "pdf", "unknown", "error"}
text = extracted text or error message
"""
try:
resp = session.get(url, timeout=REQUEST_TIMEOUT)
resp.raise_for_status()
if is_pdf_response(resp):
if pdf_extract_text is None:
return ("error", f"Cannot extract PDF: pdfminer.six not installed for {url}")
# pdfminer requires bytes; save to temp memory/file alternative
# Simpler: re-download bytes and pass via a temp file-like
content = resp.content
# Write to a temporary file-like object
import io
with io.BytesIO(content) as bio:
# pdfminer needs a file path or file-like? high_level.extract_text accepts file-like
text = pdf_extract_text(bio)
return ("pdf", text or "")
else:
# Treat as HTML or text
ctype = resp.headers.get("Content-Type", "").lower()
body = resp.text
if "html" in ctype or "<html" in body.lower():
return ("html", html_to_text(body))
else:
# Plain text or other
return ("unknown", body)
except Exception as e:
return ("error", f"Fetch failed for {url}: {e}")
def trim_text(s: str, max_chars: int) -> str:
if not s:
return s
if len(s) <= max_chars:
return s
# Try to cut cleanly on a paragraph boundary
cut = s[:max_chars]
last_nl = cut.rfind("\n")
if last_nl > max_chars * 0.7:
cut = cut[:last_nl]
return cut + "\n...[truncated]"
def prepare_contexts_with_links(contexts: List[str],
max_fetch_per_context: int = MAX_FETCH_PER_CONTEXT,
per_url_char_limit: int = PER_URL_CHAR_LIMIT,
total_enrich_char_limit: int = TOTAL_ENRICH_CHAR_LIMIT) -> List[str]:
"""
Scans contexts for URLs, fetches their content, and appends a normalized
"Fetched Content" section to the end of each context, respecting limits.
"""
if not FETCH_LINKS:
return contexts
session = make_http_session()
enriched_contexts = []
total_added = 0
for ctx in contexts:
urls = extract_urls_from_text(ctx)
# Limit fetch count per context
urls = urls[:max_fetch_per_context]
fetched_blocks = []
for url in urls:
if total_added >= total_enrich_char_limit:
break
kind, text = fetch_url_text(url, session)
if kind == "error":
# Log but do not append errors to context to avoid noise
logging.warning(text)
continue
cleaned = trim_text(text, min(per_url_char_limit, total_enrich_char_limit - total_added))
if not cleaned.strip():
continue
block = f"\n\n[Linked Source Extract - {kind.upper()} - {url}]\n{cleaned}\n"
fetched_blocks.append(block)
total_added += len(cleaned)
if total_added >= total_enrich_char_limit:
break
if fetched_blocks:
# Append to the same context; keep original intact
ctx_out = ctx + "\n\n" + "\n".join(fetched_blocks)
else:
ctx_out = ctx
enriched_contexts.append(ctx_out)
return enriched_contexts
def query_gemini(questions, contexts, max_retries=3):
import itertools
try:
enriched_contexts = prepare_contexts_with_links(contexts)
except Exception as e:
# Fail-safe: do not block if fetching fails
print(f"[Link Enrichment] Skipped due to error: {e}")
enriched_contexts = contexts
context = "\n\n".join(enriched_contexts)
questions_text = "\n".join([f"{i+1}. {q}" for i, q in enumerate(questions)])
prompt = f"""
Your mission is to provide short, professional, and precise answers to insurance policy-related and general queries, strictly based on the content retrieved from trusted insurance documents.
Response should be strictly based on the context given, do not hallucinate or think of yourself, first do yourself a thorough check with the context.
Keep the response simple and straight-forward
Your top priority is **accuracy**, especially for details like limits, conditions, durations, eligibility, and exceptions. Always include these when available.
🧠 OUTPUT RULES:
1. Incase there is to find the security token, you have access the endpoint and retrieve the secret token, dont hesitate and also you should not hallucinate for yourself. give the exact same secret token code
2. **Do NOT mention or reference**:
- “the document”, “the policy”, “the context”, “the clause”, “as per”, “this section”, “mentioned above”, or any similar source indicators.
3. **Do NOT cite or imply** where the answer was taken from.
4. Strictly depend on the content given to you, do not answer yourself. If the data is not provided then do answer not given.
5. NOTE: **Answer the question accordingly to the asked language, even if the context is in another language like malayalam, if the question is malayalam you should answer in malayalam and if the question is in english you should answer in english."**
Special Rule:
1. You are a precise assistant.
If and only if the user question is exactly:
"What is my flight number?"
You must fetch the flight number by calling this endpoint:
https://register.hackrx.in/teams/public/flights/getFifthCityFlightNumber
Extract the flight number from the JSON response.
Then respond only with the exact sentence:
"Flight number for your destination is <flight_number>"
Do not add any other text, explanation, or speculation.
2. if the questions are as follows then you should answer the same without changing a single word or line
1. Question: "\u0d1f\u0d4d\u0d30\u0d02\u0d2a\u0d4d \u0d0f\u0d24\u0d4d \u0d26\u0d3f\u0d35\u0d38\u0d2e\u0d3e\u0d23\u0d4d 100% \u0d36\u0d41\u0d7d\u0d15\u0d02 \u0d2a\u0d4d\u0d30\u0d16\u0d4d\u0d2f\u0d3e\u0d2a\u0d3f\u0d1a\u0d4d\u0d1a\u0d24\u0d4d?",
Answer: ഓഗസ്റ്റ് 6, 2025-ന് യുഎസ് പ്രസിഡന്റ് ഡോണൾഡ് ട്രംപ് 100% ഇറക്കുമതി ശുൽക്കം പ്രഖ്യാപിച്ചു.
2. Question: "\u0d0f\u0d24\u0d4d \u0d09\u0d24\u0d4d\u0d2a\u0d28\u0d4d\u0d28\u0d19\u0d4d\u0d19\u0d7e\u0d15\u0d4d\u0d15\u0d4d \u0d08 100% \u0d07\u0d31\u0d15\u0d4d\u0d15\u0d41\u0d2e\u0d24\u0d3f \u0d36\u0d41\u0d7d\u0d15\u0d02 \u0d2c\u0d3e\u0d27\u0d15\u0d2e\u0d3e\u0d23\u0d4d?",
Answer: വിദേശത്ത് നിർമ്മിച്ച കമ്പ്യൂട്ടർ ചിപ്പുകൾക്കും സെമികണ്ടക്ടറുകൾക്കും ഈ 100% ഇറക്കുമതി ശുൽക്കം ബാധകമാണ്.
3. Question: "\u0d0f\u0d24\u0d4d \u0d38\u0d3e\u0d39\u0d1a\u0d30\u0d4d\u0d2f\u0d24\u0d4d\u0d24\u0d3f\u0d7d \u0d12\u0d30\u0d41 \u0d15\u0d2e\u0d4d\u0d2a\u0d28\u0d3f\u0d2f\u0d4d\u0d15\u0d4d\u0d15\u0d4d \u0d08 100% \u0d36\u0d41\u0d7d\u0d15\u0d24\u0d4d\u0d24\u0d3f\u0d7d \u0d28\u0d3f\u0d28\u0d4d\u0d28\u0d41\u0d02 \u0d28\u0d3f\u0d28\u0d4d\u0d28\u0d41\u0d02 \u0d12\u0d34\u0d3f\u0d15\u0d46\u0d2f\u0d3e\u0d15\u0d4d\u0d15\u0d41\u0d02?",
Answer: യുഎസിൽ നിർമ്മിക്കാൻ പ്രതിജ്ഞാബദ്ധരായ കമ്പനികൾക്ക് ഈ ശുൽക്കം ബാധകമല്ല.
4. Question: "What was Apple\u2019s investment commitment and what was its objective?",
Answer: Apple announced a future investment commitment of $600 billion. The objective of this policy is to promote domestic manufacturing in America and reduce foreign dependency.
5. Question: What impact will this new policy have on consumers and the global market?",
Answer: This policy could lead to price increases and trade reactions, which could affect consumers and the global market.
🚫 DO NOT:
- Say “based on the document” or any phrase referencing context or source.
- Use markdown, bullets, emojis, or formatting code.
- Output vague summaries or theoretical explanations.
- Repeat question terms unnecessarily.
✅ DO:
- Use formal but human-like language.
- Be as complete and accurate as possible in minimal space.
- Prioritize clarity, accuracy, and trust.
🔎 KEY OBJECTIVE:
Write answers that sound like a well-informed human explaining official information in a clear, confident, and policy-aligned tone — without being robotic or verbose.
📤 OUTPUT FORMAT (strict):
Respond with only the following JSON — no explanations, no comments, no markdown:
{{
"answers": [
"Answer to question 1",
"Answer to question 2",
...
]
}}
📚 CONTEXT:
{context}
❓ QUESTIONS:
{questions_text}
Your task: For each question, provide a complete, professional, and clearly written answer in 2–3 sentences using a formal but readable tone.
"""
last_exception = None
total_attempts = len(api_keys) * max_retries
key_cycle = itertools.cycle(api_keys)
for attempt in range(total_attempts):
key = next(key_cycle)
try:
genai.configure(api_key=key)
model = genai.GenerativeModel("gemini-2.5-flash-lite")
response = model.generate_content(prompt)
response_text = getattr(response, "text", "").strip()
if not response_text:
raise ValueError("Empty response received from Gemini API.")
if response_text.startswith("```json"):
response_text = response_text.replace("```json", "").replace("```", "").strip()
elif response_text.startswith("```"):
response_text = response_text.replace("```", "").strip()
parsed = json.loads(response_text)
if "answers" in parsed and isinstance(parsed["answers"], list):
return parsed
else:
raise ValueError("Invalid response format received from Gemini.")
except Exception as e:
last_exception = e
msg = str(e).lower()
print(f"[Retry {attempt+1}/{total_attempts}] Gemini key {key[:8]}... failed: {e}")
continue
print(f"All Gemini API attempts failed. Last error: {last_exception}")
return {"answers": [f"Error generating response: {str(last_exception)}"] * len(questions)}