game / app.py
Muthuraja18's picture
Update app.py (#97)
ac71dab verified
# app.py
"""
Unified Online/Offline AI Quiz Game with Friends, Chat, Presence, Invites.
- Offline: uses local JSON files under ./data/
- Online: uses Firebase Realtime Database when configured (optional)
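- Put your Firebase service account JSON next to this file and name it serviceAccountKey.json
  (or point the FIREBASE_CREDENTIALS environment variable at another path)
- FIREBASE_DB_URL defaults to the project's Realtime Database URL; override it with the
  FIREBASE_DB_URL environment variable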
"""
import requests
import os
import uuid
import time
import json
from groq import Groq
from datetime import datetime, timedelta
import streamlit as st
import pandas as pd
import random
from streamlit.components.v1 import html
import plotly.express as px
# Try to import firebase-admin (optional). If unavailable, app will run Offline.
try:
    import firebase_admin
    from firebase_admin import credentials, db
except Exception:
    # Any import problem simply disables the Online backend.
    FIREBASE_AVAILABLE = False
else:
    FIREBASE_AVAILABLE = True
# ---------------- Page config ----------------
# NOTE(review): Streamlit expects set_page_config to be the first Streamlit
# command of the script — keep it above every other st.* call.
st.set_page_config(page_title="AI Quiz Game โ€” Online/Offline", layout="wide")
# ---------------- Configuration ----------------
# Directory holding the Offline (local file) store; created on startup if missing.
DATA_DIR = "data"
os.makedirs(DATA_DIR, exist_ok=True)
# Local filenames: one JSON file per collection, plus a CSV for the leaderboard.
GAMES_FILE = os.path.join(DATA_DIR, "games.json")
PLAYERS_FILE = os.path.join(DATA_DIR, "players.json")
MESSAGES_FILE = os.path.join(DATA_DIR, "messages.json")
SESSIONS_FILE = os.path.join(DATA_DIR, "sessions.json")
LEADERBOARD_FILE = os.path.join(DATA_DIR, "leaderboard.csv")
FRIENDS_FILE = os.path.join(DATA_DIR, "friends.json")
INBOX_FILE = os.path.join(DATA_DIR, "inbox.json") # friend requests & invitations
# Firebase settings: each value can be overridden through the environment
# variable of the same name; the literals below are the fallbacks.
FIREBASE_CREDENTIALS = os.getenv("FIREBASE_CREDENTIALS", "serviceAccountKey.json")
FIREBASE_DB_URL = os.getenv("FIREBASE_DB_URL", "https://real-time-database-fe632-default-rtdb.firebaseio.com/")
# Heartbeat threshold (seconds) to consider a session active.
# NOTE(review): this name is assigned again (to 60) in the "Home page" section
# further down, so the value here does not appear to be the one used at call time.
HEARTBEAT_THRESHOLD_SECONDS = 40
# ---------------- Example Questions DB (expand as needed) ----------------
# Static question bank: topic name -> list of
# (question text, list of four option strings, correct option text) tuples.
# create_game reads each tuple positionally as q[0], q[1], q[2].
questions_db = {
"Geography": [
("What is the capital of France?", ["Paris", "London", "Berlin", "Madrid"], "Paris"),
("Largest country by area?", ["Canada", "USA", "Russia", "China"], "Russia"),
("River through Egypt?", ["Nile", "Amazon", "Ganges", "Yangtze"], "Nile"),
("Mount Everest is in which country?", ["Nepal", "India", "China", "Bhutan"], "Nepal"),
("Which ocean is the largest?", ["Atlantic", "Pacific", "Indian", "Arctic"], "Pacific"),
# NOTE(review): keyed answer may be outdated (UN estimates put India ahead of China since 2023) — confirm.
("Which country has the most population?", ["China", "India", "USA", "Indonesia"], "China"),
("Capital of Japan?", ["Tokyo", "Kyoto", "Osaka", "Hiroshima"], "Tokyo"),
("Longest river in the world?", ["Nile", "Amazon", "Yangtze", "Mississippi"], "Nile"),
("Which desert is the largest?", ["Sahara", "Gobi", "Kalahari", "Arctic"], "Sahara"),
("Capital of Australia?", ["Sydney", "Melbourne", "Canberra", "Brisbane"], "Canberra")
],
"Math": [
("5 * 12?", ["50", "60", "55", "70"], "60"),
("sqrt(64)?", ["6","7","8","9"], "8"),
("What is 15 + 25?", ["35","40","45","50"], "40"),
("100 รท 4?", ["20", "25", "30", "24"], "25"),
("If x+5=12, x=?", ["5","6","7","8"], "7"),
("Area of a circle with radius 7?", ["154", "144", "160", "150"], "154"),
("7^2 = ?", ["49","42","56","36"], "49"),
("10% of 200?", ["10","20","15","25"], "20"),
("Solve: 3x = 15, x = ?", ["4","5","6","7"], "5"),
("What is 9 * 8?", ["72","81","64","69"], "72")
],
"Science": [
("H2O is?", ["Water","CO2","O2","H2"], "Water"),
("Who developed relativity?", ["Newton","Einstein","Tesla","Curie"], "Einstein"),
("Sun is a?", ["Star","Planet","Moon","Asteroid"], "Star"),
# NOTE(review): "3x10^5 km/s" is numerically the same speed as the keyed answer
# "3x10^8 m/s", so two options look correct but only one is scored.
("Light speed is approximately?", ["3x10^8 m/s","3x10^6 m/s","3x10^5 km/s","3x10^7 km/s"], "3x10^8 m/s"),
("Which gas do plants absorb?", ["Oxygen","CO2","Nitrogen","Helium"], "CO2"),
("The human brain weighs about?", ["1kg","1.4kg","2kg","2.5kg"], "1.4kg"),
("Chemical symbol for Gold?", ["Au","Ag","Go","Gd"], "Au"),
("Which planet is called Red Planet?", ["Mars","Venus","Jupiter","Mercury"], "Mars"),
("Which part of the plant conducts photosynthesis?", ["Root","Stem","Leaf","Flower"], "Leaf"),
("What is the boiling point of water?", ["90ยฐC","100ยฐC","120ยฐC","80ยฐC"], "100ยฐC")
],
"IPL": [
("2020 IPL winner?", ["Mumbai Indians","Delhi Capitals","RCB","CSK"], "Mumbai Indians"),
("Which team is called Yellow Army?", ["CSK","MI","DC","SRH"], "CSK"),
("Who won the Orange Cap in 2021 IPL?", ["KL Rahul","Faf du Plessis","Ruturaj Gaikwad","Shikhar Dhawan"], "Ruturaj Gaikwad"),
("Which team has won the most IPL titles?", ["MI","CSK","RCB","KKR"], "MI"),
("Who is known as Mr. IPL?", ["MS Dhoni","Rohit Sharma","Virat Kohli","AB de Villiers"], "MS Dhoni"),
("First IPL season was in?", ["2007","2008","2009","2010"], "2008"),
("Which team represents Bangalore?", ["RCB","MI","KKR","SRH"], "RCB"),
("Purple Cap is for?", ["Highest scorer","Best bowler","Best fielder","Best captain"], "Best bowler"),
("Which team plays at Eden Gardens?", ["KKR","CSK","MI","RCB"], "KKR"),
# NOTE(review): keyed answer looks doubtful for "fastest 50" — verify against current IPL records.
("Who scored fastest 50 in IPL?", ["KL Rahul","Chris Gayle","Andre Russell","AB de Villiers"], "Chris Gayle")
],
"History": [
("Who was the first President of the USA?", ["George Washington","Abraham Lincoln","Thomas Jefferson","John Adams"], "George Washington"),
("In which year did India gain independence?", ["1945","1947","1950","1952"], "1947"),
("The Great Wall is in which country?", ["China","Japan","Korea","Mongolia"], "China"),
("Who discovered America?", ["Columbus","Magellan","Vasco da Gama","Cook"], "Columbus"),
("French Revolution started in?", ["1789","1776","1800","1799"], "1789"),
("First man on the moon?", ["Neil Armstrong","Buzz Aldrin","Yuri Gagarin","John Glenn"], "Neil Armstrong"),
("Who invented the printing press?", ["Gutenberg","Edison","Tesla","Newton"], "Gutenberg"),
("World War II ended in?", ["1943","1945","1947","1950"], "1945"),
("Who was known as Iron Man of India?", ["Sardar Patel","Nehru","Gandhi","Tilak"], "Sardar Patel"),
("Which empire built the Colosseum?", ["Roman Empire","Greek Empire","Egyptian Empire","Persian Empire"], "Roman Empire")
],
"Technology": [
("Who is the founder of Microsoft?", ["Steve Jobs","Bill Gates","Elon Musk","Mark Zuckerberg"], "Bill Gates"),
("HTML stands for?", ["Hyper Text Markup Language","High Text Markup Language","Hyperlinks Text Mark Language","None"], "Hyper Text Markup Language"),
("Python is a type of?", ["Snake","Programming Language","Car","Game"], "Programming Language"),
("CPU stands for?", ["Central Process Unit","Central Processing Unit","Control Processing Unit","Computer Processing Unit"], "Central Processing Unit"),
("Java is a?", ["Programming Language","Coffee","Operating System","Browser"], "Programming Language"),
("WWW stands for?", ["World Wide Web","Wide World Web","Web World Wide","None"], "World Wide Web"),
("What is 1 Gigabyte in MB?", ["512MB","1024MB","2048MB","1000MB"], "1024MB"),
("Google was founded in?", ["1996","1998","2000","2002"], "1998"),
("First computer virus was?", ["Creeper","ILOVEYOU","Michelangelo","Morris"], "Creeper"),
("Linux OS was developed by?", ["Linus Torvalds","Bill Gates","Steve Jobs","Tim Berners-Lee"], "Linus Torvalds")
],
"Sports": [
("How many players in a football team?", ["9","10","11","12"], "11"),
("Olympics are held every?", ["2 years","4 years","3 years","5 years"], "4 years"),
("Tennis player known as โ€˜Federerโ€™?", ["Roger Federer","Rafael Nadal","Novak Djokovic","Andy Murray"], "Roger Federer"),
("Cricket World Cup held every?", ["2","4","5","3"], "4"),
("Which country won the first football World Cup?", ["Brazil","Uruguay","Germany","Italy"], "Uruguay"),
("Formula 1 world champion 2020?", ["Hamilton","Verstappen","Vettel","Leclerc"], "Hamilton"),
("Number of players in a basketball team?", ["5","6","7","8"], "5"),
("Which country hosts Wimbledon?", ["USA","UK","France","Australia"], "UK"),
("Who holds most Olympic golds?", ["Michael Phelps","Usain Bolt","Carl Lewis","Mark Spitz"], "Michael Phelps"),
("Which sport uses the term 'love'?", ["Tennis","Badminton","Squash","Golf"], "Tennis")
]
}
# ----------------- Groq AI client -----------------
# Shared Groq client used by generate_ai_questions; the key comes from Streamlit secrets.
# NOTE(review): indexing st.secrets fails when "GROQ_API_KEY" is absent, so the
# script presumably cannot start (even Offline) without it — confirm.
client = Groq(api_key=st.secrets["GROQ_API_KEY"].strip())
# Model identifier passed to the chat-completions request.
MODEL_NAME = "openai/gpt-oss-20b"
def _strip_code_fence(raw):
    """Return *raw* without a wrapping Markdown code fence (``` or ```json)."""
    text = raw.strip()
    if not text.startswith("```"):
        return text
    inner = text.split("```")[1].lstrip()
    # Remove only the language tag that follows the opening fence. A blanket
    # replace("json", "") would also mangle question text containing "json".
    if inner[:4].lower() == "json":
        inner = inner[4:]
    return inner.strip()


def _fallback_questions(questions_db, topic, num_questions):
    """Return up to *num_questions* static questions for *topic* as dicts."""
    wanted = str(topic).strip().lower()
    entries = None
    # Topic keys may be capitalized ("Geography"), so compare case-insensitively.
    for name, items in questions_db.items():
        if str(name).strip().lower() == wanted:
            entries = items
            break
    if entries is None:
        entries = questions_db.get("default", [])

    normalized = []
    for item in list(entries)[:num_questions]:
        if isinstance(item, dict):
            normalized.append(item)
        elif isinstance(item, (list, tuple)) and len(item) == 3:
            # Static banks store (question, options, answer) tuples; callers expect dicts.
            question, options, answer = item
            normalized.append({"question": question, "options": list(options), "answer": answer})
    return normalized


def _save_questions_excel(questions, topic, gid):
    """Write *questions* to ai_questions_excel/<gid>_questions.xlsx."""
    os.makedirs("ai_questions_excel", exist_ok=True)
    rows = []
    for q in questions:
        # Pad/truncate to exactly four options so short lists cannot raise IndexError.
        options = (list(q.get("options") or []) + [""] * 4)[:4]
        rows.append({
            "topic": q.get("topic", topic),
            "question": q.get("question", ""),
            "option_1": options[0],
            "option_2": options[1],
            "option_3": options[2],
            "option_4": options[3],
            "answer": q.get("answer", "")
        })
    pd.DataFrame(rows).to_excel(f"ai_questions_excel/{gid}_questions.xlsx", index=False)


def generate_ai_questions(topic, num_questions=5, gid=None, questions_db=None):
    """
    Generate multiple-choice questions for *topic* with the Groq chat API.

    Args:
        topic: Free-text topic inserted into the prompt.
        num_questions: Maximum number of questions to return.
        gid: Optional game id; when given, the generated questions are also
            exported to ``ai_questions_excel/<gid>_questions.xlsx``.
        questions_db: Optional static bank (topic -> questions) used as the
            fallback. When omitted there is no fallback data.

    Returns:
        A list of dicts shaped ``{"question": ..., "options": [...], "answer": ...}``.
        If the request fails, the reply is not a JSON array of objects, or the
        array is empty, the (possibly empty) fallback from *questions_db* is
        returned instead. No exception from the AI call escapes.
    """
    if questions_db is None:
        questions_db = {}
    prompt = f"""
Generate {num_questions} high-quality MCQ questions for the topic "{topic}".
STRICT FORMAT:
[
{{
"topic": "{topic}",
"question": "Question text",
"options": ["Option 1", "Option 2", "Option 3", "Option 4"],
"answer": "Correct option text"
}}
]
RULES:
- ONLY valid JSON array.
- No extra text.
- No markdown.
- No explanations.
"""
    try:
        st.info(f"Generating AI questions for topic '{topic}'...")
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.2,
            max_tokens=800,
        )
        raw = _strip_code_fence(response.choices[0].message.content or "")
        data = json.loads(raw)
        if not isinstance(data, list):
            raise ValueError("AI response is not a JSON array")
        questions = [q for q in data if isinstance(q, dict)]
        if not questions:
            raise ValueError("AI response contained no questions")
    except Exception as e:
        st.warning(f"AI generation failed for topic '{topic}': {e}")
        st.info("Using fallback questions from questions_db if available.")
        return _fallback_questions(questions_db, topic, num_questions)

    if gid:
        # The export is a convenience; a failure here must not discard valid questions.
        try:
            _save_questions_excel(questions, topic, gid)
        except Exception as e:
            st.warning(f"Could not save AI questions to Excel: {e}")
    return questions[:num_questions]
def load_json(path, default):
    """Return the parsed JSON stored at *path*.

    *default* is returned when the file does not exist or cannot be read or parsed.
    """
    if not os.path.exists(path):
        return default
    try:
        with open(path, "r", encoding="utf-8") as handle:
            parsed = json.load(handle)
    except Exception:
        return default
    return parsed
def save_json(path, data):
    """Write *data* to *path* as UTF-8 JSON (2-space indent, non-ASCII kept verbatim)."""
    with open(path, "w", encoding="utf-8") as out:
        json.dump(data, out, ensure_ascii=False, indent=2)
# Seed the local store: every JSON collection starts out as an empty object.
base_defaults = {
    GAMES_FILE: {},
    PLAYERS_FILE: {},
    MESSAGES_FILE: {},
    SESSIONS_FILE: {},
    FRIENDS_FILE: {},
    INBOX_FILE: {}
}
for p, d in base_defaults.items():
    if os.path.exists(p):
        continue
    save_json(p, d)

# The leaderboard lives in a CSV; create it with its header row on first run.
if not os.path.exists(LEADERBOARD_FILE):
    _leaderboard_columns = [
        'name', 'score', 'game_id', 'topics', 'timestamp',
        'avatar', 'questions', 'answers', 'correct_flags',
    ]
    pd.DataFrame(columns=_leaderboard_columns).to_csv(LEADERBOARD_FILE, index=False)
# ----------------- Firebase init -----------------
def init_firebase_if_needed():
    """Initialise the Firebase Admin app once, if the prerequisites are met.

    Returns:
        ``(ok, message)`` — ``ok`` is False with an explanatory message when the
        SDK is unavailable, the DB URL is empty, the credentials file is missing,
        or initialisation raises.
    """
    if not FIREBASE_AVAILABLE:
        return False, "firebase-admin not installed"
    if not FIREBASE_DB_URL:
        return False, "FIREBASE_DB_URL not set"
    if not os.path.exists(FIREBASE_CREDENTIALS):
        return False, f"Service account file not found at {FIREBASE_CREDENTIALS}"

    try:
        # An existing registered app means a previous run already initialised it.
        if not firebase_admin._apps:
            firebase_admin.initialize_app(
                credentials.Certificate(FIREBASE_CREDENTIALS),
                {"databaseURL": FIREBASE_DB_URL},
            )
    except Exception as exc:
        return False, f"Firebase init error: {exc}"
    return True, "Firebase initialized"
# Firebase helpers
def fb_get(path):
    """Read the value stored at the Firebase *path*; any failure yields None."""
    try:
        return db.reference(path).get()
    except Exception:
        return None
def fb_set(path, value):
    """Overwrite the Firebase node at *path* with *value*."""
    db.reference(path).set(value)
def fb_push(path, value):
    """Append *value* as a new child under *path* and return the child's key."""
    child = db.reference(path).push()
    child.set(value)
    return child.key
# ----------------- Unified DB API (Online or Offline) -----------------
def local_get(collection):
    """Load *collection* from the local file store.

    JSON collections come back as dicts, the leaderboard as a list of row
    dicts ([] if the CSV cannot be read), and unknown names as {}.
    """
    json_sources = {
        "games": GAMES_FILE,
        "players": PLAYERS_FILE,
        "messages": MESSAGES_FILE,
        "sessions": SESSIONS_FILE,
        "friends": FRIENDS_FILE,
        "inbox": INBOX_FILE,
    }
    if collection in json_sources:
        return load_json(json_sources[collection], {})
    if collection != "leaderboard":
        return {}
    try:
        return pd.read_csv(LEADERBOARD_FILE).to_dict(orient="records")
    except Exception:
        return []
def local_set(collection, value):
    """Persist *value* as *collection* in the local file store.

    Unknown collection names are ignored; a leaderboard write failure is
    swallowed (best effort).
    """
    json_targets = {
        "games": GAMES_FILE,
        "players": PLAYERS_FILE,
        "messages": MESSAGES_FILE,
        "sessions": SESSIONS_FILE,
        "friends": FRIENDS_FILE,
        "inbox": INBOX_FILE,
    }
    if collection in json_targets:
        save_json(json_targets[collection], value)
        return
    if collection == "leaderboard":
        try:
            pd.DataFrame(value).to_csv(LEADERBOARD_FILE, index=False)
        except Exception:
            pass
def unified_get(collection):
    """Fetch *collection* from Firebase in Online mode, else from local files.

    Falls back to the local store when Online is selected but Firebase cannot
    be initialised. The Online leaderboard is flattened to a list of rows.
    """
    if st.session_state.get("mode_selection", "Offline") != "Online":
        return local_get(collection)

    ok, _msg = init_firebase_if_needed()
    if not ok:
        return local_get(collection)

    # Collection name -> Realtime Database path.
    firebase_paths = {
        "games": "/games",
        "players": "/players",
        "messages": "/messages",
        "sessions": "/active_sessions",
        "friends": "/friends",
        "inbox": "/inbox",
    }
    if collection in firebase_paths:
        return fb_get(firebase_paths[collection]) or {}
    if collection == "leaderboard":
        raw = fb_get("/leaderboard") or {}
        return list(raw.values()) if isinstance(raw, dict) else raw
    # Unknown collections yield None in Online mode.
    return None
def unified_set(collection, data):
    """Store *data* as *collection* in Firebase (Online) or local files.

    Falls back to the local store when Firebase cannot be initialised. The
    leaderboard is always written locally, even in Online mode.
    """
    if st.session_state.get("mode_selection", "Offline") != "Online":
        local_set(collection, data)
        return

    ok, _msg = init_firebase_if_needed()
    if not ok:
        local_set(collection, data)
        return

    # Collection name -> Realtime Database path.
    firebase_paths = {
        "games": "/games",
        "players": "/players",
        "messages": "/messages",
        "sessions": "/active_sessions",
        "friends": "/friends",
        "inbox": "/inbox",
    }
    target = firebase_paths.get(collection)
    if target is not None:
        fb_set(target, data)
    elif collection == "leaderboard":
        local_set(collection, data)
def unified_push_message(game_id, msg_obj):
    """Append *msg_obj* to the chat of *game_id*.

    Online (with Firebase ready) the message is pushed under
    ``/messages/<game_id>``; otherwise it is appended to the stored list,
    which is capped at the 500 most recent messages.
    """
    online = st.session_state.get("mode_selection", "Offline") == "Online"
    if online:
        ok, _ = init_firebase_if_needed()
        if ok:
            fb_push(f"/messages/{game_id}", msg_obj)
            return

    all_msgs = unified_get("messages") or {}
    thread = all_msgs.get(game_id, [])
    thread.append(msg_obj)
    all_msgs[game_id] = thread[-500:] if len(thread) > 500 else thread
    unified_set("messages", all_msgs)
def unified_push_leaderboard(row):
    """Record one leaderboard *row*.

    When Online and Firebase is ready the row is pushed to ``/leaderboard``;
    in every case it is also appended to the local leaderboard CSV.
    """
    if st.session_state.get("mode_selection", "Offline") == "Online":
        ok, _ = init_firebase_if_needed()
        if ok:
            fb_push("/leaderboard", row)

    # Local CSV append (backup when Online, primary store when Offline).
    try:
        table = pd.read_csv(LEADERBOARD_FILE)
    except Exception:
        table = pd.DataFrame(columns=list(row.keys()))
    table = pd.concat([table, pd.DataFrame([row])], ignore_index=True)
    table.to_csv(LEADERBOARD_FILE, index=False)
def get_weekly_leaderboard(limit=10):
    """Return the top *limit* leaderboard rows played since Monday 00:00 UTC.

    Rows are loaded through ``unified_get("leaderboard")`` so Online data
    (a dict keyed by push id) is flattened to a list, and the local CSV is
    used when Firebase is unavailable. Rows without a parseable ISO
    ``timestamp`` are skipped. The result is sorted by ``score``, highest first.
    """
    rows = unified_get("leaderboard") or []
    if isinstance(rows, dict):
        rows = list(rows.values())
    rows = [r for r in rows if isinstance(r, dict)]
    if not rows:
        return []

    # Start of the current week: Monday at midnight. Subtracting days alone
    # would keep the current time of day and drop games from early Monday.
    now = datetime.utcnow()
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    start_of_week = midnight - timedelta(days=now.weekday())

    def _score(row):
        # CSV round-trips can yield strings or NaN; treat anything non-numeric as 0.
        try:
            value = float(row.get("score", 0) or 0)
        except (TypeError, ValueError):
            return 0.0
        return 0.0 if value != value else value

    weekly = []
    for r in rows:
        ts = r.get("timestamp")
        if not ts:
            continue
        try:
            played = datetime.fromisoformat(ts)
            in_week = played >= start_of_week
        except (TypeError, ValueError):
            # Unparseable value, or an aware timestamp that cannot be compared.
            continue
        if in_week:
            weekly.append(r)

    weekly.sort(key=_score, reverse=True)
    return weekly[:limit]
# ----------------- Session & Presence helpers -----------------
def now_iso():
    """Return the current UTC time as a naive ISO-8601 string."""
    current = datetime.utcnow()
    return current.isoformat()
def parse_iso(s):
    """Parse an ISO-8601 string into a datetime; return None for anything unparseable."""
    try:
        parsed = datetime.fromisoformat(s)
    except Exception:
        parsed = None
    return parsed
def ensure_session_ids():
    """Give this Streamlit session a ``uid`` and a ``session_id`` if it lacks them."""
    for field in ("uid", "session_id"):
        if field not in st.session_state:
            st.session_state[field] = str(uuid.uuid4())


ensure_session_ids()
def claim_session_unified(game_id, username):
    """Try to bind *username* in *game_id* to this browser session.

    Returns:
        ``(True, "")`` when the name is free, already owned by this session,
        or its previous owner's heartbeat is stale; ``(False, message)`` when
        another session with a fresh heartbeat holds it.
    """
    sessions = unified_get("sessions") or {}
    game_sessions = sessions.get(game_id, {})
    existing = game_sessions.get(username)
    stamp = now_iso()
    my_session = st.session_state['session_id']

    def _persist(record):
        # Store the record for this user and write the whole sessions map back.
        game_sessions[username] = record
        sessions[game_id] = game_sessions
        unified_set("sessions", sessions)
        return True, ""

    if existing is None:
        return _persist({"session_id": my_session, "last_heartbeat": stamp})

    last_seen = parse_iso(existing.get("last_heartbeat"))
    if existing.get("session_id") == my_session:
        existing["last_heartbeat"] = stamp
        return _persist(existing)

    still_fresh = last_seen and (datetime.utcnow() - last_seen) < timedelta(seconds=HEARTBEAT_THRESHOLD_SECONDS)
    if still_fresh:
        return False, "You are active in another tab/device. Return to that tab or wait."

    # The other session went quiet: take the name over.
    return _persist({"session_id": my_session, "last_heartbeat": stamp})
def heartbeat_unified(game_id, username):
    """Stamp *username*'s session record in *game_id* with this session id and the current time."""
    sessions = unified_get("sessions") or {}
    game_sessions = sessions.get(game_id, {})
    record = game_sessions.get(username, {})
    record.update(
        session_id=st.session_state['session_id'],
        last_heartbeat=now_iso(),
    )
    game_sessions[username] = record
    sessions[game_id] = game_sessions
    unified_set("sessions", sessions)
# ----------------- Friends & Inbox helpers -----------------
def get_friends_map():
    """Return the stored friends mapping, or an empty dict when nothing is stored."""
    stored = unified_get("friends")
    return stored or {}
def save_friends_map(m):
    """Persist the friends mapping *m* through the unified store."""
    unified_set("friends", m)
def get_inbox():
    """Return the stored inbox mapping, or an empty dict when nothing is stored."""
    stored = unified_get("inbox")
    return stored or {}
def save_inbox(i):
    """Persist the inbox mapping *i* through the unified store."""
    unified_set("inbox", i)
def send_friend_request(from_user, to_user):
    """Queue a friend request from *from_user* in *to_user*'s inbox."""
    inbox = get_inbox()
    pending = inbox.get(to_user, [])
    request = {"type":"friend_request","from":from_user,"ts":now_iso()}
    pending.append(request)
    inbox[to_user] = pending
    save_inbox(inbox)
def accept_friend_request(current_user, from_user):
    """Make the two users mutual friends and clear the matching request from the inbox."""
    friends = get_friends_map()
    # Link both directions, skipping entries that already exist.
    for owner, other in ((current_user, from_user), (from_user, current_user)):
        own_list = friends.setdefault(owner, [])
        if other not in own_list:
            own_list.append(other)
    save_friends_map(friends)

    # Drop every friend request that *from_user* sent to *current_user*.
    inbox = get_inbox()
    inbox[current_user] = [
        entry
        for entry in inbox.get(current_user, [])
        if entry.get('type') != 'friend_request' or entry.get('from') != from_user
    ]
    save_inbox(inbox)
def send_game_invite(from_user, to_user, game_id):
    """Queue an invitation to *game_id* from *from_user* in *to_user*'s inbox."""
    inbox = get_inbox()
    pending = inbox.get(to_user, [])
    invite = {"type":"invite","from":from_user,"game_id":game_id,"ts":now_iso()}
    pending.append(invite)
    inbox[to_user] = pending
    save_inbox(inbox)
# ----------------- Game helpers -----------------
def compute_winners(game_id):
    """Return up to three leaderboard rows for *game_id*, best score first.

    Ties on score are ordered by ascending timestamp. An empty list is
    returned when there are no rows at all or none for this game.
    """
    frame = pd.DataFrame(unified_get("leaderboard") or [])
    if frame.empty:
        return []
    for_game = frame[frame["game_id"] == game_id]
    if for_game.empty:
        return []
    ranked = for_game.sort_values(by=["score", "timestamp"], ascending=[False, True])
    return ranked.head(3).to_dict(orient="records")
#------Total players--------------------
def get_total_online_players():
    """Return the number of player entries summed over every game in the players store."""
    players = unified_get("players") or {}
    return sum(len(players[gid]) for gid in players)
# ----------------- Create Game Page -----------------
def create_game_page():
    """Render the "Create Game" form and create a game when it is submitted.

    The form collects the host name, static topics, an optional AI topic,
    the number of questions and the auto-close flag. On success the new game
    id and the host name are stored in ``st.session_state``.
    """
    st.header("Create Game")
    host = st.text_input(
        "Host name",
        value=st.session_state.get("username", "")
    )
    # Static topics come from the built-in question bank.
    topics = st.multiselect(
        "Topics",
        list(questions_db.keys())
    )
    # Optional free-text topic for AI-generated questions.
    ai_topic = st.text_input(
        "AI Topic (optional)",
        placeholder="Eg: IPL 2024, Space, Python"
    )
    num_questions = st.number_input("Number of Questions", min_value=1, max_value=20, value=5)
    auto_close = st.checkbox("Auto-close game after submission?", value=True)

    if not st.button("Create Game"):
        return

    # Whitespace-only input counts as empty.
    host = (host or "").strip()
    ai_topic = (ai_topic or "").strip()
    if not host:
        st.warning("Please enter your name.")
        return
    if not topics and not ai_topic:
        st.warning("Please select at least one topic or enter an AI topic.")
        return

    # Pass the entered host explicitly; otherwise create_game falls back to the
    # session username and the game records the wrong host.
    gid = create_game(
        host=host,
        topics=topics,
        num_questions=int(num_questions),
        auto_close=auto_close,
        ai_topic=ai_topic,
    )
    st.success(f"Game created! Game ID: {gid}")
    st.session_state['game_id'] = gid
    st.session_state['username'] = host
    if ai_topic:
        st.info(f"AI questions will be generated for: {ai_topic}")
# 1๏ธโƒฃ Define join_game first
def join_game(game_id, username, avatar):
    """Register *username* as a player of *game_id* and claim the session.

    Returns:
        ``(ok, message)`` — False with a reason for an unknown or closed game,
        otherwise the result of ``claim_session_unified``.
    """
    games = unified_get("games") or {}
    if game_id not in games:
        return False, "Invalid Game ID"
    if games[game_id].get("closed"):
        return False, "Game is closed"

    players = unified_get("players") or {}
    roster = players.setdefault(game_id, {})
    if username not in roster:
        roster[username] = {"avatar": avatar, "joined_at": now_iso(), "submitted": False}
    else:
        # Returning player: refresh the avatar and note the re-join time.
        roster[username]['avatar'] = avatar
        roster[username]['last_joined'] = now_iso()
    players[game_id] = roster
    unified_set("players", players)

    ok, msg = claim_session_unified(game_id, username)
    if ok:
        players[game_id][username]['last_heartbeat'] = now_iso()
        unified_set("players", players)
    return ok, msg
def compute_score(questions, answers, times=None, score_per_question=15):
    """
    Compute the total score and per-question correctness flags.

    Args:
        questions (list): Question dicts; the correct option is read from
            each dict's 'answer' key.
        answers (list): The user's answers, aligned with *questions* by
            index. May be shorter than *questions* or None; missing
            positions count as unanswered.
        times (list, optional): Time taken per question in seconds.
            Accepted for interface compatibility; not used in scoring.
        score_per_question (int, optional): Points for each correct answer.

    Returns:
        (score, flags): the total score and a list with True for each
        correct answer and False otherwise.
    """
    answers = answers or []
    score = 0
    flags = []
    for idx, q in enumerate(questions or []):
        correct_ans = q.get("answer")
        user_ans = answers[idx] if idx < len(answers) else None
        # An unanswered slot (None or the "" placeholder) is never correct,
        # even if the question itself has no answer key.
        answered = user_ans is not None and user_ans != ""
        is_correct = answered and correct_ans is not None and user_ans == correct_ans
        if is_correct:
            score += score_per_question
        flags.append(bool(is_correct))
    return score, flags
# ----------------- UI -----------------
st.sidebar.title("Mode & Profile")
mode_choice = st.sidebar.selectbox("Mode", ["Offline (local JSON)", "Online (Firebase)"], index=0)
if mode_choice.startswith("Online"):
    st.session_state['mode_selection'] = "Online"
else:
    st.session_state['mode_selection'] = "Offline"

# Online was requested: try Firebase now and drop back to Offline on failure.
if st.session_state['mode_selection'] == "Online":
    ok, msg = init_firebase_if_needed()
    if ok:
        st.sidebar.success("Online (Firebase) ready.")
    else:
        st.sidebar.error(f"Online init failed: {msg}. Working Offline.")
        st.session_state['mode_selection'] = "Offline"

# Profile inputs
st.sidebar.markdown("### You")
username = st.sidebar.text_input(
    "Your name",
    value=st.session_state.get("username",""),
    key="sidebar_username",
)
avatar = st.sidebar.selectbox(
    "Avatar",
    ["๐ŸŽฎ","๐Ÿฑ","๐Ÿถ","๐Ÿฆ„","๐Ÿ‘ฝ","๐ŸŽฉ"],
    index=0,
    key="sidebar_avatar",
)

# Mirror the inputs into session state; an empty input keeps the previous value.
st.session_state['username'] = username or st.session_state.get("username","")
st.session_state['avatar'] = avatar or st.session_state.get("avatar","๐ŸŽฎ")

if st.sidebar.button("Refresh"):
    st.rerun()

page = st.sidebar.selectbox(
    "Page",
    ["Home","Create Game","Join Game","Play","Friends","Inbox","Leaderboard"],
    index=0,
)
st.title("AI Quiz Game โ€” Online/Offline (Friends & Chat)")
def render_copy_button(val, key):
    """Render a read-only input showing *val* with a "Copy" button next to it.

    *key* is embedded in the element id so several widgets can share a page.
    """
    html(f'''
<div style="display:flex;gap:8px;align-items:center;">
<input id="gid_{key}" value="{val}" readonly style="padding:6px;border:1px solid #ddd;border-radius:6px;">
<button onclick="navigator.clipboard.writeText(document.getElementById('gid_{key}').value)" style="padding:6px 10px;border-radius:6px;cursor:pointer;">Copy</button>
</div>
''')
def get_top3_and_player_count(game_id):
    """Return ``(top3_rows, distinct_player_count)`` for *game_id* from the leaderboard.

    Rows are ranked by score (descending) with earlier timestamps winning
    ties. ``([], 0)`` is returned when nothing has been recorded for the game.
    """
    records = unified_get("leaderboard") or []
    if not records:
        return [], 0

    frame = pd.DataFrame(records)
    for_game = frame[frame["game_id"] == game_id]
    if for_game.empty:
        return [], 0

    ranked = for_game.sort_values(by=["score", "timestamp"], ascending=[False, True])
    podium = ranked.head(3).to_dict(orient="records")
    return podium, ranked["name"].nunique()
# Home page
# NOTE(review): this rebinds the name first set to 40 in the Configuration
# section; functions that read the global when called after this line see 60.
# Consider keeping a single definition.
HEARTBEAT_THRESHOLD_SECONDS = 60 # adjust if needed
def home_page():
"""Render the Home page.

Shows a player-count metric, the last score stored in session state, the
ten most recently created games (stats, top three submitted players,
per-player status, invite/challenge buttons) and the weekly leaderboard.
"""
st.header("Home")
st.write("Create games, invite friends, play and climb the leaderboard.")
# ---------------- TOTAL ONLINE PLAYERS ----------------
# The metric sums the player entries of every game in the players store;
# it does not look at heartbeats.
players_map = unified_get("players") or {}
total_online = sum(len(v) for v in players_map.values())
st.metric("๐ŸŸข Players Online", total_online)
# Show last score
if st.session_state.get('last_score') is not None:
st.success(
f"Your last score: {st.session_state['last_score']} "
f"(Game {st.session_state.get('last_game')})"
)
# Load games
games = unified_get("games") or {}
st.subheader("Recent games")
# Newest first (by the created_at string), limited to ten games.
for g in sorted(games.values(), key=lambda x: x.get("created_at", ""), reverse=True)[:10]:
gid = g.get("game_id")
players_here = players_map.get(gid, {}) or {}
st.markdown(f"### ๐ŸŽฎ Game: **{gid}** {'(Closed)' if g.get('closed') else ''}")
st.write(f"Host: {g.get('host')} โ€” Topics: {', '.join(g.get('topics', []))}")
st.write(f"Created: {g.get('created_at')}")
# ---------------- GAME STATS ----------------
joined = len(players_here)
submitted = sum(1 for p in players_here.values() if p.get("submitted"))
st.write(f"Players joined: **{joined}**")
st.write(f"Submitted: **{submitted} / {joined}**")
# ---------------- TOP 3 PLAYERS ----------------
# Ranked from the per-player records (score stored at submission), not from the leaderboard.
submitted_players = [
(u, d) for u, d in players_here.items() if d.get("submitted")
]
submitted_players.sort(key=lambda x: x[1].get("score", 0), reverse=True)
if submitted_players:
st.markdown("๐Ÿ† **Top 3 Players**")
for i, (uname_p, info) in enumerate(submitted_players[:3], start=1):
st.write(
f"{i}. {info.get('avatar','๐ŸŽฎ')} **{uname_p}** โ€” "
f"{info.get('score',0)} pts ({info.get('percentage',0)}%)"
)
# ---------------- PLAYER STATUS ----------------
if players_here:
st.markdown("**๐Ÿ‘ฅ Player Status**")
for uname_p, info in players_here.items():
status = "โœ… Submitted" if info.get("submitted") else "โณ Playing"
st.write(f"{info.get('avatar','๐ŸŽฎ')} **{uname_p}** โ€” {status}")
st.markdown("---")
# ---------------- INVITE & CHALLENGE ----------------
if not g.get("closed"):
st.info(f"Share this Game ID: {gid}")
render_copy_button(gid, gid)
if st.session_state.get("username"):
if st.button(f"Invite your friends to {gid}", key=f"invite_{gid}"):
friends = get_friends_map().get(st.session_state["username"], [])
if not friends:
st.warning("No friends to invite.")
else:
for f in friends:
send_game_invite(st.session_state["username"], f, gid)
st.success("Invites sent to friends.")
if st.button(f"Challenge friends with a new game like {gid}", key=f"challenge_{gid}"):
# Clone this game's topics and question count into a new game.
new_id = create_game(
st.session_state.get("username", "Host"),
g.get("topics", []),
num_questions=len(g.get("questions", []))
)
st.session_state["active_game_id"] = new_id
# NOTE(review): `games` was loaded before create_game ran, so this lookup
# presumably never finds new_id and resets game_questions to [] — confirm.
st.session_state["game_questions"] = games.get(new_id, {}).get("questions", [])
st.success(f"Challenge created: {new_id}")
st.rerun()
# Weekly leaderboard
st.subheader("๐Ÿ† Weekly Leaderboard (Top 10)")
weekly = get_weekly_leaderboard()
if not weekly:
st.info("No scores yet this week.")
else:
for i, r in enumerate(weekly, 1):
st.write(
f"{i}. {r.get('avatar','๐ŸŽฎ')} **{r['name']}** "
f"(Game {r['game_id']}) โ€” {r['score']} pts"
)
# ----------------------------
# Play Page
# Default play state: pick the first stored game as 'active_game' when none is
# set, and fall back to a "Guest" username.
# NOTE(review): this seeds 'active_game', while play_page reads
# 'active_game_id' — confirm which key is intended.
if 'active_game' not in st.session_state:
games = unified_get("games") or {}
if games:
first_game = list(games.keys())[0]
st.session_state['active_game'] = first_game
else:
st.warning("No games exist yet. Please create a game first.")
if 'username' not in st.session_state:
st.session_state['username'] = "Guest" # temporary default username
def create_game(host=None, topics=None, num_questions=5, auto_close=True, ai_topic=None):
    """Create a game, store it, and prime the play state in session state.

    Args:
        host: Host name; defaults to the session username, then "Host".
        topics: Static topic names from ``questions_db``. Used for questions
            only when no AI questions were obtained.
        num_questions: Questions requested from the AI, or taken per static topic.
        auto_close: Stored on the game; read by the play page after a submission.
        ai_topic: Optional free-text topic for AI-generated questions.

    Returns:
        The new game id (``"GAME<unix-seconds>"``).
    """
    # Copy so the stored game never aliases the caller's list (and no shared default).
    topics = list(topics) if topics else []
    games = unified_get("games") or {}

    # Ids are second-resolution; bump the stamp rather than overwrite a game
    # created within the same second.
    stamp = int(time.time())
    while f"GAME{stamp}" in games:
        stamp += 1
    gid = f"GAME{stamp}"

    host = host or st.session_state.get("username", "Host")
    questions = []

    # 1) AI questions (already dicts); malformed items are skipped.
    if ai_topic:
        ai_questions = generate_ai_questions(ai_topic, num_questions=num_questions)
        for q in ai_questions or []:
            if not isinstance(q, dict):
                continue
            questions.append({
                "question": q.get("question", ""),
                "options": q.get("options", []),
                "answer": q.get("answer", "")
            })

    # 2) Static fallback: convert (question, options, answer) tuples to dicts.
    if not questions:
        for topic in topics:
            for q in questions_db.get(topic, [])[:num_questions]:
                questions.append({
                    "question": q[0],
                    "options": q[1],
                    "answer": q[2]
                })

    games[gid] = {
        "game_id": gid,
        "host": host,
        "topics": topics,
        "questions": questions,
        "created_at": now_iso(),
        "closed": False,
        "auto_close": auto_close
    }
    unified_set("games", games)

    # Prepare the play state so the creator can start immediately.
    st.session_state['game_id'] = gid
    st.session_state['active_game_id'] = gid
    st.session_state['game_questions'] = questions
    st.session_state['current_index'] = 0
    st.session_state['answers'] = [""] * len(questions)
    st.session_state['answer_times'] = [None] * len(questions)
    st.session_state['question_started_at'] = None
    st.success(f"Game created: {gid} with {len(questions)} questions.")
    return gid
# -------------------------
# PLAY PAGE
# -------------------------
def play_page():
    """Render the Play page: one question per run, then score and record the attempt.

    Reads the active game id and username from ``st.session_state`` and the
    game from ``unified_get("games")``. "Next" stores the current choice and
    advances; on the last question "Submit All Answers" scores the attempt,
    updates the player's record, appends a leaderboard row, optionally
    closes the game and resets the play state.
    """
    gid = st.session_state.get("active_game_id")
    uname = st.session_state.get("username")
    if not gid or not uname:
        st.error("No active game or username found. Please join or create a game first.")
        return

    games = unified_get("games") or {}
    game = games.get(gid)
    if not game:
        st.error("Game not found.")
        return
    if game.get('closed'):
        st.warning("This game is closed.")
        return
    questions = game.get("questions", [])
    if not questions:
        st.info("No questions loaded for this game.")
        return
    total = len(questions)

    # Rebuild the per-question state when it is missing or sized for another
    # game (it is emptied after a submission), so indexing below cannot fail.
    stored_answers = st.session_state.get('answers')
    if not isinstance(stored_answers, list) or len(stored_answers) != total:
        st.session_state['answers'] = [""] * total
        st.session_state['answer_times'] = [None] * total
        st.session_state['current_index'] = 0
        st.session_state['question_started_at'] = None
    stored_times = st.session_state.get('answer_times')
    if not isinstance(stored_times, list) or len(stored_times) != total:
        st.session_state['answer_times'] = [None] * total
    if 'current_index' not in st.session_state:
        st.session_state['current_index'] = 0
    # Create/join leave the start time as None; start the clock on first render
    # so the countdown actually advances across reruns.
    if st.session_state.get('question_started_at') is None:
        st.session_state['question_started_at'] = time.time()

    # Clamp so the last question (and its Submit button) stays reachable.
    idx = st.session_state['current_index']
    if idx >= total:
        idx = total - 1
        st.session_state['current_index'] = idx
    is_last = idx == total - 1

    # ---------------- QUESTION UI ----------------
    q = questions[idx]
    st.subheader(f"Question {idx + 1}/{len(questions)}")
    st.write(q["question"])

    start_time = st.session_state['question_started_at']
    elapsed = int(time.time() - start_time)
    time_limit = 15
    st.markdown(f"**Time left:** {max(0, time_limit - elapsed)} seconds")

    choice = st.radio(
        "Choose an answer:",
        q["options"],
        key=f"choice_{gid}_{idx}"
    )
    col1, col2 = st.columns(2)

    # ---------------- NEXT ----------------
    # Not offered on the last question: advancing past it would leave the
    # player on a page without a Submit button.
    with col1:
        if not is_last and st.button("Next", key=f"next_{gid}_{idx}"):
            st.session_state['answers'][idx] = choice
            st.session_state['answer_times'][idx] = time.time() - start_time
            st.session_state['current_index'] = idx + 1
            st.session_state['question_started_at'] = time.time()
            st.rerun()

    # ---------------- SUBMIT ----------------
    with col2:
        if is_last and st.button("Submit All Answers", key=f"submit_{gid}_{idx}"):
            # Record the last question before scoring.
            st.session_state['answers'][idx] = choice
            st.session_state['answer_times'][idx] = time.time() - start_time
            answers = list(st.session_state['answers'])
            times = list(st.session_state['answer_times'])

            score, flags = compute_score(questions, answers, times)
            percentage = int(score / (len(questions) * 15) * 100)

            # Merge into the existing player record so join metadata
            # (joined_at, last_heartbeat, ...) is kept.
            players = unified_get("players") or {}
            players.setdefault(gid, {})
            record = dict(players[gid].get(uname) or {})
            record.update({
                "submitted": True,
                "score": score,
                "percentage": percentage,
                "answers": answers,
                "avatar": st.session_state.get("avatar", "๐ŸŽฎ"),
                "timestamp": now_iso()
            })
            players[gid][uname] = record
            unified_set("players", players)

            row = {
                "name": uname,
                "avatar": st.session_state.get("avatar", "๐ŸŽฎ"),
                "score": score,
                "percentage": percentage,
                "game_id": gid,
                "topics": game.get("topics", []),
                "timestamp": now_iso(),
                "questions": len(questions),
                "answers": answers,
                "correct_flags": flags
            }
            unified_push_leaderboard(row)

            if game.get('auto_close', True):
                games[gid]['closed'] = True
                games[gid]['closed_at'] = now_iso()
                unified_set("games", games)

            st.success(f"Submitted! Score: {score} / {len(questions)*15} ({percentage}%)")
            st.balloons()
            # Remember the result and clear the play state for the next game.
            st.session_state['last_score'] = score
            st.session_state['last_game'] = gid
            st.session_state['current_index'] = 0
            st.session_state['answers'] = []
            st.session_state['answer_times'] = []
            st.session_state['question_started_at'] = None
            st.rerun()
# Join game
def join_game_page():
    """Render the "Join Game" form and prepare the session for play.

    Collects a game id, a player name and an avatar. When the "Join Game"
    button is pressed with both text fields filled in, ``join_game`` is
    called; on success the game's questions are loaded into
    ``st.session_state``, the play state is reset, and the script is rerun.
    On failure the message returned by ``join_game`` is shown as an error.
    """
    st.header("Join Game")
    game_id = st.text_input("Enter Game ID")
    username = st.text_input("Your Name")
    avatar = st.selectbox("Choose Avatar", ["๐ŸŽฎ","๐Ÿค–","๐Ÿงฉ","๐Ÿ›ก๏ธ"])

    # Nothing to do until the button is pressed.
    if not st.button("Join Game"):
        return

    if not (game_id and username):
        st.warning("Enter both Game ID and Username")
        return

    joined, message = join_game(game_id, username, avatar)
    if not joined:
        st.error(message)
        return

    st.success(f"Joined game {game_id} successfully!")

    # Look the game up defensively: a missing store or game yields no questions.
    all_games = unified_get("games") or {}
    game_record = all_games.get(game_id, {})

    state = st.session_state
    state['game_questions'] = game_record.get('questions', [])
    # Both id keys are written here.
    state['game_id'] = game_id
    state['active_game_id'] = game_id
    # Start from a clean slate so answers from a previous game cannot leak in.
    state['current_index'] = 0
    question_count = len(state['game_questions'])
    state['answers'] = [""] * question_count
    state['answer_times'] = [None] * question_count
    state['question_started_at'] = None
    state['username'] = username
    state['avatar'] = avatar
    st.rerun()
# Play page
# ----------------- Create Game -----------------
# Friends page
def friends_page():
    """Render the Friends page: friend list with presence, invites, and requests.

    Requires ``st.session_state['username']``; without it an info message is
    shown and the page returns early.

    For every friend of the current user the page shows an online/offline
    status (online when any session record for that friend has a
    ``last_heartbeat`` newer than ``HEARTBEAT_THRESHOLD_SECONDS``), a text
    field for a game id, and an invite button. Pressing the button invites
    the friend to the entered game id, or creates a new 5-question game on
    the first topic of ``questions_db`` when the field is blank.

    The lower section lets the user send a friend request by exact name.
    """
    st.header("Friends")
    user = st.session_state.get('username')
    if not user:
        st.info("Enter your name in the sidebar to use Friends.")
        return

    friends_map = get_friends_map()
    your_friends = friends_map.get(user, [])
    st.subheader("Your friends")
    if your_friends:
        # Presence data does not depend on the friend, so read it once
        # instead of once per friend.
        sessions = unified_get("sessions") or {}
        online_window = timedelta(seconds=HEARTBEAT_THRESHOLD_SECONDS)

        for f in your_friends:
            status = "offline"
            for users in sessions.values():
                rec = (users or {}).get(f)
                if not rec:
                    continue
                last = parse_iso(rec.get('last_heartbeat'))
                if last and (datetime.utcnow() - last) < online_window:
                    status = "online"
                    break
            st.write(f"โ€ข {f} โ€” **{status}**")

            # The input is rendered on every run, *before* the button.
            # A widget created inside `if st.button(...)` exists only on the
            # single run triggered by the click, so its value is always empty
            # when read and the "existing game" branch could never be taken.
            gid = st.text_input(
                f"Enter game id to invite {f} (or leave blank to create)",
                key=f"inviteinput_{f}",
            )
            if st.button(f"Invite {f} to a game", key=f"invitebtn_{f}"):
                gid = (gid or "").strip()
                if gid:
                    send_game_invite(user, f, gid)
                    st.success(f"Invite sent to {f} for game {gid}")
                else:
                    # No id given: create a small game on the first topic.
                    topics = list(questions_db.keys())[:1]
                    new_id = create_game(user, topics, num_questions=5)
                    send_game_invite(user, f, new_id)
                    st.success(f"Invite sent to {f} for new game {new_id}")
    else:
        st.write("You have no friends yet.")

    st.markdown("---")
    st.subheader("Find / Add friends")
    candidate = st.text_input("Search user to add (exact name)", value="").strip()
    if st.button("Send Friend Request"):
        if not candidate:
            st.error("Enter user name")
        else:
            send_friend_request(user, candidate)
            st.success("Friend request sent.")
# Inbox page
def inbox_page():
    """Render the Inbox page with pending friend requests and game invites.

    Requires ``st.session_state['username']``; without it an info message is
    shown and the page returns early. At most the first 50 inbox items of
    the current user are displayed.

    * ``friend_request`` items offer Accept (calls ``accept_friend_request``)
      and Reject (drops every friend request from that sender and saves the
      inbox).
    * ``invite`` items offer a join button. On a successful ``join_game`` the
      invite is removed from the inbox and the session is prepared for play
      with the same keys ``join_game_page`` sets (questions, both game ids,
      and a reset play state), so a game entered through an invite starts
      from the same state as one entered through the Join form.
    """
    st.header("Inbox")
    user = st.session_state.get('username')
    if not user:
        st.info("Enter your name in the sidebar to view Inbox.")
        return

    inbox = get_inbox()
    items = inbox.get(user, [])
    if not items:
        st.write("No messages.")
        return

    for idx, item in enumerate(items[:50]):
        t = item.get('type')
        if t == "friend_request":
            fr = item.get('from')
            st.write(f"Friend request from **{fr}** at {item.get('ts')}")
            if st.button(f"Accept {idx}"):
                accept_friend_request(user, fr)
                st.success(f"You are now friends with {fr}")
                st.rerun()
            if st.button(f"Reject {idx}"):
                # Drop every pending friend request from this sender.
                inbox[user] = [
                    it for it in items
                    if not (it.get('type') == 'friend_request' and it.get('from') == fr)
                ]
                save_inbox(inbox)
                st.success("Rejected")
                st.rerun()
        elif t == "invite":
            fr = item.get('from')
            gid = item.get('game_id')
            st.write(f"Invite from **{fr}** to join game **{gid}** at {item.get('ts')}")
            if st.button(f"Join Invite {idx}"):
                ok, msg = join_game(gid, user, st.session_state.get('avatar','๐ŸŽฎ'))
                if ok:
                    st.success(f"Joined game {gid}")
                    # Remove the accepted invite (and identical duplicates).
                    inbox[user] = [
                        it for it in items
                        if not (
                            it.get('type') == 'invite'
                            and it.get('from') == fr
                            and it.get('game_id') == gid
                        )
                    ]
                    save_inbox(inbox)

                    # Prepare the play state exactly as the Join form does;
                    # setting only the game id would leave questions and
                    # answer buffers from a previous game (or none at all).
                    games = unified_get("games") or {}
                    questions = games.get(gid, {}).get('questions', []) or []
                    st.session_state['game_questions'] = questions
                    st.session_state['game_id'] = gid
                    st.session_state['active_game_id'] = gid
                    st.session_state['current_index'] = 0
                    st.session_state['answers'] = [""] * len(questions)
                    st.session_state['answer_times'] = [None] * len(questions)
                    st.session_state['question_started_at'] = None
                    st.session_state['username'] = user
                    st.rerun()
                else:
                    st.error(msg)
# Leaderboard page
# ----------------- Leaderboard Page -----------------
def leaderboard_page():
    """Render the all-time top 10 and the current-week top 10 leaderboards.

    Rows are read with ``unified_get("leaderboard")`` and ranked by their
    ``score`` field (missing scores count as 0). The weekly section keeps
    only rows whose ``timestamp`` falls in the current ISO week of the
    current ISO year; rows without a ``timestamp`` key are treated as played
    now, and rows whose timestamp cannot be parsed are left out of the
    weekly list instead of crashing the page.
    """
    st.header("๐Ÿ† Leaderboard")

    leaderboard = unified_get("leaderboard") or []
    if not leaderboard:
        st.info("No scores yet.")
        return

    # Highest score first; sorted() is stable, so ties keep storage order.
    leaderboard = sorted(leaderboard, key=lambda x: x.get("score", 0), reverse=True)

    st.subheader("Top Players")
    for i, row in enumerate(leaderboard[:10], 1):
        st.write(
            f"{i}. {row.get('avatar','๐ŸŽฎ')} **{row.get('name','Guest')}** "
            f"(Game {row.get('game_id','')}) โ€” {row.get('score',0)} pts โ€” {row.get('percentage',0)}%"
        )

    st.subheader("Weekly Leaderboard (Top 10)")
    # Compare (ISO year, ISO week): the week number alone repeats every year
    # and would pull in scores from the same week of earlier years.
    current_week = tuple(datetime.now().isocalendar()[:2])
    fallback_ts = now_iso()

    def _played_this_week(r):
        """Return True when the row's timestamp lies in the current ISO week."""
        try:
            played = datetime.fromisoformat(r.get("timestamp", fallback_ts))
        except (TypeError, ValueError):
            # Non-string or malformed timestamp: cannot be placed in a week.
            return False
        return tuple(played.isocalendar()[:2]) == current_week

    # Filtering keeps the score ordering established above, so no re-sort
    # is needed.
    week_leaderboard = [r for r in leaderboard if _played_this_week(r)]
    if not week_leaderboard:
        st.info("No scores yet this week.")
    else:
        for i, r in enumerate(week_leaderboard[:10], 1):
            st.write(
                f"{i}. {r.get('avatar','๐ŸŽฎ')} **{r.get('name','Guest')}** "
                f"(Game {r.get('game_id','')}) โ€” {r.get('score',0)} pts"
            )
def get_weekly_leaderboard(limit=10):
    """Return the top-scoring leaderboard rows from the last seven days.

    Args:
        limit: Maximum number of rows to return (default 10).

    Returns:
        A list of at most ``limit`` leaderboard rows whose ``timestamp`` is
        within the last 7 days (measured against ``datetime.utcnow()``),
        ordered by ``score`` descending. Rows with a missing, empty or
        unparsable timestamp are skipped.
    """
    # Same key as the leaderboard page uses, so both read the same data.
    rows = unified_get("leaderboard") or []
    one_week_ago = datetime.utcnow() - timedelta(days=7)

    weekly = []
    for r in rows:
        ts = r.get("timestamp")
        if not ts:
            continue
        try:
            played_time = datetime.fromisoformat(ts)
        except (TypeError, ValueError):
            continue
        # A timestamp carrying a UTC offset cannot be compared with the naive
        # UTC cutoff; convert it to naive UTC first.
        offset = played_time.utcoffset()
        if offset is not None:
            played_time = played_time.replace(tzinfo=None) - offset
        if played_time >= one_week_ago:
            weekly.append(r)

    # Sort by score, highest first.
    weekly.sort(key=lambda x: x.get("score", 0), reverse=True)
    return weekly[:limit]
# Route pages
# Dispatch the selected page. Each entry is a lambda so that only the chosen
# page's function name is resolved, just as in an if/elif chain.
_page_renderers = {
    "Home": lambda: home_page(),
    "Create Game": lambda: create_game_page(),
    "Join Game": lambda: join_game_page(),
    "Play": lambda: play_page(),
    "Friends": lambda: friends_page(),
    "Inbox": lambda: inbox_page(),
    "Leaderboard": lambda: leaderboard_page(),
}
_render_selected = _page_renderers.get(page)
if _render_selected is not None:
    _render_selected()

# Resume quick action: shown only when the session knows both a game and a
# player name.
_resume_gid = st.session_state.get('game_id')
_resume_user = st.session_state.get('username')
if _resume_gid and _resume_user:
    players = unified_get("players") or {}
    # A missing or empty per-game mapping means there is no record yet.
    _game_players = players.get(_resume_gid) or {}
    info = _game_players.get(_resume_user, {})
    if not (info and info.get('submitted')):
        with st.expander("Resume Game"):
            if st.button("Go to Play"):
                st.rerun()
    else:
        st.info("You already submitted this game.")

# Footer shown on every page.
st.markdown("---")
st.write("Notes: Online mode requires firebase-admin and service account JSON named 'serviceAccountKey.json' placed next to app.py. Offline mode stores data in ./data/.")