File size: 3,298 Bytes
de68d43
 
 
 
 
 
 
 
 
 
 
 
5b51e03
de68d43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5b51e03
 
de68d43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from __future__ import annotations

from typing import TYPE_CHECKING, AsyncContextManager

from playwright.async_api import (  # noqa: F401
    Browser,
    BrowserContext,
    Page,
    TimeoutError,
    async_playwright,
)

from .models import GetContentModel, PageModel, ScreenshotModel  # noqa: TCH001

if TYPE_CHECKING:
    from types import TracebackType


class AsyncPlaywrightContext:
    """Async context manager that owns a Playwright driver and a Firefox browser.

    Entering the context starts Playwright and launches Firefox with a fixed
    set of user preferences; leaving it closes the browser and stops the
    driver. ``screenshot`` and ``get_content`` each open their own page and
    close it again before returning, including when the call fails.
    """

    # Class-level defaults so ``__aexit__`` can be called safely even when
    # ``__aenter__`` never ran or failed before assigning these attributes.
    playwright = None
    browser = None

    async def __aenter__(self) -> AsyncPlaywrightContext:
        """Start Playwright, launch Firefox and return this context object."""
        self.playwright = await async_playwright().start()
        try:
            self.browser = await self.playwright.firefox.launch(
                firefox_user_prefs={
                    "extensions.enabledScopes": 1,
                    "extensions.autoDisableScopes": 1,
                    "dom.webdriver.enabled": False,
                    "useAutomationExtension": False,
                    "general.useragent.override": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36",  # noqa: E501
                },
            )
        except BaseException:
            # ``__aexit__`` is not invoked when ``__aenter__`` raises, so the
            # already-started driver has to be stopped here to avoid a leak.
            await self.playwright.stop()
            self.playwright = None
            raise
        return self

    async def new_browser_page(
        self,
        browser: Browser,
        page_model: PageModel | None,
    ) -> Page:
        """Open a new page on ``browser``.

        Args:
            browser: Browser on which the page is created.
            page_model: Page options (color scheme, JavaScript toggle,
                viewport and proxy settings). When ``None`` the page is
                created with the browser's own defaults.

        Returns:
            The newly created page; the caller is responsible for closing it.
        """
        if page_model is None:
            return await browser.new_page()

        return await browser.new_page(
            color_scheme=page_model.color_scheme,
            java_script_enabled=page_model.java_script_enabled,
            no_viewport=page_model.no_viewport,
            proxy=page_model.proxy.model_dump() if page_model.proxy else None,
            viewport=page_model.viewport.model_dump() if page_model.viewport else None,
        )

    async def screenshot(
        self,
        screenshot_model: ScreenshotModel,
        page_model: PageModel,
    ) -> bytes:
        """Navigate to the requested URL and capture a screenshot.

        When ``screenshot_model.query_selector`` is set, only the matching
        locator is captured (optionally after waiting for it, if
        ``wait_selector`` is truthy); otherwise the page itself is captured
        using ``screenshot_model.full_page``.

        Args:
            screenshot_model: URL, delay in milliseconds, selector and
                capture options for this screenshot.
            page_model: Options used to create the page.

        Returns:
            The raw screenshot bytes.
        """
        page = await self.new_browser_page(browser=self.browser, page_model=page_model)
        try:
            await page.goto(str(screenshot_model.url))
            await page.wait_for_timeout(screenshot_model.ms_delay)

            if not screenshot_model.query_selector:
                return await page.screenshot(full_page=screenshot_model.full_page)

            # ``Page.locator`` is synchronous; only actions on the locator are awaited.
            target = page.locator(screenshot_model.query_selector)
            if screenshot_model.wait_selector:
                await target.wait_for()
            return await target.screenshot()
        finally:
            # Close the page on every path so a failed capture does not leak it.
            await page.close()

    async def get_content(
        self,
        get_content_model: GetContentModel,
        page_model: PageModel | None = None,
    ) -> str:
        """Navigate to the requested URL and return the page's HTML.

        Args:
            get_content_model: URL, delay in milliseconds and an optional
                selector to wait for before the content is read.
            page_model: Options used to create the page. Optional; when
                omitted the page is opened with the browser's defaults.

        Returns:
            The HTML content of the page as a string.
        """
        page = await self.new_browser_page(browser=self.browser, page_model=page_model)
        try:
            await page.goto(str(get_content_model.url))
            await page.wait_for_timeout(get_content_model.ms_delay)

            if get_content_model.query_selector:
                # ``Page.locator`` is synchronous; only ``wait_for`` is awaited.
                await page.locator(get_content_model.query_selector).wait_for()

            # ``Page.content`` is a coroutine and must be awaited to get the HTML.
            return await page.content()
        finally:
            # Close the page on every path so a failed fetch does not leak it.
            await page.close()

    async def __aexit__(
        self,
        typ: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        """Close the browser and stop Playwright; exceptions are not suppressed."""
        try:
            if self.browser:
                await self.browser.close()
        finally:
            self.browser = None
            # Stop the driver even if closing the browser raised.
            if self.playwright:
                await self.playwright.stop()
                self.playwright = None