mhdzumair's picture
Add support for playing streams from servers with expired or self-signed SSL certificates
0e2d63a
import logging
import math
import re
from datetime import datetime, timedelta, timezone
from typing import List, Dict
from urllib.parse import urljoin
import xmltodict
logger = logging.getLogger(__name__)
def parse_mpd(mpd_content: str | bytes) -> dict:
    """
    Convert raw MPD XML into a nested dictionary.

    Args:
        mpd_content (str | bytes): The MPD document to convert.

    Returns:
        dict: Whatever ``xmltodict.parse`` produces for the document.
    """
    manifest = xmltodict.parse(mpd_content)
    return manifest
def parse_mpd_dict(
mpd_dict: dict, mpd_url: str, parse_drm: bool = True, parse_segment_profile_id: str | None = None
) -> dict:
"""
Parses the MPD dictionary and extracts relevant information.
Args:
mpd_dict (dict): The MPD content as a dictionary.
mpd_url (str): The URL of the MPD manifest.
parse_drm (bool, optional): Whether to parse DRM information. Defaults to True.
parse_segment_profile_id (str, optional): The profile ID to parse segments for. Defaults to None.
Returns:
dict: The parsed MPD information including profiles and DRM info.
This function processes the MPD dictionary to extract profiles, DRM information, and other relevant data.
It handles both live and static MPD manifests.
"""
profiles = []
parsed_dict = {}
source = "/".join(mpd_url.split("/")[:-1])
is_live = mpd_dict["MPD"].get("@type", "static").lower() == "dynamic"
parsed_dict["isLive"] = is_live
media_presentation_duration = mpd_dict["MPD"].get("@mediaPresentationDuration")
# Parse additional MPD attributes for live streams
if is_live:
parsed_dict["minimumUpdatePeriod"] = parse_duration(mpd_dict["MPD"].get("@minimumUpdatePeriod", "PT0S"))
parsed_dict["timeShiftBufferDepth"] = parse_duration(mpd_dict["MPD"].get("@timeShiftBufferDepth", "PT2M"))
parsed_dict["availabilityStartTime"] = datetime.fromisoformat(
mpd_dict["MPD"]["@availabilityStartTime"].replace("Z", "+00:00")
)
parsed_dict["publishTime"] = datetime.fromisoformat(
mpd_dict["MPD"].get("@publishTime", "").replace("Z", "+00:00")
)
periods = mpd_dict["MPD"]["Period"]
periods = periods if isinstance(periods, list) else [periods]
for period in periods:
parsed_dict["PeriodStart"] = parse_duration(period.get("@start", "PT0S"))
for adaptation in period["AdaptationSet"]:
representations = adaptation["Representation"]
representations = representations if isinstance(representations, list) else [representations]
for representation in representations:
profile = parse_representation(
parsed_dict,
representation,
adaptation,
source,
media_presentation_duration,
parse_segment_profile_id,
)
if profile:
profiles.append(profile)
parsed_dict["profiles"] = profiles
if parse_drm:
drm_info = extract_drm_info(periods, mpd_url)
else:
drm_info = {}
parsed_dict["drmInfo"] = drm_info
return parsed_dict
def pad_base64(encoded_key_id):
    """
    Pads a base64 encoded key ID to make its length a multiple of 4.

    Input whose length is already a multiple of 4 is returned unchanged.

    Args:
        encoded_key_id (str): The base64 encoded key ID.

    Returns:
        str: The padded base64 encoded key ID.
    """
    # -len % 4 is 0 for already-aligned input, whereas 4 - len % 4 would append "====".
    return encoded_key_id + "=" * (-len(encoded_key_id) % 4)
