|
""" |
|
H1B Data Analytics Pipeline |
|
|
|
This module provides a comprehensive ETL pipeline for processing H1B visa application data. |
|
It loads CSV files into DuckDB, creates dimensional models, and performs data quality checks. |
|
""" |
|
|
|
import gc
import logging
import os
import re
import traceback
from typing import List, Tuple

import duckdb
import pandas as pd  # pandas is required at runtime by DuckDBPyConnection.df() in DatabasePersistence
import psutil
|
|
|
|
|
class H1BDataPipeline: |
|
""" |
|
Main pipeline class for processing H1B visa application data. |
|
|
|
This class handles the complete ETL process including: |
|
- Loading CSV files into DuckDB |
|
- Creating dimensional models |
|
- Data quality checks |
|
- Database persistence |
|
""" |
|
|
|
def __init__(self, db_path: str = ':memory:', log_level: int = logging.INFO): |
|
""" |
|
Initialize the H1B data pipeline. |
|
|
|
Args: |
|
db_path: Path to DuckDB database file. Use ':memory:' for in-memory database. |
|
log_level: Logging level for the pipeline. |
|
""" |
|
self.db_path = db_path |
|
self.conn = None |
|
self.logger = self._setup_logging(log_level) |
|
self._setup_database() |
|
|
|
def _setup_logging(self, log_level: int) -> logging.Logger: |
|
"""Set up logging configuration for the pipeline.""" |
|
logger = logging.getLogger(__name__) |
|
logging.basicConfig( |
|
level=log_level, |
|
format="{asctime} - {name} - {levelname} - {message}", |
|
style="{", |
|
datefmt="%Y-%m-%d %H:%M:%S", |
|
) |
|
return logger |
|
|
|
def _setup_database(self) -> None: |
|
"""Initialize DuckDB connection.""" |
|
try: |
|
self.conn = duckdb.connect(self.db_path) |
|
self.logger.info(f"DuckDB connection established to {self.db_path}") |
|
self.logger.info(f"DuckDB version: {duckdb.__version__}") |
|
|
|
|
|
test_result = self.conn.execute("SELECT 'Hello DuckDB!' as message").fetchone() |
|
self.logger.info(f"Connection test: {test_result[0]}") |
|
|
|
except Exception as e: |
|
self.logger.error(f"Failed to establish database connection: {e}") |
|
raise |
|
|
|
def __enter__(self): |
|
"""Context manager entry.""" |
|
return self |
|
|
|
def close(self) -> None: |
|
"""Close database connection and cleanup resources.""" |
|
if self.conn: |
|
self.conn.close() |
|
self.logger.info("Database connection closed") |
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb): |
|
"""Context manager exit with cleanup.""" |
|
self.close() |
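
# Illustrative usage (a minimal sketch; the database path is hypothetical, and
# main() below runs the full pipeline):
#
#     with H1BDataPipeline('./data/example.duckdb') as pipeline:
#         loader = DataLoader(pipeline.conn, pipeline.logger)
#         loader.load_csv_files(['./data/TRK_13139_FY2021.csv'])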
|
|
|
|
|
class MemoryManager: |
|
"""Utility class for monitoring and managing memory usage.""" |
|
|
|
@staticmethod |
|
def check_memory_usage() -> float: |
|
""" |
|
Check current memory usage of the process. |
|
|
|
Returns: |
|
Memory usage in MB. |
|
""" |
|
process = psutil.Process(os.getpid()) |
|
memory_mb = process.memory_info().rss / 1024 / 1024 |
|
print(f"Current memory usage: {memory_mb:.1f} MB") |
|
return memory_mb |
|
|
|
@staticmethod |
|
def clear_memory() -> None: |
|
"""Force garbage collection to clear memory.""" |
|
gc.collect() |
|
print("Memory cleared") |
|
|
|
|
|
class FileValidator: |
|
"""Utility class for validating file existence and accessibility.""" |
|
|
|
@staticmethod |
|
def validate_files(file_paths: List[str]) -> Tuple[List[str], List[str]]: |
|
""" |
|
Validate that files exist and are accessible. |
|
|
|
Args: |
|
file_paths: List of file paths to validate. |
|
|
|
Returns: |
|
Tuple of (existing_files, missing_files). |
|
""" |
|
existing_files = [] |
|
missing_files = [] |
|
|
|
for file_path in file_paths: |
|
if os.path.exists(file_path): |
|
existing_files.append(file_path) |
|
print(f"✓ Found: {file_path}") |
|
else: |
|
missing_files.append(file_path) |
|
print(f"✗ Missing: {file_path}") |
|
|
|
return existing_files, missing_files |
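
    # Illustrative call (the paths are hypothetical):
    #     existing, missing = FileValidator.validate_files(['./data/a.csv', './data/b.csv'])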
|
|
|
|
|
class DataLoader: |
|
"""Handles loading data from various sources into DuckDB.""" |
|
|
|
def __init__(self, conn: duckdb.DuckDBPyConnection, logger: logging.Logger): |
|
""" |
|
Initialize data loader. |
|
|
|
Args: |
|
conn: DuckDB connection object. |
|
logger: Logger instance for tracking operations. |
|
""" |
|
self.conn = conn |
|
self.logger = logger |
|
|
|
def load_csv_files(self, file_paths: List[str]) -> None: |
|
""" |
|
Load CSV files directly into DuckDB without loading into pandas first. |
|
|
|
Args: |
|
file_paths: List of CSV file paths to load. |
|
""" |
|
self.logger.info("Loading CSV files directly into DuckDB...") |
|
|
|
for file_path in file_paths: |
|
try: |
|
self._load_single_csv(file_path) |
|
except Exception as e: |
|
self.logger.error(f"Error loading {file_path}: {e}") |
|
|
|
def _load_single_csv(self, file_path: str) -> None: |
|
""" |
|
Load a single CSV file into DuckDB. |
|
|
|
Args: |
|
file_path: Path to the CSV file. |
|
""" |
|
self.logger.info(f"Loading {file_path}") |
|
|
|
|
|
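        # Derive the raw table name and fiscal year from the CSV file name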
        filename = os.path.splitext(os.path.basename(file_path))[0]
|
table_name = f"raw_{filename}" |
|
fiscal_year = self._extract_fiscal_year(filename) |
|
|
|
|
|
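        # Ingest the file with read_csv_auto, tagging each row with its source file and fiscal year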
self.conn.execute(f""" |
|
CREATE TABLE {table_name} AS |
|
SELECT *, |
|
'{file_path}' as source_file, |
|
'{fiscal_year}' as fiscal_year |
|
FROM read_csv_auto('{file_path}', header=true, normalize_names=true, ignore_errors=true) |
|
""") |
|
|
|
|
|
self._clean_column_names(table_name) |
|
|
|
|
|
count = self.conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0] |
|
self.logger.info(f"Loaded {count:,} records from {file_path} into {table_name}") |
|
|
|
def _extract_fiscal_year(self, filename: str) -> str: |
|
"""Extract fiscal year from filename.""" |
|
|
match = re.search(r'FY(\d{4})', filename) |
|
if match: |
|
return match.group(1) |
|
return "unknown" |
|
|
|
def _clean_column_names(self, table_name: str) -> None: |
|
""" |
|
Clean column names in DuckDB table. |
|
|
|
Args: |
|
table_name: Name of the table to clean. |
|
""" |
|
columns_query = f"PRAGMA table_info('{table_name}')" |
|
columns_info = self.conn.execute(columns_query).fetchall() |
|
|
|
for col_info in columns_info: |
|
old_name = col_info[1] |
|
new_name = self._normalize_column_name(old_name) |
|
|
|
if old_name != new_name: |
|
self.conn.execute(f""" |
|
ALTER TABLE {table_name} |
|
RENAME COLUMN "{old_name}" TO {new_name} |
|
""") |
|
|
|
@staticmethod |
|
def _normalize_column_name(column_name: str) -> str: |
|
""" |
|
        Normalize a column name to a consistent naming convention.
|
|
|
Args: |
|
column_name: Original column name. |
|
|
|
Returns: |
|
Normalized column name. |
|
""" |
|
|
|
|
|
|
normalized = re.sub(r'https?://[^\s]+', '', str(column_name)) |
|
normalized = re.sub(r'[^\w\s]', '_', normalized) |
|
normalized = re.sub(r'\s+', '_', normalized) |
|
normalized = re.sub(r'_+', '_', normalized) |
|
normalized = normalized.lower().strip('_') |
|
|
|
|
|
if normalized and not (normalized[0].isalpha() or normalized[0] == '_'): |
|
normalized = f'col_{normalized}' |
|
|
|
return normalized if normalized else 'unnamed_column' |
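
    # Illustrative examples (hypothetical headers):
    #     "Employer (Petitioner) Name" -> "employer_petitioner_name"
    #     "2024 Cap Type"              -> "col_2024_cap_type"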
|
|
|
|
|
class DataTransformer: |
|
"""Handles data transformation and dimensional modeling.""" |
|
|
|
def __init__(self, conn: duckdb.DuckDBPyConnection, logger: logging.Logger): |
|
""" |
|
Initialize data transformer. |
|
|
|
Args: |
|
conn: DuckDB connection object. |
|
logger: Logger instance for tracking operations. |
|
""" |
|
self.conn = conn |
|
self.logger = logger |
|
|
|
def create_combined_table(self) -> None: |
|
"""Create a combined table from all raw tables in DuckDB.""" |
|
self.logger.info("Creating combined table in DuckDB...") |
|
|
|
|
|
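        # Find every raw_* table created by DataLoader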
raw_tables = self.conn.execute(""" |
|
SELECT table_name |
|
FROM information_schema.tables |
|
WHERE table_name LIKE 'raw_%' |
|
""").fetchall() |
|
|
|
if not raw_tables: |
|
raise ValueError("No raw tables found") |
|
|
|
|
|
union_parts = [f"SELECT * FROM {table_info[0]}" for table_info in raw_tables] |
|
        union_query = " UNION ALL BY NAME ".join(union_parts)  # align columns by name across files whose headers differ
|
|
|
|
|
self.conn.execute(f""" |
|
CREATE TABLE combined_data AS |
|
{union_query} |
|
""") |
|
|
|
count = self.conn.execute("SELECT COUNT(*) FROM combined_data").fetchone()[0] |
|
self.logger.info(f"Created combined table with {count:,} records") |
|
|
|
def remove_columns_with_missing_data(self, table_name: str, threshold: float = 0.8) -> List[str]: |
|
""" |
|
Remove columns with high missing data directly in DuckDB. |
|
|
|
Args: |
|
table_name: Name of the table to clean. |
|
threshold: Threshold for missing data ratio (0.0 to 1.0). |
|
|
|
Returns: |
|
List of columns that were kept. |
|
""" |
|
self.logger.info(f"Removing columns with >{threshold*100}% missing data from {table_name}...") |
|
|
|
total_rows = self.conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0] |
|
columns_info = self.conn.execute(f"PRAGMA table_info('{table_name}')").fetchall() |
|
|
|
columns_to_keep = [] |
|
columns_removed = [] |
|
|
|
for col_info in columns_info: |
|
col_name = col_info[1] |
|
col_type = col_info[2] |
|
|
|
try: |
|
|
|
if col_type.upper() in ['INTEGER', 'BIGINT', 'DOUBLE', 'FLOAT', 'DECIMAL', 'NUMERIC']: |
|
|
|
non_null_count = self.conn.execute(f""" |
|
SELECT COUNT(*) |
|
FROM {table_name} |
|
WHERE "{col_name}" IS NOT NULL |
|
""").fetchone()[0] |
|
else: |
|
|
|
non_null_count = self.conn.execute(f""" |
|
SELECT COUNT(*) |
|
FROM {table_name} |
|
WHERE "{col_name}" IS NOT NULL |
|
AND TRIM(CAST("{col_name}" AS VARCHAR)) != '' |
|
""").fetchone()[0] |
|
|
|
missing_ratio = 1 - (non_null_count / total_rows) |
|
|
|
self.logger.debug(f"Column {col_name}: {non_null_count}/{total_rows} non-null ({missing_ratio:.2%} missing)") |
|
|
|
if missing_ratio <= threshold: |
|
columns_to_keep.append(col_name) |
|
else: |
|
columns_removed.append(col_name) |
|
self.logger.info(f"Removing column {col_name} with {missing_ratio:.2%} missing data") |
|
|
|
except Exception as e: |
|
self.logger.warning(f"Error processing column {col_name}: {e}") |
|
|
|
columns_to_keep.append(col_name) |
|
|
|
if columns_removed: |
|
self.logger.info(f"Removing {len(columns_removed)} columns with high missing data") |
|
self._recreate_table_with_columns(table_name, columns_to_keep) |
|
|
|
return columns_to_keep |
|
|
|
def _recreate_table_with_columns(self, table_name: str, columns_to_keep: List[str]) -> None: |
|
""" |
|
Recreate table with only specified columns. |
|
|
|
Args: |
|
table_name: Original table name. |
|
columns_to_keep: List of column names to retain. |
|
""" |
|
        columns_str = ', '.join(f'"{col}"' for col in columns_to_keep)
|
self.conn.execute(f""" |
|
CREATE TABLE {table_name}_clean AS |
|
SELECT {columns_str} |
|
FROM {table_name} |
|
""") |
|
|
|
self.conn.execute(f"DROP TABLE {table_name}") |
|
self.conn.execute(f"ALTER TABLE {table_name}_clean RENAME TO {table_name}") |
|
|
|
|
|
class DimensionalModeler: |
|
"""Creates dimensional model tables for analytics.""" |
|
|
|
def __init__(self, conn: duckdb.DuckDBPyConnection, logger: logging.Logger): |
|
""" |
|
Initialize dimensional modeler. |
|
|
|
Args: |
|
conn: DuckDB connection object. |
|
logger: Logger instance for tracking operations. |
|
""" |
|
self.conn = conn |
|
self.logger = logger |
|
|
|
def create_all_dimensions(self) -> None: |
|
"""Create all dimension tables.""" |
|
self._prepare_cleaned_data() |
|
self._create_beneficiary_dimension() |
|
self._create_employer_dimension() |
|
self._create_job_dimension() |
|
self._create_agent_dimension() |
|
self._create_status_dimension() |
|
self._create_date_dimension() |
|
|
|
def _prepare_cleaned_data(self) -> None: |
|
"""Prepare cleaned data table for dimension creation.""" |
|
self.logger.info("Preparing cleaned data...") |
|
|
|
columns_info = self.conn.execute("PRAGMA table_info('combined_data')").fetchall() |
|
available_columns = [col[1] for col in columns_info] |
|
self.logger.info(f"Available columns in combined_data: {available_columns}") |
|
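        # Materialize combined_data with a surrogate original_row_id (ROW_NUMBER) for downstream joins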
self.conn.execute(""" |
|
CREATE TABLE cleaned_data AS |
|
SELECT |
|
ROW_NUMBER() OVER () as original_row_id, |
|
* |
|
FROM combined_data |
|
""") |
|
|
|
def _create_beneficiary_dimension(self) -> None: |
|
"""Create beneficiary dimension table.""" |
|
self.logger.info("Creating dim_beneficiary...") |
|
self.conn.execute(""" |
|
CREATE TABLE dim_beneficiary AS |
|
SELECT DISTINCT |
|
ROW_NUMBER() OVER () as beneficiary_key, |
|
MD5(CONCAT( |
|
COALESCE(country_of_birth, ''), '|', |
|
COALESCE(country_of_nationality, ''), '|', |
|
COALESCE(CAST(ben_year_of_birth AS VARCHAR), ''), '|', |
|
COALESCE(gender, '') |
|
)) as beneficiary_id, |
|
country_of_birth, |
|
country_of_nationality, |
|
ben_year_of_birth, |
|
gender, |
|
ben_sex, |
|
ben_country_of_birth, |
|
ben_current_class, |
|
ben_education_code, |
|
ed_level_definition, |
|
ben_pfield_of_study |
|
FROM cleaned_data |
|
WHERE country_of_birth IS NOT NULL |
|
OR country_of_nationality IS NOT NULL |
|
OR ben_year_of_birth IS NOT NULL |
|
OR gender IS NOT NULL |
|
""") |
|
|
|
def _create_employer_dimension(self) -> None: |
|
"""Create employer dimension table.""" |
|
self.logger.info("Creating dim_employer...") |
|
self.conn.execute(""" |
|
CREATE TABLE dim_employer AS |
|
SELECT DISTINCT |
|
ROW_NUMBER() OVER () as employer_key, |
|
MD5(CONCAT( |
|
COALESCE(employer_name, ''), '|', |
|
COALESCE(fein, '') |
|
)) as employer_id, |
|
employer_name, |
|
fein, |
|
mail_addr, |
|
city, |
|
state, |
|
zip |
|
FROM cleaned_data |
|
WHERE employer_name IS NOT NULL OR fein IS NOT NULL |
|
""") |
|
|
|
def _create_job_dimension(self) -> None: |
|
"""Create job dimension table.""" |
|
self.logger.info("Creating dim_job...") |
|
self.conn.execute(""" |
|
CREATE TABLE dim_job AS |
|
SELECT DISTINCT |
|
ROW_NUMBER() OVER () as job_key, |
|
MD5(CONCAT( |
|
COALESCE(job_title, ''), '|', |
|
COALESCE(naics_code, '') |
|
)) as job_id, |
|
job_title, |
|
dot_code, |
|
naics_code, |
|
wage_amt, |
|
wage_unit, |
|
full_time_ind, |
|
ben_comp_paid, |
|
worksite_city, |
|
worksite_state |
|
FROM cleaned_data |
|
WHERE job_title IS NOT NULL OR naics_code IS NOT NULL |
|
""") |
|
|
|
def _create_agent_dimension(self) -> None: |
|
"""Create agent dimension table.""" |
|
self.logger.info("Creating dim_agent...") |
|
self.conn.execute(""" |
|
CREATE TABLE dim_agent AS |
|
SELECT DISTINCT |
|
ROW_NUMBER() OVER () as agent_key, |
|
MD5(CONCAT( |
|
COALESCE(agent_first_name, ''), '|', |
|
COALESCE(agent_last_name, '') |
|
)) as agent_id, |
|
agent_first_name, |
|
agent_last_name |
|
FROM cleaned_data |
|
WHERE agent_first_name IS NOT NULL OR agent_last_name IS NOT NULL |
|
""") |
|
|
|
def _create_status_dimension(self) -> None: |
|
"""Create status dimension table.""" |
|
self.logger.info("Creating dim_status...") |
|
self.conn.execute(""" |
|
CREATE TABLE dim_status AS |
|
SELECT DISTINCT |
|
ROW_NUMBER() OVER () as status_key, |
|
status_type, |
|
first_decision |
|
FROM cleaned_data |
|
WHERE status_type IS NOT NULL OR first_decision IS NOT NULL |
|
""") |
|
|
|
def _create_date_dimension(self) -> None: |
|
"""Create date dimension table.""" |
|
self.logger.info("Creating dim_date...") |
|
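        # Union the parseable dates from rec_date, first_decision_date, valid_from
        # and valid_to, in both MM/DD/YYYY and YYYY-MM-DD formats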
self.conn.execute(""" |
|
CREATE TABLE dim_date AS |
|
WITH all_dates AS ( |
|
-- Handle MM/DD/YYYY format |
|
SELECT TRY_STRPTIME(rec_date, '%m/%d/%Y') as date_value |
|
FROM cleaned_data |
|
WHERE rec_date IS NOT NULL |
|
AND rec_date NOT LIKE '%(%' |
|
AND LENGTH(rec_date) >= 8 |
|
AND rec_date ~ '^[0-9/-]+$' |
|
AND TRY_STRPTIME(rec_date, '%m/%d/%Y') IS NOT NULL |
|
|
|
UNION |
|
|
|
-- Handle YYYY-MM-DD format |
|
SELECT TRY_STRPTIME(rec_date, '%Y-%m-%d') as date_value |
|
FROM cleaned_data |
|
WHERE rec_date IS NOT NULL |
|
AND rec_date NOT LIKE '%(%' |
|
AND LENGTH(rec_date) >= 8 |
|
AND rec_date ~ '^[0-9-]+$' |
|
AND TRY_STRPTIME(rec_date, '%Y-%m-%d') IS NOT NULL |
|
|
|
UNION |
|
|
|
-- Handle first_decision_date MM/DD/YYYY format |
|
SELECT TRY_STRPTIME(first_decision_date, '%m/%d/%Y') as date_value |
|
FROM cleaned_data |
|
WHERE first_decision_date IS NOT NULL |
|
AND first_decision_date NOT LIKE '%(%' |
|
AND LENGTH(first_decision_date) >= 8 |
|
AND first_decision_date ~ '^[0-9/-]+$' |
|
AND TRY_STRPTIME(first_decision_date, '%m/%d/%Y') IS NOT NULL |
|
|
|
UNION |
|
|
|
-- Handle first_decision_date YYYY-MM-DD format |
|
SELECT TRY_STRPTIME(first_decision_date, '%Y-%m-%d') as date_value |
|
FROM cleaned_data |
|
WHERE first_decision_date IS NOT NULL |
|
AND first_decision_date NOT LIKE '%(%' |
|
AND LENGTH(first_decision_date) >= 8 |
|
AND first_decision_date ~ '^[0-9-]+$' |
|
AND TRY_STRPTIME(first_decision_date, '%Y-%m-%d') IS NOT NULL |
|
|
|
UNION |
|
|
|
-- Handle valid_from MM/DD/YYYY format |
|
SELECT TRY_STRPTIME(valid_from, '%m/%d/%Y') as date_value |
|
FROM cleaned_data |
|
WHERE valid_from IS NOT NULL |
|
AND valid_from NOT LIKE '%(%' |
|
AND LENGTH(valid_from) >= 8 |
|
AND valid_from ~ '^[0-9/-]+$' |
|
AND TRY_STRPTIME(valid_from, '%m/%d/%Y') IS NOT NULL |
|
|
|
UNION |
|
|
|
-- Handle valid_from YYYY-MM-DD format |
|
SELECT TRY_STRPTIME(valid_from, '%Y-%m-%d') as date_value |
|
FROM cleaned_data |
|
WHERE valid_from IS NOT NULL |
|
AND valid_from NOT LIKE '%(%' |
|
AND LENGTH(valid_from) >= 8 |
|
AND valid_from ~ '^[0-9-]+$' |
|
AND TRY_STRPTIME(valid_from, '%Y-%m-%d') IS NOT NULL |
|
|
|
UNION |
|
|
|
-- Handle valid_to MM/DD/YYYY format |
|
SELECT TRY_STRPTIME(valid_to, '%m/%d/%Y') as date_value |
|
FROM cleaned_data |
|
WHERE valid_to IS NOT NULL |
|
AND valid_to NOT LIKE '%(%' |
|
AND LENGTH(valid_to) >= 8 |
|
AND valid_to ~ '^[0-9/]+$' |
|
AND valid_to LIKE '%/%/%' |
|
AND TRY_STRPTIME(valid_to, '%m/%d/%Y') IS NOT NULL |
|
|
|
UNION |
|
|
|
-- Handle valid_to YYYY-MM-DD format |
|
SELECT TRY_STRPTIME(valid_to, '%Y-%m-%d') as date_value |
|
FROM cleaned_data |
|
WHERE valid_to IS NOT NULL |
|
AND valid_to NOT LIKE '%(%' |
|
AND LENGTH(valid_to) >= 8 |
|
AND valid_to ~ '^[0-9-]+$' |
|
AND TRY_STRPTIME(valid_to, '%Y-%m-%d') IS NOT NULL |
|
) |
|
SELECT DISTINCT |
|
date_value as date, |
|
EXTRACT(YEAR FROM date_value) as year, |
|
EXTRACT(MONTH FROM date_value) as month, |
|
EXTRACT(QUARTER FROM date_value) as quarter, |
|
EXTRACT(DOW FROM date_value) as day_of_week, |
|
MONTHNAME(date_value) as month_name, |
|
'Q' || CAST(EXTRACT(QUARTER FROM date_value) AS VARCHAR) as quarter_name, |
|
CASE |
|
WHEN EXTRACT(MONTH FROM date_value) >= 10 |
|
                    THEN EXTRACT(YEAR FROM date_value) + 1
                    ELSE EXTRACT(YEAR FROM date_value)
|
END as fiscal_year |
|
FROM all_dates |
|
WHERE date_value IS NOT NULL |
|
ORDER BY date_value |
|
""") |
|
|
|
def create_fact_table(self) -> None: |
|
"""Create the fact table with foreign keys.""" |
|
self.logger.info("Creating fact table in DuckDB...") |
|
|
|
self.conn.execute(""" |
|
CREATE TABLE fact_h1b_applications AS |
|
SELECT |
|
ROW_NUMBER() OVER () as record_id, |
|
|
|
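                -- Dimension surrogate keys; unmatched rows fall back to -1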
COALESCE(db.beneficiary_key, -1) as beneficiary_key, |
|
COALESCE(de.employer_key, -1) as employer_key, |
|
COALESCE(dj.job_key, -1) as job_key, |
|
COALESCE(da.agent_key, -1) as agent_key, |
|
COALESCE(ds.status_key, -1) as status_key, |
|
|
|
-- Handle multiple date formats for rec_date |
|
CASE |
|
WHEN cd.rec_date IS NOT NULL AND cd.rec_date NOT LIKE '%(%' |
|
THEN CASE |
|
WHEN TRY_STRPTIME(cd.rec_date, '%m/%d/%Y') IS NOT NULL |
|
THEN CAST(STRFTIME('%Y%m%d', TRY_STRPTIME(cd.rec_date, '%m/%d/%Y')) AS INTEGER) |
|
WHEN TRY_STRPTIME(cd.rec_date, '%Y-%m-%d') IS NOT NULL |
|
THEN CAST(STRFTIME('%Y%m%d', TRY_STRPTIME(cd.rec_date, '%Y-%m-%d')) AS INTEGER) |
|
ELSE NULL |
|
END |
|
ELSE NULL |
|
END as rec_date_key, |
|
|
|
-- Handle multiple date formats for first_decision_date |
|
CASE |
|
WHEN cd.first_decision_date IS NOT NULL AND cd.first_decision_date NOT LIKE '%(%' |
|
THEN CASE |
|
WHEN TRY_STRPTIME(cd.first_decision_date, '%m/%d/%Y') IS NOT NULL |
|
THEN CAST(STRFTIME('%Y%m%d', TRY_STRPTIME(cd.first_decision_date, '%m/%d/%Y')) AS INTEGER) |
|
WHEN TRY_STRPTIME(cd.first_decision_date, '%Y-%m-%d') IS NOT NULL |
|
THEN CAST(STRFTIME('%Y%m%d', TRY_STRPTIME(cd.first_decision_date, '%Y-%m-%d')) AS INTEGER) |
|
ELSE NULL |
|
END |
|
ELSE NULL |
|
END as first_decision_date_key, |
|
|
|
cd.lottery_year, |
|
cd.ben_multi_reg_ind, |
|
cd.receipt_number, |
|
cd.source_file, |
|
cd.fiscal_year |
|
|
|
FROM cleaned_data cd |
|
|
|
LEFT JOIN dim_beneficiary db ON |
|
cd.original_row_id = db.beneficiary_key AND |
|
COALESCE(cd.country_of_birth, '') = COALESCE(db.country_of_birth, '') AND |
|
COALESCE(cd.country_of_nationality, '') = COALESCE(db.country_of_nationality, '') AND |
|
COALESCE(cd.ben_year_of_birth, '0') = COALESCE(db.ben_year_of_birth, '0') AND |
|
COALESCE(cd.gender, '') = COALESCE(db.gender, '') |
|
|
|
LEFT JOIN dim_employer de ON |
|
cd.original_row_id = de.employer_key AND |
|
COALESCE(cd.employer_name, '') = COALESCE(de.employer_name, '') AND |
|
COALESCE(cd.fein, '') = COALESCE(de.fein, '') |
|
|
|
LEFT JOIN dim_job dj ON |
|
cd.original_row_id = dj.job_key AND |
|
COALESCE(cd.job_title, '') = COALESCE(dj.job_title, '') AND |
|
COALESCE(cd.naics_code, '') = COALESCE(dj.naics_code, '') |
|
|
|
LEFT JOIN dim_agent da ON |
|
cd.original_row_id = da.agent_key AND |
|
COALESCE(cd.agent_first_name, '') = COALESCE(da.agent_first_name, '') AND |
|
COALESCE(cd.agent_last_name, '') = COALESCE(da.agent_last_name, '') |
|
|
|
LEFT JOIN dim_status ds ON |
|
cd.original_row_id = ds.status_key AND |
|
COALESCE(cd.status_type, '') = COALESCE(ds.status_type, '') AND |
|
COALESCE(cd.first_decision, '') = COALESCE(ds.first_decision, '') |
|
""") |
|
|
|
def create_lookup_tables(self) -> None: |
|
"""Create lookup tables for reference data.""" |
|
self.logger.info("Creating lookup tables in DuckDB...") |
|
|
|
|
|
self.conn.execute(""" |
|
CREATE TABLE lookup_country_codes AS |
|
            SELECT * FROM (VALUES
                ('IND', 'India', 'Asia'),
                ('CHN', 'China', 'Asia'),
                ('KOR', 'South Korea', 'Asia'),
                ('CAN', 'Canada', 'North America'),
                ('NPL', 'Nepal', 'Asia'),
                ('USA', 'United States', 'North America')
            ) AS t(country_code, country_name, region)
|
""") |
|
|
|
|
|
self.conn.execute(""" |
|
CREATE TABLE lookup_education_levels AS |
|
            SELECT * FROM (VALUES
                ('A', 'No Diploma', 'Basic'),
                ('B', 'High School', 'Basic'),
                ('C', 'Some College', 'Undergraduate'),
                ('D', 'College No Degree', 'Undergraduate'),
                ('E', 'Associates', 'Undergraduate'),
                ('F', 'Bachelors', 'Undergraduate'),
                ('G', 'Masters', 'Graduate'),
                ('H', 'Professional', 'Graduate'),
                ('I', 'Doctorate', 'Graduate')
            ) AS t(education_code, education_level, education_category)
|
""") |
|
|
|
|
|
self.conn.execute(""" |
|
CREATE TABLE lookup_status_types AS |
|
            SELECT * FROM (VALUES
                ('ELIGIBLE', 'Application is eligible for lottery', 'Lottery'),
                ('SELECTED', 'Selected in H-1B lottery', 'Lottery'),
                ('CREATED', 'Application record created', 'Administrative')
            ) AS t(status_type, status_description, status_category)
|
""") |
|
|
|
|
|
class DatabaseOptimizer: |
|
"""Handles database optimization and indexing.""" |
|
|
|
def __init__(self, conn: duckdb.DuckDBPyConnection, logger: logging.Logger): |
|
""" |
|
Initialize database optimizer. |
|
|
|
Args: |
|
conn: DuckDB connection object. |
|
logger: Logger instance for tracking operations. |
|
""" |
|
self.conn = conn |
|
self.logger = logger |
|
|
|
def create_indexes(self) -> None: |
|
"""Create indexes for better query performance.""" |
|
self.logger.info("Creating indexes in DuckDB...") |
|
|
|
indexes = [ |
|
("idx_fact_beneficiary", "fact_h1b_applications", "beneficiary_key"), |
|
("idx_fact_employer", "fact_h1b_applications", "employer_key"), |
|
("idx_fact_job", "fact_h1b_applications", "job_key"), |
|
("idx_fact_lottery_year", "fact_h1b_applications", "lottery_year"), |
|
("idx_fact_fiscal_year", "fact_h1b_applications", "fiscal_year"), |
|
("idx_fact_rec_date", "fact_h1b_applications", "rec_date_key"), |
|
("idx_dim_beneficiary_id", "dim_beneficiary", "beneficiary_id"), |
|
("idx_dim_employer_id", "dim_employer", "employer_id"), |
|
("idx_dim_job_id", "dim_job", "job_id"), |
|
] |
|
|
|
for index_name, table_name, column_name in indexes: |
|
try: |
|
self.conn.execute(f"CREATE INDEX {index_name} ON {table_name}({column_name})") |
|
except Exception as e: |
|
self.logger.warning(f"Could not create index {index_name}: {e}") |
|
|
|
self.logger.info("Indexes created successfully!") |
|
|
|
|
|
class DataQualityChecker: |
|
"""Performs data quality checks and validation.""" |
|
|
|
def __init__(self, conn: duckdb.DuckDBPyConnection, logger: logging.Logger): |
|
""" |
|
Initialize data quality checker. |
|
|
|
Args: |
|
conn: DuckDB connection object. |
|
logger: Logger instance for tracking operations. |
|
""" |
|
self.conn = conn |
|
self.logger = logger |
|
|
|
def run_all_checks(self) -> bool: |
|
""" |
|
Run all data quality checks. |
|
|
|
Returns: |
|
True if all checks pass, False otherwise. |
|
""" |
|
self.logger.info("Running data quality checks...") |
|
|
|
try: |
|
self._check_table_counts() |
|
self._check_fact_table_integrity() |
|
return True |
|
except Exception as e: |
|
self.logger.error(f"Error in data quality checks: {e}") |
|
return False |
|
|
|
def _check_table_counts(self) -> None: |
|
"""Check row counts for all tables.""" |
|
tables_query = """ |
|
SELECT table_name, estimated_size as row_count |
|
FROM duckdb_tables() |
|
WHERE schema_name = 'main' |
|
ORDER BY table_name |
|
""" |
|
tables_info = self.conn.execute(tables_query).fetchall() |
|
|
|
self.logger.info("Table row counts:") |
|
for table_name, _ in tables_info: |
|
if not table_name.startswith('raw_'): |
|
count = self.conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0] |
|
self.logger.info(f" {table_name}: {count:,} records") |
|
|
|
def _check_fact_table_integrity(self) -> None: |
|
"""Check fact table for duplicates and integrity.""" |
|
dup_check = self.conn.execute(""" |
|
SELECT COUNT(*) as total_records, |
|
COUNT(DISTINCT record_id) as unique_records |
|
FROM fact_h1b_applications |
|
""").fetchone() |
|
|
|
self.logger.info(f"Fact table: {dup_check[0]:,} total records, {dup_check[1]:,} unique records") |
|
|
|
|
|
class DatabasePersistence: |
|
"""Handles database persistence operations.""" |
|
|
|
def __init__(self, logger: logging.Logger): |
|
""" |
|
Initialize database persistence handler. |
|
|
|
Args: |
|
logger: Logger instance for tracking operations. |
|
""" |
|
self.logger = logger |
|
|
|
def save_to_persistent_database(self, source_conn: duckdb.DuckDBPyConnection, |
|
target_path: str) -> None: |
|
""" |
|
Save tables to a persistent database file. |
|
|
|
Args: |
|
source_conn: Source database connection. |
|
target_path: Path to the target persistent database file. |
|
""" |
|
self.logger.info(f"Saving to persistent database: {target_path}") |
|
|
|
|
|
if os.path.exists(target_path): |
|
os.remove(target_path) |
|
self.logger.info(f"Removed existing database file: {target_path}") |
|
|
|
|
|
with duckdb.connect(target_path) as persistent_conn: |
|
|
|
tables_to_keep = source_conn.execute(""" |
|
SELECT table_name |
|
FROM information_schema.tables |
|
WHERE table_name NOT LIKE 'raw_%' |
|
AND table_name NOT IN ('combined_data', 'cleaned_data') |
|
AND table_schema = 'main' |
|
""").fetchall() |
|
|
|
|
|
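            # Copy each table through a pandas DataFrame; DuckDB's replacement scan
            # resolves the local variable 'df' referenced in the SQL below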
for table_info in tables_to_keep: |
|
table_name = table_info[0] |
|
df = source_conn.execute(f"SELECT * FROM {table_name}").df() |
|
persistent_conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM df") |
|
self.logger.info(f"Copied table {table_name} to persistent database") |
|
|
|
self.logger.info(f"Persistent database saved to: {target_path}") |
|
|
|
|
|
|
|
class Config: |
|
"""Configuration class for the H1B data pipeline.""" |
|
|
|
CSV_FILES = [ |
|
'./data/TRK_13139_FY2021.csv', |
|
'./data/TRK_13139_FY2022.csv', |
|
'./data/TRK_13139_FY2023.csv', |
|
'./data/TRK_13139_FY2024_single_reg.csv', |
|
'./data/TRK_13139_FY2024_multi_reg.csv' |
|
] |
|
|
|
XLSX_FILE = './data/TRK_13139_I129_H1B_Registrations_FY21_FY24_FOIA_FIN.xlsx' |
|
PERSISTENT_DB_PATH = './data/h1bs_analytics.duckdb' |
|
MISSING_DATA_THRESHOLD = 0.99 |
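    # Columns whose missing-data ratio exceeds this value are dropped; 0.99 removes only near-empty columns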
|
|
|
|
|
def main(): |
|
"""Main execution function for the H1B data pipeline.""" |
|
print("Starting H1B Data Analytics Pipeline...") |
|
print("All imports successful!") |
|
|
|
|
|
MemoryManager.check_memory_usage() |
|
|
|
|
|
existing_files, missing_files = FileValidator.validate_files(Config.CSV_FILES) |
|
if missing_files: |
|
print(f"Warning: {len(missing_files)} files are missing") |
|
|
|
|
|
try: |
|
with H1BDataPipeline() as pipeline: |
|
|
|
data_loader = DataLoader(pipeline.conn, pipeline.logger) |
|
data_loader.load_csv_files(existing_files) |
|
|
|
|
|
transformer = DataTransformer(pipeline.conn, pipeline.logger) |
|
transformer.create_combined_table() |
|
kept_columns = transformer.remove_columns_with_missing_data( |
|
'combined_data', Config.MISSING_DATA_THRESHOLD |
|
) |
|
print(f"Kept {len(kept_columns)} columns after cleaning") |
|
|
|
|
|
modeler = DimensionalModeler(pipeline.conn, pipeline.logger) |
|
modeler.create_all_dimensions() |
|
modeler.create_fact_table() |
|
modeler.create_lookup_tables() |
|
|
|
|
|
optimizer = DatabaseOptimizer(pipeline.conn, pipeline.logger) |
|
optimizer.create_indexes() |
|
|
|
|
|
quality_checker = DataQualityChecker(pipeline.conn, pipeline.logger) |
|
quality_checker.run_all_checks() |
|
|
|
|
|
persistence = DatabasePersistence(pipeline.logger) |
|
persistence.save_to_persistent_database( |
|
pipeline.conn, Config.PERSISTENT_DB_PATH |
|
) |
|
|
|
|
|
MemoryManager.check_memory_usage() |
|
MemoryManager.clear_memory() |
|
|
|
print("Pipeline completed successfully!") |
|
|
|
except Exception as e: |
|
print(f"Pipeline failed with error: {e}") |
|
traceback.print_exc() |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |