Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """app | |
| Automatically generated by Colab. | |
| Original file is located at | |
| https://colab.research.google.com/drive/1pwwcBb5Zlw1DA3u5K8W8mjrwBTBWXc1L | |
| """ | |
| import gradio as gr | |
| import numpy as np | |
| from transformers import pipeline | |
| import os | |
| import time | |
| import groq | |
| import uuid # For generating unique filenames | |
| # OLD Updated imports to address LangChain deprecation warnings: | |
| #from langchain_groq import ChatGroq | |
| #*from langchain.schema import HumanMessage | |
| #from langchain_core.messages import HumanMessage | |
| #*from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| #try: | |
| # For newer versions | |
| # from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| #except ImportError: | |
| # For older versions | |
| # from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| #from langchain_community.vectorstores import Chroma | |
| #from langchain_community.embeddings import HuggingFaceEmbeddings | |
| #*from langchain.docstore.document import Document | |
| #from langchain_core.documents import Document | |
| #from langchain.chains import RetrievalQA # This one might still be in main langchain | |
| # NEW IMPORTS (current): | |
| from langchain_core.messages import HumanMessage, SystemMessage, AIMessage | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_core.documents import Document | |
| from langchain_community.document_loaders import TextLoader, PyPDFLoader | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_community.llms import HuggingFaceHub | |
| #from langchain_community.chains import RetrievalQA | |
| #from langchain.chains.retrieval_qa.base import RetrievalQA # This one might still be in main langchain | |
| from langchain_community.vectorstores import Chroma #from old library | |
| from langchain_groq import ChatGroq | |
| # Importing chardet (make sure to add chardet to your requirements.txt) | |
| import chardet | |
| import fitz # PyMuPDF for PDFs | |
| import docx # python-docx for Word files | |
| import gtts # Google Text-to-Speech library | |
| from pptx import Presentation # python-pptx for PowerPoint files | |
| import re | |
# Speech-to-text pipeline (Whisper base, English-only) created at import time.
# NOTE(review): transcribe_audio in this file builds its own pipeline and does
# not reference this object -- confirm whether it is still needed.
transcriber = pipeline("automatic-speech-recognition", model="openai/whisper-base.en")
# Read the Groq API key from the GROQ_API_KEY environment variable
# (os.getenv yields None when the variable is unset) and keep it on the
# groq module object so the next statement can pass it on.
groq.api_key = os.getenv("GROQ_API_KEY")
# Chat model shared by quiz generation and the chatbot.
chat_model = ChatGroq(model_name="llama-3.3-70b-versatile", api_key=groq.api_key)
# Embedding model and Chroma vector store; the store persists under ./chroma_db,
# which is created up front if it does not exist.
os.makedirs("chroma_db", exist_ok=True)
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
vectorstore = Chroma(
    embedding_function=embedding_model,
    persist_directory="chroma_db"
)
# Short-term memory for the LLM: a module-level list of "User: ..." / "AI: ..."
# strings, so it is shared by every caller within this process.
chat_memory = []
# Instruction text that generate_quiz prepends to the document content.
# It asks for 20 mixed MCQ / fill-in-the-blank questions, each with an answer
# and a feedback line, and includes three worked output examples.
quiz_prompt = """
You are an AI assistant specialized in education and assessment creation. Given an uploaded document or text, generate a quiz with a mix of multiple-choice questions (MCQs) and fill-in-the-blank questions. The quiz should be directly based on the key concepts, facts, and details from the provided material.
Generate 20 Questions.
Remove all unnecessary formatting generated by the LLM, including <think> tags, asterisks, markdown formatting, and any bold or italic text, as well as **, ###, ##, and # tags.
For each question:
- Provide 4 answer choices (for MCQs), with only one correct answer.
- Ensure fill-in-the-blank questions focus on key terms, phrases, or concepts from the document.
- Include an answer key for all questions.
- Ensure questions vary in difficulty and encourage comprehension rather than memorization.
- Additionally, implement an instant feedback mechanism:
- When a user selects an answer, indicate whether it is correct or incorrect.
- If incorrect, provide a brief explanation from the document to guide learning.
- Ensure responses are concise and educational to enhance understanding.
Output Example:
1. Fill in the blank: The LLM Agent framework has a central decision-making unit called the _______________________.
Answer: Agent Core
Feedback: The Agent Core is the central component of the LLM Agent framework, responsible for managing goals, tool instructions, planning modules, memory integration, and agent persona.
2. What is the main limitation of LLM-based applications?
a) Limited token capacity
b) Lack of domain expertise
c) Prone to hallucination
d) All of the above
Answer: d) All of the above
Feedback: LLM-based applications have several limitations, including limited token capacity, lack of domain expertise, and being prone to hallucination, among others.
3. Given the following info, what is the value of P(jam|Rain)?
P(no Rain) = 0.8;
P(no Jam) = 0.2;
P(Rain|Jam) = 0.1
a) 0.016
b) 0.025
c) 0.1
d) 0.4
Answer: d) 0.4
Feedback: This question tests understanding of Bayes' Theorem by requiring the calculation of conditional probability using the given values.
"""
| # Function to clean AI response by removing unwanted formatting | |
def clean_response(response):
    """Strip reasoning tags and markdown decoration from a model reply.

    Applied in order: drop ``<think>...</think>`` spans (multi-line allowed),
    drop ``**``, ``*``, ``[`` and ``]``, drop line-leading runs of two or more
    ``#`` together with the whitespace after them, drop backslashes, drop
    ``---``, then trim surrounding whitespace.
    """
    # (pattern, flags) pairs, executed top to bottom; order matters.
    substitutions = (
        (r"<think>.*?</think>", re.DOTALL),
        (r"(\*\*|\*|\[|\])", 0),
        (r"^##+\s*", re.MULTILINE),
        (r"\\", 0),
        (r"---", 0),
    )
    text = response
    for pattern, flags in substitutions:
        text = re.sub(pattern, "", text, flags=flags)
    return text.strip()
