# els_journal_date_range / populate_translations.py
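"""Populate the `translation` column of the `results` table in gematria.db.

Distinct phrases that have no translation yet are queued and translated to
English with deep_translator's GoogleTranslator (configured with source='yi')
across NUM_THREADS worker threads. Each worker writes its result back to
SQLite under a shared lock, while tqdm bars report queueing and overall
translation progress.
"""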
import sqlite3
import logging
from deep_translator import GoogleTranslator, exceptions
from tqdm import tqdm
import threading
import time
from queue import Queue
# Constants
DATABASE_FILE = 'gematria.db' # Use your actual database file name
BATCH_SIZE = 1000
NUM_THREADS = 10 # Number of parallel translation threads
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Initialize the translator
translator = GoogleTranslator(source='yi', target='en')
logging.info("Translator initialized.")
# Separate Queue and tqdm
translation_queue = Queue() # Regular queue
translation_queue_tqdm = tqdm(total=0, dynamic_ncols=True, desc="Translation Queue") # tqdm for the queue
total_translations_tqdm = tqdm(total=0, dynamic_ncols=True, desc="Total Translations") # tqdm for overall progress
# Lock for database access
db_lock = threading.Lock()
translations_completed = 0 # Counter for completed translations
def translate_and_store(phrase: str) -> str | None:
    """Translates a Hebrew phrase to English using Google Translate.

    Returns the translation, or None if all retries fail.
    """
    max_retries = 3
    retries = 0
    while retries < max_retries:
        try:
            return translator.translate(phrase)
        except (exceptions.TranslationNotFound, exceptions.NotValidPayload,
                exceptions.ServerException, exceptions.RequestError) as e:
            retries += 1
            logging.warning(f"Error translating phrase '{phrase}': {e}. Retrying... ({retries}/{max_retries})")
    logging.error(f"Failed to translate phrase '{phrase}' after {max_retries} retries.")
    return None
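# Return contract of translate_and_store: an English string on success, None
# once max_retries attempts have failed; the worker below skips the UPDATE when
# it receives None.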
def translation_worker():
    """Worker thread to process translations from the queue."""
    global translations_completed
    while True:
        phrase = translation_queue.get()  # Get the next phrase from the queue
        if phrase is None:  # Sentinel value to stop the thread
            break
        translation = translate_and_store(phrase)
        # Acquire the lock before any database interaction for this phrase
        with db_lock:
            with sqlite3.connect(DATABASE_FILE) as conn:
                cursor = conn.cursor()
                if translation is not None:
                    cursor.execute("UPDATE results SET translation = ? WHERE words = ?", (translation, phrase))
                    translations_completed += 1  # Increment the global counter
                    total_translations_tqdm.update()  # Update the overall progress bar
                conn.commit()
        translation_queue.task_done()
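# Design note: sqlite3 connections are bound to the thread that created them
# (check_same_thread defaults to True), which is why each worker opens a
# short-lived connection per phrase instead of sharing one, and db_lock
# serializes all writes to the database file.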
def populate_translations():
    """Populates translations for all Hebrew phrases in the database."""
    with sqlite3.connect(DATABASE_FILE) as conn:
        cursor = conn.cursor()
        # Get the total count of distinct phrases needing translation
        cursor.execute("SELECT COUNT(DISTINCT words) FROM results WHERE translation IS NULL")
        total_phrases = cursor.fetchone()[0]
        logging.info(f"Found {total_phrases} distinct phrases to translate.")
        # Set the total for both tqdm progress bars
        translation_queue_tqdm.total = total_phrases
        total_translations_tqdm.total = total_phrases
        # Queue every distinct phrase that still needs a translation
        cursor.execute("SELECT DISTINCT words FROM results WHERE translation IS NULL")
        for (phrase,) in cursor:
            translation_queue.put(phrase)
            translation_queue_tqdm.update()
    # Close the translation queue tqdm after it's fully populated
    translation_queue_tqdm.close()
    # Start worker threads AFTER the queue is built
    threads = []
    for _ in range(NUM_THREADS):
        thread = threading.Thread(target=translation_worker)
        thread.start()
        threads.append(thread)
    # Wait for all queued phrases to be processed
    translation_queue.join()
    # Stop worker threads
    for _ in range(NUM_THREADS):
        translation_queue.put(None)  # Sentinel value to stop threads
    for thread in threads:
        thread.join()
    logging.info("All translations completed.")
def save_translations_periodically():
    """Commits once a minute as a safeguard; each worker already commits its own writes."""
    while True:
        time.sleep(60)  # Wait for 1 minute
        logging.info("Saving translations to the database...")
        with db_lock:  # Acquire the lock before touching the database
            with sqlite3.connect(DATABASE_FILE) as conn:
                # This fresh connection has no pending transaction of its own,
                # so the commit is effectively a no-op kept as a safety net.
                conn.commit()
        logging.info("Translations saved.")
if __name__ == "__main__":
    # Start the translation process in a separate thread
    translation_thread = threading.Thread(target=populate_translations)
    translation_thread.start()
    # Start the periodic saving thread as a daemon so it cannot keep the
    # process alive after the translations finish
    save_thread = threading.Thread(target=save_translations_periodically, daemon=True)
    save_thread.start()
    # Wait for the translation work to complete, then exit
    translation_thread.join()
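# A minimal follow-up sketch (not part of this script): the same query used
# above can verify how much work remains after a run, e.g.
#
#   with sqlite3.connect(DATABASE_FILE) as conn:
#       remaining = conn.execute(
#           "SELECT COUNT(DISTINCT words) FROM results WHERE translation IS NULL"
#       ).fetchone()[0]
#       print(f"{remaining} distinct phrases still untranslated")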