# Hugging Face Space (status: Running)
import io
import json
import random
import re
from collections import Counter
from typing import List, Dict, Tuple

import gradio as gr
import numpy as np
import PyPDF2
import requests
import torch
from bs4 import BeautifulSoup
from sentence_transformers import SentenceTransformer
from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM
class InterviewBot:
    """Turn PDF or web content into a scored question-and-answer quiz.

    The bot extracts text, splits it into chunks, generates one question per
    sampled chunk with a seq2seq model, and scores free-text answers by
    combining embedding similarity with keyword overlap.

    NOTE(review): several user-facing strings below contain mis-encoded
    characters (they look like damaged emoji). They are reproduced unchanged
    because the intended characters cannot all be recovered with certainty.
    """

    # Prefixes of the error strings returned by the extract_* methods.
    # process_content matches on these exact prefixes so that real content
    # which merely starts with the word "Error" is not mistaken for a failure.
    _PDF_ERROR_PREFIX = "Error extracting PDF:"
    _WEB_ERROR_PREFIX = "Error extracting web content:"

    # Only the first pages of a PDF are read (memory constraint).
    _MAX_PDF_PAGES = 50

    # HTML tags whose text is harvested from a web page.
    _CONTENT_TAGS = ['p', 'h1', 'h2', 'h3', 'h4', 'article', 'div']

    # Common words ignored by the keyword extractor.
    _STOP_WORDS = frozenset({
        'this', 'that', 'with', 'have', 'will', 'from', 'they', 'been',
        'were', 'said', 'each', 'which', 'their', 'time', 'would', 'there',
    })

    def __init__(self):
        """Pick the compute device, load the models and reset the quiz state."""
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Using device: {self.device}")

        # Initialize models
        self.init_models()

        # Storage for content and questions
        self.content_chunks = []
        self.questions = []
        self.current_question_idx = 0
        self.user_answers = []
        self.scores = []

    def init_models(self):
        """Initialize Hugging Face models for question generation and evaluation.

        Any failure while loading triggers ``init_fallback_models``.
        """
        try:
            # Question generation model
            print("Loading question generation model...")
            self.qg_tokenizer = AutoTokenizer.from_pretrained(
                "mrm8488/t5-base-finetuned-question-generation-ap")
            self.qg_model = AutoModelForSeq2SeqLM.from_pretrained(
                "mrm8488/t5-base-finetuned-question-generation-ap")
            self.qg_model.to(self.device)

            # Text similarity model for answer evaluation
            print("Loading similarity model...")
            self.similarity_model = SentenceTransformer('all-MiniLM-L6-v2')

            # Text summarization for content processing
            print("Loading summarization pipeline...")
            self.summarizer = pipeline(
                "summarization",
                model="facebook/bart-large-cnn",
                device=0 if self.device == "cuda" else -1,
            )
            print("All models loaded successfully!")
        except Exception as e:
            print(f"Error loading models: {e}")
            # Fallback to smaller models
            self.init_fallback_models()

    def init_fallback_models(self):
        """Initialize smaller models if the main models fail to load.

        The fallback models are never moved to the GPU (the summarizer is
        created with ``device=-1``), so ``self.device`` is reset to "cpu";
        otherwise ``generate_questions`` would send its inputs to CUDA while
        the model stays on the CPU.
        """
        print("Loading fallback models...")
        self.device = "cpu"
        self.qg_tokenizer = AutoTokenizer.from_pretrained("t5-small")
        self.qg_model = AutoModelForSeq2SeqLM.from_pretrained("t5-small")
        self.similarity_model = SentenceTransformer('all-MiniLM-L6-v2')
        self.summarizer = pipeline("summarization", model="t5-small", device=-1)

    def extract_pdf_text(self, pdf_file) -> str:
        """Extract text from an uploaded PDF.

        Args:
            pdf_file: raw ``bytes``, an object with ``read()``, a file path,
                or anything ``PyPDF2.PdfReader`` accepts directly.

        Returns:
            The text of the first ``_MAX_PDF_PAGES`` pages, one page per
            line group. On failure (or when no text is found) a message
            starting with "Error extracting PDF:" is returned instead of
            raising.
        """
        try:
            # Handle different input types from Gradio
            if isinstance(pdf_file, bytes):
                pdf_stream = io.BytesIO(pdf_file)
            elif hasattr(pdf_file, 'read'):
                pdf_stream = io.BytesIO(pdf_file.read())
            elif isinstance(pdf_file, str):
                with open(pdf_file, 'rb') as f:
                    pdf_stream = io.BytesIO(f.read())
            else:
                # Try to use it directly
                pdf_stream = pdf_file

            pdf_reader = PyPDF2.PdfReader(pdf_stream)
            max_pages = min(self._MAX_PDF_PAGES, len(pdf_reader.pages))

            page_texts = []
            for page_num in range(max_pages):
                # extract_text() may yield None for pages without a text
                # layer; treat that the same as an empty page.
                page_text = pdf_reader.pages[page_num].extract_text() or ""
                if page_text.strip():  # Only add non-empty pages
                    page_texts.append(page_text + "\n")

            text = "".join(page_texts)
            if not text.strip():
                return f"{self._PDF_ERROR_PREFIX} No readable text found in the PDF"
            return text
        except Exception as e:
            return f"{self._PDF_ERROR_PREFIX} {str(e)}"

    def extract_web_content(self, url: str) -> str:
        """Extract text content from a web URL.

        Script and style elements are dropped, then the text of the
        outermost content tags is collected. Nested content tags are skipped
        because their text is already part of their ancestor's text;
        collecting both would repeat every paragraph once per enclosing
        ``div``/``article``.

        Returns:
            One line of text per outermost content tag, or a message
            starting with "Error extracting web content:" on failure.
        """
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')

            # Remove script and style elements
            for element in soup(["script", "style"]):
                element.decompose()

            lines = []
            for tag in soup.find_all(self._CONTENT_TAGS):
                if tag.find_parent(self._CONTENT_TAGS) is not None:
                    continue  # already covered by an enclosing content tag
                tag_text = tag.get_text(separator=" ", strip=True)
                if tag_text:
                    lines.append(tag_text + "\n")
            return "".join(lines)
        except Exception as e:
            return f"{self._WEB_ERROR_PREFIX} {str(e)}"

    def chunk_text(self, text: str, chunk_size: int = 1000) -> List[str]:
        """Split text into chunks of whole sentences.

        Whitespace is collapsed, then the text is split after sentence-ending
        punctuation that is followed by whitespace. Splitting there (rather
        than on every ``.``/``!``/``?``) keeps decimals such as "3.14" intact
        and preserves the original punctuation.

        Args:
            text: raw text.
            chunk_size: soft limit on chunk length in characters; a single
                sentence longer than this becomes its own chunk.

        Returns:
            The list of chunks (empty for blank input).
        """
        text = re.sub(r'\s+', ' ', text).strip()
        sentences = re.split(r'(?<=[.!?])\s+', text)

        chunks = []
        pending = []        # sentences of the chunk being built
        pending_len = 0     # their combined length, one separator each
        for sentence in sentences:
            if not sentence:
                continue
            if pending and pending_len + len(sentence) >= chunk_size:
                chunks.append(" ".join(pending))
                pending, pending_len = [], 0
            pending.append(sentence)
            pending_len += len(sentence) + 1
        if pending:
            chunks.append(" ".join(pending))
        return chunks

    def generate_questions(self, chunks: List[str], num_questions: int = 10) -> List[Dict]:
        """Generate one question for each of up to ``num_questions`` random chunks.

        Args:
            chunks: text chunks to draw from.
            num_questions: maximum number of questions; coerced to a
                non-negative int because UI sliders may deliver floats.

        Returns:
            A list of dicts with keys ``question``, ``context``,
            ``key_phrases`` and ``question_id`` (1-based). If generation
            fails for a chunk, a generic fallback question is used.
        """
        num_questions = max(0, int(num_questions))
        selected_chunks = random.sample(chunks, min(len(chunks), num_questions))

        questions = []
        for question_id, chunk in enumerate(selected_chunks, start=1):
            try:
                # NOTE(review): the model card for the fine-tuned checkpoint
                # appears to document an "answer: ... context: ..." prompt;
                # confirm that "generate question:" is the intended format.
                input_text = f"generate question: {chunk}"
                inputs = self.qg_tokenizer.encode(
                    input_text,
                    return_tensors="pt",
                    max_length=512,
                    truncation=True,
                )
                inputs = inputs.to(self.device)

                with torch.no_grad():
                    outputs = self.qg_model.generate(
                        inputs,
                        max_length=64,
                        num_beams=3,
                        do_sample=True,
                        temperature=0.7,
                    )
                question = self.qg_tokenizer.decode(outputs[0], skip_special_tokens=True)
            except Exception as e:
                print(f"Error generating question {question_id}: {e}")
                # Fallback question
                question = "What is the main point discussed in this section?"

            questions.append({
                'question': question,
                'context': chunk,
                'key_phrases': self.extract_key_phrases(chunk),
                'question_id': question_id,
            })
        return questions

    def extract_key_phrases(self, text: str) -> List[str]:
        """Return up to ten of the most frequent keywords in ``text``.

        Keywords are lower-cased alphabetic words of at least four letters
        that are not in the stop-word list; ties keep first-seen order.
        """
        words = re.findall(r'\b[A-Za-z]{4,}\b', text.lower())
        counts = Counter(word for word in words if word not in self._STOP_WORDS)
        return [word for word, _ in counts.most_common(10)]

    def evaluate_answer(self, question_data: Dict, user_answer: str) -> float:
        """Score an answer from 0 to 100.

        The score is 70% cosine similarity between the embeddings of the
        question's context and the answer, plus 30% the share of the
        question's key phrases that occur in the answer.

        Args:
            question_data: a question dict as built by ``generate_questions``.
            user_answer: the answer text; ``None`` or blank scores 0.

        Returns:
            A plain ``float`` in [0, 100]. If scoring fails, a length-based
            fallback is used (5 points per word, capped at 100).
        """
        if not user_answer or not user_answer.strip():
            return 0.0
        try:
            # Method 1: Semantic similarity
            context_vec = self.similarity_model.encode([question_data['context']])[0]
            answer_vec = self.similarity_model.encode([user_answer])[0]
            norm_product = np.linalg.norm(context_vec) * np.linalg.norm(answer_vec)
            # A zero vector has no direction; treat it as "no similarity"
            # instead of dividing by zero.
            if norm_product:
                similarity_score = float(np.dot(context_vec, answer_vec) / norm_product)
            else:
                similarity_score = 0.0

            # Method 2: Keyword matching
            user_words = set(re.findall(r'\b[A-Za-z]{3,}\b', user_answer.lower()))
            key_phrases = set(question_data['key_phrases'])
            keyword_overlap = len(user_words & key_phrases) / max(len(key_phrases), 1)

            # Combined score
            final_score = (similarity_score * 0.7 + keyword_overlap * 0.3) * 100
            return float(min(100.0, max(0.0, final_score)))
        except Exception as e:
            print(f"Error evaluating answer: {e}")
            # Fallback scoring
            word_count = len(user_answer.split())
            return float(word_count * 5) if word_count <= 20 else 100.0

    def process_content(self, pdf_file=None, web_url="", num_questions=10):
        """Extract content, build the quiz and reset the quiz state.

        A PDF takes precedence over a URL when both are given.

        Returns:
            A ``(status, first_question, progress)`` tuple of strings; on
            any failure the last two are empty and ``status`` explains why.
        """
        try:
            web_url = (web_url or "").strip()
            if pdf_file is not None:
                text = self.extract_pdf_text(pdf_file)
                source = "PDF"
            elif web_url:
                text = self.extract_web_content(web_url)
                source = "Web URL"
            else:
                return "Please provide either a PDF file or a web URL.", "", ""

            # Match the extractors' own error prefixes only.
            if text.startswith((self._PDF_ERROR_PREFIX, self._WEB_ERROR_PREFIX)):
                return text, "", ""

            # Process text
            self.content_chunks = self.chunk_text(text)
            if not self.content_chunks:
                return "No valid content found to generate questions.", "", ""

            # Generate questions
            self.questions = self.generate_questions(self.content_chunks, num_questions)
            self.current_question_idx = 0
            self.user_answers = []
            self.scores = []
            if not self.questions:
                return "Failed to generate questions from the content.", "", ""

            summary = (
                f"Successfully processed {source}!\n"
                f"Extracted {len(self.content_chunks)} content chunks\n"
                f"Generated {len(self.questions)} questions\n"
                "Click 'Start Quiz' to begin the interview!"
            )
            first_question = f"Question 1/{len(self.questions)}:\n{self.questions[0]['question']}"
            return summary, first_question, ""
        except Exception as e:
            return f"Error processing content: {str(e)}", "", ""

    def submit_answer(self, user_answer):
        """Score the current answer and advance to the next question.

        Returns:
            A ``(feedback, question_text, progress)`` tuple of strings. Once
            the last question is answered, ``progress`` holds the final
            results.
        """
        if not self.questions:
            return "No quiz in progress. Please upload content first.", "", "No active quiz"
        if self.current_question_idx >= len(self.questions):
            return "Quiz completed!", "", self.get_final_results()

        user_answer = user_answer or ""  # a cleared textbox may arrive as None

        # Evaluate current answer
        current_question = self.questions[self.current_question_idx]
        score = self.evaluate_answer(current_question, user_answer)
        self.user_answers.append(user_answer)
        self.scores.append(score)
        self.current_question_idx += 1

        feedback = f"Answer {self.current_question_idx} submitted! Score: {score:.1f}/100\n\n"
        if self.current_question_idx >= len(self.questions):
            # Quiz completed
            return feedback + "Quiz completed!", "π Interview Complete!", self.get_final_results()

        next_question = self.questions[self.current_question_idx]
        question_text = (
            f"Question {self.current_question_idx + 1}/{len(self.questions)}:\n"
            f"{next_question['question']}"
        )
        return feedback, question_text, self.get_current_progress()

    def get_current_progress(self):
        """Summarize answered count, average and latest score; add tips after 3 answers."""
        if not self.scores:
            return "No answers submitted yet"
        avg_score = sum(self.scores) / len(self.scores)
        progress = (
            f"Progress: {len(self.scores)}/{len(self.questions)} questions answered\n"
            f"Average Score: {avg_score:.1f}/100\n"
            f"Latest Score: {self.scores[-1]:.1f}/100\n\n"
        )
        if len(self.scores) >= 3:
            progress += self.get_live_tips()
        return progress

    def get_live_tips(self):
        """Build tips from the last three scores; empty string when there are none."""
        recent_scores = self.scores[-3:]  # Last 3 answers
        if not recent_scores:
            return ""
        avg_recent = sum(recent_scores) / len(recent_scores)

        tips = "π― **Live Performance Tips:**\n"
        if avg_recent >= 80:
            tips += "β’ Excellent work! Keep providing detailed answers\n"
        elif avg_recent >= 60:
            tips += "β’ Good progress! Try adding more specific examples\n"
        elif avg_recent >= 40:
            tips += "β’ Focus on key terms from the source material\n"
        else:
            tips += "β’ Take time to carefully read the content before answering\n"

        # Trend analysis: compare the two most recent answers
        if len(recent_scores) >= 2:
            if recent_scores[-1] > recent_scores[-2]:
                tips += "β’ π Improving! You're on the right track\n"
            elif recent_scores[-1] < recent_scores[-2]:
                tips += "β’ π Refocus needed - slow down and think carefully\n"
        return tips

    def get_final_results(self):
        """Build the end-of-quiz report: totals, per-question scores and tips."""
        if not self.scores:
            return "No quiz completed yet"
        total_questions = len(self.scores)
        avg_score = sum(self.scores) / total_questions

        # Performance categorization
        if avg_score >= 80:
            performance = "Excellent! π"
        elif avg_score >= 60:
            performance = "Good! π"
        elif avg_score >= 40:
            performance = "Fair π"
        else:
            performance = "Needs Improvement πͺ"

        parts = [
            "\nπ― INTERVIEW RESULTS π―\n",
            f"Total Questions: {total_questions}\n",
            f"Average Score: {avg_score:.1f}/100\n",
            f"Performance: {performance}\n",
            "π Detailed Scores:\n",
        ]
        parts.extend(
            f"Question {i}: {score:.1f}/100\n"
            for i, score in enumerate(self.scores, 1)
        )
        parts.append(f"\nπ Best Score: {max(self.scores):.1f}/100")
        parts.append(f"\nπ Lowest Score: {min(self.scores):.1f}/100")
        # Personalized improvement suggestions
        parts.append("\n\n" + self.get_personalized_tips())
        return "".join(parts)

    def get_personalized_tips(self):
        """Build improvement tips from the average, spread and trend of the scores."""
        if not self.scores:
            return "π‘ Complete a quiz to get personalized tips!"
        avg_score = sum(self.scores) / len(self.scores)
        low_scores = [s for s in self.scores if s < 50]

        tips = ["π‘ **Personalized Improvement Tips:**\n"]

        # Tier selected by the overall average
        if avg_score >= 80:
            tips += [
                "π **Excellent Performance!**\n",
                "β’ You're demonstrating strong comprehension\n",
                "β’ Continue providing detailed, context-rich answers\n",
                "β’ Try tackling more complex content to challenge yourself\n",
            ]
        elif avg_score >= 60:
            tips.append("β **Good Foundation - Ready to Excel!**\n")
            if low_scores:
                tips.append("β’ Focus on questions where you scored below 50 - review that content\n")
            tips += [
                "β’ Add more specific examples from the source material\n",
                "β’ Try to connect concepts across different sections\n",
            ]
        elif avg_score >= 40:
            tips += [
                "π **Building Understanding - You're on the Right Track!**\n",
                "β’ Spend more time reading each question carefully\n",
                "β’ Include key terms and phrases from the original content\n",
                "β’ Structure answers with main points first, then details\n",
            ]
            if len(low_scores) > len(self.scores) // 2:
                tips.append("β’ Consider re-reading the source material before answering\n")
        else:
            tips += [
                "πͺ **Focus Areas for Improvement:**\n",
                "β’ Take time to thoroughly read the source content first\n",
                "β’ Look for main ideas and key concepts in each section\n",
                "β’ Practice paraphrasing content in your own words\n",
                "β’ Don't rush - quality over speed in your responses\n",
            ]

        # Additional tips based on the spread between best and worst score
        score_spread = max(self.scores) - min(self.scores)
        if score_spread > 40:
            tips.append("β’ **Consistency Tip**: Your scores vary widely - focus on maintaining steady quality across all answers\n")
        if any(s < 30 for s in self.scores):
            tips.append("β’ **Comprehension Tip**: Some very low scores suggest reviewing the content more carefully before answering\n")

        # Trend over the last (up to) three answers: first vs. last
        recent_trend = self.scores[-3:]
        if len(recent_trend) > 1:
            if recent_trend[-1] > recent_trend[0]:
                tips.append("β’ **Positive Trend**: Your recent answers are improving - keep up the momentum!\n")
            elif recent_trend[-1] < recent_trend[0]:
                tips.append("β’ **Focus Needed**: Your recent scores are declining - take a moment to refocus\n")
        return "".join(tips)
