Spaces:
Build error
Build error
| import streamlit as st | |
| from groq import Groq | |
| import json | |
| import os | |
| import time | |
| import numpy as np | |
| import tempfile | |
| from io import BytesIO | |
| from md2pdf.core import md2pdf | |
| from dotenv import load_dotenv | |
| from datetime import datetime | |
| import requests | |
| from requests.exceptions import RequestException | |
| from typing import Optional, Dict, Any | |
| from download import download_video_audio, delete_download | |
| from st_audiorec import st_audiorec # Import the audio recorder component | |
# --- Application constants and one-time setup ---

# Maximum accepted audio size for uploads/downloads (checked before upload).
MAX_FILE_SIZE = 41943040  # 40 MB in bytes
FILE_TOO_LARGE_MESSAGE = "File too large. Maximum size is 40MB."

# Load environment variables from a local .env file; deployment platforms
# (Streamlit Cloud / Hugging Face) expose secrets as env vars too.
load_dotenv()

# --- Session-state initialisation (survives Streamlit reruns) ---
if 'api_key' not in st.session_state:
    st.session_state.api_key = os.environ.get("GROQ_API_KEY", "")

# Fall back to Streamlit secrets (Streamlit Cloud / Hugging Face Spaces).
if not st.session_state.api_key and st.secrets and "GROQ_API_KEY" in st.secrets:
    st.session_state.api_key = st.secrets["GROQ_API_KEY"]

if 'transcript' not in st.session_state:
    st.session_state.transcript = ""
if 'groq_client' not in st.session_state:
    st.session_state.groq_client = None
if 'transcription_error' not in st.session_state:
    st.session_state.transcription_error = None

