from bot_backend import *
import base64
import time
import tiktoken
from notebook_serializer import add_code_cell_error_to_notebook, add_image_to_notebook, \
    add_code_cell_output_to_notebook

# Placeholder system message injected when older turns are dropped to fit the context window.
SLICED_CONV_MESSAGE = "[Rest of the conversation has been omitted to fit in the context window]"


def get_conversation_slice(conversation, model, encoding_for_which_model, min_output_tokens_count=500):
    """
    Return a slice of `conversation` that fits in the model's context window.

    The slice always keeps the first message (the one explaining the role of the
    assistant) and then as many of the most recent messages as fit. When older
    messages are dropped, a system message (SLICED_CONV_MESSAGE) is inserted
    right after the first message to tell the model the history was truncated.

    :param conversation: list of {'role': ..., 'content': ...} message dicts
    :param model: key into config['model_context_window'] giving the window size
    :param encoding_for_which_model: model name passed to tiktoken to pick the encoder
    :param min_output_tokens_count: tokens reserved for the model's reply
    :return: (sliced_conversation, nb_tokens, sliced_flag)
    """
    encoder = tiktoken.encoding_for_model(encoding_for_which_model)

    def count_tokens(txt):
        return len(encoder.encode(txt))

    nb_tokens = count_tokens(conversation[0]['content'])
    sliced_conv = [conversation[0]]
    context_window_limit = int(config['model_context_window'][model])
    # Budget = window size minus the truncation notice minus room for the reply.
    max_tokens = context_window_limit - count_tokens(SLICED_CONV_MESSAGE) - min_output_tokens_count
    sliced = False
    # Walk messages newest-first (excluding the first message, already kept) and
    # insert each at index 1 so the original chronological order is restored.
    for message in conversation[-1:0:-1]:
        nb_tokens += count_tokens(message['content'])
        if nb_tokens > max_tokens:
            # NOTE(review): at this point nb_tokens includes the message that did
            # NOT fit and excludes the notice's tokens, so the returned count
            # slightly overstates the slice — confirm whether callers rely on it.
            sliced_conv.insert(1, {'role': 'system', 'content': SLICED_CONV_MESSAGE})
            sliced = True
            break
        sliced_conv.insert(1, message)
    return sliced_conv, nb_tokens, sliced


def chat_completion(bot_backend: BotBackend):
    """
    Send the backend's pending conversation to the OpenAI chat-completion API.

    The conversation is first sliced to fit the chosen model's context window;
    the backend's token count and sliced-state are updated as a side effect.

    :param bot_backend: backend holding model choice, config and request kwargs
    :return: the raw response from openai.ChatCompletion.create
    """
    model_choice = bot_backend.gpt_model_choice
    model_name = bot_backend.config['model'][model_choice]['model_name']
    # Deep-copy so slicing the message list never mutates the backend's state.
    kwargs_for_chat_completion = copy.deepcopy(bot_backend.kwargs_for_chat_completion)

    if bot_backend.config['API_TYPE'] == "azure":
        # Azure deployment names are not valid tiktoken model names, so map the
        # choice to the closest OpenAI model for encoder selection.
        kwargs_for_chat_completion['messages'], nb_tokens, sliced = \
            get_conversation_slice(
                conversation=kwargs_for_chat_completion['messages'],
                model=model_name,
                encoding_for_which_model='gpt-3.5-turbo' if model_choice == 'GPT-3.5' else 'gpt-4'
            )
    else:
        kwargs_for_chat_completion['messages'], nb_tokens, sliced = \
            get_conversation_slice(
                conversation=kwargs_for_chat_completion['messages'],
                model=model_name,
                encoding_for_which_model=model_name
            )

    bot_backend.update_token_count(num_tokens=nb_tokens)
    bot_backend.update_sliced_state(sliced=sliced)

    # NOTE(review): these asserts read the module-level `config` (from
    # bot_backend's star import) while the lookups above use bot_backend.config
    # — presumably the same object; confirm.
    assert config['model'][model_choice]['available'], f"{model_choice} is not available for your API key"

    assert model_name in config['model_context_window'], \
        f"{model_name} lacks context window information. Please check the config.json file."

    response = openai.ChatCompletion.create(**kwargs_for_chat_completion)

    return response


