"""
Agent 1: Token Extractor
Design System Extractor v2

Persona: Meticulous Design Archaeologist

Responsibilities:
- Crawl pages at the specified viewport
- Extract computed styles from all elements
- Parse CSS files for variables and rules
- Extract colors from SVGs
- Collect colors, typography, spacing, radius, and shadows
- Track frequency and context for each token
"""
| import asyncio | |
| import re | |
| from typing import Optional, Callable | |
| from datetime import datetime | |
| from collections import defaultdict | |
| from playwright.async_api import async_playwright, Browser, Page, BrowserContext | |
| from core.token_schema import ( | |
| Viewport, | |
| ExtractedTokens, | |
| ColorToken, | |
| TypographyToken, | |
| SpacingToken, | |
| RadiusToken, | |
| ShadowToken, | |
| FontFamily, | |
| TokenSource, | |
| Confidence, | |
| ) | |
| from core.color_utils import ( | |
| normalize_hex, | |
| parse_color, | |
| get_contrast_with_white, | |
| get_contrast_with_black, | |
| check_wcag_compliance, | |
| ) | |
| from config.settings import get_settings | |
class TokenExtractor:
    """
    Extracts design tokens from web pages.

    This is the second part of Agent 1's job — after pages are confirmed,
    we crawl and extract all CSS values.

    Enhanced with:
    - CSS file parsing for variables and rules
    - SVG color extraction
    - Inline style extraction

    Collected tokens are stored on the instance and accumulate across every
    page processed by ``extract``.
    """

    # Matches hex, rgb(a) and hsl(a) color literals in raw CSS text.
    _COLOR_PATTERN = re.compile(
        r'#[0-9a-fA-F]{3,8}|rgb\([^)]+\)|rgba\([^)]+\)|hsl\([^)]+\)|hsla\([^)]+\)',
        re.IGNORECASE,
    )
    # Per-attempt navigation timeout (milliseconds).
    _NAVIGATION_TIMEOUT_MS = 60000
    # Upper bound on the number of linked stylesheets fetched per page.
    _MAX_EXTERNAL_STYLESHEETS = 10
    # Timeout for fetching a single linked stylesheet (milliseconds).
    _EXTERNAL_CSS_TIMEOUT_MS = 5000

    def __init__(self, viewport: Viewport = Viewport.DESKTOP):
        """
        Create an extractor for the given viewport.

        Args:
            viewport: Which viewport dimensions (from settings) the browser
                context is created with.
        """
        self.settings = get_settings()
        self.viewport = viewport
        # Playwright driver handle; kept so it can be stopped on cleanup.
        self._playwright = None
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None

        # Token collection
        self.colors: dict[str, ColorToken] = {}
        self.typography: dict[str, TypographyToken] = {}
        self.spacing: dict[str, SpacingToken] = {}
        self.radius: dict[str, RadiusToken] = {}
        self.shadows: dict[str, ShadowToken] = {}

        # CSS Variables collection
        self.css_variables: dict[str, str] = {}

        # Font tracking
        self.font_families: dict[str, FontFamily] = {}

        # Statistics
        self.total_elements = 0
        self.errors: list[str] = []
        self.warnings: list[str] = []
        # Per-source counts for the most recently extracted page.
        self._last_extraction_stats: dict = {}

    async def __aenter__(self):
        """Async context manager entry: start the browser."""
        await self._init_browser()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: release the browser."""
        await self._close_browser()

    async def _init_browser(self):
        """
        Initialize the Playwright driver, browser and browser context.

        If any step after starting the driver fails, everything started so
        far is torn down again before the exception is re-raised.
        """
        self._playwright = await async_playwright().start()
        try:
            self.browser = await self._playwright.chromium.launch(
                headless=self.settings.browser.headless
            )

            # Set viewport based on extraction mode
            if self.viewport == Viewport.DESKTOP:
                width = self.settings.viewport.desktop_width
                height = self.settings.viewport.desktop_height
            else:
                width = self.settings.viewport.mobile_width
                height = self.settings.viewport.mobile_height

            self.context = await self.browser.new_context(
                viewport={"width": width, "height": height},
                user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
            )
        except Exception:
            await self._close_browser()
            raise

    async def _close_browser(self):
        """Close the context and browser, then stop the Playwright driver."""
        try:
            if self.context:
                await self.context.close()
            if self.browser:
                await self.browser.close()
        finally:
            self.context = None
            self.browser = None
            # The driver must be stopped even if closing the browser failed,
            # otherwise its process outlives the extractor.
            if self._playwright is not None:
                driver, self._playwright = self._playwright, None
                await driver.stop()

    async def _scroll_page(self, page: Page):
        """
        Scroll through the page one viewport at a time to load lazy content.

        After scrolling, waits for the network to go idle. A timeout on that
        wait is recorded as a warning instead of failing the page.
        """
        # Local import: only this method needs the Playwright timeout type.
        from playwright.async_api import TimeoutError as PlaywrightTimeoutError

        await page.evaluate("""
            async () => {
                const delay = ms => new Promise(resolve => setTimeout(resolve, ms));
                const height = document.body.scrollHeight;
                const step = window.innerHeight;
                for (let y = 0; y < height; y += step) {
                    window.scrollTo(0, y);
                    await delay(100);
                }
                // Scroll back to top
                window.scrollTo(0, 0);
            }
        """)

        # Wait for network idle after scrolling
        try:
            await page.wait_for_load_state(
                "networkidle",
                timeout=self.settings.browser.network_idle_timeout,
            )
        except PlaywrightTimeoutError:
            self.warnings.append(
                f"Network did not become idle for {page.url}, extracting current content"
            )

    async def _extract_styles_from_page(self, page: Page) -> dict:
        """
        Extract computed styles from all elements on the page.

        This is the core extraction logic — getComputedStyle is read for
        every element that is not display:none / visibility:hidden.

        Returns:
            Dict with ``colors``, ``typography``, ``spacing``, ``radius`` and
            ``shadows`` lists of raw records plus ``elements_count``.
        """
        styles_data = await page.evaluate("""
            () => {
                const elements = document.querySelectorAll('*');
                const results = {
                    colors: [],
                    typography: [],
                    spacing: [],
                    radius: [],
                    shadows: [],
                    elements_count: elements.length,
                };
                const colorProperties = [
                    'color', 'background-color', 'border-color',
                    'border-top-color', 'border-right-color',
                    'border-bottom-color', 'border-left-color',
                    'outline-color', 'text-decoration-color',
                ];
                const spacingProperties = [
                    'margin-top', 'margin-right', 'margin-bottom', 'margin-left',
                    'padding-top', 'padding-right', 'padding-bottom', 'padding-left',
                    'gap', 'row-gap', 'column-gap',
                ];
                elements.forEach(el => {
                    const tag = el.tagName.toLowerCase();
                    const styles = window.getComputedStyle(el);
                    // Skip invisible elements
                    if (styles.display === 'none' || styles.visibility === 'hidden') {
                        return;
                    }
                    // --- COLORS ---
                    colorProperties.forEach(prop => {
                        const value = styles.getPropertyValue(prop);
                        if (value && value !== 'rgba(0, 0, 0, 0)' && value !== 'transparent') {
                            results.colors.push({
                                value: value,
                                property: prop,
                                element: tag,
                                context: prop.includes('background') ? 'background' :
                                         prop.includes('border') ? 'border' : 'text',
                            });
                        }
                    });
                    // --- TYPOGRAPHY ---
                    const fontFamily = styles.getPropertyValue('font-family');
                    const fontSize = styles.getPropertyValue('font-size');
                    const fontWeight = styles.getPropertyValue('font-weight');
                    const lineHeight = styles.getPropertyValue('line-height');
                    const letterSpacing = styles.getPropertyValue('letter-spacing');
                    if (fontSize && fontFamily) {
                        results.typography.push({
                            fontFamily: fontFamily,
                            fontSize: fontSize,
                            fontWeight: fontWeight,
                            lineHeight: lineHeight,
                            letterSpacing: letterSpacing,
                            element: tag,
                        });
                    }
                    // --- SPACING ---
                    spacingProperties.forEach(prop => {
                        const value = styles.getPropertyValue(prop);
                        if (value && value !== '0px' && value !== 'auto' && value !== 'normal') {
                            const px = parseFloat(value);
                            if (!isNaN(px) && px > 0 && px < 500) {
                                results.spacing.push({
                                    value: value,
                                    valuePx: Math.round(px),
                                    property: prop,
                                    context: prop.includes('margin') ? 'margin' :
                                             prop.includes('padding') ? 'padding' : 'gap',
                                });
                            }
                        }
                    });
                    // --- BORDER RADIUS ---
                    const radiusProps = [
                        'border-radius', 'border-top-left-radius',
                        'border-top-right-radius', 'border-bottom-left-radius',
                        'border-bottom-right-radius',
                    ];
                    radiusProps.forEach(prop => {
                        const value = styles.getPropertyValue(prop);
                        if (value && value !== '0px') {
                            results.radius.push({
                                value: value,
                                element: tag,
                            });
                        }
                    });
                    // --- BOX SHADOW ---
                    const shadow = styles.getPropertyValue('box-shadow');
                    if (shadow && shadow !== 'none') {
                        results.shadows.push({
                            value: shadow,
                            element: tag,
                        });
                    }
                });
                return results;
            }
        """)
        return styles_data

    async def _extract_css_variables(self, page: Page) -> dict:
        """
        Extract CSS custom properties (variables) from the page's stylesheets.

        This catches colors defined as:
        - :root { --primary-color: #3860be; }
        - :root { --brand-cyan: #00c4cc; }

        Returns:
            ``{"raw": {...}, "computed": {...}}`` where ``raw`` holds the
            declared values found in accessible stylesheet rules (including
            one level of nested rules) and ``computed`` holds the values
            those same variables resolve to on the document element.
        """
        css_vars = await page.evaluate("""
            () => {
                const variables = {};
                // 1. Get CSS variables from :root computed styles
                const rootStyles = getComputedStyle(document.documentElement);
                // 2. Parse all stylesheets for CSS variables
                for (const sheet of document.styleSheets) {
                    try {
                        const rules = sheet.cssRules || sheet.rules;
                        for (const rule of rules) {
                            if (rule.style) {
                                for (let i = 0; i < rule.style.length; i++) {
                                    const prop = rule.style[i];
                                    if (prop.startsWith('--')) {
                                        const value = rule.style.getPropertyValue(prop).trim();
                                        if (value) {
                                            variables[prop] = value;
                                        }
                                    }
                                }
                            }
                            // Also check @media rules
                            if (rule.cssRules) {
                                for (const innerRule of rule.cssRules) {
                                    if (innerRule.style) {
                                        for (let i = 0; i < innerRule.style.length; i++) {
                                            const prop = innerRule.style[i];
                                            if (prop.startsWith('--')) {
                                                const value = innerRule.style.getPropertyValue(prop).trim();
                                                if (value) {
                                                    variables[prop] = value;
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    } catch (e) {
                        // CORS may block access to external stylesheets
                        console.log('Could not access stylesheet:', e);
                    }
                }
                // 3. Get computed CSS variable values from :root
                const computedVars = {};
                for (const prop of Object.keys(variables)) {
                    const computed = rootStyles.getPropertyValue(prop).trim();
                    if (computed) {
                        computedVars[prop] = computed;
                    }
                }
                return { raw: variables, computed: computedVars };
            }
        """)
        return css_vars

    async def _extract_svg_colors(self, page: Page) -> list[dict]:
        """
        Extract colors from SVG elements (fill, stroke).

        This catches colors in:
        - <svg fill="#00c4cc">
        - <path stroke="#3860be">
        - <circle fill="rgb(188, 212, 50)">

        Both the literal attributes and the computed fill/stroke styles are
        reported, as separate records.
        """
        svg_colors = await page.evaluate("""
            () => {
                const colors = [];
                // Find all SVG elements
                const svgs = document.querySelectorAll('svg, svg *');
                svgs.forEach(el => {
                    // Check fill attribute
                    const fill = el.getAttribute('fill');
                    if (fill && fill !== 'none' && fill !== 'currentColor' && !fill.startsWith('url(')) {
                        colors.push({
                            value: fill,
                            property: 'svg-fill',
                            element: el.tagName.toLowerCase(),
                            context: 'svg',
                        });
                    }
                    // Check stroke attribute
                    const stroke = el.getAttribute('stroke');
                    if (stroke && stroke !== 'none' && stroke !== 'currentColor' && !stroke.startsWith('url(')) {
                        colors.push({
                            value: stroke,
                            property: 'svg-stroke',
                            element: el.tagName.toLowerCase(),
                            context: 'svg',
                        });
                    }
                    // Check computed styles for SVG elements
                    const styles = getComputedStyle(el);
                    const computedFill = styles.fill;
                    const computedStroke = styles.stroke;
                    if (computedFill && computedFill !== 'none' && !computedFill.startsWith('url(')) {
                        colors.push({
                            value: computedFill,
                            property: 'svg-fill-computed',
                            element: el.tagName.toLowerCase(),
                            context: 'svg',
                        });
                    }
                    if (computedStroke && computedStroke !== 'none' && !computedStroke.startsWith('url(')) {
                        colors.push({
                            value: computedStroke,
                            property: 'svg-stroke-computed',
                            element: el.tagName.toLowerCase(),
                            context: 'svg',
                        });
                    }
                });
                return colors;
            }
        """)
        return svg_colors

    async def _extract_inline_styles(self, page: Page) -> list[dict]:
        """
        Extract colors from inline style attributes.

        This catches colors in:
        - <div style="background-color: #bcd432;">
        - <span style="color: rgb(0, 196, 204);">

        Returns:
            List of raw color records (one per color literal found).
        """
        inline_data = await page.evaluate("""
            () => {
                const colors = [];
                const colorRegex = /#[0-9a-fA-F]{3,8}|rgb\\([^)]+\\)|rgba\\([^)]+\\)|hsl\\([^)]+\\)|hsla\\([^)]+\\)/gi;
                // Find all elements with inline styles
                const elements = document.querySelectorAll('[style]');
                elements.forEach(el => {
                    const styleAttr = el.getAttribute('style');
                    if (styleAttr) {
                        const matches = styleAttr.match(colorRegex);
                        if (matches) {
                            matches.forEach(color => {
                                colors.push({
                                    value: color,
                                    property: 'inline-style',
                                    element: el.tagName.toLowerCase(),
                                    context: 'inline',
                                });
                            });
                        }
                    }
                });
                return colors;
            }
        """)
        return inline_data

    async def _extract_stylesheet_colors(self, page: Page) -> list[dict]:
        """
        Parse CSS stylesheets for color values.

        This catches colors defined in CSS rules that may not be currently
        applied to visible elements. Stylesheets whose rules cannot be read
        (e.g. blocked by CORS) are skipped here.
        """
        css_colors = await page.evaluate("""
            () => {
                const colors = [];
                const colorRegex = /#[0-9a-fA-F]{3,8}|rgb\\([^)]+\\)|rgba\\([^)]+\\)|hsl\\([^)]+\\)|hsla\\([^)]+\\)/gi;
                // Color-related CSS properties
                const colorProps = [
                    'color', 'background-color', 'background', 'border-color',
                    'border-top-color', 'border-right-color', 'border-bottom-color', 'border-left-color',
                    'outline-color', 'box-shadow', 'text-shadow', 'fill', 'stroke',
                    'caret-color', 'column-rule-color', 'text-decoration-color',
                ];
                // Parse all stylesheets
                for (const sheet of document.styleSheets) {
                    try {
                        const rules = sheet.cssRules || sheet.rules;
                        for (const rule of rules) {
                            if (rule.style) {
                                colorProps.forEach(prop => {
                                    const value = rule.style.getPropertyValue(prop);
                                    if (value) {
                                        const matches = value.match(colorRegex);
                                        if (matches) {
                                            matches.forEach(color => {
                                                colors.push({
                                                    value: color,
                                                    property: prop,
                                                    element: 'css-rule',
                                                    context: 'stylesheet',
                                                    selector: rule.selectorText || '',
                                                });
                                            });
                                        }
                                    }
                                });
                            }
                        }
                    } catch (e) {
                        // CORS may block access to external stylesheets
                    }
                }
                return colors;
            }
        """)
        return css_colors

    async def _fetch_external_css_colors(self, page: Page) -> list[dict]:
        """
        Fetch and parse external CSS files directly to bypass CORS.

        This catches colors in linked stylesheets whose rules the page
        itself cannot read. Only the first ``_MAX_EXTERNAL_STYLESHEETS``
        links are fetched. Failures are recorded as warnings; whatever was
        collected up to that point is still returned.
        """
        colors: list[dict] = []
        try:
            # Get all stylesheet URLs
            css_urls = await page.evaluate("""
                () => {
                    const urls = [];
                    const links = document.querySelectorAll('link[rel="stylesheet"]');
                    links.forEach(link => {
                        if (link.href) {
                            urls.push(link.href);
                        }
                    });
                    return urls;
                }
            """)

            # Fetch each CSS file
            for css_url in css_urls[:self._MAX_EXTERNAL_STYLESHEETS]:
                try:
                    response = await page.request.get(
                        css_url, timeout=self._EXTERNAL_CSS_TIMEOUT_MS
                    )
                    if not response.ok:
                        continue
                    css_text = await response.text()
                except Exception as e:
                    # One unreachable stylesheet must not stop the others.
                    self.warnings.append(f"Could not fetch stylesheet {css_url}: {str(e)}")
                    continue

                # Find all color values in CSS text
                colors.extend(
                    {
                        "value": literal,
                        "property": "external-css",
                        "element": "css-file",
                        "context": "external-stylesheet",
                    }
                    for literal in self._COLOR_PATTERN.findall(css_text)
                )
        except Exception as e:
            self.warnings.append(f"External CSS fetch failed: {str(e)}")
        return colors

    async def _extract_all_page_colors(self, page: Page) -> list[dict]:
        """
        Extract ALL color values from the page source and styles.

        This is a brute-force approach that scans <style> blocks, data-* and
        style attributes, class names and inline <script> text for anything
        that looks like a color literal.
        """
        colors = await page.evaluate("""
            () => {
                const colors = [];
                const colorRegex = /#[0-9a-fA-F]{3,8}|rgb\\([^)]+\\)|rgba\\([^)]+\\)|hsl\\([^)]+\\)|hsla\\([^)]+\\)/gi;
                // 1. Scan all <style> tags
                const styleTags = document.querySelectorAll('style');
                styleTags.forEach(style => {
                    const matches = style.textContent.match(colorRegex);
                    if (matches) {
                        matches.forEach(color => {
                            colors.push({
                                value: color,
                                property: 'style-tag',
                                element: 'style',
                                context: 'style-block',
                            });
                        });
                    }
                });
                // 2. Scan data attributes that might contain colors
                const allElements = document.querySelectorAll('*');
                allElements.forEach(el => {
                    // Check data attributes
                    for (const attr of el.attributes) {
                        if (attr.name.startsWith('data-') || attr.name === 'style') {
                            const matches = attr.value.match(colorRegex);
                            if (matches) {
                                matches.forEach(color => {
                                    colors.push({
                                        value: color,
                                        property: attr.name,
                                        element: el.tagName.toLowerCase(),
                                        context: 'attribute',
                                    });
                                });
                            }
                        }
                    }
                    // Check for color in class names (some frameworks use color classes)
                    const classList = el.className;
                    if (typeof classList === 'string') {
                        const colorMatches = classList.match(colorRegex);
                        if (colorMatches) {
                            colorMatches.forEach(color => {
                                colors.push({
                                    value: color,
                                    property: 'class',
                                    element: el.tagName.toLowerCase(),
                                    context: 'class-name',
                                });
                            });
                        }
                    }
                });
                // 3. Look for colors in script tags (config objects)
                const scriptTags = document.querySelectorAll('script');
                scriptTags.forEach(script => {
                    if (script.textContent && !script.src) {
                        const matches = script.textContent.match(colorRegex);
                        if (matches) {
                            matches.forEach(color => {
                                colors.push({
                                    value: color,
                                    property: 'script',
                                    element: 'script',
                                    context: 'javascript',
                                });
                            });
                        }
                    }
                });
                return colors;
            }
        """)
        return colors

    def _record_color(self, color_data: dict, **new_token_fields) -> None:
        """
        Fold one raw color record into ``self.colors``.

        Args:
            color_data: Record with ``value`` and optional ``context``,
                ``element`` and ``property`` keys.
            **new_token_fields: Extra ColorToken fields applied only when
                the color is seen for the first time.
        """
        hex_value = self._process_color(color_data)
        if not hex_value:
            return

        token = self.colors.get(hex_value)
        if token is None:
            # Contrast and WCAG data depend only on the color, so they are
            # computed once, when the token is created.
            compliance = check_wcag_compliance(hex_value, "#ffffff")
            token = ColorToken(
                value=hex_value,
                frequency=0,
                contexts=[],
                elements=[],
                css_properties=[],
                contrast_white=round(get_contrast_with_white(hex_value), 2),
                contrast_black=round(get_contrast_with_black(hex_value), 2),
                wcag_aa_large_text=compliance["aa_large_text"],
                wcag_aa_small_text=compliance["aa_normal_text"],
                **new_token_fields,
            )
            self.colors[hex_value] = token

        token.frequency += 1
        for seen, key in (
            (token.contexts, "context"),
            (token.elements, "element"),
            (token.css_properties, "property"),
        ):
            item = color_data.get(key, "")
            if item and item not in seen:
                seen.append(item)

    def _process_css_variables(self, css_vars: dict):
        """
        Store CSS variables and turn color-valued ones into color tokens.

        Variables are merged into ``self.css_variables`` (computed values win
        over raw declarations) so that variables from earlier pages are kept.
        New color tokens are created with HIGH confidence here; ``extract``
        later recomputes confidence from frequency.
        """
        computed = css_vars.get("computed", {})
        raw = css_vars.get("raw", {})

        # Store CSS variables
        self.css_variables.update(raw)
        self.css_variables.update(computed)

        # Extract colors from CSS variables
        for var_name, value in computed.items():
            candidate = value.strip()
            # Only values that start with a color literal are considered.
            if not self._COLOR_PATTERN.match(candidate):
                continue
            self._record_color(
                {
                    "value": candidate,
                    "property": var_name,
                    "element": ":root",
                    "context": "css-variable",
                },
                source=TokenSource.DETECTED,  # CSS variable is still "detected"
                confidence=Confidence.HIGH,
            )

    def _process_color(self, color_data: dict) -> Optional[str]:
        """Parse a raw color record and return its hex form, or None if unparseable."""
        parsed = parse_color(color_data.get("value", ""))
        if not parsed:
            return None
        return parsed.hex

    def _aggregate_colors(self, raw_colors: list[dict]):
        """Aggregate raw color records into ``self.colors``."""
        for color_data in raw_colors:
            self._record_color(color_data)

    @staticmethod
    def _parse_px(value: str) -> Optional[float]:
        """Return the numeric part of an ``"<n>px"`` string, or None."""
        if not value.endswith("px"):
            return None
        try:
            return float(value.replace("px", ""))
        except ValueError:
            return None

    def _aggregate_typography(self, raw_typography: list[dict]):
        """
        Aggregate typography records into ``self.typography`` and track
        how often each primary font family occurs.

        Records are keyed by font size, weight and the first 50 characters
        of the font-family list.
        """
        for typo_data in raw_typography:
            font_family = typo_data.get("fontFamily", "")
            font_size = typo_data.get("fontSize", "")
            font_weight = typo_data.get("fontWeight", "400")
            line_height = typo_data.get("lineHeight", "normal")

            # Create unique key
            key = f"{font_size}|{font_weight}|{font_family[:50]}"

            if key not in self.typography:
                font_size_px = self._parse_px(font_size)

                # Express line height as a unitless ratio where possible.
                line_height_computed = None
                if line_height and line_height != "normal":
                    line_height_px = self._parse_px(line_height)
                    if line_height_px is not None and font_size_px:
                        line_height_computed = round(line_height_px / font_size_px, 2)
                    else:
                        try:
                            line_height_computed = float(line_height)
                        except ValueError:
                            pass

                self.typography[key] = TypographyToken(
                    font_family=font_family.split(",")[0].strip().strip('"\''),
                    font_size=font_size,
                    font_size_px=font_size_px,
                    font_weight=int(font_weight) if font_weight.isdigit() else 400,
                    line_height=line_height,
                    line_height_computed=line_height_computed,
                    letter_spacing=typo_data.get("letterSpacing"),
                    frequency=0,
                    elements=[],
                )

            # Update
            token = self.typography[key]
            token.frequency += 1
            element = typo_data.get("element", "")
            if element and element not in token.elements:
                token.elements.append(element)

            # Track font families
            primary_font = token.font_family
            if primary_font not in self.font_families:
                self.font_families[primary_font] = FontFamily(
                    name=primary_font,
                    fallbacks=[f.strip().strip('"\'') for f in font_family.split(",")[1:]],
                    frequency=0,
                )
            self.font_families[primary_font].frequency += 1

    def _aggregate_spacing(self, raw_spacing: list[dict]):
        """
        Aggregate spacing records into ``self.spacing``, keyed by rounded px.

        Records whose rounded pixel value is not positive are skipped: a
        sub-pixel value rounds to 0, which is not a meaningful spacing token
        and would count as fitting both the 4 and 8 base grids.
        """
        for space_data in raw_spacing:
            value_px = space_data.get("valuePx", 0)
            if value_px <= 0:
                continue

            key = str(value_px)
            if key not in self.spacing:
                self.spacing[key] = SpacingToken(
                    value=f"{value_px}px",
                    value_px=value_px,
                    frequency=0,
                    contexts=[],
                    properties=[],
                    fits_base_4=value_px % 4 == 0,
                    fits_base_8=value_px % 8 == 0,
                )

            token = self.spacing[key]
            token.frequency += 1
            context = space_data.get("context", "")
            if context and context not in token.contexts:
                token.contexts.append(context)
            prop = space_data.get("property", "")
            if prop and prop not in token.properties:
                token.properties.append(prop)

    def _aggregate_radius(self, raw_radius: list[dict]):
        """Aggregate border radius records into ``self.radius``."""
        for radius_data in raw_radius:
            value = radius_data.get("value", "")
            if not value:
                continue

            # Normalize to simple format
            # "8px 8px 8px 8px" -> "8px"
            parts = value.split()
            if len(set(parts)) == 1:
                value = parts[0]

            if value not in self.radius:
                parsed = self._parse_px(value)
                value_px = int(parsed) if parsed is not None else None
                self.radius[value] = RadiusToken(
                    value=value,
                    value_px=value_px,
                    frequency=0,
                    elements=[],
                    # A missing or zero pixel value never counts as on-grid.
                    fits_base_4=bool(value_px) and value_px % 4 == 0,
                    fits_base_8=bool(value_px) and value_px % 8 == 0,
                )

            token = self.radius[value]
            token.frequency += 1
            element = radius_data.get("element", "")
            if element and element not in token.elements:
                token.elements.append(element)

    def _aggregate_shadows(self, raw_shadows: list[dict]):
        """Aggregate box shadow records into ``self.shadows``."""
        for shadow_data in raw_shadows:
            value = shadow_data.get("value", "")
            if value not in self.shadows:
                self.shadows[value] = ShadowToken(
                    value=value,
                    frequency=0,
                    elements=[],
                )

            token = self.shadows[value]
            token.frequency += 1
            element = shadow_data.get("element", "")
            if element and element not in token.elements:
                token.elements.append(element)

    def _calculate_confidence(self, frequency: int) -> Confidence:
        """Map a frequency to a confidence level: >=10 HIGH, >=3 MEDIUM, else LOW."""
        if frequency >= 10:
            return Confidence.HIGH
        elif frequency >= 3:
            return Confidence.MEDIUM
        return Confidence.LOW

    def _detect_spacing_base(self) -> Optional[int]:
        """
        Detect the base spacing unit (4 or 8).

        Returns:
            8 if at least 80% of the distinct spacing values are multiples
            of 8, otherwise 4 if at least 80% are multiples of 4, otherwise
            None (also None when no spacing was collected).
        """
        total = len(self.spacing)
        if total == 0:
            return None

        fits_4 = sum(1 for s in self.spacing.values() if s.fits_base_4)
        fits_8 = sum(1 for s in self.spacing.values() if s.fits_base_8)

        # If 80%+ values fit base 8, use 8
        if fits_8 / total >= 0.8:
            return 8
        # If 80%+ values fit base 4, use 4
        elif fits_4 / total >= 0.8:
            return 4
        return None

    async def _navigate(self, page: Page, url: str) -> None:
        """
        Open ``url`` with a fallback strategy.

        First waits for DOMContentLoaded; if that fails, retries waiting for
        the load event. If both fail, a warning is recorded and extraction
        continues on whatever content is present.
        """
        try:
            await page.goto(
                url,
                wait_until="domcontentloaded",
                timeout=self._NAVIGATION_TIMEOUT_MS,
            )
            # Wait for JS to render
            await page.wait_for_timeout(2000)
        except Exception:
            # Fallback to load event
            try:
                await page.goto(
                    url,
                    wait_until="load",
                    timeout=self._NAVIGATION_TIMEOUT_MS,
                )
                await page.wait_for_timeout(3000)
            except Exception:
                self.warnings.append(f"Slow load for {url}, extracting partial content")

    async def _extract_page(self, url: str) -> None:
        """
        Crawl one URL and fold everything found into the token collections.

        The page is always closed, even when extraction fails part-way.
        Failures of the optional color sources are recorded as warnings;
        failures of navigation-independent core steps propagate to the
        caller.
        """
        if self.context is None:
            raise RuntimeError("Browser context is not initialized")

        page = await self.context.new_page()
        try:
            await self._navigate(page, url)

            # Scroll to load lazy content
            await self._scroll_page(page)

            # Track counts before extraction for this page
            colors_before = len(self.colors)
            typo_before = len(self.typography)
            spacing_before = len(self.spacing)
            radius_before = len(self.radius)
            shadows_before = len(self.shadows)

            # 1. Extract DOM computed styles (original method)
            styles = await self._extract_styles_from_page(page)
            dom_colors = styles.get("colors", [])
            self._aggregate_colors(dom_colors)
            self._aggregate_typography(styles.get("typography", []))
            self._aggregate_spacing(styles.get("spacing", []))
            self._aggregate_radius(styles.get("radius", []))
            self._aggregate_shadows(styles.get("shadows", []))

            stats = {
                "url": url,
                "dom_colors": len(dom_colors),
                "css_variables": 0,
            }

            # 2. Extract CSS variables (--primary-color, etc.)
            try:
                css_vars = await self._extract_css_variables(page)
                stats["css_variables"] = len(css_vars.get("computed", {}))
                self._process_css_variables(css_vars)
            except Exception as e:
                self.warnings.append(f"CSS variables extraction failed: {str(e)}")

            # 3-7. Additional color sources; each one is best-effort.
            color_sources = (
                ("svg_colors", self._extract_svg_colors, "SVG color extraction failed"),
                ("inline_colors", self._extract_inline_styles, "Inline style extraction failed"),
                ("stylesheet_colors", self._extract_stylesheet_colors, "Stylesheet color extraction failed"),
                ("external_css_colors", self._fetch_external_css_colors, "External CSS fetch failed"),
                ("page_scan_colors", self._extract_all_page_colors, "Page scan failed"),
            )
            for stat_key, extractor, failure_label in color_sources:
                stats[stat_key] = 0
                try:
                    found = await extractor(page)
                    stats[stat_key] = len(found)
                    self._aggregate_colors(found)
                except Exception as e:
                    self.warnings.append(f"{failure_label}: {str(e)}")

            # Store extraction stats for logging
            stats.update({
                "new_colors": len(self.colors) - colors_before,
                "new_typography": len(self.typography) - typo_before,
                "new_spacing": len(self.spacing) - spacing_before,
                "new_radius": len(self.radius) - radius_before,
                "new_shadows": len(self.shadows) - shadows_before,
            })
            self._last_extraction_stats = stats

            self.total_elements += styles.get("elements_count", 0)
        finally:
            await page.close()

    async def extract(
        self,
        pages: list[str],
        progress_callback: Optional[Callable[[float], None]] = None
    ) -> ExtractedTokens:
        """
        Extract tokens from a list of pages.

        Enhanced extraction includes:
        - DOM computed styles
        - CSS variables from :root
        - SVG fill/stroke colors
        - Inline style colors
        - Stylesheet color rules

        A failure on one page is recorded in ``errors`` and does not stop
        the remaining pages.

        Args:
            pages: List of URLs to crawl
            progress_callback: Optional callback invoked after each page
                (successful or not) with the fraction of pages processed.

        Returns:
            ExtractedTokens with all discovered tokens
        """
        start_time = datetime.now()
        pages_crawled = []

        async with self:
            for i, url in enumerate(pages):
                try:
                    await self._extract_page(url)
                    pages_crawled.append(url)
                except Exception as e:
                    self.errors.append(f"Error extracting {url}: {str(e)}")

                # Progress callback
                if progress_callback:
                    try:
                        progress_callback((i + 1) / len(pages))
                    except Exception as e:
                        # A faulty callback must not abort the crawl.
                        self.warnings.append(f"Progress callback failed: {str(e)}")

                # Rate limiting (nothing left to throttle after the last page)
                if i < len(pages) - 1:
                    await asyncio.sleep(self.settings.crawl.crawl_delay_ms / 1000)

        # Calculate confidence for all tokens
        for collection in (self.colors, self.typography, self.spacing):
            for token in collection.values():
                token.confidence = self._calculate_confidence(token.frequency)

        # Detect spacing base
        spacing_base = self._detect_spacing_base()

        # Mark outliers in spacing
        if spacing_base:
            for token in self.spacing.values():
                if spacing_base == 8 and not token.fits_base_8:
                    token.is_outlier = True
                elif spacing_base == 4 and not token.fits_base_4:
                    token.is_outlier = True

        # Determine primary font
        if self.font_families:
            primary_font = max(self.font_families.values(), key=lambda f: f.frequency)
            primary_font.usage = "primary"

        # Build result
        end_time = datetime.now()
        duration_ms = int((end_time - start_time).total_seconds() * 1000)

        return ExtractedTokens(
            viewport=self.viewport,
            source_url=pages[0] if pages else "",
            pages_crawled=pages_crawled,
            colors=list(self.colors.values()),
            typography=list(self.typography.values()),
            spacing=list(self.spacing.values()),
            radius=list(self.radius.values()),
            shadows=list(self.shadows.values()),
            font_families=list(self.font_families.values()),
            spacing_base=spacing_base,
            extraction_timestamp=start_time,
            extraction_duration_ms=duration_ms,
            total_elements_analyzed=self.total_elements,
            unique_colors=len(self.colors),
            unique_font_sizes=len(set(t.font_size for t in self.typography.values())),
            unique_spacing_values=len(self.spacing),
            errors=self.errors,
            warnings=self.warnings,
        )
| # ============================================================================= | |
| # CONVENIENCE FUNCTIONS | |
| # ============================================================================= | |
async def extract_from_pages(
    pages: list[str],
    viewport: Viewport = Viewport.DESKTOP
) -> ExtractedTokens:
    """
    Run a one-off token extraction over ``pages``.

    Args:
        pages: URLs to crawl.
        viewport: Viewport the extractor is created with.

    Returns:
        The ExtractedTokens produced by a fresh TokenExtractor.
    """
    return await TokenExtractor(viewport=viewport).extract(pages)
async def extract_both_viewports(pages: list[str]) -> tuple[ExtractedTokens, ExtractedTokens]:
    """
    Extract tokens for the desktop viewport, then for the mobile viewport.

    The two extractions run one after the other, each with its own
    TokenExtractor.

    Returns:
        ``(desktop_tokens, mobile_tokens)``.
    """
    # Both extractors are built up front, before either crawl starts.
    extractors = [
        TokenExtractor(viewport=mode)
        for mode in (Viewport.DESKTOP, Viewport.MOBILE)
    ]
    collected = []
    for extractor in extractors:
        collected.append(await extractor.extract(pages))
    desktop_tokens, mobile_tokens = collected
    return desktop_tokens, mobile_tokens