|
import gradio as gr |
|
import requests |
|
import json |
|
from base64 import b64encode |
|
import fitz |
|
import os |
|
import pickle |
|
import pytesseract |
|
import numpy as np |
|
from langchain import OpenAI |
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
from langchain.prompts import PromptTemplate |
|
from langchain.chains.summarize import load_summarize_chain |
|
from langchain.callbacks import get_openai_callback |
|
import logging |
|
|
|
# Root logger: timestamped, level-aligned records at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s %(levelname)-8s %(message)s',
)

# Companies House API key, taken from the environment. A missing API_TOKEN
# raises KeyError as soon as the module is loaded.
chkey = os.environ["API_TOKEN"]

# Base64 text of the key; the request functions in this file send it as
# "Basic <token>" in the Authorization header.
token = b64encode(chkey.encode('utf-8')).decode("ascii")
|
|
|
# Page layout. Most widgets start hidden and are revealed step by step by the
# callbacks as the user moves through search -> select -> download -> summarize.
# NOTE(review): the source's indentation was lost; only the radio list is
# placed inside `output_col` here -- confirm against the original layout.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    logging.info("*** App Starting ***")

    # Read-only description shown at the top of the page.
    intro = gr.Textbox(
        label="Introduction",
        interactive=False,
        value=(
            "An application to search for any Limited company in the UK, check "
            "when the latest Accounts are filed and summarize the account filing "
            "using OpenAI.\n"
            "Uses UK Companies House API to search and get the company "
            "information. And Langchain's Summarization chain to create a "
            "summary. (Needs an OpenAI API key)"
        ),
    )

    # Step 1: free-text company search.
    input_box = gr.Textbox(label="Input search string for a UK Company Name")
    search_btn = gr.Button("Search")

    # Per-session document id of the selected company's latest filing; written
    # by company_selected / get_filing and read by the later steps.
    doc_id = gr.State()

    # Step 2: search results (choices are replaced by company_search).
    with gr.Column(visible=False) as output_col:
        company_list_box = gr.Radio(
            choices=["Test1", "Test2"],
            label="Company search result",
        )

    # Step 3: latest-filing status and the button that downloads the document.
    display_filing = gr.Textbox(label="", interactive=False, visible=False)
    submit_btn = gr.Button("Get latest filing", visible=False)

    # Step 4: download result, OpenAI key entry and the summarize trigger.
    display_filing_doc_info = gr.Textbox(label="", interactive=False, visible=False)
    openapi_key_input = gr.Textbox(
        label="OpenAI API Key",
        type='password',
        interactive=True,
        visible=False,
    )
    process_filing_btn = gr.Button("Summarize the Account filing", visible=False)

    # Step 5: processing status and the generated summary.
    processed_info = gr.Textbox(label="", interactive=False, visible=False)
    summary_text = gr.Textbox(
        label="Summary using OPENAI",
        interactive=False,
        visible=False,
    )

    # Hides every result widget again (see clear_screen).
    clear = gr.Button("Clear")
|
|
|
|
|
def company_search(text):
    """Search Companies House for active companies whose name contains *text*.

    Calls the advanced-search endpoint (at most 10 results) and turns each hit
    into a "<number> : <name> : <address>" radio choice.

    Args:
        text: Search string typed by the user. ``None`` or blank input is
            treated as "no match" without calling the API.

    Returns:
        A dict of Gradio updates keyed by component: ``output_col`` is made
        visible and ``company_list_box`` receives either the result choices
        (interactive) or a single "No matching companies found" entry
        (non-interactive).

    Raises:
        requests.RequestException: On network failure or timeout.
    """

    def _no_match():
        return {
            output_col: gr.update(visible=True),
            company_list_box: gr.update(
                choices=["No matching companies found"], interactive=False
            ),
        }

    logging.info("*** New Search Starting ***")
    logging.info(f'Search term : {text}')

    search_term = (text or "").strip()
    if not search_term:
        return _no_match()

    url = "https://api.company-information.service.gov.uk/advanced-search/companies"
    # Let requests percent-encode the query: concatenating the raw search term
    # lets spaces, '&' or '#' corrupt the URL or inject extra parameters.
    params = {
        "company_name_includes": search_term,
        "company_status": "active",
        "size": 10,
    }
    logging.info(f'Calling Companies House API Advanced search : {url}')
    headers = {'Authorization': f'Basic {token}'}
    # Bounded wait so a stalled connection cannot hang the UI worker forever.
    response = requests.get(url, params=params, headers=headers, timeout=30)
    logging.info(f'API Response Code : {response.status_code}')

    if response.status_code != 200:
        return _no_match()

    select_resp = []
    for comp in response.json().get("items", []):
        # The address block may be absent; fall back to an empty address.
        address = comp.get("registered_office_address") or {}
        addr = ', '.join(str(part) for part in address.values())
        select_resp.append(
            comp["company_number"] + " : " + comp["company_name"] + " : " + addr
        )

    if not select_resp:
        return _no_match()

    resp_joined = ','.join(select_resp)
    logging.info(f'Response list : {resp_joined}')
    return {
        output_col: gr.update(visible=True),
        company_list_box: gr.update(choices=select_resp, interactive=True),
    }
