| import re |
| from urllib import parse |
|
|
| from pydantic import HttpUrl |
|
|
| from mediaflow_proxy.utils.http_utils import encode_mediaflow_proxy_url, get_original_scheme |
|
|
|
|
| class M3U8Processor: |
| def __init__(self, request, key_url: HttpUrl = None): |
| """ |
| Initializes the M3U8Processor with the request and URL prefix. |
| |
| Args: |
| request (Request): The incoming HTTP request. |
| key_url (HttpUrl, optional): The URL of the key server. Defaults to None. |
| """ |
| self.request = request |
| self.key_url = key_url |
| self.mediaflow_proxy_url = str(request.url_for("hls_stream_proxy").replace(scheme=get_original_scheme(request))) |
|
|
| async def process_m3u8(self, content: str, base_url: str) -> str: |
| """ |
| Processes the m3u8 content, proxying URLs and handling key lines. |
| |
| Args: |
| content (str): The m3u8 content to process. |
| base_url (str): The base URL to resolve relative URLs. |
| |
| Returns: |
| str: The processed m3u8 content. |
| """ |
| lines = content.splitlines() |
| processed_lines = [] |
| for line in lines: |
| if "URI=" in line: |
| processed_lines.append(await self.process_key_line(line, base_url)) |
| elif not line.startswith("#") and line.strip(): |
| processed_lines.append(await self.proxy_url(line, base_url)) |
| else: |
| processed_lines.append(line) |
| return "\n".join(processed_lines) |
|
|
| async def process_key_line(self, line: str, base_url: str) -> str: |
| """ |
| Processes a key line in the m3u8 content, proxying the URI. |
| |
| Args: |
| line (str): The key line to process. |
| base_url (str): The base URL to resolve relative URLs. |
| |
| Returns: |
| str: The processed key line. |
| """ |
| uri_match = re.search(r'URI="([^"]+)"', line) |
| if uri_match: |
| original_uri = uri_match.group(1) |
| uri = parse.urlparse(original_uri) |
| if self.key_url: |
| uri = uri._replace(scheme=self.key_url.scheme, netloc=self.key_url.host) |
| new_uri = await self.proxy_url(uri.geturl(), base_url) |
| line = line.replace(f'URI="{original_uri}"', f'URI="{new_uri}"') |
| return line |
|
|
| async def proxy_url(self, url: str, base_url: str) -> str: |
| """ |
| Proxies a URL, encoding it with the MediaFlow proxy URL. |
| |
| Args: |
| url (str): The URL to proxy. |
| base_url (str): The base URL to resolve relative URLs. |
| |
| Returns: |
| str: The proxied URL. |
| """ |
| full_url = parse.urljoin(base_url, url) |
|
|
| return encode_mediaflow_proxy_url( |
| self.mediaflow_proxy_url, |
| "", |
| full_url, |
| query_params=dict(self.request.query_params), |
| ) |
|
|