| from pathlib import Path |
| import shutil |
| import sys |
| import warnings |
|
|
| |
def _log(msg):
    """Print a progress message and flush stdout right away."""
    # flush=True pushes each message out immediately instead of leaving it
    # in the stdout buffer.
    print(msg, end="\n", flush=True)
|
|
| _log("1. Loading env...") |
| |
| warnings.filterwarnings("ignore", message=".*urllib3.*") |
| warnings.filterwarnings("ignore", message=".*chardet.*") |
|
|
| from dotenv import load_dotenv |
|
|
| |
| load_dotenv(Path(__file__).resolve().parent.parent / ".env") |
| load_dotenv(Path(__file__).resolve().parent / ".env") |
|
|
| _log("2. Loading Gradio...") |
| from datetime import datetime |
| import gradio as gr |
| _log("2a. Loading gradio_client...") |
| import gradio_client.utils as gradio_client_utils |
|
|
| _log("3. Loading backend...") |
| from backend.ingestion_service import ingest_pdf_chunks, ingest_url_chunks, remove_chunks_for_source |
| from backend.notebook_service import create_notebook, list_notebooks, rename_notebook, delete_notebook |
| from backend.podcast_service import generate_podcast, generate_podcast_audio |
| from backend.chat_service import load_chat |
| from backend.rag_service import rag_chat |
| from backend.report_service import generate_report |
|
|
| import hashlib |
| _log("4. Imports done.") |
|
|
# Keep handles on gradio_client's own helpers so the wrappers below can
# delegate to them for every schema that is not a bare boolean.
_original_gradio_get_type = gradio_client_utils.get_type
_original_json_schema_to_python_type = gradio_client_utils._json_schema_to_python_type


def _patched_gradio_get_type(schema):
    """Return "Any" for a boolean schema; otherwise call the original ``get_type``."""
    if not isinstance(schema, bool):
        return _original_gradio_get_type(schema)
    return "Any"


def _patched_json_schema_to_python_type(schema, defs=None):
    """Return "Any" for a boolean schema; otherwise call the original converter."""
    if not isinstance(schema, bool):
        return _original_json_schema_to_python_type(schema, defs)
    return "Any"


# Install the wrappers on the gradio_client.utils module.
gradio_client_utils.get_type = _patched_gradio_get_type
gradio_client_utils._json_schema_to_python_type = _patched_json_schema_to_python_type
|
|
| |
# Gradio "Soft" theme with a blue primary hue and a slate secondary hue.
theme = gr.themes.Soft(primary_hue="blue", secondary_hue="slate")
|
|
| CUSTOM_CSS = """ |
| .gradio-container { max-width: 1000px !important; margin: 0 auto !important; } |
| .container { max-width: 1000px; margin: 0 auto; padding: 0 24px; } |
| |
| .header-bar { padding: 12px 0; border-bottom: 1px solid #e2e8f0; margin-bottom: 24px; display: flex !important; justify-content: space-between !important; align-items: center !important; white-space: nowrap; } |
| .login-center { display: flex; justify-content: center; width: 100%; } |
| #auth-text { white-space: nowrap; margin: 8px 0 16px 0; font-size: 0.95rem; opacity: 0.9; } |
| .gr-button { padding: 14px 28px !important; font-size: 0.9rem !important; border-radius: 12px !important; white-space: nowrap !important; width: auto !important; } |
| .gr-button[aria-label*="Logout"] { min-width: auto !important; display: inline-flex !important; align-items: center !important; justify-content: center !important; } |
| .header-bar .gr-button { padding-left: 28px !important; padding-right: 28px !important; min-width: 220px !important; font-size: 0.9rem !important; } |
| #login-btn, |
| #login-btn.gr-button, |
| #login-btn button, |
| #login-btn .gr-button { |
| display: inline-flex !important; |
| flex-direction: row !important; |
| align-items: center !important; |
| justify-content: center !important; |
| gap: 8px !important; |
| width: auto !important; |
| max-width: 100% !important; |
| min-width: 220px !important; |
| overflow: hidden !important; |
| } |
| #login-btn p, |
| #login-btn span, |
| #login-btn .md, |
| #login-btn .md p { |
| margin: 0 !important; |
| font-size: 0.95rem !important; |
| line-height: 1.2 !important; |
| white-space: nowrap !important; |
| overflow: hidden !important; |
| text-overflow: ellipsis !important; |
| } |
| .dark .header-bar { border-bottom: 1px solid #334155; } |
| |
| .hero-section { margin-bottom: 16px; } |
| .login-container { padding: 12px 0; } |
| .create-strip { padding: 18px; border-radius: 16px; } |
| .create-row { display: flex !important; align-items: center !important; gap: 16px !important; } |
| .create-label { white-space: nowrap; font-size: 0.95rem; margin: 0; min-width: 180px; } |
| .create-row .gr-textbox { flex: 1 !important; } |
| .create-row .gr-textbox textarea, |
| .create-row .gr-textbox input { border-radius: 10px !important; } |
| .create-row .gr-button { border-radius: 10px !important; padding: 10px 20px !important; } |
| .hero-title { font-size: 2rem; font-weight: 700; color: #1e293b; margin: 0 0 8px 0; } |
| .hero-sub { font-size: 1rem; color: #64748b; margin: 0; line-height: 1.5; } |
| |
| .section-card { padding: 24px; border-radius: 16px; background: #f8fafc; margin: 24px; box-shadow: 0 2px 8px rgba(0,0,0,0.06); } |
| .notebook-card { padding: 14px 20px; border-radius: 12px; background: #fff; margin: 8px 0 !important; border: 1px solid #e2e8f0; display: flex; align-items: center; gap: 12px; transition: background 0.15s ease; width: 100% !important; box-sizing: border-box !important; overflow: hidden; } |
| .notebook-card:hover { background: #f8fafc; } |
| .notebook-selected { border: 2px solid #3b82f6 !important; box-shadow: none !important; } |
| |
| .manager-card { border-top: 4px solid #3b82f6; } |
| .sources-card { border-top: 4px solid #14b8a6; } |
| .chat-card { border-top: 4px solid #8b5cf6; } |
| .artifacts-card { border-top: 4px solid #f97316; } |
| |
| .section-title { font-size: 1.125rem; font-weight: 600; color: #1e293b; margin: 0 0 16px 0; } |
| .section-row { display: flex !important; align-items: center !important; gap: 16px !important; margin-bottom: 12px; } |
| .section-row .gr-textbox { flex: 1 !important; } |
| .section-row .gr-button { border-radius: 10px !important; padding: 10px 20px !important; } |
| |
| .status { font-size: 0.875rem; color: #64748b; margin: 16px; padding: 12px 16px; background: #f1f5f9; border-radius: 12px; } |
| |
| @media (prefers-color-scheme: dark) { |
| .hero-title { color: #f1f5f9 !important; } |
| .hero-sub { color: #94a3b8 !important; } |
| .section-card { background: #1e293b !important; box-shadow: 0 2px 8px rgba(0,0,0,0.3); } |
| .section-title { color: #f1f5f9 !important; } |
| .notebook-card { background: #334155 !important; border-color: #475569; } |
| .notebook-card:hover { background: #475569 !important; } |
| .notebook-selected { border: 2px solid #60a5fa !important; box-shadow: none !important; } |
| .manager-card { border-top-color: #60a5fa; } |
| .sources-card { border-top-color: #2dd4bf; } |
| .chat-card { border-top-color: #a78bfa; } |
| .artifacts-card { border-top-color: #fb923c; } |
| .status { color: #94a3b8 !important; background: #334155 !important; } |
| } |
| .dark .hero-title { color: #f1f5f9 !important; } |
| .dark .hero-sub { color: #94a3b8 !important; } |
| .dark .section-card { background: #1e293b !important; } |
| .dark .section-title { color: #f1f5f9 !important; } |
| .dark .notebook-card { background: #334155 !important; border-color: #475569; } |
| .dark .notebook-card:hover { background: #475569 !important; } |
| .dark .status { color: #94a3b8 !important; background: #334155 !important; } |
| |
| #delete-btn { border-radius: 16px; background: #FF0000; } |
| #rename-btn { border-radius: 16px; background: #008000; } |
| #select-btn { border-radius: 16px; } |
| #ingest-url-btn { border-radius: 16px; } |
| """ |
|
|
def _user_id(profile: gr.OAuthProfile | None) -> str | None:
    """Derive a user identifier from the HF OAuth profile.

    Returns ``None`` when ``profile`` is falsy (not logged in). Otherwise the
    attributes ``id``, ``sub``, ``preferred_username``, ``username`` and
    ``name`` are read in that order and the first truthy value is returned;
    if none is truthy, the value of the last attribute read is returned.
    """
    if not profile:
        return None
    value = None
    for attribute in ("id", "sub", "preferred_username", "username", "name"):
        value = getattr(profile, attribute, None)
        if value:
            break
    return value