def extract_drm_info(periods: List[Dict], mpd_url: str) -> Dict:
    """
    Collect DRM details from every ContentProtection element in the given periods.

    ContentProtection is looked up on each AdaptationSet and on each Representation
    inside it; every hit is handed to ``process_content_protection``, which fills in
    the shared result dictionary.

    Args:
        periods (List[Dict]): The list of periods in the MPD.
        mpd_url (str): The URL of the MPD manifest, used to resolve a relative license URL.

    Returns:
        Dict: The collected DRM information; ``isDrmProtected`` starts out as False.
    """

    def as_list(node):
        # A single child element is a bare dict rather than a list.
        return node if isinstance(node, list) else [node]

    drm_info = {"isDrmProtected": False}

    for period in periods:
        for adaptation_set in as_list(period.get("AdaptationSet", [])):
            # AdaptationSet-level protection first, then each Representation.
            process_content_protection(adaptation_set.get("ContentProtection", []), drm_info)
            for representation in as_list(adaptation_set.get("Representation", [])):
                process_content_protection(representation.get("ContentProtection", []), drm_info)

    # A license acquisition URL that is not absolute is resolved against the manifest URL.
    license_url_is_relative = "laUrl" in drm_info and not drm_info["laUrl"].startswith(("http://", "https://"))
    if license_url_is_relative:
        drm_info["laUrl"] = urljoin(mpd_url, drm_info["laUrl"])

    return drm_info
def process_content_protection(content_protection: list[dict] | dict, drm_info: dict):
"""
Processes the ContentProtection elements to extract DRM information.
Args:
content_protection (list[dict] | dict): The ContentProtection elements.
drm_info (dict): The dictionary to store DRM information.
This function updates the drm_info dictionary with DRM system information found in the ContentProtection elements.
"""
if not isinstance(content_protection, list):
content_protection = [content_protection]
for protection in content_protection:
drm_info["isDrmProtected"] = True
scheme_id_uri = protection.get("@schemeIdUri", "").lower()
if "clearkey" in scheme_id_uri:
drm_info["drmSystem"] = "clearkey"
if "clearkey:Laurl" in protection:
la_url = protection["clearkey:Laurl"].get("#text")
if la_url and "laUrl" not in drm_info:
drm_info["laUrl"] = la_url
elif "widevine" in scheme_id_uri or "edef8ba9-79d6-4ace-a3c8-27dcd51d21ed" in scheme_id_uri:
drm_info["drmSystem"] = "widevine"
pssh = protection.get("cenc:pssh", {}).get("#text")
if pssh:
drm_info["pssh"] = pssh
elif "playready" in scheme_id_uri or "9a04f079-9840-4286-ab92-e65be0885f95" in scheme_id_uri:
drm_info["drmSystem"] = "playready"
if "@cenc:default_KID" in protection:
key_id = protection["@cenc:default_KID"].replace("-", "")
if "keyId" not in drm_info:
drm_info["keyId"] = key_id
if "ms:laurl" in protection:
la_url = protection["ms:laurl"].get("@licenseUrl")
if la_url and "laUrl" not in drm_info:
drm_info["laUrl"] = la_url
return drm_info
def parse_representation(
    parsed_dict: dict,
    representation: dict,
    adaptation: dict,
    source: str,
    media_presentation_duration: str,
    parse_segment_profile_id: str | None,
) -> dict | None:
    """
    Parses a representation and extracts profile information.

    Attributes are read from the Representation first and fall back to the enclosing
    AdaptationSet, so manifests that declare codecs, width, height, frame rate or the
    audio channel configuration only on the AdaptationSet are handled too.

    Args:
        parsed_dict (dict): The parsed MPD data.
        representation (dict): The representation data.
        adaptation (dict): The adaptation set data.
        source (str): The source URL.
        media_presentation_duration (str): The media presentation duration.
        parse_segment_profile_id (str, optional): The profile ID to parse segments for. Segments are
            only parsed for the profile whose id equals this value.

    Returns:
        dict | None: The parsed profile information, or None when the MIME type is neither
        video nor audio.
    """
    codecs = representation.get("@codecs") or adaptation.get("@codecs")

    # Without an explicit MIME type, guess from the codec string ("avc..." means video).
    mime_type = _get_key(adaptation, representation, "@mimeType") or (
        "video/mp4" if "avc" in (codecs or "") else "audio/mp4"
    )
    if "video" not in mime_type and "audio" not in mime_type:
        return None

    profile = {
        "id": representation.get("@id") or adaptation.get("@id"),
        "mimeType": mime_type,
        "lang": representation.get("@lang") or adaptation.get("@lang"),
        "codecs": codecs,
        "bandwidth": int(representation.get("@bandwidth") or adaptation.get("@bandwidth")),
        "startWithSAP": (_get_key(adaptation, representation, "@startWithSAP") or "1") == "1",
        "mediaPresentationDuration": media_presentation_duration,
    }

    if "audio" in profile["mimeType"]:
        profile["audioSamplingRate"] = representation.get("@audioSamplingRate") or adaptation.get("@audioSamplingRate")
        channel_config = (
            representation.get("AudioChannelConfiguration") or adaptation.get("AudioChannelConfiguration") or {}
        )
        if isinstance(channel_config, list):
            # Several configurations may be listed; use the first one.
            channel_config = channel_config[0] if channel_config else {}
        profile["channels"] = channel_config.get("@value", "2")
    else:
        # Missing dimensions become 0 instead of raising.
        profile["width"] = int(_get_key(adaptation, representation, "@width") or 0)
        profile["height"] = int(_get_key(adaptation, representation, "@height") or 0)
        frame_rate = (
            representation.get("@frameRate")
            or adaptation.get("@frameRate")
            or adaptation.get("@maxFrameRate")
            or "30000/1001"
        )
        # Frame rates are written either as "N" or as "N/D".
        numerator, _, denominator = frame_rate.partition("/")
        profile["frameRate"] = round(int(numerator) / int(denominator or 1), 3)
        profile["sar"] = _get_key(adaptation, representation, "@sar") or "1:1"

    if parse_segment_profile_id is None or profile["id"] != parse_segment_profile_id:
        return profile

    # Representation-level SegmentTemplate values override those inherited from the AdaptationSet.
    item = {**(adaptation.get("SegmentTemplate") or {}), **(representation.get("SegmentTemplate") or {})}
    if item:
        profile["segments"] = parse_segment_template(parsed_dict, item, profile, source)
    else:
        profile["segments"] = parse_segment_base(representation, source)

    return profile
