import os import logging import asyncio import requests from telegram import Update from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext import aiohttp import io # Configure logging logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") # Load environment variables from Hugging Face Secrets BOT_TOKEN = os.getenv("BOT_TOKEN") BASE_URL = os.getenv("BASE_URL") HF_TOKEN = os.getenv("HF_TOKEN") API_USERNAME = os.getenv("API_USERNAME") API_PASSWORD = os.getenv("API_PASSWORD") # Set the secret password for authentication SECRET_PASSWORD = "secure123" # Change this to your desired password # Dictionary to store authenticated users AUTHENTICATED_USERS = set() AWAITING_PASSWORD = set() class TelegramBot: """A Telegram bot with password-based authentication.""" def __init__(self, bot_token, base_url, username, password): """Initialize the bot with Telegram API token, API credentials, and authentication.""" self.bot_token = bot_token self.base_url = base_url # self.username = username # self.password = password # self.auth_token = None # API Endpoints self.login_url = f"{self.base_url}/api/v1/auth/login" self.ai_url = f"{self.base_url}/api/v1/questions/text" self.excel_url = f"{self.base_url}/api/v1/questions/excel" # Start Telegram Bot self.app = Application.builder().token(self.bot_token).build() self.setup_handlers() # Authenticate with API logging.info("Authenticating with API...") # self.authenticate() # def authenticate(self): # """Authenticate with the API and retrieve an access token.""" # payload = {"username": self.username, "password": self.password} # headers = {"Content-Type": "application/json", "accept": "application/json"} # # try: # response = requests.post(self.login_url, headers=headers, json=payload) # # if response.status_code == 200: # self.auth_token = response.json().get("access_token") # logging.info("Successfully authenticated with API") # else: # logging.error(f"Authentication failed: {response.status_code} - {response.text}") # # except Exception as e: # logging.error(f"Authentication Error: {e}") async def start_command(self, update: Update, context: CallbackContext): """Handles the /start command and asks for a password if the user is not authenticated.""" user_id = update.message.from_user.id if user_id in AUTHENTICATED_USERS: await update.message.reply_text( "✅ You are already authenticated!\n\n" "You can:\n" "1. Send me any question as text\n" "2. Send me an Excel file with questions (must have a 'question' column in 'rfp' sheet)\n\n" "Note: Excel files must contain no more than 50 questions." ) else: AWAITING_PASSWORD.add(user_id) await update.message.reply_text("🔑 Please enter the secret password to access the bot.") async def handle_message(self, update: Update, context: CallbackContext): """Handles all incoming messages.""" user_id = update.message.from_user.id user_message = update.message.text.strip() # If user is waiting to enter a password, validate it if user_id in AWAITING_PASSWORD: await self.check_password(update, context) return # If user is authenticated, process AI request if user_id in AUTHENTICATED_USERS: await self.chat_with_ai(update, context) else: await update.message.reply_text("❌ You are not authenticated. Please enter the password first.") async def check_password(self, update: Update, context: CallbackContext): """Checks if the password is correct and authenticates the user.""" user_id = update.message.from_user.id user_message = update.message.text.strip() if user_message == SECRET_PASSWORD: AUTHENTICATED_USERS.add(user_id) AWAITING_PASSWORD.discard(user_id) logging.info(f"User {user_id} authenticated successfully.") await update.message.reply_text( "✅ Authentication successful!\n\n" "You can:\n" "1. Send me any question as text\n" "2. Send me an Excel file with questions (must have a 'question' column in 'rfp' sheet)\n\n" "Note: Excel files must contain no more than 50 questions." ) else: await update.message.reply_text("❌ Wrong password. Try again.") async def chat_with_ai(self, update: Update, context: CallbackContext): """Handles messages and sends them to the AI API.""" user_id = update.message.from_user.id if user_id not in AUTHENTICATED_USERS: await update.message.reply_text("❌ You are not authenticated. Please enter the password first.") return # if not self.auth_token: # self.authenticate() # # if not self.auth_token: # await update.message.reply_text("Authentication failed. Please try again later.") # return user_message = update.message.text hf_authorization = "Bearer " + HF_TOKEN headers = { "Authorization": hf_authorization, "accept": "application/json" } json_payload = {"question": user_message} form_payload = {"question": user_message} try: logging.info(f"Sending payload as JSON: {json_payload}") response = requests.post(self.ai_url, headers={**headers, "Content-Type": "application/json"}, json=json_payload) if response.status_code == 422: logging.warning("JSON format rejected. Retrying with form-data...") response = requests.post(self.ai_url, headers={**headers, "Content-Type": "application/x-www-form-urlencoded"}, data=form_payload) if response.status_code == 200: bot_reply = response.json().get("answer", "I didn't understand that.") elif response.status_code == 401: logging.warning("Authorization expired. Re-authenticating...") # self.authenticate() await self.chat_with_ai(update, context) return else: logging.error(f"Error: {response.status_code}") bot_reply = f"Error: {response.status_code}" except Exception as e: logging.error(f"Connection error: {e}") bot_reply = f"Connection error: {e}" await update.message.reply_text(bot_reply) async def handle_excel(self, update: Update, context: CallbackContext): """Handles Excel file uploads.""" user_id = update.message.from_user.id if user_id not in AUTHENTICATED_USERS: await update.message.reply_text("❌ You are not authenticated. Please enter the password first.") return # if not self.auth_token: # self.authenticate() # # if not self.auth_token: # await update.message.reply_text("Authentication failed. Please try again later.") # return document = update.message.document if not document.file_name.endswith(('.xls', '.xlsx')): await update.message.reply_text("Please send only Excel files (.xls or .xlsx)") return try: # Send initial processing message processing_msg = await update.message.reply_text("📊 Processing your Excel file... This may take a few minutes.") # Get file from Telegram file = await context.bot.get_file(document.file_id) file_bytes = await file.download_as_bytearray() headers = { "Authorization": f"Bearer {HF_TOKEN}", "accept": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" } async with aiohttp.ClientSession() as session: # Create form data with the file form_data = aiohttp.FormData() form_data.add_field('file', file_bytes, filename=document.file_name, content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') # Send file to API async with session.post(self.excel_url, headers=headers, data=form_data) as response: if response.status == 200: # Get the Excel file content directly file_content = await response.read() # Send the Excel file back to Telegram await context.bot.send_document( chat_id=update.effective_chat.id, document=io.BytesIO(file_content), filename='rfp_responses.xlsx', caption="✅ Here's your processed Excel file with answers!" ) await processing_msg.delete() elif response.status == 401: logging.warning("Authorization expired. Re-authenticating...") # self.authenticate() await self.handle_excel(update, context) else: error_text = await response.text() await processing_msg.edit_text(f"❌ Error processing Excel file: {error_text}") except Exception as e: logging.error(f"Error handling Excel file: {e}") await update.message.reply_text(f"❌ Error processing Excel file: {str(e)}") def setup_handlers(self): """Set up Telegram command and message handlers.""" self.app.add_handler(CommandHandler("start", self.start_command)) self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message)) self.app.add_handler(MessageHandler(filters.Document.FileExtension("xlsx") | filters.Document.FileExtension("xls"), self.handle_excel)) def run(self): """Start the bot and listen for messages.""" logging.info("Starting Telegram bot...") self.app.run_polling() if __name__ == "__main__": bot = TelegramBot( bot_token=BOT_TOKEN, base_url=BASE_URL, username=API_USERNAME, password=API_PASSWORD ) bot.run()