# src/auth.py
"""Authentication helpers for the Streamlit app.

Covers password policy and hashing (bcrypt + pepper), migration-aware
password verification, the arithmetic CAPTCHA kept in ``st.session_state``,
schema creation and the environment-driven admin bootstrap.
"""
import logging
import os
import re
import secrets
import sqlite3
import sys
import time
import uuid
from datetime import datetime, timedelta
from pathlib import Path

import bcrypt
import pandas as pd
import streamlit as st
from email_validator import validate_email as _validate_email, EmailNotValidError
from PIL import Image

from src.utils import get_connection

# -------- logging to HF Runtime logs --------
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
# getattr() can return any attribute of the logging module (e.g. a function)
# for an unexpected LOG_LEVEL value, so only accept real integer levels.
_resolved_level = getattr(logging, LOG_LEVEL, logging.INFO)
logging.basicConfig(
    stream=sys.stdout,
    level=_resolved_level if isinstance(_resolved_level, int) else logging.INFO,
    format="%(asctime)s %(levelname)s [auth] %(message)s",
    force=True,
)
log = logging.getLogger("auth")


# =========================
# Config (ENV-driven)
# =========================
def _env_int(name: str, default: int) -> int:
    """Read an integer setting from the environment.

    Returns ``default`` when the variable is unset, blank or not an integer,
    so a typo in the deployment config does not crash the module on import.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        log.warning("config: %s=%r is not an integer; using default %d", name, raw, default)
        return default


LOCKOUT_MINUTES = _env_int("LOCKOUT_MINUTES", 15)
PASSWORD_EXPIRY_DAYS = _env_int("PASSWORD_EXPIRY_DAYS", 90)
PASSWORD_HISTORY_SIZE = _env_int("PASSWORD_HISTORY_SIZE", 3)
SESSION_TIMEOUT_MINUTES = _env_int("SESSION_TIMEOUT_MINUTES", 30)

# Pepper + optional fallbacks (for rotation/migration)
PEPPER = (os.getenv("PEPPER") or "").strip()
PEPPER_FALLBACKS = [p.strip() for p in os.getenv("PEPPER_FALLBACKS", "").split(",") if p.strip()]

# =========================
# Password policy
# =========================
PASS_MIN_LEN = 8
_PASS_UPPER = re.compile(r"[A-Z]")
_PASS_LOWER = re.compile(r"[a-z]")
_PASS_DIGIT = re.compile(r"\d")
_PASS_SPECIAL = re.compile(r"[^A-Za-z0-9]")


def password_policy_errors(pw: str) -> list[str]:
    """Return the list of policy violations for ``pw`` (empty list means OK)."""
    errs = []
    if len(pw) < PASS_MIN_LEN:
        errs.append(f"Min length {PASS_MIN_LEN}")
    if not _PASS_UPPER.search(pw):
        errs.append("At least 1 uppercase")
    if not _PASS_LOWER.search(pw):
        errs.append("At least 1 lowercase")
    if not _PASS_DIGIT.search(pw):
        errs.append("At least 1 digit")
    if not _PASS_SPECIAL.search(pw):
        errs.append("At least 1 special char")
    return errs


def normalize_email(email: str) -> str:
    """Strip and lowercase ``email``; ``None``/empty becomes ``""``."""
    return (email or "").strip().lower()


def is_alnum(s: str) -> bool:
    """Return True when ``s`` is a non-empty run of ASCII letters/digits."""
    return bool(re.fullmatch(r"[A-Za-z0-9]+", s or ""))


def is_valid_username(u: str) -> bool:
    """Return True when ``u`` is 3-32 characters of ASCII letters, digits or ``_``."""
    return bool(re.fullmatch(r"[A-Za-z0-9_]{3,32}", u or ""))


# =========================
# Hashing helpers (bcrypt + pepper with fallbacks)
# =========================
def hash_password(pw: str) -> str:
    """Hash ``pw`` + current PEPPER with bcrypt (12 rounds), prefixed ``bcrypt$``."""
    raw = (pw + PEPPER).encode("utf-8")
    return "bcrypt$" + bcrypt.hashpw(raw, bcrypt.gensalt(rounds=12)).decode("utf-8")


def _pepper_matches(plaintext_pw: str, pepper: str, hashed: bytes, label: str) -> bool:
    """Check ``plaintext_pw + pepper`` against ``hashed``; bcrypt errors count as no match."""
    try:
        return bool(bcrypt.checkpw((plaintext_pw + pepper).encode("utf-8"), hashed))
    except Exception as e:  # a malformed stored hash must not crash the login flow
        log.exception("login: bcrypt error (%s): %s", label, e)
        return False


def _rehash_with_current_pepper(conn, user_id, plaintext_pw: str) -> bool:
    """Store a fresh hash (current PEPPER) for ``user_id`` plus a history row.

    Returns True when the new hash was committed. On any failure the
    transaction is rolled back, the old hash stays in place and False is
    returned; the caller has already verified the password at that point.
    """
    try:
        now = datetime.utcnow().isoformat(timespec="seconds")
        new_hash = hash_password(plaintext_pw)
        conn.execute(
            "UPDATE users SET password_hash=?, password_changed_at=?, updated_at=? WHERE id=?",
            (new_hash, now, now, user_id),
        )
        conn.execute(
            "INSERT INTO user_password_history (user_id, password_hash, changed_at) VALUES (?, ?, ?)",
            (user_id, new_hash, now),
        )
        conn.commit()
        return True
    except Exception:
        log.exception("login: uid=%s rehash with current pepper failed; keeping old hash", user_id)
        try:
            # Do not leave the UPDATE half-applied without its history row.
            conn.rollback()
        except Exception:
            log.exception("login: uid=%s rollback after failed rehash also failed", user_id)
        return False


def verify_password_migrating(email: str, plaintext_pw: str) -> tuple[bool, dict | None]:
    """
    Try current PEPPER, then any fallbacks, then legacy (no pepper).
    On fallback/legacy match, rehash with CURRENT pepper and update DB.

    Returns ``(True, {"rehash": bool, "user_id": id})`` on a match and
    ``(False, None)`` when the user is unknown, the stored hash is not a
    ``bcrypt$`` hash, or no pepper variant matches. ``rehash`` is True only
    when the upgraded hash was actually written; a failed rehash no longer
    turns a correct password into a login failure.
    """
    em = normalize_email(email)
    with get_connection() as conn:
        conn.row_factory = sqlite3.Row
        row = conn.execute(
            "SELECT id, password_hash FROM users WHERE email=?", (em,)
        ).fetchone()
        if not row:
            log.info("login: no user for email=%s", em)
            return (False, None)

        uid = row["id"]
        stored = row["password_hash"] or ""
        if not stored.startswith("bcrypt$"):
            log.warning("login: non-bcrypt hash for uid=%s", uid)
            return (False, None)
        hashed = stored[len("bcrypt$"):].encode("utf-8")

        # 1) current pepper
        if _pepper_matches(plaintext_pw, PEPPER, hashed, "current pepper"):
            log.info("login: uid=%s pepper=current match", uid)
            return (True, {"rehash": False, "user_id": uid})

        # 2) fallbacks, then 3) legacy (no pepper); same order as before.
        outdated = [(old, "fallback pepper", "fallback") for old in PEPPER_FALLBACKS]
        outdated.append(("", "legacy no pepper", "legacy(none)"))
        for pepper, err_label, log_label in outdated:
            if not _pepper_matches(plaintext_pw, pepper, hashed, err_label):
                continue
            log.info("login: uid=%s pepper=%s match -> rehash", uid, log_label)
            rehashed = _rehash_with_current_pepper(conn, uid, plaintext_pw)
            return (True, {"rehash": rehashed, "user_id": uid})

        log.info("login: uid=%s password mismatch", uid)
        return (False, None)


# =========================
# CAPTCHA helpers (rotate with nonce so widget remounts)
# =========================
def rotate_captcha(key: str):
    """Store a new ``(a, b, a + b)`` challenge under ``key`` and bump ``key + "_nonce"``."""
    a, b = secrets.randbelow(8) + 2, secrets.randbelow(8) + 2  # 2..9
    st.session_state[key] = (a, b, a + b)
    st.session_state[key + "_nonce"] = st.session_state.get(key + "_nonce", 0) + 1
    # --- DEBUG ---
    log.debug("captcha.rotate: key=%s a=%d b=%d sum=%d nonce_now=%d",
              key, a, b, a + b, st.session_state[key + "_nonce"])


def get_captcha_and_nonce(key: str):
    """Return ``(a, b, sum, nonce)`` for ``key``, creating a challenge if none exists."""
    if key not in st.session_state:
        log.debug("captcha.get: key=%s missing; creating new", key)
        rotate_captcha(key)
    a, b, s = st.session_state[key]
    nonce = st.session_state.get(key + "_nonce", 0)
    # --- DEBUG ---
    log.debug("captcha.get: key=%s a=%d b=%d sum=%d nonce=%d", key, a, b, s, nonce)
    return a, b, s, nonce


def read_captcha(key: str):
    """Return the stored challenge triplet for ``key`` or ``(0, 0, 0)`` when absent."""
    triplet = st.session_state.get(key, (0, 0, 0))
    # --- DEBUG ---
    try:
        a, b, s = triplet
        log.debug("captcha.read: key=%s a=%d b=%d sum=%d", key, a, b, s)
    except Exception:
        # Logging only: a malformed triplet is still returned unchanged.
        log.debug("captcha.read: key=%s triplet=%s", key, triplet)
    return triplet


def check_captcha(ans, key: str) -> bool:
    """Compare submitted answer to expected sum; logs both sides."""
    _, _, s = read_captcha(key)
    try:
        if isinstance(ans, str):
            ans_str = ans
            ans = ans.strip()
            if ans == "":
                log.debug("captcha.check: empty string; expected=%s key=%s", s, key)
                return False
            ans = int(ans)
            log.debug("captcha.check: parsed string '%s' -> %d (expected=%d)", ans_str, ans, s)
        else:
            log.debug("captcha.check: numeric ans=%s expected=%s key=%s", ans, s, key)
        ok = int(ans) == int(s)
        log.debug("captcha.check: result=%s", ok)
        return ok
    except Exception as e:
        # Unparseable answers (None, "abc", ...) are simply wrong answers.
        log.debug("captcha.check: exception parsing ans=%s expected=%s key=%s err=%s", ans, s, key, e)
        return False


# =========================
# Schema init / bootstrap
# =========================
def init_auth_schema():
    """Create the users, user_password_history and login_audit tables if missing."""
    with get_connection() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT NOT NULL UNIQUE,
                email TEXT NOT NULL UNIQUE,
                customer_id TEXT NOT NULL,
                mobile TEXT,
                role TEXT NOT NULL CHECK (role IN ('admin','user')),
                password_hash TEXT NOT NULL,
                password_algo TEXT NOT NULL DEFAULT 'bcrypt',
                password_changed_at TEXT,
                failed_attempts INTEGER NOT NULL DEFAULT 0,
                locked_until TEXT,
                is_active INTEGER NOT NULL DEFAULT 1,
                last_login_at TEXT,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                require_pw_reset INTEGER NOT NULL DEFAULT 0
            )""")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")
        conn.execute("""
            CREATE TABLE IF NOT EXISTS user_password_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                password_hash TEXT NOT NULL,
                changed_at TEXT NOT NULL,
                FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
            )""")
        conn.execute("""
            CREATE TABLE IF NOT EXISTS login_audit (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                email TEXT NOT NULL,
                user_id INTEGER,
                result TEXT NOT NULL, -- success | failure | locked
                reason TEXT,
                client_ip TEXT,
                user_agent TEXT,
                session_id TEXT
            )""")
        # Explicit commit, matching every other writer in this module.
        conn.commit()