def _get_key(adaptation: dict, representation: dict, key: str) -> str | None:
"""
Retrieves a key from the representation or adaptation set.
Args:
adaptation (dict): The adaptation set data.
representation (dict): The representation data.
key (str): The key to retrieve.
Returns:
str | None: The value of the key or None if not found.
"""
return representation.get(key, adaptation.get(key, None))
def parse_segment_template(parsed_dict: dict, item: dict, profile: dict, source: str) -> List[Dict]:
    """
    Build the segment list described by a SegmentTemplate.

    When the template has an ``@initialization`` attribute, the resolved initialization
    URL is stored on ``profile["initUrl"]`` as a side effect.

    Args:
        parsed_dict (dict): The parsed MPD data.
        item (dict): The segment template data.
        profile (dict): The profile information.
        source (str): The source URL.

    Returns:
        List[Dict]: Segments from the SegmentTimeline if present, otherwise from ``@duration``
        if present, otherwise an empty list.
    """
    timescale = int(item.get("@timescale", 1))

    # Initialization segment
    if "@initialization" in item:
        init_url = (
            item["@initialization"]
            .replace("$RepresentationID$", profile["id"])
            .replace("$Bandwidth$", str(profile["bandwidth"]))
        )
        profile["initUrl"] = init_url if init_url.startswith("http") else f"{source}/{init_url}"

    # Media segments: an explicit timeline wins over a fixed duration.
    if "SegmentTimeline" in item:
        return list(parse_segment_timeline(parsed_dict, item, profile, source, timescale))
    if "@duration" in item:
        return list(parse_segment_duration(parsed_dict, item, profile, source, timescale))
    return []
