# app.py
import os | |
import uuid | |
import nltk | |
import trafilatura | |
import chromadb | |
import tiktoken | |
import gradio as gr | |
from langchain_core.prompts import ChatPromptTemplate | |
from langchain_core.runnables import RunnableLambda, RunnablePassthrough | |
from langchain_core.output_parsers import StrOutputParser | |
from langchain_together import ChatTogether | |
from langchain_community.vectorstores import Chroma | |
from sentence_transformers import SentenceTransformer | |
from nltk.tokenize import sent_tokenize | |
from langchain_huggingface import HuggingFaceEmbeddings | |
# --- Runtime setup -----------------------------------------------------------

# Sentence-splitting data needed by nltk's sent_tokenize (used in chunk_text).
for _nltk_resource in ("punkt", "punkt_tab"):
    nltk.download(_nltk_resource)

# Tokenizer used by chunk_text to measure how long a candidate chunk is.
tokenizer = tiktoken.get_encoding("cl100k_base")

# Single source of truth for the embedding model id, so the encoder used when
# indexing chunks and the one used when embedding queries cannot drift apart.
EMBEDDING_MODEL_NAME = "BAAI/bge-base-en-v1.5"

# Encoder used directly when chunks are written into the collection.
embedding_model = SentenceTransformer(EMBEDDING_MODEL_NAME)
# LangChain embeddings wrapper handed to the Chroma vector store for queries.
# NOTE(review): this presumably loads a second copy of the same weights --
# confirm, and consider sharing one encoder if memory matters.
embedding_function = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL_NAME)

# On-disk Chroma store and the collection that holds the page chunks.
chroma_client = chromadb.PersistentClient(path="./chroma_store")
collection = chroma_client.get_or_create_collection(name="imageonline_chunks")
# Sectioned URL list.
# The site root lives in one place; each section lists page slugs only, so a
# host or scheme change is a one-line edit.
_SITE_BASE_URL = "https://www.imageonline.co.in/"

_SECTION_PAGE_SLUGS = {
    "Website Designing": (
        "website-designing-mumbai",
        "domain-hosting-services-india",
        "best-seo-company-mumbai",
        "wordpress-blog-designing-india",
        "social-media-marketing-company-mumbai",
        "website-template-customization-india",
        # Spelling kept exactly as in the original URL list.
        "regular-website-maintanence-services",
        "mobile-app-designing-mumbai",
        "web-application-screen-designing",
    ),
    "Website Development": (
        "website-development-mumbai",
        "open-source-customization",
        "ecommerce-development-company-mumbai",
        "website-with-content-management-system",
        "web-application-development-india",
    ),
    "Mobile App Development": (
        "mobile-app-development-company-mumbai",
    ),
    "About Us": (
        "about-us",
        "vision",
        "team",
    ),
    "Testimonials": (
        "testimonial",
    ),
}

# Maps section name -> list of absolute page URLs (same shape and order as
# the hand-written literal it replaces).
url_dict = {
    section: [f"{_SITE_BASE_URL}{slug}.html" for slug in slugs]
    for section, slugs in _SECTION_PAGE_SLUGS.items()
}
# Helper functions
def extract_clean_text(url):
    """Download *url* and return its extracted text, or ``None``.

    Args:
        url: Address of the page to fetch.

    Returns:
        The non-empty text produced by ``trafilatura.extract`` (comments and
        tables excluded), or ``None`` when the download fails, nothing could
        be extracted, or an exception was raised along the way.
    """
    try:
        print(f"π Fetching URL: {url}")
        downloaded = trafilatura.fetch_url(url)
        if downloaded:
            content = trafilatura.extract(
                downloaded, include_comments=False, include_tables=False
            )
            # A successful download can still yield no text; only report
            # success (and return) when there is something to index.
            if content:
                print(f"β Extracted text from {url}")
                return content
        print(f"β οΈ Failed to fetch content from {url}")
    except Exception as e:
        # Best-effort per-URL boundary: one bad page must not abort the
        # whole ingestion run, so log and fall through to None.
        print(f"β Error fetching {url}: {e}")
    return None
