q6 committed on
Commit
52fadc8
·
1 Parent(s): 16908bd
Client/Scripts/hunt.py CHANGED
@@ -207,15 +207,17 @@ def detect_exif_codes_from_files(
207
  ): post_id
208
  for post_id in post_ids
209
  }
210
- for future in as_completed(futures):
211
- if stop_event.is_set():
212
- break
213
- post_id = futures[future]
214
- try:
215
- code = future.result()
216
- except Exception:
217
- code = None
218
- results[post_id] = code
 
 
219
  return results
220
 
221
  def chunked(seq: Sequence[str], size: int) -> Iterator[List[str]]:
@@ -520,7 +522,7 @@ try:
520
  if filtered:
521
  if DRY_RUN:
522
  print("Dry run outputs (post_id -> page):")
523
- scan_with_retries(filtered, phpsessid, conn, post_ids_dict, exif_types, "Scanning", stop_event)
524
  if DRY_RUN:
525
  continue
526
  exif_pending = {
 
207
  ): post_id
208
  for post_id in post_ids
209
  }
210
+ with tqdm(total=len(futures), unit="image", desc="Scanning exif") as pbar:
211
+ for future in as_completed(futures):
212
+ if stop_event.is_set():
213
+ break
214
+ post_id = futures[future]
215
+ try:
216
+ code = future.result()
217
+ except Exception:
218
+ code = None
219
+ results[post_id] = code
220
+ pbar.update(1)
221
  return results
222
 
223
  def chunked(seq: Sequence[str], size: int) -> Iterator[List[str]]:
 
522
  if filtered:
523
  if DRY_RUN:
524
  print("Dry run outputs (post_id -> page):")
525
+ scan_with_retries(filtered, phpsessid, conn, post_ids_dict, exif_types, "Scanning exif", stop_event)
526
  if DRY_RUN:
527
  continue
528
  exif_pending = {
Client/Scripts/scan_existing_exif.py ADDED
@@ -0,0 +1,217 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import sqlite3
3
+ from concurrent.futures import ThreadPoolExecutor, as_completed
4
+ from typing import Dict, List, Optional, Sequence, Tuple
5
+
6
+ import numpy as np
7
+ from PIL import Image
8
+ from tqdm import tqdm
9
+
10
# --- Filesystem layout -------------------------------------------------------
# The project root is the parent of the directory that contains this script.
ROOT_DIR = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..")
)
IMAGES_DIR = os.path.join(ROOT_DIR, "images")
STASH_DIR = os.path.join(IMAGES_DIR, "Stash")
DB_PATH = os.path.join(ROOT_DIR, "db.sqlite")

# --- Scan tuning -------------------------------------------------------------
# Upper bound on worker threads; falls back to 8 when the CPU count is unknown.
MAX_WORKERS = min(16, os.cpu_count() or 8)
# Number of leading bytes read from each file when looking for a text chunk.
EXIF_METADATA_MAX_BYTES = 512

# --- EXIF type codes ---------------------------------------------------------
# Position in this tuple (1-based) is the integer code stored per type name.
EXIF_TYPE_ORDER = (
    "novelai",
    "sd",
    "comfy",
    "mj",
    "celsys",
    "photoshop",
    "stealth",
)
EXIF_TYPE_TO_CODE = {
    type_name: code for code, type_name in enumerate(EXIF_TYPE_ORDER, start=1)
}

