|
|
import os |
|
|
import re |
|
|
from pathlib import Path |
|
|
from typing import List |
|
|
import google.generativeai as genai |
|
|
from PyPDF2 import PdfReader |
|
|
from tqdm import tqdm |
|
|
|
|
|
|
|
|
class GeminiProcessor:
    """Normalise screenplay text extracted from a PDF, scene by scene.

    The pipeline is: extract text from the PDF, run regex-based cleanup
    (``preprocess_text``), split the text on scene headings
    (``split_into_scenes``), ask a Gemini model to fix spacing in each scene
    (``clean_scene``), and write the result to a text file
    (``process_screenplay``).
    """

    # A scene heading is a line that begins (optionally after indentation)
    # with INT., EXT. or INT/EXT.  Anchoring at line start keeps words such
    # as "PRINT." or "POINT." from being mistaken for a heading.
    _SCENE_HEADER_RE = re.compile(
        r'^[ \t]*(?:INT/EXT\.|INT\.|EXT\.)', re.MULTILINE
    )

    # Lines consisting solely of one of these markers carry no information
    # and are dropped during preprocessing.
    _BARE_HEADERS = frozenset(('INT.', 'EXT.', 'INT/EXT.'))

    def __init__(self, model_name: str = 'gemini-pro'):
        """Configure the Gemini client from the ``GOOGLE_API_KEY`` env var.

        Args:
            model_name: Name of the generative model to use.  Defaults to
                ``'gemini-pro'``, the value that was previously hard-coded.

        Raises:
            ValueError: If ``GOOGLE_API_KEY`` is unset or empty.
        """
        self.api_key = os.getenv("GOOGLE_API_KEY")
        if not self.api_key:
            raise ValueError("GOOGLE_API_KEY not found")

        genai.configure(api_key=self.api_key)
        self.model = genai.GenerativeModel(model_name)

    def preprocess_text(self, text: str) -> str:
        """Apply regex-based cleanup to raw screenplay text.

        Removes markup tags, trailing ``<digits>.`` tokens (page numbers),
        ``(CONT'D)`` markers, whitespace before punctuation, repeated
        spaces, blank lines, bare ``INT.``/``EXT.``/``INT/EXT.`` lines and
        consecutive duplicate lines.

        Args:
            text: Raw text, typically straight from PDF extraction.

        Returns:
            The cleaned text, lines joined by ``'\\n'`` with no trailing
            newline.
        """
        # Strip anything that looks like an HTML/XML tag.
        text = re.sub(r'<[^>]+>', '', text)

        # Page numbers ("12.") at the end of a line, and continuation marks.
        text = re.sub(r'\d+\.$', '', text, flags=re.MULTILINE)
        text = re.sub(r'\(CONT\'D\)\d*', '', text)

        # Remove horizontal whitespace before punctuation.  Newlines are
        # deliberately excluded so a line that *starts* with punctuation
        # (e.g. "...and then") is not glued onto the previous line.
        text = re.sub(r'[^\S\n]+([.,!?])', r'\1', text)

        text = re.sub(r' +', ' ', text)
        text = re.sub(r'\n{3,}', '\n\n', text)

        cleaned_lines = []
        prev_line = None
        for line in text.split('\n'):
            stripped = line.strip()
            # Skip blank lines and immediate repeats of the last kept line.
            if not stripped or line == prev_line:
                continue
            # Skip orphaned heading markers.  Doing this per line (rather
            # than with a newline-consuming regex) keeps the neighbouring
            # lines separate.
            if stripped in self._BARE_HEADERS:
                continue
            cleaned_lines.append(line)
            prev_line = line

        return '\n'.join(cleaned_lines)

    def split_into_scenes(self, text: str) -> List[str]:
        """Split screenplay text into scenes at each scene heading.

        A scene starts at a line beginning with ``INT.``, ``EXT.`` or
        ``INT/EXT.`` and runs up to the next such line or the end of the
        text.  Text before the first heading is not part of any scene and
        is discarded.

        Args:
            text: Screenplay text (normally the output of
                ``preprocess_text``).

        Returns:
            Scene strings in document order, each stripped of surrounding
            whitespace; empty if no heading is found.
        """
        starts = [m.start() for m in self._SCENE_HEADER_RE.finditer(text)]
        if not starts:
            return []

        # Slice between consecutive heading offsets so that the final scene
        # keeps its last line even when the text has no trailing newline.
        bounds = starts + [len(text)]
        scenes = (text[a:b].strip() for a, b in zip(bounds, bounds[1:]))
        return [scene for scene in scenes if scene]

    def clean_scene(self, scene: str) -> str:
        """Ask the model to fix spacing/indentation in a single scene.

        The model's answer is accepted only if its whitespace-separated word
        count differs from the input's by at most 3; otherwise, or on any
        error, the original scene is returned unchanged.

        Args:
            scene: Text of one scene.

        Returns:
            The model-cleaned scene (stripped), or ``scene`` itself as a
            fallback.
        """
        # Nothing to clean; avoid a pointless API round trip.
        if not scene.strip():
            return scene

        prompt = f"""Fix ONLY spacing and indentation in this screenplay scene.
DO NOT modify any words or content. DO NOT add or remove lines.
Keep original capitalization and formatting:

{scene}"""

        try:
            response = self.model.generate_content(prompt)
            if response.text:
                cleaned = response.text
                # Guard against the model rewriting content: tolerate only a
                # tiny drift in word count.
                if abs(len(scene.split()) - len(cleaned.split())) <= 3:
                    return cleaned.strip()
            return scene
        except Exception as e:
            # Best effort: any API/response failure keeps the original text.
            print(f"Error cleaning scene: {str(e)}")
            return scene

    def process_screenplay(self, pdf_path: str, output_path: str) -> bool:
        """Run the full cleanup pipeline on a PDF and write the result.

        Args:
            pdf_path: Path of the screenplay PDF to read.
            output_path: Path of the UTF-8 text file to write; missing
                parent directories are created.

        Returns:
            ``True`` on success, ``False`` if any step raised (the error is
            printed).
        """
        try:
            with open(pdf_path, 'rb') as file:
                pdf = PdfReader(file)
                # extract_text() may yield None for pages without a text
                # layer; treat those as empty instead of failing the join.
                text = '\n'.join(
                    (page.extract_text() or '') for page in pdf.pages
                )

            text = self.preprocess_text(text)

            scenes = self.split_into_scenes(text)
            print(f"Found {len(scenes)} scenes")

            cleaned_scenes = []
            for scene in tqdm(scenes, desc="Processing scenes"):
                cleaned = self.clean_scene(scene)
                if cleaned:
                    # Re-run the regex cleanup on the model output.
                    cleaned = self.preprocess_text(cleaned)
                    cleaned_scenes.append(cleaned)

            Path(output_path).parent.mkdir(parents=True, exist_ok=True)
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write('\n\n'.join(cleaned_scenes))

            return True

        except Exception as e:
            print(f"Error processing screenplay: {str(e)}")
            return False
|
|
|