import os |
import ssl |
import time |
from threading import Thread |
import requests |
from telegram import Update |
from telegram import __version__ as TG_VER |
from telegram.ext import ( |
Application, |
CommandHandler, |
ContextTypes, |
MessageHandler, |
filters, |
) |
from app_modules.init import * |
# TLS context with default verification settings and the "DEFAULT" cipher
# string.
# NOTE(review): nothing in the visible part of this file reads `ctx` --
# confirm whether another module depends on it before removing.
ctx = ssl.create_default_context()
ctx.set_ciphers("DEFAULT")

# If the installed `telegram` package does not export `__version_info__`,
# fall back to an all-zero tuple so the guard below rejects it.
try:
    from telegram import __version_info__
except ImportError:
    __version_info__ = (0, 0, 0, 0, 0)

# Refuse to start on anything older than 20.0.0 alpha 1.
if __version_info__ < (20, 0, 0, "alpha", 1):
    raise RuntimeError(
        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
        f"{TG_VER} version of this example, "
        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
    )

# URL that chat questions are POSTed to; None when CHAT_API_URL is unset.
ENDPOINT = os.environ.get("CHAT_API_URL")
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to the sender with an HTML greeting that mentions them.

    Args:
        update: Incoming update; its effective user is mentioned in the
            greeting and its message is the one replied to.
        context: Handler context (unused).
    """
    user = update.effective_user
    # Bind the reply method before formatting the text so attribute lookups
    # happen in the same order as a direct call would perform them.
    send_html = update.message.reply_html
    greeting = rf"Hi {user.mention_html()}! You are welcome to ask questions on anything!"
    await send_html(greeting)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to the incoming message with a fixed help text.

    Args:
        update: Incoming update whose message is replied to.
        context: Handler context (unused).
    """
    incoming = update.message
    await incoming.reply_text("Help!")
async def chat_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Forward the user's text to the chat backend and reply with its answer.

    The message text and chat id are POSTed as JSON to ``ENDPOINT``; the
    ``"result"`` field of the JSON response is sent back as the reply,
    truncated to Telegram's maximum message length. Any failure is printed
    and swallowed so a single bad request cannot stop the bot.

    Args:
        update: Incoming update carrying the text message to answer.
        context: Handler context (unused).
    """
    # Function-scope imports keep this handler self-contained without
    # touching the module's import block.
    import asyncio
    import functools

    # Telegram rejects text messages longer than 4096 characters.
    max_reply_length = 4096
    # (connect, read) timeouts in seconds, so a stalled backend cannot hold a
    # worker thread forever.
    request_timeout = (10, 120)

    print(update)
    message = update.message
    if message is None or not message.text:
        # Not every update carries a new message (e.g. edited messages), so
        # there is nothing to answer here.
        return

    tic = time.perf_counter()
    try:
        payload = {
            "question": message.text,
            "chat_id": message.chat.id,
        }
        print(payload)
        # `requests` is blocking: run it in the default executor so the event
        # loop keeps serving other chats while the backend is working.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            functools.partial(
                requests.post, ENDPOINT, json=payload, timeout=request_timeout
            ),
        )
        # Surface HTTP errors directly instead of as a confusing JSON or
        # KeyError failure further down.
        response.raise_for_status()
        data = response.json()
        temp = time.perf_counter()
        print(f"Received response in {temp - tic:0.4f} seconds")
        print(data)
        result = data["result"]
        print(result)
        await message.reply_text(result[:max_reply_length])
        toc = time.perf_counter()
        print(f"Response time in {toc - tic:0.4f} seconds")
    except Exception as e:
        # Handler boundary: report the failure and keep the bot running.
        print("error", e)
def start_telegram_bot() -> None:
    """Build the bot application, register its handlers and poll for updates.

    Blocks in ``run_polling()`` until the application is stopped.
    """
    print("starting telegram bot ...")
    # NOTE(review): TOKEN is not defined in this file; presumably it comes
    # from the `app_modules.init` star import -- confirm.
    application = Application.builder().token(TOKEN).build()

    # Telegram clients send "/start" when a chat with the bot is opened, so
    # register that name; "/start_command" is kept for existing users.
    application.add_handler(
        CommandHandler(["start", "start_command"], start_command)
    )
    application.add_handler(CommandHandler("help", help_command))
    # Every non-command text message is treated as a question for the backend.
    application.add_handler(
        MessageHandler(filters.TEXT & ~filters.COMMAND, chat_command)
    )

    application.run_polling()
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    start_telegram_bot()