# Page configuration.
st.set_page_config(
    page_title="NoteME",
    page_icon="🧙‍♂️",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Fixed LLM used for note generation.
LLM_MODEL = "deepseek-r1-distill-llama-70b"

# Hand-rolled retry parameters (replaces the `backoff` library).
MAX_RETRIES = 5
INITIAL_WAIT = 0.5  # seconds
MAX_WAIT = 30       # seconds
| # Initialize Groq client with improved error handling | |
def initialize_groq_client(api_key: str) -> Optional[Groq]:
    """Create and validate a Groq client.

    Args:
        api_key: Groq API key to authenticate with.

    Returns:
        A ready-to-use ``Groq`` client, or ``None`` when the key is empty
        or validation fails (an error is shown in the Streamlit UI).
    """
    if not api_key:
        return None
    try:
        client = Groq(api_key=api_key)
        # Cheap call that forces authentication, so a bad key fails here
        # instead of on the first transcription request.
        client.models.list()
        return client
    except Exception as e:
        error_msg = str(e)
        # The Groq SDK surfaces HTTP status codes inside the message text.
        if "401" in error_msg:
            st.error("❌ Invalid API key: Authentication failed")
        elif "403" in error_msg:
            st.error("❌ API key doesn't have permission to access Groq API")
        else:
            st.error(f"❌ Failed to initialize Groq client: {error_msg}")
        return None
| # Define custom exception for Groq API errors | |
class GroqAPIError(Exception):
    """Raised for errors returned by (or while talking to) the Groq API."""

    def __init__(self, message, status_code=None, response=None):
        # Keep the raw details around so callers can branch on HTTP status
        # or inspect the underlying response object.
        self.message = message
        self.status_code = status_code
        self.response = response
        super().__init__(message)
class GenerationStatistics:
    """Token and timing accounting for a single LLM generation run."""

    def __init__(self, input_time=0, output_time=0, input_tokens=0,
                 output_tokens=0, total_time=0, model_name=LLM_MODEL):
        self.input_time = input_time
        self.output_time = output_time
        self.input_tokens = input_tokens
        self.output_tokens = output_tokens
        # Sum of queue, prompt (input), and completion (output) times.
        self.total_time = total_time
        self.model_name = model_name

    def get_input_speed(self):
        """Input tokens per second (0 when no input time was recorded)."""
        return self.input_tokens / self.input_time if self.input_time != 0 else 0

    def get_output_speed(self):
        """Output tokens per second (0 when no output time was recorded)."""
        return self.output_tokens / self.output_time if self.output_time != 0 else 0

    def add(self, other):
        """Accumulate another GenerationStatistics into this one in place."""
        if not isinstance(other, GenerationStatistics):
            raise TypeError("Can only add GenerationStatistics objects")
        self.input_time += other.input_time
        self.output_time += other.output_time
        self.input_tokens += other.input_tokens
        self.output_tokens += other.output_tokens
        self.total_time += other.total_time

    def __str__(self):
        """Render a markdown summary table of speeds, tokens and timings."""
        total_tokens = self.input_tokens + self.output_tokens
        overall_speed = total_tokens / self.total_time if self.total_time != 0 else 0
        return (f"\n## {self.get_output_speed():.2f} T/s ⚡\nRound trip time: {self.total_time:.2f}s Model: {self.model_name}\n\n"
                f"| Metric | Input | Output | Total |\n"
                f"|-----------------|----------------|-----------------|----------------|\n"
                f"| Speed (T/s) | {self.get_input_speed():.2f} | {self.get_output_speed():.2f} | {overall_speed:.2f} |\n"
                f"| Tokens | {self.input_tokens} | {self.output_tokens} | {total_tokens} |\n"
                f"| Inference Time (s) | {self.input_time:.2f} | {self.output_time:.2f} | {self.total_time:.2f} |")
class NoteSection:
    """Live, hierarchical container for generated notes.

    Tracks the target outline, the accumulated text per section title, and
    one Streamlit placeholder per title so content can stream into the page
    as it is generated.
    """

    def __init__(self, structure, transcript):
        self.structure = structure
        titles = self.flatten_structure(structure)
        self.contents = {name: "" for name in titles}
        # One st.empty() slot per title for in-place streaming updates.
        self.placeholders = {name: st.empty() for name in titles}
        with st.expander("Raw Transcript", expanded=False):
            st.markdown(transcript)

    def flatten_structure(self, structure):
        """Return every title in the (possibly nested) outline, depth-first."""
        titles = []
        for name, child in structure.items():
            titles.append(name)
            if isinstance(child, dict):
                titles += self.flatten_structure(child)
        return titles

    def update_content(self, title, new_content):
        """Append streamed text to a section and refresh its placeholder."""
        try:
            self.contents[title] += new_content
            self.display_content(title)
        except TypeError as e:
            st.error(f"Error updating content: {e}")

    def display_content(self, title):
        """Re-render one section's placeholder (empty sections are skipped)."""
        text = self.contents[title]
        if text.strip():
            self.placeholders[title].markdown(f"## {title}\n{text}")

    def return_existing_contents(self, level=1) -> str:
        """Markdown for all non-empty top-level sections plus nested ones."""
        existing_content = ""
        for name, child in self.structure.items():
            if self.contents[name].strip():
                existing_content += f"{'#' * level} {name}\n{self.contents[name]}\n\n"
            if isinstance(child, dict):
                existing_content += self.get_markdown_content(child, level + 1)
        return existing_content

    def display_structure(self, structure=None, level=1):
        """Render headings and placeholder bodies for non-empty sections."""
        structure = self.structure if structure is None else structure
        for name, child in structure.items():
            if self.contents[name].strip():
                st.markdown(f"{'#' * level} {name}")
                self.placeholders[name].markdown(self.contents[name])
            if isinstance(child, dict):
                self.display_structure(child, level + 1)

    def display_toc(self, structure, columns, level=1, col_index=0):
        """Write an indented table of contents across the given columns."""
        for name, child in structure.items():
            with columns[col_index % len(columns)]:
                st.markdown(f"{' ' * (level - 1) * 2}- {name}")
            col_index += 1
            if isinstance(child, dict):
                col_index = self.display_toc(child, columns, level + 1, col_index)
        return col_index

    def get_markdown_content(self, structure=None, level=1):
        """Return the pure markdown string for all non-empty sections."""
        structure = self.structure if structure is None else structure
        markdown_content = ""
        for name, child in structure.items():
            if self.contents[name].strip():
                markdown_content += f"{'#' * level} {name}\n{self.contents[name]}\n\n"
            if isinstance(child, dict):
                markdown_content += self.get_markdown_content(child, level + 1)
        return markdown_content
def set_custom_theme():
    """Inject the app's custom CSS theme into the page.

    Defines the colour palette (CSS variables), header/card/button styling,
    the input-method selector cards, note-section styling, expander tweaks
    and a footer. Rendered as a single <style> block via st.markdown with
    unsafe_allow_html, so it applies to the whole page.
    """
    # Add custom CSS
    st.markdown("""
    <style>
    /* Main theme colors */
    :root {
        --primary-color: #4A6FDC;
        --secondary-color: #45B7D1;
        --background-color: #F5F7FF;
        --text-color: #333333;
        --accent-color: #FF6B6B;
    }
    /* Header styling */
    .main-header {
        background: linear-gradient(90deg, var(--primary-color), var(--secondary-color));
        color: white;
        padding: 1.5rem 1rem;
        border-radius: 10px;
        margin-bottom: 2rem;
        text-align: center;
    }
    .main-header h1 {
        font-weight: 700;
        margin-bottom: 0.5rem;
    }
    .main-header p {
        font-size: 1.1rem;
        opacity: 0.9;
    }
    /* Card styling */
    .stCard {
        border-radius: 10px;
        box-shadow: 0 4px 6px rgba(0,0,0,0.05);
        padding: 1.5rem;
        margin-bottom: 1rem;
        background-color: white;
        transition: transform 0.2s ease, box-shadow 0.2s ease;
    }
    .stCard:hover {
        transform: translateY(-2px);
        box-shadow: 0 6px 12px rgba(0,0,0,0.1);
    }
    /* Button styling */
    .stButton>button {
        border-radius: 20px;
        padding: 0.5rem 1.5rem;
        font-weight: 500;
        transition: all 0.2s ease;
    }
    .primary-button button {
        background-color: var(--primary-color);
        color: white;
    }
    .secondary-button button {
        background-color: transparent;
        color: var(--primary-color);
        border: 1px solid var(--primary-color);
    }
    /* Input method selection */
    .input-method-selector {
        display: flex;
        justify-content: space-evenly;
        margin-bottom: 2rem;
        flex-wrap: wrap;
    }
    .input-method-card {
        width: 140px;
        height: 120px;
        border-radius: 10px;
        display: flex;
        flex-direction: column;
        align-items: center;
        justify-content: center;
        cursor: pointer;
        padding: 1rem;
        margin: 0.5rem;
        transition: all 0.2s ease;
    }
    .input-method-card.selected {
        border: 2px solid var(--primary-color);
        background-color: rgba(74, 111, 220, 0.1);
    }
    .input-method-card:hover {
        transform: translateY(-3px);
        box-shadow: 0 6px 12px rgba(0,0,0,0.1);
    }
    .input-method-card i {
        font-size: 2.5rem;
        margin-bottom: 0.5rem;
        color: var(--primary-color);
    }
    .input-method-card p {
        font-size: 0.9rem;
        text-align: center;
        margin: 0;
    }
    /* Note visualization */
    .note-section {
        padding: 1rem;
        border-radius: 8px;
        background-color: white;
        margin-bottom: 1rem;
    }
    .note-section h2 {
        color: var(--primary-color);
        border-bottom: 1px solid #eaeaea;
        padding-bottom: 0.5rem;
        margin-bottom: 0.5rem;
    }
    /* Expander customization */
    .st-expander {
        border-radius: 8px;
        box-shadow: 0 2px 4px rgba(0,0,0,0.05);
    }
    /* Footer styling */
    .footer {
        margin-top: 3rem;
        padding-top: 1rem;
        border-top: 1px solid #eaeaea;
        text-align: center;
        font-size: 0.9rem;
        color: #888;
    }
    </style>
    """, unsafe_allow_html=True)
| # Manual implementation of retry logic (replacing backoff library) | |
def retry_with_exponential_backoff(max_tries=MAX_RETRIES, initial_wait=INITIAL_WAIT, max_wait=MAX_WAIT):
    """Decorator factory: retry on transient network/Groq errors.

    Retries the wrapped callable up to ``max_tries`` times on
    ``RequestException`` / ``GroqAPIError``, doubling the wait between
    attempts (capped at ``max_wait``) with ±20% jitter. The last failure
    is re-raised unchanged.

    Args:
        max_tries: Maximum number of attempts before giving up.
        initial_wait: Base wait in seconds (doubled before the first sleep).
        max_wait: Upper bound on the wait between retries, in seconds.
    """
    import functools  # local import keeps the decorator self-contained

    def decorator(func):
        @functools.wraps(func)  # preserve the wrapped function's metadata
        def wrapper(*args, **kwargs):
            wait = initial_wait
            tries = 0
            while tries < max_tries:
                try:
                    return func(*args, **kwargs)
                except (RequestException, GroqAPIError):
                    tries += 1
                    if tries == max_tries:
                        # Out of attempts: surface the last error to the caller.
                        raise
                    # Exponential backoff, capped at max_wait.
                    wait = min(wait * 2, max_wait)
                    # Add jitter (±20%) so concurrent clients don't retry
                    # in lockstep.
                    jitter = wait * 0.4 * (np.random.random() - 0.5)
                    # Inform user of retry attempt.
                    st.info(f"Retrying transcription... (Attempt {tries}/{max_tries})")
                    time.sleep(wait + jitter)
        return wrapper
    return decorator
def transcribe_audio_with_groq(audio_data) -> str:
    """Transcribe audio using Groq's whisper-large-v3-turbo model.

    Args:
        audio_data: Either a path to an audio file (str) or raw audio bytes
            (e.g. from the in-browser recorder).

    Returns:
        The transcribed text.

    Raises:
        GroqAPIError: For API-related errors (auth, rate limit, size, 5xx).
        ValueError: For invalid input (uninitialised client, missing file,
            file over the 40MB limit).
    """
    if not st.session_state.groq_client:
        raise ValueError("Groq client is not initialized. Please check your API key.")
    created_temp = isinstance(audio_data, bytes)
    audio_file_path = None
    try:
        if created_temp:
            # Persist raw bytes to a temp .wav so the SDK can read a file.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
                tmp_file.write(audio_data)
                audio_file_path = tmp_file.name
        else:
            # Assume it's a file path.
            audio_file_path = audio_data
            if not os.path.exists(audio_file_path):
                raise ValueError(f"Audio file not found: {audio_file_path}")
        # Reject oversized files before uploading them.
        file_size = os.path.getsize(audio_file_path)
        if file_size > MAX_FILE_SIZE:
            raise ValueError(f"Audio file size ({file_size/1048576:.2f}MB) exceeds 40MB limit")
        with open(audio_file_path, "rb") as file:
            # Show a progress message since transcription can take a while.
            progress_placeholder = st.empty()
            progress_placeholder.info("Processing audio with whisper-large-v3-turbo...")
            transcription = st.session_state.groq_client.audio.transcriptions.create(
                file=(audio_file_path, file.read()),
                model="whisper-large-v3-turbo",
                response_format="verbose_json"
            )
            progress_placeholder.empty()
        if not hasattr(transcription, 'text') or not transcription.text:
            raise GroqAPIError("Empty transcription result returned")
        return transcription.text
    except (ValueError, GroqAPIError):
        # Our own exceptions are already meaningful — don't re-wrap them
        # (the original code converted internal ValueErrors into generic
        # GroqAPIErrors, breaking the documented contract).
        raise
    except Exception as e:
        error_msg = str(e)
        # Map common HTTP status codes (embedded in the SDK's message text)
        # to clearer GroqAPIError instances.
        if "401" in error_msg:
            raise GroqAPIError("Authentication failed. Please check your API key.", 401)
        elif "429" in error_msg:
            raise GroqAPIError("Rate limit exceeded. Please try again later.", 429)
        elif "413" in error_msg:
            raise GroqAPIError("Audio file too large for processing.", 413)
        elif any(code in error_msg for code in ("500", "502", "503", "504")):
            raise GroqAPIError("Groq server error. Please try again later.")
        else:
            raise GroqAPIError(f"Error transcribing audio: {error_msg}")
    finally:
        # Always remove the temp file we created, even when transcription
        # fails (the original leaked it on any error path).
        if created_temp and audio_file_path and os.path.exists(audio_file_path):
            os.unlink(audio_file_path)
def process_transcript(transcript):
    """Turn a raw transcript into structured meeting notes via the LLM.

    Streams a chat completion from Groq's DeepSeek model and routes each
    streamed chunk into the matching section of a NoteSection for live
    display.

    Args:
        transcript: The transcript text to summarise.

    Returns:
        The populated NoteSection, or None on failure (error shown in UI).
    """
    if not st.session_state.groq_client:
        st.error("Groq client is not initialized. Please check your API key.")
        return None
    # Target outline for the generated notes; "Detailed Analysis" nests two
    # subsections. Titles must match the section_markers mapping below.
    structure = {
        "Executive Summary": "",
        "Main Agenda": "",
        "Points Discussed": "",
        "Key Insights": "",
        "Questions & Considerations": "",
        "Detailed Analysis": {
            "Context & Background": "",
            "Supporting Evidence": "",
        },
        "Next Steps": ""
    }
    # Prompt asks the model for exactly the headings listed in `structure`
    # so the streaming parser below can route content by heading.
    prompt = f"""
You are an expert meeting notes organizer with exceptional skills in creating structured, clear, and comprehensive notes.
Please analyze the following transcript and transform it into organized meeting notes in the same language as the Transcript ensuring Clarity:
```
{transcript}
```
Create a professional meeting notes document with the following specific sections:
# Executive Summary
- Provide a concise 3-5 sentence overview of the meeting purpose and key outcomes
- Use clear, direct language focused on the most important takeaways
# Main Agenda
- Extract and list the primary agenda items that were discussed in the meeting
- Format as a numbered or bulleted list
- Include time allocations or priority levels if mentioned in the transcript
# Points Discussed
- Provide a comprehensive breakdown of what was discussed for each agenda item
- Use subheadings for each major topic
- Include who raised specific points when identifiable
- Organize chronologically as they appeared in the discussion
# Key Insights
- Extract 5-7 critical insights as bullet points
- Each insight should be **bolded** and followed by 1-2 supporting sentences
- Organize these insights in order of importance
# Questions & Considerations
- List all questions raised during the discussion
- Include concerns or areas needing further exploration
- For each question, provide brief context explaining why it matters
# Detailed Analysis
## Context & Background
- Summarize relevant background information necessary to understand the discussion
- Explain the context in which the meeting took place
- Include references to prior meetings or decisions if mentioned
## Supporting Evidence
- Create a table summarizing any data, evidence, or examples mentioned
- Include source information when available
- Format data clearly using markdown tables when appropriate
# Next Steps
- Create a table with these columns: Action | Owner/Responsible Party | Timeline | Priority
- List all tasks, assignments, follow-up items, and decisions made
- If information is not explicitly stated, indicate with "Not specified"
- Include any deadlines or important dates mentioned
- This section should be comprehensive, capturing ALL action items from the meeting
Make extensive use of markdown formatting:
- Use tables for structured information
- Use **bold** for emphasis on important points
- Use bullet points and numbered lists for clarity
- Use headings and subheadings to organize content
- Include blockquotes for direct citations with > symbol
Your notes should be professional, comprehensive yet concise, focusing on extracting the maximum value from the transcript.
"""
    try:
        stats = GenerationStatistics(model_name=LLM_MODEL)
        start_time = time.time()
        # Streamed completion so sections can render as they arrive.
        response = st.session_state.groq_client.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model=LLM_MODEL,
            temperature=0.2,  # Slightly lower temperature for more consistent structure
            max_tokens=4096,
            top_p=0.95,
            stream=True
        )
        # Time until the stream object is returned counts as "input" time.
        input_time = time.time() - start_time
        stats.input_time = input_time
        note_section = NoteSection(structure, transcript)
        current_section = None
        current_subsection = None
        notes_content = ""
        # Map markdown headings (at either heading level the model might
        # emit) onto canonical section titles from `structure`.
        section_markers = {
            "# Executive Summary": "Executive Summary",
            "## Executive Summary": "Executive Summary",
            "# Main Agenda": "Main Agenda",
            "## Main Agenda": "Main Agenda",
            "# Points Discussed": "Points Discussed",
            "## Points Discussed": "Points Discussed",
            "# Key Insights": "Key Insights",
            "## Key Insights": "Key Insights",
            "# Questions & Considerations": "Questions & Considerations",
            "## Questions & Considerations": "Questions & Considerations",
            "# Detailed Analysis": "Detailed Analysis",
            "## Detailed Analysis": "Detailed Analysis",
            "## Context & Background": "Context & Background",
            "### Context & Background": "Context & Background",
            "## Supporting Evidence": "Supporting Evidence",
            "### Supporting Evidence": "Supporting Evidence",
            "# Next Steps": "Next Steps",
            "## Next Steps": "Next Steps"
        }
        for chunk in response:
            if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                notes_content += content
                # Check for section markers in the accumulated content.
                # NOTE(review): this rescans the WHOLE accumulated text on
                # every chunk, so the active section ends up being the last
                # marker in dict-iteration order found anywhere in the text,
                # not the most recently streamed heading — verify routing on
                # outputs where headings arrive out of the expected order.
                for marker, section in section_markers.items():
                    if marker in notes_content:
                        if section in ["Context & Background", "Supporting Evidence"]:
                            # Subsections live under "Detailed Analysis".
                            current_section = "Detailed Analysis"
                            current_subsection = section
                        else:
                            current_section = section
                            current_subsection = None
                # Route the chunk into the active (sub)section; content under
                # "Detailed Analysis" itself is only stored via a subsection.
                if current_section and current_section != "Detailed Analysis":
                    note_section.update_content(current_section, content)
                elif current_section == "Detailed Analysis" and current_subsection:
                    note_section.update_content(current_subsection, content)
        output_time = time.time() - start_time - input_time
        stats.output_time = output_time
        stats.total_time = time.time() - start_time
        # Display statistics in an expandable section.
        with st.expander("Generation Statistics", expanded=False):
            st.markdown(str(stats))
        return note_section
    except Exception as e:
        st.error(f"Error processing transcript: {e}")
        return None
