Spaces:
Sleeping
Sleeping
import requests | |
import insert_data | |
from bs4 import BeautifulSoup | |
#from dotenv import load_dotenv | |
import os | |
import streamlit as st | |
#load_dotenv() | |
# API token for api.indiankanoon.org, read from the Streamlit secrets store.
api_key = st.secrets["IK_API_KEY"]

# Authorization header shared by the request helpers in this module.
headers = {'authorization': f"Token {api_key}"}
def get_text_for_new_docs(list_of_documents_not_present, searchusr, lst):
    """Build full records (metadata plus text) for documents not yet stored.

    Processing is best-effort per document: a failure while copying the
    metadata or while fetching the text leaves the affected fields as empty
    strings instead of aborting the whole batch.

    Args:
        list_of_documents_not_present: Iterable of document ids to process.
        searchusr: The user's search phrase, forwarded to ``get_text``.
        lst: Mapping of document id to a dict holding ``'title'`` and
            ``'size'`` entries for that document.

    Returns:
        A dict keyed by the ids exactly as they were passed in. Each value
        is a dict with the keys ``'id'`` (the id converted to ``int``),
        ``'title'``, ``'cleantext'``, ``'blocktext'`` and ``'size'``.
        ``'blocktext'`` is the ``str()`` of whatever ``get_text`` returned
        as its second element, or ``''`` when the fetch failed. Ids that
        cannot be converted to ``int`` are skipped.
    """
    lst_new_data = {}
    for doc_id in list_of_documents_not_present:
        # Without a numeric id no record can be created; skip the document
        # rather than failing later on a missing dictionary entry.
        try:
            numeric_id = int(doc_id)
        except (TypeError, ValueError):
            print("Error in get_text_for_new_docs")
            continue

        record = {'id': numeric_id, 'title': '', 'cleantext': '', 'blocktext': '', 'size': ''}
        lst_new_data[doc_id] = record

        # Copy the search-result metadata; missing metadata keeps the
        # empty-string defaults set above.
        try:
            metadata = lst[doc_id]
            record['title'] = metadata['title']
            record['size'] = metadata['size']
        except (LookupError, TypeError):
            print("Error in get_text_for_new_docs")

        # Fetching involves network and HTML parsing, so any failure here is
        # tolerated: the record is kept with empty text fields.
        try:
            cleantext, blocktext_lst = get_text(doc_id, searchusr)
            blocktext = str(blocktext_lst)
        except Exception as exc:
            print("Could not fetch text for doc", doc_id, "-", repr(exc))
            cleantext = ''
            blocktext = ''

        record['cleantext'] = cleantext
        record['blocktext'] = blocktext
    return lst_new_data
def _collect_blockquotes(html_string, search_strings):
    """Return the text of <blockquote> tags that closely follow a matching <p>.

    Every tag in the parsed document is visited in document order. When a
    ``<p>`` tag's text contains any of ``search_strings``, the next three
    tags in that same flat sequence are inspected and the text of each
    ``<blockquote>`` among them is collected.

    Args:
        html_string: HTML markup to scan.
        search_strings: Substrings that mark a paragraph as introducing a
            quotation.

    Returns:
        A list of blockquote texts in the order found. A blockquote can
        appear more than once if several matching paragraphs precede it.
    """
    soup = BeautifulSoup(html_string, 'html.parser')
    elements = soup.find_all()
    quotes = []
    for index, element in enumerate(elements):
        if element.name != 'p':
            continue
        paragraph_text = element.get_text()
        if not any(term in paragraph_text for term in search_strings):
            continue
        # Look at the (up to) three tags following the paragraph.
        for follower in elements[index + 1:index + 4]:
            if follower.name == 'blockquote':
                quotes.append(follower.get_text())
    return quotes