|
|
|
|
| def _get_notebooks(user_id: str | None): |
| if not user_id: |
| return [] |
| return list_notebooks(user_id) |
|
|
|
|
def _safe_create(new_name, state, selected_id, profile: gr.OAuthProfile | None = None):
    """Create a notebook named after the text-box value.

    Returns a 4-tuple ``(text box value, notebook state, selected id, status)``.
    On success the text box is cleared, the state is rebuilt from the user's
    notebooks and the new notebook becomes the selection. On any failure the
    first three outputs are ``gr.skip()`` and only the status message is set.
    A blank name falls back to "Untitled Notebook".
    """

    def _unchanged(message):
        # Skip the three data outputs; only the status text carries information.
        return gr.skip(), gr.skip(), gr.skip(), message

    try:
        user_id = _user_id(profile)
        if not user_id:
            return _unchanged("Please sign in with Hugging Face")
        requested = (new_name or "").strip()
        created = create_notebook(user_id, requested if requested else "Untitled Notebook")
        if not created:
            return _unchanged("Failed to create")
        refreshed = [(item["notebook_id"], item["name"]) for item in _get_notebooks(user_id)]
        return "", refreshed, created["notebook_id"], f"Created: {created['name']}"
    except Exception as e:
        return _unchanged(f"Error: {e}")
|
|
|
|
def _safe_rename(idx, new_name, state, selected_id, profile: gr.OAuthProfile | None = None):
    """Rename the notebook stored at position ``idx`` of ``state``.

    Returns ``(notebook state, selected id, status)``. On success the state is
    rebuilt from the user's notebooks and ``selected_id`` is passed through
    unchanged. On any failure the first two outputs are ``gr.skip()``.
    """

    def _unchanged(message):
        # Skip both data outputs; only the status text carries information.
        return gr.skip(), gr.skip(), message

    try:
        index_ok = idx is not None and 0 <= idx < len(state)
        if not index_ok:
            return _unchanged("Invalid selection")
        nb_id, _ = state[idx]
        name = (new_name or "").strip()
        if not name:
            return _unchanged("Enter a name.")
        user_id = _user_id(profile)
        if not user_id:
            return _unchanged("Please sign in")
        if not rename_notebook(user_id, nb_id, name):
            return _unchanged("Failed to rename")
        refreshed = [(item["notebook_id"], item["name"]) for item in _get_notebooks(user_id)]
        return refreshed, selected_id, f"Renamed to: {name}"
    except Exception as e:
        return _unchanged(f"Error: {e}")
|
|
|
|
def _safe_delete(idx, state, selected_id, profile: gr.OAuthProfile | None = None):
    """Delete the notebook stored at position ``idx`` of ``state``.

    Returns ``(notebook state, selected id, status)``. On success the state is
    rebuilt from the user's remaining notebooks. The current selection is kept
    when that notebook still exists; only when the selected notebook itself
    was deleted (or nothing was selected) does the selection fall back to the
    first remaining notebook, or ``None`` when none are left. On any failure
    the first two outputs are ``gr.skip()``.
    """
    try:
        if idx is None or idx < 0 or idx >= len(state):
            return gr.skip(), gr.skip(), "Invalid selection"
        nb_id, _ = state[idx]
        user_id = _user_id(profile)
        if not user_id:
            return gr.skip(), gr.skip(), "Please sign in"
        if not delete_notebook(user_id, nb_id):
            return gr.skip(), gr.skip(), "Failed to delete"

        notebooks = _get_notebooks(user_id)
        new_state = [(n["notebook_id"], n["name"]) for n in notebooks]

        # Ids are compared as strings, the same way the rest of this file
        # matches a selected id against notebook ids.
        remaining_ids = {str(n["notebook_id"]) for n in notebooks}
        if selected_id is not None and str(selected_id) in remaining_ids:
            new_selected = selected_id
        else:
            new_selected = notebooks[0]["notebook_id"] if notebooks else None
        return new_state, new_selected, "Notebook deleted"
    except Exception as e:
        return gr.skip(), gr.skip(), f"Error: {e}"
|
|
|
|
def _initial_load(profile: gr.OAuthProfile | None = None):
    """Build the initial UI values from the HF OAuth profile.

    Returns a 9-tuple, in order: notebook state (list of ``(id, name)``),
    selected notebook id, update for the notebook status text, login text,
    three visibility updates (shown when signed in, shown when signed in,
    shown when signed out), the source status text, and the user id.
    """
    user_id = _user_id(profile)
    signed_in = bool(user_id)

    notebooks = _get_notebooks(user_id)
    state = [(item["notebook_id"], item["name"]) for item in notebooks]
    selected = notebooks[0]["notebook_id"] if notebooks else None

    if signed_in:
        display_name = getattr(profile, "name", None) or user_id
        auth_update = f"You are logged in as {display_name} ({user_id})"
        source_status = ""
        notebook_message = ""
    else:
        auth_update = ""
        source_status = "Sign in with Hugging Face to upload context material."
        notebook_message = "Sign in with Hugging Face to manage notebooks."

    # The notebook status line is only shown while the user is signed out.
    notebook_status_update = gr.update(value=notebook_message, visible=not signed_in)

    return (
        state,
        selected,
        notebook_status_update,
        auth_update,
        gr.update(visible=signed_in),
        gr.update(visible=signed_in),
        gr.update(visible=not signed_in),
        source_status,
        user_id,
    )