def parse_segment_timeline(parsed_dict: dict, item: dict, profile: dict, source: str, timescale: int) -> List[Dict]:
    """
    Parses a segment timeline and extracts segment information.

    Args:
        parsed_dict (dict): The parsed MPD data. ``availabilityStartTime`` is used as the wall-clock
            anchor when present; when it is absent the Unix epoch (UTC) is used instead.
            ``PeriodStart`` (seconds) is added to that anchor.
        item (dict): The segment template data containing the ``SegmentTimeline`` element.
        profile (dict): The profile information.
        source (str): The source URL.
        timescale (int): The timescale for the segments.

    Returns:
        List[Dict]: The list of parsed segments; empty when the timeline has no ``S`` entries.
    """
    timelines = (item["SegmentTimeline"] or {}).get("S") or []
    timelines = timelines if isinstance(timelines, list) else [timelines]

    # availabilityStartTime may be missing from parsed_dict; fall back to the epoch instead of a KeyError.
    availability_start = parsed_dict.get("availabilityStartTime")
    if availability_start is None:
        availability_start = datetime.fromtimestamp(0, tz=timezone.utc)
    period_start = availability_start + timedelta(seconds=parsed_dict.get("PeriodStart", 0))

    presentation_time_offset = int(item.get("@presentationTimeOffset", 0))
    start_number = int(item.get("@startNumber", 1))

    return [
        create_segment_data(timeline, item, profile, source, timescale)
        for timeline in preprocess_timeline(timelines, start_number, period_start, presentation_time_offset, timescale)
    ]