def get_text(id, searchusr, timeout=30):
    """Fetch one document from api.indiankanoon.org and extract its text.

    Args:
        id: Document id; it is converted with ``str()`` and placed in the
            request URL.
        searchusr: The user's search phrase. Its ``str()`` form is added to
            the fixed list of phrases used to spot paragraphs that introduce
            a quoted clause.
        timeout: Seconds passed to ``requests.post`` as its ``timeout`` so a
            stalled connection cannot block forever.

    Returns:
        A ``(cleantext, blockquotes)`` tuple. ``cleantext`` is the plain
        text of the document HTML, or ``''`` if it could not be produced.
        ``blockquotes`` is a list of blockquote texts, or ``''`` when the
        response carries no usable ``'doc'`` entry or extraction failed.

    Raises:
        Errors raised by ``requests.post`` or by decoding the response as
        JSON are not handled here and propagate to the caller.
    """
    doc_id = str(id)
    url = f'https://api.indiankanoon.org/doc/{doc_id}/'
    res = requests.post(url, headers=headers, timeout=timeout).json()
    print("Request for doc with id", doc_id, "sent")

    # No usable document body means there is nothing to extract.
    try:
        html_string = res['doc']
    except (LookupError, TypeError):
        return '', ''
    if not isinstance(html_string, str):
        return '', ''

    try:
        # Resolve literal backslash escapes in the payload. Encoding with
        # latin-1 + backslashreplace first keeps non-ASCII characters intact;
        # a plain UTF-8 round-trip through 'unicode-escape' garbles them.
        unescaped = html_string.encode('latin-1', 'backslashreplace').decode('unicode-escape')
        cleantext = BeautifulSoup(unescaped, "html.parser").get_text()
    except Exception as exc:
        # Best-effort: a document that cannot be decoded/parsed yields ''.
        print("Could not extract clean text for doc", doc_id, "-", repr(exc))
        cleantext = ''

    search_strings = [
        "clause",
        "agreement",
        " which reads as",
        " mutually agreed",
        " states the following",
        str(searchusr),
    ]
    try:
        # The blockquote scan runs on the raw payload, not the unescaped one.
        filtered_paragraphs = _collect_blockquotes(html_string, search_strings)
    except Exception as exc:
        # Callers stringify this value, so '' (not []) marks a failed scan.
        print("Could not extract blockquotes for doc", doc_id, "-", repr(exc))
        filtered_paragraphs = ''
    return cleantext, filtered_paragraphs
def get_docs(search, max_pages=1, timeout=30):
    """Search api.indiankanoon.org for documents quoting clauses about a phrase.

    The phrase is wrapped in double quotes and combined, one at a time, with
    a fixed set of suffixes (e.g. ``clause which reads as``). Each combined
    query is posted to the search endpoint and the hits are merged by id.

    Args:
        search: The phrase to search for.
        max_pages: Number of result pages to request per query, starting at
            page 0. Defaults to 1.
        timeout: Seconds passed to the HTTP call as its ``timeout``.

    Returns:
        A dict mapping each document id (``int``) to a dict with the keys
        ``'id'``, ``'title'`` and ``'size'``. Hits without a usable ``tid``
        are skipped; a later hit for the same id overwrites title and size.

    Raises:
        Errors raised by the HTTP call or by decoding the response as JSON
        are not handled here and propagate to the caller.
    """
    suffixes = ["clause which reads as", " mutually agreed", "clause states the following"]
    lst_data = {}
    with requests.Session() as session:
        # Add the auth header on top of the session defaults instead of
        # replacing them (and instead of aliasing the module-level dict).
        session.headers.update(headers)
        for suffix in suffixes:
            # Always build from the caller's phrase; reusing the previous
            # query would nest quotes and suffixes on every iteration.
            query = f'"{search}"{suffix}'
            for page_num in range(max_pages):
                # `params` URL-encodes the query (spaces become '+').
                res = session.post(
                    "https://api.indiankanoon.org/search/",
                    params={'formInput': query, 'pagenum': page_num},
                    timeout=timeout,
                ).json()
                print("Res printed is", res)
                docs = res.get('docs') if isinstance(res, dict) else None
                for doc in docs or []:
                    try:
                        doc_id = int(doc.get('tid', ''))
                    except (AttributeError, TypeError, ValueError):
                        # Hit without a numeric id: nothing to key it on.
                        continue
                    if not doc_id:
                        continue
                    entry = lst_data.setdefault(doc_id, {'id': doc_id, 'title': '', 'size': ''})
                    entry['title'] = doc.get('title', '')
                    entry['size'] = doc.get('docsize', '')
    return lst_data
def main(shortcode):
    """Run the search-fetch-store pipeline for one search phrase.

    Args:
        shortcode: The phrase to search for.

    Returns:
        ``("Error: No shortcode provided", 400)`` when ``shortcode`` is
        falsy. Otherwise the value returned by ``insert_data.main`` when it
        is non-empty, or ``''`` when it is ``None`` or has length zero.
    """
    if not shortcode:
        return "Error: No shortcode provided", 400

    # Search hits keyed by document id.
    found_docs = get_docs(shortcode)

    # Split the hits into those still missing from storage and those
    # already present.
    missing_ids = insert_data.check_for_already_present(found_docs)
    present_ids = [doc_id for doc_id in found_docs if doc_id not in missing_ids]

    # Download text only for the documents that are missing.
    fresh_records = get_text_for_new_docs(missing_ids, shortcode, found_docs)

    results = insert_data.main(present_ids, fresh_records, shortcode)

    # len() rather than truthiness, matching how emptiness was always tested.
    result_count = 0 if results is None else len(results)
    if result_count == 0:
        return ''
    return results