|
|
|
|
def _selected_notebook_text(selected_id, state) -> str:
    """Return the Markdown line naming the selected notebook.

    ``state`` is a sequence of ``(notebook_id, name)`` pairs (or ``None``).
    Ids are compared as strings. Yields "None" when nothing is selected and
    "Unknown" when the id is not in ``state`` or its name is empty.
    """
    if not selected_id:
        return "**Selected notebook:** None"
    names_by_id = dict((str(nb_id), nb_name) for nb_id, nb_name in (state or []))
    label = names_by_id.get(str(selected_id)) or "Unknown"
    return f"**Selected notebook:** {label}"
|
|
|
|
# Label that maps to the "all" scope.
DEFAULT_REPORT_SCOPE_LABEL = "All sources (PDFs, URLs, text)"

# Scope label -> internal scope value.
REPORT_SCOPE_LABELS = {
    DEFAULT_REPORT_SCOPE_LABEL: "all",
    "PDF uploads only": "pdf",
    "Web URLs only": "url",
    "Uploaded text only": "text",
}

# Internal scope value -> phrase used in the report status message.
REPORT_SCOPE_DESCRIPTIONS = {
    "all": "PDFs, URLs, and uploaded text",
    "pdf": "uploaded PDFs",
    "url": "ingested web URLs",
    "text": "uploaded text files",
}


def _resolve_report_scope(label: str) -> tuple[str, str]:
    """Map a scope label to ``(scope value, description)``.

    Unknown labels resolve to the "all" scope; a scope value without a
    description falls back to "selected sources".
    """
    scope = REPORT_SCOPE_LABELS.get(label, "all")
    return scope, REPORT_SCOPE_DESCRIPTIONS.get(scope, "selected sources")
|
|
|
|
def _generate_report(scope_label, notebook_id, profile: gr.OAuthProfile | None):
    """Generate a report for the selected notebook and scope.

    Returns ``(status message, report text)``; the report text is empty
    whenever the report could not be produced. A ``ValueError`` from
    ``generate_report`` is shown with a warning prefix, any other exception
    as a generic error message.
    """
    scope_value, scope_desc = _resolve_report_scope(scope_label)
    if not _user_id(profile):
        return "Please sign in with Hugging Face before generating a report.", ""
    if not notebook_id:
        return "Select a notebook first to generate a report.", ""
    try:
        report_text = generate_report(notebook_id, scope_value)
    except ValueError as error:
        return f"⚠️ {error}", ""
    except Exception as error:
        return f"Error generating report: {error}", ""
    return f"Report ready for {scope_desc}.", report_text
|
|
|
|
def _safe_upload_pdfs(files, selected_id, profile: gr.OAuthProfile | None):
    """Copy uploaded PDFs into the notebook's upload folder and index them.

    ``files`` may be a single path string or an iterable of paths / objects
    with a ``name`` attribute. Files whose suffix is not ``.pdf`` are skipped.
    Each accepted file is copied to ``data/uploads/<user_id>/<selected_id>/``
    (with ``_1``, ``_2``, ... appended to the stem when the name is taken) and
    passed to ``ingest_pdf_chunks``. Returns a status message; any exception
    is reported in that message instead of being raised.
    """
    try:
        user_id = _user_id(profile)
        if not user_id:
            return "Please sign in with Hugging Face before uploading PDFs."
        if not selected_id:
            return "Select a notebook first, then upload PDFs."
        if not files:
            return "Choose at least one PDF to upload."

        # Normalise the payload into a flat list of non-empty paths.
        if isinstance(files, str):
            incoming = [files]
        else:
            resolved = (getattr(entry, "name", entry) for entry in files)
            incoming = [path for path in resolved if path]
        if not incoming:
            return "No files were received. Try uploading again."

        notebook_dir = Path("data") / "uploads" / user_id / str(selected_id)
        notebook_dir.mkdir(parents=True, exist_ok=True)

        stored_names = []
        chunk_total = 0
        for raw_path in incoming:
            src = Path(raw_path)
            if src.suffix.lower() != ".pdf":
                continue

            # Find a free name: original first, then stem_1, stem_2, ...
            dest = notebook_dir / src.name
            attempt = 0
            while dest.exists():
                attempt += 1
                dest = notebook_dir / f"{src.stem}_{attempt}{src.suffix}"

            shutil.copy2(src, dest)
            stored_names.append(dest.name)
            chunk_total += ingest_pdf_chunks(str(selected_id), dest.name, dest)

        if not stored_names:
            return "Only .pdf files are allowed."

        return f"Uploaded {len(stored_names)} PDF(s): {', '.join(stored_names)}. Indexed {chunk_total} chunk(s) for RAG."
    except Exception as error:
        return f"Error uploading PDFs: {error}"
|
|
|
|
def _list_uploaded_pdfs(selected_id, profile: gr.OAuthProfile | None = None):
    """Return a dropdown update listing the PDFs stored for the notebook.

    Choices are the sorted ``*.pdf`` file names under
    ``data/uploads/<user_id>/<selected_id>/`` with the first one preselected.
    The update is empty when there is no user, no notebook, or no folder.
    """
    user_id = _user_id(profile)
    if user_id and selected_id:
        folder = Path("data") / "uploads" / user_id / str(selected_id)
        if folder.exists():
            names = sorted(entry.name for entry in folder.glob("*.pdf"))
            return gr.update(choices=names, value=names[0] if names else None)
    return gr.update(choices=[], value=None)
|
|
|
|
def _safe_remove_pdf(file_name, selected_id, profile: gr.OAuthProfile | None = None):
    """Delete one uploaded PDF of the selected notebook and its chunks.

    Only the final path component of ``file_name`` is used, so the lookup
    stays inside ``data/uploads/<user_id>/<selected_id>/``. Returns a status
    message; any exception is reported in that message instead of being raised.
    """
    try:
        user_id = _user_id(profile)
        if not user_id:
            return "Please sign in with Hugging Face before removing PDFs."
        if not selected_id:
            return "Select a notebook first."
        if not file_name:
            return "Select a PDF to remove."

        base_name = Path(file_name).name
        pdf_path = Path("data") / "uploads" / user_id / str(selected_id) / base_name
        is_existing_pdf = pdf_path.exists() and pdf_path.suffix.lower() == ".pdf"
        if not is_existing_pdf:
            return "Selected PDF was not found."

        pdf_path.unlink()
        remove_chunks_for_source(str(selected_id), base_name)
        return f"Removed PDF: {base_name}"
    except Exception as error:
        return f"Error removing PDF: {error}"
| |
def _url_source_id(url: str) -> str:
    """Return a deterministic source id for ``url``.

    The id is ``url_`` followed by the first 16 hex characters of the SHA-256
    digest of the UTF-8 encoded URL (unencodable characters are dropped), so
    the same URL always maps to the same id.
    """
    encoded = url.encode("utf-8", errors="ignore")
    digest = hashlib.sha256(encoded).hexdigest()
    return "url_" + digest[:16]
