FAMILY / app.py
VIATEUR-AI's picture
Update app.py
8efd745 verified
import gradio as gr
import sqlite3
import hashlib
from datetime import datetime
# ---------------- Database ----------------
DB_PATH = './chat.db'
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
cursor = conn.cursor()
# Users table
cursor.execute('''
CREATE TABLE IF NOT EXISTS users(
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE,
password_hash TEXT
)
''')
# Messages table
cursor.execute('''
CREATE TABLE IF NOT EXISTS messages(
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender_id INTEGER,
receiver_id INTEGER,
message TEXT,
timestamp TEXT,
FOREIGN KEY(sender_id) REFERENCES users(id),
FOREIGN KEY(receiver_id) REFERENCES users(id)
)
''')
conn.commit()
# ---------------- Default Admin ----------------
DEFAULT_ADMIN = "admin"
DEFAULT_PASSWORD = "1234"
cursor.execute("SELECT * FROM users WHERE username=?", (DEFAULT_ADMIN,))
if not cursor.fetchone():
cursor.execute(
"INSERT INTO users(username,password_hash) VALUES (?,?)",
(DEFAULT_ADMIN, hashlib.sha256(DEFAULT_PASSWORD.encode()).hexdigest())
)
conn.commit()
# ---------------- Utility Functions ----------------
def hash_password(password):
return hashlib.sha256(password.encode()).hexdigest()
def register(username, password):
try:
cursor.execute(
"INSERT INTO users(username,password_hash) VALUES (?,?)",
(username, hash_password(password))
)
conn.commit()
return "Account created successfully!"
except sqlite3.IntegrityError:
return "Username already exists!"
def login(username, password):
cursor.execute(
"SELECT * FROM users WHERE username=? AND password_hash=?",
(username, hash_password(password))
)
user = cursor.fetchone()
if user:
return "Login successful!", gr.update(visible=True), gr.State.update(value=user[0])
else:
return "Login failed!", gr.update(visible=False), gr.State.update(value=None)
def send_message(sender_id, receiver_id, message):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
cursor.execute(
"INSERT INTO messages(sender_id,receiver_id,message,timestamp) VALUES (?,?,?,?)",
(sender_id, receiver_id, message, timestamp)
)
conn.commit()
def get_messages_html(user_id, receiver_id):
cursor.execute("""
SELECT sender_id, message, timestamp FROM messages
WHERE (sender_id=? AND receiver_id=?) OR (sender_id=? AND receiver_id=?)
ORDER BY id ASC
""", (user_id, receiver_id, receiver_id, user_id))
msgs = cursor.fetchall()
result = ""
for s_id, msg, ts in msgs:
if s_id == user_id:
result += f'<div style="text-align:right; margin:5px;"><span style="background:#DCF8C6; padding:8px; border-radius:10px; display:inline-block;">{msg} <br><small>{ts}</small></span></div>'
else:
result += f'<div style="text-align:left; margin:5px;"><span style="background:#FFF; padding:8px; border-radius:10px; display:inline-block;">{msg} <br><small>{ts}</small></span></div>'
return result if result else "No messages yet."
def get_all_users(exclude_id):
cursor.execute("SELECT username FROM users WHERE id!=?", (exclude_id,))
users = [u[0] for u in cursor.fetchall()]
if not users:
users = ["No other users"]
return users
# ---------------- Gradio App ----------------
with gr.Blocks() as demo:
gr.Markdown("## 💬 WhatsApp-Style Chat App")
# Login/Register
with gr.Row():
username = gr.Textbox(label="Username")
password = gr.Textbox(label="Password", type="password")
login_btn = gr.Button("Login")
register_btn = gr.Button("Register")
login_status = gr.Label("")
chat_ui = gr.Column(visible=False)
current_user_id = gr.State(value=None)
# Login/Register events
login_btn.click(login, [username, password], [login_status, chat_ui, current_user_id])
register_btn.click(register, [username, password], login_status)
# ---------------- Chat Interface ----------------
with chat_ui:
with gr.Row():
receiver_dropdown = gr.Dropdown(label="Select User")
message_input = gr.Textbox(label="Message", lines=2)
send_btn = gr.Button("Send")
refresh_btn = gr.Button("Refresh Chat")
chat_display = gr.HTML(label="Chat")
# Update dropdown dynamically after login
def update_dropdown(user_id):
return gr.Dropdown.update(choices=get_all_users(user_id))
current_user_id.change(update_dropdown, current_user_id, receiver_dropdown)
# Send message + refresh
def send_and_refresh(sender_id, receiver, msg):
if not receiver or not msg:
return "", gr.update(value="")
cursor.execute("SELECT id FROM users WHERE username=?", (receiver,))
r = cursor.fetchone()
if not r:
return "Receiver not found.", gr.update(value="")
send_message(sender_id, r[0], msg)
return get_messages_html(sender_id, r[0]), gr.update(value="")
send_btn.click(
send_and_refresh,
[current_user_id, receiver_dropdown, message_input],
[chat_display, message_input]
)
# Manual refresh function
def refresh(user_id, receiver):
if not receiver:
return ""
cursor.execute("SELECT id FROM users WHERE username=?", (receiver,))
r = cursor.fetchone()
if not r:
return "Receiver not found."
return get_messages_html(user_id, r[0])
# Connect refresh button
refresh_btn.click(refresh, [current_user_id, receiver_dropdown], chat_display)
demo.launch()