import time
import urllib.request
import os
import aiohttp
import asyncio
from random import randint
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from commands.driver_instance import create_url_headers, tab_handler
from commands.exec_path import imgList
from commands.universal import searchQuery, save_Search, continue_Search, contains_works
from ai.classifying_ai import img_classifier


async def getOrderedDanbooruImages(driver, user_search, num_pics, num_pages, filters, bl_tags, inc_tags, exec_path, imageControl):
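    """Search Danbooru for user_search and collect up to num_pics images from each of
    num_pages result pages, applying the rating, tag, and AI-classifier filters.
    Returns the list of local paths of the downloaded images."""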
    global image_locations, bl_tags_list, inc_tags_list, image_names, ai_mode, rating_filters

    image_names = imgList(mode=0)
    image_locations = []
    link = "https://danbooru.donmai.us/"

    if 0 in imageControl:
        continue_Search(driver, link, mode=1)
    else:
        driver.get(link)

    # Ratings to exclude: filter 2 blocks explicit; 3 also blocks sensitive; 4 also blocks questionable.
    rating_filters = ["e"] if 2 in filters else []
    rating_filters = ["s", "e"] if 3 in filters else rating_filters
    rating_filters = ["q", "s", "e"] if 4 in filters else rating_filters

    score = 1 if 0 in filters else 0
    match_type = 1 if 1 in filters else 0
    r_18 = pg_lenient() if 2 in filters else []
    r_18 = pg_strict() if 3 in filters else r_18
    ai_mode = 1

    continue_search = 1 if imageControl else 0

    user_search = user_search.replace(" ", "_")
    score = filter_score(score)

    bl_tags_list = create_filter_tag_list(bl_tags, r_18)
    inc_tags_list = create_tag_list(inc_tags, match_type) if inc_tags else []

    if 0 not in imageControl:
        searchQuery(user_search, driver, '//*[@name="tags"]', mode=1, score=score)

    if not contains_works(driver, '//*[@class="posts-container gap-2"]'):
        print("No works found...")
        return []

    if ai_mode:
        WebDriverWait(driver, timeout=11).until(EC.presence_of_element_located((By.XPATH, '//*[@class="popup-menu-content"]')))
        driver.get(driver.find_element(By.XPATH, '(//*[@class="popup-menu-content"]//li)[6]//a').get_attribute("href"))

    # Keep walking result pages until enough images are collected or the URL stops changing.
    curr_page = driver.current_url
    while len(image_locations) < num_pics*num_pages:
        await pages_to_search(driver, num_pages, num_pics, exec_path)
        if curr_page == driver.current_url and len(image_locations) < num_pics*num_pages:
            print("Reached end of search results")
            break
        curr_page = driver.current_url
    driver.close()

    return image_locations


def filter_score(score):
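    """Return the " order:score" search modifier when score ordering is requested."""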
    if score:
        return " order:score"
    return ""


async def pages_to_search(driver, num_pages, num_pics, exec_path):
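    """Iterate over up to num_pages result pages, handing each grid of previews to grid_search."""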
    for i in range(num_pages):
        WebDriverWait(driver, timeout=11).until(EC.presence_of_element_located((By.XPATH, '//*[@class="posts-container gap-2"]')))

        images = driver.find_element(
            By.XPATH, '//*[@class="posts-container gap-2"]'
        ).find_elements(By.CLASS_NAME, "post-preview-link")
        await grid_search(driver, num_pics, images, exec_path, num_pages)
        save_Search(driver, mode=1)
        if not valid_page(driver) or len(image_locations) >= num_pics*num_pages:
            break


async def grid_search(driver, num_pics, images, exec_path, num_pages):
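    """Run the current page's previews through the rating, tag, and AI filters
    and download the full-size file for each post that passes."""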
    temp_img_len = len(image_locations)
    for n_iter, image in enumerate(images):
        if len(image_locations) >= num_pics*num_pages or len(image_locations) - temp_img_len >= num_pics:
            break

        try:
            # Skip previews whose filename is already in the saved-image list.
            if image.find_element(By.XPATH, ".//img").get_attribute('src').split("/")[-1].split(".")[0].encode("ascii", "ignore").decode("ascii") in image_names:
                print("\nImage already exists, moving to another image...")
                continue

            img_tags = driver.find_elements(By.CLASS_NAME, "post-preview")[n_iter].get_attribute('data-tags')
            img_rating = driver.find_elements(By.CLASS_NAME, "post-preview")[n_iter].get_attribute('data-rating')

            if filter_ratings(img_rating, rating_filters) and filter_tags(bl_tags_list, inc_tags_list, img_tags):

                if ai_mode:
                    # Download the preview first so the classifier can veto the image before the full download.
                    checker = 0
                    image_loc = await download_image(exec_path=exec_path, driver=driver, image=image)
                    if img_classifier(image_loc):
                        print("AI Mode: I approve this image")
                    else:
                        print("AI Mode: Skipping this image")
                        checker = 1
                        os.remove(image_loc)
                    if checker:
                        continue

                # Open the post in a new tab, grab the full-size download, then return to the grid.
                driver, tempImg = tab_handler(driver=driver, image=image)
                WebDriverWait(driver, timeout=15).until(EC.presence_of_element_located((By.XPATH, '//*[@id="post-option-download"]/a')))
                await download_image(exec_path=exec_path, driver=driver)
                driver = tab_handler(driver=driver)

            else:
                print("\nFilters did not match/Not an image, moving to another image...")

        except Exception:
            print("\nI ran into an error, closing the tab and moving on...")
            if driver.window_handles[-1] != driver.window_handles[0]:
                driver = tab_handler(driver=driver)
        # Random short pause between previews.
        time.sleep(randint(0, 2) + randint(0, 9)/10)


def filter_ratings(img_rating, rating_filters):
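    """Return True if the post's rating is not in the list of excluded ratings."""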
    if img_rating not in rating_filters:
        return True
    return False


def filter_tags(bl_tags_list, inc_tags_list, img_tags):
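    """Return True if the post's tags satisfy the include-tag rules and contain no blocked tags."""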
    img_hash = {}
    for img_tag in img_tags.split(" "):
        img_hash[img_tag] = 1

    # A trailing 1 marks "match all" mode (see create_tag_list); leave the sentinel
    # in place so later calls keep the same behaviour.
    if inc_tags_list and inc_tags_list[-1] == 1:
        for tag in inc_tags_list[:-1]:
            if not img_hash.get(tag, 0):
                return False
    elif inc_tags_list:
        # "Match any" mode: at least one included tag must be present.
        cond = False
        for tag in inc_tags_list:
            if img_hash.get(tag, 0):
                cond = True
                break
        if not cond:
            return False

    for tag in bl_tags_list:
        if img_hash.get(tag, 0):
            return False
    return True


def create_tag_list(inc_tags, match_type):
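    """Split the comma-separated include tags into a list; a trailing 1 sentinel
    marks that every tag must match (match_type set)."""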
    temp_tags = [tag.lstrip().replace(" ", "_") for tag in inc_tags.split(",")]
    if match_type:
        temp_tags.append(1)
    return temp_tags


def create_filter_tag_list(bl_tags, r_18):
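    """Build the tag blocklist: non-image media tags, the user's blocked tags,
    and the rating-based blocklist (r_18), if any."""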
    temp_tags = ["animated", "video", "sound"]
    if bl_tags:
        temp_tags += [tag.lstrip().replace(" ", "_") for tag in bl_tags.split(",")]
    if r_18:
        temp_tags += r_18
    return temp_tags


def valid_page(driver):
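    """Click the next-page link and return 1 if the URL changed, 0 if this was the last page."""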
    cur_url = driver.current_url
    driver.find_element(By.CLASS_NAME, "paginator-next").click()
    if cur_url == driver.current_url:
        return 0
    return 1


def pg_lenient():
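    """Tag blocklist applied by the lenient content filter."""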
    return ["sex", "penis", "vaginal", "completely_nude", "nude", "exposed_boobs", "ahegao", "cum", "no_panties", "no_bra",
            "nipple_piercing", "anal_fluid", "uncensored", "see-through", "pussy", "cunnilingus", "oral", "ass_focus",
            "anal", "sex_from_behind", "cum_on_clothes", "cum_on_face", "nipple", "nipples", "missionary",
            "fellatio", "rape", "breasts_out", "cum_in_pussy", "condom", "dildo", "sex_toy", "cum_in_mouth", "heavy_breathing", "cum_on_tongue",
            "panties", "panty_pull", "nude_cover", "underwear_only", "grabbing_own_breast", "ass_grab", "censored", "areola_slip", "areolae", "torn_pantyhose", "micro_bikini", "steaming_body"]


def pg_strict():
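    """Tag blocklist applied by the strict content filter: the lenient list plus extra tags."""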
    return pg_lenient() + ["piercings", "cleavage", "boobs", "thongs", "fellatio_gesture", "mosaic_censoring", "ass",
                           "covered_nipples", "thigh_focus", "thighs", "bikini", "swimsuit", "grabbing_another's_breast", "huge_breasts",
                           "foot_focus", "licking_foot", "foot_worship", "shirt_lift", "clothes_lift", "underwear", "panties_under_pantyhose"]


async def download_image(exec_path, driver, image=0):
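    """Download the post's full-size file (image=0) or the given preview thumbnail
    into exec_path.folder_path and return the local file path."""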
    if not image:
        # Full-size file from the post's download link.
        tempDL = driver.find_element(By.XPATH, '//*[@id="post-option-download"]/a')
        tempDLAttr = tempDL.get_attribute("href")
        tempDLName = tempDL.get_attribute("download").encode('ascii', 'ignore').decode('ascii')
    else:
        # Preview thumbnail straight from the results grid.
        tempDLAttr = image.find_element(By.XPATH, ".//img").get_attribute('src')
        tempDLName = tempDLAttr.split("/")[-1].encode("ascii", "ignore").decode("ascii")
    print(f"\n{tempDLAttr.split('?')[0]}")

    img_loc = f"./{exec_path.folder_path}/{tempDLName}"

    # Stream the file to disk in 1 KiB chunks.
    async with aiohttp.ClientSession() as session:
        async with session.get(tempDLAttr) as resp:
            with open(img_loc, 'wb') as fd:
                while True:
                    chunk = await resp.content.read(1024)
                    if not chunk:
                        break
                    fd.write(chunk)

    if not image:
        # Only full-size downloads count toward the collected results.
        image_locations.append(img_loc)
        image_names.append(f"{tempDLName.split('.')[0]}")
    return img_loc