|
|
|
import re
|
|
import logging
|
|
from typing import List, Tuple
|
|
from datetime import datetime
|
|
import os
|
|
import unicodedata
|
|
import nltk
|
|
|
|
|
|
# Fetch the Punkt sentence-tokenizer models at import time; quiet=True
# suppresses progress output, and the call is a no-op when the data is
# already cached locally.
nltk.download('punkt', quiet=True)

from nltk.tokenize import sent_tokenize
|
|
|
|
class SentenceAnalyzer:
    """Split raw text into sentences, classify each one by punctuation and
    markup, and render the result as interactive HTML (for display) or as
    cleaned plain text (for TTS).

    Classification is purely surface-based; see analyze_sentence() for the
    priority order of the rules.
    """

    def __init__(self):
        """Set up logging and the classification lookup tables."""
        self._setup_logger()

        # All recognized sentence classifications.
        self.SENTENCE_TYPES = ['exclamation', 'question', 'statement', 'ellipsis', 'quote', 'emphasis']
        # Short machine-readable flag per type; embedded as data-flag
        # attributes in the HTML emitted by process_text().
        self.FLAGS = {
            'exclamation': 'EXCL',
            'question': 'QUES',
            'statement': 'STMT',
            'ellipsis': 'ELIP',
            'quote': 'QUOT',
            'emphasis': 'EMPH'
        }

        self.logger.info("SentenceAnalyzer initialized successfully")

    def _setup_logger(self):
        """Set up logging: DEBUG to a per-day file under logs/, INFO to console.

        Rebuilds the handler set on every call so repeated instantiation does
        not produce duplicated log lines.

        Raises:
            Exception: re-raised after a fallback print() if any part of the
                setup fails (e.g. the logs/ directory cannot be created).
        """
        try:
            # Ensure the log directory exists (no-op when already present).
            os.makedirs('logs', exist_ok=True)

            # One log file per calendar day.
            current_date = datetime.now().strftime('%Y-%m-%d')
            log_file = f'logs/sentence_analyzer_{current_date}.log'

            self.logger = logging.getLogger('SentenceAnalyzer')
            self.logger.setLevel(logging.DEBUG)

            # Drop handlers left over from a previous instance; otherwise
            # every message would be emitted once per instantiation.
            if self.logger.handlers:
                self.logger.handlers.clear()

            file_handler = logging.FileHandler(log_file, encoding='utf-8')
            file_handler.setLevel(logging.DEBUG)

            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)

            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            file_handler.setFormatter(formatter)
            console_handler.setFormatter(formatter)

            self.logger.addHandler(file_handler)
            self.logger.addHandler(console_handler)

            self.logger.debug("Logger set up successfully")

        except Exception as e:
            # Logging is not usable yet, so fall back to print() before
            # propagating the failure to the caller.
            print(f"Error setting up logger: {str(e)}")
            raise

    def split_into_sentences(self, text: str) -> List[str]:
        """Normalize *text* and split it into sentences with NLTK.

        Cleanup performed before tokenization: NFC Unicode normalization,
        removal of "Page N" / "Chapter N: ..." artifacts, re-joining of
        hyphenated line breaks, and whitespace flattening.

        Args:
            text: Raw input text; may span multiple lines/pages.

        Returns:
            Non-empty, stripped sentences; [] for empty input.
        """
        if not text:
            return []

        self.logger.debug("Starting sentence splitting")

        text = unicodedata.normalize('NFC', text)
        self.logger.debug("Normalized text using NFC")

        # Strip page numbers and chapter headings ('.' stops at the newline,
        # so only the heading line itself is removed).
        text = re.sub(r'Page \d+|Chapter \d+:.*', '', text)
        self.logger.debug("Removed page numbers and chapter titles")

        # Re-join hyphenated words. NOTE: a previous extra pass with
        # r'-\s+\n' was removed — \s already matches \n, so this single
        # substitution produces the identical result.
        text = re.sub(r'-\s+', '', text)
        self.logger.debug("Replaced hyphenated line breaks")

        text = re.sub(r'[\r\n]+', ' ', text)
        self.logger.debug("Replaced multiple newlines with a space")

        text = re.sub(r'\s+', ' ', text).strip()
        self.logger.debug("Normalized whitespace")

        sentences = sent_tokenize(text)
        self.logger.debug(f"Split text into {len(sentences)} sentences using NLTK")

        # Drop any empty fragments the tokenizer may have produced.
        sentences = [sentence.strip() for sentence in sentences if sentence.strip()]
        self.logger.info(f"Split text into {len(sentences)} sentences after cleanup")
        return sentences

    def analyze_sentence(self, sentence: str) -> Tuple[str, str, str]:
        """Analyze a sentence and return its type, color (handled by CSS), and flag.

        Priority order: quote > emphasis > exclamation > question >
        ellipsis > statement. The color element of the tuple is always ''
        because styling is delegated to CSS classes.

        Args:
            sentence: The sentence to classify; empty input is treated as a
                plain statement.

        Returns:
            (sentence_type, '', flag) where flag comes from self.FLAGS.
        """
        if not sentence:
            return ('statement', '', self.FLAGS['statement'])

        sentence = sentence.strip()
        self.logger.debug(f"Analyzing sentence: '{sentence}'")

        def has_complete_quote(text):
            # True when the text is wrapped in one matching quote pair and
            # the open/close counts balance (guards against a quote that
            # merely dangles inside the sentence).
            quote_pairs = [
                ('"', '"'),
                ("'", "'"),
                ('“', '”'),
                ('‘', '’'),
                ('«', '»')
            ]
            text = text.strip()
            for open_quote, close_quote in quote_pairs:
                if text.startswith(open_quote) and text.endswith(close_quote):
                    if text.count(open_quote) == text.count(close_quote):
                        self.logger.debug(f"Sentence starts and ends with matching quotes: {open_quote}{close_quote}")
                        return True
            return False

        if has_complete_quote(sentence):
            sent_type = 'quote'
            self.logger.debug("Sentence classified as 'quote'")
        elif re.search(r'\*[^*]+\*', sentence):
            # Markdown-style *emphasis* anywhere in the sentence.
            sent_type = 'emphasis'
            self.logger.debug("Sentence classified as 'emphasis'")
        # BUG FIX: the terminator tuples below previously listed the same
        # ASCII character twice (('!', '!') / ('?', '?')); the dead second
        # entry is now the intended full-width (CJK) variant, matching the
        # Unicode handling used elsewhere ('…', curly quotes, «»).
        elif sentence.endswith(('!', '！')):
            sent_type = 'exclamation'
            self.logger.debug("Sentence classified as 'exclamation'")
        elif sentence.endswith(('?', '？')):
            sent_type = 'question'
            self.logger.debug("Sentence classified as 'question'")
        elif sentence.endswith('…') or sentence.endswith('...'):
            sent_type = 'ellipsis'
            self.logger.debug("Sentence classified as 'ellipsis'")
        else:
            sent_type = 'statement'
            self.logger.debug("Sentence classified as 'statement'")

        # Color is resolved by CSS based on the type class; kept in the
        # return tuple for interface compatibility.
        color = ''
        self.logger.debug(f"Sentence type: {sent_type}, Flag: {self.FLAGS[sent_type]}")
        return (sent_type, color, self.FLAGS[sent_type])

    def clean_sentence(self, sentence: str) -> str:
        """Remove special characters from the sentence that might confuse TTS models.

        Keeps word characters, whitespace, common punctuation, the quote
        characters recognized by analyze_sentence, and dashes/parentheses;
        everything else is dropped.
        """
        pattern = r'[^\w\s.,!?\'"“”‘’«»\-—()]'
        cleaned_sentence = re.sub(pattern, '', sentence)
        self.logger.debug(f"Cleaned sentence: '{cleaned_sentence}'")
        return cleaned_sentence

    def process_text_interactive(self, text: str) -> str:
        """Process the text and return HTML-formatted output with interactive sentences.

        Each sentence becomes a numbered <div class="sentence-row TYPE"> row.
        On failure an inline red error <span> is returned instead of raising.
        """
        self.logger.info("Starting interactive text processing")

        if not text:
            self.logger.warning("Empty text received")
            return ''

        try:
            text = unicodedata.normalize('NFC', text)
            self.logger.debug("Normalized text using NFC in interactive processing")

            sentences = self.split_into_sentences(text)
            formatted_output = []

            for index, sentence in enumerate(sentences, 1):
                sent_type, color, flag = self.analyze_sentence(sentence)

                formatted_sentence = f'''
                <div class="sentence-row {sent_type}">
                    <div class="sentence-number">{index}.</div>
                    <div class="sentence-content">
                        {sentence}
                    </div>
                    <div class="sentence-type">{sent_type.capitalize()}</div>
                </div>
                '''
                formatted_output.append(formatted_sentence)
                self.logger.info(f"Processed sentence {index}/{len(sentences)} - Type: {sent_type}")
                self.logger.debug(f"Formatted HTML for sentence {index}: {formatted_sentence}")

            result = ''.join(formatted_output)
            self.logger.info("Text processing completed successfully")
            return result

        except Exception as e:
            # Deliberate best-effort boundary: log with traceback and degrade
            # to an inline error message rather than crashing the caller.
            self.logger.error(f"Error processing text: {str(e)}", exc_info=True)
            return f'<span style="color: red;">Error processing text: {str(e)}</span>'

    def prepare_text_for_tts(self, sentences: List[str]) -> str:
        """Prepare the text for TTS by cleaning special characters from each sentence.

        Args:
            sentences: Sentences as returned by split_into_sentences().

        Returns:
            A single space-joined string of cleaned sentences.
        """
        cleaned_sentences = [self.clean_sentence(sentence) for sentence in sentences]
        tts_text = ' '.join(cleaned_sentences)
        self.logger.debug(f"Prepared text for TTS: '{tts_text}'")
        return tts_text

    def process_text(self, text: str) -> str:
        """Legacy method for non-interactive processing. Kept for compatibility.

        Wraps each sentence in a flat <span> (with data-flag/title attributes)
        instead of the row layout produced by process_text_interactive().
        """
        self.logger.info("Starting text processing (legacy method)")

        if not text:
            self.logger.warning("Empty text received")
            return ""

        try:
            text = unicodedata.normalize('NFC', text)
            self.logger.debug("Normalized text using NFC in legacy processing")

            sentences = self.split_into_sentences(text)
            formatted_output = []

            for index, sentence in enumerate(sentences, 1):
                sent_type, _, flag = self.analyze_sentence(sentence)

                formatted_sentence = (
                    f'<span class="{sent_type}" '
                    f'data-flag="{flag}" '
                    f'title="Sentence type: {sent_type}">'
                    f'{sentence}</span>'
                )
                formatted_output.append(formatted_sentence)
                self.logger.info(f"Processed sentence {index}/{len(sentences)} - Type: {sent_type}")
                self.logger.debug(f"Formatted HTML for sentence {index}: {formatted_sentence}")

            result = " ".join(formatted_output)
            self.logger.info("Text processing completed successfully")
            return result

        except Exception as e:
            # Same best-effort boundary as process_text_interactive().
            self.logger.error(f"Error processing text: {str(e)}", exc_info=True)
            return f'<span style="color: red;">Error processing text: {str(e)}</span>'
|
|
|