DrishtiSharma committed on
Commit
7f17ee4
·
verified ·
1 Parent(s): fd79de0

Update test.py

Browse files
Files changed (1) hide show
  1. test.py +183 -167
test.py CHANGED
@@ -1,170 +1,186 @@
1
- from typing import List, Union, Optional
2
  import os
3
- import requests
4
  import re
5
- import time
6
  import shutil
7
- import subprocess
8
- import pandas as pd
9
- from selenium import webdriver
10
- from selenium.webdriver.common.keys import Keys
11
- from selenium.webdriver.chrome.service import Service
12
- from selenium.webdriver.chrome.options import Options
13
- from bs4 import BeautifulSoup
14
- from selenium.webdriver.common.by import By
15
- from selenium.webdriver.support.ui import WebDriverWait
16
- from selenium.webdriver.support import expected_conditions as EC
17
- import chromedriver_autoinstaller
18
-
19
-
20
class PatentDownloader:
    """Download patent PDFs from Google Patents using headless Chrome.

    The downloader searches the Google Patents site with Selenium, scrapes
    the result page for a link ending in ``pdf`` that contains the patent
    number, and saves the linked file as ``<patent>.pdf``.
    """

    url = "https://patents.google.com"

    def __init__(self, verbose: bool = False):
        """
        Parameters
        ----------
        verbose : bool
            Print additional debug information.
        """
        self.verbose = verbose
        self.chrome_path = self.install_chrome()

    def install_chrome(self) -> str:
        """
        Locate Google Chrome, installing it with apt when it is missing.

        Returns
        -------
        str: Path to the Chrome binary as resolved on PATH.

        Raises
        ------
        ValueError
            If ``google-chrome`` is still not on PATH after installation.
        subprocess.CalledProcessError
            If a download or installation command exits non-zero.
        """
        # Use the location that is actually on PATH instead of assuming
        # /usr/bin, so a Chrome installed elsewhere is still picked up.
        chrome_path = shutil.which("google-chrome")
        if chrome_path:
            return chrome_path

        print("Downloading and installing Google Chrome...")
        try:
            # Argument lists (no shell) run the same commands without
            # going through shell parsing.
            subprocess.run(
                [
                    "wget",
                    "https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb",
                    "-O",
                    "chrome.deb",
                ],
                check=True,
            )
            subprocess.run(["apt-get", "update"], check=True)
            subprocess.run(
                ["apt-get", "install", "-y", "./chrome.deb"],
                check=True,
            )
        finally:
            # Do not leave the installer behind, even when a step failed.
            if os.path.exists("chrome.deb"):
                os.remove("chrome.deb")

        chrome_path = shutil.which("google-chrome")
        if not chrome_path:
            raise ValueError("Google Chrome installation failed!")
        return chrome_path

    def download(self, patent: Union[str, List[str]], output_path: str = "./",
                 waiting_time: int = 10, remove_kind_codes: Optional[List[str]] = None) -> None:
        """
        Download patent document(s) as PDF.

        Parameters
        ----------
        patent : str or list of str
            A single patent number, a list of patent numbers, or the path of
            an existing file listing patent numbers (see ``get_pdfs``).
        output_path : str
            Directory the PDF files are written to.
        waiting_time : int
            Seconds to wait after each search before scraping the page.
        remove_kind_codes : list of str, optional
            Suffixes to strip from the end of each patent number.
        """
        if isinstance(patent, list) or os.path.isfile(patent):
            self.get_pdfs(patent, output_path, waiting_time, remove_kind_codes)
        else:
            self.get_pdf(patent, output_path, waiting_time, remove_kind_codes)

    @staticmethod
    def _strip_kind_codes(patent: str, remove_kind_codes: Optional[List[str]]) -> str:
        """Remove each listed kind code, in order, from the end of *patent*."""
        if remove_kind_codes:
            for kind_code in remove_kind_codes:
                # Literal suffix comparison: a kind code containing regex
                # metacharacters must not be interpreted as a pattern.
                if kind_code and patent.endswith(kind_code):
                    patent = patent[: -len(kind_code)]
        return patent

    def get_pdf(self, patent: str, output_path: str = "./", waiting_time: int = 10,
                remove_kind_codes: Optional[List[str]] = None) -> None:
        """
        Download a single patent PDF.

        Errors raised while driving the browser are printed and treated as
        "link not found". Errors raised while fetching the PDF propagate.

        Parameters
        ----------
        patent : str
            Patent number to search for.
        output_path : str
            Directory the PDF is written to; created when missing.
        waiting_time : int
            Seconds to sleep after the result page body is present.
        remove_kind_codes : list of str, optional
            Suffixes to strip from the end of the patent number.

        Raises
        ------
        requests.RequestException
            If fetching the PDF fails, times out, or returns an HTTP error
            status.
        """
        patent = self._strip_kind_codes(patent, remove_kind_codes)

        # Automatically install ChromeDriver
        chromedriver_autoinstaller.install()

        # Set up Chrome options
        chrome_options = Options()
        chrome_options.binary_location = self.chrome_path
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")

        # Initialize Selenium WebDriver
        service = Service()
        driver = webdriver.Chrome(service=service, options=chrome_options)
        pdf_link = None  # Ensure pdf_link is defined

        try:
            driver.get(self.url)

            # Wait for the search input field and interact with it
            print("Waiting for the search input field...")
            search_input_xpath = "//input[@aria-label='Search patents']"
            # until() returns the value produced by the condition, i.e. the
            # located element, so no second lookup is needed.
            element = WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.XPATH, search_input_xpath))
            )
            print("Search input field located.")

            element.send_keys(patent)
            element.send_keys(Keys.RETURN)

            # Wait for search results to load
            print("Waiting for search results to load...")
            WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            time.sleep(waiting_time)

            # Parse HTML and get the PDF link
            soup = BeautifulSoup(driver.page_source, "html.parser")
            pdf_link = self.get_pdf_link(soup, patent)
        except Exception as e:
            print(f"Error occurred: {e}")
        finally:
            driver.quit()

        # Download the PDF
        if not pdf_link:
            print(f"Error: PDF link for patent {patent} not found!")
            return

        validate_directory(output_path)
        # A timeout keeps a stalled server from hanging the run, and the
        # status check keeps an HTTP error page from being saved as a PDF.
        response = requests.get(pdf_link, timeout=60)
        response.raise_for_status()
        with open(os.path.join(output_path, f"{patent}.pdf"), "wb") as file:
            file.write(response.content)
        print(f">>> Patent {patent} successfully downloaded <<<")

    def get_pdfs(self, patents: Union[List[str], str], output_path: str = "./",
                 waiting_time: int = 10, remove_kind_codes: Optional[List[str]] = None) -> None:
        """
        Download multiple patent PDFs from a list or file.

        Parameters
        ----------
        patents : list of str or str
            Patent numbers, or the path of a file whose name ends in ``csv``
            (read with pandas; needs a ``patent_number`` column) or ``txt``
            (one patent number per line).
        output_path : str
            Directory the PDF files are written to.
        waiting_time : int
            Seconds to wait after each search before scraping the page.
        remove_kind_codes : list of str, optional
            Suffixes to strip from the end of each patent number.

        Raises
        ------
        NotImplementedError
            If *patents* is a path with an unsupported extension.
        """
        if isinstance(patents, str):
            lowered_path = patents.lower()
            if lowered_path.endswith('csv'):
                df_patents = pd.read_csv(patents)
                patents = df_patents['patent_number'].to_list()
            elif lowered_path.endswith('txt'):
                with open(patents, 'r') as txt_file:
                    # Skip blank lines so they do not trigger empty searches.
                    patents = [line.strip() for line in txt_file if line.strip()]
            else:
                raise NotImplementedError(f'Unsupported file type: {patents}')

        for i, patent in enumerate(patents):
            print(len(patents) - i, "patent(s) remaining.")
            self.get_pdf(patent, output_path, waiting_time, remove_kind_codes)

    @staticmethod
    def get_pdf_link(soup: "BeautifulSoup", patent: str) -> Optional[str]:
        """
        Extract the PDF link from parsed HTML.

        Returns the first ``href`` that ends in ``pdf`` and contains the
        patent number (both compared case-insensitively), or ``None``.
        """
        wanted = patent.lower()
        for link in soup.find_all('a', href=True):
            href = link['href']
            lowered = href.lower()
            if lowered.endswith("pdf") and wanted in lowered:
                return href
        return None
