import gradio as gr
import pandas as pd
import os
import io
import torch
import zipfile
import tempfile
import shutil
from bs4 import BeautifulSoup
from typing import List, TypedDict
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_nvidia_ai_endpoints import ChatNVIDIA
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.graph import END, StateGraph, START
import chromadb
# Environment variables setup.
# API keys should be supplied via the environment (e.g. Space secrets) rather than
# hard-coded; the placeholders below are only used if no key is already set.
os.environ.setdefault("TAVILY_API_KEY", "<your-tavily-api-key>")
os.environ.setdefault("NVIDIA_API_KEY", "<your-nvidia-api-key>")
os.environ["LANGCHAIN_PROJECT"] = "RAG project"
class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents."""
    binary_score: str = Field(description="Documents are relevant to the question, 'yes' or 'no'")


class GraphState(TypedDict):
    """Represents the state of our graph."""
    question: str
    generation: str
    decision: str
    documents: List[str]
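# Note: GradeDocuments and GraphState belong to a graded, LangGraph-based variant of
# this pipeline; the simplified chain built in create_workflow below does not use them.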
def process_documents(temp_dir):
    """Process documents from the extracted zip folder with enhanced error handling."""
    d = {"chunk": [], "url": []}

    # Debug information
    print(f"Scanning directory: {temp_dir}")
    file_count = 0
    processed_count = 0
    error_count = 0

    # Recursively traverse the directory
    for root, dirs, files in os.walk(temp_dir):
        for file_name in files:
            file_count += 1
            file_path = os.path.join(root, file_name)
            print(f"Processing file: {file_path}")

            try:
                # Try different encodings
                encodings = ['utf-8', 'latin-1', 'cp1252']
                content = None
                for encoding in encodings:
                    try:
                        with open(file_path, 'r', encoding=encoding) as stream:
                            content = stream.read()
                            break
                    except UnicodeDecodeError:
                        continue

                if content is None:
                    print(f"Failed to read file {file_path} with any encoding")
                    error_count += 1
                    continue

                soup = BeautifulSoup(content, "html.parser")

                title = soup.find("title")
                title_text = title.get_text(strip=True).replace(" | Dataiku", "") if title else "No Title"

                main_content = soup.find("main")
                text_content = main_content.get_text(strip=True) if main_content else soup.get_text(strip=True)

                if not text_content.strip():
                    print(f"No content extracted from {file_path}")
                    error_count += 1
                    continue

                full_content = f"{title_text}\n\n{text_content}"
                d["chunk"].append(full_content)
                # File names encode the source URL, with '=' standing in for '/'
                d["url"].append("https://" + file_name.replace("=", "/"))
                processed_count += 1
                print(f"Successfully processed {file_path}")

            except Exception as e:
                print(f"Error processing file {file_path}: {str(e)}")
                error_count += 1
                continue

    print("\nProcessing Summary:")
    print(f"Total files found: {file_count}")
    print(f"Successfully processed: {processed_count}")
    print(f"Errors encountered: {error_count}")

    if not d["chunk"]:
        raise ValueError(f"No valid documents were processed. Processed {file_count} files with {error_count} errors.")

    return pd.DataFrame(d)
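# Usage sketch (illustrative, not executed here): given a directory of saved HTML
# pages whose file names encode URLs with '=' in place of '/', process_documents
# returns a DataFrame with a 'chunk' column (title plus body text) and a 'url'
# column (the reconstructed source URL). The path below is hypothetical.
#
#   sample_df = process_documents("/tmp/extracted_html")
#   print(sample_df[["url"]].head())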
def setup_rag_system(temp_dir):
    """Initialize the RAG system with the provided documents."""
    # Initialize embedding model
    model_name = "dunzhang/stella_en_1.5B_v5"
    model_kwargs = {'trust_remote_code': True}  # boolean flag, not the string 'True'
    embedding_model = HuggingFaceEmbeddings(
        model_name=model_name,
        show_progress=True,
        model_kwargs=model_kwargs
    )

    # Process documents
    df = process_documents(temp_dir)
    if df.empty:
        raise ValueError("No valid documents were processed")

    df["chunk_id"] = range(len(df))

    # Create documents list
    list_of_documents = [
        Document(
            page_content=record['chunk'],
            metadata={"source_url": record['url']}
        )
        for record in df[['chunk', 'url']].to_dict(orient='records')
    ]

    # Setup vector store
    ids = [str(i) for i in df['chunk_id'].to_list()]
    client = chromadb.PersistentClient(path=tempfile.mkdtemp())
    vector_store = Chroma(
        client=client,
        collection_name="rag-chroma",
        embedding_function=embedding_model,
    )

    # Add documents in batches
    batch_size = 100
    for i in range(0, len(list_of_documents), batch_size):
        end_idx = min(i + batch_size, len(list_of_documents))
        vector_store.add_documents(
            documents=list_of_documents[i:end_idx],
            ids=ids[i:end_idx]
        )

    return vector_store
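# Sanity-check sketch (illustrative, not executed here): after building the store,
# a quick similarity search should return documents carrying the source_url
# metadata attached above. The directory path and query string are hypothetical.
#
#   store = setup_rag_system("/tmp/extracted_html")
#   hits = store.similarity_search("data governance capabilities", k=3)
#   for doc in hits:
#       print(doc.metadata["source_url"])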
def create_workflow(vector_store):
    """Create the RAG workflow."""
    retriever = vector_store.as_retriever(search_kwargs={"k": 7})
    llm = ChatNVIDIA(model="meta/llama-3.3-70b-instruct", temperature=0)

    rag_prompt = PromptTemplate.from_template(
        """You are an assistant for responding to Request For Proposal documents for a
bidder in the field of Data Science and Engineering. Use the following pieces
of retrieved context to respond to the requests. If you don't know the answer,
just say that you don't know. Provide detailed responses with specific examples
and capabilities where possible.

Question: {question}
Context: {context}
Answer:"""
    )

    def format_docs(result):
        return "\n\n".join(doc.page_content for doc in result)

    rag_chain = (
        {"context": retriever | format_docs, "question": RunnablePassthrough()}
        | rag_prompt
        | llm
        | StrOutputParser()
    )

    return rag_chain
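# Usage sketch (illustrative, not executed here): the chain takes the requirement
# text directly and returns the generated answer as a string. The example
# requirement below is hypothetical.
#
#   chain = create_workflow(vector_store)
#   answer = chain.invoke("Describe your data lineage capabilities.")
#   print(answer)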
def preprocess_csv(csv_file):
    """Preprocess the CSV file to ensure proper format."""
    try:
        # First try reading as is
        df = pd.read_csv(csv_file.name, encoding='latin-1')

        # If there's only one column and no header
        if len(df.columns) == 1 and df.columns[0] != 'requirement':
            # Read again with no header and assign column name
            df = pd.read_csv(csv_file.name, encoding='latin-1', header=None, names=['requirement'])

        # If there's no 'requirement' column, assume first column is requirements
        if 'requirement' not in df.columns:
            df = df.rename(columns={df.columns[0]: 'requirement'})

        return df
    except Exception:
        # If standard CSV reading fails, try reading as plain text
        try:
            with open(csv_file.name, 'r', encoding='latin-1') as f:
                requirements = f.read().strip().split('\n')
            return pd.DataFrame({'requirement': requirements})
        except Exception as e2:
            raise ValueError(f"Could not process CSV file: {str(e2)}")
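# Usage sketch (illustrative, not executed here): preprocess_csv accepts either a
# headerless single-column CSV or one with a 'requirement' header, and always
# returns a DataFrame with a 'requirement' column. The SimpleNamespace stand-in
# mimics the Gradio file handle's '.name' attribute and the file name is hypothetical.
#
#   from types import SimpleNamespace
#   reqs = preprocess_csv(SimpleNamespace(name="requirements.csv"))
#   print(reqs['requirement'].tolist())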
def handle_upload(zip_file, csv_file):
    """Handle file uploads and process requirements with enhanced error handling."""
    try:
        # Create temporary directory
        temp_dir = tempfile.mkdtemp()
        print(f"Created temporary directory: {temp_dir}")

        try:
            # Extract zip file
            print(f"Extracting ZIP file: {zip_file.name}")
            with zipfile.ZipFile(zip_file.name, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)
                print(f"ZIP contents: {zip_ref.namelist()}")

            # Process documents early as a validation pass
            # (setup_rag_system re-processes them when building the vector store)
            print("Processing documents...")
            df = process_documents(temp_dir)
            print(f"Processed {len(df)} documents")

            # Preprocess and read requirements CSV
            print("Processing CSV file...")
            requirements_df = preprocess_csv(csv_file)
            print(f"Found {len(requirements_df)} requirements")

            # Setup RAG system
            print("Setting up RAG system...")
            vector_store = setup_rag_system(temp_dir)
            rag_chain = create_workflow(vector_store)

            # Process requirements
            results = []
            for idx, req in enumerate(requirements_df['requirement'], 1):
                print(f"Processing requirement {idx}/{len(requirements_df)}")
                try:
                    response = rag_chain.invoke(req)
                    results.append({
                        'requirement': req,
                        'response': response
                    })
                except Exception as e:
                    error_msg = f"Error processing requirement: {str(e)}"
                    print(error_msg)
                    results.append({
                        'requirement': req,
                        'response': error_msg
                    })

            return pd.DataFrame(results)

        finally:
            # Cleanup
            print(f"Cleaning up temporary directory: {temp_dir}")
            shutil.rmtree(temp_dir)

    except Exception as e:
        error_msg = f"Processing error: {str(e)}"
        print(error_msg)
        return pd.DataFrame([{'error': error_msg}])
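# Invocation sketch (illustrative, not executed here): handle_upload only needs
# objects exposing a '.name' path, which is what the Gradio File component
# supplies; the SimpleNamespace stand-ins and file names below are hypothetical.
#
#   from types import SimpleNamespace
#   results = handle_upload(
#       SimpleNamespace(name="docs.zip"),
#       SimpleNamespace(name="requirements.csv"),
#   )
#   print(results.head())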
def main():
    """Main function to run the Gradio interface."""
    iface = gr.Interface(
        fn=handle_upload,
        inputs=[
            gr.File(label="Upload ZIP of HTML documents", file_types=[".zip"]),
            gr.File(label="Upload Requirements CSV", file_types=[".csv", ".txt"])
        ],
        outputs=gr.Dataframe(),
        title="RAG System for RFP Analysis",
        description="""Upload a ZIP folder containing HTML documents and a CSV file with requirements to analyze.
        The CSV file should contain requirements either as a single column or with a 'requirement' column header.""",
        examples=[],
        cache_examples=False
    )

    iface.launch(share=True)


if __name__ == "__main__":
    main()