|
|
|
|
|
def company_selected(selected_company, docid):
    """Look up the most recent accounts filing for the chosen company.

    Args:
        selected_company: Radio value of the form
            "<company number> : <name> : <address>", or ``None`` when nothing
            is selected.
        docid: Current value of the ``doc_id`` state. It is not read; the
            returned dict carries the replacement value.

    Returns:
        A dict of Gradio updates for ``display_filing``, ``submit_btn`` and
        ``doc_id``. ``doc_id`` is the filing's document id when one is
        available, otherwise the string ``"None"``; the "Get latest filing"
        button is shown only in the former case.

    Raises:
        requests.RequestException: On network failure or timeout.
    """

    def _result(message, new_docid="None"):
        return {
            display_filing: gr.Textbox.update(visible=True, value=message),
            submit_btn: gr.update(visible=new_docid != "None"),
            doc_id: new_docid,
        }

    logging.info("* Company selected. Getting Filing History *")
    logging.info(f'User Selection : {selected_company}')

    # The change event can deliver no selection; there is nothing to look up.
    if not selected_company:
        return {
            display_filing: gr.Textbox.update(visible=False),
            submit_btn: gr.update(visible=False),
            doc_id: "None",
        }

    regid = selected_company.split(' : ')[0]
    filings_url = (
        "https://api.company-information.service.gov.uk/company/"
        + regid
        + "/filing-history?category=accounts&items_per_page=1"
    )
    logging.info(f'Calling Companies House API Filings Endpoint : {filings_url}')
    headers = {'Authorization': f'Basic {token}'}
    response = requests.get(filings_url, headers=headers, timeout=30)
    logging.info(f'API Response Code : {response.status_code}')

    no_record = "No record of accounts filed for the company"
    # Check the status before parsing: error responses may have an empty or
    # non-JSON body, which would make the JSON decoder raise.
    if response.status_code != 200:
        return _result(no_record)

    items = response.json().get("items") or []
    if not items:
        return _result(no_record)

    latest = items[0]
    resp_value = f'Latest filing done on {latest["date"]}.'
    if "links" not in latest:
        return _result(resp_value + " But Links to the filing not available.")
    if "document_metadata" not in latest["links"]:
        return _result(resp_value + " But Document Metadata is not available.")

    # The document id is the last path segment of the metadata link.
    docid = latest["links"]["document_metadata"].rsplit('/', 1)[-1]
    return _result(resp_value, docid)