def _resolve_repeat_count(timelines: List[Dict], index: int, start_time: int, duration: int) -> int:
    """Return how many extra times entry ``index`` repeats, resolving a negative (open-ended) ``@r``."""
    repeat = int(timelines[index].get("@r", 0))
    if repeat >= 0:
        return repeat

    # Negative @r: repeat until the start of the next entry when that start is stated explicitly.
    following = timelines[index + 1] if index + 1 < len(timelines) else None
    if following is not None and "@t" in following and duration > 0:
        span = int(following["@t"]) - start_time
        return max(-(-span // duration) - 1, 0)

    # The end of the run is unknown here; emit the entry once rather than dropping it.
    return 0


def preprocess_timeline(
    timelines: List[Dict], start_number: int, period_start: datetime, presentation_time_offset: int, timescale: int
) -> List[Dict]:
    """
    Preprocesses the segment timeline data.

    Each ``S`` entry is expanded into ``@r + 1`` consecutive segments. An entry without ``@t``
    starts where the previous entry ended. A negative ``@r`` repeats up to the ``@t`` of the next
    entry, or yields a single segment when no such bound is available.

    Args:
        timelines (List[Dict]): The list of timeline segments (``@d`` required; ``@t``, ``@r`` optional).
        start_number (int): The starting segment number.
        period_start (datetime): The start time of the period.
        presentation_time_offset (int): The presentation time offset, in timescale units.
        timescale (int): The timescale for the segments.

    Returns:
        List[Dict]: One dict per segment with ``number``, ``start_time`` and ``end_time`` (datetimes),
        and ``duration`` and ``time`` (both in timescale units).
    """
    processed_data = []
    current_time = 0

    for index, timeline in enumerate(timelines):
        duration = int(timeline["@d"])
        start_time = int(timeline.get("@t", current_time))
        repeat = _resolve_repeat_count(timelines, index, start_time, duration)

        for _ in range(repeat + 1):
            segment_start_time = period_start + timedelta(seconds=(start_time - presentation_time_offset) / timescale)
            segment_end_time = segment_start_time + timedelta(seconds=duration / timescale)
            processed_data.append(
                {
                    "number": start_number,
                    "start_time": segment_start_time,
                    "end_time": segment_end_time,
                    "duration": duration,
                    "time": start_time,
                }
            )
            start_time += duration
            start_number += 1

        # The next entry continues from here unless it carries its own @t.
        current_time = start_time

    return processed_data
def parse_segment_duration(parsed_dict: dict, item: dict, profile: dict, source: str, timescale: int) -> List[Dict]:
    """
    Build segments for a template that declares a fixed ``@duration``.

    Live manifests (``parsed_dict["isLive"]``) get their segments from
    ``generate_live_segments``; all others from ``generate_vod_segments``. Every raw
    segment is then turned into its final form by ``create_segment_data``.

    Args:
        parsed_dict (dict): The parsed MPD data.
        item (dict): The segment template data; must contain ``@duration``.
        profile (dict): The profile information.
        source (str): The source URL.
        timescale (int): The timescale for the segments.

    Returns:
        List[Dict]: The list of parsed segments.
    """
    raw_duration = int(item["@duration"])
    first_number = int(item.get("@startNumber", 1))
    seconds_per_segment = raw_duration / timescale

    if parsed_dict["isLive"]:
        raw_segments = generate_live_segments(parsed_dict, seconds_per_segment, first_number)
    else:
        raw_segments = generate_vod_segments(profile, raw_duration, timescale, first_number)

    parsed_segments = []
    for raw_segment in raw_segments:
        parsed_segments.append(create_segment_data(raw_segment, item, profile, source, timescale))
    return parsed_segments
def generate_live_segments(parsed_dict: dict, segment_duration_sec: float, start_number: int) -> List[Dict]:
    """
    Produce the window of segments for a live, fixed-duration template.

    The window holds as many segments as fit into ``timeShiftBufferDepth`` (60 seconds when
    the key is absent). Its first number is derived from the current UTC time relative to
    ``availabilityStartTime`` and is never lower than ``start_number``.

    Args:
        parsed_dict (dict): The parsed MPD data.
        segment_duration_sec (float): The segment duration in seconds.
        start_number (int): The starting segment number.

    Returns:
        List[Dict]: Dicts with ``number``, ``start_time`` and ``duration`` for each segment in the window.
    """
    buffer_depth = timedelta(seconds=parsed_dict.get("timeShiftBufferDepth", 60))
    window_size = math.ceil(buffer_depth.total_seconds() / segment_duration_sec)

    now = datetime.now(tz=timezone.utc)
    availability_start = parsed_dict["availabilityStartTime"]
    elapsed_segments = math.floor((now - availability_start).total_seconds() / segment_duration_sec)

    # Step back one window from "now", clamped so it never precedes the first segment.
    first_number = max(start_number + elapsed_segments - window_size, start_number)

    live_segments = []
    for number in range(first_number, first_number + window_size):
        offset = timedelta(seconds=(number - start_number) * segment_duration_sec)
        live_segments.append(
            {
                "number": number,
                "start_time": parsed_dict["availabilityStartTime"] + offset,
                "duration": segment_duration_sec,
            }
        )
    return live_segments
def generate_vod_segments(profile: dict, duration: int, timescale: int, start_number: int) -> List[Dict]:
    """
    Produce the full list of segments for a static, fixed-duration template.

    The number of segments is the presentation duration divided by the segment duration,
    rounded up. A missing presentation duration counts as 0 and yields no segments.

    Args:
        profile (dict): The profile information; ``mediaPresentationDuration`` may be a number of
            seconds or a duration string (parsed with ``parse_duration``).
        duration (int): The segment duration in timescale units.
        timescale (int): The timescale for the segments.
        start_number (int): The starting segment number.

    Returns:
        List[Dict]: Dicts with ``number`` and ``duration`` (seconds) for each segment.
    """
    presentation_duration = profile.get("mediaPresentationDuration") or 0
    if isinstance(presentation_duration, str):
        presentation_duration = parse_duration(presentation_duration)

    count = math.ceil(presentation_duration * timescale / duration)

    vod_segments = []
    for number in range(start_number, start_number + count):
        vod_segments.append({"number": number, "duration": duration / timescale})
    return vod_segments
def create_segment_data(segment: Dict, item: dict, profile: dict, source: str, timescale: int | None = None) -> Dict:
"""
Creates segment data based on the segment information. This includes the segment URL and metadata.
Args:
segment (Dict): The segment information.
item (dict): The segment template data.
profile (dict): The profile information.
source (str): The source URL.
timescale (int, optional): The timescale for the segments. Defaults to None.
Returns:
Dict: The created segment data.
"""
media_template = item["@media"]
media = media_template.replace("$RepresentationID$", profile["id"])
media = media.replace("$Number%04d$", f"{segment['number']:04d}")
media = media.replace("$Number$", str(segment["number"]))
media = media.replace("$Bandwidth$", str(profile["bandwidth"]))
if "time" in segment and timescale is not None:
media = media.replace("$Time$", str(int(segment["time"] * timescale)))
if not media.startswith("http"):
media = f"{source}/{media}"
segment_data = {
"type": "segment",
"media": media,
"number": segment["number"],
}
if "start_time" in segment and "end_time" in segment:
segment_data.update(
{
"start_time": segment["start_time"],
"end_time": segment["end_time"],
"extinf": (segment["end_time"] - segment["start_time"]).total_seconds(),
"program_date_time": segment["start_time"].isoformat() + "Z",
}
)
elif "start_time" in segment and "duration" in segment:
duration = segment["duration"]
segment_data.update(
{
"start_time": segment["start_time"],
"end_time": segment["start_time"] + timedelta(seconds=duration),
"extinf": duration,
"program_date_time": segment["start_time"].isoformat() + "Z",
}
)
elif "duration" in segment:
segment_data["extinf"] = segment["duration"]
return segment_data
def parse_segment_base(representation: dict, source: str) -> List[Dict]:
    """
    Parses segment base information and extracts segment data. This is used for single-segment representations.

    The returned byte range ends at the end of ``@indexRange``. It starts at the start of the
    ``Initialization`` range when that element is present, otherwise at the start of ``@indexRange``.

    Args:
        representation (dict): The representation data; must contain ``SegmentBase`` and ``BaseURL``.
        source (str): The source URL, prefixed to a ``BaseURL`` that does not start with "http".

    Returns:
        List[Dict]: A one-element list describing the segment (``type``, ``range``, ``media``).
    """
    segment = representation["SegmentBase"]
    start, end = map(int, segment["@indexRange"].split("-"))
    if "Initialization" in segment:
        start, _ = map(int, segment["Initialization"]["@range"].split("-"))

    base_url = representation["BaseURL"]
    if isinstance(base_url, dict):
        # A BaseURL element with XML attributes is parsed into a dict holding its text under "#text".
        base_url = base_url.get("#text", "")

    # Same rule as the other URL builders in this module: absolute URLs are used untouched.
    media = base_url if base_url.startswith("http") else f"{source}/{base_url}"

    return [
        {
            "type": "segment",
            "range": f"{start}-{end}",
            "media": media,
        }
    ]
# One non-negative decimal number, e.g. "3" or "3.5".
_DURATION_NUMBER = r"(\d+(?:\.\d+)?)"

# Compiled once at import time. Groups: years, months, weeks, days, hours, minutes, seconds.
_DURATION_RE = re.compile(
    rf"P(?:{_DURATION_NUMBER}Y)?(?:{_DURATION_NUMBER}M)?(?:{_DURATION_NUMBER}W)?(?:{_DURATION_NUMBER}D)?"
    rf"T?(?:{_DURATION_NUMBER}H)?(?:{_DURATION_NUMBER}M)?(?:{_DURATION_NUMBER}S)?"
)


def parse_duration(duration_str: str) -> float:
    """
    Parses a duration ISO 8601 string into seconds.

    Years count as 365 days and months as 30 days. Weeks ("P1W") and fractional values in any
    field are accepted. Surrounding whitespace is ignored; the rest of the string must match in full.

    Args:
        duration_str (str): The duration string to parse, e.g. "PT1H2M3.5S".

    Returns:
        float: The parsed duration in seconds.

    Raises:
        ValueError: If the string is not a supported duration. This includes trailing
            unparsable content, which is rejected rather than ignored.
    """
    match = _DURATION_RE.fullmatch(duration_str.strip())
    if not match:
        raise ValueError(f"Invalid duration format: {duration_str}")

    years, months, weeks, days, hours, minutes, seconds = (float(g) if g else 0.0 for g in match.groups())
    total_days = years * 365 + months * 30 + weeks * 7 + days
    return total_days * 24 * 3600 + hours * 3600 + minutes * 60 + seconds