#### For scraping/webpage processing
import requests
import json # specifically for wikipedia api
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from bs4 import BeautifulSoup
#### For timing
import time
#### For app
import streamlit as st
from collections import deque # for printouts
#### For semantic similarity model
# !pip install tensorflow tensorflow-hub
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
embed = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4") # Load the pre-trained Universal Sentence Encoder -- accessible at same link
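# A minimal sanity check of the encoder (a hedged sketch, commented out so it
# never runs in the app): USE v4 maps each string to a 512-dim vector, and
# since the vectors are approximately unit-norm, np.inner approximates cosine
# similarity. Example strings here are illustrative only.
# _vecs = embed(["guitar", "string instrument"])   # shape (2, 512)
# _sim = np.inner(_vecs[0], _vecs[1])              # closer to 1.0 = more similar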
#### Alternative driver setup (Firefox/geckodriver), kept commented for reference
# import os
# @st.cache_resource
# def installff():
#     os.system('sbase install geckodriver')
#     os.system('ln -s /home/appuser/venv/lib/python3.7/site-packages/seleniumbase/drivers/geckodriver /home/appuser/venv/bin/geckodriver')
# _ = installff()
# from selenium.webdriver.firefox.options import Options as FirefoxOptions
# from selenium.webdriver.firefox.service import Service as FirefoxService
# from webdriver_manager.firefox import GeckoDriverManager
# firefox_options = FirefoxOptions()
# firefox_options.add_argument("--headless")
# firefox_service = FirefoxService(GeckoDriverManager().install())
# driver = webdriver.Firefox(options = firefox_options, service = firefox_service)
# driver_target = webdriver.Firefox(options = firefox_options, service = firefox_service)
options = Options()
options.add_argument('--disable-gpu')
options.add_argument('--headless')

@st.cache_resource
def get_driver(name: str):
    # `name` is only a cache key: st.cache_resource returns the same object
    # for identical arguments, so without it `driver` and `driver_target`
    # would be the same browser instance (and quitting one would kill both)
    return webdriver.Chrome(service = Service(ChromeDriverManager().install()), options = options)

driver = get_driver("main")
driver_target = get_driver("target")
# driver.get('http://example.com')
# st.code(driver.page_source)
# Initialize an empty deque
messages = deque(maxlen = 1000) # after 1000 messages, the oldest are dropped; a game should always hit its page limit long before then
def update_messages(message):
    # Add the new message to the start of the deque
    messages.appendleft(message)
    # Render the whole log in a single placeholder; writing each message to
    # the placeholder in a loop would overwrite it, showing only the last one
    placeholder = st.empty()
    placeholder.text("\n".join(messages))
def most_similar_sentence(target_topic, labels_list):
    # Encode the context sentence and all sentences in the list
    context_embedding = embed([target_topic])[0]
    sentence_embeddings = embed(labels_list)
    # Calculate cosine similarities between the context sentence and each sentence in the list
    similarities = np.inner(context_embedding, sentence_embeddings)
    # Find the index of the most similar sentence
    most_similar_index = np.argmax(similarities)
    return labels_list[most_similar_index], similarities[most_similar_index], most_similar_index
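# Illustrative usage (hypothetical values, commented out so it doesn't run):
# label, score, idx = most_similar_sentence("music", ["banana", "guitar", "geology"])
# -> label == "guitar", score is its cosine similarity to "music", idx == 1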
def search_wikipedia(search_term):
    # Define the endpoint
    endpoint = "https://en.wikipedia.org/w/api.php"
    # Define the search parameters
    params = {
        "action": "query",
        "format": "json",
        "list": "search",
        "srsearch": search_term
    }
    # Send a GET request to the endpoint with the parameters
    response = requests.get(url = endpoint, params = params)
    # Parse the results as JSON
    data = json.loads(response.text)
    # Get the title of the first result (this will be used as the page title in the next step)
    page_title = data["query"]["search"][0]["title"]
    # Skip disambiguation pages, which only list possible meanings of the term
    if "may refer to" in data["query"]["search"][0]["snippet"].lower():
        page_title = data["query"]["search"][1]["title"]
    # Construct the URL of the Wikipedia page
    page_url = "https://en.wikipedia.org/wiki/{}".format(page_title.replace(" ", "_"))
    return page_url, page_title
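# Illustrative usage (hypothetical output, commented out so it doesn't run):
# page_url, page_title = search_wikipedia("soulja boy")
# -> ("https://en.wikipedia.org/wiki/Soulja_Boy", "Soulja Boy")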
def get_topic_context(driver, more = False):
    # Find the first non-empty paragraph of the main article body
    first_paragraph = driver.find_element(By.CSS_SELECTOR, "div.mw-parser-output > p:not(.mw-empty-elt)").text
    if more:
        # Use up to the first five sentences for a richer context
        context_sentence = ". ".join(first_paragraph.split(". ")[:5])
    else:
        context_sentence = first_paragraph.split(". ")[0]
    return context_sentence
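# Illustrative usage (commented out; "Guitar" is just an example page): with a
# driver already on a Wikipedia article, this returns the lead paragraph's
# first sentence (or first five sentences, with more = True) as the "context".
# driver.get("https://en.wikipedia.org/wiki/Guitar")
# sentence = get_topic_context(driver)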
# Load the censor list, stripping the trailing newline from each word
with open("censored.txt", "r") as f:
    bad_words = [word.strip() for word in f]
def refine_links(topic, links, current_url_suffix, used_links, used_topics, censor = False):
    links_texts = []
    # Iterate through the links and extract their URLs
    for link in links:
        link_url = link.get('href')
        if link_url and link_url.startswith("/wiki/"):
            link_url = "https://en.wikipedia.org" + link_url
            link_text = link.text.strip() # Get the text and remove leading/trailing spaces
            # make sure the text is non-empty and the link doesn't point back to the current page
            if link_text and current_url_suffix not in link_url:
                if link_url not in used_links and link_text.lower() not in [topic.lower() for topic in used_topics]:
                    # eliminates topic duplicates, non-wiki links, and wiki help pages (non-content
                    # pages, whose URL suffixes contain a colon, e.g. "Help:" or "Category:")
                    if topic.lower() not in link_url.lower() and "en.wikipedia.org/wiki/" in link_url and ":" not in "".join(link_url.split("/")[1:]) and "Main_Page" != str(link_url.split("/")[-1]):
                        # censoring if needed
                        if censor:
                            if not any(word.lower() in bad_words for word in link_text.split()):
                                links_texts.append((link_url, link_text))
                        else:
                            links_texts.append((link_url, link_text))
    return links_texts
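# Illustrative behavior (hypothetical values, commented out): given the soup
# links from an example page on "Guitar", this would drop "Category:"/"Help:"
# style meta pages, "Main_Page", already-visited links, and self-links, and
# return (url, text) pairs like
# refine_links("guitar", soup.find_all('a'), "Guitar", [], [])
# -> [("https://en.wikipedia.org/wiki/String_instrument", "string instrument"), ...]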
def play_wiki_game_2(starting_topic: str, target_topic: str, limit: int = 100, delay: int = 0):
    #### Getting target url, topic, and context
    target_url, target_topic = search_wikipedia(search_term = target_topic)
    driver_target.get(target_url)
    target_context = get_topic_context(driver_target, more = True)
    driver_target.quit()

    topic = starting_topic
    num_pages = 0
    used_topics = []
    used_links = []
    start_time = time.time()

    ### BEGIN ###
    update_messages("-" * 150)
    update_messages("\nStarting!\n")
    update_messages("-" * 150)

    url, topic = search_wikipedia(search_term = starting_topic)
    driver.get(url)
    used_topics.append(topic)
    used_links.append(driver.current_url)
    while True:
        # increment the page tracking by 1 for each new page
        num_pages += 1

        # if not the first page, navigate to the new page
        if num_pages > 1:
            driver.get(next_link)

        try:
            context_sentence = get_topic_context(driver)
        except Exception:
            context_sentence = "Context could not be found from webpage"

        current_url = driver.current_url
        current_url_suffix = str(current_url).split("/")[-1]

        # Parse the page source Selenium already fetched with BeautifulSoup for link extraction
        current_page = driver.page_source
        soup = BeautifulSoup(current_page, 'html.parser')
        links = soup.find_all('a')

        # get rid of any bloat in the links from the page
        links_texts = refine_links(topic, links, current_url_suffix, used_links, used_topics)

        # score the candidate link texts against the target page's context, not just its title
        best_label, best_score, loc_idx = most_similar_sentence(target_topic = target_context.lower(), labels_list = [text.lower() for link, text in links_texts])

        update_messages(f"\nPage: {num_pages}")
        update_messages(f"Current topic: '{topic.title()}'")
        update_messages(f"Current URL: '{current_url}'")
        update_messages(f"Current Topic Context: '{context_sentence}'")

        if current_url != target_url:
            update_messages(f"Next topic: '{best_label.title()}'. Semantic similarity to '{target_topic.title()}' (via its page context): {round((best_score * 100), 2)}%")
            next_link, topic = links_texts[loc_idx]
            used_links.append(next_link)
            used_topics.append(topic)

        if current_url == target_url: # because the target_url is now found through the API
            update_messages("\n" + "-" * 150)
            update_messages(f"\nFrom '{starting_topic.title()}' to '{target_topic.title()}' in {num_pages} pages, {round(time.time() - start_time, 2)} seconds!")
            update_messages(f"Starting topic: '{starting_topic.title()}': '{used_links[0]}'")
            update_messages(f"Target topic: '{target_topic.title()}': '{target_url}'\n")
            update_messages("-" * 150)
            driver.quit()
            break

        if num_pages == limit:
            update_messages("\n" + "-" * 150)
            update_messages(f"\nUnfortunately, the model couldn't get from '{starting_topic.title()}' to '{target_topic.title()}' in {num_pages} pages or fewer.")
            update_messages(f"In {round(time.time() - start_time, 2)} seconds, it got from '{starting_topic.title()}': '{used_links[0]}' to '{used_topics[-1].title()}': '{used_links[-1]}'")
            update_messages("\nTry a different combination to see if it can do it!\n")
            update_messages("-" * 150)
            driver.quit()
            break

        # delay between pages, if requested
        time.sleep(delay)

###### Example
# starting_topic = 'soulja boy'
# target_topic = 'urine'
# play_wiki_game_2(starting_topic = starting_topic, target_topic = target_topic, limit = 50)
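# A hypothetical Streamlit wiring for the function above (a sketch only; the
# widget labels and variable names are assumptions, not part of the original):
# start = st.text_input("Starting topic")
# target = st.text_input("Target topic")
# if st.button("Play!"):
#     play_wiki_game_2(starting_topic = start, target_topic = target, limit = 50)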