import io
from typing import Any, Dict, List, Optional, Tuple

import fitz  # PyMuPDF
import language_tool_python

def extract_pdf_text(file) -> str:
    """Extract the full text of a PDF using PyMuPDF."""
    try:
        # Accept either a file path or a binary file-like object.
        doc = fitz.open(file) if isinstance(file, str) else fitz.open(stream=file.read(), filetype="pdf")
        full_text = ""
        for page_num, page in enumerate(doc, start=1):
            text = page.get_text("text")
            full_text += text + "\n"
            print(f"Extracted text from page {page_num}: {len(text)} characters.")
        doc.close()
        print(f"Total extracted text length: {len(full_text)} characters.")
        return full_text
    except Exception as e:
        print(f"Error extracting text from PDF: {e}")
        return ""

def check_language_issues(full_text: str) -> Dict[str, Any]:
    """Check the extracted text for language issues using LanguageTool."""
    try:
        language_tool = language_tool_python.LanguageTool('en-US')
        matches = language_tool.check(full_text)
        language_tool.close()  # shut down the LanguageTool server process
        issues = []
        for match in matches:
            issues.append({
                "message": match.message,
                "context": match.context.strip(),
                "suggestions": match.replacements[:3] if match.replacements else [],
                "category": match.category,
                "rule_id": match.ruleId,
                "offset": match.offset,
                "length": match.errorLength,
            })
        print(f"Total language issues found: {len(issues)}")
        return {
            "total_issues": len(issues),
            "issues": issues,
        }
    except Exception as e:
        print(f"Error checking language issues: {e}")
        return {"error": str(e)}

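# Illustrative call (not executed at import time): check_language_issues works on
# plain text, so it can be exercised without a PDF, e.g.
#   check_language_issues("This sentence have a agreement error.")
# which returns a dict with "total_issues" and the per-match "issues" list above.
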
def highlight_issues_in_pdf(file, language_matches: List[Dict[str, Any]]) -> bytes:
    """
    Highlight language issues in the PDF and return the annotated PDF as bytes.

    Maps each LanguageTool match onto the words of the PDF by rebuilding the
    text as a space-joined word stream, then highlights the words whose span
    overlaps the match. Note that this alignment is approximate: the offsets
    were computed on the "text"-mode extraction, which can differ slightly
    from the word stream rebuilt here.
    """
    try:
        # Accept either a file path or a binary file-like object.
        doc = fitz.open(file) if isinstance(file, str) else fitz.open(stream=file.read(), filetype="pdf")
        print(f"Opened PDF with {len(doc)} pages.")

        # Extract words with positions from each page.
        word_list = []  # list of tuples: (page_number, word, x0, y0, x1, y1)
        for page_number in range(len(doc)):
            page = doc[page_number]
            # get_text("words") yields (x0, y0, x1, y1, "word", block_no, line_no, word_no).
            words = page.get_text("words")
            for w in words:
                word_text = w[4]
                # Insert a space before '[' so citations read "globally [2]"
                # rather than "globally[2]", matching the extracted text.
                if '[' in word_text:
                    word_text = word_text.replace('[', ' [')
                word_list.append((page_number, word_text, w[0], w[1], w[2], w[3]))
        print(f"Total words extracted: {len(word_list)}")

        # Concatenate all words to rebuild the full text.
        concatenated_text = " ".join(w[1] for w in word_list)
        print(f"Concatenated text length: {len(concatenated_text)} characters.")

        # Iterate over each language issue.
        for idx, issue in enumerate(language_matches, start=1):
            offset = issue["offset"]
            length = issue["length"]
            error_text = concatenated_text[offset:offset + length]
            print(f"\nIssue {idx}: '{error_text}' at offset {offset} with length {length}")

            # Find the words whose character span overlaps the error span.
            current_pos = 0
            target_words = []
            for word in word_list:
                word_length = len(word[1]) + 1  # +1 for the joining space
                if current_pos + word_length > offset and current_pos < offset + length:
                    target_words.append(word)
                current_pos += word_length
            if not target_words:
                print("No matching words found for this issue.")
                continue

            # Add a highlight annotation over each target word.
            for target in target_words:
                page_num, word_text, x0, y0, x1, y1 = target
                page = doc[page_num]
                # Pad the word's rectangle slightly so the highlight covers it fully.
                rect = fitz.Rect(x0 - 1, y0 - 1, x1 + 1, y1 + 1)
                highlight = page.add_highlight_annot(rect)
                highlight.set_colors(stroke=(1, 1, 0))  # yellow
                highlight.update()
                print(f"Highlighted '{word_text}' on page {page_num + 1} at position ({x0}, {y0}, {x1}, {y1})")

        # Save the annotated PDF to bytes.
        byte_stream = io.BytesIO()
        doc.save(byte_stream)
        annotated_pdf_bytes = byte_stream.getvalue()
        doc.close()

        # Also save the annotated PDF locally for manual verification.
        with open("annotated_temp.pdf", "wb") as f:
            f.write(annotated_pdf_bytes)
        print("Annotated PDF saved as 'annotated_temp.pdf' for manual verification.")
        return annotated_pdf_bytes
    except Exception as e:
        print(f"Error in highlighting PDF: {e}")
        return b""

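# Worked example of the offset bookkeeping above (hypothetical input): for the
# rebuilt stream "globally [2] cited", "globally" covers offsets 0-7 (8 is its
# joining space), "[2]" covers 9-11, and "cited" covers 13-17; a LanguageTool
# match at offset 9 with length 3 therefore selects only "[2]".
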
def analyze_pdf(file) -> Tuple[Dict[str, Any], Optional[bytes]]:
    """Analyze the PDF for language issues and return the results plus the annotated PDF."""
    try:
        # Reset the file pointer before reading.
        file.seek(0)
        full_text = extract_pdf_text(file)
        if not full_text:
            return {"error": "Failed to extract text from PDF."}, None
        language_issues = check_language_issues(full_text)
        if "error" in language_issues:
            return language_issues, None
        issues = language_issues.get("issues", [])
        # Reset the file pointer again before highlighting.
        file.seek(0)
        annotated_pdf = highlight_issues_in_pdf(file, issues) if issues else None
        return language_issues, annotated_pdf
    except Exception as e:
        return {"error": str(e)}, None
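
# Minimal usage sketch. The file name "sample.pdf" is an assumption for
# illustration; any binary file-like object (e.g. a Streamlit upload) works
# the same way, since analyze_pdf only needs read() and seek().
if __name__ == "__main__":
    with open("sample.pdf", "rb") as pdf_file:
        report, annotated = analyze_pdf(pdf_file)
        print(report.get("total_issues", report.get("error")))
        if annotated:
            with open("sample_annotated.pdf", "wb") as out:
                out.write(annotated)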