| """icij_utils.py | |
| Building an SQL agent over the ICIJ financial data leaks files. | |
| :author: Didier Guillevic | |
| :date: 2025-01-12 | |
| """ | |
import logging
import os
import sqlite3
from pathlib import Path

import pandas as pd
from sqlalchemy import create_engine, select, MetaData, Table, Column, String, Integer, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class ICIJDataLoader:
    """Load ICIJ offshore-leaks CSV files into a local SQLite database."""

    # Class-level logger so all methods log consistently (the original mixed
    # print() calls with the module-level logger configured at import time).
    _log = logging.getLogger(__name__)

    def __init__(self, db_path='icij_data.db'):
        """Initialize the data loader with database path.

        Parameters
        ----------
        db_path : str
            Path of the SQLite database file to create/append to.
        """
        self.db_path = db_path
        # Known ICIJ CSV file names mapped to the SQLite table each one feeds.
        self.table_mappings = {
            'nodes-addresses.csv': 'addresses',
            'nodes-entities.csv': 'entities',
            'nodes-intermediaries.csv': 'intermediaries',
            'nodes-officers.csv': 'officers',
            'nodes-others.csv': 'others',
            'relationships.csv': 'relationships'
        }

    def create_connection(self):
        """Create and return a sqlite3 connection, or None on failure."""
        try:
            return sqlite3.connect(self.db_path)
        except sqlite3.Error as e:
            self._log.error("Error connecting to database: %s", e)
            return None

    def create_table_from_csv(self, csv_path, table_name, conn, chunksize=10000):
        """Create *table_name* from the CSV header and bulk-load its rows.

        Parameters
        ----------
        csv_path : str | Path
            Path to the source CSV file.
        table_name : str
            Target table name. Comes from ``self.table_mappings`` (trusted),
            which is why interpolating it into the CREATE TABLE SQL is safe.
        conn : sqlite3.Connection
            Open database connection (commit is left to the caller).
        chunksize : int, optional
            Rows loaded per batch (default 10000) to bound memory on the
            very large leaks files.

        Returns
        -------
        bool
            True on success, False if reading or loading failed.
        """
        try:
            # Sample a few rows to infer column names and SQLite column types.
            sample = pd.read_csv(csv_path, nrows=5)
            columns = []
            for col in sample.columns:
                dtype = str(sample[col].dtype)
                if 'int' in dtype:
                    sql_type = 'INTEGER'
                elif 'float' in dtype:
                    sql_type = 'REAL'
                else:
                    sql_type = 'TEXT'
                columns.append(f'"{col}" {sql_type}')
            create_table_sql = (
                f'CREATE TABLE IF NOT EXISTS {table_name} '
                f'({", ".join(columns)})'
            )
            conn.execute(create_table_sql)
            # Stream the file in chunks so large files never sit fully in RAM.
            for chunk in pd.read_csv(csv_path, chunksize=chunksize):
                chunk.to_sql(table_name, conn, if_exists='append', index=False)
            self._log.info("Successfully loaded %s", table_name)
            return True
        except Exception as e:
            self._log.error("Error processing %s: %s", csv_path, e)
            return False

    def load_all_files(self, data_directory):
        """Load all recognized CSV files from *data_directory* into SQLite.

        Returns
        -------
        bool
            True when the run completed (missing or failed individual files
            are tolerated), False when the connection could not be opened or
            an unexpected error occurred.
        """
        conn = self.create_connection()
        if not conn:
            return False
        try:
            data_path = Path(data_directory)
            files_processed = 0
            for csv_file, table_name in self.table_mappings.items():
                file_path = data_path / csv_file
                if file_path.exists():
                    self._log.info("Processing %s...", csv_file)
                    if self.create_table_from_csv(file_path, table_name, conn):
                        files_processed += 1
            # Indexes are created once, after every table has been loaded.
            self.create_indexes(conn)
            conn.commit()
            self._log.info("Successfully processed %d files", files_processed)
            return True
        except Exception as e:
            self._log.error("Error during data loading: %s", e)
            return False
        finally:
            conn.close()

    def create_indexes(self, conn):
        """Create lookup indexes for common query columns.

        Best-effort: an index on a table that was never loaded raises a
        sqlite3.Error, which is logged and skipped rather than aborting.
        """
        index_definitions = [
            'CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name)',
            'CREATE INDEX IF NOT EXISTS idx_officers_name ON officers(name)',
            'CREATE INDEX IF NOT EXISTS idx_relationships_from ON relationships(node_id_start)',
            'CREATE INDEX IF NOT EXISTS idx_relationships_to ON relationships(node_id_end)'
        ]
        for index_sql in index_definitions:
            try:
                conn.execute(index_sql)
            except sqlite3.Error as e:
                self._log.warning("Error creating index: %s", e)
