"""Telegram webhook bot that restreams a media source to an RTMP endpoint.

The bot collects stream settings through a short chat conversation, then
transcodes the input with PyAV in a background thread. Replies are returned
directly as the webhook response body; no outgoing HTTP calls are made.

All state is kept in module-level globals, so a single process serves a
single conversation/stream at a time.
"""
import datetime
import fractions
import html
import logging
import threading
import time
import traceback

import av
from fastapi import FastAPI, Request

app = FastAPI()

# -------------------------------------------------------------------
# Configuration & Global Variables
# -------------------------------------------------------------------
# (No token or outgoing HTTP calls are used in this version.)

# Fields asked for in each conversation mode, in prompt order.
SIMPLE_FIELDS = ("input_url", "output_url")
ADVANCED_FIELDS = ("input_url", "quality_settings", "video_codec", "audio_codec", "output_url")

# Conversation state
user_inputs = {}
# Simple mode (default): only "input_url" and "output_url" are requested.
# Advanced mode (after /setting): every field in ADVANCED_FIELDS is requested.
conversation_fields = []
current_step = None
advanced_mode = False

# Default settings for advanced fields
default_settings = {
    "quality_settings": "medium",
    "video_codec": "libx264",
    "audio_codec": "aac",
    "output_url": "rtmp://a.rtmp.youtube.com/live2",
}

# Output frame rate; also used for the keyframe interval and encoder time base.
OUTPUT_FPS = 30

# quality name -> (bit_rate, bit_rate_tolerance)
BITRATE_PRESETS = {
    "high": (3000000, 1000000),
    "medium": (1500000, 500000),
    "low": (800000, 200000),
}

# Streaming state & statistics
streaming_state = "idle"  # "idle", "streaming", "paused", "stopped"
stream_chat_id = None  # Chat ID for periodic updates
stream_start_time = None
frames_encoded = 0
bytes_sent = 0

# Stream resource objects (only set while a stream is running)
video_stream = None
audio_stream_in = None
output_stream = None

# Thread references
stream_thread = None
live_log_thread = None

# Live logging globals
MAX_LIVE_LOG_LINES = 50  # size of the rolling log buffer
LIVE_LOG_DISPLAY_LINES = 15  # how many of the newest lines are shown to the user
PRE_OPEN = "<pre>"
PRE_CLOSE = "</pre>"

live_log_lines = []  # Rolling list (max MAX_LIVE_LOG_LINES log lines)
live_log_display = ""  # HTML snippet refreshed every second by live_log_updater
error_notification = ""  # Details of the most recent streaming error, if any

# -------------------------------------------------------------------
# Enhanced Logging Setup
# -------------------------------------------------------------------
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger()


def append_live_log(line: str):
    """Append ``line`` to the rolling log buffer, keeping only the newest entries."""
    live_log_lines.append(line)
    # Trim in place so every reference to the list keeps seeing the same object.
    if len(live_log_lines) > MAX_LIVE_LOG_LINES:
        del live_log_lines[:-MAX_LIVE_LOG_LINES]


class ListHandler(logging.Handler):
    """Logging handler that mirrors every formatted record into ``live_log_lines``."""

    def emit(self, record):
        try:
            append_live_log(self.format(record))
        except Exception:
            # Standard logging convention: never let a handler failure propagate.
            self.handleError(record)


list_handler = ListHandler()
list_handler.setFormatter(
    logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S")
)
logger.addHandler(list_handler)


# -------------------------------------------------------------------
# Helper Function: Compose Streaming Message (with extra stats)
# -------------------------------------------------------------------
def compose_streaming_message():
    """Build the HTML status message: optional error, statistics and live logs.

    The result is meant to be sent with ``parse_mode="HTML"``. The error text
    is escaped here; ``live_log_display`` is already escaped HTML.
    """
    stats = (
        f"State: {streaming_state} | Uptime: {get_uptime()} | "
        f"Frames: {frames_encoded} | Bytes: {bytes_sent}\n"
    )
    parts = []
    if error_notification:
        # Tracebacks routinely contain '<' and '>' which would break HTML parsing.
        parts.append("ERROR: " + html.escape(error_notification, quote=False) + "\n\n")
    parts.append("🚀 Streaming in progress!\n")
    parts.append(stats + "\n")
    parts.append(
        "Live Logs:\n" + live_log_display + "\n\nUse the inline keyboard to control the stream."
    )
    return "".join(parts)


