import streamlit as st
from google import genai
import tempfile
import os
import time
import json
from typing import Optional
import pandas as pd
import logging
from database import insert_analysis_result
from dotenv import load_dotenv

load_dotenv()

# Backend API Key Configuration
GEMINI_API_KEY = os.getenv("GEMINI_KEY")
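
# NOTE: This app expects GEMINI_KEY, SYS_PROMPT, and ACCESS_TOKEN to be provided via the
# environment (or a .env file), plus a local `database` module exposing
# insert_analysis_result() and get_all_results().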

# Page configuration
st.set_page_config(
    page_title="Video Analyser and Script Generator",
    page_icon="🎥",
    layout="wide",
    initial_sidebar_state="expanded"
)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)


def configure_gemini():
    """Configure the Gemini client with the backend API key."""
    return genai.Client(api_key=GEMINI_API_KEY)


# Enhanced system prompt with timestamp-based improvements.
# Default to an empty string so a missing SYS_PROMPT does not become the literal "None".
SYSTEM_PROMPT = os.getenv("SYS_PROMPT", "")


def analyze_video_and_generate_script(
    video_bytes,
    video_name,
    offer_details: str = "",
    target_audience: str = "",
    specific_hooks: str = "",
    additional_context: str = ""
):
    """
    Analyze a reference video and generate direct response script variations.
    """
    try:
        # Save the uploaded video to a temporary file
        with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(video_name)[1]) as tmp_file:
            tmp_file.write(video_bytes)
            tmp_file_path = tmp_file.name

        # Configure Gemini
        client = configure_gemini()

        # Show upload progress
        upload_progress = st.progress(0)
        upload_status = st.empty()
        upload_status.text("Uploading video to Google AI...")
        upload_progress.progress(20)

        # Upload video to Gemini
        video_file_obj = client.files.upload(file=tmp_file_path)
        upload_progress.progress(40)
        upload_status.text("Processing video...")

        # Poll until the uploaded file leaves the PROCESSING state
        while video_file_obj.state.name == "PROCESSING":
            time.sleep(2)
            video_file_obj = client.files.get(name=video_file_obj.name)

        upload_progress.progress(60)

        if video_file_obj.state.name == "FAILED":
            upload_status.error("Google AI file processing failed. Please try another video.")
            os.unlink(tmp_file_path)  # clean up the temp file before bailing out
            return None

        upload_progress.progress(80)
        upload_status.text("Generating script variations...")

        # Build the enhanced user prompt
        user_prompt = f"""Analyze this reference video and generate 3 high-converting direct response video script variations with detailed timestamp-based improvements.

IMPORTANT CONTEXT TO FOLLOW WHEN CREATING OUTPUT:
- Offer Details: {offer_details}
- Target Audience: {target_audience}
- Specific Hooks: {specific_hooks}

ADDITIONAL CONTEXT (MANDATORY TO FOLLOW):
{additional_context}

You must reflect this additional context in:
- The script tone, CTA, visuals
- Compliance or branding constraints
- Any assumptions about audience or product
Failure to include this will be considered incomplete.

Please provide a comprehensive analysis including:

1. DETAILED VIDEO ANALYSIS with timestamp-based metrics:
   - Break down the video into 5-10 second segments
   - Rate each segment's effectiveness (1-10 scale)
   - Identify specific elements (hook, transition, proof, CTA, etc.)

2. TIMESTAMP-BASED IMPROVEMENTS:
   - Specific recommendations for each time segment
   - Priority level for each improvement
   - Expected impact of implementing changes

3. SCRIPT VARIATIONS:
   - Create 2-3 complete script variations
   - Each with timestamp-by-timestamp breakdown
   - Different psychological triggers and approaches

IMPORTANT: Return only valid JSON in the exact format specified in the system prompt. Analyze the video second-by-second for maximum detail."""

        # Generate response
        response = client.models.generate_content(
            model="gemini-2.0-flash",
            contents=[video_file_obj, user_prompt + "\n\n" + SYSTEM_PROMPT]
        )

        upload_progress.progress(100)
        upload_status.success("Analysis complete!")

        # Clean up temporary file
        os.unlink(tmp_file_path)

        # Parse the JSON response, stripping markdown code fences if the model added them
        try:
            response_text = response.text.strip()
            if response_text.startswith('```json'):
                response_text = response_text[7:-3]
            elif response_text.startswith('```'):
                response_text = response_text[3:-3]
            json_response = json.loads(response_text)
            return json_response
        except json.JSONDecodeError as e:
            st.error(f"Error parsing AI response: {str(e)}")
            return None

    except Exception as e:
        st.error(f"Error processing video: {str(e)}")
        return None
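
# NOTE: The display helpers below assume the parsed response contains the top-level keys
# "script_variations", "video_analysis", and "timestamp_improvements"; the exact schema is
# dictated by the system prompt loaded from SYS_PROMPT above.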


def display_script_variations(json_data):
    """Display script variations in formatted tables"""
    if not json_data or "script_variations" not in json_data:
        st.error("No script variations found in the response")
        return

    for i, variation in enumerate(json_data["script_variations"], 1):
        variation_name = variation.get("variation_name", f"Variation {i}")
        st.markdown(f"### Variation {i}: {variation_name}")

        # Convert script table to DataFrame for better display
        script_data = variation.get("script_table")
        if not script_data:
            st.warning(f"No script data for {variation_name}")
            continue

        df = pd.DataFrame(script_data)

        # Rename columns for better display
        df = df.rename(columns={
            'timestamp': 'Timestamp',
            'script_voiceover': 'Script / Voiceover',
            'visual_direction': 'Visual Direction',
            'psychological_trigger': 'Psychological Trigger',
            'cta_action': 'CTA / Action'
        })

        st.table(df)
        st.markdown("---")


def display_video_analysis(json_data):
    """Display video analysis in tabular format"""
    if not json_data or "video_analysis" not in json_data:
        st.error("No video analysis found in the response")
        return

    analysis = json_data["video_analysis"]

    # Display the general analysis; the model may return either a dict or a bare metrics list
    video_metrics = []
    if isinstance(analysis, dict):
        col1, col2 = st.columns(2)
        with col1:
            st.subheader("Effectiveness Factors")
            st.write(analysis.get('effectiveness_factors', 'N/A'))
            st.subheader("Target Audience")
            st.write(analysis.get('target_audience', 'N/A'))
        with col2:
            st.subheader("Psychological Triggers")
            st.write(analysis.get('psychological_triggers', 'N/A'))
        video_metrics = analysis.get("video_metrics", [])
    elif isinstance(analysis, list):
        video_metrics = analysis
    else:
        st.warning("Unexpected format in video_analysis. Skipping metadata.")

    if video_metrics:
        metrics_df = pd.DataFrame(video_metrics)

        # Rename columns for better display
        column_mapping = {
            'timestamp': 'Timestamp',
            'element': 'Element',
            'current_approach': 'Current Approach',
            'effectiveness_score': 'Score',
            'notes': 'Analysis Notes'
        }
        metrics_df = metrics_df.rename(columns=column_mapping)

        st.dataframe(
            metrics_df,
            use_container_width=True,
            hide_index=True,
            column_config={
                "Timestamp": st.column_config.TextColumn(width="small"),
                "Element": st.column_config.TextColumn(width="medium"),
                "Current Approach": st.column_config.TextColumn(width="large"),
                "Score": st.column_config.TextColumn(width="small"),
                "Analysis Notes": st.column_config.TextColumn(width="large")
            }
        )
    else:
        st.warning("No detailed video metrics available")


def display_timestamp_improvements(json_data):
    """Display timestamp-based improvements in tabular format"""
    improvements = json_data.get("timestamp_improvements")
    if improvements is None:
        st.error("No timestamp improvements found in the response")
        return
    if not improvements:
        st.warning("No timestamp improvements available")
        return

    st.subheader("Timestamp-by-Timestamp Improvement Recommendations")

    improvements_df = pd.DataFrame(improvements)

    # Rename columns for better display
    column_mapping = {
        'timestamp': 'Timestamp',
        'current_element': 'Current Element',
        'improvement_type': 'Improvement Type',
        'recommended_change': 'Recommended Change',
        'expected_impact': 'Expected Impact',
        'priority': 'Priority'
    }
    improvements_df = improvements_df.rename(columns=column_mapping)

    # Color code the Priority column
    def color_priority(val):
        if val == 'High':
            return 'background-color: #ffcccb'
        elif val == 'Medium':
            return 'background-color: #ffffcc'
        elif val == 'Low':
            return 'background-color: #ccffcc'
        return ''

    # Note: pandas 2.1+ renames Styler.applymap to Styler.map; applymap still works with a warning
    styled_df = improvements_df.style.applymap(color_priority, subset=['Priority'])

    st.dataframe(
        styled_df,
        use_container_width=True,
        hide_index=True,
        column_config={
            "Timestamp": st.column_config.TextColumn(width="small"),
            "Current Element": st.column_config.TextColumn(width="medium"),
            "Improvement Type": st.column_config.TextColumn(width="medium"),
            "Recommended Change": st.column_config.TextColumn(width="large"),
            "Expected Impact": st.column_config.TextColumn(width="medium"),
            "Priority": st.column_config.TextColumn(width="small")
        }
    )


def create_csv_download(json_data):
    """Create CSV content with all scripts combined"""
    all_scripts_data = []

    # Combine all script variations into one dataset
    for i, variation in enumerate(json_data.get("script_variations", []), 1):
        variation_name = variation.get("variation_name", f"Variation {i}")
        for row in variation.get("script_table", []):
            script_row = {
                'Variation': variation_name,
                'Timestamp': row.get('timestamp', ''),
                'Script_Voiceover': row.get('script_voiceover', ''),
                'Visual_Direction': row.get('visual_direction', ''),
                'Psychological_Trigger': row.get('psychological_trigger', ''),
                'CTA_Action': row.get('cta_action', '')
            }
            all_scripts_data.append(script_row)

    # Convert to DataFrame and then to CSV
    if all_scripts_data:
        df = pd.DataFrame(all_scripts_data)
        return df.to_csv(index=False)
    else:
        return "No script data available"


def check_token(user_token):
    """Validate the supplied token against the ACCESS_TOKEN environment variable."""
    ACCESS_TOKEN = os.getenv("ACCESS_TOKEN")
    if not ACCESS_TOKEN:
        logger.critical("ACCESS_TOKEN not set in environment.")
        return False, "Server error: Access token not configured."
    if user_token == ACCESS_TOKEN:
        logger.info("Access token validated successfully.")
        return True, ""
    logger.warning("Invalid access token attempt.")
    return False, "Invalid token."
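
# check_token() is a simple shared-secret gate; main() below caches the result in
# st.session_state["authenticated"] so the token only has to be entered once per session.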


def main():
    """Main application function"""
    # Page configuration is handled once by the module-level st.set_page_config() call above;
    # calling it again here would raise a StreamlitAPIException.
    st.title("Video Analyser and Script Generator")
    st.divider()

    if "authenticated" not in st.session_state:
        st.session_state["authenticated"] = False

    if not st.session_state["authenticated"]:
        st.markdown("## Access Required")
        token_input = st.text_input("Enter Access Token", type="password")
        if st.button("Unlock App"):
            ok, error_msg = check_token(token_input)
            if ok:
                st.session_state["authenticated"] = True
                st.rerun()
            else:
                st.error(error_msg)
        return

    # Sidebar navigation
    if st.session_state["authenticated"]:
        selected_tab = st.sidebar.radio("Select Mode", ["Script Generator", "History"])

        # ========== SCRIPT GENERATOR ==========
        if selected_tab == "Script Generator":
            with st.expander("How to Use This Tool", expanded=False):
                st.markdown("""
### Upload Guidelines:
- **Best videos to analyze**: Already profitable Facebook/TikTok ads in your niche
- **Video length**: 30–90 seconds work best for analysis
- **Quality**: Clear audio and visuals help with better analysis

### Context Tips:
- **Offer details**: Be specific about your main promise and mechanism
- **Audience**: Include demographics, pain points, and desires
- **Hooks**: Mention any specific angles that have worked for you

### Script Optimization:
- Generated scripts focus on stopping scroll and driving clicks
- Each variation tests different psychological triggers
- Use the timestamp format for precise video production
- Test multiple variations to find your best performer
""")

            st.subheader("Input Configuration")

            uploaded_video = st.file_uploader(
                "Upload Reference Video",
                type=['mp4', 'mov', 'avi', 'mkv'],
                help="Upload a profitable ad video to analyze and create variations from"
            )
            if uploaded_video is None:
                st.info("Please upload a reference video to begin analysis.")

            st.subheader("Additional Context (Optional)")
            offer_details = st.text_area(
                "Offer Details",
                placeholder="e.g., Solar installation with $0 down payment...",
                height=80,
                help="Describe the product/service and main promise"
            )
            target_audience = st.text_area(
                "Target Audience",
                placeholder="e.g., 40+ homeowners with high electricity bills...",
                height=80,
                help="Describe the ideal customer demographics and pain points"
            )
            specific_hooks = st.text_area(
                "Specific Hooks to Test",
                placeholder="e.g., Government rebate angle, celebrity endorsement...",
                height=80,
                help="Any specific angles or hooks you want to incorporate"
            )
            additional_context = st.text_area(
                "Additional Context",
                placeholder="Any other relevant information...",
                height=100,
                help="Compliance requirements, brand guidelines, or other notes"
            )

            generate_button = st.button("Generate Script Variations", use_container_width=True)

            if "analysis_results" in st.session_state and st.session_state["analysis_results"]:
                if st.button("Clear Results", use_container_width=True):
                    del st.session_state["analysis_results"]
                    st.rerun()

            # Generate & show results
            if uploaded_video and generate_button:
                with st.spinner("Analyzing video and generating scripts..."):
                    video_bytes = uploaded_video.read()
                    uploaded_video.seek(0)
                    json_response = analyze_video_and_generate_script(
                        video_bytes,
                        uploaded_video.name,
                        offer_details,
                        target_audience,
                        specific_hooks,
                        additional_context
                    )
                if json_response:
                    insert_analysis_result(
                        video_name=uploaded_video.name,
                        offer_details=offer_details,
                        target_audience=target_audience,
                        specific_hook=specific_hooks,
                        additional_context=additional_context,
                        response=json_response
                    )
                    st.session_state["analysis_results"] = json_response

            if "analysis_results" in st.session_state:
                json_response = st.session_state["analysis_results"]
                tab1, tab2, tab3 = st.tabs(["Script Variations", "Video Analysis", "Improvement Recommendations"])
                with tab1:
                    display_script_variations(json_response)
                    csv_content = create_csv_download(json_response)
                    st.download_button(
                        "Download All Scripts (CSV)",
                        data=csv_content,
                        file_name="video_script_variations.csv",
                        mime="text/csv"
                    )
                with tab2:
                    display_video_analysis(json_response)
                with tab3:
                    display_timestamp_improvements(json_response)

        # ========== HISTORY ==========
        elif selected_tab == "History":
            from database import get_all_results

            history_items = get_all_results(limit=20)
            if history_items:
                video_titles = [
                    f"{item['video_name']} ({item['created_at'].strftime('%Y-%m-%d %H:%M')})"
                    for item in history_items
                ]
                selected = st.sidebar.radio("History Items", video_titles, index=0)
                selected_index = video_titles.index(selected)
                selected_data = history_items[selected_index]

                st.subheader(f"Analysis for: {selected_data['video_name']}")
                json_response = selected_data.get("response")
                if json_response:
                    tab1, tab2, tab3 = st.tabs(["Script Variations", "Video Analysis", "Improvement Recommendations"])
                    with tab1:
                        display_script_variations(json_response)
                    with tab2:
                        display_video_analysis(json_response)
                    with tab3:
                        display_timestamp_improvements(json_response)
                else:
                    st.warning("No valid response data for this analysis.")
            else:
                st.sidebar.info("No saved analyses found.")
                st.info("No saved history available.")


if __name__ == "__main__":
    try:
        logger.info("Launching Streamlit app...")
        main()
    except Exception:
        logger.exception("Unhandled error during app launch.")