from __future__ import annotations
import os
import json
import random
import re
import base64
import asyncio
import time
from urllib.parse import quote_plus, unquote_plus
from pathlib import Path
from aiohttp import ClientSession, BaseConnector
try:
import nodriver
has_nodriver = True
except ImportError:
has_nodriver = False
from ... import debug
from ...typing import Messages, Cookies, MediaListType, AsyncResult, AsyncIterator
from ...providers.response import JsonConversation, Reasoning, RequestLogin, ImageResponse, YouTubeResponse, AudioResponse, TitleGeneration
from ...requests.raise_for_status import raise_for_status
from ...requests.aiohttp import get_connector
from ...requests import get_nodriver
from ...image.copy_images import get_filename, get_media_dir, ensure_media_dir
from ...errors import MissingAuthError, ModelNotFoundError
from ...image import to_bytes
from ...cookies import get_cookies_dir
from ...tools.media import merge_media
from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin
from ..helper import format_prompt, get_cookies, get_last_user_message, format_media_prompt
REQUEST_HEADERS = {
"authority": "gemini.google.com",
"origin": "https://gemini.google.com",
"referer": "https://gemini.google.com/",
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
'x-same-domain': '1',
}
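# Build label ("bl") and streaming RPC endpoint used by the Gemini web frontend.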
REQUEST_BL_PARAM = "boq_assistant-bard-web-server_20240519.16_p0"
REQUEST_URL = "https://gemini.google.com/_/BardChatUi/data/assistant.lamda.BardFrontendService/StreamGenerate"
UPLOAD_IMAGE_URL = "https://content-push.googleapis.com/upload/"
UPLOAD_IMAGE_HEADERS = {
"authority": "content-push.googleapis.com",
"accept": "*/*",
"accept-language": "en-US,en;q=0.7",
"authorization": "Basic c2F2ZXM6cyNMdGhlNmxzd2F2b0RsN3J1d1U=",
"content-type": "application/x-www-form-urlencoded;charset=UTF-8",
"origin": "https://gemini.google.com",
"push-id": "feeds/mcudyrk2a4khkz",
"referer": "https://gemini.google.com/",
"x-goog-upload-command": "start",
"x-goog-upload-header-content-length": "",
"x-goog-upload-protocol": "resumable",
"x-tenant-id": "bard-storage",
}
GOOGLE_COOKIE_DOMAIN = ".google.com"
ROTATE_COOKIES_URL = "https://accounts.google.com/RotateCookies"
GOOGLE_SID_COOKIE = "__Secure-1PSID"
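# Maps model names to the "x-goog-ext-525001261-jspb" header that selects the model
# in the Gemini web UI. Note: "gemini-2.5-flash" is defined twice; the entry in the
# "Currently used models" block below wins, since later keys override earlier ones
# in a dict literal.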
models = {
"gemini-2.5-pro-exp": {"x-goog-ext-525001261-jspb": '[1,null,null,null,"2525e3954d185b3c"]'},
"gemini-2.5-flash": {"x-goog-ext-525001261-jspb": '[1,null,null,null,"35609594dbe934d8"]'},
"gemini-2.0-flash-thinking-exp": {"x-goog-ext-525001261-jspb": '[1,null,null,null,"7ca48d02d802f20a"]'},
"gemini-deep-research": {"x-goog-ext-525001261-jspb": '[1,null,null,null,"cd472a54d2abba7e"]'},
"gemini-2.0-flash": {"x-goog-ext-525001261-jspb": '[null,null,null,null,"f299729663a2343f"]'},
"gemini-2.0-flash-exp": {"x-goog-ext-525001261-jspb": '[null,null,null,null,"f299729663a2343f"]'},
"gemini-2.0-flash-thinking": {"x-goog-ext-525001261-jspb": '[null,null,null,null,"9c17b1863f581b8a"]'},
"gemini-2.0-flash-thinking-with-apps": {"x-goog-ext-525001261-jspb": '[null,null,null,null,"f8f8f5ea629f5d37"]'},
# Currently used models
"gemini-2.5-pro": {"x-goog-ext-525001261-jspb": '[1,null,null,null,"61530e79959ab139",null,null,null,[4]]'},
"gemini-2.5-flash": {"x-goog-ext-525001261-jspb": '[1,null,null,null,"9ec249fc9ad08861",null,null,null,[4]]'},
"gemini-audio": {}
}
class Gemini(AsyncGeneratorProvider, ProviderModelMixin):
label = "Google Gemini"
url = "https://gemini.google.com"
needs_auth = True
working = True
use_nodriver = True
default_model = ""
default_image_model = default_model
default_vision_model = default_model
image_models = [default_image_model]
models = [
default_model, "gemini-2.5-flash", "gemini-2.5-pro"
]
synthesize_content_type = "audio/vnd.wav"
_cookies: Cookies = None
_snlm0e: str = None
_sid: str = None
auto_refresh = True
refresh_interval = 540
rotate_tasks = {}
@classmethod
async def nodriver_login(cls, proxy: str = None) -> AsyncIterator[str]:
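        """
        Open a browser via nodriver, let the user log in to Gemini, then capture
        the Google cookies and store them on the class for subsequent requests.
        """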
if not has_nodriver:
debug.log("Skip nodriver login in Gemini provider")
return
browser, stop_browser = await get_nodriver(proxy=proxy, user_data_dir="gemini")
try:
yield RequestLogin(cls.label, os.environ.get("G4F_LOGIN_URL", ""))
page = await browser.get(f"{cls.url}/app")
await page.select("div.ql-editor.textarea", 240)
cookies = {}
for c in await page.send(nodriver.cdp.network.get_cookies([cls.url])):
cookies[c.name] = c.value
await page.close()
cls._cookies = cookies
finally:
stop_browser()
@classmethod
async def start_auto_refresh(cls, proxy: str = None) -> None:
"""
Start the background task to automatically refresh cookies.
"""
while True:
new_1psidts = None
try:
new_1psidts = await rotate_1psidts(cls.url, cls._cookies, proxy)
except Exception as e:
debug.error(f"Failed to refresh cookies: {e}")
task = cls.rotate_tasks.get(cls._cookies[GOOGLE_SID_COOKIE])
if task:
task.cancel()
debug.error(
"Failed to refresh cookies. Background auto refresh task canceled."
)
debug.log(f"Gemini: Cookies refreshed. New __Secure-1PSIDTS: {new_1psidts}")
if new_1psidts:
cls._cookies["__Secure-1PSIDTS"] = new_1psidts
await asyncio.sleep(cls.refresh_interval)
@classmethod
async def create_async_generator(
cls,
model: str,
messages: Messages,
proxy: str = None,
cookies: Cookies = None,
connector: BaseConnector = None,
media: MediaListType = None,
return_conversation: bool = True,
conversation: Conversation = None,
language: str = "en",
prompt: str = None,
audio: dict = None,
**kwargs
) -> AsyncResult:
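        """
        Stream a Gemini chat completion: resolve cookies and the SNlM0e token,
        upload any attached media, post to the StreamGenerate endpoint, and yield
        text deltas, reasoning, images, audio, and YouTube references as parsed.
        """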
if model in cls.model_aliases:
model = cls.model_aliases[model]
if audio is not None or model == "gemini-audio":
prompt = format_media_prompt(messages, prompt)
filename = get_filename(["gemini"], prompt, ".ogx", prompt)
ensure_media_dir()
path = os.path.join(get_media_dir(), filename)
with open(path, "wb") as f:
async for chunk in cls.synthesize({"text": prompt}, proxy):
f.write(chunk)
yield AudioResponse(f"/media/{filename}", text=prompt)
return
cls._cookies = cookies or cls._cookies or get_cookies(GOOGLE_COOKIE_DOMAIN, False, True)
if conversation is not None and getattr(conversation, "model", None) != model:
conversation = None
prompt = format_prompt(messages) if conversation is None else get_last_user_message(messages)
base_connector = get_connector(connector, proxy)
async with ClientSession(
headers=REQUEST_HEADERS,
connector=base_connector
) as session:
            if not cls._snlm0e and cls._cookies:
                await cls.fetch_snlm0e(session, cls._cookies)
if not cls._snlm0e:
try:
async for chunk in cls.nodriver_login(proxy):
yield chunk
except Exception as e:
raise MissingAuthError('Missing or invalid "__Secure-1PSID" cookie', e)
if not cls._snlm0e:
if cls._cookies is None or "__Secure-1PSID" not in cls._cookies:
raise MissingAuthError('Missing "__Secure-1PSID" cookie')
await cls.fetch_snlm0e(session, cls._cookies)
if not cls._snlm0e:
raise RuntimeError("Invalid cookies. SNlM0e not found")
if GOOGLE_SID_COOKIE in cls._cookies:
task = cls.rotate_tasks.get(cls._cookies[GOOGLE_SID_COOKIE])
if not task:
cls.rotate_tasks[cls._cookies[GOOGLE_SID_COOKIE]] = asyncio.create_task(
                        cls.start_auto_refresh(proxy)
)
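            # Upload attached media first; the returned upload ids are embedded in the request payload.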
uploads = await cls.upload_images(base_connector, merge_media(media, messages))
async with ClientSession(
cookies=cls._cookies,
headers=REQUEST_HEADERS,
connector=base_connector,
) as client:
params = {
'bl': REQUEST_BL_PARAM,
'hl': language,
'_reqid': random.randint(1111, 9999),
'rt': 'c',
"f.sid": cls._sid,
}
data = {
'at': cls._snlm0e,
'f.req': json.dumps([None, json.dumps(cls.build_request(
prompt,
language=language,
conversation=conversation,
uploads=uploads
))])
}
async with client.post(
REQUEST_URL,
data=data,
params=params,
headers=models[model] if model in models else None
) as response:
await raise_for_status(response)
image_prompt = response_part = None
last_content = ""
youtube_ids = []
for line in (await response.text()).split("\n"):
try:
try:
line = json.loads(line)
except ValueError:
continue
if not isinstance(line, list):
continue
if len(line[0]) < 3 or not line[0][2]:
continue
response_part = json.loads(line[0][2])
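                        # Observed payload layout: [1] conversation ids, [4] response
                        # candidates, [10] a generated chat title.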
if response_part[10]:
yield TitleGeneration(response_part[10][0].strip())
if not response_part[4]:
continue
if return_conversation:
yield Conversation(response_part[1][0], response_part[1][1], response_part[4][0][0], model)
def find_youtube_ids(content: str):
pattern = re.compile(r"http://www.youtube.com/watch\?v=([\w-]+)")
for match in pattern.finditer(content):
if match.group(1) not in youtube_ids:
yield match.group(1)
                        def read_recursive(data):
                            for item in data:
                                if isinstance(item, list):
                                    yield from read_recursive(item)
                                elif isinstance(item, str) and not item.startswith("rc_"):
                                    yield item
                        def find_str(data, skip=0):
                            for item in read_recursive(data):
if skip > 0:
skip -= 1
continue
yield item
reasoning = "\n\n".join(find_str(response_part[4][0], 3))
reasoning = re.sub(r"<b>|</b>", "**", reasoning)
                        def replace_image(match):
                            return f"![](https:{match.group(0)})"
reasoning = re.sub(r"//yt3.(?:ggpht.com|googleusercontent.com/ytc)/[\w=-]+", replace_image, reasoning)
reasoning = re.sub(r"\nyoutube\n", "\n\n\n", reasoning)
reasoning = re.sub(r"\nyoutube_tool\n", "\n\n", reasoning)
reasoning = re.sub(r"\nYouTube\n", "\nYouTube ", reasoning)
reasoning = reasoning.replace('\nhttps://www.gstatic.com/images/branding/productlogos/youtube/v9/192px.svg', '<i class="fa-brands fa-youtube"></i>')
youtube_ids = list(find_youtube_ids(reasoning))
content = response_part[4][0][1][0]
if reasoning:
yield Reasoning(reasoning, status="🤔")
except (ValueError, KeyError, TypeError, IndexError) as e:
debug.error(f"{cls.__name__} {type(e).__name__}: {e}")
continue
match = re.search(r'\[Imagen of (.*?)\]', content)
if match:
image_prompt = match.group(1)
content = content.replace(match.group(0), '')
pattern = r"http://googleusercontent.com/(?:image_generation|youtube|map)_content/\d+"
content = re.sub(pattern, "", content)
content = content.replace("<!-- end list -->", "")
content = content.replace("<ctrl94>thought", "<think>").replace("<ctrl95>", "</think>")
def replace_link(match):
return f"(https://{quote_plus(unquote_plus(match.group(1)), '/?&=#')})"
content = re.sub(r"\(https://www.google.com/(?:search\?q=|url\?sa=E&source=gmail&q=)https?://(.+?)\)", replace_link, content)
if last_content and content.startswith(last_content):
yield content[len(last_content):]
else:
yield content
last_content = content
if image_prompt:
try:
images = [image[0][3][3] for image in response_part[4][0][12][7][0]]
image_prompt = image_prompt.replace("a fake image", "")
yield ImageResponse(images, image_prompt, {"cookies": cls._cookies})
except (TypeError, IndexError, KeyError):
pass
                    youtube_ids = youtube_ids if youtube_ids else list(find_youtube_ids(content))
if youtube_ids:
yield YouTubeResponse(youtube_ids)
@classmethod
async def synthesize(cls, params: dict, proxy: str = None) -> AsyncIterator[bytes]:
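        """Synthesize speech for params["text"] via the batchexecute RPC "XqA3Ic" and yield the raw audio bytes."""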
if "text" not in params:
raise ValueError("Missing parameter text")
async with ClientSession(
cookies=cls._cookies,
headers=REQUEST_HEADERS,
connector=get_connector(proxy=proxy),
) as session:
            if not cls._snlm0e and cls._cookies:
                await cls.fetch_snlm0e(session, cls._cookies)
inner_data = json.dumps([None, params["text"], "en-US", None, 2])
async with session.post(
"https://gemini.google.com/_/BardChatUi/data/batchexecute",
data={
"f.req": json.dumps([[["XqA3Ic", inner_data, None, "generic"]]]),
"at": cls._snlm0e,
},
params={
"rpcids": "XqA3Ic",
"source-path": "/app/2704fb4aafcca926",
"bl": "boq_assistant-bard-web-server_20241119.00_p1",
"f.sid": "" if cls._sid is None else cls._sid,
"hl": "de",
"_reqid": random.randint(1111, 9999),
"rt": "c"
},
) as response:
await raise_for_status(response)
iter_base64_response = iter_filter_base64(response.content.iter_chunked(1024))
async for chunk in iter_base64_decode(iter_base64_response):
yield chunk
    @staticmethod
    def build_request(
        prompt: str,
        language: str,
        conversation: Conversation = None,
        uploads: list[list[str]] = None,
        tools: list[list[str]] = []
    ) -> list:
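        """Build the nested JSON payload that Gemini expects in the "f.req" form field."""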
image_list = [[[image_url, 1], image_name] for image_url, image_name in uploads] if uploads else []
return [
[prompt, 0, None, image_list, None, None, 0],
[language],
[
None if conversation is None else conversation.conversation_id,
None if conversation is None else conversation.response_id,
None if conversation is None else conversation.choice_id,
None,
None,
[]
],
None,
None,
None,
[1],
0,
[],
tools,
1,
0,
]
    @staticmethod
    async def upload_images(connector: BaseConnector, media: MediaListType) -> list:
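        """
        Upload each media item to Google's resumable content-push endpoint in
        parallel; returns [upload_result, filename] pairs consumed by build_request.
        """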
async def upload_image(image: bytes, image_name: str = None):
async with ClientSession(
headers=UPLOAD_IMAGE_HEADERS,
connector=connector
) as session:
image = to_bytes(image)
async with session.options(UPLOAD_IMAGE_URL) as response:
await raise_for_status(response)
headers = {
"size": str(len(image)),
"x-goog-upload-command": "start"
}
data = f"File name: {image_name}" if image_name else None
async with session.post(
UPLOAD_IMAGE_URL, headers=headers, data=data
) as response:
await raise_for_status(response)
upload_url = response.headers["X-Goog-Upload-Url"]
async with session.options(upload_url, headers=headers) as response:
await raise_for_status(response)
headers["x-goog-upload-command"] = "upload, finalize"
headers["X-Goog-Upload-Offset"] = "0"
async with session.post(
upload_url, headers=headers, data=image
) as response:
await raise_for_status(response)
return [await response.text(), image_name]
return await asyncio.gather(*[upload_image(image, image_name) for image, image_name in media])
@classmethod
async def fetch_snlm0e(cls, session: ClientSession, cookies: Cookies):
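        """Scrape the SNlM0e anti-CSRF token and the "FdrFJe" session id ("f.sid") from the Gemini start page."""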
async with session.get(cls.url, cookies=cookies) as response:
await raise_for_status(response)
response_text = await response.text()
match = re.search(r'SNlM0e\":\"(.*?)\"', response_text)
if match:
cls._snlm0e = match.group(1)
sid_match = re.search(r'"FdrFJe":"([\d-]+)"', response_text)
if sid_match:
cls._sid = sid_match.group(1)
class Conversation(JsonConversation):
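    """Carries the conversation_id/response_id/choice_id triple required to continue a chat, plus the model it was created with."""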
def __init__(self,
conversation_id: str,
response_id: str,
choice_id: str,
model: str
) -> None:
self.conversation_id = conversation_id
self.response_id = response_id
self.choice_id = choice_id
self.model = model
async def iter_filter_base64(chunks: AsyncIterator[bytes]) -> AsyncIterator[bytes]:
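    """Strip the batchexecute framing and yield only the base64 payload that follows the "XqA3Ic" marker."""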
search_for = b'[["wrb.fr","XqA3Ic","[\\"'
end_with = b'\\'
is_started = False
async for chunk in chunks:
if is_started:
if end_with in chunk:
yield chunk.split(end_with, maxsplit=1).pop(0)
break
else:
yield chunk
elif search_for in chunk:
is_started = True
yield chunk.split(search_for, maxsplit=1).pop()
else:
raise ValueError(f"Response: {chunk}")
async def iter_base64_decode(chunks: AsyncIterator[bytes]) -> AsyncIterator[bytes]:
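    """Decode a base64 stream incrementally, carrying partial four-byte quanta across chunks."""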
buffer = b""
rest = 0
async for chunk in chunks:
chunk = buffer + chunk
rest = len(chunk) % 4
buffer = chunk[-rest:]
yield base64.b64decode(chunk[:-rest])
if rest > 0:
yield base64.b64decode(buffer+rest*b"=")
async def rotate_1psidts(url, cookies: dict, proxy: str | None = None) -> str:
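    """
    Rotate the "__Secure-1PSIDTS" cookie via accounts.google.com, persist all
    cookies to the g4f cookies directory, and return the new value. Skipped if
    the cache file was written within the last minute, to avoid 429 responses.
    """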
path = Path(get_cookies_dir())
path.mkdir(parents=True, exist_ok=True)
filename = f"auth_Gemini.json"
path = path / filename
# Check if the cache file was modified in the last minute to avoid 429 Too Many Requests
if not (path.is_file() and time.time() - os.path.getmtime(path) <= 60):
async with ClientSession(proxy=proxy) as client:
response = await client.post(
url=ROTATE_COOKIES_URL,
headers={
"Content-Type": "application/json",
},
cookies=cookies,
data='[000,"-0000000000000000000"]',
)
if response.status == 401:
raise MissingAuthError("Invalid cookies")
response.raise_for_status()
for key, c in response.cookies.items():
cookies[key] = c.value
            # response.cookies holds Morsel objects; extract the plain string value.
            morsel = response.cookies.get("__Secure-1PSIDTS")
            new_1psidts = morsel.value if morsel else None
path.write_text(json.dumps([{
"name": k,
"value": v,
"domain": GOOGLE_COOKIE_DOMAIN,
} for k, v in cookies.items()]))
if new_1psidts:
return new_1psidts