smu-ai / telegram_bot.py
dh-mc's picture
fix tele bot
7fe5844
import os
import ssl
import time
from threading import Thread
import requests
from telegram import Update
from telegram import __version__ as TG_VER
from telegram.ext import (
Application,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
from app_modules.init import app_init
llm_loader, qa_chain = app_init()
ctx = ssl.create_default_context()
ctx.set_ciphers("DEFAULT")
try:
from telegram import __version_info__
except ImportError:
__version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment]
if __version_info__ < (20, 0, 0, "alpha", 1):
raise RuntimeError(
f"This example is not compatible with your current PTB version {TG_VER}. To view the "
f"{TG_VER} version of this example, "
f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
)
TOKEN = os.getenv("TELEGRAM_API_TOKEN")
# Define a few command handlers. These usually take the two arguments update and
# context.
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Send a message when the command /start is issued."""
user = update.effective_user
await update.message.reply_html(
rf"Hi {user.mention_html()}! You are welcome to ask questions on anything!",
)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Send a message when the command /help is issued."""
await update.message.reply_text("Help!")
async def chat_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Echo the user message."""
print(update)
tic = time.perf_counter()
try:
result = qa_chain.call_chain(
{"question": update.message.text, "chat_history": []}, None
)
result = result["answer"]
print(result)
await update.message.reply_text(result[0:8192])
toc = time.perf_counter()
print(f"Response time in {toc - tic:0.4f} seconds")
except Exception as e:
print("error", e)
def start_telegram_bot() -> None:
"""Start the bot."""
print("starting telegram bot ...")
# Create the Application and pass it your bot's token.
application = Application.builder().token(TOKEN).build()
# on different commands - answer in Telegram
application.add_handler(CommandHandler("start_command", start_command))
application.add_handler(CommandHandler("help", help_command))
# on non command i.e message - chat_command the message on Telegram
application.add_handler(
MessageHandler(filters.TEXT & ~filters.COMMAND, chat_command)
)
application.run_polling()
if __name__ == "__main__":
start_telegram_bot()