Spaces:
Runtime error
Runtime error
File size: 2,888 Bytes
d5edf96 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
import os
import ssl
import time
from threading import Thread
import requests
from telegram import Update
from telegram import __version__ as TG_VER
from telegram.ext import (
Application,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
from app_modules.init import *
ctx = ssl.create_default_context()
ctx.set_ciphers("DEFAULT")
try:
from telegram import __version_info__
except ImportError:
__version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment]
if __version_info__ < (20, 0, 0, "alpha", 1):
raise RuntimeError(
f"This example is not compatible with your current PTB version {TG_VER}. To view the "
f"{TG_VER} version of this example, "
f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
)
TOKEN = os.getenv("TELEGRAM_API_TOKEN")
ENDPOINT = os.getenv("CHAT_API_URL")
# Define a few command handlers. These usually take the two arguments update and
# context.
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Send a message when the command /start is issued."""
user = update.effective_user
await update.message.reply_html(
rf"Hi {user.mention_html()}! You are welcome to ask questions on anything!",
)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Send a message when the command /help is issued."""
await update.message.reply_text("Help!")
async def chat_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Echo the user message."""
print(update)
tic = time.perf_counter()
try:
message = {
"question": update.message.text,
"chat_id": update.message.chat.id,
}
print(message)
x = requests.post(ENDPOINT, json=message).json()
temp = time.perf_counter()
print(f"Received response in {temp - tic:0.4f} seconds")
print(x)
result = x["result"]
print(result)
await update.message.reply_text(result[0:8192])
toc = time.perf_counter()
print(f"Response time in {toc - tic:0.4f} seconds")
except Exception as e:
print("error", e)
def start_telegram_bot() -> None:
"""Start the bot."""
print("starting telegram bot ...")
# Create the Application and pass it your bot's token.
application = Application.builder().token(TOKEN).build()
# on different commands - answer in Telegram
application.add_handler(CommandHandler("start_command", start_command))
application.add_handler(CommandHandler("help", help_command))
# on non command i.e message - chat_command the message on Telegram
application.add_handler(
MessageHandler(filters.TEXT & ~filters.COMMAND, chat_command)
)
application.run_polling()
if __name__ == "__main__":
start_telegram_bot()
|