|
import os |
|
import json |
|
import random |
|
import json |
|
import os |
|
import uuid |
|
import ssl |
|
import certifi |
|
import aiohttp |
|
import asyncio |
|
|
|
import requests |
|
from ...typing import sha256, Dict, get_type_hints |
|
|
|
# Provider metadata: the chat endpoint, the model names this provider
# answers for, and its capability flags.
url = 'https://bing.com/chat'
model = ['gpt-4']

# Replies are produced incrementally; no credentials are required.
supports_stream, needs_auth = True, False
|
|
|
# Shared TLS context for the websocket connection: the platform defaults
# plus the CA bundle shipped with certifi.
ssl_context = ssl.create_default_context()
ssl_context.load_verify_locations(cafile=certifi.where())
|
|
|
|
|
class optionsSets:
    """Option-set presets that can be passed as ``mode`` to ``stream_generate``."""

    # Expected shape of an option set (key -> value type). In this file it is
    # only used as a parameter annotation.
    optionSet: dict = {
        'tone': str,
        'optionsSets': list
    }

    # Preset whose flags are merged into the chat request arguments.
    # Every flag must be its own list element: adjacent string literals
    # without a separating comma are silently concatenated by Python.
    jailbreak: dict = {
        "optionsSets": [
            'saharasugg',
            'enablenewsfc',
            'clgalileo',
            'gencontentv3',
            "nlu_direct_response_filter",
            "deepleo",
            "disable_emoji_spoken_text",
            "responsible_ai_policy_235",
            "enablemm",
            "h3precise",
            "dtappid",
            "cricinfo",
            "cricinfov2",
            "dv3sugg",
            "nojbfedge",
        ]
    }
|
|
|
|
|
class Defaults:
    """Constant values shared by every request this provider sends."""

    # Record separator appended to each JSON message and used to split
    # incoming websocket data into individual messages.
    delimiter = '\x1e'

    # Random address in 13.104-107.x.x, picked once when the class body is
    # executed and sent as the 'x-forwarded-for' header.
    ip_address = f'13.{random.randint(104, 107)}.{random.randint(0, 255)}.{random.randint(0, 255)}'

    # NOTE: 'AdsQuery' and 'SemanticSerp' are listed twice; the duplicates
    # are kept so the request payload stays unchanged.
    allowedMessageTypes = [
        'Chat', 'Disengaged', 'AdsQuery', 'SemanticSerp',
        'GenerateContentQuery', 'SearchQuery', 'ActionRequest',
        'Context', 'Progress', 'AdsQuery', 'SemanticSerp',
    ]

    sliceIds = [
        'winmuid3tf', 'osbsdusgreccf', 'ttstmout', 'crchatrev',
        'winlongmsgtf', 'ctrlworkpay', 'norespwtf', 'tempcacheread',
        'temptacache', '505scss0', '508jbcars0', '515enbotdets0',
        '5082tsports', '515vaoprvs', '424dagslnv1s0', 'kcimgattcf',
        '427startpms0',
    ]

    # Locale and location hints merged into the outgoing user message.
    location = {
        'locale': 'en-US',
        'market': 'en-US',
        'region': 'US',
        'locationHints': [
            {
                'country': 'United States',
                'state': 'California',
                'city': 'Los Angeles',
                'timezoneoffset': 8,
                'countryConfidence': 8,
                'Center': {'Latitude': 34.0536909, 'Longitude': -118.242766},
                'RegionType': 2,
                'SourceType': 1,
            },
        ],
    }
|
|
|
|
|
def _format(msg: dict) -> str:
    """Serialise *msg* as JSON (non-ASCII kept as-is) plus the record delimiter."""
    encoded = json.dumps(msg, ensure_ascii=False)
    return encoded + Defaults.delimiter
