import gradio as gr
import requests
import re
import logging
import json
import feedparser  # Needed for the RSS feed handling below
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch
from typing import Tuple, List, Dict
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

# Download required NLTK data
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('stopwords')
nltk.download('wordnet')  # For lemmatization

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


class DialoGPTModel:
    def __init__(self, model_name="microsoft/DialoGPT-small"):
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForCausalLM.from_pretrained(model_name)
        except Exception as e:
            logging.error(f"Error loading DialoGPT model: {e}")
            raise

    def generate_text(self, prompt: str, max_length: int = 300, temperature: float = 0.7) -> str:
        try:
            inputs = self.tokenizer.encode(prompt, return_tensors="pt", truncation=True, max_length=max_length)
            outputs = self.model.generate(
                inputs,
                max_length=max_length,
                num_return_sequences=1,
                temperature=temperature,
                pad_token_id=self.tokenizer.eos_token_id,
                no_repeat_ngram_size=3,
                do_sample=True
            )
            # Strip the prompt from the decoded output so only newly generated text is returned
            response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            original_prompt = self.tokenizer.decode(inputs[0], skip_special_tokens=True)
            generated_text = response[len(original_prompt):].strip()
            return generated_text
        except Exception as e:
            logging.error(f"Error generating text: {e}")
            return ""


class SmartWebScraper:
    def __init__(self):
        # Language model is loaded here; the current extraction flow does not call it
        self.text_generator = DialoGPTModel()
        self.lemmatizer = WordNetLemmatizer()
        self.stop_words = set(stopwords.words('english'))

    def process_query(self, query: str) -> Tuple[str, List[str]]:
        # Tokenize, drop stop words, and lemmatize the query, then map it to an action and targets
        tokens = word_tokenize(query.lower())
        tokens = [self.lemmatizer.lemmatize(word) for word in tokens if word not in self.stop_words]

        action_words = ['find', 'get', 'show', 'list', 'count', 'extract', 'analyze', 'search',
                        'retrieve', 'display', 'summarize', 'identify', 'gather', 'fetch']
        targets = ['image', 'video', 'link', 'text', 'price', 'rss', 'title', 'heading', 'meta',
                   'audio', 'button', 'form', 'table', 'picture', 'photo', 'paragraph', 'header', 'footer']

        query_action = next((word for word in tokens if word in action_words), 'extract')
        query_targets = [word for word in tokens if word in targets]
        return query_action, query_targets

    def extract_data(self, url: str, query: str) -> dict:
        try:
            response = requests.get(url, timeout=10)
            soup = BeautifulSoup(response.text, 'html.parser')
            action, targets = self.process_query(query)
            results = {}

            # Default to the most common targets when the query names none
            if not targets:
                targets = ['text', 'link', 'image']

            for target in targets:
                if target == 'image':
                    results['images'] = [{'src': img.get('src'), 'alt': img.get('alt', ''), 'title': img.get('title', '')}
                                         for img in soup.find_all('img') if img.get('src')]
                elif target == 'video':
                    results['videos'] = [{'src': vid.get('src'), 'type': vid.get('type', '')}
                                         for vid in soup.find_all(['video', 'iframe'])]
                elif target == 'link':
                    results['links'] = [{'href': a.get('href'), 'text': a.get_text(strip=True)}
                                        for a in soup.find_all('a') if a.get('href')]
                elif target == 'text':
                    results['text'] = [p.get_text(strip=True)
                                       for p in soup.find_all(['p', 'article', 'section']) if p.get_text(strip=True)]
                elif target == 'price':
                    text_content = soup.get_text()
                    results['prices'] = re.findall(r'\$\d+(?:\.\d{2})?', text_content)
                elif target == 'rss':
                    feed_links = [link.get('href') for link in soup.find_all('link', type='application/rss+xml')]
                    if feed_links:
                        results['rss'] = []
                        for feed_link in feed_links:
                            feed = feedparser.parse(feed_link)
                            results['rss'].append({'title': feed.feed.get('title', ''),
                                                   'link': feed_link,
                                                   'entries': len(feed.entries)})
                elif target == 'meta':
                    results['meta'] = {
                        'title': soup.title.string if soup.title else '',
                        'description': soup.find('meta', {'name': 'description'}).get('content', '')
                        if soup.find('meta', {'name': 'description'}) else '',
                        'keywords': soup.find('meta', {'name': 'keywords'}).get('content', '')
                        if soup.find('meta', {'name': 'keywords'}) else ''
                    }
                elif target == 'audio':
                    results['audios'] = [{'src': audio.get('src'), 'type': audio.get('type', '')}
                                         for audio in soup.find_all('audio') if audio.get('src')]
                elif target == 'button':
                    results['buttons'] = [{'text': button.get_text(strip=True), 'type': button.get('type', '')}
                                          for button in soup.find_all('button')]
                elif target == 'form':
                    results['forms'] = [{'action': form.get('action', ''), 'method': form.get('method', '')}
                                        for form in soup.find_all('form')]
            return results
        except Exception as e:
            logging.error(f"Error extracting data: {str(e)}")
            return {"error": str(e)}

    def format_response(self, data: dict, query: str) -> str:
        if "error" in data:
            return f"I encountered an error while processing your request: {data['error']}"

        # Create a structured summary of the data
        summary = []
        query_lower = query.lower()

        # First, collect summary information
        if "images" in data:
            summary.append(f"Found {len(data['images'])} images")
        if "links" in data:
            summary.append(f"Found {len(data['links'])} links")
        if "text" in data:
            summary.append(f"Found {len(data['text'])} text blocks")
        if "prices" in data:
            summary.append(f"Found {len(data['prices'])} price mentions")

        # Handle specific query types
        if "how many" in query_lower:
            if "image" in query_lower and "images" in data:
                return f"There are {len(data['images'])} images on the webpage."
            elif "link" in query_lower and "links" in data:
                return f"There are {len(data['links'])} links on the webpage."
            elif "price" in query_lower and "prices" in data:
                return f"There are {len(data['prices'])} prices mentioned on the webpage."
            elif "text" in query_lower and "text" in data:
                return f"There are {len(data['text'])} text blocks on the webpage."

        if "show" in query_lower or "list" in query_lower:
            if "image" in query_lower and "images" in data:
                images = data['images'][:5]  # Limit to 5 images
                return "Here are up to 5 images found:\n" + "\n".join(
                    [f"- {img['alt'] or 'No description'} ({img['src']})" for img in images])
            elif "link" in query_lower and "links" in data:
                links = data['links'][:5]  # Limit to 5 links
                return "Here are up to 5 links found:\n" + "\n".join(
                    [f"- {link['text'] or 'No text'} ({link['href']})" for link in links])
            elif "text" in query_lower and "text" in data:
                texts = data['text'][:3]  # Limit to 3 text blocks
                return "Here are up to 3 text blocks found:\n" + "\n".join(
                    [f"- {text[:100]}..." for text in texts])

        # If no specific handling matched, return a general summary
        if summary:
            return "Here's what I found on the webpage:\n" + "\n".join(summary)
        return "I couldn't find any relevant information based on your query."

    def handle_query(self, query: str, url: str) -> str:
        if not url:
            return "Please provide a URL to analyze."
        try:
            parsed_url = urlparse(url)
            if not all([parsed_url.scheme, parsed_url.netloc]):
                return "Please provide a valid URL (including http:// or https://)."
            # extract_data applies a request timeout, so this call won't hang indefinitely
            data = self.extract_data(url, query)
            response = self.format_response(data, query)

            # Validate response
            if not response or response.isspace():
                return "I couldn't generate a meaningful response based on the available data."
            return response
        except Exception as e:
            logging.error(f"Error processing request: {str(e)}")
            return f"An error occurred while processing your request: {str(e)}"


def create_interface():
    scraper = SmartWebScraper()

    def process_request(query: str, url: str) -> str:
        return scraper.handle_query(query, url)

    with gr.Blocks() as demo:
        gr.Markdown("# Smart Web Scraper")
        gr.Markdown("Ask me anything about a webpage, and I'll try to find the information you need!")

        with gr.Row():
            url_input = gr.Textbox(label="Website URL", placeholder="https://example.com")
            query_input = gr.Textbox(label="What would you like to know about this page?",
                                     placeholder="E.g., 'How many images are there?' or 'Show me all the links'")

        output = gr.Textbox(label="Results", lines=10)
        submit_btn = gr.Button("Analyze")
        submit_btn.click(fn=process_request, inputs=[query_input, url_input], outputs=output)

        gr.Markdown("""
        ## Example queries you can try:
        - "How many images are on this page?"
        - "Show me all the links"
        - "Find prices on this page"
        - "Is there an RSS feed?"
        - "What's the page about?"
        - "Extract all text content"
        - "Find video content"
        - "Retrieve all buttons"
        - "List all forms"
        """)

    return demo


if __name__ == "__main__":
    demo = create_interface()  # Assign the returned Gradio interface to 'demo'
    demo.launch(debug=True)
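
# --- Optional usage sketch --------------------------------------------------
# A minimal, hedged example of driving SmartWebScraper programmatically rather
# than through the Gradio UI. It assumes network access to the target page and
# uses https://example.com (the interface's placeholder URL) purely for
# illustration; uncomment to try it.
#
# scraper = SmartWebScraper()
# print(scraper.handle_query("How many images are on this page?", "https://example.com"))
# print(scraper.handle_query("Show me all the links", "https://example.com"))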