BizIntel_AI / db_connector.py
mgbam's picture
Update db_connector.py
17b2c9a verified
# db_connector.py
import os
import pandas as pd
from sqlalchemy import create_engine, inspect
SUPPORTED_ENGINES = ["PostgreSQL", "MySQL", "SQLite", "MSSQL", "Oracle"]
def _env_to_uri(engine: str) -> str | None:
"""Build SQLAlchemy URI from env vars; return None if vars missing."""
match engine:
case "PostgreSQL":
host = os.getenv("PG_HOST"); port = os.getenv("PG_PORT", "5432")
db = os.getenv("PG_DB"); user = os.getenv("PG_USER"); pw = os.getenv("PG_PW")
if all([host, db, user, pw]):
return f"postgresql://{user}:{pw}@{host}:{port}/{db}"
case "MySQL":
host = os.getenv("MYSQL_HOST"); port = os.getenv("MYSQL_PORT", "3306")
db = os.getenv("MYSQL_DB"); user = os.getenv("MYSQL_USER"); pw = os.getenv("MYSQL_PW")
if all([host, db, user, pw]):
return f"mysql+mysqlconnector://{user}:{pw}@{host}:{port}/{db}"
case "MSSQL":
if os.getenv("MSSQL_CONN_STR"):
return os.getenv("MSSQL_CONN_STR")
# add Oracle, etc.
return None
def list_tables(conn_str: str):
engine = create_engine(conn_str)
return inspect(engine).get_table_names()
def fetch_data_from_db(conn_str: str, table: str) -> str:
engine = create_engine(conn_str)
df = pd.read_sql_table(table, engine)
tmp_path = os.path.join(tempfile.gettempdir(), f"{table}.csv")
df.to_csv(tmp_path, index=False)
return tmp_path
def get_connection_string(engine: str, manual_input: str | None) -> str | None:
"""Prefer env‑vars; fallback to user input."""
auto = _env_to_uri(engine)
return auto or manual_input