# -------------------------------------------------------------------
# Utility Functions & UI Helpers
# -------------------------------------------------------------------
def create_html_message(text: str):
    """Wrap ``text`` in <pre> tags for monospaced output using HTML parse mode.

    ``text`` is treated as plain text and HTML-escaped before wrapping.
    """
    return {
        "parse_mode": "HTML",
        "text": f"{PRE_OPEN}{html.escape(text, quote=False)}{PRE_CLOSE}",
    }


def get_inline_keyboard_for_stream():
    """Return the inline keyboard with the stream controls (pause/resume/abort/status)."""
    return {
        "inline_keyboard": [
            [
                {"text": "⏸ Pause", "callback_data": "pause"},
                {"text": "▶️ Resume", "callback_data": "resume"},
                {"text": "⏹ Abort", "callback_data": "abort"},
            ],
            [
                {"text": "📊 Status", "callback_data": "status"},
            ],
        ]
    }


def get_inline_keyboard_for_start():
    """Return the inline keyboard holding the single "Start Streaming" button."""
    return {
        "inline_keyboard": [
            [
                {"text": "🚀 Start Streaming", "callback_data": "start_stream"},
            ]
        ]
    }


def help_text():
    """Return the Markdown help text listing every supported command."""
    return (
        "*Stream Bot Help*\n\n"
        "*/start* - Begin setup for streaming (simple mode: only Input & Output URL)\n"
        "*/setting* - Enter advanced settings (Input URL, Quality Settings, Video Codec, Audio Codec, Output URL)\n"
        "*/help* - Display this help text\n"
        "*/logs* - Show the log history (live log display)\n\n"
        "After inputs are collected, press the inline *Start Streaming* button.\n\n"
        "While streaming, you can use inline buttons or commands:\n"
        "*/pause* - Pause the stream\n"
        "*/resume* - Resume a paused stream\n"
        "*/abort* - Abort the stream\n"
        "*/status* - Get current stream statistics"
    )


def send_guide_message(chat_id, message):
    """Return a Markdown ``sendMessage`` payload to be sent as the webhook reply."""
    logging.info(f"Sending message to chat {chat_id}: {message}")
    return {
        "method": "sendMessage",
        "chat_id": chat_id,
        "text": message,
        "parse_mode": "Markdown",
    }


def reset_statistics():
    """Restart the uptime clock and zero the frame/byte counters."""
    global stream_start_time, frames_encoded, bytes_sent
    stream_start_time = datetime.datetime.now()
    frames_encoded = 0
    bytes_sent = 0


def get_uptime():
    """Return the time since the stream started as ``H:MM:SS``, or "0" if never started."""
    if not stream_start_time:
        return "0"
    elapsed = datetime.datetime.now() - stream_start_time
    # str(timedelta) may end in ".microseconds"; keep whole seconds only.
    return str(elapsed).split(".")[0]


def validate_inputs():
    """Check that every field in ``conversation_fields`` has a non-empty value.

    Returns:
        ``(True, "")`` when complete, otherwise ``(False, message)``.
    """
    missing = [field for field in conversation_fields if not user_inputs.get(field)]
    if missing:
        # Field names are shown with spaces (as in the prompts): a lone "_" in a
        # Markdown message is an unterminated italic marker.
        readable = ", ".join(field.replace("_", " ") for field in missing)
        return False, f"Missing fields: {readable}"
    return True, ""


