bottest2 / app.py
AstraOS's picture
Update app.py
2a40199 verified
raw
history blame contribute delete
No virus
12 kB
import telebot
from telebot import types
import subprocess
import signal
import os
import threading
import gradio as gr
TOKEN = '7046208780:AAH42htq7wp-gz-XrZ8c1GqXjgRgBNJqNvg'
bot = telebot.TeleBot(TOKEN)
class StreamingProcess:
def __init__(self, video_url, stream_key):
self.video_url = video_url
self.stream_key = stream_key
# Add the path to the logo image
self.logo_path = "hello.png"
# Modify the stream_cmd list to include the logo overlay filter with adjusted dimensions
self.stream_cmd = [
"ffmpeg",
"-re",
"-i", video_url,
"-i", self.logo_path, # Input logo image
"-filter_complex",
f"[0:v][1:v]overlay=W-w-10:10:format=auto,format=yuv444p", # Overlay logo in the top right corner with Lanczos resampling
"-c:v", "libx264",
"-preset", "veryfast",
"-crf", "22",
"-maxrate", "4000k",
"-bufsize", "8000k",
"-pix_fmt", "yuv420p",
"-g", "50",
"-c:a", "aac",
"-b:a", "192k",
"-ac", "2",
"-ar", "44100",
"-f", "flv",
f"rtmp://a.rtmp.youtube.com/live2/{stream_key}"
]
self.video_url = video_url
self.stream_key = stream_key
self.process = None
self.show_logs = True
self.user_chat_id = None
self.cancel_event = threading.Event()
def start_stream(self):
while True:
self.restart_requested = False
self.process = subprocess.Popen(
self.stream_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
preexec_fn=os.setsid
)
self.log_thread = threading.Thread(target=self.show_logs_thread)
self.log_thread.start()
user_input_thread = threading.Thread(target=self.handle_user_input)
user_input_thread.start()
user_input_thread.join()
self.log_thread.join()
if not self.restart_requested:
break
def show_logs_thread(self):
while self.process.poll() is None:
output = self.process.stdout.readline()
if output and self.show_logs:
print(output.strip())
def stop_stream(self):
if self.process:
try:
os.killpg(os.getpgid(self.process.pid), signal.SIGINT)
self.process.wait(timeout=10)
print("Streaming process has been stopped.")
except ProcessLookupError:
print("Streaming process not found.")
# Clear the event to release the user input wait
self.cancel_event.set()
def handle_user_input(self):
chat_id = self.chat_id
markup = types.InlineKeyboardMarkup()
stop_button = types.InlineKeyboardButton(text="Stop Stream", callback_data="stop_stream_confirm")
markup.add(stop_button)
while self.process.poll() is None:
bot.send_message(chat_id, "Do you want to stop the stream?", reply_markup=markup)
self.cancel_event.wait() # Wait for a response from the user
self.cancel_event.clear() # Reset the event
def ask_to_restart(self):
user_input = input("Do you want to re-initiate the stream? (Yes/No): ")
if user_input.lower() == "yes":
self.restart_requested = True
self.cancel_event.set()
user_data = {}
welcome_message = "Welcome to the Streaming Bot! Please enter the video URL and RTMP key."
keyboard = types.ReplyKeyboardRemove(selective=False)
@bot.message_handler(commands=['start'])
def handle_start(message):
bot.send_message(message.chat.id, welcome_message, reply_markup=keyboard)
bot.send_message(message.chat.id, "Please enter the video URL:")
bot.register_next_step_handler(message, get_url)
def get_url(message):
user_data[message.chat.id] = {'url': message.text}
bot.send_message(message.chat.id, "Please enter the RTMP key:")
bot.register_next_step_handler(message, get_rtmp_key)
def get_rtmp_key(message):
chat_id = message.chat.id
user_info = user_data.get(chat_id)
if user_info:
user_info['rtmp_key'] = message.text
markup = types.InlineKeyboardMarkup()
start_button = types.InlineKeyboardButton(text="Start", callback_data="start_stream")
markup.add(start_button)
bot.send_message(chat_id, "You can now start the stream.", reply_markup=markup)
else:
bot.send_message(chat_id, "Oops! Something went wrong. Please start over.")
@bot.callback_query_handler(func=lambda call: call.data == "start_stream")
def start_stream_callback(call):
chat_id = call.message.chat.id
user_info = user_data.get(chat_id)
if user_info and 'url' in user_info and 'rtmp_key' in user_info:
video_url = user_info['url']
stream_key = user_info['rtmp_key']
streaming_process = StreamingProcess(video_url, stream_key)
threading.Thread(target=streaming_process.start_stream).start()
user_info['streaming_process'] = streaming_process
markup = types.InlineKeyboardMarkup()
stop_button = types.InlineKeyboardButton(text="Stop Stream", callback_data="stop_stream")
markup.add(stop_button)
bot.edit_message_text(
chat_id=chat_id,
message_id=call.message.message_id,
text="Stream has been started. Use the inline button to stop the stream.",
reply_markup=markup
)
else:
bot.answer_callback_query(call.id, text="Please enter the video URL and RTMP key first.")
@bot.callback_query_handler(func=lambda call: call.data == "stop_stream")
def stop_stream_callback(call):
chat_id = call.message.chat.id
user_info = user_data.get(chat_id)
if user_info and 'streaming_process' in user_info:
streaming_process = user_info['streaming_process']
streaming_process.user_chat_id = chat_id # Store the user's chat ID
threading.Thread(target=streaming_process.stop_stream).start() # Start the stop_stream in a separate thread
markup = types.InlineKeyboardMarkup()
reinit_button = types.InlineKeyboardButton(text="Re-initiate Stream", callback_data="reinit_stream")
new_stream_button = types.InlineKeyboardButton(text="Create New Stream", callback_data="new_stream")
markup.row(reinit_button, new_stream_button)
bot.edit_message_text(
chat_id=chat_id,
message_id=call.message.message_id,
text="Stream has been stopped. Choose an option:",
reply_markup=markup
)
else:
bot.answer_callback_query(call.id, text="Stream is not available.")
@bot.callback_query_handler(func=lambda call: call.data == "reinit_stream")
def reinit_stream_callback(call):
chat_id = call.message.chat.id
user_info = user_data.get(chat_id)
if user_info and 'url' in user_info and 'rtmp_key' in user_info:
video_url = user_info['url']
stream_key = user_info['rtmp_key']
streaming_process = StreamingProcess(video_url, stream_key)
threading.Thread(target=streaming_process.start_stream).start()
user_info['streaming_process'] = streaming_process
markup = types.InlineKeyboardMarkup()
stop_button = types.InlineKeyboardButton(text="Stop Stream", callback_data="stop_stream")
markup.add(stop_button)
bot.edit_message_text(
chat_id=chat_id,
message_id=call.message.message_id,
text="Stream has been re-initiated. Use the inline button to stop the stream.",
reply_markup=markup
)
else:
bot.answer_callback_query(call.id, text="Please enter the video URL and RTMP key first.")
@bot.callback_query_handler(func=lambda call: call.data == "new_stream")
def new_stream_callback(call):
chat_id = call.message.chat.id
user_info = user_data.get(chat_id)
if user_info:
bot.send_message(chat_id, "Please enter the video URL for the new stream:")
bot.register_next_step_handler(call.message, new_stream_enter_url)
else:
bot.answer_callback_query(call.id, text="Please start over.")
def new_stream_enter_url(message):
chat_id = message.chat.id
user_info = user_data.get(chat_id)
if user_info:
user_info['url'] = message.text
markup = types.InlineKeyboardMarkup()
rtmp_button = types.InlineKeyboardButton(text="Change RTMP Key", callback_data="new_stream_rtmp")
start_button = types.InlineKeyboardButton(text="Start Stream", callback_data="new_stream_start")
markup.row(rtmp_button, start_button)
bot.send_message(chat_id, "Choose an option from the given below commands:", reply_markup=markup)
user_info['stream_state'] = 'url_entered'
else:
bot.send_message(chat_id, "Oops! Something went wrong. Please start over.")
@bot.callback_query_handler(func=lambda call: call.data == "new_stream_rtmp")
def new_stream_rtmp_callback(call):
chat_id = call.message.chat.id
user_info = user_data.get(chat_id)
if user_info and 'url' in user_info:
bot.send_message(chat_id, "Please enter the new RTMP key:")
bot.register_next_step_handler(call.message, new_stream_enter_new_rtmp)
user_info['stream_state'] = 'rtmp_entering'
else:
bot.answer_callback_query(call.id, text="Please enter the video URL first.")
def new_stream_enter_new_rtmp(message):
chat_id = message.chat.id
user_info = user_data.get(chat_id)
if user_info and 'url' in user_info and user_info['stream_state'] == 'rtmp_entering':
user_info['rtmp_key'] = message.text
start_new_stream(chat_id, user_info)
else:
bot.send_message(chat_id, "Oops! Something went wrong. Please start over.")
@bot.callback_query_handler(func=lambda call: call.data == "new_stream_start")
def new_stream_start_callback(call):
chat_id = call.message.chat.id
user_info = user_data.get(chat_id)
if user_info and 'url' in user_info:
user_info['rtmp_key'] = user_info.get('rtmp_key', '')
start_new_stream(chat_id, user_info)
else:
bot.answer_callback_query(call.id, text="Please enter the video URL first.")
def start_new_stream(chat_id, user_info):
video_url = user_info['url']
stream_key = user_info['rtmp_key']
markup = types.InlineKeyboardMarkup()
stop_button = types.InlineKeyboardButton(text="Stop Stream", callback_data="stop_stream")
markup.add(stop_button)
streaming_process = StreamingProcess(video_url, stream_key)
threading.Thread(target=streaming_process.start_stream).start()
user_info['streaming_process'] = streaming_process
bot.send_message(chat_id, "New stream has been started. Use the inline button to stop the stream.", reply_markup=markup)
user_info.pop('stream_state', None)
def start_bot_polling():
bot.polling(none_stop=True, timeout=60) # Set timeout to 0 for no timeout
# Define your Gradio interface function
def gradio_function(input_text):
# You can perform actions based on user input from Gradio here
return "Gradio received: " + input_text
iface = gr.Interface(
fn=gradio_function,
inputs=gr.inputs.Textbox(label="Type a message..."),
outputs=gr.outputs.Textbox(),
live=True
)
if __name__ == "__main__":
# Start the bot polling in a separate thread
bot_thread = threading.Thread(target=start_bot_polling)
bot_thread.daemon = True
bot_thread.start()
# Start the Gradio interface in a separate thread
gradio_thread = threading.Thread(target=iface.launch)
gradio_thread.daemon = True
gradio_thread.start()
# Wait for both threads to finish (you can add more graceful exit handling if needed)
bot_thread.join()
gradio_thread.join()