import gradio as gr
import requests
from bs4 import BeautifulSoup
import pytz
from datetime import datetime, timedelta
import logging
import hashlib
import icalendar
import uuid
import re
import json
import os

# Local inference is optional: when transformers/torch are missing, the
# scraper falls back to the Hugging Face Inference API via InferenceClient.
try:
    from transformers import AutoModelForCausalLM, AutoTokenizer
    import torch

    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False

from huggingface_hub import InferenceClient


class EventScraper:
    def __init__(self, urls, timezone='Europe/Berlin'):
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

        self.timezone = pytz.timezone(timezone)
        self.urls = urls if isinstance(urls, list) else [urls]

        # Hashes of already-seen events, used to deduplicate across URLs.
        self.event_cache = set()

        # PRODID and VERSION are required calendar properties (RFC 5545).
        self.calendar = icalendar.Calendar()
        self.calendar.add('prodid', '-//Event Scraper//example.com//')
        self.calendar.add('version', '2.0')

        # Filled in by setup_llm(): either a local model/tokenizer pair or a
        # hosted InferenceClient.
        self.model = None
        self.tokenizer = None
        self.client = None
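
    # Illustrative usage (hypothetical URL):
    #   scraper = EventScraper(["https://example.com/events"], timezone="Europe/Berlin")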

    def setup_llm(self):
        """Set up a Hugging Face LLM for event extraction.

        Tries a local transformers model first and falls back to the hosted
        Inference API if local setup fails or transformers is unavailable.
        """
        if TRANSFORMERS_AVAILABLE:
            try:
                model_name = "meta-llama/Llama-3.2-1B-Instruct"
                self.tokenizer = AutoTokenizer.from_pretrained(model_name)
                self.model = AutoModelForCausalLM.from_pretrained(
                    model_name,
                    torch_dtype=torch.float16,
                    device_map='auto'
                )
                return
            except Exception as local_err:
                gr.Warning(f"Local model setup failed: {str(local_err)}")

        try:
            # A token is optional, but authenticated access raises rate
            # limits and unlocks gated models.
            hf_token = os.getenv('HF_TOKEN')

            if hf_token:
                self.client = InferenceClient(
                    model="meta-llama/Llama-3.2-3B-Instruct",
                    token=hf_token
                )
            else:
                self.client = InferenceClient(
                    model="meta-llama/Llama-3.2-3B-Instruct"
                )
        except Exception as e:
            gr.Warning(f"Inference Client setup error: {str(e)}")
            raise

    def generate_with_model(self, prompt):
        """Generate text using either the local model or the inference client."""
        self.logger.info("------ PROMPT ------------")
        self.logger.info(prompt)
        self.logger.info("------ PROMPT ------------")

        if self.model and self.tokenizer:
            inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
            outputs = self.model.generate(
                inputs.input_ids,
                max_new_tokens=2000,
                do_sample=True,
                temperature=0.9
            )
            # Decode only the newly generated tokens; decoding outputs[0]
            # wholesale would echo the prompt back into the response.
            new_tokens = outputs[0][inputs.input_ids.shape[1]:]
            return self.tokenizer.decode(new_tokens, skip_special_tokens=True)

        elif self.client:
            return self.client.text_generation(
                prompt,
                max_new_tokens=2000,
                temperature=0.9
            )

        else:
            raise ValueError("No model or client available for text generation")

    def fetch_webpage_content(self, url):
        """Fetch webpage content"""
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            return response.text
        except Exception as e:
            gr.Warning(f"Error fetching {url}: {str(e)}")
            return ""

    def extract_text_from_html(self, html_content):
        """Extract readable text from HTML, capped at roughly 2000 words."""
        soup = BeautifulSoup(html_content, 'html.parser')

        # Drop non-content elements before extracting text.
        for tag in soup(["script", "style", "nav", "header", "footer"]):
            tag.decompose()

        text = soup.get_text(separator=' ', strip=True)
        return ' '.join(text.split()[:2000])
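
    # Illustrative behaviour:
    #   extract_text_from_html("<p>Hello   <b>world</b></p>")  ->  "Hello world"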

    def generate_event_extraction_prompt(self, text):
        """Create a Llama-3-style chat prompt asking the LLM to extract events."""
        prompt = f'''
<|start_header_id|>system<|end_header_id|>

<|eot_id|><|start_header_id|>user<|end_header_id|>
You are an event extraction assistant.
Find and extract all events from the following text.
For each event, provide:
- Exact event name
- Date (DD.MM.YYYY)
- Time (HH:MM if available)
- Location
- Short description

Important: Extract ALL possible events.
Text to analyze:
{text}

Output ONLY a JSON list of events like this - Response Format:
[
    {{
        "name": "Event Name",
        "date": "07.12.2024",
        "time": "19:00",
        "location": "Event Location",
        "description": "Event details"
    }}
]

If NO events are found, return an empty list [].
Only return the JSON, nothing else. No comments.<|eot_id|><|start_header_id|>assistant<|end_header_id|>
'''
        return prompt
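
    # The <|start_header_id|>/<|eot_id|> markers follow the Llama 3 chat
    # template; other model families would need a different prompt format.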

    def parse_llm_response(self, response):
        """Parse the LLM's text response into a list of event dicts."""
        try:
            response = response.strip()

            # The model sometimes nests lists; flatten to a single list of dicts.
            def flatten_events(data):
                if isinstance(data, list):
                    flattened = []
                    for item in data:
                        if isinstance(item, list):
                            flattened.extend(flatten_events(item))
                        elif isinstance(item, dict):
                            flattened.append(item)
                    return flattened
                return []

            try:
                events = json.loads(response)
                events = flatten_events(events)
            except json.JSONDecodeError:
                # Fall back to the first bracketed block in the response.
                json_match = re.search(r'\[.*\]', response, re.DOTALL)
                if json_match:
                    try:
                        events = json.loads(json_match.group(0))
                        events = flatten_events(events)
                    except json.JSONDecodeError:
                        events = []
                else:
                    events = []

            # Keep only named events and fill in any missing fields.
            cleaned_events = []
            for event in events:
                if event.get('name'):
                    event.setdefault('date', '')
                    event.setdefault('time', '')
                    event.setdefault('location', '')
                    event.setdefault('description', '')
                    cleaned_events.append(event)

            return cleaned_events

        except Exception as e:
            gr.Warning(f"Parsing error: {str(e)}")
            return []
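
    # Illustrative round trip (hypothetical model output):
    #   parse_llm_response('[{"name": "Jazz Night", "date": "07.12.2024"}]')
    #   -> [{'name': 'Jazz Night', 'date': '07.12.2024', 'time': '',
    #        'location': '', 'description': ''}]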

    def scrape_events(self):
        """Scrape, extract, and deduplicate events from all configured URLs."""
        self.setup_llm()

        all_events = []

        for url in self.urls:
            try:
                html_content = self.fetch_webpage_content(url)
                text_content = self.extract_text_from_html(html_content)
                prompt = self.generate_event_extraction_prompt(text_content)
                response = self.generate_with_model(prompt)

                self.logger.info("------ response ------------")
                self.logger.info(response)
                self.logger.info("------ response ------------")

                parsed_events = self.parse_llm_response(response)

                # Deduplicate across URLs via a hash of the event dict.
                for event in parsed_events:
                    event_hash = hashlib.md5(str(event).encode()).hexdigest()
                    if event_hash not in self.event_cache:
                        self.event_cache.add(event_hash)
                        all_events.append(event)

                        try:
                            ical_event = self.create_ical_event(event)
                            self.calendar.add_component(ical_event)
                        except Exception as ical_error:
                            gr.Warning(f"iCal creation error: {str(ical_error)}")

            except Exception as e:
                gr.Warning(f"Error processing {url}: {str(e)}")

        return all_events

    def create_ical_event(self, event):
        """Convert an event dict into an icalendar VEVENT."""
        ical_event = icalendar.Event()

        ical_event.add('uid', str(uuid.uuid4()))
        ical_event.add('summary', event.get('name', 'Unnamed Event'))
        ical_event.add('description', event.get('description', ''))

        if event.get('location'):
            ical_event.add('location', event['location'])

        try:
            if event.get('date'):
                try:
                    event_date = datetime.strptime(event['date'], '%d.%m.%Y').date()

                    # A missing time marks an all-day event (midnight sentinel).
                    event_time = (datetime.strptime(event['time'], '%H:%M').time()
                                  if event.get('time') else datetime.min.time())

                    event_datetime = datetime.combine(event_date, event_time)
                    localized_datetime = self.timezone.localize(event_datetime)

                    if event_time == datetime.min.time():
                        # All-day event: DTEND is exclusive per RFC 5545, so it
                        # points at the day after the event.
                        ical_event.add('dtstart', event_date)
                        ical_event.add('dtend', event_date + timedelta(days=1))
                        ical_event.add('x-microsoft-cdo-alldayevent', 'TRUE')
                    else:
                        # Timed event: assume a one-hour duration.
                        end_datetime = localized_datetime + timedelta(hours=1)

                        ical_event['dtstart'] = icalendar.prop.vDDDTypes(localized_datetime)
                        ical_event['dtstart'].params['TZID'] = str(self.timezone)

                        ical_event['dtend'] = icalendar.prop.vDDDTypes(end_datetime)
                        ical_event['dtend'].params['TZID'] = str(self.timezone)

                except ValueError as date_err:
                    gr.Warning(f"Date parsing error: {date_err}")

        except Exception as e:
            gr.Warning(f"iCal event creation error: {str(e)}")

        return ical_event
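
    # A timed event produced above serializes roughly as (illustrative):
    #   BEGIN:VEVENT
    #   UID:...
    #   SUMMARY:Jazz Night
    #   DTSTART;TZID=Europe/Berlin:20241207T190000
    #   DTEND;TZID=Europe/Berlin:20241207T200000
    #   END:VEVENT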

    def get_ical_string(self):
        """Return iCal as a string"""
        return self.calendar.to_ical().decode('utf-8')
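
# Illustrative end-to-end usage (hypothetical URL):
#   scraper = EventScraper(["https://example.com/events"])
#   events = scraper.scrape_events()
#   with open("events.ics", "w", encoding="utf-8") as f:
#       f.write(scraper.get_ical_string())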


def scrape_events_with_urls(urls):
    """Wrapper function for the Gradio interface."""
    # Accept comma- or newline-separated URLs.
    url_list = [url.strip() for url in re.split(r'[\n,]+', urls) if url.strip()]

    if not url_list:
        gr.Warning("Please provide at least one valid URL.")
        return "", ""

    try:
        scraper = EventScraper(url_list)
        events = scraper.scrape_events()

        events_str = json.dumps(events, indent=2)
        ical_string = scraper.get_ical_string()

        return events_str, ical_string

    except Exception as e:
        gr.Warning(f"Error in event scraping: {str(e)}")
        return "", ""


def create_gradio_app():
    with gr.Blocks() as demo:
        gr.Markdown("# Event Scraper 🗓️")
        gr.Markdown("Scrape events from web pages using an AI-powered event extraction tool.")

        with gr.Row():
            with gr.Column():
                url_input = gr.Textbox(
                    label="Enter URLs (comma or newline separated)",
                    placeholder="https://example.com/events\nhttps://another-site.com/calendar"
                )
                scrape_btn = gr.Button("Scrape Events", variant="primary")

        with gr.Row():
            with gr.Column():
                events_output = gr.Textbox(label="Extracted Events (JSON)", lines=10)
            with gr.Column():
                ical_output = gr.Textbox(label="iCal Export", lines=10)

        scrape_btn.click(
            fn=scrape_events_with_urls,
            inputs=url_input,
            outputs=[events_output, ical_output]
        )

        gr.Markdown("**Note:** Requires an internet connection and may take a few minutes to process.")
        gr.Markdown("Set the HF_TOKEN environment variable for authenticated access.")

    return demo


if __name__ == "__main__":
    demo = create_gradio_app()
    demo.launch()
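    # demo.launch(share=True) would additionally expose a temporary public
    # link (a standard Gradio option).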