Spaces:
Runtime error
Runtime error
import argparse | |
import asyncio | |
import json | |
import re | |
import sys | |
from pathlib import Path | |
from EdgeGPT.EdgeGPT import Chatbot | |
from prompt_toolkit import PromptSession | |
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory | |
from prompt_toolkit.completion import WordCompleter | |
from prompt_toolkit.history import InMemoryHistory | |
from prompt_toolkit.key_binding import KeyBindings | |
from rich.live import Live | |
from rich.markdown import Markdown | |
def create_session() -> PromptSession:
    """
    Build the prompt session used for interactive input.

    Two custom key bindings are registered on the session:

    * ``enter`` submits the buffer when its text starts with ``!`` (a
      command such as ``!help``); otherwise it inserts a newline so that a
      multi-line message can be typed.
    * ``escape`` empties the buffer while a completion is in progress
      (``complete_state`` is set).

    Returns:
        A ``PromptSession`` using those bindings and an in-memory history.
    """
    kb = KeyBindings()

    # NOTE(review): the handlers were previously defined but never attached
    # to ``kb`` (and the second definition shadowed the first), so neither
    # ever ran.  The key names are chosen to match the startup hint that
    # alt+enter / escape+enter sends a message, i.e. plain enter must not
    # submit ordinary text -- confirm against the intended UX.
    @kb.add("enter")
    def _on_enter(event) -> None:
        buffer_text = event.current_buffer.text
        if buffer_text.startswith("!"):
            # Commands are single-line: submit them straight away.
            event.current_buffer.validate_and_handle()
        else:
            event.current_buffer.insert_text("\n")

    @kb.add("escape")
    def _on_escape(event) -> None:
        if event.current_buffer.complete_state:
            # Clears the whole input rather than only cancelling the
            # completion.
            event.current_buffer.text = ""

    return PromptSession(key_bindings=kb, history=InMemoryHistory())
def create_completer(commands: list, pattern_str: str = "$") -> WordCompleter:
    """
    Build a word completer for the given command words.

    Args:
        commands: Words offered as completions.
        pattern_str: Regular expression source; it is compiled and handed to
            the completer as its ``pattern``.

    Returns:
        A ``WordCompleter`` configured with ``commands`` and the compiled
        pattern.
    """
    word_pattern = re.compile(pattern_str)
    return WordCompleter(words=commands, pattern=word_pattern)
def _create_history_logger(f):
    """
    Return a ``print``-like callable that writes to the file object ``f``.

    Args:
        f: An open, writable text stream.

    Returns:
        A function accepting the same positional and keyword arguments as
        ``print``.  Output goes to ``f`` and is flushed after every call.
    """

    def logger(*args, **kwargs) -> None:
        # Write directly to ``f`` instead of temporarily swapping
        # ``sys.stdout``: the swap left stdout redirected for the rest of the
        # program whenever ``print`` raised (e.g. on a closed file).
        # ``setdefault`` keeps an explicit ``file=`` from the caller winning,
        # as it did before.
        kwargs.setdefault("file", f)
        print(*args, **kwargs, flush=True)

    return logger
async def get_input_async(
    session: PromptSession = None,
    completer: WordCompleter = None,
) -> str:
    """
    Ask the user for one (possibly multi-line) message.

    Args:
        session: Prompt session whose ``prompt_async`` is awaited.  Although
            it defaults to ``None``, a real session must be supplied.
        completer: Completer forwarded to the prompt.

    Returns:
        Whatever ``session.prompt_async`` resolves to (the entered text).
    """
    prompt_options = {
        "completer": completer,
        "multiline": True,
        # Suggest previously entered lines while typing.
        "auto_suggest": AutoSuggestFromHistory(),
    }
    entered_text = await session.prompt_async(**prompt_options)
    return entered_text
