ICAT-v1-Query / insert_data.py
sankalps's picture
added edit to connection string
f646c84
import re
import ast
import psycopg
from datetime import datetime
import pytz
import streamlit as st
# Matches a quoted, numbered passage: a line that opens with `"` followed by
# digits, up to the first `"` that ends a line. Callers search with
# re.MULTILINE | re.DOTALL, so `^`/`$` anchor at line boundaries.
regex_pattern = r'^(\"\d+)[\s\S]*?(\"$)'

# PostgreSQL connection settings, read from Streamlit's secrets store.
DB_HOST, DB_NAME, DB_USER, DB_PASS, DB_SSLMODE = (
    st.secrets[key]
    for key in ("DB_HOST", "DB_NAME", "DB_USER", "DB_PASS", "SSLMODE")
)
def create_connection():
    """Open a connection to the PostgreSQL database configured in Streamlit secrets.

    Connection parameters come from the module-level DB_HOST, DB_NAME,
    DB_USER, DB_PASS and DB_SSLMODE settings.

    :return: a psycopg Connection object, or None if connecting failed
        (the error is printed).
    """
    try:
        # Keyword arguments let psycopg quote each value itself, so a password
        # containing spaces or quotes cannot corrupt the connection string.
        return psycopg.connect(
            host=DB_HOST,
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASS,
            sslmode=DB_SSLMODE,
        )
    except psycopg.Error as e:
        # Bind the exception so the real failure is reported; an unbound name
        # here would raise NameError and hide it.
        print("Error during connection is \n", e)
        return None
def check_for_already_present(dict_of_docs):
    """Return the document IDs from *dict_of_docs* not yet in ``stored_results``.

    Opens its own connection (always closed before returning) and looks up
    each key of *dict_of_docs* in the ``stored_results`` table.

    :param dict_of_docs: mapping whose keys are document IDs.
    :return: list of keys with no matching ``stored_results`` row, in the
        mapping's key order; None if a database error occurred while querying.
    :raises psycopg.Error: if the connection itself cannot be opened.
    """
    conn = psycopg.connect(
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASS,
        host=DB_HOST,
        sslmode=DB_SSLMODE,  # same SSL policy as create_connection()
    )
    list_of_docs = list(dict_of_docs.keys())
    sql_for_check = "SELECT Doc_Id FROM stored_results WHERE Doc_Id = %s"
    try:
        present = set()
        with conn.cursor() as cur:
            for docid in list_of_docs:
                cur.execute(sql_for_check, (docid,))
                # Record the requested ID, not the value the database echoes
                # back: if the column type differs from the key type (e.g. int
                # vs str) the echoed value would never equal the key and the
                # document would wrongly be reported as missing.
                if cur.fetchone() is not None:
                    present.add(docid)
        # Preserve the caller's ordering instead of an arbitrary set order.
        return [docid for docid in list_of_docs if docid not in present]
    except psycopg.DatabaseError as error:
        print(f"Database operation failed: {error}")
        return None  # or handle error as needed
    finally:
        conn.close()
def create_task(conn, task):
    """Upsert one document row into both the ``tasks`` and ``stored_results`` tables.

    Both upserts run on a single cursor and are then committed together. Any
    exception is printed and the transaction rolled back rather than raised;
    a completion message is printed in every case.

    :param conn: an open psycopg connection.
    :param task: sequence of five values in column order
        (Doc_ID, Title, Doc_Text, Doc_Blockquotes, Doc_Size).
    :return: None
    """
    sql_task = '''
INSERT INTO tasks(Doc_ID, Title, Doc_Text, Doc_Blockquotes, Doc_Size)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (Doc_ID) DO UPDATE SET
Title = EXCLUDED.Title,
Doc_Text = EXCLUDED.Doc_Text,
Doc_Blockquotes = EXCLUDED.Doc_Blockquotes,
Doc_Size = EXCLUDED.Doc_Size;
'''
    sql_results = '''
INSERT INTO stored_results(Doc_ID, Title, Doc_Text, Doc_Blockquotes, Doc_Size)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (Doc_ID) DO UPDATE SET
Title = EXCLUDED.Title,
Doc_Text = EXCLUDED.Doc_Text,
Doc_Blockquotes = EXCLUDED.Doc_Blockquotes,
Doc_Size = EXCLUDED.Doc_Size;
'''
    try:
        # The same parameters feed both statements: tasks first, then stored_results.
        with conn.cursor() as cursor:
            for statement in (sql_task, sql_results):
                cursor.execute(statement, task)
            conn.commit()
    except Exception as exc:
        print(f"An error occurred: {exc}")
        conn.rollback()
    finally:
        print("Database operation at create_task() completed.")
