import json
import os
import shutil
import sqlite3
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union

import requests
from tqdm import tqdm

LOCAL = 0  # 1 = talk to a locally hosted endpoint instead of the hosted one
DRY_RUN = 0  # 1 = only print resolved URLs; skip DB writes and downloads
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
SCRIPTS_DIR = os.path.join(ROOT_DIR, "Scripts")
IMAGES_DIR = os.path.join(ROOT_DIR, "images")
STASH_DIR = os.path.join(IMAGES_DIR, "Stash")
DB_PATH = os.path.join(ROOT_DIR, "db.sqlite")
ENDPOINT = "http://127.0.0.1:7860" if LOCAL else "https://q6-p.hf.space"
IMG_BASE = "https://i.pximg.net/img-original/img/"
REQUEST_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
    "Referer": "https://www.pixiv.net/",
}
MAX_WORKERS = min(16, os.cpu_count() or 8)
REQUEST_TIMEOUT = (120, 120)  # (connect, read) timeouts in seconds for image downloads
STREAM_REQUEST_TIMEOUT = (120, 30)  # (connect, read) timeouts for the streaming API call
STREAM_IDLE_TIMEOUT_SECONDS = 45
STREAM_MAX_READ_TIMEOUTS = 3
STREAM_MAX_RETRIES = 3
STREAM_RETRY_DELAY_SECONDS = 2
stop_event = threading.Event()  # set once on Ctrl+C so every worker winds down


def read_dotenv_value(path: str, key: str) -> Optional[str]:
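    """Return the value for `key` from a .env-style file, or None when missing."""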
    try:
        with open(path, "r") as env_file:
            for line in env_file:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                k, v = line.split("=", 1)
                if k.strip() == key:
                    return v.strip()
    except FileNotFoundError:
        return None
    return None


def get_phpsessid() -> str:
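    """Resolve the Pixiv session cookie from the environment first, then from .env."""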
    phpsessid = os.getenv("PHPSESSID")
    if phpsessid:
        return phpsessid
    env_path = os.path.join(ROOT_DIR, ".env")
    phpsessid = read_dotenv_value(env_path, "PHPSESSID")
    if phpsessid:
        return phpsessid
    raise RuntimeError("PHPSESSID is not set in the environment or .env")


phpsessid = get_phpsessid()


os.makedirs(STASH_DIR, exist_ok=True)


# File names already sitting in the stash, so finished downloads can be skipped.
images_cache = set(os.listdir(STASH_DIR))


def open_db(path: str) -> sqlite3.Connection:
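    """Open the SQLite cache, creating the pixif_cache table if needed."""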
    conn = sqlite3.connect(path)
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS pixif_cache (
            post_id TEXT PRIMARY KEY,
            url TEXT
        )
        """
    )
    conn.commit()
    return conn


def chunked(seq: Sequence[str], size: int) -> Iterator[List[str]]:
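    """Yield consecutive slices of `seq` holding at most `size` items each."""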
    for i in range(0, len(seq), size):
        yield seq[i:i + size]


def fetch_cached_state(
    conn: sqlite3.Connection,
    post_ids: Sequence[str],
) -> Dict[str, Optional[str]]:
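    """Map each post_id to its cached URL: '' when cached as missing, None when never cached."""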
    post_ids_dict = {post_id: None for post_id in post_ids}
    if not post_ids:
        return post_ids_dict

    # SQLite caps the number of bound parameters, so query in chunks of 900 ids.
    for chunk in chunked(post_ids, 900):
        placeholders = ",".join("?" for _ in chunk)
        query = f"SELECT post_id, COALESCE(url, '') FROM pixif_cache WHERE post_id IN ({placeholders})"
        for post_id, url in conn.execute(query, chunk):
            post_ids_dict[post_id] = url

    return post_ids_dict


def upsert_urls(conn: sqlite3.Connection, rows: Sequence[Tuple[str, str]]) -> None:
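    """Insert or update (post_id, url) rows; the caller owns the transaction."""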
    if not rows:
        return
    conn.executemany(
        """
        INSERT INTO pixif_cache (post_id, url)
        VALUES (?, ?)
        ON CONFLICT(post_id) DO UPDATE SET url = excluded.url
        """,
        rows,
    )


def iter_api_hunt_results(
    post_ids: Sequence[str],
    phpsessid: str,
    desc: str,
    stop_event: threading.Event = stop_event,
) -> Iterator[Tuple[str, str]]:
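    """Stream (post_id, url) events from the API, bailing out on idle or repeated timeouts."""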
    post_ids_list = list(post_ids)
    if not post_ids_list:
        return
    payload = {"post_ids": post_ids_list, "phpsessid": phpsessid}
    try:
        with requests.post(
            f"{ENDPOINT}/pixif_stream",
            json=payload,
            stream=True,
            timeout=STREAM_REQUEST_TIMEOUT,
        ) as response:
            response.raise_for_status()
            with tqdm(total=len(post_ids_list), unit="post", desc=desc) as pbar:
                last_progress = time.monotonic()
                consecutive_timeouts = 0
                while not stop_event.is_set() and pbar.n < pbar.total:
                    if time.monotonic() - last_progress > STREAM_IDLE_TIMEOUT_SECONDS:
                        break
                    try:
                        for line in response.iter_lines(decode_unicode=True):
                            if stop_event.is_set():
                                break
                            now = time.monotonic()
                            if now - last_progress > STREAM_IDLE_TIMEOUT_SECONDS:
                                break
                            if not line:
                                continue
                            try:
                                event = json.loads(line)
                            except json.JSONDecodeError:
                                continue
                            post_id = event.get("post_id")
                            if post_id is None:
                                continue
                            post_id = str(post_id)
                            url = event.get("url")
                            if url is None:
                                url = ""
                            if DRY_RUN and url:
                                tqdm.write(f"{post_id} -> {decode_if_binary(url)}")
                            pbar.update(1)
                            last_progress = now
                            consecutive_timeouts = 0
                            yield post_id, url
                        # The stream ended or went idle; retries belong to the caller.
                        break
                    except requests.exceptions.ReadTimeout:
                        consecutive_timeouts += 1
                        if consecutive_timeouts >= STREAM_MAX_READ_TIMEOUTS:
                            break
                        continue
    except KeyboardInterrupt:
        stop_event.set()
        raise


conn = open_db(DB_PATH)


# Offer every .txt file in the project root (one whitespace-separated post_id list per file).
valid = [f for f in os.listdir(ROOT_DIR) if f.endswith(".txt")]
for idx, file_name in enumerate(valid):
    print(f"{idx + 1}: {file_name}")


# Accept 1-based indexes and ranges such as "2 4-6"; out-of-range entries are dropped.
inputs = input("Enter the index of the file: ").split()
indexes = []
for inp in inputs:
    if "-" in inp:
        start, end = map(int, inp.split("-"))
        indexes.extend(range(start - 1, end))
    elif inp.isdigit():
        indexes.append(int(inp) - 1)
indexes = [i for i in indexes if 0 <= i < len(valid)]


def build_image_url(url: str) -> str:
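    """Return `url` as-is when absolute; otherwise join it onto IMG_BASE."""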
    if url.startswith("http"):
        return url
    return IMG_BASE + url


def download_one_image(
    post_id: str,
    url: str,
    dest_dir: str,
    phpsessid: str,
    stop_event: threading.Event = stop_event,
) -> Tuple[str, str, Optional[str]]:
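    """Download one image into `dest_dir`; returns (post_id, status, error detail)."""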
    dest_path = os.path.join(dest_dir, f"{post_id}.png")
    if os.path.exists(dest_path):
        return post_id, "exists", None
    if stop_event.is_set():
        return post_id, "cancelled", None

    # Stream into a .part file first so an interrupted download never leaves a broken image.
    tmp_path = dest_path + ".part"
    full_url = build_image_url(url)
    try:
        with requests.get(
            full_url,
            headers=REQUEST_HEADERS,
            cookies={"PHPSESSID": phpsessid},
            stream=True,
            timeout=REQUEST_TIMEOUT,
        ) as response:
            response.raise_for_status()
            with open(tmp_path, "wb") as handle:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if stop_event.is_set():
                        raise KeyboardInterrupt
                    if chunk:
                        handle.write(chunk)
        os.replace(tmp_path, dest_path)
        return post_id, "ok", None
    except KeyboardInterrupt:
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        return post_id, "cancelled", None
    except Exception as exc:
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        return post_id, "error", str(exc)


def link_group_image(post_id: str, group_dir: str, post_indexes: Dict[str, int]) -> None:
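    """Hard-link a stashed image into the group directory under its list position."""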
    index = post_indexes.get(post_id)
    if index is None:
        return
    stash_path = os.path.join(STASH_DIR, f"{post_id}.png")
    dest_path = os.path.join(group_dir, f"{index}_{post_id}.png")
    if os.path.exists(stash_path) and not os.path.exists(dest_path):
        os.link(stash_path, dest_path)


def handle_download_result(future, group_dir: str, post_indexes: Dict[str, int], pbar: tqdm) -> None:
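    """Record one finished download: report errors, cache successes, link, tick the bar."""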
    post_id, status, detail = future.result()
    if status == "error":
        tqdm.write(f"Failed {post_id}: {detail}")
    elif status in {"ok", "exists"}:
        images_cache.add(f"{post_id}.png")
        link_group_image(post_id, group_dir, post_indexes)
    pbar.update(1)


def drain_downloads(
    futures,
    group_dir: str,
    post_indexes: Dict[str, int],
    pbar: tqdm,
) -> None:
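    """Consume whichever futures have already finished, without blocking."""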
    for future in list(futures):
        if future.done():
            futures.remove(future)
            handle_download_result(future, group_dir, post_indexes, pbar)


def finish_downloads(
    futures,
    group_dir: str,
    post_indexes: Dict[str, int],
    pbar: tqdm,
    stop_event: threading.Event = stop_event,
) -> None:
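    """Block until every outstanding future completes, unless the stop event fires."""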
    for future in as_completed(list(futures)):
        if stop_event.is_set():
            break
        futures.remove(future)
        handle_download_result(future, group_dir, post_indexes, pbar)


def decode_if_binary(val: Union[str, bytes]) -> str:
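    """Decode bytes to str; pass str values through unchanged."""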
    if isinstance(val, bytes):
        return val.decode()
    return val


def scan_with_retries(
    post_ids: Sequence[str],
    phpsessid: str,
    conn: sqlite3.Connection,
    post_ids_dict: Dict[str, Optional[str]],
    executor: ThreadPoolExecutor,
    futures,
    queued_downloads,
    group_dir: str,
    post_indexes: Dict[str, int],
    download_pbar: tqdm,
    desc: str,
    stop_event: threading.Event = stop_event,
) -> None:
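    """Resolve URLs via the streaming API, re-requesting posts the stream never answered."""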
    if not post_ids:
        return
    remaining = list(post_ids)
    attempts = 0
    while remaining and attempts < STREAM_MAX_RETRIES and not stop_event.is_set():
        received = set()
        for post_id, url in iter_api_hunt_results(remaining, phpsessid, desc, stop_event):
            received.add(post_id)
            post_ids_dict[post_id] = url
            if not DRY_RUN:
                # Persist each answer immediately so an interrupt loses nothing.
                with conn:
                    upsert_urls(conn, [(post_id, url)])
            if url and f"{post_id}.png" not in images_cache and post_id not in queued_downloads:
                queued_downloads.add(post_id)
                future = executor.submit(download_one_image, post_id, url, STASH_DIR, phpsessid, stop_event)
                futures.append(future)
                download_pbar.total += 1
                download_pbar.refresh()
            drain_downloads(futures, group_dir, post_indexes, download_pbar)
        remaining = [post_id for post_id in remaining if post_id not in received]
        if not remaining:
            break
        attempts += 1
        if STREAM_RETRY_DELAY_SECONDS:
            time.sleep(STREAM_RETRY_DELAY_SECONDS)


try:
    for index in indexes:
        group_name = valid[index].rsplit(".", 1)[0]
        group_dir = os.path.join(IMAGES_DIR, group_name)
        os.makedirs(group_dir, exist_ok=True)
        with open(os.path.join(ROOT_DIR, valid[index]), "r") as f:
            post_ids = f.read().split()

        post_indexes = {post_id: i for i, post_id in enumerate(post_ids)}
        post_ids_dict = fetch_cached_state(conn, post_ids)
        if DRY_RUN:
            filtered = list(post_ids)
        else:
            # Only hunt posts with neither a cached URL nor an already-downloaded image.
            filtered = [
                post_id
                for post_id in post_ids
                if post_ids_dict[post_id] is None and f"{post_id}.png" not in images_cache
            ]
        print(f"Group: {group_name}\nFiltered: {len(filtered)}/{len(post_ids)}")

        if DRY_RUN:
            if filtered:
                print("Dry run outputs (post_id -> page):")
                for _ in iter_api_hunt_results(filtered, phpsessid, "API hunt", stop_event):
                    pass
            continue

        # URLs already cached whose images are not yet on disk can download right away.
        cached_downloads = {
            post_id: decode_if_binary(url)
            for post_id, url in post_ids_dict.items()
            if url and f"{post_id}.png" not in images_cache
        }
        futures = []
        queued_downloads = set()
        executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
        interrupted = False
        try:
            with tqdm(total=0, unit="image", desc="Downloading images") as download_pbar:
                for post_id, url in cached_downloads.items():
                    queued_downloads.add(post_id)
                    futures.append(executor.submit(download_one_image, post_id, url, STASH_DIR, phpsessid, stop_event))
                    download_pbar.total += 1
                download_pbar.refresh()
                if filtered:
                    scan_with_retries(
                        filtered,
                        phpsessid,
                        conn,
                        post_ids_dict,
                        executor,
                        futures,
                        queued_downloads,
                        group_dir,
                        post_indexes,
                        download_pbar,
                        "API hunt",
                        stop_event,
                    )
                finish_downloads(futures, group_dir, post_indexes, download_pbar, stop_event)
        except KeyboardInterrupt:
            interrupted = True
            stop_event.set()
            # shutdown(cancel_futures=True) already cancels everything still queued.
            executor.shutdown(wait=False, cancel_futures=True)
            raise
        finally:
            if not interrupted:
                executor.shutdown(wait=True)

        print("Linking images to the group directory...")
        images_cache.update(os.listdir(STASH_DIR))
        for post_id in post_ids:
            link_group_image(post_id, group_dir, post_indexes)

        if not os.listdir(group_dir):
            shutil.rmtree(group_dir)
        else:
            # Drop helper scripts into non-empty group folders for later processing.
            novelai_source = os.path.join(SCRIPTS_DIR, "novelai.py")
            novelai_dest = os.path.join(group_dir, "!novelai.py")
            if not os.path.exists(novelai_dest) and os.path.exists(novelai_source):
                os.link(novelai_source, novelai_dest)
                print(f"Linked novelai.py to images/{group_name}/!novelai.py")
            comfy_source = os.path.join(SCRIPTS_DIR, "comfy.py")
            comfy_dest = os.path.join(group_dir, "!comfy.py")
            if not os.path.exists(comfy_dest) and os.path.exists(comfy_source):
                os.link(comfy_source, comfy_dest)
                print(f"Linked comfy.py to images/{group_name}/!comfy.py")

except KeyboardInterrupt:
    stop_event.set()
    raise
finally:
    conn.close()