import json
import os
import random
from collections import defaultdict
from datetime import datetime

import docx
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import PyPDF2
import streamlit as st
from groq import Groq
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate
| |
|
| | |
| |
|
class DocumentProcessor:
    """Extract text from a PDF or DOCX file and index it as embedded chunks in Chroma."""

    def __init__(self):
        """Create the embedding model and text splitter shared by every document."""
        self.embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
        # Splitter configuration: chunk_size=1000 with a chunk_overlap of 200.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )

    def extract_text_from_pdf(self, pdf_path):
        """Return the text of every page of the PDF at ``pdf_path``, one page per line group.

        Pages that yield no text contribute an empty string instead of
        aborting the whole extraction.
        """
        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            # ``or ""`` guards against a page whose extraction yields None/empty.
            page_texts = [page.extract_text() or "" for page in pdf_reader.pages]
        # A newline between pages keeps the last word of one page from fusing
        # with the first word of the next before chunking.
        return "\n".join(page_texts)

    def extract_text_from_docx(self, docx_path):
        """Return the text of every paragraph of the DOCX file, each followed by a newline."""
        doc = docx.Document(docx_path)
        return "".join(paragraph.text + "\n" for paragraph in doc.paragraphs)

    def process_document(self, file_path, file_type):
        """Extract, split and embed a document.

        Args:
            file_path: Path of the document on disk.
            file_type: ``"pdf"``, ``"docx"`` or ``"doc"``; case-insensitive, an
                optional leading dot is ignored.

        Returns:
            A ``(vectorstore, chunk_count)`` tuple.

        Raises:
            ValueError: If the file type is unsupported or the document
                contains no extractable text.
        """
        normalized_type = file_type.lower().lstrip(".")
        if normalized_type == 'pdf':
            text = self.extract_text_from_pdf(file_path)
        elif normalized_type in ('docx', 'doc'):
            text = self.extract_text_from_docx(file_path)
        else:
            raise ValueError("Unsupported file type")

        # Fail with a clear message rather than handing the vector store an
        # empty list of chunks (e.g. a scanned, image-only PDF).
        if not text.strip():
            raise ValueError("No extractable text found in document")

        chunks = self.text_splitter.split_text(text)

        vectorstore = Chroma.from_texts(
            texts=chunks,
            embedding=self.embeddings,
        )

        return vectorstore, len(chunks)
