"""Bing chat provider.

Creates a Bing conversation over HTTPS, then streams the reply to a prompt
through the ``sydney.bing.com`` ChatHub websocket.  ``_create_completion`` is
the synchronous entry point; it yields the reply text incrementally.
"""
import asyncio
import json
import os
import random
import ssl
import uuid
from typing import Dict, List, NewType, Optional, Union, get_type_hints

import aiohttp
import certifi
import requests

sha256 = NewType('sha_256_hash', str)

url = 'https://bing.com/chat'
model = ['gpt-4']
supports_stream = True
needs_auth = False

# TLS context for the websocket, verified against certifi's CA bundle.
ssl_context = ssl.create_default_context()
ssl_context.load_verify_locations(certifi.where())


class optionsSets:
    """Option-set payloads merged into the chat request arguments."""

    # Shape of an option set: expected keys mapped to their value types.
    optionSet: dict = {
        'tone': str,
        'optionsSets': list
    }

    jailbreak: dict = {
        "optionsSets": [
            'saharasugg',
            'enablenewsfc',
            'clgalileo',
            'gencontentv3',
            "nlu_direct_response_filter",
            "deepleo",
            "disable_emoji_spoken_text",
            "responsible_ai_policy_235",
            "enablemm",
            # The comma after "h3precise" was missing, so implicit string
            # concatenation merged it with the next live entry into the
            # single option "h3precisedtappid".
            "h3precise",
            # "harmonyv3",
            "dtappid",
            "cricinfo",
            "cricinfov2",
            "dv3sugg",
            "nojbfedge"
        ]
    }


class Defaults:
    """Constant values shared by every request this module sends."""

    # Separator appended to each JSON frame sent on the websocket and used
    # to split received frames.
    delimiter = '\x1e'

    # Picked once at import time; sent as the `x-forwarded-for` header.
    ip_address = f'13.{random.randint(104, 107)}.{random.randint(0, 255)}.{random.randint(0, 255)}'

    allowedMessageTypes = [
        'Chat',
        'Disengaged',
        'AdsQuery',
        'SemanticSerp',
        'GenerateContentQuery',
        'SearchQuery',
        'ActionRequest',
        'Context',
        'Progress'
    ]

    sliceIds = [
        # "222dtappid",
        # "225cricinfo",
        # "224locals0"
        'winmuid3tf',
        'osbsdusgreccf',
        'ttstmout',
        'crchatrev',
        'winlongmsgtf',
        'ctrlworkpay',
        'norespwtf',
        'tempcacheread',
        'temptacache',
        '505scss0',
        '508jbcars0',
        '515enbotdets0',
        '5082tsports',
        '515vaoprvs',
        '424dagslnv1s0',
        'kcimgattcf',
        '427startpms0'
    ]

    location = {
        'locale': 'en-US',
        'market': 'en-US',
        'region': 'US',
        'locationHints': [
            {
                'country': 'United States',
                'state': 'California',
                'city': 'Los Angeles',
                'timezoneoffset': 8,
                'countryConfidence': 8,
                'Center': {
                    'Latitude': 34.0536909,
                    'Longitude': -118.242766
                },
                'RegionType': 2,
                'SourceType': 1
            }
        ],
    }


# Websocket frame types after which no further text frames will arrive.
_WS_TERMINAL_TYPES = (
    aiohttp.WSMsgType.CLOSE,
    aiohttp.WSMsgType.CLOSING,
    aiohttp.WSMsgType.CLOSED,
    aiohttp.WSMsgType.ERROR,
)

# Number of attempts made to obtain a usable conversation.
_CREATE_ATTEMPTS = 5


def _format(msg: dict) -> str:
    """Serialise *msg* to JSON and append the frame delimiter."""
    return json.dumps(msg, ensure_ascii=False) + Defaults.delimiter


