# aiavatartest / stream_server.py
# Repository page header (not part of the program): uploaded by Spanicin,
# commit 911e927 (verified), message "Update stream_server.py", file size 9.25 kB.
import asyncio
import math
import os
import pathlib
import subprocess
# from fastapi import HTTPException
# Define base directories
# BASE_DIR: absolute form of ./videos, resolved against the working directory at import time.
BASE_DIR = pathlib.Path('./videos').resolve()
# HLS_DIR: absolute form of ./hls_videos; the directory is created at import time if missing.
HLS_DIR = pathlib.Path('./hls_videos').resolve()
HLS_DIR.mkdir(exist_ok=True)
# Keep track of video names added to the queue
video_names = []
# Segment number passed to FFmpeg as -start_number for the next conversion;
# add_video() increases it by the number of new .ts files each conversion produced.
segment_counter = 1
# Running total (seconds) of segment durations add_video() has appended to a playlist;
# add_video() resets it to 0 after writing #EXT-X-ENDLIST.
total_processed_duration = 0
# Held by add_video() while it runs a conversion and updates segment_counter.
segment_lock = asyncio.Lock()
def is_valid_path(video_name):
    """
    Validates the video path to prevent directory traversal attacks.

    The candidate path ``BASE_DIR / video_name`` is resolved (which collapses
    ``..`` components) and is accepted only when the result is BASE_DIR itself
    or lies underneath it.

    Args:
        video_name (str): Name of the video file.
    Returns:
        bool: True if valid, False otherwise.
    """
    video_path = (BASE_DIR / video_name).resolve()
    try:
        # relative_to() compares whole path components. A plain string-prefix
        # test would wrongly accept a sibling such as "<BASE_DIR>_backup/x.mp4".
        video_path.relative_to(BASE_DIR)
    except ValueError:
        return False
    return True