|
|
|
|
|
def get_filing(docid):
    """Download the filing document and store it as ./data/doc_<docid>.pdf.

    Args:
        docid: Companies House document id of the filing to fetch.

    Returns:
        A dict of Gradio updates. When a PDF was downloaded, the info box
        reports the file name and page count, the summarize controls are
        shown and ``doc_id`` keeps *docid*. Otherwise the info box reports
        the content type, the controls stay hidden and ``doc_id`` becomes
        the string ``"None"``.

    Raises:
        requests.RequestException: On network failure or timeout.
    """
    logging.info("* Getting Filing Document for latest filing *")
    doc_url = (
        "https://document-api.company-information.service.gov.uk/document/"
        + docid
        + "/content"
    )
    logging.info(f'Calling Companies House Documents API : {doc_url}')
    headers = {
        'Authorization': f'Basic {token}',
        'Accept': 'application/pdf',
    }
    response = requests.get(doc_url, headers=headers, timeout=60)
    logging.info(f'API Response Code : {response.status_code}')

    # The header may be absent, and may carry parameters such as
    # "; charset=..."; compare only the bare media type.
    content_type = response.headers.get('Content-Type', 'unknown')
    media_type = content_type.split(';', 1)[0].strip().lower()
    resp_value = f'Filing document is of type {content_type}. '

    if response.status_code == 200 and media_type == 'application/pdf':
        filename = f'doc_{docid}.pdf'
        os.makedirs('./data', exist_ok=True)
        filepath = './data/' + filename
        with open(filepath, 'wb') as f:
            f.write(response.content)
        # Open only to count pages; close the handle straight away.
        with fitz.open(filepath) as pdf_document:
            page_count = pdf_document.page_count
        resp_value += f'PDF saved as: {filename}. There are a total of {page_count} pages'
        logging.info(resp_value)
        return {
            display_filing_doc_info: gr.Textbox.update(visible=True, value=resp_value),
            process_filing_btn: gr.update(visible=True),
            openapi_key_input: gr.update(visible=True),
            processed_info: gr.update(visible=True),
            doc_id: docid,
        }

    resp_value += 'Work in progress to process these type of filings'
    logging.info(resp_value)
    return {
        display_filing_doc_info: gr.Textbox.update(visible=True, value=resp_value),
        process_filing_btn: gr.update(visible=False),
        openapi_key_input: gr.update(visible=False),
        processed_info: gr.update(visible=False),
        doc_id: "None",
    }
|
|
|
|
|
def langchain_summarize(contents, openai_api_key):
    """Summarize the filing text with a LangChain map-reduce chain on OpenAI.

    Args:
        contents: List of text strings (process_filing passes one per PDF page).
        openai_api_key: API key handed to the LangChain ``OpenAI`` wrapper.

    Returns:
        A ``(summary, token_text)`` tuple: the chain's summary and a
        human-readable line with the total tokens spent.
    """
    logging.info("* Calling Langchain / OPENAI to get the summary *")
    # Separate pages with a real blank line. The previous separator was the
    # literal text "`n`n", which put backticks into the document and gave the
    # splitter no paragraph break to cut on.
    concatenated_content = '\n\n'.join(contents)
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1200, chunk_overlap=20, length_function=len
    )
    docs = text_splitter.create_documents([concatenated_content])

    prompt_template = """You are a financial analyst, analyzing the Annual report submitted by the limited company at UK Companies House. Write a concise summary of the following report:


{text}


CONCISE SUMMARY:"""
    PROMPT = PromptTemplate(template=prompt_template, input_variables=["text"])
    llm = OpenAI(temperature=0, openai_api_key=openai_api_key)

    # The same prompt is used for the per-chunk step and the combine step.
    chain = load_summarize_chain(
        llm, chain_type="map_reduce", map_prompt=PROMPT, combine_prompt=PROMPT
    )
    with get_openai_callback() as cb:
        resp = chain.run(docs)
    tkn_text = f'*** Spent a total of {cb.total_tokens} tokens ***'
    return resp, tkn_text
|
|
|
|
|
def _ocr_pdf_pages(pdf_document, progress):
    """Render every page at 2x zoom and OCR it; return one string per page."""
    mat = fitz.Matrix(2.0, 2.0)
    contents = []
    for page in progress.tqdm(pdf_document, desc="Processing pages from PDF..."):
        pix = page.get_pixmap(matrix=mat)
        img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(
            pix.height, pix.width, pix.n
        )
        contents.append(pytesseract.image_to_string(img))
    return contents


