cotienbot / app.py
Anothervin1's picture
Update app.py
87de215 verified
from fastapi import FastAPI, Request
from pydantic import BaseModel
from config.settings import Settings
from src.embedding import Embedding
from src.search import Search
from src.firestore_db import FirestoreDB
from src.gemini import Gemini
from src.cache import Cache
from src.lottery import Lottery
from src.logger import logger
import httpx
import json
from datetime import datetime
from typing import Dict, Optional
from functools import lru_cache
logger.info("Bắt đầu import các module")
app = FastAPI()
logger.info("Khởi tạo FastAPI hoàn tất")
# Tái sử dụng AsyncClient toàn cục
http_client = httpx.AsyncClient()
logger.info("Khởi tạo httpx.AsyncClient hoàn tất")
class CommandHandler:
"""Base class for command handlers."""
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None:
raise NotImplementedError
class StartCommand(CommandHandler):
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None:
await bot.send_message(chat_id, "Chào! Gửi câu hỏi hoặc dùng /help để xem các lệnh.")
class HelpCommand(CommandHandler):
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None:
help_text = (
"Hướng dẫn sử dụng @CotienBot:\n"
"- /start: Khởi động bot.\n"
"- /help: Hiển thị hướng dẫn này.\n"
"- /auth <mật_khẩu>: Xác thực để sử dụng các lệnh nhạy cảm.\n"
"- /train: Nhập dữ liệu đào tạo (FAISS và xổ số).\n"
"- /Gemini: Chuyển sang chế độ Gemini AI.\n"
"- /exit: Thoát chế độ Gemini.\n"
"- /lottery <đài> <ngày_bắt_đầu> <ngày_kết_thúc>: Phân tích tần suất, top bộ số (YYYY-MM-DD).\n"
"- /lottery_position <đài> <ngày> <số_ngày_trước> <độ_khớp>: Phân tích lồng quay, dự đoán (ngày DD/MM/YYYY).\n"
"- /load_lottery: Tải dữ liệu xổ số từ file JSON (data/lottery.json).\n"
"- Gửi câu hỏi thông thường để tìm kiếm thông tin."
)
cache_key = f"{user_id}:{text}"
bot.cache.set(cache_key, help_text)
await bot.send_message(chat_id, help_text)
class AuthCommand(CommandHandler):
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None:
if await bot.is_user_authenticated(user_id):
await bot.send_message(chat_id, "Bạn đã xác thực rồi!")
return
password = text.split(" ", 1)[1] if len(text.split(" ")) > 1 else ""
if await bot.authenticate_user(user_id, password):
await bot.send_message(chat_id, "Xác thực thành công! Bạn có thể sử dụng bot.")
else:
await bot.send_message(chat_id, "Mật khẩu sai. Vui lòng thử lại với /auth <mật_khẩu>.")
class GeminiCommand(CommandHandler):
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None:
bot.gemini_mode[user_id] = True
await bot.send_message(chat_id, "Chuyển sang chế độ Gemini. Gửi câu hỏi hoặc /exit.")
class ExitCommand(CommandHandler):
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None:
bot.gemini_mode[user_id] = False
await bot.send_message(chat_id, "Đã thoát chế độ Gemini.")
class TrainCommand(CommandHandler):
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None:
if not await bot.is_user_authenticated(user_id):
await bot.send_message(chat_id, "Vui lòng xác thực trước khi sử dụng lệnh này. Gửi: /auth <mật_khẩu>")
return
parts = text.split(maxsplit=1)
train_param = parts[1] if len(parts) > 1 else None
data_type = "đối thoại" # Mặc định là đối thoại
lottery_data = None
if train_param:
try:
parsed = json.loads(train_param)
if isinstance(parsed, dict) and "ngay" in parsed and "dai" in parsed and "giai" in parsed:
lottery_data = parsed
data_type = "xổ số"
except json.JSONDecodeError:
pass # Không phải JSON, coi là đối thoại
await bot.send_message(chat_id, f"Đang đào tạo {data_type}... (Dữ liệu: {train_param or 'Không có'})")
try:
from train import train_data
import asyncio
await asyncio.get_event_loop().run_in_executor(
None, train_data, bot.db, bot.search, bot.embedding, bot.lottery, train_param if not lottery_data else None, lottery_data
)
await bot.send_message(chat_id, f"Đào tạo {data_type} hoàn tất!")
except Exception as e:
error_type = "xổ số" if lottery_data else "đối thoại"
logger.error(f"Lỗi khi đào tạo {error_type}: {str(e)}")
await bot.send_message(chat_id, f"Lỗi khi đào tạo {error_type}: {str(e)}")
class LotteryPositionCommand(CommandHandler):
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None:
if not await bot.is_user_authenticated(user_id):
await bot.send_message(chat_id, "Vui lòng xác thực trước khi sử dụng lệnh này. Gửi: /auth <mật_khẩu>")
return
parts = text.split()
if len(parts) != 5:
await bot.send_message(chat_id, "Sai định dạng. Vui lòng dùng: /lottery_position <đài> <ngày> <số_ngày_trước> <độ_khớp> (ngày DD/MM/YYYY).")
return
_, dai, date, days_before, match_threshold = parts
try:
date = datetime.strptime(date, "%d/%m/%Y").strftime("%Y-%m-%d")
except ValueError:
await bot.send_message(chat_id, "Định dạng ngày không hợp lệ. Vui lòng dùng DD/MM/YYYY.")
return
result = bot.lottery.analyze_position(dai, date, days_before, match_threshold)
await bot.send_message(chat_id, result)
class LotteryCommand(CommandHandler):
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None:
if not await bot.is_user_authenticated(user_id):
await bot.send_message(chat_id, "Vui lòng xác thực trước khi sử dụng lệnh này. Gửi: /auth <mật_khẩu>")
return
parts = text.split()
if len(parts) != 4:
await bot.send_message(chat_id, "Sai định dạng. Vui lòng dùng: /lottery <đài> <ngày_bắt_đầu> <ngày_kết_thúc> (YYYY-MM-DD).")
return
_, dai, start_date, end_date = parts
result = bot.lottery.analyze(dai, start_date, end_date)
await bot.send_message(chat_id, result)
class LoadLotteryCommand(CommandHandler):
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None:
if not await bot.is_user_authenticated(user_id):
await bot.send_message(chat_id, "Vui lòng xác thực trước khi sử dụng lệnh này. Gửi: /auth <mật_khẩu>")
return
json_file_path = "data/lottery_data.json" # Đường dẫn mặc định
result = bot.lottery.load_from_json(json_file_path)
await bot.send_message(chat_id, result)
class DefaultCommand(CommandHandler):
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None:
cache_key = f"{user_id}:{text}"
gemini_cache_key = f"{user_id}:{text}:GEMINI"
cache_type = "gemini" if bot.gemini_mode.get(user_id) else "default"
cache_key_to_use = gemini_cache_key if bot.gemini_mode.get(user_id) else cache_key
cached = bot.cache.get(cache_key_to_use, type=cache_type)
if cached:
await bot.send_message(chat_id, cached)
return
query_vector = bot.embedding.generate(text)
results = bot.search.search(query_vector)
context = results[0][0] if results else None
if bot.gemini_mode.get(user_id):
response = bot.gemini.query(text, context)
bot.cache.set(gemini_cache_key, response, type="gemini")
else:
response = context or "Không tìm thấy thông tin."
bot.cache.set(cache_key, response)
await bot.send_message(chat_id, response)
logger.info("Định nghĩa các CommandHandler hoàn tất")
class Bot:
def __init__(self):
logger.info("Bắt đầu khởi tạo Bot")
self.gemini_mode: Dict[int, bool] = {}
self.authenticated_users: Dict[int, bool] = {}
self._embedding = None
self._search = None
self._db = None
self._gemini = None
self._cache = None
self._lottery = None
self.command_handlers = {
"/start": StartCommand(),
"/help": HelpCommand(),
"/auth": AuthCommand(),
"/Gemini": GeminiCommand(),
"/exit": ExitCommand(),
"/train": TrainCommand(),
"/lottery": LotteryCommand(),
"/lottery_position": LotteryPositionCommand(),
"/load_lottery": LoadLotteryCommand(), # Thêm handler mới
}
logger.info("Khởi tạo Bot hoàn tất")
@property
def embedding(self):
if self._embedding is None:
logger.info("Khởi tạo Embedding")
self._embedding = Embedding()
return self._embedding
@property
def search(self):
if self._search is None:
logger.info("Khởi tạo Search")
self._search = Search()
return self._search
@property
def db(self):
if self._db is None:
logger.info("Khởi tạo FirestoreDB")
self._db = FirestoreDB()
return self._db
@property
def gemini(self):
if self._gemini is None:
logger.info("Khởi tạo Gemini")
self._gemini = Gemini()
return self._gemini
@property
def cache(self):
if self._cache is None:
logger.info("Khởi tạo Cache")
self._cache = Cache()
return self._cache
@property
def lottery(self):
if self._lottery is None:
logger.info("Khởi tạo Lottery")
self._lottery = Lottery(self.db, self.cache, self.embedding)
return self._lottery
async def send_message(self, chat_id: int, text: str) -> None:
url = f"https://api.telegram.org/bot{Settings.TELEGRAM_TOKEN}/sendMessage"
payload = {"chat_id": chat_id, "text": text}
try:
await http_client.post(url, json=payload)
except Exception as e:
logger.error(f"Lỗi khi gửi tin nhắn: {str(e)}")
async def is_user_authenticated(self, user_id: int) -> bool:
if user_id in self.authenticated_users:
return True
doc = self.db.db.collection("auth_users").document(str(user_id)).get()
if doc.exists and doc.to_dict().get("authenticated", False):
self.authenticated_users[user_id] = True
return True
return False
async def authenticate_user(self, user_id: int, password: str) -> bool:
if password == Settings.BOT_PASSWORD:
self.authenticated_users[user_id] = True
self.db.db.collection("auth_users").document(str(user_id)).set({"authenticated": True})
logger.info(f"Người dùng {user_id} đã xác thực thành công")
return True
logger.warning(f"Xác thực thất bại cho người dùng {user_id}")
return False
async def handle_message(self, chat_id: int, user_id: int, text: str) -> None:
logger.info(f"Xử lý lệnh: {text} từ user_id: {user_id}")
command = text.split()[0] if text.startswith("/") else None
if command and command not in self.command_handlers:
await self.send_message(chat_id, "Lệnh không hợp lệ. Gửi /help để xem hướng dẫn.")
return
handler = self.command_handlers.get(command, DefaultCommand())
await handler.handle(self, chat_id, user_id, text)
logger.info("Định nghĩa lớp Bot hoàn tất")
bot = Bot()
logger.info("Khởi tạo bot hoàn tất")
class WebhookUpdate(BaseModel):
message: dict
@app.get("/")
async def read_root():
return {"status": "ok"}
@app.get("/health")
async def health_check():
return {"status": "healthy"}
@app.post("/webhook")
async def webhook(update: WebhookUpdate):
message = update.message
chat_id = message["chat"]["id"]
user_id = message["from"]["id"]
text = message.get("text", "")
logger.info(f"Nhận tin nhắn từ {user_id}: {text}")
await bot.handle_message(chat_id, user_id, text)
return {"status": "ok"}
@app.on_event("startup")
async def startup_event():
logger.info("Bắt đầu startup_event")
webhook_url = f"{Settings.WEBHOOK_URL}/webhook"
url = f"https://api.telegram.org/bot{Settings.TELEGRAM_TOKEN}/setWebhook"
try:
logger.info(f"Gửi yêu cầu thiết lập webhook tới {url}")
response = await http_client.post(url, json={"url": webhook_url}, timeout=10.0)
logger.info(f"Phản hồi từ Telegram API: {response.status_code}")
if response.status_code == 200:
logger.info("Webhook đã được thiết lập")
else:
logger.error(f"Thiết lập webhook thất bại: {response.text}")
except Exception as e:
logger.error(f"Lỗi khi thiết lập webhook: {str(e)}")
logger.info("Kết thúc startup_event")
@app.on_event("shutdown")
async def shutdown_event():
logger.info("Bắt đầu shutdown_event")
await http_client.aclose()
logger.info("Đã đóng HTTP client")
logger.info("Định nghĩa các route và event handlers hoàn tất")