|
import logging |
|
import threading |
|
import time |
|
import datetime |
|
import traceback |
|
import fractions |
|
|
|
from fastapi import FastAPI, Request |
|
import av |
|
|
|
app = FastAPI() |
|
|
|
|
|
|
|
|
|
|
|
|
|
user_inputs = {} |
|
|
|
|
|
|
|
conversation_fields = [] |
|
current_step = None |
|
advanced_mode = False |
|
|
|
|
|
default_settings = { |
|
"quality_settings": "medium", |
|
"video_codec": "libx264", |
|
"audio_codec": "aac", |
|
"output_url": "rtmp://a.rtmp.youtube.com/live2" |
|
} |
|
|
|
|
|
streaming_state = "idle" |
|
stream_chat_id = None |
|
stream_start_time = None |
|
frames_encoded = 0 |
|
bytes_sent = 0 |
|
|
|
|
|
video_stream = None |
|
audio_stream_in = None |
|
output_stream = None |
|
|
|
|
|
stream_thread = None |
|
live_log_thread = None |
|
|
|
|
|
live_log_lines = [] |
|
live_log_display = "" |
|
error_notification = "" |
|
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
level=logging.DEBUG, |
|
format="%(asctime)s [%(levelname)s] %(message)s", |
|
datefmt="%Y-%m-%d %H:%M:%S", |
|
) |
|
logger = logging.getLogger() |
|
|
|
def append_live_log(line: str): |
|
global live_log_lines |
|
live_log_lines.append(line) |
|
if len(live_log_lines) > 50: |
|
live_log_lines.pop(0) |
|
|
|
class ListHandler(logging.Handler): |
|
def emit(self, record): |
|
log_entry = self.format(record) |
|
append_live_log(log_entry) |
|
|
|
list_handler = ListHandler() |
|
list_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S")) |
|
logger.addHandler(list_handler) |
|
|
|
|
|
|
|
|
|
def compose_streaming_message(): |
|
global live_log_display, error_notification, streaming_state, frames_encoded, bytes_sent |
|
stats = f"State: {streaming_state} | Uptime: {get_uptime()} | Frames: {frames_encoded} | Bytes: {bytes_sent}\n" |
|
msg = "" |
|
if error_notification: |
|
msg += "<b>ERROR:</b> " + error_notification + "\n\n" |
|
msg += "🚀 <b>Streaming in progress!</b>\n" |
|
msg += stats + "\n" |
|
msg += "Live Logs:\n" + live_log_display + "\n\nUse the inline keyboard to control the stream." |
|
return msg |
|
|
|
|
|
|
|
|
|
def create_html_message(text: str): |
|
|
|
return {"parse_mode": "HTML", "text": f"<pre>{text}</pre>"} |
|
|
|
def get_inline_keyboard_for_stream(): |
|
|
|
keyboard = { |
|
"inline_keyboard": [ |
|
[ |
|
{"text": "⏸ Pause", "callback_data": "pause"}, |
|
{"text": "▶️ Resume", "callback_data": "resume"}, |
|
{"text": "⏹ Abort", "callback_data": "abort"} |
|
], |
|
[ |
|
{"text": "📊 Status", "callback_data": "status"} |
|
] |
|
] |
|
} |
|
return keyboard |
|
|
|
def get_inline_keyboard_for_start(): |
|
|
|
keyboard = { |
|
"inline_keyboard": [ |
|
[ |
|
{"text": "🚀 Start Streaming", "callback_data": "start_stream"} |
|
] |
|
] |
|
} |
|
return keyboard |
|
|
|
def help_text(): |
|
return ( |
|
"*Stream Bot Help*\n\n" |
|
"*/start* - Begin setup for streaming (simple mode: only Input & Output URL)\n" |
|
"*/setting* - Enter advanced settings (Input URL, Quality Settings, Video Codec, Audio Codec, Output URL)\n" |
|
"*/help* - Display this help text\n" |
|
"*/logs* - Show the log history (live log display)\n\n" |
|
"After inputs are collected, press the inline *Start Streaming* button.\n\n" |
|
"While streaming, you can use inline buttons or commands:\n" |
|
"*/pause* - Pause the stream\n" |
|
"*/resume* - Resume a paused stream\n" |
|
"*/abort* - Abort the stream\n" |
|
"*/status* - Get current stream statistics" |
|
) |
|
|
|
def send_guide_message(chat_id, message): |
|
|
|
logging.info(f"Sending message to chat {chat_id}: {message}") |
|
return { |
|
"method": "sendMessage", |
|
"chat_id": chat_id, |
|
"text": message, |
|
"parse_mode": "Markdown" |
|
} |
|
|
|
def reset_statistics(): |
|
global stream_start_time, frames_encoded, bytes_sent |
|
stream_start_time = datetime.datetime.now() |
|
frames_encoded = 0 |
|
bytes_sent = 0 |
|
|
|
def get_uptime(): |
|
if stream_start_time: |
|
uptime = datetime.datetime.now() - stream_start_time |
|
return str(uptime).split('.')[0] |
|
return "0" |
|
|
|
def validate_inputs(): |
|
|
|
missing = [field for field in conversation_fields if field not in user_inputs or not user_inputs[field]] |
|
if missing: |
|
return False, f"Missing fields: {', '.join(missing)}" |
|
return True, "" |
|
|
|
|
|
|
|
|
|
def notify_error(chat_id, error_message): |
|
global error_notification |
|
error_notification = error_message |
|
logging.error(f"Error for chat {chat_id}: {error_message}") |
|
|
|
|
|
|
|
|
|
def live_log_updater(): |
|
global live_log_display, streaming_state |
|
try: |
|
while streaming_state in ["streaming", "paused"]: |
|
|
|
live_log_display = "<pre>" + "\n".join(live_log_lines[-15:]) + "</pre>" |
|
time.sleep(1) |
|
except Exception as e: |
|
logging.error(f"Error in live log updater: {e}") |
|
|
|
|
|
|
|
|
|
def logs_history(chat_id): |
|
global live_log_display, error_notification |
|
log_text = live_log_display if live_log_display else "<pre>No logs available yet.</pre>" |
|
if error_notification: |
|
if log_text.startswith("<pre>"): |
|
log_text = f"<pre>ERROR: {error_notification}\n\n" + log_text[5:] |
|
else: |
|
log_text = f"<pre>ERROR: {error_notification}\n\n{log_text}</pre>" |
|
return { |
|
"method": "sendMessage", |
|
"chat_id": chat_id, |
|
"text": log_text, |
|
"parse_mode": "HTML" |
|
} |
|
|
|
|
|
|
|
|
|
def handle_start(chat_id): |
|
global current_step, user_inputs, conversation_fields, advanced_mode |
|
|
|
user_inputs = {} |
|
if not advanced_mode: |
|
conversation_fields = ["input_url", "output_url"] |
|
else: |
|
conversation_fields = ["input_url", "quality_settings", "video_codec", "audio_codec", "output_url"] |
|
current_step = conversation_fields[0] |
|
text = ("👋 *Welcome to the Stream Bot!*\n\n" |
|
"Let's set up your stream.\n" |
|
f"Please enter the *{current_step.replace('_', ' ')}*" |
|
f"{' (no default)' if current_step not in default_settings else f' _(default: {default_settings[current_step]})_'}:") |
|
logging.info(f"/start command from chat {chat_id} (advanced_mode={advanced_mode})") |
|
return { |
|
"method": "sendMessage", |
|
"chat_id": chat_id, |
|
"text": text, |
|
"parse_mode": "Markdown" |
|
} |
|
|
|
def handle_setting(chat_id): |
|
global advanced_mode, conversation_fields, current_step, user_inputs |
|
advanced_mode = True |
|
conversation_fields = ["input_url", "quality_settings", "video_codec", "audio_codec", "output_url"] |
|
user_inputs = {} |
|
current_step = conversation_fields[0] |
|
text = ("⚙️ *Advanced Mode Activated!*\n\n" |
|
"Please enter the *input url*:") |
|
logging.info(f"/setting command from chat {chat_id} - advanced mode enabled") |
|
return { |
|
"method": "sendMessage", |
|
"chat_id": chat_id, |
|
"text": text, |
|
"parse_mode": "Markdown" |
|
} |
|
|
|
def handle_help(chat_id): |
|
logging.info(f"/help command from chat {chat_id}") |
|
return { |
|
"method": "sendMessage", |
|
"chat_id": chat_id, |
|
"text": help_text(), |
|
"parse_mode": "Markdown" |
|
} |
|
|
|
def handle_conversation(chat_id, text): |
|
global current_step, user_inputs, conversation_fields |
|
if current_step: |
|
if text.strip() == "" and current_step in default_settings: |
|
user_inputs[current_step] = default_settings[current_step] |
|
logging.info(f"Using default for {current_step}: {default_settings[current_step]}") |
|
else: |
|
user_inputs[current_step] = text.strip() |
|
logging.info(f"Received {current_step}: {text.strip()}") |
|
|
|
idx = conversation_fields.index(current_step) |
|
if idx < len(conversation_fields) - 1: |
|
current_step = conversation_fields[idx + 1] |
|
prompt = f"Please enter the *{current_step.replace('_', ' ')}*" |
|
if current_step in default_settings: |
|
prompt += f" _(default: {default_settings[current_step]})_" |
|
return send_guide_message(chat_id, prompt) |
|
else: |
|
current_step = None |
|
valid, msg = validate_inputs() |
|
if not valid: |
|
return send_guide_message(chat_id, f"Validation error: {msg}") |
|
if not advanced_mode: |
|
user_inputs.setdefault("quality_settings", default_settings["quality_settings"]) |
|
user_inputs.setdefault("video_codec", default_settings["video_codec"]) |
|
user_inputs.setdefault("audio_codec", default_settings["audio_codec"]) |
|
return { |
|
"method": "sendMessage", |
|
"chat_id": chat_id, |
|
"text": "All inputs received. Press *🚀 Start Streaming* to begin.", |
|
"reply_markup": get_inline_keyboard_for_start(), |
|
"parse_mode": "Markdown" |
|
} |
|
else: |
|
return send_guide_message(chat_id, "Unrecognized input. Type /help for available commands.") |
|
|
|
|
|
|
|
|
|
def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, output_url, chat_id): |
|
global video_stream, audio_stream_in, output_stream, streaming_state, frames_encoded, bytes_sent |
|
logging.info("Initiating streaming to YouTube") |
|
try: |
|
streaming_state = "streaming" |
|
reset_statistics() |
|
|
|
input_stream = av.open(input_url) |
|
output_stream = av.open(output_url, mode='w', format='flv') |
|
|
|
|
|
video_stream = output_stream.add_stream(video_codec, rate=30) |
|
video_stream.width = input_stream.streams.video[0].width |
|
video_stream.height = input_stream.streams.video[0].height |
|
video_stream.pix_fmt = input_stream.streams.video[0].format.name |
|
video_stream.codec_context.options.update({'g': '30'}) |
|
|
|
if quality_settings.lower() == "high": |
|
video_stream.bit_rate = 3000000 |
|
video_stream.bit_rate_tolerance = 1000000 |
|
elif quality_settings.lower() == "medium": |
|
video_stream.bit_rate = 1500000 |
|
video_stream.bit_rate_tolerance = 500000 |
|
elif quality_settings.lower() == "low": |
|
video_stream.bit_rate = 800000 |
|
video_stream.bit_rate_tolerance = 200000 |
|
|
|
|
|
audio_stream_in = input_stream.streams.audio[0] |
|
out_audio_stream = output_stream.add_stream(audio_codec, rate=audio_stream_in.rate) |
|
out_audio_stream.layout = "stereo" |
|
|
|
|
|
video_stream.codec_context.time_base = fractions.Fraction(1, 30) |
|
|
|
logging.info("Streaming started successfully.") |
|
|
|
|
|
global live_log_thread |
|
if live_log_thread is None or not live_log_thread.is_alive(): |
|
live_log_thread = threading.Thread(target=live_log_updater) |
|
live_log_thread.daemon = True |
|
live_log_thread.start() |
|
logging.info("Live log updater thread started.") |
|
|
|
|
|
while streaming_state in ["streaming", "paused"]: |
|
for packet in input_stream.demux(): |
|
if streaming_state == "stopped": |
|
break |
|
if packet.stream == input_stream.streams.video[0]: |
|
for frame in packet.decode(): |
|
if streaming_state == "paused": |
|
time.sleep(0.5) |
|
continue |
|
for out_packet in video_stream.encode(frame): |
|
output_stream.mux(out_packet) |
|
frames_encoded += 1 |
|
if hasattr(out_packet, "size"): |
|
bytes_sent += out_packet.size |
|
elif packet.stream == audio_stream_in: |
|
for frame in packet.decode(): |
|
if streaming_state == "paused": |
|
time.sleep(0.5) |
|
continue |
|
for out_packet in out_audio_stream.encode(frame): |
|
output_stream.mux(out_packet) |
|
if hasattr(out_packet, "size"): |
|
bytes_sent += out_packet.size |
|
|
|
|
|
for out_packet in video_stream.encode(): |
|
output_stream.mux(out_packet) |
|
for out_packet in out_audio_stream.encode(): |
|
output_stream.mux(out_packet) |
|
|
|
if streaming_state == "paused": |
|
time.sleep(1) |
|
|
|
|
|
try: |
|
video_stream.close() |
|
out_audio_stream.close() |
|
output_stream.close() |
|
input_stream.close() |
|
except Exception as cleanup_error: |
|
logging.error(f"Error during cleanup: {cleanup_error}") |
|
|
|
logging.info("Streaming complete, resources cleaned up.") |
|
streaming_state = "idle" |
|
except Exception as e: |
|
error_message = f"An error occurred during streaming: {str(e)}\n\n{traceback.format_exc()}" |
|
logging.error(error_message) |
|
streaming_state = "idle" |
|
notify_error(chat_id, error_message) |
|
|
|
def start_streaming(chat_id): |
|
global stream_thread, stream_chat_id |
|
valid, msg = validate_inputs() |
|
if not valid: |
|
return send_guide_message(chat_id, f"Validation error: {msg}") |
|
|
|
stream_chat_id = chat_id |
|
try: |
|
stream_thread = threading.Thread( |
|
target=stream_to_youtube, |
|
args=( |
|
user_inputs["input_url"], |
|
user_inputs["quality_settings"], |
|
user_inputs["video_codec"], |
|
user_inputs["audio_codec"], |
|
user_inputs["output_url"], |
|
chat_id, |
|
) |
|
) |
|
stream_thread.daemon = True |
|
stream_thread.start() |
|
logging.info("Streaming thread started.") |
|
|
|
return { |
|
"method": "sendMessage", |
|
"chat_id": chat_id, |
|
"text": "🚀 <b>Streaming initiated!</b>\n\n" + compose_streaming_message(), |
|
"reply_markup": get_inline_keyboard_for_stream(), |
|
"parse_mode": "HTML" |
|
} |
|
except Exception as e: |
|
error_message = f"Failed to start streaming: {str(e)}" |
|
logging.error(error_message) |
|
notify_error(chat_id, error_message) |
|
return send_guide_message(chat_id, error_message) |
|
|
|
|
|
|
|
|
|
def pause_stream(chat_id): |
|
global streaming_state |
|
if streaming_state == "streaming": |
|
streaming_state = "paused" |
|
logging.info("Streaming paused.") |
|
return { |
|
"method": "sendMessage", |
|
"chat_id": chat_id, |
|
"text": "⏸ <b>Streaming paused.</b>", |
|
"parse_mode": "HTML" |
|
} |
|
return send_guide_message(chat_id, "Streaming is not active.") |
|
|
|
def resume_stream(chat_id): |
|
global streaming_state |
|
if streaming_state == "paused": |
|
streaming_state = "streaming" |
|
logging.info("Streaming resumed.") |
|
return { |
|
"method": "sendMessage", |
|
"chat_id": chat_id, |
|
"text": "▶️ <b>Streaming resumed.</b>", |
|
"parse_mode": "HTML" |
|
} |
|
return send_guide_message(chat_id, "Streaming is not paused.") |
|
|
|
def abort_stream(chat_id): |
|
global streaming_state |
|
if streaming_state in ["streaming", "paused"]: |
|
streaming_state = "stopped" |
|
logging.info("Streaming aborted by user.") |
|
return { |
|
"method": "sendMessage", |
|
"chat_id": chat_id, |
|
"text": "⏹ <b>Streaming aborted.</b>", |
|
"parse_mode": "HTML" |
|
} |
|
return send_guide_message(chat_id, "No active streaming to abort.") |
|
|
|
def stream_status(chat_id): |
|
stats = ( |
|
f"*Stream Status:*\n\n" |
|
f"• **State:** {streaming_state}\n" |
|
f"• **Uptime:** {get_uptime()}\n" |
|
f"• **Frames Encoded:** {frames_encoded}\n" |
|
f"• **Bytes Sent:** {bytes_sent}\n" |
|
) |
|
return { |
|
"method": "sendMessage", |
|
"chat_id": chat_id, |
|
"text": stats, |
|
"parse_mode": "Markdown" |
|
} |
|
|
|
|
|
|
|
|
|
@app.post("/webhook") |
|
async def telegram_webhook(request: Request): |
|
update = await request.json() |
|
logging.debug(f"Received update: {update}") |
|
|
|
|
|
if "message" in update: |
|
chat_id = update["message"]["chat"]["id"] |
|
text = update["message"].get("text", "").strip() |
|
|
|
if text.startswith("/setting"): |
|
return handle_setting(chat_id) |
|
elif text.startswith("/start"): |
|
return handle_start(chat_id) |
|
elif text.startswith("/help"): |
|
return handle_help(chat_id) |
|
elif text.startswith("/logs"): |
|
return logs_history(chat_id) |
|
elif text.startswith("/pause"): |
|
return pause_stream(chat_id) |
|
elif text.startswith("/resume"): |
|
return resume_stream(chat_id) |
|
elif text.startswith("/abort"): |
|
return abort_stream(chat_id) |
|
elif text.startswith("/status"): |
|
return stream_status(chat_id) |
|
else: |
|
return handle_conversation(chat_id, text) |
|
|
|
|
|
elif "callback_query" in update: |
|
callback_data = update["callback_query"]["data"] |
|
chat_id = update["callback_query"]["message"]["chat"]["id"] |
|
message_id = update["callback_query"]["message"]["message_id"] |
|
|
|
if callback_data == "pause": |
|
response = pause_stream(chat_id) |
|
elif callback_data == "resume": |
|
response = resume_stream(chat_id) |
|
elif callback_data == "abort": |
|
response = abort_stream(chat_id) |
|
elif callback_data == "status": |
|
response = stream_status(chat_id) |
|
elif callback_data == "start_stream": |
|
response = start_streaming(chat_id) |
|
else: |
|
response = send_guide_message(chat_id, "❓ Unknown callback command.") |
|
|
|
|
|
if callback_data in ["pause", "resume", "abort", "status", "start_stream"]: |
|
response["method"] = "editMessageText" |
|
response["message_id"] = message_id |
|
response["text"] = compose_streaming_message() |
|
response["parse_mode"] = "HTML" |
|
response["reply_markup"] = get_inline_keyboard_for_stream() |
|
return response |
|
|
|
return {"status": "ok"} |
|
|