|
|
|
|
def _safe_ingest_url(url, selected_id, profile: gr.OAuthProfile | None = None):
    """Ingest one web URL into the selected notebook.

    Returns ``(text box value, status message)``; the first element is always
    an empty string. The URL must start with ``http://`` or ``https://``. Any
    exception is reported in the status message instead of being raised.
    """
    try:
        if not _user_id(profile):
            return "", "Please sign in with Hugging Face before ingesting a URL."
        if not selected_id:
            return "", "Select a notebook first, then ingest a URL."

        target = (url or "").strip()
        if not target:
            return "", "Enter a URL."
        if not target.startswith(("http://", "https://")):
            return "", "URL must start with http:// or https://"

        indexed = ingest_url_chunks(str(selected_id), _url_source_id(target), target)
        if indexed == 0:
            message = (
                "Ingested URL but extracted 0 chunks. Page may be JS-rendered/blocked/non-text. "
                "Try a simpler static page (example.com / Wikipedia)."
            )
        else:
            message = f"Ingested URL. Indexed {indexed} chunk(s). Source: {target}"
        return "", message
    except Exception as error:
        return "", f"Error ingesting URL: {error}"
| |
def _safe_remove_url(url, selected_id, profile: gr.OAuthProfile | None = None):
    """Remove the chunks of a previously ingested URL from the selected notebook.

    The source id is recomputed from the URL with ``_url_source_id``, so the
    URL must be entered exactly as it was ingested (after trimming
    whitespace). Returns ``(text box value, status message)``; the first
    element is always an empty string. Any exception is reported in the
    status message instead of being raised.
    """
    try:
        user_id = _user_id(profile)
        if not user_id:
            # This is the removal flow, so the prompt must say "removing".
            return "", "Please sign in with Hugging Face before removing a URL."
        if not selected_id:
            return "", "Select a notebook first, then remove a URL."

        cleaned = (url or "").strip()
        if not cleaned:
            return "", "Enter a URL."
        if not cleaned.startswith(("http://", "https://")):
            return "", "URL must start with http:// or https://"

        source_id = _url_source_id(cleaned)
        remove_chunks_for_source(str(selected_id), source_id)
        return "", f"Removed URL: {cleaned}"
    except Exception as error:
        return "", f"Error removing URL: {error}"
|
|
|
|
|
|
| |
def _do_upload(text_content, title, notebook_id, profile: gr.OAuthProfile | None):
    """Ingest text typed directly into the UI as a ``.txt`` source.

    The file name is the trimmed ``title`` (or ``text_<timestamp>`` when the
    title is blank) with ``.txt`` appended if missing. The text is UTF-8
    encoded and handed to ``ingest_txt``. Returns a Markdown status message;
    errors are reported in the message instead of being raised.
    """
    from backend.ingestion_txt import ingest_txt

    user_id = _user_id(profile)
    if not user_id:
        return "Please sign in first."
    if not notebook_id:
        return "Please select a notebook first."
    if not (text_content and text_content.strip()):
        return "No text entered."

    try:
        filename = (title or "").strip() or f"text_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        if not filename.endswith(".txt"):
            filename += ".txt"

        result = ingest_txt(
            file_bytes=text_content.encode("utf-8"),
            filename=filename,
            notebook_id=notebook_id,
            user_id=user_id,
        )

        size_kb = result["metadata"]["size_bytes"] / 1024
        headline = f" **{result['filename']}** saved successfully!\n\n"
        return headline + f"- Size: {size_kb:.1f} KB"
    except ValueError as e:
        return f" {str(e)}"
    except Exception as e:
        return f"Unexpected error: {str(e)}"
|
|
def _format_sources(sources: list[dict]) -> str:
    """Render source records as a Markdown table.

    Each record must provide ``filename``, ``file_type`` and ``status``; the
    word count is read from ``metadata["word_count"]`` and shown as "—" when
    absent. Returns "No sources yet." for an empty or missing list.

    Cell text is escaped so that a "|" or a line break inside a value (for
    example in a user-chosen file name) cannot break the table layout.
    """
    if not sources:
        return "No sources yet."

    def _cell(value) -> str:
        # A raw "|" would start a new column and a newline would end the row.
        text = str(value).replace("|", "\\|")
        return text.replace("\r\n", " ").replace("\n", " ").replace("\r", " ")

    lines = [
        "| Filename | Type | Status | Words |",
        "|----------|------|--------|-------|",
    ]
    for source in sources:
        meta = source.get("metadata") or {}
        words = meta.get("word_count", "—")
        lines.append(
            f"| {_cell(source['filename'])} | {_cell(source['file_type'])} "
            f"| {_cell(source['status'])} | {_cell(words)} |"
        )
    return "\n".join(lines)
|
|
|
|
def _load_sources(notebook_id, profile: gr.OAuthProfile | None):
    """Return the notebook's sources as a Markdown table, or "" without a notebook.

    ``profile`` is accepted but not used by this function.
    """
    from backend.ingestion_txt import list_sources

    return _format_sources(list_sources(notebook_id)) if notebook_id else ""
|
|
|
|
def _safe_generate_podcast(notebook_id, profile: gr.OAuthProfile | None = None):
    """Generate a podcast script for the selected notebook.

    Returns ``(status message, script)``; the script is an empty string when
    the user is not signed in, no notebook is selected, or generation fails.
    """
    user_id = _user_id(profile)
    if not user_id:
        return "Please sign in first.", ""
    if not notebook_id:
        return "Please select a notebook first.", ""

    try:
        outcome = generate_podcast(notebook_id=str(notebook_id), user_id=user_id)
        summary_parts = [
            f"Podcast generated. Artifact: {outcome['artifact_id'] or 'saved'}",
            f"Sources: {outcome['sources_count']}",
            f"Chunks: {outcome['chunks_used']}",
        ]
        return " | ".join(summary_parts), outcome["script"]
    except Exception as error:
        return f"Error generating podcast: {error}", ""
|
|
|
|
def _safe_generate_podcast_audio(notebook_id, script, profile: gr.OAuthProfile | None = None):
    """Generate audio for an existing podcast script.

    Returns ``(status message, audio path)``; the path is ``None`` when the
    user is not signed in, no notebook is selected, the script is blank, or
    generation fails.
    """
    user_id = _user_id(profile)
    if not user_id:
        return "Please sign in first.", None
    if not notebook_id:
        return "Please select a notebook first.", None
    if not (script and script.strip()):
        return "Generate a podcast script first.", None

    try:
        outcome = generate_podcast_audio(
            notebook_id=str(notebook_id),
            user_id=user_id,
            script=script,
        )
        artifact = outcome["artifact_id"] or "saved"
        return f"Podcast audio generated. Artifact: {artifact}", outcome["audio_path"]
    except Exception as error:
        return f"Error generating podcast audio: {error}", None
|
|
| |
def _get_notebook_pdfs(notebook_id, profile: gr.OAuthProfile | None):
    """Return a dropdown update with the notebook's uploaded PDF names.

    The dropdown is hidden and emptied when there is no user, no notebook, or
    no upload folder. Otherwise it is shown with the sorted ``*.pdf`` names
    and the first one preselected.
    """
    user_id = _user_id(profile)
    if user_id and notebook_id:
        folder = Path("data") / "uploads" / user_id / str(notebook_id)
        if folder.exists():
            names = sorted(entry.name for entry in folder.glob("*.pdf"))
            return gr.update(choices=names, value=names[0] if names else None, visible=True)
    return gr.update(choices=[], value=None, visible=False)
