|
import logging |
|
import os |
|
from pathlib import Path |
|
import pandas as pd |
|
from sqlalchemy import text |
|
from database_manager import engine |
|
from language_config import LANGUAGES |
|
from datetime import datetime |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
# Root directory for exported datasets; each language gets
# datasets/<lang_code>/<lang_code>.parquet (see update_parquet_files).
BASE_DIR = Path('datasets')
|
|
|
def update_parquet_files():
    """Extract verified records and update Parquet files for each language.

    For every language in ``LANGUAGES``, selects all recordings with
    status = 'verified' (left-joined to their transcriptions) and appends
    any records not already present on disk — keyed on ``audio_filename`` —
    to ``datasets/<lang>/<lang>.parquet``, creating the file if needed.

    Per-language failures are logged (with traceback) and skipped so one
    bad language does not abort the rest.

    Raises:
        Exception: re-raised if the update process fails outside the
            per-language loop (e.g. before iteration starts).
    """
    try:
        logger.info("Starting Parquet file update process")

        for lang_code in LANGUAGES.keys():
            try:
                with engine.connect() as conn:
                    # Skip languages whose recordings table does not exist yet.
                    table_exists = conn.execute(text("""
                        SELECT EXISTS (
                            SELECT FROM information_schema.tables
                            WHERE table_name = :table_name
                        )
                    """), {"table_name": f"recordings_{lang_code}"}).scalar()

                    if not table_exists:
                        logger.debug(f"No recordings table for language: {lang_code}")
                        continue

                    # Table names cannot be bound parameters, so they are
                    # interpolated; lang_code comes from the trusted LANGUAGES
                    # config, not from user input.
                    query = text(f"""
                        SELECT r.user_id, r.audio_filename, t.transcription_text as transcription,
                               r.speaker_name, r.audio_path, r.sampling_rate, r.duration,
                               r.language, r.gender, r.country, r.state, r.city,
                               r.status, r.verified_by, r.age_group, r.accent,
                               r.domain, r.subdomain, t.domain as t_domain, t.subdomain as t_subdomain
                        FROM recordings_{lang_code} r
                        LEFT JOIN transcriptions_{lang_code} t ON r.transcription_id = t.transcription_id
                        WHERE r.status = 'verified' -- Only get verified recordings
                    """)

                    df_new = pd.read_sql(query, conn)

                    if df_new.empty:
                        logger.debug(f"No verified records for language: {lang_code}")
                        continue

                    # Prefer the recording's own domain/subdomain, falling back
                    # to the transcription's. BUG FIX: the previous row-wise
                    # `row['domain'] or row['t_domain']` never fell back when a
                    # SQL NULL surfaced as NaN, because NaN is truthy in Python.
                    # Treat NULL/NaN and empty strings as missing, vectorized.
                    for col in ('domain', 'subdomain'):
                        missing = df_new[col].isna() | (df_new[col] == '')
                        df_new.loc[missing, col] = df_new.loc[missing, f"t_{col}"]

                    df_new = df_new.drop(['t_domain', 't_subdomain'], axis=1)

                    lang_dir = BASE_DIR / lang_code
                    lang_dir.mkdir(parents=True, exist_ok=True)

                    parquet_path = lang_dir / f"{lang_code}.parquet"

                    if parquet_path.exists():
                        df_existing = pd.read_parquet(parquet_path)

                        # De-duplicate against what is already on disk, keyed
                        # on audio_filename.
                        existing_files = set(df_existing['audio_filename'])
                        df_new = df_new[~df_new['audio_filename'].isin(existing_files)]

                        if not df_new.empty:
                            df_combined = pd.concat([df_existing, df_new], ignore_index=True)
                            df_combined.to_parquet(parquet_path, index=False)
                            logger.info(f"Added {len(df_new)} new records to {lang_code}.parquet")
                    else:
                        df_new.to_parquet(parquet_path, index=False)
                        logger.info(f"Created new Parquet file for {lang_code} with {len(df_new)} records")

            except Exception as e:
                # logger.exception records the traceback, not just the message,
                # making per-language failures diagnosable.
                logger.exception(f"Error processing language {lang_code}: {str(e)}")
                continue

    except Exception as e:
        logger.exception(f"Error updating Parquet files: {str(e)}")
        raise
|
|
|
if __name__ == "__main__":

    # Standalone entry point: configure basic console logging at INFO level
    # and run one full refresh of all per-language Parquet files.
    logging.basicConfig(level=logging.INFO)

    update_parquet_files()
|
|