q6 committed on
Commit
cbad4a3
·
1 Parent(s): bc1f305

Update hunt

Browse files
Files changed (2) hide show
  1. .gitignore +2 -1
  2. Client/Scripts/hunt.py +114 -66
.gitignore CHANGED
@@ -6,4 +6,5 @@ merge_dev.bat
6
  __pycache__
7
  *.png
8
  *.txt
9
- db.sqlite
 
 
6
  __pycache__
7
  *.png
8
  *.txt
9
+ db.sqlite
10
+ .?
Client/Scripts/hunt.py CHANGED
@@ -107,17 +107,16 @@ def upsert_urls(conn: sqlite3.Connection, rows: Sequence[Tuple[str, str]]) -> No
107
  rows,
108
  )
109
 
110
- def fetch_api_hunt_results(
111
  post_ids: Sequence[str],
112
  phpsessid: str,
113
  desc: str,
114
  stop_event: threading.Event = stop_event,
115
- ) -> Dict[str, str]:
116
  post_ids_list = list(post_ids)
117
  if not post_ids_list:
118
- return {}
119
  payload = {"post_ids": post_ids_list, "phpsessid": phpsessid}
120
- results: Dict[str, str] = {}
121
  try:
122
  with requests.post(
123
  f"{ENDPOINT}/pixif_stream",
@@ -156,10 +155,10 @@ def fetch_api_hunt_results(
156
  url = ""
157
  if DRY_RUN and url:
158
  tqdm.write(f"{post_id} -> {decode_if_binary(url)}")
159
- results[post_id] = url
160
  pbar.update(1)
161
  last_progress = now
162
  consecutive_timeouts = 0
 
163
  if idle_break:
164
  break
165
  break
@@ -171,7 +170,6 @@ def fetch_api_hunt_results(
171
  except KeyboardInterrupt:
172
  stop_event.set()
173
  raise
174
- return results
175
 
176
  conn = open_db(DB_PATH)
177
 
@@ -240,44 +238,47 @@ def download_one_image(
240
  pass
241
  return post_id, "error", str(exc)
242
 
243
- def download_images(
244
- to_download: Dict[str, str],
245
- dest_dir: str,
246
- phpsessid: str,
247
- max_workers: int = MAX_WORKERS,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
248
  stop_event: threading.Event = stop_event,
249
  ) -> None:
250
- if not to_download:
251
- return
252
-
253
- os.makedirs(dest_dir, exist_ok=True)
254
- futures = []
255
- executor = ThreadPoolExecutor(max_workers=max_workers)
256
- interrupted = False
257
- try:
258
- for post_id, url in to_download.items():
259
- if stop_event.is_set():
260
- break
261
- futures.append(executor.submit(download_one_image, post_id, url, dest_dir, phpsessid, stop_event))
262
-
263
- with tqdm(total=len(futures), unit="image", desc="Downloading images") as pbar:
264
- for future in as_completed(futures):
265
- if stop_event.is_set():
266
- break
267
- post_id, status, detail = future.result()
268
- if status == "error":
269
- tqdm.write(f"Failed {post_id}: {detail}")
270
- pbar.update(1)
271
- except KeyboardInterrupt:
272
- interrupted = True
273
- stop_event.set()
274
- for future in futures:
275
- future.cancel()
276
- executor.shutdown(wait=False, cancel_futures=True)
277
- raise
278
- finally:
279
- if not interrupted:
280
- executor.shutdown(wait=True)
281
 
282
  def decode_if_binary(val: Union[str, bytes]) -> str:
283
  if type(val) is bytes:
@@ -289,6 +290,12 @@ def scan_with_retries(
289
  phpsessid: str,
290
  conn: sqlite3.Connection,
291
  post_ids_dict: Dict[str, Optional[str]],
 
 
 
 
 
 
292
  desc: str,
293
  stop_event: threading.Event = stop_event,
294
  ) -> None:
@@ -297,18 +304,26 @@ def scan_with_retries(
297
  remaining = list(post_ids)
298
  attempts = 0
299
  while remaining and attempts < STREAM_MAX_RETRIES and not stop_event.is_set():
300
- results = fetch_api_hunt_results(remaining, phpsessid, desc, stop_event)
301
- post_ids_dict.update(results)
302
- remaining = [post_id for post_id in remaining if post_id not in results]
 
 
 
 
 
 
 
 
 
 
 
 
303
  if not remaining:
304
  break
305
  attempts += 1
306
  if STREAM_RETRY_DELAY_SECONDS:
307
  time.sleep(STREAM_RETRY_DELAY_SECONDS)
308
- if not DRY_RUN:
309
- rows = [(post_id, url) for post_id, url in post_ids_dict.items() if url is not None]
310
- with conn:
311
- upsert_urls(conn, rows)
312
 
313
  try:
314
  for index in indexs:
@@ -318,6 +333,7 @@ try:
318
  with open(os.path.join(ROOT_DIR, valid[index]), "r") as f:
319
  post_ids = f.read().split()
320
 
 
321
  post_ids_dict = fetch_cached_state(conn, post_ids)
322
  if DRY_RUN:
323
  filtered = list(post_ids)
@@ -329,28 +345,60 @@ try:
329
  ]
330
  print(f"Group: {group_name}\nFiltered: {len(filtered)}/{len(post_ids)}")
331
 
332
- if filtered:
333
- if DRY_RUN:
334
- print("Dry run outputs (post_id -> page):")
335
- scan_with_retries(filtered, phpsessid, conn, post_ids_dict, "API hunt", stop_event)
336
  if DRY_RUN:
 
 
 
 
337
  continue
338
- to_download = {post_id: decode_if_binary(url) for post_id, url in post_ids_dict.items() if url and f"{post_id}.png" not in images_cache}
339
 
340
- if to_download:
341
- print(f"Total images to download: {len(to_download)}")
342
- if not DRY_RUN:
343
- download_images(to_download, STASH_DIR, phpsessid, stop_event=stop_event)
344
-
345
- images_cache.update(os.listdir(STASH_DIR))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
346
 
347
  print("Linking images to the group directory...")
348
- for i, post_id in enumerate(post_ids):
349
- stash_path = os.path.join(STASH_DIR, f"{post_id}.png")
350
- dest_path = os.path.join(group_dir, f"{i}_{post_id}.png")
351
-
352
- if os.path.exists(stash_path) and not os.path.exists(dest_path):
353
- os.link(stash_path, dest_path)
354
 
355
  if len(os.listdir(group_dir)) == 0:
356
  shutil.rmtree(group_dir)
 
107
  rows,
108
  )
109
 
110
+ def iter_api_hunt_results(
111
  post_ids: Sequence[str],
112
  phpsessid: str,
113
  desc: str,
114
  stop_event: threading.Event = stop_event,
115
+ ) -> Iterator[Tuple[str, str]]:
116
  post_ids_list = list(post_ids)
117
  if not post_ids_list:
118
+ return
119
  payload = {"post_ids": post_ids_list, "phpsessid": phpsessid}
 
120
  try:
121
  with requests.post(
122
  f"{ENDPOINT}/pixif_stream",
 
155
  url = ""
156
  if DRY_RUN and url:
157
  tqdm.write(f"{post_id} -> {decode_if_binary(url)}")
 
158
  pbar.update(1)
159
  last_progress = now
160
  consecutive_timeouts = 0
161
+ yield post_id, url
162
  if idle_break:
163
  break
164
  break
 
170
  except KeyboardInterrupt:
171
  stop_event.set()
172
  raise
 
173
 
174
  conn = open_db(DB_PATH)
175
 
 
238
  pass
239
  return post_id, "error", str(exc)
240
 
241
+ def link_group_image(post_id: str, group_dir: str, post_indexes: Dict[str, int]) -> None:
242
+ index = post_indexes.get(post_id)
243
+ if index is None:
244
+ return
245
+ stash_path = os.path.join(STASH_DIR, f"{post_id}.png")
246
+ dest_path = os.path.join(group_dir, f"{index}_{post_id}.png")
247
+ if os.path.exists(stash_path) and not os.path.exists(dest_path):
248
+ os.link(stash_path, dest_path)
249
+
250
+ def handle_download_result(future, group_dir: str, post_indexes: Dict[str, int], pbar: tqdm) -> None:
251
+ post_id, status, detail = future.result()
252
+ if status == "error":
253
+ tqdm.write(f"Failed {post_id}: {detail}")
254
+ elif status in {"ok", "exists"}:
255
+ images_cache.add(f"{post_id}.png")
256
+ link_group_image(post_id, group_dir, post_indexes)
257
+ pbar.update(1)
258
+
259
+ def drain_downloads(
260
+ futures,
261
+ group_dir: str,
262
+ post_indexes: Dict[str, int],
263
+ pbar: tqdm,
264
+ ) -> None:
265
+ for future in list(futures):
266
+ if future.done():
267
+ futures.remove(future)
268
+ handle_download_result(future, group_dir, post_indexes, pbar)
269
+
270
+ def finish_downloads(
271
+ futures,
272
+ group_dir: str,
273
+ post_indexes: Dict[str, int],
274
+ pbar: tqdm,
275
  stop_event: threading.Event = stop_event,
276
  ) -> None:
277
+ for future in as_completed(list(futures)):
278
+ if stop_event.is_set():
279
+ break
280
+ futures.remove(future)
281
+ handle_download_result(future, group_dir, post_indexes, pbar)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
282
 
283
  def decode_if_binary(val: Union[str, bytes]) -> str:
284
  if type(val) is bytes:
 
290
  phpsessid: str,
291
  conn: sqlite3.Connection,
292
  post_ids_dict: Dict[str, Optional[str]],
293
+ executor: ThreadPoolExecutor,
294
+ futures,
295
+ queued_downloads,
296
+ group_dir: str,
297
+ post_indexes: Dict[str, int],
298
+ download_pbar: tqdm,
299
  desc: str,
300
  stop_event: threading.Event = stop_event,
301
  ) -> None:
 
304
  remaining = list(post_ids)
305
  attempts = 0
306
  while remaining and attempts < STREAM_MAX_RETRIES and not stop_event.is_set():
307
+ received = set()
308
+ for post_id, url in iter_api_hunt_results(remaining, phpsessid, desc, stop_event):
309
+ received.add(post_id)
310
+ post_ids_dict[post_id] = url
311
+ if not DRY_RUN:
312
+ with conn:
313
+ upsert_urls(conn, [(post_id, url)])
314
+ if url and f"{post_id}.png" not in images_cache and post_id not in queued_downloads:
315
+ queued_downloads.add(post_id)
316
+ future = executor.submit(download_one_image, post_id, url, STASH_DIR, phpsessid, stop_event)
317
+ futures.append(future)
318
+ download_pbar.total += 1
319
+ download_pbar.refresh()
320
+ drain_downloads(futures, group_dir, post_indexes, download_pbar)
321
+ remaining = [post_id for post_id in remaining if post_id not in received]
322
  if not remaining:
323
  break
324
  attempts += 1
325
  if STREAM_RETRY_DELAY_SECONDS:
326
  time.sleep(STREAM_RETRY_DELAY_SECONDS)
 
 
 
 
327
 
328
  try:
329
  for index in indexs:
 
333
  with open(os.path.join(ROOT_DIR, valid[index]), "r") as f:
334
  post_ids = f.read().split()
335
 
336
+ post_indexes = {post_id: i for i, post_id in enumerate(post_ids)}
337
  post_ids_dict = fetch_cached_state(conn, post_ids)
338
  if DRY_RUN:
339
  filtered = list(post_ids)
 
345
  ]
346
  print(f"Group: {group_name}\nFiltered: {len(filtered)}/{len(post_ids)}")
347
 
 
 
 
 
348
  if DRY_RUN:
349
+ if filtered:
350
+ print("Dry run outputs (post_id -> page):")
351
+ for _ in iter_api_hunt_results(filtered, phpsessid, "API hunt", stop_event):
352
+ pass
353
  continue
 
354
 
355
+ cached_downloads = {
356
+ post_id: decode_if_binary(url)
357
+ for post_id, url in post_ids_dict.items()
358
+ if url and f"{post_id}.png" not in images_cache
359
+ }
360
+ futures = []
361
+ queued_downloads = set()
362
+ executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
363
+ interrupted = False
364
+ try:
365
+ with tqdm(total=0, unit="image", desc="Downloading images") as download_pbar:
366
+ for post_id, url in cached_downloads.items():
367
+ queued_downloads.add(post_id)
368
+ futures.append(executor.submit(download_one_image, post_id, url, STASH_DIR, phpsessid, stop_event))
369
+ download_pbar.total += 1
370
+ download_pbar.refresh()
371
+ if filtered:
372
+ scan_with_retries(
373
+ filtered,
374
+ phpsessid,
375
+ conn,
376
+ post_ids_dict,
377
+ executor,
378
+ futures,
379
+ queued_downloads,
380
+ group_dir,
381
+ post_indexes,
382
+ download_pbar,
383
+ "API hunt",
384
+ stop_event,
385
+ )
386
+ finish_downloads(futures, group_dir, post_indexes, download_pbar, stop_event)
387
+ except KeyboardInterrupt:
388
+ interrupted = True
389
+ stop_event.set()
390
+ for future in futures:
391
+ future.cancel()
392
+ executor.shutdown(wait=False, cancel_futures=True)
393
+ raise
394
+ finally:
395
+ if not interrupted:
396
+ executor.shutdown(wait=True)
397
 
398
  print("Linking images to the group directory...")
399
+ images_cache.update(os.listdir(STASH_DIR))
400
+ for post_id in post_ids:
401
+ link_group_image(post_id, group_dir, post_indexes)
 
 
 
402
 
403
  if len(os.listdir(group_dir)) == 0:
404
  shutil.rmtree(group_dir)