|
|
|
|
|
async def create_conversation():
    """Create a new chat conversation and return its identifiers.

    The create endpoint is requested up to five times. An attempt succeeds
    when the JSON body contains a non-empty ``conversationId``, ``clientId``
    and ``conversationSignature``; a body that is not valid JSON counts as a
    failed attempt.

    Returns:
        tuple: ``(conversationId, clientId, conversationSignature)``.

    Raises:
        Exception: if none of the attempts returned all three values.
    """
    # The headers do not change between attempts, so build them once.
    headers = {
        'authority': 'edgeservices.bing.com',
        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'accept-language': 'en-US,en;q=0.9',
        'cache-control': 'max-age=0',
        'sec-ch-ua': '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"',
        'sec-ch-ua-arch': '"x86"',
        'sec-ch-ua-bitness': '"64"',
        'sec-ch-ua-full-version': '"110.0.1587.69"',
        'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-model': '""',
        'sec-ch-ua-platform': '"Windows"',
        'sec-ch-ua-platform-version': '"15.0.0"',
        'sec-fetch-dest': 'document',
        'sec-fetch-mode': 'navigate',
        'sec-fetch-site': 'none',
        'sec-fetch-user': '?1',
        'upgrade-insecure-requests': '1',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.69',
        'x-edge-shopping-flag': '1',
        'x-forwarded-for': Defaults.ip_address
    }

    for _ in range(5):
        # NOTE: requests is synchronous, so this call runs to completion
        # before the coroutine hands control back to the event loop.
        create = requests.get('https://www.bing.com/turing/conversation/create',
                              headers=headers)

        try:
            # Parse the body once instead of once per field.
            payload = create.json()
        except ValueError:
            # The body was not JSON; count it as a failed attempt and retry.
            continue

        conversationId = payload.get('conversationId')
        clientId = payload.get('clientId')
        conversationSignature = payload.get('conversationSignature')

        if conversationId and clientId and conversationSignature:
            return conversationId, clientId, conversationSignature

    # Only give up after every attempt has failed.
    raise Exception('Failed to create conversation.')
|
|
|
|
|
async def _close_connection(wss, session):
    """Close the websocket and the HTTP session if they are still open."""
    if wss and not wss.closed:
        await wss.close()
    if session and not session.closed:
        await session.close()


async def stream_generate(prompt: str, mode: optionsSets.optionSet = optionsSets.jailbreak, context: bool or str = False):
    """Send *prompt* over a chat websocket and yield the reply incrementally.

    Args:
        prompt: Text of the user message.
        mode: Dict merged into the request arguments (an option-set preset).
        context: When truthy, sent as the description of a single
            ``previousMessages`` entry of message type ``Context``.

    Yields:
        str: The current response text with the previously yielded text
        removed.

    Raises:
        Exception: If the server reports an error in its final (type 2)
            message, if a received frame carries no text payload, or if the
            conversation could not be created.
    """
    timeout = aiohttp.ClientTimeout(total=900)
    session = aiohttp.ClientSession(timeout=timeout)
    wss = None

    # Everything after the session is created runs under try/finally so the
    # session and websocket are released on every exit path, including
    # errors and early closing of this generator.
    try:
        conversationId, clientId, conversationSignature = await create_conversation()

        wss = await session.ws_connect(
            'wss://sydney.bing.com/sydney/ChatHub', ssl=ssl_context, autoping=False,
            headers={
                'accept': 'application/json',
                'accept-language': 'en-US,en;q=0.9',
                'content-type': 'application/json',
                'sec-ch-ua': '"Not_A Brand";v="99", "Microsoft Edge";v="110", "Chromium";v="110"',
                'sec-ch-ua-arch': '"x86"',
                'sec-ch-ua-bitness': '"64"',
                'sec-ch-ua-full-version': '"109.0.1518.78"',
                'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"',
                'sec-ch-ua-mobile': '?0',
                'sec-ch-ua-model': '',
                'sec-ch-ua-platform': '"Windows"',
                'sec-ch-ua-platform-version': '"15.0.0"',
                'sec-fetch-dest': 'empty',
                'sec-fetch-mode': 'cors',
                'sec-fetch-site': 'same-origin',
                'x-ms-client-request-id': str(uuid.uuid4()),
                'x-ms-useragent': 'azsdk-js-api-client-factory/1.0.0-beta.1 core-rest-pipeline/1.10.0 OS/Win32',
                'Referer': 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx',
                'Referrer-Policy': 'origin-when-cross-origin',
                'x-forwarded-for': Defaults.ip_address
            })

        # Protocol handshake: announce the JSON protocol, then wait for one
        # message from the server before sending the request.
        await wss.send_str(_format({'protocol': 'json', 'version': 1}))
        await wss.receive(timeout=900)

        struct = {
            'arguments': [
                {
                    **mode,
                    'source': 'cib',
                    'allowedMessageTypes': Defaults.allowedMessageTypes,
                    'sliceIds': Defaults.sliceIds,
                    'traceId': os.urandom(16).hex(),
                    'isStartOfSession': True,
                    'message': Defaults.location | {
                        'author': 'user',
                        'inputMethod': 'Keyboard',
                        'text': prompt,
                        'messageType': 'Chat'
                    },
                    'conversationSignature': conversationSignature,
                    'participant': {
                        'id': clientId
                    },
                    'conversationId': conversationId
                }
            ],
            'invocationId': '0',
            'target': 'chat',
            'type': 4
        }

        if context:
            struct['arguments'][0]['previousMessages'] = [
                {
                    "author": "user",
                    "description": context,
                    "contextType": "WebPage",
                    "messageType": "Context",
                    "messageId": "discover-web--page-ping-mriduna-----"
                }
            ]

        await wss.send_str(_format(struct))

        final = False
        resp_txt = ''      # full response text assembled so far
        result_text = ''   # accumulated inline text prepended to later updates
        cache_text = ''    # value of resp_txt at the previous yield

        while not final:
            msg = await wss.receive(timeout=900)

            # Only text frames can be split into JSON records; fail with a
            # clear error on anything else (e.g. a close frame).
            if not isinstance(msg.data, str):
                raise Exception(f'Unexpected websocket message: {msg.type}')

            for obj in msg.data.split(Defaults.delimiter):
                if not obj:
                    continue

                response = json.loads(obj)
                response_type = response.get('type')

                if response_type == 1 and response['arguments'][0].get('messages'):
                    # Type 1: incremental update carrying the reply so far.
                    message = response['arguments'][0]['messages'][0]

                    if message['contentOrigin'] != 'Apology':
                        card_body = message['adaptiveCards'][0]['body'][0]
                        resp_txt = result_text + card_body.get('text', '')

                        if message.get('messageType'):
                            inline_text = card_body['inlines'][0].get('text')
                            resp_txt = resp_txt + inline_text + '\n'
                            result_text = result_text + inline_text + '\n'

                    # NOTE(review): the stream is ended as soon as the
                    # previously yielded text ends with ' ' - confirm this
                    # sentinel is the intended one.
                    if cache_text.endswith(' '):
                        final = True
                        # Release the connection before handing control
                        # back to the consumer.
                        await _close_connection(wss, session)

                    yield (resp_txt.replace(cache_text, ''))
                    cache_text = resp_txt

                elif response_type == 2:
                    # Type 2: final message; it either reports an error or
                    # marks the end of the reply.
                    result = response['item']['result']
                    if result.get('error'):
                        raise Exception(
                            f"{result['value']}: {result['message']}")

                    final = True
    finally:
        await _close_connection(wss, session)
