|
import argparse |
|
import asyncio |
|
from functools import partial |
|
import contextlib |
|
import json |
|
import os |
|
import random |
|
import sys |
|
import time |
|
import aiohttp |
|
import pkg_resources |
|
import regex |
|
import requests |
|
from typing import Union |
|
|
|
# Base URL that every request URL in this module is built from. Taken from the
# BING_URL environment variable when it is set, otherwise the public Bing host.
BING_URL = os.environ.get("BING_URL", "https://www.bing.com")
|
|
|
# Random IPv4 address of the form 13.<104-107>.<0-255>.<0-255>, generated once
# at import time and sent as the "x-forwarded-for" header value below.
FORWARDED_IP = ".".join(
    str(octet)
    for octet in (
        13,
        random.randint(104, 107),
        random.randint(0, 255),
        random.randint(0, 255),
    )
)

# Default request headers shared by the synchronous and asynchronous clients.
HEADERS = {
    "accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,"
        "image/webp,image/apng,*/*;q=0.8,"
        "application/signed-exchange;v=b3;q=0.7"
    ),
    "accept-language": "en-US,en;q=0.9",
    "cache-control": "max-age=0",
    "content-type": "application/x-www-form-urlencoded",
    "referrer": "https://www.bing.com/images/create/",
    "origin": "https://www.bing.com",
    "user-agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.63"
    ),
    "x-forwarded-for": FORWARDED_IP,
}
|
|
|
|
|
# --- Texts used for raised exceptions and "ERROR: ..." debug entries ---------
error_timeout = "Your request has timed out."
error_redirect = "Redirect failed"
error_blocked_prompt = (
    "Your prompt has been blocked by Bing. "
    "Try to change any bad words and try again."
)
error_noresults = "Could not get results"
error_unsupported_lang = "\nthis language is currently not supported by bing"
error_bad_images = "Bad images"
error_no_images = "No images"

# --- Progress texts printed to stdout (and written to the debug file) --------
sending_message = "Sending request..."
wait_message = "Waiting for results..."
download_message = "\nDownloading images..."
|
|
|
|
|
def debug(debug_file, text_var):
    """Append the string form of ``text_var`` to the file named ``debug_file``.

    The file is opened in UTF-8 append mode (created if missing). Nothing is
    added around the text: no newline, timestamp or separator.
    """
    target_path = "{}".format(debug_file)
    with open(target_path, mode="a", encoding="utf-8") as log_handle:
        log_handle.write(str(text_var))