def _parse_blockquotes(value):
    """Decode a Doc_Blockquotes cell into a list of blockquotes (empty if absent).

    Text is parsed with ``ast.literal_eval``, which evaluates literals only and
    is safe on stored data. NULL yields an empty list, and a sequence the
    driver already decoded is copied as-is.
    """
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return list(ast.literal_eval(value))


def retrieve_text(conn, query):
    """Collect quoted passages and blockquotes for every row of ``tasks``.

    For each row, any string cell matching ``regex_pattern`` supplies its
    matches (via find_matching_text) as "matching_columns". If several cells
    match, the last one wins. The Doc_Blockquotes cell supplies
    "matching_indents". Rows with at least one of the two are returned, and
    the result is also written to ``Matching_rows_Format.txt``.

    :param conn: an open psycopg connection.
    :param query: search query. It is currently unused because query-specific
        matching (find_matching_text_with_query) is disabled; the parameter is
        kept for callers.
    :return: list of dicts with keys "Title", "DocID", "matching_columns" and
        "matching_indents".
    """
    # Name the columns so the positional indexes below are guaranteed rather
    # than depending on the table's physical column order.
    sql_query = "SELECT Doc_ID, Title, Doc_Text, Doc_Blockquotes, Doc_Size FROM tasks"
    with conn.cursor() as cursor:
        cursor.execute(sql_query)
        rows = cursor.fetchall()

    matching_rows_json = []
    print ("Checking for matching rows")
    for row in rows:
        title = None
        doc_id = None
        matching_text = []
        has_matching_text = False
        matching_indents = []
        has_matching_indent = False
        for i, column_value in enumerate(row):
            if isinstance(column_value, str) and re.search(regex_pattern, column_value, re.MULTILINE | re.DOTALL):
                has_matching_text = True
                matching_text = find_matching_text(column_value)
            if i == 0:  # Doc_ID
                doc_id = column_value
            elif i == 1:  # Title
                title = column_value
            elif i == 3:  # Doc_Blockquotes
                matching_indents = _parse_blockquotes(column_value)
                has_matching_indent = bool(matching_indents)
        if has_matching_text or has_matching_indent:
            matching_rows_json.append({
                "Title": title,
                "DocID": doc_id,
                "matching_columns": matching_text,
                "matching_indents": matching_indents,
            })

    data_dict = matching_rows_json
    # Explicit UTF-8: document text may contain non-ASCII characters that the
    # platform default encoding cannot represent.
    with open("Matching_rows_Format.txt", "w", encoding="utf-8") as file:
        file.write(str(data_dict))
    print("Generated matching rows and file with matching rows")
    return data_dict
def add_stored_results(conn, lst):
    """
    Copies selected documents from 'stored_results' to 'tasks' based on document IDs provided.

    All inserts run in one transaction. On any error the message is printed,
    the transaction is rolled back and None is returned. A completion message
    is printed in every case.

    :param conn: Database connection object (psycopg).
    :param lst: Iterable of document IDs to transfer.
    :return: Total number of records inserted across all IDs (0 for an empty
        lst), or None if an error occurred.
    """
    sql = '''INSERT INTO tasks (Doc_Id, Title, Doc_Text, Doc_Blockquotes, Doc_Size)
SELECT Doc_Id, Title, Doc_Text, Doc_Blockquotes, Doc_Size
FROM stored_results
WHERE Doc_Id = %s;'''
    try:
        inserted = 0
        with conn.cursor() as cur:
            for docid in lst:
                cur.execute(sql, (docid,))
                # rowcount only describes the latest execute(), so sum it per
                # statement; clamp the DB-API "unknown" value (-1) to 0.
                inserted += max(cur.rowcount, 0)
            conn.commit()
        print("Stored data transferred to tasks.")
        return inserted
    except Exception as e:
        print(f"An error occurred: {e}")
        conn.rollback()  # Rollback on error
        return None
    finally:
        print("Operation - adding stored data completed.")
# Example Usage
# Assume 'conn' is a psycopg (v3) connection object, e.g. from create_connection()
# document_ids = [123, 456, 789]
# result = add_stored_results(conn, document_ids)
# if result is not None:
# print(f"Transferred {result} records.")
def find_matching_text_with_query(column_value, query):
    """Return *query* once for each ``regex_pattern`` match in *column_value* that contains it.

    The list holds repeated copies of *query*, not the matched passages, so
    its length is the number of matching passages mentioning the query. The
    list is printed before being returned.
    """
    hits = [
        query
        for found in re.finditer(regex_pattern, column_value, re.MULTILINE | re.DOTALL)
        if query in found.group()
    ]
    print("Matching text with query is: ", hits)
    return hits
