| | from flask import Flask, render_template, request, jsonify, session, redirect, url_for, send_file |
| | from flask_cors import CORS |
| | import os |
| | import json |
| | import pandas as pd |
| | import re |
| | from datetime import datetime |
| | from werkzeug.utils import secure_filename |
| | from deep_translator import GoogleTranslator |
| | from langchain.text_splitter import RecursiveCharacterTextSplitter |
| | from langchain_community.document_loaders import WebBaseLoader |
| | from langchain_community.embeddings import HuggingFaceEmbeddings |
| | from langchain_community.vectorstores import Chroma |
| | from langchain.prompts import PromptTemplate |
| | from langchain.chains import RetrievalQA |
| | from transformers import pipeline |
| | import chromadb |
| | from chromadb.config import Settings |
| |
|
app = Flask(__name__)
# Session-signing key: read from the environment so production deployments
# can override it; the hard-coded fallback preserves existing dev behavior.
app.secret_key = os.environ.get('SECRET_KEY', 'your-secret-key-here')
CORS(app)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16 MB upload cap
app.config['ALLOWED_EXTENSIONS'] = {'xlsx'}  # only Excel training sheets
| |
|
| | |
# In-memory application state (reset on restart; not shared across workers).
chat_history = []  # recent user/bot exchanges, served by /api/chat_history
query_logs = []  # NOTE(review): appears unused; logs are persisted to query_logs.json instead
keyword_responses = {}  # keyword -> canned response, loaded from the Excel sheet
vector_store = None  # Chroma vector index, built by initialize_model()
qa_chain = None  # LangChain RetrievalQA chain, built by initialize_model()
llm_pipeline = None  # transformers text2text-generation pipeline
| |
|
| | |
# SASTRA University pages scraped into the RAG vector index at startup.
SASTRA_URLS = [
    "https://www.sastra.edu/",
    "https://www.sastra.edu/admissions/",
    "https://www.sastra.edu/academics/",
    "https://www.sastra.edu/placements/",
    "https://www.sastra.edu/facilities/",
]
| |
|
def allowed_file(filename):
    """Return True when *filename* carries an extension listed in ALLOWED_EXTENSIONS."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in app.config['ALLOWED_EXTENSIONS']
| |
|
def clean_llm_output(text):
    """Normalize raw LLM text for HTML display.

    Strips tag-like markup, collapses whitespace, removes a stray
    leading period, and wraps bare URLs in clickable anchors.
    Falsy input yields "".
    """
    if not text:
        return ""

    # Applied in order: each pass feeds the next.
    passes = [
        (r'<.*?>', ''),          # drop HTML-ish tags
        (r'\s+', ' '),           # collapse whitespace runs
        (r'^\s*\.\s*', ''),      # remove a stray leading period
        (r'(https?://[^\s]+)', r'<a href="\1" target="_blank">\1</a>'),  # linkify URLs
    ]
    for pattern, replacement in passes:
        text = re.sub(pattern, replacement, text)

    return text.strip()
| |
|
def format_response(response_text, query_lang="en"):
    """Clean LLM output and prefix a greeting when one is present.

    query_lang is kept for interface compatibility but unused here;
    translation happens in the caller. Empty input yields a fixed
    fallback message.
    """
    if not response_text:
        return "I couldn't find an answer to your question. Please try rephrasing."

    formatted = clean_llm_output(response_text)

    # Match greetings as whole words only; the previous substring check
    # fired on words like "this" (contains "hi") and prefixed "Hello!"
    # onto unrelated answers.
    if re.search(r'\b(hello|hi|hey|greetings)\b', formatted.lower()):
        return f"Hello! {formatted}"

    return formatted
| |
|
def translate_text(text, target_lang="en", source_lang="auto"):
    """Translate *text* into *target_lang* via GoogleTranslator.

    English targets short-circuit and return the input unchanged;
    any translation failure is printed and the original text returned.
    """
    if target_lang == "en":
        return text
    try:
        return GoogleTranslator(source=source_lang, target=target_lang).translate(text)
    except Exception as e:
        print(f"Translation error: {e}")
        return text
| |
|
def load_excel_data(filepath="training_data.xlsx"):
    """Load keyword -> response pairs from an Excel sheet.

    Expects 'keyword' and 'response' columns. Rows with an empty cell
    are skipped (previously NaN cells stringified to the literal
    keyword 'nan' and were registered). Returns {} on any read error.
    """
    try:
        df = pd.read_excel(filepath)
        keyword_dict = {}
        for _, row in df.iterrows():
            keyword = row.get('keyword')
            response = row.get('response')
            # pd.isna guards empty cells, which str() would turn into 'nan'.
            if pd.isna(keyword) or pd.isna(response):
                continue
            keyword = str(keyword).lower().strip()
            response = str(response).strip()
            if keyword and response:
                keyword_dict[keyword] = response
        return keyword_dict
    except Exception as e:
        print(f"Error loading Excel data: {e}")
        return {}
| |
|
def initialize_model():
    """Build the full RAG stack and publish it via module globals.

    Side effects: (re)assigns keyword_responses, vector_store, qa_chain
    and llm_pipeline. Returns True on success, False on any failure
    (errors are printed, never raised). Slow: downloads models and
    scrapes the SASTRA_URLS pages.
    """
    global vector_store, qa_chain, llm_pipeline, keyword_responses

    try:
        # Keyword lookup table from the default Excel sheet.
        keyword_responses = load_excel_data()

        # Sentence-level embedding model, used for indexing and querying.
        embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )

        # Scrape each SASTRA page; a failing URL is skipped, not fatal.
        print("Loading web documents...")
        documents = []
        for url in SASTRA_URLS:
            try:
                loader = WebBaseLoader(url)
                docs = loader.load()
                documents.extend(docs)
                print(f"Loaded {len(docs)} documents from {url}")
            except Exception as e:
                print(f"Error loading {url}: {e}")

        # Chunk pages with overlap so retrieved pieces keep local context.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        splits = text_splitter.split_documents(documents)

        # Persisted Chroma index over the chunks.
        print("Creating vector store...")
        vector_store = Chroma.from_documents(
            documents=splits,
            embedding=embeddings,
            persist_directory="./chroma_db"
        )

        # Small local seq2seq model used for answer generation.
        llm_pipeline = pipeline(
            "text2text-generation",
            model="google/flan-t5-base",
            max_length=512,
            temperature=0.3
        )

        # Minimal adapter so the transformers pipeline can be passed
        # where LangChain expects an LLM-like callable.
        class TransformersLLM:
            def __init__(self, pipeline):
                self.pipeline = pipeline

            def __call__(self, prompt):
                result = self.pipeline(prompt)
                return result[0]['generated_text']

        llm = TransformersLLM(llm_pipeline)

        # Prompt that grounds answers in the retrieved context only.
        template = """Use the following context to answer the question. If you don't know the answer, say you don't know. Be concise and accurate.