|
|
|
|
|
class ImageGen:
    """
    Image generation by Microsoft Bing

    Parameters:
        auth_cookie: str
            Value stored in the session's "_U" cookie.
        debug_file: str | None
            Path of a file that debug messages are appended to; no debug
            output is written when it is None/empty.
        quiet: bool
            When True, progress messages are not printed.
        all_cookies: list[dict] | None
            Additional cookies, each a dict with "name" and "value" keys,
            that are added to the session.
    """

    # Seconds allowed for each HTTP request and for the whole polling phase.
    _TIMEOUT_SECONDS = 200

    def __init__(
        self,
        auth_cookie: str,
        debug_file: Union[str, None] = None,
        quiet: bool = False,
        all_cookies: Union[list[dict], None] = None,
    ) -> None:
        self.session: requests.Session = requests.Session()
        # Use a copy: assigning HEADERS itself would let a header change on
        # one session leak into the module-level dict and every other session.
        self.session.headers = dict(HEADERS)
        self.session.cookies.set("_U", auth_cookie)
        for cookie in all_cookies or []:
            self.session.cookies.set(cookie["name"], cookie["value"])
        self.quiet = quiet
        self.debug_file = debug_file
        if self.debug_file:
            self.debug = partial(debug, self.debug_file)

    def _log_debug(self, text: str) -> None:
        """Append ``text`` to the debug file, if one was configured."""
        if self.debug_file:
            self.debug(text)

    def get_images(self, prompt: str) -> list:
        """
        Fetches image links from Bing

        Parameters:
            prompt: str
                Text the images are generated for.

        Returns:
            A list of unique image URLs, in the order they appear in the
            result page.

        Raises:
            Exception: when the prompt is blocked, the language is not
                supported, the creation request is not answered with a
                redirect, polling fails or times out, or the result contains
                no usable images.
        """
        if not self.quiet:
            print(sending_message)
        self._log_debug(sending_message)

        url_encoded_prompt = requests.utils.quote(prompt)
        payload = f"q={url_encoded_prompt}&qs=ds"

        # First attempt uses rt=4.
        url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
        response = self.session.post(
            url,
            allow_redirects=False,
            data=payload,
            timeout=self._TIMEOUT_SECONDS,
        )

        response_text = response.text.lower()
        if "this prompt has been blocked" in response_text:
            self._log_debug(f"ERROR: {error_blocked_prompt}")
            raise Exception(error_blocked_prompt)
        if (
            "we're working hard to offer image creator in more languages"
            in response_text
        ):
            self._log_debug(f"ERROR: {error_unsupported_lang}")
            raise Exception(error_unsupported_lang)

        if response.status_code != 302:
            # No redirect for rt=4: retry once with rt=3 before giving up.
            url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
            response3 = self.session.post(
                url, allow_redirects=False, timeout=self._TIMEOUT_SECONDS
            )
            if response3.status_code != 302:
                self._log_debug(f"ERROR: {error_redirect}")
                print(f"ERROR: {response3.text}")
                raise Exception(error_redirect)
            response = response3

        # The redirect target carries the request id used for polling.
        redirect_url = response.headers["Location"].replace("&nfy=1", "")
        request_id = redirect_url.split("id=")[-1]
        self.session.get(
            f"{BING_URL}{redirect_url}", timeout=self._TIMEOUT_SECONDS
        )

        polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"

        self._log_debug("Polling and waiting for result")
        if not self.quiet:
            print(wait_message)

        start_wait = time.time()
        while True:
            if int(time.time() - start_wait) > self._TIMEOUT_SECONDS:
                self._log_debug(f"ERROR: {error_timeout}")
                raise Exception(error_timeout)
            if not self.quiet:
                print(".", end="", flush=True)
            # A per-request timeout keeps a stalled connection from blocking
            # forever and bypassing the deadline check above.
            response = self.session.get(
                polling_url, timeout=self._TIMEOUT_SECONDS
            )
            if response.status_code != 200:
                self._log_debug(f"ERROR: {error_noresults}")
                raise Exception(error_noresults)
            # An empty body or one containing "errorMessage" is treated as
            # "not ready yet": wait a second and poll again.
            if response.text and "errorMessage" not in response.text:
                break
            time.sleep(1)

        image_links = regex.findall(r'src="([^"]+)"', response.text)
        # Drop everything from "?w=" onward, then remove duplicates while
        # keeping first-seen order so the result is deterministic.
        normal_image_links = list(
            dict.fromkeys(link.split("?w=")[0] for link in image_links)
        )

        # URLs whose presence in the result is treated as a failed generation.
        bad_images = {
            "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
            "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
        }
        if any(link in bad_images for link in normal_image_links):
            raise Exception(error_bad_images)

        if not normal_image_links:
            raise Exception(error_no_images)
        return normal_image_links

    def save_images(
        self,
        links: list,
        output_dir: str,
        file_name: Union[str, None] = None,
    ) -> None:
        """
        Saves images to output directory

        Parameters:
            links: list
                Image URLs to download.
            output_dir: str
                Directory the files are written to; created (including
                missing parent directories) when it does not exist.
            file_name: str | None
                Optional prefix; files are named "<file_name>_<n>.jpeg", or
                "<n>.jpeg" without it. <n> is the lowest index not already
                used in ``output_dir``, so existing files are not overwritten.

        Raises:
            Exception: when a link has no URL scheme (chained from
                requests.exceptions.MissingSchema).
            requests.HTTPError: when a download returns an error status.
        """
        self._log_debug(download_message)
        if not self.quiet:
            print(download_message)
        with contextlib.suppress(FileExistsError):
            os.makedirs(output_dir)

        prefix = f"{file_name}_" if file_name else ""
        jpeg_index = 0
        try:
            for link in links:
                # Advance to the first index whose file does not exist yet.
                target_path = os.path.join(output_dir, f"{prefix}{jpeg_index}.jpeg")
                while os.path.exists(target_path):
                    jpeg_index += 1
                    target_path = os.path.join(
                        output_dir, f"{prefix}{jpeg_index}.jpeg"
                    )
                with self.session.get(
                    link, stream=True, timeout=self._TIMEOUT_SECONDS
                ) as response:
                    response.raise_for_status()
                    with open(target_path, "wb") as output_file:
                        for chunk in response.iter_content(chunk_size=8192):
                            output_file.write(chunk)
        except requests.exceptions.MissingSchema as url_exception:
            raise Exception(
                "Inappropriate contents found in the generated images. Please try again or try another prompt.",
            ) from url_exception