| |
|
class RAGLearningSystem:
    """Answer learning requests by retrieving document chunks and prompting a Groq chat model."""

    # Groq model used for every completion request.
    MODEL_NAME = "llama3-8b-8192"

    def __init__(self, vectorstore):
        """Set up the Groq client, the retriever and the prompt templates.

        Stops the Streamlit script with an error message when the
        ``GROQ_API_KEY`` environment variable is missing.

        Args:
            vectorstore: Vector store holding the processed document chunks.
        """
        if "GROQ_API_KEY" not in os.environ:
            st.error("Groq API key is required for generating responses.")
            st.stop()
        self.llm = Groq(api_key=os.environ["GROQ_API_KEY"])

        self.vectorstore = vectorstore
        # The three best-matching chunks form the context of each prompt.
        self.retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

        self.story_prompt = PromptTemplate(
            input_variables=["context", "topic"],
            template="""
        Based on the following context from the book, explain {topic} as an engaging story.
        Make it educational yet entertaining, using metaphors, analogies, and narrative elements.

        Context: {context}

        Create a story explanation for {topic}:
        """
        )

        # Doubled braces are literal braces in the rendered prompt.
        self.mcq_prompt = PromptTemplate(
            input_variables=["context", "topic"],
            template="""
        Based on this context about {topic}, create 3 multiple choice questions.
        Format as JSON with structure:
        {{
        "questions": [
        {{
        "question": "Question text",
        "options": ["A. Option 1", "B. Option 2", "C. Option 3", "D. Option 4"],
        "correct": "A",
        "explanation": "Why this answer is correct"
        }}
        ]
        }}

        Context: {context}
        """
        )

        self.fill_blank_prompt = PromptTemplate(
            input_variables=["context", "topic"],
            template="""
        Based on this context about {topic}, create 3 fill-in-the-blank questions.
        Format as JSON with structure:
        {{
        "questions": [
        {{
        "question": "Question with _____ blank",
        "answer": "correct answer",
        "hint": "helpful hint"
        }}
        ]
        }}

        Context: {context}
        """
        )

        self.match_prompt = PromptTemplate(
            input_variables=["context", "topic"],
            template="""
        Based on this context about {topic}, create a matching exercise with 4 pairs.
        Format as JSON with structure:
        {{
        "left_items": ["Item 1", "Item 2", "Item 3", "Item 4"],
        "right_items": ["Match A", "Match B", "Match C", "Match D"],
        "correct_matches": {{"Item 1": "Match A", "Item 2": "Match B", "Item 3": "Match C", "Item 4": "Match D"}}
        }}

        Context: {context}
        """
        )

    def _build_context(self, topic):
        """Return the retrieved chunks for ``topic`` joined by newlines."""
        docs = self.retriever.invoke(topic)
        return "\n".join(doc.page_content for doc in docs)

    def _complete(self, prompt, topic, *, as_json=False):
        """Render ``prompt`` for ``topic``, send it to the model and return the raw reply text."""
        request = {
            "messages": [
                {
                    "role": "user",
                    "content": prompt.format(context=self._build_context(topic), topic=topic),
                }
            ],
            "model": self.MODEL_NAME,
        }
        if as_json:
            request["response_format"] = {"type": "json_object"}
        response = self.llm.chat.completions.create(**request)
        return response.choices[0].message.content

    def _complete_json(self, prompt, topic, fallback):
        """Return the model's reply parsed as a JSON object, or ``fallback``.

        ``fallback`` is returned when the reply is missing, is not valid JSON,
        or parses to something other than a dict (callers use ``.get``).
        """
        raw = self._complete(prompt, topic, as_json=True)
        try:
            parsed = json.loads(raw)
        except (TypeError, json.JSONDecodeError):
            # TypeError covers a reply whose content is None.
            return fallback
        return parsed if isinstance(parsed, dict) else fallback

    def get_story_explanation(self, topic):
        """Return a story-style explanation of ``topic`` (empty string if the model returns none)."""
        return self._complete(self.story_prompt, topic) or ""

    def generate_mcq_questions(self, topic):
        """Return ``{"questions": [...]}`` with multiple-choice questions; empty list on bad output."""
        return self._complete_json(self.mcq_prompt, topic, {"questions": []})

    def generate_fill_blank_questions(self, topic):
        """Return ``{"questions": [...]}`` with fill-in-the-blank questions; empty list on bad output."""
        return self._complete_json(self.fill_blank_prompt, topic, {"questions": []})

    def generate_matching_questions(self, topic):
        """Return a matching exercise dict; all three fields are empty on bad output."""
        return self._complete_json(
            self.match_prompt,
            topic,
            {"left_items": [], "right_items": [], "correct_matches": {}},
        )