_INSERT_ADMIN_SQL = """
    INSERT INTO users
        (username, email, customer_id, role, password_hash, password_algo,
         password_changed_at, created_at, updated_at, is_active, require_pw_reset,
         failed_attempts, locked_until)
    VALUES (?, ?, ?, 'admin', ?, 'bcrypt+pepper', ?, ?, ?, 1, 0, 0, NULL)
"""


def _insert_admin(conn, username: str, email: str, customer_id: str, password: str) -> bool:
    """Insert an admin row hashed with the current PEPPER.

    Returns False (after logging) when a UNIQUE constraint on username/email
    rejects the row, so a conflicting ADMIN_USER/ADMIN_EMAIL does not abort
    application start-up.
    """
    now = datetime.utcnow().isoformat(timespec="seconds")
    pwh = hash_password(password)  # uses current PEPPER
    try:
        conn.execute(_INSERT_ADMIN_SQL, (username, email, customer_id, pwh, now, now, now))
        conn.commit()
    except sqlite3.IntegrityError as e:
        log.error("bootstrap_admin: cannot create admin %s (username=%s): %s", email, username, e)
        return False
    return True


def bootstrap_admin_from_env():
    """
    Create the initial admin if none exists.
    If ADMIN_RESET=1 and ADMIN_PASSWORD is provided, reset that admin's password
    (and unlock/reset counters) on every boot.

    Reads ADMIN_USER, ADMIN_EMAIL, ADMIN_PASSWORD, ADMIN_CUSTOMER_ID and
    ADMIN_RESET from the environment.
    """
    admin_user = os.getenv("ADMIN_USER", "admin")
    admin_email = normalize_email(os.getenv("ADMIN_EMAIL", "admin@test.com"))
    admin_pw = os.getenv("ADMIN_PASSWORD")  # optional
    admin_cust = os.getenv("ADMIN_CUSTOMER_ID", "CUST-ADMIN-001")

    # When set to "1", we will reset password for existing admin_email
    do_reset = os.getenv("ADMIN_RESET", "0") == "1"

    with get_connection() as conn:
        # Is there any admin at all?
        row = conn.execute(
            "SELECT id, email FROM users WHERE role='admin' ORDER BY id ASC LIMIT 1"
        ).fetchone()

        if not row:
            # No admin -> create one (password required)
            if not admin_pw:
                log.warning("bootstrap_admin: no admin exists and ADMIN_PASSWORD not set; cannot create admin")
                return
            if _insert_admin(conn, admin_user, admin_email, admin_cust, admin_pw):
                log.info("bootstrap_admin: created admin user %s", admin_email)
            return

        # Admin exists. If ADMIN_RESET=1 and we have ADMIN_PASSWORD, reset it for that email.
        if not (do_reset and admin_pw):
            log.info("bootstrap_admin: admin exists; no reset requested (ADMIN_RESET!=1 or no ADMIN_PASSWORD)")
            return

        existing = conn.execute(
            "SELECT id FROM users WHERE email = ? LIMIT 1", (admin_email,)
        ).fetchone()
        if existing:
            # Positional access works for plain tuples and sqlite3.Row alike;
            # this connection never sets row_factory here.
            uid = existing[0]
            now = datetime.utcnow().isoformat(timespec="seconds")
            pwh = hash_password(admin_pw)  # rehash with current PEPPER
            conn.execute("""
                UPDATE users
                SET password_hash = ?, password_algo = 'bcrypt+pepper',
                    password_changed_at = ?, updated_at = ?,
                    failed_attempts = 0, locked_until = NULL,
                    is_active = 1, require_pw_reset = 0
                WHERE id = ?
            """, (pwh, now, now, uid))
            conn.commit()
            log.info("bootstrap_admin: reset password for %s (id=%s)", admin_email, uid)
        else:
            # If ADMIN_EMAIL changed but original admin wasn't that email,
            # we can also upsert a second admin.
            if _insert_admin(conn, admin_user, admin_email, admin_cust, admin_pw):
                log.info("bootstrap_admin: created secondary admin %s via reset path", admin_email)


