import re
import ast
import psycopg
from datetime import datetime
import pytz
import streamlit as st
# Matches a block starting at a line that begins with a quoted number (e.g. "12)
# and running, non-greedily, to the next line that ends with a closing quote.
regex_pattern = r'^("\d+)[\s\S]*?("$)'
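# A quick illustration of what regex_pattern captures (hypothetical text): with
# re.MULTILINE | re.DOTALL it matches a numbered, quoted excerpt such as
#   "3 Some excerpt that may
#   span several lines"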
DB_HOST = st.secrets["DB_HOST"]
DB_NAME = st.secrets["DB_NAME"]
DB_USER = st.secrets["DB_USER"]
DB_PASS = st.secrets["DB_PASS"]
DB_SSLMODE = st.secrets["SSLMODE"]
def create_connection():
    """Create a database connection to the PostgreSQL database
    using the credentials stored in Streamlit secrets.
    :return: Connection object or None
    """
    conn = None
    try:
        db_uri = f"host={DB_HOST} dbname={DB_NAME} user={DB_USER} password={DB_PASS} sslmode={DB_SSLMODE}"
        conn = psycopg.connect(db_uri)
    except psycopg.Error as e:
        print("Error during connection is \n", e)
    return conn
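# Example usage (credentials are read from .streamlit/secrets.toml):
# conn = create_connection()
# if conn is not None:
#     print("Connected to", DB_NAME)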
def check_for_already_present(dict_of_docs):
    """Return the document IDs from dict_of_docs that are not yet in stored_results."""
    conn = create_connection()  # Reuse the shared helper so sslmode is applied consistently
    cur = conn.cursor()
    list_of_docs = list(dict_of_docs.keys())
    try:
        result = []
        sql_for_check = "SELECT Doc_Id FROM stored_results WHERE Doc_Id = %s"
        for docid in list_of_docs:
            cur.execute(sql_for_check, (docid,))
            fetch_result = cur.fetchone()
            if fetch_result:
                result.append(fetch_result[0])  # Append the Doc_Id if found
        # Convert lists to sets for set difference operation
        set_of_docs = set(list_of_docs)
        set_of_results = set(result)
        # Calculate the difference
        list_of_docs_not_present = list(set_of_docs - set_of_results)
        return list_of_docs_not_present
    except psycopg.DatabaseError as error:
        print(f"Database operation failed: {error}")
        return None  # or handle error as needed
    finally:
        cur.close()
        conn.close()
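# Example usage (hypothetical data; keys are document IDs):
# docs = {"101": {...}, "102": {...}}
# new_ids = check_for_already_present(docs)  # e.g. ["102"] if only "101" is stored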
def create_task(conn, task):
    """
    Create or replace a task record in both 'tasks' and 'stored_results' tables.
    :param conn: A psycopg database connection object
    :param task: A tuple containing the task data (Doc_ID, Title, Doc_Text, Doc_Blockquotes, Doc_Size)
    :return: None
    """
    sql_task = '''
        INSERT INTO tasks(Doc_ID, Title, Doc_Text, Doc_Blockquotes, Doc_Size)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT (Doc_ID) DO UPDATE SET
            Title = EXCLUDED.Title,
            Doc_Text = EXCLUDED.Doc_Text,
            Doc_Blockquotes = EXCLUDED.Doc_Blockquotes,
            Doc_Size = EXCLUDED.Doc_Size;
    '''
    sql_results = '''
        INSERT INTO stored_results(Doc_ID, Title, Doc_Text, Doc_Blockquotes, Doc_Size)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT (Doc_ID) DO UPDATE SET
            Title = EXCLUDED.Title,
            Doc_Text = EXCLUDED.Doc_Text,
            Doc_Blockquotes = EXCLUDED.Doc_Blockquotes,
            Doc_Size = EXCLUDED.Doc_Size;
    '''
    try:
        # Using a context manager to handle the cursor
        with conn.cursor() as cur:
            try:
                cur.execute(sql_task, task)
            except psycopg.errors.UndefinedTable:
                print("Warning: 'tasks' table does not exist. Skipping tasks table insertion.")
                conn.rollback()  # Clear the aborted transaction so the next insert can run
            cur.execute(sql_results, task)
            conn.commit()
    except Exception as e:
        print(f"An error occurred: {e}")
        conn.rollback()  # Rollback on error
    finally:
        # No need to explicitly close the cursor due to the context manager
        print("Database operation at create_task() completed.")
def retrieve_text(conn, query):
    """
    Fetch all rows from 'tasks' and collect those whose text matches the
    quoted-excerpt regex or that carry blockquote indents.
    :param conn: A psycopg database connection object
    :param query: The search query (currently unused; see the commented-out query filter)
    :return: A list of dicts with Title, DocID, matching_columns and matching_indents
    """
    sql_query = "SELECT * FROM tasks"
    # Execute the query to fetch rows from the table
    cursor = conn.cursor()
    try:
        cursor.execute(sql_query)
    except psycopg.errors.UndefinedTable:
        print("Warning: 'tasks' table does not exist. No data to retrieve.")
        cursor.close()
        return []  # Return empty list if table doesn't exist
    # Fetch all rows from the cursor
    rows = cursor.fetchall()
    # List to store JSON objects of rows with matching columns
    matching_rows_json = []
    print("Checking for matching rows")
    for row in rows:
        has_matching_text = False
        has_matching_indent = False
        title = None
        doc_id = None
        matching_text = []
        matching_indents = []
        for i, column_value in enumerate(row):  # pairs of (index, column_value) for each cell in the row
            if isinstance(column_value, str) and re.search(regex_pattern, column_value, re.MULTILINE | re.DOTALL):
                has_matching_text = True
                # matching_text_with_query = find_matching_text_with_query(column_value, query)
                matching_text = find_matching_text(column_value)
            if i == 1:  # Index of the "Title" column
                title = column_value
            elif i == 0:  # Index of the "Doc_ID" column
                doc_id = column_value
            elif i == 3:  # Index of the "Doc_Blockquotes" column
                matching_indent_list = ast.literal_eval(column_value)
                # print("Matching indent list / list of blockquotes for", title, "is", matching_indent_list)
                if len(matching_indent_list) == 0:
                    has_matching_indent = False
                    # print(title, "has no blockquote")
                else:
                    has_matching_indent = True
                    matching_indents = list(matching_indent_list)
                    # print("Indents for", title, "is", matching_indents)
        if has_matching_text or has_matching_indent:
            row_data = {
                "Title": title,
                "DocID": doc_id,
                "matching_columns": matching_text,  # + matching_text_with_query,
                "matching_indents": matching_indents
            }
            matching_rows_json.append(row_data)
    data_dict = matching_rows_json
    # Convert the list of JSON objects to a JSON array:
    # json_result = json.dumps(matching_rows_json, indent=2)
    # df = pd.read_json(json_result)
    # data_dict = df.to_dict(orient='list')
    with open("Matching_rows_Format.txt", "w") as file:
        file.write(str(data_dict))
    print("Generated matching rows and file with matching rows")
    cursor.close()
    return data_dict
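# Example usage (hypothetical query):
# results = retrieve_text(conn, "climate policy")
# for r in results:
#     print(r["DocID"], r["Title"], len(r["matching_columns"]))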
def add_stored_results(conn, lst):
    """
    Copies selected documents from 'stored_results' to 'tasks' based on document IDs provided.
    :param conn: Database connection object.
    :param lst: List of document IDs to transfer.
    :return: Number of records successfully inserted or None if an error occurred.
    """
    sql = '''INSERT INTO tasks (Doc_Id, Title, Doc_Text, Doc_Blockquotes, Doc_Size)
             SELECT Doc_Id, Title, Doc_Text, Doc_Blockquotes, Doc_Size
             FROM stored_results
             WHERE Doc_Id = %s;'''
    try:
        inserted = 0
        with conn.cursor() as cur:
            for docid in lst:
                try:
                    cur.execute(sql, (docid,))
                    inserted += cur.rowcount  # Rows affected by this execute call
                except psycopg.errors.UndefinedTable:
                    print("Warning: 'tasks' table does not exist. Cannot transfer data.")
                    return 0
            conn.commit()
        print("Stored data transferred to tasks.")
        return inserted  # Total number of rows inserted across all IDs
    except Exception as e:
        print(f"An error occurred: {e}")
        conn.rollback()  # Rollback on error
        return None
    finally:
        print("Operation - adding stored data completed.")
# Example Usage
# Assume 'conn' is a psycopg connection object
# document_ids = [123, 456, 789]
# result = add_stored_results(conn, document_ids)
# if result is not None:
#     print(f"Transferred {result} records.")
def find_matching_text_with_query(column_value, query):
    matches = re.finditer(regex_pattern, column_value, re.MULTILINE | re.DOTALL)
    matching_text_with_query = []
    for match in matches:
        if query in match.group():
            matching_text_with_query.append(query)
    print("Matching text with query is: ", matching_text_with_query)
    return matching_text_with_query
def find_matching_text(column_value):
    matching_text = []
    matches = re.finditer(regex_pattern, column_value, re.MULTILINE | re.DOTALL)
    for match in matches:
        matching_text.append(match.group())
    print("Matching text is: ", matching_text)
    return matching_text
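# Example (hypothetical document text): two quoted, numbered excerpts are extracted.
# sample = '"1 First excerpt\nspanning two lines"\nplain text\n"2 Second excerpt"'
# find_matching_text(sample)
# -> ['"1 First excerpt\nspanning two lines"', '"2 Second excerpt"']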
def delete_sql_records(conn):
    delete_records = "DELETE FROM tasks"
    cur = conn.cursor()
    try:
        cur.execute(delete_records)
        conn.commit()  # Persist the deletion
        print("Deleted records")
    except psycopg.errors.UndefinedTable:
        conn.rollback()  # Clear the aborted transaction so later statements can run
        print("Warning: 'tasks' table does not exist. Skipping deletion.")
    except Exception as e:
        conn.rollback()
        print(f"Error deleting records: {e}")
    finally:
        cur.close()
def add_classified_results(dict_of_results, searchquery):
    """Persist classified results and log the search query with an IST timestamp."""
    conn = create_connection()
    # SQL query for classified_index table
    sql_query_classified = '''INSERT INTO classified_index(Doc_Id, Title, searchquery, matching_indents, matching_columns, matching_columns_after_classification, matching_indents_after_classification)
                              VALUES (%s, %s, %s, %s, %s, %s, %s)'''
    # SQL query for the search_queries table
    sql_query_search_queries = '''INSERT INTO search_queries(searchquery, dateandtime)
                                  VALUES (%s, %s)'''
    # Get the current date and time in IST
    ist = pytz.timezone('Asia/Kolkata')
    current_datetime_ist = datetime.now(ist).strftime('%Y-%m-%d %H:%M:%S')
    with conn:
        cur = conn.cursor()
        for result in dict_of_results:
            cur.execute(sql_query_classified, (result['DocID'], result['Title'], searchquery, result['matching_indents'], result['matching_columns'], result['matching_columns_after_classification'], result['matching_indents_after_classification']))
        # Log the query into search_queries
        cur.execute(sql_query_search_queries, (searchquery, current_datetime_ist))
        conn.commit()
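# Example usage (hypothetical result shape; keys must match the INSERT above):
# results = [{"DocID": "101", "Title": "Sample", "matching_columns": "[...]",
#             "matching_indents": "[...]",
#             "matching_columns_after_classification": "[...]",
#             "matching_indents_after_classification": "[...]"}]
# add_classified_results(results, "climate policy")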
def main(list_of_docs_already_present, lst_new_data, query):
    conn = create_connection()
    with conn:
        # Clear out the previous batch of tasks
        delete_sql_records(conn)
        # Extracting values from each subdictionary
        values_list = [list(subdict.values()) for subdict in lst_new_data.values()]
        add_stored_results(conn, list_of_docs_already_present)
        # Create tasks from the extracted values
        for task in values_list:
            create_task(conn, task)
        results = retrieve_text(conn, query)
    return results
if __name__ == '__main__':
    # main() requires data from the calling application; a bare main() call
    # would raise a TypeError. Hypothetical standalone invocation:
    # main(list_of_docs_already_present=[], lst_new_data={}, query="example query")
    pass