| |
|
class LearningGames:
    """Streamlit quiz games that grade answers and log scores in ``st.session_state``."""

    # Characters that may follow an option's letter label, e.g. "A." or "A)".
    _LABEL_SEPARATORS = ".):"

    # NOTE(review): several headings below start with a mis-encoded emoji
    # ("๐"). Where the surviving bytes or context identify the symbol it has
    # been restored; the remaining ones cannot be recovered from this text
    # and are left exactly as found - restore them by hand.

    def __init__(self):
        """Make sure the session-state slots used by the games exist."""
        self.init_session_state()

    def init_session_state(self):
        """Create ``game_scores`` and ``current_topic`` in session state if absent."""
        if 'game_scores' not in st.session_state:
            st.session_state.game_scores = {
                'mcq': [],
                'fill_blank': [],
                'matching': [],
            }

        if 'current_topic' not in st.session_state:
            st.session_state.current_topic = ""

    @staticmethod
    def _is_mcq_correct(selected, correct, options=()):
        """Return True when the chosen option matches the answer key.

        ``correct`` may be a letter ("A", "a", "A."), or the full option text.
        A letter is compared with the option's own label when it has one
        ("A. ..."), otherwise with the option's position in ``options``.
        A missing selection or an empty key is never correct.
        """
        if selected is None or correct is None:
            return False
        chosen = str(selected).strip()
        key = str(correct).strip()
        if not chosen or not key:
            return False

        letter = key.rstrip(LearningGames._LABEL_SEPARATORS).strip()
        if len(letter) == 1 and letter.isalpha():
            labelled = (
                len(chosen) >= 2
                and chosen[0].isalpha()
                and chosen[1] in LearningGames._LABEL_SEPARATORS
            )
            if labelled:
                return chosen[0].upper() == letter.upper()
            # Unlabelled options: "A" means the first option, "B" the second...
            position = ord(letter.upper()) - ord("A")
            if 0 <= position < len(options):
                return chosen == str(options[position]).strip()
            return False

        # The key is the option text itself (or a prefix of it).
        return chosen.casefold().startswith(key.casefold())

    @staticmethod
    def _is_blank_correct(user_answer, expected):
        """Case- and whitespace-insensitive comparison; non-string answers are stringified."""
        expected_text = "" if expected is None else str(expected).strip().casefold()
        if not expected_text:
            # No usable answer key: a blank input must not count as correct.
            return False
        given_text = "" if user_answer is None else str(user_answer).strip().casefold()
        return given_text == expected_text

    @staticmethod
    def _stable_shuffle(items):
        """Return a shuffled copy of ``items`` whose order depends only on the items.

        Seeding from the content keeps the option order identical across
        Streamlit reruns, so a selection made before submitting still refers
        to the same option afterwards.
        """
        shuffled = list(items)
        seed = "\x1f".join(str(item) for item in shuffled)
        random.Random(seed).shuffle(shuffled)
        return shuffled

    def _record_score(self, game_type, topic, score, total):
        """Show the final score, append it to ``game_scores[game_type]`` and return the percentage."""
        percentage = (score / total) * 100
        st.write(f"**Final Score: {score}/{total} ({percentage:.1f}%)**")
        st.session_state.game_scores[game_type].append({
            'topic': topic,
            'score': percentage,
            'timestamp': datetime.now(),
            'questions_attempted': total,
        })
        return percentage

    def play_mcq_game(self, questions, topic):
        """Render the multiple-choice quiz.

        Args:
            questions: Dict with a ``"questions"`` list; each item carries
                ``question``, ``options``, ``correct`` and optionally
                ``explanation``.
            topic: Topic label shown in the heading and stored with the score.

        Returns:
            The percentage score once the form is submitted, otherwise None.
        """
        st.subheader(f"🎯 Multiple Choice Quiz: {topic}")
        question_list = questions.get('questions')
        if not question_list:
            st.error("No questions available for this topic.")
            return None

        with st.form("mcq_form"):
            answers = {}
            for i, q in enumerate(question_list):
                st.write(f"**Question {i+1}:** {q.get('question', '')}")
                answers[i] = st.radio(
                    f"Select answer for Q{i+1}:",
                    q.get('options') or [],
                    key=f"mcq_{i}",
                )
                st.write("---")
            submitted = st.form_submit_button("Submit Quiz")

        if not submitted:
            return None

        score = 0
        for i, q in enumerate(question_list):
            correct = q.get('correct', '')
            if self._is_mcq_correct(answers[i], correct, q.get('options') or []):
                score += 1
                st.success(f"Q{i+1}: Correct! ✅")
            else:
                st.error(f"Q{i+1}: Wrong. Correct answer: {correct}")
            st.info(f"Explanation: {q.get('explanation', 'No explanation provided')}")

        return self._record_score('mcq', topic, score, len(question_list))

    def play_fill_blank_game(self, questions, topic):
        """Render the fill-in-the-blank quiz.

        Args:
            questions: Dict with a ``"questions"`` list; each item carries
                ``question``, ``answer`` and optionally ``hint``.
            topic: Topic label shown in the heading and stored with the score.

        Returns:
            The percentage score once the form is submitted, otherwise None.
        """
        st.subheader(f"๐ Fill in the Blanks: {topic}")
        question_list = questions.get('questions')
        if not question_list:
            st.error("No questions available for this topic.")
            return None

        with st.form("fill_blank_form"):
            answers = {}
            for i, q in enumerate(question_list):
                st.write(f"**Question {i+1}:** {q.get('question', '')}")
                st.write(f"💡 Hint: {q.get('hint', 'No hint available')}")
                answers[i] = st.text_input(
                    f"Your answer for Q{i+1}:",
                    key=f"fill_{i}",
                )
                st.write("---")
            submitted = st.form_submit_button("Submit Answers")

        if not submitted:
            return None

        score = 0
        for i, q in enumerate(question_list):
            expected = q.get('answer', '')
            if self._is_blank_correct(answers[i], expected):
                score += 1
                st.success(f"Q{i+1}: Correct! ✅")
            else:
                st.error(f"Q{i+1}: Wrong. Correct answer: {expected}")

        return self._record_score('fill_blank', topic, score, len(question_list))

    def play_matching_game(self, questions, topic):
        """Render the matching exercise.

        Args:
            questions: Dict with ``left_items``, ``right_items`` and
                ``correct_matches`` (left item -> right item).
            topic: Topic label shown in the heading and stored with the score.

        Returns:
            The percentage score once the form is submitted, otherwise None.
        """
        st.subheader(f"๐ Match the Following: {topic}")
        if not questions.get('left_items') or not questions.get('right_items'):
            st.error("No matching pairs available for this topic.")
            return None

        left_items = questions['left_items']
        right_items = self._stable_shuffle(questions['right_items'])
        # The guard above does not cover this key, so do not index it directly.
        correct_matches = questions.get('correct_matches') or {}

        with st.form("matching_form"):
            matches = {}
            st.write("Match each item on the left with the correct item on the right:")
            for i, left_item in enumerate(left_items):
                matches[left_item] = st.selectbox(
                    f"**{left_item}** matches with:",
                    ["Select..."] + right_items,
                    key=f"match_{i}",
                )
            submitted = st.form_submit_button("Submit Matches")

        if not submitted:
            return None

        score = 0
        for left_item, user_match in matches.items():
            correct_match = correct_matches.get(left_item, "")
            if user_match == correct_match:
                score += 1
                st.success(f"✅ {left_item} → {user_match} (Correct!)")
            else:
                st.error(f"❌ {left_item} → {user_match} (Wrong! Correct: {correct_match})")

        return self._record_score('matching', topic, score, len(left_items))
