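"""Gradio app for annotating seed-crawl URLs.

Fetches a screenshot and the extracted text for a given URL and lets
authenticated users assign language codes and categories before deciding
whether the site should be crawled.
"""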
from __future__ import annotations

import base64
import os
import random
import time
from io import BytesIO

import gradio as gr
import trafilatura
from huggingface_hub import whoami
from PIL import Image
from selenium import webdriver
from selenium.common.exceptions import WebDriverException

from languages import ISO_CODE_TO_LANGUAGE_NAME

# Set OFFLINE=1 (or "true") to skip Selenium and return dummy data instead.
OFFLINE = os.environ.get("OFFLINE", "").lower() in ("1", "true", "yes")

def pil_image_to_base64(image):
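    """Encode a PIL image as a base64 data URI usable as an HTML <img> src."""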
    # Save the image to a BytesIO buffer
    buffer = BytesIO()
    image.save(buffer, format="PNG")  # You can change the format if needed
    buffer.seek(0)

    # Encode the bytes into a base64 string
    img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")

    # Format the base64 string for use in an HTML image tag
    html_img_tag_src = f"data:image/png;base64,{img_base64}"

    return html_img_tag_src

def fetch_screenshot_and_text_from_url(url):
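    """Render `url` in headless Chrome and return (screenshot HTML snippet, extracted text).

    Falls back to dummy data when the OFFLINE environment variable is set.
    """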
    screen_width = 1080
    height = 350  # height of the scrollable screenshot container in px
    text = ""

    if OFFLINE:
        screenshot = Image.new("RGB", (350, height))
        text = f"Some dummy text for {url} (offline mode enabled)"
    else:
        options = webdriver.ChromeOptions()
        options.add_argument("--headless")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")

        driver = None
        try:
            driver = webdriver.Chrome(options=options)
            # driver.set_window_size(1080, 720)  # Adjust the window size here
            driver.get(url)
            driver.implicitly_wait(10)

            # Wait for the page to fully load; you may adjust the sleep time or implement a wait condition
            # time.sleep(2)

            # fetch html from web page
            html_str = driver.page_source

            # Execute JS to find the full height of the rendered page
            scroll_height = driver.execute_script("return document.body.scrollHeight")

            # Resize the window to the full page height
            driver.set_window_size(screen_width, max(scroll_height + 200, 900))

            raw_screenshot = driver.get_screenshot_as_png()
            screenshot = Image.open(BytesIO(raw_screenshot))

            # extract text
            text = trafilatura.extract(html_str)
        except WebDriverException as e:
            print(f"Could not fetch {url}: {e}")
            screenshot = Image.new("RGB", (1, 1))
        finally:
            if driver:
                driver.quit()

    # embed base64 encoded image as <img> tag into html string
    screenshot_html_str = f"""<div style="width: 100%; height: {height}px; overflow-y: scroll;"><img src="{pil_image_to_base64(screenshot)}" /></div>"""

    # return gr.update(value=html_str, visible=True), text, gr.update(visible=True)
    return screenshot_html_str, text

with gr.Blocks(fill_height=True) as demo:
    gr.Markdown(
        """
        # Seed Crawl Annotator
        """)

    profile_state = gr.State([])
    gr.LoginButton()

    with gr.Column(visible=False) as wrapper_col:
        def handle_login(profile: gr.OAuthProfile | None) -> dict:
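            """Show the annotation UI only for authenticated users and store their username."""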
            if profile:
                gr.Info(f"Logged in as {profile.username}")
                return {
                    profile_state: profile.username,
                    wrapper_col: gr.update(visible=True),
                }
            else:
                gr.Warning("You need to log in to use this app.")
                return {
                    profile_state: None,
                    wrapper_col: gr.update(visible=False),
                }

        demo.load(handle_login, inputs=None, outputs=[profile_state, wrapper_col])
        url_field = gr.Textbox(label="Website URL", placeholder="Enter a URL you want to annotate", interactive=True)

        with gr.Row():
            set_random_btn = gr.Button("Set Random URL", variant="secondary", interactive=True)
            load_btn = gr.Button("Annotate URL", variant="primary", interactive=True)

        with gr.Row():
            extracted_text = gr.Textbox(
                label="Extracted text",
                max_lines=15,
                lines=15,
                visible=False,
                placeholder="Click on `Annotate URL` to fetch the Web page's text content.",
            )
            screenshot_scrollable = gr.HTML(visible=False)

        with gr.Column(visible=False) as output_col:
            with gr.Row():
                language_codes = gr.Dropdown(
                    [("unknown", "unknown")] + [(f"{code}: {name}", code) for code, name in ISO_CODE_TO_LANGUAGE_NAME.items()],
                    label="Language codes",
                    multiselect=True,
                    # allow_custom_value=True,
                )
                categories = gr.CheckboxGroup(
                    ["News", "Culture/History", "Government", "Political Parties", "Other"],
                    label="Categories",
                )

            with gr.Row():
                do_crawl_btn = gr.Button("✅ Do Crawl", elem_classes="success")
                dont_crawl_btn = gr.Button("❌ Don't Crawl", elem_classes="error")
                # random_subpage_btn = gr.Button("🔄 Load Another Subpage", variant="secondary")

        def set_random_url():
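            """Pick a random URL from a small list of candidate sites."""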
            candidate_urls = [
                "http://example.com",
                "https://wikipedia.org/",
                "https://occiglot.eu",
                "https://ostendorff.org",
                "https://fr.wikipedia.org/",
                "https://amazon.com/",
            ]
            selected_url = random.choice(candidate_urls)
            return selected_url

        set_random_btn.click(fn=set_random_url, outputs=url_field)

        def load_url(url):
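            """Fetch screenshot and text for `url` and reveal the annotation widgets."""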
            screenshot_html_str, text = fetch_screenshot_and_text_from_url(url)

            if not screenshot_html_str or not text:
                # Raising gr.Error aborts the event and shows an error toast in the UI.
                raise gr.Error("Could not fetch data for URL")

            return {
                screenshot_scrollable: gr.update(value=screenshot_html_str, visible=True),
                extracted_text: gr.update(value=text, visible=True),
                output_col: gr.update(visible=True),
                language_codes: gr.update(value=[]),  # Clear the previous language selection
                categories: gr.update(value=None),
            }

        load_btn.click(
            fn=load_url,
            inputs=url_field,
            outputs=[screenshot_scrollable, extracted_text, output_col, language_codes, categories],
            api_name="load_url",
        )

        def do_crawl(profile_state, url, language_codes, categories, do_crawl=True):
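            """Handle the annotator's crawl decision and reset the form for the next URL."""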
            if profile_state:
                html_str = f"<b>Thanks {profile_state}, we have saved your feedback!</b>"
                gr.Info("Thanks for your feedback")
            else:
                gr.Warning("Feedback could not be saved. You are not authenticated.")
                html_str = "<b>Feedback could not be saved.</b> You are not authenticated."

            return {
                url_field: "",
                output_col: gr.update(visible=False),
                extracted_text: gr.update(value=None, visible=False),
                screenshot_scrollable: gr.update(value="", visible=False),
            }
        # def do_crawl(profile_state, url, language_codes, categories):
        #     return do_crawl_or_not(profile_state, url, language_codes, categories, do_crawl=True)

        # def dont_crawl(profile_state, url, language_codes, categories):
        #     return do_crawl_or_not(profile_state, url, language_codes, categories, do_crawl=False)
        do_crawl_btn.click(
            fn=do_crawl,
            inputs=[profile_state, url_field, language_codes, categories],
            outputs=[
                url_field,
                output_col,
                extracted_text,
                screenshot_scrollable,
            ],
            api_name="do_crawl",
        )

        dont_crawl_btn.click(
            fn=do_crawl,
            inputs=[profile_state, url_field, language_codes, categories],
            outputs=[
                url_field,
                output_col,
                extracted_text,
                screenshot_scrollable,
            ],
            api_name="dont_crawl",
        )
        # dont_crawl_btn.click(fn=dont_crawl, inputs=[profile_state, url, language_codes, categories], outputs=[url, output_col, extracted_text, screenshot_scrollable], api_name="dont_crawl")
        # def random_subpage(url):
        #     new_url = "http://example.com"
        #     return [new_url, *fetch_screenshot_and_text_from_url(new_url)]

        # random_subpage_btn.click(fn=random_subpage, inputs=url, outputs=[url, screenshot_scrollable, extracted_text, output_col], api_name="load_random_subpage")

if __name__ == "__main__":
    demo.launch()