gemma-demo / tests /test_media.py
AndyC
added additional tests for pdf processing
4644da6
import pytest
import os
import cv2
from PIL import Image
from pathlib import Path
import tempfile
from app import get_frames, process_video, process_user_input, process_history, extract_pdf_text
# Project root directory: two levels above this test file (i.e. the parent of
# the directory that contains it). The tests below look up the sample video
# under ROOT_DIR / "assets".
ROOT_DIR = Path(__file__).parent.parent
def test_correct_frame_return():
    """Test that get_frames returns a non-empty list of (Image, float) tuples."""
    # Path to the sample video used across this module
    video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
    # Fail early, with a readable message, if the asset is missing
    assert os.path.exists(video_path), f"Test video not found at {video_path}"

    # Ask for a small number of frames to keep the test fast
    max_images = 3
    frames = get_frames(video_path, max_images)

    assert isinstance(frames, list)
    # Guard against a vacuous pass: all() over an empty list is always True,
    # so an empty result would otherwise satisfy every check below.
    assert 0 < len(frames) <= max_images
    assert all(isinstance(item, tuple) and len(item) == 2 for item in frames)
    assert all(
        isinstance(img, Image.Image) and isinstance(ts, float) for img, ts in frames
    )
def test_process_video_structure():
    """Test that process_video returns the expected list structure.

    Every sampled frame is expected to contribute a text item followed by an
    image item whose "url" points at a valid image file on disk.
    """
    video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
    max_images = 2
    result = process_video(video_path, max_images)

    # Should have 2 items (text + image) per frame
    assert len(result) == max_images * 2

    # Walk the result in (text, image) pairs
    for i in range(0, len(result), 2):
        # Text item
        assert result[i]["type"] == "text"
        assert result[i]["text"].startswith("Frame ")

        # Image item
        assert result[i + 1]["type"] == "image"
        assert "url" in result[i + 1]
        assert os.path.exists(result[i + 1]["url"])

        # Verify the image file is valid. The context manager closes the
        # underlying file handle even when verify() raises, so the test does
        # not leak one open file per frame.
        try:
            with Image.open(result[i + 1]["url"]) as img:
                img.verify()
        except Exception as e:
            pytest.fail(f"Invalid image file: {e}")
def test_process_video_timestamps():
    """Check that frame captions carry parseable, non-decreasing timestamps."""
    source_video = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
    max_images = 3
    items = process_video(source_video, max_images)

    # Captions sit at the even positions and look like "Frame X.XX:"; take the
    # second whitespace-separated token and strip the trailing colon.
    timestamps = [
        float(entry["text"].split()[1].rstrip(":"))
        for entry in items[::2]
        if entry["type"] == "text"
    ]

    # One timestamp per requested frame, in ascending (non-strict) order
    assert len(timestamps) == max_images
    assert all(
        earlier <= later for earlier, later in zip(timestamps, timestamps[1:])
    )
def test_process_video_temp_files():
    """Test that process_video writes the extracted frame to a PNG file on disk.

    Only creation of the file is verified. The file is removed afterwards so
    that the test itself does not leave stray images behind.
    """
    video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
    max_images = 1
    result = process_video(video_path, max_images)

    # result[0] is the frame caption, result[1] the image entry
    image_path = result[1]["url"]
    try:
        # Verify the frame file exists and has the expected extension
        assert os.path.exists(image_path)
        assert image_path.endswith(".png")
    finally:
        # Clean up regardless of the assertion outcome
        if os.path.exists(image_path):
            os.unlink(image_path)
def test_process_video_invalid_path():
    """A path that does not exist must make process_video raise ValueError."""
    missing_video = "nonexistent_video.mp4"
    requested_frames = 3
    with pytest.raises(ValueError):
        process_video(missing_video, requested_frames)
def test_process_user_input_text_only():
    """A message without attachments should yield exactly one text item."""
    prompt = "This is a test message"
    message = dict(text=prompt, files=[])

    # The second argument is the max_images limit; irrelevant without files
    result = process_user_input(message, 5)

    # Exactly one item, carrying the original text unchanged
    assert len(result) == 1
    only_item = result[0]
    assert only_item["type"] == "text"
    assert only_item["text"] == prompt