# --- Admin: Create User ---------------------------------------------------------
# (Repeated imports kept from the original section header; they are harmless.)
import re
from datetime import datetime, timedelta
import streamlit as st

# If you already have these helpers, use your versions and delete the fallbacks:
# def normalize_email(e: str) -> str: return (e or "").strip().lower()
# def password_policy_errors(pw: str) -> list[str]: ...
# def cannot_reuse_password(user_id: int, pw: str) -> bool: ...
# def hash_password(pw: str) -> str: ...
# PASSWORD_HISTORY_SIZE = 3

EMAIL_RX = re.compile(r"^[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}$", re.I)
USERNAME_RX = re.compile(r"^[A-Za-z0-9._-]{3,32}$")
CUSTOMER_ID_RX = re.compile(r"^[A-Za-z0-9-]{3,64}$")

# Plain SQL identifier; guards the one place a column name is formatted into SQL.
_SQL_IDENTIFIER_RX = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

# Consecutive bad passwords before the account is locked for LOCKOUT_MINUTES.
_MAX_FAILED_ATTEMPTS = 3

# Column order of the frame returned by _list_users().
_USER_LIST_COLUMNS = (
    "id", "username", "email", "customer_id",
    "role", "is_active", "locked_until", "last_login_at",
)


def _email_ok(v: str) -> bool:
    """Return True when ``v`` matches EMAIL_RX in full."""
    # fullmatch: "$" alone would also accept a trailing newline.
    return bool(v and EMAIL_RX.fullmatch(v))


def _username_ok(v: str) -> bool:
    """Return True when ``v`` matches USERNAME_RX in full."""
    return bool(v and USERNAME_RX.fullmatch(v))


def _customer_id_ok(v: str) -> bool:
    """Return True when ``v`` matches CUSTOMER_ID_RX in full."""
    # You said alphanumeric; allowing dash for flexibility
    return bool(v and CUSTOMER_ID_RX.fullmatch(v))


def _is_unique(conn, field: str, value: str) -> bool:
    """Return True when no ``users`` row has ``field`` equal to ``value``.

    Raises:
        ValueError: if ``field`` is not a plain SQL identifier. The column
            name is formatted into the statement, so it must never carry
            anything but an identifier.
    """
    if not _SQL_IDENTIFIER_RX.fullmatch(field or ""):
        raise ValueError(f"invalid column name: {field!r}")
    cur = conn.execute(f"SELECT 1 FROM users WHERE {field} = ? LIMIT 1", (value,))
    return cur.fetchone() is None


def _fail_and_rotate(message: str, captcha_key: str):
    """Show ``message`` as an error, issue a new CAPTCHA and stop this script run."""
    st.error(message)
    rotate_captcha(captcha_key)
    st.stop()


