Spaces:
Running
Running
| import os | |
| import openai | |
| import gradio as gr | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import urllib.parse | |
| from selenium import webdriver | |
| from webdriver_manager.chrome import ChromeDriverManager | |
# Load variables from a local .env file when python-dotenv is available.
try:
    from dotenv import load_dotenv
except ImportError:
    pass  # In production, python-dotenv may not be installed
else:
    # Only the import is guarded; load_dotenv() runs outside the try body.
    load_dotenv()

# OPEN_API_KEY is this project's variable name and still wins when set.
# Falling back to OPENAI_API_KEY avoids overwriting the key with None when
# only the conventional variable is configured.
openai.api_key = os.getenv("OPEN_API_KEY") or os.getenv("OPENAI_API_KEY")
class Conversation:
    """Chat session that answers questions about a single scraped web page.

    ``get_data`` scrapes a URL and seeds ``self.messages`` with system
    prompts describing the page. ``ask_chatbot`` sends the accumulated
    messages to the OpenAI chat API and records the reply. ``user`` and
    ``bot`` take and return a chat history made of ``[user_message,
    bot_message]`` pairs (presumably the Gradio chatbot callbacks -- confirm
    against the UI wiring).
    """

    # Chat model used for every completion request.
    MODEL = "gpt-3.5-turbo"
    # Seconds to wait for the plain HTTP fetch before giving up.
    REQUEST_TIMEOUT = 30
    # When the joined paragraph text is longer than PARAGRAPH_CHAR_LIMIT,
    # only its first TRUNCATED_PARAGRAPH_CHARS characters are sent.
    PARAGRAPH_CHAR_LIMIT = 7500
    TRUNCATED_PARAGRAPH_CHARS = 3000
    INVALID_URL_MESSAGE = "Invalid URL, please try again."
    RETRY_MESSAGE = "Please try again"

    def __init__(self):
        # Chat messages (dicts with 'role' and 'content') sent to the API.
        self.messages = []

    def to_valid_url(self, input_string):
        """Normalize user input into an absolute http(s) URL.

        Surrounding whitespace is stripped and ``https://`` is assumed when
        the input carries no scheme (``example.com`` or ``//example.com``).

        Args:
            input_string: Raw URL text typed by the user.

        Returns:
            The normalized URL string.

        Raises:
            ValueError: If the input is not a string, is blank, has no host,
                or uses a scheme other than http/https.
        """
        print("url: ", input_string)
        if not isinstance(input_string, str):
            raise ValueError(self.INVALID_URL_MESSAGE)

        url = input_string.strip()
        if not url:
            raise ValueError(self.INVALID_URL_MESSAGE)

        if url.startswith("//"):
            # Scheme-relative form: keep the authority, add the scheme.
            url = "https:" + url
        else:
            # A "://" that only appears after a path/query/fragment character
            # belongs to an embedded URL, not to this URL's own scheme.
            head, separator, _ = url.partition("://")
            if not separator or any(ch in head for ch in "/?#"):
                url = "https://" + url

        try:
            parsed_url = urllib.parse.urlparse(url)
        except ValueError as err:
            raise ValueError(self.INVALID_URL_MESSAGE) from err

        # Only http(s) can be fetched by the scraper below.
        if parsed_url.scheme not in ("http", "https") or not parsed_url.netloc:
            raise ValueError(self.INVALID_URL_MESSAGE)
        return parsed_url.geturl()

    @staticmethod
    def _fetch_static_html(url, timeout):
        """Return the response body of a plain HTTP GET for ``url``."""
        # The timeout keeps an unresponsive host from blocking forever.
        response = requests.get(url, timeout=timeout)
        return response.text

    @staticmethod
    def _fetch_rendered_html(url):
        """Return the page source after loading ``url`` in headless Chrome."""
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        # NOTE(review): the driver path is passed positionally, as in the
        # original code; confirm this matches the pinned Selenium version.
        driver = webdriver.Chrome(ChromeDriverManager().install(), options=options)
        try:
            driver.get(url)
            return driver.page_source
        finally:
            # Always shut the browser down, even when the page load fails.
            driver.quit()

    @staticmethod
    def _needs_js_rendering(html_content):
        """Return True when the static HTML is empty or has no <body> tag."""
        # A substring test is used because str.find() returns -1 (truthy)
        # on a miss, so its result cannot be used as a boolean.
        return not html_content or "<body" not in html_content.lower()

    def _smart_scrape(self, url):
        """Fetch ``url`` statically, falling back to Selenium if incomplete."""
        html_content = self._fetch_static_html(url, self.REQUEST_TIMEOUT)
        if self._needs_js_rendering(html_content):
            print("Using Selenium for JavaScript rendering...")
            return self._fetch_rendered_html(url)
        return html_content

    @classmethod
    def _build_context_messages(cls, headings_1, headings_2, paragraphs):
        """Build the system messages that describe a scraped page."""
        joined_paragraphs = ' '.join(paragraphs)
        if len(joined_paragraphs) > cls.PARAGRAPH_CHAR_LIMIT:
            # Oversized pages are sent as one truncated string instead of
            # the full list of paragraphs.
            paragraphs = joined_paragraphs[:cls.TRUNCATED_PARAGRAPH_CHARS]
        return [
            {'role': 'system', 'content': "You are a helpful assistant that must answer questions about a website."},
            {'role': 'system', 'content': f"here are the h1s - {headings_1}"},
            {'role': 'system', 'content': f"here are the h2s - {headings_2}"},
            {'role': 'system', 'content': f"here are the paragraphs - {paragraphs}"},
        ]

    def get_data(self, old_url):
        """Scrape ``old_url`` and reset the conversation around its content.

        Args:
            old_url: Raw URL text; validated with ``to_valid_url``.

        Returns:
            The new ``self.messages`` list: system prompts holding the
            page's h1 headings, h2 headings and paragraph text.

        Raises:
            ValueError: If the URL is invalid or the page has no <body>.
        """
        url = self.to_valid_url(old_url)
        html = self._smart_scrape(url)

        body = BeautifulSoup(html, 'html.parser').body
        if body is None:
            # Without a <body> there is nothing to extract.
            raise ValueError(self.RETRY_MESSAGE)

        headings_1 = [e.text for e in body.find_all('h1')]
        headings_2 = [e.text for e in body.find_all('h2')]
        paragraphs = [e.text for e in body.find_all('p')]

        # The previous conversation is replaced only after a successful scrape.
        self.messages = self._build_context_messages(headings_1, headings_2, paragraphs)
        return self.messages

    def ask_chatbot(self, input):
        """Send a user message to the chat model and return its reply.

        The user message and the assistant reply are both appended to
        ``self.messages``. If the request fails, the user message is removed
        again so a retry does not resend it twice.

        Args:
            input: The user's message text.

        Returns:
            The assistant's reply text.

        Raises:
            ValueError: If ``input`` is empty, the request is rejected as
                invalid by the API, or the reply is empty.
        """
        if not input:
            # Nothing to send; callers already handle ValueError.
            raise ValueError(self.RETRY_MESSAGE)

        self.messages.append({"role": "user", "content": input})
        try:
            chat = openai.ChatCompletion.create(
                model=self.MODEL, messages=self.messages
            )
        except openai.error.InvalidRequestError as err:
            self.messages.pop()
            raise ValueError("The website is too large to understand. Please try a different site.") from err

        reply = chat.choices[0].message.content
        if not reply:
            self.messages.pop()
            raise ValueError(self.RETRY_MESSAGE)

        self.messages.append({"role": "assistant", "content": reply})
        return reply

    def user(self, user_message, history):
        """Append a pending user turn to ``history``.

        Returns:
            A tuple of an empty string and a new history list ending with
            ``[user_message, None]``; the input list is not modified.
        """
        return "", history + [[user_message, None]]

    def bot(self, history):
        """Fill in the bot reply for the last turn of ``history``.

        Args:
            history: List of ``[user_message, bot_message]`` pairs; the last
                pair is updated in place.

        Returns:
            The same ``history`` list. An empty history is returned as is.
        """
        if not history:
            return history

        user_message = history[-1][0]
        try:
            bot_message = self.ask_chatbot(user_message)
        except ValueError as err:
            # Show the specific, user-facing reason when one was given.
            bot_message = str(err) or self.RETRY_MESSAGE
        history[-1][1] = bot_message
        return history