Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import sqlite3 | |
| import hashlib | |
| from datetime import datetime | |
# ---------------- Database ----------------
DB_PATH = './chat.db'

# A single module-level connection/cursor pair is used by every function in
# this file. check_same_thread=False lifts sqlite3's rule that a connection
# may only be used on the thread that created it.
conn = sqlite3.connect(DB_PATH, check_same_thread=False)

# SQLite parses FOREIGN KEY clauses but does not enforce them unless this
# pragma is enabled on the connection. Without it the REFERENCES constraints
# declared on `messages` below are silently ignored.
conn.execute("PRAGMA foreign_keys = ON")

cursor = conn.cursor()

# Users table: one row per account; usernames are unique.
cursor.execute('''
    CREATE TABLE IF NOT EXISTS users(
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT UNIQUE,
        password_hash TEXT
    )
''')

# Messages table: one row per message, linked to sender and receiver accounts.
cursor.execute('''
    CREATE TABLE IF NOT EXISTS messages(
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        sender_id INTEGER,
        receiver_id INTEGER,
        message TEXT,
        timestamp TEXT,
        FOREIGN KEY(sender_id) REFERENCES users(id),
        FOREIGN KEY(receiver_id) REFERENCES users(id)
    )
''')
conn.commit()
# ---------------- Default Admin ----------------
# NOTE(review): the seeded admin password is a hard-coded, trivially guessable
# value; consider supplying it from configuration before deploying.
DEFAULT_ADMIN = "admin"
DEFAULT_PASSWORD = "1234"

# Seed the admin account only when no row with that username exists yet, so
# repeated start-ups leave an existing account untouched.
if cursor.execute(
    "SELECT * FROM users WHERE username=?", (DEFAULT_ADMIN,)
).fetchone() is None:
    # The password is stored as the hex SHA-256 digest of its encoded text.
    cursor.execute(
        "INSERT INTO users(username,password_hash) VALUES (?,?)",
        (DEFAULT_ADMIN, hashlib.sha256(DEFAULT_PASSWORD.encode()).hexdigest()),
    )
    conn.commit()
| # ---------------- Utility Functions ---------------- | |
def hash_password(password):
    """Return the hex-encoded SHA-256 digest of *password*.

    The password string is encoded with ``str.encode()`` (UTF-8 by default)
    before hashing. No salt is applied.
    """
    digest = hashlib.sha256()
    digest.update(password.encode())
    return digest.hexdigest()
def register(username, password):
    """Create a new account and return a status message for the UI.

    Args:
        username: Desired account name; must be non-blank.
        password: Plain-text password; must be non-empty. It is stored as
            the digest produced by ``hash_password``.

    Returns:
        ``"Account created successfully!"`` on success,
        ``"Username already exists!"`` when the UNIQUE constraint on the
        username rejects the insert, or
        ``"Username and password are required!"`` when either field is blank.
    """
    # Without this guard an empty form submission is stored as a real
    # account with an empty username and can later be used to log in.
    if not username or not username.strip() or not password:
        return "Username and password are required!"

    try:
        cursor.execute(
            "INSERT INTO users(username,password_hash) VALUES (?,?)",
            (username, hash_password(password)),
        )
    except sqlite3.IntegrityError:
        return "Username already exists!"

    conn.commit()
    return "Account created successfully!"
def login(username, password):
    """Check the credentials and return the values for the three login outputs.

    Args:
        username: Account name typed into the login form.
        password: Plain-text password; it is hashed with ``hash_password``
            and compared against the stored digest inside the query.

    Returns:
        A 3-tuple ``(status_message, chat_panel_update, user_id)``:
        the status text, a ``gr.update`` toggling visibility of the chat
        panel, and the matching user's id (``None`` when login fails).
    """
    cursor.execute(
        "SELECT id FROM users WHERE username=? AND password_hash=?",
        (username, hash_password(password)),
    )
    user = cursor.fetchone()

    # A gr.State output is set by returning the new value itself. The
    # component-level ``.update()`` helpers were removed in Gradio 4, so the
    # previous ``gr.State.update(...)`` call is not usable there.
    if user:
        return "Login successful!", gr.update(visible=True), user[0]
    return "Login failed!", gr.update(visible=False), None
def send_message(sender_id, receiver_id, message):
    """Store one message from *sender_id* to *receiver_id* and commit it.

    The row is stamped with the current local time formatted as
    ``YYYY-MM-DD HH:MM:SS``.
    """
    sent_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    row = (sender_id, receiver_id, message, sent_at)
    cursor.execute(
        "INSERT INTO messages(sender_id,receiver_id,message,timestamp) VALUES (?,?,?,?)",
        row,
    )
    conn.commit()