async def async_main(args: argparse.Namespace) -> None:
    """
    Run the interactive chat loop until the user enters ``!exit``.

    Args:
        args: Parsed command-line options.  The attributes read here are
            ``cookie_file``, ``proxy``, ``prompt``, ``history_file``,
            ``enter_once``, ``no_stream``, ``rich``, ``style``, ``wss_link``,
            ``search_result`` and ``locale``.

    The history file (when one was requested) and the bot are released in a
    ``finally`` block, so both are closed even when the loop is left through
    an exception.
    """
    print("Initializing...")
    print("Enter `alt+enter` or `escape+enter` to send a message")

    # Read and parse cookies; a path that does not exist is silently ignored.
    cookies = None
    if args.cookie_file:
        file_path = Path(args.cookie_file)
        if file_path.exists():
            with file_path.open("r", encoding="utf-8") as f:
                cookies = json.load(f)

    bot = await Chatbot.create(proxy=args.proxy, cookies=cookies)
    history_file = None
    try:
        session = create_session()
        completer = create_completer(["!help", "!exit", "!reset"])
        initial_prompt = args.prompt

        # Log chat history; stays a no-op unless a history file was given.
        def p_hist(*_values, **_options) -> None:
            pass

        if args.history_file:
            history_file = Path(args.history_file).open("a+", encoding="utf-8")
            p_hist = _create_history_logger(history_file)

        while True:
            print("\nYou:")
            p_hist("\nYou:")
            if initial_prompt:
                # The start-up prompt is used exactly once, then the loop
                # falls back to interactive input.
                question = initial_prompt
                print(question)
                initial_prompt = None
            else:
                question = (
                    input()
                    if args.enter_once
                    else await get_input_async(session=session, completer=completer)
                )
            print()
            p_hist(question + "\n")

            if question == "!exit":
                break
            if question == "!help":
                print(
                    "\n"
                    "!help - Show this help message\n"
                    "!exit - Exit the program\n"
                    "!reset - Reset the conversation\n",
                )
                continue
            if question == "!reset":
                await bot.reset()
                continue

            print("Bot:")
            p_hist("Bot:")

            # Same keyword arguments for the blocking and streaming calls.
            ask_options = {
                "prompt": question,
                "conversation_style": args.style,
                "wss_link": args.wss_link,
                "search_result": args.search_result,
                "locale": args.locale,
            }

            if args.no_stream:
                reply = await bot.ask(**ask_options)
                last_message = reply["item"]["messages"][-1]
                response = last_message["adaptiveCards"][0]["body"][0]["text"]
                print(response)
                p_hist(response)
            else:
                # Number of characters of the growing response already
                # emitted; only the new tail is written on each update.
                wrote = 0
                if args.rich:
                    md = Markdown("")
                    with Live(md, auto_refresh=False) as live:
                        async for final, response in bot.ask_stream(**ask_options):
                            if final:
                                continue
                            p_hist(response[wrote:], end="")
                            if wrote > len(response):
                                # The response got shorter.  Render through
                                # the live console: built-in print() would
                                # emit the Markdown objects' repr instead of
                                # their content.
                                live.console.print(md)
                                live.console.print(
                                    Markdown("***Bing revoked the response.***"),
                                )
                            wrote = len(response)
                            md = Markdown(response)
                            live.update(md, refresh=True)
                else:
                    async for final, response in bot.ask_stream(**ask_options):
                        if final:
                            continue
                        print(response[wrote:], end="", flush=True)
                        p_hist(response[wrote:], end="")
                        wrote = len(response)
                # Terminate the streamed line on screen and in the history.
                print()
                p_hist()
    finally:
        try:
            if history_file is not None:
                history_file.close()
        finally:
            await bot.close()
def main() -> None:
    """
    Command-line entry point.

    Prints the banner, parses the command-line options and runs
    ``async_main`` with them under ``asyncio.run``.
    """
    banner = (
        "\n"
        "EdgeGPT - A demo of reverse engineering the Bing GPT chatbot\n"
        "Repo: github.com/acheong08/EdgeGPT\n"
        "By: Antonio Cheong\n"
        "!help for help\n"
        "Type !exit to exit\n"
    )
    print(banner)

    parser = argparse.ArgumentParser()

    # Plain on/off switches.
    for switch in ("--enter-once", "--search-result", "--no-stream", "--rich"):
        parser.add_argument(switch, action="store_true")

    parser.add_argument(
        "--proxy",
        help="Proxy URL (e.g. socks5://127.0.0.1:1080)",
        type=str,
    )
    parser.add_argument(
        "--wss-link",
        help="WSS URL(e.g. wss://sydney.bing.com/sydney/ChatHub)",
        type=str,
        default="wss://sydney.bing.com/sydney/ChatHub",
    )
    parser.add_argument(
        "--style",
        choices=["creative", "balanced", "precise"],
        default="balanced",
    )

    # Optional string options: (flag, default, help text).
    string_options = (
        ("--prompt", "", "prompt to start with"),
        ("--cookie-file", "", "path to cookie file"),
        ("--history-file", "", "path to history file"),
        ("--locale", "en-US", "your locale"),
    )
    for flag, default_value, description in string_options:
        parser.add_argument(
            flag,
            type=str,
            default=default_value,
            required=False,
            help=description,
        )

    parsed = parser.parse_args()
    asyncio.run(async_main(parsed))