diff --git "a/App_Function_Libraries/Gradio_Related.py" "b/App_Function_Libraries/Gradio_Related.py"
new file mode 100644
--- /dev/null
+++ "b/App_Function_Libraries/Gradio_Related.py"
@@ -0,0 +1,2226 @@
+# Gradio_Related.py
+#########################################
+# Gradio UI Functions Library
+# This library is used to hold all UI-related functions for Gradio.
+# Note: Gradio's API makes much of this code more awkward than it should be.
+#
+#####
+# Functions:
+#
+# download_audio_file(url, save_path)
+# process_audio(
+# process_audio_file(audio_url, audio_file, whisper_model="small.en", api_name=None, api_key=None)
+#
+#
+#########################################
+#
+# Built-In Imports
+from datetime import datetime
+import json
+import logging
+import os.path
+from pathlib import Path
+import sqlite3
+from typing import Dict, List, Tuple
+import traceback
+from functools import wraps
+#
+# Import 3rd-Party Libraries
+import yt_dlp
+import gradio as gr
+#
+# Local Imports
+from App_Function_Libraries.Article_Summarization_Lib import scrape_and_summarize_multiple
+from App_Function_Libraries.Audio_Files import process_audio_files, process_podcast
+from App_Function_Libraries.Chunk_Lib import improved_chunking_process, get_chat_completion
+from App_Function_Libraries.PDF_Ingestion_Lib import process_and_cleanup_pdf
+from App_Function_Libraries.Local_LLM_Inference_Engine_Lib import local_llm_gui_function
+from App_Function_Libraries.Local_Summarization_Lib import summarize_with_llama, summarize_with_kobold, \
+ summarize_with_oobabooga, summarize_with_tabbyapi, summarize_with_vllm, summarize_with_local_llm
+from App_Function_Libraries.Summarization_General_Lib import summarize_with_openai, summarize_with_cohere, \
+ summarize_with_anthropic, summarize_with_groq, summarize_with_openrouter, summarize_with_deepseek, \
+ summarize_with_huggingface, perform_summarization, save_transcription_and_summary, \
+ perform_transcription, summarize_chunk
+from App_Function_Libraries.SQLite_DB import update_media_content, list_prompts, search_and_display, db, DatabaseError, \
+ fetch_prompt_details, keywords_browser_interface, add_keyword, delete_keyword, \
+ export_keywords_to_csv, export_to_file, add_media_to_database, insert_prompt_to_db
+from App_Function_Libraries.Utils import sanitize_filename, extract_text_from_segments, create_download_directory, \
+ convert_to_seconds, load_comprehensive_config
+from App_Function_Libraries.Video_DL_Ingestion_Lib import parse_and_expand_urls, \
+ generate_timestamped_url, extract_metadata, download_video
+
+#
+#######################################################################################################################
+# Function Definitions
+#
+
+whisper_models = ["small", "medium", "small.en", "medium.en", "large", "large-v1", "large-v2", "large-v3",
+                  "distil-large-v2", "distil-medium.en", "distil-small.en"]
+custom_prompt_input = None
+server_mode = False
+share_public = False
+
+
+def load_preset_prompts():
+ return list_prompts()
+
+
+def gradio_download_youtube_video(url):
+ """Download video using yt-dlp with specified options."""
+ # Determine ffmpeg path based on the operating system.
+ ffmpeg_path = './Bin/ffmpeg.exe' if os.name == 'nt' else 'ffmpeg'
+
+ # Extract information about the video
+ with yt_dlp.YoutubeDL({'quiet': True}) as ydl:
+ info_dict = ydl.extract_info(url, download=False)
+ sanitized_title = sanitize_filename(info_dict['title'])
+ original_ext = info_dict['ext']
+
+ # Setup the final directory and filename
+ download_dir = Path(f"results/{sanitized_title}")
+ download_dir.mkdir(parents=True, exist_ok=True)
+ output_file_path = download_dir / f"{sanitized_title}.{original_ext}"
+
+ # Initialize yt-dlp with generic options and the output template
+ ydl_opts = {
+ 'format': 'bestvideo+bestaudio/best',
+ 'ffmpeg_location': ffmpeg_path,
+ 'outtmpl': str(output_file_path),
+ 'noplaylist': True, 'quiet': True
+ }
+
+ # Execute yt-dlp to download the video
+ with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+ ydl.download([url])
+
+ # Final check to ensure file exists
+ if not output_file_path.exists():
+ raise FileNotFoundError(f"Expected file was not found: {output_file_path}")
+
+ return str(output_file_path)
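+
+# Illustrative usage sketch (the URL below is a placeholder, not a real video):
+#
+#     saved_path = gradio_download_youtube_video("https://www.youtube.com/watch?v=<video_id>")
+#     # saved_path -> "results/<sanitized title>/<sanitized title>.<ext>"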
+
+
+
+
+def format_transcription(content):
+    # Add extra space after periods for better readability, then collapse any
+    # double spaces this may have introduced
+    content = content.replace('.', '. ').replace('.  ', '. ')
+    # Split the content into sentences for multiline display
+    lines = content.split('. ')
+    # Join lines with HTML line breaks for better presentation in Markdown
+    formatted_content = "<br>".join(lines)
+    return formatted_content
+
+
+def format_file_path(file_path, fallback_path=None):
+ if file_path and os.path.exists(file_path):
+ logging.debug(f"File exists: {file_path}")
+ return file_path
+ elif fallback_path and os.path.exists(fallback_path):
+ logging.debug(f"File does not exist: {file_path}. Returning fallback path: {fallback_path}")
+ return fallback_path
+ else:
+ logging.debug(f"File does not exist: {file_path}. No fallback path available.")
+ return None
+
+
+def search_media(query, fields, keyword, page):
+ try:
+ results = search_and_display(query, fields, keyword, page)
+ return results
+ except Exception as e:
+ logger = logging.getLogger()
+ logger.error(f"Error searching media: {e}")
+ return str(e)
+
+
+
+
+# Sample data
+prompts_category_1 = [
+ "What are the key points discussed in the video?",
+ "Summarize the main arguments made by the speaker.",
+ "Describe the conclusions of the study presented."
+]
+
+prompts_category_2 = [
+ "How does the proposed solution address the problem?",
+ "What are the implications of the findings?",
+ "Can you explain the theory behind the observed phenomenon?"
+]
+
+all_prompts = prompts_category_1 + prompts_category_2
+
+
+
+
+
+# Handle prompt selection
+def handle_prompt_selection(prompt):
+ return f"You selected: {prompt}"
+
+def display_details(media_id):
+    # Gradio Search Function-related stuff
+    if media_id:
+        prompt_summary_section, content_section = display_item_details(media_id)
+        return prompt_summary_section + content_section
+    return "No details available."
+
+
+def fetch_items_by_title_or_url(search_query: str, search_type: str):
+ try:
+ with db.get_connection() as conn:
+ cursor = conn.cursor()
+ if search_type == 'Title':
+ cursor.execute("SELECT id, title, url FROM Media WHERE title LIKE ?", (f'%{search_query}%',))
+ elif search_type == 'URL':
+ cursor.execute("SELECT id, title, url FROM Media WHERE url LIKE ?", (f'%{search_query}%',))
+ results = cursor.fetchall()
+ return results
+ except sqlite3.Error as e:
+ raise DatabaseError(f"Error fetching items by {search_type}: {e}")
+
+
+def fetch_items_by_keyword(search_query: str):
+ try:
+ with db.get_connection() as conn:
+ cursor = conn.cursor()
+ cursor.execute("""
+ SELECT m.id, m.title, m.url
+ FROM Media m
+ JOIN MediaKeywords mk ON m.id = mk.media_id
+ JOIN Keywords k ON mk.keyword_id = k.id
+ WHERE k.keyword LIKE ?
+ """, (f'%{search_query}%',))
+ results = cursor.fetchall()
+ return results
+ except sqlite3.Error as e:
+ raise DatabaseError(f"Error fetching items by keyword: {e}")
+
+
+def fetch_items_by_content(search_query: str):
+ try:
+ with db.get_connection() as conn:
+ cursor = conn.cursor()
+ cursor.execute("SELECT id, title, url FROM Media WHERE content LIKE ?", (f'%{search_query}%',))
+ results = cursor.fetchall()
+ return results
+ except sqlite3.Error as e:
+ raise DatabaseError(f"Error fetching items by content: {e}")
+
+
+def fetch_item_details_single(media_id: int):
+ try:
+ with db.get_connection() as conn:
+ cursor = conn.cursor()
+ cursor.execute("""
+ SELECT prompt, summary
+ FROM MediaModifications
+ WHERE media_id = ?
+ ORDER BY modification_date DESC
+ LIMIT 1
+ """, (media_id,))
+ prompt_summary_result = cursor.fetchone()
+ cursor.execute("SELECT content FROM Media WHERE id = ?", (media_id,))
+ content_result = cursor.fetchone()
+
+ prompt = prompt_summary_result[0] if prompt_summary_result else ""
+ summary = prompt_summary_result[1] if prompt_summary_result else ""
+ content = content_result[0] if content_result else ""
+
+ return prompt, summary, content
+ except sqlite3.Error as e:
+        raise DatabaseError(f"Error fetching item details: {e}")
+
+
+def fetch_item_details(media_id: int):
+ try:
+ with db.get_connection() as conn:
+ cursor = conn.cursor()
+ cursor.execute("""
+ SELECT prompt, summary
+ FROM MediaModifications
+ WHERE media_id = ?
+ ORDER BY modification_date DESC
+ LIMIT 1
+ """, (media_id,))
+ prompt_summary_result = cursor.fetchone()
+ cursor.execute("SELECT content FROM Media WHERE id = ?", (media_id,))
+ content_result = cursor.fetchone()
+
+ prompt = prompt_summary_result[0] if prompt_summary_result else ""
+ summary = prompt_summary_result[1] if prompt_summary_result else ""
+ content = content_result[0] if content_result else ""
+
+ return content, prompt, summary
+ except sqlite3.Error as e:
+ logging.error(f"Error fetching item details: {e}")
+ return "", "", "" # Return empty strings if there's an error
+
+
+def browse_items(search_query, search_type):
+ if search_type == 'Keyword':
+ results = fetch_items_by_keyword(search_query)
+ elif search_type == 'Content':
+ results = fetch_items_by_content(search_query)
+ else:
+ results = fetch_items_by_title_or_url(search_query, search_type)
+ return results
+
+
+def display_item_details(media_id):
+    # Function to display item details
+    content, prompt, summary = fetch_item_details(media_id)
+    prompt_summary_section = f"<h4>Prompt:</h4><p>{prompt}</p>"
+    prompt_summary_section += f"<h4>Summary:</h4><p>{summary}</p>"
+    content_section = f"<h4>Transcription:</h4><p>{content}</p>"
+    return prompt_summary_section, content_section
+
+
+def update_dropdown(search_query, search_type):
+ results = browse_items(search_query, search_type)
+ item_options = [f"{item[1]} ({item[2]})" for item in results]
+ new_item_mapping = {f"{item[1]} ({item[2]})": item[0] for item in results}
+ print(f"Debug - Update Dropdown - New Item Mapping: {new_item_mapping}")
+ return gr.update(choices=item_options), new_item_mapping
+
+
+
+def get_media_id(selected_item, item_mapping):
+ return item_mapping.get(selected_item)
+
+
+def update_detailed_view(item, item_mapping):
+ # Function to update the detailed view based on selected item
+ if item:
+ item_id = item_mapping.get(item)
+ if item_id:
+ content, prompt, summary = fetch_item_details(item_id)
+ if content or prompt or summary:
+                details_html = "<h4>Details:</h4>"
+                if prompt:
+                    details_html += f"<h4>Prompt:</h4><p>{prompt}</p>"
+                if summary:
+                    details_html += f"<h4>Summary:</h4><p>{summary}</p>"
+                # Format the transcription content for better readability
+                content_html = f"<h4>Transcription:</h4><p>{format_transcription(content)}</p>"
+ return details_html, content_html
+ else:
+ return "No details available.", "No details available."
+ else:
+ return "No item selected", "No item selected"
+ else:
+ return "No item selected", "No item selected"
+
+
+def format_content(content):
+ # Format content using markdown
+ formatted_content = f"```\n{content}\n```"
+ return formatted_content
+
+
+def update_prompt_dropdown():
+ prompt_names = list_prompts()
+ return gr.update(choices=prompt_names)
+
+
+def display_prompt_details(selected_prompt):
+ if selected_prompt:
+ details = fetch_prompt_details(selected_prompt)
+ if details:
+            details_str = f"<h4>Details:</h4><p>{details[0]}</p>"
+            system_str = f"<h4>System:</h4><p>{details[1]}</p>"
+            user_str = f"<h4>User:</h4><p>{details[2]}</p>" if details[2] else ""
+ return details_str + system_str + user_str
+ return "No details available."
+
+
+def display_search_results(query):
+ if not query.strip():
+ return "Please enter a search query."
+
+ results = search_prompts(query)
+
+ # Debugging: Print the results to the console to see what is being returned
+ print(f"Processed search results for query '{query}': {results}")
+
+ if results:
+ result_md = "## Search Results:\n"
+ for result in results:
+ # Debugging: Print each result to see its format
+ print(f"Result item: {result}")
+
+            if len(result) == 4:
+                name, details, system, user = result
+                result_md += f"**Title:** {name}\n\n**Description:** {details}\n\n---\n"
+ else:
+ result_md += "Error: Unexpected result format.\n\n---\n"
+ return result_md
+ return "No results found."
+
+
+def search_media_database(query: str) -> List[Tuple[int, str, str]]:
+ return browse_items(query, 'Title')
+
+
+def load_media_content(media_id: int) -> dict:
+ try:
+ print(f"Debug - Load Media Content - Media ID: {media_id}")
+ item_details = fetch_item_details(media_id)
+ print(f"Debug - Load Media Content - Item Details: {item_details}")
+
+ if isinstance(item_details, tuple) and len(item_details) == 3:
+ content, prompt, summary = item_details
+ else:
+ print(f"Debug - Load Media Content - Unexpected item_details format: {item_details}")
+ content, prompt, summary = "", "", ""
+
+ return {
+ "content": content or "No content available",
+ "prompt": prompt or "No prompt available",
+ "summary": summary or "No summary available"
+ }
+ except Exception as e:
+ print(f"Debug - Load Media Content - Error: {str(e)}")
+ return {"content": "", "prompt": "", "summary": ""}
+
+
+def chat(message, history, media_content, selected_parts, api_endpoint, api_key, prompt):
+ try:
+ print(f"Debug - Chat Function - Message: {message}")
+ print(f"Debug - Chat Function - Media Content: {media_content}")
+ print(f"Debug - Chat Function - Selected Parts: {selected_parts}")
+ print(f"Debug - Chat Function - API Endpoint: {api_endpoint}")
+ print(f"Debug - Chat Function - Prompt: {prompt}")
+
+ # Ensure selected_parts is a list
+ if not isinstance(selected_parts, (list, tuple)):
+ selected_parts = [selected_parts] if selected_parts else []
+
+ print(f"Debug - Chat Function - Selected Parts (after check): {selected_parts}")
+
+ # Combine the selected parts of the media content
+ combined_content = "\n\n".join([f"{part.capitalize()}: {media_content.get(part, '')}" for part in selected_parts if part in media_content])
+ print(f"Debug - Chat Function - Combined Content: {combined_content[:500]}...") # Print first 500 chars
+
+ # Prepare the input for the API
+ input_data = f"{combined_content}\n\nUser: {message}\nAI:"
+ print(f"Debug - Chat Function - Input Data: {input_data[:500]}...") # Print first 500 chars
+
+ # Use the existing API request code based on the selected endpoint
+ if api_endpoint.lower() == 'openai':
+ response = summarize_with_openai(api_key, input_data, prompt)
+ elif api_endpoint.lower() == "anthropic":
+ response = summarize_with_anthropic(api_key, input_data, prompt)
+ elif api_endpoint.lower() == "cohere":
+ response = summarize_with_cohere(api_key, input_data, prompt)
+ elif api_endpoint.lower() == "groq":
+ response = summarize_with_groq(api_key, input_data, prompt)
+ elif api_endpoint.lower() == "openrouter":
+ response = summarize_with_openrouter(api_key, input_data, prompt)
+ elif api_endpoint.lower() == "deepseek":
+ response = summarize_with_deepseek(api_key, input_data, prompt)
+ elif api_endpoint.lower() == "llama.cpp":
+ response = summarize_with_llama(input_data, prompt)
+ elif api_endpoint.lower() == "kobold":
+ response = summarize_with_kobold(input_data, api_key, prompt)
+ elif api_endpoint.lower() == "ooba":
+ response = summarize_with_oobabooga(input_data, api_key, prompt)
+ elif api_endpoint.lower() == "tabbyapi":
+ response = summarize_with_tabbyapi(input_data, prompt)
+ elif api_endpoint.lower() == "vllm":
+ response = summarize_with_vllm(input_data, prompt)
+ elif api_endpoint.lower() == "local-llm":
+ response = summarize_with_local_llm(input_data, prompt)
+ elif api_endpoint.lower() == "huggingface":
+ response = summarize_with_huggingface(api_key, input_data, prompt)
+ else:
+ raise ValueError(f"Unsupported API endpoint: {api_endpoint}")
+
+ return response
+
+ except Exception as e:
+ logging.error(f"Error in chat function: {str(e)}")
+ return f"An error occurred: {str(e)}"
+
+
+def save_chat_history(history: List[List[str]], media_content: Dict[str, str], selected_parts: List[str],
+ api_endpoint: str, prompt: str):
+ """
+ Save the chat history along with context information to a JSON file.
+ """
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ filename = f"chat_history_{timestamp}.json"
+
+ chat_data = {
+ "timestamp": timestamp,
+ "history": history,
+ "context": {
+ "selected_media": {
+ part: media_content.get(part, "") for part in selected_parts
+ },
+ "api_endpoint": api_endpoint,
+ "prompt": prompt
+ }
+ }
+
+ json_data = json.dumps(chat_data, indent=2)
+
+ return filename, json_data
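+
+# Sketch of intended usage: save_chat_history only builds the filename and the
+# JSON payload; the caller is responsible for writing it to disk. Argument
+# values below are illustrative.
+#
+#     filename, json_data = save_chat_history(history, media_content,
+#                                             ["content", "summary"],
+#                                             "openai", custom_prompt)
+#     with open(filename, 'w') as f:
+#         f.write(json_data)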
+
+
+def error_handler(func):
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except Exception as e:
+ error_message = f"Error in {func.__name__}: {str(e)}"
+ logging.error(f"{error_message}\n{traceback.format_exc()}")
+ return {"error": error_message, "details": traceback.format_exc()}
+ return wrapper
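+
+# Sketch of how the decorator is meant to be used: any exception raised by the
+# wrapped callback is logged with a full traceback and returned as a dict that
+# the UI can display, instead of crashing the Gradio event handler. The
+# callback below is hypothetical.
+#
+#     @error_handler
+#     def my_tab_callback(user_input):
+#         return do_risky_processing(user_input)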
+
+
+def create_chunking_inputs():
+ chunk_text_by_words_checkbox = gr.Checkbox(label="Chunk Text by Words", value=False, visible=True)
+ max_words_input = gr.Number(label="Max Words", value=300, precision=0, visible=True)
+ chunk_text_by_sentences_checkbox = gr.Checkbox(label="Chunk Text by Sentences", value=False, visible=True)
+ max_sentences_input = gr.Number(label="Max Sentences", value=10, precision=0, visible=True)
+ chunk_text_by_paragraphs_checkbox = gr.Checkbox(label="Chunk Text by Paragraphs", value=False, visible=True)
+ max_paragraphs_input = gr.Number(label="Max Paragraphs", value=5, precision=0, visible=True)
+ chunk_text_by_tokens_checkbox = gr.Checkbox(label="Chunk Text by Tokens", value=False, visible=True)
+ max_tokens_input = gr.Number(label="Max Tokens", value=1000, precision=0, visible=True)
+ gr_semantic_chunk_long_file = gr.Checkbox(label="Semantic Chunking by Sentence similarity", value=False, visible=True)
+ gr_semantic_chunk_long_file_size = gr.Number(label="Max Chunk Size", value=2000, visible=True)
+ gr_semantic_chunk_long_file_overlap = gr.Number(label="Max Chunk Overlap Size", value=100, visible=True)
+    return [chunk_text_by_words_checkbox, max_words_input, chunk_text_by_sentences_checkbox, max_sentences_input,
+            chunk_text_by_paragraphs_checkbox, max_paragraphs_input, chunk_text_by_tokens_checkbox, max_tokens_input,
+            gr_semantic_chunk_long_file, gr_semantic_chunk_long_file_size, gr_semantic_chunk_long_file_overlap]
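+
+# Minimal sketch of wiring these components into a layout (assumes an
+# enclosing gr.Blocks()/gr.TabItem() context):
+#
+#     with gr.Column():
+#         chunking_inputs = create_chunking_inputs()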
+
+
+
+def create_video_transcription_tab():
+ with gr.TabItem("Video Transcription + Summarization"):
+ gr.Markdown("# Transcribe & Summarize Videos from URLs")
+ with gr.Row():
+ gr.Markdown("""Follow this project at [tldw - GitHub](https://github.com/rmusser01/tldw)""")
+ with gr.Row():
+ with gr.Column():
+ url_input = gr.Textbox(label="URL(s) (Mandatory)",
+ placeholder="Enter video URLs here, one per line. Supports YouTube, Vimeo, and playlists.",
+ lines=5)
+ diarize_input = gr.Checkbox(label="Enable Speaker Diarization", value=False)
+ whisper_model_input = gr.Dropdown(choices=whisper_models, value="medium", label="Whisper Model")
+ custom_prompt_checkbox = gr.Checkbox(label="Use Custom Prompt", value=False, visible=True)
+ custom_prompt_input = gr.Textbox(label="Custom Prompt", placeholder="Enter custom prompt here", lines=3, visible=False)
+ custom_prompt_checkbox.change(
+ fn=lambda x: gr.update(visible=x),
+ inputs=[custom_prompt_checkbox],
+ outputs=[custom_prompt_input]
+ )
+ api_name_input = gr.Dropdown(
+ choices=[None, "Local-LLM", "OpenAI", "Anthropic", "Cohere", "Groq", "DeepSeek", "OpenRouter",
+ "Llama.cpp", "Kobold", "Ooba", "Tabbyapi", "VLLM", "HuggingFace"],
+ value=None, label="API Name (Mandatory)")
+ api_key_input = gr.Textbox(label="API Key (Mandatory)", placeholder="Enter your API key here")
+ keywords_input = gr.Textbox(label="Keywords", placeholder="Enter keywords here (comma-separated)",
+ value="default,no_keyword_set")
+ batch_size_input = gr.Slider(minimum=1, maximum=10, value=1, step=1,
+ label="Batch Size (Number of videos to process simultaneously)")
+ timestamp_option = gr.Radio(choices=["Include Timestamps", "Exclude Timestamps"],
+ value="Include Timestamps", label="Timestamp Option")
+ keep_original_video = gr.Checkbox(label="Keep Original Video", value=False)
+ # First, create a checkbox to toggle the chunking options
+ chunking_options_checkbox = gr.Checkbox(label="Show Chunking Options", value=False)
+ summarize_recursively = gr.Checkbox(label="Enable Recursive Summarization", value=False)
+ use_cookies_input = gr.Checkbox(label="Use cookies for authenticated download", value=False)
+ use_time_input = gr.Checkbox(label="Use Start and End Time", value=False)
+
+ with gr.Row(visible=False) as time_input_box:
+ gr.Markdown("### Start and End time")
+ with gr.Column():
+ start_time_input = gr.Textbox(label="Start Time (Optional)",
+ placeholder="e.g., 1:30 or 90 (in seconds)")
+ end_time_input = gr.Textbox(label="End Time (Optional)", placeholder="e.g., 5:45 or 345 (in seconds)")
+
+ use_time_input.change(
+ fn=lambda x: gr.update(visible=x),
+ inputs=[use_time_input],
+ outputs=[time_input_box]
+ )
+
+ cookies_input = gr.Textbox(
+ label="User Session Cookies",
+ placeholder="Paste your cookies here (JSON format)",
+ lines=3,
+ visible=False
+ )
+
+ use_cookies_input.change(
+ fn=lambda x: gr.update(visible=x),
+ inputs=[use_cookies_input],
+ outputs=[cookies_input]
+ )
+ # Then, create a Box to group the chunking options
+ with gr.Row(visible=False) as chunking_options_box:
+ gr.Markdown("### Chunking Options")
+ with gr.Column():
+ chunk_method = gr.Dropdown(choices=['words', 'sentences', 'paragraphs', 'tokens'],
+ label="Chunking Method")
+ max_chunk_size = gr.Slider(minimum=100, maximum=1000, value=300, step=50, label="Max Chunk Size")
+ chunk_overlap = gr.Slider(minimum=0, maximum=100, value=0, step=10, label="Chunk Overlap")
+ use_adaptive_chunking = gr.Checkbox(label="Use Adaptive Chunking")
+ use_multi_level_chunking = gr.Checkbox(label="Use Multi-level Chunking")
+ chunk_language = gr.Dropdown(choices=['english', 'french', 'german', 'spanish'],
+ label="Chunking Language")
+
+                    # Toggle the visibility of the chunking options box when the checkbox changes
+ chunking_options_checkbox.change(
+ fn=lambda x: gr.update(visible=x),
+ inputs=[chunking_options_checkbox],
+ outputs=[chunking_options_box]
+ )
+ process_button = gr.Button("Process Videos")
+
+ with gr.Column():
+ progress_output = gr.Textbox(label="Progress")
+ error_output = gr.Textbox(label="Errors", visible=False)
+ results_output = gr.HTML(label="Results")
+ download_transcription = gr.File(label="Download All Transcriptions as JSON")
+ download_summary = gr.File(label="Download All Summaries as Text")
+
+ @error_handler
+ def process_videos_with_error_handling(urls, start_time, end_time, diarize, whisper_model,
+ custom_prompt_checkbox, custom_prompt, chunking_options_checkbox,
+ chunk_method, max_chunk_size, chunk_overlap, use_adaptive_chunking,
+ use_multi_level_chunking, chunk_language, api_name,
+ api_key, keywords, use_cookies, cookies, batch_size,
+ timestamp_option, keep_original_video, summarize_recursively,
+ progress: gr.Progress = gr.Progress()) -> tuple:
+ try:
+ logging.info("Entering process_videos_with_error_handling")
+ logging.info(f"Received URLs: {urls}")
+
+ if not urls:
+ raise ValueError("No URLs provided")
+
+ logging.debug("Input URL(s) is(are) valid")
+
+ # Ensure batch_size is an integer
+ try:
+ batch_size = int(batch_size)
+ except (ValueError, TypeError):
+ batch_size = 1 # Default to processing one video at a time if invalid
+
+ expanded_urls = parse_and_expand_urls(urls)
+ logging.info(f"Expanded URLs: {expanded_urls}")
+
+ total_videos = len(expanded_urls)
+ logging.info(f"Total videos to process: {total_videos}")
+ results = []
+ errors = []
+ results_html = ""
+ all_transcriptions = {}
+ all_summaries = ""
+
+ for i in range(0, total_videos, batch_size):
+ batch = expanded_urls[i:i + batch_size]
+ batch_results = []
+
+ for url in batch:
+ try:
+ start_seconds = convert_to_seconds(start_time)
+ end_seconds = convert_to_seconds(end_time) if end_time else None
+
+ logging.info(f"Attempting to extract metadata for {url}")
+ video_metadata = extract_metadata(url, use_cookies, cookies)
+ if not video_metadata:
+ raise ValueError(f"Failed to extract metadata for {url}")
+
+ chunk_options = {
+ 'method': chunk_method,
+ 'max_size': max_chunk_size,
+ 'overlap': chunk_overlap,
+ 'adaptive': use_adaptive_chunking,
+ 'multi_level': use_multi_level_chunking,
+ 'language': chunk_language
+ } if chunking_options_checkbox else None
+
+ result = process_url_with_metadata(
+ url, 2, whisper_model,
+ custom_prompt if custom_prompt_checkbox else None,
+ start_seconds, api_name, api_key,
+ False, False, False, False, 0.01, None, keywords, None, diarize,
+ end_time=end_seconds,
+ include_timestamps=(timestamp_option == "Include Timestamps"),
+ metadata=video_metadata,
+ use_chunking=chunking_options_checkbox,
+ chunk_options=chunk_options,
+ keep_original_video=keep_original_video
+ )
+
+ if result[0] is None: # Check if the first return value is None
+ error_message = "Processing failed without specific error"
+ batch_results.append((url, error_message, "Error", video_metadata, None, None))
+ errors.append(f"Error processing {url}: {error_message}")
+ else:
+ url, transcription, summary, json_file, summary_file, result_metadata = result
+ if transcription is None:
+ error_message = f"Processing failed for {url}: Transcription is None"
+ batch_results.append((url, error_message, "Error", result_metadata, None, None))
+ errors.append(error_message)
+ else:
+ batch_results.append(
+ (url, transcription, "Success", result_metadata, json_file, summary_file))
+
+ except Exception as e:
+ error_message = f"Error processing {url}: {str(e)}"
+ logging.error(error_message, exc_info=True)
+ batch_results.append((url, error_message, "Error", {}, None, None))
+ errors.append(error_message)
+
+ results.extend(batch_results)
+ if isinstance(progress, gr.Progress):
+ progress((i + len(batch)) / total_videos,
+ f"Processed {i + len(batch)}/{total_videos} videos")
+
+ # Generate HTML for results
+ for url, transcription, status, metadata, json_file, summary_file in results:
+ if status == "Success":
+ title = metadata.get('title', 'Unknown Title')
+
+ # Check if transcription is a string (which it should be now)
+ if isinstance(transcription, str):
+ # Split the transcription into metadata and actual transcription
+ parts = transcription.split('\n\n', 1)
+ if len(parts) == 2:
+ metadata_text, transcription_text = parts
+ else:
+ metadata_text = "Metadata not found"
+ transcription_text = transcription
+ else:
+ metadata_text = "Metadata format error"
+ transcription_text = "Transcription format error"
+
+                        if summary_file:
+                            with open(summary_file, 'r') as sf:
+                                summary = sf.read()
+                        else:
+                            summary = "No summary available"
+
+                        results_html += f"""
+                        <div class="result-box">
+                            <h3>{title}</h3>
+                            <p><strong>URL:</strong> {url}</p>
+                            <h4>Metadata:</h4>
+                            <pre>{metadata_text}</pre>
+                            <h4>Transcription:</h4>
+                            <pre>{transcription_text}</pre>
+                            <h4>Summary:</h4>
+                            <pre>{summary}</pre>
+                        </div>
+                        """
+ logging.debug(f"Transcription for {url}: {transcription[:200]}...")
+ all_transcriptions[url] = transcription
+ all_summaries += f"Title: {title}\nURL: {url}\n\n{metadata_text}\n\nTranscription:\n{transcription_text}\n\nSummary:\n{summary}\n\n---\n\n"
+ else:
+                        results_html += f"""
+                        <div class="result-box error">
+                            <h3>Error processing {url}</h3>
+                            <p>{transcription}</p>
+                        </div>
+                        """
+
+ # Save all transcriptions and summaries to files
+ with open('all_transcriptions.json', 'w') as f:
+ json.dump(all_transcriptions, f, indent=2)
+
+ with open('all_summaries.txt', 'w') as f:
+ f.write(all_summaries)
+
+ error_summary = "\n".join(errors) if errors else "No errors occurred."
+
+ return (
+ f"Processed {total_videos} videos. {len(errors)} errors occurred.",
+ error_summary,
+ results_html,
+ 'all_transcriptions.json',
+ 'all_summaries.txt'
+ )
+ except Exception as e:
+ logging.error(f"Unexpected error in process_videos_with_error_handling: {str(e)}", exc_info=True)
+ return (
+ f"An unexpected error occurred: {str(e)}",
+ str(e),
+ "Unexpected Error
" + str(e) + "
",
+ None,
+ None
+ )
+
+ def process_videos_wrapper(urls, start_time, end_time, diarize, whisper_model,
+ custom_prompt_checkbox, custom_prompt, chunking_options_checkbox,
+ chunk_method, max_chunk_size, chunk_overlap, use_adaptive_chunking,
+ use_multi_level_chunking, chunk_language, summarize_recursively, api_name,
+ api_key, keywords, use_cookies, cookies, batch_size,
+ timestamp_option, keep_original_video):
+ try:
+ logging.info("process_videos_wrapper called")
+ result = process_videos_with_error_handling(
+ urls, start_time, end_time, diarize, whisper_model,
+ custom_prompt_checkbox, custom_prompt, chunking_options_checkbox,
+ chunk_method, max_chunk_size, chunk_overlap, use_adaptive_chunking,
+ use_multi_level_chunking, chunk_language, api_name,
+ api_key, keywords, use_cookies, cookies, batch_size,
+ timestamp_option, keep_original_video, summarize_recursively
+ )
+ logging.info("process_videos_with_error_handling completed")
+
+ # Ensure that result is a tuple with 5 elements
+ if not isinstance(result, tuple) or len(result) != 5:
+ raise ValueError(
+ f"Expected 5 outputs, but got {len(result) if isinstance(result, tuple) else 1}")
+
+ return result
+ except Exception as e:
+ logging.error(f"Error in process_videos_wrapper: {str(e)}", exc_info=True)
+ # Return a tuple with 5 elements in case of any error
+ return (
+ f"An error occurred: {str(e)}", # progress_output
+ str(e), # error_output
+ f"Error: {str(e)}
", # results_output
+ None, # download_transcription
+ None # download_summary
+ )
+
+ # FIXME - remove dead args for process_url_with_metadata
+ @error_handler
+ def process_url_with_metadata(url, num_speakers, whisper_model, custom_prompt, offset, api_name, api_key,
+ vad_filter, download_video_flag, download_audio, rolling_summarization,
+ detail_level, question_box, keywords, local_file_path, diarize, end_time=None,
+ include_timestamps=True, metadata=None, use_chunking=False,
+ chunk_options=None, keep_original_video=False):
+
+ try:
+ logging.info(f"Starting process_url_metadata for URL: {url}")
+ # Create download path
+ download_path = create_download_directory("Video_Downloads")
+ logging.info(f"Download path created at: {download_path}")
+
+ # Initialize info_dict
+ info_dict = {}
+
+ # Handle URL or local file
+ if local_file_path:
+ video_file_path = local_file_path
+ # Extract basic info from local file
+ info_dict = {
+ 'webpage_url': local_file_path,
+ 'title': os.path.basename(local_file_path),
+ 'description': "Local file",
+ 'channel_url': None,
+ 'duration': None,
+ 'channel': None,
+ 'uploader': None,
+ 'upload_date': None
+ }
+ else:
+ # Extract video information
+ with yt_dlp.YoutubeDL({'quiet': True}) as ydl:
+ try:
+ full_info = ydl.extract_info(url, download=False)
+
+ # Create a safe subset of info to log
+ safe_info = {
+ 'title': full_info.get('title', 'No title'),
+ 'duration': full_info.get('duration', 'Unknown duration'),
+ 'upload_date': full_info.get('upload_date', 'Unknown upload date'),
+ 'uploader': full_info.get('uploader', 'Unknown uploader'),
+ 'view_count': full_info.get('view_count', 'Unknown view count')
+ }
+
+ logging.debug(f"Full info extracted for {url}: {safe_info}")
+ except Exception as e:
+ logging.error(f"Error extracting video info: {str(e)}")
+ return None, None, None, None, None, None
+
+ # Filter the required metadata
+ if full_info:
+ info_dict = {
+ 'webpage_url': full_info.get('webpage_url', url),
+ 'title': full_info.get('title'),
+ 'description': full_info.get('description'),
+ 'channel_url': full_info.get('channel_url'),
+ 'duration': full_info.get('duration'),
+ 'channel': full_info.get('channel'),
+ 'uploader': full_info.get('uploader'),
+ 'upload_date': full_info.get('upload_date')
+ }
+ logging.debug(f"Filtered info_dict: {info_dict}")
+ else:
+ logging.error("Failed to extract video information")
+ return None, None, None, None, None, None
+
+ # Download video/audio
+ logging.info("Downloading video/audio...")
+ video_file_path = download_video(url, download_path, full_info, download_video_flag)
+ if not video_file_path:
+ logging.error(f"Failed to download video/audio from {url}")
+ return None, None, None, None, None, None
+
+ logging.info(f"Processing file: {video_file_path}")
+
+ # Perform transcription
+ logging.info("Starting transcription...")
+ audio_file_path, segments = perform_transcription(video_file_path, offset, whisper_model,
+ vad_filter)
+
+ if audio_file_path is None or segments is None:
+ logging.error("Transcription failed or segments not available.")
+ return None, None, None, None, None, None
+
+ logging.info(f"Transcription completed. Number of segments: {len(segments)}")
+
+ # Add metadata to segments
+ segments_with_metadata = {
+ "metadata": info_dict,
+ "segments": segments
+ }
+
+ # Save segments with metadata to JSON file
+ segments_json_path = os.path.splitext(audio_file_path)[0] + ".segments.json"
+ with open(segments_json_path, 'w') as f:
+ json.dump(segments_with_metadata, f, indent=2)
+
+                # Clean up intermediate files after successful transcription.
+                # Both the extracted .wav audio and the downloaded video are
+                # removed unless the user asked to keep the original files.
+                if not keep_original_video:
+                    files_to_delete = [audio_file_path, video_file_path]
+                    for file_path in files_to_delete:
+                        if file_path and os.path.exists(file_path):
+                            try:
+                                os.remove(file_path)
+                                logging.info(f"Successfully deleted file: {file_path}")
+                            except Exception as e:
+                                logging.warning(f"Failed to delete file {file_path}: {str(e)}")
+                else:
+                    logging.info(f"Keeping original video file: {video_file_path}")
+                    logging.info(f"Keeping original audio file: {audio_file_path}")
+
+ # Process segments based on the timestamp option
+ if not include_timestamps:
+ segments = [{'Text': segment['Text']} for segment in segments]
+
+ logging.info(f"Segments processed for timestamp inclusion: {segments}")
+
+ # Extract text from segments
+ transcription_text = extract_text_from_segments(segments)
+
+ if transcription_text.startswith("Error:"):
+ logging.error(f"Failed to extract transcription: {transcription_text}")
+ return None, None, None, None, None, None
+
+ # Use transcription_text instead of segments for further processing
+ full_text_with_metadata = f"{json.dumps(info_dict, indent=2)}\n\n{transcription_text}"
+
+ logging.debug(f"Full text with metadata extracted: {full_text_with_metadata[:100]}...")
+
+ # Perform summarization if API is provided
+ summary_text = None
+ if api_name:
+ # API key resolution handled at base of function if none provided
+ api_key = api_key if api_key else None
+ logging.info(f"Starting summarization with {api_name}...")
+ summary_text = perform_summarization(api_name, full_text_with_metadata, custom_prompt, api_key)
+ if summary_text is None:
+ logging.error("Summarization failed.")
+ return None, None, None, None, None, None
+ logging.debug(f"Summarization completed: {summary_text[:100]}...")
+
+ # Save transcription and summary
+ logging.info("Saving transcription and summary...")
+ download_path = create_download_directory("Audio_Processing")
+ json_file_path, summary_file_path = save_transcription_and_summary(full_text_with_metadata,
+ summary_text,
+ download_path, info_dict)
+ logging.info(
+ f"Transcription and summary saved. JSON file: {json_file_path}, Summary file: {summary_file_path}")
+
+ # Prepare keywords for database
+ if isinstance(keywords, str):
+ keywords_list = [kw.strip() for kw in keywords.split(',') if kw.strip()]
+ elif isinstance(keywords, (list, tuple)):
+ keywords_list = keywords
+ else:
+ keywords_list = []
+ logging.info(f"Keywords prepared: {keywords_list}")
+
+ # Add to database
+ logging.info("Adding to database...")
+ add_media_to_database(info_dict['webpage_url'], info_dict, full_text_with_metadata, summary_text,
+ keywords_list, custom_prompt, whisper_model)
+ logging.info(f"Media added to database: {info_dict['webpage_url']}")
+
+                return (info_dict['webpage_url'], full_text_with_metadata, summary_text,
+                        json_file_path, summary_file_path, info_dict)
+
+ except Exception as e:
+ logging.error(f"Error in process_url_with_metadata: {str(e)}", exc_info=True)
+ return None, None, None, None, None, None
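+
+        # Note: process_url_with_metadata returns a 6-tuple of
+        # (url, full_text_with_metadata, summary_text, json_file_path,
+        # summary_file_path, info_dict), or six Nones on failure; the batch
+        # loop above checks the first element to detect errors.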
+
+ process_button.click(
+ fn=process_videos_wrapper,
+ inputs=[
+ url_input, start_time_input, end_time_input, diarize_input, whisper_model_input,
+ custom_prompt_checkbox, custom_prompt_input, chunking_options_checkbox,
+ chunk_method, max_chunk_size, chunk_overlap, use_adaptive_chunking,
+ use_multi_level_chunking, chunk_language, summarize_recursively, api_name_input, api_key_input,
+ keywords_input, use_cookies_input, cookies_input, batch_size_input,
+ timestamp_option, keep_original_video
+ ],
+ outputs=[progress_output, error_output, results_output, download_transcription, download_summary]
+ )
+
+
+def create_audio_processing_tab():
+ with gr.TabItem("Audio File Transcription + Summarization"):
+ gr.Markdown("# Transcribe & Summarize Audio Files from URLs or Local Files!")
+ with gr.Row():
+ with gr.Column():
+ audio_url_input = gr.Textbox(label="Audio File URL(s)", placeholder="Enter the URL(s) of the audio file(s), one per line")
+ audio_file_input = gr.File(label="Upload Audio File", file_types=["audio/*"])
+
+ use_cookies_input = gr.Checkbox(label="Use cookies for authenticated download", value=False)
+ cookies_input = gr.Textbox(
+ label="Audio Download Cookies",
+ placeholder="Paste your cookies here (JSON format)",
+ lines=3,
+ visible=False
+ )
+
+ use_cookies_input.change(
+ fn=lambda x: gr.update(visible=x),
+ inputs=[use_cookies_input],
+ outputs=[cookies_input]
+ )
+
+ diarize_input = gr.Checkbox(label="Enable Speaker Diarization", value=False)
+ whisper_model_input = gr.Dropdown(choices=whisper_models, value="medium", label="Whisper Model")
+ custom_prompt_checkbox = gr.Checkbox(label="Use Custom Prompt", value=False, visible=True)
+ custom_prompt_input = gr.Textbox(label="Custom Prompt", placeholder="Enter custom prompt here", lines=3, visible=False)
+ custom_prompt_checkbox.change(
+ fn=lambda x: gr.update(visible=x),
+ inputs=[custom_prompt_checkbox],
+ outputs=[custom_prompt_input]
+ )
+ api_name_input = gr.Dropdown(
+ choices=[None, "Local-LLM", "OpenAI", "Anthropic", "Cohere", "Groq", "DeepSeek", "OpenRouter",
+ "Llama.cpp", "Kobold", "Ooba", "Tabbyapi", "VLLM", "HuggingFace"],
+ value=None,
+ label="API for Summarization (Optional)"
+ )
+ api_key_input = gr.Textbox(label="API Key (if required)", placeholder="Enter your API key here", type="password")
+ custom_keywords_input = gr.Textbox(label="Custom Keywords", placeholder="Enter custom keywords, comma-separated")
+ keep_original_input = gr.Checkbox(label="Keep original audio file", value=False)
+
+ chunking_options_checkbox = gr.Checkbox(label="Show Chunking Options", value=False)
+ with gr.Row(visible=False) as chunking_options_box:
+ gr.Markdown("### Chunking Options")
+ with gr.Column():
+ chunk_method = gr.Dropdown(choices=['words', 'sentences', 'paragraphs', 'tokens'], label="Chunking Method")
+ max_chunk_size = gr.Slider(minimum=100, maximum=1000, value=300, step=50, label="Max Chunk Size")
+ chunk_overlap = gr.Slider(minimum=0, maximum=100, value=0, step=10, label="Chunk Overlap")
+ use_adaptive_chunking = gr.Checkbox(label="Use Adaptive Chunking")
+ use_multi_level_chunking = gr.Checkbox(label="Use Multi-level Chunking")
+ chunk_language = gr.Dropdown(choices=['english', 'french', 'german', 'spanish'], label="Chunking Language")
+
+ chunking_options_checkbox.change(
+ fn=lambda x: gr.update(visible=x),
+ inputs=[chunking_options_checkbox],
+ outputs=[chunking_options_box]
+ )
+
+ process_audio_button = gr.Button("Process Audio File(s)")
+
+ with gr.Column():
+ audio_progress_output = gr.Textbox(label="Progress")
+ audio_transcription_output = gr.Textbox(label="Transcription")
+ audio_summary_output = gr.Textbox(label="Summary")
+ download_transcription = gr.File(label="Download All Transcriptions as JSON")
+ download_summary = gr.File(label="Download All Summaries as Text")
+
+ process_audio_button.click(
+ fn=process_audio_files,
+ inputs=[audio_url_input, audio_file_input, whisper_model_input, api_name_input, api_key_input,
+ use_cookies_input, cookies_input, keep_original_input, custom_keywords_input, custom_prompt_input,
+ chunk_method, max_chunk_size, chunk_overlap, use_adaptive_chunking, use_multi_level_chunking,
+ chunk_language, diarize_input],
+ outputs=[audio_progress_output, audio_transcription_output, audio_summary_output]
+ )
+
+
+def create_podcast_tab():
+ with gr.TabItem("Podcast"):
+ gr.Markdown("# Podcast Transcription and Ingestion")
+ with gr.Row():
+ with gr.Column():
+ podcast_url_input = gr.Textbox(label="Podcast URL", placeholder="Enter the podcast URL here")
+ podcast_title_input = gr.Textbox(label="Podcast Title", placeholder="Will be auto-detected if possible")
+ podcast_author_input = gr.Textbox(label="Podcast Author", placeholder="Will be auto-detected if possible")
+
+ podcast_keywords_input = gr.Textbox(
+ label="Keywords",
+ placeholder="Enter keywords here (comma-separated, include series name if applicable)",
+ value="podcast,audio",
+ elem_id="podcast-keywords-input"
+ )
+
+ custom_prompt_checkbox = gr.Checkbox(label="Use Custom Prompt", value=False, visible=True)
+ podcast_custom_prompt_input = gr.Textbox(
+ label="Custom Prompt",
+ placeholder="Enter custom prompt for summarization (optional)",
+ lines=3,
+ visible=False
+ )
+ custom_prompt_checkbox.change(
+ fn=lambda x: gr.update(visible=x),
+ inputs=[custom_prompt_checkbox],
+ outputs=[podcast_custom_prompt_input]
+ )
+
+ podcast_api_name_input = gr.Dropdown(
+ choices=[None, "Local-LLM", "OpenAI", "Anthropic", "Cohere", "Groq", "DeepSeek", "OpenRouter", "Llama.cpp",
+ "Kobold", "Ooba", "Tabbyapi", "VLLM", "HuggingFace"],
+ value=None,
+ label="API Name for Summarization (Optional)"
+ )
+ podcast_api_key_input = gr.Textbox(label="API Key (if required)", type="password")
+ podcast_whisper_model_input = gr.Dropdown(choices=whisper_models, value="medium", label="Whisper Model")
+
+ keep_original_input = gr.Checkbox(label="Keep original audio file", value=False)
+ enable_diarization_input = gr.Checkbox(label="Enable speaker diarization", value=False)
+
+ use_cookies_input = gr.Checkbox(label="Use cookies for yt-dlp", value=False)
+ cookies_input = gr.Textbox(
+ label="yt-dlp Cookies",
+ placeholder="Paste your cookies here (JSON format)",
+ lines=3,
+ visible=False
+ )
+
+ use_cookies_input.change(
+ fn=lambda x: gr.update(visible=x),
+ inputs=[use_cookies_input],
+ outputs=[cookies_input]
+ )
+
+ chunking_options_checkbox = gr.Checkbox(label="Show Chunking Options", value=False)
+ with gr.Row(visible=False) as chunking_options_box:
+ gr.Markdown("### Chunking Options")
+ with gr.Column():
+ chunk_method = gr.Dropdown(choices=['words', 'sentences', 'paragraphs', 'tokens'], label="Chunking Method")
+ max_chunk_size = gr.Slider(minimum=100, maximum=1000, value=300, step=50, label="Max Chunk Size")
+ chunk_overlap = gr.Slider(minimum=0, maximum=100, value=0, step=10, label="Chunk Overlap")
+ use_adaptive_chunking = gr.Checkbox(label="Use Adaptive Chunking")
+ use_multi_level_chunking = gr.Checkbox(label="Use Multi-level Chunking")
+ chunk_language = gr.Dropdown(choices=['english', 'french', 'german', 'spanish'], label="Chunking Language")
+
+ chunking_options_checkbox.change(
+ fn=lambda x: gr.update(visible=x),
+ inputs=[chunking_options_checkbox],
+ outputs=[chunking_options_box]
+ )
+
+ podcast_process_button = gr.Button("Process Podcast")
+
+ with gr.Column():
+ podcast_progress_output = gr.Textbox(label="Progress")
+ podcast_error_output = gr.Textbox(label="Error Messages")
+ podcast_transcription_output = gr.Textbox(label="Transcription")
+ podcast_summary_output = gr.Textbox(label="Summary")
+ download_transcription = gr.File(label="Download Transcription as JSON")
+ download_summary = gr.File(label="Download Summary as Text")
+
+ podcast_process_button.click(
+ fn=process_podcast,
+ inputs=[podcast_url_input, podcast_title_input, podcast_author_input,
+ podcast_keywords_input, podcast_custom_prompt_input, podcast_api_name_input,
+ podcast_api_key_input, podcast_whisper_model_input, keep_original_input,
+ enable_diarization_input, use_cookies_input, cookies_input,
+ chunk_method, max_chunk_size, chunk_overlap, use_adaptive_chunking,
+ use_multi_level_chunking, chunk_language],
+ outputs=[podcast_progress_output, podcast_transcription_output, podcast_summary_output,
+ podcast_title_input, podcast_author_input, podcast_keywords_input, podcast_error_output,
+ download_transcription, download_summary]
+ )
+
+
+def create_website_scraping_tab():
+ with gr.TabItem("Website Scraping"):
+ gr.Markdown("# Scrape Websites & Summarize Articles using a Headless Chrome Browser!")
+ with gr.Row():
+ with gr.Column():
+ url_input = gr.Textbox(label="Article URLs", placeholder="Enter article URLs here, one per line", lines=5)
+ custom_article_title_input = gr.Textbox(label="Custom Article Titles (Optional, one per line)",
+ placeholder="Enter custom titles for the articles, one per line",
+ lines=5)
+ custom_prompt_input = gr.Textbox(label="Custom Prompt (Optional)",
+ placeholder="Provide a custom prompt for summarization", lines=3)
+ api_name_input = gr.Dropdown(
+ choices=[None, "Local-LLM", "OpenAI", "Anthropic", "Cohere", "Groq", "DeepSeek", "OpenRouter",
+ "Llama.cpp", "Kobold", "Ooba", "Tabbyapi", "VLLM", "HuggingFace"], value=None, label="API Name (Mandatory for Summarization)")
+ api_key_input = gr.Textbox(label="API Key (Mandatory if API Name is specified)",
+ placeholder="Enter your API key here; Ignore if using Local API or Built-in API")
+ keywords_input = gr.Textbox(label="Keywords", placeholder="Enter keywords here (comma-separated)",
+ value="default,no_keyword_set", visible=True)
+
+ scrape_button = gr.Button("Scrape and Summarize")
+ with gr.Column():
+ result_output = gr.Textbox(label="Result", lines=20)
+
+ scrape_button.click(
+ fn=scrape_and_summarize_multiple,
+ inputs=[url_input, custom_prompt_input, api_name_input, api_key_input, keywords_input,
+ custom_article_title_input],
+ outputs=result_output
+ )
+
+
+def create_pdf_ingestion_tab():
+ with gr.TabItem("PDF Ingestion"):
+ # TODO - Add functionality to extract metadata from pdf as part of conversion process in marker
+ gr.Markdown("# Ingest PDF Files and Extract Metadata")
+ with gr.Row():
+ with gr.Column():
+ pdf_file_input = gr.File(label="Uploaded PDF File", file_types=[".pdf"], visible=False)
+ pdf_upload_button = gr.UploadButton("Click to Upload PDF", file_types=[".pdf"])
+ pdf_title_input = gr.Textbox(label="Title (Optional)")
+ pdf_author_input = gr.Textbox(label="Author (Optional)")
+ pdf_keywords_input = gr.Textbox(label="Keywords (Optional, comma-separated)")
+ pdf_ingest_button = gr.Button("Ingest PDF")
+
+ pdf_upload_button.upload(fn=lambda file: file, inputs=pdf_upload_button, outputs=pdf_file_input)
+ with gr.Column():
+ pdf_result_output = gr.Textbox(label="Result")
+
+ pdf_ingest_button.click(
+ fn=process_and_cleanup_pdf,
+ inputs=[pdf_file_input, pdf_title_input, pdf_author_input, pdf_keywords_input],
+ outputs=pdf_result_output
+ )
+#
+#
+################################################################################################################
+# Functions for Re-Summarization
+#
+
+
+
+def create_resummary_tab():
+ with gr.TabItem("Re-Summarize"):
+ gr.Markdown("# Re-Summarize Existing Content")
+ with gr.Row():
+ search_query_input = gr.Textbox(label="Search Query", placeholder="Enter your search query here...")
+ search_type_input = gr.Radio(choices=["Title", "URL", "Keyword", "Content"], value="Title", label="Search By")
+ search_button = gr.Button("Search")
+
+ items_output = gr.Dropdown(label="Select Item", choices=[], interactive=True)
+ item_mapping = gr.State({})
+
+ with gr.Row():
+ api_name_input = gr.Dropdown(
+ choices=["Local-LLM", "OpenAI", "Anthropic", "Cohere", "Groq", "DeepSeek", "OpenRouter",
+ "Llama.cpp", "Kobold", "Ooba", "Tabbyapi", "VLLM", "HuggingFace"],
+ value="Local-LLM", label="API Name")
+ api_key_input = gr.Textbox(label="API Key", placeholder="Enter your API key here")
+
+ chunking_options_checkbox = gr.Checkbox(label="Use Chunking", value=False)
+ with gr.Row(visible=False) as chunking_options_box:
+ chunk_method = gr.Dropdown(choices=['words', 'sentences', 'paragraphs', 'tokens'],
+ label="Chunking Method", value='words')
+ max_chunk_size = gr.Slider(minimum=100, maximum=1000, value=300, step=50, label="Max Chunk Size")
+ chunk_overlap = gr.Slider(minimum=0, maximum=100, value=0, step=10, label="Chunk Overlap")
+
+ custom_prompt_checkbox = gr.Checkbox(label="Use Custom Prompt", value=False)
+ custom_prompt_input = gr.Textbox(label="Custom Prompt", placeholder="Enter custom prompt here", lines=3, visible=False)
+
+ resummary_button = gr.Button("Re-Summarize")
+
+ result_output = gr.Textbox(label="Result")
+
+ # Connect the UI elements
+ search_button.click(
+ fn=update_resummary_dropdown,
+ inputs=[search_query_input, search_type_input],
+ outputs=[items_output, item_mapping]
+ )
+
+ chunking_options_checkbox.change(
+ fn=lambda x: gr.update(visible=x),
+ inputs=[chunking_options_checkbox],
+ outputs=[chunking_options_box]
+ )
+
+ custom_prompt_checkbox.change(
+ fn=lambda x: gr.update(visible=x),
+ inputs=[custom_prompt_checkbox],
+ outputs=[custom_prompt_input]
+ )
+
+ resummary_button.click(
+ fn=resummary_content_wrapper,
+ inputs=[items_output, item_mapping, api_name_input, api_key_input, chunking_options_checkbox, chunk_method,
+ max_chunk_size, chunk_overlap, custom_prompt_checkbox, custom_prompt_input],
+ outputs=result_output
+ )
+
+    return (search_query_input, search_type_input, search_button, items_output, item_mapping,
+            api_name_input, api_key_input, chunking_options_checkbox, chunking_options_box,
+            chunk_method, max_chunk_size, chunk_overlap, custom_prompt_checkbox,
+            custom_prompt_input, resummary_button, result_output)
+
+
+def update_resummary_dropdown(search_query, search_type):
+ if search_type in ['Title', 'URL']:
+ results = fetch_items_by_title_or_url(search_query, search_type)
+ elif search_type == 'Keyword':
+ results = fetch_items_by_keyword(search_query)
+ else: # Content
+ results = fetch_items_by_content(search_query)
+
+ item_options = [f"{item[1]} ({item[2]})" for item in results]
+ item_mapping = {f"{item[1]} ({item[2]})": item[0] for item in results}
+ return gr.update(choices=item_options), item_mapping
+
+
+def resummary_content_wrapper(selected_item, item_mapping, api_name, api_key, chunking_options_checkbox, chunk_method,
+                              max_chunk_size, chunk_overlap, custom_prompt_checkbox, custom_prompt):
+    # Thin wrapper for the Gradio click handler: all validation, chunking and
+    # summarization logic lives in resummary_content below, which takes the
+    # same arguments.
+    return resummary_content(selected_item, item_mapping, api_name, api_key, chunking_options_checkbox,
+                             chunk_method, max_chunk_size, chunk_overlap, custom_prompt_checkbox, custom_prompt)
+
+
+def resummary_content(selected_item, item_mapping, api_name, api_key, chunking_options_checkbox, chunk_method, max_chunk_size, chunk_overlap, custom_prompt_checkbox, custom_prompt):
+ if not selected_item or not api_name or not api_key:
+ return "Please select an item and provide API details."
+
+ media_id = item_mapping.get(selected_item)
+ if not media_id:
+ return "Invalid selection."
+
+ content, old_prompt, old_summary = fetch_item_details(media_id)
+
+ if not content:
+ return "No content available for re-summarization."
+
+ # Load configuration
+ config = load_comprehensive_config()
+
+ # Prepare chunking options
+ chunk_options = {
+ 'method': chunk_method,
+ 'max_size': int(max_chunk_size),
+ 'overlap': int(chunk_overlap),
+ 'language': 'english',
+ 'adaptive': True,
+ 'multi_level': False,
+ }
+
+ # Chunking logic
+ if chunking_options_checkbox:
+ chunks = improved_chunking_process(content, chunk_options)
+ else:
+ chunks = [{'text': content, 'metadata': {}}]
+
+ # Prepare summarization prompt
+ if custom_prompt_checkbox and custom_prompt:
+ summarization_prompt = custom_prompt
+ else:
+ summarization_prompt = config.get('Prompts', 'default_summary_prompt', fallback="Summarize the following text:")
+
+ # Summarization logic
+ summaries = []
+ for chunk in chunks:
+ chunk_text = chunk['text']
+ try:
+ chunk_summary = summarize_chunk(api_name, chunk_text, summarization_prompt, api_key)
+ if chunk_summary:
+ summaries.append(chunk_summary)
+ else:
+ logging.warning(f"Summarization failed for chunk: {chunk_text[:100]}...")
+ except Exception as e:
+ logging.error(f"Error during summarization: {str(e)}")
+ return f"Error during summarization: {str(e)}"
+
+ if not summaries:
+ return "Summarization failed for all chunks."
+
+ new_summary = " ".join(summaries)
+
+ # Update the database with the new summary
+ try:
+ update_result = update_media_content(selected_item, item_mapping, content, summarization_prompt, new_summary)
+ if "successfully" in update_result.lower():
+ return f"Re-summarization complete. New summary: {new_summary[:500]}..."
+ else:
+ return f"Error during database update: {update_result}"
+ except Exception as e:
+ logging.error(f"Error updating database: {str(e)}")
+ return f"Error updating database: {str(e)}"
+
+# End of Re-Summarization Functions
+#
+##############################################################################################################
+#
+# Search Tab
+
+def add_or_update_prompt(title, description, system_prompt, user_prompt):
+ if not title:
+ return "Error: Title is required."
+
+ existing_prompt = fetch_prompt_details(title)
+ if existing_prompt:
+ # Update existing prompt
+ result = update_prompt_in_db(title, description, system_prompt, user_prompt)
+ else:
+ # Insert new prompt
+ result = insert_prompt_to_db(title, description, system_prompt, user_prompt)
+
+ # Refresh the prompt dropdown
+ update_prompt_dropdown()
+ return result
+
+
+def load_prompt_details(selected_prompt):
+ if selected_prompt:
+ details = fetch_prompt_details(selected_prompt)
+ if details:
+ return details[0], details[1], details[2], details[3]
+ return "", "", "", ""
+
+
+def update_prompt_in_db(title, description, system_prompt, user_prompt):
+ try:
+ conn = sqlite3.connect('prompts.db')
+ cursor = conn.cursor()
+ cursor.execute(
+ "UPDATE Prompts SET details = ?, system = ?, user = ? WHERE name = ?",
+ (description, system_prompt, user_prompt, title)
+ )
+ conn.commit()
+ conn.close()
+ return "Prompt updated successfully!"
+ except sqlite3.Error as e:
+ return f"Error updating prompt: {e}"
+
+
+def search_prompts(query):
+ try:
+ conn = sqlite3.connect('prompts.db')
+ cursor = conn.cursor()
+ cursor.execute("SELECT name, details, system, user FROM Prompts WHERE name LIKE ? OR details LIKE ?",
+ (f"%{query}%", f"%{query}%"))
+ results = cursor.fetchall()
+ conn.close()
+ return results
+ except sqlite3.Error as e:
+ print(f"Error searching prompts: {e}")
+ return []
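+
+# Each row returned by search_prompts is a 4-tuple of
+# (name, details, system, user); the display_search_results helpers rely on
+# that shape when rendering results.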
+
+
+def create_search_tab():
+ with gr.TabItem("Search / Detailed View"):
+ with gr.Row():
+ with gr.Column():
+ gr.Markdown("# Search across all ingested items in the Database")
+ gr.Markdown(" by Title / URL / Keyword / or Content via SQLite Full-Text-Search")
+ search_query_input = gr.Textbox(label="Search Query", placeholder="Enter your search query here...")
+ search_type_input = gr.Radio(choices=["Title", "URL", "Keyword", "Content"], value="Title", label="Search By")
+ search_button = gr.Button("Search")
+ items_output = gr.Dropdown(label="Select Item", choices=[])
+ item_mapping = gr.State({})
+ prompt_summary_output = gr.HTML(label="Prompt & Summary", visible=True)
+ content_output = gr.Markdown(label="Content", visible=True)
+
+ search_button.click(
+ fn=update_dropdown,
+ inputs=[search_query_input, search_type_input],
+ outputs=[items_output, item_mapping]
+ )
+ with gr.Column():
+ items_output.change(
+ fn=update_detailed_view,
+ inputs=[items_output, item_mapping],
+ outputs=[prompt_summary_output, content_output]
+ )
+
+
+def create_prompt_view_tab():
+ def display_search_results(query):
+ if not query.strip():
+ return "Please enter a search query."
+
+ results = search_prompts(query)
+
+ print(f"Processed search results for query '{query}': {results}")
+
+ if results:
+ result_md = "## Search Results:\n"
+ for result in results:
+ print(f"Result item: {result}")
+
+ if len(result) == 4:
+ name, details, system, user = result
+ result_md += f"**Title:** {name}\n\n"
+ result_md += f"**Description:** {details}\n\n"
+ result_md += f"**System Prompt:** {system}\n\n"
+ result_md += f"**User Prompt:** {user}\n\n"
+ result_md += "---\n"
+ else:
+ result_md += "Error: Unexpected result format.\n\n---\n"
+ return result_md
+ return "No results found."
+ with gr.TabItem("Search Prompts"):
+ with gr.Row():
+ with gr.Column():
+ gr.Markdown("# Search and View Prompt Details")
+ gr.Markdown("Currently has all of the https://github.com/danielmiessler/fabric prompts already available")
+ search_query_input = gr.Textbox(label="Search Prompts", placeholder="Enter your search query...")
+ search_button = gr.Button("Search Prompts")
+ with gr.Column():
+ search_results_output = gr.Markdown()
+ prompt_details_output = gr.HTML()
+ search_button.click(
+ fn=display_search_results,
+ inputs=[search_query_input],
+ outputs=[search_results_output]
+ )
+
+
+def create_prompt_edit_tab():
+ with gr.TabItem("Edit Prompts"):
+ with gr.Row():
+ with gr.Column():
+ prompt_dropdown = gr.Dropdown(
+ label="Select Prompt",
+ choices=[],
+ interactive=True
+ )
+ prompt_list_button = gr.Button("List Prompts")
+
+ with gr.Column():
+ title_input = gr.Textbox(label="Title", placeholder="Enter the prompt title")
+ description_input = gr.Textbox(label="Description", placeholder="Enter the prompt description", lines=3)
+ system_prompt_input = gr.Textbox(label="System Prompt", placeholder="Enter the system prompt", lines=3)
+ user_prompt_input = gr.Textbox(label="User Prompt", placeholder="Enter the user prompt", lines=3)
+ add_prompt_button = gr.Button("Add/Update Prompt")
+ add_prompt_output = gr.HTML()
+
+ # Event handlers
+ prompt_list_button.click(
+ fn=update_prompt_dropdown,
+ outputs=prompt_dropdown
+ )
+
+ add_prompt_button.click(
+ fn=add_or_update_prompt,
+ inputs=[title_input, description_input, system_prompt_input, user_prompt_input],
+ outputs=add_prompt_output
+ )
+
+ # Load prompt details when selected
+ prompt_dropdown.change(
+ fn=load_prompt_details,
+ inputs=[prompt_dropdown],
+ outputs=[title_input, description_input, system_prompt_input, user_prompt_input]
+ )
+
+
+# End of Search Tab Functions
+#
+################################################################################################################
+#
+# Llamafile Tab
+
+
+def start_llamafile(*args):
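+    """Assemble llamafile CLI flags from the UI values and hand them to local_llm_gui_function."""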
+ # Unpack arguments
+ (am_noob, verbose_checked, threads_checked, threads_value, http_threads_checked, http_threads_value,
+ model_checked, model_value, hf_repo_checked, hf_repo_value, hf_file_checked, hf_file_value,
+ ctx_size_checked, ctx_size_value, ngl_checked, ngl_value, host_checked, host_value, port_checked,
+ port_value) = args
+
+ # Construct command based on checked values
+ command = []
+    # am_noob is forwarded unchanged to local_llm_gui_function below
+ if verbose_checked is not None and verbose_checked:
+ command.append('-v')
+ if threads_checked and threads_value is not None:
+ command.extend(['-t', str(threads_value)])
+ if http_threads_checked and http_threads_value is not None:
+ command.extend(['--threads', str(http_threads_value)])
+ if model_checked and model_value is not None:
+ model_path = model_value.name
+ command.extend(['-m', model_path])
+ if hf_repo_checked and hf_repo_value is not None:
+ command.extend(['-hfr', hf_repo_value])
+ if hf_file_checked and hf_file_value is not None:
+ command.extend(['-hff', hf_file_value])
+ if ctx_size_checked and ctx_size_value is not None:
+ command.extend(['-c', str(ctx_size_value)])
+ if ngl_checked and ngl_value is not None:
+ command.extend(['-ngl', str(ngl_value)])
+ if host_checked and host_value is not None:
+ command.extend(['--host', host_value])
+ if port_checked and port_value is not None:
+ command.extend(['--port', str(port_value)])
+
+    # Start llamafile with the provided configuration
+    local_llm_gui_function(am_noob, verbose_checked, threads_checked, threads_value,
+                           http_threads_checked, http_threads_value, model_checked,
+                           model_value, hf_repo_checked, hf_repo_value, hf_file_checked,
+                           hf_file_value, ctx_size_checked, ctx_size_value, ngl_checked,
+                           ngl_value, host_checked, host_value, port_checked, port_value)
+
+    # Report the equivalent CLI flags so the configuration can be verified
+    return f"Llamafile started via local_llm_gui_function.\n\nEquivalent CLI flags: {' '.join(command)}"
+
+def stop_llamafile():
+ # Code to stop llamafile
+ # ...
+ return "Llamafile stopped"
+
+
+def create_llamafile_settings_tab():
+ with gr.TabItem("Local LLM with Llamafile"):
+ gr.Markdown("# Settings for Llamafile")
+ am_noob = gr.Checkbox(label="Check this to enable sane defaults", value=False, visible=True)
+ advanced_mode_toggle = gr.Checkbox(label="Advanced Mode - Enable to show all settings", value=False)
+
+ model_checked = gr.Checkbox(label="Enable Setting Local LLM Model Path", value=False, visible=True)
+ model_value = gr.Textbox(label="Select Local Model File", value="", visible=True)
+ ngl_checked = gr.Checkbox(label="Enable Setting GPU Layers", value=False, visible=True)
+ ngl_value = gr.Number(label="Number of GPU Layers", value=None, precision=0, visible=True)
+
+ advanced_inputs = create_llamafile_advanced_inputs()
+
+ start_button = gr.Button("Start Llamafile")
+ stop_button = gr.Button("Stop Llamafile")
+ output_display = gr.Markdown()
+
+        # NOTE: the input order must match the tuple unpacked in start_llamafile,
+        # so the basic controls are interleaved with the advanced inputs
+        start_button.click(
+            fn=start_llamafile,
+            inputs=[am_noob] + advanced_inputs[:5] + [model_checked, model_value]
+                   + advanced_inputs[5:11] + [ngl_checked, ngl_value] + advanced_inputs[11:],
+            outputs=output_display
+        )
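+
+        # Wire the stop button; stop_llamafile is currently a stub that just
+        # returns a status string
+        stop_button.click(
+            fn=stop_llamafile,
+            outputs=output_display
+        )
+
+        # A hedged sketch for the otherwise-unused advanced toggle: flip the
+        # visibility of the advanced inputs (assumes gr.update(visible=...)
+        # as available in Gradio 3.x)
+        advanced_mode_toggle.change(
+            fn=lambda show: [gr.update(visible=show) for _ in advanced_inputs],
+            inputs=advanced_mode_toggle,
+            outputs=advanced_inputs
+        )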
+
+
+def create_llamafile_advanced_inputs():
+ verbose_checked = gr.Checkbox(label="Enable Verbose Output", value=False, visible=False)
+ threads_checked = gr.Checkbox(label="Set CPU Threads", value=False, visible=False)
+ threads_value = gr.Number(label="Number of CPU Threads", value=None, precision=0, visible=False)
+ http_threads_checked = gr.Checkbox(label="Set HTTP Server Threads", value=False, visible=False)
+ http_threads_value = gr.Number(label="Number of HTTP Server Threads", value=None, precision=0, visible=False)
+ hf_repo_checked = gr.Checkbox(label="Use Huggingface Repo Model", value=False, visible=False)
+ hf_repo_value = gr.Textbox(label="Huggingface Repo Name", value="", visible=False)
+ hf_file_checked = gr.Checkbox(label="Set Huggingface Model File", value=False, visible=False)
+ hf_file_value = gr.Textbox(label="Huggingface Model File", value="", visible=False)
+ ctx_size_checked = gr.Checkbox(label="Set Prompt Context Size", value=False, visible=False)
+ ctx_size_value = gr.Number(label="Prompt Context Size", value=8124, precision=0, visible=False)
+ host_checked = gr.Checkbox(label="Set IP to Listen On", value=False, visible=False)
+ host_value = gr.Textbox(label="Host IP Address", value="", visible=False)
+ port_checked = gr.Checkbox(label="Set Server Port", value=False, visible=False)
+ port_value = gr.Number(label="Port Number", value=None, precision=0, visible=False)
+
+ return [verbose_checked, threads_checked, threads_value, http_threads_checked, http_threads_value,
+ hf_repo_checked, hf_repo_value, hf_file_checked, hf_file_value, ctx_size_checked, ctx_size_value,
+ host_checked, host_value, port_checked, port_value]
+
+#
+# End of Llamafile Tab Functions
+################################################################################################################
+#
+# Chat Interface Tab Functions
+
+
+def create_chat_interface():
+ with gr.TabItem("Remote LLM Chat"):
+ gr.Markdown("# Chat with a designated LLM Endpoint, using your selected item as starting context")
+
+ with gr.Row():
+ with gr.Column(scale=1):
+ search_query_input = gr.Textbox(label="Search Query", placeholder="Enter your search query here...")
+ search_type_input = gr.Radio(choices=["Title", "URL", "Keyword", "Content"], value="Title", label="Search By")
+ search_button = gr.Button("Search")
+
+ with gr.Column(scale=2):
+ items_output = gr.Dropdown(label="Select Item", choices=[], interactive=True)
+ item_mapping = gr.State({})
+
+ with gr.Row():
+ use_content = gr.Checkbox(label="Use Content")
+ use_summary = gr.Checkbox(label="Use Summary")
+ use_prompt = gr.Checkbox(label="Use Prompt")
+
+ api_endpoint = gr.Dropdown(label="Select API Endpoint", choices=["Local-LLM", "OpenAI", "Anthropic", "Cohere", "Groq", "DeepSeek", "OpenRouter", "Llama.cpp", "Kobold", "Ooba", "Tabbyapi", "VLLM", "HuggingFace"])
+ api_key = gr.Textbox(label="API Key (if required)", type="password")
+ preset_prompt = gr.Dropdown(label="Select Preset Prompt", choices=load_preset_prompts())
+ user_prompt = gr.Textbox(label="Modify Prompt (Need to delete this after the first message, otherwise it'll "
+ "be used as the next message instead)", lines=3)
+
+ chatbot = gr.Chatbot(height=500)
+ msg = gr.Textbox(label="Enter your message")
+ submit = gr.Button("Submit")
+
+ chat_history = gr.State([])
+ media_content = gr.State({})
+ selected_parts = gr.State([])
+
+ save_button = gr.Button("Save Chat History")
+ download_file = gr.File(label="Download Chat History")
+
+        def chat_wrapper(message, history, media_content, selected_parts, api_endpoint, api_key, user_prompt):
+            logging.debug(f"Chat Wrapper - Message: {message}")
+            logging.debug(f"Chat Wrapper - Media Content: {media_content}")
+            logging.debug(f"Chat Wrapper - Selected Parts: {selected_parts}")
+            logging.debug(f"Chat Wrapper - API Endpoint: {api_endpoint}")
+            logging.debug(f"Chat Wrapper - User Prompt: {user_prompt}")
+
+            selected_content = "\n\n".join(
+                [f"{part.capitalize()}: {media_content.get(part, '')}" for part in selected_parts if
+                 part in media_content])
+            logging.debug(f"Chat Wrapper - Selected Content: {selected_content[:500]}...")  # First 500 chars
+
+            context = f"Selected content:\n{selected_content}\n\nUser message: {message}"
+            logging.debug(f"Chat Wrapper - Context: {context[:500]}...")  # First 500 chars
+
+            # Fall back to a default API endpoint if none is selected
+            if not api_endpoint:
+                api_endpoint = "OpenAI"  # Change this to any default endpoint you prefer
+                logging.debug(f"Chat Wrapper - Using default API Endpoint: {api_endpoint}")
+
+            bot_message = chat(context, history, media_content, selected_parts, api_endpoint, api_key, user_prompt)
+            logging.debug(f"Chat Wrapper - Bot Message: {bot_message[:500]}...")  # First 500 chars
+
+            history.append((message, bot_message))
+            # Return the updated history to both the chatbot display and the
+            # chat_history state, so "Save Chat History" sees the full conversation
+            return "", history, history
+
+        submit.click(
+            chat_wrapper,
+            inputs=[msg, chat_history, media_content, selected_parts, api_endpoint, api_key, user_prompt],
+            outputs=[msg, chatbot, chat_history]
+        )
+
+ def save_chat_history(history):
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ filename = f"chat_history_{timestamp}.json"
+ with open(filename, "w") as f:
+ json.dump(history, f)
+ return filename
+
+ save_button.click(save_chat_history, inputs=[chat_history], outputs=[download_file])
+
+ search_button.click(
+ fn=update_dropdown,
+ inputs=[search_query_input, search_type_input],
+ outputs=[items_output, item_mapping]
+ )
+
+        def update_user_prompt(preset_name):
+            details = fetch_prompt_details(preset_name)
+            if details:
+                return details[2]  # Return the system prompt
+            return ""
+
+ preset_prompt.change(update_user_prompt, inputs=preset_prompt, outputs=user_prompt)
+
+        def update_chat_content(selected_item, use_content, use_summary, use_prompt, item_mapping):
+            logging.debug(f"Update Chat Content - Selected Item: {selected_item}")
+            logging.debug(f"Update Chat Content - Use Content: {use_content}")
+            logging.debug(f"Update Chat Content - Use Summary: {use_summary}")
+            logging.debug(f"Update Chat Content - Use Prompt: {use_prompt}")
+            logging.debug(f"Update Chat Content - Item Mapping: {item_mapping}")
+
+            if selected_item and selected_item in item_mapping:
+                media_id = item_mapping[selected_item]
+                content = load_media_content(media_id)
+                selected_parts = []
+                if use_content and "content" in content:
+                    selected_parts.append("content")
+                if use_summary and "summary" in content:
+                    selected_parts.append("summary")
+                if use_prompt and "prompt" in content:
+                    selected_parts.append("prompt")
+                logging.debug(f"Update Chat Content - Content: {content}")
+                logging.debug(f"Update Chat Content - Selected Parts: {selected_parts}")
+                return content, selected_parts
+            else:
+                logging.debug("Update Chat Content - No item selected or item not in mapping")
+                return {}, []
+
+ items_output.change(
+ update_chat_content,
+ inputs=[items_output, use_content, use_summary, use_prompt, item_mapping],
+ outputs=[media_content, selected_parts]
+ )
+
+ def update_selected_parts(use_content, use_summary, use_prompt):
+ selected_parts = []
+ if use_content:
+ selected_parts.append("content")
+ if use_summary:
+ selected_parts.append("summary")
+ if use_prompt:
+ selected_parts.append("prompt")
+ print(f"Debug - Update Selected Parts: {selected_parts}")
+ return selected_parts
+
+ use_content.change(update_selected_parts, inputs=[use_content, use_summary, use_prompt],
+ outputs=[selected_parts])
+ use_summary.change(update_selected_parts, inputs=[use_content, use_summary, use_prompt],
+ outputs=[selected_parts])
+ use_prompt.change(update_selected_parts, inputs=[use_content, use_summary, use_prompt],
+ outputs=[selected_parts])
+
+        # Debug helper: log the current state whenever the selected item changes
+        def debug_output(media_content, selected_parts):
+            logging.debug(f"Media Content: {media_content}")
+            logging.debug(f"Selected Parts: {selected_parts}")
+
+        items_output.change(debug_output, inputs=[media_content, selected_parts], outputs=[])
+
+#
+# End of Chat Interface Tab Functions
+################################################################################################################
+#
+# Media Edit Tab Functions
+
+def create_media_edit_tab():
+ with gr.TabItem("Edit Existing Items"):
+ gr.Markdown("# Search and Edit Media Items")
+
+ with gr.Row():
+ search_query_input = gr.Textbox(label="Search Query", placeholder="Enter your search query here...")
+ search_type_input = gr.Radio(choices=["Title", "URL", "Keyword", "Content"], value="Title", label="Search By")
+ search_button = gr.Button("Search")
+
+ with gr.Row():
+ items_output = gr.Dropdown(label="Select Item", choices=[], interactive=True)
+ item_mapping = gr.State({})
+
+ content_input = gr.Textbox(label="Edit Content", lines=10)
+ prompt_input = gr.Textbox(label="Edit Prompt", lines=3)
+ summary_input = gr.Textbox(label="Edit Summary", lines=5)
+
+ update_button = gr.Button("Update Media Content")
+ status_message = gr.Textbox(label="Status", interactive=False)
+
+ search_button.click(
+ fn=update_dropdown,
+ inputs=[search_query_input, search_type_input],
+ outputs=[items_output, item_mapping]
+ )
+
+ def load_selected_media_content(selected_item, item_mapping):
+ if selected_item and item_mapping and selected_item in item_mapping:
+ media_id = item_mapping[selected_item]
+ content, prompt, summary = fetch_item_details(media_id)
+ return content, prompt, summary
+ return "No item selected or invalid selection", "", ""
+
+ items_output.change(
+ fn=load_selected_media_content,
+ inputs=[items_output, item_mapping],
+ outputs=[content_input, prompt_input, summary_input]
+ )
+
+ update_button.click(
+ fn=update_media_content,
+ inputs=[items_output, item_mapping, content_input, prompt_input, summary_input],
+ outputs=status_message
+ )
+#
+#
+################################################################################################################
+#
+# Import Items Tab Functions
+
+
+def import_data(file, title, author, keywords, custom_prompt, summary, auto_summarize, api_name, api_key):
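+    """Ingest an uploaded text/markdown file into the media database, optionally auto-summarizing it first."""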
+ if file is None:
+ return "No file uploaded. Please upload a file."
+
+ try:
+ logging.debug(f"File object type: {type(file)}")
+ logging.debug(f"File object attributes: {dir(file)}")
+
+ if hasattr(file, 'name'):
+ file_name = file.name
+ else:
+ file_name = 'unknown_file'
+
+ if isinstance(file, str):
+ # If file is a string, it's likely a file path
+ file_path = file
+ with open(file_path, 'r', encoding='utf-8') as f:
+ file_content = f.read()
+ elif hasattr(file, 'read'):
+ # If file has a 'read' method, it's likely a file-like object
+ file_content = file.read()
+ if isinstance(file_content, bytes):
+ file_content = file_content.decode('utf-8')
+ else:
+ # If it's neither a string nor a file-like object, try converting it to a string
+ file_content = str(file)
+
+ logging.debug(f"File name: {file_name}")
+ logging.debug(f"File content (first 100 chars): {file_content[:100]}")
+
+ # Create info_dict
+ info_dict = {
+ 'title': title or 'Untitled',
+ 'uploader': author or 'Unknown',
+ }
+
+ # Create segments (assuming one segment for the entire content)
+ segments = [{'Text': file_content}]
+
+ # Process keywords
+ keyword_list = [kw.strip() for kw in keywords.split(',') if kw.strip()]
+
+ # Handle summarization
+ if auto_summarize and api_name and api_key:
+ summary = perform_summarization(api_name, file_content, custom_prompt, api_key)
+ elif not summary:
+ summary = "No summary provided"
+
+ # Add to database
+ add_media_to_database(
+ url=file_name, # Using filename as URL
+ info_dict=info_dict,
+ segments=segments,
+ summary=summary,
+ keywords=keyword_list,
+ custom_prompt_input=custom_prompt,
+ whisper_model="Imported", # Indicating this was an imported file,
+ media_type = "document"
+ )
+
+ return f"File '{file_name}' successfully imported with title '{title}' and author '{author}'."
+ except Exception as e:
+ logging.error(f"Error importing file: {str(e)}")
+ return f"Error importing file: {str(e)}"
+
+
+def create_import_item_tab():
+ with gr.TabItem("Import Items"):
+ gr.Markdown("# Import a markdown file or text file into the database")
+ gr.Markdown("...and have it tagged + summarized")
+ with gr.Row():
+            import_file = gr.File(label="Upload file for import", file_types=[".txt", ".md"])
+ with gr.Row():
+ title_input = gr.Textbox(label="Title", placeholder="Enter the title of the content")
+ author_input = gr.Textbox(label="Author", placeholder="Enter the author's name")
+ with gr.Row():
+ keywords_input = gr.Textbox(label="Keywords", placeholder="Enter keywords, comma-separated")
+ custom_prompt_input = gr.Textbox(label="Custom Prompt",
+ placeholder="Enter a custom prompt for summarization (optional)")
+ with gr.Row():
+ summary_input = gr.Textbox(label="Summary",
+ placeholder="Enter a summary or leave blank for auto-summarization", lines=3)
+ with gr.Row():
+ auto_summarize_checkbox = gr.Checkbox(label="Auto-summarize", value=False)
+ api_name_input = gr.Dropdown(
+ choices=[None, "Local-LLM", "OpenAI", "Anthropic", "Cohere", "Groq", "DeepSeek", "OpenRouter",
+ "Llama.cpp", "Kobold", "Ooba", "Tabbyapi", "VLLM", "HuggingFace"],
+ label="API for Auto-summarization"
+ )
+ api_key_input = gr.Textbox(label="API Key", type="password")
+ with gr.Row():
+ import_button = gr.Button("Import Data")
+ with gr.Row():
+ import_output = gr.Textbox(label="Import Status")
+
+ import_button.click(
+ fn=import_data,
+ inputs=[import_file, title_input, author_input, keywords_input, custom_prompt_input,
+ summary_input, auto_summarize_checkbox, api_name_input, api_key_input],
+ outputs=import_output
+ )
+
+#
+# End of Import Items Tab Functions
+################################################################################################################
+#
+# Export Items Tab Functions
+
+
+def create_export_tab():
+ with gr.Tab("Export"):
+ with gr.Tab("Export Search Results"):
+ search_query = gr.Textbox(label="Search Query", placeholder="Enter your search query here...")
+ search_fields = gr.CheckboxGroup(label="Search Fields", choices=["Title", "Content"], value=["Title"])
+            keyword_input = gr.Textbox(
+                label="Keywords (matches ALL; separate multiple keywords with commas)",
+                placeholder="Enter keywords here...")
+ page_input = gr.Number(label="Page", value=1, precision=0)
+ results_per_file_input = gr.Number(label="Results per File", value=1000, precision=0)
+ export_format = gr.Radio(label="Export Format", choices=["csv", "markdown"], value="csv")
+ export_search_button = gr.Button("Export Search Results")
+ export_search_output = gr.File(label="Download Exported Keywords")
+ export_search_status = gr.Textbox(label="Export Status")
+
+ export_search_button.click(
+ fn=export_to_file,
+ inputs=[search_query, search_fields, keyword_input, page_input, results_per_file_input, export_format],
+ outputs=[export_search_status, export_search_output]
+ )
+
+#
+# End of Export Items Tab Functions
+################################################################################################################
+#
+# Keyword Management Tab Functions
+
+def create_export_keywords_tab():
+ with gr.Group():
+ with gr.Tab("Export Keywords"):
+ export_keywords_button = gr.Button("Export Keywords")
+ export_keywords_output = gr.File(label="Download Exported Keywords")
+ export_keywords_status = gr.Textbox(label="Export Status")
+
+ export_keywords_button.click(
+ fn=export_keywords_to_csv,
+ outputs=[export_keywords_status, export_keywords_output]
+ )
+
+def create_view_keywords_tab():
+ with gr.TabItem("View Keywords"):
+ gr.Markdown("# Browse Keywords")
+ browse_output = gr.Markdown()
+ browse_button = gr.Button("View Existing Keywords")
+ browse_button.click(fn=keywords_browser_interface, outputs=browse_output)
+
+
+def create_add_keyword_tab():
+ with gr.TabItem("Add Keywords"):
+ with gr.Row():
+ gr.Markdown("# Add Keywords to the Database")
+ add_input = gr.Textbox(label="Add Keywords (comma-separated)", placeholder="Enter keywords here...")
+ add_button = gr.Button("Add Keywords")
+ with gr.Row():
+ add_output = gr.Textbox(label="Result")
+ add_button.click(fn=add_keyword, inputs=add_input, outputs=add_output)
+
+
+def create_delete_keyword_tab():
+ with gr.Tab("Delete Keywords"):
+ with gr.Row():
+ gr.Markdown("# Delete Keywords from the Database")
+ delete_input = gr.Textbox(label="Delete Keyword", placeholder="Enter keyword to delete here...")
+ delete_button = gr.Button("Delete Keyword")
+ with gr.Row():
+ delete_output = gr.Textbox(label="Result")
+ delete_button.click(fn=delete_keyword, inputs=delete_input, outputs=delete_output)
+
+#
+# End of Keyword Management Tab Functions
+################################################################################################################
+#
+# Utilities Tab Functions
+
+
+def create_utilities_tab():
+ with gr.Group():
+ with gr.Tab("YouTube Video Downloader"):
+            gr.Markdown("Youtube Video Downloader<br>This Input takes a Youtube URL as input and creates a webm file for you to download. If you want a full-featured one: https://github.com/StefanLobbenmeier/youtube-dl-gui or https://github.com/yt-dlg/yt-dlg")
+ youtube_url_input = gr.Textbox(label="YouTube URL", placeholder="Enter YouTube video URL here")
+ download_button = gr.Button("Download Video")
+ output_file = gr.File(label="Download Video")
+
+ download_button.click(
+ fn=gradio_download_youtube_video,
+ inputs=youtube_url_input,
+ outputs=output_file
+ )
+
+ with gr.Tab("YouTube Audio Downloader"):
+            gr.Markdown("Youtube Audio Downloader<br>This Input takes a Youtube URL as input and creates an audio file for you to download. If you want a full-featured one: https://github.com/StefanLobbenmeier/youtube-dl-gui or https://github.com/yt-dlg/yt-dlg")
+ youtube_url_input_audio = gr.Textbox(label="YouTube URL", placeholder="Enter YouTube video URL here")
+ download_button_audio = gr.Button("Download Audio")
+ output_file_audio = gr.File(label="Download Audio")
+
+ # Implement the audio download functionality here
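+            # A minimal sketch of that handler, assuming the same yt_dlp and
+            # sanitize_filename conventions used by gradio_download_youtube_video
+            def gradio_download_youtube_audio(url):
+                # Probe the video metadata first to build a sanitized filename
+                with yt_dlp.YoutubeDL({'quiet': True}) as ydl:
+                    info_dict = ydl.extract_info(url, download=False)
+                sanitized_title = sanitize_filename(info_dict['title'])
+
+                download_dir = Path(f"results/{sanitized_title}")
+                download_dir.mkdir(parents=True, exist_ok=True)
+                output_path = download_dir / f"{sanitized_title}.m4a"
+
+                # Grab the best available audio-only stream (m4a preferred)
+                ydl_opts = {
+                    'format': 'bestaudio[ext=m4a]/bestaudio',
+                    'outtmpl': str(output_path),
+                    'quiet': True,
+                }
+                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+                    ydl.download([url])
+                return str(output_path)
+
+            download_button_audio.click(
+                fn=gradio_download_youtube_audio,
+                inputs=youtube_url_input_audio,
+                outputs=output_file_audio
+            )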
+
+ with gr.Tab("Grammar Checker"):
+ gr.Markdown("# Grammar Check Utility to be added...")
+
+ with gr.Tab("YouTube Timestamp URL Generator"):
+ gr.Markdown("## Generate YouTube URL with Timestamp")
+ with gr.Row():
+ url_input = gr.Textbox(label="YouTube URL")
+ hours_input = gr.Number(label="Hours", value=0, minimum=0, precision=0)
+ minutes_input = gr.Number(label="Minutes", value=0, minimum=0, maximum=59, precision=0)
+ seconds_input = gr.Number(label="Seconds", value=0, minimum=0, maximum=59, precision=0)
+
+ generate_button = gr.Button("Generate URL")
+ output_url = gr.Textbox(label="Timestamped URL")
+
+ generate_button.click(
+ fn=generate_timestamped_url,
+ inputs=[url_input, hours_input, minutes_input, seconds_input],
+ outputs=output_url
+ )
+
+#
+# End of Utilities Tab Functions
+################################################################################################################
+
+# FIXME - Prompt sample box
+#
+# # Sample data
+# prompts_category_1 = [
+# "What are the key points discussed in the video?",
+# "Summarize the main arguments made by the speaker.",
+# "Describe the conclusions of the study presented."
+# ]
+#
+# prompts_category_2 = [
+# "How does the proposed solution address the problem?",
+# "What are the implications of the findings?",
+# "Can you explain the theory behind the observed phenomenon?"
+# ]
+#
+# all_prompts2 = prompts_category_1 + prompts_category_2
+
+
+def launch_ui(share_public=None, server_mode=False):
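+    """Build the full Gradio Blocks interface and launch it in shared, server, or local mode."""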
+    share = share_public
+ css = """
+ .result-box {
+ margin-bottom: 20px;
+ border: 1px solid #ddd;
+ padding: 10px;
+ }
+ .result-box.error {
+ border-color: #ff0000;
+ background-color: #ffeeee;
+ }
+ .transcription, .summary {
+ max-height: 300px;
+ overflow-y: auto;
+ border: 1px solid #eee;
+ padding: 10px;
+ margin-top: 10px;
+ }
+ """
+
+ with gr.Blocks(css=css) as iface:
+ gr.Markdown("# TL/DW: Too Long, Didn't Watch - Your Personal Research Multi-Tool")
+ with gr.Tabs():
+ with gr.TabItem("Transcription / Summarization / Ingestion"):
+ with gr.Tabs():
+ create_video_transcription_tab()
+ create_audio_processing_tab()
+ create_podcast_tab()
+ create_website_scraping_tab()
+ create_pdf_ingestion_tab()
+ create_resummary_tab()
+
+ with gr.TabItem("Search / Detailed View"):
+ create_search_tab()
+ create_prompt_view_tab()
+ create_prompt_edit_tab()
+
+ with gr.TabItem("Local LLM with Llamafile"):
+ create_llamafile_settings_tab()
+
+ with gr.TabItem("Remote LLM Chat"):
+ create_chat_interface()
+
+ with gr.TabItem("Edit Existing Items"):
+ create_media_edit_tab()
+
+ with gr.TabItem("Keywords"):
+ with gr.Tabs():
+ create_view_keywords_tab()
+ create_add_keyword_tab()
+ create_delete_keyword_tab()
+ create_export_keywords_tab()
+
+ with gr.TabItem("Import/Export"):
+ create_import_item_tab()
+ create_export_tab()
+
+ with gr.TabItem("Utilities"):
+ create_utilities_tab()
+
+ # Launch the interface
+ server_port_variable = 7860
+    if share:
+ iface.launch(share=True)
+ elif server_mode and not share_public:
+ iface.launch(share=False, server_name="0.0.0.0", server_port=server_port_variable)
+ else:
+ iface.launch(share=False)
+