from fastapi import FastAPI, Request from pydantic import BaseModel from config.settings import Settings from src.embedding import Embedding from src.search import Search from src.firestore_db import FirestoreDB from src.gemini import Gemini from src.cache import Cache from src.lottery import Lottery from src.logger import logger import httpx import json from datetime import datetime from typing import Dict, Optional from functools import lru_cache logger.info("Bắt đầu import các module") app = FastAPI() logger.info("Khởi tạo FastAPI hoàn tất") # Tái sử dụng AsyncClient toàn cục http_client = httpx.AsyncClient() logger.info("Khởi tạo httpx.AsyncClient hoàn tất") class CommandHandler: """Base class for command handlers.""" async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: raise NotImplementedError class StartCommand(CommandHandler): async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: await bot.send_message(chat_id, "Chào! Gửi câu hỏi hoặc dùng /help để xem các lệnh.") class HelpCommand(CommandHandler): async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: help_text = ( "Hướng dẫn sử dụng @CotienBot:\n" "- /start: Khởi động bot.\n" "- /help: Hiển thị hướng dẫn này.\n" "- /auth : Xác thực để sử dụng các lệnh nhạy cảm.\n" "- /train: Nhập dữ liệu đào tạo (FAISS và xổ số).\n" "- /Gemini: Chuyển sang chế độ Gemini AI.\n" "- /exit: Thoát chế độ Gemini.\n" "- /lottery <đài> : Phân tích tần suất, top bộ số (YYYY-MM-DD).\n" "- /lottery_position <đài> <độ_khớp>: Phân tích lồng quay, dự đoán (ngày DD-MM-YYYY).\n" "- /load_lottery: Tải dữ liệu xổ số từ file JSON (data/lottery_data.json).\n" "- Gửi câu hỏi thông thường để tìm kiếm thông tin." ) cache_key = f"{user_id}:{text}" bot.cache.set(cache_key, help_text) await bot.send_message(chat_id, help_text) class AuthCommand(CommandHandler): async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: if await bot.is_user_authenticated(user_id): await bot.send_message(chat_id, "Bạn đã xác thực rồi!") return password = text.split(" ", 1)[1] if len(text.split(" ")) > 1 else "" if await bot.authenticate_user(user_id, password): await bot.send_message(chat_id, "Xác thực thành công! Bạn có thể sử dụng bot.") else: await bot.send_message(chat_id, "Mật khẩu sai. Vui lòng thử lại với /auth .") class GeminiCommand(CommandHandler): async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: bot.gemini_mode[user_id] = True await bot.send_message(chat_id, "Chuyển sang chế độ Gemini. Gửi câu hỏi hoặc /exit.") class ExitCommand(CommandHandler): async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: bot.gemini_mode[user_id] = False await bot.send_message(chat_id, "Đã thoát chế độ Gemini.") class TrainCommand(CommandHandler): async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: if not await bot.is_user_authenticated(user_id): await bot.send_message(chat_id, "Vui lòng xác thực trước khi sử dụng lệnh này. Gửi: /auth ") return parts = text.split(maxsplit=1) train_param = parts[1] if len(parts) > 1 else None data_type = "đối thoại" lottery_data = None if train_param: try: parsed = json.loads(train_param) if isinstance(parsed, dict) and "ngay" in parsed and "dai" in parsed and "giai" in parsed: lottery_data = parsed data_type = "xổ số" except json.JSONDecodeError: pass await bot.send_message(chat_id, f"Đang đào tạo {data_type}... (Dữ liệu: {train_param or 'Không có'})") try: from train import train_data import asyncio await asyncio.get_event_loop().run_in_executor( None, train_data, bot.db, bot.search, bot.embedding, bot.lottery, train_param if not lottery_data else None, lottery_data ) await bot.send_message(chat_id, f"Đào tạo {data_type} hoàn tất!") except Exception as e: error_type = "xổ số" if lottery_data else "đối thoại" logger.error(f"Lỗi khi đào tạo {error_type}: {str(e)}") await bot.send_message(chat_id, f"Lỗi khi đào tạo {error_type}: {str(e)}") class LotteryPositionCommand(CommandHandler): async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: if not await bot.is_user_authenticated(user_id): await bot.send_message(chat_id, "Vui lòng xác thực trước khi sử dụng lệnh này. Gửi: /auth ") return parts = text.split() if len(parts) < 5: await bot.send_message(chat_id, "Sai định dạng. Vui lòng dùng: /lottery_position <đài> <độ_khớp> (ngày DD-MM-YYYY).") return match_threshold = parts[-1] days_before = parts[-2] date = parts[-3].replace("/", "-") # Chuyển DD/MM/YYYY thành DD-MM-YYYY try: datetime.strptime(date, "%d-%m-%Y") except ValueError: await bot.send_message(chat_id, "Định dạng ngày không hợp lệ. Vui lòng dùng DD-MM-YYYY (ví dụ: 16-04-2025).") return dai = " ".join(parts[1:-3]) if not dai: await bot.send_message(chat_id, "Thiếu tên đài. Vui lòng dùng: /lottery_position <đài> <độ_khớp>.") return result = bot.lottery.analyze_position(dai, date, days_before, match_threshold) await bot.send_message(chat_id, result) class LotteryCommand(CommandHandler): async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: if not await bot.is_user_authenticated(user_id): await bot.send_message(chat_id, "Vui lòng xác thực trước khi sử dụng lệnh này. Gửi: /auth ") return parts = text.split() if len(parts) < 4: await bot.send_message(chat_id, "Sai định dạng. Vui lòng dùng: /lottery <đài> (YYYY-MM-DD).") return end_date = parts[-1] start_date = parts[-2] try: datetime.strptime(start_date, "%Y-%m-%d") datetime.strptime(end_date, "%Y-%m-%d") except ValueError: await bot.send_message(chat_id, "Định dạng ngày không hợp lệ. Vui lòng dùng YYYY-MM-DD (ví dụ: 2025-05-11).") return dai = " ".join(parts[1:-2]) if not dai: await bot.send_message(chat_id, "Thiếu tên đài. Vui lòng dùng: /lottery <đài> .") return result = bot.lottery.analyze(dai, start_date, end_date) await bot.send_message(chat_id, result) class LoadLotteryCommand(CommandHandler): def __init__(self): self.is_loading = False async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: if not await bot.is_user_authenticated(user_id): await bot.send_message(chat_id, "Vui lòng xác thực trước khi sử dụng lệnh này. Gửi: /auth ") return if self.is_loading: await bot.send_message(chat_id, "Đang tải dữ liệu xổ số, vui lòng chờ...") return self.is_loading = True try: json_file_path = "data/lottery_data.json" result = bot.lottery.load_from_json(json_file_path) await bot.send_message(chat_id, result) finally: self.is_loading = False class DefaultCommand(CommandHandler): async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: cache_key = f"{user_id}:{text}" gemini_cache_key = f"{user_id}:{text}:GEMINI" cache_type = "gemini" if bot.gemini_mode.get(user_id) else "default" cache_key_to_use = gemini_cache_key if bot.gemini_mode.get(user_id) else cache_key cached = bot.cache.get(cache_key_to_use, type=cache_type) if cached: await bot.send_message(chat_id, cached) return query_vector = bot.embedding.generate(text) results = bot.search.search(query_vector) context = results[0][0] if results else None if bot.gemini_mode.get(user_id): response = bot.gemini.query(text, context) bot.cache.set(gemini_cache_key, response, type="gemini") else: response = context or "Không tìm thấy thông tin." bot.cache.set(cache_key, response) await bot.send_message(chat_id, response) logger.info("Định nghĩa các CommandHandler hoàn tất") class Bot: def __init__(self): logger.info("Bắt đầu khởi tạo Bot") self.gemini_mode: Dict[int, bool] = {} self.authenticated_users: Dict[int, bool] = {} self._embedding = None self._search = None self._db = None self._gemini = None self._cache = None self._lottery = None self.command_handlers = { "/start": StartCommand(), "/help": HelpCommand(), "/auth": AuthCommand(), "/Gemini": GeminiCommand(), "/exit": ExitCommand(), "/train": TrainCommand(), "/lottery": LotteryCommand(), "/lottery_position": LotteryPositionCommand(), "/load_lottery": LoadLotteryCommand(), } logger.info("Khởi tạo Bot hoàn tất") @property def embedding(self): if self._embedding is None: logger.info("Khởi tạo Embedding") self._embedding = Embedding() return self._embedding @property def search(self): if self._search is None: logger.info("Khởi tạo Search") self._search = Search() return self._search @property def db(self): if self._db is None: logger.info("Khởi tạo FirestoreDB") self._db = FirestoreDB() return self._db @property def gemini(self): if self._gemini is None: logger.info("Khởi tạo Gemini") self._gemini = Gemini() return self._gemini @property def cache(self): if self._cache is None: logger.info("Khởi tạo Cache") self._cache = Cache() return self._cache @property def lottery(self): if self._lottery is None: logger.info("Khởi tạo Lottery") self._lottery = Lottery(self.db, self.cache, self.embedding) return self._lottery async def send_message(self, chat_id: int, text: str) -> None: url = f"https://api.telegram.org/bot{Settings.TELEGRAM_TOKEN}/sendMessage" payload = {"chat_id": chat_id, "text": text} try: await http_client.post(url, json=payload) except Exception as e: logger.error(f"Lỗi khi gửi tin nhắn: {str(e)}") async def is_user_authenticated(self, user_id: int) -> bool: if user_id in self.authenticated_users: return True doc = self.db.db.collection("auth_users").document(str(user_id)).get() if doc.exists and doc.to_dict().get("authenticated", False): self.authenticated_users[user_id] = True return True return False async def authenticate_user(self, user_id: int, password: str) -> bool: if password == Settings.BOT_PASSWORD: self.authenticated_users[user_id] = True self.db.db.collection("auth_users").document(str(user_id)).set({"authenticated": True}) logger.info(f"Người dùng {user_id} đã xác thực thành công") return True logger.warning(f"Xác thực thất bại cho người dùng {user_id}") return False async def handle_message(self, chat_id: int, user_id: int, text: str) -> None: logger.info(f"Xử lý lệnh: {text} từ user_id: {user_id}") command = text.split()[0] if text.startswith("/") else None if command and command not in self.command_handlers: await self.send_message(chat_id, "Lệnh không hợp lệ. Gửi /help để xem hướng dẫn.") return handler = self.command_handlers.get(command, DefaultCommand()) await handler.handle(self, chat_id, user_id, text) logger.info("Định nghĩa lớp Bot hoàn tất") bot = Bot() logger.info("Khởi tạo bot hoàn tất") class WebhookUpdate(BaseModel): message: dict @app.get("/") async def read_root(): return {"status": "ok"} @app.get("/health") async def health_check(): return {"status": "healthy"} @app.post("/webhook") async def webhook(update: WebhookUpdate): message = update.message chat_id = message["chat"]["id"] user_id = message["from"]["id"] text = message.get("text", "") logger.info(f"Nhận tin nhắn từ {user_id}: {text}") await bot.handle_message(chat_id, user_id, text) return {"status": "ok"} @app.on_event("startup") async def startup_event(): logger.info("Bắt đầu startup_event") webhook_url = f"{Settings.WEBHOOK_URL}/webhook" url = f"https://api.telegram.org/bot{Settings.TELEGRAM_TOKEN}/setWebhook" try: logger.info(f"Gửi yêu cầu thiết lập webhook tới {url}") response = await http_client.post(url, json={"url": webhook_url}, timeout=10.0) logger.info(f"Phản hồi từ Telegram API: {response.status_code}") if response.status_code == 200: logger.info("Webhook đã được thiết lập") else: logger.error(f"Thiết lập webhook thất bại: {response.text}") except Exception as e: logger.error(f"Lỗi khi thiết lập webhook: {str(e)}") logger.info("Kết thúc startup_event") @app.on_event("shutdown") async def shutdown_event(): logger.info("Bắt đầu shutdown_event") await http_client.aclose() logger.info("Đã đóng HTTP client") logger.info("Định nghĩa các route và event handlers hoàn tất")