Spaces:
Sleeping
Sleeping
import sys | |
import os | |
from pyngrok import ngrok | |
import uvicorn | |
import asyncio | |
from .events import Events | |
from .constants import Constants | |
__all__ = [] | |
if __name__ == "__main__": | |
if Events.TOKEN is None: | |
sys.exit("No TELEGRAM_TOKEN found in the environment, exiting now.") | |
PORT = Events.PORT | |
loop = asyncio.get_event_loop() | |
# Run with ngrok if the parameter is given | |
running_option = None | |
if len(sys.argv) == 2: | |
running_option = sys.argv[1] | |
if running_option == "ngrok": | |
ngrok_token = str(os.environ.get("NGROK_TOKEN")) | |
if ngrok_token == "None": | |
print( | |
"NGROK auth token is not found in the environment. Ngrok will timeout after a few hours." | |
) | |
else: | |
print(f"NGROK TOKEN: {ngrok_token}") | |
ngrok.set_auth_token(ngrok_token) | |
http_tunnel = ngrok.connect(PORT, bind_tls=True) | |
ssh_tunnel = ngrok.connect(22, "tcp") | |
public_url = http_tunnel.public_url | |
ssh_url = ssh_tunnel.public_url | |
Events.HOST_URL = public_url | |
_ = loop.run_until_complete( | |
Events.send_a_message_to_user( | |
Constants.BROADCAST_CHAT_ID, f"ssh: {ssh_url}, http:{public_url}" | |
) | |
) | |
else: | |
public_url = loop.run_until_complete(Events.get_public_ip()) | |
Events.HOST_URL = f"https://{public_url}" | |
if running_option == "self_signed": | |
Events.SELF_SIGNED = True | |
print(Events.HOST_URL) | |
success = loop.run_until_complete(Events.set_telegram_webhook_url()) | |
if success: | |
uvicorn.run( | |
"src.listener:app", | |
host="0.0.0.0", | |
port=PORT, | |
reload=True, | |
log_level="info", | |
) | |
else: | |
print("Fail, closing the app.") | |