def find_matching_text(column_value):
    """Return every ``regex_pattern`` match in *column_value*, in order of appearance.

    The search uses re.MULTILINE | re.DOTALL, so a passage may begin on any
    line. The resulting list is printed before being returned.
    """
    flags = re.MULTILINE | re.DOTALL
    found = [m.group() for m in re.finditer(regex_pattern, column_value, flags)]
    print("Matching text is: ", found)
    return found
def delete_sql_records(conn):
    """Delete every row from the ``tasks`` table.

    The delete is not committed here; it joins the caller's transaction, so
    the caller decides when (or whether) it becomes permanent.

    :param conn: an open psycopg connection.
    """
    delete_records = "DELETE FROM tasks"
    # Close the cursor as soon as the statement has run.
    with conn.cursor() as cur:
        cur.execute(delete_records)
    print("Deleted records")
def add_classified_results(dict_of_results, searchquery):
    """Persist classification results and log the search query.

    Inserts one ``classified_index`` row per result and a single
    ``search_queries`` row stamped with the current Asia/Kolkata (IST) time,
    committed together on a fresh connection.

    :param dict_of_results: iterable of dicts with keys "DocID", "Title",
        "matching_columns", "matching_indents",
        "matching_columns_after_classification" and
        "matching_indents_after_classification".
    :param searchquery: the query these results were produced for.
    :raises RuntimeError: if no database connection could be opened.
    """
    conn = create_connection()
    if conn is None:
        # create_connection() already printed the cause; fail loudly here
        # instead of an obscure error from `with None`.
        raise RuntimeError("Could not connect to the database")
    # SQL query for classified_index table
    sql_query_classified = '''INSERT INTO classified_index(Doc_Id, Title, searchquery, matching_indents, matching_columns, matching_columns_after_classification, matching_indents_after_classification)
VALUES (%s, %s, %s, %s, %s, %s, %s)'''
    # SQL query for search_queries table
    sql_query_search_queries = '''INSERT INTO search_queries(searchquery, dateandtime)
VALUES (%s, %s)'''
    # Get the current date and time in IST
    ist = pytz.timezone('Asia/Kolkata')
    current_datetime_ist = datetime.now(ist).strftime('%Y-%m-%d %H:%M:%S')
    with conn:
        with conn.cursor() as cur:
            for result in dict_of_results:
                # Note: matching_columns/matching_indents are passed in the
                # column order of the INSERT list above.
                cur.execute(sql_query_classified, (result['DocID'], result['Title'], searchquery, result['matching_columns'], result['matching_indents'], result['matching_columns_after_classification'], result['matching_indents_after_classification']))
            # One log entry per search, not one per result row.
            cur.execute(sql_query_search_queries, (searchquery, current_datetime_ist))
        conn.commit()
def main(list_of_docs_already_present, lst_new_data, query):
    """Rebuild the ``tasks`` table for a search and return the matching passages.

    Clears ``tasks``, re-copies already-processed documents from
    ``stored_results``, upserts the newly fetched documents, then scans
    ``tasks`` with retrieve_text, all on one connection.

    :param list_of_docs_already_present: document IDs to copy from stored_results.
    :param lst_new_data: mapping whose values are dicts of column values, in
        (Doc_ID, Title, Doc_Text, Doc_Blockquotes, Doc_Size) order
        (presumably keyed by document ID; verify against caller).
    :param query: search query forwarded to retrieve_text.
    :return: list of matching-row dicts produced by retrieve_text.
    :raises RuntimeError: if no database connection could be opened.
    """
    conn = create_connection()
    if conn is None:
        # create_connection() already printed the cause; fail loudly here
        # instead of an obscure error from `with None`.
        raise RuntimeError("Could not connect to the database")
    with conn:
        delete_sql_records(conn)
        # Each sub-dict's values become one positional row for create_task().
        values_list = [list(subdict.values()) for subdict in lst_new_data.values()]
        add_stored_results(conn, list_of_docs_already_present)
        for task in values_list:
            create_task(conn, task)
        results = retrieve_text(conn, query)
    return results
if __name__ == '__main__':
    # main() requires the caller's document lists and search query, so a bare
    # main() call could only raise TypeError; exit with a usage hint instead.
    raise SystemExit(
        "This module is not a standalone script: import it and call "
        "main(list_of_docs_already_present, lst_new_data, query)."
    )