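# Spaces: Sleeping
# NOTE(review): the "Spaces: Sleeping" status text above appears to be page
# residue captured together with the source, not program code - confirm and remove.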
import logging
import sqlite3
import threading
import time
from queue import Queue
from typing import Optional

from deep_translator import GoogleTranslator, exceptions
from tqdm import tqdm
# --- Configuration -----------------------------------------------------------
DATABASE_FILE = 'gematria.db'  # SQLite file holding the `results` table; adjust to your setup
BATCH_SIZE = 1000  # NOTE(review): not referenced anywhere in the visible code
NUM_THREADS = 10  # how many translation worker threads run in parallel

# --- Logging -----------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# --- Translator --------------------------------------------------------------
# One module-level translator instance, used by translate_and_store().
# NOTE(review): confirm 'yi' is the intended source-language code for the
# phrases stored in the database.
translator = GoogleTranslator(source='yi', target='en')
logging.info("Translator initialized.")

# --- Shared state ------------------------------------------------------------
# Serialises every database access made by the background threads.
db_lock = threading.Lock()

# Number of phrases whose translation has been written so far.
translations_completed = 0

# Work queue of phrases, kept separate from the progress bars that report on it.
translation_queue = Queue()

# Totals start at 0 and are filled in once the amount of work is known.
translation_queue_tqdm = tqdm(desc="Translation Queue", total=0, dynamic_ncols=True)
total_translations_tqdm = tqdm(desc="Total Translations", total=0, dynamic_ncols=True)
def translate_and_store(phrase: str, *, max_retries: int = 3,
                        retry_delay: float = 1.0) -> Optional[str]:
    """Translate ``phrase`` with the module-level ``translator``.

    Despite its name, this function stores nothing; it only returns the
    translation so the caller can persist it.

    Args:
        phrase: Text to translate.
        max_retries: Maximum number of attempts for transient failures.
        retry_delay: Base delay in seconds between attempts; it doubles
            after every failed attempt.

    Returns:
        The translated text, or ``None`` when the phrase is rejected as an
        invalid payload or every attempt fails.
    """
    # NOTE(review): the same translator object is used by every worker thread;
    # verify that GoogleTranslator.translate() is safe to call concurrently.
    for attempt in range(1, max_retries + 1):
        try:
            return translator.translate(phrase)
        except exceptions.NotValidPayload as e:
            # The input itself was rejected; retrying the same text cannot help.
            logging.error(f"Cannot translate phrase '{phrase}': {e}. Not retrying.")
            return None
        except (exceptions.TranslationNotFound, exceptions.ServerException,
                exceptions.RequestError, exceptions.TooManyRequests) as e:
            logging.warning(f"Error translating phrase '{phrase}': {e}. Retrying... ({attempt}/{max_retries})")
            if attempt < max_retries:
                # Back off before the next attempt instead of retrying immediately.
                time.sleep(retry_delay * 2 ** (attempt - 1))

    logging.error(f"Failed to translate phrase '{phrase}' after {max_retries} retries.")
    return None
def translation_worker():
    """Consume phrases from ``translation_queue`` until a ``None`` sentinel arrives.

    For each phrase the translation is fetched outside the lock, then written
    to the ``results`` table while holding ``db_lock``. Every item taken from
    the queue is acknowledged with ``task_done()`` even when processing
    fails, so a ``translation_queue.join()`` in the producer cannot hang on
    a crashed worker.
    """
    global translations_completed

    while True:
        phrase = translation_queue.get()
        try:
            if phrase is None:  # sentinel: stop this worker
                break

            translation = translate_and_store(phrase)
            if translation is None:
                # Nothing to write; skip opening a connection at all.
                continue

            with db_lock:
                connection = sqlite3.connect(DATABASE_FILE)
                try:
                    # `with connection` commits on success and rolls back on
                    # error, but does not close the connection.
                    with connection:
                        connection.execute(
                            "UPDATE results SET translation = ? WHERE words = ?",
                            (translation, phrase),
                        )
                finally:
                    connection.close()

                # Count the phrase only after its update has been committed.
                translations_completed += 1
                total_translations_tqdm.update()
        except Exception:
            # Worker-loop boundary: log and keep serving the queue rather than
            # letting one bad phrase kill the thread.
            logging.exception(f"Unexpected error while processing phrase '{phrase}'")
        finally:
            translation_queue.task_done()
def populate_translations():
    """Translate every distinct untranslated phrase in the ``results`` table.

    Steps:
      1. Read the distinct ``words`` values whose ``translation`` is NULL.
      2. Put them all on ``translation_queue``.
      3. Start ``NUM_THREADS`` workers, wait for the queue to drain, then
         stop the workers with one ``None`` sentinel each.
    """
    # Rows with NULL `words` are excluded: a None on the queue is the stop
    # sentinel, so enqueueing one would shut a worker down prematurely.
    connection = sqlite3.connect(DATABASE_FILE)
    try:
        rows = connection.execute(
            "SELECT DISTINCT words FROM results "
            "WHERE translation IS NULL AND words IS NOT NULL"
        ).fetchall()
    finally:
        # Close the read connection before any worker starts writing.
        connection.close()

    phrases = [words for (words,) in rows]
    # Derive the count from the fetched rows so it always matches the queue.
    total_phrases = len(phrases)
    logging.info(f"Found {total_phrases} distinct phrases to translate.")

    # Both progress bars were created with total=0; give them the real total.
    translation_queue_tqdm.total = total_phrases
    total_translations_tqdm.total = total_phrases

    # Build the complete queue before any worker starts.
    for phrase in phrases:
        translation_queue.put(phrase)
        translation_queue_tqdm.update()
    translation_queue_tqdm.close()

    threads = [threading.Thread(target=translation_worker) for _ in range(NUM_THREADS)]
    for thread in threads:
        thread.start()

    # Block until every queued phrase has been acknowledged by a worker.
    translation_queue.join()

    # One sentinel per worker, then wait for the threads to exit.
    for _ in threads:
        translation_queue.put(None)
    for thread in threads:
        thread.join()

    total_translations_tqdm.close()
    logging.info("All translations completed.")
def save_translations_periodically():
    """Every 60 seconds, open the database under ``db_lock`` and commit.

    Runs forever; intended to be the target of a background thread.

    NOTE(review): the commit is issued on a freshly opened connection that
    has executed no statements of its own, so it likely has nothing to
    persist - confirm whether this loop is still needed.
    """
    while True:
        time.sleep(60)  # wait one minute between rounds
        logging.info("Saving translations to the database...")
        with db_lock:
            connection = sqlite3.connect(DATABASE_FILE)
            try:
                connection.commit()
            finally:
                # sqlite3's `with connection:` never closes the connection, so
                # close explicitly to avoid leaking one handle per minute.
                connection.close()
        logging.info("Translations saved.")
if __name__ == "__main__":
    # Background job 1: the translation pipeline.
    translation_thread = threading.Thread(target=populate_translations)
    # Background job 2: the once-a-minute save loop.
    save_thread = threading.Thread(target=save_translations_periodically)

    translation_thread.start()
    save_thread.start()

    # Park the main thread.
    # NOTE(review): this loop and the save loop never end, so the process
    # keeps running after all translations have completed.
    while True:
        time.sleep(1)