# Spaces:
# Runtime error
# Runtime error
import argparse | |
import asyncio | |
from functools import partial | |
import contextlib | |
import json | |
import os | |
import random | |
import sys | |
import time | |
import aiohttp | |
import pkg_resources | |
import regex | |
import requests | |
from typing import Union | |
# Base URL for every request; the BING_URL environment variable overrides
# the default.
BING_URL = os.environ.get("BING_URL", "https://www.bing.com")

# Generate random IP between range 13.104.0.0/14
FORWARDED_IP = (
    f"13.{random.randint(104, 107)}.{random.randint(0, 255)}.{random.randint(0, 255)}"
)

# Headers sent with every request made by the image generator sessions.
# NOTE(review): the standard HTTP header is spelled "referer"; the key below
# is kept exactly as it was - confirm which spelling the service expects.
HEADERS = {
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "accept-language": "en-US,en;q=0.9",
    "cache-control": "max-age=0",
    "content-type": "application/x-www-form-urlencoded",
    "referrer": "https://www.bing.com/images/create/",
    "origin": "https://www.bing.com",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.63",
    "x-forwarded-for": FORWARDED_IP,
}

# Error messages
error_timeout = "Your request has timed out."
error_redirect = "Redirect failed"
error_blocked_prompt = (
    "Your prompt has been blocked by Bing. Try to change any bad words and try again."
)
error_noresults = "Could not get results"
error_unsupported_lang = "\nthis language is currently not supported by bing"
error_bad_images = "Bad images"
error_no_images = "No images"

# Progress messages
sending_message = "Sending request..."
wait_message = "Waiting for results..."
download_message = "\nDownloading images..."
def debug(debug_file, text_var):
    """Append the string form of ``text_var`` to the file at ``debug_file``.

    The file is opened in append mode as UTF-8; no newline is added.
    """
    log_path = f"{debug_file}"
    with open(log_path, mode="a", encoding="utf-8") as log_handle:
        rendered = str(text_var)
        log_handle.write(rendered)