| # Function to generate quiz based on content | |
def generate_quiz(content):
    """Ask the chat model for a quiz covering ``content``.

    The module-level ``quiz_prompt`` is placed in front of the document text,
    the combined prompt is sent as a single human message, and the reply text
    is passed through ``clean_response`` before being returned.
    """
    full_prompt = f"{quiz_prompt}\n\nDocument content:\n{content}"
    messages = [HumanMessage(content=full_prompt)]
    # ``invoke`` is used instead of calling the model object directly.
    reply = chat_model.invoke(messages)
    return clean_response(reply.content)
| # Function to retrieve relevant documents from vectorstore based on user query | |
def retrieve_documents(query):
    """Return the page text of up to three stored chunks similar to ``query``."""
    page_texts = []
    for match in vectorstore.similarity_search(query, k=3):
        page_texts.append(match.page_content)
    return page_texts
| # Function to convert tuple format to message format | |
def convert_to_message_format(chat_history):
    """Flatten ``(user, bot)`` pairs into role-tagged message dictionaries.

    Every pair yields two consecutive entries, first with role ``"user"`` and
    then with role ``"assistant"``, each carrying its text under ``"content"``.
    """
    return [
        {"role": role, "content": text}
        for user_msg, bot_msg in chat_history
        for role, text in (("user", user_msg), ("assistant", bot_msg))
    ]
| # Function to convert message format to tuple format for processing | |
def convert_to_tuple_format(chat_history):
    """Pair up consecutive messages as ``(user_text, bot_text)`` tuples.

    Entries at even positions are taken as user messages and the entry right
    after each as the reply. A trailing message without a partner is dropped.
    """
    user_entries = chat_history[0::2]
    bot_entries = chat_history[1::2]
    # zip stops at the shorter slice, which discards an unpaired last message.
    return [
        (user_entry["content"], bot_entry["content"])
        for user_entry, bot_entry in zip(user_entries, bot_entries)
    ]