def add_code_execution_result_to_bot_history(content_to_display, history, unique_id):
    """
    Append code-execution results (terminal text and images) to the chat history
    and mirror them into the notebook being serialized.

    :param content_to_display: iterable of (mark, output_string) pairs, where
        mark identifies the output kind ('stdout', 'error', 'display_png', ...)
    :param history: chat history list of [user, bot] pairs, mutated in place
    :param unique_id: session id used to build the per-session cache directory
    """
    images, text = [], []

    # terminal output
    error_occurred = False
    for mark, out_str in content_to_display:
        if mark in ('stdout', 'execute_result_text', 'display_text'):
            text.append(out_str)
            add_code_cell_output_to_notebook(out_str)
        elif mark in ('execute_result_png', 'execute_result_jpeg', 'display_png', 'display_jpeg'):
            if 'png' in mark:
                images.append(('png', out_str))
                add_image_to_notebook(out_str, 'image/png')
            else:
                add_image_to_notebook(out_str, 'image/jpeg')
                images.append(('jpg', out_str))
        elif mark == 'error':
            # Set output type to error; strip ANSI color codes for display.
            text.append(delete_color_control_char(out_str))
            error_occurred = True
            add_code_cell_error_to_notebook(out_str)
    text = '\n'.join(text).strip('\n')
    if error_occurred:
        history.append([None, f'❌Terminal output:\n```shell\n\n{text}\n```'])
    else:
        history.append([None, f'✔️Terminal output:\n```shell\n{text}\n```'])

    # image output
    for filetype, img in images:
        image_bytes = base64.b64decode(img)
        temp_path = f'cache/temp_{unique_id}'
        # makedirs(exist_ok=True) also creates the 'cache' parent and avoids the
        # check-then-create race of os.path.exists + os.mkdir.
        os.makedirs(temp_path, exist_ok=True)
        path = f'{temp_path}/{hash(time.time())}.{filetype}'
        with open(path, 'wb') as f:
            f.write(image_bytes)
        width, height = get_image_size(path)
        # NOTE(review): the appended markup appears to have been lost in
        # extraction — `path`, `width` and `height` are computed but unused;
        # the original presumably embedded an <img> tag. Confirm upstream.
        history.append(
            [
                None,
                f' '
            ]
        )


def add_function_response_to_bot_history(hypertext_to_display, history):
    """
    Attach hypertext produced by a function call to the chat history: fill the
    last turn's empty bot slot if possible, otherwise append a new turn.

    :param hypertext_to_display: markup string, or None to do nothing
    :param history: chat history list of [user, bot] pairs, mutated in place
    """
    if hypertext_to_display is not None:
        if history[-1][1]:
            history.append([None, hypertext_to_display])
        else:
            history[-1][1] = hypertext_to_display


def parse_json(function_args: str, finished: bool):
    """
    GPT may generate non-standard JSON format string, which contains '\\n' in
    string value, leading to error when using `json.loads()`. Here we implement
    a parser to extract code directly from non-standard JSON string.

    :param function_args: the (possibly partial) function-call argument string,
        expected to look like '{"code": "..."}'
    :param finished: whether the argument stream is complete
    :return: code string if successfully parsed otherwise None
    """
    # State machine tracking which structural tokens of {"code": "..."} have
    # been seen so far, plus the code span boundaries once located.
    parser_log = {
        'met_begin_{': False,
        'begin_"code"': False,
        'end_"code"': False,
        'met_:': False,
        'met_end_}': False,
        'met_end_code_"': False,
        "code_begin_index": 0,
        "code_end_index": 0
    }
    try:
        # Forward scan: find the opening quote of the code value.
        for index, char in enumerate(function_args):
            if char == '{':
                parser_log['met_begin_{'] = True
            elif parser_log['met_begin_{'] and char == '"':
                if parser_log['met_:']:
                    # This quote opens the code value itself.
                    if finished:
                        parser_log['code_begin_index'] = index + 1
                        break
                    else:
                        if index + 1 == len(function_args):
                            # Stream ends right at the opening quote: no code yet.
                            return None
                        else:
                            temp_code_str = function_args[index + 1:]
                            if '\n' in temp_code_str:
                                # Raw newlines make the JSON non-standard; try to
                                # repair by appending the likely missing closers.
                                try:
                                    return json.loads(function_args + '"}')['code']
                                except json.JSONDecodeError:
                                    try:
                                        return json.loads(function_args + '}')['code']
                                    except json.JSONDecodeError:
                                        try:
                                            return json.loads(function_args)['code']
                                        except json.JSONDecodeError:
                                            if temp_code_str[-1] in ('"', '\n'):
                                                # Ambiguous tail; wait for more input.
                                                return None
                                            else:
                                                return temp_code_str.strip('\n')
                            else:
                                return json.loads(function_args + '"}')['code']
                elif parser_log['begin_"code"']:
                    # Second quote: closes the "code" key.
                    parser_log['end_"code"'] = True
                else:
                    # First quote: opens the "code" key.
                    parser_log['begin_"code"'] = True
            elif parser_log['end_"code"'] and char == ':':
                parser_log['met_:'] = True
            else:
                continue
        if finished:
            # Backward scan: find the closing quote before the final '}'.
            for index, char in enumerate(function_args[::-1]):
                back_index = -1 - index
                if char == '}':
                    parser_log['met_end_}'] = True
                elif parser_log['met_end_}'] and char == '"':
                    parser_log['code_end_index'] = back_index - 1
                    break
                else:
                    continue
            code_str = function_args[parser_log['code_begin_index']: parser_log['code_end_index'] + 1]
            if '\n' in code_str:
                # Raw newlines: the span itself is the code; json.loads would fail.
                return code_str.strip('\n')
            else:
                # Well-formed JSON: let the real parser handle escapes.
                return json.loads(function_args)['code']
    except Exception:
        # Any parse failure means the code could not be extracted (yet).
        return None


def get_image_size(image_path):
    """
    Return the (width, height) in pixels of the image at `image_path`.
    """
    with Image.open(image_path) as img:
        width, height = img.size
    return width, height