# Sales-AI-Core / database.py
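"""SQLite persistence layer for the Sales-AI-Core Streamlit app.

Defines the leads table plus the "Colosseum" tables (scenarios, simulations,
phrase_analytics). Read helpers are cached with st.cache_data; write helpers
call st.cache_data.clear() after committing so readers see fresh data.
"""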
import sqlite3
import pandas as pd
import json
import streamlit as st
DB_FILE = "leads.db"
def get_connection():
"""Create a SQLite connection with safe defaults (foreign keys on)."""
conn = sqlite3.connect(DB_FILE)
try:
conn.execute("PRAGMA foreign_keys = ON")
except Exception:
pass
return conn
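# Note: the "with get_connection() as conn" blocks below use the connection as
# a transaction context manager (commit on success, rollback on error); the
# connection itself is not closed and is reclaimed by garbage collection.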
def init_db():
"""Initializes all tables for the application."""
with get_connection() as conn:
cursor = conn.cursor()
# Main leads table
cursor.execute("""
CREATE TABLE IF NOT EXISTS leads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
Date TEXT, Name TEXT, Company TEXT, Type TEXT, Context TEXT,
Pain_Point TEXT, Budget TEXT, Outcome TEXT, Summary TEXT,
Archetype TEXT, Transcript TEXT
)
""")
# --- Colosseum Tables ---
cursor.execute("""
CREATE TABLE IF NOT EXISTS scenarios (
id INTEGER PRIMARY KEY AUTOINCREMENT,
generation INTEGER DEFAULT 0,
fitness_score REAL DEFAULT 0.0,
graph_json TEXT
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS simulations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
scenario_id INTEGER,
customer_persona TEXT,
outcome TEXT,
score INTEGER,
transcript TEXT,
FOREIGN KEY (scenario_id) REFERENCES scenarios (id)
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS phrase_analytics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
scenario_id INTEGER,
node_name TEXT,
phrase TEXT,
impact TEXT,
count INTEGER DEFAULT 1,
UNIQUE(scenario_id, node_name, phrase, impact),
FOREIGN KEY (scenario_id) REFERENCES scenarios (id)
)
""")
conn.commit()
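# init_db() only uses CREATE TABLE IF NOT EXISTS, so it is idempotent and safe
# to call before every write.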
def add_lead(lead_data):
"""Adds a new lead to the database.
lead_data: dict with optional keys matching leads table columns.
Returns inserted row id.
"""
    # Ensure the schema exists before opening the insert connection
    init_db()
    with get_connection() as conn:
        cursor = conn.cursor()
# Valid columns as per schema
columns = [
"Date", "Name", "Company", "Type", "Context",
"Pain_Point", "Budget", "Outcome", "Summary",
"Archetype", "Transcript"
]
cols_used = []
vals_used = []
for col in columns:
if col in lead_data:
cols_used.append(col)
vals_used.append(lead_data[col])
if not cols_used:
return None
placeholders = ", ".join(["?"] * len(cols_used))
cols_sql = ", ".join(cols_used)
cursor.execute(
f"INSERT INTO leads ({cols_sql}) VALUES ({placeholders})",
tuple(vals_used)
)
conn.commit()
# Invalidate cached readers
try:
st.cache_data.clear()
except Exception:
pass
return cursor.lastrowid
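# Example call (hypothetical values):
#   add_lead({"Date": "2024-05-01", "Name": "Jane Doe", "Company": "Acme",
#             "Outcome": "Qualified", "Summary": "Asked for a demo next week"})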
# --- Cached read functions (the write helpers below clear this cache) ---
@st.cache_data
def get_all_leads():
"""Retrieves all leads from the database."""
with get_connection() as conn:
return pd.read_sql_query("SELECT * FROM leads", conn)
@st.cache_data
def get_scenario(scenario_id):
"""Retrieves a specific scenario."""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT graph_json FROM scenarios WHERE id = ?", (int(scenario_id),))
row = cursor.fetchone()
if not row:
return None
try:
return json.loads(row[0])
except Exception:
# Corrupt or invalid JSON stored
return None
# --- Evolution Hub Read Functions ---
@st.cache_data
def get_all_scenarios_with_stats():
"""Retrieves all scenarios with aggregated stats."""
with get_connection() as conn:
query = """
SELECT
s.id,
s.generation,
s.fitness_score,
COUNT(sim.id) as simulation_count
FROM scenarios s
LEFT JOIN simulations sim ON s.id = sim.scenario_id
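            -- LEFT JOIN keeps scenarios with no simulations (simulation_count = 0)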
GROUP BY s.id
ORDER BY s.fitness_score DESC
"""
return pd.read_sql_query(query, conn)
@st.cache_data
def get_simulations_for_scenario(scenario_id, limit=10):
"""Retrieves recent simulations for a specific scenario."""
# Sanitize inputs
try:
scenario_id = int(scenario_id)
except Exception:
scenario_id = -1
try:
limit = int(limit)
except Exception:
limit = 10
# Clamp limit to safe bounds
if limit < 1:
limit = 1
if limit > 100:
limit = 100
with get_connection() as conn:
query = (
"SELECT outcome, score, customer_persona FROM simulations "
"WHERE scenario_id = ? ORDER BY id DESC LIMIT ?"
)
return pd.read_sql_query(query, conn, params=(scenario_id, limit))
@st.cache_data
def get_phrase_analytics_for_scenario(scenario_id):
"""Retrieves phrase analytics for a specific scenario."""
try:
scenario_id = int(scenario_id)
except Exception:
scenario_id = -1
with get_connection() as conn:
query = (
"SELECT phrase, impact, count, node_name FROM phrase_analytics "
"WHERE scenario_id = ? ORDER BY count DESC"
)
return pd.read_sql_query(query, conn, params=(scenario_id,))
# --- Write functions (no caching) ---
def add_scenario(graph_json, generation=0):
"""Insert a new scenario and return its ID."""
init_db()
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT INTO scenarios (generation, fitness_score, graph_json) VALUES (?, ?, ?)",
(generation, 0.0, json.dumps(graph_json))
)
conn.commit()
try:
st.cache_data.clear()
except Exception:
pass
return cursor.lastrowid
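# Example call (hypothetical graph structure):
#   new_id = add_scenario({"nodes": ["Opening", "Discovery"],
#                          "edges": [["Opening", "Discovery"]]}, generation=1)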
def log_simulation(log_data):
"""Insert a simulation log. Expects keys: scenario_id, customer_persona, outcome, score, transcript"""
required = ["scenario_id", "customer_persona", "outcome", "score", "transcript"]
for k in required:
if k not in log_data:
raise ValueError(f"Missing field in log_data: {k}")
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT INTO simulations (scenario_id, customer_persona, outcome, score, transcript) "
"VALUES (?, ?, ?, ?, ?)",
            (
                log_data["scenario_id"],
                # Store non-string personas (e.g. dicts) as JSON text
                (
                    log_data["customer_persona"]
                    if isinstance(log_data["customer_persona"], str)
                    else json.dumps(log_data["customer_persona"])
                ),
                log_data["outcome"],
                int(log_data["score"]),
                log_data["transcript"],
            )
)
conn.commit()
try:
st.cache_data.clear()
except Exception:
pass
return cursor.lastrowid
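# Example call (hypothetical values):
#   log_simulation({"scenario_id": 1, "customer_persona": {"role": "Skeptical CFO"},
#                   "outcome": "Closed", "score": 8, "transcript": "..."})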
def update_phrase_analytics(analytics_data):
"""Update phrase analytics using upsert-like logic.
Expects list of dicts with keys: scenario_id, node_name, phrase, impact, count
"""
if not analytics_data:
return 0
updated = 0
with get_connection() as conn:
cursor = conn.cursor()
for item in analytics_data:
scenario_id = item.get("scenario_id")
node_name = item.get("node_name")
phrase = item.get("phrase")
impact = item.get("impact")
count = int(item.get("count", 1))
if not all([scenario_id, node_name, phrase, impact]):
continue
            # Upsert on the UNIQUE(scenario_id, node_name, phrase, impact) key:
            # insert a new row, or add to the existing row's count on conflict.
            # (Requires SQLite 3.24+ for ON CONFLICT ... DO UPDATE.)
            cursor.execute(
                "INSERT INTO phrase_analytics (scenario_id, node_name, phrase, impact, count) "
                "VALUES (?, ?, ?, ?, ?) "
                "ON CONFLICT(scenario_id, node_name, phrase, impact) "
                "DO UPDATE SET count = count + excluded.count",
                (scenario_id, node_name, phrase, impact, count)
            )
updated += 1
conn.commit()
try:
st.cache_data.clear()
except Exception:
pass
return updated
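# Example call (hypothetical phrase data):
#   update_phrase_analytics([{"scenario_id": 1, "node_name": "Objection Handling",
#                             "phrase": "I understand the concern", "impact": "positive",
#                             "count": 2}])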
def update_scenario_fitness(scenario_id):
"""Recompute and update the fitness score of a scenario as the average of its simulations' scores."""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT AVG(score) FROM simulations WHERE scenario_id = ?",
(scenario_id,)
)
row = cursor.fetchone()
avg_score = row[0] if row and row[0] is not None else 0.0
cursor.execute(
"UPDATE scenarios SET fitness_score = ? WHERE id = ?",
(avg_score, scenario_id)
)
conn.commit()
try:
st.cache_data.clear()
except Exception:
pass
return avg_score
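# Example call: update_scenario_fitness(1) recomputes the average simulation
# score for scenario 1, stores it as the scenario's fitness_score, and returns
# it (0.0 if the scenario has no simulations yet).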
if __name__ == '__main__':
print("Initializing database for Colosseum...")
init_db()
print("Database initialized.")