import os
import openai
import datetime
import time
import gradio as gr
import json
from jinja2 import Template
import requests
from pdfminer.high_level import extract_text
#import pdfkit
import pdfkit
# Configure the OpenAI client with the API key taken from the environment
# (the key is None when OPENAI_API_KEY is not set).
openai.api_key = os.getenv('OPENAI_API_KEY')
# Configuration variables
# Airtable API key, read from the environment (None when the variable is unset).
airtable_api_key = os.environ.get('AIRTABLE_API_KEY')
# Airtable tables; each value is used as the table segment of the API URL.
prompts_table_name = 'tblYIZEB8m6JkGDEP'
users_table_name = 'tblLNe5ZL47SvrAEk'
user_log_table_name = 'tblrlTsRrkl6BqMAJ'
compliancelog_table_name = 'tblQMXWKGlOonkIw2'
policies_table_name = 'tbla6PC65qZfqdJhE'
# Define the style and content for the response field
label_text = "Contract Redline"
color = "#6562F4"
background_color = "white"
border_radius = "10px"
# NOTE(review): the styling values above are not referenced by response_label
# as it stands; presumably the label once carried inline markup - confirm.
# Earlier variant, kept as a single well-formed comment:
# response_label = f'{label_text}'
response_label = f'{label_text}'
# Airtable base that holds all of the tables above.
base_id = 'appcUK3hUWC7GM2Kb'
# App name for user login logging
app = "Compliance"
# Headers sent with every Airtable request.
headers = {
    "Authorization": f"Bearer {airtable_api_key}",
    "Content-Type": "application/json",
    "Accept": "application/json",
}
def prompt_trim(prompt: str) -> str:
    """Strip leading and trailing whitespace from every line of *prompt*.

    The text is split on newline characters only, each piece is stripped,
    and the pieces are joined back with newlines, so the number of lines is
    preserved.
    """
    return '\n'.join(piece.strip() for piece in prompt.split('\n'))
def get_policy_text(school):
    """Fetch the policy text stored in Airtable for *school*.

    Queries the policies table for records whose ``school`` field equals
    *school* and collects their ``policy_text`` values. The result is also
    stored in the module-level ``policy_text`` variable.

    Args:
        school: School name to match against the ``school`` field.

    Returns:
        A list holding the ``policy_text`` of every matching record that has
        one, or ``''`` when the request fails or nothing usable is found.
        Errors are printed, never raised.
    """
    global policy_text
    policy_text = ''
    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{policies_table_name}'
    # Escape backslashes and double quotes so the value cannot terminate the
    # quoted string inside the formula and change its meaning.
    safe_school = str(school).replace('\\', '\\\\').replace('"', '\\"')
    # Filter on the school and request only the 'policy_text' field.
    params = {
        'filterByFormula': f'school="{safe_school}"',
        'fields[]': 'policy_text',
    }
    try:
        # A timeout keeps a stalled connection from blocking the request forever.
        response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
        if response.status_code == 200:
            records = response.json().get('records') or []
            # Skip records without a 'policy_text' value instead of letting a
            # single KeyError discard every other result.
            texts = [
                record['fields']['policy_text']
                for record in records
                if 'policy_text' in record.get('fields', {})
            ]
            if texts:
                policy_text = texts
            else:
                print("No records found in the 'policies' table.")
        else:
            print(f"Failed to retrieve data. Status code: {response.status_code}")
    except Exception as e:
        # Best-effort lookup: report the problem and fall through to ''.
        print(f"An error occurred: {str(e)}")
    return policy_text
def get_schools():
    """Return the school names stored in the Airtable policies table.

    Only the ``school`` field is requested, sorted ascending by that field.

    Returns:
        A list of ``school`` values, or ``''`` when the request fails or no
        record carries a ``school`` value. Errors are printed, never raised.
    """
    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{policies_table_name}'
    params = {
        'fields[]': 'school',  # Only the 'school' field is needed
        'sort[0][field]': 'school',  # Sort by the 'school' field
        'sort[0][direction]': 'asc',  # Sort in ascending order
    }
    schools = ''
    try:
        # A timeout keeps a stalled connection from blocking the caller forever.
        response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
        if response.status_code == 200:
            records = response.json().get('records') or []
            # Skip records without a 'school' value instead of letting a
            # single KeyError discard the whole list.
            names = [
                record['fields']['school']
                for record in records
                if 'school' in record.get('fields', {})
            ]
            if names:
                schools = names
            else:
                print("No records found in the 'policies' table.")
            # NOTE(review): only one response page is read; presumably the
            # API paginates larger tables - confirm against the Airtable docs.
        else:
            print(f"Failed to retrieve data. Status code: {response.status_code}")
    except Exception as e:
        # Best-effort lookup: report the problem and fall through to ''.
        print(f"An error occurred: {str(e)}")
    return schools
def get_prompt(header, template_content):
    """Load the 'Compliance_v1' prompt pair from the Airtable prompts table.

    Args:
        header: Returned unchanged as the system prompt when no record matches.
        template_content: Returned unchanged as the user prompt when no
            record matches.

    Returns:
        A ``(system_prompt, user_prompt)`` tuple taken from the first matching
        record; a field missing from that record comes back as ``''``.

    Raises:
        requests.exceptions.HTTPError: If Airtable answers with an error status.
    """
    url = f'https://api.airtable.com/v0/{base_id}/{prompts_table_name}'
    query = {
        'filterByFormula': "prompt_name='Compliance_v1'",
    }
    reply = requests.get(url, headers=headers, params=query)
    reply.raise_for_status()
    matches = reply.json().get('records')
    if not matches:
        # Nothing stored under that prompt name: hand back the caller's values.
        return header, template_content
    # Only the first record is used (there should be only one).
    fields = matches[0]['fields']
    return fields.get('system_prompt', ''), fields.get('user_prompt', '')
def append_to_at_compliancelog(policy_name_dd, contract_text, gpt_response, response_time, question_cost, prompt_tokens, completion_tokens):
    """Append one redline request to the Airtable compliance log.

    The text-like values are stored as strings; the cost and token counts are
    passed through unchanged. The user name comes from the module-level
    ``logged_in_user``. Failures are printed and never raised.
    """
    url = f'https://api.airtable.com/v0/{base_id}/{compliancelog_table_name}'
    payload = {
        'fields': {
            'policy_name': str(policy_name_dd),
            'contract_text': str(contract_text),
            'gpt_response': str(gpt_response),
            'response_time': str(response_time),
            'question_cost': question_cost,
            'user_name': str(logged_in_user),
            'prompt_tokens': prompt_tokens,
            'completion_tokens': completion_tokens,
        }
    }
    try:
        # Create the record and turn an error status into an exception.
        requests.post(url, headers=headers, json=payload).raise_for_status()
    except requests.exceptions.HTTPError as http_error:
        print(f"HTTP error occurred: {http_error}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")
# Chatbot Function
def chatbot(policy_name_dd,contract_text,progress=gr.Progress()):
    """Redline a contract against the selected policy using GPT-4.

    Looks up the policy text for the selected school, renders the user prompt
    template fetched from Airtable with the contract and policy text, sends
    it to the chat completion API, and logs the exchange to the compliance log.

    Args:
        policy_name_dd: Value of the policy dropdown (the school to look up).
        contract_text: Contract text to be redlined.
        progress: Gradio progress tracker.

    Returns:
        A dict of component updates: the model's reply for
        ``contract_redline_html`` and a visible row for ``download_row``.
    """
    start_time = datetime.datetime.now()
    progress(progress=None)

    # get_policy_text() returns the same value it stores in the module-level
    # policy_text, so the return value is used directly.
    # NOTE(review): on success this is a list of strings, and it is handed to
    # the template as-is - confirm the stored template expects a list.
    selected_policy_text = get_policy_text(policy_name_dd)

    header, template_content = get_prompt('', '')

    # Create a Jinja2 template from the content and render it with the inputs
    template = Template(template_content)
    analysis_input = template.render(contract_text=contract_text, policy_text=selected_policy_text)
    # Send the whitespace-trimmed prompt; the trimmed text used to be computed
    # and then discarded in favour of the untrimmed one.
    trimmed_input = prompt_trim(analysis_input)

    response = openai.ChatCompletion.create(
        model="gpt-4",
        # model="gpt-3.5-turbo",
        temperature=0,
        messages=[
            {"role": "system", "content": header},
            {"role": "user", "content": trimmed_input},
        ],
    )
    gpt_response = response.choices[0].message["content"]

    tokens_used = response.usage
    # NOTE(review): one rate (.03 per 1000 tokens) is applied to total_tokens;
    # confirm this matches how prompt and completion tokens are billed.
    question_cost = (tokens_used.get('total_tokens', 0) / 1000) * .03
    # Default to 0, like completion_tokens, so the log never receives None.
    prompt_tokens = tokens_used.get('prompt_tokens', 0)
    completion_tokens = tokens_used.get('completion_tokens', 0)

    response_time = datetime.datetime.now() - start_time
    contract_redline = gpt_response
    append_to_at_compliancelog(policy_name_dd, contract_text, contract_redline, response_time, question_cost, prompt_tokens, completion_tokens)
    return {contract_redline_html: contract_redline, download_row: gr.Row(visible=True)}
def log_login(username):
    """Record a login for *username* in the Airtable user-log table.

    The record carries the user name and the module-level ``app`` name.
    Failures are printed and never raised.
    """
    url = f'https://api.airtable.com/v0/{base_id}/{user_log_table_name}'
    payload = {
        'fields': {
            'user_name': str(username),
            'app': str(app),
        }
    }
    try:
        # Create the record and turn an error status into an exception.
        requests.post(url, headers=headers, json=payload).raise_for_status()
    except requests.exceptions.HTTPError as http_error:
        print(f"HTTP error occurred: {http_error}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")
def login_auth(username, password):
    """Check *username* / *password* against the Airtable users table.

    On a match the login is logged via ``log_login`` and the module-level
    ``logged_in_user`` is set to *username*.

    Args:
        username: User name entered on the login form.
        password: Password entered on the login form.

    Returns:
        True when a record matches both values, False otherwise (including
        when the lookup itself fails).
    """
    # NOTE(review): logged_in_user is a single module-level value, so it
    # reflects whoever logged in last, not the user of the current session.
    global logged_in_user

    def _escape(value):
        # Escape backslashes and double quotes so a crafted value cannot
        # close the quoted string and rewrite the formula (e.g. to make it
        # always true).
        return str(value).replace('\\', '\\\\').replace('"', '\\"')

    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{users_table_name}'
    # Query the 'users' table for a record matching both values.
    # NOTE(review): the password is compared as-is by the formula, which
    # implies it is stored unhashed - confirm and consider hashing.
    params = {
        'filterByFormula': f'AND(user_name = "{_escape(username)}", password = "{_escape(password)}")'
    }
    try:
        response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
        records = response.json().get('records') if response.status_code == 200 else None
    except (requests.exceptions.RequestException, ValueError) as e:
        # A failed lookup must deny the login rather than crash the handler.
        print(f"Login check failed: {e}")
        return False
    if records:
        log_login(username)
        logged_in_user = username
        return True
    print("Invalid user/password combination")
    return False
def pdf_to_text(contract_file_cmpt, contract_text_tbox, file_name_tbox):
    """Extract the text of the uploaded PDF and derive the redline file name.

    Args:
        contract_file_cmpt: Uploaded file; either an object whose ``name``
            attribute is the file path, or the path itself. ``None`` when no
            file has been selected.
        contract_text_tbox: Current content of the contract text box.
        file_name_tbox: Current content of the file-name text box.

    Returns:
        ``(file_text, redline_file_name)``. When no file was selected the
        current text-box values are returned unchanged.
    """
    if contract_file_cmpt is None:
        # Nothing uploaded: leave both text boxes as they are.
        return contract_text_tbox, file_name_tbox
    file_path = getattr(contract_file_cmpt, 'name', contract_file_cmpt)
    file_text = extract_text(file_path)
    # basename/splitext handle any path separator and keep dots inside the
    # file name ("a.b.pdf" -> "a.b"), unlike splitting on "/" and ".".
    original_file_name = os.path.basename(file_path)
    redline_file_name = os.path.splitext(original_file_name)[0] + " Redline.pdf"
    return file_text, redline_file_name
def download_pdf(compliance_comments,contract_redline_html,file_name_tbox):
    """Render the redline plus compliance comments to a PDF file.

    Args:
        compliance_comments: Plain-text comments entered by the reviewer.
        contract_redline_html: HTML of the contract redline.
        file_name_tbox: Desired file name for the PDF; ``redline.pdf`` is
            used when it is empty.

    Returns:
        A dict mapping the ``pdf_download_file`` component to the path of the
        PDF written to disk. The PDF bytes are also stored in the
        module-level ``pdf_download``.
    """
    import html
    import tempfile

    global pdf_download
    # The comments are plain text: escape them so characters such as "<" are
    # printed rather than parsed as markup, and keep their line breaks.
    safe_comments = html.escape(compliance_comments or '').replace('\n', '<br>')
    # NOTE(review): the original literal was broken across several lines (its
    # markup appears to have been lost); <br> stands in for those breaks.
    # NOTE(review): contract_redline_html is embedded unescaped because it is
    # HTML - confirm its source is trusted.
    contract_redline_comments = (
        "Contract Redline:<br>" + (contract_redline_html or '')
        + "<br>Compliance Comments:<br>" + safe_comments
    )
    # With no output path pdfkit returns the rendered PDF as bytes.
    pdf_download = pdfkit.from_string(contract_redline_comments, None)
    # Write the bytes to a real file so the File component has something to
    # serve; basename() stops the name from pointing outside the temp dir.
    pdf_name = os.path.basename(file_name_tbox or '') or 'redline.pdf'
    pdf_path = os.path.join(tempfile.mkdtemp(), pdf_name)
    with open(pdf_path, 'wb') as pdf_file:
        pdf_file.write(pdf_download)
    return {pdf_download_file: pdf_path}
# Gradio UI
# Soft theme with the primary button colour overridden.
CIMStheme = gr.themes.Soft().set(
    button_primary_background_fill='#6562F4',
)
# Choices for the policy dropdown, fetched once when the module is loaded.
schools = get_schools()
#policy_text = get_policy_text("LSU") #for testing the function call
# NOTE(review): this literal had spilled over several lines and its markup
# appears to have been lost; the visible text is kept with explicit newlines.
scout_response = (
    "HTMLTableGeneratorRank | CIMS.AI Scout Ranking | Athlete Name | Gender | Primary Sport | Social Media Fan Total | Interests | Commentary on Ranking |\n"
    "---|\n"
    "| | | | | | | |\n"
)
# Sample contract text kept for manually testing the pdfkit call below:
#contract_redline = "Campaign:
# Engagement Name: Applebee's Fall Burger Promo
# Engagement Id: 7015j000001DvCAAA0
# Sponsor: Applebee's
# Start Date: 2023-10-28
# End Date: 2023-11-11
# Engagement Description
# The goal of the engagement is to Increase Sales by having the Student-Athlete Social Media Post. For the Social Media Post, the sponsor is requesting the student athlete take a photo in front of the football stadium in eating a Applebee's burger in your team jersey. The Media rights for the content will be 90 Days.
# Engagement Compensation.
# For successful completion of the engagement the student-athlete will receive payment in the form of Cash.
# Part or all of the payment will be in cash, paid via PayPal.
# The total value of compensation will be 250.
# "
#pdf_download = pdfkit.from_string(contract_redline, False)
#print(pdf_download)
# Module-level state shared by the handlers.
logged_in_user = 'admin'  # user name written to the compliance log
file_text = ''
contract_text = ''
policy_text = ''  # overwritten by get_policy_text()
compliance_comments = ''
file_name = 'redline.pdf'
pdf_download = ''  # overwritten by download_pdf()
# Page layout and event wiring.
# NOTE(review): the nesting below was reconstructed from the order of the
# statements (the indentation had been lost) - confirm it against the
# intended layout, in particular which columns belong to the download row.
with gr.Blocks(CIMStheme) as iface:
    # Header: logo, title, spacer.
    with gr.Row():
        with gr.Column(scale=2):
            gr.Image(label="Logo", value="CIMS Logo Purple.png", width=10, show_download_button=False,
                     interactive=False, show_label=False, elem_id="logo", container=False)
        with gr.Column(scale=2):
            # NOTE(review): this literal was broken across two lines; any
            # heading markup it once carried is not recoverable from here.
            gr.Markdown(value="Compliance Desktop - Powered by CIMS.AI")
        with gr.Column(scale=2):
            gr.Markdown("")
    # Inputs: policy selection and file upload on the left, contract text on the right.
    with gr.Row():
        with gr.Column(variant='panel',scale=1):
            policy_name_dd = gr.components.Dropdown(schools, multiselect=False,label="NIL Policy Selection")
            contract_file_cmpt = gr.File(label="Select Contract File",file_count="single",file_types=[".pdf"],height=150)
        with gr.Column(variant='panel',scale=3):
            contract_text_tbox = gr.Textbox(label="Contract Text",interactive=True,info="Upload .pdf or paste in text")
    # Action buttons, separated by empty spacer columns.
    with gr.Row():
        with gr.Column():
            upload_btn = gr.components.Button(value="Upload Contract", size='sm', variant="primary")
        with gr.Column():
            gr.Markdown("")
        with gr.Column():
            redline_btn = gr.components.Button(value="Redline Contract", size='sm', variant="primary")
        with gr.Column():
            gr.Markdown("")
    # Output: the redline and the reviewer's comments.
    with gr.Row():
        with gr.Column(variant='panel'):
            gr.components.Markdown(response_label)
            contract_redline_html = gr.components.HTML(label="Contract Redline")
            compliance_comments_tbox = gr.Textbox(interactive=True,label='Compliance Comments')
    # Download area; hidden until chatbot() makes the row visible.
    with gr.Row(visible=False) as download_row:
        with gr.Column(variant='panel'):
            file_name_tbox = gr.Textbox(interactive=False,label='File Name',visible=False)
            pdf_download_file = gr.File()
            download_btn = gr.components.Button(value="Create Redline PDF", size='sm', variant="primary")
            # Event wiring: each handler's return values map onto `outputs`.
            upload_btn.click(pdf_to_text,inputs=[contract_file_cmpt,contract_text_tbox,file_name_tbox],outputs=[contract_text_tbox,file_name_tbox])
            redline_btn.click(chatbot,inputs=[policy_name_dd,contract_text_tbox],outputs=[contract_redline_html,download_row])
            download_btn.click(download_pdf,inputs=[compliance_comments_tbox,contract_redline_html,file_name_tbox],outputs=pdf_download_file)
        with gr.Column():
            gr.Markdown("")
            #pdf_download_file = gr.File()
            #email_btn = gr.components.Button(value="Email Redline", size='sm', variant="primary")
            #email_btn.click(send_contract_email)
            #download_btn.click(download_pdf,inputs=[file_name_tbox,compliance_comments_tbox,contract_redline_html])
        with gr.Column():
            gr.Markdown("")
        with gr.Column():
            gr.Markdown("")
    # Footer.
    with gr.Row():
        with gr.Column():
            gr.HTML('CIMS.AI Confidential 2023')
#iface.queue(concurrency_count=20).launch(auth=login_auth, auth_message= "Enter your username and password that you received from CIMS.AI. To request a login, please email 'info@cims.ai'")
# Enable request queuing, then start the server. The launch below passes no
# auth callback; the variant with login_auth is the commented call above.
iface.queue(concurrency_count=20)
iface.launch()