def process_filing(docid, openai_api_key, progress=gr.Progress()):
    """OCR the downloaded filing PDF and summarize it, caching both results.

    Page text is cached in ./text/doc_<docid>.pkl and the summary in
    ./summary/doc_<docid>.txt; existing cache files are reused instead of
    repeating the OCR or the LLM call.

    Args:
        docid: Document id; the PDF is read from ./data/doc_<docid>.pdf.
        openai_api_key: Key forwarded to ``langchain_summarize``.
        progress: Gradio progress tracker used for the per-page progress bar.

    Returns:
        A dict of Gradio updates for ``processed_info`` (status line) and
        ``summary_text`` (the summary, or hidden when the LLM call fails).
    """
    logging.info("* Processing the filing document *")
    progress(0, desc="Starting...")
    filepath = f'./data/doc_{docid}.pdf'
    text_path = f'./text/doc_{docid}.pkl'
    summary_path = f'./summary/doc_{docid}.txt'

    # Keep the PDF open only while it is needed so the handle is released.
    with fitz.open(filepath) as pdf_document:
        page_count = pdf_document.page_count
        if os.path.exists(text_path):
            # NOTE: pickle.load can execute arbitrary code from a tampered
            # file; this cache is only ever written by the branch below.
            with open(text_path, 'rb') as f:
                contents = pickle.load(f)
        else:
            contents = _ocr_pdf_pages(pdf_document, progress)
            os.makedirs('./text', exist_ok=True)
            with open(text_path, 'wb') as f:
                pickle.dump(contents, f)

    resp_value = f'Total of {page_count} pages processed. '

    if os.path.exists(summary_path):
        # Written as UTF-8 bytes below, so decode as UTF-8 rather than with
        # the platform default encoding.
        with open(summary_path, 'r', encoding='utf-8') as f:
            summary = f.read()
        logging.info(resp_value)
        return {
            processed_info: gr.Textbox.update(visible=True, value=resp_value),
            summary_text: gr.Textbox.update(visible=True, value=summary),
        }

    try:
        summary, tkn_text = langchain_summarize(contents, openai_api_key)
    except Exception:
        # UI boundary: report the failure to the user instead of crashing
        # the callback, but keep the traceback in the log.
        logging.exception("LLM summarization failed")
        resp_value += 'LLM Call failed. Please check the OpenAI key again'
        logging.info(resp_value)
        return {
            processed_info: gr.Textbox.update(visible=True, value=resp_value),
            summary_text: gr.Textbox.update(visible=False),
        }

    resp_value += tkn_text
    logging.info(resp_value)
    try:
        os.makedirs('./summary', exist_ok=True)
        with open(summary_path, 'wb') as f:
            f.write(summary.encode('utf-8'))
    except OSError:
        # A failed cache write must not discard a summary already paid for.
        logging.exception("Could not cache the summary")
    return {
        processed_info: gr.Textbox.update(visible=True, value=resp_value),
        summary_text: gr.Textbox.update(visible=True, value=summary),
    }
|
|
|
def clear_screen():
    """Return Gradio updates that hide the results column and every result widget."""
    hidden = {output_col: gr.update(visible=False)}
    hidden[display_filing] = gr.Textbox.update(visible=False)
    # Each component gets its own fresh update object.
    for component in (
        submit_btn,
        display_filing_doc_info,
        process_filing_btn,
        openapi_key_input,
        processed_info,
        summary_text,
    ):
        hidden[component] = gr.update(visible=False)
    return hidden
|
|
|
# Hook the callbacks up to the widgets. Gradio requires event listeners to be
# registered while a Blocks context is active, so `demo` is entered here.
# NOTE(review): the source's indentation was lost; if these calls already sit
# inside the original `with gr.Blocks(...) as demo:` body, drop this wrapper.
with demo:
    search_btn.click(
        fn=company_search,
        inputs=input_box,
        outputs=[company_list_box, output_col],
    )
    company_list_box.change(
        fn=company_selected,
        inputs=[company_list_box, doc_id],
        outputs=[display_filing, submit_btn, doc_id],
    )
    submit_btn.click(
        fn=get_filing,
        inputs=doc_id,
        outputs=[
            display_filing_doc_info,
            process_filing_btn,
            openapi_key_input,
            processed_info,
            doc_id,
        ],
    )
    process_filing_btn.click(
        fn=process_filing,
        inputs=[doc_id, openapi_key_input],
        outputs=[processed_info, summary_text],
    )
    clear.click(
        fn=clear_screen,
        inputs=None,
        outputs=[
            output_col,
            display_filing,
            submit_btn,
            display_filing_doc_info,
            process_filing_btn,
            openapi_key_input,
            processed_info,
            summary_text,
        ],
    )

# Enable the request queue with three concurrent workers, then start the server.
demo.queue(concurrency_count=3)
demo.launch()