|
|
|
|
|
class ImageGenAsync:
    """
    Image generation by Microsoft Bing (asyncio variant)

    Intended to be used as an async context manager; leaving the context
    closes the underlying aiohttp session.

    Parameters:
        auth_cookie: str
            Value sent as the "_U" cookie.
        quiet: bool
            When True, progress messages are not printed.
    """

    # Seconds allowed for the fallback request and for the whole polling phase.
    _TIMEOUT_SECONDS = 200

    def __init__(self, auth_cookie: str, quiet: bool = False) -> None:
        self.session = aiohttp.ClientSession(
            headers=HEADERS,
            cookies={"_U": auth_cookie},
            trust_env=True,
        )
        self.quiet = quiet

    async def __aenter__(self):
        return self

    async def __aexit__(self, *excinfo) -> None:
        await self.session.close()

    async def get_images(self, prompt: str) -> list:
        """
        Fetches image links from Bing

        Parameters:
            prompt: str
                Text the images are generated for.

        Returns:
            A list of unique image URLs, in the order they appear in the
            result page.

        Raises:
            Exception: when the prompt is blocked, the language is not
                supported, the creation request is not answered with a
                redirect, polling fails or times out, or the result contains
                no usable images.
        """
        if not self.quiet:
            print(sending_message)
        url_encoded_prompt = requests.utils.quote(prompt)

        # First attempt uses rt=4.
        url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
        payload = f"q={url_encoded_prompt}&qs=ds"
        async with self.session.post(
            url, allow_redirects=False, data=payload
        ) as response:
            content = (await response.text()).lower()
            if "this prompt has been blocked" in content:
                raise Exception(error_blocked_prompt)
            if (
                "we're working hard to offer image creator in more languages"
                in content
            ):
                raise Exception(error_unsupported_lang)
            if response.status == 302:
                location = response.headers["Location"]
            else:
                # No redirect for rt=4: retry once with rt=3 before giving up.
                url = (
                    f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
                )
                async with self.session.post(
                    url,
                    allow_redirects=False,
                    timeout=aiohttp.ClientTimeout(total=self._TIMEOUT_SECONDS),
                ) as response3:
                    if response3.status != 302:
                        print(f"ERROR: {await response3.text()}")
                        raise Exception(error_redirect)
                    # Read the header while the response is still open.
                    location = response3.headers["Location"]

        # The redirect target carries the request id used for polling.
        redirect_url = location.replace("&nfy=1", "")
        request_id = redirect_url.split("id=")[-1]
        # Only the request itself matters here; the context manager releases
        # the connection back to the pool instead of leaking it.
        async with self.session.get(f"{BING_URL}{redirect_url}"):
            pass

        polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"

        if not self.quiet:
            print(wait_message)
        start_wait = time.time()
        while True:
            # Same overall deadline as the synchronous client, so a request
            # that never completes cannot poll forever.
            if int(time.time() - start_wait) > self._TIMEOUT_SECONDS:
                raise Exception(error_timeout)
            if not self.quiet:
                print(".", end="", flush=True)

            async with self.session.get(polling_url) as response:
                if response.status != 200:
                    raise Exception(error_noresults)
                content = await response.text()
            # An empty body or one containing "errorMessage" is treated as
            # "not ready yet": wait a second and poll again.
            if content and "errorMessage" not in content:
                break
            await asyncio.sleep(1)

        image_links = regex.findall(r'src="([^"]+)"', content)
        # Drop everything from "?w=" onward, then remove duplicates while
        # keeping first-seen order so the result is deterministic.
        normal_image_links = list(
            dict.fromkeys(link.split("?w=")[0] for link in image_links)
        )

        # URLs whose presence in the result is treated as a failed generation.
        bad_images = {
            "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
            "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
        }
        if any(link in bad_images for link in normal_image_links):
            raise Exception(error_bad_images)

        if not normal_image_links:
            raise Exception(error_no_images)
        return normal_image_links

    async def save_images(self, links: list, output_dir: str) -> None:
        """
        Saves images to output directory

        Parameters:
            links: list
                Image URLs to download.
            output_dir: str
                Directory the files are written to; created (including
                missing parent directories) when it does not exist. Files
                are named "<n>.jpeg" using the lowest index not already
                present, so existing files are not overwritten.

        Raises:
            Exception: when a link is not a valid URL (chained from
                aiohttp's InvalidURL).
            aiohttp.ClientResponseError: when a download returns an error
                status.
        """
        if not self.quiet:
            print(download_message)
        with contextlib.suppress(FileExistsError):
            os.makedirs(output_dir)

        jpeg_index = 0
        try:
            for link in links:
                # Advance to the first index whose file does not exist yet.
                target_path = os.path.join(output_dir, f"{jpeg_index}.jpeg")
                while os.path.exists(target_path):
                    jpeg_index += 1
                    target_path = os.path.join(output_dir, f"{jpeg_index}.jpeg")
                async with self.session.get(
                    link, raise_for_status=True
                ) as response:
                    with open(target_path, "wb") as output_file:
                        async for chunk in response.content.iter_chunked(8192):
                            output_file.write(chunk)
        except aiohttp.client_exceptions.InvalidURL as url_exception:
            raise Exception(
                "Inappropriate contents found in the generated images. Please try again or try another prompt.",
            ) from url_exception
