userIdc2024 commited on
Commit
e09970c
·
verified ·
1 Parent(s): 2109292

Update services/video_analyzer.py

Browse files
Files changed (1) hide show
  1. services/video_analyzer.py +61 -136
services/video_analyzer.py CHANGED
@@ -1,151 +1,76 @@
 
1
  import os
2
  import tempfile
 
3
  import pandas as pd
4
  import streamlit as st
 
 
 
5
 
6
- from services.video_analyzer import analyze_video_only
7
- from components.render_analysis import render_analyzer_results
8
- from utils.video import get_video_thumbnail_base64
9
- from utils.dataframe import analysis_to_csv
10
- from database import insert_video_analysis, get_all_video_analyses
11
-
12
- def analyzer_page():
13
- st.sidebar.header("Analyzer")
14
- selected_tab = st.sidebar.radio("Select Mode", ["Video Analyser", "History"], index=0)
15
-
16
- if selected_tab == "Video Analyser":
17
- st.subheader(" Video Analyser")
18
- uploaded_video = st.file_uploader(
19
- "Upload Video",
20
- type=['mp4', 'mov', 'avi', 'mkv'],
21
- help="Upload a video for analysis"
22
- )
23
- analyse_button = st.button("Run Analysis", use_container_width=True)
24
-
25
- if uploaded_video and analyse_button:
26
- # Save to a temp file
27
- suffix = os.path.splitext(uploaded_video.name)[1]
28
- with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
29
- tmp.write(uploaded_video.read())
30
- video_path = tmp.name
31
-
32
- with st.spinner("Analyzing video..."):
33
- result = analyze_video_only(video_path)
34
-
35
-
36
- st.session_state["video_name"] = uploaded_video.name
37
- st.session_state["video_path"] = video_path
38
- st.session_state["thumbnail"] = get_video_thumbnail_base64(video_path)
39
- st.session_state["analysis"] = result
40
-
41
- analysis = st.session_state.get("analysis")
42
-
43
- if analysis is None:
44
- return
45
-
46
-
47
- if not analysis:
48
- st.error("Analysis returned no data.")
49
- return
50
 
51
- if "__error__" in analysis:
52
- st.error(f"Analysis failed: {analysis['__error__']}")
53
- return
54
-
55
-
56
- render_analyzer_results(analysis)
57
-
58
- col1, col2 = st.columns(2)
59
 
60
- # CSV export
61
- with col1:
62
- frames = []
63
- if "storyboard" in analysis:
64
- df_storyboard = pd.DataFrame(analysis["storyboard"])
65
- df_storyboard["section"] = "Storyboard"
66
- frames.append(df_storyboard)
67
 
68
- if "script" in analysis:
69
- df_script = pd.DataFrame(analysis["script"])
70
- df_script["section"] = "Script"
71
- frames.append(df_script)
72
 
73
- if "video_analysis" in analysis and "video_metrics" in analysis["video_analysis"]:
74
- df_metrics = pd.DataFrame(analysis["video_analysis"]["video_metrics"])
75
- df_metrics["section"] = "Metrics"
76
- frames.append(df_metrics)
 
 
77
 
78
- if "timestamp_improvements" in analysis:
79
- df_improvements = pd.DataFrame(analysis["timestamp_improvements"])
80
- df_improvements["section"] = "Improvements"
81
- frames.append(df_improvements)
82
 
83
- if frames:
84
- csv_content = pd.concat(frames, ignore_index=True).to_csv(index=False)
85
- st.download_button(
86
- "Download CSV",
87
- data=csv_content,
88
- file_name=f"{st.session_state.get('video_name', 'analysis')}.csv",
89
- mime="text/csv",
90
- use_container_width=True,
91
- )
92
- else:
93
- st.info("No tabular data available for CSV export.")
94
 
95
- # Save to DB
96
- with col2:
97
- if st.button("Save to DB", use_container_width=True):
98
- try:
99
- insert_video_analysis(
100
- video_name=st.session_state.get("video_name", "unknown"),
101
- response=analysis,
102
- thumbnail=st.session_state.get("thumbnail", "")
103
- )
104
- st.success("Analysis saved to database ")
105
- except Exception as e:
106
- st.error(f"Failed to save analysis: {e}")
107
 
108
- else:
109
- # History tab
110
- st.subheader(" History")
 
 
 
 
 
 
 
 
 
 
 
 
111
  try:
112
- history_items = get_all_video_analyses(limit=20)
113
- except Exception as e:
114
- st.error(f"Failed to load history: {e}")
115
- return
116
-
117
- if not history_items:
118
- st.info("No saved history available.")
119
- return
120
-
121
- video_titles = [
122
- f"{item['video_name']} ({item['created_at'].strftime('%Y-%m-%d %H:%M')})"
123
- for item in history_items
124
- ]
125
- selected = st.sidebar.radio("History Items", video_titles, index=0)
126
- idx = video_titles.index(selected)
127
- selected_data = history_items[idx]
128
-
129
- st.subheader(f"Analysis for: {selected_data['video_name']}")
130
- if selected_data.get("thumbnail"):
131
- st.image("data:image/jpeg;base64," + selected_data["thumbnail"], width=150)
132
-
133
- json_response = selected_data.get("response")
134
- if not json_response:
135
- st.info("No analysis payload found for this item.")
136
- return
137
-
138
- tabs = st.tabs(["Video Analysis"])
139
- with tabs[0]:
140
- render_analyzer_results(json_response)
141
  try:
142
- csv_data = analysis_to_csv(json_response)
143
- st.download_button(
144
- "Download CSV",
145
- data=csv_data,
146
- file_name=f"{selected_data['video_name']}_analysis.csv",
147
- mime="text/csv",
148
- use_container_width=True,
149
- )
150
- except Exception as e:
151
- st.error(f"CSV export failed: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
  import os
3
  import tempfile
4
+ import logging
5
  import pandas as pd
6
  import streamlit as st
7
+ from typing import Dict, Any, List
8
+ from google import genai
9
+ import time
10
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
 
12
+ from config import configure_gemini
13
+ from prompt.analyser_prompt import analyser_prompt
14
+ from schema import AdAnalysis
15
+ from utils.video import get_video_thumbnail_base64
16
+ from dotenv import load_dotenv
 
 
 
17
 
18
+ logger = logging.getLogger(__name__)
 
 
 
 
 
 
19
 
20
+ GEMINI_API_KEY = os.getenv("GEMINI_KEY")
 
 
 
21
 
22
+ logging.basicConfig(
23
+ level=logging.INFO,
24
+ format="%(asctime)s [%(levelname)s] %(message)s",
25
+ handlers=[logging.StreamHandler()]
26
+ )
27
+ logger = logging.getLogger(__name__)
28
 
 
 
 
 
29
 
30
+ def configure_gemini():
31
+ return genai.Client(api_key=GEMINI_API_KEY)
 
 
 
 
 
 
 
 
 
32
 
 
 
 
 
 
 
 
 
 
 
 
 
33
 
34
+ def analyze_video_only(video_path: str) -> Dict[str, Any]:
35
+ client = configure_gemini()
36
+ try:
37
+ video_file = client.files.upload(file=video_path)
38
+ while getattr(video_file.state, "name", "") == "PROCESSING":
39
+ time.sleep(2)
40
+ video_file = client.files.get(name=video_file.name)
41
+ if getattr(video_file.state, "name", "") == "FAILED":
42
+ return {}
43
+ resp = client.models.generate_content(
44
+ model="gemini-2.0-flash",
45
+ contents=[analyser_prompt, video_file],
46
+ config={"response_mime_type": "application/json"}
47
+ )
48
+ raw = getattr(resp, "text", "") or ""
49
  try:
50
+ model_obj = AdAnalysis.model_validate_json(raw)
51
+ return model_obj.model_dump()
52
+ except Exception:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53
  try:
54
+ return json.loads(raw)
55
+ except Exception:
56
+ return {}
57
+ except Exception:
58
+ return {}
59
+
60
+
61
+ def analyze_multiple_videos(video_files: List[st.runtime.uploaded_file_manager.UploadedFile]) -> List[Dict[str, Any]]:
62
+ results = []
63
+ for file in video_files:
64
+ with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file.name)[1]) as tmp:
65
+ tmp.write(file.read())
66
+ video_path = tmp.name
67
+
68
+ analysis = analyze_video_only(video_path)
69
+ thumbnail_b64 = get_video_thumbnail_base64(video_path)
70
+
71
+ results.append({
72
+ "video_name": file.name,
73
+ "analysis": analysis,
74
+ "thumbnail": thumbnail_b64
75
+ })
76
+ return results