def convert_to_hls(input_path, output_dir, video_name, start_number):
    """
    Converts an MP4 video to HLS format with 3-second segments starting from `start_number`.

    The input is re-encoded (scaled to 1280x720, 30 fps, libx264 video at 1500k,
    AAC audio at 128k). Segments are written to `output_dir` as
    ``video_stream<N>.ts`` and the playlist as ``<video_name>.m3u8``.

    Args:
        input_path (str): Path to the input MP4 video.
        output_dir (str): Directory where HLS files will be stored.
        video_name (str): Name of the video (used for playlist naming).
        start_number (int): The starting number for HLS segments.
    Returns:
        None. Callers that need the number of generated segments must count
        the files themselves.
    Raises:
        RuntimeError: If FFmpeg exits with a non-zero status. The original
            subprocess.CalledProcessError is attached as ``__cause__``.
    """
    segment_pattern = os.path.join(output_dir, 'video_stream%d.ts')
    playlist_path = os.path.join(output_dir, f'{video_name}.m3u8')

    # FFmpeg command to convert MP4 to HLS with 3-second segments.
    # The streams are re-encoded rather than stream-copied.
    cmd = [
        'ffmpeg',
        '-y',  # Overwrite output files without asking
        '-i', input_path,  # Input file
        '-vf', 'scale=1280:720',
        '-r', '30',
        '-c:v', 'libx264',
        '-b:v', '1500k',
        '-c:a', 'aac',
        '-b:a', '128k',
        '-hls_time', '3',  # Segment duration in seconds
        '-hls_playlist_type', 'vod',  # Video on Demand playlist type
        '-start_number', str(start_number),  # Starting segment number
        '-hls_segment_filename', segment_pattern,  # Segment file naming
        playlist_path,  # Output playlist
    ]
    try:
        # Execute the FFmpeg command
        subprocess.run(cmd, check=True, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        # Log FFmpeg's stderr; decode leniently so odd bytes in the output
        # cannot mask the real failure with a UnicodeDecodeError.
        error_message = e.stderr.decode(errors='replace') if e.stderr else ''
        print(f"FFmpeg error: {error_message}")
        # HTTPException is not imported in this module, so raising it would
        # surface as a NameError; raise a plain RuntimeError instead.
        raise RuntimeError("Video conversion failed") from e
async def add_video(video_name, output_path, audio_duration):
    """
    Adds a video to the streaming queue.

    If ``HLS_DIR/<video_name>.m3u8`` does not exist yet, the video is converted
    to HLS in a worker thread, ``segment_counter`` is advanced by the number of
    new ``video_stream*.ts`` files, and the name is recorded in ``video_names``.
    Afterwards, segment entries covering `audio_duration` seconds (3-second
    segments plus an optional shorter last one) that are not already listed in
    the playlist at `output_path` are appended to it.

    Args:
        video_name (str): Name of the video file to add.
        output_path (str): Path of an existing playlist file to append segment
            entries to. When empty/None the playlist update is skipped.
        audio_duration (float): Duration in seconds that the playlist entries
            should cover.
    Returns:
        dict: Success message.
    Raises:
        FileNotFoundError: If `output_path` is given but the file does not exist.
    """
    global segment_counter, total_processed_duration

    async with segment_lock:
        # The existence check happens under the lock so two concurrent calls
        # for the same video cannot both start a conversion.
        if not (HLS_DIR / f'{video_name}.m3u8').exists():
            current_start_number = segment_counter
            num_segments_before = _count_ts_segments()
            # NOTE(review): video_name is used both as the FFmpeg input path
            # and as the playlist name, and is_valid_path() is not consulted
            # here -- confirm what callers pass in.
            await asyncio.get_event_loop().run_in_executor(
                None, convert_to_hls, str(video_name), str(HLS_DIR), video_name, current_start_number
            )
            # Update the global segment counter
            segment_counter += _count_ts_segments() - num_segments_before
            video_names.append(video_name)

    result = {"message": f'"{video_name}" added to the streaming queue.'}
    if not output_path:
        # No playlist to maintain; previously this fell through to
        # open(output_path, 'a') and crashed.
        return result

    segment_duration = 3
    full_segments = int(audio_duration // segment_duration)
    remaining_duration = audio_duration % segment_duration

    # Collect segment URIs already present so they are not appended twice.
    existing_segments = set()
    with open(output_path, 'r') as m3u8_file:
        for line in m3u8_file:
            if line.startswith('/live_stream/video_stream'):
                existing_segments.add(line.strip())

    # NOTE(review): URIs are always numbered 1..N from audio_duration,
    # independent of the segment numbers FFmpeg used -- confirm this is intended.
    with open(output_path, 'a') as m3u8_file:
        for i in range(1, full_segments + 1):
            segment_path = f"/live_stream/video_stream{i}.ts"
            if segment_path in existing_segments:
                continue
            m3u8_file.write(f"#EXTINF:{segment_duration:.3f},\n")
            m3u8_file.write(f"{segment_path}\n")
            total_processed_duration += segment_duration

        if remaining_duration > 0:
            remaining_segment_path = f"/live_stream/video_stream{full_segments + 1}.ts"
            if remaining_segment_path not in existing_segments:
                m3u8_file.write(f"#EXTINF:{remaining_duration:.3f},\n")
                m3u8_file.write(f"{remaining_segment_path}\n")
                total_processed_duration += remaining_duration

        # Add #EXT-X-ENDLIST only once after all segments have been added
        if total_processed_duration == audio_duration:
            m3u8_file.write("#EXT-X-ENDLIST\n")
            total_processed_duration = 0

    return result


def _count_ts_segments():
    """Return how many ``video_stream*.ts`` files currently exist in HLS_DIR."""
    return sum(
        1
        for entry in os.listdir(HLS_DIR)
        if entry.startswith('video_stream') and entry.endswith('.ts')
    )
async def concatenate_playlists(video_names, base_dir):
    """
    Concatenates multiple HLS playlists into a single ``master.m3u8`` playlist.

    For every name in `video_names` whose ``<name>.m3u8`` exists in `base_dir`,
    the segment entries (``#EXTINF`` lines, segment URIs and any other
    non-header tags) are copied in order. Per-playlist header tags and
    ``#EXT-X-ENDLIST`` are dropped and written once for the combined playlist.
    Missing playlists are skipped silently.

    Args:
        video_names (list): List of video names added to the queue.
        base_dir (str): Base directory where HLS files are stored; the
            combined playlist is written there as ``master.m3u8``.
    Returns:
        None.
    """
    concatenated_playlist_path = os.path.join(base_dir, 'master.m3u8')
    # hls_time is 3, but FFmpeg cuts on keyframes so real segments can be
    # longer. The target duration must cover the longest segment, so 3 is only
    # the floor and is raised from the EXTINF values seen below.
    max_segment_duration = 3
    # Tags that are emitted once for the concatenated playlist.
    skipped_tags = (
        '#EXTM3U',
        '#EXT-X-VERSION',
        '#EXT-X-PLAYLIST-TYPE',
        '#EXT-X-TARGETDURATION',
        '#EXT-X-MEDIA-SEQUENCE',
        '#EXT-X-ENDLIST',
    )
    segment_lines = []  # To store segment lines

    for video_name in video_names:
        video_playlist_path = os.path.join(base_dir, f'{video_name}.m3u8')
        if not os.path.exists(video_playlist_path):
            continue
        with open(video_playlist_path, 'r') as infile:
            for raw_line in infile:
                line = raw_line.strip()
                if not line or line.startswith(skipped_tags):
                    continue
                if line.startswith('#EXTINF'):
                    duration = _parse_extinf_duration(line)
                    if duration is not None:
                        max_segment_duration = max(max_segment_duration, math.ceil(duration))
                # EXTINF lines, segment URIs, byte ranges and any other tags
                # are carried over unchanged.
                segment_lines.append(line)

    # Write the concatenated playlist
    with open(concatenated_playlist_path, 'w') as outfile:
        outfile.write('#EXTM3U\n')
        outfile.write('#EXT-X-PLAYLIST-TYPE:VOD\n')
        outfile.write(f'#EXT-X-TARGETDURATION:{max_segment_duration}\n')
        outfile.write('#EXT-X-VERSION:4\n')
        outfile.write('#EXT-X-MEDIA-SEQUENCE:1\n')  # Starting from segment number 1
        for line in segment_lines:
            outfile.write(f'{line}\n')
        outfile.write('#EXT-X-ENDLIST\n')


def _parse_extinf_duration(line):
    """Return the duration in seconds from an ``#EXTINF:<duration>,[title]`` line, or None."""
    try:
        return float(line.split(':', 1)[1].split(',', 1)[0])
    except (IndexError, ValueError):
        return None
def generate_m3u8(video_duration, output_path,segment_duration=3):
    """
    Writes the header of an HLS media playlist to `output_path`.

    Only the header tags are written; no segment entries and no
    ``#EXT-X-ENDLIST`` are added here. Any existing file at `output_path` is
    overwritten.

    Args:
        video_duration (float): Currently unused; kept so existing callers
            keep working.
        output_path (str): File to write the playlist header to.
        segment_duration (int | float | str): Nominal segment length in
            seconds. It is rounded up to a whole number because
            EXT-X-TARGETDURATION must be an integer.
    Returns:
        None.
    """
    # NOTE(review): the header declares a VOD playlist, yet add_video()
    # presumably appends entries to this same file later -- confirm whether
    # EVENT would be the appropriate playlist type.
    target_duration = math.ceil(float(segment_duration))
    header_lines = [
        "#EXTM3U",
        "#EXT-X-PLAYLIST-TYPE:VOD",
        f"#EXT-X-TARGETDURATION:{target_duration}",
        "#EXT-X-VERSION:4",
        "#EXT-X-MEDIA-SEQUENCE:1",
    ]
    with open(output_path, "w") as file:
        file.write("\n".join(header_lines) + "\n")
    print(f"M3U8 playlist saved to {output_path}")