# Lectūra Research Demo: A Multi-Agent Tool for Self-taught Mastery.
# Author: Jaward Sesay
# © Lectūra Labs. All rights reserved.
import os
import json
import re
import gradio as gr
import asyncio
import logging
import torch
import zipfile
import shutil
import datetime
from serpapi import GoogleSearch
from pydantic import BaseModel
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import HandoffTermination, TextMentionTermination
from autogen_agentchat.teams import Swarm
from autogen_agentchat.ui import Console
from autogen_agentchat.messages import TextMessage, HandoffMessage, StructuredMessage
from autogen_ext.models.anthropic import AnthropicChatCompletionClient
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.models.ollama import OllamaChatCompletionClient
from autogen_ext.models.azure import AzureAIChatCompletionClient
from azure.core.credentials import AzureKeyCredential
import traceback
import soundfile as sf
import tempfile
from pydub import AudioSegment
from TTS.api import TTS
import markdown
import PyPDF2
import io
import copy
def get_instructor_name(speaker):
    instructor_names = {
        "feynman.mp3": "Professor Richard Feynman",
        "einstein.mp3": "Professor Albert Einstein",
        "samantha.mp3": "Professor Samantha",
        "socrates.mp3": "Professor Socrates",
        "professor_lectura_male.mp3": "Professor Lectūra"
    }
    return instructor_names.get(speaker, "Professor Lectūra")
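# e.g. get_instructor_name("feynman.mp3") -> "Professor Richard Feynman";
# unrecognized filenames fall back to "Professor Lectūra".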
# Set up logging
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("lecture_generation.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
# Set up environment
OUTPUT_DIR = os.path.join(os.getcwd(), "outputs")
UPLOAD_DIR = os.path.join(os.getcwd(), "uploads")
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(UPLOAD_DIR, exist_ok=True)
logger.info(f"Using output directory: {OUTPUT_DIR}")
logger.info(f"Using upload directory: {UPLOAD_DIR}")
os.environ["COQUI_TOS_AGREED"] = "1"
# Initialize TTS model
device = "cuda" if torch.cuda.is_available() else "cpu"
tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(device)
logger.info("TTS model initialized on %s", device)
# Define model for slide data
class Slide(BaseModel):
    title: str
    content: str

class SlidesOutput(BaseModel):
    slides: list[Slide]
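# SlidesOutput mirrors the JSON shape the slide_agent is prompted to emit, e.g.:
#   {"slides": [{"title": "What is AI?", "content": "# What is AI?\n- ..."}]}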
# Search tool using SerpApi
def search_web(query: str, serpapi_key: str) -> str:
    try:
        params = {
            "q": query,
            "engine": "google",
            "api_key": serpapi_key,
            "num": 5
        }
        search = GoogleSearch(params)
        results = search.get_dict()
        if "error" in results:
            logger.error("SerpApi error: %s", results["error"])
            return None
        if "organic_results" not in results or not results["organic_results"]:
            logger.info("No search results found for query: %s", query)
            return None
        formatted_results = []
        for item in results["organic_results"][:5]:
            title = item.get("title", "No title")
            snippet = item.get("snippet", "No snippet")
            link = item.get("link", "No link")
            formatted_results.append(f"Title: {title}\nSnippet: {snippet}\nLink: {link}\n")
        formatted_output = "\n".join(formatted_results)
        logger.info("Successfully retrieved search results for query: %s", query)
        return formatted_output
    except Exception as e:
        logger.error("Unexpected error during search: %s", str(e))
        return None
def create_search_web_with_key(serpapi_key: str):
    def search_web_with_key(query: str) -> str:
        """Search the web for the given query and return formatted results."""
        return search_web(query, serpapi_key)
    # Agents are instructed to call a tool named "search_web", so expose the
    # closure under that name (FunctionTool derives the tool name from __name__).
    search_web_with_key.__name__ = "search_web"
    return search_web_with_key
# Custom renderer for slides - Markdown to HTML
def render_md_to_html(md_content: str) -> str:
    try:
        html_content = markdown.markdown(md_content, extensions=['extra', 'fenced_code', 'tables'])
        return html_content
    except Exception as e:
        logger.error("Failed to render Markdown to HTML: %s", str(e))
        return "<div>Error rendering content</div>"
# Slide tool for generating HTML slides used by slide_agent
def create_slides(slides: list[dict], title: str, instructor_name: str, output_dir: str = OUTPUT_DIR) -> list[str]:
    try:
        html_files = []
        template_file = os.path.join(os.getcwd(), "slide_template.html")
        with open(template_file, "r", encoding="utf-8") as f:
            template_content = f.read()
        for i, slide in enumerate(slides):
            slide_number = i + 1
            md_content = slide['content']
            html_content = render_md_to_html(md_content)
            date = datetime.datetime.now().strftime("%Y-%m-%d")
            # Replace placeholders in the template
            slide_html = template_content.replace("<!--SLIDE_NUMBER-->", str(slide_number))
            slide_html = slide_html.replace("section title", f"{slide['title']}")
            slide_html = slide_html.replace("Lecture title", title)
            slide_html = slide_html.replace("<!--CONTENT-->", html_content)
            slide_html = slide_html.replace("speaker name", instructor_name)
            slide_html = slide_html.replace("date", date)
            html_file = os.path.join(output_dir, f"slide_{slide_number}.html")
            with open(html_file, "w", encoding="utf-8") as f:
                f.write(slide_html)
            logger.info("Generated HTML slide: %s", html_file)
            html_files.append(html_file)
        # Save slide content as Markdown files
        for i, slide in enumerate(slides):
            slide_number = i + 1
            md_file = os.path.join(output_dir, f"slide_{slide_number}_content.md")
            with open(md_file, "w", encoding="utf-8") as f:
                f.write(slide['content'])
            logger.info("Saved slide content to Markdown: %s", md_file)
        return html_files
    except Exception as e:
        logger.error("Failed to create HTML slides: %s", str(e))
        return []
# Dynamic progress bar
def html_with_progress(label, progress):
    return f"""
    <div style="display: flex; flex-direction: column; justify-content: center; align-items: center; height: 100%; min-height: 700px; padding: 20px; text-align: center; border: 1px solid #ddd; border-radius: 8px;">
        <div style="width: 70%; background-color: lightgrey; border-radius: 80px; overflow: hidden; margin-bottom: 20px;">
            <div style="width: {progress}%; height: 15px; background-color: #4CAF50; border-radius: 80px;"></div>
        </div>
        <h2 style="font-style: italic; color: #555 !important;">{label}</h2>
    </div>
    """
# Get model client based on selected service
def get_model_client(service, api_key):
    if service == "OpenAI-gpt-4o-2024-08-06":
        return OpenAIChatCompletionClient(model="gpt-4o-2024-08-06", api_key=api_key)
    elif service == "Anthropic-claude-3-sonnet-20240229":
        return AnthropicChatCompletionClient(model="claude-3-sonnet-20240229", api_key=api_key)
    elif service == "Google-gemini-2.0-flash":
        return OpenAIChatCompletionClient(model="gemini-2.0-flash", api_key=api_key)
    elif service == "Ollama-llama3.2":
        return OllamaChatCompletionClient(model="llama3.2")
    elif service == "Azure AI Foundry":
        return AzureAIChatCompletionClient(
            model="phi-4",
            endpoint="https://models.inference.ai.azure.com",
            credential=AzureKeyCredential(os.environ.get("GITHUB_TOKEN", "")),
            model_info={
                "json_output": False,
                "function_calling": False,
                "vision": False,
                "family": "unknown",
                "structured_output": False,
            }
        )
    else:
        raise ValueError("Invalid service")
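# Note: the Gemini branch reuses OpenAIChatCompletionClient; recent autogen_ext
# releases resolve known Gemini model names against Google's OpenAI-compatible
# endpoint, so no explicit base_url is passed here (assumed, not verified).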
# Helper function to clean script text
def clean_script_text(script):
    if not script or not isinstance(script, str):
        logger.error("Invalid script input: %s", script)
        return None
    script = re.sub(r"\*\*Slide \d+:.*?\*\*", "", script)
    script = re.sub(r"\[.*?\]", "", script)
    script = re.sub(r"Title:.*?\n|Content:.*?\n", "", script)
    script = script.replace("humanlike", "human-like").replace("problemsolving", "problem-solving")
    script = re.sub(r"\s+", " ", script).strip()
    if len(script) < 10:
        logger.error("Cleaned script too short (%d characters): %s", len(script), script)
        return None
    logger.info("Cleaned script: %s", script)
    return script
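# Example: clean_script_text("**Slide 1: Intro** Welcome [pause] to AI.")
# strips the slide header and bracketed cue and returns "Welcome to AI.".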
# Helper to validate and convert speaker audio
async def validate_and_convert_speaker_audio(speaker_audio):
    if not speaker_audio or not os.path.exists(speaker_audio):
        logger.warning("Speaker audio file does not exist: %s. Using default voice.", speaker_audio)
        default_voice = os.path.join(os.path.dirname(__file__), "professor_lectura_male.mp3")
        if os.path.exists(default_voice):
            speaker_audio = default_voice
        else:
            logger.error("Default voice not found. Cannot proceed with TTS.")
            return None
    try:
        ext = os.path.splitext(speaker_audio)[1].lower()
        if ext == ".mp3":
            logger.info("Converting MP3 to WAV: %s", speaker_audio)
            audio = AudioSegment.from_mp3(speaker_audio)
            audio = audio.set_channels(1).set_frame_rate(22050)
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False, dir=OUTPUT_DIR) as temp_file:
                audio.export(temp_file.name, format="wav")
                speaker_wav = temp_file.name
        elif ext == ".wav":
            speaker_wav = speaker_audio
        else:
            logger.error("Unsupported audio format: %s", ext)
            return None
        data, samplerate = sf.read(speaker_wav)
        if samplerate < 16000 or samplerate > 48000:
            logger.error("Invalid sample rate for %s: %d Hz", speaker_wav, samplerate)
            return None
        if len(data) < 16000:
            logger.error("Speaker audio too short: %d frames", len(data))
            return None
        if data.ndim == 2:
            logger.info("Converting stereo WAV to mono: %s", speaker_wav)
            data = data.mean(axis=1)
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False, dir=OUTPUT_DIR) as temp_file:
                sf.write(temp_file.name, data, samplerate)
                speaker_wav = temp_file.name
        logger.info("Validated speaker audio: %s", speaker_wav)
        return speaker_wav
    except Exception as e:
        logger.error("Failed to validate or convert speaker audio %s: %s", speaker_audio, str(e))
        return None
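# XTTS voice cloning expects a short, clean mono reference clip; the checks
# above enforce a 16-48 kHz sample rate and at least 16000 frames (~1 s at 16 kHz).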
# Helper function to generate audio using Coqui TTS API
def generate_xtts_audio(tts, text, speaker_wav, output_path):
    if not tts:
        logger.error("TTS model not initialized")
        return False
    try:
        tts.tts_to_file(text=text, speaker_wav=speaker_wav, language="en", file_path=output_path)
        logger.info("Generated audio for %s", output_path)
        return True
    except Exception as e:
        logger.error("Failed to generate audio for %s: %s", output_path, str(e))
        return False
# Helper function to extract JSON from messages
def extract_json_from_message(message):
    if isinstance(message, TextMessage):
        content = message.content
        logger.debug("Extracting JSON from TextMessage: %s", content)
        if not isinstance(content, str):
            logger.warning("TextMessage content is not a string: %s", content)
            return None
        pattern = r"```json\s*(.*?)\s*```"
        match = re.search(pattern, content, re.DOTALL)
        if match:
            try:
                json_str = match.group(1).strip()
                logger.debug("Found JSON in code block: %s", json_str)
                return json.loads(json_str)
            except json.JSONDecodeError as e:
                logger.error("Failed to parse JSON from code block: %s", e)
        json_patterns = [
            r"\[\s*\{.*?\}\s*\]",
            r"\{\s*\".*?\"\s*:.*?\}",
        ]
        for pattern in json_patterns:
            match = re.search(pattern, content, re.DOTALL)
            if match:
                try:
                    json_str = match.group(0).strip()
                    logger.debug("Found JSON with pattern %s: %s", pattern, json_str)
                    return json.loads(json_str)
                except json.JSONDecodeError as e:
                    logger.error("Failed to parse JSON with pattern %s: %s", pattern, e)
        try:
            for i in range(len(content)):
                for j in range(len(content), i, -1):
                    substring = content[i:j].strip()
                    if (substring.startswith('{') and substring.endswith('}')) or \
                       (substring.startswith('[') and substring.endswith(']')):
                        try:
                            parsed = json.loads(substring)
                            if isinstance(parsed, (list, dict)):
                                logger.info("Found JSON in substring: %s", substring)
                                return parsed
                        except json.JSONDecodeError:
                            continue
        except Exception as e:
            logger.error("Error in JSON substring search: %s", e)
        logger.warning("No JSON found in TextMessage content")
        return None
    elif isinstance(message, StructuredMessage):
        content = message.content
        logger.debug("Extracting JSON from StructuredMessage: %s", content)
        try:
            if isinstance(content, BaseModel):
                content_dict = content.dict()
                return content_dict.get("slides", content_dict)
            return content
        except Exception as e:
            logger.error("Failed to extract JSON from StructuredMessage: %s, Content: %s", e, content)
            return None
    elif isinstance(message, HandoffMessage):
        logger.debug("Extracting JSON from HandoffMessage context")
        for ctx_msg in message.context:
            if hasattr(ctx_msg, "content"):
                content = ctx_msg.content
                logger.debug("HandoffMessage context content: %s", content)
                if isinstance(content, str):
                    pattern = r"```json\s*(.*?)\s*```"
                    match = re.search(pattern, content, re.DOTALL)
                    if match:
                        try:
                            return json.loads(match.group(1))
                        except json.JSONDecodeError as e:
                            logger.error("Failed to parse JSON from HandoffMessage: %s", e)
                    json_patterns = [
                        r"\[\s*\{.*?\}\s*\]",
                        r"\{\s*\".*?\"\s*:.*?\}",
                    ]
                    for pattern in json_patterns:
                        match = re.search(pattern, content, re.DOTALL)
                        if match:
                            try:
                                return json.loads(match.group(0))
                            except json.JSONDecodeError as e:
                                logger.error("Failed to parse JSON with pattern %s: %s", pattern, e)
                elif isinstance(content, dict):
                    return content.get("slides", content)
        logger.warning("No JSON found in HandoffMessage context")
        return None
    logger.warning("Unsupported message type for JSON extraction: %s", type(message))
    return None
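# Extraction order: fenced ```json block, then bare array/object regexes, then
# a brute-force substring scan that returns the first parseable list or dict.
# The scan is O(n^2) in message length, acceptable for short agent messages.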
# Async update audio preview
async def update_audio_preview(audio_file):
    if audio_file:
        logger.info("Updating audio preview for file: %s", audio_file)
        return audio_file
    return None
# Create a zip file of .md, .txt, and .mp3 files
def create_zip_of_files(file_paths):
    zip_path = os.path.join(OUTPUT_DIR, "all_lecture_materials.zip")
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for file_path in file_paths:
            if os.path.exists(file_path):
                _, ext = os.path.splitext(file_path)
                if ext in ['.md', '.txt', '.mp3']:
                    zipf.write(file_path, os.path.basename(file_path))
                    logger.info("Added %s to zip", file_path)
    logger.info("Created zip file: %s", zip_path)
    return zip_path
# Access local files
def get_gradio_file_url(local_path):
    relative_path = os.path.relpath(local_path, os.getcwd())
    return f"/gradio_api/file={relative_path}"
# Async generate lecture materials and audio
async def on_generate(api_service, api_key, serpapi_key, title, lecture_content_description, lecture_type, lecture_style, speaker_audio, num_slides):
    # Log only the key length; never echo the raw API key to stdout
    logger.debug("Received serpapi_key (length: %d)", len(serpapi_key) if serpapi_key else 0)
    model_client = get_model_client(api_service, api_key)
    # Get the speaker from the speaker_audio path
    speaker = os.path.basename(speaker_audio) if speaker_audio else "professor_lectura_male.mp3"
    logger.info(f"Selected speaker file: {speaker}")
    instructor_name = get_instructor_name(speaker)
    logger.info(f"Using instructor: {instructor_name}")
    if os.path.exists(OUTPUT_DIR):
        try:
            for item in os.listdir(OUTPUT_DIR):
                item_path = os.path.join(OUTPUT_DIR, item)
                if os.path.isfile(item_path):
                    os.unlink(item_path)
                elif os.path.isdir(item_path):
                    shutil.rmtree(item_path)
            logger.info("Cleared outputs directory: %s", OUTPUT_DIR)
        except Exception as e:
            logger.error("Failed to clear outputs directory: %s", str(e))
    else:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        logger.info("Created outputs directory: %s", OUTPUT_DIR)
    # Total slides include user-specified content slides plus Introduction and Closing slides
    content_slides = num_slides
    total_slides = content_slides + 2
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    research_agent = AssistantAgent(
        name="research_agent",
        model_client=model_client,
        handoffs=["slide_agent"],
        system_message="You are a Research Agent. Use the search_web tool to gather information on the topic and keywords from the initial message. Summarize the findings concisely in a single message, then use the handoff_to_slide_agent tool to pass the task to the Slide Agent. Do not produce any other output.",
        tools=[create_search_web_with_key(serpapi_key)]
    )
    slide_agent = AssistantAgent(
        name="slide_agent",
        model_client=model_client,
        handoffs=["script_agent"],
        # Use the precomputed instructor_name (derived from the speaker file's
        # basename); looking it up with the full speaker_audio path would always
        # fall back to the default name.
        system_message=f"""
You are a Slide Agent. Using the research from the conversation history and the specified number of content slides ({content_slides}), generate exactly {content_slides} content slides, plus an Introduction slide as the first slide and a Closing slide as the last slide, making a total of {total_slides} slides.
- The Introduction slide (first slide) should have the title "{title}" and content containing only the lecture title, speaker name ({instructor_name}), and date {date}, centered, in plain text.
- The Closing slide (last slide) should have the title "Closing" and content containing only "The End\nThank you", centered, in plain text.
- The remaining {content_slides} slides should be content slides based on the lecture description, audience type, and lecture style ({lecture_style}), with meaningful titles and content in valid Markdown format. Adapt the content to the lecture style to suit diverse learners:
  - Feynman: Explains complex ideas with simplicity, clarity, and enthusiasm, emulating Richard Feynman's teaching style.
  - Socratic: Poses thought-provoking questions to guide learners to insights without requiring direct interaction.
  - Humorous: Infuses wit and light-hearted anecdotes to make content engaging and memorable.
  - Inspirational - Motivating: Uses motivational language and visionary ideas to spark enthusiasm and curiosity.
  - Reflective: Encourages introspection with a calm, contemplative tone to deepen understanding.
Output ONLY a JSON array wrapped in ```json ... ``` in a TextMessage, where each slide is an object with 'title' and 'content' keys. After generating the JSON, use the create_slides tool to produce HTML slides, then use the handoff_to_script_agent tool to pass the task to the Script Agent. Do not include any explanatory text or other messages.
Example output for 1 content slide (total 3 slides):
```json
[
  {{"title": "Introduction to AI Basics", "content": "AI Basics\n{instructor_name}\n{date}"}},
  {{"title": "What is AI?", "content": "# What is AI?\n- Definition: Systems that mimic human intelligence\n- Key areas: ML, NLP, Robotics"}},
  {{"title": "Closing", "content": "The End\nThank you"}}
]
```""",
        tools=[create_slides],
        output_content_type=None,
        reflect_on_tool_use=False
    )
    script_agent = AssistantAgent(
        name="script_agent",
        model_client=model_client,
        handoffs=["instructor_agent"],
        system_message=f"""
You are a Script Agent. Access the JSON array of {total_slides} slides from the conversation history, which includes an Introduction slide, {content_slides} content slides, and a Closing slide. Generate a narration script (1-2 sentences) for each of the {total_slides} slides, summarizing its content in a clear, academically inclined tone. Ensure the lecture is engaging, covers the fundamental requirements of the topic, and aligns with the lecture style ({lecture_style}) to suit diverse learners. The lecture will be delivered by {instructor_name}.
Output ONLY a JSON array wrapped in ```json ... ``` with exactly {total_slides} strings, one script per slide, in the same order. Ensure the JSON is valid and complete. After outputting, use the handoff_to_instructor_agent tool. If scripts cannot be generated, retry once.
Example for 3 slides (1 content slide):
```json
[
  "Welcome to the lecture on AI Basics. I am {instructor_name}, and today we will explore the fundamentals of artificial intelligence.",
  "Let us begin by defining artificial intelligence: it refers to systems that mimic human intelligence, spanning key areas such as machine learning, natural language processing, and robotics.",
  "That concludes our lecture on AI Basics. Thank you for your attention, and I hope you found this session insightful."
]
```""",
        output_content_type=None,
        reflect_on_tool_use=False
    )
    def get_instructor_prompt(speaker, lecture_style):
        base_prompts = {
            "feynman.mp3": f"You are {instructor_name}, known for your ability to explain complex concepts with remarkable clarity and enthusiasm. Your teaching style is characterized by:",
            "einstein.mp3": f"You are {instructor_name}, known for your profound insights and ability to connect abstract concepts to the physical world. Your teaching style is characterized by:",
            "samantha.mp3": f"You are {instructor_name}, known for your engaging and accessible approach to teaching. Your teaching style is characterized by:",
            "socrates.mp3": f"You are {instructor_name}, known for your method of questioning and guiding students to discover knowledge themselves. Your teaching style is characterized by:",
            "professor_lectura_male.mp3": f"You are {instructor_name}, known for your clear and authoritative teaching style. Your teaching style is characterized by:"
        }
        style_characteristics = {
            "Feynman - Simplifies complex ideas with enthusiasm": """
- Breaking down complex ideas into simple, understandable parts
- Using analogies and real-world examples
- Maintaining enthusiasm and curiosity throughout
- Encouraging critical thinking and questioning
- Making abstract concepts tangible and relatable""",
            "Socratic - Guides insights with probing questions": """
- Using thought-provoking questions to guide understanding
- Encouraging self-discovery and critical thinking
- Challenging assumptions and exploring implications
- Building knowledge through dialogue and inquiry
- Fostering intellectual curiosity and reflection""",
            "Inspirational - Sparks enthusiasm with visionary ideas": """
- Connecting concepts to broader implications and possibilities
- Using motivational language and visionary thinking
- Inspiring curiosity and wonder about the subject
- Highlighting the transformative potential of knowledge
- Encouraging students to think beyond conventional boundaries""",
            "Reflective - Promotes introspection with a calm tone": """
- Creating a contemplative learning environment
- Encouraging deep thinking and personal connection
- Using a calm, measured delivery
- Promoting self-reflection and understanding
- Building connections between concepts and personal experience""",
            "Humorous - Uses wit and anecdotes for engaging content": """
- Incorporating relevant humor and anecdotes
- Making learning enjoyable and memorable
- Using wit to highlight key concepts
- Creating an engaging and relaxed atmosphere
- Balancing entertainment with educational value"""
        }
        base_prompt = base_prompts.get(speaker, base_prompts["feynman.mp3"])
        style_prompt = style_characteristics.get(lecture_style, style_characteristics["Feynman - Simplifies complex ideas with enthusiasm"])
        return f"""{base_prompt}
{style_prompt}
Review the slides and scripts from the conversation history to ensure coherence, completeness, and that exactly {total_slides} slides and {total_slides} scripts are received, including the Introduction and Closing slides. Verify that HTML slide files exist in the outputs directory and align with the lecture style ({lecture_style}). Output a confirmation message summarizing the number of slides, scripts, and HTML files status. If slides, scripts, or HTML files are missing, invalid, or do not match the expected count ({total_slides}), report the issue clearly. Use 'TERMINATE' to signal completion.
Example: 'Received {total_slides} slides, {total_slides} scripts, and HTML files. Lecture is coherent and aligns with {lecture_style} style. TERMINATE'
"""
    instructor_agent = AssistantAgent(
        name="instructor_agent",
        model_client=model_client,
        handoffs=[],
        # Pass the basename (e.g. "feynman.mp3"), not the full path, so the
        # base_prompts lookup inside get_instructor_prompt actually matches
        system_message=get_instructor_prompt(speaker, lecture_style)
    )
    swarm = Swarm(
        participants=[research_agent, slide_agent, script_agent, instructor_agent],
        termination_condition=HandoffTermination(target="user") | TextMentionTermination("TERMINATE")
    )
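    # The swarm stops when an agent hands off to "user" or any message contains
    # "TERMINATE" (the instructor_agent's completion signal).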
    progress = 0
    label = "Researching lecture topic..."
    # Each yield drives three outputs: the lecture display HTML, the list of
    # downloadable files, and the lecture context state (None until complete)
    yield (
        html_with_progress(label, progress),
        [],
        None
    )
    await asyncio.sleep(0.1)
    initial_message = f"""
Lecture Title: {title}
Lecture Content Description: {lecture_content_description}
Audience: {lecture_type}
Lecture Style: {lecture_style}
Number of Content Slides: {content_slides}
Please start by researching the topic, or proceed without research if search is unavailable.
"""
    logger.info("Starting lecture generation for title: %s with %d content slides (total %d slides), style: %s", title, content_slides, total_slides, lecture_style)
    slides = None
    scripts = None
    html_files = []
    error_html = """
    <div style="display: flex; flex-direction: column; justify-content: center; align-items: center; height: 100%; min-height: 700px; padding: 20px; text-align: center; border: 1px solid #ddd; border-radius: 8px;">
        <h2 style="color: #d9534f;">Failed to generate lecture materials</h2>
        <p style="margin-top: 20px;">Please try again with different parameters or a different model.</p>
    </div>
    """
    try:
        logger.info("Research Agent starting...")
        if serpapi_key:
            task_result = await Console(swarm.run_stream(task=initial_message))
        else:
            logger.warning("No SerpApi key provided, bypassing research phase")
            task_result = await Console(swarm.run_stream(task=f"{initial_message}\nNo search available, proceed with slide generation."))
        logger.info("Swarm execution completed")
        slide_retry_count = 0
        script_retry_count = 0
        max_retries = 2
        for message in task_result.messages:
            source = getattr(message, 'source', getattr(message, 'sender', None))
            logger.debug("Processing message from %s, type: %s", source, type(message))
            if isinstance(message, HandoffMessage):
                logger.info("Handoff from %s to %s", source, message.target)
                if source == "research_agent" and message.target == "slide_agent":
                    progress = 25
                    label = "Slides: generating..."
                    yield (
                        html_with_progress(label, progress),
                        [],
                        None
                    )
                    await asyncio.sleep(0.1)
                elif source == "slide_agent" and message.target == "script_agent":
                    if slides is None:
                        logger.warning("Slide Agent handoff without slides JSON")
                        extracted_json = extract_json_from_message(message)
                        if extracted_json:
                            slides = extracted_json
                            logger.info("Extracted slides JSON from HandoffMessage context: %s", slides)
                    if slides is None or len(slides) != total_slides:
                        if slide_retry_count < max_retries:
                            slide_retry_count += 1
                            logger.info("Retrying slide generation (attempt %d/%d)", slide_retry_count, max_retries)
                            retry_message = TextMessage(
                                content=f"Please generate exactly {total_slides} slides (Introduction, {content_slides} content slides, and Closing) as per your instructions.",
                                source="user",
                                recipient="slide_agent"
                            )
                            task_result.messages.append(retry_message)
                            continue
                    progress = 50
                    label = "Scripts: generating..."
                    yield (
                        html_with_progress(label, progress),
                        [],
                        None
                    )
                    await asyncio.sleep(0.1)
                elif source == "script_agent" and message.target == "instructor_agent":
                    if scripts is None:
                        logger.warning("Script Agent handoff without scripts JSON")
                        extracted_json = extract_json_from_message(message)
                        if extracted_json:
                            scripts = extracted_json
                            logger.info("Extracted scripts JSON from HandoffMessage context: %s", scripts)
                    progress = 75
                    label = "Review: in progress..."
                    yield (
                        html_with_progress(label, progress),
                        [],
                        None
                    )
                    await asyncio.sleep(0.1)
            elif source == "research_agent" and isinstance(message, TextMessage) and "handoff_to_slide_agent" in message.content:
                logger.info("Research Agent completed research")
                progress = 25
                label = "Slides: generating..."
                yield (
                    html_with_progress(label, progress),
                    [],
                    None
                )
                await asyncio.sleep(0.1)
elif source == "slide_agent" and isinstance(message, (TextMessage, StructuredMessage)): | |
logger.debug("Slide Agent message received") | |
extracted_json = extract_json_from_message(message) | |
if extracted_json: | |
slides = extracted_json | |
logger.info("Slide Agent generated %d slides: %s", len(slides), slides) | |
if len(slides) != total_slides: | |
if slide_retry_count < max_retries: | |
slide_retry_count += 1 | |
logger.info("Retrying slide generation (attempt %d/%d)", slide_retry_count, max_retries) | |
retry_message = TextMessage( | |
content=f"Please generate exactly {total_slides} slides (Introduction, {content_slides} content slides, and Closing) as per your instructions.", | |
source="user", | |
recipient="slide_agent" | |
) | |
task_result.messages.append(retry_message) | |
continue | |
# Generate HTML slides with instructor name | |
html_files = create_slides(slides, title, instructor_name) | |
if not html_files: | |
logger.error("Failed to generate HTML slides") | |
progress = 50 | |
label = "Scripts: generating..." | |
yield ( | |
html_with_progress(label, progress), | |
[] | |
) | |
await asyncio.sleep(0.1) | |
else: | |
logger.warning("No JSON extracted from slide_agent message") | |
if slide_retry_count < max_retries: | |
slide_retry_count += 1 | |
logger.info("Retrying slide generation (attempt %d/%d)", slide_retry_count, max_retries) | |
retry_message = TextMessage( | |
content=f"Please generate exactly {total_slides} slides (Introduction, {content_slides} content slides, and Closing) as per your instructions.", | |
source="user", | |
recipient="slide_agent" | |
) | |
task_result.messages.append(retry_message) | |
continue | |
elif source == "script_agent" and isinstance(message, (TextMessage, StructuredMessage)): | |
logger.debug("Script Agent message received") | |
extracted_json = extract_json_from_message(message) | |
if extracted_json: | |
scripts = extracted_json | |
logger.info("Script Agent generated scripts for %d slides: %s", len(scripts), scripts) | |
for i, script in enumerate(scripts): | |
script_file = os.path.join(OUTPUT_DIR, f"slide_{i+1}_script.txt") | |
try: | |
with open(script_file, "w", encoding="utf-8") as f: | |
f.write(script) | |
logger.info("Saved script to %s", script_file) | |
except Exception as e: | |
logger.error("Error saving script to %s: %s", script_file, str(e)) | |
progress = 75 | |
label = "Scripts generated and saved. Reviewing..." | |
yield ( | |
html_with_progress(label, progress), | |
[] | |
) | |
await asyncio.sleep(0.1) | |
else: | |
logger.warning("No JSON extracted from script_agent message") | |
if script_retry_count < max_retries: | |
script_retry_count += 1 | |
logger.info("Retrying script generation (attempt %d/%d)", script_retry_count, max_retries) | |
retry_message = TextMessage( | |
content=f"Please generate exactly {total_slides} scripts for the {total_slides} slides as per your instructions.", | |
source="user", | |
recipient="script_agent" | |
) | |
task_result.messages.append(retry_message) | |
continue | |
elif source == "instructor_agent" and isinstance(message, TextMessage) and "TERMINATE" in message.content: | |
logger.info("Instructor Agent completed lecture review: %s", message.content) | |
progress = 90 | |
label = "Lecture materials ready. Generating lecture speech..." | |
file_paths = [f for f in os.listdir(OUTPUT_DIR) if f.endswith(('.md', '.txt'))] | |
file_paths.sort() | |
file_paths = [os.path.join(OUTPUT_DIR, f) for f in file_paths] | |
yield ( | |
html_with_progress(label, progress), | |
file_paths | |
) | |
await asyncio.sleep(0.1) | |
logger.info("Slides state: %s", "Generated" if slides else "None") | |
logger.info("Scripts state: %s", "Generated" if scripts else "None") | |
logger.info("HTML files state: %s", "Generated" if html_files else "None") | |
if not slides or not scripts: | |
error_message = f"Failed to generate {'slides and scripts' if not slides and not scripts else 'slides' if not slides else 'scripts'}" | |
error_message += f". Received {len(slides) if slides else 0} slides and {len(scripts) if scripts else 0} scripts." | |
logger.error("%s", error_message) | |
logger.debug("Dumping all messages for debugging:") | |
for msg in task_result.messages: | |
source = getattr(msg, 'source', getattr(msg, 'sender', None)) | |
logger.debug("Message from %s, type: %s, content: %s", source, type(msg), msg.to_text() if hasattr(msg, 'to_text') else str(msg)) | |
yield ( | |
error_html, | |
[] | |
) | |
return | |
        if len(slides) != total_slides:
            logger.error("Expected %d slides, but received %d", total_slides, len(slides))
            yield (
                f"""
                <div style="display: flex; flex-direction: column; justify-content: center; align-items: center; height: 100%; min-height: 700px; padding: 20px; text-align: center; border: 1px solid #ddd; border-radius: 8px;">
                    <h2 style="color: #d9534f;">Incorrect number of slides</h2>
                    <p style="margin-top: 20px;">Expected {total_slides} slides, but generated {len(slides)}. Please try again.</p>
                </div>
                """,
                [],
                None
            )
            return
        if not isinstance(scripts, list) or not all(isinstance(s, str) for s in scripts):
            logger.error("Scripts are not a list of strings: %s", scripts)
            yield (
                f"""
                <div style="display: flex; flex-direction: column; justify-content: center; align-items: center; height: 100%; min-height: 700px; padding: 20px; text-align: center; border: 1px solid #ddd; border-radius: 8px;">
                    <h2 style="color: #d9534f;">Invalid script format</h2>
                    <p style="margin-top: 20px;">Scripts must be a list of strings. Please try again.</p>
                </div>
                """,
                [],
                None
            )
            return
        if len(scripts) != total_slides:
            logger.error("Mismatch between number of slides (%d) and scripts (%d)", len(slides), len(scripts))
            yield (
                f"""
                <div style="display: flex; flex-direction: column; justify-content: center; align-items: center; height: 100%; min-height: 700px; padding: 20px; text-align: center; border: 1px solid #ddd; border-radius: 8px;">
                    <h2 style="color: #d9534f;">Mismatch in slides and scripts</h2>
                    <p style="margin-top: 20px;">Generated {len(slides)} slides but {len(scripts)} scripts. Please try again.</p>
                </div>
                """,
                [],
                None
            )
            return
        # Access the generated HTML files
        html_file_urls = [get_gradio_file_url(html_file) for html_file in html_files]
        audio_urls = [None] * len(scripts)
        # (The audio timeline HTML is built after audio generation below.)
        file_paths = [f for f in os.listdir(OUTPUT_DIR) if f.endswith(('.md', '.txt'))]
        file_paths.sort()
        file_paths = [os.path.join(OUTPUT_DIR, f) for f in file_paths]
        audio_files = []
        validated_speaker_wav = await validate_and_convert_speaker_audio(speaker_audio)
        if not validated_speaker_wav:
            logger.error("Invalid speaker audio after conversion, skipping TTS")
            yield (
                f"""
                <div style="display: flex; flex-direction: column; justify-content: center; align-items: center; height: 100%; min-height: 700px; padding: 20px; text-align: center; border: 1px solid #ddd; border-radius: 8px;">
                    <h2 style="color: #d9534f;">Invalid speaker audio</h2>
                    <p style="margin-top: 20px;">Please upload a valid MP3 or WAV audio file and try again.</p>
                </div>
                """,
                [],
                None
            )
            return
        for i, script in enumerate(scripts):
            cleaned_script = clean_script_text(script)
            audio_file = os.path.join(OUTPUT_DIR, f"slide_{i+1}.mp3")
            script_file = os.path.join(OUTPUT_DIR, f"slide_{i+1}_script.txt")
            try:
                with open(script_file, "w", encoding="utf-8") as f:
                    f.write(cleaned_script or "")
                logger.info("Saved script to %s: %s", script_file, cleaned_script)
            except Exception as e:
                logger.error("Error saving script to %s: %s", script_file, str(e))
            if not cleaned_script:
                logger.error("Skipping audio for slide %d due to empty or invalid script", i + 1)
                audio_files.append(None)
                audio_urls[i] = None
                progress = 90 + ((i + 1) / len(scripts)) * 10
                label = f"Generating lecture speech for slide {i + 1}/{len(scripts)}..."
                yield (
                    html_with_progress(label, progress),
                    file_paths,
                    None
                )
                await asyncio.sleep(0.1)
                continue
            max_audio_retries = 2
            for attempt in range(max_audio_retries + 1):
                try:
                    current_text = cleaned_script
                    if attempt > 0:
                        sentences = re.split(r"[.!?]+", cleaned_script)
                        sentences = [s.strip() for s in sentences if s.strip()][:2]
                        current_text = ". ".join(sentences) + "."
                        logger.info("Retry %d for slide %d with simplified text: %s", attempt, i + 1, current_text)
                    success = generate_xtts_audio(tts, current_text, validated_speaker_wav, audio_file)
                    if not success:
                        raise RuntimeError("TTS generation failed")
                    logger.info("Generated audio for slide %d: %s", i + 1, audio_file)
                    audio_files.append(audio_file)
                    audio_urls[i] = get_gradio_file_url(audio_file)
                    progress = 90 + ((i + 1) / len(scripts)) * 10
                    label = f"Generating lecture speech for slide {i + 1}/{len(scripts)}..."
                    file_paths.append(audio_file)
                    yield (
                        html_with_progress(label, progress),
                        file_paths,
                        None
                    )
                    await asyncio.sleep(0.1)
                    break
                except Exception as e:
                    logger.error("Error generating audio for slide %d (attempt %d): %s\n%s", i + 1, attempt, str(e), traceback.format_exc())
                    if attempt == max_audio_retries:
                        logger.error("Max retries reached for slide %d, skipping", i + 1)
                        audio_files.append(None)
                        audio_urls[i] = None
                        progress = 90 + ((i + 1) / len(scripts)) * 10
                        label = f"Generating lecture speech for slide {i + 1}/{len(scripts)}..."
                        yield (
                            html_with_progress(label, progress),
                            file_paths,
                            None
                        )
                        await asyncio.sleep(0.1)
                        break
        # Create zip file with all materials except .html files
        zip_file = create_zip_of_files(file_paths)
        file_paths.append(zip_file)
        # Slide hack: Render the lecture container with iframe containing HTML slides
        audio_timeline = ""
        for j, url in enumerate(audio_urls):
            if url:
                audio_timeline += f'<audio id="audio-{j+1}" controls src="{url}" style="display: inline-block; margin: 0 10px; width: 200px;"></audio>'
            else:
                audio_timeline += f'<audio id="audio-{j+1}" controls src="" style="display: inline-block; margin: 0 10px; width: 200px;"><span>Audio unavailable</span></audio>'
        slides_info = json.dumps({"htmlFiles": html_file_urls, "audioFiles": audio_urls})
        html_output = f"""
        <div id="lecture-data" style="display: none;">{slides_info}</div>
        <div id="lecture-container" style="height: 700px; border: 1px solid #ddd; border-radius: 8px; display: flex; flex-direction: column; justify-content: space-between;">
            <div id="slide-content" style="flex: 1; overflow: auto; padding: 20px; text-align: center; background-color: #fff;">
                <iframe id="slide-iframe" style="width: 100%; height: 100%; border: none;"></iframe>
            </div>
            <div style="padding: 20px; text-align: center;">
                <div class="audio-timeline" style="display: flex; justify-content: center; margin-bottom: 10px;">
                    {audio_timeline}
                </div>
                <div style="display: flex; justify-content: center; margin-bottom: 10px;">
                    <button id="prev-btn" style="border-radius: 50%; width: 40px; height: 40px; margin: 0 5px; font-size: 1.2em; cursor: pointer; background-color: black"><i class="fas fa-step-backward" style="color: #fff !important"></i></button>
                    <button id="play-btn" style="border-radius: 50%; width: 40px; height: 40px; margin: 0 5px; font-size: 1.2em; cursor: pointer; background-color: black"><i class="fas fa-play" style="color: #fff !important"></i></button>
                    <button id="next-btn" style="border-radius: 50%; width: 40px; height: 40px; margin: 0 5px; font-size: 1.2em; cursor: pointer; background-color: black"><i class="fas fa-step-forward" style="color: #fff !important"></i></button>
                    <button id="fullscreen-btn" style="border-radius: 50%; width: 40px; height: 40px; margin: 0 5px; font-size: 1.2em; cursor: pointer; background-color: black"><i style="color: #fff !important" class="fas fa-expand"></i></button>
                    <button id="reload-btn" style="border-radius: 50%; width: 40px; height: 40px; margin: 0 5px; font-size: 1.2em; cursor: pointer; background-color: black"><i style="color: #fff !important" class="fas fa-sync-alt"></i></button>
                    <button id="clear-btn" style="border-radius: 50%; width: 40px; height: 40px; margin: 0 5px; font-size: 1.2em; cursor: pointer; background-color: black"><i style="color: #fff !important" class="fas fa-paint-brush"></i></button>
                </div>
            </div>
        </div>
        """
logger.info("Yielding final lecture materials after audio generation") | |
# --- YIELD LECTURE CONTEXT FOR AGENTS --- | |
lecture_context = { | |
"slides": slides, | |
"scripts": scripts, | |
"title": title, | |
"description": lecture_content_description, | |
"style": lecture_style, | |
"audience": lecture_type | |
} | |
yield ( | |
html_output, | |
file_paths, | |
lecture_context | |
) | |
logger.info("Lecture generation completed successfully") | |
except Exception as e: | |
logger.error("Error during lecture generation: %s\n%s", str(e), traceback.format_exc()) | |
yield ( | |
f""" | |
<div style="display: flex; flex-direction: column; justify-content: center; align-items: center; height: 100%; min-height: 700px; padding: 20px; text-align: center; border: 1px solid #ddd; border-radius: 8px;"> | |
<h2 style="color: #000;">Error during lecture generation</h2> | |
<p style="margin-top: 10px; font-size: 16px;color: #000;">{str(e)}</p> | |
<p style="margin-top: 20px;">Please try again</p> | |
</div> | |
""", | |
[], | |
None | |
) | |
return | |
# custom js
js_code = """
() => {
    // Function to wait for an element to appear in the DOM
    function waitForElement(selector, callback, maxAttempts = 50, interval = 100) {
        let attempts = 0;
        const intervalId = setInterval(() => {
            const element = document.querySelector(selector);
            if (element) {
                clearInterval(intervalId);
                console.log(`Element ${selector} found after ${attempts} attempts`);
                callback(element);
            } else if (attempts >= maxAttempts) {
                clearInterval(intervalId);
                console.error(`Element ${selector} not found after ${maxAttempts} attempts`);
            }
            attempts++;
        }, interval);
    }
    // Main initialization function
    function initializeSlides() {
        console.log("Initializing slides...");
        // Wait for lecture-data to load the JSON data
        waitForElement('#lecture-data', (dataElement) => {
            if (!dataElement.textContent) {
                console.error("Lecture data element is empty");
                return;
            }
            let lectureData;
            try {
                lectureData = JSON.parse(dataElement.textContent);
                console.log("Lecture data parsed successfully:", lectureData);
            } catch (e) {
                console.error("Failed to parse lecture data:", e);
                return;
            }
            if (!lectureData.htmlFiles || lectureData.htmlFiles.length === 0) {
                console.error("No HTML files found in lecture data");
                return;
            }
            let currentSlide = 0;
            const totalSlides = lectureData.htmlFiles.length;
            let audioElements = [];
            let isPlaying = false;
            let hasNavigated = false;
            let currentAudioIndex = 0;
            let lastRenderedSlide = -1;
            let isReloading = false;
            // Wait for slide-content element
            waitForElement('#slide-content', (slideContent) => {
                console.log("Slide content element found");
                // Initialize audio elements
                for (let i = 0; i < totalSlides; i++) {
                    const audio = document.getElementById(`audio-${i+1}`);
                    if (audio) {
                        audioElements.push(audio);
                        console.log(`Found audio element audio-${i+1}:`, audio);
                    } else {
                        console.error(`Audio element audio-${i+1} not found`);
                    }
                }
                function renderSlide() {
                    console.log("Rendering slide:", currentSlide + 1);
                    if (currentSlide === lastRenderedSlide && !isReloading) {
                        console.log("Slide already rendered, skipping");
                        return;
                    }
                    if (currentSlide >= 0 && currentSlide < totalSlides && lectureData.htmlFiles[currentSlide]) {
                        const iframe = document.getElementById('slide-iframe');
                        if (iframe) {
                            iframe.src = lectureData.htmlFiles[currentSlide];
                            console.log("Set iframe src to:", lectureData.htmlFiles[currentSlide]);
                            lastRenderedSlide = currentSlide;
                            // Adjust font size based on content length and screen size
                            waitForElement('iframe', (iframe) => {
                                iframe.onload = () => {
                                    const doc = iframe.contentDocument || iframe.contentWindow.document;
                                    const body = doc.body;
                                    if (body) {
                                        const textLength = body.textContent.length;
                                        const screenWidth = window.innerWidth;
                                        const screenHeight = window.innerHeight;
                                        // Base font size calculation
                                        let baseFontSize;
                                        if (screenWidth >= 1920) {
                                            baseFontSize = 20; // Large screens
                                        } else if (screenWidth >= 1366) {
                                            baseFontSize = 18; // Medium screens
                                        } else {
                                            baseFontSize = 16; // Small screens
                                        }
                                        // Adjust based on content length
                                        let adjustedFontSize;
                                        if (textLength > 1000) {
                                            adjustedFontSize = baseFontSize * 0.8; // Reduce for long content
                                        } else if (textLength > 500) {
                                            adjustedFontSize = baseFontSize * 0.9; // Slightly reduce for medium content
                                        } else {
                                            adjustedFontSize = baseFontSize; // Keep base size for short content
                                        }
                                        // Ensure minimum and maximum sizes
                                        adjustedFontSize = Math.max(14, Math.min(24, adjustedFontSize));
                                        // Apply to all elements
                                        const elements = body.getElementsByTagName('*');
                                        for (let elem of elements) {
                                            elem.style.fontSize = `${adjustedFontSize}px`;
                                        }
                                        console.log(`Adjusted font size to ${adjustedFontSize}px for ${textLength} characters on ${screenWidth}x${screenHeight} screen`);
                                    }
                                };
                            });
                        } else {
                            console.error("Iframe not found");
                        }
                    } else {
                        const iframe = document.getElementById('slide-iframe');
                        if (iframe) {
                            iframe.src = "about:blank";
                            console.log("No valid slide content for index:", currentSlide);
                        }
                    }
                }
                function updateSlide(callback) {
                    console.log("Updating slide to index:", currentSlide);
                    renderSlide();
                    // Pause and reset all audio elements
                    audioElements.forEach(audio => {
                        if (audio && audio.pause) {
                            audio.pause();
                            audio.currentTime = 0;
                            audio.style.border = 'none';
                            console.log("Paused and reset audio:", audio.id);
                        }
                    });
                    // Wait briefly to ensure pause completes before proceeding
                    setTimeout(() => {
                        if (callback) callback();
                    }, 100);
                }
                function updateAudioSources(audioUrls) {
                    console.log("Updating audio sources:", audioUrls);
                    audioUrls.forEach((url, index) => {
                        const audio = audioElements[index];
                        if (audio && url && audio.src !== url) {
                            audio.src = url;
                            audio.load();
                            console.log(`Updated audio-${index+1} src to:`, url);
                        } else if (!audio) {
                            console.error(`Audio element at index ${index} not found`);
                        }
                    });
                }
                function prevSlide() {
                    console.log("Previous button clicked, current slide:", currentSlide);
                    hasNavigated = true;
                    if (currentSlide > 0) {
                        currentSlide--;
                        updateSlide(() => {
                            const audio = audioElements[currentSlide];
                            if (audio && audio.play && isPlaying) {
                                audio.style.border = '5px solid #50f150';
                                audio.style.borderRadius = '30px';
                                audio.play().catch(e => console.error('Audio play failed:', e));
                            }
                        });
                    } else {
                        console.log("Already at first slide");
                    }
                }
                function nextSlide() {
                    console.log("Next button clicked, current slide:", currentSlide);
                    hasNavigated = true;
                    if (currentSlide < totalSlides - 1) {
                        currentSlide++;
                        updateSlide(() => {
                            const audio = audioElements[currentSlide];
                            if (audio && audio.play && isPlaying) {
                                audio.style.border = '5px solid #50f150';
                                audio.style.borderRadius = '30px';
                                audio.play().catch(e => console.error('Audio play failed:', e));
                            }
                        });
                    } else {
                        console.log("Already at last slide");
                    }
                }
                function playAll() {
                    console.log("Play button clicked, isPlaying:", isPlaying);
                    const playBtn = document.getElementById('play-btn');
                    if (!playBtn) {
                        console.error("Play button not found");
                        return;
                    }
                    const playIcon = playBtn.querySelector('i');
                    if (isPlaying) {
                        // Pause playback
                        isPlaying = false;
                        audioElements.forEach(audio => {
                            if (audio && audio.pause) {
                                audio.pause();
                                audio.style.border = 'none';
                                console.log("Paused audio:", audio.id);
                            }
                        });
                        playIcon.className = 'fas fa-play';
                        return;
                    }
                    // Start playback
                    isPlaying = true;
                    playIcon.className = 'fas fa-pause';
                    currentSlide = 0;
                    currentAudioIndex = 0;
                    updateSlide(() => {
                        function playNext() {
                            if (currentAudioIndex >= totalSlides || !isPlaying) {
                                isPlaying = false;
                                playIcon.className = 'fas fa-play';
                                audioElements.forEach(audio => {
                                    if (audio) audio.style.border = 'none';
                                });
                                console.log("Finished playing all slides or paused");
                                return;
                            }
                            currentSlide = currentAudioIndex;
                            updateSlide(() => {
                                const audio = audioElements[currentAudioIndex];
                                if (audio && audio.play) {
                                    audioElements.forEach(a => a.style.border = 'none');
                                    audio.style.border = '5px solid #16cd16';
                                    audio.style.borderRadius = '30px';
                                    console.log(`Attempting to play audio for slide ${currentAudioIndex + 1}`);
                                    const playAudio = () => {
                                        audio.play().then(() => {
                                            console.log(`Playing audio for slide ${currentAudioIndex + 1}`);
                                            audio.onended = null;
                                            audio.addEventListener('ended', () => {
                                                if (isPlaying) {
                                                    console.log(`Audio ended for slide ${currentAudioIndex + 1}`);
                                                    currentAudioIndex++;
                                                    playNext();
                                                }
                                            }, { once: true });
                                            const checkDuration = setInterval(() => {
                                                if (!isPlaying) {
                                                    clearInterval(checkDuration);
                                                    return;
                                                }
                                                if (audio.duration && audio.currentTime >= audio.duration - 0.1) {
                                                    console.log(`Fallback: Audio for slide ${currentAudioIndex + 1} considered ended`);
                                                    clearInterval(checkDuration);
                                                    audio.onended = null;
                                                    currentAudioIndex++;
                                                    playNext();
                                                }
                                            }, 1000);
                                        }).catch(e => {
                                            console.error(`Audio play failed for slide ${currentAudioIndex + 1}:`, e);
                                            if (isPlaying) {
                                                setTimeout(playAudio, 500);
                                            }
                                        });
                                    };
                                    playAudio();
                                } else {
                                    currentAudioIndex++;
                                    playNext();
                                }
                            });
                        }
                        playNext();
                    });
                }
                function reloadSlides() {
                    console.log("Reloading slides");
                    isReloading = true;
                    lastRenderedSlide = -1;
                    currentSlide = 0;
                    currentAudioIndex = 0;
                    isPlaying = false;
                    // Reset play button
                    const playBtn = document.getElementById('play-btn');
                    if (playBtn) {
                        const playIcon = playBtn.querySelector('i');
                        if (playIcon) {
                            playIcon.className = 'fas fa-play';
                        }
                    }
                    // Reset audio elements
                    audioElements.forEach(audio => {
                        if (audio) {
                            audio.pause();
                            audio.currentTime = 0;
                            audio.style.border = 'none';
                        }
                    });
                    updateSlide(() => {
                        isReloading = false;
                    });
                }
                function toggleFullScreen() {
                    console.log("Fullscreen button clicked");
                    const container = document.getElementById('lecture-container');
                    if (!container) {
                        console.error("Lecture container not found");
                        return;
                    }
                    if (!document.fullscreenElement) {
                        container.requestFullscreen().catch(err => {
                            console.error('Error enabling full-screen:', err);
                        });
                    } else {
                        document.exitFullscreen();
                        console.log("Exited fullscreen");
                    }
                }
                // Attach event listeners
                waitForElement('#prev-btn', (prevBtn) => {
                    prevBtn.addEventListener('click', prevSlide);
                    console.log("Attached event listener to prev-btn");
                });
                waitForElement('#play-btn', (playBtn) => {
                    playBtn.addEventListener('click', playAll);
                    console.log("Attached event listener to play-btn");
                });
                waitForElement('#next-btn', (nextBtn) => {
                    nextBtn.addEventListener('click', nextSlide);
                    console.log("Attached event listener to next-btn");
                });
                waitForElement('#fullscreen-btn', (fullscreenBtn) => {
                    fullscreenBtn.addEventListener('click', toggleFullScreen);
                    console.log("Attached event listener to fullscreen-btn");
                });
                waitForElement('#reload-btn', (reloadBtn) => {
                    reloadBtn.addEventListener('click', reloadSlides);
                    console.log("Attached event listener to reload-btn");
                });
                // Initialize audio sources and render first slide
                updateAudioSources(lectureData.audioFiles);
                renderSlide();
                console.log("Initial slide rendered, starting at slide:", currentSlide + 1);
            });
        });
    }
    // Observe DOM changes to detect when lecture container is added
    const observer = new MutationObserver((mutations) => {
        mutations.forEach((mutation) => {
            if (mutation.addedNodes.length) {
                const lectureContainer = document.getElementById('lecture-container');
                if (lectureContainer) {
                    console.log("Lecture container detected in DOM");
                    observer.disconnect();
                    initializeSlides();
                }
            }
        });
    });
    observer.observe(document.body, { childList: true, subtree: true });
    console.log("Started observing DOM for lecture container");
}
"""
def process_uploaded_file(file):
    """Process uploaded file and extract text content."""
    file_path = None
    try:
        # Determine if file is a NamedString (Gradio string-like object) or file-like object
        file_name = os.path.basename(file.name if hasattr(file, 'name') else str(file))
        file_path = os.path.join(UPLOAD_DIR, file_name)
        # Get file extension
        _, ext = os.path.splitext(file_path)
        ext = ext.lower()
        # Handle PDF files differently
        if ext == '.pdf':
            # For PDF files, write the raw bytes
            if hasattr(file, 'read'):
                with open(file_path, 'wb') as f:
                    f.write(file.read())
            else:
                # If it's a file path, copy the file
                shutil.copy2(str(file), file_path)
            # Process PDF file
            pdf_reader = PyPDF2.PdfReader(file_path)
            text = ""
            for page in pdf_reader.pages:
                text += page.extract_text() + "\n"
            logger.info("Extracted text from PDF: %s", file_path)
            return text
        # Handle text files
        elif ext in ('.txt', '.md'):
            # Read content and save to UPLOAD_DIR
            if hasattr(file, 'read'):  # File-like object
                content = file.read()
                if isinstance(content, bytes):
                    content = content.decode('utf-8', errors='replace')
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(content)
            else:  # NamedString or string-like
                # If it's a file path, read the file
                if os.path.exists(str(file)):
                    with open(str(file), 'r', encoding='utf-8') as f:
                        content = f.read()
                else:
                    content = str(file)
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(content)
            # Clean and return content; clean_script_text returns None for
            # empty or too-short input, so fall back to an empty string
            cleaned_content = clean_script_text(content) or ""
            logger.info("Cleaned content for %s: %s", file_path, cleaned_content[:100] + "..." if len(cleaned_content) > 100 else cleaned_content)
            return cleaned_content
        else:
            raise ValueError(f"Unsupported file format: {ext}")
    except Exception as e:
        # file_path may still be None if the failure happened before it was set
        logger.error(f"Error processing file {file_path or file}: {str(e)}")
        raise
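# Example (with a hypothetical notes.md): process_uploaded_file(open("notes.md"))
# stores a copy under uploads/ and returns the cleaned text; PDFs are instead
# extracted page by page with PyPDF2.PdfReader.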
async def study_mode_process(file, api_service, api_key):
    """Process uploaded file in study mode."""
    max_retries = 1
    for attempt in range(max_retries + 1):
        try:
            # Extract text from file
            content = process_uploaded_file(file)
            logger.info("Successfully extracted content from file: %s", file)
            # Create study agent
            logger.info("Initializing model client for service: %s", api_service)
            model_client = get_model_client(api_service, api_key)
            logger.info("Model client initialized successfully")
            study_agent = AssistantAgent(
                name="study_agent",
                model_client=model_client,
                system_message="""You are a Study Agent that analyzes lecture materials and generates appropriate inputs for the lecture generation system.
Analyze the provided content and generate:
1. A concise title (max 10 words)
2. A brief content description (max 20 words)
Output the results in JSON format:
{
    "title": "string",
    "content_description": "string"
}"""
            )
            # Process content with study agent
            logger.info("Running study agent with content length: %d", len(content))
            task_result = await Console(study_agent.run_stream(task=content))
            logger.info("Study agent execution completed")
            for message in task_result.messages:
                extracted_json = extract_json_from_message(message)
                if extracted_json and isinstance(extracted_json, dict):
                    if "title" in extracted_json and "content_description" in extracted_json:
                        logger.info("Valid JSON output: %s", extracted_json)
                        return extracted_json
                    else:
                        logger.warning("Incomplete JSON output: %s", extracted_json)
            raise ValueError("No valid JSON output with title and content_description from study agent")
        except Exception as e:
            logger.error("Attempt %d/%d failed: %s\n%s", attempt + 1, max_retries + 1, str(e), traceback.format_exc())
            if attempt == max_retries:
                raise Exception(f"Failed to process file after {max_retries + 1} attempts: {str(e)}")
            logger.info("Retrying study mode processing...")
            await asyncio.sleep(1)  # Brief delay before retry
# Gradio interface | |
with gr.Blocks( | |
title="Lectūra AI", | |
css=""" | |
.gradio-container-5-32-0 .prose * {color: #fd7b00 !important;} | |
h2, h3 {text-align: center; color: #000 !important;} | |
#lecture-container {font-family: 'Times New Roman', Times, serif;} | |
#slide-content {font-size: 48px; line-height: 1.2;} | |
#form-group {box-shadow: 0 0 2rem rgba(0, 0, 0, .14) !important; border-radius: 30px; color: #000; background-color: white;} | |
#download {box-shadow: 0 0 2rem rgba(0, 0, 0, .14) !important; border-radius: 30px;} | |
#uploaded-file {box-shadow: 0 0 2rem rgba(0, 0, 0, .14) !important; border-radius: 30px;} | |
#slide-display {box-shadow: 0 0 2rem rgba(0, 0, 0, .14) !important; border-radius: 30px; background-color: white;} | |
.gradio-container { background: #fff !important; box-shadow: 0 0 2rem rgba(255, 255, 255, 0.14);padding-top: 30px;} | |
button {transition: background-color 0.3s;} | |
button:hover {background-color: #e0e0e0;} | |
.upload-area {border: 2px dashed #ccc; border-radius: 20px; padding: 40px; text-align: center; cursor: pointer; height: 100%; min-height: 700px; display: flex; flex-direction: column; justify-content: center; align-items: center;} | |
.upload-area:hover {border-color: #16cd16;} | |
.upload-area.dragover {border-color: #16cd16; background-color: rgba(22, 205, 22, 0.1);} | |
.wrap.svelte-1kzox3m {justify-content: center;} | |
#mode-tabs {border-radius: 30px !important;} | |
#component-2 {border-radius: 30px; box-shadow: rgba(0, 0, 0, 0.14) 0px 0px 2rem !important; width: 290px;} | |
#component-0 {align-items: center;justify-content: center;} | |
    #component-26 {box-shadow: rgba(0, 0, 0, 0.14) 0px 0px 2rem !important; border-radius: 30px; height: 970px !important; overflow: auto !important; padding: 20px;}
#right-column {padding: 10px !important; height: 100% !important; display: flex !important; flex-direction: column !important; gap: 20px !important;} | |
#notes-section {box-shadow: 0 0 2rem rgba(0, 0, 0, .14) !important; border-radius: 30px; background-color: white; padding: 20px; flex: 0 0 auto; display: flex; flex-direction: column; overflow: hidden;} | |
#chat-section {box-shadow: 0 0 2rem rgba(0, 0, 0, .14) !important; border-radius: 30px; background-color: white; padding: 20px; flex: 1; display: flex; flex-direction: column; overflow: hidden; min-height: 760px;} | |
.note-button {width: 100%; border-radius: 15px; margin-bottom: 10px; padding: 10px; background-color: #f0f0f0; border: none; cursor: pointer; color: #000 !important} | |
.note-button:hover {background-color: #e0e0e0;} | |
.notes-list {flex: 1; overflow-y: auto; margin-top: 0px; min-height: 0;} | |
.chat-input-container {display: flex; gap: 10px; margin-top: auto; padding-top: 20px;} | |
.chat-input {flex-grow: 1; border-radius: 20px; padding: 10px 20px; border: 1px solid #ddd;background-color: rgb(240, 240, 240)} | |
.send-button {border-radius: 20px; padding: 10px 25px; background-color: #16cd16; color: white; border: none; cursor: pointer;} | |
.send-button:hover {background-color: #14b814;} | |
.back-button {border-radius: 50%; width: 40px; height: 40px; background-color: #f0f0f0; border: none; cursor: pointer; display: flex; align-items: center; justify-content: center;} | |
.back-button:hover {background-color: #e0e0e0;} | |
.note-editor {display: none; width: 100%; height: 100%; min-height: 0;} | |
.note-editor.active {display: flex; flex-direction: column;} | |
.notes-view {display: flex; flex-direction: column; height: 100%; min-height: 0;} | |
.notes-view.hidden {display: none;} | |
.chat-messages {flex: 1; overflow-y: auto; margin-bottom: 20px; min-height: 0;} | |
#study-guide-btn {margin-bottom: 0px !important} | |
.gradio-container-5-29-0 .prose :last-child {color: black !important;} | |
#add-note-btn, #study-guide-btn, #quiz-btn, #send-btn{border-radius: 30px !important;} | |
#chatbot {border-radius: 20px !important;} | |
#chat-input-row {align-items: center !important;} | |
.gradio-container { background-color: white !important; color: black !important;} | |
main {max-width: fit-content !important} | |
#component-36 {height: 460px !important} | |
""", | |
js=js_code, | |
head='<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/5.15.4/css/all.min.css">' | |
) as demo: | |
gr.Markdown(""" | |
## <center>Lectūra: Your AI Genie for Self-taught Mastery.</center> | |
### <center>(Disclaimer: This demo is part of a submission to the AgentX – LLM Agents MOOC Competition, hosted by Berkeley RDI. © Lectūra Labs. All rights reserved)</center> | |
    ### Note: Generating lecture speech takes a while since this demo runs on CPU; we recommend limiting the number of slides to 3. For faster generation, please run the app with access to a GPU.
### Official Website: [https://lecturalabs.com/](https://lecturalabs.com/)""") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
with gr.Group(elem_id="mode-tabs"): | |
                mode_tabs = gr.Radio(
                    choices=["Learn Mode", "Study Mode"],
                    value="Learn Mode",
                    label="Mode",
                    # The enclosing Group already carries elem_id="mode-tabs";
                    # omitting it here avoids a duplicate DOM id.
                    show_label=False
                )
with gr.Row(): | |
# Left column (existing form) | |
with gr.Column(scale=1): | |
with gr.Group(elem_id="form-group"): | |
title = gr.Textbox(label="Lecture Title", placeholder="e.g. Introduction to AI") | |
lecture_content_description = gr.Textbox(label="Lecture Content Description", placeholder="e.g. Focus on recent advancements") | |
lecture_type = gr.Dropdown(["Conference", "University", "High school"], label="Audience", value="University") | |
lecture_style = gr.Dropdown( | |
["Feynman - Simplifies complex ideas with enthusiasm", "Socratic - Guides insights with probing questions", "Inspirational - Sparks enthusiasm with visionary ideas", "Reflective - Promotes introspection with a calm tone", "Humorous - Uses wit and anecdotes for engaging content"], | |
label="Lecture Style", | |
value="Feynman - Simplifies complex ideas with enthusiasm" | |
) | |
api_service = gr.Dropdown( | |
choices=[ | |
"Azure AI Foundry", | |
"OpenAI-gpt-4o-2024-08-06", | |
"Anthropic-claude-3-sonnet-20240229", | |
"Google-gemini-2.0-flash", | |
"Ollama-llama3.2", | |
], | |
label="Model", | |
value="Google-gemini-2.0-flash" | |
) | |
api_key = gr.Textbox(label="Model Provider API Key", type="password", placeholder="Not required for Ollama or Azure AI Foundry (use GITHUB_TOKEN env var)") | |
serpapi_key = gr.Textbox(label="SerpApi Key (For Research Agent)", type="password", placeholder="Enter your SerpApi key (optional)") | |
num_slides = gr.Slider(1, 20, step=1, label="Number of Lecture Slides (will add intro and closing slides)", value=3) | |
speaker_select = gr.Dropdown( | |
choices=["feynman.mp3", "einstein.mp3", "samantha.mp3", "socrates.mp3", "professor_lectura_male.mp3"], | |
value="professor_lectura_male.mp3", | |
label="Select Instructor", | |
elem_id="speaker-select" | |
) | |
speaker_audio = gr.Audio(value="professor_lectura_male.mp3", label="Speaker sample speech (MP3 or WAV)", type="filepath", elem_id="speaker-audio") | |
generate_btn = gr.Button("Generate Lecture") | |
# Middle column (existing slide display) | |
with gr.Column(scale=2): | |
default_slide_html = """ | |
<div style="display: flex; flex-direction: column; justify-content: center; align-items: center; height: 100%; min-height: 700px; padding: 20px; text-align: center; border: 1px solid #ddd; border-radius: 30px; box-shadow: 0 0 2rem rgba(0, 0, 0, .14) !important;"> | |
<h2 style="font-style: italic; color: #000 !important;">Waiting for lecture content...</h2> | |
<p style="margin-top: 10px; font-size: 16px;color: #000 !important">Please Generate lecture content via the form on the left first before lecture begins</p> | |
</div> | |
""" | |
# Study mode upload area | |
study_mode_html = """ | |
<div class="upload-area" id="upload-area"> | |
<h2 style="margin-top: 20px; color: #000;">Please upload lecture material by clicking the upload button below</h2> | |
<p style="color: #666;">(only supports .pdf, .txt and .md)</p> | |
</div> | |
""" | |
slide_display = gr.HTML(label="Lecture Slides", value=default_slide_html, elem_id="slide-display") | |
uploaded_file = gr.File(label="Upload Lecture Material", visible=False, elem_id="uploaded-file") | |
file_output = gr.File(label="Download Lecture Materials", elem_id="download") | |
# --- RIGHT COLUMN SPLIT: NOTES (TOP) AND CHAT (BOTTOM) --- | |
with gr.Column(scale=1, elem_id="right-column"): | |
# State for notes and lecture context | |
notes_state = gr.State([]) # List of notes: [{"title": ..., "content": ...}] | |
lecture_context_state = gr.State({}) # Dict with latest lecture slides/scripts | |
chat_history_state = gr.State([]) # List of {user, assistant} | |
with gr.Row(): | |
with gr.Column(scale=1, elem_id="notes-section"): | |
with gr.Row(): | |
add_note_btn = gr.Button("+ Add note", elem_id="add-note-btn") | |
study_guide_btn = gr.Button("Study Guide", elem_id="study-guide-btn") | |
quiz_btn = gr.Button("Quiz Yourself", elem_id="quiz-btn") | |
note_response = gr.Textbox(label="Response", visible=True, value="Your notes, study guides, and quizzes will appear here...") | |
notes_list = gr.Dataframe(headers=["Title"], interactive=False, label="Your Notes", elem_id="notes-list") | |
with gr.Column(visible=False) as note_editor: | |
note_title = gr.Textbox(label="Note Title", elem_id="note-title") | |
note_content = gr.Textbox(label="Note Content", lines=10, elem_id="note-content") | |
with gr.Row(): | |
save_note_btn = gr.Button("Save Note", elem_id="save-note-btn") | |
back_btn = gr.Button("Back", elem_id="back-btn") | |
with gr.Column(scale=1, elem_id="chat-section"): | |
with gr.Column(): | |
chatbot = gr.Chatbot(label="Chat", elem_id="chatbot", height=220, show_copy_button=True, type="messages") | |
with gr.Row(elem_id="chat-input-row"): | |
chat_input = gr.Textbox(show_label=False, placeholder="Type your message...", lines=1, elem_id="chat-input", scale=10) | |
send_btn = gr.Button("Send", elem_id="send-btn", scale=1) | |
# --- UI LOGIC FOR SHOWING/HIDING RESPONSE COMPONENTS --- | |
def show_only(component): | |
return ( | |
gr.update(visible=(component == "note")), | |
gr.update(visible=(component == "study")), | |
gr.update(visible=(component == "quiz")), | |
) | |
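    # show_only is kept as a generic visibility toggler for the note/study/quiz
    # panes; the handlers below currently write into the shared note_response box instead.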
async def add_note_fn(notes, lecture_context, api_service, api_key, title_val, desc_val, style_val, audience_val): | |
context = get_fallback_lecture_context(lecture_context, title_val, desc_val, style_val, audience_val) | |
note = await run_note_agent(api_service, api_key, context, "", "") | |
note_text = (note.get("title", "") + "\n" + note.get("content", "")).strip() | |
return ( | |
gr.update(value=note_text), | |
note.get("title", ""), | |
note.get("content", "") | |
) | |
add_note_btn.click( | |
fn=add_note_fn, | |
inputs=[notes_state, lecture_context_state, api_service, api_key, title, lecture_content_description, lecture_style, lecture_type], | |
outputs=[note_response, note_title, note_content] | |
) | |
# Study Guide button: generate study guide and show response | |
async def study_guide_btn_fn(notes, lecture_context, api_service, api_key, title_val, desc_val, style_val, audience_val): | |
context = get_fallback_lecture_context(lecture_context, title_val, desc_val, style_val, audience_val) | |
guide = await run_study_agent(api_service, api_key, context) | |
return gr.update(value=guide) | |
study_guide_btn.click( | |
fn=study_guide_btn_fn, | |
inputs=[notes_state, lecture_context_state, api_service, api_key, title, lecture_content_description, lecture_style, lecture_type], | |
outputs=[note_response] | |
) | |
# Quiz button: generate quiz and show response | |
async def quiz_btn_fn(notes, lecture_context, api_service, api_key, title_val, desc_val, style_val, audience_val): | |
context = get_fallback_lecture_context(lecture_context, title_val, desc_val, style_val, audience_val) | |
quiz = await run_quiz_agent(api_service, api_key, context) | |
return gr.update(value=quiz) | |
quiz_btn.click( | |
fn=quiz_btn_fn, | |
inputs=[notes_state, lecture_context_state, api_service, api_key, title, lecture_content_description, lecture_style, lecture_type], | |
outputs=[note_response] | |
) | |
# Back button: clear response | |
back_btn.click( | |
fn=lambda: gr.update(value="Click any button above to generate content..."), | |
inputs=[], | |
outputs=[note_response] | |
) | |
    async def save_note(note_title_val, note_content_val, notes, lecture_context, api_service, api_key, note_type=None):
        note = await run_note_agent(api_service, api_key, get_fallback_lecture_context(lecture_context, note_title_val, note_content_val, "", ""), note_title_val, note_content_val)
        # Prefix title with note type if provided
        if note_type:
            note["title"] = note_type_prefix(note_type, note.get("title", ""))
        new_notes = copy.deepcopy(notes)
        # Strip filesystem-unsafe characters from the title before using it as a filename
        safe_title = re.sub(r'[\\/:*?"<>|]', "_", note["title"]).strip() or "note"
        note_file = os.path.join(OUTPUT_DIR, f"{safe_title}.txt")
        with open(note_file, "w", encoding="utf-8") as f:
            f.write(note["content"])
        # Remember where the note was saved so it can be reopened later
        note["file"] = note_file
        new_notes.append(note)
        return (
            update_notes_list(new_notes),
            new_notes,
            gr.update(value="Click any button above to generate content...")
        )
save_note_btn.click( | |
fn=save_note, | |
inputs=[note_title, note_content, notes_state, lecture_context_state, api_service, api_key], | |
outputs=[notes_list, notes_state, note_response] | |
) | |
# --- CHAT AGENT LOGIC --- | |
    async def chat_fn(user_message, chat_history, lecture_context, api_service, api_key, title_val, desc_val):
        if not user_message.strip():
            return chat_history, "", chat_history, gr.update(), gr.update()
        form_update, response = await run_chat_agent(api_service, api_key, lecture_context, chat_history, user_message)
        new_history = chat_history.copy()
        new_history.append({"role": "user", "content": user_message})
        # Append assistant response, if any
        if response:
            new_history.append({"role": "assistant", "content": response})
        title_update = gr.update()
        desc_update = gr.update()
        if form_update:
            # Use local names that don't shadow the `title` component above
            new_title = form_update.get("title")
            new_desc = form_update.get("content_description")
            msg = ""
            if new_title:
                msg += f"\nLecture Title: {new_title}"
                title_update = gr.update(value=new_title)
            if new_desc:
                msg += f"\nLecture Content Description: {new_desc}"
                desc_update = gr.update(value=new_desc)
            new_history.append({"role": "assistant", "content": msg.strip()})
        return new_history, "", new_history, title_update, desc_update
send_btn.click( | |
fn=chat_fn, | |
inputs=[chat_input, chat_history_state, lecture_context_state, api_service, api_key, title, lecture_content_description], | |
outputs=[chatbot, chat_input, chat_history_state, title, lecture_content_description] | |
) | |
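    # NOTE: gr.Blocks captured js_code at construction time (js=js_code above), so
    # the strings appended below only reach the page if this assembly is moved
    # ahead of the gr.Blocks(...) call; they are kept here with that caveat.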
js_code = js_code + """ | |
// Add file upload handling | |
function initializeFileUpload() { | |
const uploadArea = document.getElementById('upload-area'); | |
if (!uploadArea) return; | |
// Create hidden file input | |
const fileInput = document.createElement('input'); | |
fileInput.type = 'file'; | |
fileInput.accept = '.pdf,.txt,.md'; | |
fileInput.style.display = 'none'; | |
uploadArea.appendChild(fileInput); | |
// Handle click on the entire upload area | |
uploadArea.addEventListener('click', (e) => { | |
if (e.target !== fileInput) { | |
fileInput.click(); | |
} | |
}); | |
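            // Bridge to Gradio: copy the chosen file into Gradio's own hidden
            // <input type="file"> via DataTransfer and fire a synthetic 'change'
            // event so the framework's upload handler picks it up.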
fileInput.addEventListener('change', (e) => { | |
const file = e.target.files[0]; | |
if (file) { | |
const dataTransfer = new DataTransfer(); | |
dataTransfer.items.add(file); | |
                    // Scope to the uploaded-file component so we don't grab the speaker-audio input
                    const gradioFileInput = document.querySelector('#uploaded-file input[type="file"]');
if (gradioFileInput) { | |
gradioFileInput.files = dataTransfer.files; | |
const event = new Event('change', { bubbles: true }); | |
gradioFileInput.dispatchEvent(event); | |
} | |
} | |
}); | |
// Handle drag and drop | |
['dragenter', 'dragover', 'dragleave', 'drop'].forEach(eventName => { | |
uploadArea.addEventListener(eventName, preventDefaults, false); | |
}); | |
function preventDefaults(e) { | |
e.preventDefault(); | |
e.stopPropagation(); | |
} | |
['dragenter', 'dragover'].forEach(eventName => { | |
uploadArea.addEventListener(eventName, highlight, false); | |
}); | |
['dragleave', 'drop'].forEach(eventName => { | |
uploadArea.addEventListener(eventName, unhighlight, false); | |
}); | |
function highlight(e) { | |
uploadArea.classList.add('dragover'); | |
} | |
function unhighlight(e) { | |
uploadArea.classList.remove('dragover'); | |
} | |
uploadArea.addEventListener('drop', handleDrop, false); | |
function handleDrop(e) { | |
const dt = e.dataTransfer; | |
const file = dt.files[0]; | |
if (file) { | |
const dataTransfer = new DataTransfer(); | |
dataTransfer.items.add(file); | |
                    // Scope to the uploaded-file component so we don't grab the speaker-audio input
                    const gradioFileInput = document.querySelector('#uploaded-file input[type="file"]');
if (gradioFileInput) { | |
gradioFileInput.files = dataTransfer.files; | |
const event = new Event('change', { bubbles: true }); | |
gradioFileInput.dispatchEvent(event); | |
} | |
} | |
} | |
} | |
// Initialize clear button functionality | |
function initializeClearButton() { | |
const clearButton = document.getElementById('clear-btn'); | |
if (clearButton) { | |
clearButton.addEventListener('click', () => { | |
                const modeTabs = document.querySelector('#mode-tabs input[type="radio"]:checked');
                const isStudyMode = modeTabs && modeTabs.value === 'Study Mode';
// Reset all audio elements | |
const audioElements = document.querySelectorAll('audio'); | |
audioElements.forEach(audio => { | |
audio.pause(); | |
audio.currentTime = 0; | |
audio.style.border = 'none'; | |
}); | |
// Reset play button | |
const playBtn = document.getElementById('play-btn'); | |
if (playBtn) { | |
const playIcon = playBtn.querySelector('i'); | |
if (playIcon) { | |
playIcon.className = 'fas fa-play'; | |
} | |
} | |
const slideContent = document.getElementById('slide-content'); | |
if (slideContent) { | |
if (isStudyMode) { | |
slideContent.innerHTML = ` | |
<div class="upload-area" id="upload-area"> | |
<h2 style="margin-top: 20px; color: #000;">Please upload lecture material by clicking the upload button below</h2> | |
<p style="color: #666;">(only supports .pdf, .txt and .md)</p> | |
</div> | |
`; | |
initializeFileUpload(); | |
} else { | |
slideContent.innerHTML = ` | |
<div style="display: flex; flex-direction: column; justify-content: center; align-items: center; height: 100%; min-height: 700px; padding: 20px; text-align: center; border: 1px solid #ddd; border-radius: 30px; box-shadow: 0 0 2rem rgba(0, 0, 0, .14) !important;"> | |
<h2 style="font-style: italic; color: #000 !important;">Waiting for lecture content...</h2> | |
<p style="margin-top: 10px; font-size: 16px;color: #000">Please Generate lecture content via the form on the left first before lecture begins</p> | |
</div> | |
`; | |
} | |
} | |
}); | |
} | |
} | |
// Initialize speaker selection | |
function initializeSpeakerSelect() { | |
const speakerSelect = document.getElementById('speaker-select'); | |
const speakerAudio = document.querySelector('#speaker-audio input[type="file"]'); | |
if (speakerSelect && speakerAudio) { | |
speakerSelect.addEventListener('change', (e) => { | |
const selectedSpeaker = e.target.value; | |
// Create a new File object from the selected speaker | |
fetch(selectedSpeaker) | |
.then(response => response.blob()) | |
.then(blob => { | |
const file = new File([blob], selectedSpeaker, { type: 'audio/mpeg' }); | |
const dataTransfer = new DataTransfer(); | |
dataTransfer.items.add(file); | |
speakerAudio.files = dataTransfer.files; | |
const event = new Event('change', { bubbles: true }); | |
speakerAudio.dispatchEvent(event); | |
}); | |
}); | |
} | |
} | |
// Initialize file upload when study mode is active | |
function checkAndInitializeUpload() { | |
const uploadArea = document.getElementById('upload-area'); | |
if (uploadArea) { | |
console.log('Initializing file upload...'); | |
initializeFileUpload(); | |
} | |
initializeClearButton(); | |
initializeSpeakerSelect(); | |
} | |
// Check immediately and also set up an observer | |
checkAndInitializeUpload(); | |
const modeObserver = new MutationObserver((mutations) => { | |
mutations.forEach((mutation) => { | |
if (mutation.addedNodes.length) { | |
checkAndInitializeUpload(); | |
} | |
}); | |
}); | |
modeObserver.observe(document.body, { childList: true, subtree: true }); | |
""" | |
# Handle mode switching | |
def switch_mode(mode): | |
if mode == "Learn Mode": | |
return default_slide_html, gr.update(visible=True), gr.update(visible=False) | |
else: | |
return study_mode_html, gr.update(visible=True), gr.update(visible=True) | |
mode_tabs.change( | |
fn=switch_mode, | |
inputs=[mode_tabs], | |
outputs=[slide_display, generate_btn, uploaded_file] | |
) | |
# Handle file upload in study mode | |
async def handle_file_upload(file, api_service, api_key): | |
"""Handle file upload in study mode and validate API key.""" | |
if not file: | |
yield default_slide_html, None, None | |
return | |
        # Validate the API key (Ollama and Azure AI Foundry are exempt; Azure uses the GITHUB_TOKEN env var instead)
        if not api_key and api_service not in ("Azure AI Foundry", "Ollama-llama3.2"):
            error_html = """
            <div style="display: flex; flex-direction: column; justify-content: center; align-items: center; height: 100%; min-height: 700px; padding: 20px; text-align: center; border: 1px solid #ddd; border-radius: 8px;">
                <h2 style="color: #d9534f;">Please enter an API key first</h2>
<p style="margin-top: 20px;">An API key is required to process uploaded files in Study mode. Please provide a valid API key and try again.</p> | |
</div> | |
""" | |
logger.warning("API key is empty, terminating file upload") | |
yield error_html, None, None | |
return | |
elif api_service == "Azure AI Foundry" and not os.environ.get("GITHUB_TOKEN"): | |
error_html = """ | |
<div style="display: flex; flex-direction: column; justify-content: center; align-items: center; height: 100%; min-height: 700px; padding: 20px; text-align: center; border: 1px solid #ddd; border-radius: 8px;"> | |
<h2 style="color: #d9534f;">GITHUB_TOKEN not set</h2> | |
<p style="margin-top: 20px;">Azure AI Foundry requires a GITHUB_TOKEN environment variable. Please set it and try again.</p> | |
</div> | |
""" | |
logger.warning("GITHUB_TOKEN is missing for Azure AI Foundry, terminating file upload") | |
yield error_html, None, None | |
return | |
try: | |
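            # Staged progress: each yield re-renders the slide pane with a
            # progress indicator; the short sleeps give the frontend time to paint.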
# Show uploading progress | |
yield html_with_progress("Uploading Lecture Material...", 25), None, None | |
await asyncio.sleep(0.1) | |
# Show processing progress | |
yield html_with_progress("Processing file...", 50), None, None | |
await asyncio.sleep(0.1) | |
# Process file and generate inputs | |
yield html_with_progress("Researching lecture material...", 75), None, None | |
await asyncio.sleep(0.1) | |
result = await study_mode_process(file, api_service, api_key) | |
# Show success message with updated inputs | |
success_html = """ | |
<div style="display: flex; flex-direction: column; justify-content: center; align-items: center; height: 100%; min-height: 700px; padding: 20px; text-align: center; border: 1px solid #ddd; border-radius: 30px; box-shadow: 0 0 2rem rgba(0, 0, 0, .14) !important;"> | |
<h2 style="font-style: italic; color: #000 !important;">Research on study material completed, you can now generate lecture</h2> | |
<p style="margin-top: 10px; font-size: 16px;color: #000">The form has been updated with the extracted information. Click Generate Lecture to proceed.</p> | |
</div> | |
""" | |
            # Update only the title and content-description form fields; the rest of the form stays as the user set it
yield ( | |
success_html, | |
result["title"], | |
result["content_description"] | |
) | |
except Exception as e: | |
error_html = f""" | |
<div style="display: flex; flex-direction: column; justify-content: center; align-items: center; height: 100%; min-height: 700px; padding: 20px; text-align: center; border: 1px solid #ddd; border-radius: 8px;"> | |
<h2 style="color: #d9534f;">Error processing file</h2> | |
<p style="margin-top: 20px;">{str(e)}</p> | |
</div> | |
""" | |
logger.error(f"Error processing file: {str(e)}") | |
yield error_html, None, None | |
uploaded_file.change( | |
fn=handle_file_upload, | |
inputs=[uploaded_file, api_service, api_key], | |
outputs=[slide_display, title, lecture_content_description] | |
) | |
speaker_audio.change( | |
fn=update_audio_preview, | |
inputs=speaker_audio, | |
outputs=speaker_audio | |
) | |
generate_btn.click( | |
fn=on_generate, | |
inputs=[api_service, api_key, serpapi_key, title, lecture_content_description, lecture_type, lecture_style, speaker_audio, num_slides], | |
outputs=[slide_display, file_output] | |
) | |
# Handle speaker selection | |
def update_speaker_audio(speaker): | |
logger.info(f"Speaker selection changed to: {speaker}") | |
return speaker | |
speaker_select.change( | |
fn=update_speaker_audio, | |
inputs=[speaker_select], | |
outputs=[speaker_audio] | |
) | |
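    # As with the append above, this script is a no-op unless js_code is
    # assembled before gr.Blocks(...) is constructed.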
js_code = js_code + """ | |
// Add note editor functionality | |
function initializeNoteEditor() { | |
const addNoteBtn = document.getElementById('add-note-btn'); | |
const backBtn = document.getElementById('back-btn'); | |
const notesView = document.getElementById('notes-view'); | |
const noteEditor = document.getElementById('note-editor'); | |
if (addNoteBtn && backBtn && notesView && noteEditor) { | |
addNoteBtn.addEventListener('click', () => { | |
notesView.style.display = 'none'; | |
noteEditor.style.display = 'block'; | |
}); | |
backBtn.addEventListener('click', () => { | |
noteEditor.style.display = 'none'; | |
notesView.style.display = 'block'; | |
}); | |
} | |
} | |
// Initialize all components | |
function initializeComponents() { | |
initializeFileUpload(); | |
initializeClearButton(); | |
initializeSpeakerSelect(); | |
initializeNoteEditor(); | |
} | |
initializeComponents(); | |
const observer = new MutationObserver((mutations) => { | |
mutations.forEach((mutation) => { | |
if (mutation.addedNodes.length) { | |
initializeComponents(); | |
} | |
}); | |
}); | |
observer.observe(document.body, { childList: true, subtree: true }); | |
""" | |
async def run_note_agent(api_service, api_key, lecture_context, note_title, note_content): | |
model_client = get_model_client(api_service, api_key) | |
system_message = ( | |
"You are a Note Agent. Given the current lecture slides and scripts, help the user draft a note. " | |
"If a title or content is provided, improve or complete the note. If not, suggest a new note based on the lecture. " | |
"Always use the lecture context. Output a JSON object: {\"title\": ..., \"content\": ...}." | |
) | |
note_agent = AssistantAgent( | |
name="note_agent", | |
model_client=model_client, | |
system_message=system_message | |
) | |
context_str = json.dumps(lecture_context) | |
user_input = f"Lecture Context: {context_str}\nNote Title: {note_title}\nNote Content: {note_content}" | |
result = await Console(note_agent.run_stream(task=user_input)) | |
# Return only the agent's reply | |
for msg in reversed(result.messages): | |
if getattr(msg, 'source', None) == 'note_agent' and hasattr(msg, 'content') and isinstance(msg.content, str): | |
try: | |
extracted = extract_json_from_message(msg) | |
if extracted and isinstance(extracted, dict): | |
return extracted | |
except Exception: | |
continue | |
for msg in reversed(result.messages): | |
if hasattr(msg, 'content') and isinstance(msg.content, str): | |
try: | |
extracted = extract_json_from_message(msg) | |
if extracted and isinstance(extracted, dict): | |
return extracted | |
except Exception: | |
continue | |
return {"title": note_title, "content": note_content} | |
async def run_study_agent(api_service, api_key, lecture_context): | |
model_client = get_model_client(api_service, api_key) | |
system_message = ( | |
"You are a Study Guide Agent. Given the current lecture slides and scripts, generate a concise study guide (max 200 words) summarizing the key points and actionable steps for the student. Output plain text only." | |
) | |
study_agent = AssistantAgent( | |
name="study_agent", | |
model_client=model_client, | |
system_message=system_message | |
) | |
context_str = json.dumps(lecture_context) | |
user_input = f"Lecture Context: {context_str}" | |
result = await Console(study_agent.run_stream(task=user_input)) | |
# Return only the agent's reply | |
for msg in reversed(result.messages): | |
if getattr(msg, 'source', None) == 'study_agent' and hasattr(msg, 'content') and isinstance(msg.content, str): | |
return msg.content.strip() | |
for msg in reversed(result.messages): | |
if hasattr(msg, 'content') and isinstance(msg.content, str): | |
return msg.content.strip() | |
return "No study guide generated." | |
async def run_quiz_agent(api_service, api_key, lecture_context): | |
model_client = get_model_client(api_service, api_key) | |
system_message = ( | |
"You are a Quiz Agent. Given the current lecture slides and scripts, generate a short quiz (3-5 questions) to test understanding. Output plain text only." | |
) | |
quiz_agent = AssistantAgent( | |
name="quiz_agent", | |
model_client=model_client, | |
system_message=system_message | |
) | |
context_str = json.dumps(lecture_context) | |
user_input = f"Lecture Context: {context_str}" | |
result = await Console(quiz_agent.run_stream(task=user_input)) | |
# Return only the agent's reply | |
for msg in reversed(result.messages): | |
if getattr(msg, 'source', None) == 'quiz_agent' and hasattr(msg, 'content') and isinstance(msg.content, str): | |
return msg.content.strip() | |
for msg in reversed(result.messages): | |
if hasattr(msg, 'content') and isinstance(msg.content, str): | |
return msg.content.strip() | |
return "No quiz generated." | |
async def run_chat_agent(api_service, api_key, lecture_context, chat_history, user_message): | |
model_client = get_model_client(api_service, api_key) | |
system_message = ( | |
"You are a helpful Chat Agent. Answer questions about the lecture, and if the user asks for a lecture title or content description, suggest appropriate values. " | |
"If you want to update the form, output a JSON object: {\"title\": ..., \"content_description\": ...}. Otherwise, just reply as normal." | |
) | |
chat_agent = AssistantAgent( | |
name="chat_agent", | |
model_client=model_client, | |
system_message=system_message | |
) | |
context_str = json.dumps(lecture_context) | |
chat_str = "\n".join([f"User: {m['content']}" if m['role']=='user' else f"Assistant: {m['content']}" for m in chat_history]) | |
user_input = f"Lecture Context: {context_str}\nChat History: {chat_str}\nUser: {user_message}" | |
result = await Console(chat_agent.run_stream(task=user_input)) | |
# Return only the chat_agent's reply | |
for msg in reversed(result.messages): | |
if getattr(msg, 'source', None) == 'chat_agent' and hasattr(msg, 'content') and isinstance(msg.content, str): | |
extracted = extract_json_from_message(msg) | |
if extracted and isinstance(extracted, dict): | |
return extracted, None | |
return None, msg.content.strip() | |
for msg in reversed(result.messages): | |
if hasattr(msg, 'content') and isinstance(msg.content, str): | |
extracted = extract_json_from_message(msg) | |
if extracted and isinstance(extracted, dict): | |
return extracted, None | |
return None, msg.content.strip() | |
return None, "No response." | |
def update_notes_list(notes): | |
"""Convert notes list to DataFrame format for Gradio Dataframe (titles only).""" | |
return [[n["title"]] for n in notes] | |
def show_note_editor_with_content(title, content): | |
return ( | |
gr.update(visible=True), # note_editor | |
gr.update(visible=False), # notes_list | |
gr.update(visible=False), # study_guide_output | |
gr.update(visible=False), # quiz_output | |
gr.update(value=title), # note_title | |
gr.update(value=content) # note_content | |
) | |
def hide_note_editor(): | |
return gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False) | |
def show_study_guide(guide): | |
return gr.update(visible=False), gr.update(visible=True), gr.update(value=guide, visible=True), gr.update(visible=False) | |
def show_quiz(quiz): | |
return gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(value=quiz, visible=True) | |
# Helper to get fallback lecture context from form fields | |
def get_fallback_lecture_context(lecture_context, title_val, desc_val, style_val, audience_val): | |
# If slides/scripts missing, use form fields | |
if lecture_context and (lecture_context.get("slides") or lecture_context.get("scripts")): | |
return lecture_context | |
return { | |
"slides": [], | |
"scripts": [], | |
"title": title_val or "Untitled Lecture", | |
"description": desc_val or "No description provided.", | |
"style": style_val or "Feynman - Simplifies complex ideas with enthusiasm", | |
"audience": audience_val or "University" | |
} | |
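    # Illustrative example (hypothetical values): with an empty context and only
    # the form filled in, get_fallback_lecture_context({}, "Intro to AI",
    # "Recent advances", "", "") returns
    #   {"slides": [], "scripts": [], "title": "Intro to AI",
    #    "description": "Recent advances",
    #    "style": "Feynman - Simplifies complex ideas with enthusiasm",
    #    "audience": "University"}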
    def show_note_content(evt: gr.SelectData, notes):
        # Dataframe select events report the selected cell as [row, col]; key on the row
        idx = evt.index[0] if isinstance(evt.index, (list, tuple)) else evt.index
        if 0 <= idx < len(notes):
            note = notes[idx]
            note_file = note.get("file", os.path.join(OUTPUT_DIR, f"{note['title']}.txt"))
            if os.path.exists(note_file):
                with open(note_file, "r", encoding="utf-8") as f:
                    return gr.update(value=f.read())
        return gr.update(value="Click any button above to generate content...")
notes_list.select( | |
fn=show_note_content, | |
inputs=[notes_state], | |
outputs=note_response | |
) | |
# --- NOTES LOGIC --- | |
def note_type_prefix(note_type, title): | |
if note_type and not title.startswith(note_type): | |
return f"{note_type} - {title}" | |
return title | |
custom_css = """ | |
#right-column {height: 100% !important; display: flex !important; flex-direction: column !important; gap: 20px !important;} | |
#notes-section, #chat-section {flex: 1 1 0; min-height: 0; max-height: 50vh; overflow-y: auto;} | |
#chat-section {display: flex; flex-direction: column; position: relative;} | |
#chatbot {flex: 1 1 auto; min-height: 0; max-height: calc(50vh - 60px); overflow-y: auto;} | |
#chat-input-row {position: sticky; bottom: 0; background: white; z-index: 2; padding-top: 8px;} | |
""" | |
demo.css += custom_css | |
if __name__ == "__main__": | |
demo.launch(allowed_paths=[OUTPUT_DIR]) |