|
|
|
|
def _generate_quiz(notebook_id, source_type, pdf_source_id, profile: gr.OAuthProfile | None):
    """Generate a quiz and build the UI updates for up to five questions.

    Returns a flat tuple: status message, list of questions, 20 component
    updates (5 question slots x 4 updates each), one more visibility update,
    and an empty string. For each used slot the four updates are: make the
    slot visible, set the question label, configure the choice list, and
    configure the text value (presumably a container, a label, a radio and a
    text box - confirm against the event wiring). On failure all 20 updates
    and the extra one hide their components and the question list is empty.
    """
    from backend.quiz_service import generate_quiz

    def _failure(message):
        # Same hidden update repeated for all 5 * 4 per-question outputs.
        return message, [], *([gr.update(visible=False)] * 5 * 4), gr.update(visible=False), ""

    user_id = _user_id(profile)
    if not user_id:
        return _failure("Please sign in first.")
    if not notebook_id:
        return _failure("Please select a notebook first.")

    source_type_key = {"Text": "txt", "PDF": "pdf", "URL": "url", "All": "all"}.get(source_type, "all")
    if source_type_key == "pdf" and not pdf_source_id:
        return _failure("Pick a PDF first.")

    try:
        result = generate_quiz(notebook_id, source_type=source_type_key, source_id=pdf_source_id)
        questions = result["questions"]
        updates = []
        for slot in range(5):
            if slot >= len(questions):
                # Unused slot: hide it and clear its contents.
                updates.extend([
                    gr.update(visible=False),
                    gr.update(value=""),
                    gr.update(choices=[], value=None, visible=False),
                    gr.update(value="", visible=False),
                ])
                continue

            question = questions[slot]
            label = f"**Q{slot+1}. {question['question']}**"
            kind = question["type"]
            if kind == "multiple_choice":
                choice_update = gr.update(choices=question["options"], value=None, visible=True)
                text_update = gr.update(value="", visible=False)
            elif kind == "true_false":
                choice_update = gr.update(choices=["True", "False"], value=None, visible=True)
                text_update = gr.update(value="", visible=False)
            else:
                # Any other type is answered through the text field.
                choice_update = gr.update(choices=[], value=None, visible=False)
                text_update = gr.update(value="", visible=True)
            updates.extend([gr.update(visible=True), gr.update(value=label), choice_update, text_update])
        return "Quiz generated!", questions, *updates, gr.update(visible=True), ""
    except Exception as e:
        return _failure(f" {e}")
|
|
|
|
def _submit_quiz(questions, *answers):
    """Grade the submitted quiz answers and return a Markdown result summary.

    ``answers`` holds the choice answers for the five question slots followed
    by the five free-text answers, so question ``i`` reads ``answers[i]`` and
    ``answers[i + 5]``. Questions without an answer slot (more than five
    stored questions, or a short ``answers`` tuple) are graded as blank
    instead of raising ``IndexError``.

    Grading rules:
    - multiple_choice: the text before the first "." of the chosen option is
      compared with the first character of the correct answer (case-insensitive).
    - true_false: case-insensitive equality.
    - anything else: case-insensitive substring match in either direction.

    The stored correct answer is converted with ``str`` first, so non-string
    values (e.g. a boolean for a true/false question) no longer crash grading.
    """
    if not questions:
        return " No quiz loaded."

    slots = 5  # number of question slots filled by _generate_quiz
    score = 0
    lines = []
    for i, q in enumerate(questions):
        has_slot = i < slots
        radio_ans = answers[i] if has_slot and i < len(answers) else None
        text_ans = answers[i + slots] if has_slot and i + slots < len(answers) else None
        chosen = text_ans if q["type"] == "short_answer" else radio_ans
        user_ans = str(chosen or "").strip()
        correct = str(q["answer"]).strip()

        if not user_ans:
            is_correct = False
        elif q["type"] == "multiple_choice":
            user_letter = user_ans.split(".")[0].strip().upper()
            # Slicing keeps an empty correct answer from raising IndexError.
            is_correct = user_letter == correct[:1].upper()
        elif q["type"] == "true_false":
            is_correct = user_ans.lower() == correct.lower()
        else:
            given, expected = user_ans.lower(), correct.lower()
            is_correct = given in expected or expected in given

        if is_correct:
            score += 1
            lines.append(f"✅ **Q{i+1}**: Correct! *(Answer: {correct})*")
        else:
            lines.append(f"❌ **Q{i+1}**: Incorrect. *(Your answer: {user_ans or 'blank'} | Correct: {correct})*")

    lines.append(f"\n**Score: {score}/{len(questions)}**")
    return "\n\n".join(lines)
def _chat_history_to_pairs(messages: list[dict]) -> list[tuple[str, str]]:
    """Fold role-tagged messages into ``(user, assistant)`` pairs.

    Each user message starts a pair. If the very next message is from the
    assistant it completes the pair, otherwise the reply is "". Messages that
    are not from the user and do not directly follow a user message are
    dropped. ``None`` contents become "".
    """
    pairs = []
    total = len(messages)
    pos = 0
    while pos < total:
        current = messages[pos]
        pos += 1
        if current["role"] != "user":
            continue
        question = current["content"] or ""
        reply = ""
        if pos < total and messages[pos]["role"] == "assistant":
            reply = messages[pos]["content"] or ""
            pos += 1  # the assistant message is consumed with its question
        pairs.append((question, reply))
    return pairs
|
|
|
|
| def _load_chat_history(notebook_id, user_id: str | None) -> tuple[list[tuple[str, str]], list[tuple[str, str]]]: |
| """Load chat for notebook. Returns (history_pairs, history_pairs) for State and Chatbot. |
| Only loads if notebook belongs to user (ownership validation). |
| """ |
| if not notebook_id: |
| return [], [] |
| messages = load_chat(notebook_id, user_id) |
| pairs = _chat_history_to_pairs(messages) |
| return pairs, pairs |
|
|
|
|
def _format_citations(chunks: list[dict]) -> str:
    """Render retrieved chunks as a numbered Markdown citation list.

    Each chunk is labelled with its metadata ``url``, ``file_name`` or
    ``file_path`` (first one set, else "Source") and quoted with at most 300
    characters of its content, followed by "..." when truncated; newlines in
    the excerpt become spaces. Returns "" when there are no chunks.
    """
    if not chunks:
        return ""
    out = ["**Sources cited:**", ""]
    for number, chunk in enumerate(chunks, start=1):
        meta = chunk.get("metadata") or {}
        label = meta.get("url") or meta.get("file_name") or meta.get("file_path") or "Source"
        full_text = chunk.get("content") or ""
        excerpt = full_text[:300]
        if len(full_text) > 300:
            excerpt += "..."
        excerpt = excerpt.replace("\n", " ")
        out.extend([f"**[{number}]** *{label}*", f"> {excerpt}", ""])
    return "\n".join(out)
|
|
|
|
def _on_chat_submit(query, notebook_id, chat_history, profile: gr.OAuthProfile | None):
    """Send the chat query through RAG and build the UI outputs.

    Returns a 5-tuple: "" (first output, always empty), chat history, status
    message, citations Markdown, and a visibility update that is shown only
    when chunks were retrieved. On validation failure or error the history is
    returned unchanged with a status message and the citations hidden.
    """

    def _rejected(message):
        # Keep the existing history, report the problem, hide the citations.
        return "", chat_history, message, "", gr.update(visible=False)

    if not notebook_id:
        return _rejected("Select a notebook first.")
    if not (query and query.strip()):
        return _rejected("Enter a message.")
    user_id = _user_id(profile)
    if not user_id:
        return _rejected("Please sign in first.")
    try:
        answer, updated, chunks = rag_chat(notebook_id, query.strip(), chat_history, user_id=user_id)
        citations_md = _format_citations(chunks)
        return "", updated, "", citations_md, gr.update(visible=bool(chunks))
    except Exception as e:
        return _rejected(f"Error: {e}")
