| import concurrent.futures |
| import gzip |
| import io |
| import json |
| import os |
| import threading |
| from typing import Union |
|
|
| import numpy as np |
| import requests |
| from PIL import Image |
| from requests.adapters import HTTPAdapter |
| from tqdm import tqdm |
|
|
|
|
# URL prefix on i.pximg.net; presumably the root for original-resolution
# images (not referenced elsewhere in the visible part of this file).
IMG_BASE = "https://i.pximg.net/img-original/img/"

# Sent as the User-Agent header of every session created by build_session().
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) "
    "Gecko/20100101 Firefox/126.0"
)

# Fallback thread-pool size; also used to size the HTTP connection pools.
DEFAULT_WORKERS = 8

# Value passed as ``timeout=`` to every ``session.get`` call.
REQUEST_TIMEOUT = 45

# Per-thread storage used by get_thread_session() to cache one session per
# worker thread.
thread_local = threading.local()
|
|
|
|
def read_dotenv_value(path, key):
    """Return the value assigned to ``key`` in a dotenv-style file.

    Lines are expected in ``KEY=VALUE`` form. Blank lines, lines starting
    with ``#`` and lines without ``=`` are skipped. Whitespace around the key
    and around the value is ignored, and one pair of matching single or
    double quotes around the value is removed, so ``PHPSESSID = "abc"``
    yields ``abc``. Only the first ``=`` separates key from value, so values
    may themselves contain ``=``.

    Args:
        path: Location of the dotenv file.
        key: Variable name to look up.

    Returns:
        The first matching value as a string, or ``None`` when the file does
        not exist or does not define ``key``.
    """
    try:
        # "utf-8-sig" transparently drops a leading BOM, which would
        # otherwise become part of the first key and prevent a match.
        with open(path, "r", encoding="utf-8-sig") as env_file:
            for raw_line in env_file:
                line = raw_line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                name, _, value = line.partition("=")
                if name.strip() != key:
                    continue
                value = value.strip()
                # Strip exactly one pair of matching surrounding quotes.
                if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                    value = value[1:-1]
                return value
    except FileNotFoundError:
        return None
    return None
|
|
|
|
def get_phpsessid():
    """Look up the PHPSESSID value.

    The ``PHPSESSID`` environment variable takes precedence; when it is unset
    or empty, the ``.env`` file one directory above this module is consulted.

    Returns:
        The non-empty PHPSESSID string.

    Raises:
        RuntimeError: If neither source provides a non-empty value.
    """
    from_environment = os.getenv("PHPSESSID")
    if from_environment:
        return from_environment

    module_dir = os.path.dirname(__file__)
    dotenv_path = os.path.abspath(os.path.join(module_dir, "..", ".env"))
    from_dotenv = read_dotenv_value(dotenv_path, "PHPSESSID")
    if not from_dotenv:
        raise RuntimeError("PHPSESSID is not set in the environment or .env")
    return from_dotenv
|
|
|
|
def byteize(alpha):
    """Pack the least-significant bit of every element into bytes.

    Elements are visited in column-major order (the transpose is flattened),
    trailing elements that do not fill a complete group of eight are dropped,
    and each group of eight low bits is packed into one byte.

    Args:
        alpha: Integer NumPy array (the alpha channel in this file).

    Returns:
        ``uint8`` array of shape ``(n, 1)`` with one packed byte per row.
    """
    # Column-major flattening is equivalent to flattening the transpose.
    flat = alpha.ravel(order="F")
    whole_bytes = flat.size // 8
    low_bits = flat[: whole_bytes * 8] & 1
    return np.packbits(low_bits.reshape(whole_bytes, 8), axis=1)
|
|
|
|
class LSBExtractor:
    """Sequential reader over the bytes packed from an array's low bits."""

    def __init__(self, alpha):
        """Pack ``alpha`` with ``byteize`` and place the cursor at offset 0."""
        self.pos = 0
        self.data = byteize(alpha)

    def get_one_byte(self):
        """Return the entry at the cursor and advance the cursor by one.

        Raises:
            IndexError: If the cursor is already past the last entry; the
                cursor is left unchanged in that case.
        """
        index = self.pos
        entry = self.data[index]
        self.pos = index + 1
        return entry

    def get_next_n_bytes(self, n):
        """Return up to ``n`` bytes from the cursor and advance it by ``n``.

        The cursor moves by ``n`` even when fewer bytes remain, in which case
        the returned ``bytearray`` is shorter than requested.
        """
        start = self.pos
        stop = start + n
        chunk = self.data[start:stop]
        self.pos = stop
        return bytearray(chunk)

    def read_32bit_integer(self):
        """Read four bytes as a big-endian unsigned integer.

        Returns:
            The decoded integer, or ``None`` when fewer than four bytes were
            available.
        """
        raw = self.get_next_n_bytes(4)
        if len(raw) != 4:
            return None
        return int.from_bytes(raw, byteorder="big")