def test_process_user_input_with_video():
    """A video attachment should expand into frame captions and frame images."""
    video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
    assert os.path.exists(video_path), f"Test video not found at {video_path}"

    message = dict(text="Video analysis", files=[video_path])
    result = process_user_input(message, 4)

    # At minimum: the message text plus one (caption, image) pair
    assert len(result) >= 3
    prompt_item, caption_item, frame_item = result[0], result[1], result[2]

    # The message text comes first
    assert prompt_item["type"] == "text"
    assert prompt_item["text"] == "Video analysis"

    # Then the first frame: a "Frame ..." caption followed by its image file
    assert caption_item["type"] == "text"
    assert caption_item["text"].startswith("Frame ")
    assert frame_item["type"] == "image"
    assert "url" in frame_item
    assert os.path.exists(frame_item["url"])
def test_process_user_input_with_images():
    """Test processing user input with image files.

    Image attachments are expected to come back as image items that reference
    the original paths, in order, after the message text.
    """
    # Create temporary image files for testing. The files are left empty; the
    # assertions below only compare paths.
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as img1, \
            tempfile.NamedTemporaryFile(suffix=".png", delete=False) as img2:
        image_paths = [img1.name, img2.name]

    # try/finally guarantees the temp files are removed even when an
    # assertion fails, matching the cleanup style of the other tests here.
    try:
        message = {
            "text": "Image analysis",
            "files": image_paths,
        }
        result = process_user_input(message, 5)

        # Should have 3 items (text + 2 images)
        assert len(result) == 3

        # First item should be the message text
        assert result[0]["type"] == "text"
        assert result[0]["text"] == "Image analysis"

        # Following items should be images
        assert result[1]["type"] == "image"
        assert result[1]["url"] == image_paths[0]
        assert result[2]["type"] == "image"
        assert result[2]["url"] == image_paths[1]
    finally:
        # Clean up temp files
        for path in image_paths:
            if os.path.exists(path):
                os.unlink(path)
def test_process_user_input_empty_text():
    """An empty message text is kept as an empty text item ahead of the file items."""
    video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
    message = dict(text="", files=[video_path])

    # Second argument is the max_images limit
    result = process_user_input(message, 3)

    # The (empty) message text still leads the result
    leading_item = result[0]
    assert leading_item["type"] == "text"
    assert leading_item["text"] == ""

    # Something must follow it: the items produced from the video
    assert len(result) > 1
def test_process_user_input_handles_empty_files_list():
    """An explicitly empty "files" list should produce only the text item."""
    prompt = "No files"
    message = dict(text=prompt, files=[])

    # Second argument is the max_images limit
    result = process_user_input(message, 3)

    assert len(result) == 1
    sole_item = result[0]
    assert sole_item["type"] == "text"
    assert sole_item["text"] == prompt
def test_process_user_input_max_images_effect():
    """Check that a larger max_images yields more frames, never exceeding the cap."""
    video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")
    message = dict(text="Video with few frames", files=[video_path])

    low_cap, high_cap = 2, 5
    result_few = process_user_input(message, low_cap)
    result_many = process_user_input(message, high_cap)

    # Drop the leading message text, then count (caption, image) pairs
    frames_few = (len(result_few) - 1) // 2
    frames_many = (len(result_many) - 1) // 2

    # Neither run may exceed its cap, and the higher cap must give more frames
    assert frames_few <= low_cap
    assert frames_many <= high_cap
    assert frames_few < frames_many
def test_process_history_basic_functionality():
    """Check that plain-text history entries are wrapped into text content items."""
    # An empty history maps to an empty result
    assert process_history([]) == []

    # Alternating user / assistant / user conversation
    turns = [
        ("user", "Hello"),
        ("assistant", "Hi there!"),
        ("user", "How are you?"),
    ]
    history = [{"role": role, "content": text} for role, text in turns]
    result = process_history(history)

    assert len(result) == 3
    # Each entry keeps its role; its string content becomes a one-item list
    for index, (role, text) in enumerate(turns):
        expected = {"role": role, "content": [{"type": "text", "text": text}]}
        assert result[index] == expected
