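"""Resolve Pixiv post IDs to original-image URLs and download them.

Each .txt file in the project root holds one group of post IDs. IDs are
resolved through the /pixif_stream endpoint, results are cached in db.sqlite,
images land once in images/Stash, and hard links expose them per group under
images/<group name>.
"""
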
import json
import os
import shutil
import sqlite3
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union

import requests
from tqdm import tqdm

LOCAL = 0  # 1 targets the local server instead of the hosted Space
DRY_RUN = 0  # 1 prints resolved URLs without caching or downloading anything
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
SCRIPTS_DIR = os.path.join(ROOT_DIR, "Scripts")
IMAGES_DIR = os.path.join(ROOT_DIR, "images")
STASH_DIR = os.path.join(IMAGES_DIR, "Stash")
DB_PATH = os.path.join(ROOT_DIR, "db.sqlite")
ENDPOINT = "http://127.0.0.1:7860" if LOCAL else "https://q6-p.hf.space"
IMG_BASE = "https://i.pximg.net/img-original/img/"
REQUEST_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
    "Referer": "https://www.pixiv.net/",
}

MAX_WORKERS = min(16, os.cpu_count() or 8)  # download thread pool size
REQUEST_TIMEOUT = (120, 120)  # (connect, read) seconds for image downloads
STREAM_REQUEST_TIMEOUT = (120, 30)  # (connect, read) seconds for the lookup stream
STREAM_IDLE_TIMEOUT_SECONDS = 45  # give up on a stream with no progress for this long
STREAM_MAX_READ_TIMEOUTS = 3  # ...or with this many consecutive read timeouts
STREAM_MAX_RETRIES = 3  # re-requests of post IDs the stream never answered
STREAM_RETRY_DELAY_SECONDS = 2  # pause between those re-requests

stop_event = threading.Event()  # set on Ctrl+C so streams and downloads stop early

def read_dotenv_value(path: str, key: str) -> Optional[str]:
    try:
        with open(path, "r") as env_file:
            for line in env_file:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                k, v = line.split("=", 1)
                if k == key:
                    return v
    except FileNotFoundError:
        return None
    return None

def get_phpsessid() -> str:
    phpsessid = os.getenv("PHPSESSID")
    if phpsessid:
        return phpsessid
    env_path = os.path.join(ROOT_DIR, ".env")
    phpsessid = read_dotenv_value(env_path, "PHPSESSID")
    if phpsessid:
        return phpsessid
    raise RuntimeError("PHPSESSID is not set in the environment or .env")

phpsessid = get_phpsessid()
os.makedirs(STASH_DIR, exist_ok=True)
images_cache = set(os.listdir(STASH_DIR))  # filenames already present in the stash

def open_db(path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS pixif_cache (
            post_id TEXT PRIMARY KEY,
            url TEXT
        )
        """
    )
    conn.commit()
    return conn

def chunked(seq: Sequence[str], size: int) -> Iterator[List[str]]:
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

def fetch_cached_state(
    conn: sqlite3.Connection,
    post_ids: Sequence[str],
) -> Dict[str, Optional[str]]:
    post_ids_dict = {post_id: None for post_id in post_ids}
    if not post_ids:
        return post_ids_dict
    # Chunk the IN (...) list to stay under SQLite's bound-parameter limit.
    for chunk in chunked(post_ids, 900):
        placeholders = ",".join("?" for _ in chunk)
        query = f"SELECT post_id, COALESCE(url, '') FROM pixif_cache WHERE post_id IN ({placeholders})"
        for post_id, url in conn.execute(query, chunk):
            post_ids_dict[post_id] = url
    return post_ids_dict

def upsert_urls(conn: sqlite3.Connection, rows: Sequence[Tuple[str, str]]) -> None:
    if not rows:
        return
    # No commit here; the caller wraps this in a `with conn:` transaction.
    conn.executemany(
        """
        INSERT INTO pixif_cache (post_id, url)
        VALUES (?, ?)
        ON CONFLICT(post_id) DO UPDATE SET url = excluded.url
        """,
        rows,
    )
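
# The server streams one JSON object per line, each carrying a "post_id" and,
# on a successful lookup, a "url". The stream is abandoned once it stalls for
# STREAM_IDLE_TIMEOUT_SECONDS or hits STREAM_MAX_READ_TIMEOUTS consecutive
# read timeouts; unanswered IDs are retried by scan_with_retries below.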
def iter_api_hunt_results(
    post_ids: Sequence[str],
    phpsessid: str,
    desc: str,
    stop_event: threading.Event = stop_event,
) -> Iterator[Tuple[str, str]]:
    post_ids_list = list(post_ids)
    if not post_ids_list:
        return
    payload = {"post_ids": post_ids_list, "phpsessid": phpsessid}
    try:
        with requests.post(
            f"{ENDPOINT}/pixif_stream",
            json=payload,
            stream=True,
            timeout=STREAM_REQUEST_TIMEOUT,
        ) as response:
            response.raise_for_status()
            with tqdm(total=len(post_ids_list), unit="post", desc=desc) as pbar:
                last_progress = time.monotonic()
                consecutive_timeouts = 0
                idle_break = False
                while not stop_event.is_set() and pbar.n < pbar.total:
                    if time.monotonic() - last_progress > STREAM_IDLE_TIMEOUT_SECONDS:
                        break
                    try:
                        for line in response.iter_lines(decode_unicode=True):
                            if stop_event.is_set():
                                break
                            now = time.monotonic()
                            if now - last_progress > STREAM_IDLE_TIMEOUT_SECONDS:
                                idle_break = True
                                break
                            if not line:
                                continue  # keep-alive blank line
                            try:
                                event = json.loads(line)
                            except json.JSONDecodeError:
                                continue
                            post_id = event.get("post_id")
                            if post_id is None:
                                continue
                            post_id = str(post_id)
                            url = event.get("url")
                            if url is None:
                                url = ""  # record "no result" so the ID is not re-queried
                            if DRY_RUN and url:
                                tqdm.write(f"{post_id} -> {decode_if_binary(url)}")
                            pbar.update(1)
                            last_progress = now
                            consecutive_timeouts = 0
                            yield post_id, url
                        if idle_break:
                            break
                        break  # stream ended; only a read timeout re-enters the loop
                    except requests.exceptions.ReadTimeout:
                        consecutive_timeouts += 1
                        if consecutive_timeouts >= STREAM_MAX_READ_TIMEOUTS:
                            break
                        continue  # keep reading the same response
    except KeyboardInterrupt:
        stop_event.set()
        raise
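
# Interactive selection: every .txt file in the project root is one group of
# post IDs. Input takes space-separated 1-based indexes and ranges like "2-4".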
conn = open_db(DB_PATH)
valid = sorted(f for f in os.listdir(ROOT_DIR) if f.endswith(".txt"))
for idx, file_name in enumerate(valid):
    print(f"{idx + 1}: {file_name}")
inputs = input("Enter the index of the file: ").split()
indexes = []
for inp in inputs:
    if "-" in inp:
        start, end = map(int, inp.split("-"))
        indexes.extend(range(start - 1, end))
    elif inp.isdigit():
        indexes.append(int(inp) - 1)

def build_image_url(url: str) -> str:
    # Cached values may be full URLs or paths relative to IMG_BASE.
    if url.startswith("http"):
        return url
    return IMG_BASE + url

def download_one_image(
    post_id: str,
    url: str,
    dest_dir: str,
    phpsessid: str,
    stop_event: threading.Event = stop_event,
) -> Tuple[str, str, Optional[str]]:
    dest_path = os.path.join(dest_dir, f"{post_id}.png")
    if os.path.exists(dest_path):
        return post_id, "exists", None
    if stop_event.is_set():
        return post_id, "cancelled", None
    # Stream into a .part file and rename at the end so an interrupted download
    # never leaves a truncated .png behind.
    tmp_path = dest_path + ".part"
    full_url = build_image_url(url)
    try:
        with requests.get(
            full_url,
            headers=REQUEST_HEADERS,
            cookies={"PHPSESSID": phpsessid},
            stream=True,
            timeout=REQUEST_TIMEOUT,
        ) as response:
            response.raise_for_status()
            with open(tmp_path, "wb") as handle:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if stop_event.is_set():
                        raise KeyboardInterrupt
                    if chunk:
                        handle.write(chunk)
        os.replace(tmp_path, dest_path)
        return post_id, "ok", None
    except KeyboardInterrupt:
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        return post_id, "cancelled", None
    except Exception as exc:
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        return post_id, "error", str(exc)
def link_group_image(post_id: str, group_dir: str, post_indexes: Dict[str, int]) -> None:
    index = post_indexes.get(post_id)
    if index is None:
        return
    stash_path = os.path.join(STASH_DIR, f"{post_id}.png")
    dest_path = os.path.join(group_dir, f"{index}_{post_id}.png")
    if os.path.exists(stash_path) and not os.path.exists(dest_path):
        os.link(stash_path, dest_path)

def handle_download_result(future, group_dir: str, post_indexes: Dict[str, int], pbar: tqdm) -> None:
    post_id, status, detail = future.result()
    if status == "error":
        tqdm.write(f"Failed {post_id}: {detail}")
    elif status in {"ok", "exists"}:
        images_cache.add(f"{post_id}.png")
        link_group_image(post_id, group_dir, post_indexes)
    pbar.update(1)

def drain_downloads(
    futures,
    group_dir: str,
    post_indexes: Dict[str, int],
    pbar: tqdm,
) -> None:
    # Reap whatever has already finished without blocking the scan loop.
    for future in list(futures):
        if future.done():
            futures.remove(future)
            handle_download_result(future, group_dir, post_indexes, pbar)

def finish_downloads(
    futures,
    group_dir: str,
    post_indexes: Dict[str, int],
    pbar: tqdm,
    stop_event: threading.Event = stop_event,
) -> None:
    for future in as_completed(list(futures)):
        if stop_event.is_set():
            break
        futures.remove(future)
        handle_download_result(future, group_dir, post_indexes, pbar)

def decode_if_binary(val: Union[str, bytes]) -> str:
    if isinstance(val, bytes):
        return val.decode()
    return val
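
# One scan: stream lookups for the remaining IDs, cache every answer, and queue
# a download as soon as a usable URL arrives. IDs the server never answered are
# re-requested, up to STREAM_MAX_RETRIES attempts.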
def scan_with_retries(
    post_ids: Sequence[str],
    phpsessid: str,
    conn: sqlite3.Connection,
    post_ids_dict: Dict[str, Optional[str]],
    executor: ThreadPoolExecutor,
    futures,
    queued_downloads,
    group_dir: str,
    post_indexes: Dict[str, int],
    download_pbar: tqdm,
    desc: str,
    stop_event: threading.Event = stop_event,
) -> None:
    if not post_ids:
        return
    remaining = list(post_ids)
    attempts = 0
    while remaining and attempts < STREAM_MAX_RETRIES and not stop_event.is_set():
        received = set()
        for post_id, url in iter_api_hunt_results(remaining, phpsessid, desc, stop_event):
            received.add(post_id)
            post_ids_dict[post_id] = url
            if not DRY_RUN:
                with conn:
                    upsert_urls(conn, [(post_id, url)])
            if url and f"{post_id}.png" not in images_cache and post_id not in queued_downloads:
                queued_downloads.add(post_id)
                future = executor.submit(download_one_image, post_id, url, STASH_DIR, phpsessid, stop_event)
                futures.append(future)
                download_pbar.total += 1
                download_pbar.refresh()
            drain_downloads(futures, group_dir, post_indexes, download_pbar)
        remaining = [post_id for post_id in remaining if post_id not in received]
        if not remaining:
            break
        attempts += 1
        if STREAM_RETRY_DELAY_SECONDS:
            time.sleep(STREAM_RETRY_DELAY_SECONDS)
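
# Main loop: for each selected group, read its post IDs, queue downloads for
# URLs already cached, stream lookups for the rest, then hard-link everything
# (plus the helper scripts) into the group directory.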
try:
    for index in indexes:
        group_name = valid[index].rsplit(".", 1)[0]
        group_dir = os.path.join(IMAGES_DIR, group_name)
        os.makedirs(group_dir, exist_ok=True)
        with open(os.path.join(ROOT_DIR, valid[index]), "r") as f:
            post_ids = f.read().split()
        post_indexes = {post_id: i for i, post_id in enumerate(post_ids)}
        post_ids_dict = fetch_cached_state(conn, post_ids)
        if DRY_RUN:
            filtered = list(post_ids)
        else:
            # None means never looked up; "" means looked up with no result.
            filtered = [
                post_id
                for post_id in post_ids
                if post_ids_dict[post_id] is None and f"{post_id}.png" not in images_cache
            ]
        print(f"Group: {group_name}\nFiltered: {len(filtered)}/{len(post_ids)}")
        if DRY_RUN:
            if filtered:
                print("Dry run outputs (post_id -> page):")
                for _ in iter_api_hunt_results(filtered, phpsessid, "API hunt", stop_event):
                    pass
            continue
        cached_downloads = {
            post_id: decode_if_binary(url)
            for post_id, url in post_ids_dict.items()
            if url and f"{post_id}.png" not in images_cache
        }
        futures = []
        queued_downloads = set()
        executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
        interrupted = False
        try:
            with tqdm(total=0, unit="image", desc="Downloading images") as download_pbar:
                # Queue downloads for URLs we already know from the cache.
                for post_id, url in cached_downloads.items():
                    queued_downloads.add(post_id)
                    futures.append(executor.submit(download_one_image, post_id, url, STASH_DIR, phpsessid, stop_event))
                    download_pbar.total += 1
                download_pbar.refresh()
                if filtered:
                    scan_with_retries(
                        filtered,
                        phpsessid,
                        conn,
                        post_ids_dict,
                        executor,
                        futures,
                        queued_downloads,
                        group_dir,
                        post_indexes,
                        download_pbar,
                        "API hunt",
                        stop_event,
                    )
                finish_downloads(futures, group_dir, post_indexes, download_pbar, stop_event)
        except KeyboardInterrupt:
            interrupted = True
            stop_event.set()
            for future in futures:
                future.cancel()
            executor.shutdown(wait=False, cancel_futures=True)
            raise
        finally:
            # On Ctrl+C the executor was already shut down without waiting.
            if not interrupted:
                executor.shutdown(wait=True)
        print("Linking images to the group directory...")
        images_cache.update(os.listdir(STASH_DIR))
        for post_id in post_ids:
            link_group_image(post_id, group_dir, post_indexes)
        if len(os.listdir(group_dir)) == 0:
            shutil.rmtree(group_dir)
        else:
            novelai_source = os.path.join(SCRIPTS_DIR, "novelai.py")
            novelai_dest = os.path.join(group_dir, "!novelai.py")
            if not os.path.exists(novelai_dest) and os.path.exists(novelai_source):
                os.link(novelai_source, novelai_dest)
                print(f"Linked novelai.py to images/{group_name}/!novelai.py")
            comfy_source = os.path.join(SCRIPTS_DIR, "comfy.py")
            comfy_dest = os.path.join(group_dir, "!comfy.py")
            if not os.path.exists(comfy_dest) and os.path.exists(comfy_source):
                os.link(comfy_source, comfy_dest)
                print(f"Linked comfy.py to images/{group_name}/!comfy.py")
except KeyboardInterrupt:
    stop_event.set()
    raise
finally:
    conn.close()