#### For scraping/webpage processing
import requests
import json  # specifically for the Wikipedia API
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from bs4 import BeautifulSoup

#### For timing
import time

#### For app
import streamlit as st
from collections import deque  # for printouts

#### For semantic similarity model
# !pip install tensorflow tensorflow-hub
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np

# Load the pre-trained Universal Sentence Encoder -- accessible at the same link
embed = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")

#### Earlier driver-setup attempts, kept for reference

# # @st.experimental_singleton
# @st.cache_resource
# def get_driver():
#     return webdriver.Chrome(service = Service(ChromeDriverManager().install()), options = options)

# import os, sys
# @st.cache_resource
# def installff():
#     os.system('sbase install geckodriver')
#     os.system('ln -s /home/appuser/venv/lib/python3.7/site-packages/seleniumbase/drivers/geckodriver /home/appuser/venv/bin/geckodriver')
# _ = installff()
# from selenium import webdriver
# from selenium.webdriver import FirefoxOptions
# opts = FirefoxOptions()
# opts.add_argument("--headless")
# driver = webdriver.Firefox(options=opts)
# driver_target = webdriver.Firefox(options=opts)
# browser.get('http://example.com')
# driver.get("http://example.com")

# from selenium import webdriver
# from selenium.common.exceptions import TimeoutException
# from selenium.webdriver.common.by import By
# from selenium.webdriver.firefox.options import Options
# from selenium.webdriver.firefox.service import Service
# from selenium.webdriver.support import expected_conditions as EC
# from selenium.webdriver.support.ui import WebDriverWait
# from webdriver_manager.firefox import GeckoDriverManager
#
# URL = ""
# TIMEOUT = 20
#
# st.title("Test Selenium")
# firefoxOptions = Options()
# firefoxOptions.add_argument("--headless")
# service = Service(GeckoDriverManager().install())
# driver = webdriver.Firefox(
#     options=firefoxOptions,
#     service=service,
# )
# driver_target = webdriver.Firefox(
#     options=firefoxOptions,
#     service=service,
# )

#### Current driver setup: two cached headless Chrome instances, one for navigating
#### the game and one for fetching the target page
options = Options()
options.add_argument('--disable-gpu')
options.add_argument('--headless')


@st.cache_resource
def get_driver(name: str):
    # st.cache_resource caches by argument value, so calling this with distinct names
    # gives each caller its own browser rather than one shared cached object
    return webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)


driver = get_driver("main")
driver_target = get_driver("target")

# driver.get('http://example.com')
# st.code(driver.page_source)

# Initialize an empty deque for the app's printouts. After 1000 messages it will start
# popping the oldest; the game should always time out before that, since most people
# won't have the patience to make it last this long.
messages = deque(maxlen=1000)
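###### Quick illustration of the bounded-deque behavior above (a sketch, not used by the app itself):
# d = deque(maxlen=2)
# d.appendleft("a"); d.appendleft("b"); d.appendleft("c")
# print(d)  # deque(['c', 'b'], maxlen=2) -- the oldest entry has been dropped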
def update_messages(message):
    # Add the new message to the start of the deque so the newest shows first
    messages.appendleft(message)
    # Use a placeholder and write the whole deque into it; writing messages one by one
    # in a loop would just overwrite the placeholder each time, so join them instead
    placeholder = st.empty()
    placeholder.text("\n".join(messages))


def most_similar_sentence(target_topic, labels_list):
    # Encode the context sentence and all sentences in the list
    context_embedding = embed([target_topic])[0]
    sentence_embeddings = embed(labels_list)

    # Calculate cosine similarities between the context sentence and each sentence in the list
    similarities = np.inner(context_embedding, sentence_embeddings)

    # Find the index of the most similar sentence
    most_similar_index = np.argmax(similarities)

    return labels_list[most_similar_index], similarities[most_similar_index], most_similar_index


def search_wikipedia(search_term):
    # Define the endpoint
    endpoint = "https://en.wikipedia.org/w/api.php"

    # Define the search parameters
    params = {
        "action": "query",
        "format": "json",
        "list": "search",
        "srsearch": search_term,
    }

    # Send a GET request to the endpoint with the search parameters
    response = requests.get(url=endpoint, params=params)

    # Parse the results as JSON
    data = json.loads(response.text)

    # Get the title of the first result (this will be used as the page title in the next step);
    # fall back to the second result if the first one is a disambiguation page
    page_title = data["query"]["search"][0]["title"]
    if "may refer to" in data["query"]["search"][0]["snippet"].lower():
        page_title = data["query"]["search"][1]["title"]

    # Construct the URL of the Wikipedia page
    page_url = "https://en.wikipedia.org/wiki/{}".format(page_title.replace(" ", "_"))

    return page_url, page_title


def get_topic_context(driver, more=False):
    # Find the first paragraph of the main article
    first_paragraph = driver.find_element(
        By.CSS_SELECTOR, "div.mw-parser-output > p:not(.mw-empty-elt)"
    ).text

    if more:
        # Use the first five sentences as context
        context_sentence = ". ".join(first_paragraph.split(". ")[:5])
    else:
        # Use only the first sentence
        context_sentence = first_paragraph.split(". ")[0]

    return context_sentence


# Load the list of censored words, stripping trailing newlines/whitespace
bad_words = [word.strip() for word in open("censored.txt", "r").readlines()]


def refine_links(topic, links, current_url_suffix, used_links, used_topics, censor=False):
    links_texts = []

    # Iterate through the links and extract their URLs
    for link in links:
        link_url = link.get('href')
        if link_url and link_url.startswith("/wiki/"):
            link_url = "https://en.wikipedia.org" + link_url
            link_text = link.text.strip()  # Get the text and remove leading/trailing spaces

            # make sure the text is non-empty and the link doesn't point back to the current page
            if link_text and current_url_suffix not in link_url:
                if link_url not in used_links and link_text.lower() not in [t.lower() for t in used_topics]:
                    # eliminates topic duplicates, non-wiki links, and wiki help pages (non-content pages)
                    if topic.lower() not in link_url.lower() and "en.wikipedia.org/wiki/" in link_url and ":" not in "".join(link_url.split("/")[1:]) and "Main_Page" != str(link_url.split("/")[-1]):
                        # censoring, if needed
                        if censor:
                            if not any(word.lower() in bad_words for word in link_text.split()):
                                links_texts.append((link_url, link_text))
                        else:
                            links_texts.append((link_url, link_text))

    return links_texts


def play_wiki_game_2(starting_topic: str, target_topic: str, limit: int = 100, delay: int = 0):

    ##### Setup Chrome options (earlier per-call driver setup, kept for reference)
    # chrome_options = webdriver.ChromeOptions()
    # chrome_options.add_argument("--headless")  # Ensure GUI is off
    # chrome_options.add_argument("--no-sandbox")
    # chrome_options.add_argument("--disable-dev-shm-usage")
    # driver = webdriver.Chrome(options = chrome_options)
    # options = Options()
    # options.add_argument('--disable-gpu')
    # options.add_argument('--headless')
    # driver = get_driver()
    # driver = webdriver.Firefox(options=opts)
    # driver_target = webdriver.Firefox(options=opts)

    #### Getting target url, topic, and context
    # driver_target = webdriver.Chrome(options = chrome_options)
    # driver_target = get_driver()
    target_url, target_topic = search_wikipedia(search_term=target_topic)
    driver_target.get(target_url)
    target_context = get_topic_context(driver_target, more=True)
    # update_messages(target_context)
    driver_target.quit()

    topic = starting_topic
    num_pages = 0
    used_topics = []
    used_links = []

    start_time = time.time()

    ### BEGIN ###

    update_messages("-" * 150)
    update_messages("\nStarting!\n")
    update_messages("-" * 150)

    url, topic = search_wikipedia(search_term=starting_topic)
    driver.get(url)
    used_topics.append(topic)
    used_links.append(driver.current_url)

    while True:
        # increment the page tracking by 1 for each new page
        num_pages += 1

        # if not the first page, navigate to the new page
        if num_pages > 1:
            driver.get(next_link)

        try:
            context_sentence = get_topic_context(driver)
        except Exception:
            context_sentence = "Context could not be found from webpage"

        current_url = driver.current_url
        current_url_suffix = str(current_url).split("/")[-1]

        ### Parse the HTML Selenium already fetched with BeautifulSoup for link extraction
        current_page = driver.page_source
        soup = BeautifulSoup(current_page, 'html.parser')
        links = soup.find_all('a')

        # get rid of any bloat in the links from the page
        links_texts = refine_links(topic, links, current_url_suffix, used_links, used_topics)

        # Compare link texts against the target page's context (its first few sentences)
        # rather than against the bare target topic string:
        # best_label, best_score, loc_idx = most_similar_sentence(target_topic = target_topic, labels_list = [text for link, text in links_texts])
        best_label, best_score, loc_idx = most_similar_sentence(
            target_topic=target_context.lower(),
            labels_list=[text.lower() for link, text in links_texts],
        )

        update_messages(f"\nPage: {num_pages}")
        update_messages(f"Current topic: '{topic.title()}'")
        update_messages(f"Current URL: '{current_url}'")
        update_messages(f"Current Topic Context: '{context_sentence}'")
        if current_url != target_url:
            update_messages(f"Next topic: '{best_label.title()}'. Semantic similarity to '{target_topic.title()}': {round((best_score * 100), 2)}%")

        next_link, topic = links_texts[loc_idx]
        used_links.append(next_link)
        used_topics.append(topic)

        # because the target_url was found through the API, the current URL can be compared to it directly
        if current_url == target_url:
            update_messages("\n" + "-" * 150)
            update_messages(f"\nFrom '{starting_topic.title()}' to '{target_topic.title()}' in {num_pages} pages, {round(time.time() - start_time, 2)} seconds!")
            update_messages(f"Starting topic: '{starting_topic.title()}': '{used_links[0]}'")
            update_messages(f"Target topic: '{target_topic.title()}': '{target_url}'\n")
            update_messages("-" * 150)
            driver.quit()
            break

        if num_pages == limit:
            update_messages("\n" + "-" * 150)
            update_messages(f"\nUnfortunately, the model couldn't get from '{starting_topic.title()}' to '{target_topic.title()}' in {num_pages} pages or fewer.")
            update_messages(f"In {round(time.time() - start_time, 2)} seconds, it got from '{starting_topic.title()}': '{used_links[0]}' to '{used_topics[-1].title()}': '{used_links[-1]}'")
            update_messages("\nTry a different combination to see if it can do it!\n")
            update_messages("-" * 150)
            driver.quit()
            break

        # delay things, if applicable
        time.sleep(delay)


###### Example
# starting_topic = 'soulja boy'
# target_topic = 'urine'
# play_wiki_game_2(starting_topic = starting_topic, target_topic = target_topic, limit = 50)
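###### Illustrative sketch: how most_similar_sentence picks the closest link text.
###### The strings below are made up and only show the return shape (label, score, index).
# label, score, idx = most_similar_sentence(
#     target_topic="a large body of salt water",
#     labels_list=["pacific ocean", "desert", "mount everest"],
# )
# # `label` would most likely be "pacific ocean", `score` its similarity, and `idx` its position.

###### A minimal Streamlit front end for the game -- a sketch, not the original app's wiring;
###### the widget labels, defaults, and the 20-page limit are assumptions.
# st.title("Wiki Game Bot")
# start = st.text_input("Starting topic", value="soulja boy")
# target = st.text_input("Target topic", value="urine")
# if st.button("Play"):
#     play_wiki_game_2(starting_topic=start, target_topic=target, limit=20)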