def test_process_history_file_handling():
    """Check how file uploads and consecutive user messages are merged into turns."""
    # Reserve a throwaway .jpg path; the file itself is left empty
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as handle:
        image_path = handle.name
    video_path = os.path.join(ROOT_DIR, "assets", "test_video.mp4")

    try:
        # In this history, file uploads are one-element tuples holding the path
        opening_contents = [(image_path,), "What's this image?", (video_path,)]
        closing_texts = ["First", "Second", "Third"]  # several user messages at the end
        history = (
            [{"role": "user", "content": content} for content in opening_contents]
            + [{"role": "assistant", "content": "I see an image and video."}]
            + [{"role": "user", "content": text} for text in closing_texts]
        )

        result = process_history(history)
        assert len(result) == 3
        first_turn, reply, last_turn = result[0], result[1], result[2]

        # First user turn: image, question, and a placeholder for the video
        assert first_turn["role"] == "user"
        assert len(first_turn["content"]) == 3
        assert first_turn["content"][0] == {"type": "image", "url": image_path}
        assert first_turn["content"][1] == {"type": "text", "text": "What's this image?"}
        assert first_turn["content"][2] == {"type": "text", "text": "[Video uploaded previously]"}

        # Assistant response is passed through as a single text item
        assert reply["role"] == "assistant"
        assert reply["content"] == [{"type": "text", "text": "I see an image and video."}]

        # Final user turn: the trailing messages collected into one turn
        assert last_turn["role"] == "user"
        assert len(last_turn["content"]) == 3
        for position, text in enumerate(closing_texts):
            assert last_turn["content"][position] == {"type": "text", "text": text}
    finally:
        if os.path.exists(image_path):
            os.unlink(image_path)
def test_extract_pdf_text_nonexistent_file():
    """A missing PDF path must raise ValueError mentioning "File not found"."""
    missing_pdf = "nonexistent_file.pdf"
    with pytest.raises(ValueError, match="File not found"):
        extract_pdf_text(missing_pdf)
def test_extract_pdf_text_with_mock_pdf():
    """Extract text from a generated single-page PDF and check its contents."""
    import fitz  # PyMuPDF

    # Text written onto the page; three lines separated by newlines
    text_content = "This is a test PDF document.\nIt contains multiple lines of text.\nPage 1 content here."

    # Reserve a path on disk; the document is written to it below
    with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as handle:
        pdf_path = handle.name

    try:
        # Build a one-page document carrying the text and save it
        document = fitz.open()
        first_page = document.new_page()
        first_page.insert_text((50, 100), text_content, fontsize=12)
        document.save(pdf_path)
        document.close()

        extracted = extract_pdf_text(pdf_path)

        # The result is a string containing our text and a page heading
        assert isinstance(extracted, str)
        assert "This is a test PDF document" in extracted
        assert "Page 1:" in extracted  # page number heading
        assert "multiple lines of text" in extracted
    finally:
        # Remove the temporary PDF
        if os.path.exists(pdf_path):
            os.unlink(pdf_path)
def test_extract_pdf_text_empty_pdf():
    """A PDF whose only page has no text should yield the "no content" message."""
    import fitz

    # Reserve a path on disk; the document is written to it below
    with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as handle:
        pdf_path = handle.name

    try:
        # A document with a single blank page
        document = fitz.open()
        document.new_page()
        document.save(pdf_path)
        document.close()

        extracted = extract_pdf_text(pdf_path)

        # The fixed message stands in for the missing text
        assert extracted == "No text content found in the PDF."
    finally:
        # Remove the temporary PDF
        if os.path.exists(pdf_path):
            os.unlink(pdf_path)
def test_extract_pdf_text_multipage():
    """Extract text from a four-page PDF whose third page is blank."""
    import fitz

    # One entry per page, in order; None marks a page left without text
    page_texts = [
        "Content from page one.",
        "Content from page two.",
        None,
        "Content from page four.",
    ]
    # (heading, snippet) pairs expected for the pages that carry text
    expected_sections = [
        ("Page 1:", "Content from page one"),
        ("Page 2:", "Content from page two"),
        ("Page 4:", "Content from page four"),
    ]

    # Reserve a path on disk; the document is written to it below
    with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as handle:
        pdf_path = handle.name

    try:
        document = fitz.open()
        for text in page_texts:
            page = document.new_page()
            if text is not None:
                page.insert_text((50, 100), text, fontsize=12)
        document.save(pdf_path)
        document.close()

        extracted = extract_pdf_text(pdf_path)

        # Every page with text shows up under its own heading
        for heading, snippet in expected_sections:
            assert heading in extracted
            assert snippet in extracted

        # The blank third page is left out entirely
        assert "Page 3:" not in extracted

        # Page sections are separated by a blank line
        assert "\n\n" in extracted
    finally:
        # Remove the temporary PDF
        if os.path.exists(pdf_path):
            os.unlink(pdf_path)