class ImageGen: | |
""" | |
Image generation by Microsoft Bing | |
Parameters:3 | |
auth_cookie: str | |
""" | |
def __init__( | |
self, | |
auth_cookie: '15iNP0L_xa8fjGOOmF9For9sfHo3dWNKCMe_7LCA8XRNqtkFx0CtlH8mSTLDTaCL7GXTYF2z_TIIJKb9C2EZa6isVYJjEK39LbaRLpMCKzb5E6zO5cNilSmlqKco6e6Hn8WIUP22j_GLYVgM1awGOejEL8lcgkN0InQjpX-STlGED3PVabcfeDgDxknaiae2L29sGJ6Mt7gZnfNfgWYuO7XFXep9HAwAzSx5cprtFbwA', | |
debug_file: Union[str, None] = None, | |
quiet: bool = False, | |
all_cookies: list[dict] = None, | |
) -> None: | |
self.session: requests.Session = requests.Session() | |
self.session.headers = HEADERS | |
self.session.cookies.set("_U", auth_cookie) | |
if all_cookies: | |
for cookie in all_cookies: | |
self.session.cookies.set(cookie["name"], cookie["value"]) | |
self.quiet = quiet | |
self.debug_file = debug_file | |
if self.debug_file: | |
self.debug = partial(debug, self.debug_file) | |
def get_images(self, prompt: str) -> list: | |
""" | |
Fetches image links from Bing | |
Parameters: | |
prompt: str | |
""" | |
if not self.quiet: | |
print(sending_message) | |
if self.debug_file: | |
self.debug(sending_message) | |
url_encoded_prompt = requests.utils.quote(prompt) | |
payload = f"q={url_encoded_prompt}&qs=ds" | |
# https://www.bing.com/images/create?q=<PROMPT>&rt=3&FORM=GENCRE | |
url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE" | |
response = self.session.post( | |
url, allow_redirects=False, data=payload, timeout=200 | |
) | |
# check for content waring message | |
if "this prompt has been blocked" in response.text.lower(): | |
if self.debug_file: | |
self.debug(f"ERROR: {error_blocked_prompt}") | |
raise Exception( | |
error_blocked_prompt, | |
) | |
if ( | |
"we're working hard to offer image creator in more languages" | |
in response.text.lower() | |
): | |
if self.debug_file: | |
self.debug(f"ERROR: {error_unsupported_lang}") | |
raise Exception(error_unsupported_lang) | |
if response.status_code != 302: | |
# if rt4 fails, try rt3 | |
url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE" | |
response3 = self.session.post(url, allow_redirects=False, timeout=200) | |
if response3.status_code != 302: | |
if self.debug_file: | |
self.debug(f"ERROR: {error_redirect}") | |
print(f"ERROR: {response3.text}") | |
raise Exception(error_redirect) | |
response = response3 | |
# Get redirect URL | |
redirect_url = response.headers["Location"].replace("&nfy=1", "") | |
request_id = redirect_url.split("id=")[-1] | |
self.session.get(f"{BING_URL}{redirect_url}") | |
# https://www.bing.com/images/create/async/results/{ID}?q={PROMPT} | |
polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}" | |
# Poll for results | |
if self.debug_file: | |
self.debug("Polling and waiting for result") | |
if not self.quiet: | |
print("Waiting for results...") | |
start_wait = time.time() | |
while True: | |
if int(time.time() - start_wait) > 200: | |
if self.debug_file: | |
self.debug(f"ERROR: {error_timeout}") | |
raise Exception(error_timeout) | |
if not self.quiet: | |
print(".", end="", flush=True) | |
response = self.session.get(polling_url) | |
if response.status_code != 200: | |
if self.debug_file: | |
self.debug(f"ERROR: {error_noresults}") | |
raise Exception(error_noresults) | |
if not response.text or response.text.find("errorMessage") != -1: | |
time.sleep(1) | |
continue | |
else: | |
break | |
# Use regex to search for src="" | |
image_links = regex.findall(r'src="([^"]+)"', response.text) | |
# Remove size limit | |
normal_image_links = [link.split("?w=")[0] for link in image_links] | |
# Remove duplicates | |
normal_image_links = list(set(normal_image_links)) | |
# Bad images | |
bad_images = [ | |
"https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png", | |
"https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg", | |
] | |
for img in normal_image_links: | |
if img in bad_images: | |
raise Exception("Bad images") | |
# No images | |
if not normal_image_links: | |
raise Exception(error_no_images) | |
return normal_image_links | |
def save_images(self, links: list, output_dir: str, file_name: str = None) -> None: | |
""" | |
Saves images to output directory | |
""" | |
if self.debug_file: | |
self.debug(download_message) | |
if not self.quiet: | |
print(download_message) | |
with contextlib.suppress(FileExistsError): | |
os.mkdir(output_dir) | |
try: | |
fn = f"{file_name}_" if file_name else "" | |
jpeg_index = 0 | |
for link in links: | |
while os.path.exists( | |
os.path.join(output_dir, f"{fn}{jpeg_index}.jpeg") | |
): | |
jpeg_index += 1 | |
with self.session.get(link, stream=True) as response: | |
# save response to file | |
response.raise_for_status() | |
with open( | |
os.path.join(output_dir, f"{fn}{jpeg_index}.jpeg"), "wb" | |
) as output_file: | |
for chunk in response.iter_content(chunk_size=8192): | |
output_file.write(chunk) | |
except requests.exceptions.MissingSchema as url_exception: | |
raise Exception( | |
"Inappropriate contents found in the generated images. Please try again or try another prompt.", | |
) from url_exception | |
class ImageGenAsync:
    """
    Image generation by Microsoft Bing (asynchronous client built on aiohttp)

    Use as an async context manager so the underlying session is closed.

    Parameters:
        auth_cookie: str
            Value sent as the "_U" cookie.
        quiet: bool
            When true, progress messages are not printed.
    """

    # Seconds allowed for the fallback request and for the whole polling phase.
    _TIMEOUT_SECONDS = 200

    # Result links that cause the whole result to be rejected as "Bad images".
    _BAD_IMAGES = frozenset(
        {
            "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
            "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
        }
    )

    def __init__(self, auth_cookie: str, quiet: bool = False) -> None:
        self.session = aiohttp.ClientSession(
            headers=HEADERS,
            cookies={"_U": auth_cookie},
            trust_env=True,
        )
        self.quiet = quiet

    async def __aenter__(self):
        return self

    async def __aexit__(self, *excinfo) -> None:
        await self.session.close()

    @classmethod
    def _normalize_links(cls, image_links: list) -> list:
        """Strip size suffixes, drop duplicates and validate result links."""
        # Remove size limit; dict.fromkeys removes duplicates while keeping
        # first-seen order, so saved files are numbered deterministically.
        normal_image_links = list(
            dict.fromkeys(link.split("?w=")[0] for link in image_links)
        )
        # Bad images
        if any(link in cls._BAD_IMAGES for link in normal_image_links):
            raise Exception(error_bad_images)
        # No images
        if not normal_image_links:
            raise Exception(error_no_images)
        return normal_image_links

    async def get_images(self, prompt: str) -> list:
        """
        Fetches image links from Bing

        Parameters:
            prompt: str
        Returns:
            List of unique image links with the "?w=" size suffix removed.
        Raises:
            Exception: the prompt was blocked, the language is unsupported,
                the redirect failed, polling timed out or returned a non-200
                status, or the result held no images or a known bad image.
        """
        if not self.quiet:
            print(sending_message)
        url_encoded_prompt = requests.utils.quote(prompt)
        # https://www.bing.com/images/create?q=<PROMPT>&rt=3&FORM=GENCRE
        url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
        payload = f"q={url_encoded_prompt}&qs=ds"
        async with self.session.post(
            url, allow_redirects=False, data=payload
        ) as response:
            page_text = (await response.text()).lower()
            status = response.status
            # Headers stay readable after the response has been released.
            redirect_headers = response.headers
        if "this prompt has been blocked" in page_text:
            raise Exception(error_blocked_prompt)
        if "we're working hard to offer image creator in more languages" in page_text:
            raise Exception(error_unsupported_lang)
        if status != 302:
            # if rt4 fails, try rt3
            url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
            async with self.session.post(
                url,
                allow_redirects=False,
                timeout=aiohttp.ClientTimeout(total=self._TIMEOUT_SECONDS),
            ) as fallback:
                if fallback.status != 302:
                    print(f"ERROR: {await fallback.text()}")
                    raise Exception(error_redirect)
                redirect_headers = fallback.headers
        # Get redirect URL
        redirect_url = redirect_headers["Location"].replace("&nfy=1", "")
        request_id = redirect_url.split("id=")[-1]
        # Visit the redirect target; the body is not needed, so the response
        # is released as soon as the context exits.
        async with self.session.get(f"{BING_URL}{redirect_url}"):
            pass
        # https://www.bing.com/images/create/async/results/{ID}?q={PROMPT}
        polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"
        # Poll for results
        if not self.quiet:
            print(wait_message)
        started = time.monotonic()
        while True:
            # Overall deadline so an unfinished job cannot poll forever.
            if time.monotonic() - started > self._TIMEOUT_SECONDS:
                raise Exception(error_timeout)
            if not self.quiet:
                print(".", end="", flush=True)
            async with self.session.get(polling_url) as response:
                if response.status != 200:
                    raise Exception(error_noresults)
                content = await response.text()
            # An empty body or one containing "errorMessage" means: poll again.
            if content and "errorMessage" not in content:
                break
            await asyncio.sleep(1)
        # Use regex to search for src=""
        image_links = regex.findall(r'src="([^"]+)"', content)
        return self._normalize_links(image_links)

    async def save_images(
        self,
        links: list,
        output_dir: str,
        file_name: Union[str, None] = None,
    ) -> None:
        """
        Saves images to output directory

        Parameters:
            links: list
                Image URLs to download.
            output_dir: str
                Target directory; created (with parents) when missing.
            file_name: Union[str, None]
                Optional prefix; files are named "<file_name>_<index>.jpeg",
                or "<index>.jpeg" without it. Existing files are never
                overwritten: the index advances to the first free number.
        Raises:
            Exception: a link is not a valid URL (aiohttp InvalidURL).
        """
        if not self.quiet:
            print(download_message)
        os.makedirs(output_dir, exist_ok=True)
        try:
            fn = f"{file_name}_" if file_name else ""
            jpeg_index = 0
            for link in links:
                while os.path.exists(
                    os.path.join(output_dir, f"{fn}{jpeg_index}.jpeg")
                ):
                    jpeg_index += 1
                target_path = os.path.join(output_dir, f"{fn}{jpeg_index}.jpeg")
                async with self.session.get(link, raise_for_status=True) as response:
                    # save response to file
                    with open(target_path, "wb") as output_file:
                        async for chunk in response.content.iter_chunked(8192):
                            output_file.write(chunk)
        except aiohttp.client_exceptions.InvalidURL as url_exception:
            raise Exception(
                "Inappropriate contents found in the generated images. Please try again or try another prompt.",
            ) from url_exception