def _field_prompt(field, no_default_note=""):
    """Build the Markdown prompt for ``field``, mentioning its default when it has one."""
    prompt = f"Please enter the *{field.replace('_', ' ')}*"
    if field in default_settings:
        return prompt + f" _(default: {default_settings[field]})_"
    return prompt + no_default_note


# -------------------------------------------------------------------
# Error Notification Helper
# -------------------------------------------------------------------
def notify_error(chat_id, error_message):
    """Record ``error_message`` for display in later status messages and log it."""
    global error_notification
    error_notification = error_message
    logging.error(f"Error for chat {chat_id}: {error_message}")


# -------------------------------------------------------------------
# Live Log Updater (Background Thread)
# -------------------------------------------------------------------
def _render_live_logs():
    """Render the newest log lines as an escaped ``<pre>`` block."""
    recent = live_log_lines[-LIVE_LOG_DISPLAY_LINES:]
    return PRE_OPEN + html.escape("\n".join(recent), quote=False) + PRE_CLOSE


def live_log_updater():
    """Refresh ``live_log_display`` once per second while a stream is active."""
    global live_log_display
    try:
        while streaming_state in ("streaming", "paused"):
            live_log_display = _render_live_logs()
            time.sleep(1)
        # One last refresh so the final lines (completion or error) are visible.
        live_log_display = _render_live_logs()
    except Exception as e:
        logging.error(f"Error in live log updater: {e}")


# -------------------------------------------------------------------
# Logs History Handler (/logs)
# -------------------------------------------------------------------
def logs_history(chat_id):
    """Return an HTML ``sendMessage`` payload with the last error and the live logs."""
    log_text = live_log_display if live_log_display else "No logs available yet."
    if error_notification:
        error_line = f"ERROR: {html.escape(error_notification, quote=False)}\n\n"
        if log_text.startswith(PRE_OPEN):
            # Keep the error inside the monospaced block, right after its opening tag.
            log_text = PRE_OPEN + error_line + log_text[len(PRE_OPEN):]
        else:
            log_text = error_line + log_text
    return {
        "method": "sendMessage",
        "chat_id": chat_id,
        "text": log_text,
        "parse_mode": "HTML",
    }


# -------------------------------------------------------------------
# Conversation Handlers
# -------------------------------------------------------------------
def handle_start(chat_id):
    """Handle /start: reset collected inputs and prompt for the first field.

    Uses the simple field list unless advanced mode was enabled via /setting.
    """
    global current_step, user_inputs, conversation_fields
    user_inputs = {}
    conversation_fields = list(ADVANCED_FIELDS if advanced_mode else SIMPLE_FIELDS)
    current_step = conversation_fields[0]
    text = (
        "👋 *Welcome to the Stream Bot!*\n\n"
        "Let's set up your stream.\n"
        + _field_prompt(current_step, no_default_note=" (no default)")
        + ":"
    )
    logging.info(f"/start command from chat {chat_id} (advanced_mode={advanced_mode})")
    return {
        "method": "sendMessage",
        "chat_id": chat_id,
        "text": text,
        "parse_mode": "Markdown",
    }


def handle_setting(chat_id):
    """Handle /setting: switch to advanced mode and prompt for the first field."""
    global advanced_mode, conversation_fields, current_step, user_inputs
    advanced_mode = True
    conversation_fields = list(ADVANCED_FIELDS)
    user_inputs = {}
    current_step = conversation_fields[0]
    text = ("⚙️ *Advanced Mode Activated!*\n\n"
            "Please enter the *input url*:")
    logging.info(f"/setting command from chat {chat_id} - advanced mode enabled")
    return {
        "method": "sendMessage",
        "chat_id": chat_id,
        "text": text,
        "parse_mode": "Markdown",
    }


def handle_help(chat_id):
    """Handle /help: return the help text."""
    logging.info(f"/help command from chat {chat_id}")
    return {
        "method": "sendMessage",
        "chat_id": chat_id,
        "text": help_text(),
        "parse_mode": "Markdown",
    }