def test_process_user_input_with_pdf():
    """A PDF attachment should be turned into a text item holding its extracted text."""
    import fitz

    # Reserve a path on disk; the document is written to it below
    with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as handle:
        pdf_path = handle.name

    try:
        # Build a one-page PDF with a known sentence
        document = fitz.open()
        first_page = document.new_page()
        first_page.insert_text(
            (50, 100), "Test PDF content for user input processing.", fontsize=12
        )
        document.save(pdf_path)
        document.close()

        message = dict(text="Analyze this PDF", files=[pdf_path])
        result = process_user_input(message, 3)

        # Two items: the message text, then the PDF content
        assert len(result) == 2
        prompt_item, pdf_item = result[0], result[1]

        assert prompt_item["type"] == "text"
        assert prompt_item["text"] == "Analyze this PDF"

        # The PDF item is text with a label, the page heading and our sentence
        assert pdf_item["type"] == "text"
        pdf_text = pdf_item["text"]
        assert "PDF Content:" in pdf_text
        assert "Test PDF content for user input processing" in pdf_text
        assert "Page 1:" in pdf_text
    finally:
        # Remove the temporary PDF
        if os.path.exists(pdf_path):
            os.unlink(pdf_path)
def test_process_user_input_pdf_error_handling():
    """An unreadable PDF should produce an error text item instead of raising."""
    # A file with a .pdf suffix whose bytes are not a PDF document; leaving the
    # with-block closes it, so the bytes are on disk before it is processed
    with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as handle:
        handle.write(b"This is not a valid PDF file content")
        invalid_pdf_path = handle.name

    try:
        message = dict(text="Process invalid PDF", files=[invalid_pdf_path])
        result = process_user_input(message, 3)

        # Two items: the message text, then the error report
        assert len(result) == 2
        prompt_item, error_item = result[0], result[1]

        assert prompt_item["type"] == "text"
        assert prompt_item["text"] == "Process invalid PDF"

        # The failure is reported as text rather than propagated
        assert error_item["type"] == "text"
        assert "Error processing PDF:" in error_item["text"]
    finally:
        # Remove the temporary file
        if os.path.exists(invalid_pdf_path):
            os.unlink(invalid_pdf_path)
def test_process_history_with_pdf():
    """A PDF upload in history should be replaced by a placeholder text item."""
    import fitz

    # Reserve a path on disk; the document is written to it below
    with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as handle:
        pdf_path = handle.name

    try:
        # Build a one-page PDF with a known sentence
        document = fitz.open()
        first_page = document.new_page()
        first_page.insert_text((50, 100), "Historical PDF content.", fontsize=12)
        document.save(pdf_path)
        document.close()

        # The upload is a one-element tuple holding the path
        question = "What does this PDF contain?"
        answer = "The PDF contains some text."
        history = [
            dict(role="user", content=(pdf_path,)),
            dict(role="user", content=question),
            dict(role="assistant", content=answer),
            dict(role="user", content="Thanks!"),
        ]
        result = process_history(history)

        # Three turns: user, assistant, final user
        assert len(result) == 3
        first_turn, reply, last_turn = result[0], result[1], result[2]

        # First user turn: the PDF placeholder followed by the question
        assert first_turn["role"] == "user"
        assert len(first_turn["content"]) == 2
        assert first_turn["content"][0] == {"type": "text", "text": "[PDF uploaded previously]"}
        assert first_turn["content"][1] == {"type": "text", "text": question}

        # Assistant response
        assert reply["role"] == "assistant"
        assert reply["content"] == [{"type": "text", "text": answer}]

        # Final user message
        assert last_turn["role"] == "user"
        assert last_turn["content"] == [{"type": "text", "text": "Thanks!"}]
    finally:
        # Remove the temporary PDF
        if os.path.exists(pdf_path):
            os.unlink(pdf_path)