| | from fastapi import FastAPI, WebSocket, WebSocketDisconnect |
| | from typing import Dict, List |
| | import time |
| | import json |
| | import asyncio |
| |
|
# ASGI application object; the WebSocket route in this module is registered on it.
app = FastAPI()
| |
|
| | |
| | |
# Closed set of user ids that are allowed to open a WebSocket session.
VALID_USER_IDS = [0, 1]

# Millisecond timestamp recorded for each user on connect and on disconnect;
# starts at 0 for every valid user.
userLastOnlineTime: Dict[int, int] = {user_id: 0 for user_id in VALID_USER_IDS}

# In-memory message log. Messages are only ever appended, and a message's
# "id" is its index in this list.
messages: List[Dict] = []
| |
|
def timeMiliSec(sec_time):
    """Convert a time in seconds to whole milliseconds.

    The fractional millisecond is dropped by ``int()`` (truncation toward
    zero), not rounded.
    """
    milliseconds = sec_time * 1000
    return int(milliseconds)
| |
|
| |
|
class ConnectionManager:
    """Manages active WebSocket connections, mapping user_id to WebSocket."""

    def __init__(self):
        # One socket per user id; a later connect() for the same id replaces
        # the earlier entry.
        self.active_connections: Dict[int, "WebSocket"] = {}

    async def connect(self, user_id: int, websocket: "WebSocket"):
        """Accept the WebSocket handshake and register the socket for user_id."""
        await websocket.accept()
        self.active_connections[user_id] = websocket

    def disconnect(self, user_id: int):
        """Forget the socket registered for user_id; a no-op if there is none."""
        self.active_connections.pop(user_id, None)

    def isOnline(self, user_id: int):
        """Return True when a socket is currently registered for user_id."""
        return user_id in self.active_connections

    def _drop_if_current(self, user_id: int, websocket: "WebSocket"):
        """Unregister user_id only if `websocket` is still the registered socket."""
        # A send can fail after the user has already reconnected on a new
        # socket; that newer registration must survive the cleanup.
        if self.active_connections.get(user_id) is websocket:
            self.disconnect(user_id)

    async def send_personal_message(self, message: Dict, user_id: int):
        """Sends a JSON message to a single connected user.

        Returns True when the message was handed to the socket, False when
        the user has no registered socket or the send raised. A failed send
        also unregisters the socket it failed on.
        """
        websocket = self.active_connections.get(user_id)
        if websocket is None:
            return False
        try:
            await websocket.send_json(message)
        except Exception as e:
            # Catch every send failure, as broadcast() does: an error on the
            # recipient's socket must not propagate into the caller, which is
            # usually a different user's receive loop.
            print(f"Error sending message to user {user_id}: {e}")
            self._drop_if_current(user_id, websocket)
            return False
        return True

    async def broadcast(self, message: Dict):
        """Sends a JSON message to all connected users.

        Sockets whose send raises are unregistered after the pass.
        """
        failed = []
        # Iterate a snapshot: every await yields to the event loop, where
        # other handlers may connect or disconnect and resize the dict.
        for user_id, connection in list(self.active_connections.items()):
            try:
                await connection.send_json(message)
            except RuntimeError:
                failed.append((user_id, connection))
            except Exception as e:
                print(f"Broadcast error to {user_id}: {e}")
                failed.append((user_id, connection))

        for user_id, connection in failed:
            self._drop_if_current(user_id, connection)
            print(f"User {user_id} disconnected during broadcast cleanup.")
| |
|
| |
|
# Single module-level connection registry used by the functions in this module.
manager = ConnectionManager()
| |
|
| |
|
def addMessage(sender_id: int, data: Dict) -> Dict:
    """
    Build a message record from a client payload and append it to ``messages``.

    Keys read from ``data``: "content" (defaults to ""), "timestamp" (client
    time; defaults to the current server time in milliseconds) and
    "receiver_id" (None when absent).

    The record's "id" is its index in ``messages``. Its "status" map starts
    with the sender marked "sent" and the receiver marked "delivered".
    Returns the stored record (the same dict object that was appended).
    """
    receiver_id = data.get("receiver_id")

    record = {
        "id": len(messages),
        "content": data.get("content", ""),
        "time_client": data.get("timestamp", timeMiliSec(time.time())),
        "time_server": timeMiliSec(time.time()),
        "sender_id": sender_id,
        "receiver_id": receiver_id,
        "status": {
            sender_id: "sent",
            receiver_id: "delivered",
        },
    }

    messages.append(record)
    return record
| |
|
| |
|
| | |
async def send_undelivered_messages(user_id: int):
    """
    Replay stored messages addressed to ``user_id`` that are still pending.

    A message is pending when its status entry for ``user_id`` is
    "delivered". Each one that is sent successfully is marked "received",
    and a "read_receipt" with status "received" is sent to its sender.
    """
    print(f"Checking for undelivered messages for user {user_id}...")

    replayed = 0

    for entry in messages:
        # Guard clauses: skip anything not addressed to this user or not pending.
        if entry.get("receiver_id") != user_id:
            continue
        if entry["status"].get(user_id) != "delivered":
            continue

        delivered = await manager.send_personal_message(
            {"type": "new_message", "message": entry},
            user_id,
        )
        if not delivered:
            continue

        entry["status"][user_id] = "received"
        replayed += 1

        # Let the original sender know the message has now arrived.
        await manager.send_personal_message(
            {
                "type": "read_receipt",
                "message_id": entry["id"],
                "status": "received",
                "updated_by_user": user_id,
            },
            entry["sender_id"],
        )

    if replayed:
        print(f"Delivered {replayed} historical messages to user {user_id}.")