|
|
|
|
def extract_image_metadata(image: Union[Image.Image, np.ndarray]) -> dict:
    """Decode the gzip-compressed JSON hidden in an image's alpha low bits.

    The hidden stream is read through ``LSBExtractor`` and is laid out as the
    ASCII magic ``stealth_pngcomp``, a 32-bit big-endian payload length in
    bits, and then the gzip-compressed UTF-8 JSON payload.

    Args:
        image: A PIL image that has an ``A`` band, or a NumPy array. A 3-D
            array contributes its last channel; a 2-D array is used as-is.

    Returns:
        The decoded JSON document. When it is a dict whose ``"Comment"``
        entry is a string, that string is itself parsed as JSON.

    Raises:
        AssertionError: If the input has no usable alpha plane, the magic
            marker does not match, or the stream ends before the length field.
            These are raised explicitly so the checks survive ``python -O``.
        Exception: Whatever ``gzip``/``json`` raise for a corrupt payload.
    """
    if isinstance(image, Image.Image):
        if "A" not in image.getbands():
            raise AssertionError("image format")
        alpha = np.array(image.getchannel("A"))
    else:
        alpha = image[..., -1] if image.ndim == 3 else image
        if alpha.ndim != 2:
            raise AssertionError("image format")

    reader = LSBExtractor(alpha)

    # Compare raw bytes: decoding arbitrary low bits as UTF-8 would fail with
    # UnicodeDecodeError on ordinary images before the marker is even checked.
    magic = b"stealth_pngcomp"
    if bytes(reader.get_next_n_bytes(len(magic))) != magic:
        raise AssertionError("magic number")

    bit_length = reader.read_32bit_integer()
    if bit_length is None:
        # Fewer than four bytes were left for the length field.
        raise AssertionError("payload length")

    compressed = reader.get_next_n_bytes(bit_length // 8)
    metadata = json.loads(gzip.decompress(compressed).decode("utf-8"))
    if isinstance(metadata, dict) and isinstance(metadata.get("Comment"), str):
        metadata["Comment"] = json.loads(metadata["Comment"])
    return metadata
|
|
|
|
def _parse_selection(tokens, count):
    """Translate 1-based index tokens into sorted, unique 0-based indexes."""
    chosen = set()
    for token in tokens:
        first, dash, last = token.partition("-")
        if not dash:
            # isdecimal() accepts exactly the characters int() can parse.
            if token.isdecimal():
                chosen.add(int(token) - 1)
        elif first.isdecimal() and last.isdecimal():
            # Clamp to the valid window so a huge range is never materialised.
            chosen.update(range(max(int(first) - 1, 0), min(int(last), count)))
        # Anything else ("a-b", "3-", "1-2-3", ...) is ignored, in the same
        # way that non-numeric single tokens are ignored.
    return [index for index in sorted(chosen) if 0 <= index < count]


def iter_text_files():
    """Ask the user which ``.txt`` files in the current directory to process.

    The files are listed in sorted order with 1-based numbers. The answer is
    a whitespace-separated list of numbers and inclusive ``start-end``
    ranges; malformed or out-of-range entries are ignored.

    Returns:
        The selected file names, de-duplicated and in listing order. An
        empty list is returned when the directory has no ``.txt`` files or
        nothing valid was selected.
    """
    names = sorted(name for name in os.listdir() if name.endswith(".txt"))
    if not names:
        print("No .txt files found.")
        return []
    for number, name in enumerate(names, start=1):
        print(f"{number}: {name}")
    tokens = input("Enter the index of the file: ").split()
    return [names[index] for index in _parse_selection(tokens, len(names))]
|
|
|
|
def fetch_post_pages(session, post_id):
    """Fetch the page list of a post from Pixiv's ajax ``pages`` endpoint.

    Args:
        session: Object exposing a requests-style ``get`` method.
        post_id: Illustration identifier interpolated into the URL.

    Returns:
        The ``body`` entry of the JSON response, or ``[]`` when that entry
        is missing or falsy.

    Raises:
        Exception: Whatever ``session.get``, ``raise_for_status`` or
            ``json`` raise for a failed or malformed response.
    """
    endpoint = f"https://www.pixiv.net/ajax/illust/{post_id}/pages"
    reply = session.get(endpoint, timeout=REQUEST_TIMEOUT)
    reply.raise_for_status()
    body = reply.json().get("body")
    return body if body else []
|
|
|
|
def has_stealth_png(session, image_url, post_id):
    """Download an image and confirm stealth metadata can be extracted.

    The request carries the post's artwork page as ``Referer``.

    Args:
        session: Object exposing a requests-style ``get`` method.
        image_url: Address of the image to download.
        post_id: Post identifier used to build the ``Referer`` header.

    Returns:
        ``True`` when the download and the extraction both succeed. There is
        no ``False`` result: every failure surfaces as an exception.
    """
    referer = f"https://www.pixiv.net/artworks/{post_id}"
    reply = session.get(
        image_url,
        headers={"Referer": referer},
        timeout=REQUEST_TIMEOUT,
    )
    reply.raise_for_status()
    extract_image_metadata(Image.open(io.BytesIO(reply.content)))
    return True
|
|
|
|
def find_stealth_page(post_id, phpsessid):
    """Return the 1-based number of the first page with stealth metadata.

    Only pages whose ``urls.original`` ends in ``.png`` (case-insensitive)
    are downloaded and checked. Errors while fetching the page list or while
    checking an individual page are swallowed on purpose: this is a
    best-effort scan.

    Args:
        post_id: Post identifier to scan.
        phpsessid: Cookie value used if this thread has no session yet.

    Returns:
        The page number of the first match, or ``None`` when the page list
        cannot be fetched or no page qualifies.
    """
    session = get_thread_session(phpsessid)
    try:
        pages = fetch_post_pages(session, post_id)
    except Exception:
        return None

    for page_number, page in enumerate(pages, start=1):
        url = page.get("urls", {}).get("original")
        is_png = bool(url) and url.lower().endswith(".png")
        if not is_png:
            continue
        try:
            found = has_stealth_png(session, url, post_id)
        except Exception:
            # A failed download or a page without metadata: try the next one.
            continue
        if found:
            return page_number
    return None
|
|
|
|
def build_session(phpsessid):
    """Create a ``requests`` session prepared for Pixiv.

    The session sends ``USER_AGENT`` and a pixiv.net ``Referer`` by default,
    carries ``phpsessid`` as the ``PHPSESSID`` cookie, and uses one shared
    ``HTTPAdapter`` (pools sized to twice ``DEFAULT_WORKERS``) for both the
    ``https://`` and ``http://`` prefixes.

    Args:
        phpsessid: Value stored in the ``PHPSESSID`` cookie.

    Returns:
        The configured ``requests.Session``.
    """
    pool_size = DEFAULT_WORKERS * 2
    adapter = HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)

    default_headers = {
        "User-Agent": USER_AGENT,
        "Referer": "https://www.pixiv.net/",
    }
    session = requests.Session()
    session.headers.update(default_headers)
    session.cookies.update({"PHPSESSID": phpsessid})
    for prefix in ("https://", "http://"):
        session.mount(prefix, adapter)
    return session