# Initialize the bot: a single shared instance, created at import time
# (this loads the models) and used by the UI callbacks in create_interface.
bot = InterviewBot()
# Create Gradio interface
def create_interface():
    """Build the Gradio Blocks UI and wire it to the module-level ``bot``.

    Layout: a content-input column (PDF upload, URL box, question-count
    slider, process button), a quiz column (status, current question, answer
    box, submit button), a progress box and a tips panel.

    The tips refresh is chained after the submit handler with ``.then()``
    rather than registered as a second, independent click handler, so the
    tips always reflect the answer that was just scored.

    Returns:
        The configured ``gr.Blocks`` app (not yet launched).
    """

    def update_tips():
        """Return personalized tips once scores exist, else starter tips."""
        if bot.scores:
            return bot.get_personalized_tips()
        return (
            "\n"
            "### π‘ Getting Started Tips:\n"
            "- Read each question carefully\n"
            "- Think about the main concepts from the source\n"
            "- Provide specific, detailed answers\n"
            "- Use terminology from the original content\n"
        )

    with gr.Blocks(title="Interview Bot - PDF/Web Quiz Generator", theme=gr.themes.Soft()) as demo:
        gr.Markdown(
            "\n"
            "# π€ Interview Bot - PDF/Web Content Quiz Generator\n"
            "Upload a PDF or provide a web URL to generate an interactive quiz for interview practice!\n"
            "**Features:**\n"
            "- Extracts content from PDFs (up to 50 pages) or web pages\n"
            "- Generates intelligent questions using AI\n"
            "- Evaluates your answers with semantic analysis\n"
            "- Provides real-time scoring and feedback\n"
        )

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### π€ Content Input")
                pdf_input = gr.File(
                    label="Upload PDF File",
                    file_types=[".pdf"],
                )
                web_input = gr.Textbox(
                    label="Or Enter Web URL",
                    placeholder="https://example.com/article",
                    lines=1,
                )
                num_questions = gr.Slider(
                    minimum=5,
                    maximum=20,
                    value=10,
                    step=1,
                    label="Number of Questions",
                )
                process_btn = gr.Button("π Process Content & Generate Quiz", variant="primary")

            with gr.Column(scale=2):
                gr.Markdown("### π Quiz Interface")
                status_output = gr.Textbox(
                    label="Status",
                    lines=4,
                    value="Upload a PDF or enter a web URL to start!",
                )
                question_output = gr.Textbox(
                    label="Current Question",
                    lines=3,
                    value="Questions will appear here...",
                )
                answer_input = gr.Textbox(
                    label="Your Answer",
                    lines=4,
                    placeholder="Type your answer here...",
                )
                submit_btn = gr.Button("β Submit Answer", variant="secondary")

        with gr.Row():
            progress_output = gr.Textbox(
                label="Progress & Scoring",
                lines=6,
                value="Quiz progress will be shown here...",
            )

        # Dynamic tips section (created before the handlers that update it)
        with gr.Row():
            tips_display = gr.Markdown(
                "\n"
                "### π‘ Dynamic Performance Tips:\n"
                "Personalized tips will appear here based on your quiz performance!\n"
                "Start answering questions to get customized feedback.\n"
            )

        # Event handlers
        process_btn.click(
            fn=bot.process_content,
            inputs=[pdf_input, web_input, num_questions],
            outputs=[status_output, question_output, progress_output],
        ).then(
            fn=update_tips,  # a new quiz resets the scores, so reset the tips too
            outputs=[tips_display],
        )

        submit_btn.click(
            fn=bot.submit_answer,
            inputs=[answer_input],
            outputs=[status_output, question_output, progress_output],
        ).then(
            fn=lambda: "",  # Clear answer input after submission
            outputs=[answer_input],
        ).then(
            fn=update_tips,  # runs only after the answer has been scored
            outputs=[tips_display],
        )

    return demo
# Launch the application
if __name__ == "__main__":
    # Create the interface
    demo = create_interface()

    # Announce readiness before launching: when run as a script,
    # demo.launch() blocks until the server stops, so anything printed
    # after it would only appear at shutdown.
    print("π Interview Bot is ready!")
    print("Upload a PDF or enter a web URL to start generating your personalized quiz!")

    demo.launch()