|
import os |
|
import ssl |
|
import time |
|
from threading import Thread |
|
|
|
import requests |
|
from telegram import Update |
|
from telegram import __version__ as TG_VER |
|
from telegram.ext import ( |
|
Application, |
|
CommandHandler, |
|
ContextTypes, |
|
MessageHandler, |
|
filters, |
|
) |
|
|
|
from app_modules.init import * |
|
|
|
ctx = ssl.create_default_context() |
|
ctx.set_ciphers("DEFAULT") |
|
|
|
try: |
|
from telegram import __version_info__ |
|
except ImportError: |
|
__version_info__ = (0, 0, 0, 0, 0) |
|
|
|
if __version_info__ < (20, 0, 0, "alpha", 1): |
|
raise RuntimeError( |
|
f"This example is not compatible with your current PTB version {TG_VER}. To view the " |
|
f"{TG_VER} version of this example, " |
|
f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html" |
|
) |
|
|
|
TOKEN = os.getenv("TELEGRAM_API_TOKEN") |
|
ENDPOINT = os.getenv("CHAT_API_URL") |
|
|
|
|
|
|
|
|
|
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: |
|
"""Send a message when the command /start is issued.""" |
|
user = update.effective_user |
|
await update.message.reply_html( |
|
rf"Hi {user.mention_html()}! You are welcome to ask questions on anything!", |
|
) |
|
|
|
|
|
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: |
|
"""Send a message when the command /help is issued.""" |
|
await update.message.reply_text("Help!") |
|
|
|
|
|
async def chat_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: |
|
"""Echo the user message.""" |
|
print(update) |
|
tic = time.perf_counter() |
|
try: |
|
message = { |
|
"question": update.message.text, |
|
"chat_id": update.message.chat.id, |
|
} |
|
print(message) |
|
x = requests.post(ENDPOINT, json=message).json() |
|
temp = time.perf_counter() |
|
print(f"Received response in {temp - tic:0.4f} seconds") |
|
print(x) |
|
result = x["result"] |
|
print(result) |
|
await update.message.reply_text(result[0:8192]) |
|
toc = time.perf_counter() |
|
print(f"Response time in {toc - tic:0.4f} seconds") |
|
except Exception as e: |
|
print("error", e) |
|
|
|
|
|
def start_telegram_bot() -> None: |
|
"""Start the bot.""" |
|
print("starting telegram bot ...") |
|
|
|
application = Application.builder().token(TOKEN).build() |
|
|
|
|
|
application.add_handler(CommandHandler("start_command", start_command)) |
|
application.add_handler(CommandHandler("help", help_command)) |
|
|
|
|
|
application.add_handler( |
|
MessageHandler(filters.TEXT & ~filters.COMMAND, chat_command) |
|
) |
|
|
|
application.run_polling() |
|
|
|
|
|
if __name__ == "__main__": |
|
start_telegram_bot() |
|
|