| import json |
| import time |
| from html import unescape |
| from pathlib import Path |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from typing import Optional, Union |
|
|
| import cloudscraper |
| from bs4 import BeautifulSoup |
|
|
|
|
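| # Endpoint and page size: the gallery paginates with a `skip` offset that
| # advances in multiples of _PER_PAGE (200 items per fetched page).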
| _GALLERY_URL = "https://image-generation.perchance.org/gallery" |
| _PER_PAGE = 200 |
|
|
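| # Browser-like request headers; combined with cloudscraper below, these keep
| # requests looking like an ordinary browser session.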
| _HEADERS = { |
| "User-Agent": ( |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " |
| "AppleWebKit/537.36 (KHTML, like Gecko) " |
| "Chrome/145.0.0.0 Safari/537.36" |
| ), |
| "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", |
| "Referer": "https://image-generation.perchance.org/", |
| "Origin": "https://image-generation.perchance.org", |
| } |
|
|
| _VALID_SORT = ("recent", "trending", "top") |
| _VALID_TIME = ("all-time", "1-month") |
| _VALID_FILTER = ("none", "pg13") |
|
|
|
|
| class GalleryScraper: |
| """ |
| Perchance AI Gallery scraper. |
| |
| Example: |
| result = GalleryScraper(start_page=1, pages=3, sort="top") |
| print(result.data) |
| """ |
|
|
| def __init__( |
| self, |
| start_page: int = 1, |
| pages: int = 1, |
| sort: str = "top", |
| time_range: str = "all-time", |
| content_filter: str = "none", |
| concurrency: int = 1, |
| timeout: int = 30, |
| save: Union[bool, str] = False, |
| ): |
| if start_page < 1: |
| raise ValueError("start_page must be >= 1") |
| if pages < 1: |
| raise ValueError("pages must be >= 1") |
| if sort not in _VALID_SORT: |
| raise ValueError(f"sort must be one of {_VALID_SORT}, got '{sort}'") |
| if time_range not in _VALID_TIME: |
| raise ValueError(f"time_range must be one of {_VALID_TIME}, got '{time_range}'") |
| if content_filter not in _VALID_FILTER: |
| raise ValueError(f"content_filter must be one of {_VALID_FILTER}, got '{content_filter}'") |
| if concurrency < 1: |
| concurrency = 1 |
|
|
| self.start_page = start_page |
| self.pages = pages |
| self.sort = sort |
| self.time_range = time_range |
| self.content_filter = content_filter |
| self.concurrency = concurrency |
| self.timeout = timeout |
|
|
| self.data: list[dict] = [] |
| self.total: int = 0 |
| self.elapsed: float = 0.0 |
|
|
| self._log( |
| f"start_page={start_page} pages={pages} concurrency={concurrency} " |
| f"sort={sort} time={time_range} filter={content_filter}" |
| ) |
| self._log("=" * 60) |
|
|
| started = time.time() |
| scraper = cloudscraper.create_scraper() |
|
|
| raw_pages = self._fetch_all(scraper) |
| self.data = self._parse_all(raw_pages) |
| self.total = len(self.data) |
|
|
| self.elapsed = time.time() - started |
| self._log("=" * 60) |
| self._log(f"Done | {self.total} items | {self.elapsed:.2f}s") |
|
|
| if save: |
| self._save(save) |
|
|
| def _build_params(self, page_index: int) -> dict: |
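| """Build gallery query params for a zero-based page index; `skip` is omitted for the first page."""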
| skip = page_index * _PER_PAGE |
| params = { |
| "sort": self.sort, |
| "timeRange": self.time_range, |
| "hideIfScoreIsBelow": "-1", |
| "contentFilter": self.content_filter, |
| "subChannel": "public", |
| "channel": "ai-text-to-image-generator", |
| } |
| if skip > 0: |
| params["skip"] = skip |
| return params |
|
|
| def _fetch_one( |
| self, |
| scraper: cloudscraper.CloudScraper, |
| page_index: int, |
| ) -> tuple[int, str]: |
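| """Fetch a single gallery page; returns (page_index, html), with "" on failure."""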
| actual_page = self.start_page + page_index |
| skip = (actual_page - 1) * _PER_PAGE |
| self._log(f" [β] Fetching page {actual_page} (skip={skip}) ...") |
| t = time.time() |
|
|
| try: |
| resp = scraper.get( |
| _GALLERY_URL, |
| params=self._build_params(actual_page - 1), |
| headers=_HEADERS, |
| timeout=self.timeout, |
| ) |
| except Exception as exc: |
| self._log(f" [β] Page {actual_page} error: {exc} ({time.time() - t:.2f}s)") |
| return (page_index, "") |
|
|
| dt = time.time() - t |
| if resp.status_code != 200: |
| self._log(f" [β] Page {actual_page} HTTP {resp.status_code} ({dt:.2f}s)") |
| return (page_index, "") |
|
|
| self._log(f" [β] Page {actual_page} OK β {len(resp.text):,} chars ({dt:.2f}s)") |
| return (page_index, resp.text) |
|
|
| def _fetch_all(self, scraper: cloudscraper.CloudScraper) -> dict[int, str]: |
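| """Fetch all requested pages, sequentially or via a thread pool."""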
| results: dict[int, str] = {} |
|
|
| if self.concurrency == 1: |
| for page_index in range(self.pages): |
| idx, html = self._fetch_one(scraper, page_index) |
| results[idx] = html |
| return results |
|
|
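| # The single cloudscraper session is shared across worker threads. requests
| # sessions are not formally guaranteed thread-safe, so if concurrent fetches
| # ever misbehave, giving each worker its own session is the safer choice.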
| with ThreadPoolExecutor(max_workers=self.concurrency) as pool: |
| futures = { |
| pool.submit(self._fetch_one, scraper, page_index): page_index |
| for page_index in range(self.pages) |
| } |
| for future in as_completed(futures): |
| idx, html = future.result() |
| results[idx] = html |
|
|
| return results |
|
|
| @staticmethod |
| def _clean(value: Optional[str]) -> str: |
| if value is None: |
| return "" |
| return unescape(str(value)).replace("\r\n", "\n").replace("\r", "\n").strip()
|
|
| def _parse_page(self, html: str) -> list[dict]: |
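| """Extract item dicts from one page of gallery HTML."""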
| if not html: |
| return [] |
|
|
| soup = BeautifulSoup(html, "html.parser") |
| items: list[dict] = [] |
|
|
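| # Each gallery card exposes its generation metadata as data-* attributes.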
| for card in soup.select(".imageCtn"): |
| prompt = self._clean(card.get("data-prompt")) |
| negative_prompt = self._clean(card.get("data-negative-prompt")) |
| guidance_scale = self._clean(card.get("data-guidance-scale")) |
| seed = self._clean(card.get("data-seed")) |
| is_nsfw = self._clean(card.get("data-is-nsfw")).lower() == "true" |
| title_attr = self._clean(card.get("data-title")) |
|
|
| img_tag = card.select_one(".imageWrapperInner img.image") |
| image_url = img_tag.get("src", "") if img_tag else "" |
|
|
| title_el = card.select_one(".image-title") |
| visible_title = self._clean(title_el.get_text(" ", strip=True)) if title_el else "" |
|
|
| item = { |
| "no": "", |
| "image_url": image_url, |
| "title": title_attr or visible_title, |
| "prompt": prompt, |
| "guidance_scale": guidance_scale, |
| "seed": seed, |
| "nsfw": is_nsfw, |
| } |
|
|
| if negative_prompt: |
| item["negative_prompt"] = negative_prompt |
|
|
| items.append(item) |
|
|
| return items |
|
|
| def _parse_all(self, raw_pages: dict[int, str]) -> list[dict]: |
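| """Parse fetched pages in page order and number items sequentially."""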
| all_items: list[dict] = [] |
|
|
| for page_index in sorted(raw_pages.keys()): |
| parsed = self._parse_page(raw_pages[page_index]) |
| actual_page = self.start_page + page_index |
| self._log(f" [parse] Page {actual_page} β {len(parsed)} items") |
| all_items.extend(parsed) |
|
|
| for idx, item in enumerate(all_items, start=1): |
| item["no"] = idx |
|
|
| return all_items |
|
|
| def _save(self, save: Union[bool, str]) -> None: |
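| """Dump scraped items as JSON; `save` may be a path or True for the default file."""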
| out = save if isinstance(save, str) else "gallery_data.json" |
| Path(out).parent.mkdir(parents=True, exist_ok=True) |
|
|
| with open(out, "w", encoding="utf-8") as fp: |
| json.dump(self.data, fp, ensure_ascii=False, indent=2) |
|
|
| self._log(f"Saved β {out}") |
|
|
| @staticmethod |
| def _log(msg: str) -> None: |
| print(msg) |
|
|
| def __len__(self) -> int: |
| return self.total |
|
|
| def __getitem__(self, index): |
| return self.data[index] |
|
|
| def __iter__(self): |
| return iter(self.data) |
|
|
| def __repr__(self) -> str: |
| return ( |
| f"GalleryScraper(" |
| f"total={self.total}, " |
| f"pages={self.pages}, " |
| f"start_page={self.start_page}, " |
| f"sort='{self.sort}', " |
| f"elapsed={self.elapsed:.2f}s)" |
| ) |
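|
|
| if __name__ == "__main__":
| # A minimal usage sketch, not part of the module proper. It assumes network
| # access and that the gallery endpoint above is reachable; the output path
| # "out/gallery_data.json" is only an example.
| gallery = GalleryScraper(
| start_page=1,
| pages=2,
| sort="trending",
| concurrency=2,
| save="out/gallery_data.json",
| )
| for item in gallery:
| print(item["no"], item["image_url"])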