def _conversation_headers() -> dict:
    """Return the HTTP headers sent with the conversation-create request."""
    return {
        'authority': 'edgeservices.bing.com',
        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'accept-language': 'en-US,en;q=0.9',
        'cache-control': 'max-age=0',
        'sec-ch-ua': '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"',
        'sec-ch-ua-arch': '"x86"',
        'sec-ch-ua-bitness': '"64"',
        'sec-ch-ua-full-version': '"110.0.1587.69"',
        'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-model': '""',
        'sec-ch-ua-platform': '"Windows"',
        'sec-ch-ua-platform-version': '"15.0.0"',
        'sec-fetch-dest': 'document',
        'sec-fetch-mode': 'navigate',
        'sec-fetch-site': 'none',
        'sec-fetch-user': '?1',
        'upgrade-insecure-requests': '1',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.69',
        'x-edge-shopping-flag': '1',
        'x-forwarded-for': Defaults.ip_address
    }


def _websocket_headers() -> dict:
    """Return the headers for the ChatHub websocket handshake.

    Built per call because `x-ms-client-request-id` is a fresh UUID each time.
    """
    return {
        'accept': 'application/json',
        'accept-language': 'en-US,en;q=0.9',
        'content-type': 'application/json',
        'sec-ch-ua': '"Not_A Brand";v="99", "Microsoft Edge";v="110", "Chromium";v="110"',
        'sec-ch-ua-arch': '"x86"',
        'sec-ch-ua-bitness': '"64"',
        'sec-ch-ua-full-version': '"109.0.1518.78"',
        'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-model': '',
        'sec-ch-ua-platform': '"Windows"',
        'sec-ch-ua-platform-version': '"15.0.0"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'x-ms-client-request-id': str(uuid.uuid4()),
        'x-ms-useragent': 'azsdk-js-api-client-factory/1.0.0-beta.1 core-rest-pipeline/1.10.0 OS/Win32',
        'Referer': 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx',
        'Referrer-Policy': 'origin-when-cross-origin',
        'x-forwarded-for': Defaults.ip_address
    }


def _build_request(prompt, mode, context, conversationId, clientId,
                   conversationSignature) -> dict:
    """Assemble the chat invocation frame sent after the handshake."""
    arguments = {
        **mode,
        'source': 'cib',
        'allowedMessageTypes': Defaults.allowedMessageTypes,
        'sliceIds': Defaults.sliceIds,
        'traceId': os.urandom(16).hex(),
        'isStartOfSession': True,
        'message': {
            **Defaults.location,
            'author': 'user',
            'inputMethod': 'Keyboard',
            'text': prompt,
            'messageType': 'Chat'
        },
        'conversationSignature': conversationSignature,
        'participant': {
            'id': clientId
        },
        'conversationId': conversationId
    }

    if context:
        # Earlier turns are passed as a single "WebPage" context message.
        arguments['previousMessages'] = [
            {
                "author": "user",
                "description": context,
                "contextType": "WebPage",
                "messageType": "Context",
                "messageId": "discover-web--page-ping-mriduna-----"
            }
        ]

    return {
        'arguments': [arguments],
        'invocationId': '0',
        'target': 'chat',
        'type': 4
    }


async def create_conversation():
    """Create a Bing conversation and return its identifiers.

    Makes up to five attempts, retrying when the response is not JSON or is
    missing any of the three fields.

    NOTE: `requests.get` is a blocking call, so the event loop is stalled
    while each attempt is in flight.

    Returns:
        A ``(conversationId, clientId, conversationSignature)`` tuple.

    Raises:
        Exception: if no attempt produced all three fields.
    """
    for _ in range(_CREATE_ATTEMPTS):
        create = requests.get('https://www.bing.com/turing/conversation/create',
                              headers=_conversation_headers())
        try:
            payload = create.json()
        except ValueError:
            # Body was not valid JSON; try again.
            continue

        conversationId = payload.get('conversationId')
        clientId = payload.get('clientId')
        conversationSignature = payload.get('conversationSignature')

        if conversationId and clientId and conversationSignature:
            return conversationId, clientId, conversationSignature

    raise Exception('Failed to create conversation.')