| # Function to handle chatbot interactions with short-term memory | |
def chat_with_groq(user_input, chat_history):
    """Answer ``user_input`` with the chat model and extend the chat log.

    Args:
        user_input: Text typed (or transcribed) by the user.
        chat_history: Chat log as a list of ``{"role", "content"}``
            dictionaries. ``None`` is treated as an empty log. The list is
            extended in place.

    Returns:
        A ``(chat_history, "", audio_file)`` tuple: the updated log, an empty
        string (wired to the input textbox in this file, which clears it),
        and the path returned by ``speech_playback`` or ``None``. Failures
        while answering are reported as an assistant message starting with
        ``"Error: "`` rather than raised.
    """
    if chat_history is None:
        chat_history = []
    # A blank submission carries no question: skip the model call and keep it
    # out of both the visible log and chat_memory.
    if not (user_input or "").strip():
        return chat_history, "", None
    try:
        # Retrieve relevant documents for additional context
        relevant_docs = retrieve_documents(user_input)
        context = "\n".join(relevant_docs) if relevant_docs else "No relevant documents found."
        # Construct the prompt with conversation history
        system_prompt = "You are a helpful AI assistant. Answer questions accurately and concisely."
        # chat_memory stores one entry per message, so this window covers the
        # last 10 messages (five user/AI exchanges).
        conversation_history = "\n".join(chat_memory[-10:])
        prompt = f"{system_prompt}\n\nConversation History:\n{conversation_history}\n\nUser Input: {user_input}\n\nContext:\n{context}"
        # Call the chat model using the invoke method
        response = chat_model.invoke([HumanMessage(content=prompt)])
        # Clean response to remove any unwanted formatting
        cleaned_response_text = clean_response(response.content)
        # Record the exchange in the shared short-term memory
        chat_memory.append(f"User: {user_input}")
        chat_memory.append(f"AI: {cleaned_response_text}")
        # Record the exchange in the visible chat log
        chat_history.append({"role": "user", "content": user_input})
        chat_history.append({"role": "assistant", "content": cleaned_response_text})
        # Convert response to speech
        audio_file = speech_playback(cleaned_response_text)
        return chat_history, "", audio_file
    except Exception as e:
        # UI boundary: show the failure in the chat instead of raising.
        error_msg = f"Error: {str(e)}"
        chat_history.append({"role": "user", "content": user_input})
        chat_history.append({"role": "assistant", "content": error_msg})
        return chat_history, "", None
| # Function to play response as speech using gTTS | |
def speech_playback(text):
    """Synthesize ``text`` to an MP3 file with gTTS and return its path.

    The file is written with a relative name of the form
    ``output_audio_<uuid>.mp3``. If anything raises, the error is printed and
    ``None`` is returned.
    """
    try:
        # A fresh UUID per call keeps successive responses in separate files.
        target_path = f"output_audio_{str(uuid.uuid4())}.mp3"
        gtts.gTTS(text, lang='en').save(target_path)
    except Exception as e:
        print(f"Error in speech_playback: {e}")
        return None
    return target_path
| # Function to detect encoding safely | |
def detect_encoding(file_path):
    """Guess the text encoding of ``file_path`` from its first 4096 bytes.

    Returns ``"utf-8"`` when chardet reports no encoding or when any step
    (opening, reading, detecting) raises.
    """
    fallback = "utf-8"
    try:
        with open(file_path, "rb") as handle:
            sample = handle.read(4096)
        guess = chardet.detect(sample)["encoding"]
    except Exception:
        return fallback
    if guess:
        return guess
    return fallback
| # Function to extract text from PDF | |
def extract_text_from_pdf(pdf_path):
    """Return the text of every page of the PDF at ``pdf_path``.

    Page texts are joined with newlines. Returns
    ``"No extractable text found."`` when the result is blank, and a string
    starting with ``"Error extracting text from PDF: "`` when anything raises.
    """
    try:
        # The context manager closes the document when the block exits, even
        # if a page fails to parse; previously it was never closed.
        with fitz.open(pdf_path) as doc:
            text = "\n".join(page.get_text("text") for page in doc)
    except Exception as e:
        return f"Error extracting text from PDF: {str(e)}"
    return text if text.strip() else "No extractable text found."
| # Function to extract text from Word files (.docx) | |
def extract_text_from_docx(docx_path):
    """Return the paragraph text of the Word file at ``docx_path``.

    Paragraph texts are joined with newlines. Returns
    ``"No extractable text found."`` when the result is blank, and a string
    starting with ``"Error extracting text from Word document: "`` when
    anything raises.
    """
    try:
        paragraphs = docx.Document(docx_path).paragraphs
        joined = "\n".join(paragraph.text for paragraph in paragraphs)
        if not joined.strip():
            return "No extractable text found."
        return joined
    except Exception as e:
        return f"Error extracting text from Word document: {str(e)}"