def chunk_text(text, max_tokens=400):
    """Split *text* into sentence-aligned chunks of roughly *max_tokens* tokens.

    Sentences are accumulated greedily; when adding the next sentence would
    push the joined chunk past ``max_tokens`` (as counted by the module-level
    ``tokenizer``), the current chunk is closed and the sentence starts a new
    one.

    Args:
        text: Text to split.
        max_tokens: Upper bound on the token count of a multi-sentence chunk.

    Returns:
        A list of non-empty chunk strings. A single sentence that is longer
        than ``max_tokens`` on its own is kept whole as its own chunk, so such
        a chunk may exceed the limit.
    """
    chunks = []
    current_chunk = []
    for sentence in sent_tokenize(text):
        candidate = current_chunk + [sentence]
        # Only measure when there is already something buffered: a lone
        # sentence is accepted as-is, which avoids emitting an empty chunk
        # when that sentence alone exceeds the limit.
        if current_chunk and len(tokenizer.encode(" ".join(candidate))) > max_tokens:
            finished = " ".join(current_chunk).strip()
            if finished:
                chunks.append(finished)
            current_chunk = [sentence]
        else:
            current_chunk = candidate
    # Flush whatever is still buffered after the last sentence.
    if current_chunk:
        finished = " ".join(current_chunk).strip()
        if finished:
            chunks.append(finished)
    print(f"π Text split into {len(chunks)} chunks.")
    return chunks
# Check refresh override: FORCE_REFRESH=true re-scrapes even if data exists.
force_refresh = os.getenv("FORCE_REFRESH", "false").lower() == "true"

# Load data into ChromaDB
if collection.count() == 0 or force_refresh:
    print("π Loading documents into ChromaDB...")
    if force_refresh and collection.count() > 0:
        # Chunk ids are random UUIDs, so adding on top of existing data would
        # duplicate every chunk. Start the refresh from an empty collection.
        # Trade-off: if scraping then fails, the index stays empty until the
        # next successful run.
        _collection_name = collection.name
        chroma_client.delete_collection(name=_collection_name)
        collection = chroma_client.get_or_create_collection(name=_collection_name)
    for section, urls in url_dict.items():
        for url in urls:
            text = extract_clean_text(url)
            if not text:
                continue
            # Drop blank chunks and skip pages that produce none, so an empty
            # batch is never embedded or handed to collection.add.
            chunks = [chunk for chunk in chunk_text(text) if chunk.strip()]
            if not chunks:
                continue
            embeddings = embedding_model.encode(chunks, convert_to_numpy=True)
            collection.add(
                documents=chunks,
                embeddings=embeddings.tolist(),
                # Every chunk of a page carries the same provenance.
                metadatas=[{"source": url, "section": section} for _ in chunks],
                ids=[str(uuid.uuid4()) for _ in chunks],
            )
    print("β Document loading complete.")
else:
    print("β Using existing ChromaDB collection.")
# Vectorstore & Retriever
# LangChain wrapper over the collection populated above; queries are embedded
# with embedding_function and the three nearest chunks are returned.
vectorstore = Chroma(
    collection_name="imageonline_chunks",
    embedding_function=embedding_function,
    client=chroma_client,
)
retriever = vectorstore.as_retriever(search_kwargs=dict(k=3))