| |
|
| |
|
async def _send_presence_snapshot(user_id: int):
    """Tell ``user_id`` whether each other valid user is currently online."""
    for other_user_id in VALID_USER_IDS:
        if other_user_id == user_id:
            continue
        if manager.isOnline(other_user_id):
            status = {"type": "status_update", "user_id": other_user_id, "is_online": True}
        else:
            status = {
                "type": "status_update",
                "user_id": other_user_id,
                "is_online": False,
                "last_seen": userLastOnlineTime.get(other_user_id, 0),
            }
        await manager.send_personal_message(status, user_id)


async def _handle_chat_message(user_id: int, data: Dict):
    """Validate, store and try to deliver one chat message sent by ``user_id``."""
    if not all(k in data for k in ("content", "timestamp", "receiver_id")):
        await manager.send_personal_message({"type": "error", "message": "Missing required fields (content, timestamp, receiver_id)."}, user_id)
        return

    receiver_id = data["receiver_id"]
    # The receiver id becomes a dict key and a connection lookup key, so it
    # must be a plain integer naming a known user. bool is excluded because
    # it is an int subclass; lists/objects would be unhashable.
    if (
        isinstance(receiver_id, bool)
        or not isinstance(receiver_id, int)
        or receiver_id not in VALID_USER_IDS
    ):
        await manager.send_personal_message({"type": "error", "message": "Invalid receiver_id."}, user_id)
        return

    new_message = addMessage(user_id, data)

    sent_to_receiver = await manager.send_personal_message(
        {"type": "new_message", "message": new_message},
        receiver_id,
    )

    if sent_to_receiver:
        # Mirror send_undelivered_messages(): once the receiver has been sent
        # the message it is no longer pending, otherwise it would be replayed
        # as a duplicate on the receiver's next connect.
        new_message["status"][receiver_id] = "received"
        await manager.send_personal_message({"type": "delivery_receipt", "message_id": new_message["id"], "status": "delivered"}, user_id)
    else:
        # Receiver is offline; the message stays pending for replay on connect.
        await manager.send_personal_message({"type": "delivery_receipt", "message_id": new_message["id"], "status": "pending"}, user_id)


async def _handle_status_update(user_id: int, data: Dict):
    """Record ``user_id``'s new status for a message and notify its sender."""
    if not all(k in data for k in ("message_id", "new_status")):
        await manager.send_personal_message({"type": "error", "message": "Missing required fields for status update."}, user_id)
        return

    message_id = data["message_id"]
    new_status = data["new_status"]

    # message_id is used as a list index, so anything but a real in-range
    # integer is reported as "not found" instead of raising.
    is_index = isinstance(message_id, int) and not isinstance(message_id, bool)
    if not is_index or not 0 <= message_id < len(messages):
        await manager.send_personal_message({"type": "error", "message": f"Message ID {message_id} not found."}, user_id)
        return

    message_to_update = messages[message_id]
    message_to_update["status"][user_id] = new_status

    await manager.send_personal_message(
        {"type": "read_receipt", "message_id": message_id, "status": new_status},
        message_to_update["sender_id"],
    )


@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: int):
    """Serve one user's WebSocket session.

    Rejects ids outside VALID_USER_IDS with close code 1008. Otherwise it
    registers the socket, broadcasts the user's online status, sends the
    user the presence of the other users, replays pending messages, and then
    processes incoming frames until the socket goes away.

    Incoming frames are JSON objects whose "type" is "chat_message" (the
    default when absent) or "status_update"; anything else is answered with
    an "error" message and the session continues.

    On every exit path the user is unregistered, the last-seen time is
    updated and an offline status is broadcast, unless a newer connection
    for the same user has taken over in the meantime.
    """
    if user_id not in VALID_USER_IDS:
        await websocket.close(code=1008, reason="Invalid User ID")
        return

    await manager.connect(user_id, websocket)
    userLastOnlineTime[user_id] = timeMiliSec(time.time())
    print(f"User {user_id} connected.")

    try:
        # Everything after connect() runs inside the try so that a failure
        # during start-up still unregisters the socket in `finally`.
        await manager.broadcast({"type": "status_update", "user_id": user_id, "is_online": True})
        await _send_presence_snapshot(user_id)
        await send_undelivered_messages(user_id)

        while True:
            try:
                data = await websocket.receive_json()
            except json.JSONDecodeError:
                # A malformed frame is the client's mistake for that frame
                # only; report it and keep the session alive.
                print(f"Received invalid JSON from user {user_id}")
                await manager.send_personal_message({"type": "error", "message": "Invalid JSON format."}, user_id)
                continue

            if not isinstance(data, dict):
                # Valid JSON that is not an object (list, string, number...).
                await manager.send_personal_message({"type": "error", "message": "Message must be a JSON object."}, user_id)
                continue

            message_type = data.get("type", "chat_message")
            if message_type == "chat_message":
                await _handle_chat_message(user_id, data)
            elif message_type == "status_update":
                await _handle_status_update(user_id, data)
            else:
                await manager.send_personal_message({"type": "error", "message": "Unknown message type."}, user_id)

    except WebSocketDisconnect:
        # Normal end of session; the shared teardown below handles it.
        pass
    except Exception as e:
        print(f"Unexpected error in WS loop for user {user_id}: {e}")
    finally:
        registered = manager.active_connections.get(user_id)
        # If a different socket is registered, the user has reconnected and
        # that newer session owns the slot: do not evict it or announce the
        # user as offline.
        if registered is None or registered is websocket:
            manager.disconnect(user_id)
            userLastOnlineTime[user_id] = timeMiliSec(time.time())
            print(f"User {user_id} disconnected at {userLastOnlineTime[user_id]}.")
            await manager.broadcast({
                "type": "status_update",
                "user_id": user_id,
                "is_online": False,
                "last_seen": userLastOnlineTime[user_id],
            })