| # Function to extract text from PowerPoint files (.pptx) | |
def extract_text_from_pptx(pptx_path):
    """Return the text of every shape in the PowerPoint file at ``pptx_path``.

    Shapes without a ``text`` attribute are skipped; each remaining shape's
    text is followed by a newline. Returns ``"No extractable text found."``
    when the result is blank, and a string starting with
    ``"Error extracting text from PowerPoint: "`` when anything raises.
    """
    try:
        deck = Presentation(pptx_path)
        pieces = [
            shape.text + "\n"
            for slide in deck.slides
            for shape in slide.shapes
            if hasattr(shape, "text")
        ]
        combined = "".join(pieces)
        return combined if combined.strip() else "No extractable text found."
    except Exception as e:
        return f"Error extracting text from PowerPoint: {str(e)}"
| # Function to process documents safely | |
def process_document(file):
    """Extract text from an uploaded file, index it, and return a quiz.

    Args:
        file: The upload to process: either an object exposing the path as
            ``.name`` or a plain path string. ``None`` (for example when the
            upload is cleared) is accepted.

    Returns:
        A status string. On success it starts with
        ``"Document processed successfully"`` and contains the generated
        quiz. Otherwise it is a short message describing why nothing was
        processed; exceptions are reported as
        ``"Error processing document: ..."`` rather than raised.
    """
    if file is None:
        return "No document uploaded."
    try:
        # Accept both file-like uploads (path in .name) and bare path strings.
        file_path = getattr(file, "name", file)
        file_extension = os.path.splitext(file_path)[-1].lower()
        if file_extension in (".png", ".jpg", ".jpeg"):
            return "Error: Images cannot be processed for text extraction."
        used_extractor = True
        if file_extension == ".pdf":
            content = extract_text_from_pdf(file_path)
        elif file_extension == ".docx":
            content = extract_text_from_docx(file_path)
        elif file_extension == ".pptx":
            content = extract_text_from_pptx(file_path)
        else:
            used_extractor = False
            encoding = detect_encoding(file_path)
            with open(file_path, "r", encoding=encoding, errors="replace") as f:
                content = f.read()
        # The extractors report problems as plain strings. Return those to the
        # user instead of indexing them and generating a quiz about them.
        if used_extractor and (
            content == "No extractable text found."
            or content.startswith("Error extracting text from")
        ):
            return content
        # Nothing to index or quiz on.
        if not content.strip():
            return "No extractable text found."
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
        documents = [Document(page_content=chunk) for chunk in text_splitter.split_text(content)]
        vectorstore.add_documents(documents)
        quiz = generate_quiz(content)
        return f"Document processed successfully (File Type: {file_extension}). Quiz generated:\n{quiz}"
    except Exception as e:
        return f"Error processing document: {str(e)}"
| # Function to handle speech-to-text conversion | |
| #def transcribe_audio(audio): | |
| # sr, y = audio | |
| # if y.ndim > 1: | |
| # y = y.mean(axis=1) | |
| # y = y.astype(np.float32) | |
| # y /= np.max(np.abs(y)) | |
| # return transcriber({"sampling_rate": sr, "raw": y})["text"] | |
# Revised speech-to-text handler (replaces the commented-out version above):
# Lazily created whisper-small.en pipeline, shared by all transcribe_audio calls.
_cached_transcriber = None


def _get_transcriber():
    """Build the whisper-small.en pipeline on first use and reuse it afterwards."""
    global _cached_transcriber
    if _cached_transcriber is None:
        _cached_transcriber = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-small.en",  # More accurate
            chunk_length_s=30,
        )
    return _cached_transcriber


def transcribe_audio(audio):
    """Transcribe a recording to text.

    Args:
        audio: ``(sampling_rate, samples)`` where ``samples`` is a NumPy
            array, or ``None`` when there is no recording.

    Returns:
        The transcribed text, or ``""`` when there is nothing to transcribe
        (``None`` input or an empty sample array).
    """
    if audio is None:
        return ""
    sr, y = audio
    y = np.asarray(y)
    # np.max raises on an empty array, so stop before normalizing.
    if y.size == 0:
        return ""
    # Average the second axis of multi-dimensional input down to one track.
    if y.ndim > 1:
        y = y.mean(axis=1)
    y = y.astype(np.float32)
    # Peak-normalize, skipping all-zero audio to avoid dividing by zero.
    max_val = np.max(np.abs(y))
    if max_val > 0:
        y /= max_val
    # Reuse one pipeline instead of constructing a new one per request.
    return _get_transcriber()({"sampling_rate": sr, "raw": y})["text"]
