Spaces:
Runtime error
Runtime error
import os | |
import openai | |
from openai.error import OpenAIError | |
import datetime | |
import gradio as gr | |
import json | |
from jinja2 import Template | |
import requests | |
# Configure the OpenAI client from the environment.
# The key is None when OPENAI_API_KEY is not set.
openai.api_key = os.getenv('OPENAI_API_KEY')
# ---------------------------------------------------------------------------
# Airtable configuration
# ---------------------------------------------------------------------------
# API token read from the environment; None when AIRTABLE_API_KEY is not set.
airtable_api_key = os.getenv('AIRTABLE_API_KEY')

# Airtable base ID that every endpoint URL in this file is built from.
base_id = 'appcUK3hUWC7GM2Kb'

# Airtable table IDs.
policies_table_name = 'tbla6PC65qZfqdJhE'
prompts_table_name = 'tblYIZEB8m6JkGDEP'
qalog_table_name = 'tbl4oNgFPWM5xH1XO'
examples_table_name = 'tblu7sraOEmRgEGkp'
users_table_name = 'tblLNe5ZL47SvrAEk'
user_log_table_name = 'tblrlTsRrkl6BqMAJ'

# Name of the prompt template record (kept for reference, currently disabled).
#prompt_name = "NILI_v1"

# App name: used to filter Airtable records and to tag login log entries.
app = "NILI_Mobile"

# Headers sent with every Airtable request.
headers = {
    "Authorization": "Bearer {}".format(airtable_api_key),
    "Content-Type": "application/json",
    "Accept": "application/json",
}

# ---------------------------------------------------------------------------
# Style and content for the response field label
# ---------------------------------------------------------------------------
label_text = "NILI Response"
color = "#6562F4"
background_color = "white"
border_radius = "10px"

# HTML label rendered above the response; styled with Gradio CSS variables.
# Adjacent literals are concatenated by the parser into one string.
response_label = (
    '<span style="display: inline-block; position: relative; '
    'z-index: var(--layer-4); '
    'border: solid var(--block-title-border-width) var(--block-title-border-color); '
    'border-radius: var(--block-title-radius); '
    'background: var(--block-title-background-fill); '
    'padding: var(--block-title-padding); '
    'color: var(--block-title-text-color); '
    'font-weight: var(--block-title-text-weight); '
    'font-size: var(--block-title-text-size); '
    'line-height: var(--line-sm); '
    f'margin-bottom: var(--spacing-lg);">{label_text}</span>'
)
def set_defaults(request: gr.Request):
    """Print the username carried by the Gradio request.

    Prints "No User Name" when the request has no (or an empty) username.
    Always returns None.
    """
    if not request.username:
        print("No User Name")
        return
    print(request.username)
    return
# Strips leading/trailing whitespace from every line of a prompt (its output is currently not sent to the model).
def prompt_trim(prompt: str) -> str:
    """Return *prompt* with each line stripped of surrounding whitespace.

    Lines are split on '\\n' only and re-joined with '\\n', so the number of
    lines (including empty ones) is preserved.
    """
    return '\n'.join(map(str.strip, prompt.split('\n')))
# Get the policies for the selected school and concatenate them.
def get_policies(user_school):
    """Fetch and concatenate the policy texts stored for one school.

    Queries the Airtable policies table for records whose ``school`` field
    equals *user_school* and joins their ``policy_text`` values with a
    ``----------`` separator line.

    Args:
        user_school: School name to filter on (formatted with ``str``).

    Returns:
        The concatenated policy text, or an empty string when nothing was
        found or the request failed. Failures are printed, never raised.
    """
    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{policies_table_name}'

    # Escape backslashes and double quotes so a school name containing them
    # cannot terminate the quoted string inside the Airtable formula.
    safe_school = str(user_school).replace('\\', '\\\\').replace('"', '\\"')
    params = {
        'filterByFormula': f'school = "{safe_school}"',
        'fields[]': 'policy_text',
    }

    try:
        # A timeout keeps a stalled connection from hanging the request forever.
        response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
        if response.status_code != 200:
            print(f"Failed to retrieve data. Status code: {response.status_code}")
            return ''
        records = response.json().get('records')
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a response body that is not valid JSON.
        print(f"An error occurred: {str(e)}")
        return ''

    if not records:
        print("No records found in the 'policies' table for the selected schools.")
        return ''

    # Skip records that carry no policy_text instead of raising KeyError,
    # which previously aborted the loop and dropped the remaining policies.
    # NOTE(review): only the first page of results is read; confirm a school
    # never has more policy records than one Airtable page returns.
    policy_texts = []
    for record in records:
        policy_text = record.get('fields', {}).get('policy_text')
        if policy_text:
            policy_texts.append(str(policy_text))
    return "\n----------\n".join(policy_texts)
# Get the list of school names from the policies table for the UI dropdown.
def get_schools():
    """Return the school names available to this app, for the UI dropdown.

    Queries the Airtable policies table for records whose ``app`` field
    matches the module-level ``app`` name, sorted ascending by ``school``.

    Returns:
        A list of school names in the order Airtable returned them, with
        duplicates removed. Empty when nothing was found or the request
        failed. Failures are printed, never raised.
    """
    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{policies_table_name}'
    params = {
        'filterByFormula': f'app = "{app}"',
        'fields[]': 'school',
        'sort[0][field]': 'school',       # Sort by the 'school' field
        'sort[0][direction]': 'asc',      # Sort in ascending order
    }

    try:
        # A timeout keeps a stalled connection from blocking app start-up.
        response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
        if response.status_code != 200:
            print(f"Failed to retrieve data. Status code: {response.status_code}")
            return []
        records = response.json().get('records')
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a response body that is not valid JSON.
        print(f"An error occurred: {str(e)}")
        return []

    if not records:
        print("No records found in the 'policies' table.")
        return []

    schools = []
    for record in records:
        # A record without a 'school' value is skipped rather than raising
        # KeyError, which previously emptied the whole dropdown.
        school = record.get('fields', {}).get('school')
        # The table can hold several policy records per school, so keep
        # only the first occurrence of each name.
        if school and school not in schools:
            schools.append(school)
    return schools
#Get the designated prompt template record | |
def get_prompt(header, template_content):
    """Load the system and user prompt templates of the 'NILI_v1' record.

    Args:
        header: Value returned as the system prompt when no record matches.
        template_content: Value returned as the user prompt when no record
            matches.

    Returns:
        A ``(system_prompt, user_prompt)`` tuple. When a record is found,
        each missing field is returned as an empty string.

    Raises:
        requests.exceptions.HTTPError: If Airtable answers with an error status.
    """
    url = f'https://api.airtable.com/v0/{base_id}/{prompts_table_name}'
    reply = requests.get(
        url,
        headers=headers,
        params={'filterByFormula': "prompt_name='NILI_v1'"},
    )
    reply.raise_for_status()

    matches = reply.json().get('records')
    if not matches:
        # Nothing matched: hand the caller's values back unchanged.
        return header, template_content

    # Only the first matching record is used.
    fields = matches[0]['fields']
    return fields.get('system_prompt', ''), fields.get('user_prompt', '')
def get_examples():
    """Return the sample NIL questions shown as clickable examples in the UI.

    Queries the Airtable examples table for records whose ``app`` field
    matches the module-level ``app`` name and collects their
    ``nil_question`` values.

    Returns:
        A list of question strings; records without a ``nil_question``
        value are skipped.

    Raises:
        requests.exceptions.HTTPError: If Airtable answers with an error status.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{examples_table_name}'
    params = {
        'filterByFormula': f'app = "{app}"',
        'fields[]': 'nil_question',
    }

    response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
    # Check the status before parsing the body, so an error response is
    # reported as an HTTP error and not as a JSON or KeyError failure.
    response.raise_for_status()
    data = response.json()

    # Skip records that carry no nil_question instead of raising KeyError,
    # since this runs while the UI is being built.
    return [
        record['fields']['nil_question']
        for record in data.get('records', [])
        if record.get('fields', {}).get('nil_question')
    ]
def append_to_at_qalog(your_role, school_selection, output_format, input_text, gpt_response,response_time,question_cost,prompt_tokens,completion_tokens):
    """Append one question/answer record to the Airtable Q&A log table.

    The text-like arguments and ``response_time`` are stored as strings;
    ``question_cost`` and the token counts are sent as given. The user name
    comes from the module-level ``logged_in_user``.

    Request failures are printed and never raised. Returns None.
    """
    payload = {
        'fields': {
            'your_role': str(your_role),
            'school_selection': str(school_selection),
            'output_format': str(output_format),
            'input_text': str(input_text),
            'gpt_response': str(gpt_response),
            'response_time': str(response_time),
            'question_cost': question_cost,
            'user_name': str(logged_in_user),
            'prompt_tokens': prompt_tokens,
            'completion_tokens': completion_tokens,
        }
    }
    url = f'https://api.airtable.com/v0/{base_id}/{qalog_table_name}'

    try:
        result = requests.post(url, headers=headers, json=payload)
        result.raise_for_status()
    except requests.exceptions.HTTPError as http_error:
        # Airtable answered with an error status.
        print(f"HTTP error occurred: {http_error}")
    except Exception as e:
        # Anything else (connection problems, serialization errors, ...).
        print(f"An error occurred: {str(e)}")
#Chatbot Function | |
def chatbot(school_ddss,input_text):
    """Answer an NIL question with GPT-4 using the selected school's policies.

    Loads the school's policies and the prompt templates from Airtable,
    renders both templates with Jinja2, sends them to the OpenAI chat
    completion API, logs the exchange to the Airtable Q&A log and returns
    the answer for the response component.

    Args:
        school_ddss: School chosen in the dropdown.
        input_text: The user's question.

    Returns:
        A dict mapping the ``nili_response_html`` component to the text to
        display: the model's answer, or a short message when the question
        is blank or the OpenAI call fails.

    Raises:
        requests.exceptions.HTTPError: If the prompt template request fails
            (propagated from ``get_prompt``).

    NOTE(review): ``user_user_role`` and ``user_output_format`` are
    module-level globals, so every session shares the values set by the most
    recent login.
    """
    # Do not spend a GPT-4 call (or write a log row) on a blank question.
    if not input_text or not str(input_text).strip():
        return {nili_response_html: "Please enter a question."}

    start_time = datetime.datetime.now()
    school_text = school_ddss

    # Policies for the selected school; empty string when none are found.
    policies = get_policies(school_text)

    # System prompt and user prompt templates from Airtable.
    header, template_content = get_prompt('', '')
    merged_header = Template(header).render(your_role=user_user_role)
    analysis_input = Template(template_content).render(
        policies=policies,
        question=input_text,
        format=user_output_format,
        your_role=user_user_role,
    )

    try:
        response = openai.ChatCompletion.create(
            model="gpt-4",
            temperature=0,
            messages=[
                {"role": "system", "content": merged_header},
                {"role": "user", "content": analysis_input},
            ],
        )
    except OpenAIError as e:
        # Show a readable message instead of an unhandled exception.
        print(f"OpenAI error occurred: {str(e)}")
        return {nili_response_html: "Sorry, the request could not be completed. Please try again."}

    gpt_response = response.choices[0].message["content"]

    tokens_used = response.usage
    # NOTE(review): one flat rate of 0.03 per 1,000 tokens is applied to the
    # total; confirm this matches the model's prompt/completion pricing.
    question_cost = (tokens_used.get('total_tokens', 0) / 1000) * .03
    # Both counts default to 0 (prompt_tokens previously defaulted to None).
    prompt_tokens = tokens_used.get('prompt_tokens', 0)
    completion_tokens = tokens_used.get('completion_tokens', 0)

    response_time = datetime.datetime.now() - start_time
    append_to_at_qalog(user_user_role, school_text, user_output_format, input_text, gpt_response, response_time,
                       question_cost, prompt_tokens, completion_tokens)
    return {nili_response_html: gpt_response}
def log_login(username):
    """Record a login by *username* in the Airtable user-log table.

    The record holds the user name and the module-level ``app`` name, both
    as strings. Request failures are printed and never raised. Returns None.
    """
    payload = {
        'fields': {
            'user_name': str(username),
            'app': str(app),
        }
    }
    url = f'https://api.airtable.com/v0/{base_id}/{user_log_table_name}'

    try:
        result = requests.post(url, headers=headers, json=payload)
        result.raise_for_status()
    except requests.exceptions.HTTPError as http_error:
        # Airtable answered with an error status.
        print(f"HTTP error occurred: {http_error}")
    except Exception as e:
        # Anything else (connection problems, serialization errors, ...).
        print(f"An error occurred: {str(e)}")
def login_auth(username, password):
    """Check a username/password pair against the Airtable users table.

    On a match the login is logged and the module-level ``logged_in_user``,
    ``user_user_role``, ``user_output_format`` and ``user_school`` are set
    from the matching record.

    Args:
        username: User name entered on the login screen.
        password: Password entered on the login screen.

    Returns:
        True when a matching record exists, False otherwise (including when
        the Airtable request fails).

    SECURITY NOTE(review): passwords are compared in plain text through an
    Airtable formula, which implies they are stored unhashed; consider
    storing and comparing hashes instead.
    """
    global logged_in_user, user_user_role, user_output_format, user_school

    def _formula_quote(value):
        # Escape backslashes and double quotes so user input cannot close
        # the quoted string and inject its own formula logic.
        return str(value).replace('\\', '\\\\').replace('"', '\\"')

    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{users_table_name}'
    # Query the 'users' table for a record matching both username and password.
    params = {
        'filterByFormula': (
            f'AND(user_name = "{_formula_quote(username)}", '
            f'password = "{_formula_quote(password)}")'
        )
    }

    try:
        response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
    except requests.exceptions.RequestException as e:
        # Fail closed: a network problem must not let the login through.
        print(f"An error occurred: {str(e)}")
        return False

    if response.status_code != 200:
        # Report a lookup failure separately from a wrong password.
        print(f"Failed to retrieve data. Status code: {response.status_code}")
        return False

    records = response.json().get('records')
    if not records:
        print("Invalid user/password combination")
        return False

    user_details = records[0].get('fields', {})
    log_login(username)

    # These globals are read by chatbot and append_to_at_qalog.
    user_user_role = user_details.get('user_role')
    user_output_format = user_details.get('output_format')
    # NOTE(review): school_name is presumably a list-valued field (the
    # original indexed [0]); confirm. An empty or missing list gives None.
    school_names = user_details.get('school_name')
    if isinstance(school_names, (list, tuple)):
        user_school = school_names[0] if school_names else None
    else:
        user_school = school_names
    logged_in_user = username
    return True
# Gradio UI: module-level setup that runs once when the script starts.
#CIMStheme = gr.themes.Soft().set(button_primary_background_fill='#6562F4',body_background_fill='#e6effc')
# Soft theme with the primary button background overridden.
CIMStheme = gr.themes.Soft().set(button_primary_background_fill='#6562F4')
# Initialize an empty list to store the examples
#ui_examples = []
school_selection = []
#username_text = ""
# Fetch the school names for the school selection dropdown.
# This performs an Airtable request at start-up.
schools = get_schools()
# This code runs before the login screen, so if no login screen is used it sets
# logged_in_user here. Otherwise the value is overwritten by login_auth.
logged_in_user = 'admin'
#global user_user_role, user_output_format, user_school
# Defaults read by chatbot when no login has replaced them.
user_user_role = "Student Athlete"
user_output_format = "Table"
#user_school = []
# Page layout. Components render in the order they are created here.
# NOTE(review): the source indentation was lost; this assumes every Row is a
# direct child of the Blocks container and holds one Column. Confirm against
# the deployed layout.
with gr.Blocks(CIMStheme) as iface:
    # Logo
    with gr.Row():
        with gr.Column():
            gr.Image(label="Logo",value="CIMS Logo Purple.png",height=100,show_download_button=False,interactive=False,show_label=False,elem_id="logo",container=False)
    # Title
    with gr.Row():
        with gr.Column():
            gr.Markdown(value="<H2 style='text-align: center;'>NILI - Powered by CIMS.AI</h2>")
    # School selection, filled from get_schools() at start-up
    with gr.Row():
        with gr.Column(variant='panel'):
            school_ddss = gr.components.Dropdown(schools, multiselect=False, info="Select your school.", label="School")
    # Question input
    with gr.Row():
        with gr.Column(variant='panel'):
            question_tbox= gr.components.Textbox(lines=3, placeholder="Enter your question here", label="NIL Question",info="Depending the scope of your question, results may take 30-60 sec.",interactive=True)
    # Sample questions; get_examples() performs an Airtable request while the UI is built
    with gr.Row(variant='panel'):
        with gr.Column():
            examples_cmpt= gr.Examples(examples=get_examples(),inputs=question_tbox,label="Sample NIL questions - Click to select one")
    # Submit button
    with gr.Row():
        with gr.Column():
            submit_btn = gr.components.Button(value="Submit", size='sm', variant="primary")
    # Response area: a styled label followed by the HTML component chatbot writes to
    with gr.Row():
        with gr.Column(variant='panel'):
            response_label_md = gr.components.Markdown(response_label)
            nili_response_html = gr.components.HTML(label="NILI Response")
    # Wire the button: chatbot(school, question) -> response HTML
    submit_btn.click(fn=chatbot,inputs=[school_ddss, question_tbox],
                     outputs=[nili_response_html])
    # Feedback link
    with gr.Row():
        with gr.Column():
            gr.Markdown(value="[Ask Questions/Provide Feedback](https://discord.com/channels/1168589934244929647/1168589934244929650)")
    # Copyright / trademark footer
    with gr.Row():
        with gr.Column():
            gr.HTML('<center><i>© 2023 Collegiate Influencer Marketing Systems, Inc.</i><br>CIMS.AI, CIMS.AI logo, NILI, NILI logo, and EzNIL are trademarks of Collegiate Influencer Marketing Systems, Inc.</center>')
    #iface.load(fn=set_defaults)
# Authenticated launch variants are disabled; the app currently starts without
# a login screen, so login_auth is not invoked and the defaults above apply.
#iface.launch(auth=login_auth, auth_message= "Enter your username and password that you received from CIMS.AI. To request a login, please email 'info@cims.ai'")
#iface.launch(auth=("admin","admin"))
iface.launch()