|
|
|
|
|
async def async_image_gen(args) -> None:
    """Generate images for ``args.prompt`` and store them in ``args.output_dir``.

    ``args`` is the parsed command-line namespace; its ``U`` (auth cookie),
    ``quiet``, ``prompt`` and ``output_dir`` attributes are read. The
    asynchronous client is closed when the work is done or fails.
    """
    generator = ImageGenAsync(args.U, args.quiet)
    async with generator:
        links = await generator.get_images(args.prompt)
        await generator.save_images(links, output_dir=args.output_dir)
|
|
|
|
|
def main(): |
|
parser = argparse.ArgumentParser() |
|
parser.add_argument("-U", help="Auth cookie from browser", type=str) |
|
parser.add_argument("--cookie-file", help="File containing auth cookie", type=str) |
|
parser.add_argument( |
|
"--prompt", |
|
help="Prompt to generate images for", |
|
type=str, |
|
required=True, |
|
) |
|
|
|
parser.add_argument( |
|
"--output-dir", |
|
help="Output directory", |
|
type=str, |
|
default="./output", |
|
) |
|
|
|
parser.add_argument( |
|
"--debug-file", |
|
help="Path to the file where debug information will be written.", |
|
type=str, |
|
) |
|
|
|
parser.add_argument( |
|
"--quiet", |
|
help="Disable pipeline messages", |
|
action="store_true", |
|
) |
|
parser.add_argument( |
|
"--asyncio", |
|
help="Run ImageGen using asyncio", |
|
action="store_true", |
|
) |
|
parser.add_argument( |
|
"--version", |
|
action="store_true", |
|
help="Print the version number", |
|
) |
|
|
|
args = parser.parse_args() |
|
|
|
if args.version: |
|
print(pkg_resources.get_distribution("BingImageCreator").version) |
|
sys.exit() |
|
|
|
|
|
cookie_json = None |
|
if args.cookie_file is not None: |
|
with contextlib.suppress(Exception): |
|
with open(args.cookie_file, encoding="utf-8") as file: |
|
cookie_json = json.load(file) |
|
|
|
if args.U is None and args.cookie_file is None: |
|
raise Exception("Could not find auth cookie") |
|
|
|
if not args.asyncio: |
|
|
|
image_generator = ImageGen( |
|
args.U, args.debug_file, args.quiet, all_cookies=cookie_json |
|
) |
|
image_generator.save_images( |
|
image_generator.get_images(args.prompt), |
|
output_dir=args.output_dir, |
|
) |
|
else: |
|
asyncio.run(async_image_gen(args)) |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |