import os
import io
import openai
import datetime
import time
import gradio as gr
import json
from jinja2 import Template
import requests
import fitz
from xhtml2pdf import pisa
from io import BytesIO
# Initialize OpenAI
openai.api_key = os.environ.get('OPENAI_API_KEY')
# Configuration variables
airtable_api_key = os.environ.get('AIRTABLE_API_KEY')
# Airtable table names
prompts_table_name = 'tblYIZEB8m6JkGDEP'
users_table_name = 'tblLNe5ZL47SvrAEk'
user_log_table_name = 'tblrlTsRrkl6BqMAJ'
compliancelog_table_name = 'tblQMXWKGlOonkIw2'
policies_table_name = 'tbla6PC65qZfqdJhE'
# Define the style and content for the response field
label_text = "Contract Redline"
color = "#6562F4"
background_color = "white"
border_radius = "10px"
# response_label = f'
{label_text}
'
response_label = f'{label_text}'
base_id = 'appcUK3hUWC7GM2Kb'
# App name for user login logging
app = "Compliance"
headers = {
"Authorization": f"Bearer {airtable_api_key}",
"Content-Type": "application/json",
"Accept": "application/json",
}
def prompt_trim(prompt: str) -> str:
lines = prompt.split('\n')
trimmed = '\n'.join([l.strip() for l in lines])
return trimmed
def get_policy_text(school):
airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{policies_table_name}'
# Parameters for the API request to select only the 'school' field
params = {
'filterByFormula': f'school="{school}"',
'fields[]': 'policy_text'
}
global policy_text
policy_text = ''
try:
# Send a GET request to the Airtable API
response = requests.get(airtable_endpoint, headers=headers, params=params)
# Check if the request was successful (status code 200)
if response.status_code == 200:
# Parse the JSON response
data = response.json()
# Check if there are records in the response
if data.get('records'):
# Extract the 'school' values from each record
policy_text = [record['fields']['policy_text'] for record in data['records']]
else:
print("No records found in the 'policies' table.")
else:
print(f"Failed to retrieve data. Status code: {response.status_code}")
except Exception as e:
print(f"An error occurred: {str(e)}")
#print(policy_text)
return policy_text
def get_schools():
airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{policies_table_name}'
# Parameters for the API request to select only the 'school' field
params = {
'fields[]': 'school', # Replace with the name of your field
'sort[0][field]': 'school', # Sort by the 'school' field
'sort[0][direction]': 'asc', # Sort in ascending order
}
schools = ''
try:
# Send a GET request to the Airtable API
response = requests.get(airtable_endpoint, headers=headers, params=params)
# Check if the request was successful (status code 200)
if response.status_code == 200:
# Parse the JSON response
data = response.json()
# Check if there are records in the response
if data.get('records'):
# Extract the 'school' values from each record
schools = [record['fields']['school'] for record in data['records']]
else:
print("No records found in the 'policies' table.")
else:
print(f"Failed to retrieve data. Status code: {response.status_code}")
except Exception as e:
print(f"An error occurred: {str(e)}")
return schools
def get_prompt(header, template_content):
airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{prompts_table_name}'
params = {
'filterByFormula': "prompt_name='Compliance_v1'",
}
response = requests.get(airtable_endpoint, headers=headers, params=params)
# Check for errors
response.raise_for_status()
data = response.json()
# Check if there is at least one record matching the condition
if data.get('records'):
# Get the first record (there should be only one)
record = data['records'][0]['fields']
# Assign system_prompt and user_prompt to variables
header = record.get('system_prompt', '')
template_content = record.get('user_prompt', '')
return header, template_content
def append_to_at_compliancelog(policy_name_dd,contract_text,gpt_response, response_time, question_cost, prompt_tokens, completion_tokens):
airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{compliancelog_table_name}'
# Organize data for Airtable
new_fields = {
'policy_name': str(policy_name_dd),
'contract_text': str(contract_text),
'gpt_response': str(gpt_response),
'response_time': str(response_time),
'question_cost': question_cost,
'user_name': str(logged_in_user),
'prompt_tokens': prompt_tokens,
'completion_tokens': completion_tokens
}
data = {
'fields': new_fields
}
try:
# Post data to Airtable
response = requests.post(airtable_endpoint, headers=headers, json=data)
# print(response.json())
# Check for errors
response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
# Handle the HTTP error (e.g., log it or display an error message)
print(f"HTTP error occurred: {http_error}")
except Exception as e:
# Handle exceptions, log errors, or raise them as needed
print(f"An error occurred: {str(e)}")
# Chatbot Function
def chatbot(policy_name_dd,contract_text,progress=gr.Progress()):
start_time = datetime.datetime.now()
progress(progress=None)
"""
time.sleep(10)
for i in progress.tqdm(range(100)):
time.sleep(1)
"""
#print(policy_name)
#students = get_students(school_selection)
get_policy_text(policy_name_dd)
#print(policy_text)
#print(contract_text)
template_content = ''
header = ''
header, template_content = get_prompt(header, template_content)
# print(header)
# print(template_content)
# Create a Jinja2 template from the content
template = Template(template_content)
# Render the template with the inputs
analysis_input = template.render(contract_text=contract_text,policy_text=policy_text)
trimmed_input = prompt_trim(analysis_input)
"""
with open('analysis_input.txt', 'w', encoding='utf-8') as out_file:
out_file.write(trimmed_input)
"""
gpt_model = "gpt-4-1106-preview"
response = openai.chat.completions.create(
model=gpt_model,
temperature=0,
messages=[
{
"role": "system",
"content": header
},
{
"role": "user",
"content": analysis_input
}
]
)
gpt_response = response.choices[0].message.content
tokens_used = response.usage
if gpt_model == "gpt-4":
question_cost = (tokens_used.total_tokens / 1000) * .03
prompt_tokens = tokens_used.prompt_tokens
completion_tokens = tokens_used.completion_tokens
else:
prompt_tokens = tokens_used.prompt_tokens
completion_tokens = tokens_used.completion_tokens
question_cost = ((prompt_tokens / 1000) * .01) + ((completion_tokens / 1000) * .03)
"""
with open('response.txt', 'w', encoding='utf-8') as out_file:
out_file.write(gpt_response)
"""
end_time = datetime.datetime.now()
response_time = end_time - start_time
contract_redline = gpt_response
#print(contract_redline)
append_to_at_compliancelog(policy_name_dd,contract_text,contract_redline, response_time, question_cost, prompt_tokens, completion_tokens)
return {contract_redline_html: contract_redline, download_row: gr.Row(visible=True)}
def log_login(username):
airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{user_log_table_name}'
# Organize data for Airtable
new_fields = {
'user_name': str(username),
'app': str(app)
}
data = {
'fields': new_fields
}
try:
# Post data to Airtable
response = requests.post(airtable_endpoint, headers=headers, json=data)
# Check for errors
response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
# Handle the HTTP error (e.g., log it or display an error message)
print(f"HTTP error occurred: {http_error}")
except Exception as e:
# Handle exceptions, log errors, or raise them as needed
print(f"An error occurred: {str(e)}")
def login_auth(username, password):
airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{users_table_name}'
# Query the 'users' table to check for a match with the provided username and password
params = {
'filterByFormula': f'AND(user_name = "{username}", password = "{password}")'
}
response = requests.get(airtable_endpoint, headers=headers, params=params)
if response.status_code == 200:
data = response.json()
if data.get('records'):
log_login(username)
global logged_in_user
logged_in_user = username
return True
print(f"Invalid user/password combination")
return False
def extract_text_with_spacing(pdf_path):
document = fitz.open(pdf_path)
all_text = []
for page in document:
# Extract text in a dict structure
blocks = page.get_text("dict")["blocks"]
for b in blocks:
if "lines" in b: # Check if the block contains lines of text
for line in b["lines"]:
span_texts = [span["text"] for span in line["spans"]]
all_text.append(" ".join(span_texts))
all_text.append("\n") # Presume a new block is a new paragraph
document.close()
return "\n".join(all_text)
def pdf_to_text(contract_file_cmpt, contract_text_tbox, file_name_tbox):
file_text = extract_text_with_spacing(contract_file_cmpt.name)
#file_text = extract_text(contract_file_cmpt.name)
original_file_name = contract_file_cmpt.name.split("/")[-1]
redline_file_name = original_file_name.split(".")[0]+" Redline.pdf"
return file_text, redline_file_name, None, None, None
def convert_html_to_pdf(source_html, output_filename):
# Result file stream
result_file = open(output_filename, "w+b")
# Convert HTML to PDF
pisa_status = pisa.CreatePDF(
BytesIO(source_html.encode("UTF-8")), # HTML content
dest=result_file) # File handle to receive the result
# Close the result file
result_file.close()
# Return True on success and False on errors
return pisa_status.err
def download_pdf(compliance_comments,contract_redline_html,file_name_tbox):
#config = pdfkit.configuration(wkhtmltopdf="/usr/local/bin/wkhtmltopdf")
contract_redline_comments = "Contract Redline:
"+contract_redline_html + "
Compliance Comments:
"+compliance_comments
#global pdf_download
#pdf_download = pdfkit.from_string(contract_redline_comments,file_name_tbox,configuration=config)
convert_html_to_pdf(contract_redline_comments, file_name_tbox)
return {pdf_download_file: file_name_tbox}
# Gradio UI
CIMStheme = gr.themes.Soft().set(
button_primary_background_fill='#6562F4',
)
schools = get_schools()
#policy_text = get_policy_text("LSU") #for testing the function call
# scout_response = "HTMLTableGeneratorRank | CIMS.AI Scout Ranking | Athlete Name | Gender | Primary Sport | Social Media Fan Total | Interests | Commentary on Ranking |
---|
| | | | | | | |
"
scout_response = "HTMLTableGeneratorRank | CIMS.AI Scout Ranking | Athlete Name | Gender | Primary Sport | Social Media Fan Total | Interests | Commentary on Ranking |
---|
| | | | | | | |
"
#contract_redline = "Campaign:
Engagement Name: Applebee's Fall Burger Promo
Engagement Id: 7015j000001DvCAAA0
Sponsor: Applebee's
Start Date: 2023-10-28
End Date: 2023-11-11
Engagement Description
The goal of the engagement is to Increase Sales by having the Student-Athlete Social Media Post. For the Social Media Post, the sponsor is requesting the student athlete take a photo in front of the football stadium in eating a Applebee's burger in your team jersey. The Media rights for the content will be 90 Days.
Engagement Compensation.
For successful completion of the engagement the student-athlete will receive payment in the form of Cash.
Part or all of the payment will be in cash, paid via PayPal.
The total value of compensation will be 250.
"
#pdf_download = pdfkit.from_string(contract_redline, False)
#print(pdf_download)
logged_in_user = 'admin'
file_text = ''
contract_text = ''
policy_text = ''
compliance_comments = ''
file_name = 'redline.pdf'
pdf_download = ''
with gr.Blocks(CIMStheme) as iface:
with gr.Row():
with gr.Column(scale=2):
gr.Image(label="Logo", value="CIMS Logo Purple.png", width=10, show_download_button=False,
interactive=False, show_label=False, elem_id="logo", container=False)
with gr.Column(scale=2):
gr.Markdown(value="Compliance Desktop - Powered by CIMS.AI
")
with gr.Column(scale=2):
gr.Markdown("")
with gr.Row():
with gr.Column(variant='panel',scale=1):
policy_name_dd = gr.components.Dropdown(schools, multiselect=False,label="NIL Policy Selection")
contract_file_cmpt = gr.File(label="Select Contract File",file_count="single",file_types=[".pdf"],height=150)
with gr.Column(variant='panel',scale=3):
contract_text_tbox = gr.Textbox(label="Contract Text",interactive=True,info="Upload .pdf or paste in text")
with gr.Row():
with gr.Column():
upload_btn = gr.components.Button(value="Upload Contract", size='sm', variant="primary")
with gr.Column():
gr.Markdown("")
with gr.Column():
redline_btn = gr.components.Button(value="Redline Contract", size='sm', variant="primary")
with gr.Column():
gr.Markdown("")
with gr.Row():
with gr.Column(variant='panel'):
gr.components.Markdown(response_label)
contract_redline_html = gr.components.HTML(label="Contract Redline")
compliance_comments_tbox = gr.Textbox(interactive=True,label='Compliance Comments')
with gr.Row(visible=False) as download_row:
with gr.Column(variant='panel'):
file_name_tbox = gr.Textbox(interactive=False,label='File Name',visible=False)
pdf_download_file = gr.File()
download_btn = gr.components.Button(value="Create Redline PDF", size='sm', variant="primary")
upload_btn.click(pdf_to_text,inputs=[contract_file_cmpt,contract_text_tbox,file_name_tbox],outputs=[contract_text_tbox,file_name_tbox,contract_redline_html,compliance_comments_tbox,pdf_download_file])
redline_btn.click(chatbot,inputs=[policy_name_dd,contract_text_tbox],outputs=[contract_redline_html,download_row])
download_btn.click(download_pdf,inputs=[compliance_comments_tbox,contract_redline_html,file_name_tbox],outputs=pdf_download_file)
with gr.Column():
gr.Markdown("")
#pdf_download_file = gr.File()
#email_btn = gr.components.Button(value="Email Redline", size='sm', variant="primary")
#email_btn.click(send_contract_email)
#download_btn.click(download_pdf,inputs=[file_name_tbox,compliance_comments_tbox,contract_redline_html])
with gr.Column():
gr.Markdown("")
with gr.Column():
gr.Markdown("")
with gr.Row():
with gr.Column():
gr.HTML('© 2023 Collegiate Influencer Marketing Systems, Inc.
CIMS.AI, CIMS.AI logo, NILI, NILI logo, and EzNIL are trademarks of Collegiate Influencer Marketing Systems, Inc.')
#iface.queue(concurrency_count=20).launch(show_api=False, auth=login_auth, auth_message= "Enter your username and password that you received from CIMS.AI. To request a login, please email 'info@cims.ai'")
iface.queue(concurrency_count=20).launch(show_api=False)