|
import telebot |
|
from telebot import types |
|
import subprocess |
|
import signal |
|
import os |
|
import threading |
|
import gradio as gr |
|
|
|
TOKEN = '7046208780:AAH42htq7wp-gz-XrZ8c1GqXjgRgBNJqNvg' |
|
bot = telebot.TeleBot(TOKEN) |
|
|
|
class StreamingProcess: |
|
def __init__(self, video_url, stream_key): |
|
self.video_url = video_url |
|
self.stream_key = stream_key |
|
|
|
self.logo_path = "hello.png" |
|
|
|
|
|
self.stream_cmd = [ |
|
"ffmpeg", |
|
"-re", |
|
"-i", video_url, |
|
"-i", self.logo_path, |
|
"-filter_complex", |
|
f"[0:v][1:v]overlay=W-w-10:10:format=auto,format=yuv444p", |
|
"-c:v", "libx264", |
|
"-preset", "veryfast", |
|
"-crf", "22", |
|
"-maxrate", "4000k", |
|
"-bufsize", "8000k", |
|
"-pix_fmt", "yuv420p", |
|
"-g", "50", |
|
"-c:a", "aac", |
|
"-b:a", "192k", |
|
"-ac", "2", |
|
"-ar", "44100", |
|
"-f", "flv", |
|
f"rtmp://a.rtmp.youtube.com/live2/{stream_key}" |
|
] |
|
self.video_url = video_url |
|
self.stream_key = stream_key |
|
self.process = None |
|
self.show_logs = True |
|
self.user_chat_id = None |
|
self.cancel_event = threading.Event() |
|
|
|
def start_stream(self): |
|
while True: |
|
self.restart_requested = False |
|
self.process = subprocess.Popen( |
|
self.stream_cmd, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.STDOUT, |
|
text=True, |
|
preexec_fn=os.setsid |
|
) |
|
self.log_thread = threading.Thread(target=self.show_logs_thread) |
|
self.log_thread.start() |
|
user_input_thread = threading.Thread(target=self.handle_user_input) |
|
user_input_thread.start() |
|
user_input_thread.join() |
|
self.log_thread.join() |
|
if not self.restart_requested: |
|
break |
|
|
|
def show_logs_thread(self): |
|
while self.process.poll() is None: |
|
output = self.process.stdout.readline() |
|
if output and self.show_logs: |
|
print(output.strip()) |
|
|
|
def stop_stream(self): |
|
if self.process: |
|
try: |
|
os.killpg(os.getpgid(self.process.pid), signal.SIGINT) |
|
self.process.wait(timeout=10) |
|
print("Streaming process has been stopped.") |
|
except ProcessLookupError: |
|
print("Streaming process not found.") |
|
|
|
|
|
self.cancel_event.set() |
|
|
|
def handle_user_input(self): |
|
chat_id = self.chat_id |
|
markup = types.InlineKeyboardMarkup() |
|
stop_button = types.InlineKeyboardButton(text="Stop Stream", callback_data="stop_stream_confirm") |
|
markup.add(stop_button) |
|
|
|
while self.process.poll() is None: |
|
bot.send_message(chat_id, "Do you want to stop the stream?", reply_markup=markup) |
|
self.cancel_event.wait() |
|
self.cancel_event.clear() |
|
|
|
def ask_to_restart(self): |
|
user_input = input("Do you want to re-initiate the stream? (Yes/No): ") |
|
if user_input.lower() == "yes": |
|
self.restart_requested = True |
|
self.cancel_event.set() |
|
|
|
user_data = {} |
|
|
|
welcome_message = "Welcome to the Streaming Bot! Please enter the video URL and RTMP key." |
|
keyboard = types.ReplyKeyboardRemove(selective=False) |
|
|
|
@bot.message_handler(commands=['start']) |
|
def handle_start(message): |
|
bot.send_message(message.chat.id, welcome_message, reply_markup=keyboard) |
|
bot.send_message(message.chat.id, "Please enter the video URL:") |
|
bot.register_next_step_handler(message, get_url) |
|
|
|
def get_url(message): |
|
user_data[message.chat.id] = {'url': message.text} |
|
bot.send_message(message.chat.id, "Please enter the RTMP key:") |
|
bot.register_next_step_handler(message, get_rtmp_key) |
|
|
|
def get_rtmp_key(message): |
|
chat_id = message.chat.id |
|
user_info = user_data.get(chat_id) |
|
if user_info: |
|
user_info['rtmp_key'] = message.text |
|
markup = types.InlineKeyboardMarkup() |
|
start_button = types.InlineKeyboardButton(text="Start", callback_data="start_stream") |
|
markup.add(start_button) |
|
bot.send_message(chat_id, "You can now start the stream.", reply_markup=markup) |
|
else: |
|
bot.send_message(chat_id, "Oops! Something went wrong. Please start over.") |
|
|
|
@bot.callback_query_handler(func=lambda call: call.data == "start_stream") |
|
def start_stream_callback(call): |
|
chat_id = call.message.chat.id |
|
user_info = user_data.get(chat_id) |
|
|
|
if user_info and 'url' in user_info and 'rtmp_key' in user_info: |
|
video_url = user_info['url'] |
|
stream_key = user_info['rtmp_key'] |
|
streaming_process = StreamingProcess(video_url, stream_key) |
|
threading.Thread(target=streaming_process.start_stream).start() |
|
user_info['streaming_process'] = streaming_process |
|
markup = types.InlineKeyboardMarkup() |
|
stop_button = types.InlineKeyboardButton(text="Stop Stream", callback_data="stop_stream") |
|
markup.add(stop_button) |
|
bot.edit_message_text( |
|
chat_id=chat_id, |
|
message_id=call.message.message_id, |
|
text="Stream has been started. Use the inline button to stop the stream.", |
|
reply_markup=markup |
|
) |
|
else: |
|
bot.answer_callback_query(call.id, text="Please enter the video URL and RTMP key first.") |
|
|
|
@bot.callback_query_handler(func=lambda call: call.data == "stop_stream") |
|
def stop_stream_callback(call): |
|
chat_id = call.message.chat.id |
|
user_info = user_data.get(chat_id) |
|
|
|
if user_info and 'streaming_process' in user_info: |
|
streaming_process = user_info['streaming_process'] |
|
streaming_process.user_chat_id = chat_id |
|
threading.Thread(target=streaming_process.stop_stream).start() |
|
markup = types.InlineKeyboardMarkup() |
|
reinit_button = types.InlineKeyboardButton(text="Re-initiate Stream", callback_data="reinit_stream") |
|
new_stream_button = types.InlineKeyboardButton(text="Create New Stream", callback_data="new_stream") |
|
markup.row(reinit_button, new_stream_button) |
|
bot.edit_message_text( |
|
chat_id=chat_id, |
|
message_id=call.message.message_id, |
|
text="Stream has been stopped. Choose an option:", |
|
reply_markup=markup |
|
) |
|
else: |
|
bot.answer_callback_query(call.id, text="Stream is not available.") |
|
|
|
@bot.callback_query_handler(func=lambda call: call.data == "reinit_stream") |
|
def reinit_stream_callback(call): |
|
chat_id = call.message.chat.id |
|
user_info = user_data.get(chat_id) |
|
|
|
if user_info and 'url' in user_info and 'rtmp_key' in user_info: |
|
video_url = user_info['url'] |
|
stream_key = user_info['rtmp_key'] |
|
streaming_process = StreamingProcess(video_url, stream_key) |
|
threading.Thread(target=streaming_process.start_stream).start() |
|
user_info['streaming_process'] = streaming_process |
|
markup = types.InlineKeyboardMarkup() |
|
stop_button = types.InlineKeyboardButton(text="Stop Stream", callback_data="stop_stream") |
|
markup.add(stop_button) |
|
bot.edit_message_text( |
|
chat_id=chat_id, |
|
message_id=call.message.message_id, |
|
text="Stream has been re-initiated. Use the inline button to stop the stream.", |
|
reply_markup=markup |
|
) |
|
else: |
|
bot.answer_callback_query(call.id, text="Please enter the video URL and RTMP key first.") |
|
|
|
@bot.callback_query_handler(func=lambda call: call.data == "new_stream") |
|
def new_stream_callback(call): |
|
chat_id = call.message.chat.id |
|
user_info = user_data.get(chat_id) |
|
|
|
if user_info: |
|
bot.send_message(chat_id, "Please enter the video URL for the new stream:") |
|
bot.register_next_step_handler(call.message, new_stream_enter_url) |
|
else: |
|
bot.answer_callback_query(call.id, text="Please start over.") |
|
|
|
def new_stream_enter_url(message): |
|
chat_id = message.chat.id |
|
user_info = user_data.get(chat_id) |
|
|
|
if user_info: |
|
user_info['url'] = message.text |
|
markup = types.InlineKeyboardMarkup() |
|
rtmp_button = types.InlineKeyboardButton(text="Change RTMP Key", callback_data="new_stream_rtmp") |
|
start_button = types.InlineKeyboardButton(text="Start Stream", callback_data="new_stream_start") |
|
markup.row(rtmp_button, start_button) |
|
bot.send_message(chat_id, "Choose an option from the given below commands:", reply_markup=markup) |
|
user_info['stream_state'] = 'url_entered' |
|
else: |
|
bot.send_message(chat_id, "Oops! Something went wrong. Please start over.") |
|
|
|
@bot.callback_query_handler(func=lambda call: call.data == "new_stream_rtmp") |
|
def new_stream_rtmp_callback(call): |
|
chat_id = call.message.chat.id |
|
user_info = user_data.get(chat_id) |
|
|
|
if user_info and 'url' in user_info: |
|
bot.send_message(chat_id, "Please enter the new RTMP key:") |
|
bot.register_next_step_handler(call.message, new_stream_enter_new_rtmp) |
|
user_info['stream_state'] = 'rtmp_entering' |
|
else: |
|
bot.answer_callback_query(call.id, text="Please enter the video URL first.") |
|
|
|
def new_stream_enter_new_rtmp(message): |
|
chat_id = message.chat.id |
|
user_info = user_data.get(chat_id) |
|
|
|
if user_info and 'url' in user_info and user_info['stream_state'] == 'rtmp_entering': |
|
user_info['rtmp_key'] = message.text |
|
start_new_stream(chat_id, user_info) |
|
else: |
|
bot.send_message(chat_id, "Oops! Something went wrong. Please start over.") |
|
|
|
@bot.callback_query_handler(func=lambda call: call.data == "new_stream_start") |
|
def new_stream_start_callback(call): |
|
chat_id = call.message.chat.id |
|
user_info = user_data.get(chat_id) |
|
|
|
if user_info and 'url' in user_info: |
|
user_info['rtmp_key'] = user_info.get('rtmp_key', '') |
|
start_new_stream(chat_id, user_info) |
|
else: |
|
bot.answer_callback_query(call.id, text="Please enter the video URL first.") |
|
|
|
def start_new_stream(chat_id, user_info): |
|
video_url = user_info['url'] |
|
stream_key = user_info['rtmp_key'] |
|
markup = types.InlineKeyboardMarkup() |
|
stop_button = types.InlineKeyboardButton(text="Stop Stream", callback_data="stop_stream") |
|
markup.add(stop_button) |
|
streaming_process = StreamingProcess(video_url, stream_key) |
|
threading.Thread(target=streaming_process.start_stream).start() |
|
user_info['streaming_process'] = streaming_process |
|
bot.send_message(chat_id, "New stream has been started. Use the inline button to stop the stream.", reply_markup=markup) |
|
user_info.pop('stream_state', None) |
|
|
|
def start_bot_polling(): |
|
bot.polling(none_stop=True, timeout=60) |
|
|
|
|
|
def gradio_function(input_text): |
|
|
|
return "Gradio received: " + input_text |
|
|
|
iface = gr.Interface( |
|
fn=gradio_function, |
|
inputs=gr.inputs.Textbox(label="Type a message..."), |
|
outputs=gr.outputs.Textbox(), |
|
live=True |
|
) |
|
|
|
if __name__ == "__main__": |
|
|
|
bot_thread = threading.Thread(target=start_bot_polling) |
|
bot_thread.daemon = True |
|
bot_thread.start() |
|
|
|
|
|
gradio_thread = threading.Thread(target=iface.launch) |
|
gradio_thread.daemon = True |
|
gradio_thread.start() |
|
|
|
|
|
bot_thread.join() |
|
gradio_thread.join() |