import os import openai from openai.error import OpenAIError import datetime import gradio as gr import json from jinja2 import Template import requests # Initialize OpenAI openai.api_key = os.environ.get('OPENAI_API_KEY') # Configuration variables airtable_api_key = os.environ.get('AIRTABLE_API_KEY') # Airtable table names policies_table_name = 'tbla6PC65qZfqdJhE' prompts_table_name = 'tblYIZEB8m6JkGDEP' qalog_table_name = 'tbl4oNgFPWM5xH1XO' examples_table_name = 'tblu7sraOEmRgEGkp' users_table_name = 'tblLNe5ZL47SvrAEk' user_log_table_name = 'tblrlTsRrkl6BqMAJ' # Define the style and content for the response field label_text = "NILI Response" color = "#6562F4" background_color = "white" border_radius = "10px" response_label = f'{label_text}' #Airtable Base ID base_id = 'appcUK3hUWC7GM2Kb' #Name of the prompt temlate record #prompt_name = "NILI_v1" #App name for user login logging app="NILI_Mobile" #Header for the Airtable requests headers = { "Authorization": f"Bearer {airtable_api_key}", "Content-Type": "application/json", "Accept": "application/json", } def set_defaults(request: gr.Request): if request.username: print(request.username) else: print("No User Name") return #Function to trim prompts....not used def prompt_trim(prompt: str) -> str: lines = prompt.split('\n') trimmed = '\n'.join([l.strip() for l in lines]) return trimmed #Get the policies for the selected schools and concatenate them. def get_policies(user_school): airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{policies_table_name}' # Parameters for the API request to filter by 'school' field and retrieve 'policy_text' params = { 'filterByFormula': f'school = "{user_school}"', 'fields[]': 'policy_text' # Replace with the name of your field } # Initialize an empty string to store concatenated policies concatenated_policies = '' #print(params) try: # Send a GET request to the Airtable API response = requests.get(airtable_endpoint, headers=headers, params=params) # Check if the request was successful (status code 200) if response.status_code == 200: # Parse the JSON response data = response.json() # Check if there are records in the response if data.get('records'): # Extract the 'policy_text' values from each record and concatenate them for record in data['records']: policy_text = record['fields']['policy_text'] if concatenated_policies: concatenated_policies += "\n----------\n" concatenated_policies += policy_text else: print("No records found in the 'policies' table for the selected schools.") else: print(f"Failed to retrieve data. Status code: {response.status_code}") except Exception as e: print(f"An error occurred: {str(e)}") #print(concatenated_policies) return concatenated_policies #Get a list of School Name from the policies for the UI dropdown def get_schools(): airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{policies_table_name}' # Parameters for the API request to select only the 'school' field params = { 'filterByFormula': f'app = "{app}"', 'fields[]': 'school', # Replace with the name of your field 'sort[0][field]': 'school', # Sort by the 'school' field 'sort[0][direction]': 'asc', # Sort in ascending order } schools = [] try: # Send a GET request to the Airtable API response = requests.get(airtable_endpoint, headers=headers, params=params) # Check if the request was successful (status code 200) if response.status_code == 200: # Parse the JSON response data = response.json() # Check if there are records in the response if data.get('records'): # Extract the 'school' values from each record schools = [record['fields']['school'] for record in data['records']] else: print("No records found in the 'policies' table.") else: print(f"Failed to retrieve data. Status code: {response.status_code}") except Exception as e: print(f"An error occurred: {str(e)}") return schools #Get the designated prompt template record def get_prompt(header, template_content): airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{prompts_table_name}' params = { 'filterByFormula': "prompt_name='NILI_v1'" } response = requests.get(airtable_endpoint, headers=headers, params=params) # Check for errors response.raise_for_status() data = response.json() # Check if there is at least one record matching the condition if data.get('records'): # Get the first record (there should be only one) record = data['records'][0]['fields'] # Assign system_prompt and user_prompt to variables header = record.get('system_prompt', '') template_content = record.get('user_prompt', '') return header, template_content def get_examples(): ui_examples = [] airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{examples_table_name}' params = { 'filterByFormula': f'app = "{app}"', 'fields[]': 'nil_question' } # Send your request and parse the response response = requests.get(airtable_endpoint, headers=headers,params=params) data = json.loads(response.text) # Check for errors response.raise_for_status() # Initialize a nested list to store nil_questions nested_nil_questions = [] for record in data['records']: nil_question = record['fields']['nil_question'] ui_examples.append(nil_question) #print(ui_examples) return ui_examples def append_to_at_qalog(your_role, school_selection, output_format, input_text, gpt_response,response_time,question_cost,prompt_tokens,completion_tokens): airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{qalog_table_name}' # Organize data for Airtable new_fields = { 'your_role': str(your_role), 'school_selection': str(school_selection), 'output_format': str(output_format), 'input_text': str(input_text), 'gpt_response': str(gpt_response), 'response_time': str(response_time), 'question_cost': question_cost, 'user_name': str(logged_in_user), 'prompt_tokens': prompt_tokens, 'completion_tokens': completion_tokens } data = { 'fields': new_fields } try: # Post data to Airtable response = requests.post(airtable_endpoint, headers=headers, json=data) # Check for errors response.raise_for_status() except requests.exceptions.HTTPError as http_error: # Handle the HTTP error (e.g., log it or display an error message) print(f"HTTP error occurred: {http_error}") except Exception as e: # Handle exceptions, log errors, or raise them as needed print(f"An error occurred: {str(e)}") #Chatbot Function def chatbot(school_ddss,input_text): start_time = datetime.datetime.now() school_text = school_ddss # Read the Hydrated policies policies = get_policies(school_text) template_content = '' header = '' header, template_content = get_prompt(header, template_content) header_template = Template(header) #merged_header = header_template.render(your_role=your_role) merged_header = header_template.render(your_role=user_user_role) # Create a Jinja2 template from the content template = Template(template_content) # Render the template with the policy JSON analysis_input = template.render(policies=policies, question=input_text,format=user_output_format,your_role=user_user_role) trimmed_input = prompt_trim(analysis_input) response = openai.ChatCompletion.create( model="gpt-4", temperature=0, messages=[ { "role": "system", "content": merged_header }, { "role": "user", "content": analysis_input } ] ) gpt_response = response.choices[0].message["content"] tokens_used = response.usage question_cost = (tokens_used.get('total_tokens', 0) / 1000) * .03 prompt_tokens = tokens_used.get('prompt_tokens',) completion_tokens = tokens_used.get('completion_tokens', 0) end_time = datetime.datetime.now() response_time = end_time - start_time append_to_at_qalog(user_user_role, school_text, user_output_format, input_text, gpt_response, response_time, question_cost, prompt_tokens, completion_tokens) return {nili_response_html: gpt_response} def log_login(username): airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{user_log_table_name}' # Organize data for Airtable new_fields = { 'user_name': str(username), 'app': str(app) } data = { 'fields': new_fields } try: # Post data to Airtable response = requests.post(airtable_endpoint, headers=headers, json=data) # Check for errors response.raise_for_status() except requests.exceptions.HTTPError as http_error: # Handle the HTTP error (e.g., log it or display an error message) print(f"HTTP error occurred: {http_error}") except Exception as e: # Handle exceptions, log errors, or raise them as needed print(f"An error occurred: {str(e)}") def login_auth(username, password): airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{users_table_name}' # Query the 'users' table to check for a match with the provided username and password params = { 'filterByFormula': f'AND(user_name = "{username}", password = "{password}")' } response = requests.get(airtable_endpoint, headers=headers, params=params) if response.status_code == 200: data = response.json() #If the matching user/password record is found: if data.get('records'): user_details = data['records'][0]['fields'] log_login(username) #Set the global logged_in_user variable. This used in the append_to_at_qalog function to track what user asked the question global logged_in_user,user_user_role, user_output_format, user_school user_user_role = user_details.get('user_role') user_output_format = user_details.get('output_format') user_school = user_details.get('school_name', [None])[0] logged_in_user = username return True print(f"Invalid user/password combination") return False #Gradio UI #CIMStheme = gr.themes.Soft().set(button_primary_background_fill='#6562F4',body_background_fill='#e6effc') CIMStheme = gr.themes.Soft().set(button_primary_background_fill='#6562F4') # Initialize an empty list to store the examples #ui_examples = [] school_selection = [] #username_text = "" #Gets the school for the school selection DD schools = get_schools() #This code runs first before the login screen, so if there's no login screen used, this sets the logged_in_user. Otherwise it is set in the login function. logged_in_user = 'admin' #global user_user_role, user_output_format, user_school user_user_role = "Student Athlete" user_output_format = "Table" #user_school = [] with gr.Blocks(CIMStheme) as iface: with gr.Row(): with gr.Column(): gr.Image(label="Logo",value="CIMS Logo Purple.png",height=100,show_download_button=False,interactive=False,show_label=False,elem_id="logo",container=False) with gr.Row(): with gr.Column(): gr.Markdown(value="