Spaces:
Sleeping
Sleeping
| import os | |
| import openai | |
| import sys | |
| import gradio as gr | |
| from IPython import get_ipython | |
| import json | |
| import requests | |
| from tenacity import retry, wait_random_exponential, stop_after_attempt | |
| from IPython import get_ipython | |
| # from termcolor import colored # doesn't actually work in Colab ¯\_(ツ)_/¯ | |
| import ast | |
| GPT_MODEL = "gpt-3.5-turbo-1106" | |
| openai.api_key = os.environ['OPENAI_API_KEY'] | |
| def exec_python(cell): | |
| # result = 0 | |
| print(cell) | |
| # print(type(cell)) | |
| # code = json.loads(cell) | |
| # print(code) | |
| # exec(code["cell"]) | |
| inputcode = cell | |
| print(inputcode) | |
| code = inputcode | |
| # code_string = code["cell"] | |
| local_namespace = {} | |
| try: | |
| exec(code, globals(), local_namespace) | |
| except Exception as e: | |
| return "Please figure and solve it step by step with explanation by yourself. Remember, you must give the correct answer!" | |
| print(local_namespace) | |
| if not local_namespace: | |
| return "Please figure and solve it step by step with explanation by yourself. Remember, you must give the correct answer!" | |
| else: | |
| theanswers = local_namespace.values() | |
| print(theanswers) | |
| local_ans = list(theanswers)[-1] | |
| print(local_ans) | |
| return local_ans | |
| # Now let's define the function specification: | |
| functions = [ | |
| { | |
| "name": "exec_python", | |
| "description": "run python code and return the execution result.", | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "cell": { | |
| "type": "string", | |
| "description": "Valid Python code to execute.", | |
| } | |
| }, | |
| "required": ["cell"], | |
| }, | |
| }, | |
| ] | |
| # In order to run these functions automatically, we should maintain a dictionary: | |
| functions_dict = { | |
| "exec_python": exec_python, | |
| } | |
| def openai_api_calculate_cost(usage,model=GPT_MODEL): | |
| pricing = { | |
| # 'gpt-3.5-turbo-4k': { | |
| # 'prompt': 0.0015, | |
| # 'completion': 0.002, | |
| # }, | |
| # 'gpt-3.5-turbo-16k': { | |
| # 'prompt': 0.003, | |
| # 'completion': 0.004, | |
| # }, | |
| 'gpt-3.5-turbo-1106': { | |
| 'prompt': 0.001, | |
| 'completion': 0.002, | |
| }, | |
| # 'gpt-4-1106-preview': { | |
| # 'prompt': 0.01, | |
| # 'completion': 0.03, | |
| # }, | |
| # 'gpt-4-32k': { | |
| # 'prompt': 0.06, | |
| # 'completion': 0.12, | |
| # }, | |
| # 'text-embedding-ada-002-v2': { | |
| # 'prompt': 0.0001, | |
| # 'completion': 0.0001, | |
| # } | |
| } | |
| try: | |
| model_pricing = pricing[model] | |
| except KeyError: | |
| raise ValueError("Invalid model specified") | |
| prompt_cost = usage['prompt_tokens'] * model_pricing['prompt'] / 1000 | |
| completion_cost = usage['completion_tokens'] * model_pricing['completion'] / 1000 | |
| total_cost = prompt_cost + completion_cost | |
| print(f"\nTokens used: {usage['prompt_tokens']:,} prompt + {usage['completion_tokens']:,} completion = {usage['total_tokens']:,} tokens") | |
| print(f"Total cost for {model}: ${total_cost:.4f}\n") | |
| return total_cost | |
| def chat_completion_request(messages, functions=None, function_call=None, model=GPT_MODEL): | |
| """ | |
| This function sends a POST request to the OpenAI API to generate a chat completion. | |
| Parameters: | |
| - messages (list): A list of message objects. Each object should have a 'role' (either 'system', 'user', or 'assistant') and 'content' | |
| (the content of the message). | |
| - functions (list, optional): A list of function objects that describe the functions that the model can call. | |
| - function_call (str or dict, optional): If it's a string, it can be either 'auto' (the model decides whether to call a function) or 'none' | |
| (the model will not call a function). If it's a dict, it should describe the function to call. | |
| - model (str): The ID of the model to use. | |
| Returns: | |
| - response (requests.Response): The response from the OpenAI API. If the request was successful, the response's JSON will contain the chat completion. | |
| """ | |
| # Set up the headers for the API request | |
| headers = { | |
| "Content-Type": "application/json", | |
| "Authorization": "Bearer " + openai.api_key, | |
| } | |
| # Set up the data for the API request | |
| json_data = {"model": model, "messages": messages} | |
| # json_data = {"model": model, "messages": messages, "response_format":{"type": "json_object"}} | |
| # json_data = {"model": model, "messages": messages, "temperature": 0.2, "top_p": 0.1} | |
| # If functions were provided, add them to the data | |
| if functions is not None: | |
| json_data.update({"functions": functions}) | |
| # If a function call was specified, add it to the data | |
| if function_call is not None: | |
| json_data.update({"function_call": function_call}) | |
| # Send the API request | |
| try: | |
| response = requests.post( | |
| "https://api.openai.com/v1/chat/completions", | |
| headers=headers, | |
| json=json_data, | |
| ) | |
| return response | |
| except Exception as e: | |
| print("Unable to generate ChatCompletion response") | |
| print(f"Exception: {e}") | |
| return e | |
| def first_call(init_prompt, user_input): | |
| # Set up a conversation | |
| messages = [] | |
| messages.append({"role": "system", "content": init_prompt}) | |
| # Write a user message that perhaps our function can handle...? | |
| messages.append({"role": "user", "content": user_input}) | |
| # Generate a response | |
| chat_response = chat_completion_request( | |
| messages, functions=functions, function_call='auto' | |
| ) | |
| # Save the JSON to a variable | |
| assistant_message = chat_response.json()["choices"][0]["message"] | |
| # Append response to conversation | |
| messages.append(assistant_message) | |
| usage = chat_response.json()['usage'] | |
| cost1 = openai_api_calculate_cost(usage) | |
| # Let's see what we got back before continuing | |
| return assistant_message, cost1, messages | |
| def is_valid_dict_string(s): | |
| try: | |
| ast.literal_eval(s) | |
| return True | |
| except (SyntaxError, ValueError): | |
| return False | |
| def function_call_process(assistant_message): | |
| if assistant_message.get("function_call") != None: | |
| # Retrieve the name of the relevant function | |
| function_name = assistant_message["function_call"]["name"] | |
| # Retrieve the arguments to send the function | |
| # function_args = json.loads(assistant_message["function_call"]["arguments"], strict=False) | |
| # if isinstance(assistant_message["function_call"]["arguments"], dict): | |
| # arg_dict = json.loads(r"{jsonload}".format(jsonload=assistant_message["function_call"]["arguments"]), strict=False) | |
| # else: | |
| # arg_dict = {'cell': assistant_message["function_call"]["arguments"]} | |
| # arg_dict = assistant_message["function_call"]["arguments"] | |
| # print(function_args) | |
| if is_valid_dict_string(assistant_message["function_call"]["arguments"])==True: | |
| arg_dict = json.loads(r"{jsonload}".format(jsonload=assistant_message["function_call"]["arguments"]), strict=False) | |
| arg_dict = arg_dict['cell'] | |
| print("arg_dict : " + arg_dict) | |
| else: | |
| arg_dict = assistant_message["function_call"]["arguments"] | |
| print(arg_dict) | |
| # Look up the function and call it with the provided arguments | |
| result = functions_dict[function_name](arg_dict) | |
| return result | |
| # print(result) | |
| def second_prompt_build(prompt, log): | |
| prompt_second = prompt.format(ans = log) | |
| # prompt_second = prompt % log | |
| return prompt_second | |
| def second_call(prompt, prompt_second, messages, function_name = "exec_python"): | |
| # Add a new message to the conversation with the function result | |
| messages.append({ | |
| "role": "function", | |
| "name": function_name, | |
| "content": str(prompt_second), # Convert the result to a string | |
| }) | |
| # Call the model again to generate a user-facing message based on the function result | |
| chat_response = chat_completion_request( | |
| messages, functions=functions | |
| ) | |
| assistant_message = chat_response.json()["choices"][0]["message"] | |
| messages.append(assistant_message) | |
| usage = chat_response.json()['usage'] | |
| cost2 = openai_api_calculate_cost(usage) | |
| # Print the final conversation | |
| # pretty_print_conversation(messages) | |
| return assistant_message, cost2, messages | |
| def main_function(init_prompt, prompt, user_input): | |
| first_call_result, cost1, messages = first_call(init_prompt, user_input) | |
| function_call_process_result = function_call_process(first_call_result) | |
| second_prompt_build_result = second_prompt_build(prompt, function_call_process_result) | |
| second_call_result, cost2, finalmessages = second_call(second_prompt_build_result, function_call_process_result, messages) | |
| finalcostresult = cost1 + cost2 | |
| finalcostrpresult = finalcostresult * 15000 | |
| return first_call_result, function_call_process_result, second_prompt_build_result, second_call_result, cost1, cost2, finalmessages, finalcostresult, finalcostrpresult | |
| def gradio_function(): | |
| init_prompt = gr.Textbox(label="init_prompt (for 1st call)") | |
| prompt = gr.Textbox(label="prompt (for 2nd call)") | |
| user_input = gr.Textbox(label="User Input") | |
| output_1st_call = gr.JSON(label="Assistant (output_1st_call)") | |
| output_fc_call = gr.Textbox(label="Function Call (exec_python) Result (output_fc_call)") | |
| output_fc_call_with_prompt = gr.Textbox(label="Building 2nd Prompt (output_fc_call_with_2nd_prompt)") | |
| output_2nd_call = gr.JSON(label="Assistant (output_2nd_call_buat_user)") | |
| cost = gr.Textbox(label="Cost 1") | |
| cost2 = gr.Textbox(label="Cost 2") | |
| finalcost = gr.Textbox(label="Final Cost ($)") | |
| finalcostrp = gr.Textbox(label="Final Cost (Rp)") | |
| finalmessages = gr.JSON(label="Final Messages") | |
| iface = gr.Interface( | |
| fn=main_function, | |
| inputs=[init_prompt, prompt, user_input], | |
| outputs=[output_1st_call, output_fc_call, output_fc_call_with_prompt, output_2nd_call, cost, cost2, finalmessages, finalcost, finalcostrp], | |
| title="Test", | |
| description="Accuracy", | |
| ) | |
| iface.launch(share=True, debug=True) | |
| if __name__ == "__main__": | |
| gradio_function() |