Context: {context}

Question: {question}

Answer: """

        prompt = PromptTemplate(
            template=template,
            input_variables=["context", "question"]
        )

        # Retrieval chain: top-3 chunks stuffed into a single prompt.
        qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=vector_store.as_retriever(search_kwargs={"k": 3}),
            chain_type_kwargs={"prompt": prompt}
        )

        print("Model initialization complete!")
        return True

    except Exception as e:
        print(f"Error initializing model: {e}")
        return False
| |
|
def log_query(user_query, response, lang, response_type):
    """Append one query/response record to query_logs.json.

    Best-effort: failures are printed, never raised. The stored response
    is truncated to 500 chars. A corrupt or unreadable log file is reset
    rather than aborting the write (previously a corrupt file silently
    killed all further logging).
    """
    try:
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "query": user_query,
            "response": response[:500],
            "language": lang,
            "response_type": response_type
        }

        logs = []
        if os.path.exists("query_logs.json"):
            try:
                with open("query_logs.json", "r") as f:
                    logs = json.load(f)
            except (json.JSONDecodeError, OSError) as e:
                # Start a fresh log rather than losing this entry.
                print(f"Resetting unreadable query log: {e}")
                logs = []
        if not isinstance(logs, list):
            logs = []

        logs.append(log_entry)

        with open("query_logs.json", "w") as f:
            json.dump(logs, f, indent=2)

    except Exception as e:
        print(f"Error logging query: {e}")
| |
|
def get_analytics():
    """Summarize query_logs.json for the admin dashboard.

    Returns total query count, the 10 most frequently asked questions,
    and language / response-type distributions. Missing file yields a
    zeroed summary; any other failure yields {} (errors printed).
    """
    from collections import Counter  # local: avoid touching module imports

    try:
        if not os.path.exists("query_logs.json"):
            return {
                "total_queries": 0,
                "top_questions": [],
                "language_distribution": {},
                "response_types": {}
            }

        with open("query_logs.json", "r") as f:
            logs = json.load(f)

        lang_dist = Counter(log.get("language", "unknown") for log in logs)
        response_types = Counter(log.get("response_type", "unknown") for log in logs)

        # Genuinely top questions by frequency — the old code returned
        # merely the 10 most recent, despite the key name.
        question_counts = Counter(log.get("query", "") for log in logs)
        top_questions = [q for q, _ in question_counts.most_common(10) if q]

        return {
            "total_queries": len(logs),
            "top_questions": top_questions,
            "language_distribution": dict(lang_dist),
            "response_types": dict(response_types)
        }

    except Exception as e:
        print(f"Error getting analytics: {e}")
        return {}
| |
|
| | |
# Build the RAG stack eagerly at import time so the first request is served warm.
# NOTE(review): this scrapes URLs and downloads models on import — slow to start.
initialize_model()
| |
|
| | |
@app.route('/')
def index():
    """Serve the public chat page."""
    return render_template('index.html')
| |
|
@app.route('/admin')
def admin():
    """Admin dashboard; unauthenticated visitors are sent to the login page."""
    if session.get('logged_in'):
        return render_template('admin.html')
    return redirect(url_for('login'))
| |
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Admin login form.

    The password is taken from the ADMIN_PASSWORD environment variable,
    defaulting to the previous hard-coded 'admin123' for dev setups.
    Compared with hmac.compare_digest to avoid a timing side channel.
    """
    if request.method == 'POST':
        import hmac  # local: avoid touching the module import block
        supplied = request.form.get('password') or ''
        expected = os.environ.get('ADMIN_PASSWORD', 'admin123')
        if hmac.compare_digest(supplied, expected):
            session['logged_in'] = True
            return redirect(url_for('admin'))
        return render_template('login.html', error='Invalid password')
    return render_template('login.html')
