ka1kuk commited on
Commit
78f3a37
1 Parent(s): 72d4018

Create BingImageCreator.py

Browse files
Files changed (1) hide show
  1. BingImageCreator.py +403 -0
BingImageCreator.py ADDED
@@ -0,0 +1,403 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import argparse
2
+ import asyncio
3
+ from functools import partial
4
+ import contextlib
5
+ import json
6
+ import os
7
+ import random
8
+ import sys
9
+ import time
10
+ import aiohttp
11
+ import pkg_resources
12
+ import regex
13
+ import requests
14
+ from typing import Union
15
+
16
+ if os.environ.get("BING_URL") == None:
17
+ BING_URL = "https://www.bing.com"
18
+ else:
19
+ BING_URL = os.environ.get("BING_URL")
20
+ # Generate random IP between range 13.104.0.0/14
21
+ FORWARDED_IP = (
22
+ f"13.{random.randint(104, 107)}.{random.randint(0, 255)}.{random.randint(0, 255)}"
23
+ )
24
+ HEADERS = {
25
+ "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
26
+ "accept-language": "en-US,en;q=0.9",
27
+ "cache-control": "max-age=0",
28
+ "content-type": "application/x-www-form-urlencoded",
29
+ "referrer": "https://www.bing.com/images/create/",
30
+ "origin": "https://www.bing.com",
31
+ "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.63",
32
+ "x-forwarded-for": FORWARDED_IP,
33
+ }
34
+
35
+ # Error messages
36
+ error_timeout = "Your request has timed out."
37
+ error_redirect = "Redirect failed"
38
+ error_blocked_prompt = (
39
+ "Your prompt has been blocked by Bing. Try to change any bad words and try again."
40
+ )
41
+ error_noresults = "Could not get results"
42
+ error_unsupported_lang = "\nthis language is currently not supported by bing"
43
+ error_bad_images = "Bad images"
44
+ error_no_images = "No images"
45
+ #
46
+ sending_message = "Sending request..."
47
+ wait_message = "Waiting for results..."
48
+ download_message = "\nDownloading images..."
49
+
50
+
51
+ def debug(debug_file, text_var):
52
+ """helper function for debug"""
53
+ with open(f"{debug_file}", "a", encoding="utf-8") as f:
54
+ f.write(str(text_var))
55
+
56
+
57
+ class ImageGen:
58
+ """
59
+ Image generation by Microsoft Bing
60
+ Parameters:3
61
+ auth_cookie: str
62
+ """
63
+
64
+ def __init__(
65
+ self,
66
+ auth_cookie: '15iNP0L_xa8fjGOOmF9For9sfHo3dWNKCMe_7LCA8XRNqtkFx0CtlH8mSTLDTaCL7GXTYF2z_TIIJKb9C2EZa6isVYJjEK39LbaRLpMCKzb5E6zO5cNilSmlqKco6e6Hn8WIUP22j_GLYVgM1awGOejEL8lcgkN0InQjpX-STlGED3PVabcfeDgDxknaiae2L29sGJ6Mt7gZnfNfgWYuO7XFXep9HAwAzSx5cprtFbwA',
67
+ debug_file: Union[str, None] = None,
68
+ quiet: bool = False,
69
+ all_cookies: list[dict] = None,
70
+ ) -> None:
71
+ self.session: requests.Session = requests.Session()
72
+ self.session.headers = HEADERS
73
+ self.session.cookies.set("_U", auth_cookie)
74
+ if all_cookies:
75
+ for cookie in all_cookies:
76
+ self.session.cookies.set(cookie["name"], cookie["value"])
77
+ self.quiet = quiet
78
+ self.debug_file = debug_file
79
+ if self.debug_file:
80
+ self.debug = partial(debug, self.debug_file)
81
+
82
+ def get_images(self, prompt: str) -> list:
83
+ """
84
+ Fetches image links from Bing
85
+ Parameters:
86
+ prompt: str
87
+ """
88
+ if not self.quiet:
89
+ print(sending_message)
90
+ if self.debug_file:
91
+ self.debug(sending_message)
92
+ url_encoded_prompt = requests.utils.quote(prompt)
93
+ payload = f"q={url_encoded_prompt}&qs=ds"
94
+ # https://www.bing.com/images/create?q=<PROMPT>&rt=3&FORM=GENCRE
95
+ url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
96
+ response = self.session.post(
97
+ url, allow_redirects=False, data=payload, timeout=200
98
+ )
99
+ # check for content waring message
100
+ if "this prompt has been blocked" in response.text.lower():
101
+ if self.debug_file:
102
+ self.debug(f"ERROR: {error_blocked_prompt}")
103
+ raise Exception(
104
+ error_blocked_prompt,
105
+ )
106
+ if (
107
+ "we're working hard to offer image creator in more languages"
108
+ in response.text.lower()
109
+ ):
110
+ if self.debug_file:
111
+ self.debug(f"ERROR: {error_unsupported_lang}")
112
+ raise Exception(error_unsupported_lang)
113
+ if response.status_code != 302:
114
+ # if rt4 fails, try rt3
115
+ url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
116
+ response3 = self.session.post(url, allow_redirects=False, timeout=200)
117
+ if response3.status_code != 302:
118
+ if self.debug_file:
119
+ self.debug(f"ERROR: {error_redirect}")
120
+ print(f"ERROR: {response3.text}")
121
+ raise Exception(error_redirect)
122
+ response = response3
123
+ # Get redirect URL
124
+ redirect_url = response.headers["Location"].replace("&nfy=1", "")
125
+ request_id = redirect_url.split("id=")[-1]
126
+ self.session.get(f"{BING_URL}{redirect_url}")
127
+ # https://www.bing.com/images/create/async/results/{ID}?q={PROMPT}
128
+ polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"
129
+ # Poll for results
130
+ if self.debug_file:
131
+ self.debug("Polling and waiting for result")
132
+ if not self.quiet:
133
+ print("Waiting for results...")
134
+ start_wait = time.time()
135
+ while True:
136
+ if int(time.time() - start_wait) > 200:
137
+ if self.debug_file:
138
+ self.debug(f"ERROR: {error_timeout}")
139
+ raise Exception(error_timeout)
140
+ if not self.quiet:
141
+ print(".", end="", flush=True)
142
+ response = self.session.get(polling_url)
143
+ if response.status_code != 200:
144
+ if self.debug_file:
145
+ self.debug(f"ERROR: {error_noresults}")
146
+ raise Exception(error_noresults)
147
+ if not response.text or response.text.find("errorMessage") != -1:
148
+ time.sleep(1)
149
+ continue
150
+ else:
151
+ break
152
+ # Use regex to search for src=""
153
+ image_links = regex.findall(r'src="([^"]+)"', response.text)
154
+ # Remove size limit
155
+ normal_image_links = [link.split("?w=")[0] for link in image_links]
156
+ # Remove duplicates
157
+ normal_image_links = list(set(normal_image_links))
158
+
159
+ # Bad images
160
+ bad_images = [
161
+ "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
162
+ "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
163
+ ]
164
+ for img in normal_image_links:
165
+ if img in bad_images:
166
+ raise Exception("Bad images")
167
+ # No images
168
+ if not normal_image_links:
169
+ raise Exception(error_no_images)
170
+ return normal_image_links
171
+
172
+ def save_images(self, links: list, output_dir: str, file_name: str = None) -> None:
173
+ """
174
+ Saves images to output directory
175
+ """
176
+ if self.debug_file:
177
+ self.debug(download_message)
178
+ if not self.quiet:
179
+ print(download_message)
180
+ with contextlib.suppress(FileExistsError):
181
+ os.mkdir(output_dir)
182
+ try:
183
+ fn = f"{file_name}_" if file_name else ""
184
+ jpeg_index = 0
185
+ for link in links:
186
+ while os.path.exists(
187
+ os.path.join(output_dir, f"{fn}{jpeg_index}.jpeg")
188
+ ):
189
+ jpeg_index += 1
190
+ with self.session.get(link, stream=True) as response:
191
+ # save response to file
192
+ response.raise_for_status()
193
+ with open(
194
+ os.path.join(output_dir, f"{fn}{jpeg_index}.jpeg"), "wb"
195
+ ) as output_file:
196
+ for chunk in response.iter_content(chunk_size=8192):
197
+ output_file.write(chunk)
198
+ except requests.exceptions.MissingSchema as url_exception:
199
+ raise Exception(
200
+ "Inappropriate contents found in the generated images. Please try again or try another prompt.",
201
+ ) from url_exception
202
+
203
+
204
+ class ImageGenAsync:
205
+ """
206
+ Image generation by Microsoft Bing
207
+ Parameters:
208
+ auth_cookie: str
209
+ """
210
+
211
+ def __init__(self, auth_cookie: str, quiet: bool = False) -> None:
212
+ self.session = aiohttp.ClientSession(
213
+ headers=HEADERS,
214
+ cookies={"_U": auth_cookie},
215
+ trust_env=True,
216
+ )
217
+ self.quiet = quiet
218
+
219
+ async def __aenter__(self):
220
+ return self
221
+
222
+ async def __aexit__(self, *excinfo) -> None:
223
+ await self.session.close()
224
+
225
+ async def get_images(self, prompt: str) -> list:
226
+ """
227
+ Fetches image links from Bing
228
+ Parameters:
229
+ prompt: str
230
+ """
231
+ if not self.quiet:
232
+ print("Sending request...")
233
+ url_encoded_prompt = requests.utils.quote(prompt)
234
+ # https://www.bing.com/images/create?q=<PROMPT>&rt=3&FORM=GENCRE
235
+ url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
236
+ payload = f"q={url_encoded_prompt}&qs=ds"
237
+ async with self.session.post(
238
+ url, allow_redirects=False, data=payload
239
+ ) as response:
240
+ content = await response.text()
241
+ if "this prompt has been blocked" in content.lower():
242
+ raise Exception(
243
+ "Your prompt has been blocked by Bing. Try to change any bad words and try again.",
244
+ )
245
+ if response.status != 302:
246
+ # if rt4 fails, try rt3
247
+ url = (
248
+ f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
249
+ )
250
+ async with self.session.post(
251
+ url,
252
+ allow_redirects=False,
253
+ timeout=200,
254
+ ) as response3:
255
+ if response3.status != 302:
256
+ print(f"ERROR: {await response3.text()}")
257
+ raise Exception("Redirect failed")
258
+ response = response3
259
+ # Get redirect URL
260
+ redirect_url = response.headers["Location"].replace("&nfy=1", "")
261
+ request_id = redirect_url.split("id=")[-1]
262
+ await self.session.get(f"{BING_URL}{redirect_url}")
263
+ # https://www.bing.com/images/create/async/results/{ID}?q={PROMPT}
264
+ polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"
265
+ # Poll for results
266
+ if not self.quiet:
267
+ print("Waiting for results...")
268
+ while True:
269
+ if not self.quiet:
270
+ print(".", end="", flush=True)
271
+ # By default, timeout is 300s, change as needed
272
+ response = await self.session.get(polling_url)
273
+ if response.status != 200:
274
+ raise Exception("Could not get results")
275
+ content = await response.text()
276
+ if content and content.find("errorMessage") == -1:
277
+ break
278
+
279
+ await asyncio.sleep(1)
280
+ continue
281
+ # Use regex to search for src=""
282
+ image_links = regex.findall(r'src="([^"]+)"', content)
283
+ # Remove size limit
284
+ normal_image_links = [link.split("?w=")[0] for link in image_links]
285
+ # Remove duplicates
286
+ normal_image_links = list(set(normal_image_links))
287
+
288
+ # Bad images
289
+ bad_images = [
290
+ "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
291
+ "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
292
+ ]
293
+ for im in normal_image_links:
294
+ if im in bad_images:
295
+ raise Exception("Bad images")
296
+ # No images
297
+ if not normal_image_links:
298
+ raise Exception("No images")
299
+ return normal_image_links
300
+
301
+ async def save_images(self, links: list, output_dir: str) -> None:
302
+ """
303
+ Saves images to output directory
304
+ """
305
+ if not self.quiet:
306
+ print("\nDownloading images...")
307
+ with contextlib.suppress(FileExistsError):
308
+ os.mkdir(output_dir)
309
+ try:
310
+ jpeg_index = 0
311
+ for link in links:
312
+ while os.path.exists(os.path.join(output_dir, f"{jpeg_index}.jpeg")):
313
+ jpeg_index += 1
314
+ async with self.session.get(link, raise_for_status=True) as response:
315
+ # save response to file
316
+ with open(
317
+ os.path.join(output_dir, f"{jpeg_index}.jpeg"), "wb"
318
+ ) as output_file:
319
+ async for chunk in response.content.iter_chunked(8192):
320
+ output_file.write(chunk)
321
+ except aiohttp.client_exceptions.InvalidURL as url_exception:
322
+ raise Exception(
323
+ "Inappropriate contents found in the generated images. Please try again or try another prompt.",
324
+ ) from url_exception
325
+
326
+
327
+ async def async_image_gen(args) -> None:
328
+ async with ImageGenAsync(args.U, args.quiet) as image_generator:
329
+ images = await image_generator.get_images(args.prompt)
330
+ await image_generator.save_images(images, output_dir=args.output_dir)
331
+
332
+
333
+ def main():
334
+ parser = argparse.ArgumentParser()
335
+ parser.add_argument("-U", help="Auth cookie from browser", type=str)
336
+ parser.add_argument("--cookie-file", help="File containing auth cookie", type=str)
337
+ parser.add_argument(
338
+ "--prompt",
339
+ help="Prompt to generate images for",
340
+ type=str,
341
+ required=True,
342
+ )
343
+
344
+ parser.add_argument(
345
+ "--output-dir",
346
+ help="Output directory",
347
+ type=str,
348
+ default="./output",
349
+ )
350
+
351
+ parser.add_argument(
352
+ "--debug-file",
353
+ help="Path to the file where debug information will be written.",
354
+ type=str,
355
+ )
356
+
357
+ parser.add_argument(
358
+ "--quiet",
359
+ help="Disable pipeline messages",
360
+ action="store_true",
361
+ )
362
+ parser.add_argument(
363
+ "--asyncio",
364
+ help="Run ImageGen using asyncio",
365
+ action="store_true",
366
+ )
367
+ parser.add_argument(
368
+ "--version",
369
+ action="store_true",
370
+ help="Print the version number",
371
+ )
372
+
373
+ args = parser.parse_args()
374
+
375
+ if args.version:
376
+ print(pkg_resources.get_distribution("BingImageCreator").version)
377
+ sys.exit()
378
+
379
+ # Load auth cookie
380
+ cookie_json = None
381
+ if args.cookie_file is not None:
382
+ with contextlib.suppress(Exception):
383
+ with open(args.cookie_file, encoding="utf-8") as file:
384
+ cookie_json = json.load(file)
385
+
386
+ if args.U is None and args.cookie_file is None:
387
+ raise Exception("Could not find auth cookie")
388
+
389
+ if not args.asyncio:
390
+ # Create image generator
391
+ image_generator = ImageGen(
392
+ args.U, args.debug_file, args.quiet, all_cookies=cookie_json
393
+ )
394
+ image_generator.save_images(
395
+ image_generator.get_images(args.prompt),
396
+ output_dir=args.output_dir,
397
+ )
398
+ else:
399
+ asyncio.run(async_image_gen(args))
400
+
401
+
402
+ if __name__ == "__main__":
403
+ main()