rfp_telegram_bot_old / bot_telegram.py
cevheri's picture
Update bot_telegram.py
7ec3534 verified
import os
import logging
import asyncio
import requests
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext
import aiohttp
import io
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# Load environment variables from Hugging Face Secrets
BOT_TOKEN = os.getenv("BOT_TOKEN")
BASE_URL = os.getenv("BASE_URL")
HF_TOKEN = os.getenv("HF_TOKEN")
API_USERNAME = os.getenv("API_USERNAME")
API_PASSWORD = os.getenv("API_PASSWORD")
# Set the secret password for authentication
SECRET_PASSWORD = "secure123" # Change this to your desired password
# Dictionary to store authenticated users
AUTHENTICATED_USERS = set()
AWAITING_PASSWORD = set()
class TelegramBot:
"""A Telegram bot with password-based authentication."""
def __init__(self, bot_token, base_url, username, password):
"""Initialize the bot with Telegram API token, API credentials, and authentication."""
self.bot_token = bot_token
self.base_url = base_url
# self.username = username
# self.password = password
# self.auth_token = None
# API Endpoints
self.login_url = f"{self.base_url}/api/v1/auth/login"
self.ai_url = f"{self.base_url}/api/v1/questions/text"
self.excel_url = f"{self.base_url}/api/v1/questions/excel"
# Start Telegram Bot
self.app = Application.builder().token(self.bot_token).build()
self.setup_handlers()
# Authenticate with API
logging.info("Authenticating with API...")
# self.authenticate()
# def authenticate(self):
# """Authenticate with the API and retrieve an access token."""
# payload = {"username": self.username, "password": self.password}
# headers = {"Content-Type": "application/json", "accept": "application/json"}
#
# try:
# response = requests.post(self.login_url, headers=headers, json=payload)
#
# if response.status_code == 200:
# self.auth_token = response.json().get("access_token")
# logging.info("Successfully authenticated with API")
# else:
# logging.error(f"Authentication failed: {response.status_code} - {response.text}")
#
# except Exception as e:
# logging.error(f"Authentication Error: {e}")
async def start_command(self, update: Update, context: CallbackContext):
"""Handles the /start command and asks for a password if the user is not authenticated."""
user_id = update.message.from_user.id
if user_id in AUTHENTICATED_USERS:
await update.message.reply_text(
"βœ… You are already authenticated!\n\n"
"You can:\n"
"1. Send me any question as text\n"
"2. Send me an Excel file with questions (must have a 'question' column in 'rfp' sheet)\n\n"
"Note: Excel files must contain no more than 50 questions."
)
else:
AWAITING_PASSWORD.add(user_id)
await update.message.reply_text("πŸ”‘ Please enter the secret password to access the bot.")
async def handle_message(self, update: Update, context: CallbackContext):
"""Handles all incoming messages."""
user_id = update.message.from_user.id
user_message = update.message.text.strip()
# If user is waiting to enter a password, validate it
if user_id in AWAITING_PASSWORD:
await self.check_password(update, context)
return
# If user is authenticated, process AI request
if user_id in AUTHENTICATED_USERS:
await self.chat_with_ai(update, context)
else:
await update.message.reply_text("❌ You are not authenticated. Please enter the password first.")
async def check_password(self, update: Update, context: CallbackContext):
"""Checks if the password is correct and authenticates the user."""
user_id = update.message.from_user.id
user_message = update.message.text.strip()
if user_message == SECRET_PASSWORD:
AUTHENTICATED_USERS.add(user_id)
AWAITING_PASSWORD.discard(user_id)
logging.info(f"User {user_id} authenticated successfully.")
await update.message.reply_text(
"βœ… Authentication successful!\n\n"
"You can:\n"
"1. Send me any question as text\n"
"2. Send me an Excel file with questions (must have a 'question' column in 'rfp' sheet)\n\n"
"Note: Excel files must contain no more than 50 questions."
)
else:
await update.message.reply_text("❌ Wrong password. Try again.")
async def chat_with_ai(self, update: Update, context: CallbackContext):
"""Handles messages and sends them to the AI API."""
user_id = update.message.from_user.id
if user_id not in AUTHENTICATED_USERS:
await update.message.reply_text("❌ You are not authenticated. Please enter the password first.")
return
# if not self.auth_token:
# self.authenticate()
#
# if not self.auth_token:
# await update.message.reply_text("Authentication failed. Please try again later.")
# return
user_message = update.message.text
hf_authorization = "Bearer " + HF_TOKEN
headers = {
"Authorization": hf_authorization,
"accept": "application/json"
}
json_payload = {"question": user_message}
form_payload = {"question": user_message}
try:
logging.info(f"Sending payload as JSON: {json_payload}")
response = requests.post(self.ai_url, headers={**headers, "Content-Type": "application/json"}, json=json_payload)
if response.status_code == 422:
logging.warning("JSON format rejected. Retrying with form-data...")
response = requests.post(self.ai_url, headers={**headers, "Content-Type": "application/x-www-form-urlencoded"},
data=form_payload)
if response.status_code == 200:
bot_reply = response.json().get("answer", "I didn't understand that.")
elif response.status_code == 401:
logging.warning("Authorization expired. Re-authenticating...")
# self.authenticate()
await self.chat_with_ai(update, context)
return
else:
logging.error(f"Error: {response.status_code}")
bot_reply = f"Error: {response.status_code}"
except Exception as e:
logging.error(f"Connection error: {e}")
bot_reply = f"Connection error: {e}"
await update.message.reply_text(bot_reply)
async def handle_excel(self, update: Update, context: CallbackContext):
"""Handles Excel file uploads."""
user_id = update.message.from_user.id
if user_id not in AUTHENTICATED_USERS:
await update.message.reply_text("❌ You are not authenticated. Please enter the password first.")
return
# if not self.auth_token:
# self.authenticate()
#
# if not self.auth_token:
# await update.message.reply_text("Authentication failed. Please try again later.")
# return
document = update.message.document
if not document.file_name.endswith(('.xls', '.xlsx')):
await update.message.reply_text("Please send only Excel files (.xls or .xlsx)")
return
try:
# Send initial processing message
processing_msg = await update.message.reply_text("πŸ“Š Processing your Excel file... This may take a few minutes.")
# Get file from Telegram
file = await context.bot.get_file(document.file_id)
file_bytes = await file.download_as_bytearray()
headers = {
"Authorization": f"Bearer {HF_TOKEN}",
"accept": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
}
async with aiohttp.ClientSession() as session:
# Create form data with the file
form_data = aiohttp.FormData()
form_data.add_field('file',
file_bytes,
filename=document.file_name,
content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
# Send file to API
async with session.post(self.excel_url, headers=headers, data=form_data) as response:
if response.status == 200:
# Get the Excel file content directly
file_content = await response.read()
# Send the Excel file back to Telegram
await context.bot.send_document(
chat_id=update.effective_chat.id,
document=io.BytesIO(file_content),
filename='rfp_responses.xlsx',
caption="βœ… Here's your processed Excel file with answers!"
)
await processing_msg.delete()
elif response.status == 401:
logging.warning("Authorization expired. Re-authenticating...")
# self.authenticate()
await self.handle_excel(update, context)
else:
error_text = await response.text()
await processing_msg.edit_text(f"❌ Error processing Excel file: {error_text}")
except Exception as e:
logging.error(f"Error handling Excel file: {e}")
await update.message.reply_text(f"❌ Error processing Excel file: {str(e)}")
def setup_handlers(self):
"""Set up Telegram command and message handlers."""
self.app.add_handler(CommandHandler("start", self.start_command))
self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
self.app.add_handler(MessageHandler(filters.Document.FileExtension("xlsx") | filters.Document.FileExtension("xls"), self.handle_excel))
def run(self):
"""Start the bot and listen for messages."""
logging.info("Starting Telegram bot...")
self.app.run_polling()
if __name__ == "__main__":
bot = TelegramBot(
bot_token=BOT_TOKEN,
base_url=BASE_URL,
username=API_USERNAME,
password=API_PASSWORD
)
bot.run()