|
|
def _get_quiz_pdfs(source_type, notebook_id):
    """Return a dropdown update listing PDFs for the quiz source picker.

    The dropdown is hidden unless ``source_type`` is "PDF" and a notebook is
    selected. The PDF names come from the first directory under
    ``data/uploads/`` that contains a folder named after the notebook id.

    NOTE(review): the lookup walks every user's upload directory rather than
    only the current user's - confirm this is intended.
    """
    if source_type != "PDF" or not notebook_id:
        return gr.update(visible=False, choices=[], value=None)

    uploads_root = Path("data") / "uploads"
    found = []
    if uploads_root.exists():
        for owner_dir in uploads_root.iterdir():
            candidate = owner_dir / str(notebook_id)
            if candidate.exists():
                found = sorted(entry.name for entry in candidate.glob("*.pdf"))
                break

    print(f"DEBUG quiz pdfs found: {found}")
    return gr.update(visible=True, choices=found, value=found[0] if found else None)
|
|
def _quiz_pdf_dropdown_update(source_type, notebook_id, profile: gr.OAuthProfile | None):
    """Return the PDF dropdown update for the quiz source picker.

    The dropdown is hidden unless ``source_type`` is "PDF". With a notebook
    selected, the signed-in user's own upload folder is tried first. If it
    does not exist (or nobody is signed in), the first directory under
    ``data/uploads/`` containing the notebook id is used instead.

    NOTE(review): the fallback can list PDFs stored under another user's
    directory - confirm this is only meant for local development.
    """

    def _shown(names):
        # Visible dropdown with the first entry preselected (if any).
        return gr.update(visible=True, choices=names, value=names[0] if names else None)

    if source_type != "PDF":
        return gr.update(visible=False, choices=[], value=None)
    if not notebook_id:
        return _shown([])

    user_id = _user_id(profile)
    if user_id:
        own_dir = Path("data") / "uploads" / user_id / str(notebook_id)
        if own_dir.exists():
            return _shown(sorted(entry.name for entry in own_dir.glob("*.pdf")))

    uploads_root = Path("data") / "uploads"
    if uploads_root.exists():
        for owner_dir in uploads_root.iterdir():
            if not owner_dir.is_dir():
                continue
            candidate = owner_dir / str(notebook_id)
            if candidate.exists():
                pdfs = sorted(entry.name for entry in candidate.glob("*.pdf"))
                print(f"DEBUG (local fallback): notebook_id={notebook_id}, pdfs={pdfs}")
                return _shown(pdfs)

    return _shown([])
|
|
def _generate_btn_update(source_type, pdf_name):
    """Return an update toggling ``interactive``.

    The update is interactive for every source type except "PDF", where it
    is interactive only once a PDF name is chosen.
    """
    enabled = bool(pdf_name) if source_type == "PDF" else True
    return gr.update(interactive=enabled)
|
|
| with gr.Blocks( |
| title="NotebookLM Clone - Notebooks", |
| theme=theme, |
| css=CUSTOM_CSS, |
| ) as demo: |
| with gr.Row(elem_classes=["header-bar"]): |
| gr.Markdown("### 📓 NotebookLM Clone") |
| login_btn = gr.LoginButton(value="Login with Hugging Face", size="lg", elem_id="login-btn") |
|
|
| with gr.Row(visible=False) as auth_info_row: |
| auth_text = gr.Markdown("", elem_id="auth-text") |
|
|
| gr.HTML(""" |
| <div class="container hero-section"> |
| <h1 class="hero-title">📓 NotebookLM Clone</h1> |
| <p class="hero-sub">Chat with your documents. Generate reports, quizzes, and podcasts.</p> |
| </div> |
| """) |
|
|
| with gr.Column(visible=False, elem_classes=["login-container"]) as login_container: |
| gr.Markdown("**Sign in with Hugging Face to access your notebooks.**", elem_classes=["login-center"]) |
|
|
| with gr.Column(visible=False) as app_content: |
| nb_state = gr.State([]) |
| selected_notebook_id = gr.State(None) |
| chat_history_state = gr.State([]) |
| quiz_state = gr.State([]) |
| user_id_state = gr.State(None) |
|
|
| with gr.Group(elem_classes=["section-card", "manager-card"]): |
| gr.Markdown("**Notebook Manager**", elem_classes=["section-title"]) |
| selected_notebook_md = gr.Markdown("**Selected notebook:** None", elem_classes=["status"]) |
|
|
| with gr.Group(elem_classes=["create-strip"]): |
| with gr.Row(elem_classes=["create-row"]): |
| gr.Markdown("Create new notebook", elem_classes=["create-label"]) |
| create_txt = gr.Textbox( |
| placeholder="Enter new notebook name", |
| show_label=False, |
| container=False, |
| value="", |
| ) |
| create_btn = gr.Button("Create", variant="primary", size="sm") |
|
|
| notebook_status = gr.Markdown("", elem_classes=["status"], visible=False) |
|
|
@gr.render(inputs=[nb_state, selected_notebook_id])
def render_notebooks(state, selected_id):
    """Draw one editable row per notebook, or a placeholder when there are none.

    Args:
        state: Sequence of ``(nb_id, name)`` pairs taken from ``nb_state``.
        selected_id: Currently selected notebook id; compared to each
            ``nb_id`` through ``str()`` so mixed id types still match.
    """
    if not state:
        gr.Markdown("No notebooks yet. Create one to get started.")
        return

    for idx, (nb_id, name) in enumerate(state):
        is_selected = str(nb_id) == str(selected_id)

        # The selected row gets an extra CSS class and a highlighted button.
        row_class = ["notebook-card"]
        if is_selected:
            row_class.append("notebook-selected")
        if is_selected:
            select_label, select_variant = "Selected", "primary"
        else:
            select_label, select_variant = "Select", "secondary"

        with gr.Row(elem_classes=row_class):
            name_txt = gr.Textbox(
                value=name,
                key=f"nb-name-{nb_id}",
                show_label=False,
                min_width=240,
                scale=4,
            )
            select_btn = gr.Button(
                select_label,
                variant=select_variant,
                size="sm",
                min_width=90,
                scale=1,
            )
            rename_btn = gr.Button("Rename", size="sm", min_width=80, scale=1, variant="secondary")
            delete_btn = gr.Button("Delete", size="sm", min_width=80, scale=1, variant="stop")

            # Default arguments freeze this iteration's id/name; a plain
            # closure would see the values of the last loop iteration.
            def _pick_notebook(chosen_id=nb_id):
                return chosen_id

            def _pick_notebook_status(chosen_name=name):
                return f"Selected notebook: {chosen_name}"

            # Selecting stores the id first, then writes the status text.
            select_event = select_btn.click(
                _pick_notebook,
                inputs=None,
                outputs=[selected_notebook_id],
                api_name=False,
            )
            select_event.then(
                _pick_notebook_status,
                inputs=None,
                outputs=[notebook_status],
                api_name=False,
            )

            # Rename and delete receive the row index wrapped in a State
            # component, plus the current notebook list and selection.
            rename_btn.click(
                _safe_rename,
                inputs=[gr.State(idx), name_txt, nb_state, selected_notebook_id],
                outputs=[nb_state, selected_notebook_id, notebook_status],
                api_name=False,
            )
            delete_btn.click(
                _safe_delete,
                inputs=[gr.State(idx), nb_state, selected_notebook_id],
                outputs=[nb_state, selected_notebook_id, notebook_status],
                api_name=False,
            )