def handle_conversation(chat_id, text):
    """Store ``text`` as the answer to the current prompt and advance the conversation.

    An empty answer (e.g. an update that carries no text) selects the field's
    default when one exists. After the last field the inputs are validated
    and the "Start Streaming" button is offered.
    """
    global current_step
    if not current_step:
        return send_guide_message(chat_id, "Unrecognized input. Type /help for available commands.")

    answer = text.strip()
    if answer == "" and current_step in default_settings:
        user_inputs[current_step] = default_settings[current_step]
        logging.info(f"Using default for {current_step}: {default_settings[current_step]}")
    else:
        user_inputs[current_step] = answer
        logging.info(f"Received {current_step}: {answer}")

    position = conversation_fields.index(current_step)
    if position < len(conversation_fields) - 1:
        current_step = conversation_fields[position + 1]
        return send_guide_message(chat_id, _field_prompt(current_step))

    # Last field answered: the conversation is over.
    current_step = None
    valid, msg = validate_inputs()
    if not valid:
        return send_guide_message(chat_id, f"Validation error: {msg}")
    if not advanced_mode:
        # Simple mode never asks for these, so fill them from the defaults.
        for field in ("quality_settings", "video_codec", "audio_codec"):
            user_inputs.setdefault(field, default_settings[field])
    return {
        "method": "sendMessage",
        "chat_id": chat_id,
        "text": "All inputs received. Press *🚀 Start Streaming* to begin.",
        "reply_markup": get_inline_keyboard_for_start(),
        "parse_mode": "Markdown",
    }


# -------------------------------------------------------------------
# Background Streaming Functions
# -------------------------------------------------------------------
def _wait_while_paused():
    """Block while the stream is paused.

    Returns True when streaming should continue, False when it was stopped.
    """
    while streaming_state == "paused":
        time.sleep(0.5)
    return streaming_state == "streaming"


def _mux_packets(container, packets, count_frames=False):
    """Write encoded ``packets`` to ``container`` and update the statistics."""
    global frames_encoded, bytes_sent
    for out_packet in packets:
        container.mux(out_packet)
        if count_frames:
            frames_encoded += 1
        bytes_sent += getattr(out_packet, "size", 0)


def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, output_url, chat_id):
    """Transcode ``input_url`` and push it to ``output_url`` as FLV (runs in a thread).

    The input is demuxed exactly once: the loop ends at end of input or when
    the state becomes "stopped", after which the encoders are flushed once.
    While the state is "paused" the loop blocks instead of discarding frames.
    Both containers are closed in ``finally`` so nothing leaks on error, and
    the state always returns to "idle".

    Args:
        input_url: Source passed to ``av.open``.
        quality_settings: "high", "medium" or "low" (case-insensitive); any
            other value leaves the encoder's bit rate untouched.
        video_codec: Encoder name for the output video stream.
        audio_codec: Encoder name for the output audio stream.
        output_url: Destination passed to ``av.open`` in write mode.
        chat_id: Chat to attribute errors to.
    """
    global video_stream, audio_stream_in, output_stream, streaming_state, live_log_thread
    logging.info("Initiating streaming to YouTube")
    input_container = None
    output_container = None
    succeeded = False
    try:
        streaming_state = "streaming"
        reset_statistics()
        input_container = av.open(input_url)
        output_container = av.open(output_url, mode='w', format='flv')
        output_stream = output_container

        # Configure video stream
        in_video = input_container.streams.video[0]
        video_stream = output_container.add_stream(video_codec, rate=OUTPUT_FPS)
        video_stream.width = in_video.width
        video_stream.height = in_video.height
        video_stream.pix_fmt = in_video.format.name
        video_stream.codec_context.options.update({'g': str(OUTPUT_FPS)})
        preset = BITRATE_PRESETS.get(quality_settings.lower())
        if preset is not None:
            video_stream.bit_rate, video_stream.bit_rate_tolerance = preset

        # Configure audio stream
        audio_stream_in = input_container.streams.audio[0]
        out_audio_stream = output_container.add_stream(audio_codec, rate=audio_stream_in.rate)
        out_audio_stream.layout = "stereo"

        video_stream.codec_context.time_base = fractions.Fraction(1, OUTPUT_FPS)
        logging.info("Streaming started successfully.")

        # Start the live log updater in a background thread if not already running.
        if live_log_thread is None or not live_log_thread.is_alive():
            live_log_thread = threading.Thread(target=live_log_updater)
            live_log_thread.daemon = True
            live_log_thread.start()
            logging.info("Live log updater thread started.")

        # Stream loop: process packets until the input ends or the stream is stopped.
        for packet in input_container.demux():
            if not _wait_while_paused():
                break
            if packet.stream == in_video:
                encoder, is_video = video_stream, True
            elif packet.stream == audio_stream_in:
                encoder, is_video = out_audio_stream, False
            else:
                continue  # stream we do not forward (extra audio, subtitles, data)
            for frame in packet.decode():
                _mux_packets(output_container, encoder.encode(frame), count_frames=is_video)

        # Flush remaining packets exactly once.
        _mux_packets(output_container, video_stream.encode(), count_frames=True)
        _mux_packets(output_container, out_audio_stream.encode())
        succeeded = True
    except Exception as e:
        error_message = f"An error occurred during streaming: {str(e)}\n\n{traceback.format_exc()}"
        logging.error(error_message)
        streaming_state = "idle"
        notify_error(chat_id, error_message)
    finally:
        # Close each container independently so one failure cannot skip the other.
        for container in (output_container, input_container):
            if container is None:
                continue
            try:
                container.close()
            except Exception as cleanup_error:
                logging.error(f"Error during cleanup: {cleanup_error}")
        video_stream = None
        audio_stream_in = None
        output_stream = None
        if succeeded:
            logging.info("Streaming complete, resources cleaned up.")
        streaming_state = "idle"


