# Fsg-Pp / sites/danbooru.py
import os
import time
from random import randint

import aiohttp
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

from commands.driver_instance import create_url_headers, tab_handler
from commands.exec_path import imgList
from commands.universal import searchQuery, save_Search, continue_Search, contains_works
from ai.classifying_ai import img_classifier
async def getOrderedDanbooruImages(driver, user_search, num_pics, num_pages, filters, bl_tags, inc_tags, exec_path, imageControl):
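    """Search Danbooru for user_search and download matching images.

    As interpreted by the code below: filters carries option codes
    (0 = order by score, 1 = exact tag matching, 2-4 = increasingly strict
    content filtering), bl_tags/inc_tags are comma-separated tag strings,
    and a 0 in imageControl resumes a previously saved search. Downloads up
    to num_pics images per page across num_pages pages and returns the list
    of downloaded file paths.
    """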
    global image_locations, bl_tags_list, inc_tags_list, image_names, ai_mode, rating_filters
image_names = imgList(mode=0)
image_locations = []
link = "https://danbooru.donmai.us/"
if 0 in imageControl:
continue_Search(driver, link, mode=1)
else:
driver.get(link)
    # Rating filter creation: each option extends the blocked-ratings list
    # (chaining on rating_filters so an earlier option is not wiped out)
    rating_filters = ["e"] if 2 in filters else []
    rating_filters = ["s", "e"] if 3 in filters else rating_filters
    rating_filters = ["q", "s", "e"] if 4 in filters else rating_filters
# Tag list creation
score = 1 if 0 in filters else 0
match_type = 1 if 1 in filters else 0
r_18 = pg_lenient() if 2 in filters else []
r_18 = pg_strict() if 3 in filters else r_18
ai_mode = 1
continue_search = 1 if imageControl else 0
    # Replace spaces with underscores so multi-word searches match Danbooru tag syntax
    user_search = user_search.replace(" ", "_")
score = filter_score(score)
bl_tags_list = create_filter_tag_list(bl_tags, r_18)
inc_tags_list = create_tag_list(inc_tags, match_type) if inc_tags else []
if 0 not in imageControl:
searchQuery(user_search, driver, '//*[@name="tags"]', mode=1, score=score)
if not contains_works(driver, '//*[@class="posts-container gap-2"]'):
print("No works found...")
return []
if ai_mode:
WebDriverWait(driver, timeout=11).until(EC.presence_of_element_located((By.XPATH, '//*[@class="popup-menu-content"]')))
driver.get(driver.find_element(By.XPATH, '(//*[@class="popup-menu-content"]//li)[6]//a').get_attribute("href"))
curr_page = driver.current_url
while len(image_locations) < num_pics*num_pages:
await pages_to_search(driver, num_pages, num_pics, exec_path)
if curr_page == driver.current_url and len(image_locations) < num_pics*num_pages:
print("Reached end of search results")
break
curr_page = driver.current_url
driver.close()
return image_locations
def filter_score(score):
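    """Return Danbooru's " order:score" query modifier when score ordering is enabled."""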
    return " order:score" if score else ""
async def pages_to_search(driver, num_pages, num_pics, exec_path):
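    """Iterate up to num_pages result pages, harvesting each page's image grid."""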
for i in range(num_pages):
WebDriverWait(driver, timeout=11).until(EC.presence_of_element_located((By.XPATH, '//*[@class="posts-container gap-2"]')))
# Selects the picture grids
images = driver.find_element(
By.XPATH, '//*[@class="posts-container gap-2"]'
).find_elements(By.CLASS_NAME, "post-preview-link")
await grid_search(driver, num_pics, images, exec_path, num_pages)
save_Search(driver, mode=1)
if not valid_page(driver) or len(image_locations) >= num_pics*num_pages:
break
async def grid_search(driver, num_pics, images, exec_path, num_pages):
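    """Scan one results grid and download every image that passes the filters.

    Stops after num_pics images from this page, or once the overall
    num_pics * num_pages budget is met.
    """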
temp_img_len = len(image_locations)
for n_iter, image in enumerate(images):
if len(image_locations) >= num_pics*num_pages or len(image_locations) - temp_img_len >= num_pics:
break
try:
            # Skip images whose filename already exists in the local image list
            img_name = image.find_element(By.XPATH, ".//img").get_attribute("src").split("/")[-1].split(".")[0].encode("ascii", "ignore").decode("ascii")
            if img_name in image_names:
                print("\nImage already exists, moving to another image...")
                continue
            # Tags must be read from the grid's data attributes; they are not rendered in headless mode
            img_tags = driver.find_elements(By.CLASS_NAME, "post-preview")[n_iter].get_attribute("data-tags")
            img_rating = driver.find_elements(By.CLASS_NAME, "post-preview")[n_iter].get_attribute("data-rating")
            if filter_ratings(img_rating, rating_filters) and filter_tags(bl_tags_list, inc_tags_list, img_tags):
                if ai_mode:
                    # Classify the preview thumbnail first; reject before opening the post
                    image_loc = await download_image(exec_path=exec_path, driver=driver, image=image)
                    if img_classifier(image_loc):
                        print("AI Mode: I approve this image")
                    else:
                        print("AI Mode: Skipping this image")
                        os.remove(image_loc)
                        continue
                driver, _ = tab_handler(driver=driver, image=image)
WebDriverWait(driver, timeout=15).until(EC.presence_of_element_located((By.XPATH, '//*[@id="post-option-download"]/a')))
await download_image(exec_path=exec_path, driver=driver)
driver = tab_handler(driver=driver)
else:
print("\nFilters did not match/Not an image, moving to another image...")
        except Exception:
print("\nI ran into an error, closing the tab and moving on...")
if driver.window_handles[-1] != driver.window_handles[0]:
driver = tab_handler(driver=driver)
        # Randomized short delay between images to throttle requests
        time.sleep(randint(0, 2) + randint(0, 9) / 10)
def filter_ratings(img_rating, rating_filters):
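    """Return True when the post's rating letter is not in the blocked-ratings list."""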
    return img_rating not in rating_filters
def filter_tags(bl_tags_list, inc_tags_list, img_tags):
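    """Return True if the picture's tag string passes the include and blacklist filters."""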
    # Dict of the picture's tags for O(1) membership checks
    img_hash = {img_tag: 1 for img_tag in img_tags.split(" ")}
    # Included tags: a trailing sentinel of 1 marks exact-match mode
    if inc_tags_list and inc_tags_list[-1] == 1:
        # Exact match: every included tag must be present. Iterate a slice so
        # the sentinel survives for later calls (pop() here would mutate the
        # shared list and silently switch subsequent images to loose matching).
        for tag in inc_tags_list[:-1]:
            if not img_hash.get(tag, 0):
                return False
    elif inc_tags_list:
        # Loose match: at least one included tag must be present
        if not any(img_hash.get(tag, 0) for tag in inc_tags_list):
            return False
    # bl_tags_list is never empty; it always carries the video-filtering tags
    for tag in bl_tags_list:
        if img_hash.get(tag, 0):
            return False
    return True
def create_tag_list(inc_tags, match_type):
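    """Build the include-tag list; a trailing sentinel 1 marks exact-match mode."""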
    temp_tags = [tag.strip().replace(" ", "_") for tag in inc_tags.split(",")]
    if match_type:
        # Sentinel read by filter_tags(): request exact-match mode
        temp_tags.append(1)
    return temp_tags
def create_filter_tag_list(bl_tags, r_18):
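    """Build the blacklist: video tags, the user's blacklist, and any R-18 tag set."""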
temp_tags = ["animated", "video", "sound"]
if bl_tags:
temp_tags += [tag.lstrip().replace(" ","_") for tag in bl_tags.split(",")]
if r_18:
temp_tags += r_18
return temp_tags
# Advance to the next page of results; returns 0 if the URL did not change (last page)
def valid_page(driver):
cur_url = driver.current_url
driver.find_element(By.CLASS_NAME, "paginator-next").click()
if cur_url == driver.current_url:
return 0
return 1
def pg_lenient():
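    """Tags blacklisted under the lenient content filter (filters option 2)."""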
return ["sex","penis","vaginal","completely_nude","nude","exposed_boobs","ahegao","cum","no_panties","no_bra",
"nipple_piercing", "anal_fluid","uncensored", "see-through", "pussy", "cunnilingus", "oral", "ass_focus",
"anal", "sex_from_behind", "cum_on_clothes", "cum_on_face", "nipple","nipples", "missionary"
"fellatio", "rape", "breasts_out","cum_in_pussy", "condom", "dildo", "sex_toy", "cum_in_mouth", "heavy_breathing", "cum_on_tongue"
"panties", "panty_pull", "nude_cover", "underwear_only","grabbing_own_breast","ass_grab","censored","areola_slip","areolae","torn_pantyhose","micro_bikini","steaming_body"]
def pg_strict():
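    """Tags blacklisted under the strict content filter (filters option 3); superset of lenient."""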
    return pg_lenient() + ["piercings", "cleavage", "boobs", "thongs", "fellatio_gesture", "mosaic_censoring", "ass",
                           "covered_nipples", "thigh_focus", "thighs", "bikini", "swimsuit", "grabbing_another's_breast", "huge_breasts",
                           "foot_focus", "licking_foot", "foot_worship", "shirt_lift", "clothes_lift", "underwear", "panties_under_pantyhose"]
async def download_image(exec_path, driver, image=None):
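    """Download the full-size image via the post's download link, or the grid
    preview thumbnail when a grid element is passed; returns the file path.
    """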
if not image:
tempDL = driver.find_element(By.XPATH, '//*[@id="post-option-download"]/a')
tempDLAttr = tempDL.get_attribute("href")
tempDLName = tempDL.get_attribute("download").encode('ascii', 'ignore').decode('ascii')
else:
tempDLAttr = image.find_element(By.XPATH, ".//img").get_attribute('src')
tempDLName = tempDLAttr.split("/")[-1].encode("ascii", "ignore").decode("ascii")
print(f"\n{tempDLAttr.split('?')[0]}")
img_loc = f"./{exec_path.folder_path}/{tempDLName}"
async with aiohttp.ClientSession() as session:
async with session.get(tempDLAttr) as resp:
with open(img_loc, 'wb') as fd:
                # Stream the response to disk in 1 KiB chunks
                async for chunk in resp.content.iter_chunked(1024):
                    fd.write(chunk)
    if not image:
        # Record only full-size downloads; preview grabs (image passed) are not tracked
        image_locations.append(img_loc)
        image_names.append(tempDLName.split(".")[0])
return img_loc
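

# Minimal usage sketch (illustrative, not part of the module): assumes a Selenium
# driver from commands.driver_instance and an exec_path object exposing
# .folder_path, as download_image() expects.
#
#     import asyncio
#     paths = asyncio.run(getOrderedDanbooruImages(
#         driver, "hatsune_miku", num_pics=5, num_pages=2,
#         filters=[0], bl_tags="", inc_tags="", exec_path=exec_path,
#         imageControl=[],
#     ))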