|
|
# "Upload Sources" card: PDF upload/removal, URL ingest/removal and a
# free-text source form, plus the status lines those actions write to.
with gr.Group(elem_classes=["section-card", "sources-card"]):
    gr.Markdown("**Upload Sources**", elem_classes=["section-title"])
    gr.Markdown("*Add PDF, URL, and text content into the selected notebook.*")

    source_status = gr.Markdown("", elem_classes=["status"])

    # PDF upload (multiple files, delivered to the handler as file paths).
    with gr.Row(elem_classes=["section-row"]):
        pdf_upload_btn = gr.UploadButton(
            "Upload PDFs",
            type="filepath",
            file_count="multiple",
            file_types=[".pdf"],
            variant="secondary",
        )

    # Picker for an already-uploaded PDF and the button that removes it.
    with gr.Row(elem_classes=["section-row"]):
        uploaded_pdf_dd = gr.Dropdown(
            label="Uploaded PDFs",
            value=None,
            choices=[],
            allow_custom_value=False,
            scale=3,
        )
        remove_pdf_btn = gr.Button("Remove selected PDF", scale=1, variant="stop")

    # URL ingest / delete share the same text box.
    with gr.Row(elem_classes=["section-row"]):
        url_txt = gr.Textbox(
            label="Ingest web URL",
            value="",
            placeholder="https://example.com",
            scale=3,
        )
        ingest_url_btn = gr.Button("Ingest URL", scale=1, variant="primary")
        remove_url_btn = gr.Button("Delete URL", scale=1, variant="stop")

    # Free-text source: a title plus a multi-line body.
    gr.Markdown("**Text Source**", elem_classes=["section-title"])
    with gr.Row(elem_classes=["section-row"]):
        txt_title = gr.Textbox(
            label="Title",
            scale=1,
            placeholder="Give this text a name (e.g. 'Lecture Notes Week 1')",
        )

    txt_input = gr.Textbox(
        label="Text Content",
        lines=10,
        placeholder="Paste or type your text here...",
    )
    submit_btn = gr.Button("Save & Process", variant="primary")
    upload_status = gr.Markdown("", elem_classes=["status"])
    sources_display = gr.Markdown("No sources yet.")
|
|
# "Chat" card: history view, message box, send button, a status line and a
# collapsible panel for citations that is created hidden.
with gr.Group(elem_classes=["section-card", "chat-card"]):
    gr.Markdown("**Chat**", elem_classes=["section-title"])
    gr.Markdown("*Ask questions about your notebook sources. Answers are grounded in retrieved chunks with citations.*")
    chatbot = gr.Chatbot(height=400, label="Chat history")
    chat_input = gr.Textbox(
        label="Message",
        show_label=False,
        lines=2,
        placeholder="Ask a question about your sources...",
    )
    chat_submit_btn = gr.Button("Send", variant="primary")
    chat_status = gr.Markdown("", elem_classes=["status"])
    # The accordion is bound to a name because handlers update its visibility.
    with gr.Accordion("📎 Sources cited (from last response)", visible=False, open=True) as citations_display:
        citations_md = gr.Markdown("", elem_classes=["status"])
|
|
# "Artifacts" card: report, podcast and quiz generators.
with gr.Group(elem_classes=["section-card", "artifacts-card"]):
    gr.Markdown("**Artifacts**", elem_classes=["section-title"])

    # --- Report -----------------------------------------------------------
    gr.Markdown("**Report**")
    with gr.Row(elem_classes=["section-row"]):
        report_scope_dd = gr.Dropdown(
            label="Report scope",
            value=DEFAULT_REPORT_SCOPE_LABEL,
            choices=list(REPORT_SCOPE_LABELS.keys()),
            scale=3,
        )
        report_btn = gr.Button("Generate report", scale=1, variant="primary")
    report_status = gr.Markdown("Select a scope and click generate.", elem_classes=["status"])
    report_output = gr.Markdown("", elem_id="report-output")

    # --- Podcast ----------------------------------------------------------
    gr.Markdown("**Podcast**")
    with gr.Row(elem_classes=["section-row"]):
        podcast_btn = gr.Button("Generate Podcast", variant="primary")
        podcast_audio_btn = gr.Button("Generate Podcast Audio", variant="secondary")
    podcast_status = gr.Markdown("", elem_classes=["status"])
    podcast_script = gr.Markdown("")
    podcast_audio = gr.Audio(type="filepath", label="Podcast Audio")

    # --- Quiz -------------------------------------------------------------
    gr.Markdown("**Quiz**")
    gr.Markdown("Select a source type then generate a quiz.")
    quiz_source_type = gr.Radio(
        label="Source type",
        value="All",
        choices=["Text", "PDF", "URL", "All"],
    )
    # Created hidden; the source-type change handler writes to it.
    quiz_pdf_dd = gr.Dropdown(
        label="Select PDF",
        value=None,
        choices=[],
        visible=False,
    )
    generate_quiz_btn = gr.Button("Generate Quiz", variant="primary")
    quiz_status = gr.Markdown("")

    # Five pre-built, hidden question slots. Each slot has a prompt plus both
    # a radio and a text answer widget; the slots are collected as dicts.
    quiz_components = []
    for _ in range(5):
        with gr.Group(visible=False) as q_group:
            q_text = gr.Markdown("")
            q_radio = gr.Radio(label="Your answer", choices=[], visible=False)
            q_textbox = gr.Textbox(label="Your answer", visible=False)
        quiz_components.append(
            {"group": q_group, "text": q_text, "radio": q_radio, "textbox": q_textbox}
        )

    submit_quiz_btn = gr.Button("Submit Answers", visible=False, variant="secondary")
    quiz_results = gr.Markdown("")
|
|
# Page-load wiring. `_initial_load` takes no inputs and writes the notebook
# list and selection, the status line, the auth widgets, the app and login
# containers, the source status and the user id.
demo.load(
    _initial_load,
    inputs=None,
    outputs=[
        nb_state,
        selected_notebook_id,
        notebook_status,
        auth_text,
        auth_info_row,
        app_content,
        login_container,
        source_status,
        user_id_state,
    ],
    api_name=False,
)

# Three further load handlers, registered in this order, each writing to a
# single component.
for _load_fn, _load_inputs, _load_target in (
    (_list_uploaded_pdfs, [selected_notebook_id], uploaded_pdf_dd),
    (_load_sources, [selected_notebook_id], sources_display),
    (_selected_notebook_text, [selected_notebook_id, nb_state], selected_notebook_md),
):
    demo.load(_load_fn, inputs=_load_inputs, outputs=[_load_target], api_name=False)
