@woai
🧹 Major code cleanup and internationalization - Remove Russian comments/strings, translate UI to English, clean linter errors, remove hardcoded tokens, delete test files. Ready for production deployment
e775565
import os | |
from google import genai | |
from google.genai import types | |
from google.api_core import retry | |
from dotenv import load_dotenv | |
from typing import List, Dict, Any, Optional | |
import traceback | |
# Load environment variables from a local .env file (no-op if the file is absent).
load_dotenv()

# Gemini API key; when missing, the module still imports but generation is disabled.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

print(f"GEMINI_API_KEY is set: {'Yes' if GEMINI_API_KEY else 'No'}")

# Initialize Gemini API client once at import time; None signals "unavailable".
client = None
if GEMINI_API_KEY:
    try:
        client = genai.Client(api_key=GEMINI_API_KEY)
        print("Gemini client successfully initialized")

        # Configure retry logic for API errors.
        def is_retriable(e):
            # Retry only on rate limiting (429) and service unavailable (503).
            return (isinstance(e, Exception) and
                    (hasattr(e, 'code') and e.code in {429, 503}))

        # Apply retry to generate_content method by monkey-patching the client.
        # NOTE(review): retry.Retry is the *synchronous* decorator, but
        # client.aio.models.generate_content is an async method — the wrapped call
        # returns a coroutine immediately without raising, so the retry predicate
        # likely never fires. Consider google.api_core.retry_async.AsyncRetry — TODO confirm.
        if hasattr(client.aio.models, 'generate_content'):
            original_method = client.aio.models.generate_content
            client.aio.models.generate_content = retry.Retry(
                predicate=is_retriable,
                initial=1.0,  # Initial delay in seconds
                maximum=60.0,  # Maximum delay in seconds
                multiplier=2.0,  # Backoff multiplier
                deadline=300.0  # Total timeout in seconds
            )(original_method)
            print("Retry logic configured for Gemini API")
    except Exception as e:
        print(f"Error initializing Gemini client: {str(e)}")
        traceback.print_exc()
else:
    print("WARNING: Gemini API key not configured. LLM timecode generation functions will be unavailable.")

# Default Gemini model
DEFAULT_MODEL = "gemini-2.0-flash-001"
# Alternative models if main one doesn't work
ALTERNATIVE_MODELS = ["gemini-1.5-flash-001"]
def format_transcript_for_prompt(transcript_entries: List[Dict[str, Any]], video_duration_seconds: Optional[int] = None) -> str:
    """Format a transcript into "[MM:SS] text" lines for inclusion in an LLM prompt.

    Args:
        transcript_entries: Transcript segments, either dicts with
            "start"/"duration"/"text" keys or FetchedTranscriptSnippet-like
            objects exposing .start/.duration/.text attributes.
        video_duration_seconds: Total video length in seconds. When None, it is
            derived from the last entry's end time plus a small buffer.

    Returns:
        One "[<time>] <text>\\n" line per (possibly sampled) entry, concatenated.
    """
    # Derive the duration from the final entry when the caller did not supply one.
    if video_duration_seconds is None and transcript_entries:
        last_entry = transcript_entries[-1]
        if hasattr(last_entry, 'start'):  # FetchedTranscriptSnippet object
            max_time = last_entry.start + last_entry.duration
        elif isinstance(last_entry, dict):  # Dict format
            max_time = last_entry.get("start", 0) + last_entry.get("duration", 0)
        else:
            max_time = 0
        video_duration_seconds = int(max_time) + 10  # Add small buffer

    # Down-sample long transcripts to bound prompt size while keeping full coverage.
    if video_duration_seconds and video_duration_seconds > 3600:  # More than 60 minutes
        # Sample every 3rd entry to reduce size but maintain coverage.
        sampled_entries = transcript_entries[::3]
        print(f"Sampled transcript: {len(sampled_entries)} entries from {len(transcript_entries)} total")
    elif video_duration_seconds and video_duration_seconds > 1800:  # More than 30 minutes
        # Sample every 2nd entry.
        sampled_entries = transcript_entries[::2]
        print(f"Sampled transcript: {len(sampled_entries)} entries from {len(transcript_entries)} total")
    else:
        sampled_entries = transcript_entries

    # Build output lines in a list and join once (avoids quadratic str +=).
    lines = []
    for entry in sampled_entries:
        # Handle both dict format and FetchedTranscriptSnippet objects.
        if hasattr(entry, 'start'):  # FetchedTranscriptSnippet object
            start_time = entry.start
            text = entry.text
        elif isinstance(entry, dict):  # Dict format
            start_time = entry.get("start", 0)
            text = entry.get("text", "")
        else:
            continue  # Skip invalid entries

        # Drop entries whose timestamp exceeds the known video duration.
        if video_duration_seconds and start_time > video_duration_seconds:
            continue

        # Format time in hours:minutes:seconds format.
        time_str = format_time_hms(start_time)
        lines.append(f"[{time_str}] {text}\n")
    return "".join(lines)