|
|
|
|
def get_thread_session(phpsessid):
    """Return the calling thread's session, creating it on first use.

    Args:
        phpsessid: Passed to ``build_session`` only when the calling thread
            has no session stored in ``thread_local`` yet.

    Returns:
        The session stored in ``thread_local`` for the calling thread.
    """
    if getattr(thread_local, "session", None) is None:
        thread_local.session = build_session(phpsessid)
    return thread_local.session
|
|
|
|
def _resolve_worker_count():
    """Return the thread-pool size from ``PIXIF_WORKERS`` or the default."""
    raw = os.getenv("PIXIF_WORKERS")
    if raw is None:
        return DEFAULT_WORKERS
    try:
        count = int(raw)
    except ValueError:
        count = 0
    if count < 1:
        # ThreadPoolExecutor rejects max_workers <= 0, so fall back instead
        # of crashing on a typo or an empty variable.
        print(f"Ignoring invalid PIXIF_WORKERS={raw!r}; using {DEFAULT_WORKERS}")
        return DEFAULT_WORKERS
    return count


def main() -> int:
    """Scan the post IDs listed in the selected ``.txt`` files.

    The working directory is switched to this script's directory, the
    PHPSESSID is loaded, and the user picks the files to process. Each file
    is read as whitespace-separated post IDs, which are scanned concurrently
    with ``find_stealth_page``. Every hit is printed as
    ``<post_id> page <n>`` while a progress bar tracks completion.

    The pool size comes from the ``PIXIF_WORKERS`` environment variable;
    missing, non-numeric or non-positive values fall back to
    ``DEFAULT_WORKERS``.

    Returns:
        ``1`` when the PHPSESSID cannot be loaded, otherwise ``0``.
    """
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    try:
        phpsessid = get_phpsessid()
    except Exception as exc:
        print(f"Failed to load PHPSESSID: {exc}")
        return 1

    files = iter_text_files()
    if not files:
        return 0

    workers = _resolve_worker_count()

    for filename in files:
        with open(filename, "r") as handle:
            post_ids = handle.read().split()
        if not post_ids:
            continue

        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
            futures = {
                executor.submit(find_stealth_page, post_id, phpsessid): post_id
                for post_id in post_ids
            }
            bar = tqdm(
                concurrent.futures.as_completed(futures),
                total=len(futures),
                desc=f"Scanning {filename}",
                unit="post",
            )
            for future in bar:
                post_id = futures[future]
                try:
                    page = future.result()
                except Exception:
                    # Best-effort scan: a failing post is treated as no hit.
                    page = None
                if page is not None:
                    # tqdm.write keeps the progress bar intact.
                    tqdm.write(f"{post_id} page {page}")

    return 0
|
|
|
|
# When executed as a script, run the scan and use main()'s return value as
# the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)
|
|