| |
|
class LearningDashboard:
    """Render charts and summaries of the scores stored in ``st.session_state.game_scores``."""

    # NOTE(review): several headings below start with a mis-encoded emoji
    # ("๐"). Symbols that the surviving bytes identify have been restored;
    # the others are left exactly as found - restore them by hand.

    def __init__(self):
        pass

    @staticmethod
    def _stored_scores():
        """Return the ``game_scores`` mapping, or an empty dict when none exists yet."""
        return st.session_state.get('game_scores', {})

    @staticmethod
    def _collect_records(game_scores):
        """Flatten ``{game_type: [entry, ...]}`` into one list of row dicts.

        Rows keep the order of ``game_scores`` and carry ``game_type``,
        ``score``, ``topic`` and ``timestamp``.
        """
        records = []
        for game_type, entries in game_scores.items():
            for entry in entries:
                records.append({
                    'game_type': game_type,
                    'score': entry['score'],
                    'topic': entry['topic'],
                    'timestamp': entry['timestamp'],
                })
        return records

    @staticmethod
    def _average_by(records, field):
        """Return ``{value of field: mean score}`` in first-seen order."""
        grouped = defaultdict(list)
        for record in records:
            grouped[record[field]].append(record['score'])
        return {key: sum(values) / len(values) for key, values in grouped.items()}

    @staticmethod
    def _bar_chart(averages, title, xaxis_title, marker_color):
        """Draw one bar per key of ``averages``."""
        fig = go.Figure(data=[
            go.Bar(
                x=list(averages.keys()),
                y=list(averages.values()),
                marker_color=marker_color,
            )
        ])
        fig.update_layout(
            title=title,
            xaxis_title=xaxis_title,
            yaxis_title="Average Score (%)",
            showlegend=False,
        )
        st.plotly_chart(fig, use_container_width=True)

    def show_dashboard(self):
        """Render the whole dashboard, or a hint when no game has been played."""
        st.title("๐ Learning Analytics Dashboard")
        if not any(self._stored_scores().values()):
            st.info("No learning data available yet. Complete some games to see your analytics!")
            return

        self.show_overall_stats()
        col1, col2 = st.columns(2)
        with col1:
            self.show_game_type_performance()
        with col2:
            self.show_topic_performance()
        self.show_progress_over_time()
        self.show_strengths_weaknesses()

    def show_overall_stats(self):
        """Show average score, games played, best score and number of distinct topics."""
        st.subheader("🎯 Overall Performance")
        all_scores = self._collect_records(self._stored_scores())
        if not all_scores:
            return
        df = pd.DataFrame(all_scores)
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("Average Score", f"{df['score'].mean():.1f}%")
        with col2:
            st.metric("Games Played", len(df))
        with col3:
            st.metric("Best Score", f"{df['score'].max():.1f}%")
        with col4:
            st.metric("Topics Studied", df['topic'].nunique())

    def show_game_type_performance(self):
        """Bar chart of the mean score per game type that has at least one result."""
        st.subheader("🎮 Performance by Game Type")
        game_averages = self._average_by(self._collect_records(self._stored_scores()), 'game_type')
        if game_averages:
            self._bar_chart(
                game_averages,
                "Average Score by Game Type",
                "Game Type",
                ['#FF6B6B', '#4ECDC4', '#45B7D1'],
            )

    def show_topic_performance(self):
        """Bar chart of the mean score per topic across all game types."""
        st.subheader("๐ Performance by Topic")
        topic_averages = self._average_by(self._collect_records(self._stored_scores()), 'topic')
        if topic_averages:
            self._bar_chart(topic_averages, "Average Score by Topic", "Topic", '#96CEB4')

    def show_progress_over_time(self):
        """Line chart of scores ordered by timestamp, one line per game type."""
        st.subheader("๐ Progress Over Time")
        all_data = self._collect_records(self._stored_scores())
        if all_data:
            df = pd.DataFrame(all_data).sort_values('timestamp')
            fig = px.line(df, x='timestamp', y='score',
                          color='game_type',
                          title="Score Progress Over Time",
                          labels={'timestamp': 'Time', 'score': 'Score (%)'})
            st.plotly_chart(fig, use_container_width=True)

    def show_strengths_weaknesses(self):
        """Show best/weakest game type and topic, plus a recommendation.

        A weakness is only reported when its average is below 80%.
        """
        st.subheader("💪 Strengths & Areas for Improvement")
        records = self._collect_records(self._stored_scores())
        game_averages = self._average_by(records, 'game_type')
        topic_averages = self._average_by(records, 'topic')

        col1, col2 = st.columns(2)
        with col1:
            st.write("**🎯 Strengths:**")
            if game_averages:
                best_game = max(game_averages, key=game_averages.get)
                st.success(f"• Excellent at {best_game} games ({game_averages[best_game]:.1f}% avg)")
            if topic_averages:
                best_topic = max(topic_averages, key=topic_averages.get)
                st.success(f"• Strong understanding of {best_topic} ({topic_averages[best_topic]:.1f}% avg)")
        with col2:
            st.write("**๐ Areas for Improvement:**")
            if game_averages:
                weak_game = min(game_averages, key=game_averages.get)
                if game_averages[weak_game] < 80:
                    st.warning(f"• Practice {weak_game} games more ({game_averages[weak_game]:.1f}% avg)")
            if topic_averages:
                weak_topic = min(topic_averages, key=topic_averages.get)
                if topic_averages[weak_topic] < 80:
                    st.warning(f"• Review {weak_topic} concepts ({topic_averages[weak_topic]:.1f}% avg)")

        st.subheader("๐ Personalized Recommendations")
        if game_averages:
            # Unweighted mean of the per-game-type averages (not of all games).
            overall_avg = sum(game_averages.values()) / len(game_averages)
            if overall_avg >= 90:
                st.success("๐ Excellent performance! You're mastering the material well.")
            elif overall_avg >= 75:
                st.info("๐ Good progress! Focus on your weaker areas to improve further.")
            else:
                st.warning("๐ Keep practicing! Consider reviewing the story explanations before attempting games.")
