import re
import ast
import psycopg
from datetime import datetime
import pytz
import streamlit as st

# Matches a quoted snippet that opens with '"<digits>' at the start of a line
# and runs (non-greedily) to a closing '"' at the end of a line.
regex_pattern = r'^(\"\d+)[\s\S]*?(\"$)'
DB_HOST = st.secrets["DB_HOST"]
DB_NAME = st.secrets["DB_NAME"]
DB_USER = st.secrets["DB_USER"]
DB_PASS = st.secrets["DB_PASS"]
DB_SSLMODE = st.secrets["SSLMODE"]
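# The keys above are expected in the app's Streamlit secrets (for example a
# local .streamlit/secrets.toml). A minimal sketch with hypothetical values:
#   DB_HOST = "db.example.com"
#   DB_NAME = "docsdb"
#   DB_USER = "app_user"
#   DB_PASS = "..."
#   SSLMODE = "require"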
def create_connection():
    """Create a connection to the PostgreSQL database configured via
    Streamlit secrets.
    :return: Connection object or None
    """
    conn = None
    try:
        db_uri = f"host={DB_HOST} dbname={DB_NAME} user={DB_USER} password={DB_PASS} sslmode={DB_SSLMODE}"
        conn = psycopg.connect(db_uri)
    except psycopg.Error as e:
        print("Error during connection:\n", e)
    return conn
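# Usage sketch:
#   conn = create_connection()
#   if conn is not None:
#       ...  # run queries
#       conn.close()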
def check_for_already_present(dict_of_docs):
    """Return the Doc_Ids from dict_of_docs that are NOT yet in stored_results."""
    conn = create_connection()
    cur = conn.cursor()
    list_of_docs = list(dict_of_docs.keys())
    try:
        result = []
        sql_for_check = "SELECT Doc_Id FROM stored_results WHERE Doc_Id = %s"
        for docid in list_of_docs:
            cur.execute(sql_for_check, (docid,))
            fetch_result = cur.fetchone()
            if fetch_result:
                result.append(fetch_result[0])  # Append the Doc_Id if found
        # Use a set difference to find the documents not already stored
        list_of_docs_not_present = list(set(list_of_docs) - set(result))
        return list_of_docs_not_present
    except psycopg.DatabaseError as error:
        print(f"Database operation failed: {error}")
        return None  # or handle the error as needed
    finally:
        cur.close()
        conn.close()
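# Example (hypothetical data): if dict_of_docs is {"d1": {...}, "d2": {...}}
# and only "d1" already exists in stored_results, this returns ["d2"].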
def create_task(conn, task):
    """
    Create or replace a task record in both 'tasks' and 'stored_results' tables.
    :param conn: A psycopg database connection object
    :param task: A tuple containing the task data (Doc_ID, Title, Doc_Text, Doc_Blockquotes, Doc_Size)
    :return: None
    """
    sql_task = '''
        INSERT INTO tasks(Doc_ID, Title, Doc_Text, Doc_Blockquotes, Doc_Size)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT (Doc_ID) DO UPDATE SET
            Title = EXCLUDED.Title,
            Doc_Text = EXCLUDED.Doc_Text,
            Doc_Blockquotes = EXCLUDED.Doc_Blockquotes,
            Doc_Size = EXCLUDED.Doc_Size;
    '''
    sql_results = '''
        INSERT INTO stored_results(Doc_ID, Title, Doc_Text, Doc_Blockquotes, Doc_Size)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT (Doc_ID) DO UPDATE SET
            Title = EXCLUDED.Title,
            Doc_Text = EXCLUDED.Doc_Text,
            Doc_Blockquotes = EXCLUDED.Doc_Blockquotes,
            Doc_Size = EXCLUDED.Doc_Size;
    '''
    try:
        # A context manager handles closing the cursor
        with conn.cursor() as cur:
            cur.execute(sql_task, task)
            cur.execute(sql_results, task)
            conn.commit()
    except Exception as e:
        print(f"An error occurred: {e}")
        conn.rollback()  # Roll back on error
    finally:
        # No need to explicitly close the cursor due to the context manager
        print("Database operation at create_task() completed.")
def retrieve_text(conn, query):
    """
    Scan every row in 'tasks' and collect the quoted snippets and blockquotes
    that match regex_pattern.
    :param conn: A psycopg database connection object
    :param query: The search query (currently unused; kept for the
                  find_matching_text_with_query() variant)
    :return: A list of dicts with Title, DocID, matching_columns and matching_indents
    """
    sql_query = "SELECT * FROM tasks"
    # Execute the query to fetch rows from the table
    cursor = conn.cursor()
    cursor.execute(sql_query)
    # Fetch all rows from the cursor
    rows = cursor.fetchall()
    # List to store JSON-like objects of rows with matching columns
    matching_rows_json = []
    print("Checking for matching rows")
    for row in rows:
        has_matching_text = False
        has_matching_indent = False
        title = None
        doc_id = None
        matching_text = []
        matching_indents = []
        for i, column_value in enumerate(row):  # (index, column_value) pairs for each cell in the row
            if isinstance(column_value, str) and re.search(regex_pattern, column_value, re.MULTILINE | re.DOTALL):
                has_matching_text = True
                matching_text = find_matching_text(column_value)
            if i == 1:  # "Title" column
                title = column_value
            elif i == 0:  # "Doc_ID" column
                doc_id = column_value
            elif i == 3:  # "Doc_Blockquotes" column, stored as a stringified list
                matching_indent_list = ast.literal_eval(column_value)
                if len(matching_indent_list) == 0:
                    has_matching_indent = False
                else:
                    has_matching_indent = True
                    matching_indents = list(matching_indent_list)
        if has_matching_text or has_matching_indent:
            row_data = {
                "Title": title,
                "DocID": doc_id,
                "matching_columns": matching_text,
                "matching_indents": matching_indents
            }
            matching_rows_json.append(row_data)
    data_dict = matching_rows_json
    with open("Matching_rows_Format.txt", "w") as file:
        file.write(str(data_dict))
    print("Generated matching rows and file with matching rows")
    cursor.close()
    return data_dict
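# Each element of the returned list has the shape built in row_data above,
# e.g. (hypothetical values):
#   {"Title": "Some doc", "DocID": "123",
#    "matching_columns": ['"42 some excerpt"'], "matching_indents": ["a quote"]}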
def add_stored_results(conn, lst):
    """
    Copies selected documents from 'stored_results' to 'tasks' based on document IDs provided.
    :param conn: Database connection object.
    :param lst: List of document IDs to transfer.
    :return: Number of records successfully inserted, or None if an error occurred.
    """
    sql = '''INSERT INTO tasks (Doc_Id, Title, Doc_Text, Doc_Blockquotes, Doc_Size)
             SELECT Doc_Id, Title, Doc_Text, Doc_Blockquotes, Doc_Size
             FROM stored_results
             WHERE Doc_Id = %s;'''
    try:
        total_rows = 0
        with conn.cursor() as cur:
            for docid in lst:
                cur.execute(sql, (docid,))
                total_rows += cur.rowcount  # rowcount only reflects the last execute
        conn.commit()
        print("Stored data transferred to tasks.")
        return total_rows
    except Exception as e:
        print(f"An error occurred: {e}")
        conn.rollback()  # Roll back on error
        return None
    finally:
        print("Operation - adding stored data completed.")

# Example Usage
# Assume 'conn' is a psycopg connection object
# document_ids = [123, 456, 789]
# result = add_stored_results(conn, document_ids)
# if result is not None:
#     print(f"Transferred {result} records.")
def find_matching_text_with_query(column_value, query):
    """Return the query string once for each regex match that contains it."""
    matches = re.finditer(regex_pattern, column_value, re.MULTILINE | re.DOTALL)
    matching_text_with_query = []
    for match in matches:
        if query in match.group():
            matching_text_with_query.append(query)
    print("Matching text with query is: ", matching_text_with_query)
    return matching_text_with_query

def find_matching_text(column_value):
    """Return every snippet in column_value that matches regex_pattern."""
    matching_text = []
    matches = re.finditer(regex_pattern, column_value, re.MULTILINE | re.DOTALL)
    for match in matches:
        matching_text.append(match.group())
    print("Matching text is: ", matching_text)
    return matching_text
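# A minimal sketch of the matching behaviour, assuming Doc_Text stores quoted,
# numbered excerpts such as '"42 lorem ipsum"' on their own lines:
#   find_matching_text('"42 lorem ipsum"')  ->  ['"42 lorem ipsum"']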
def delete_sql_records(conn):
    """Clear the 'tasks' table; the caller is responsible for committing."""
    delete_records = "DELETE FROM tasks"
    cur = conn.cursor()
    cur.execute(delete_records)
    cur.close()
    print("Deleted records")
def add_classified_results(dict_of_results, searchquery):
    conn = create_connection()
    # SQL query for the classified_index table
    sql_query_classified = '''INSERT INTO classified_index(Doc_Id, Title, searchquery, matching_indents, matching_columns, matching_columns_after_classification, matching_indents_after_classification)
                              VALUES (%s, %s, %s, %s, %s, %s, %s)'''
    # SQL query for the search_queries table
    sql_query_search_queries = '''INSERT INTO search_queries(searchquery, dateandtime)
                                  VALUES (%s, %s)'''
    # Get the current date and time in IST
    ist = pytz.timezone('Asia/Kolkata')
    current_datetime_ist = datetime.now(ist).strftime('%Y-%m-%d %H:%M:%S')
    with conn:
        cur = conn.cursor()
        for result in dict_of_results:
            cur.execute(sql_query_classified, (result['DocID'], result['Title'], searchquery, result['matching_indents'], result['matching_columns'], result['matching_columns_after_classification'], result['matching_indents_after_classification']))
        # Record the search query itself, once per call
        cur.execute(sql_query_search_queries, (searchquery, current_datetime_ist))
        conn.commit()
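# Each element of dict_of_results is expected to carry the keys read above,
# e.g. (hypothetical values):
#   {"DocID": "123", "Title": "Some doc",
#    "matching_columns": [...], "matching_indents": [...],
#    "matching_columns_after_classification": [...],
#    "matching_indents_after_classification": [...]}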
def main(list_of_docs_already_present, lst_new_data, query):
    """Rebuild the 'tasks' table from stored and new data, then retrieve matches."""
    conn = create_connection()
    with conn:
        # Clear out the previous task set
        delete_sql_records(conn)
        # Extract the value tuples from each sub-dictionary of new data
        values_list = [list(subdict.values()) for subdict in lst_new_data.values()]
        # Re-insert the documents that are already in stored_results
        add_stored_results(conn, list_of_docs_already_present)
        for task in values_list:
            create_task(conn, task)
        results = retrieve_text(conn, query)
    return results
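# Sketch of the expected lst_new_data shape (hypothetical values); each
# sub-dictionary's values are passed positionally to create_task() as
# (Doc_ID, Title, Doc_Text, Doc_Blockquotes, Doc_Size):
#   lst_new_data = {
#       "123": {"Doc_ID": "123", "Title": "A doc", "Doc_Text": '"1 excerpt"',
#               "Doc_Blockquotes": "['a quote']", "Doc_Size": 1024},
#   }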
if __name__ == '__main__':
    # main() needs the list of already-stored Doc_Ids, the new data dict and a
    # search query; supply real values when running this module standalone.
    main([], {}, "")