def render_admin_create_user():
    """Admin UI: create a new user with validations and hashed password."""
    st.subheader("\U0001F464 Create User")

    with st.form("admin_create_user"):
        c1, c2 = st.columns([1, 1])
        with c1:
            username = st.text_input("Username (3\u201332, letters/digits/._-)")
            email = normalize_email(st.text_input("Email"))
            customer_id = st.text_input("Customer ID (alphanumeric)")
        with c2:
            role = st.selectbox("Role", ["user", "admin"])
            pw1 = st.text_input("Password", type="password")
            pw2 = st.text_input("Confirm Password", type="password")
        create_clicked = st.form_submit_button("Create User")

    if not create_clicked:
        return

    # Validations
    errs = []
    if not _username_ok(username):
        errs.append("Username must be 3\u201332 characters, letters/digits/._- only.")
    if not _email_ok(email):
        errs.append("Invalid email address.")
    if not _customer_id_ok(customer_id):
        errs.append("Customer ID must be 3\u201364 characters, alphanumeric (dash allowed).")
    if pw1 != pw2:
        errs.append("Passwords do not match.")
    policy = password_policy_errors(pw1)
    if policy:
        errs.append("Password policy: " + ", ".join(policy))
    if errs:
        for e in errs:
            st.error(e)
        return

    with get_connection() as conn:
        # Uniqueness
        if not _is_unique(conn, "email", email):
            st.error("Email already exists.")
            return
        if not _is_unique(conn, "username", username):
            st.error("Username already exists.")
            return
        if not _is_unique(conn, "customer_id", customer_id):
            st.error("Customer ID already exists.")
            return

        now = datetime.utcnow().isoformat(timespec="seconds")
        pwh = hash_password(pw1)
        try:
            conn.execute(
                """
                INSERT INTO users
                    (username, email, customer_id, role, password_hash, password_algo,
                     password_changed_at, created_at, updated_at, is_active, require_pw_reset,
                     failed_attempts, locked_until)
                VALUES (?, ?, ?, ?, ?, 'bcrypt', ?, ?, ?, 1, 0, 0, NULL)
                """,
                (username, email, customer_id, role, pwh, now, now, now),
            )
            # insert password history row
            uid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
            conn.execute(
                "INSERT INTO user_password_history (user_id, password_hash, changed_at) VALUES (?, ?, ?)",
                (uid, pwh, now),
            )
            conn.commit()
        except sqlite3.IntegrityError:
            # Another session inserted the same username/email after the checks above.
            log.warning("admin.create_user: unique constraint hit for username=%s", username)
            st.error("Username or email already exists.")
            return

    # The cached listing would otherwise keep hiding the new user.
    _list_users.clear()
    st.success(f"User created: {username} ({role})")


# --- Admin: Manage Users (enable/disable, reset password) -----------------------
@st.cache_data
def _list_users(q: str | None):
    """Return up to 200 users as a DataFrame, optionally filtered by ``q``.

    ``q`` is matched case-insensitively as a substring of username, email or
    customer_id. The frame always carries the columns in _USER_LIST_COLUMNS,
    also when it is empty. The result is cached by Streamlit; writers in this
    module call ``_list_users.clear()`` after changing user rows.
    """
    q = (q or "").strip().lower()
    base = (
        "SELECT id, username, email, customer_id, role, is_active, locked_until, last_login_at "
        "FROM users "
    )
    tail = "ORDER BY role DESC, username ASC LIMIT 200"
    with get_connection() as conn:
        conn.row_factory = sqlite3.Row
        if q:
            like = f"%{q}%"
            rows = conn.execute(
                base
                + "WHERE lower(username) LIKE ? OR lower(email) LIKE ? OR lower(customer_id) LIKE ? "
                + tail,
                (like, like, like),
            ).fetchall()
        else:
            rows = conn.execute(base + tail).fetchall()
    # Name the columns explicitly: the rows alone give pandas no labels to
    # use, and the callers index rows by column name.
    return pd.DataFrame([tuple(r) for r in rows], columns=list(_USER_LIST_COLUMNS))


def _toggle_active(uid: int):
    """Flip ``is_active`` for user ``uid``; used as a button ``on_click`` callback."""
    uid = int(uid)  # values read back from a DataFrame may be numpy integers
    with get_connection() as conn:
        current = conn.execute("SELECT is_active FROM users WHERE id=?", (uid,)).fetchone()
        if current is None:
            log.warning("admin.toggle_active: no user with id=%s", uid)
            return
        new_status = 0 if current[0] else 1
        conn.execute(
            "UPDATE users SET is_active=?, updated_at=? WHERE id=?",
            (new_status, datetime.utcnow().isoformat(timespec="seconds"), uid),
        )
        conn.commit()
    # Drop the cached listing so the rerun that follows a callback shows the
    # new status. No st.rerun() here: it is a no-op inside a callback.
    _list_users.clear()


def _reset_password(uid: int, new_pw: str):
    """Admin reset: set a new password for ``uid`` and force a change at next login.

    Returns False (after showing the errors) when the password violates the
    policy or repeats a recent one, True after the reset was committed.
    """
    uid = int(uid)  # values read back from a DataFrame may be numpy integers
    errs = password_policy_errors(new_pw)
    if errs:
        for e in errs:
            st.error(e)
        return False
    if cannot_reuse_password(uid, new_pw):
        st.error(f"Cannot reuse any of the last {PASSWORD_HISTORY_SIZE} passwords.")
        return False

    now = datetime.utcnow().isoformat(timespec="seconds")
    pwh = hash_password(new_pw)
    with get_connection() as conn:
        conn.execute(
            "UPDATE users SET password_hash=?, password_changed_at=?, updated_at=?, "
            "require_pw_reset=1, failed_attempts=0, locked_until=NULL WHERE id=?",
            (pwh, now, now, uid),
        )
        conn.execute(
            "INSERT INTO user_password_history (user_id, password_hash, changed_at) VALUES (?, ?, ?)",
            (uid, pwh, now),
        )
        conn.commit()
    _list_users.clear()
    # No st.rerun() after this: an immediate rerun discarded the confirmation
    # before it was ever displayed and made the return value unreachable.
    st.success("Password reset. User will be required to change it at next login.")
    return True