| |
|
| | |
| |
|
def upload_and_process_page(doc_processor, file_path="ragdatascience.pdf"):
    """Render the page that turns a bundled document into a vector store.

    On success the vector store is saved in ``st.session_state.vectorstore``
    and the path in ``st.session_state.document_name``.

    Args:
        doc_processor: Object exposing ``process_document(file_path, file_type)``
            returning ``(vectorstore, chunk_count)``.
        file_path: Document to process; defaults to the bundled PDF. The file
            type is taken from the path's extension.
    """
    st.header("๐ Process Your Learning Material")

    # "report.PDF" -> "pdf"; an extension-less path yields "" and is rejected
    # by the processor as an unsupported type.
    file_extension = os.path.splitext(file_path)[1].lstrip(".").lower()

    if not os.path.isfile(file_path):
        # Report the problem up front instead of after the button click.
        st.error(f"File not found: `{file_path}`")
        return

    st.info(f"Processing the pre-uploaded file: `{file_path}`")

    if not st.button("Process Document"):
        return

    with st.spinner("Processing document..."):
        try:
            vectorstore, chunk_count = doc_processor.process_document(
                file_path, file_extension
            )
        except Exception as e:
            # UI boundary: show any processing failure instead of crashing the page.
            st.error(f"Error processing document: {str(e)}")
        else:
            st.session_state.vectorstore = vectorstore
            st.session_state.document_name = file_path
            st.success(f"Document processed successfully! Created {chunk_count} text chunks.")
            st.info("You can now go to 'Learn Topic' to start learning!")
