import logging
from pathlib import Path

import pandas as pd
from sqlalchemy import text

from database_manager import engine
from language_config import LANGUAGES

logger = logging.getLogger(__name__)
# Define base directory for Parquet files
BASE_DIR = Path('datasets')
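# Each language's data lands at datasets/<lang_code>/<lang_code>.parquet
# (see lang_dir / parquet_path below).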


def update_parquet_files():
    """Extract verified records and update the Parquet file for each language."""
    try:
        logger.info("Starting Parquet file update process")
        for lang_code in LANGUAGES.keys():
            try:
                # Keep one connection open for both the existence check
                # and the data read below
                with engine.connect() as conn:
                    # Check whether the recordings table for this language exists
                    table_exists = conn.execute(text("""
                        SELECT EXISTS (
                            SELECT FROM information_schema.tables
                            WHERE table_name = :table_name
                        )
                    """), {"table_name": f"recordings_{lang_code}"}).scalar()
                    if not table_exists:
                        logger.debug(f"No recordings table for language: {lang_code}")
                        continue
                    # Only pull verified records. Table names cannot be bound
                    # parameters, so the language-specific names are interpolated;
                    # lang_code comes from the trusted LANGUAGES config.
                    query = text(f"""
                        SELECT r.user_id, r.audio_filename, t.transcription_text AS transcription,
                               r.speaker_name, r.audio_path, r.sampling_rate, r.duration,
                               r.language, r.gender, r.country, r.state, r.city,
                               r.status, r.verified_by, r.age_group, r.accent,
                               r.domain, r.subdomain, t.domain AS t_domain, t.subdomain AS t_subdomain
                        FROM recordings_{lang_code} r
                        LEFT JOIN transcriptions_{lang_code} t
                            ON r.transcription_id = t.transcription_id
                        WHERE r.status = 'verified'
                    """)
                    # Read verified records into a DataFrame while the
                    # connection is still open
                    df_new = pd.read_sql(query, conn)

                if df_new.empty:
                    logger.debug(f"No verified records for language: {lang_code}")
                    continue

                # Prefer the recording's own domain/subdomain; fall back to the
                # transcription's values when the recording's are NULL or empty.
                # (An `or`-based truthiness check would mishandle NaN, which is
                # truthy, so test explicitly with isna.)
                for col in ('domain', 'subdomain'):
                    missing = df_new[col].isna() | (df_new[col] == '')
                    df_new.loc[missing, col] = df_new.loc[missing, f't_{col}']

                # Drop the temporary join columns
                df_new = df_new.drop(['t_domain', 't_subdomain'], axis=1)
                # Create the language directory if needed
                lang_dir = BASE_DIR / lang_code
                lang_dir.mkdir(parents=True, exist_ok=True)
                parquet_path = lang_dir / f"{lang_code}.parquet"

                if parquet_path.exists():
                    # Merge with the existing Parquet file: identify new
                    # records by audio_filename and append only those
                    df_existing = pd.read_parquet(parquet_path)
                    existing_files = set(df_existing['audio_filename'])
                    df_new = df_new[~df_new['audio_filename'].isin(existing_files)]
                    if not df_new.empty:
                        df_combined = pd.concat([df_existing, df_new], ignore_index=True)
                        df_combined.to_parquet(parquet_path, index=False)
                        logger.info(f"Added {len(df_new)} new records to {lang_code}.parquet")
                else:
                    # Create a new Parquet file
                    df_new.to_parquet(parquet_path, index=False)
                    logger.info(f"Created new Parquet file for {lang_code} with {len(df_new)} records")
            except Exception as e:
                logger.error(f"Error processing language {lang_code}: {e}")
                continue
    except Exception as e:
        logger.error(f"Error updating Parquet files: {e}")
        raise
if __name__ == "__main__":
# Set up logging
logging.basicConfig(level=logging.INFO)
update_parquet_files()
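
# Quick sanity-check sketch (not executed as part of this module): read one
# language's dataset back after a run. "en" below is a placeholder language
# code; substitute any key from LANGUAGES.
#
#   import pandas as pd
#   df = pd.read_parquet("datasets/en/en.parquet")
#   print(len(df), "verified records")
#   print(df[["audio_filename", "transcription", "domain"]].head())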