# YouTube_Creator_MetaData / gemini_helper.py
import os
from google import genai
from google.genai import types
from google.api_core import retry_async
from dotenv import load_dotenv
from typing import List, Dict, Any, Optional
import traceback

# Load environment variables
load_dotenv()

# Get Gemini API key from environment variables
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
print(f"GEMINI_API_KEY is set: {'Yes' if GEMINI_API_KEY else 'No'}")

# Initialize Gemini API
client = None
if GEMINI_API_KEY:
    try:
        client = genai.Client(api_key=GEMINI_API_KEY)
        print("Gemini client successfully initialized")

        # Configure retry logic for transient API errors (rate limit, service unavailable)
        def is_retriable(e):
            return (isinstance(e, Exception) and
                    hasattr(e, 'code') and e.code in {429, 503})

        # Apply retry to the async generate_content method.
        # AsyncRetry (rather than the synchronous Retry) is used so the wrapped
        # coroutine is actually awaited and retried on failure.
        if hasattr(client.aio.models, 'generate_content'):
            original_method = client.aio.models.generate_content
            client.aio.models.generate_content = retry_async.AsyncRetry(
                predicate=is_retriable,
                initial=1.0,     # Initial delay in seconds
                maximum=60.0,    # Maximum delay in seconds
                multiplier=2.0,  # Backoff multiplier
                deadline=300.0   # Total timeout in seconds
            )(original_method)
            print("Retry logic configured for Gemini API")
    except Exception as e:
        print(f"Error initializing Gemini client: {str(e)}")
        traceback.print_exc()
else:
    print("WARNING: Gemini API key is not configured. LLM timecode generation will be unavailable.")

# Default Gemini model
DEFAULT_MODEL = "gemini-2.0-flash-001"

# Fallback models to try if the primary model fails
ALTERNATIVE_MODELS = ["gemini-1.5-flash-001"]


def format_transcript_for_prompt(transcript_entries: List[Dict[str, Any]], video_duration_seconds: Optional[int] = None) -> str:
    """Formats the transcript for inclusion in the prompt."""
    formatted_transcript = ""

    # Determine the maximum time in the transcript if the video duration is not provided
    if video_duration_seconds is None:
        if transcript_entries:
            last_entry = transcript_entries[-1]
            # Handle both dict format and FetchedTranscriptSnippet objects
            if hasattr(last_entry, 'start'):  # FetchedTranscriptSnippet object
                max_time = last_entry.start + last_entry.duration
            elif isinstance(last_entry, dict):  # Dict format
                max_time = last_entry.get("start", 0) + last_entry.get("duration", 0)
            else:
                max_time = 0
            video_duration_seconds = int(max_time) + 10  # Add a small buffer

    # For long videos, sample the transcript to keep the prompt compact while covering the full runtime
    if video_duration_seconds and video_duration_seconds > 3600:  # More than 60 minutes
        # Keep every 3rd entry to reduce size while maintaining coverage
        sampled_entries = transcript_entries[::3]
        print(f"Sampled transcript: {len(sampled_entries)} entries from {len(transcript_entries)} total")
    elif video_duration_seconds and video_duration_seconds > 1800:  # More than 30 minutes
        # Keep every 2nd entry
        sampled_entries = transcript_entries[::2]
        print(f"Sampled transcript: {len(sampled_entries)} entries from {len(transcript_entries)} total")
    else:
        sampled_entries = transcript_entries

    for entry in sampled_entries:
        # Handle both dict format and FetchedTranscriptSnippet objects
        if hasattr(entry, 'start'):  # FetchedTranscriptSnippet object
            start_time = entry.start
            text = entry.text
        elif isinstance(entry, dict):  # Dict format
            start_time = entry.get("start", 0)
            text = entry.get("text", "")
        else:
            continue  # Skip invalid entries

        # Skip entries whose start time exceeds the total video duration
        if video_duration_seconds and start_time > video_duration_seconds:
            continue

        # Format the time as hours:minutes:seconds
        time_str = format_time_hms(start_time)
        formatted_transcript += f"[{time_str}] {text}\n"

    return formatted_transcript
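
# For illustration (hypothetical entry): format_transcript_for_prompt would render
# {"start": 75.0, "duration": 4.0, "text": "Intro to the topic"} as the prompt line
# "[01:15] Intro to the topic".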


def format_time_hms(seconds: float) -> str:
    """
    Formats time in seconds to hours:minutes:seconds format.
    For videos shorter than an hour, uses minutes:seconds format.
    """
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)

    if hours > 0:
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"
    else:
        return f"{minutes:02d}:{secs:02d}"


def get_timecode_prompt(video_title: str, transcript: str, format_type: str = "youtube", language: Optional[str] = None, video_duration_minutes: Optional[int] = None, timecode_count: Optional[str] = None, interval_text: Optional[str] = None) -> str:
    """Creates a prompt for generating timecodes based on the transcript."""
    # Determine prompt language based on the video language
    if language and (language.lower().startswith('uk') or language.lower().startswith('ua')):
        target_language = "Ukrainian"
        example_description = "Discussion of main principles"
    elif language and language.lower().startswith('ru'):
        target_language = "Russian"
        example_description = "Discussion of main principles"
    else:
        target_language = "the same language as the video transcript"
        example_description = "Discussion of main principles"

    prompt = f"""
You are a YouTube assistant. Analyze the FULL TRANSCRIPT below and identify all major topic shifts or sections.

Your task:
- Generate timestamps that cover the ENTIRE {video_duration_minutes}-minute video
- Each timestamp must be paired with a precise time from the transcript
- Timestamps must reflect the actual content flow throughout the video

Format requirements:
- Plain text output ONLY
- Each line format: MM:SS Topic description (or HH:MM:SS for longer videos)
- Use {target_language} for descriptions (3-6 words each)
- Start with an early timestamp (first few minutes)
- End with a late timestamp (last 10-15 minutes of the video)
- NO explanations, NO numbering, NO extra text

CRITICAL: The transcript below spans {video_duration_minutes} minutes. You MUST create timestamps that span from beginning to end, not just the first portion.

Full transcript to analyze:
{transcript}

Generate {timecode_count} timestamps covering the complete {video_duration_minutes}-minute duration:
"""
    return prompt


async def generate_timecodes_with_gemini(
    transcript_entries: List[Dict[str, Any]],
    video_title: str,
    format_type: str = "youtube",
    model_name: Optional[str] = None,
    language: Optional[str] = None
) -> Dict[str, Any]:
    """
    Generates timecodes from the transcript using Gemini.

    Args:
        transcript_entries: List of transcript entries
        video_title: Video title
        format_type: Timecode format (youtube, markdown)
        model_name: Gemini model name (defaults to DEFAULT_MODEL)
        language: Transcript language (if known)

    Returns:
        Dictionary with the generation results
    """
    if not GEMINI_API_KEY or client is None:
        return {
            "error": "Gemini API key is not configured. Please add GEMINI_API_KEY to the .env file"
        }

    try:
        print(f"Starting timecode generation with model: {model_name or DEFAULT_MODEL}")

        # Determine transcript language if not provided
        detected_language = language
        if not detected_language:
            # Simple heuristic for language detection from first 10 segments
            # Handle both dict format and FetchedTranscriptSnippet objects
            text_sample_parts = []
            for entry in transcript_entries[:10]:
                if hasattr(entry, 'text'):  # FetchedTranscriptSnippet object
                    text_sample_parts.append(entry.text)
                elif isinstance(entry, dict):  # Dict format
                    text_sample_parts.append(entry.get("text", ""))
            text_sample = " ".join(text_sample_parts)

            # Set of Ukrainian letters that differ from the Russian alphabet
            ukrainian_specific = set("ґєії")
            # If there's at least one specific Ukrainian letter
            if any(char in ukrainian_specific for char in text_sample.lower()):
                detected_language = "uk"
                print("Detected transcript language: Ukrainian")
            # Check for Cyrillic in general
            elif any(ord('а') <= ord(char) <= ord('я') for char in text_sample.lower()):
                detected_language = "ru"
                print("Detected transcript language: Russian")
            else:
                detected_language = "en"
                print("Detected transcript language: English (or other)")

        # Determine video duration (in seconds and minutes)
        video_duration_seconds = 0
        max_timecodes = 30  # Default value
        if transcript_entries:
            last_entry = transcript_entries[-1]
            # Handle both dict format and FetchedTranscriptSnippet objects
            if hasattr(last_entry, 'start'):  # FetchedTranscriptSnippet object
                video_duration_seconds = last_entry.start + last_entry.duration
            elif isinstance(last_entry, dict):  # Dict format
                video_duration_seconds = last_entry.get("start", 0) + last_entry.get("duration", 0)

            video_duration_minutes = int(video_duration_seconds / 60)
            print(f"Determined video duration: {video_duration_minutes} minutes ({video_duration_seconds} seconds)")

            # Set max_timecodes based on video duration
            # (note: refined again below once the timecode_count range is chosen)
            if video_duration_minutes <= 30:
                max_timecodes = 20
            elif video_duration_minutes <= 60:
                max_timecodes = 35
            elif video_duration_minutes <= 120:
                max_timecodes = 50
            else:
                max_timecodes = 60
        else:
            video_duration_minutes = None

        # Determine the number of timecodes based on video duration
        if video_duration_minutes:
            if video_duration_minutes <= 30:
                timecode_count = "8-12"
                max_timecodes = 15
            elif video_duration_minutes <= 60:
                timecode_count = "12-18"
                max_timecodes = 20
            elif video_duration_minutes <= 120:
                timecode_count = "18-25"
                max_timecodes = 30
            else:
                timecode_count = "25-35"
                max_timecodes = 40
        else:
            timecode_count = "10-15"
            max_timecodes = 20

        # Format the transcript for the prompt
        formatted_transcript = format_transcript_for_prompt(transcript_entries, video_duration_seconds)

        # Create the prompt considering language and duration.
        # Calculate the recommended interval between timestamps.
        if video_duration_minutes and timecode_count:
            target_count = int(timecode_count.split('-')[0]) if timecode_count.split('-')[0].isdigit() else 20
            interval_minutes = video_duration_minutes // target_count
            interval_text = f"approximately every {interval_minutes}-{interval_minutes + 2} minutes"
        else:
            interval_text = "evenly throughout the video"

        prompt = get_timecode_prompt(
            video_title,
            formatted_transcript,
            format_type,
            detected_language,
            video_duration_minutes,
            timecode_count,
            interval_text
        )
        print(f"Prompt prepared, length: {len(prompt)} characters")

        # List of models to try
        models_to_try = [model_name or DEFAULT_MODEL] + [m for m in ALTERNATIVE_MODELS if m != (model_name or DEFAULT_MODEL)]
        last_error = None

        for current_model in models_to_try:
            try:
                # Use the async API client for content generation
                print(f"Making request to Gemini API with model {current_model}...")
                response = await client.aio.models.generate_content(
                    model=current_model,
                    contents=prompt,
                    config=types.GenerateContentConfig(
                        temperature=0.2,  # Low temperature for more deterministic results
                        max_output_tokens=2048,  # Enough for a timecode list
                    )
                )
                print(f"Response received: {type(response)}")

                # Get response text
                timecodes_text = response.text
                print(f"Response text length: {len(timecodes_text)}")

                # Split into lines and clean up
                timecodes = [line.strip() for line in timecodes_text.split('\n') if line.strip()]

                # Filter out timecodes like "video start" and "video end"
                filtered_timecodes = []
                for tc in timecodes:
                    # Extract the description (everything after the time)
                    parts = tc.split(" ", 1)
                    if len(parts) > 1:
                        time_part, description = parts
                        # Skip timecodes with "video start" or "video end"
                        lowercase_desc = description.lower()
                        if any(phrase in lowercase_desc for phrase in [
                            "video start", "video end", "start of video", "end of video",
                            "beginning", "conclusion", "intro", "outro"
                        ]):
                            continue
                    filtered_timecodes.append(tc)

                # If there are too many timecodes, select an evenly distributed subset
                if len(filtered_timecodes) > max_timecodes:
                    print(f"Too many timecodes ({len(filtered_timecodes)}), reducing to {max_timecodes}")
                    # Calculate the step for selecting timecodes evenly
                    step = len(filtered_timecodes) / max_timecodes
                    # Select indices for the timecodes
                    indices = [int(i * step) for i in range(max_timecodes)]
                    # Ensure the last timecode is included
                    if indices[-1] != len(filtered_timecodes) - 1:
                        indices[-1] = len(filtered_timecodes) - 1
                    # Select timecodes by index
                    final_timecodes = [filtered_timecodes[i] for i in indices]
                else:
                    final_timecodes = filtered_timecodes

                print(f"Final timecodes count after processing: {len(final_timecodes)}")

                return {
                    "timecodes": final_timecodes,
                    "format": format_type,
                    "model": current_model,
                    "video_title": video_title,
                    "detected_language": detected_language,
                    "video_duration_minutes": video_duration_minutes
                }
            except Exception as api_error:
                print(f"Error with model {current_model}: {str(api_error)}")
                traceback.print_exc()
                last_error = api_error
                continue

        # If all models failed
        return {
            "error": f"Failed to execute request with any model. Last error: {str(last_error)}"
        }
    except Exception as e:
        print(f"General error: {str(e)}")
        traceback.print_exc()
        return {
            "error": f"Error generating timecodes with Gemini: {str(e)}"
        }
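

# --- Minimal usage sketch (illustrative only, not part of the module's original flow) ---
# Assumes a valid GEMINI_API_KEY in the .env file. The sample transcript entries below
# are hypothetical and only demonstrate the dict shape the functions above expect.
if __name__ == "__main__":
    import asyncio

    sample_entries = [
        {"start": 0.0, "duration": 5.0, "text": "Welcome to the channel"},
        {"start": 5.0, "duration": 7.5, "text": "Today we cover the project setup"},
        {"start": 12.5, "duration": 6.0, "text": "First, install the dependencies"},
    ]

    async def _demo():
        # Generate timecodes for the hypothetical transcript and print the result
        result = await generate_timecodes_with_gemini(
            transcript_entries=sample_entries,
            video_title="Demo video",
            format_type="youtube",
        )
        if "error" in result:
            print(f"Generation failed: {result['error']}")
        else:
            for line in result["timecodes"]:
                print(line)

    asyncio.run(_demo())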