|
import logging |
|
from typing import List |
|
from pydantic import NoneStr |
|
import os |
|
from langchain.chains.question_answering import load_qa_chain |
|
from langchain.document_loaders import UnstructuredFileLoader |
|
from langchain.embeddings.openai import OpenAIEmbeddings |
|
from langchain.llms import OpenAI |
|
from langchain.text_splitter import CharacterTextSplitter |
|
from langchain.vectorstores import FAISS |
|
import gradio as gr |
|
import openai |
|
from langchain import PromptTemplate, OpenAI, LLMChain |
|
import validators |
|
import requests |
|
import mimetypes |
|
import tempfile |
|
import pandas as pd |
|
import re |
|
|
|
|
|
|
|
|
|
|
|
class ChemicalIdentifier: |
|
|
|
def __init__(self): |
|
|
|
openai.api_key = os.getenv("OPENAI_API_KEY") |
|
self.logger = logging.getLogger("ChemicalIdentifier") |
|
self.logger.setLevel(logging.DEBUG) |
|
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
|
console_handler = logging.StreamHandler() |
|
console_handler.setLevel(logging.DEBUG) |
|
console_handler.setFormatter(formatter) |
|
self.logger.addHandler(console_handler) |
|
|
|
def get_empty_state(self): |
|
|
|
""" Create empty Knowledge base""" |
|
|
|
return {"knowledge_base": None} |
|
|
|
def get_content_from_url(self,url:str)->List: |
|
""" |
|
Uploads a file from a given URL and returns the loaded document. |
|
Args: |
|
url (str): The URL of the file to be uploaded. |
|
Returns: |
|
Document: The loaded document. |
|
Raises: |
|
ValueError: If the URL is not valid or the file cannot be fetched. |
|
""" |
|
|
|
try: |
|
if validators.url(url): |
|
headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',} |
|
r = requests.get(url,headers=headers) |
|
if r.status_code != 200: |
|
raise ValueError( |
|
"Check the url of your file; returned status code %s" % r.status_code |
|
) |
|
|
|
content_type = r.headers.get("content-type") |
|
file_extension = mimetypes.guess_extension(content_type) |
|
temp_file = tempfile.NamedTemporaryFile(suffix=file_extension, delete=False) |
|
temp_file.write(r.content) |
|
file_path = temp_file.name |
|
loader = UnstructuredFileLoader(file_path, strategy="fast") |
|
docs = loader.load() |
|
return docs |
|
else: |
|
raise ValueError("Please enter a valid URL") |
|
except Exception as e: |
|
self.logger.error("Error occurred while uploading the file: %s", str(e)) |
|
raise ValueError("Error occurred while uploading the file") from e |
|
|
|
|
|
def extract_chemical_names(self,text:str)->str: |
|
""" |
|
Extracts chemical names from the given text. |
|
Args: |
|
text (str): The text to extract chemical names from. |
|
Returns: |
|
str: The extracted chemical names in bullet form. |
|
Raises: |
|
ValueError: If an error occurs during the extraction process. |
|
""" |
|
|
|
try: |
|
prompt = f"Identify the Chemical Names Only give text in bullet form {text}. Don't Generate any extra chemicals apart from given text" |
|
response = openai.Completion.create( |
|
model="text-davinci-003", |
|
prompt=prompt, |
|
temperature=0, |
|
max_tokens=500, |
|
top_p=1, |
|
frequency_penalty=0, |
|
presence_penalty=0, |
|
) |
|
|
|
message = response.choices[0].text.strip() |
|
if ":" in message: |
|
message = re.sub(r'^.*:', '', message) |
|
return message.strip() |
|
except Exception as e: |
|
self.logger.error("Error occurred while finding chemicals: %s", str(e)) |
|
raise ValueError("Error occurred while finding chemicals") from e |
|
|
|
|
|
def get_chemicals_for_url(self,urls:str)->str: |
|
""" |
|
Retrieves chemicals from the provided URLs. |
|
|
|
Args: |
|
urls (str): Comma-separated URLs of the files to be processed. |
|
|
|
Returns: |
|
str: The extracted chemical names. |
|
|
|
Raises: |
|
ValueError: If an error occurs during the process. |
|
""" |
|
|
|
try: |
|
total_chemical=[] |
|
for url in urls.split(','): |
|
webpage_text = self.get_content_from_url(url) |
|
chemicals = self.extract_chemical_names(webpage_text) |
|
total_chemical.append(str(url)+"\n"+chemicals+"\n\n") |
|
list_of_chemicals = "".join(total_chemical) |
|
return list_of_chemicals |
|
|
|
except Exception as e: |
|
self.logger.error("Error occurred while getting chemicals from URLs: %s", str(e)) |
|
raise ValueError("Error occurred while getting chemicals from URLs") from e |
|
|
|
def create_knowledge_base(self,docs): |
|
|
|
"""Create a knowledge base from the given documents. |
|
Args: |
|
docs (List[str]): List of documents. |
|
Returns: |
|
FAISS: Knowledge base built from the documents. |
|
""" |
|
|
|
|
|
|
|
|
|
text_splitter = CharacterTextSplitter( |
|
separator="\n", chunk_size=1000, chunk_overlap=200, length_function=len |
|
) |
|
|
|
|
|
chunks = text_splitter.split_documents(docs) |
|
|
|
|
|
embeddings = OpenAIEmbeddings() |
|
|
|
|
|
knowledge_base = FAISS.from_documents(chunks, embeddings) |
|
|
|
|
|
return knowledge_base |
|
|
|
def file_path_show(self,file_paths): |
|
file_paths = [single_file_path.name for single_file_path in file_paths] |
|
return file_paths |
|
|
|
def get_chemicals_for_file(self,state): |
|
|
|
knowledge_base = state["knowledge_base"] |
|
|
|
|
|
question = "Identify the Chemical Capabilities Only" |
|
|
|
|
|
docs = knowledge_base.similarity_search(question) |
|
|
|
|
|
template = """Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer. |
|
Identify the Chemical Capabilities Only. |
|
{context} |
|
Question :{question}. |
|
The result should be in bullet points format. |
|
""" |
|
|
|
prompt = PromptTemplate(template=template,input_variables=["context","question"]) |
|
|
|
llm = OpenAI(temperature=0.4) |
|
llm_chain = LLMChain(prompt=prompt, llm=llm) |
|
|
|
|
|
chain = load_qa_chain(llm, chain_type="stuff",prompt=prompt) |
|
|
|
|
|
response = chain.run(input_documents=docs, question=question) |
|
|
|
|
|
return response |
|
|
|
def identify_chemicals_in_files(self,file_paths,state): |
|
"""Upload a file and create a knowledge base from its contents. |
|
Args: |
|
file_paths : The files to uploaded. |
|
Returns: |
|
tuple: A tuple containing the file name and the knowledge base. |
|
""" |
|
|
|
|
|
file_paths = [single_file_path.name for single_file_path in file_paths] |
|
collection_of_results = [] |
|
for file_obj in file_paths: |
|
|
|
loader = UnstructuredFileLoader(file_obj, strategy="fast") |
|
|
|
|
|
docs =loader.load() |
|
|
|
|
|
knowledge_base = self.create_knowledge_base(docs) |
|
state = {"knowledge_base": knowledge_base} |
|
pdf_name = os.path.basename(file_obj) |
|
final_ans = self.get_chemicals_for_file(state) |
|
response = pdf_name+"\n"+final_ans+"\n\n" |
|
collection_of_results.append(response) |
|
|
|
results = "".join(collection_of_results) |
|
return results |
|
|
|
def get_final_result(self,urls,file_paths,state,progress=gr.Progress()): |
|
|
|
if urls: |
|
if file_paths: |
|
urls_chemicals = self.get_chemicals_for_url(urls) |
|
file_chemicals = self.identify_chemicals_in_files(file_paths,state) |
|
chemicals = urls_chemicals + file_chemicals |
|
|
|
return chemicals |
|
else: |
|
urls_chemicals = self.get_chemicals_for_url(urls) |
|
return urls_chemicals |
|
elif file_paths: |
|
file_chemicals = self.identify_chemicals_in_files(file_paths,state) |
|
return file_chemicals |
|
else: |
|
return "No Files Uploaded" |
|
|
|
|
|
def gradio_interface(self)->None: |
|
""" |
|
Starts the Gradio interface for chemical identification. |
|
""" |
|
|
|
with gr.Blocks(css="style.css",theme='karthikeyan-adople/hudsonhayes-gray') as demo: |
|
gr.HTML("""<center style='background-color:rgb(0,1,36); text-align:center;padding:25px;'><center><h1 class ="center"> |
|
<img src="https://hudsonandhayes.co.uk/wp-content/uploads/2023/01/Group-479.svg" height="110px" width="280px"></h1></center> |
|
<br><h1 style="color:#fff">Chemical Identifier</h1></center>""") |
|
state = gr.State(self.get_empty_state()) |
|
with gr.Column(elem_id="col-container"): |
|
with gr.Row(elem_id="row-flex"): |
|
url = gr.Textbox(label="URL") |
|
with gr.Row(elem_id="row-flex"): |
|
with gr.Accordion("Upload Files", open = False): |
|
with gr.Row(): |
|
with gr.Column(scale=0.90, min_width=160): |
|
file_output = gr.File() |
|
with gr.Column(scale=0.10, min_width=160): |
|
upload_button = gr.UploadButton( |
|
"Browse File", file_types=[".txt", ".pdf", ".doc", ".docx"], |
|
file_count = "multiple",variant="primary") |
|
with gr.Row(): |
|
with gr.Column(scale=1, min_width=0): |
|
compare_btn = gr.Button(value="Analyse",variant="primary") |
|
with gr.Row(): |
|
with gr.Column(scale=1, min_width=0): |
|
compared_result = gr.Textbox(value="",label='Chemicals :',show_label=True, placeholder="",lines=10) |
|
|
|
upload_button.upload(self.file_path_show, upload_button, [file_output]) |
|
|
|
compare_btn.click(self.get_final_result,[url,upload_button,state],compared_result) |
|
|
|
demo.launch() |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
logging.basicConfig(level=logging.DEBUG) |
|
chemical_identifier = ChemicalIdentifier() |
|
chemical_identifier.gradio_interface() |