import os
import json
import time
import tempfile

import numpy as np
import pandas as pd
import chromadb
import gradio as gr
import nltk
from langdetect import detect
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from rank_bm25 import BM25Okapi
from openai import OpenAI
from sentence_transformers import SentenceTransformer, util
from langchain.text_splitter import RecursiveCharacterTextSplitter

nltk.download('punkt')
nltk.download('stopwords')

API_KEY = os.environ.get("OPENROUTER_API_KEY")

# Load the Excel file
df = pd.read_excel("web_documents.xlsx", engine='openpyxl')

# Initialize the Chroma persistent client
client = chromadb.PersistentClient(path="./db")

# Create (or get) the Chroma collection
collection = client.get_or_create_collection(
    name="rag_web_db_cosine_full_documents",
    metadata={"hnsw:space": "cosine"}
)

# Load the embedding model
embedding_model = SentenceTransformer('sentence-transformers/paraphrase-MiniLM-L6-v2')

# Initialize the text splitter
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1200, chunk_overlap=300)

total_chunks = 0

# Process each row in the DataFrame
for idx, row in df.iterrows():
    content = str(row['Content'])        # Just in case it's not a string
    metadata_str = str(row['Metadata'])

    # Keep the raw metadata string under a single key
    metadata = {"metadata": metadata_str}

    # Split content into chunks
    chunks = text_splitter.split_text(content)
    total_chunks += len(chunks)

    # Generate embeddings for each chunk
    chunk_embeddings = embedding_model.encode(chunks)

    # Add each chunk to the Chroma collection
    for i, chunk in enumerate(chunks):
        collection.add(
            documents=[chunk],
            metadatas=[metadata],
            ids=[f"{idx}_chunk_{i}"],
            embeddings=[chunk_embeddings[i]]
        )

# ---------------------- Config ----------------------
SIMILARITY_THRESHOLD = 0.75
client1 = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=API_KEY  # Set OPENROUTER_API_KEY in your environment
)

# ---------------------- Models ----------------------
semantic_model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2")

# Load QA data used as a semantic cache (CAG)
with open("qa.json", "r", encoding="utf-8") as f:
    qa_data = json.load(f)

qa_questions = list(qa_data.keys())
qa_answers = list(qa_data.values())
qa_embeddings = semantic_model.encode(qa_questions, convert_to_tensor=True)

# ---------------------- BM25 ----------------------
def detect_language(text):
    try:
        lang = detect(text)
        return 'french' if lang.startswith('fr') else 'english'
    except Exception:
        return 'english'  # default fallback

def clean_and_tokenize(text, lang):
    tokens = word_tokenize(text.lower(), language=lang)
    try:
        stop_words = set(stopwords.words(lang))
        return [t for t in tokens if t not in stop_words]
    except Exception:
        return tokens  # fallback if stopwords are not available

def rerank_with_bm25(docs, query):
    lang = detect_language(query)
    tokenized_docs = [clean_and_tokenize(doc['content'], lang) for doc in docs]
    bm25 = BM25Okapi(tokenized_docs)
    tokenized_query = clean_and_tokenize(query, lang)
    scores = bm25.get_scores(tokenized_query)
    top_indices = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:2]
    return [docs[i] for i in top_indices]
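# --- Optional ingestion sanity check (illustrative sketch, not part of the original pipeline) ---
# The guarded snippet below shows how the freshly built collection and the BM25 reranker
# could be exercised together on a throwaway query. The RUN_INGESTION_CHECK flag and the
# sample query string are assumptions added purely for illustration.
RUN_INGESTION_CHECK = False
if RUN_INGESTION_CHECK:
    sample_query = "How do I reset a student's password in Moodle?"  # hypothetical query
    sample_embedding = embedding_model.encode(sample_query)
    sample_results = collection.query(query_embeddings=[sample_embedding], n_results=3)
    sample_docs = [
        {"content": doc, "metadata": meta}
        for doc, meta in zip(sample_results['documents'][0], sample_results['metadatas'][0])
    ]
    print(f"Indexed {total_chunks} chunks; top BM25-reranked hits:")
    for doc in rerank_with_bm25(sample_docs, sample_query):
        print("-", doc["content"][:80], "...")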
# ---------------------- History-Aware CAG ----------------------
def retrieve_from_cag(user_query):
    query_embedding = semantic_model.encode(user_query, convert_to_tensor=True)
    cosine_scores = util.cos_sim(query_embedding, qa_embeddings)[0]
    best_idx = int(np.argmax(cosine_scores.cpu().numpy()))
    best_score = float(cosine_scores[best_idx])

    print(f"[CAG] Best score: {best_score:.4f} | Closest question: {qa_questions[best_idx]}")

    if best_score >= SIMILARITY_THRESHOLD:
        return qa_answers[best_idx], best_score  # Return only the cached answer
    else:
        return None, best_score

# ---------------------- History-Aware RAG ----------------------
def retrieve_from_rag(user_query):
    print("Searching in RAG...")
    query_embedding = embedding_model.encode(user_query)
    results = collection.query(query_embeddings=[query_embedding], n_results=3)  # Top 3 candidates

    if not results or not results.get('documents'):
        return None

    # Build the list of candidate documents
    documents = []
    for i, content in enumerate(results['documents'][0]):
        metadata = results['metadatas'][0][i]
        documents.append({
            "content": content.strip(),
            "metadata": metadata
        })
        print(metadata)

    # Rerank candidates with BM25 and keep the best two
    top_docs = rerank_with_bm25(documents, user_query)
    print("BM25-selected top documents:", top_docs)
    return top_docs

# ---------------------- Generation (OpenRouter) ----------------------
def generate_via_openrouter(context, query, chat_history=None):
    print("\n--- Generating via OpenRouter ---")
    print("Context received:", context)

    prompt = f"""[INST]
You are a Moodle expert assistant.

Instructions:
- Always respond in the same language as the question.
- Use only the provided documents below to answer.
- If the answer is not in the documents, simply say: "I don't know." / "Je ne sais pas."
- Cite only the sources you use, indicated at the end of each document like (Source: https://example.com).

Documents:
{context}

Question: {query}
Answer:
[/INST]
"""

    try:
        response = client1.chat.completions.create(
            # model="mistralai/mistral-7b-instruct:free",
            model="mistralai/mistral-small-3.1-24b-instruct:free",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        print(f"Error during generation: {e}")
        return "Error during generation."

# ---------------------- Main Chatbot ----------------------
def chatbot(query, chat_history):
    print("\n==== New Query ====")
    print("User Query:", query)

    # Try the CAG cache first
    answer, score = retrieve_from_cag(query)
    if answer:
        print("Answer retrieved from CAG cache.")
        return answer

    # Otherwise, fall back to RAG retrieval
    docs = retrieve_from_rag(query)
    if docs:
        context_blocks = []
        for doc in docs:
            content = doc.get("content", "").strip()
            metadata = doc.get("metadata") or {}

            source = "Source inconnue"
            if isinstance(metadata, dict):
                source_field = metadata.get("metadata", "")
                if isinstance(source_field, str) and source_field.startswith("source:"):
                    source = source_field.replace("source:", "").strip()

            context_blocks.append(f"{content}\n(Source: {source})")

        context = "\n\n".join(context_blocks)

        # Generate the answer with the OpenRouter backend
        response = generate_via_openrouter(context, query)
        return response
    else:
        print("No relevant documents found.")
        return "Je ne sais pas."
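# --- Optional console smoke test (illustrative sketch, not part of the original app) ---
# This shows how chatbot() could be called directly, bypassing the Gradio UI, to check the
# CAG-then-RAG flow end to end. The RUN_CONSOLE_TEST guard and the example question are
# assumptions added purely for illustration.
RUN_CONSOLE_TEST = False
if RUN_CONSOLE_TEST:
    test_history = []
    test_question = "How do I create a quiz in Moodle?"  # hypothetical question
    print("Bot:", chatbot(test_question, test_history))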
# ---------------------- Gradio App ----------------------
def save_chat_to_file(chat_history):
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    filename = f"chat_history_{timestamp}.json"

    # Write the chat history to a temporary file
    temp_dir = tempfile.gettempdir()
    file_path = os.path.join(temp_dir, filename)
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(chat_history, f, ensure_ascii=False, indent=2)

    return file_path

def ask(user_message, chat_history):
    if not user_message:
        return chat_history, chat_history, ""

    response = chatbot(user_message, chat_history)
    chat_history.append((user_message, response))

    return chat_history, chat_history, ""

# Initialize chat history with a welcome message
initial_message = (None, "Hello, how can I help you with Moodle?")

with gr.Blocks(theme=gr.themes.Soft()) as demo:
    chat_history = gr.State([initial_message])

    chatbot_ui = gr.Chatbot(value=[initial_message])
    question = gr.Textbox(placeholder="Ask me anything about Moodle...", show_label=False)
    clear_button = gr.Button("Clear")
    save_button = gr.Button("Save Chat")

    question.submit(ask, [question, chat_history], [chatbot_ui, chat_history, question])
    clear_button.click(lambda: ([initial_message], [initial_message], ""),
                       None, [chatbot_ui, chat_history, question], queue=False)
    save_button.click(save_chat_to_file, [chat_history], gr.File(label="Download your chat history"))

demo.queue()
demo.launch(share=False)