|
from bot_backend import * |
|
import base64 |
|
import time |
|
import tiktoken |
|
from notebook_serializer import add_code_cell_error_to_notebook, add_image_to_notebook, add_code_cell_output_to_notebook |
|
|
|
SLICED_CONV_MESSAGE = "[Rest of the conversation has been omitted to fit in the context window]" |
|
|
|
|
|
def get_conversation_slice(conversation, model, encoding_for_which_model, min_output_tokens_count=500): |
|
""" |
|
Function to get a slice of the conversation that fits in the model's context window. returns: The conversation |
|
with the first message(explaining the role of the assistant) + the last x messages that can fit in the context |
|
window. |
|
""" |
|
encoder = tiktoken.encoding_for_model(encoding_for_which_model) |
|
count_tokens = lambda txt: len(encoder.encode(txt)) |
|
nb_tokens = count_tokens(conversation[0]['content']) |
|
sliced_conv = [conversation[0]] |
|
context_window_limit = int(config['model_context_window'][model]) |
|
max_tokens = context_window_limit - count_tokens(SLICED_CONV_MESSAGE) - min_output_tokens_count |
|
sliced = False |
|
for message in conversation[-1:0:-1]: |
|
nb_tokens += count_tokens(message['content']) |
|
if nb_tokens > max_tokens: |
|
sliced_conv.insert(1, {'role': 'system', 'content': SLICED_CONV_MESSAGE}) |
|
sliced = True |
|
break |
|
sliced_conv.insert(1, message) |
|
return sliced_conv, nb_tokens, sliced |
|
|
|
|
|
def chat_completion(bot_backend: BotBackend): |
|
model_choice = bot_backend.gpt_model_choice |
|
model_name = bot_backend.config['model'][model_choice]['model_name'] |
|
kwargs_for_chat_completion = copy.deepcopy(bot_backend.kwargs_for_chat_completion) |
|
if bot_backend.config['API_TYPE'] == "azure": |
|
kwargs_for_chat_completion['messages'], nb_tokens, sliced = \ |
|
get_conversation_slice( |
|
conversation=kwargs_for_chat_completion['messages'], |
|
model=model_name, |
|
encoding_for_which_model='gpt-3.5-turbo' if model_choice == 'GPT-3.5' else 'gpt-4' |
|
) |
|
else: |
|
kwargs_for_chat_completion['messages'], nb_tokens, sliced = \ |
|
get_conversation_slice( |
|
conversation=kwargs_for_chat_completion['messages'], |
|
model=model_name, |
|
encoding_for_which_model=model_name |
|
) |
|
|
|
bot_backend.update_token_count(num_tokens=nb_tokens) |
|
bot_backend.update_sliced_state(sliced=sliced) |
|
|
|
assert config['model'][model_choice]['available'], f"{model_choice} is not available for your API key" |
|
|
|
assert model_name in config['model_context_window'], \ |
|
f"{model_name} lacks context window information. Please check the config.json file." |
|
|
|
response = openai.ChatCompletion.create(**kwargs_for_chat_completion) |
|
return response |
|
|
|
|
|
def add_code_execution_result_to_bot_history(content_to_display, history, unique_id): |
|
images, text = [], [] |
|
|
|
|
|
error_occurred = False |
|
|
|
for mark, out_str in content_to_display: |
|
if mark in ('stdout', 'execute_result_text', 'display_text'): |
|
text.append(out_str) |
|
add_code_cell_output_to_notebook(out_str) |
|
elif mark in ('execute_result_png', 'execute_result_jpeg', 'display_png', 'display_jpeg'): |
|
if 'png' in mark: |
|
images.append(('png', out_str)) |
|
add_image_to_notebook(out_str, 'image/png') |
|
else: |
|
add_image_to_notebook(out_str, 'image/jpeg') |
|
images.append(('jpg', out_str)) |
|
elif mark == 'error': |
|
|
|
text.append(delete_color_control_char(out_str)) |
|
error_occurred = True |
|
add_code_cell_error_to_notebook(out_str) |
|
text = '\n'.join(text).strip('\n') |
|
if error_occurred: |
|
history.append([None, f'❌Terminal output:\n```shell\n\n{text}\n```']) |
|
else: |
|
history.append([None, f'✔️Terminal output:\n```shell\n{text}\n```']) |
|
|
|
|
|
for filetype, img in images: |
|
image_bytes = base64.b64decode(img) |
|
temp_path = f'cache/temp_{unique_id}' |
|
if not os.path.exists(temp_path): |
|
os.mkdir(temp_path) |
|
path = f'{temp_path}/{hash(time.time())}.{filetype}' |
|
with open(path, 'wb') as f: |
|
f.write(image_bytes) |
|
width, height = get_image_size(path) |
|
history.append( |
|
[ |
|
None, |
|
f'<img src=\"file={path}\" style=\'{"" if width < 800 else "width: 800px;"} max-width:none; ' |
|
f'max-height:none\'> ' |
|
] |
|
) |
|
|
|
|
|
def add_function_response_to_bot_history(hypertext_to_display, history): |
|
if hypertext_to_display is not None: |
|
if history[-1][1]: |
|
history.append([None, hypertext_to_display]) |
|
else: |
|
history[-1][1] = hypertext_to_display |
|
|
|
|
|
def parse_json(function_args: str, finished: bool): |
|
""" |
|
GPT may generate non-standard JSON format string, which contains '\n' in string value, leading to error when using |
|
`json.loads()`. |
|
Here we implement a parser to extract code directly from non-standard JSON string. |
|
:return: code string if successfully parsed otherwise None |
|
""" |
|
parser_log = { |
|
'met_begin_{': False, |
|
'begin_"code"': False, |
|
'end_"code"': False, |
|
'met_:': False, |
|
'met_end_}': False, |
|
'met_end_code_"': False, |
|
"code_begin_index": 0, |
|
"code_end_index": 0 |
|
} |
|
try: |
|
for index, char in enumerate(function_args): |
|
if char == '{': |
|
parser_log['met_begin_{'] = True |
|
elif parser_log['met_begin_{'] and char == '"': |
|
if parser_log['met_:']: |
|
if finished: |
|
parser_log['code_begin_index'] = index + 1 |
|
break |
|
else: |
|
if index + 1 == len(function_args): |
|
return None |
|
else: |
|
temp_code_str = function_args[index + 1:] |
|
if '\n' in temp_code_str: |
|
try: |
|
return json.loads(function_args + '"}')['code'] |
|
except json.JSONDecodeError: |
|
try: |
|
return json.loads(function_args + '}')['code'] |
|
except json.JSONDecodeError: |
|
try: |
|
return json.loads(function_args)['code'] |
|
except json.JSONDecodeError: |
|
if temp_code_str[-1] in ('"', '\n'): |
|
return None |
|
else: |
|
return temp_code_str.strip('\n') |
|
else: |
|
return json.loads(function_args + '"}')['code'] |
|
elif parser_log['begin_"code"']: |
|
parser_log['end_"code"'] = True |
|
else: |
|
parser_log['begin_"code"'] = True |
|
elif parser_log['end_"code"'] and char == ':': |
|
parser_log['met_:'] = True |
|
else: |
|
continue |
|
if finished: |
|
for index, char in enumerate(function_args[::-1]): |
|
back_index = -1 - index |
|
if char == '}': |
|
parser_log['met_end_}'] = True |
|
elif parser_log['met_end_}'] and char == '"': |
|
parser_log['code_end_index'] = back_index - 1 |
|
break |
|
else: |
|
continue |
|
code_str = function_args[parser_log['code_begin_index']: parser_log['code_end_index'] + 1] |
|
if '\n' in code_str: |
|
return code_str.strip('\n') |
|
else: |
|
return json.loads(function_args)['code'] |
|
|
|
except Exception as e: |
|
return None |
|
|
|
|
|
def get_image_size(image_path): |
|
with Image.open(image_path) as img: |
|
width, height = img.size |
|
return width, height |
|
|