| |
|
def learn_topic_page(rag_system):
    """Render the page that asks for a topic and shows its story explanation.

    When the button is pressed with a non-empty topic, the story returned by
    ``rag_system.get_story_explanation`` is displayed and the topic is saved
    in ``st.session_state.current_topic``.
    """
    st.header("๐ Learn About Any Topic")
    topic = st.text_input(
        "What would you like to learn about?",
        placeholder="e.g., machine learning algorithms, statistics, data visualization",
    )

    requested = st.button("Get Story Explanation")
    if not (requested and topic):
        return

    with st.spinner("Generating story explanation..."):
        try:
            explanation = rag_system.get_story_explanation(topic)
            st.session_state.current_topic = topic
            st.subheader(f"๐ Story: {topic}")
            st.write(explanation)
            st.success("Story generated! Now you can test your understanding with games.")
        except Exception as err:
            # UI boundary: surface any failure as a message on the page.
            st.error(f"Error generating explanation: {str(err)}")
| |
|
def play_games_page(rag_system, games):
    """Render the page that generates quiz questions and runs the chosen game.

    Generated questions are kept in ``st.session_state.active_game``. The game
    form must be drawn on every rerun, not only on the run in which
    "Generate Questions" was clicked: submitting the form triggers a rerun in
    which that button reads False, and a form drawn only under the button
    would vanish before its answers could be graded.

    Args:
        rag_system: Provides ``generate_mcq_questions``,
            ``generate_fill_blank_questions`` and ``generate_matching_questions``.
        games: Provides ``play_mcq_game``, ``play_fill_blank_game`` and
            ``play_matching_game``.
    """
    st.header("🎮 Test Your Knowledge")
    topic = st.text_input("Enter topic to test:",
                          value=st.session_state.get('current_topic', ''))
    if not topic:
        return

    game_type = st.selectbox("Choose game type:",
                             ["Multiple Choice", "Fill in the Blanks", "Matching"])
    game_modes = {
        "Multiple Choice": (rag_system.generate_mcq_questions, games.play_mcq_game),
        "Fill in the Blanks": (rag_system.generate_fill_blank_questions, games.play_fill_blank_game),
        "Matching": (rag_system.generate_matching_questions, games.play_matching_game),
    }
    generate, play = game_modes[game_type]

    if st.button("Generate Questions"):
        with st.spinner("Generating questions..."):
            try:
                questions = generate(topic)
            except Exception as e:
                if 'active_game' in st.session_state:
                    del st.session_state['active_game']
                st.error(f"Error generating questions: {str(e)}")
                return
        # Drop answers typed into a previous question set so they do not show
        # up pre-filled in the new one (keys used by the game widgets).
        for key in list(st.session_state.keys()):
            if str(key).startswith(("mcq_", "fill_", "match_")):
                del st.session_state[key]
        st.session_state.active_game = {
            'game_type': game_type,
            'topic': topic,
            'questions': questions,
        }

    active = st.session_state.get('active_game')
    # Only show questions that belong to the currently selected game and topic.
    if not active or active['game_type'] != game_type or active['topic'] != topic:
        return

    try:
        play(active['questions'], topic)
    except Exception as e:
        # Malformed model output (e.g. a question missing a field) ends up here.
        st.error(f"Error displaying questions: {str(e)}")