async def stream_generate(prompt: str, mode: dict = optionsSets.jailbreak,
                          context: Union[bool, str] = False):
    """Stream the reply to *prompt* from the ChatHub websocket.

    Args:
        prompt: Text of the user message.
        mode: Option-set dict merged into the request arguments.
        context: Falsy for none, otherwise text sent as a previous
            "Context" message.

    Yields:
        The reply text with the previously yielded text removed
        (``resp_txt.replace(cache_text, '')``).

    Raises:
        Exception: if the service reports an error in the final frame, or
            the websocket closes or errors before the final frame arrives.
    """
    timeout = aiohttp.ClientTimeout(total=900)
    conversationId, clientId, conversationSignature = await create_conversation()

    # `async with` closes the websocket and the session on every exit path,
    # including exceptions and early closing of this generator.
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.ws_connect('wss://sydney.bing.com/sydney/ChatHub',
                                      ssl=ssl_context, autoping=False,
                                      headers=_websocket_headers()) as wss:
            # Protocol handshake: announce JSON, then discard the reply.
            await wss.send_str(_format({'protocol': 'json', 'version': 1}))
            await wss.receive(timeout=900)

            await wss.send_str(_format(_build_request(
                prompt, mode, context,
                conversationId, clientId, conversationSignature)))

            final = False
            resp_txt = ''      # full reply text seen so far
            result_text = ''   # prefix carried over from typed messages
            cache_text = ''    # value of resp_txt at the previous yield

            while not final:
                msg = await wss.receive(timeout=900)

                if msg.type in _WS_TERMINAL_TYPES:
                    raise Exception(
                        'Connection closed before the response completed.')
                if msg.type != aiohttp.WSMsgType.TEXT:
                    # autoping is off, so control frames such as PING reach
                    # this loop; they carry no chat payload.
                    continue

                for obj in msg.data.split(Defaults.delimiter):
                    if not obj:
                        continue

                    response = json.loads(obj)
                    frame_type = response.get('type')

                    if frame_type == 1 and response['arguments'][0].get('messages'):
                        message = response['arguments'][0]['messages'][0]

                        if message['contentOrigin'] != 'Apology':
                            body = message['adaptiveCards'][0]['body'][0]
                            resp_txt = result_text + body.get('text', '')

                            if message.get('messageType'):
                                inline_text = body['inlines'][0].get('text') + '\n'
                                resp_txt = resp_txt + inline_text
                                result_text = result_text + inline_text

                        if cache_text.endswith(' '):
                            final = True

                        yield (resp_txt.replace(cache_text, ''))
                        cache_text = resp_txt

                    elif frame_type == 2:
                        result = response['item']['result']
                        if result.get('error'):
                            raise Exception(
                                f"{result['value']}: {result['message']}")

                        final = True


def run(generator):
    """Drive an async generator synchronously, yielding its values.

    Uses a private event loop so it does not depend on a current loop
    existing in the calling thread. If the consumer stops early, the async
    generator is closed so its cleanup runs.
    """
    loop = asyncio.new_event_loop()
    gen = generator.__aiter__()

    try:
        while True:
            try:
                next_val = loop.run_until_complete(gen.__anext__())
            except StopAsyncIteration:
                break
            yield next_val
    finally:
        try:
            aclose = getattr(gen, 'aclose', None)
            if aclose is not None:
                loop.run_until_complete(aclose())
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            loop.close()


def convert(messages):
    """Render *messages* as ``[role](#message)\\ncontent\\n\\n`` blocks."""
    return ''.join(
        "[%s](#message)\n%s\n\n" % (message['role'], message['content'])
        for message in messages
    )


def _create_completion(model: str, messages: list, stream: bool, **kwargs):
    """Yield the streamed reply to the last entry of *messages*.

    The last message's content is the prompt. When there are two or more
    messages, the earlier ones are rendered by `convert` and sent as context.
    `model`, `stream` and `kwargs` are not used by this function.
    """
    if len(messages) < 2:
        prompt = messages[0]['content']
        context = False
    else:
        prompt = messages[-1]['content']
        context = convert(messages[:-1])

    yield from run(stream_generate(prompt, optionsSets.jailbreak, context))


# Human-readable summary of the positional parameters `_create_completion`
# accepts, with their annotated types.
params = f'g4f.Providers.{os.path.basename(__file__)[:-3]} supports: ' + \
    '(%s)' % ', '.join(
        [f"{name}: {get_type_hints(_create_completion)[name].__name__}"
         for name in _create_completion.__code__.co_varnames[:_create_completion.__code__.co_argcount]])