def render_admin_manage_users():
    """Admin UI: search users; enable/disable; reset password (force reset at next login)."""
    st.subheader("\U0001F527 Manage Users")

    # Use a text_input widget with a key to manage the search query in Session State
    if "search_query" not in st.session_state:
        st.session_state.search_query = ""
    q = st.text_input(
        "Search by username / email / customer ID",
        value=st.session_state.search_query,
        key="user_search",
    )
    st.session_state.search_query = q

    df = _list_users(q)
    if df.empty:
        st.info("No users found for the current filter.")
        return

    # Header row
    for col, title in zip(st.columns([1, 1, 1, 1, 1]),
                          ("Username", "Email", "Role", "Status", "Actions")):
        with col:
            st.write(title)

    for _, row in df.iterrows():
        rid = int(row["id"])
        is_active = row["is_active"]
        with st.container(border=True):
            c1, c2, c3, c4, c5 = st.columns([1, 1, 1, 1, 1])
            with c1:
                st.markdown(f"**{row['username']}**")
            with c2:
                st.markdown(f"{row['email']}")
            with c3:
                st.write(f" `{row['role']}`")
            with c4:
                st.write("\u2705 Active" if is_active else "\u274C Disabled")
            with c5:
                # Toggle button
                toggle_label = "Disable" if is_active else "Enable"
                st.button(toggle_label, key=f"toggle_{rid}", on_click=_toggle_active, args=(rid,))
                # Reset password expander
                # NOTE(review): nesting of this expander under the Actions column is
                # inferred; the original indentation is not visible in this copy.
                with st.expander("Reset Password"):
                    with st.form(f"reset_form_{rid}", clear_on_submit=True):
                        npw1 = st.text_input("New password", type="password", key=f"npw1_{rid}")
                        npw2 = st.text_input("Confirm new password", type="password", key=f"npw2_{rid}")
                        reset_btn = st.form_submit_button("Apply Reset")
                    if reset_btn:
                        if npw1 != npw2:
                            st.error("Passwords do not match.")
                        else:
                            _reset_password(rid, npw1)


# =========================
# Session helpers
# =========================
def set_session_user(row: sqlite3.Row):
    """Store the logged-in user from ``row`` in session state; ensures ``_sid`` exists."""
    st.session_state["_sid"] = st.session_state.get("_sid") or str(uuid.uuid4())
    require_reset = int(row["require_pw_reset"]) if "require_pw_reset" in row.keys() else 0
    st.session_state["user"] = {
        "id": row["id"],
        "username": row["username"],
        "email": row["email"],
        "role": row["role"],
        "customer_id": row["customer_id"],
        "last_activity": time.time(),
        "password_changed_at": row["password_changed_at"],
        "require_pw_reset": require_reset,
    }


def session_active() -> bool:
    """Return True for a live session and refresh its activity timestamp.

    A session idle for more than SESSION_TIMEOUT_MINUTES is dropped from
    session state and reported as inactive.
    """
    u = st.session_state.get("user")
    if not u:
        return False
    idle = time.time() - u.get("last_activity", 0)
    if idle > SESSION_TIMEOUT_MINUTES * 60:
        st.session_state.pop("user", None)
        return False
    u["last_activity"] = time.time()
    st.session_state["user"] = u
    return True


def session_countdown_widget():
    """Show the remaining session time (mm:ss) in the sidebar when logged in."""
    u = st.session_state.get("user")
    if not u:
        return
    idle = time.time() - u.get("last_activity", 0)
    remain = max(0, SESSION_TIMEOUT_MINUTES * 60 - int(idle))
    mins, secs = divmod(remain, 60)
    st.sidebar.caption(f"\u23F3 Session expires in {mins:02d}:{secs:02d}")


# =========================
# Queries / checks
# =========================
def user_by_email(email: str) -> sqlite3.Row | None:
    """Return the full ``users`` row for ``email`` or None."""
    with get_connection() as conn:
        conn.row_factory = sqlite3.Row
        return conn.execute("SELECT * FROM users WHERE email=?", (email,)).fetchone()


def email_exists(email: str) -> bool:
    """Return True when a user with this email exists."""
    with get_connection() as conn:
        return bool(conn.execute("SELECT 1 FROM users WHERE email=?", (email,)).fetchone())


def username_exists(username: str) -> bool:
    """Return True when a user with this username exists."""
    with get_connection() as conn:
        return bool(conn.execute("SELECT 1 FROM users WHERE username=?", (username,)).fetchone())


def within_lock(row: sqlite3.Row) -> bool:
    """Return True while ``row["locked_until"]`` lies in the future (UTC, naive)."""
    val = row["locked_until"]
    if not val:
        return False
    try:
        return datetime.utcnow() < datetime.fromisoformat(val)
    except (TypeError, ValueError):
        # An unparseable timestamp is treated as "not locked".
        return False


def is_password_expired(row: sqlite3.Row) -> bool:
    """Return True when the password is older than PASSWORD_EXPIRY_DAYS.

    A missing or unparseable ``password_changed_at`` counts as expired.
    """
    changed = row["password_changed_at"]
    if not changed:
        return True
    try:
        age = datetime.utcnow() - datetime.fromisoformat(changed)
    except (TypeError, ValueError):
        return True
    return age > timedelta(days=PASSWORD_EXPIRY_DAYS)