163
-
164
-
165
def validate_directory(directory: str) -> None:
    """
    Ensure the output directory exists.

    Missing parent directories are created as well. Calling this for a
    directory that already exists is a no-op.

    Raises
    ------
    OSError
        If the directory cannot be created (for example, the path exists
        but is a regular file).
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(directory, exist_ok=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import sys
2
  import os
 
3
  import re
 
4
  import shutil
5
+ import time
6
+ import streamlit as st
7
+ import nltk
8
+
9
# Keep NLTK resources in a project-local "nltk_data" folder under the current
# working directory, and register that folder on NLTK's data search path.
nltk_data_path = os.path.join(os.getcwd(), "nltk_data")
os.makedirs(nltk_data_path, exist_ok=True)
nltk.data.path.append(nltk_data_path)

# Request the 'punkt' resource at startup; an exception is reported on stdout
# and does not stop the script.
try:
    print("Ensuring NLTK 'punkt' resource is downloaded...")
    nltk.download("punkt", download_dir=nltk_data_path)
except Exception as exc:
    print(f"Error downloading NLTK 'punkt': {exc}")

# Put the current directory on the import path ahead of the imports below.
sys.path.append(os.path.abspath("."))
22
+ from langchain.chains import ConversationalRetrievalChain
23
+ from langchain.memory import ConversationBufferMemory
24
+ from langchain.llms import OpenAI
25
+ from langchain.document_loaders import UnstructuredPDFLoader
26
+ from langchain.vectorstores import Chroma
27
+ from langchain.embeddings import HuggingFaceEmbeddings
28
+ from langchain.text_splitter import NLTKTextSplitter
29
+ from patent_downloader import PatentDownloader
30
+
31
# Directory handed to Chroma as its persistence location.
PERSISTED_DIRECTORY = "."

# The OpenAI key is read from the environment only; without it the app shows
# an error and halts the Streamlit script.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    st.error("Critical Error: OpenAI API key not found in the environment variables. Please configure it.")
    st.stop()
38
+
39
def check_poppler_installed():
    """Raise ``EnvironmentError`` unless a ``pdfinfo`` executable is on PATH.

    ``pdfinfo`` is used as the marker for a Poppler installation.
    """
    if shutil.which("pdfinfo") is not None:
        return
    raise EnvironmentError(
        "Poppler is not installed or not in PATH. Install 'poppler-utils' for PDF processing."
    )
44
+
45
+ check_poppler_installed()
46
+
47
def load_docs(document_path):
    """Load a PDF and return it split into chunks.

    The file is read with ``UnstructuredPDFLoader`` and the resulting
    documents are split with ``NLTKTextSplitter(chunk_size=1000)``. On any
    exception an error is shown in the Streamlit UI and the script is
    stopped.
    """
    try:
        elements = UnstructuredPDFLoader(
            document_path,
            mode="elements",
            strategy="fast",
            ocr_languages=None,  # Explicitly disable OCR
        ).load()
        splitter = NLTKTextSplitter(chunk_size=1000)
        chunks = splitter.split_documents(elements)
    except Exception as exc:
        st.error(f"Failed to load and process PDF: {exc}")
        st.stop()
    else:
        return chunks
61
+
62
def already_indexed(vectordb, file_name):
    """Return True if any stored entry's metadata has ``source == file_name``.

    Parameters
    ----------
    vectordb
        Object whose ``get(include=["metadatas"])`` returns a mapping with a
        ``"metadatas"`` list (a Chroma vector store in this app).
    file_name
        Source value to look for.

    Entries whose metadata is ``None`` or has no ``"source"`` key are
    ignored instead of raising, and an absent or empty metadata list yields
    ``False``.
    """
    metadatas = vectordb.get(include=["metadatas"]).get("metadatas") or []
    # any() stops at the first match; no intermediate set is needed.
    return any(
        meta is not None and "source" in meta and meta["source"] == file_name
        for meta in metadatas
    )
67
+
68
def load_chain(file_name=None):
    """Build a conversational retrieval chain over the given PDF.

    Opens the Chroma store at ``PERSISTED_DIRECTORY``. If *file_name* is the
    patent recorded in ``st.session_state["LOADED_PATENT"]`` or is already
    present in the store, the existing index is reused. Otherwise the
    collection is deleted, the PDF is loaded and split, a new index is built
    and persisted, and the session state is updated.

    Returns a ``ConversationalRetrievalChain`` with a fresh
    ``ConversationBufferMemory`` and a retriever using ``k=3``.
    """
    loaded_patent = st.session_state.get("LOADED_PATENT")

    # Create the embedding function once and share it between the lookup
    # store and the rebuilt store, instead of instantiating it twice.
    embeddings = HuggingFaceEmbeddings()

    vectordb = Chroma(
        persist_directory=PERSISTED_DIRECTORY,
        embedding_function=embeddings,
    )
    if loaded_patent == file_name or already_indexed(vectordb, file_name):
        st.write("βœ… Already indexed.")
    else:
        # Drop whatever was indexed before so only this document is searched.
        vectordb.delete_collection()
        docs = load_docs(file_name)
        st.write("πŸ” Number of Documents: ", len(docs))

        vectordb = Chroma.from_documents(
            docs, embeddings, persist_directory=PERSISTED_DIRECTORY
        )
        vectordb.persist()
        st.session_state["LOADED_PATENT"] = file_name

    memory = ConversationBufferMemory(
        memory_key="chat_history",
        return_messages=True,
        input_key="question",
        output_key="answer",
    )
    return ConversationalRetrievalChain.from_llm(
        OpenAI(temperature=0, openai_api_key=OPENAI_API_KEY),
        vectordb.as_retriever(search_kwargs={"k": 3}),
        return_source_documents=False,
        memory=memory,
    )
100
+
101
def extract_patent_number(url):
    """Return the patent number found after ``/patent/`` in *url*, or None.

    The number is two uppercase letters followed by one or more digits;
    anything after the digits (such as a kind-code suffix) is not included.
    """
    found = re.search(r"/patent/([A-Z]{2}\d+)", url)
    if found is None:
        return None
    return found.group(1)
105
+
106
def download_pdf(patent_number):
    """Download the patent PDF and return the first path reported.

    On any exception an error is shown in the Streamlit UI and the script
    is stopped.
    """
    # NOTE(review): this relies on PatentDownloader.download accepting a
    # `patents=` keyword and returning an indexable collection of file
    # paths - confirm against the patent_downloader module.
    try:
        downloader = PatentDownloader(verbose=True)
        downloaded = downloader.download(patents=patent_number)
        first_path = downloaded[0]  # Return the first file path
    except Exception as exc:
        st.error(f"Failed to download patent PDF: {exc}")
        st.stop()
    else:
        return first_path
114
+
115
if __name__ == "__main__":
    # Page chrome.
    st.set_page_config(
        page_title="Patent Chat: Google Patents Chat Demo",
        page_icon="πŸ“–",
        layout="wide",
        initial_sidebar_state="expanded",
    )
    st.header("πŸ“– Patent Chat: Google Patents Chat Demo")

    # Ask for the Google Patents URL; nothing below runs until one is given.
    patent_link = st.text_input("Enter Google Patent Link:", key="PATENT_LINK")
    if not patent_link:
        st.warning("Please enter a Google patent link to proceed.")
        st.stop()

    # Pull the patent number out of the URL, rejecting unrecognised links.
    patent_number = extract_patent_number(patent_link)
    if not patent_number:
        st.error("Invalid patent link format. Please provide a valid Google patent link.")
        st.stop()

    st.write(f"Patent number: **{patent_number}**")

    # Fetch the PDF unless a file with the expected name already exists.
    pdf_path = f"{patent_number}.pdf"
    if not os.path.isfile(pdf_path):
        st.write("πŸ“₯ Downloading patent file...")
        pdf_path = download_pdf(patent_number)
        st.write(f"βœ… File downloaded: {pdf_path}")
    else:
        st.write("βœ… File already downloaded.")

    # Index the document and build the question-answering chain.
    st.write("πŸ”„ Loading document into the system...")
    chain = load_chain(pdf_path)
    st.success("πŸš€ Document successfully loaded! You can now start asking questions.")

    # Seed the conversation with a greeting the first time through.
    if "messages" not in st.session_state:
        st.session_state["messages"] = [
            {"role": "assistant", "content": "Hello! How can I assist you with this patent?"}
        ]

    # Replay the conversation so far.
    for past in st.session_state.messages:
        with st.chat_message(past["role"]):
            st.markdown(past["content"])

    # Handle a newly submitted question.
    if user_input := st.chat_input("What is your question?"):
        st.session_state.messages.append({"role": "user", "content": user_input})
        with st.chat_message("user"):
            st.markdown(user_input)

        with st.chat_message("assistant"):
            message_placeholder = st.empty()
            full_response = ""

            with st.spinner("Generating response..."):
                try:
                    assistant_response = chain({"question": user_input})
                    # Reveal the answer word by word with a cursor glyph.
                    for word in assistant_response["answer"].split():
                        full_response += word + " "
                        time.sleep(0.05)  # Simulate typing effect
                        message_placeholder.markdown(full_response + "β–Œ")
                except Exception as exc:
                    full_response = f"An error occurred: {exc}"
                finally:
                    # Final render without the cursor (or the error text).
                    message_placeholder.markdown(full_response)

        st.session_state.messages.append({"role": "assistant", "content": full_response})