Update src/screenplay_analysis.py
Browse files- src/screenplay_analysis.py +29 -105
src/screenplay_analysis.py
CHANGED
|
@@ -2,62 +2,46 @@ import os
|
|
| 2 |
import requests
|
| 3 |
import PyPDF2
|
| 4 |
from tqdm import tqdm
|
|
|
|
| 5 |
|
| 6 |
-
#
|
| 7 |
API_URL = "https://api-inference.huggingface.co/models/meta-llama/Llama-3.2-1B-Instruct"
|
| 8 |
headers = {"Authorization": f"Bearer {os.environ['HUGGING_FACE_API_KEY']}"}
|
| 9 |
|
| 10 |
def query(payload):
|
|
|
|
| 11 |
response = requests.post(API_URL, headers=headers, json=payload)
|
| 12 |
return response.json()
|
| 13 |
|
| 14 |
-
|
| 15 |
-
|
| 16 |
-
"""Check if the PDF file exists and is valid"""
|
| 17 |
-
if not os.path.exists(file_path):
|
| 18 |
-
print(f"Error: File not found at path: {file_path}")
|
| 19 |
-
return False
|
| 20 |
-
if not file_path.lower().endswith('.pdf'):
|
| 21 |
-
print("Error: File is not a PDF")
|
| 22 |
-
return False
|
| 23 |
-
return True
|
| 24 |
-
|
| 25 |
-
def extract_text_from_pdf(file_path: str, max_pages: int = 20) -> str:
|
| 26 |
-
"""Extract text from first n pages of PDF"""
|
| 27 |
-
if not validate_pdf(file_path):
|
| 28 |
-
return None
|
| 29 |
-
|
| 30 |
try:
|
| 31 |
-
|
| 32 |
-
|
| 33 |
-
|
| 34 |
-
|
| 35 |
-
|
| 36 |
-
|
| 37 |
-
|
| 38 |
-
|
| 39 |
-
|
| 40 |
-
|
| 41 |
-
|
| 42 |
-
|
| 43 |
-
|
| 44 |
-
|
| 45 |
-
|
| 46 |
-
|
| 47 |
-
|
| 48 |
-
|
| 49 |
-
|
| 50 |
-
|
| 51 |
-
|
| 52 |
except Exception as e:
|
| 53 |
print(f"An error occurred: {str(e)}")
|
| 54 |
return None
|
| 55 |
|
| 56 |
-
# NEW FUNCTION GOES HERE - ADD THIS CHUNK FUNCTION RIGHT HERE, AFTER THE PDF FUNCTIONS BUT BEFORE THE FILE CHECKING CODE
|
| 57 |
def create_screenplay_chunks(text: str, chunk_size: int = 1000) -> list:
|
| 58 |
-
"""
|
| 59 |
-
Split screenplay into chunks at natural break points (scene headings where possible)
|
| 60 |
-
"""
|
| 61 |
# Split text into lines
|
| 62 |
lines = text.split('\n')
|
| 63 |
chunks = []
|
|
@@ -73,8 +57,7 @@ def create_screenplay_chunks(text: str, chunk_size: int = 1000) -> list:
|
|
| 73 |
current_chunk = []
|
| 74 |
current_length = 0
|
| 75 |
|
| 76 |
-
# Scene headings
|
| 77 |
-
# should start new chunks when possible
|
| 78 |
if line.strip().upper() == line.strip() and ('INT.' in line or 'EXT.' in line):
|
| 79 |
if current_chunk: # If we have a current chunk, save it
|
| 80 |
chunks.append('\n'.join(current_chunk))
|
|
@@ -90,28 +73,6 @@ def create_screenplay_chunks(text: str, chunk_size: int = 1000) -> list:
|
|
| 90 |
|
| 91 |
return chunks
|
| 92 |
|
| 93 |
-
# File checking code
|
| 94 |
-
print("Files in current directory:")
|
| 95 |
-
for file in os.listdir():
|
| 96 |
-
print(file)
|
| 97 |
-
|
| 98 |
-
# Try to extract from PDF
|
| 99 |
-
pdf_path = "./F20.pdf" # assuming F20.pdf is the exact filename
|
| 100 |
-
print("\nTrying to extract from:", pdf_path)
|
| 101 |
-
result = extract_text_from_pdf(pdf_path)
|
| 102 |
-
|
| 103 |
-
# If successful, show first 500 characters
|
| 104 |
-
if result:
|
| 105 |
-
print("\nFirst 500 characters of extracted text:")
|
| 106 |
-
print("-" * 50)
|
| 107 |
-
print(result[:500])
|
| 108 |
-
# NEW TEST CODE GOES HERE - ADD THIS RIGHT AFTER SHOWING THE FIRST 500 CHARACTERS
|
| 109 |
-
chunks = create_screenplay_chunks(result)
|
| 110 |
-
print(f"\nCreated {len(chunks)} chunks from the screenplay")
|
| 111 |
-
print("\nPreview of first chunk:")
|
| 112 |
-
print("-" * 50)
|
| 113 |
-
print(chunks[0])
|
| 114 |
-
|
| 115 |
def process_screenplay_chunk(chunk: str, chunk_num: int) -> str:
|
| 116 |
"""Process a single chunk of screenplay text"""
|
| 117 |
prompt = f"""<s>[INST]Clean this screenplay text, maintaining exact content and format:
|
|
@@ -136,7 +97,7 @@ def process_screenplay_chunk(chunk: str, chunk_num: int) -> str:
|
|
| 136 |
if len(cleaned_text) < 10 or len(cleaned_text) > len(chunk) * 3:
|
| 137 |
print(f"Warning: Chunk {chunk_num + 1} output seems invalid, using original")
|
| 138 |
return chunk
|
| 139 |
-
print(f"Processed chunk {chunk_num + 1}
|
| 140 |
return cleaned_text
|
| 141 |
else:
|
| 142 |
print(f"Error processing chunk {chunk_num + 1}: Unexpected output format")
|
|
@@ -144,41 +105,4 @@ def process_screenplay_chunk(chunk: str, chunk_num: int) -> str:
|
|
| 144 |
|
| 145 |
except Exception as e:
|
| 146 |
print(f"Error processing chunk {chunk_num + 1}: {str(e)}")
|
| 147 |
-
return chunk
|
| 148 |
-
|
| 149 |
-
# Test the processing with just the first chunk
|
| 150 |
-
if result:
|
| 151 |
-
chunks = create_screenplay_chunks(result)
|
| 152 |
-
print("\nTesting processing on first chunk:")
|
| 153 |
-
print("-" * 50)
|
| 154 |
-
processed_chunk = process_screenplay_chunk(chunks[0], 0)
|
| 155 |
-
print("\nProcessed result:")
|
| 156 |
-
print("-" * 50)
|
| 157 |
-
print(processed_chunk)
|
| 158 |
-
|
| 159 |
-
# Process all chunks and combine results
|
| 160 |
-
if result:
|
| 161 |
-
all_chunks = create_screenplay_chunks(result)
|
| 162 |
-
processed_chunks = []
|
| 163 |
-
|
| 164 |
-
print("\nProcessing all chunks...")
|
| 165 |
-
for i, chunk in enumerate(all_chunks):
|
| 166 |
-
processed_text = process_screenplay_chunk(chunk, i)
|
| 167 |
-
processed_chunks.append(processed_text)
|
| 168 |
-
|
| 169 |
-
# Combine all processed chunks
|
| 170 |
-
final_text = '\n\n'.join(processed_chunks)
|
| 171 |
-
|
| 172 |
-
# Save to file
|
| 173 |
-
output_file = 'cleaned_screenplay.txt'
|
| 174 |
-
with open(output_file, 'w', encoding='utf-8') as f:
|
| 175 |
-
f.write(final_text)
|
| 176 |
-
|
| 177 |
-
print(f"\nProcessing complete! Saved to {output_file}")
|
| 178 |
-
print(f"Total chunks processed: {len(processed_chunks)}")
|
| 179 |
-
|
| 180 |
-
# Show preview of final result
|
| 181 |
-
print("\nPreview of final cleaned text:")
|
| 182 |
-
print("-" * 50)
|
| 183 |
-
print(final_text[:500])
|
| 184 |
-
|
|
|
|
| 2 |
import requests
|
| 3 |
import PyPDF2
|
| 4 |
from tqdm import tqdm
|
| 5 |
+
import io
|
| 6 |
|
| 7 |
+
# API configuration
|
| 8 |
API_URL = "https://api-inference.huggingface.co/models/meta-llama/Llama-3.2-1B-Instruct"
|
| 9 |
headers = {"Authorization": f"Bearer {os.environ['HUGGING_FACE_API_KEY']}"}
|
| 10 |
|
| 11 |
def query(payload, timeout=120):
    """Send a request to the Hugging Face Inference API.

    Args:
        payload: JSON-serializable request body; it is posted unchanged.
        timeout: Seconds to wait for the server before giving up. Without an
            explicit timeout ``requests`` waits indefinitely, so a stalled
            connection would hang the whole run.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
        requests.exceptions.RequestException: On other transport failures.
        ValueError: If the response body is not valid JSON.
    """
    response = requests.post(API_URL, headers=headers, json=payload, timeout=timeout)
    return response.json()
|
| 15 |
|
| 16 |
+
def extract_text_from_pdf(pdf_content: bytes, max_pages: int = 20) -> str:
    """Extract text from the first ``max_pages`` pages of an in-memory PDF.

    Args:
        pdf_content: Raw bytes of the PDF file.
        max_pages: Upper bound on the number of pages to read.

    Returns:
        The text of the processed pages joined with newlines, or ``None`` if
        reading or extraction fails (the error is printed, not raised).
    """
    try:
        # Wrap the bytes in a file-like object so the reader can seek in it.
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_content))

        num_pages = len(pdf_reader.pages)
        # Report the number of pages actually read, not the requested cap,
        # so short documents do not produce a misleading progress message.
        pages_to_read = max(0, min(num_pages, max_pages))
        print(f"PDF has {num_pages} total pages. Will process first {pages_to_read} pages...")

        extracted_text = []
        for page_num in range(pages_to_read):
            text = pdf_reader.pages[page_num].extract_text()
            # Treat a falsy result as empty text so the join below cannot fail.
            extracted_text.append(text or "")
            print(f"Processed page {page_num + 1}")

        final_text = '\n'.join(extracted_text)
        print(f"\nExtraction complete! Total characters: {len(final_text)}")
        return final_text

    except Exception as e:
        # Best-effort contract: callers check for None instead of catching.
        print(f"An error occurred: {str(e)}")
        return None
|
| 42 |
|
|
|
|
| 43 |
def create_screenplay_chunks(text: str, chunk_size: int = 1000) -> list:
|
| 44 |
+
"""Split screenplay into chunks at natural break points"""
|
|
|
|
|
|
|
| 45 |
# Split text into lines
|
| 46 |
lines = text.split('\n')
|
| 47 |
chunks = []
|
|
|
|
| 57 |
current_chunk = []
|
| 58 |
current_length = 0
|
| 59 |
|
| 60 |
+
# Scene headings should start new chunks when possible
|
|
|
|
| 61 |
if line.strip().upper() == line.strip() and ('INT.' in line or 'EXT.' in line):
|
| 62 |
if current_chunk: # If we have a current chunk, save it
|
| 63 |
chunks.append('\n'.join(current_chunk))
|
|
|
|
| 73 |
|
| 74 |
return chunks
|
| 75 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 76 |
def process_screenplay_chunk(chunk: str, chunk_num: int) -> str:
|
| 77 |
"""Process a single chunk of screenplay text"""
|
| 78 |
prompt = f"""<s>[INST]Clean this screenplay text, maintaining exact content and format:
|
|
|
|
| 97 |
if len(cleaned_text) < 10 or len(cleaned_text) > len(chunk) * 3:
|
| 98 |
print(f"Warning: Chunk {chunk_num + 1} output seems invalid, using original")
|
| 99 |
return chunk
|
| 100 |
+
print(f"Processed chunk {chunk_num + 1}")
|
| 101 |
return cleaned_text
|
| 102 |
else:
|
| 103 |
print(f"Error processing chunk {chunk_num + 1}: Unexpected output format")
|
|
|
|
| 105 |
|
| 106 |
except Exception as e:
|
| 107 |
print(f"Error processing chunk {chunk_num + 1}: {str(e)}")
|
| 108 |
+
return chunk
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|