# Together.ai LLM
# The API key is read from the TOGETHER_API_KEY environment variable.
llm = ChatTogether(
    together_api_key=os.getenv("TOGETHER_API_KEY"),
    model="meta-llama/Llama-3-8b-chat-hf",
    max_tokens=1024,
    temperature=0.3,
    top_p=0.7,
)
# Prompt template (refined)
# {context} receives the formatted retrieved chunks, {question} the raw user
# message; the closing line tells the model how to decline when the context
# does not contain the answer.
_PROMPT_TEXT = """
You are a helpful assistant for ImageOnline Web Solutions.
Use ONLY the information provided in the context to answer the user's query.
Context:
{context}
Question:
{question}
If the answer is not found in the context, say "I'm sorry, I don't have enough information to answer that."
"""
prompt = ChatPromptTemplate.from_template(_PROMPT_TEXT)
# Context retrieval
def retrieve_and_format(query):
    """Fetch the chunks most relevant to *query* and render them as one string.

    Args:
        query: The user's question, passed to the module-level ``retriever``.

    Returns:
        One entry per retrieved document, formatted as
        ``"[<section>] <content>\\n(Source: <url>)"`` and separated by blank
        lines. Missing metadata fields are rendered as empty strings. The
        result is an empty string when nothing is retrieved.
    """
    # invoke() is the Runnable entry point for retrievers; it replaces the
    # deprecated get_relevant_documents().
    docs = retriever.invoke(query)
    context_strings = []
    for doc in docs:
        metadata = doc.metadata or {}
        section = metadata.get("section", "")
        source = metadata.get("source", "")
        context_strings.append(f"[{section}] {doc.page_content}\n(Source: {source})")
    return "\n\n".join(context_strings)
# RAG chain
# Step 1 fans the incoming question out into the two prompt variables:
# "context" is produced by retrieve_and_format, "question" is passed through
# unchanged. The filled prompt then goes to the LLM and the reply is reduced
# to a plain string.
_prompt_inputs = {
    "context": RunnableLambda(retrieve_and_format),
    "question": RunnablePassthrough(),
}
rag_chain = _prompt_inputs | prompt | llm | StrOutputParser()
# Gradio Interface
def chat_interface(message, history):
    """Answer *message* with the RAG chain and append the turn to the history.

    Args:
        message: Text typed by the user.
        history: List of ``(user_text, bot_text)`` tuples from earlier turns,
            or ``None``/empty for a new conversation. It is not modified.

    Returns:
        A ``(chatbot_value, state_value)`` pair; both are the same new list
        containing the previous turns plus this one. A blank message adds no
        turn and does not call the model. If the chain raises, the bot side of
        the turn carries the error text instead of an answer.
    """
    # Work on a copy so the caller's list is never mutated in place.
    history = list(history or [])
    if not message or not message.strip():
        return history, history
    user_turn = "π§ You: " + message
    try:
        bot_turn = "π€ Bot: " + rag_chain.invoke(message)
    except Exception as e:
        # UI boundary: surface the failure in the chat rather than crashing
        # the event handler.
        error_msg = f"β οΈ Error: {str(e)}"
        bot_turn = f"π€ Bot: {error_msg}"
    history.append((user_turn, bot_turn))
    return history, history
def launch_gradio():
    """Build the chat UI and return the (not yet launched) ``gr.Blocks`` app.

    The layout is a title, a chat window, a textbox with a send button, and a
    clear button. Pressing Enter in the textbox and clicking Send both run
    ``chat_interface`` and then empty the textbox; Clear resets the chat
    window and the stored history.

    Returns:
        The assembled ``gr.Blocks`` instance; the caller is responsible for
        calling ``.launch()`` on it.
    """
    with gr.Blocks() as demo:
        gr.Markdown("# π¬ ImageOnline RAG Chatbot")
        gr.Markdown("Ask about Website Designing, App Development, SEO, Hosting, etc.")
        chatbot = gr.Chatbot()
        state = gr.State([])
        with gr.Row():
            msg = gr.Textbox(placeholder="Ask your question here...", show_label=False, scale=8)
            send_btn = gr.Button("π¨ Send", scale=1)
        # Same handler for both ways of sending; afterwards clear the textbox
        # so the previous question does not linger in the input.
        for register in (msg.submit, send_btn.click):
            register(
                chat_interface, inputs=[msg, state], outputs=[chatbot, state]
            ).then(fn=lambda: "", inputs=None, outputs=[msg])
        with gr.Row():
            clear_btn = gr.Button("π§Ή Clear Chat")
        clear_btn.click(fn=lambda: ([], []), outputs=[chatbot, state])
    return demo
# Script entry point: build the UI and start the Gradio server.
if __name__ == "__main__":
    demo = launch_gradio()
    demo.launch()