def format_time_hms(seconds: float) -> str:
    """
    Render a second count as HH:MM:SS, or MM:SS for times under one hour.
    """
    # Truncate to whole seconds, then peel off hours and minutes with divmod.
    total = int(seconds // 3600) * 3600 + int(seconds % 3600)
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(int(remainder), 60)
    if hours > 0:
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"
    return f"{minutes:02d}:{secs:02d}"
def get_timecode_prompt(video_title: str, transcript: str, format_type: str = "youtube", language: Optional[str] = None, video_duration_minutes: Optional[int] = None, timecode_count: Optional[str] = None, interval_text: Optional[str] = None) -> str:
    """Create the prompt used to generate timecodes from a transcript.

    Args:
        video_title: Video title. NOTE(review): accepted for interface
            compatibility but not currently interpolated into the prompt.
        transcript: Pre-formatted "[MM:SS] text" transcript lines.
        format_type: Timecode format (youtube, markdown). Currently unused
            by the template; kept for interface compatibility.
        language: Transcript language code; selects the description language.
        video_duration_minutes: Total video length in minutes.
        timecode_count: Requested timestamp count range, e.g. "8-12".
        interval_text: Suggested spacing text. Currently unused by the
            template; kept for interface compatibility.

    Returns:
        The full prompt string to send to the model.
    """
    # Pick the description language from the video language prefix.
    # Removed an unused `example_description` local that was assigned the same
    # value on every branch and never referenced.
    if language and (language.lower().startswith('uk') or language.lower().startswith('ua')):
        target_language = "Ukrainian"
    elif language and language.lower().startswith('ru'):
        target_language = "Russian"
    else:
        target_language = "the same language as the video transcript"

    prompt = f"""
You are a YouTube assistant. Analyze the FULL TRANSCRIPT below and identify all major topic shifts or sections.
Your task:
- Generate timestamps that cover the ENTIRE {video_duration_minutes}-minute video
- Each timestamp must be paired with a precise time from the transcript
- Timestamps must reflect the actual content flow throughout the video
Format requirements:
- Plain text output ONLY
- Each line format: MM:SS Topic description (or HH:MM:SS for longer videos)
- Use {target_language} for descriptions (3-6 words each)
- Start with early timestamp (first few minutes)
- End with late timestamp (last 10-15 minutes of video)
- NO explanations, NO numbering, NO extra text
CRITICAL: The transcript below spans {video_duration_minutes} minutes. You MUST create timestamps that span from beginning to end, not just the first portion.
Full transcript to analyze:
{transcript}
Generate {timecode_count} timestamps covering the complete {video_duration_minutes}-minute duration:
"""
    return prompt
async def generate_timecodes_with_gemini(
    transcript_entries: List[Dict[str, Any]],
    video_title: str,
    format_type: str = "youtube",
    model_name: Optional[str] = None,
    language: Optional[str] = None
) -> Dict[str, Any]:
    """
    Generates timecodes using Gemini based on transcript.

    Args:
        transcript_entries: List of transcript entries (dicts with
            "start"/"duration"/"text" or FetchedTranscriptSnippet-like objects)
        video_title: Video title
        format_type: Timecode format (youtube, markdown)
        model_name: Gemini model name (defaults to DEFAULT_MODEL)
        language: Transcript language (if known); auto-detected when omitted

    Returns:
        On success: dict with "timecodes", "format", "model", "video_title",
        "detected_language" and "video_duration_minutes" keys.
        On failure: dict with a single "error" key.
    """
    if not GEMINI_API_KEY or client is None:
        return {
            "error": "Gemini API key is not configured. Please add GEMINI_API_KEY to .env file"
        }

    try:
        print(f"Starting timecode generation with model: {model_name or DEFAULT_MODEL}")

        # Determine transcript language if not provided.
        detected_language = language
        if not detected_language:
            # Simple heuristic over the first 10 segments.
            text_sample_parts = []
            for entry in transcript_entries[:10]:
                if hasattr(entry, 'text'):  # FetchedTranscriptSnippet object
                    text_sample_parts.append(entry.text)
                elif isinstance(entry, dict):  # Dict format
                    text_sample_parts.append(entry.get("text", ""))
            text_sample = " ".join(text_sample_parts)

            # Letters present in Ukrainian but absent from the Russian alphabet.
            ukrainian_specific = set("ґєії")
            if any(char in ukrainian_specific for char in text_sample.lower()):
                detected_language = "uk"
                print("Detected transcript language: Ukrainian")
            # Otherwise any Cyrillic letter is treated as Russian.
            elif any(ord('а') <= ord(char) <= ord('я') for char in text_sample.lower()):
                detected_language = "ru"
                print("Detected transcript language: Russian")
            else:
                detected_language = "en"
                print("Detected transcript language: English (or other)")

        # Determine video duration (seconds and minutes) from the last entry.
        video_duration_seconds = 0
        if transcript_entries:
            last_entry = transcript_entries[-1]
            # Handle both dict format and FetchedTranscriptSnippet objects.
            if hasattr(last_entry, 'start'):  # FetchedTranscriptSnippet object
                video_duration_seconds = last_entry.start + last_entry.duration
            elif isinstance(last_entry, dict):  # Dict format
                video_duration_seconds = last_entry.get("start", 0) + last_entry.get("duration", 0)
            video_duration_minutes = int(video_duration_seconds / 60)
            print(f"Determined video duration: {video_duration_minutes} minutes ({video_duration_seconds} seconds)")
        else:
            video_duration_minutes = None

        # Choose the requested timestamp-count range and the hard cap by duration.
        # (An earlier duplicate block that pre-set max_timecodes was dead code —
        # these assignments always took effect on every path — and was removed.)
        if video_duration_minutes:
            if video_duration_minutes <= 30:
                timecode_count = "8-12"
                max_timecodes = 15
            elif video_duration_minutes <= 60:
                timecode_count = "12-18"
                max_timecodes = 20
            elif video_duration_minutes <= 120:
                timecode_count = "18-25"
                max_timecodes = 30
            else:
                timecode_count = "25-35"
                max_timecodes = 40
        else:
            timecode_count = "10-15"
            max_timecodes = 20

        # Format transcript for prompt.
        formatted_transcript = format_transcript_for_prompt(transcript_entries, video_duration_seconds)

        # Recommended spacing between timestamps. NOTE(review): the current
        # prompt template ignores this argument; kept for interface stability.
        if video_duration_minutes and timecode_count:
            target_count = int(timecode_count.split('-')[0]) if timecode_count.split('-')[0].isdigit() else 20
            interval_minutes = video_duration_minutes // target_count
            interval_text = f"approximately every {interval_minutes}-{interval_minutes + 2} minutes"
        else:
            interval_text = "evenly throughout the video"

        prompt = get_timecode_prompt(
            video_title,
            formatted_transcript,
            format_type,
            detected_language,
            video_duration_minutes,
            timecode_count,
            interval_text
        )
        print(f"Prompt prepared, length: {len(prompt)} characters")

        # Try the requested model first, then fall back to the alternatives.
        models_to_try = [model_name or DEFAULT_MODEL] + [m for m in ALTERNATIVE_MODELS if m != (model_name or DEFAULT_MODEL)]
        last_error = None

        for current_model in models_to_try:
            try:
                # Use async API client for content generation.
                print(f"Making request to Gemini API with model {current_model}...")
                response = await client.aio.models.generate_content(
                    model=current_model,
                    contents=prompt,
                    config=types.GenerateContentConfig(
                        temperature=0.2,  # Low temperature for more deterministic results
                        max_output_tokens=2048,  # Enough for timecode list
                    )
                )
                print(f"Response received: {type(response)}")

                # Get response text.
                timecodes_text = response.text
                print(f"Response text length: {len(timecodes_text)}")

                # Split into lines and drop empties.
                timecodes = [line.strip() for line in timecodes_text.split('\n') if line.strip()]

                # Filter out generic intro/outro style entries.
                filtered_timecodes = []
                for tc in timecodes:
                    # Extract description (everything after the time token).
                    parts = tc.split(" ", 1)
                    if len(parts) > 1:
                        description = parts[1]
                        lowercase_desc = description.lower()
                        if any(phrase in lowercase_desc for phrase in [
                            "video start", "video end", "start of video", "end of video",
                            "beginning", "conclusion", "intro", "outro"
                        ]):
                            continue
                    filtered_timecodes.append(tc)

                # If too many timecodes remain, keep an evenly spaced subset.
                if len(filtered_timecodes) > max_timecodes:
                    print(f"Too many timecodes ({len(filtered_timecodes)}), reducing to {max_timecodes}")
                    step = len(filtered_timecodes) / max_timecodes
                    indices = [int(i * step) for i in range(max_timecodes)]
                    # Always keep the final timecode.
                    if indices[-1] != len(filtered_timecodes) - 1:
                        indices[-1] = len(filtered_timecodes) - 1
                    final_timecodes = [filtered_timecodes[i] for i in indices]
                else:
                    final_timecodes = filtered_timecodes

                print(f"Final timecodes count after processing: {len(final_timecodes)}")
                return {
                    "timecodes": final_timecodes,
                    "format": format_type,
                    "model": current_model,
                    "video_title": video_title,
                    "detected_language": detected_language,
                    "video_duration_minutes": video_duration_minutes
                }
            except Exception as api_error:
                print(f"Error with model {current_model}: {str(api_error)}")
                traceback.print_exc()
                last_error = api_error
                continue

        # All models failed.
        return {
            "error": f"Failed to execute request with any model. Last error: {str(last_error)}"
        }
    except Exception as e:
        print(f"General error: {str(e)}")
        traceback.print_exc()
        return {
            "error": f"Error generating timecodes with Gemini: {str(e)}"
        }