from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
import requests
from bs4 import BeautifulSoup
import time
import os
import json
import random
import logging
import groq
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import uvicorn
from supabase import create_client, Client
from urllib.parse import urljoin, urlparse

# Initialize FastAPI app
app = FastAPI(
    title="Web RAG System API",
    description="Extract content from web pages and perform RAG operations",
    version="1.0.0"
)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize Supabase client with environment variables
try:
    url = os.environ.get('SUPABASE_URL')
    key = os.environ.get('SUPABASE_SERVICE_ROLE_KEY')
    if not url or not key:
        logger.warning("Supabase credentials not found in environment variables")
        supabase = None
    else:
        supabase: Client = create_client(url, key)
        logger.info("Supabase client initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize Supabase client: {e}")
    supabase = None

# User agents for web scraping
user_agents = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Firefox/102.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.5 Safari/605.1.15",
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:102.0) Gecko/20100101 Firefox/102.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/103.0.1264.49",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 15_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.5 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (iPad; CPU OS 15_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.5 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (Linux; Android 12; SM-G991B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 11; Pixel 5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 11; SM-A217F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 10; SM-G975F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Mobile Safari/537.36"
]

# Pydantic models
class RAGRequest(BaseModel):
    file_path: str
    prompt: str


class URL(BaseModel):
    url: str
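
# Example JSON bodies for the models above (illustrative values only):
#   RAGRequest -> {"file_path": "extracted_text.txt", "prompt": "What is this page about?"}
#   URL        -> {"url": "https://example.com"}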

# NOTE: the @app route decorators in this file are assumptions; the original
# method/path declarations were not present in the source.
@app.get("/")
async def root():
    """Health check endpoint"""
    return {"message": "Web RAG System API is running", "status": "healthy"}

@app.get("/health")  # assumed route
async def health_check():
    """Detailed health check"""
    health_status = {
        "api": "healthy",
        "supabase": "connected" if supabase else "not configured",
        "hf_token": "configured" if os.environ.get('hf_token') else "not configured",
        "groq_token": "configured" if os.environ.get('groq_token') else "not configured"
    }
    return health_status

@app.post("/rag")  # assumed route
async def rag(request: RAGRequest):
    """Perform RAG operations on extracted text"""
    try:
        # Check required environment variables
        hf_token = os.environ.get('hf_token')
        groq_token = os.environ.get('groq_token')
        if not hf_token:
            raise HTTPException(status_code=500, detail="HuggingFace token not configured")
        if not groq_token:
            raise HTTPException(status_code=500, detail="Groq token not configured")
        if not supabase:
            raise HTTPException(status_code=500, detail="Supabase not configured")

        logger.info(f"Processing RAG request for file: {request.file_path}")
        # HuggingFace Inference API for embeddings
        API_URL = "https://router.huggingface.co/hf-inference/models/BAAI/bge-large-en-v1.5/pipeline/feature-extraction"
        headers = {
            # The Inference API expects a "Bearer <token>" header; this assumes
            # hf_token holds the raw token.
            "Authorization": f"Bearer {hf_token}",
        }

        def query(payload):
            response = requests.post(API_URL, headers=headers, json=payload)
            if response.status_code != 200:
                logger.error(f"HuggingFace API error: {response.status_code} - {response.text}")
                raise HTTPException(status_code=500, detail="Failed to get embeddings from HuggingFace")
            return response.json()
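
        # NOTE (assumption): for a single input string the feature-extraction
        # endpoint should return the embedding as a (possibly nested) list of
        # floats; the reshape logic further down handles both 1-D and 2-D cases.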

        # Create a Groq client
        groq_client = groq.Client(api_key=groq_token)

        def process_with_groq(query_text, context):
            prompt = f"""
            Context information:
            {context}

            Based on the context information above, please answer the following question:
            {query_text}

            Answer:
            """
            try:
                response = groq_client.chat.completions.create(
                    messages=[{"role": "user", "content": prompt}],
                    model="llama-3.3-70b-versatile",
                    temperature=0.4,
                    max_tokens=512
                )
                return response.choices[0].message.content
            except Exception as e:
                logger.error(f"Groq API error: {e}")
                raise HTTPException(status_code=500, detail="Failed to process with Groq")

        def get_file_from_supabase(bucket_name, file_path):
            try:
                response = supabase.storage.from_(bucket_name).download(file_path)
                content = response.decode('utf-8')
                return content
            except Exception as e:
                logger.error(f"Error downloading file from Supabase: {e}")
                raise HTTPException(
                    status_code=404,
                    detail=f"File not found in Supabase bucket: {file_path}"
                )

        # Get file content from Supabase
        bucket_name = "url-2-ans-bucket"
        file_path = request.file_path
        content = get_file_from_supabase(bucket_name, file_path)
        logger.info(f"Successfully downloaded file from Supabase: {file_path}")

        # Simple text chunking
        chunk_size = 1000
        overlap = 200
        chunks = []
        for i in range(0, len(content), chunk_size - overlap):
            chunk = content[i:i + chunk_size]
            if len(chunk) > 100:
                chunks.append({"text": chunk, "position": i})
        logger.info(f"Created {len(chunks)} chunks from document")
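        # With chunk_size=1000 and overlap=200 the window advances by
        # 1000 - 200 = 800 characters per step, so consecutive chunks share a
        # 200-character overlap; trailing fragments of 100 characters or fewer
        # are discarded by the length check above.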

        # Get embeddings for all chunks
        chunk_embeddings = []
        for chunk in chunks:
            embedding = query({"inputs": chunk["text"]})
            chunk_embeddings.append(embedding)

        # Get embedding for the query
        query_embedding = query({"inputs": request.prompt})

        # Calculate similarity between query and all chunks
        similarities = []
        for chunk_embedding in chunk_embeddings:
            query_np = np.array(query_embedding)
            chunk_np = np.array(chunk_embedding)
            if len(query_np.shape) == 1:
                query_np = query_np.reshape(1, -1)
            if len(chunk_np.shape) == 1:
                chunk_np = chunk_np.reshape(1, -1)
            similarity = cosine_similarity(query_np, chunk_np)[0][0]
            similarities.append(similarity)

        # Get top 3 most similar chunks
        top_k = 3
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        relevant_chunks = [chunks[i]["text"] for i in top_indices]
        context_text = "\n\n".join(relevant_chunks)

        # Process with Groq
        answer = process_with_groq(request.prompt, context_text)

        # Prepare sources
        sources = [{"text": chunks[i]["text"][:200] + "...", "position": chunks[i]["position"]}
                   for i in top_indices]

        return {
            "sources": sources,
            "user_query": request.prompt,
            "assistant_response": answer,
            "file_source": f"supabase://{bucket_name}/{file_path}"
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error occurred in RAG process")
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")

@app.post("/extract-links")  # assumed route
async def extract_links(url: URL):
    """Extract unique links from a given URL"""
    def extract_unique_links(url_string, max_retries=3, timeout=30):
        for attempt in range(max_retries):
            try:
                headers = {'User-Agent': random.choice(user_agents)}
                response = requests.get(url_string, headers=headers, timeout=timeout)
                response.raise_for_status()
                soup = BeautifulSoup(response.text, 'html.parser')
                base_url = urlparse(url_string)
                base_url = f"{base_url.scheme}://{base_url.netloc}"
                a_tags = soup.find_all('a', href=True)
                links = []
                for a in a_tags:
                    href = a.get('href')
                    full_url = urljoin(base_url, href)
                    links.append(full_url)
                unique_links = list(dict.fromkeys(links))
                unique_links.insert(0, url_string)
                return unique_links
            except requests.RequestException as e:
                logger.warning(f"Attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    wait_time = 5 * (attempt + 1)
                    time.sleep(wait_time)
                else:
                    logger.error(f"Failed to retrieve {url_string} after {max_retries} attempts.")
                    raise HTTPException(status_code=500, detail=f"Failed to retrieve {url_string} after {max_retries} attempts.")
        return []

    try:
        unique_links = extract_unique_links(url.url)
        return {"unique_links": unique_links}
    except Exception as e:
        logger.exception("Error in extract_links")
        raise HTTPException(status_code=500, detail=f"Failed to extract links: {str(e)}")

@app.post("/extract-text")  # assumed route
async def extract_text(urls: List[str]):
    """Extract text content from multiple URLs"""
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not configured")

    output_file = "extracted_text.txt"

    def upload_text_content(filename, content, bucket_name):
        try:
            file_content = content.encode('utf-8')
            # Try to upload first
            try:
                response = supabase.storage.from_(bucket_name).upload(
                    path=filename,
                    file=file_content,
                    file_options={"content-type": "text/plain"}
                )
                logger.info(f"Text file uploaded successfully: {filename}")
                return response
            except Exception as upload_error:
                # If upload fails (file exists), try to update
                try:
                    response = supabase.storage.from_(bucket_name).update(
                        path=filename,
                        file=file_content,
                        file_options={"content-type": "text/plain"}
                    )
                    logger.info(f"Text file updated successfully: {filename}")
                    return response
                except Exception as update_error:
                    logger.error(f"Error updating text content: {update_error}")
                    raise HTTPException(status_code=500, detail="Failed to save file to storage")
        except Exception as e:
            logger.error(f"Error with file operations: {e}")
            raise HTTPException(status_code=500, detail="Failed to save file to storage")

    def text_data_extractor(links):
        extracted_texts = []
        for link in links:
            parsed_url = urlparse(link)
            if not parsed_url.scheme:
                logger.warning(f"Invalid URL: {link}")
                continue
            retries = 3
            while retries > 0:
                try:
                    headers = {'User-Agent': random.choice(user_agents)}
                    response = requests.get(link, headers=headers, timeout=30)
                    response.raise_for_status()
                    soup = BeautifulSoup(response.text, 'html.parser')
                    text = soup.get_text()
                    clean_text = ' '.join(text.split())
                    extracted_texts.append({"url": link, "text": clean_text})
                    break
                except requests.RequestException as e:
                    retries -= 1
                    logger.warning(f"Retry {3 - retries} for {link} failed: {e}")
                    if retries > 0:
                        wait_time = 5 * (3 - retries)
                        time.sleep(wait_time)
            if retries == 0:
                extracted_texts.append({
                    "url": link,
                    "text": "Failed to retrieve text after multiple attempts."
                })
        return extracted_texts

    try:
        extracted_data = text_data_extractor(urls)
        string_output = json.dumps(extracted_data, ensure_ascii=False, indent=2)

        # Upload to Supabase
        upload_text_content(output_file, string_output, "url-2-ans-bucket")

        return {"extracted_data": extracted_data, "file_saved": output_file}
    except Exception as e:
        logger.exception("Error in extract_text")
        raise HTTPException(status_code=500, detail=f"Failed to extract text: {str(e)}")

# Main execution
if __name__ == "__main__":
    # Run the FastAPI app
    uvicorn.run(
        "main_api:app",
        host="0.0.0.0",
        port=8000,
        reload=False,  # Disable reload for production
        access_log=True
    )
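
# ---------------------------------------------------------------------------
# Example client usage (a minimal sketch; the route paths are the assumed ones
# noted above and the URLs/values are illustrative):
#
#   import requests
#
#   base = "http://localhost:8000"
#
#   # 1. Confirm the API and its credentials are configured
#   print(requests.get(f"{base}/health").json())
#
#   # 2. Collect links from a page, then extract their text into Supabase storage
#   links = requests.post(f"{base}/extract-links",
#                         json={"url": "https://example.com"}).json()
#   requests.post(f"{base}/extract-text", json=links["unique_links"][:5])
#
#   # 3. Ask a question against the stored text
#   answer = requests.post(f"{base}/rag", json={
#       "file_path": "extracted_text.txt",
#       "prompt": "What is this site about?",
#   }).json()
#   print(answer["assistant_response"])
# ---------------------------------------------------------------------------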