def start_streaming(chat_id):
    """Validate the collected inputs and launch the streaming thread.

    Refuses to start while a previous stream thread is still alive. Fields
    that were never collected fall back to ``default_settings``.

    Returns:
        The webhook reply: the initial status message with the control
        keyboard, or a guide message describing why the stream did not start.
    """
    global stream_thread, stream_chat_id, error_notification
    valid, msg = validate_inputs()
    if not valid:
        return send_guide_message(chat_id, f"Validation error: {msg}")
    if stream_thread is not None and stream_thread.is_alive():
        return send_guide_message(
            chat_id, "A stream is already running. Abort it before starting a new one."
        )
    settings = {**default_settings, **user_inputs}
    if not settings.get("input_url"):
        # Reached when the start button is pressed without a completed conversation.
        return send_guide_message(
            chat_id, "Validation error: no input URL has been provided. Send /start first."
        )

    stream_chat_id = chat_id
    error_notification = ""  # do not carry a previous run's error into this one
    try:
        stream_thread = threading.Thread(
            target=stream_to_youtube,
            args=(
                settings["input_url"],
                settings["quality_settings"],
                settings["video_codec"],
                settings["audio_codec"],
                settings["output_url"],
                chat_id,
            ),
        )
        stream_thread.daemon = True
        stream_thread.start()
        logging.info("Streaming thread started.")
        # Immediately return a message that includes the live log display.
        return {
            "method": "sendMessage",
            "chat_id": chat_id,
            "text": "🚀 Streaming initiated!\n\n" + compose_streaming_message(),
            "reply_markup": get_inline_keyboard_for_stream(),
            "parse_mode": "HTML",
        }
    except Exception as e:
        error_message = f"Failed to start streaming: {str(e)}"
        logging.error(error_message)
        notify_error(chat_id, error_message)
        return send_guide_message(chat_id, error_message)


# -------------------------------------------------------------------
# Stream Control Handlers
# -------------------------------------------------------------------
def pause_stream(chat_id):
    """Pause an active stream; reply with a notice if nothing is streaming."""
    global streaming_state
    if streaming_state != "streaming":
        return send_guide_message(chat_id, "Streaming is not active.")
    streaming_state = "paused"
    logging.info("Streaming paused.")
    return {
        "method": "sendMessage",
        "chat_id": chat_id,
        "text": "⏸ Streaming paused.",
        "parse_mode": "HTML",
    }