| |
|
@app.route('/logout')
def logout():
    """Clear the admin session flag and return to the chat page."""
    session.pop('logged_in', None)
    return redirect(url_for('index'))
| |
|
@app.route('/api/chat', methods=['POST'])
def chat():
    """Answer one chat message.

    Pipeline: translate to English if needed -> keyword table lookup ->
    RAG chain fallback -> format -> translate back -> log and record.
    """
    try:
        payload = request.json
        user_message = payload.get('message', '').strip()
        lang = payload.get('language', 'en')

        if not user_message:
            return jsonify({'error': 'Empty message'}), 400

        # Work internally in English; translate back at the end.
        user_message_en = (
            user_message if lang == 'en'
            else translate_text(user_message, target_lang="en", source_lang=lang)
        )

        response_text = ""
        response_type = "llm"

        # Exact keyword hits take priority over the retrieval chain.
        lowered = user_message_en.lower()
        for keyword, canned in keyword_responses.items():
            if keyword in lowered:
                response_text = canned
                response_type = "keyword"
                break

        # Fall back to the RAG chain when no keyword matched.
        if not response_text and qa_chain:
            try:
                response_text = qa_chain.run(user_message_en)
                response_type = "rag"
            except Exception as e:
                print(f"RAG error: {e}")
                response_text = "I encountered an error processing your question. Please try again."
                response_type = "error"

        formatted_response = format_response(response_text)

        final_response = (
            formatted_response if lang == 'en'
            else translate_text(formatted_response, target_lang=lang, source_lang="en")
        )

        log_query(user_message, final_response[:200], lang, response_type)

        # In-memory record for /api/chat_history.
        chat_history.append({
            'user': user_message,
            'bot': final_response,
            'lang': lang,
            'timestamp': datetime.now().isoformat()
        })

        return jsonify({
            'response': final_response,
            'type': response_type
        })

    except Exception as e:
        print(f"Chat error: {e}")
        return jsonify({'error': 'Internal server error'}), 500
| |
|
@app.route('/api/retrain', methods=['POST'])
def retrain():
    """Admin-only: optionally ingest an uploaded .xlsx of keyword/response
    pairs, then rebuild the RAG pipeline.

    Every path now returns a JSON response (previously a request without a
    valid file could fall through without one), and a disallowed file type
    gets an explicit 400 instead of being silently ignored.
    """
    if not session.get('logged_in'):
        return jsonify({'error': 'Unauthorized'}), 401

    try:
        file = request.files.get('file')
        if file and file.filename:
            # Reject anything but .xlsx before touching the disk.
            if not allowed_file(file.filename):
                return jsonify({'error': 'Only .xlsx files are accepted'}), 400
            filename = secure_filename(file.filename)
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            file.save(filepath)

            # Merge the new keyword/response pairs into the live table.
            global keyword_responses
            keyword_responses.update(load_excel_data(filepath))

        # Rebuild embeddings, vector store and QA chain.
        if initialize_model():
            return jsonify({'message': 'Model retrained successfully!'})
        return jsonify({'error': 'Failed to retrain model'}), 500

    except Exception as e:
        print(f"Retrain error: {e}")
        return jsonify({'error': str(e)}), 500
| |
|
@app.route('/api/analytics')
def get_analytics_data():
    """Admin-only JSON summary of the query logs."""
    if not session.get('logged_in'):
        return jsonify({'error': 'Unauthorized'}), 401
    return jsonify(get_analytics())
| |
|
@app.route('/api/download_logs')
def download_logs():
    """Admin-only download of the raw query log file."""
    if not session.get('logged_in'):
        return jsonify({'error': 'Unauthorized'}), 401
    if not os.path.exists("query_logs.json"):
        return jsonify({'error': 'No logs found'}), 404
    return send_file("query_logs.json", as_attachment=True)
| |
|
@app.route('/api/chat_history')
def get_chat_history():
    """Return the 50 most recent in-memory chat exchanges."""
    recent = chat_history[-50:]
    return jsonify(recent)
| |
|
if __name__ == '__main__':
    # Ensure every directory the app writes to or serves from exists.
    for folder in (app.config['UPLOAD_FOLDER'], 'static/css', 'static/js', 'templates'):
        os.makedirs(folder, exist_ok=True)

    # NOTE: debug=True is for development use only.
    app.run(debug=True, port=5000)