def export_notes(notes, format="markdown"):
    """Offer the generated notes for download as markdown or PDF."""
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    if format == "markdown":
        markdown_content = notes.get_markdown_content()
        # Markdown downloads straight from the in-memory string.
        st.download_button(
            label="Download Markdown",
            data=markdown_content,
            file_name=f"notes_{timestamp}.md",
            mime="text/markdown"
        )
    elif format == "pdf":
        markdown_content = notes.get_markdown_content()
        # Render the markdown to PDF into an in-memory buffer.
        buffer = BytesIO()
        md2pdf(buffer, markdown_content)
        buffer.seek(0)
        st.download_button(
            label="Download PDF",
            data=buffer,
            file_name=f"notes_{timestamp}.pdf",
            mime="application/pdf"
        )
def main():
    """Streamlit entry point: choose an input, transcribe, generate notes."""
    st.title("🧙‍♂️ Note")
    st.markdown("Transform speech into structured notes")
    # Lazily build the Groq client from the API key (env / HF secrets).
    if st.session_state.api_key and not st.session_state.groq_client:
        st.session_state.groq_client = initialize_groq_client(st.session_state.api_key)
    # Display model info in the sidebar (matches the models actually used).
    with st.sidebar:
        st.info("Using DeepSeek-R1-Distill-Llama-70B model for note generation and Whisper Large v3 Turbo for transcription")
    # Hard-stop early if we cannot talk to Groq at all.
    if not st.session_state.api_key:
        st.error("❌ No API key found. Please set the GROQ_API_KEY secret in your Hugging Face Space settings.")
        st.stop()
    if not st.session_state.groq_client:
        st.error("❌ Failed to initialize Groq client. Please check your API key secret in Hugging Face Space settings.")
        st.stop()
    # Input methods.
    input_method = st.radio("Choose input method:", ["Live Recording", "Upload Audio", "YouTube URL", "Text Input"])
    if input_method == "Live Recording":
        st.markdown("### Record Audio")
        st.markdown("Click the microphone button below to start recording. Click it again to stop.")
        # In-browser recorder component returns WAV bytes (or None).
        wav_audio_data = st_audiorec()
        if wav_audio_data is not None:
            # Reset any previous transcription errors.
            st.session_state.transcription_error = None
            st.audio(wav_audio_data, format='audio/wav')
            if st.button("Transcribe Recording", key="transcribe_rec"):
                with st.spinner("Transcribing audio with Groq..."):
                    try:
                        transcript = transcribe_audio_with_groq(wav_audio_data)
                        if transcript:
                            st.session_state.transcript = transcript
                            st.success("✅ Transcription complete!")
                            with st.expander("View Transcript", expanded=True):
                                st.markdown(transcript)
                            # NOTE(review): buttons nested inside another
                            # button's `if` only exist for this single script
                            # run; after the rerun triggered by clicking them
                            # the outer button is False again, so this inner
                            # flow likely never executes. Consider
                            # session_state flags. Left as-is to preserve
                            # behavior.
                            if st.button("Generate Structured Notes", key="generate_live"):
                                with st.spinner("Creating structured notes..."):
                                    notes = process_transcript(transcript)
                                    if notes:
                                        st.success("Notes generated successfully!")
                                        # Export options
                                        col1, col2 = st.columns(2)
                                        with col1:
                                            if st.button("Export as Markdown", key="md_live"):
                                                export_notes(notes, "markdown")
                                        with col2:
                                            if st.button("Export as PDF", key="pdf_live"):
                                                export_notes(notes, "pdf")
                    except (ValueError, GroqAPIError) as e:
                        st.session_state.transcription_error = str(e)
                        st.error(f"❌ Transcription failed: {str(e)}")
    elif input_method == "Upload Audio":
        uploaded_file = st.file_uploader(
            "Upload an audio file (max 40MB)",
            type=["mp3", "wav", "m4a", "ogg"],
            help="Supported formats: MP3, WAV, M4A, OGG. Maximum size: 40MB"
        )
        if uploaded_file:
            file_size = uploaded_file.size
            if file_size > MAX_FILE_SIZE:
                st.error(f"File size ({file_size/1048576:.2f}MB) exceeds the maximum allowed size of 40MB.")
            else:
                # Save the uploaded file temporarily so the API can read it.
                with tempfile.NamedTemporaryFile(delete=False, suffix="." + uploaded_file.name.split(".")[-1]) as tmp_file:
                    tmp_file.write(uploaded_file.getvalue())
                    audio_file_path = tmp_file.name
                st.audio(uploaded_file)
                if st.button("Transcribe and Generate Notes", key="transcribe_upload"):
                    # Reset any previous transcription errors.
                    st.session_state.transcription_error = None
                    with st.spinner("Transcribing audio with Groq..."):
                        try:
                            transcript = transcribe_audio_with_groq(audio_file_path)
                            if transcript:
                                st.session_state.transcript = transcript
                                st.success("✅ Transcription complete!")
                                with st.expander("View Transcript", expanded=True):
                                    st.markdown(transcript)
                                with st.spinner("Creating structured notes..."):
                                    notes = process_transcript(transcript)
                                    if notes:
                                        st.success("Notes generated successfully!")
                                        # Export options
                                        col1, col2 = st.columns(2)
                                        with col1:
                                            if st.button("Export as Markdown", key="md_upload"):
                                                export_notes(notes, "markdown")
                                        with col2:
                                            if st.button("Export as PDF", key="pdf_upload"):
                                                export_notes(notes, "pdf")
                        except (ValueError, GroqAPIError) as e:
                            st.session_state.transcription_error = str(e)
                            st.error(f"❌ Transcription failed: {str(e)}")
                            # Provide helpful suggestions based on error type.
                            if "Audio file too large" in str(e) or "exceeds" in str(e):
                                st.info("💡 Try trimming your audio file or uploading a shorter segment.")
                            elif "API key" in str(e) or "Authentication" in str(e):
                                st.info("💡 Check that your Groq API key is correct in your Hugging Face Space settings.")
                            elif "Rate limit" in str(e):
                                st.info("💡 You've hit Groq's rate limits. Please wait a few minutes before trying again.")
    elif input_method == "YouTube URL":
        youtube_url = st.text_input(
            "Enter YouTube URL:",
            help="Enter the full URL of a YouTube video (e.g., https://www.youtube.com/watch?v=example)"
        )
        if youtube_url:
            if st.button("Process YouTube Content", key="process_yt"):
                # Reset any previous errors.
                st.session_state.transcription_error = None
                with st.spinner("Downloading YouTube content..."):
                    try:
                        audio_path = download_video_audio(youtube_url)
                        if audio_path:
                            st.success("Video downloaded successfully!")
                            st.audio(audio_path)
                            with st.spinner("Transcribing audio with Groq..."):
                                try:
                                    transcript = transcribe_audio_with_groq(audio_path)
                                    if transcript:
                                        st.session_state.transcript = transcript
                                        st.success("✅ Transcription complete!")
                                        with st.expander("View Transcript", expanded=True):
                                            st.markdown(transcript)
                                        with st.spinner("Creating structured notes..."):
                                            notes = process_transcript(transcript)
                                            if notes:
                                                st.success("Notes generated successfully!")
                                                # Export options
                                                col1, col2 = st.columns(2)
                                                with col1:
                                                    if st.button("Export as Markdown", key="md_yt"):
                                                        export_notes(notes, "markdown")
                                                with col2:
                                                    if st.button("Export as PDF", key="pdf_yt"):
                                                        export_notes(notes, "pdf")
                                except (ValueError, GroqAPIError) as e:
                                    st.session_state.transcription_error = str(e)
                                    st.error(f"❌ Transcription failed: {str(e)}")
                            # Clean up downloaded files regardless of outcome.
                            delete_download(audio_path)
                    except Exception as e:
                        if "exceeds maximum allowed size" in str(e):
                            st.error(f"{FILE_TOO_LARGE_MESSAGE} Try a shorter video.")
                        else:
                            st.error(f"Error processing YouTube video: {e}")
    else:  # Text Input
        transcript = st.text_area(
            "Enter transcript text:",
            height=300,
            help="Paste or type your transcript text here for generating structured notes"
        )
        if transcript:
            st.session_state.transcript = transcript
            if st.button("Generate Structured Notes", key="process_text"):
                with st.spinner("Creating structured notes..."):
                    notes = process_transcript(transcript)
                    if notes:
                        st.success("Notes generated successfully!")
                        # Export options
                        col1, col2 = st.columns(2)
                        with col1:
                            if st.button("Export as Markdown", key="md_text"):
                                export_notes(notes, "markdown")
                        with col2:
                            if st.button("Export as PDF", key="pdf_text"):
                                export_notes(notes, "pdf")


if __name__ == "__main__":
    main()