def _render_bubble(text, stamp, own):
    """Return the HTML for one chat bubble.

    Bubbles for the viewer's own messages (*own* is true) are right-aligned
    on a green background; all others are left-aligned on white.
    """
    from html import escape

    align, background = ("right", "#DCF8C6") if own else ("left", "#FFF")
    # Message text is user-supplied and ends up in a gr.HTML component, so it
    # must be escaped to prevent stored markup/script injection.
    return (
        f'<div style="text-align:{align}; margin:5px;">'
        f'<span style="background:{background}; padding:8px; border-radius:10px; display:inline-block;">'
        f'{escape(str(text))} <br><small>{escape(str(stamp))}</small></span></div>'
    )


def get_messages_html(user_id, receiver_id):
    """Render the conversation between two users as an HTML fragment.

    Args:
        user_id: Id of the viewing user; their messages are right-aligned.
        receiver_id: Id of the other participant.

    Returns:
        The concatenated chat bubbles in insertion (id) order, or the text
        ``"No messages yet."`` when the two users have exchanged nothing.
    """
    cursor.execute("""
        SELECT sender_id, message, timestamp FROM messages
        WHERE (sender_id=? AND receiver_id=?) OR (sender_id=? AND receiver_id=?)
        ORDER BY id ASC
    """, (user_id, receiver_id, receiver_id, user_id))
    rows = cursor.fetchall()
    if not rows:
        return "No messages yet."
    return "".join(
        _render_bubble(msg, ts, s_id == user_id) for s_id, msg, ts in rows
    )
def get_all_users(exclude_id):
    """Return the usernames of every account whose id differs from *exclude_id*.

    When the query yields no rows, a single placeholder entry
    ``"No other users"`` is returned instead of an empty list.
    """
    rows = cursor.execute(
        "SELECT username FROM users WHERE id!=?", (exclude_id,)
    ).fetchall()
    names = [name for (name,) in rows]
    return names if names else ["No other users"]
| # ---------------- Gradio App ---------------- | |
with gr.Blocks() as demo:
    gr.Markdown("## 💬 WhatsApp-Style Chat App")

    # ---- Layout: login / register ----
    with gr.Row():
        username = gr.Textbox(label="Username")
        password = gr.Textbox(label="Password", type="password")
    login_btn = gr.Button("Login")
    register_btn = gr.Button("Register")
    login_status = gr.Label("")

    # Id of the logged-in user; None until login() succeeds.
    current_user_id = gr.State(value=None)

    # ---- Layout: chat panel (hidden until login() makes it visible) ----
    with gr.Column(visible=False) as chat_ui:
        with gr.Row():
            receiver_dropdown = gr.Dropdown(label="Select User")
            message_input = gr.Textbox(label="Message", lines=2)
        send_btn = gr.Button("Send")
        refresh_btn = gr.Button("Refresh Chat")
        chat_display = gr.HTML(label="Chat")

    # ---- Handlers ----
    def _find_user_id(name):
        """Return the id of the account called *name*, or None if absent."""
        cursor.execute("SELECT id FROM users WHERE username=?", (name,))
        row = cursor.fetchone()
        return row[0] if row else None

    def update_dropdown(user_id):
        """Fill the receiver dropdown with every account except *user_id*."""
        # gr.update is used instead of gr.Dropdown.update because the
        # component-level update helpers were removed in Gradio 4.
        return gr.update(choices=get_all_users(user_id))

    def send_and_refresh(sender_id, receiver, msg):
        """Store *msg* for *receiver* and return (chat HTML, message box value).

        Nothing is stored and both outputs are left unchanged when nobody is
        logged in, no receiver is selected, or the message is blank. This
        keeps the conversation on screen and the typed text intact.
        """
        if sender_id is None or not receiver or not msg or not msg.strip():
            return gr.update(), gr.update()
        target_id = _find_user_id(receiver)
        if target_id is None:
            # Keep the typed text so it can be resent to a valid receiver.
            return "Receiver not found.", gr.update()
        send_message(sender_id, target_id, msg)
        return get_messages_html(sender_id, target_id), gr.update(value="")

    def refresh(user_id, receiver):
        """Return the current conversation HTML between *user_id* and *receiver*."""
        if not receiver:
            return ""
        target_id = _find_user_id(receiver)
        if target_id is None:
            return "Receiver not found."
        return get_messages_html(user_id, target_id)

    # ---- Event wiring ----
    # The dropdown refresh is chained onto the login click with .then() rather
    # than listening for a change on the State component: State change
    # listeners are not available in every Gradio release, whereas .then()
    # runs right after login() has written current_user_id.
    login_btn.click(
        login,
        [username, password],
        [login_status, chat_ui, current_user_id],
    ).then(update_dropdown, current_user_id, receiver_dropdown)
    register_btn.click(register, [username, password], login_status)

    send_btn.click(
        send_and_refresh,
        [current_user_id, receiver_dropdown, message_input],
        [chat_display, message_input],
    )
    refresh_btn.click(refresh, [current_user_id, receiver_dropdown], chat_display)

demo.launch()