| # the remaining is the same | |
| # Clear chat history function | |
def clear_chat_history():
    """Empty the shared ``chat_memory`` list and return reset values.

    Returns:
        An empty list and ``None``, in that order.
    """
    del chat_memory[:]
    cleared_messages, cleared_audio = [], None
    return cleared_messages, cleared_audio
def tutor_ai_chatbot():
    """Build the Gradio interface for the Tutor AI Chatbot and launch it.

    The interface has three tabs: a chatbot (text or voice input, spoken
    replies), a notes upload that produces a quiz, and an introduction
    video. The call ends by launching the app with ``share=False``.
    """
    with gr.Blocks() as app:
        gr.Markdown("# 📚 AI Tutor - We.(POC)")
        gr.Markdown("An interactive Personal AI Tutor chatbot to help with your learning needs.")
        # Chatbot Tab
        with gr.Tab("AI Chatbot"):
            with gr.Row():
                with gr.Column(scale=3):
                    chatbot = gr.Chatbot(height=500, type="messages")
                with gr.Column(scale=1):
                    audio_playback = gr.Audio(label="Audio Response", type="filepath")
            # Input controls span the full width
            with gr.Row():
                msg = gr.Textbox(
                    label="Ask a question",
                    placeholder="Type your question here...",
                    container=False  # Removes the default container styling
                )
                submit = gr.Button("Send")
            with gr.Row():
                with gr.Column(scale=1):
                    audio_input = gr.Audio(type="numpy", label="Record or Upload Audio")
                    # Voice recording tips - ONLY in AI Chatbot tab
                    with gr.Accordion("🎤 Voice Recording Tips", open=False):
                        gr.Markdown("""
                        **For better speech recognition accuracy:**
                        - 🎙️ Speak clearly and at a moderate pace
                        - 🔇 Record in a quiet environment
                        - 📍 Keep the microphone close to your mouth (10-15 cm)
                        - 🎧 Use a good quality microphone if possible
                        - 📝 Review the transcribed text before sending
                        - 🔄 If transcription is poor, try recording again or type manually
                        """)
            # Clear chat history button
            clear_btn = gr.Button("Clear Chat")
            # Handle chat interaction
            submit.click(
                chat_with_groq,
                inputs=[msg, chatbot],
                outputs=[chatbot, msg, audio_playback]
            )
            # Clearing must also empty the shared chat_memory, otherwise old
            # turns keep being fed into later prompts. clear_chat_history does
            # that and returns reset values for the chat log and audio player.
            clear_btn.click(
                clear_chat_history,
                inputs=None,
                outputs=[chatbot, audio_playback]
            )
            # Also allow Enter key to submit
            msg.submit(
                chat_with_groq,
                inputs=[msg, chatbot],
                outputs=[chatbot, msg, audio_playback]
            )
            # Add some examples of questions students might ask
            with gr.Accordion("Example Questions", open=False):
                gr.Examples(
                    examples=[
                        "Can you explain the concept of RLHF AI?",
                        "What are AI transformers?",
                        "What is MoE AI?",
                        "What's gate networks AI?",
                        "I am making a switch, please generating baking recipe?"
                    ],
                    inputs=msg
                )
            # Connect audio input to transcription
            audio_input.change(fn=transcribe_audio, inputs=audio_input, outputs=msg)
        # Upload Notes & Generate Quiz Tab
        with gr.Tab("Upload Notes & Generate Quiz"):
            with gr.Row():
                with gr.Column(scale=2):
                    file_input = gr.File(label="Upload Lecture Notes (PDF, DOCX, PPTX)")
                with gr.Column(scale=3):
                    quiz_output = gr.Textbox(label="Generated Quiz", lines=10)
            # Connect file input to document processing
            file_input.change(process_document, inputs=file_input, outputs=quiz_output)
        # Introduction Video Tab
        with gr.Tab("Introduction Video"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### Welcome to the Introduction Video")
                    gr.Markdown("Music from Xu Mengyuan - China-O, musician Xu Mengyuan YUAN! | 徐梦圆 - China-O 音乐人徐梦圆YUAN!")
                    # Use the local video file that's stored in your Space
                    gr.Video("We_not_me_video.mp4", label="Introduction Video")
    # Launch the application
    app.launch(share=False)
# Build and launch the AI chatbot only when this file is run as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    tutor_ai_chatbot()