AGDS-UI / app.py
Nechba's picture
Update app.py
69dbd6d verified
raw
history blame contribute delete
No virus
13.2 kB
from utlis.helper import *
import sqlite3
import hashlib
def create_document_id(token, service_selected, document_selected):
    """Return the SHA-256 hex digest that identifies one document.

    The token, service name and document name are formatted and joined in
    that order, with no separator, and the resulting text is hashed.

    Args:
        token: The user's token.
        service_selected: Name of the selected service.
        document_selected: Name of the selected document.

    Returns:
        The 64-character lowercase hexadecimal SHA-256 digest.
    """
    raw_key = "{}{}{}".format(token, service_selected, document_selected)
    digest = hashlib.sha256(raw_key.encode())
    return digest.hexdigest()
def create_database():
    """Create the local SQLite cache database and its tables if missing.

    Opens (creating it when necessary) ``document_cache.db`` in the current
    working directory and makes sure two tables exist:

    * ``schemas``  -- ``document_id`` (primary key) and ``schema`` text.
    * ``comments`` -- ``document_id`` (primary key) and ``comments`` text.

    Both statements use ``IF NOT EXISTS``, so calling this repeatedly is
    harmless.

    Raises:
        sqlite3.Error: If the database cannot be opened or written.
    """
    conn = sqlite3.connect('document_cache.db')
    try:
        # ``with conn`` commits when the body succeeds and rolls back when it
        # raises; it does not close the connection, hence the ``finally``.
        with conn:
            # Table for schemas.
            conn.execute(
                "CREATE TABLE IF NOT EXISTS schemas "
                "(document_id TEXT PRIMARY KEY, schema TEXT)"
            )
            # Table for comments.
            conn.execute(
                "CREATE TABLE IF NOT EXISTS comments "
                "(document_id TEXT PRIMARY KEY, comments TEXT)"
            )
    finally:
        conn.close()
# Make sure the SQLite cache tables exist before any UI code runs.
create_database()
# Not defined in this file; presumably provided by the star import from
# utlis.helper and expected to seed the st.session_state keys read below
# (token, genre, service, ...) -- TODO confirm.
initialize_session_state()
# Header sent with every JSON request issued from the sidebar.
_JSON_HEADERS = {'Content-Type': 'application/json'}


def _fetch_services(token):
    """Return the decoded SERVICES_API response for ``token``."""
    response = requests.get(
        SERVICES_API, data=json.dumps({"token": token}), headers=_JSON_HEADERS
    )
    return json.loads(response.text)


def _fetch_document_names(token, service_name):
    """Return the ``documentname`` of every document DOCUMENT_API lists."""
    payload = json.dumps({"token": token, "servicename": service_name})
    response = requests.get(DOCUMENT_API, data=payload, headers=_JSON_HEADERS)
    documents = json.loads(response.text).get("documents", [])
    return [doc["documentname"] for doc in documents]


def _multiselect_with_all(names):
    """Render a multiselect over ``names`` and return the chosen names.

    With two or more names an extra "ALL" entry is offered and pre-selected;
    picking it selects every real name. A single name is pre-selected, and an
    empty list is rendered without a default.
    """
    if len(names) >= 2:
        picked = st.multiselect("", names + ["ALL"], default="ALL")
        # "ALL" is only a shortcut: expand it to the real names.
        return list(names) if "ALL" in picked else picked
    if len(names) == 1:
        return st.multiselect("", names, default=names[0])
    return st.multiselect("", names)


with st.sidebar:
    st.image("logo.png", width=170)
    st.title("AGDS")
    # LLM backends offered to the user.
    llms = ['gpt-3.5-turbo', 'gemini']
    st.session_state.llm = st.selectbox("Choose LLM", llms)
    st.session_state.genre = st.radio(
        "Choose option",
        ["Select document", "Add document(s)", "Delete service(s)", "Delete document(s)"],
    )

    if st.session_state.genre == "Add document(s)":
        st.title('Add Document(s)')
        add_new_service = st.checkbox("Add new service")
        if add_new_service:
            new_service = st.text_input("Enter service name")
            if new_service and st.button('Add'):
                add_service(st.session_state.token, new_service)
        # Queried after the optional "Add" above.
        services = _fetch_services(st.session_state.token)
        if services:
            st.session_state.service = st.selectbox("Choose Service", services)
            st.session_state.doc_ortext = st.radio("Choose option", ["Documnt", "Text area"])
            # Only the file-upload option is handled in the sidebar; the
            # "Text area" option is rendered in the main page area.
            if st.session_state.doc_ortext == "Documnt":
                st.session_state.uploaded_files = st.file_uploader(
                    "Upload PDF file", type=["pdf", "txt"], accept_multiple_files=False
                )
                if st.session_state.uploaded_files:
                    st.session_state.process = st.button('Process')
                    if st.session_state.process:
                        add_document(st.session_state.token, st.session_state.service)

    elif st.session_state.genre == "Select document":
        st.title('Scrape Document')
        services = _fetch_services(st.session_state.token)
        if services:
            st.session_state.service_slected_to_chat = st.selectbox("Choose Service", services)
            history_document = _fetch_document_names(
                st.session_state.token, st.session_state.service_slected_to_chat
            )
            selected_document = st.selectbox("Choose Documnet", history_document)
            st.session_state.doument_slected_to_chat = selected_document
            if selected_document is None:
                # selectbox yields None when the service has no documents;
                # calling .split on it would raise AttributeError.
                st.warning("No documents found for this service.")
            elif selected_document.split("_")[-1] == "pdf":
                # Names ending in "_pdf" get page-range and chunking controls.
                payload = json.dumps({
                    "token": st.session_state.token,
                    "service_name": st.session_state.service_slected_to_chat,
                    "document_name": selected_document,
                })
                response = requests.get(GET_NUM_PAGES, data=payload, headers=_JSON_HEADERS)
                number_pages = json.loads(response.text).get("num_pages")
                page_options = list(range(1, int(number_pages) + 1))
                st.session_state.start_page = st.selectbox("Start Page", page_options)
                # The end page defaults to the last page.
                st.session_state.end_page = st.selectbox(
                    "End Page", page_options, index=len(page_options) - 1
                )
                st.session_state.method = st.selectbox(
                    "Chunking Method", ["chunk_per_page", "personalize_chunking"]
                )
                if st.session_state.method == "personalize_chunking":
                    st.session_state.split_token = st.text_area("Split Token")
            else:
                # Every other document only supports split-token chunking.
                st.session_state.method = st.selectbox("Chunking Method", ["personalize_chunking"])
                st.session_state.split_token = st.text_area("Split Token")
        else:
            # No services: clear the selection so the main area stays idle.
            st.session_state.service_slected_to_chat = None

    elif st.session_state.genre == "Delete service(s)":
        st.title('Delete Service(s)')
        services = _fetch_services(st.session_state.token)
        service_slected = _multiselect_with_all(services)
        st.write("You selected:", service_slected)
        if len(service_slected) > 0:
            st.session_state.delete = st.button('Delete')
            if st.session_state.delete:
                delete_service(st.session_state.token, service_slected)

    elif st.session_state.genre == "Delete document(s)":
        st.title('Delete Document(s)')
        services = _fetch_services(st.session_state.token)
        if services:
            service = st.selectbox("Choose Service", services)
            history_document = _fetch_document_names(st.session_state.token, service)
            document_slected_to_delete = _multiselect_with_all(history_document)
            st.write("You selected:", document_slected_to_delete)
            if len(document_slected_to_delete) > 0:
                st.session_state.delete = st.button('Delete')
                if st.session_state.delete:
                    # Delete from the service chosen in this branch, not from
                    # st.session_state.service, which is only assigned in the
                    # "Add document(s)" branch.
                    delete_document(st.session_state.token, service, document_slected_to_delete)