# Fixed 8-byte prefix that files must start with to be treated as PNG here.
PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"
19
+
20
def open_db(path: str) -> sqlite3.Connection:
    """Open the SQLite database at *path* and make sure the cache table exists.

    Creates ``pixif_cache`` when it is missing, commits, then runs
    ``ensure_db_schema`` on the connection before returning it.
    """
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS pixif_cache (
            post_id TEXT PRIMARY KEY,
            url TEXT,
            exif_type INTEGER
        )
        """
    connection = sqlite3.connect(path)
    connection.execute(create_table_sql)
    connection.commit()
    ensure_db_schema(connection)
    return connection
34
+
35
def ensure_db_schema(conn: sqlite3.Connection) -> None:
    """Add the ``exif_type`` column to ``pixif_cache`` when it is absent.

    Does nothing if the column is already present.
    """
    # Index 1 of each PRAGMA table_info row is the column name.
    column_names = {
        info[1] for info in conn.execute("PRAGMA table_info(pixif_cache)")
    }
    if "exif_type" in column_names:
        return
    conn.execute("ALTER TABLE pixif_cache ADD COLUMN exif_type INTEGER")
    conn.commit()
40
+
41
def determine_exif_type(metadata: Optional[bytes]) -> Optional[str]:
    """Map raw metadata bytes to a type label.

    Returns ``None`` when *metadata* is ``None``. Otherwise the checks run in
    a fixed order and the first match wins; anything that matches no check
    (including empty bytes) is labelled ``"photoshop"``.
    """
    if metadata is None:
        return None

    if metadata == b"TitleAI generated image":
        label = "novelai"
    elif metadata.startswith(b"parameter"):
        label = "sd"
    elif b'{"' in metadata:
        label = "comfy"
    elif metadata.startswith(b"SoftwareCelsys"):
        label = "celsys"
    else:
        # Fallback for every other non-None payload.
        label = "photoshop"
    return label
53
+
54
def exif_type_to_code(exif_type: Optional[str]) -> Optional[int]:
    """Look up the integer code for *exif_type* in ``EXIF_TYPE_TO_CODE``.

    Returns ``None`` for a falsy *exif_type* (``None`` or ``""``) and for
    names that are not in the mapping.
    """
    return EXIF_TYPE_TO_CODE.get(exif_type) if exif_type else None
58
+
59
def parse_png_metadata(data: bytes) -> Optional[bytes]:
    """Return the payload of the first ``tEXt`` or ``iTXt`` chunk in *data*.

    Scanning starts at offset 8, i.e. after the 8-byte file signature. Each
    chunk is read as a 4-byte big-endian length, a 4-byte type, the payload,
    and 4 trailing bytes that are skipped. A ``tEXt`` payload is returned with
    every NUL byte removed; an ``iTXt`` payload is returned with surrounding
    whitespace stripped. If *data* ends mid-payload, whatever part is present
    is returned. Returns ``None`` when no text chunk is found.
    """
    total = len(data)
    cursor = 8
    # A chunk header needs 8 bytes (length + type); stop once fewer remain.
    while cursor + 8 <= total:
        length = int.from_bytes(data[cursor:cursor + 4], "big")
        kind = data[cursor + 4:cursor + 8]
        payload_start = cursor + 8
        if kind in (b"tEXt", b"iTXt"):
            payload = data[payload_start:payload_start + length]
            if kind == b"tEXt":
                return payload.replace(b"\0", b"")
            return payload.strip()
        # Jump over the payload and the 4 bytes that follow it.
        cursor = payload_start + length + 4
    return None
75
+
76
def parse_png_metadata_file(path: str) -> Optional[bytes]:
    """Read the head of the file at *path* and extract its first text chunk.

    Only the first ``EXIF_METADATA_MAX_BYTES`` bytes are read. Returns
    ``None`` when the file does not start with ``PNG_SIGNATURE``, when no text
    chunk is found in the bytes read, or when any exception occurs (for
    example the file cannot be opened).
    """
    try:
        with open(path, "rb") as stream:
            prefix = stream.read(EXIF_METADATA_MAX_BYTES)
        is_png = prefix.startswith(PNG_SIGNATURE)
        return parse_png_metadata(prefix) if is_png else None
    except Exception:
        # Best-effort probe: every failure is reported as "no metadata".
        return None
85
+
86
def byteize(alpha: np.ndarray) -> np.ndarray:
    """Pack the least-significant bits of *alpha* into bytes.

    Values are taken in the order of the transposed array flattened row by
    row. Trailing values that do not fill a whole group of 8 are dropped. The
    result has one packed byte per row, i.e. shape ``(groups, 1)``.
    """
    # Fortran-order ravel yields the same sequence as flattening the transpose.
    flat = np.ravel(alpha, order="F")
    usable = (flat.shape[0] // 8) * 8
    low_bits = flat[:usable] & 1
    return np.packbits(low_bits.reshape((-1, 8)), axis=1)
93
+
94
class LSBExtractor:
    """Sequential reader over the bytes produced by ``byteize``."""

    def __init__(self, alpha: np.ndarray) -> None:
        """Pack *alpha* with ``byteize`` and start reading at position 0."""
        self.data = byteize(alpha)
        self.pos = 0

    def get_next_n_bytes(self, n: int) -> bytearray:
        """Return up to *n* bytes from the current position.

        The position always advances by *n*, even when fewer bytes remain.
        """
        start = self.pos
        self.pos = start + n
        return bytearray(self.data[start:start + n])

    def read_32bit_integer(self) -> Optional[int]:
        """Read 4 bytes as a big-endian integer, or ``None`` if fewer remain."""
        raw = self.get_next_n_bytes(4)
        if len(raw) != 4:
            return None
        return int.from_bytes(raw, byteorder="big")
109
+
110
def extract_stealth_metadata(image: Image.Image) -> bool:
    """Check whether *image* carries the stealth marker in its alpha LSBs.

    Reads the alpha channel through ``LSBExtractor`` and expects the ASCII
    magic ``stealth_pngcomp`` followed by a 32-bit length field.

    Returns:
        ``True`` when both the magic and the length field are present.

    Raises:
        AssertionError: if the image has no alpha band (``"image format"``),
            the magic does not match (``"magic number"``), or fewer than four
            bytes follow the magic (``"length missing"``).
    """
    if "A" not in image.getbands():
        raise AssertionError("image format")
    reader = LSBExtractor(np.array(image.getchannel("A")))
    magic = b"stealth_pngcomp"
    # Compare raw bytes rather than decoding: arbitrary LSB data is usually not
    # valid UTF-8, and decoding it would raise UnicodeDecodeError instead of
    # the intended "magic number" failure.
    if bytes(reader.get_next_n_bytes(len(magic))) != magic:
        raise AssertionError("magic number")
    if reader.read_32bit_integer() is None:
        raise AssertionError("length missing")
    return True
123
+
124
def has_stealth_png_path(path: str) -> bool:
    """Return the result of ``extract_stealth_metadata`` for the file at *path*.

    Any exception raised while opening or inspecting the image is treated as
    "no stealth metadata" and yields ``False``.
    """
    found = False
    try:
        with Image.open(path) as picture:
            found = extract_stealth_metadata(picture)
    except Exception:
        # Unreadable files and failed checks both count as a negative result.
        found = False
    return found
130
+
131
def detect_exif_code_from_path(path: str) -> Optional[int]:
    """Determine the EXIF type code for the image file at *path*.

    The text-chunk metadata is tried first; only when that produces no code is
    the stealth check run. Returns ``None`` when neither check matches.
    """
    text_code = exif_type_to_code(
        determine_exif_type(parse_png_metadata_file(path))
    )
    if text_code is not None:
        return text_code
    # The stealth check opens the image, so it runs only as a fallback.
    if not has_stealth_png_path(path):
        return None
    return EXIF_TYPE_TO_CODE.get("stealth")
140
+
141
def fetch_pending_post_ids(conn: sqlite3.Connection) -> List[str]:
    """Return post ids whose ``exif_type`` is NULL and whose ``url`` is set.

    Rows with a NULL or empty ``url`` are excluded. Each id is converted to
    ``str``.
    """
    query = """
        SELECT post_id
        FROM pixif_cache
        WHERE exif_type IS NULL
        AND COALESCE(url, '') != ''
        """
    cursor = conn.execute(query)
    return [str(post_id) for (post_id,) in cursor.fetchall()]
151
+
152
def update_exif_types(conn: sqlite3.Connection, rows: Sequence[Tuple[int, str]]) -> None:
    """Write ``exif_type`` values into ``pixif_cache``.

    Each element of *rows* is ``(exif_type, post_id)``. Nothing is executed
    when *rows* is empty. This function does not commit.
    """
    update_sql = """
        UPDATE pixif_cache SET exif_type = ?
        WHERE post_id = ?
        """
    if rows:
        conn.executemany(update_sql, rows)
162
+
163
def detect_exif_codes_from_files(
    post_ids: Sequence[str],
    stash_dir: str,
    max_workers: int = MAX_WORKERS,
) -> Dict[str, Optional[int]]:
    """Detect EXIF type codes for ``<stash_dir>/<post_id>.png`` files in parallel.

    Runs ``detect_exif_code_from_path`` for each post id on a thread pool and
    shows a progress bar while results arrive.

    Returns:
        A mapping of post id to detected code. The value is ``None`` when
        nothing was detected or the worker raised. Empty input yields ``{}``.
    """
    if not post_ids:
        return {}

    codes: Dict[str, Optional[int]] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each submitted future back to the post id it belongs to.
        pending = {}
        for post_id in post_ids:
            image_path = os.path.join(stash_dir, f"{post_id}.png")
            pending[pool.submit(detect_exif_code_from_path, image_path)] = post_id

        with tqdm(total=len(pending), unit="image", desc="Scanning exif") as progress:
            for finished in as_completed(pending):
                owner = pending[finished]
                try:
                    detected = finished.result()
                except Exception:
                    # A failed worker is recorded as "nothing detected".
                    detected = None
                codes[owner] = detected
                progress.update(1)
    return codes
189
+
190
def main() -> int:
    """Scan stashed images for pending rows and store their EXIF type codes.

    Pending post ids come from the database. Only those with a matching
    ``<post_id>.png`` in ``STASH_DIR`` are scanned, and only detected codes
    are written back. Always returns 0; the connection is closed on exit.
    """
    os.makedirs(STASH_DIR, exist_ok=True)
    conn = open_db(DB_PATH)
    try:
        pending = fetch_pending_post_ids(conn)
        if not pending:
            print("No pending rows.")
            return 0

        on_disk = [
            post_id
            for post_id in pending
            if os.path.exists(os.path.join(STASH_DIR, f"{post_id}.png"))
        ]
        if not on_disk:
            print("No matching images in stash.")
            return 0

        detected = detect_exif_codes_from_files(on_disk, STASH_DIR)
        # Parameter order matches the UPDATE statement: (exif_type, post_id).
        updates = [
            (code, post_id)
            for post_id, code in detected.items()
            if code is not None
        ]
        if updates:
            # The connection context manager commits on success.
            with conn:
                update_exif_types(conn, updates)
        print(f"Updated {len(updates)} rows.")
        return 0
    finally:
        conn.close()


if __name__ == "__main__":
    raise SystemExit(main())
Client/main.py CHANGED
@@ -32,14 +32,15 @@ MENU_SECTIONS: Sequence[Tuple[str, Sequence[Tuple[str, str, str]]]] = (
32
  "Hunt",
33
  (
34
  ("5", "Hunt", os.path.join(SCRIPTS_DIR, "hunt.py")),
 
35
  ),
36
  ),
37
  (
38
  "Maintenance",
39
  (
40
- ("6", "Clear text logs", os.path.join(SCRIPTS_DIR, "clear_texts.py")),
41
- ("7", "Clear DB empty rows", os.path.join(SCRIPTS_DIR, "clear_db.py")),
42
- ("8", "Clear images (except Stash)", os.path.join(SCRIPTS_DIR, "clear_images.py")),
43
  ),
44
  ),
45
  )
 
32
  "Hunt",
33
  (
34
  ("5", "Hunt", os.path.join(SCRIPTS_DIR, "hunt.py")),
35
+ ("6", "Scan existing exif", os.path.join(SCRIPTS_DIR, "scan_existing_exif.py")),
36
  ),
37
  ),
38
  (
39
  "Maintenance",
40
  (
41
+ ("7", "Clear text logs", os.path.join(SCRIPTS_DIR, "clear_texts.py")),
42
+ ("8", "Clear DB empty rows", os.path.join(SCRIPTS_DIR, "clear_db.py")),
43
+ ("9", "Clear images (except Stash)", os.path.join(SCRIPTS_DIR, "clear_images.py")),
44
  ),
45
  ),
46
  )