|
|
|
|
|
def run(generator):
    """Drive an async generator from synchronous code.

    A fresh event loop is created and installed as the current loop, and
    each item produced by *generator* is yielded as soon as it is available.
    When iteration ends - by exhaustion, by an exception, or because the
    consumer closes this generator early - the async generator is finalised
    and the event loop is shut down and closed.

    Args:
        generator: An async iterable (typically an async generator object).

    Yields:
        Each value produced by *generator*, in order.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    gen = generator.__aiter__()

    try:
        while True:
            try:
                next_val = loop.run_until_complete(gen.__anext__())
            except StopAsyncIteration:
                break

            yield next_val
    finally:
        try:
            # Let the wrapped generator run its own cleanup (its ``finally``
            # blocks) even when the consumer stopped early. Plain async
            # iterators have no ``aclose``.
            aclose = getattr(gen, 'aclose', None)
            if aclose is not None:
                loop.run_until_complete(aclose())
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # Do not leave a closed loop registered as the current one.
            asyncio.set_event_loop(None)
            loop.close()
|
|
|
|
|
def convert(messages):
    """Render chat messages as one '[role](#message)' transcript string.

    Each message contributes its role header, its content and a blank
    line; an empty list yields an empty string.
    """
    return "".join(
        "[%s](#message)\n%s\n\n" % (entry['role'], entry['content'])
        for entry in messages
    )
|
|
|
|
|
def _create_completion(model: str, messages: list, stream: bool, **kwargs):
    """Yield the reply tokens for a chat completion request.

    The last message is sent as the prompt. When there are earlier
    messages they are rendered with ``convert`` and passed as context;
    otherwise no context is sent. ``model``, ``stream`` and ``kwargs`` are
    accepted but not used by this function.
    """
    # With a single message, the first and the last entry are the same one.
    prompt = messages[-1]['content']
    context = convert(messages[:-1]) if len(messages) >= 2 else False

    yield from run(stream_generate(prompt, optionsSets.jailbreak, context))
|
|
|
|
|
|
|
|
|
# Summary of this provider: the module name (file name without its last
# three characters) followed by the positional parameters of
# _create_completion with their annotated type names.
params = f'g4f.Providers.{os.path.basename(__file__)[:-3]} supports: ' + '(%s)' % ', '.join(
    f"{name}: {get_type_hints(_create_completion)[name].__name__}"
    for name in _create_completion.__code__.co_varnames[:_create_completion.__code__.co_argcount]
)
|
|