Spaces:
Runtime error
Runtime error
from __future__ import annotations | |
import json | |
from functools import partialmethod | |
from typing import AsyncGenerator | |
from urllib.parse import urlparse | |
from curl_cffi.requests import AsyncSession, Session, Response | |
from .webdriver import WebDriver, WebDriverSession, bypass_cloudflare, get_driver_cookies | |
class StreamResponse:
    """
    Async wrapper exposing a streamed response through awaitable helpers.

    The object handed to the constructor is awaited when the instance is
    entered with ``async with``; the result of that await replaces ``inner``
    and its basic metadata is mirrored onto this wrapper.

    Attributes:
        inner (Response): The wrapped object. Before ``__aenter__`` runs this
            is whatever was passed to the constructor; afterwards it is the
            value obtained by awaiting it.
    """

    def __init__(self, inner: Response) -> None:
        """Store the wrapped object; nothing is awaited at construction time."""
        self.inner: Response = inner

    async def text(self) -> str:
        """Await and return the body text via the wrapped ``atext()``."""
        body = await self.inner.atext()
        return body

    def raise_for_status(self) -> None:
        """Delegate status checking to the wrapped response."""
        self.inner.raise_for_status()

    async def json(self, **kwargs) -> dict:
        """
        Await the raw body and decode it as JSON.

        Args:
            **kwargs: Forwarded unchanged to ``json.loads``.
        """
        payload = await self.inner.acontent()
        return json.loads(payload, **kwargs)

    async def iter_lines(self) -> AsyncGenerator[bytes, None]:
        """Yield each item produced by the wrapped ``aiter_lines()``."""
        line_stream = self.inner.aiter_lines()
        async for raw_line in line_stream:
            yield raw_line

    async def iter_content(self) -> AsyncGenerator[bytes, None]:
        """Yield each item produced by the wrapped ``aiter_content()``."""
        chunk_stream = self.inner.aiter_content()
        async for raw_chunk in chunk_stream:
            yield raw_chunk

    async def __aenter__(self):
        """
        Resolve the wrapped awaitable and mirror its metadata onto ``self``.

        After entry, ``request``, ``status_code``, ``reason``, ``ok``,
        ``headers`` and ``cookies`` are available directly on the wrapper.
        """
        response: Response = await self.inner
        # Swap the awaitable for its result so the helper methods above work.
        self.inner = response
        mirrored = ("request", "status_code", "reason", "ok", "headers", "cookies")
        for attribute in mirrored:
            setattr(self, attribute, getattr(response, attribute))
        return self

    async def __aexit__(self, *args):
        """Close the wrapped response via ``aclose()`` when the context ends."""
        await self.inner.aclose()
class StreamSession(AsyncSession):
    """
    AsyncSession variant whose requests are always issued in streaming mode.

    Every request is wrapped in a ``StreamResponse`` so callers consume it
    with ``async with session.get(...) as response:``.
    """

    def request(self, method: str, url: str, **kwargs) -> StreamResponse:
        """
        Issue a request with ``stream=True`` and wrap the result.

        Args:
            method (str): HTTP method name.
            url (str): Target URL.
            **kwargs: Forwarded unchanged to ``AsyncSession.request``.

        Returns:
            StreamResponse: Wrapper around the value returned by the parent
            ``request`` call.
        """
        pending = super().request(method, url, stream=True, **kwargs)
        return StreamResponse(pending)

    # Verb shortcuts are rebound here so they route through the overriding
    # ``request`` above and therefore return StreamResponse objects.
    get = partialmethod(request, "GET")
    head = partialmethod(request, "HEAD")
    post = partialmethod(request, "POST")
    put = partialmethod(request, "PUT")
    patch = partialmethod(request, "PATCH")
    delete = partialmethod(request, "DELETE")
def get_session_from_browser(url: str, webdriver: WebDriver = None, proxy: str = None, timeout: int = 120) -> Session:
    """
    Build a Session seeded with cookies and a user agent taken from a browser.

    A WebDriverSession is opened, ``bypass_cloudflare`` is run against ``url``,
    and the driver's cookies and ``navigator.userAgent`` are captured. Those
    values are then used to configure the returned Session.

    Args:
        url (str): Page passed to ``bypass_cloudflare``; also used to derive the
            ``authority``, ``origin`` and ``referer`` headers.
        webdriver (WebDriver, optional): Existing driver handed to
            WebDriverSession.
        proxy (str, optional): Proxy passed to WebDriverSession and applied to
            both the ``http`` and ``https`` entries of the Session.
        timeout (int, optional): Passed to ``bypass_cloudflare`` and reused as
            the Session timeout.

    Returns:
        Session: Session carrying the captured cookies, browser-like headers,
        the proxy mapping, the timeout and ``chrome110`` impersonation.
    """
    with WebDriverSession(webdriver, "", proxy=proxy, virtual_display=True) as driver:
        bypass_cloudflare(driver, url, timeout)
        cookies = get_driver_cookies(driver)
        user_agent = driver.execute_script("return navigator.userAgent")

    parsed_url = urlparse(url)
    scheme, netloc = parsed_url.scheme, parsed_url.netloc
    browser_headers = {
        'accept': '*/*',
        'authority': netloc,
        'origin': f'{scheme}://{netloc}',
        'referer': url,
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'user-agent': user_agent
    }
    # The same proxy value is used for both schemes.
    proxy_map = {"https": proxy, "http": proxy}
    return Session(
        cookies=cookies,
        headers=browser_headers,
        proxies=proxy_map,
        timeout=timeout,
        impersonate="chrome110"
    )