def resume_stream(chat_id):
    """Resume a paused stream; reply with a notice if it is not paused."""
    global streaming_state
    if streaming_state != "paused":
        return send_guide_message(chat_id, "Streaming is not paused.")
    streaming_state = "streaming"
    logging.info("Streaming resumed.")
    return {
        "method": "sendMessage",
        "chat_id": chat_id,
        "text": "▶️ Streaming resumed.",
        "parse_mode": "HTML",
    }


def abort_stream(chat_id):
    """Ask a running or paused stream to stop; reply with a notice otherwise."""
    global streaming_state
    if streaming_state not in ("streaming", "paused"):
        return send_guide_message(chat_id, "No active streaming to abort.")
    streaming_state = "stopped"
    logging.info("Streaming aborted by user.")
    return {
        "method": "sendMessage",
        "chat_id": chat_id,
        "text": "⏹ Streaming aborted.",
        "parse_mode": "HTML",
    }


def stream_status(chat_id):
    """Return a Markdown message with the current state, uptime and counters."""
    # Telegram's legacy Markdown marks bold with single asterisks.
    stats = (
        "*Stream Status:*\n\n"
        f"• *State:* {streaming_state}\n"
        f"• *Uptime:* {get_uptime()}\n"
        f"• *Frames Encoded:* {frames_encoded}\n"
        f"• *Bytes Sent:* {bytes_sent}\n"
    )
    return {
        "method": "sendMessage",
        "chat_id": chat_id,
        "text": stats,
        "parse_mode": "Markdown",
    }


# -------------------------------------------------------------------
# FastAPI Webhook Endpoint for Telegram Updates
# -------------------------------------------------------------------
# Command prefixes in matching order; the first matching prefix wins.
_COMMAND_HANDLERS = (
    ("/setting", handle_setting),
    ("/start", handle_start),
    ("/help", handle_help),
    ("/logs", logs_history),
    ("/pause", pause_stream),
    ("/resume", resume_stream),
    ("/abort", abort_stream),
    ("/status", stream_status),
)

# Inline keyboard callback data -> handler
_CALLBACK_HANDLERS = {
    "pause": pause_stream,
    "resume": resume_stream,
    "abort": abort_stream,
    "status": stream_status,
    "start_stream": start_streaming,
}


@app.post("/webhook")
async def telegram_webhook(request: Request):
    """Dispatch a Telegram update and return the reply as the webhook response.

    Text messages are routed by command prefix, falling back to the
    conversation handler. Inline keyboard callbacks run the matching control
    handler and then rewrite the originating message with the latest status.
    Any other update type is acknowledged with ``{"status": "ok"}``.
    """
    update = await request.json()
    logging.debug(f"Received update: {update}")

    # Process messages from users
    if "message" in update:
        chat_id = update["message"]["chat"]["id"]
        text = update["message"].get("text", "").strip()
        for prefix, handler in _COMMAND_HANDLERS:
            if text.startswith(prefix):
                return handler(chat_id)
        return handle_conversation(chat_id, text)

    # Process inline keyboard callback queries
    if "callback_query" in update:
        callback = update["callback_query"]
        chat_id = callback["message"]["chat"]["id"]
        message_id = callback["message"]["message_id"]
        handler = _CALLBACK_HANDLERS.get(callback.get("data"))
        if handler is None:
            return send_guide_message(chat_id, "❓ Unknown callback command.")
        response = handler(chat_id)
        # Always update the message text with the latest live logs and keep the inline keyboard
        response["method"] = "editMessageText"
        response["message_id"] = message_id
        response["text"] = compose_streaming_message()
        response["parse_mode"] = "HTML"
        response["reply_markup"] = get_inline_keyboard_for_stream()
        return response

    return {"status": "ok"}