| |
|
| | |
def main():
    """Configure the Streamlit page, check prerequisites and route to the selected page."""
    st.set_page_config(
        page_title="RAG Learning System",
        page_icon="🤖",
        layout="wide"
    )
    st.title("🤖 RAG Learning System")
    st.write("Upload your learning materials and start your interactive learning journey!")

    # Only the Groq key is used by this app (embeddings come from a
    # HuggingFace model), so it is the only one that must be present.
    if "GROQ_API_KEY" not in os.environ:
        st.error("API key not found. Please add `GROQ_API_KEY` as a secret in the Hugging Face Space settings.")
        st.stop()

    # The script is re-executed on every interaction; keep one processor per
    # session instead of rebuilding its embedding model each time.
    if 'doc_processor' not in st.session_state:
        st.session_state.doc_processor = DocumentProcessor()
    doc_processor = st.session_state.doc_processor
    games = LearningGames()
    dashboard = LearningDashboard()

    st.sidebar.title("Navigation")
    page = st.sidebar.selectbox("Choose a page:",
                                ["Process Document", "Learn Topic", "Play Games", "Dashboard"])

    if page == "Process Document":
        upload_and_process_page(doc_processor)
    elif page == "Learn Topic":
        if 'vectorstore' in st.session_state:
            learn_topic_page(RAGLearningSystem(st.session_state.vectorstore))
        else:
            st.warning("Please process a document first!")
    elif page == "Play Games":
        if 'vectorstore' in st.session_state:
            play_games_page(RAGLearningSystem(st.session_state.vectorstore), games)
        else:
            st.warning("Please process a document first!")
    elif page == "Dashboard":
        dashboard.show_dashboard()
| |
|
# Run the app only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()