def record_login_audit(email: str, result: str, reason: str = "", user_id: int | None = None):
    """Append a row to ``login_audit``; client_ip and user_agent are stored as NULL."""
    with get_connection() as conn:
        conn.execute("""
            INSERT INTO login_audit
                (timestamp, email, user_id, result, reason, client_ip, user_agent, session_id)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            datetime.utcnow().isoformat(timespec="seconds"),
            email, user_id, result, reason,
            None, None, st.session_state.get("_sid"),
        ))
        conn.commit()


def cannot_reuse_password(user_id: int, plaintext_pw: str) -> bool:
    """Return True when ``plaintext_pw`` matches one of the user's last
    PASSWORD_HISTORY_SIZE stored hashes (checked with the current PEPPER)."""
    with get_connection() as conn:
        rows = conn.execute("""
            SELECT password_hash FROM user_password_history
            WHERE user_id=? ORDER BY changed_at DESC LIMIT ?
        """, (user_id, PASSWORD_HISTORY_SIZE)).fetchall()

    raw = (plaintext_pw + PEPPER).encode("utf-8")
    for row in rows:
        stored = row[0] or ""
        if not stored.startswith("bcrypt$"):
            continue
        try:
            if bcrypt.checkpw(raw, stored[len("bcrypt$"):].encode("utf-8")):
                return True
        except ValueError:
            # A corrupt history entry cannot match; skip it instead of crashing.
            log.warning("password history: unreadable hash for uid=%s", user_id)
    return False


# =========================
# Login / Signup UI
# =========================
def _load_login_hero():
    """Open the hero image (HERO_IMAGE or the default paths); None when unavailable."""
    p = os.getenv("HERO_IMAGE", "assets/login_side.jpg")
    for candidate in (p, "assets/login_hero.jpg"):
        try:
            return Image.open(candidate)
        except Exception:
            # Best effort: a missing or unreadable image just means "no hero".
            continue
    return None


def render_signup_panel_inline():
    """Render the signup form and create the account on a valid submission."""
    st.subheader("Create your account")

    # Use same nonce-based CAPTCHA as login
    CAPTCHA_KEY_SIGNUP = "_captcha_signup"
    a2, b2, _sum2, nonce2 = get_captcha_and_nonce(CAPTCHA_KEY_SIGNUP)
    log.debug("captcha.signup:init a=%d b=%d sum=%d nonce=%d", a2, b2, _sum2, nonce2)

    with st.form("signup_form", clear_on_submit=True):
        username = st.text_input("Username")
        email2 = normalize_email(st.text_input("Email address"))
        cust = st.text_input("Customer ID")
        pcol1, pcol2 = st.columns(2)
        with pcol1:
            pw1 = st.text_input("Password", type="password")
        with pcol2:
            pw2 = st.text_input("Confirm Password", type="password")

        # CAPTCHA row: text input (blank by default) + refresh button
        cS1, cS2 = st.columns([3, 1])
        with cS1:
            cap_raw = st.text_input(
                f"CAPTCHA: What is {a2} + {b2}?",
                key=f"cap_signup_{nonce2}",
                placeholder="Type the sum",
            )
        with cS2:
            refresh_signup = st.form_submit_button("\u21BB New CAPTCHA")
        ok2 = st.form_submit_button("Create Account", width="stretch")

    # Handle refresh before any validation
    if refresh_signup:
        log.info("captcha.signup: manual refresh pressed (old a=%d b=%d sum=%d nonce=%d)",
                 a2, b2, _sum2, nonce2)
        rotate_captcha(CAPTCHA_KEY_SIGNUP)
        # Rerun so the new question is actually drawn; stopping here left the
        # old question on screen while the stored answer had already changed.
        st.rerun()

    if not ok2:
        return

    log.debug("signup.click: email=%s user=%s cust=%s cap_entered_raw=%s expected=%s",
              email2, username, cust, cap_raw, _sum2)

    # --- CAPTCHA parsing & check ---
    if cap_raw is None or cap_raw.strip() == "":
        st.warning("Please answer the CAPTCHA.")
        st.stop()
    try:
        cap_ans = int(cap_raw.strip())
    except ValueError:
        st.error("CAPTCHA must be a number.")
        st.stop()

    if not check_captcha(cap_ans, CAPTCHA_KEY_SIGNUP):
        log.info("captcha.signup: wrong answer email=%s entered=%s expected=%s",
                 email2, cap_ans, _sum2)
        _fail_and_rotate("CAPTCHA incorrect. Please try again.", CAPTCHA_KEY_SIGNUP)

    # ---- field validations ----
    try:
        _validate_email(email2)
    except EmailNotValidError:
        _fail_and_rotate("Invalid email format.", CAPTCHA_KEY_SIGNUP)
    if not is_valid_username(username):
        _fail_and_rotate("Username must be 3\u201332 and letters/digits/_ only.", CAPTCHA_KEY_SIGNUP)
    if not is_alnum(cust):
        _fail_and_rotate("Customer ID must be alphanumeric.", CAPTCHA_KEY_SIGNUP)
    if email_exists(email2):
        _fail_and_rotate("Email already in use.", CAPTCHA_KEY_SIGNUP)
    if username_exists(username):
        _fail_and_rotate("Username already in use.", CAPTCHA_KEY_SIGNUP)
    if pw1 != pw2:
        _fail_and_rotate("Passwords do not match.", CAPTCHA_KEY_SIGNUP)
    errs = password_policy_errors(pw1)
    if errs:
        _fail_and_rotate("Password policy: " + ", ".join(errs), CAPTCHA_KEY_SIGNUP)

    # --- create user ---
    now = datetime.utcnow().isoformat(timespec="seconds")
    pwh = hash_password(pw1)
    created = True
    with get_connection() as conn:
        try:
            conn.execute("""
                INSERT INTO users
                    (username, email, customer_id, role, password_hash, password_algo,
                     password_changed_at, created_at, updated_at, is_active, require_pw_reset)
                VALUES (?, ?, ?, 'user', ?, 'bcrypt', ?, ?, ?, 1, 0)
            """, (username, email2, cust, pwh, now, now, now))
            uid = conn.execute("SELECT id FROM users WHERE email=?", (email2,)).fetchone()[0]
            conn.execute(
                "INSERT INTO user_password_history (user_id, password_hash, changed_at) VALUES (?, ?, ?)",
                (uid, pwh, now),
            )
            conn.commit()
        except sqlite3.IntegrityError:
            # Lost a race with another signup for the same email/username.
            log.info("signup: unique constraint hit email=%s user=%s", email2, username)
            created = False
    if not created:
        _fail_and_rotate("Email or username already in use.", CAPTCHA_KEY_SIGNUP)

    st.success("Account created. Please log in.")
    st.session_state["_auth_view"] = "login"
    st.rerun()


def _register_failed_login(row: sqlite3.Row, email: str):
    """Count a bad password for ``row``'s user, locking the account when due.

    Returns the ISO ``locked_until`` timestamp when this attempt triggered a
    lock, otherwise None. The attempt counter restarts at 0 once a lock is set.
    """
    fa = int(row["failed_attempts"]) + 1
    locked_until = None
    if fa >= _MAX_FAILED_ATTEMPTS:
        locked_until = (
            datetime.utcnow() + timedelta(minutes=LOCKOUT_MINUTES)
        ).isoformat(timespec="seconds")
        fa = 0
    with get_connection() as conn:
        conn.execute(
            "UPDATE users SET failed_attempts=?, locked_until=?, updated_at=? WHERE id=?",
            (fa, locked_until, datetime.utcnow().isoformat(timespec="seconds"), row["id"]),
        )
        conn.commit()
    record_login_audit(email, "failure", "bad_password", row["id"])
    log.info("login.fail: bad_password id=%s next_locked_until=%s", row["id"], locked_until)
    return locked_until


def _handle_login_submit(email: str, pw: str, cap_raw, expected_sum, nonce, captcha_key: str):
    """Validate one login submission; every outcome ends in st.stop() or st.rerun()."""
    # Parse CAPTCHA safely
    if cap_raw is None or cap_raw.strip() == "":
        st.warning("Please answer the CAPTCHA.")
        st.stop()
    try:
        cap_ans = int(cap_raw.strip())
    except ValueError:
        st.error("CAPTCHA must be a number.")
        st.stop()

    log.debug("login.click: email=%s cap_entered=%s expected=%s nonce=%d",
              email, cap_ans, expected_sum, nonce)

    # 1) CAPTCHA
    if not check_captcha(cap_ans, captcha_key):
        log.info("captcha.login: wrong answer email=%s entered=%s expected=%s nonce=%d",
                 email, cap_ans, expected_sum, nonce)
        _fail_and_rotate("CAPTCHA incorrect. Please try again.", captcha_key)

    # 2) Basic fields
    if not email or not pw:
        log.debug("login.fail: missing fields email?%s pw?%s", bool(email), bool(pw))
        _fail_and_rotate("Email and password required.", captcha_key)
    try:
        _validate_email(email)
    except EmailNotValidError:
        _fail_and_rotate("Invalid email format.", captcha_key)

    # 3) Lookup user
    row = user_by_email(email)
    if not row:
        log.info("login.fail: no user email=%s", email)
        record_login_audit(email, "failure", "no_user", None)
        _fail_and_rotate("Invalid credentials.", captcha_key)
    log.debug("login.user: id=%s role=%s active=%s", row["id"], row["role"], row["is_active"])

    # 4) Account checks
    if int(row["is_active"]) != 1:
        log.info("login.fail: disabled id=%s", row["id"])
        record_login_audit(email, "failure", "disabled", row["id"])
        _fail_and_rotate("Account disabled. Contact admin.", captcha_key)
    if within_lock(row):
        log.info("login.fail: locked id=%s until=%s", row["id"], row["locked_until"])
        record_login_audit(email, "locked", "locked", row["id"])
        _fail_and_rotate(f"Account locked until {row['locked_until']}.", captcha_key)

    # 5) Password verify (migration-aware)
    ok, _meta = verify_password_migrating(email, pw)
    log.debug("login.verify: ok=%s meta=%s", ok, _meta)
    if not ok:
        locked_until = _register_failed_login(row, email)
        _fail_and_rotate(
            "Invalid credentials." + (f" Locked until {locked_until}." if locked_until else ""),
            captcha_key,
        )

    # 6) Success
    now = datetime.utcnow().isoformat(timespec="seconds")
    with get_connection() as conn:
        conn.execute(
            "UPDATE users SET failed_attempts=0, locked_until=NULL, last_login_at=?, updated_at=? WHERE id=?",
            (now, now, row["id"]),
        )
        conn.commit()
    record_login_audit(email, "success", "", row["id"])
    log.info("login.success: id=%s", row["id"])
    set_session_user(row)

    # Force change if required/expired
    if st.session_state["user"].get("require_pw_reset", 0) == 1 or is_password_expired(row):
        log.info("login: forcing password change id=%s", row["id"])
        st.session_state["_force_change_pw"] = True
    st.rerun()


def render_login_signup_gate():
    """
    Side-by-side login/signup screen: hero image on the left, card on the right.

    - A single columns() call controls the entire layout.
    - Depending on st.session_state["_auth_view"] the card shows the signup
      panel or the login form.

    Returns False (not logged in at end of render); caller handles rerun.
    """
    view = st.session_state.get("_auth_view", "login")
    log.debug("login.view: %s", view)

    # NOTE(review): the payloads of the unsafe_allow_html markdown calls in this
    # function (a CSS block and wrapper markup) are blank in this copy of the
    # file. They are transcribed exactly as visible; if the real payloads were
    # stripped on the way here, restore them from version control.
    st.markdown(""" """, unsafe_allow_html=True)

    # ---- One row wrapper so CSS can control equal heights ----
    st.markdown("\n", unsafe_allow_html=True)

    # ---- layout: one columns() only ----
    col_left, col_right = st.columns([1, 1], gap="large")

    # Left: hero image
    with col_left:
        hero = _load_login_hero()
        st.markdown("\n", unsafe_allow_html=True)
        if hero is not None:
            st.image(hero, use_container_width=True)
        else:
            st.markdown(
                "\n"
                "Add assets/login_side.jpg\n",
                unsafe_allow_html=True,
            )
        st.markdown("\n", unsafe_allow_html=True)

    # Right: login/signup card (native border=True to get a card look)
    with col_right:
        st.markdown("\n", unsafe_allow_html=True)
        card = st.container(border=True)
        with card:
            # Header (logo + title)
            c_logo, c_title = st.columns([0.18, 0.82])
            with c_logo:
                try:
                    st.image("assets/logo.png", width=40)
                except Exception:
                    # Missing logo: keep the column occupied so the title stays aligned.
                    st.write(" ")
            with c_title:
                st.markdown("## Ecomm Chatbot")
                st.caption("Sign into your account")

            # ===== SIGNUP =====
            if view == "signup":
                render_signup_panel_inline()
                l1, l2 = st.columns([1, 1])
                with l1:
                    st.caption("Forgot password?")
                with l2:
                    if st.button("Back to login", key="back_to_login_link"):
                        st.session_state["_auth_view"] = "login"
                        st.rerun()
                # close shell and row then return
                st.markdown("\n", unsafe_allow_html=True)
                st.markdown("", unsafe_allow_html=True)
                return False

            # ===== LOGIN =====
            CAPTCHA_KEY_LOGIN = "_captcha_login"
            a, b, _sum, nonce = get_captcha_and_nonce(CAPTCHA_KEY_LOGIN)
            log.debug("captcha.login:init a=%d b=%d sum=%d nonce=%d", a, b, _sum, nonce)

            with st.form("login_form", clear_on_submit=False):
                email = normalize_email(st.text_input("Email address"))
                pw = st.text_input("Password", type="password")
                cA, cB = st.columns([3, 1])
                with cA:
                    cap_raw = st.text_input(
                        f"CAPTCHA: What is {a} + {b}?",
                        key=f"cap_login_{nonce}",
                        placeholder="Type the sum",
                    )
                with cB:
                    refresh_captcha = st.form_submit_button("\u21BB New CAPTCHA")
                login_clicked = st.form_submit_button("LOGIN", width="stretch")

            if refresh_captcha:
                log.info("captcha.login: manual refresh pressed (old a=%d b=%d sum=%d nonce=%d)",
                         a, b, _sum, nonce)
                rotate_captcha(CAPTCHA_KEY_LOGIN)
                # Rerun so the new question is drawn (see the signup panel).
                st.rerun()

            if login_clicked:
                _handle_login_submit(email, pw, cap_raw, _sum, nonce, CAPTCHA_KEY_LOGIN)

            # Links row
            l1, l2 = st.columns([1, 1])
            with l1:
                st.caption("Forgot password?")
            with l2:
                if st.button("Register here", key="go_signup_link"):
                    st.session_state["_auth_view"] = "signup"
                    st.rerun()

        st.markdown("", unsafe_allow_html=True)

    # Not logged in during this render
    return False


# =========================
# Change password flows - auto return to main tabs
# =========================
def _do_change_password(user_id: int, p1: str, p2: str) -> bool:
    """Validate and store a new password for ``user_id``.

    Shows the reason and returns False when the two entries differ, the
    policy is violated or the password was used recently; returns True after
    the new hash and its history row were committed.
    """
    errs = password_policy_errors(p1)
    if p1 != p2:
        st.error("Passwords do not match.")
        return False
    if errs:
        st.error("Password policy: " + ", ".join(errs))
        return False
    if cannot_reuse_password(user_id, p1):
        st.error(f"Cannot reuse any of your last {PASSWORD_HISTORY_SIZE} passwords.")
        return False

    now = datetime.utcnow().isoformat(timespec="seconds")
    pwh = hash_password(p1)
    with get_connection() as conn:
        conn.execute(
            "UPDATE users SET password_hash=?, password_changed_at=?, updated_at=?, require_pw_reset=0 WHERE id=?",
            (pwh, now, now, user_id),
        )
        conn.execute(
            "INSERT INTO user_password_history (user_id, password_hash, changed_at) VALUES (?, ?, ?)",
            (user_id, pwh, now),
        )
        conn.commit()
    st.success("Password updated.")
    return True


def render_force_change_password():
    """Forced password-change form; clears ``_force_change_pw`` and reruns on success.

    Always returns False when it returns at all.
    """
    st.subheader("Set a new password")
    with st.form("change_pw_forced", clear_on_submit=True):
        p1 = st.text_input("New Password", type="password")
        p2 = st.text_input("Confirm Password", type="password")
        ok = st.form_submit_button("Update Password")
    if ok:
        u = st.session_state.get("user")
        if not u:
            st.error("Session missing. Please log in again.")
            return False
        if _do_change_password(u["id"], p1, p2):
            st.session_state.pop("_force_change_pw", None)
            st.session_state["show_change_pw"] = False
            st.rerun()
    return False


def render_change_password_page():
    """Self-service password-change page; hides itself and reruns on success."""
    st.title("\U0001F511 Change my password")
    with st.form("change_pw_self", clear_on_submit=True):
        p1 = st.text_input("New Password", type="password")
        p2 = st.text_input("Confirm Password", type="password")
        ok = st.form_submit_button("Update Password")
    if ok:
        u = st.session_state.get("user")
        if not u:
            st.error("Session missing. Please log in again.")
            return
        if _do_change_password(u["id"], p1, p2):
            st.session_state["show_change_pw"] = False
            st.rerun()


# =========================
# WhoAmI panel (sidebar)
# =========================
def _parse_stored_dt(s: str):
    """Parse an ISO or ``YYYY-MM-DD HH:MM:SS`` timestamp; None when neither fits."""
    try:
        return datetime.fromisoformat(s)
    except (TypeError, ValueError):
        pass
    try:
        return datetime.strptime(s, "%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError):
        return None


def _fmt_dt(s: str | None) -> str:
    """Format a stored timestamp as ``YYYY-MM-DD HH:MM``.

    Empty input gives an em dash; unparseable input is returned unchanged.
    """
    if not s:
        return "\u2014"
    parsed = _parse_stored_dt(s)
    return parsed.strftime("%Y-%m-%d %H:%M") if parsed else s


def render_whoami_sidebar():
    """Show the current user's account summary in the sidebar."""
    import html  # local import keeps this block independent of the file header

    u = st.session_state.get("user")
    if not u:
        return

    with get_connection() as conn:
        row = conn.execute(
            "SELECT email, customer_id, role, last_login_at, password_changed_at, is_active "
            "FROM users WHERE id=?",
            (u["id"],),
        ).fetchone()
    if not row:
        st.sidebar.warning("User record not found.")
        return

    email, customer_id, role, last_login_at, pw_changed_at, is_active = row
    is_active = int(is_active) == 1

    dash = "\u2014"
    days_left_str = dash
    changed_dt = _parse_stored_dt(pw_changed_at) if pw_changed_at else None
    if changed_dt:
        expires_on = changed_dt + timedelta(days=PASSWORD_EXPIRY_DAYS)
        days_left = (expires_on - datetime.utcnow()).days
        days_left_str = f"{max(days_left, 0)} day(s)"

    role_badge = "\U0001F6E0\uFE0F Admin" if role == "admin" else "\U0001F464 User"
    status_chip = "\u2705 Active" if is_active else "\u26D4 Inactive"

    def esc(value) -> str:
        # The panel is rendered with unsafe_allow_html=True, so stored values
        # must not be able to inject markup.
        return html.escape(str(value))

    # NOTE(review): the HTML markup of this panel is not visible in this copy
    # of the file; the text is transcribed as shown. Restore the original
    # markup from version control if it was stripped on the way here.
    st.sidebar.markdown(
        f"""
Who am I
{role_badge}  \u00b7  {status_chip}

Username: {esc(u.get('username', dash))}
Email: {esc(email or dash)}
Customer ID: {esc(customer_id or dash)}
Last login: {esc(_fmt_dt(last_login_at))}
Pwd changed: {esc(_fmt_dt(pw_changed_at))}
Pwd expires in: {days_left_str}
""",
        unsafe_allow_html=True,
    )