class ICIJDatabaseConnector:
    """Thin SQLAlchemy wrapper around the ICIJ SQLite database.

    Reflects the existing tables at construction time so they can be
    queried without declaring ORM models.
    """

    def __init__(self, db_path='icij_leaks.db'):
        """Create engine and session factory, then reflect existing tables."""
        self.engine = create_engine(f'sqlite:///{db_path}', echo=False)
        self.Base = declarative_base()
        self.Session = sessionmaker(bind=self.engine)
        self.metadata = MetaData()
        # Populate self.metadata.tables from whatever exists in the DB file.
        self.metadata.reflect(bind=self.engine)

    def get_engine(self):
        """Return the SQLAlchemy engine."""
        return self.engine

    def get_session(self):
        """Create and return a new session."""
        return self.Session()

    def get_table(self, table_name):
        """Get a reflected Table by name, or None if it does not exist."""
        return self.metadata.tables.get(table_name)

    def list_tables(self):
        """List all available table names in the database."""
        return list(self.metadata.tables.keys())

    def get_table_schema(self, table_name):
        """Get a {column_name: type_string} mapping for *table_name*."""
        table = self.get_table(table_name)
        if table is not None:
            return {column.name: str(column.type) for column in table.columns}
        return {}

    def get_full_database_schema(self):
        """Get the schema for all tables in the database."""
        return {name: self.get_table_schema(name) for name in self.list_tables()}

    def get_table_columns(self, table_name):
        """Get the column names for a specific table ([] if unknown)."""
        table = self.get_table(table_name)
        if table is not None:
            return [column.name for column in table.columns]
        return []

    def query_table(self, table_name, limit=1):
        """Return up to *limit* rows of *table_name* as a list of dicts.

        Returns [] when the table does not exist.
        """
        # Local import: `select` was never imported at module level in the
        # original file, which made this method raise NameError when called.
        from sqlalchemy import select

        table = self.get_table(table_name)
        if table is None:
            return []
        stmt = select(table).limit(limit)
        with self.engine.connect() as connection:
            result = connection.execute(stmt)
            # SQLAlchemy 1.4+ Row objects are not directly dict()-able;
            # ._mapping exposes the name->value view.
            return [dict(row._mapping) for row in result]