# Style rule that keeps the page heading on a single line.
css_style = """
<style>
.title {
white-space: nowrap;
}
</style>
"""
st.markdown(css_style, unsafe_allow_html=True)
# Render the heading inside its own container.
st.container().markdown(
    '<h1 class="title">Augmented Generative Document Scraper</h1>',
    unsafe_allow_html=True,
)
# Main-area form for adding a document typed in as free text.
wants_text_entry = (
    st.session_state.genre == "Add document(s)"
    and st.session_state.doc_ortext == "Text area"
)
if wants_text_entry:
    st.session_state.name_text_area = st.text_input("Enter name of the text area:")
    st.session_state.text_area = st.text_area("Enter text:")
    # The button is only rendered once some text has been entered.
    if st.session_state.text_area and st.button('Process Text'):
        add_text_document(st.session_state.token, st.session_state.service)
# Main-area flow for a selected document: edit its schema, optionally edit
# comments, then send it to the backend and offer the result for download.
if (
    st.session_state.genre == "Select document"
    and st.session_state.service_slected_to_chat
    # A service without documents leaves the selection empty; skip the flow
    # rather than hashing "None" and crashing on .split below.
    and st.session_state.doument_slected_to_chat
):
    document_id = create_document_id(
        st.session_state.token,
        st.session_state.service_slected_to_chat,
        st.session_state.doument_slected_to_chat,
    )
    schema = get_schema(document_id)
    schema = display_and_validate_schema(schema)
    if schema:
        save_schema(document_id, schema)

    if schema and st.checkbox("Add comments"):
        # Start from the stored comments, or from an empty mapping.
        comments = get_comments(document_id) or {}
        keys = get_all_keys(schema)
        comments = handle_comments(comments, keys)
        save_comments(document_id, comments)
    # Always load the stored comments so the request payload below has a
    # value even when the "Add comments" box is left unchecked.
    comments = get_comments(document_id)

    if schema and st.button('Process'):
        is_pdf = st.session_state.doument_slected_to_chat.split("_")[-1] == "pdf"
        # Fields shared by the PDF and non-PDF requests.
        data = {
            "token": st.session_state.token,
            "service_name": st.session_state.service_slected_to_chat,
            "document_name": st.session_state.doument_slected_to_chat,
            "method": st.session_state.method,
            "model": st.session_state.llm,
            "schema": schema,
            "comment": comments,
        }
        if is_pdf:
            # The split token only applies to personalized chunking.
            data["split_token"] = (
                st.session_state.split_token
                if st.session_state.method == "personalize_chunking"
                else ""
            )
            data["start_page"] = st.session_state.start_page
            data["end_page"] = st.session_state.end_page
            endpoint = RESPONSE_API
        else:
            data["split_token"] = st.session_state.split_token
            endpoint = RESPONSE_TXT_API

        headers = {'Content-Type': 'application/json'}
        response = requests.get(endpoint, data=json.dumps(data), headers=headers)
        try:
            response_data = json.loads(response.text)
        except ValueError:
            # A non-JSON reply is reported through the error message below
            # instead of raising.
            response_data = None

        if isinstance(response_data, dict) and response_data.get('status') == 'success':
            json_str = response_data.get("json")
            # Serialize the result and encode it to bytes for the download.
            json_bytes = json.dumps(json_str).encode('utf-8')
            st.download_button(
                label="Download JSON",
                data=json_bytes,
                file_name="results.json",
                mime="application/json"
            )
        else:
            st.error("Error in processing document")