async def async_image_gen(args) -> None:
    """Generate images for ``args.prompt`` and save them to ``args.output_dir``.

    Builds an ImageGenAsync from ``args.U`` and ``args.quiet`` and uses it as
    an async context manager, so it is exited when the work is done.
    """
    async with ImageGenAsync(args.U, args.quiet) as generator:
        await generator.save_images(
            await generator.get_images(args.prompt),
            output_dir=args.output_dir,
        )
def main():
    """Command-line entry point: parse arguments, then generate and save images.

    Raises:
        Exception: neither a -U cookie nor usable cookie-file contents are
            available ("Could not find auth cookie").
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-U", help="Auth cookie from browser", type=str)
    parser.add_argument("--cookie-file", help="File containing auth cookie", type=str)
    parser.add_argument(
        "--prompt",
        help="Prompt to generate images for",
        type=str,
        required=True,
    )
    parser.add_argument(
        "--output-dir",
        help="Output directory",
        type=str,
        default="./output",
    )
    parser.add_argument(
        "--debug-file",
        help="Path to the file where debug information will be written.",
        type=str,
    )
    parser.add_argument(
        "--quiet",
        help="Disable pipeline messages",
        action="store_true",
    )
    parser.add_argument(
        "--asyncio",
        help="Run ImageGen using asyncio",
        action="store_true",
    )
    parser.add_argument(
        "--version",
        action="store_true",
        help="Print the version number",
    )
    args = parser.parse_args()
    if args.version:
        # importlib.metadata is the standard-library replacement for the
        # deprecated pkg_resources lookup; imported here to keep the block
        # self-contained.
        from importlib.metadata import version

        print(version("BingImageCreator"))
        sys.exit()
    # Load auth cookie
    cookie_json = None
    if args.cookie_file is not None:
        try:
            with open(args.cookie_file, encoding="utf-8") as file:
                cookie_json = json.load(file)
        except (OSError, ValueError) as load_error:
            # Still best-effort (a -U cookie alone may be enough), but report
            # the failure instead of swallowing it silently.
            print(
                f"Warning: could not load cookie file {args.cookie_file!r}: {load_error}",
                file=sys.stderr,
            )
    # Check what was actually loaded, not merely whether a path was given:
    # an unreadable cookie file without -U leaves no credentials at all.
    if args.U is None and not cookie_json:
        raise Exception("Could not find auth cookie")
    if not args.asyncio:
        # Create image generator
        image_generator = ImageGen(
            args.U, args.debug_file, args.quiet, all_cookies=cookie_json
        )
        image_generator.save_images(
            image_generator.get_images(args.prompt),
            output_dir=args.output_dir,
        )
    else:
        asyncio.run(async_image_gen(args))


if __name__ == "__main__":
    main()