class ICIJDatabaseMetadata:
    """Holds detailed documentation about the ICIJ database structure.

    Pure data container: three class-level constants consumed elsewhere
    (e.g. to build prompts/schema descriptions for an SQL agent).
    No methods, no instance state.
    """

    # Comprehensive table documentation: table name -> prose description.
    # Keys match the table names produced by ICIJDataLoader.table_mappings.
    TABLE_DOCS = {
        'entities': (
            "Contains information about companies, trusts, and other entities mentioned in the leaks. "
            "These are typically offshore entities created in tax havens."
        ),
        'officers': (
            "Contains information about people or organizations connected to offshore entities. "
            "Officers can be directors, shareholders, beneficiaries, or have other roles."
        ),
        'intermediaries': (
            "Contains information about professional firms that help create and manage offshore entities. "
            "These are typically law firms, banks, or corporate service providers."
        ),
        'addresses': (
            "Contains physical address information connected to entities, officers, or intermediaries. "
            "Addresses can be shared between multiple parties."
        ),
        'others': (
            "Contains information about miscellaneous parties that don't fit into other categories. "
            "This includes vessel names, legal cases, events, and other related parties mentioned "
            "in the leaks that aren't classified as entities, officers, or intermediaries."
        ),
        'relationships': (
            "Defines connections between different nodes (entities, officers, intermediaries) in the database. "
            "Shows how different parties are connected to each other."
        )
    }

    # Detailed column documentation for each table:
    # table name -> {column name -> prose description}.
    # NOTE(review): the 'others' table uses the hyphenated key
    # 'jurisdiction-description' while 'entities' uses
    # 'jurisdiction_description' — confirm against the actual CSV headers.
    COLUMN_DOCS = {
        'entities': {
            'name': "Legal name of the offshore entity",
            'original_name': "Name in original language/character set",
            'former_name': "Previous names of the entity",
            #'jurisdiction': "Country/region where the entity is registered",
            'jurisdiction_description': "Detailed description of the jurisdiction",
            'company_type': "Legal structure of the entity (e.g., corporation, trust)",
            'address': "Primary registered address",
            'internal_id': "Unique identifier within the leak data",
            'incorporation_date': "Date when the entity was created",
            'inactivation_date': "Date when the entity became inactive",
            'struck_off_date': "Date when entity was struck from register",
            'dorm_date': "Date when entity became dormant",
            'status': "Current status of the entity",
            'service_provider': "Firm that provided offshore services",
            'country_codes': '3 letter abbreviations of country names',
            'countries': 'name of country',
            'sourceID': "Identifier for the leak source"
        },
        'others': {
            'name': "Name of the miscellaneous party or item",
            'type': "Type of the other party (e.g., vessel, legal case)",
            'incorporation_date': "Date of incorporation or creation if applicable",
            'jurisdiction': "2 letter code of the Jurisdiction associated with the party",
            'jurisdiction-description': 'full name of the jurisdiction',
            'countries': "Countries associated with the party",
            'status': "Current status",
            'internal_id': "Unique identifier within the leak data",
            'address': "Associated address if available",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which the information is valid"
        },
        'officers': {
            'name': "Name of the individual or organization",
            'countries': 'full name of the country connected to the officer',
            'country_codes': "3 letter code of the countries connected to the officer",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which the information is valid",
        },
        'intermediaries': {
            'name': "Name of the professional firm",
            'internal_id': "Unique identifier within the leak data",
            'address': "Business address",
            'status': "Current status",
            'countries': "Countries where intermediary operates",
            'country_codes': "3 letter abbreviations of the countries where intermediary operates",
            'sourceID': "Identifier for the leak source"
        },
        'addresses': {
            'address': "Full address text",
            'name': "Name associated with address",
            'country_codes': "3 letter country codes for the address",
            'countries': "Full country names",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which address is valid",
        },
        'relationships': {
            'node_id_start': "Internal ID of the source node",
            'node_id_end': "Internal ID of the target node",
            'rel_type': "Type of relationship (e.g., shareholder, director)",
            'link': "Additional details about the relationship",
            'start_date': "When the relationship began",
            'end_date': "When the relationship ended",
            'sourceID': "Identifier for the leak source",
            'status': "Current status of the relationship"
        }
    }

    # Source documentation: sourceID value -> human-readable leak description.
    SOURCE_IDS = {
        "PANAMA_PAPERS": "Data from Panama Papers leak (2016)",
        "PARADISE_PAPERS": "Data from Paradise Papers leak (2017)",
        "BAHAMAS_LEAKS": "Data from Bahamas Leaks (2016)",
        "OFFSHORE_LEAKS": "Data from Offshore Leaks (2013)",
        "PANDORA_PAPERS": "Data from Pandora Papers leak (2021)"
    }