import os
import warnings
import logging
import time
import json
import hashlib
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
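

# Keep Hugging Face model downloads in a local .cache directory next to the app;
# HF_HOME / TRANSFORMERS_CACHE must be set before the model libraries are imported.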
cache_dir = os.path.join(os.getcwd(), ".cache")
os.makedirs(cache_dir, exist_ok=True)
os.environ['HF_HOME'] = cache_dir
os.environ['TRANSFORMERS_CACHE'] = cache_dir
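
# Silence TensorFlow's startup logging and deprecation warnings (TensorFlow can be
# pulled in indirectly by the embedding stack).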
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
os.environ['TF_LOGGING_LEVEL'] = 'ERROR'
os.environ['TF_ENABLE_DEPRECATION_WARNINGS'] = '0'

warnings.filterwarnings('ignore', category=DeprecationWarning, module='tensorflow')
logging.getLogger('tensorflow').setLevel(logging.ERROR)
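
# Application imports come after the environment configuration above so the cache
# and logging settings take effect before any model libraries load.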
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from pdf_parser import (
    parse_pdf_from_url_multithreaded as parse_pdf_from_url,
    parse_pdf_from_file_multithreaded as parse_pdf_from_file,
)
from embedder import build_faiss_index, preload_model
from retriever import retrieve_chunks
from llm import query_gemini
import uvicorn


app = FastAPI(title="HackRx Insurance Policy Assistant", version="1.0.0")
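
# CORS is left fully open (any origin, method, and header); tighten this if the
# API ever runs outside a demo setting.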
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.on_event("startup")
async def startup_event():
    """Preload the sentence-transformer model so the first request does not pay the model-loading cost."""
    print("Starting up HackRx Insurance Policy Assistant...")
    print("Preloading sentence transformer model...")
    preload_model()
    print("Model preloading completed. API is ready to serve requests!")


@app.get("/")
async def root():
    return {"message": "HackRx Insurance Policy Assistant API is running!"}


@app.get("/health")
async def health_check():
    return {"status": "healthy"}


class QueryRequest(BaseModel):
    """Request body for /api/v1/hackrx/run: a policy document URL plus the questions to answer about it."""
    documents: str
    questions: list[str]


class LocalQueryRequest(BaseModel):
    """Request body for /api/v1/hackrx/local: a path to a PDF on local disk plus the questions to answer."""
    document_path: str
    questions: list[str]


def verify_token(authorization: str = Header(None)):
    """Require an "Authorization: Bearer <token>" header; any non-empty token is accepted."""
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid authorization header")
    token = authorization.replace("Bearer ", "")
    if not token:
        raise HTTPException(status_code=401, detail="Invalid token")
    return token


def process_batch(batch_questions, context_chunks):
    """Forward one batch of questions plus the shared context chunks to the Gemini wrapper."""
    return query_gemini(batch_questions, context_chunks)


def get_document_id_from_url(url: str) -> str:
    """Derive a stable cache key for a document from the MD5 hash of its URL."""
    return hashlib.md5(url.encode()).hexdigest()
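

# In-process cache of parsed documents (chunks, FAISS index, chunk texts), keyed by
# the MD5 of the document URL and guarded by a lock so concurrent requests for the
# same document do not parse and index it twice.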
doc_cache = {}
doc_cache_lock = Lock()
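

# Example request (illustrative values only):
#   POST /api/v1/hackrx/run
#   Authorization: Bearer <any-non-empty-token>
#   {"documents": "https://example.com/policy.pdf",
#    "questions": ["What is the waiting period for pre-existing diseases?"]}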
@app.post("/api/v1/hackrx/run")
async def run_query(request: QueryRequest, token: str = Depends(verify_token)):
    """Answer a batch of questions about a policy document fetched from a URL, with per-document caching."""
    start_time = time.time()
    timing_data = {}
    try:
        print("=== INPUT JSON ===")
        print(json.dumps({"documents": request.documents, "questions": request.questions}, indent=2))
        print("==================\n")

        print(f"Processing {len(request.questions)} questions...")

        # Reuse the parsed chunks and FAISS index if this document was seen before.
        doc_id = get_document_id_from_url(request.documents)
        with doc_cache_lock:
            if doc_id in doc_cache:
                print("✅ Using cached document...")
                cached = doc_cache[doc_id]
                text_chunks = cached["chunks"]
                index = cached["index"]
                texts = cached["texts"]
            else:
                print("⚙️ Parsing and indexing new document...")
                pdf_start = time.time()
                text_chunks = parse_pdf_from_url(request.documents)
                timing_data['pdf_parsing'] = round(time.time() - pdf_start, 2)

                index_start = time.time()
                index, texts = build_faiss_index(text_chunks)
                timing_data['faiss_index_building'] = round(time.time() - index_start, 2)

                doc_cache[doc_id] = {
                    "chunks": text_chunks,
                    "index": index,
                    "texts": texts
                }

        # Retrieve the top chunks for every question and pool them into one shared context.
        retrieval_start = time.time()
        all_chunks = set()
        for question in request.questions:
            top_chunks = retrieve_chunks(index, texts, question)
            all_chunks.update(top_chunks)
        timing_data['chunk_retrieval'] = round(time.time() - retrieval_start, 2)
        print(f"Retrieved {len(all_chunks)} unique chunks")

        # Send the questions to the LLM in parallel batches.
        questions = request.questions
        context_chunks = list(all_chunks)
        batch_size = 10
        batches = [(i, questions[i:i + batch_size]) for i in range(0, len(questions), batch_size)]

        llm_start = time.time()
        results_dict = {}
        with ThreadPoolExecutor(max_workers=min(5, len(batches))) as executor:
            futures = [executor.submit(process_batch, batch, context_chunks) for _, batch in batches]
            for (start_idx, batch), future in zip(batches, futures):
                try:
                    result = future.result()
                    if isinstance(result, dict) and "answers" in result:
                        for j, answer in enumerate(result["answers"]):
                            results_dict[start_idx + j] = answer
                    else:
                        for j in range(len(batch)):
                            results_dict[start_idx + j] = "Error in response"
                except Exception as e:
                    for j in range(len(batch)):
                        results_dict[start_idx + j] = f"Error: {str(e)}"
        timing_data['llm_processing'] = round(time.time() - llm_start, 2)

        # Reassemble the answers in the original question order.
        responses = [results_dict.get(i, "Not Found") for i in range(len(questions))]
        timing_data['total_time'] = round(time.time() - start_time, 2)

        print("\n=== TIMING BREAKDOWN ===")
        for k, v in timing_data.items():
            print(f"{k}: {v}s")
        print("=======================\n")

        print("=== OUTPUT JSON ===")
        print(json.dumps({"answers": responses}, indent=2))
        print("==================\n")

        return {"answers": responses}

    except Exception as e:
        print(f"Error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@app.post("/api/v1/hackrx/local")
async def run_local_query(request: LocalQueryRequest):
    """Answer a batch of questions about a PDF on local disk (no auth token required, no document caching)."""
    start_time = time.time()
    timing_data = {}
    try:
        print("=== INPUT JSON ===")
        print(json.dumps({"document_path": request.document_path, "questions": request.questions}, indent=2))
        print("==================\n")

        print(f"Processing {len(request.questions)} questions locally...")

        # Parse the local PDF and build a FAISS index over its chunks.
        pdf_start = time.time()
        text_chunks = parse_pdf_from_file(request.document_path)
        timing_data['pdf_parsing'] = round(time.time() - pdf_start, 2)
        print(f"Extracted {len(text_chunks)} text chunks from PDF")

        index_start = time.time()
        index, texts = build_faiss_index(text_chunks)
        timing_data['faiss_index_building'] = round(time.time() - index_start, 2)

        # Retrieve the top chunks for every question and pool them into one shared context.
        retrieval_start = time.time()
        all_chunks = set()
        for question in request.questions:
            top_chunks = retrieve_chunks(index, texts, question)
            all_chunks.update(top_chunks)
        timing_data['chunk_retrieval'] = round(time.time() - retrieval_start, 2)
        print(f"Retrieved {len(all_chunks)} unique chunks")

        # Send the questions to the LLM in parallel batches.
        questions = request.questions
        context_chunks = list(all_chunks)
        batch_size = 20
        batches = [(i, questions[i:i + batch_size]) for i in range(0, len(questions), batch_size)]

        llm_start = time.time()
        results_dict = {}
        with ThreadPoolExecutor(max_workers=min(5, len(batches))) as executor:
            futures = [executor.submit(process_batch, batch, context_chunks) for _, batch in batches]
            for (start_idx, batch), future in zip(batches, futures):
                try:
                    result = future.result()
                    if isinstance(result, dict) and "answers" in result:
                        for j, answer in enumerate(result["answers"]):
                            results_dict[start_idx + j] = answer
                    else:
                        for j in range(len(batch)):
                            results_dict[start_idx + j] = "Error in response"
                except Exception as e:
                    for j in range(len(batch)):
                        results_dict[start_idx + j] = f"Error: {str(e)}"
        timing_data['llm_processing'] = round(time.time() - llm_start, 2)

        # Reassemble the answers in the original question order.
        responses = [results_dict.get(i, "Not Found") for i in range(len(questions))]
        timing_data['total_time'] = round(time.time() - start_time, 2)

        print("\n=== TIMING BREAKDOWN ===")
        for k, v in timing_data.items():
            print(f"{k}: {v}s")
        print("=======================\n")

        print("=== OUTPUT JSON ===")
        print(json.dumps({"answers": responses}, indent=2))
        print("==================\n")

        return {"answers": responses}

    except Exception as e:
        print(f"Error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
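

# Entry point for running the API directly; the hosting platform can override the
# port via the PORT environment variable.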
if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    uvicorn.run("app:app", host="0.0.0.0", port=port)