|
|
def _on_notebook_select_for_chat(notebook_id, user_id):
    """Load the chat history for a notebook and reset the citations panel.

    Args:
        notebook_id: Id of the notebook whose history is loaded.
        user_id: Id of the user, passed straight to ``_load_chat_history``.

    Returns:
        A 4-tuple: the history (twice, once for the state holder and once for
        the chatbot), an empty string for the citations text, and an update
        that hides the citations panel.
    """
    # Only the first element of the two-item result is used here.
    history, _unused = _load_chat_history(notebook_id, user_id)
    hide_citations = gr.update(visible=False)
    return history, history, "", hide_citations
|
|
# When the selected notebook changes: reload its chat history and reset the
# citations panel ...
selected_notebook_id.change(
    _on_notebook_select_for_chat,
    inputs=[selected_notebook_id, user_id_state],
    outputs=[chat_history_state, chatbot, citations_md, citations_display],
    api_name=False,
)

# ... then run the three single-output refreshers, registered in this order.
for _change_fn, _change_inputs, _change_target in (
    (_list_uploaded_pdfs, [selected_notebook_id], uploaded_pdf_dd),
    (_load_sources, [selected_notebook_id], sources_display),
    (_selected_notebook_text, [selected_notebook_id, nb_state], selected_notebook_md),
):
    selected_notebook_id.change(
        _change_fn,
        inputs=_change_inputs,
        outputs=[_change_target],
        api_name=False,
    )

# The selection line is also recomputed when the notebook list itself changes.
nb_state.change(
    _selected_notebook_text,
    inputs=[selected_notebook_id, nb_state],
    outputs=[selected_notebook_md],
    api_name=False,
)
|
|
# Notebook creation and source-management wiring.
#
# Every listener in this file passes `api_name=False`; the follow-up `.then`
# steps and the text-source submit below now do the same, so none of these
# internal handlers is exposed as a named API endpoint.

# Create a notebook, then re-run the PDF listing for the resulting selection.
create_btn.click(
    _safe_create,
    inputs=[create_txt, nb_state, selected_notebook_id],
    outputs=[create_txt, nb_state, selected_notebook_id, notebook_status],
    api_name=False,
).then(
    _list_uploaded_pdfs,
    inputs=[selected_notebook_id],
    outputs=[uploaded_pdf_dd],
    api_name=False,
)

# Upload PDFs into the selected notebook, then re-run the PDF listing.
pdf_upload_btn.upload(
    _safe_upload_pdfs,
    inputs=[pdf_upload_btn, selected_notebook_id],
    outputs=[source_status],
    api_name=False,
).then(
    _list_uploaded_pdfs,
    inputs=[selected_notebook_id],
    outputs=[uploaded_pdf_dd],
    api_name=False,
)

# Ingest / delete a URL; both handlers write back to the URL box and status.
ingest_url_btn.click(
    _safe_ingest_url,
    inputs=[url_txt, selected_notebook_id],
    outputs=[url_txt, source_status],
    api_name=False,
)

remove_url_btn.click(
    _safe_remove_url,
    inputs=[url_txt, selected_notebook_id],
    outputs=[url_txt, source_status],
    api_name=False,
)

# Remove the PDF chosen in the dropdown, then re-run the PDF listing.
remove_pdf_btn.click(
    _safe_remove_pdf,
    inputs=[uploaded_pdf_dd, selected_notebook_id],
    outputs=[source_status],
    api_name=False,
).then(
    _list_uploaded_pdfs,
    inputs=[selected_notebook_id],
    outputs=[uploaded_pdf_dd],
    api_name=False,
)

# Save a pasted text source, then re-run the sources loader.
submit_btn.click(
    _do_upload,
    inputs=[txt_input, txt_title, selected_notebook_id],
    outputs=[upload_status],
    api_name=False,
).then(
    _load_sources,
    inputs=[selected_notebook_id],
    outputs=[sources_display],
    api_name=False,
)
|
|
# Report: scope label + selected notebook in, status line + report body out.
report_btn.click(
    _generate_report,
    api_name=False,
    inputs=[report_scope_dd, selected_notebook_id],
    outputs=[report_status, report_output],
)

# Podcast script for the selected notebook.
podcast_btn.click(
    _safe_generate_podcast,
    api_name=False,
    inputs=[selected_notebook_id],
    outputs=[podcast_status, podcast_script],
)

# Podcast audio; the handler also receives the current script component value.
podcast_audio_btn.click(
    _safe_generate_podcast_audio,
    api_name=False,
    inputs=[selected_notebook_id, podcast_script],
    outputs=[podcast_status, podcast_audio],
)

# The generate-button update is registered twice with identical arguments:
# after the source type changes and when the PDF choice changes.
_quiz_btn_refresh = dict(
    fn=_generate_btn_update,
    inputs=[quiz_source_type, quiz_pdf_dd],
    outputs=[generate_quiz_btn],
    api_name=False,
)

_source_type_event = quiz_source_type.change(
    _quiz_pdf_dropdown_update,
    inputs=[quiz_source_type, selected_notebook_id],
    outputs=[quiz_pdf_dd],
    api_name=False,
)
_source_type_event.then(**_quiz_btn_refresh)

quiz_pdf_dd.change(**_quiz_btn_refresh)
|
|
| |
|
|
# Flat output list for quiz generation: status, quiz state, then for every
# question slot its group/text/radio/textbox (in that order), and finally
# the submit button and the results area.
quiz_all_outputs = [
    quiz_status,
    quiz_state,
    *(
        slot[part]
        for slot in quiz_components
        for part in ("group", "text", "radio", "textbox")
    ),
    submit_quiz_btn,
    quiz_results,
]

# Show a progress message first, then run the generator.
_quiz_click = generate_quiz_btn.click(
    lambda: gr.update(value="Generating quiz..."),
    inputs=[],
    outputs=[quiz_status],
    api_name=False,
)
_quiz_click.then(
    _generate_quiz,
    inputs=[selected_notebook_id, quiz_source_type, quiz_pdf_dd],
    outputs=quiz_all_outputs,
    api_name=False,
)

# Submission passes the quiz state, then all radios, then all textboxes.
submit_quiz_btn.click(
    _submit_quiz,
    inputs=[
        quiz_state,
        *(slot["radio"] for slot in quiz_components),
        *(slot["textbox"] for slot in quiz_components),
    ],
    outputs=[quiz_results],
    api_name=False,
)
|
|
# Chat submit: the handler writes the message box, the stored history, the
# status line and the citations widgets; a follow-up step then copies the
# stored history into the Chatbot component.
#
# The follow-up step passes `api_name=False` like every other listener in this
# file, so the anonymous lambda is not exposed as an API endpoint.
chat_submit_btn.click(
    _on_chat_submit,
    inputs=[chat_input, selected_notebook_id, chat_history_state],
    outputs=[chat_input, chat_history_state, chat_status, citations_md, citations_display],
    api_name=False,
).then(
    lambda h: (h, h),
    inputs=[chat_history_state],
    outputs=[chat_history_state, chatbot],
    api_name=False,
)
|
|
# Script entry point: log the final startup step, then start the Gradio app.
if __name__ == "__main__":
    _log("5. Launching Gradio...")
    demo.launch()
|
|