import json | |
import os | |
import re | |
import time | |
import logging | |
import mimetypes | |
import zipfile | |
import tempfile | |
import chardet | |
import io | |
import csv | |
import xml.etree.ElementTree as ET | |
from datetime import datetime | |
from typing import List, Dict, Optional, Union, Tuple, Any | |
from pathlib import Path | |
from urllib.parse import urlparse, urljoin | |
import requests | |
import validators | |
import gradio as gr | |
from diskcache import Cache | |
from bs4 import BeautifulSoup | |
from fake_useragent import UserAgent | |
from cleantext import clean | |
import qrcode | |
from PIL import Image, ImageDraw, ImageFont | |
import numpy as np | |
import tarfile | |
import gzip | |
import math | |
import random | |
import pandas as pd | |
from requests.adapters import HTTPAdapter | |
from urllib3.util.retry import Retry | |
# Setup enhanced logging with more detailed formatting | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s', | |
handlers=[ | |
logging.StreamHandler(), | |
logging.FileHandler('app.log', encoding='utf-8') | |
]) | |
logger = logging.getLogger(__name__) | |
# Conditional imports for document processing | |
try: | |
from PyPDF2 import PdfReader | |
PDF_SUPPORT = True | |
except ImportError: | |
PDF_SUPPORT = False | |
logger.warning("PyPDF2 not installed. PDF file processing will be limited.") | |
try: | |
from docx import Document | |
DOCX_SUPPORT = True | |
except ImportError: | |
DOCX_SUPPORT = False | |
logger.warning("python-docx not installed. DOCX file processing will be limited.") | |
try: | |
from pyth.plugins.plaintext.writer import PlaintextWriter | |
from pyth.plugins.rtf15.reader import Rtf15Reader | |
RTF_SUPPORT = True | |
except ImportError: | |
RTF_SUPPORT = False | |
logger.warning("pyth not installed. RTF file processing will be limited.") | |
try: | |
    from odf.opendocument import load as load_odf
    from odf import text as odftext
    from odf import teletype as odfteletype
ODT_SUPPORT = True | |
except ImportError: | |
ODT_SUPPORT = False | |
logger.warning("odfpy not installed. ODT file processing will be limited.") | |
# Ensure output directories exist with modern structure | |
OUTPUTS_DIR = Path('output') | |
QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes' | |
TEMP_DIR = OUTPUTS_DIR / 'temp' | |
for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR]: | |
directory.mkdir(parents=True, exist_ok=True) | |
class EnhancedURLProcessor: | |
"""Advanced URL processing with enhanced content extraction and recursive link following.""" | |
def __init__(self): | |
# Use a real requests session with retry strategy | |
self.session = requests.Session() | |
retry_strategy = Retry( | |
total=3, | |
backoff_factor=1, | |
status_forcelist=[429, 500, 502, 503, 504], | |
allowed_methods=["HEAD", "GET"] | |
) | |
adapter = HTTPAdapter(max_retries=retry_strategy) | |
self.session.mount("http://", adapter) | |
self.session.mount("https://", adapter) | |
self.user_agent = UserAgent() | |
self.timeout = 15 # seconds | |
def validate_url(self, url: str) -> Dict[str, Any]: | |
"""Enhanced URL validation with accessibility check.""" | |
if not validators.url(url): | |
return {'is_valid': False, 'message': 'Invalid URL format', | |
'details': 'URL must begin with http:// or https://'} | |
parsed = urlparse(url) | |
if not all([parsed.scheme, parsed.netloc]): | |
return {'is_valid': False, 'message': 'Incomplete URL', 'details': 'Missing scheme or domain'} | |
try: | |
# Use a HEAD request to check accessibility without downloading full content | |
headers = {'User-Agent': self.user_agent.random} | |
response = self.session.head(url, timeout=self.timeout, headers=headers, allow_redirects=True) | |
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) | |
# Check content type if available in HEAD response | |
content_type = response.headers.get('Content-Type', '').split(';')[0].strip() | |
if not content_type or not ( | |
content_type.startswith('text/') or 'json' in content_type or 'xml' in content_type): | |
# Basic check if content type seems relevant for text extraction | |
logger.warning(f"URL {url} returned potentially irrelevant content type: {content_type}") | |
# Decide if this should invalidate the URL or just add a note | |
# For now, we'll allow fetching but add a note. | |
return { | |
'is_valid': True, | |
'message': 'URL is valid and accessible', | |
'details': { | |
'final_url': response.url, # Capture final URL after redirects | |
'content_type': content_type, | |
'server': response.headers.get('Server', 'N/A'), | |
'size': response.headers.get('Content-Length', 'N/A') | |
} | |
} | |
except requests.exceptions.RequestException as e: | |
return {'is_valid': False, 'message': 'URL not accessible', 'details': str(e)} | |
except Exception as e: | |
logger.error(f"Unexpected error during URL validation for {url}: {e}") | |
return {'is_valid': False, 'message': 'Unexpected validation error', 'details': str(e)} | |
def fetch_content(self, url: str, retry_count: int = 0) -> Optional[Dict[str, Any]]: | |
"""Enhanced content fetcher with retry mechanism and complete character extraction.""" | |
try: | |
logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1})") | |
headers = {'User-Agent': self.user_agent.random} | |
response = self.session.get(url, timeout=self.timeout, headers=headers, allow_redirects=True) | |
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) | |
final_url = response.url # Capture potential redirects | |
content_type = response.headers.get('Content-Type', '').split(';')[0].strip() | |
# Attempt to detect encoding if not specified in headers | |
encoding = response.encoding # requests attempts to guess encoding | |
if encoding is None or encoding == 'ISO-8859-1': # Fallback if requests guess is default/uncertain | |
try: | |
encoding_detection = chardet.detect(response.content) | |
encoding = encoding_detection['encoding'] or 'utf-8' | |
logger.debug(f"Chardet detected encoding: {encoding} for {url}") | |
except Exception as e: | |
logger.warning(f"Chardet detection failed for {url}: {e}. Falling back to utf-8.") | |
encoding = 'utf-8' | |
raw_content = response.content.decode(encoding, errors='replace') | |
# Extract metadata | |
metadata = { | |
'original_url': url, | |
'final_url': final_url, | |
'timestamp': datetime.now().isoformat(), | |
'detected_encoding': encoding, | |
'content_type': content_type, | |
'content_length': len(response.content), | |
'headers': dict(response.headers), | |
'status_code': response.status_code | |
} | |
# Process based on content type | |
processed_extraction = self._process_web_content(raw_content, metadata['content_type'], final_url) | |
return { | |
'source': 'url', | |
'url': url, # Keep original URL as identifier for this step | |
'raw_content': raw_content, | |
'metadata': metadata, | |
'extracted_data': processed_extraction['data'], | |
'processing_notes': processed_extraction['notes'] | |
} | |
except requests.exceptions.RequestException as e: | |
logger.error(f"Failed to fetch content from {url}: {e}") | |
return { | |
'source': 'url', | |
'url': url, | |
'raw_content': None, | |
'metadata': {'original_url': url, 'timestamp': datetime.now().isoformat(), | |
'status_code': getattr(e.response, 'status_code', None)}, | |
'extracted_data': None, | |
'processing_notes': [f"Failed to fetch content: {str(e)}"] | |
} | |
except Exception as e: | |
logger.error(f"Unexpected error while fetching or processing URL {url}: {e}") | |
return { | |
'source': 'url', | |
'url': url, | |
'raw_content': raw_content if 'raw_content' in locals() else None, | |
'metadata': metadata if 'metadata' in locals() else {'original_url': url, | |
'timestamp': datetime.now().isoformat(), | |
'status_code': None}, | |
'extracted_data': None, | |
'processing_notes': [f"Unexpected processing error: {str(e)}"] | |
} | |
def _process_web_content(self, content: str, content_type: str, base_url: str) -> Dict[str, Any]: | |
"""Process content based on detected content type""" | |
lower_content_type = content_type.lower() | |
notes = [] | |
extracted_data: Any = None | |
try: | |
if 'text/html' in lower_content_type: | |
logger.debug(f"Processing HTML content from {base_url}") | |
extracted_data = self._process_html_content_enhanced(content, base_url) | |
notes.append("Processed as HTML") | |
elif 'application/json' in lower_content_type or 'text/json' in lower_content_type: | |
logger.debug(f"Processing JSON content from {base_url}") | |
try: | |
extracted_data = json.loads(content) | |
notes.append("Parsed as JSON") | |
except json.JSONDecodeError as e: | |
extracted_data = content | |
notes.append(f"Failed to parse as JSON: {e}") | |
logger.warning(f"Failed to parse JSON from {base_url}: {e}") | |
except Exception as e: | |
extracted_data = content | |
notes.append(f"Error processing JSON: {e}") | |
logger.error(f"Error processing JSON from {base_url}: {e}") | |
elif 'application/xml' in lower_content_type or 'text/xml' in lower_content_type or lower_content_type.endswith( | |
'+xml'): | |
logger.debug(f"Processing XML content from {base_url}") | |
try: | |
root = ET.fromstring(content) | |
xml_text = ET.tostring(root, encoding='unicode', method='xml') | |
extracted_data = xml_text | |
notes.append("Parsed as XML (text representation)") | |
except ET.ParseError as e: | |
extracted_data = content | |
notes.append(f"Failed to parse as XML: {e}") | |
logger.warning(f"Failed to parse XML from {base_url}: {e}") | |
except Exception as e: | |
extracted_data = content | |
notes.append(f"Error processing XML: {e}") | |
logger.error(f"Error processing XML from {base_url}: {e}") | |
elif 'text/plain' in lower_content_type or 'text/' in lower_content_type: | |
logger.debug(f"Processing Plain Text content from {base_url}") | |
extracted_data = content | |
notes.append("Processed as Plain Text") | |
else: | |
logger.debug(f"Unknown content type '{content_type}' from {base_url}. Storing raw content.") | |
extracted_data = content | |
notes.append(f"Unknown content type '{content_type}'. Stored raw text.") | |
except Exception as e: | |
logger.error(f"Unexpected error in _process_web_content for {base_url} ({content_type}): {e}") | |
extracted_data = content | |
notes.append(f"Unexpected processing error: {e}. Stored raw text.") | |
return {'data': extracted_data, 'notes': notes} | |
def _process_html_content_enhanced(self, content: str, base_url: str) -> Dict[str, Any]: | |
"""Process HTML content, preserving text, and extracting metadata and links.""" | |
extracted: Dict[str, Any] = { | |
'title': None, | |
'meta_description': None, | |
'full_text': "", | |
'links': [] | |
} | |
try: | |
soup = BeautifulSoup(content, 'html.parser') | |
if soup.title and soup.title.string: | |
extracted['title'] = soup.title.string.strip() | |
meta_desc = soup.find('meta', attrs={'name': 'description'}) | |
if meta_desc and meta_desc.get('content'): | |
extracted['meta_description'] = meta_desc['content'].strip() | |
unique_links = set() | |
for a_tag in soup.find_all('a', href=True): | |
href = a_tag['href'].strip() | |
if href and not href.startswith(('#', 'mailto:', 'tel:', 'javascript:')): | |
text = a_tag.get_text().strip() | |
try: | |
absolute_url = urljoin(base_url, href) | |
if absolute_url not in unique_links: | |
extracted['links'].append({'text': text, 'url': absolute_url}) | |
unique_links.add(absolute_url) | |
except Exception: | |
if validators.url(href) and href not in unique_links: | |
extracted['links'].append({'text': text, 'url': href}) | |
unique_links.add(href) | |
elif urlparse(href).netloc and href not in unique_links: | |
extracted['links'].append({'text': text, 'url': href}) | |
unique_links.add(href) | |
soup_copy = BeautifulSoup(content, 'html.parser') | |
for script_or_style in soup_copy(["script", "style"]): | |
script_or_style.extract() | |
text = soup_copy.get_text(separator='\n') | |
lines = text.splitlines() | |
cleaned_lines = [line.strip() for line in lines if line.strip()] | |
extracted['full_text'] = '\n'.join(cleaned_lines) | |
except Exception as e: | |
logger.error(f"Enhanced HTML processing error for {base_url}: {e}") | |
soup_copy = BeautifulSoup(content, 'html.parser') | |
for script_or_style in soup_copy(["script", "style"]): | |
script_or_style.extract() | |
extracted['full_text'] = soup_copy.get_text(separator='\n').strip() | |
extracted['processing_error'] = f"Enhanced HTML processing failed: {e}" | |
return extracted | |
def fetch_content_with_depth(self, url: str, max_steps: int = 0) -> Dict[str, Any]: | |
"""Fetches content from a URL and recursively follows links up to max_steps depth.""" | |
if not isinstance(max_steps, int) or not (0 <= max_steps <= 10): | |
logger.error(f"Invalid max_steps value: {max_steps}. Must be an integer between 0 and 10.") | |
return { | |
'url': url, | |
'level': 0, | |
'fetch_result': None, | |
'linked_extractions': [], | |
'processing_notes': [f"Invalid max_steps value: {max_steps}. Must be an integer between 0 and 10."] | |
} | |
validation_result = self.validate_url(url) | |
if not validation_result['is_valid']: | |
logger.error(f"Initial URL validation failed for {url}: {validation_result['message']}") | |
return { | |
'url': url, | |
'level': 0, | |
'fetch_result': None, | |
'linked_extractions': [], | |
'processing_notes': [f"Initial URL validation failed: {validation_result['message']}"] | |
} | |
# Use a set to keep track of visited URLs during the crawl to avoid infinite loops | |
visited_urls = set() | |
return self._fetch_content_recursive(url, max_steps, current_step=0, visited_urls=visited_urls) | |
def _fetch_content_recursive(self, url: str, max_steps: int, current_step: int, | |
visited_urls: set) -> Dict[str, Any]: | |
"""Recursive helper function to fetch content and follow links.""" | |
if current_step > max_steps: | |
logger.debug(f"Depth limit ({max_steps}) reached for {url} at level {current_step}.") | |
return { | |
'url': url, | |
'level': current_step, | |
'fetch_result': None, | |
'linked_extractions': [], | |
'processing_notes': [f"Depth limit ({max_steps}) reached."] | |
} | |
# Normalize URL before checking visited set | |
normalized_url = url.rstrip('/') # Simple normalization | |
if normalized_url in visited_urls: | |
logger.debug(f"Skipping already visited URL: {url} at level {current_step}.") | |
return { | |
'url': url, | |
'level': current_step, | |
'fetch_result': None, # Indicate not fetched in this run | |
'linked_extractions': [], | |
'processing_notes': ["URL already visited in this crawl."] | |
} | |
visited_urls.add(normalized_url) # Mark as visited | |
logger.info(f"Processing URL: {url} at level {current_step}/{max_steps}") | |
fetch_result = self.fetch_content(url) | |
linked_extractions: List[Dict[str, Any]] = [] | |
if fetch_result and fetch_result.get('extracted_data') and 'text/html' in fetch_result.get('metadata', {}).get( | |
'content_type', '').lower(): | |
extracted_data = fetch_result['extracted_data'] | |
links = extracted_data.get('links', []) | |
logger.info( | |
f"Found {len(links)} potential links on {url} at level {current_step}. Proceeding to depth {current_step + 1}.") | |
if current_step < max_steps: | |
for link_info in links: | |
linked_url = link_info.get('url') | |
if linked_url: | |
# Ensure linked URL is absolute and potentially within the same domain | |
# Simple same-domain check (can be made more sophisticated) | |
try: | |
base_domain = urlparse(url).netloc | |
linked_domain = urlparse(linked_url).netloc | |
if linked_domain and linked_domain != base_domain: | |
logger.debug(f"Skipping external link: {linked_url}") | |
continue # Skip external links | |
# Recursively call for linked URLs | |
linked_result = self._fetch_content_recursive(linked_url, max_steps, current_step + 1, | |
visited_urls) | |
if linked_result: | |
linked_extractions.append(linked_result) | |
except Exception as e: | |
logger.warning(f"Error processing linked URL {linked_url} from {url}: {e}") | |
current_notes = fetch_result.get('processing_notes', []) if fetch_result else ['Fetch failed.'] | |
if f"Processed at level {current_step}" not in current_notes: | |
current_notes.append(f"Processed at level {current_step}") | |
return { | |
'url': url, | |
'level': current_step, | |
'fetch_result': fetch_result, | |
'linked_extractions': linked_extractions, | |
'processing_notes': current_notes | |
} | |
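# --- Illustrative usage sketch (not called anywhere in this module) ---
# A minimal example, under the assumption of a reachable URL, of how
# EnhancedURLProcessor might be driven directly; the helper name and default URL
# below are hypothetical.
def _example_crawl_sketch(start_url: str = "https://example.com", depth: int = 1) -> Dict[str, Any]:
    """Validate a URL, fetch it, and follow same-domain links one level deep."""
    processor = EnhancedURLProcessor()
    validation = processor.validate_url(start_url)
    if not validation['is_valid']:
        return {'error': validation['message']}
    # fetch_content_with_depth returns a nested dict:
    # {'url', 'level', 'fetch_result', 'linked_extractions', 'processing_notes'}
    return processor.fetch_content_with_depth(start_url, max_steps=depth)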
class EnhancedFileProcessor: | |
"""Advanced file processing with enhanced content extraction""" | |
def __init__(self, max_file_size: int = 5 * 1024 * 1024 * 1024): # 5GB default | |
self.max_file_size = max_file_size | |
self.supported_extensions = { | |
'.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm', | |
'.log', '.yml', '.yaml', '.ini', '.conf', '.cfg', | |
'.pdf', '.doc', '.docx', '.rtf', '.odt', | |
'.zip', '.tar', '.gz', '.bz2', '.7z', '.rar', | |
} | |
self.archive_extensions = {'.zip', '.tar', '.gz', '.bz2', '.7z', '.rar'} | |
def process_file(self, file) -> List[Dict]: | |
"""Process uploaded file with enhanced error handling and complete extraction""" | |
if not file or not hasattr(file, 'name'): | |
logger.warning("Received invalid file object.") | |
return [] | |
dataset = [] | |
file_path = Path(file.name) | |
if not file_path.exists(): | |
logger.error(f"File path does not exist: {file_path}") | |
return [{ | |
'source': 'file', | |
'filename': file.name if hasattr(file, 'name') else 'unknown', | |
'file_size': None, | |
'extracted_data': None, | |
'processing_notes': ['File path does not exist.'] | |
}] | |
try: | |
file_size = file_path.stat().st_size | |
if file_size > self.max_file_size: | |
logger.warning( | |
f"File '{file_path.name}' size ({file_size} bytes) exceeds maximum allowed size ({self.max_file_size} bytes).") | |
return [{ | |
'source': 'file', | |
'filename': file_path.name, | |
'file_size': file_size, | |
'extracted_data': None, | |
'processing_notes': ['File size exceeds limit.'] | |
}] | |
with tempfile.TemporaryDirectory() as temp_dir: | |
temp_dir_path = Path(temp_dir) | |
if file_path.suffix.lower() in self.archive_extensions: | |
dataset.extend(self._process_archive(file_path, temp_dir_path)) | |
elif file_path.suffix.lower() in self.supported_extensions: | |
dataset.extend(self._process_single_file(file_path)) | |
else: | |
logger.warning(f"Unsupported file type for processing: '{file_path.name}'. Attempting to read as plain text.") | |
try: | |
content_bytes = file_path.read_bytes() | |
encoding_detection = chardet.detect(content_bytes) | |
encoding = encoding_detection['encoding'] or 'utf-8' | |
raw_content = content_bytes.decode(encoding, errors='replace') | |
dataset.append({ | |
'source': 'file', | |
'filename': file_path.name, | |
'file_size': file_size, | |
'mime_type': mimetypes.guess_type(file_path.name)[0] or 'unknown/unknown', | |
'extracted_data': {'plain_text': raw_content}, | |
'processing_notes': ['Processed as plain text (unsupported extension).'] | |
}) | |
except Exception as e: | |
logger.error(f"Error reading or processing unsupported file '{file_path.name}' as text: {e}") | |
dataset.append({ | |
'source': 'file', | |
'filename': file_path.name, | |
'file_size': file_size, | |
'mime_type': mimetypes.guess_type(file_path.name)[0] or 'unknown/unknown', | |
'extracted_data': None, | |
'processing_notes': [f'Unsupported file type and failed to read as text: {e}'] | |
}) | |
except Exception as e: | |
logger.error(f"Error processing file '{file_path.name}': {str(e)}") | |
dataset.append({ | |
'source': 'file', | |
'filename': file_path.name, | |
'file_size': file_size if 'file_size' in locals() else None, | |
'extracted_data': None, | |
'processing_notes': [f'Overall file processing error: {str(e)}'] | |
}) | |
return dataset | |
def _is_archive(self, filepath: Union[str, Path]) -> bool: | |
"""Check if file is an archive""" | |
p = Path(filepath) if isinstance(filepath, str) else filepath | |
return p.suffix.lower() in self.archive_extensions | |
def _process_single_file(self, file_path: Path) -> List[Dict]: | |
"""Process a single file with enhanced character extraction and format-specific handling""" | |
dataset_entries = [] | |
filename = file_path.name | |
file_size = file_path.stat().st_size | |
mime_type, _ = mimetypes.guess_type(file_path) | |
mime_type = mime_type or 'unknown/unknown' | |
file_extension = file_path.suffix.lower() | |
logger.info(f"Processing single file: '{filename}' ({mime_type}, {file_size} bytes)") | |
raw_content: Optional[str] = None | |
extracted_data: Any = None | |
processing_notes: List[str] = [] | |
try: | |
content_bytes = file_path.read_bytes() | |
encoding_detection = chardet.detect(content_bytes) | |
encoding = encoding_detection['encoding'] or 'utf-8' | |
raw_content = content_bytes.decode(encoding, errors='replace') | |
is_explicit_json = mime_type == 'application/json' or file_extension == '.json' | |
looks_like_json = raw_content.strip().startswith('{') or raw_content.strip().startswith('[') | |
if is_explicit_json or looks_like_json: | |
try: | |
extracted_data = json.loads(raw_content) | |
processing_notes.append("Parsed as JSON.") | |
if not is_explicit_json: | |
processing_notes.append("Note: Content looked like JSON despite extension/mime.") | |
logger.warning(f"File '{filename}' identified as JSON content despite extension/mime.") | |
mime_type = 'application/json' | |
except json.JSONDecodeError as e: | |
processing_notes.append(f"Failed to parse as JSON: {e}.") | |
if is_explicit_json: | |
logger.error(f"Explicit JSON file '{filename}' has invalid format: {e}") | |
else: | |
logger.warning(f"Content of '{filename}' looks like JSON but failed to parse: {e}") | |
except Exception as e: | |
processing_notes.append(f"Error processing JSON: {e}.") | |
logger.error(f"Error processing JSON in '{filename}': {e}") | |
looks_like_xml = extracted_data is None and raw_content.strip().startswith( | |
'<') and raw_content.strip().endswith('>') | |
is_explicit_xml = extracted_data is None and ( | |
mime_type in ('application/xml', 'text/xml') or mime_type.endswith('+xml') or file_extension in ( | |
'.xml', '.xsd')) | |
if extracted_data is None and (is_explicit_xml or looks_like_xml): | |
try: | |
root = ET.fromstring(raw_content) | |
extracted_data = ET.tostring(root, encoding='unicode', method='xml') | |
processing_notes.append("Parsed as XML (text representation).") | |
if not is_explicit_xml: | |
processing_notes.append("Note: Content looked like XML despite extension/mime.") | |
if 'xml' not in mime_type: mime_type = 'application/xml' | |
except ET.ParseError as e: | |
processing_notes.append(f"Failed to parse as XML: {e}.") | |
if is_explicit_xml: | |
logger.error(f"Explicit XML file '{filename}' has invalid format: {e}") | |
else: | |
logger.warning(f"Content of '{filename}' looks like XML but failed to parse: {e}") | |
except Exception as e: | |
processing_notes.append(f"Error processing XML: {e}.") | |
logger.error(f"Error processing XML in '{filename}': {e}") | |
is_explicit_csv = extracted_data is None and (mime_type == 'text/csv' or file_extension == '.csv') | |
looks_like_csv = extracted_data is None and (',' in raw_content or ';' in raw_content) and ( | |
'\n' in raw_content or len(raw_content.splitlines()) > 1) | |
if extracted_data is None and (is_explicit_csv or looks_like_csv): | |
try: | |
                    dialect = 'excel'
                    try:
                        sample = '\n'.join(raw_content.splitlines()[:10])
                        if sample:
                            # Sniffer returns a Dialect subclass with no registered name;
                            # pass it to csv.reader directly instead of looking up `.name`.
                            dialect = csv.Sniffer().sniff(sample)
                            logger.debug(f"Sniffer detected CSV dialect (delimiter={dialect.delimiter!r}) for '{filename}'")
                    except csv.Error:
                        logger.debug(f"Sniffer failed to detect dialect for '{filename}', using 'excel'.")
                        dialect = 'excel'
                    csv_reader = csv.reader(io.StringIO(raw_content), dialect=dialect)
rows = list(csv_reader) | |
if rows: | |
max_rows_preview = 100 | |
extracted_data = { | |
'headers': rows[0] if rows and rows[0] else None, | |
'rows': rows[1:max_rows_preview + 1] if len(rows) > 1 else [] | |
} | |
if len(rows) > max_rows_preview + 1: | |
processing_notes.append(f"CSV data rows truncated to {max_rows_preview}.") | |
processing_notes.append("Parsed as CSV.") | |
if not is_explicit_csv: | |
processing_notes.append("Note: Content looked like CSV despite extension/mime.") | |
mime_type = 'text/csv' | |
else: | |
extracted_data = "Empty CSV" | |
processing_notes.append("Parsed as empty CSV.") | |
if not is_explicit_csv: | |
processing_notes.append("Note: Content looked like CSV but was empty.") | |
except Exception as e: | |
processing_notes.append(f"Failed to parse as CSV: {e}.") | |
logger.warning(f"Failed to parse CSV from '{filename}': {e}") | |
if extracted_data is None: | |
try: | |
extracted_text = None | |
if file_extension == '.pdf' and PDF_SUPPORT: | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: | |
tmp_file.write(content_bytes) | |
temp_path = Path(tmp_file.name) | |
try: | |
reader = PdfReader(temp_path) | |
text_content = "".join(page.extract_text() or "" for page in reader.pages) | |
extracted_text = text_content | |
processing_notes.append("Extracted text from PDF.") | |
finally: | |
if temp_path.exists(): temp_path.unlink() | |
elif file_extension == '.docx' and DOCX_SUPPORT: | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.docx') as tmp_file: | |
tmp_file.write(content_bytes) | |
temp_path = Path(tmp_file.name) | |
try: | |
document = Document(temp_path) | |
text_content = "\n".join(paragraph.text for paragraph in document.paragraphs) | |
extracted_text = text_content | |
processing_notes.append("Extracted text from DOCX.") | |
finally: | |
if temp_path.exists(): temp_path.unlink() | |
elif file_extension == '.rtf' and RTF_SUPPORT: | |
try: | |
doc = Rtf15Reader.read(io.StringIO(raw_content)) | |
text_content = PlaintextWriter.write(doc).getvalue() | |
extracted_text = text_content | |
processing_notes.append("Extracted text from RTF.") | |
except Exception as e: | |
processing_notes.append(f"RTF extraction error: {e}") | |
logger.warning(f"Failed to extract RTF text from '{filename}': {e}") | |
elif file_extension == '.odt' and ODT_SUPPORT: | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.odt') as tmp_file: | |
tmp_file.write(content_bytes) | |
temp_path = Path(tmp_file.name) | |
try: | |
                            # odfpy documents are read with opendocument.load();
                            # OpenDocumentText() only creates a new, empty document.
                            text_doc = load_odf(temp_path)
                            paragraphs = text_doc.getElementsByType(odftext.P)
                            text_content = "\n".join(odfteletype.extractText(p) for p in paragraphs)
extracted_text = text_content | |
processing_notes.append("Extracted text from ODT.") | |
finally: | |
if temp_path.exists(): temp_path.unlink() | |
elif file_extension in ['.doc', '.ppt', '.pptx', '.xls', '.xlsx']: | |
processing_notes.append( | |
f"Automatic text extraction for {file_extension.upper()} not fully implemented.") | |
logger.warning( | |
f"Automatic text extraction for {file_extension.upper()} not fully implemented for '{filename}'.") | |
if extracted_text is not None: | |
max_extracted_text_size = 10000 | |
extracted_data = {'text': extracted_text[:max_extracted_text_size]} | |
if len(extracted_text) > max_extracted_text_size: | |
extracted_data['text'] += "..." | |
processing_notes.append("Extracted text truncated.") | |
except ImportError as e: | |
processing_notes.append(f"Missing dependency for document type ({e}). Cannot extract text.") | |
except Exception as e: | |
processing_notes.append(f"Error during document text extraction: {e}") | |
logger.warning(f"Error during document text extraction for '{filename}': {e}") | |
if extracted_data is None: | |
extracted_data = {'plain_text': raw_content} | |
processing_notes.append("Stored as plain text.") | |
                if mime_type in ('unknown/unknown', 'application/octet-stream'):
                    mime_type = 'text/plain'
except Exception as e: | |
logger.error(f"Fatal error processing single file '{filename}': {e}") | |
processing_notes.append(f"Fatal processing error: {e}") | |
raw_content = None | |
extracted_data = None | |
entry = { | |
'source': 'file', | |
'filename': filename, | |
'file_size': file_size, | |
'mime_type': mime_type, | |
'created': datetime.fromtimestamp(file_path.stat().st_ctime).isoformat() if file_path.exists() else None, | |
'modified': datetime.fromtimestamp(file_path.stat().st_mtime).isoformat() if file_path.exists() else None, | |
'raw_content': raw_content, | |
'extracted_data': extracted_data, | |
'processing_notes': processing_notes | |
} | |
dataset_entries.append(entry) | |
return dataset_entries | |
def _process_archive(self, archive_path: Path, extract_to: Path) -> List[Dict]: | |
"""Process an archive file with enhanced extraction""" | |
dataset = [] | |
archive_extension = archive_path.suffix.lower() | |
logger.info(f"Processing archive: '{archive_path.name}'") | |
try: | |
if archive_extension == '.zip': | |
if zipfile.is_zipfile(archive_path): | |
with zipfile.ZipFile(archive_path, 'r') as zip_ref: | |
for file_info in zip_ref.infolist(): | |
if file_info.file_size > 0 and not file_info.filename.endswith('/'): | |
sanitized_filename = Path(file_info.filename).name | |
extracted_file_path = extract_to / sanitized_filename | |
try: | |
with zip_ref.open(file_info) as zf, open(extracted_file_path, 'wb') as outfile: | |
outfile.write(zf.read()) | |
if extracted_file_path.suffix.lower() in self.supported_extensions and not self._is_archive( | |
extracted_file_path): | |
dataset.extend(self._process_single_file(extracted_file_path)) | |
elif extracted_file_path.suffix.lower() in self.archive_extensions: | |
logger.info(f"Found nested archive '{file_info.filename}', processing recursively.") | |
dataset.extend(self._process_archive(extracted_file_path, extract_to)) | |
else: | |
logger.debug(f"Skipping unsupported file in archive: '{file_info.filename}'") | |
except Exception as e: | |
logger.warning( | |
f"Error extracting/processing file '{file_info.filename}' from zip '{archive_path.name}': {e}") | |
finally: | |
if extracted_file_path.exists(): | |
try: | |
extracted_file_path.unlink() | |
except OSError as e: | |
logger.warning( | |
f"Failed to clean up extracted file {extracted_file_path}: {e}") | |
else: | |
logger.error(f"'{archive_path.name}' is not a valid zip file.") | |
            elif archive_extension in ('.tar', '.tgz') or archive_path.name.lower().endswith('.tar.gz'):
                try:
                    # Path.suffix only yields the last suffix (never '.tar.gz'), so plain
                    # .gz files fall through to the dedicated gzip branch below; 'r:*'
                    # lets tarfile detect compression transparently.
                    mode = 'r:*'
with tarfile.open(archive_path, mode) as tar_ref: | |
for member in tar_ref.getmembers(): | |
if member.isfile(): | |
sanitized_filename = Path(member.name).name | |
extracted_file_path = extract_to / sanitized_filename | |
try: | |
if not str(extracted_file_path).startswith(str(extract_to)): | |
logger.warning(f"Skipping potentially malicious path in tar: {member.name}") | |
continue | |
                                    tf = tar_ref.extractfile(member)
                                    if tf is None:
                                        # extractfile() returns None for non-regular members,
                                        # and None cannot be used as a context manager.
                                        logger.warning(
                                            f"Could not extract file-like object for {member.name} from tar.")
                                        continue
                                    with tf, open(extracted_file_path, 'wb') as outfile:
                                        outfile.write(tf.read())
if extracted_file_path.suffix.lower() in self.supported_extensions and not self._is_archive( | |
extracted_file_path): | |
dataset.extend(self._process_single_file(extracted_file_path)) | |
elif extracted_file_path.suffix.lower() in self.archive_extensions: | |
logger.info(f"Found nested archive '{member.name}', processing recursively.") | |
dataset.extend(self._process_archive(extracted_file_path, extract_to)) | |
else: | |
logger.debug(f"Skipping unsupported file in archive: '{member.name}'") | |
except Exception as e: | |
logger.warning( | |
f"Error extracting/processing file '{member.name}' from tar '{archive_path.name}': {e}") | |
finally: | |
if extracted_file_path.exists(): | |
try: | |
extracted_file_path.unlink() | |
except OSError as e: | |
logger.warning( | |
f"Failed to clean up extracted file {extracted_file_path}: {e}") | |
except tarfile.TarError as e: | |
logger.error(f"Error processing TAR archive '{archive_path.name}': {e}") | |
elif archive_extension == '.gz': | |
extracted_name = archive_path.stem | |
extracted_path = extract_to / extracted_name | |
try: | |
with gzip.open(archive_path, 'rb') as gz_file, open(extracted_path, 'wb') as outfile: | |
outfile.write(gz_file.read()) | |
if extracted_path.suffix.lower() in self.supported_extensions and not self._is_archive( | |
extracted_path): | |
dataset.extend(self._process_single_file(extracted_path)) | |
elif extracted_path.suffix.lower() in self.archive_extensions: | |
logger.info(f"Found nested archive '{extracted_name}', processing recursively.") | |
dataset.extend(self._process_archive(extracted_path, extract_to)) | |
else: | |
logger.debug(f"Skipping unsupported file (from gz): '{extracted_name}'") | |
                except gzip.BadGzipFile as e:
logger.error(f"Error processing GZIP file '{archive_path.name}': {e}") | |
except Exception as e: | |
logger.error(f"Error extracting/processing from GZIP '{archive_path.name}': {e}") | |
finally: | |
if extracted_path.exists(): | |
try: | |
extracted_path.unlink() | |
except OSError as e: | |
logger.warning(f"Failed to clean up extracted file {extracted_path}: {e}") | |
elif archive_extension in ('.bz2', '.7z', '.rar'): | |
logger.warning( | |
f"Support for {archive_extension} archives is not yet fully implemented and requires external tools/libraries.") | |
except Exception as e: | |
logger.error(f"Overall archive processing error for '{archive_path.name}': {e}") | |
return dataset | |
def chunk_data(self, data: Union[Dict, List], max_size: int = 2953) -> List[str]: | |
""" | |
Enhanced data chunking for QR codes with sequence metadata and start/end tags. | |
max_size is the maximum *byte* capacity for a QR code (e.g., 2953 bytes for Version 40-L). | |
""" | |
try: | |
json_str = json.dumps(data, ensure_ascii=False, separators=(',', ':')) | |
json_bytes = json_str.encode('utf-8') | |
total_bytes_length = len(json_bytes) | |
MAX_OVERHEAD_PER_CHUNK_BYTES = 250 | |
PRACTICAL_MAX_QR_CODE_BYTE_CAPACITY = 2900 | |
effective_payload_bytes_per_chunk = PRACTICAL_MAX_QR_CODE_BYTE_CAPACITY - MAX_OVERHEAD_PER_CHUNK_BYTES | |
if effective_payload_bytes_per_chunk <= 0: | |
logger.error( | |
f"Effective payload size is zero or negative. QR size ({PRACTICAL_MAX_QR_CODE_BYTE_CAPACITY}) is too small for metadata overhead ({MAX_OVERHEAD_PER_CHUNK_BYTES}). Cannot chunk.") | |
return [] | |
num_chunks = math.ceil(total_bytes_length / effective_payload_bytes_per_chunk) if total_bytes_length > 0 else 0 | |
if num_chunks == 0: | |
return [] | |
chunks_for_qr: List[str] = [] | |
current_byte_pos = 0 | |
for i in range(num_chunks): | |
end_byte_pos = min(current_byte_pos + effective_payload_bytes_per_chunk, total_bytes_length) | |
chunk_data_bytes = json_bytes[current_byte_pos:end_byte_pos] | |
chunk_data_str = chunk_data_bytes.decode('utf-8', errors='replace') | |
chunk_dict = { | |
"idx": i + 1, | |
"tc": num_chunks, | |
"tl": total_bytes_length, | |
"hash": hash(chunk_data_bytes) & 0xFFFFFFFF, | |
"data": chunk_data_str | |
} | |
inner_json_string = json.dumps(chunk_dict, ensure_ascii=False, separators=(',', ':')) | |
final_qr_string = f"{{start{i + 1}}}{inner_json_string}{{end{i + 1}}}" | |
encoded_final_qr_string_len = len(final_qr_string.encode('utf-8')) | |
if encoded_final_qr_string_len > PRACTICAL_MAX_QR_CODE_BYTE_CAPACITY: | |
logger.warning( | |
f"Chunk {i + 1} exceeds estimated QR capacity. Actual: {encoded_final_qr_string_len} bytes, Target Max: {PRACTICAL_MAX_QR_CODE_BYTE_CAPACITY} bytes. Consider increasing MAX_OVERHEAD_PER_CHUNK_BYTES further.") | |
chunks_for_qr.append(final_qr_string) | |
current_byte_pos = end_byte_pos | |
if current_byte_pos < total_bytes_length: | |
logger.error(f"Chunking logic error: Only processed {current_byte_pos} of {total_bytes_length} bytes.") | |
return [] | |
logger.info(f"Chunked data into {num_chunks} chunks for QR codes, with positional sequencing tags.") | |
return chunks_for_qr | |
except Exception as e: | |
logger.error(f"Error chunking data: {e}") | |
return [] | |
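# --- Illustrative reassembly sketch (not called anywhere in this module) ---
# Shows how the framed chunks produced by EnhancedFileProcessor.chunk_data could be
# put back together after the QR codes are scanned. The function name is hypothetical;
# the per-chunk 'hash' field is ignored because Python's built-in hash() is salted per
# process, and reassembly assumes chunk boundaries did not split multi-byte UTF-8
# characters (chunk_data decodes byte slices individually).
def _example_reassemble_chunks(qr_strings: List[str]) -> Any:
    """Strip the {startN}/{endN} frames, order chunks by 'idx', and rebuild the original JSON."""
    decoded = []
    for s in qr_strings:
        match = re.fullmatch(r'\{start(\d+)\}(.*)\{end\1\}', s, flags=re.DOTALL)
        if not match:
            raise ValueError("String is not a framed chunk produced by chunk_data().")
        decoded.append(json.loads(match.group(2)))
    decoded.sort(key=lambda c: c['idx'])
    expected = decoded[0]['tc']
    if len(decoded) != expected:
        raise ValueError(f"Expected {expected} chunks, got {len(decoded)}.")
    combined = ''.join(c['data'] for c in decoded)
    return json.loads(combined)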
def generate_stylish_qr(data: Union[str, Dict], | |
filename: str, | |
size: int = 10, | |
border: int = 4, | |
fill_color: str = "#000000", | |
back_color: str = "#FFFFFF") -> str: | |
"""Generate a stylish QR code with enhanced visual appeal""" | |
try: | |
qr = qrcode.QRCode( | |
version=None, | |
error_correction=qrcode.constants.ERROR_CORRECT_M, | |
box_size=size, | |
border=border | |
) | |
if isinstance(data, dict): | |
qr.add_data(json.dumps(data, ensure_ascii=False, separators=(',', ':'))) | |
else: | |
qr.add_data(str(data)) | |
qr.make(fit=True) | |
qr_image = qr.make_image(fill_color=fill_color, back_color=back_color) | |
qr_image = qr_image.convert('RGBA') | |
try: | |
gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0)) | |
draw = ImageDraw.Draw(gradient) | |
for i in range(qr_image.width): | |
alpha = int(255 * (i / qr_image.width) * 0.05) | |
draw.line([(i, 0), (i, qr_image.height)], fill=(0, 0, 0, alpha)) | |
final_image = Image.alpha_composite(qr_image, gradient) | |
except Exception as e: | |
logger.warning(f"Failed to add gradient overlay to QR code: {e}. Using plain QR.") | |
final_image = qr_image | |
output_path = QR_CODES_DIR / filename | |
final_image.save(output_path, quality=90) | |
return str(output_path) | |
except Exception as e: | |
logger.error(f"QR generation error: {e}") | |
return "" | |
def generate_qr_codes(data: Union[str, Dict, List], combined: bool = True) -> List[str]: | |
"""Generate QR codes with enhanced visual appeal and metadata""" | |
if not isinstance(data, (list, dict, str)): | |
logger.error("generate_qr_codes received data that is not a list, dict, or string.") | |
return [] | |
try: | |
file_processor = EnhancedFileProcessor() | |
paths = [] | |
if combined: | |
chunks_of_combined_data = file_processor.chunk_data(data) | |
if not chunks_of_combined_data: | |
logger.warning("No chunks generated for combined data.") | |
return [] | |
for i, chunk_str in enumerate(chunks_of_combined_data): | |
filename = f'combined_qr_{i + 1}_of_{len(chunks_of_combined_data)}_{int(time.time())}.png' | |
qr_path = generate_stylish_qr( | |
data=chunk_str, | |
filename=filename, | |
fill_color="#1a365d", | |
back_color="#ffffff" | |
) | |
if qr_path: | |
paths.append(qr_path) | |
else: | |
logger.warning(f"Failed to generate QR for combined chunk {i + 1}/{len(chunks_of_combined_data)}.") | |
else: | |
if isinstance(data, list): | |
for idx, item in enumerate(data): | |
item_chunks = file_processor.chunk_data(item) | |
if not item_chunks: | |
logger.warning(f"No chunks generated for item {idx + 1}.") | |
continue | |
for chunk_idx, chunk_str in enumerate(item_chunks): | |
filename = f'item_{idx + 1}_chunk_{chunk_idx + 1}_of_{len(item_chunks)}_{int(time.time())}.png' | |
qr_path = generate_stylish_qr( | |
data=chunk_str, | |
filename=filename, | |
fill_color="#1a365d", | |
back_color="#ffffff" | |
) | |
if qr_path: | |
paths.append(qr_path) | |
else: | |
logger.warning(f"Failed to generate QR for item {idx + 1} chunk {chunk_idx + 1}/{len(item_chunks)}.") | |
elif isinstance(data, (dict, str)): | |
single_item_chunks = file_processor.chunk_data(data) | |
if not single_item_chunks: | |
logger.warning("No chunks generated for single item.") | |
return [] | |
for chunk_idx, chunk_str in enumerate(single_item_chunks): | |
filename = f'single_item_chunk_{chunk_idx + 1}_of_{len(single_item_chunks)}_{int(time.time())}.png' | |
qr_path = generate_stylish_qr( | |
data=chunk_str, | |
filename=filename, | |
fill_color="#1a365d", | |
back_color="#ffffff" | |
) | |
if qr_path: | |
paths.append(qr_path) | |
else: | |
logger.warning(f"Failed to generate QR for single item chunk {chunk_idx + 1}/{len(single_item_chunks)}.") | |
else: | |
logger.warning("Data is not a list, dict, or string and cannot be processed individually.") | |
logger.info(f"Generated {len(paths)} QR codes.") | |
return paths | |
except Exception as e: | |
logger.error(f"An unexpected error occurred in generate_qr_codes: {e}") | |
return [] | |
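# --- Illustrative end-to-end sketch (not called anywhere in this module) ---
# Ties the helpers above together: process one local file, then turn the resulting
# dataset into framed QR codes. The path and function name are hypothetical, and the
# tiny stand-in class only mimics the `.name` attribute of a Gradio upload object.
def _example_file_to_qr_sketch(path_str: str = "output/temp/example.json") -> List[str]:
    """Process one file with EnhancedFileProcessor and emit QR codes for the result."""
    class _Upload:
        def __init__(self, name: str):
            self.name = name
    processor = EnhancedFileProcessor()
    dataset = processor.process_file(_Upload(path_str))
    if not dataset:
        return []
    # combined=True chunks the whole dataset into one framed sequence of QR codes.
    return generate_qr_codes(dataset, combined=True)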
def respond_to_chat( | |
message: str, | |
chat_history: List[Tuple[str, str]], | |
chatbot_data: Optional[List[Dict]], | |
current_filtered_df_state: Optional[pd.DataFrame]) -> Tuple[ | |
List[Tuple[str, str]], List[Dict], Optional[pd.DataFrame]]: | |
""" | |
Responds to user chat messages based on the loaded JSON data. | |
Manages and returns the state of the filtered DataFrame. | |
""" | |
if chatbot_data is None or not chatbot_data: | |
chat_history.append((message, "Please process some data first using the other tabs before chatting.")) | |
return chat_history, chatbot_data, current_filtered_df_state | |
chat_history.append((message, "")) | |
response = "" | |
lower_message = message.lower().strip() | |
new_filtered_df_state = current_filtered_df_state | |
try: | |
flat_data = [] | |
def flatten_item(d, parent_key='', sep='_'): | |
items = [] | |
if isinstance(d, dict): | |
for k, v in d.items(): | |
new_key = parent_key + sep + k if parent_key else k | |
if isinstance(v, (dict, list)): | |
items.extend(flatten_item(v, new_key, sep=sep).items()) | |
else: | |
items.append((new_key, v)) | |
elif isinstance(d, list): | |
for i, elem in enumerate(d): | |
if isinstance(elem, (dict, list)): | |
items.extend( | |
flatten_item(elem, f'{parent_key}_{i}' if parent_key else str(i), sep=sep).items()) | |
else: | |
items.append((f'{parent_key}_{i}' if parent_key else str(i), elem)) | |
return dict(items) | |
for i, item in enumerate(chatbot_data): | |
if isinstance(item, dict): | |
extracted_data_part = item.get('extracted_data') | |
if isinstance(extracted_data_part, (dict, list)): | |
flat_item_data = flatten_item(extracted_data_part, parent_key=f'item_{i}_extracted_data') | |
metadata_part = {k: v for k, v in item.items() if | |
k not in ['extracted_data', 'raw_content', 'linked_extractions']} | |
flat_data.append({**metadata_part, **flat_item_data}) | |
else: | |
flat_data.append({k: v for k, v in item.items() if k != 'raw_content'}) | |
            elif isinstance(item, list):
                # flatten_item returns a dict; append it as one row rather than
                # extending the list with its keys.
                flat_data.append(flatten_item(item, parent_key=f'item_{i}'))
else: | |
flat_data.append({f'item_{i}_value': item}) | |
df = None | |
if flat_data: | |
try: | |
df = pd.DataFrame(flat_data) | |
logger.debug(f"Created DataFrame with shape: {df.shape}") | |
logger.debug(f"DataFrame columns: {list(df.columns)}") | |
except Exception as e: | |
logger.warning(f"Could not create pandas DataFrame from processed data: {e}. Falling back to manual processing.") | |
df = None | |
if df is not None: | |
if "what columns are available" in lower_message or "list columns" in lower_message: | |
response = f"The available columns in the data are: {', '.join(df.columns)}" | |
match = re.search(r'describe column (\w+)', lower_message) | |
if match: | |
column_name = match.group(1) | |
if column_name in df.columns: | |
description = df[column_name].describe().to_string() | |
response = f"Description for column '{column_name}':\n```\n{description}\n```" | |
else: | |
response = f"I couldn't find a column named '{column_name}'. Available columns are: {', '.join(df.columns)}" | |
match = re.search(r'how many unique values in (\w+)', lower_message) | |
if match: | |
column_name = match.group(1) | |
if column_name in df.columns: | |
unique_count = df[column_name].nunique() | |
response = f"There are {unique_count} unique values in the '{column_name}' column." | |
else: | |
response = f"I couldn't find a column named '{column_name}' in the data. Available columns are: {', '.join(df.columns)}" | |
match = re.search(r'what is the (average|sum|min|max) of (\w+)', lower_message) | |
if match: | |
operation, column_name = match.groups() | |
if column_name in df.columns: | |
try: | |
numeric_col = pd.to_numeric(df[column_name], errors='coerce') | |
numeric_col = numeric_col.dropna() | |
if not numeric_col.empty: | |
if operation == 'average': | |
result = numeric_col.mean() | |
response = f"The average of '{column_name}' is {result:.2f}." | |
elif operation == 'sum': | |
result = numeric_col.sum() | |
response = f"The sum of '{column_name}' is {result:.2f}." | |
elif operation == 'min': | |
result = numeric_col.min() | |
response = f"The minimum of '{column_name}' is {result}." | |
elif operation == 'max': | |
result = numeric_col.max() | |
response = f"The maximum of '{column_name}' is {result}." | |
else: | |
response = "I can calculate average, sum, min, or max." | |
else: | |
response = f"The column '{column_name}' does not contain numeric values that I can analyze." | |
except Exception as e: | |
response = f"An error occurred while calculating the {operation} of '{column_name}': {e}" | |
logger.error(f"Error calculating {operation} for column '{column_name}': {e}") | |
else: | |
response = f"I couldn't find a column named '{column_name}'. Available columns are: {', '.join(df.columns)}" | |
filter_match = re.search( | |
r'(?:filter|show items|show me items|find entries|select items|get items)\s+' | |
r'(?:where|by|for|with|if)\s+' | |
r'(\w+)\s+' | |
r'(is|equals?|==|!=|>=?|<=?|contains?|starts with|ends with)\s+' | |
r'([\'"]?[\w\s.-]+[\'"]?)', | |
lower_message | |
) | |
if filter_match: | |
column_name, operator, value_str = filter_match.groups() | |
column_name = column_name.strip() | |
operator = operator.strip().lower() | |
value_str = value_str.strip().strip("'\"") | |
logger.info(f"Filter request: Column='{column_name}', Operator='{operator}', Value='{value_str}'") | |
if column_name not in df.columns: | |
response = f"I couldn't find a column named '{column_name}'. Available columns are: {', '.join(df.columns)}" | |
new_filtered_df_state = None | |
                else:
                    try:
                        target_value: Any
                        condition = None  # defined up front so the later 'condition is not None' check is always safe
                        col_dtype = df[column_name].dtype
                        df_to_filter = current_filtered_df_state if current_filtered_df_state is not None and not current_filtered_df_state.empty else df.copy()
if pd.api.types.is_numeric_dtype(col_dtype) and operator in ['>', '>=', '<', '<=', '==', | |
'!=']: | |
try: | |
target_value = float(value_str) | |
col_series = pd.to_numeric(df_to_filter[column_name], errors='coerce') | |
except ValueError: | |
response = f"For numeric column '{column_name}', '{value_str}' is not a valid number." | |
target_value = None | |
elif pd.api.types.is_bool_dtype(col_dtype) or value_str.lower() in ['true', 'false']: | |
target_value = value_str.lower() == 'true' | |
col_series = df_to_filter[column_name].astype(bool, errors='ignore') | |
else: | |
target_value = str(value_str) | |
col_series = df_to_filter[column_name].astype(str).str.lower() | |
value_str_lower = target_value.lower() | |
                        if not response:
if operator in ['is', 'equals', '==']: | |
if pd.api.types.is_numeric_dtype(col_dtype) or pd.api.types.is_bool_dtype( | |
col_dtype): | |
condition = col_series == target_value | |
else: | |
condition = col_series == value_str_lower | |
elif operator == '!=': | |
if pd.api.types.is_numeric_dtype(col_dtype) or pd.api.types.is_bool_dtype( | |
col_dtype): | |
condition = col_series != target_value | |
else: | |
condition = col_series != value_str_lower | |
elif operator == '>' and pd.api.types.is_numeric_dtype(col_dtype): | |
condition = col_series > target_value | |
elif operator == '>=' and pd.api.types.is_numeric_dtype(col_dtype): | |
condition = col_series >= target_value | |
elif operator == '<' and pd.api.types.is_numeric_dtype(col_dtype): | |
condition = col_series < target_value | |
elif operator == '<=' and pd.api.types.is_numeric_dtype(col_dtype): | |
condition = col_series <= target_value | |
elif operator in ['contains', 'contain'] and pd.api.types.is_string_dtype(col_series): | |
condition = col_series.str.contains(value_str_lower, case=False, na=False) | |
elif operator == 'starts with' and pd.api.types.is_string_dtype(col_series): | |
condition = col_series.str.startswith(value_str_lower, na=False) | |
elif operator == 'ends with' and pd.api.types.is_string_dtype(col_series): | |
condition = col_series.str.endswith(value_str_lower, na=False) | |
else: | |
response = f"Unsupported operator '{operator}' for column '{column_name}' (type: {col_dtype})." | |
condition = None | |
if response: new_filtered_df_state = None | |
if condition is not None: | |
filtered_results_df = df_to_filter[condition] | |
if not filtered_results_df.empty: | |
new_filtered_df_state = filtered_results_df | |
num_results = len(filtered_results_df) | |
preview_rows = min(num_results, 5) | |
preview_cols = min(len(filtered_results_df.columns), 5) | |
preview_df = filtered_results_df.head(preview_rows).iloc[:, :preview_cols] | |
preview_str = preview_df.to_string(index=False) | |
response = ( | |
f"Found {num_results} items where '{column_name}' {operator} '{value_str}'.\n" | |
f"Here's a preview:\n```\n{preview_str}\n```\n" | |
f"The full filtered dataset is now available for download using the 'Download Filtered JSON' button.") | |
else: | |
new_filtered_df_state = pd.DataFrame() | |
response = f"No items found where '{column_name}' {operator} '{value_str}'." | |
elif not response: | |
response = f"Unsupported operator '{operator}' for column '{column_name}' (type: {col_dtype})." | |
new_filtered_df_state = None | |
except ValueError as ve: | |
response = f"Invalid value '{value_str}' for numeric column '{column_name}'. {ve}" | |
new_filtered_df_state = None | |
logger.warning(f"ValueError during filter: {ve}") | |
except Exception as e: | |
new_filtered_df_state = None | |
response = f"An error occurred while applying the filter: {e}" | |
logger.error( | |
f"Error applying filter (column='{column_name}', op='{operator}', val='{value_str}'): {e}") | |
elif "output as csv" in lower_message or "export as csv" in lower_message: | |
if df is not None and not df.empty: | |
csv_output = df.to_csv(index=False) | |
response = f"Here is the data in CSV format:\n```csv\n{csv_output[:1000]}...\n```\n(Output truncated for chat display)" | |
else: | |
response = "There is no data available to output as CSV." | |
elif "output as json" in lower_message or "export as json" in lower_message: | |
if df is not None and not df.empty: | |
json_output = df.to_json(orient='records', indent=2) | |
response = f"Here is the data in JSON format:\n```json\n{json_output[:1000]}...\n```\n(Output truncated for chat display)" | |
else: | |
response = "There is no data available to output as JSON." | |
if not response: | |
if "how many items" in lower_message or "number of items" in lower_message: | |
if new_filtered_df_state is not None and not new_filtered_df_state.empty: | |
response = f"The currently filtered dataset has {len(new_filtered_df_state)} items. The original dataset has {len(df if df is not None else chatbot_data)} items." | |
elif df is not None: | |
response = f"There are {len(df)} top-level items in the processed data." | |
elif isinstance(chatbot_data, list): | |
response = f"There are {len(chatbot_data)} top-level items in the processed data (not in DataFrame)." | |
elif isinstance(chatbot_data, dict): | |
response = "The processed data is a single dictionary, not a list of items." | |
else: | |
response = "The processed data is not a standard list or dictionary structure." | |
elif "what is the structure" in lower_message or "tell me about the data" in lower_message: | |
if new_filtered_df_state is not None and not new_filtered_df_state.empty: | |
response = f"The filtered data has columns: {', '.join(new_filtered_df_state.columns)}. " | |
if df is not None: | |
response += f"The original data has columns: {', '.join(df.columns)}." | |
else: | |
response += "Original data structure is not tabular." | |
elif df is not None: | |
response = f"The data is a table with {len(df)} rows and columns: {', '.join(df.columns)}." | |
elif isinstance(chatbot_data, list) and chatbot_data: | |
sample_item = chatbot_data[0] | |
response = f"The data is a list containing {len(chatbot_data)} items. The first item has the following top-level keys: {list(sample_item.keys())}." | |
elif isinstance(chatbot_data, dict): | |
response = f"The data is a dictionary with the following top-level keys: {list(chatbot_data.keys())}." | |
else: | |
response = "The processed data is not a standard list or dictionary structure that I can easily describe." | |
elif "show me" in lower_message or "get me" in lower_message or "extract" in lower_message: | |
response = "If you want to filter the data, please use a phrase like 'show me items where column_name is value'. If you want to see the raw data, consider using the download buttons." | |
elif "how can i modify" in lower_message or "how to change" in lower_message or "can i add" in lower_message or "can i remove" in lower_message: | |
response = "I cannot directly modify the data here, but I can tell you how you *could* modify it. What kind of change are you considering (e.g., adding an item, changing a value, removing a field)?" | |
elif "add a field" in lower_message or "add a column" in lower_message: | |
response = "To add a field (or column if the data is tabular), you would typically iterate through each item (or row) in the data and add the new key-value pair. For example, adding a 'status' field with a default value." | |
elif "change a value" in lower_message or "update a field" in lower_message: | |
response = "To change a value, you would need to identify the specific item(s) and the field you want to update. You could use a condition (like filtering) to find the right items and then assign a new value to the field." | |
elif "remove a field" in lower_message or "delete a column" in lower_message: | |
response = "To remove a field, you would iterate through each item and delete the specified key. Be careful, as this is irreversible." | |
elif "restructure" in lower_message or "change the format" in lower_message: | |
response = "Restructuring data involves transforming it into a different shape. This could mean flattening nested objects, grouping items, or pivoting data. This often requires writing custom code to map the old structure to the new one." | |
elif "what if i" in lower_message or "if i changed" in lower_message: | |
response = "Tell me what specific change you're contemplating, and I can speculate on the potential impact or how you might approach it programmatically." | |
elif "hello" in lower_message or "hi" in lower_message: | |
response = random.choice(["Hello! How can I help you understand the processed data?", | |
"Hi there! What's on your mind about this data?", | |
"Hey! Ask me anything about the data you've loaded."]) | |
elif "thank you" in lower_message or "thanks" in lower_message: | |
response = random.choice(["You're welcome!", "Glad I could help.", | |
"No problem! Let me know if you have more questions about the data."]) | |
elif "clear chat" in lower_message: | |
chat_history = [] | |
response = "Chat history cleared." | |
new_filtered_df_state = None | |
elif not response: | |
response = random.choice([ | |
"I can analyze the data you've processed. What would you like to know? Try asking to filter data, e.g., 'show items where status is active'.", | |
"Ask me about the number of items, the structure, or values of specific fields. You can also filter data.", | |
"I can perform basic analysis or filter the data. For example: 'filter by price > 100'.", | |
"Tell me what you want to extract or filter from the data. Use phrases like 'show items where ...'.", | |
"I'm equipped to filter your data. Try 'find entries where name contains widget'." | |
]) | |
except Exception as e: | |
logger.error(f"Chatbot runtime error: {e}") | |
response = f"An internal error occurred while processing your request: {e}" | |
response += "\nPlease try rephrasing your question or clear the chat history." | |
if not response: | |
response = "I'm not sure how to respond to that. Please try rephrasing or ask for help on available commands." | |
if chat_history and chat_history[-1][1] == "": | |
chat_history[-1] = (chat_history[-1][0], response) | |
return chat_history, chatbot_data, new_filtered_df_state | |
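# --- Illustrative chat-call sketch (not called anywhere in this module) ---
# Demonstrates the shape of the arguments respond_to_chat expects outside of the
# Gradio event wiring; the sample records below are made up for illustration. Note
# that extracted_data keys are flattened with per-item prefixes (e.g.
# 'item_0_extracted_data_price'), so questions about specific columns should use
# the flattened names.
def _example_chat_sketch() -> Tuple[List[Tuple[str, str]], List[Dict], Optional[pd.DataFrame]]:
    """Ask which columns are available in a tiny in-memory dataset."""
    sample_data = [
        {'source': 'file', 'filename': 'a.json', 'extracted_data': {'price': 120, 'status': 'active'}},
        {'source': 'file', 'filename': 'b.json', 'extracted_data': {'price': 80, 'status': 'inactive'}},
    ]
    history: List[Tuple[str, str]] = []
    # Returns the updated history, the (unchanged) data, and the filtered DataFrame state.
    return respond_to_chat("what columns are available", history, sample_data, None)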
def create_qr_zip(qr_paths: List[str]) -> Optional[str]: | |
"""Creates a zip archive from a list of QR code image paths.""" | |
if not qr_paths: | |
logger.warning("Attempted to create a zip archive, but no QR code paths were provided.") | |
return None # Return None to prevent Gradio from attempting a download | |
try: | |
timestamp = int(time.time()) | |
zip_filename = f"qr_code_collection_{timestamp}.zip" | |
zip_filepath = TEMP_DIR / zip_filename | |
with zipfile.ZipFile(zip_filepath, 'w') as zipf: | |
for path_str in qr_paths: | |
path = Path(path_str) | |
if path.exists(): | |
# Use path.name to avoid storing the full directory structure in the zip | |
zipf.write(path, arcname=path.name) | |
else: | |
logger.warning(f"QR code file not found, skipping: {path_str}") | |
logger.info(f"Successfully created QR code zip archive: {zip_filepath}") | |
return str(zip_filepath) | |
except Exception as e: | |
logger.error(f"Failed to create QR code zip archive: {e}") | |
return None | |
# --- Gradio Interface Definition --- | |
def create_modern_interface(): | |
"""Create a modern and visually appealing Gradio interface""" | |
css = """ | |
/* Modern color scheme */ | |
:root { | |
--primary-color: #1a365d; | |
--secondary-color: #2d3748; | |
--accent-color: #4299e1; | |
--background-color: #f7fafc; | |
--success-color: #48bb78; | |
--error-color: #f56565; | |
--warning-color: #ed8936; | |
} | |
/* Component styling */ | |
.input-container { | |
background-color: white; | |
padding: 1.5rem; | |
border-radius: 0.5rem; | |
border: 1px solid #e2e8f0; | |
margin-bottom: 1rem; | |
} | |
/* Button styling */ | |
.primary-button { | |
background-color: var(--primary-color); | |
color: white; | |
padding: 0.75rem 1.5rem; | |
border-radius: 0.375rem; | |
border: none; | |
cursor: pointer; | |
transition: all 0.2s; | |
} | |
.primary-button:hover { | |
background-color: var(--accent-color); | |
transform: translateY(-1px); | |
} | |
/* Gallery styling */ | |
.gallery { | |
display: grid; | |
grid-template-columns: repeat(auto-fit, minmax(150px, 1fr)); | |
gap: 1rem; | |
padding: 1rem; | |
background-color: white; | |
border-radius: 0.5rem; | |
border: 1px solid #e2e8f0; | |
} | |
.gallery img { | |
width: 100%; | |
height: auto; | |
border-radius: 0.375rem; | |
transition: transform 0.2s; | |
} | |
.gallery img:hover { | |
transform: scale(1.05); | |
} | |
/* QR Code Viewport Styling */ | |
.viewport-container { | |
display: grid; | |
gap: 0.5rem; | |
padding: 1rem; | |
background-color: white; | |
border-radius: 0.5rem; | |
border: 1px solid #e2e8f0; | |
margin-top: 1rem; | |
} | |
.viewport-item { | |
display: flex; | |
flex-direction: column; | |
align-items: center; | |
} | |
.viewport-item img { | |
width: 100%; | |
height: auto; | |
border-radius: 0.375rem; | |
transition: transform 0.2s; | |
max-width: 150px; | |
max-height: 150px; | |
} | |
/* --- NEW: Fullscreen Enhancements --- */ | |
#fullscreen-viewport-wrapper:fullscreen { | |
background-color: var(--background-color) !important; | |
overflow-y: auto; | |
padding: 2rem; | |
} | |
#fullscreen-viewport-wrapper:fullscreen .viewport-container { | |
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); | |
} | |
#fullscreen-viewport-wrapper:fullscreen .viewport-item img { | |
max-width: none; | |
max-height: none; | |
} | |
""" | |
with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator") as interface: | |
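# Inject client-side helpers into the page head: checkbox tracking for the QR viewport and a cross-browser fullscreen helper.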
interface.head = (interface.head or "") + """
<script> | |
let enabledStates = []; | |
function updateEnabledStates(checkbox) { | |
const index = parseInt(checkbox.dataset.index); | |
if (checkbox.checked) { | |
if (!enabledStates.includes(index)) { | |
enabledStates.push(index); | |
} | |
} else { | |
enabledStates = enabledStates.filter(item => item !== index); | |
} | |
const enabled_qr_codes_component = document.querySelector('[data-component-type="state"][data-state-name="enabled_qr_codes"]'); | |
if (enabled_qr_codes_component) { | |
enabled_qr_codes_component.value = JSON.stringify(enabledStates); | |
enabled_qr_codes_component.dispatchEvent(new Event('input')); | |
} | |
console.log("Enabled QR Code Indices:", enabledStates); | |
} | |
function goFullscreen(elementId) { | |
const elem = document.getElementById(elementId); | |
if (!elem) return; | |
if (elem.requestFullscreen) { | |
elem.requestFullscreen(); | |
} else if (elem.webkitRequestFullscreen) { /* Safari */ | |
elem.webkitRequestFullscreen(); | |
} else if (elem.msRequestFullscreen) { /* IE11 */ | |
elem.msRequestFullscreen(); | |
} | |
} | |
</script> | |
""" | |
qr_code_paths = gr.State([]) | |
chatbot_data = gr.State(None) | |
gr.Markdown(""" | |
# 🚀 Advanced Data Processing & QR Code Generator
Transform your data into beautifully designed, sequenced QR codes with our cutting-edge processor. | |
""") | |
with gr.Row(): | |
crawl_depth_slider = gr.Slider( | |
label="Crawl Depth", | |
minimum=0, | |
maximum=10, | |
value=0, | |
step=1, | |
interactive=True, | |
info="Select the maximum depth for crawling links (0-10)." | |
) | |
with gr.Tab("π URL Processing"): | |
url_input = gr.Textbox( | |
label="Enter URLs (comma or newline separated)", | |
lines=5, | |
placeholder="https://example1.com\nhttps://example2.com", | |
value="" | |
) | |
with gr.Tab("📁 File Input"):
file_input = gr.File( | |
label="Upload Files", | |
file_types=None, | |
file_count="multiple" | |
) | |
with gr.Tab("📝 JSON Input"):
text_input = gr.TextArea( | |
label="Direct JSON Input", | |
lines=15, | |
placeholder="Paste your JSON data here...", | |
value="" | |
) | |
with gr.Row(): | |
example_btn = gr.Button("📋 Load Example", variant="secondary")
clear_btn = gr.Button("🗑️ Clear", variant="secondary")
with gr.Row(): | |
combine_data = gr.Checkbox( | |
label="Combine all data into sequence", | |
value=True, | |
info="Generate sequential QR codes for combined data" | |
) | |
generate_qr_toggle = gr.Checkbox( | |
label="Generate QR Codes", | |
value=False, | |
info="Enable to generate QR codes for the processed data." | |
) | |
process_btn = gr.Button( | |
"π Process & Generate QR", | |
variant="primary" | |
) | |
# --- NEW: Two-Column Output Layout --- | |
with gr.Row(): | |
with gr.Column(scale=1): | |
output_json = gr.JSON(label="Processed Data") | |
with gr.Column(scale=1): | |
output_gallery = gr.Gallery( | |
label="Generated QR Codes", | |
columns=None, | |
height="auto", | |
show_label=True, | |
elem_classes=["gallery"] | |
) | |
download_qrs_btn = gr.Button("⬇️ Download All QR Codes as ZIP")
qr_zip_output = gr.File(label="Download QR Code ZIP", interactive=False) | |
output_text = gr.Textbox( | |
label="Processing Status", | |
interactive=False, | |
lines=8 | |
) | |
# --- End of New Layout --- | |
with gr.Tab("🖼️ QR Code Viewport") as viewport_tab:
viewport_output = gr.HTML(label="QR Code Sequence Viewport") | |
enabled_qr_codes = gr.State([]) | |
with gr.Tab("🤖 Chat with Data") as chat_tab:
chat_history = gr.State([]) | |
chatbot = gr.Chatbot(label="Data Chatbot", height=500) | |
filtered_chatbot_df_state = gr.State(None) # To store the filtered DataFrame | |
with gr.Row(): | |
chat_input = gr.Textbox(label="Your Message", placeholder="Ask me about the processed data...") | |
send_msg_btn = gr.Button("Send") | |
with gr.Row(): | |
download_full_json_btn = gr.Button("Download Full JSON") | |
download_filtered_json_btn = gr.Button("Download Filtered JSON") | |
download_file_output = gr.File(label="Download Data", interactive=False) | |
clear_chat_btn = gr.Button("Clear Chat History") | |
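# Returns a small sample product-catalog JSON string used to pre-fill the direct JSON input box.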
def load_example(): | |
example = { | |
"type": "product_catalog", | |
"items": [ | |
{ | |
"id": "123", | |
"name": "Premium Widget", | |
"description": "High-quality widget with advanced features", | |
"price": 299.99, | |
"category": "electronics", | |
"tags": ["premium", "featured", "new"] | |
}, | |
{ | |
"id": "456", | |
"name": "Basic Widget", | |
"description": "Reliable widget for everyday use", | |
"price": 149.99, | |
"category": "electronics", | |
"tags": ["basic", "popular"] | |
} | |
], | |
"metadata": { | |
"timestamp": datetime.now().isoformat(), | |
"version": "2.0", | |
"source": "example" | |
} | |
} | |
return json.dumps(example, indent=2) | |
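# Clears the URL, file, and JSON inputs and resets the stored chatbot data.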
def clear_input(): | |
return "", None, "", None | |
def update_viewport(paths, enabled_states): | |
if not paths: | |
return "<p>No QR codes generated yet.</p>" | |
# Wrapper div with an ID for fullscreen targeting | |
html_content = '<div id="fullscreen-viewport-wrapper" style="padding:1rem; border: 1px solid #ddd; border-radius: 0.5rem;">' | |
# Fullscreen button | |
html_content += '<button onclick="goFullscreen(\'fullscreen-viewport-wrapper\')" class="primary-button" style="margin-bottom: 1rem;">View Fullscreen</button>' | |
num_qr_codes = len(paths) | |
cols = math.ceil(math.sqrt(num_qr_codes)) | |
cols = max(1, min(cols, 8)) | |
html_content += f'<div class="viewport-container" style="grid-template-columns: repeat({cols}, 1fr);">' | |
# Default to "all enabled" when the stored state is missing, empty, or references indices from a previous generation run.
if not enabled_states or any(i >= num_qr_codes for i in enabled_states):
enabled_states = list(range(num_qr_codes))
for i, path in enumerate(paths): | |
is_enabled = i in enabled_states | |
border = "border: 2px solid var(--success-color);" if is_enabled else "border: 2px solid #ccc;" | |
opacity = "opacity: 1.0;" if is_enabled else "opacity: 0.5;" | |
html_content += f'<div class="viewport-item" id="qr_item_{i}">' | |
html_content += f'<img src="/file={path}" style="{border} {opacity}" alt="QR Code {i + 1}">' | |
html_content += f'<label style="font-size: 0.8em; margin-top: 4px;"><input type="checkbox" data-index="{i}" {"checked" if is_enabled else ""} onchange="updateEnabledStates(this)"> Enable</label>' | |
html_content += '</div>' | |
html_content += '</div>' | |
html_content += '</div>' | |
return html_content | |
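# Returns five values, matching the process_btn outputs: processed JSON, gallery paths, status log text, chatbot data, and a placeholder for the ZIP download.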
def process_inputs(urls, files, text, combine, crawl_depth, generate_qr_enabled): | |
"""Process all inputs and generate QR codes based on toggle""" | |
results = []
processing_status_messages = []
# Initialized before the try block so the final return never references unbound names if processing fails early.
qr_paths = []
final_json_output = None
url_processor = EnhancedURLProcessor() | |
file_processor = EnhancedFileProcessor() | |
try: | |
if text and text.strip(): | |
try: | |
json_data = json.loads(text) | |
results.append({ | |
'source': 'json_input', | |
'extracted_data': json_data, | |
'timestamp': datetime.now().isoformat(), | |
'processing_notes': ['Parsed from direct JSON input.'] | |
}) | |
processing_status_messages.append("✅ Successfully parsed direct JSON input.")
except json.JSONDecodeError as e: | |
processing_status_messages.append(f"❌ Invalid JSON format in text input: {str(e)}")
except Exception as e: | |
processing_status_messages.append(f"❌ Error processing direct JSON input: {str(e)}")
if urls and urls.strip(): | |
url_list = re.split(r'[,\n]', urls) | |
url_list = [url.strip() for url in url_list if url.strip()] | |
for url in url_list: | |
processing_status_messages.append( | |
f"π Processing URL: {url} with crawl depth {crawl_depth}...") | |
content_result = url_processor.fetch_content_with_depth(url, max_steps=crawl_depth) | |
if content_result: | |
results.append(content_result) | |
if content_result.get('fetch_result') is not None: | |
processing_status_messages.append(f"✅ Processed URL: {url} (Level 0)")
if content_result.get('processing_notes'): | |
processing_status_messages.append( | |
f" Notes: {'; '.join(content_result['processing_notes'])}") | |
if content_result.get('linked_extractions'): | |
num_linked_processed = len([r for r in content_result['linked_extractions'] if | |
r and r.get('fetch_result') is not None]) | |
processing_status_messages.append( | |
f" Found and processed {num_linked_processed}/{len(content_result['linked_extractions'])} direct links.") | |
else: | |
processing_status_messages.append(f"❌ Failed to process URL: {url}")
if content_result.get('processing_notes'): | |
processing_status_messages.append( | |
f" Notes: {'; '.join(content_result['processing_notes'])}") | |
else: | |
processing_status_messages.append( | |
f"β Failed to process URL: {url} (No result returned)") | |
if files: | |
for file in files: | |
processing_status_messages.append(f"📄 Processing file: {file.name}...")
file_results = file_processor.process_file(file) | |
if file_results: | |
results.extend(file_results) | |
processing_status_messages.append(f"✅ Processed file: {file.name}")
for res in file_results: | |
if res.get('processing_notes'): | |
processing_status_messages.append( | |
f" Notes for {res.get('filename', 'item')}: {'; '.join(res['processing_notes'])}") | |
else: | |
processing_status_messages.append(f"❌ Failed to process file: {file.name}")
qr_paths = [] | |
final_json_output = None | |
if results: | |
final_json_output = results | |
if generate_qr_enabled: | |
processing_status_messages.append("⚙️ Generating QR codes as requested...")
qr_paths = generate_qr_codes(results, combine) | |
if qr_paths: | |
processing_status_messages.append(f"✅ Successfully generated {len(qr_paths)} QR codes.")
else: | |
processing_status_messages.append( | |
"β Failed to generate QR codes (empty result or error). Check logs for details.)") | |
else: | |
processing_status_messages.append( | |
"βοΈ QR code generation was disabled. Processed data is available.") | |
else: | |
processing_status_messages.append("⚠️ No valid content collected from inputs.")
final_json_output = {} | |
except Exception as e: | |
logger.error(f"Overall processing error in process_inputs: {e}") | |
processing_status_messages.append(f"β An unexpected error occurred during processing: {str(e)}") | |
return ( | |
final_json_output, | |
[str(path) for path in qr_paths], | |
"\n".join(processing_status_messages), | |
final_json_output, | |
None | |
) | |
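# Runs after processing: stores the gallery paths and marks every QR code as enabled for the viewport tab.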
def on_qr_generation(qr_paths_list):
# Gallery values may arrive as plain paths or (path, caption) pairs depending on the Gradio version,
# so normalize to plain path strings before storing them in state.
if not qr_paths_list:
return [], []
normalized_paths = [p[0] if isinstance(p, (tuple, list)) else p for p in qr_paths_list]
return normalized_paths, list(range(len(normalized_paths)))
# Event Handlers | |
example_btn.click(load_example, inputs=[], outputs=text_input) | |
clear_btn.click(clear_input, inputs=[], outputs=[url_input, file_input, text_input, chatbot_data]) | |
process_btn.click( | |
process_inputs, | |
inputs=[url_input, file_input, text_input, combine_data, crawl_depth_slider, generate_qr_toggle], | |
outputs=[output_json, output_gallery, output_text, chatbot_data, qr_zip_output] | |
).then( | |
on_qr_generation, | |
inputs=[output_gallery], | |
outputs=[qr_code_paths, enabled_qr_codes] | |
) | |
download_qrs_btn.click( | |
fn=create_qr_zip, | |
inputs=[qr_code_paths], | |
outputs=[qr_zip_output] | |
) | |
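# Rebuild the viewport HTML each time the tab is opened, using the current paths and enabled states.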
viewport_tab.select(update_viewport, inputs=[qr_code_paths, enabled_qr_codes], outputs=[viewport_output]) | |
send_msg_btn.click( | |
respond_to_chat, | |
# The Chatbot component's current value serves as the conversation history, so prior turns persist between messages.
inputs=[chat_input, chatbot, chatbot_data, filtered_chatbot_df_state],
outputs=[chatbot, chatbot_data, filtered_chatbot_df_state] | |
).then( | |
lambda: "", | |
inputs=None, | |
outputs=chat_input | |
) | |
chat_input.submit( | |
respond_to_chat, | |
inputs=[chat_input, chatbot, chatbot_data, filtered_chatbot_df_state],
outputs=[chatbot, chatbot_data, filtered_chatbot_df_state] | |
).then( | |
lambda: "", | |
inputs=None, | |
outputs=chat_input | |
) | |
clear_chat_btn.click( | |
lambda: ([], None, []), | |
inputs=None, | |
outputs=[chatbot, filtered_chatbot_df_state, chat_history] | |
) | |
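# Serializes either a pandas DataFrame or a list of dicts to a timestamped JSON file in TEMP_DIR and returns its path for download.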
def download_json_data(data: Optional[Union[pd.DataFrame, List[Dict]]], filename_prefix: str) -> Optional[str]: | |
if data is None: | |
logger.info(f"No data provided for download with prefix '{filename_prefix}'.") | |
return None | |
data_to_dump = None | |
if isinstance(data, pd.DataFrame): | |
if data.empty: | |
logger.info(f"DataFrame for '{filename_prefix}' is empty. Nothing to download.") | |
return None | |
data_to_dump = data.to_dict(orient='records') | |
elif isinstance(data, list): | |
if not data: | |
logger.info(f"List for '{filename_prefix}' is empty. Nothing to download.") | |
return None | |
data_to_dump = data | |
if data_to_dump is None: | |
return None | |
try: | |
json_str = json.dumps(data_to_dump, indent=2, ensure_ascii=False, default=str)  # default=str covers numpy/pandas scalars and timestamps
timestamp = int(time.time()) | |
filename = f"{filename_prefix}_{timestamp}.json" | |
file_path = TEMP_DIR / filename | |
with open(file_path, 'w', encoding='utf-8') as f: | |
f.write(json_str) | |
logger.info(f"Successfully created JSON file for download: {file_path}") | |
return str(file_path) | |
except Exception as e: | |
logger.error(f"Error creating JSON file for {filename_prefix}: {e}") | |
return None | |
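# Exports the complete processed dataset; warns the user if nothing has been processed yet.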
def handle_download_full_json(current_chatbot_data_state: Optional[List[Dict]]) -> Optional[str]: | |
if not current_chatbot_data_state: | |
logger.info("No full data available to download.") | |
gr.Warning("No data has been processed yet!") | |
return None | |
return download_json_data(current_chatbot_data_state, "full_data_collection") | |
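# Exports only the rows produced by the most recent chat filter; warns the user if no filter has been applied.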
def handle_download_filtered_json(current_filtered_df_state: Optional[pd.DataFrame]) -> Optional[str]: | |
if current_filtered_df_state is None or current_filtered_df_state.empty: | |
logger.info("No filtered data available to download.") | |
gr.Warning("No filtered data to download. Please filter data in the chat first.") | |
return None | |
return download_json_data(current_filtered_df_state, "filtered_data") | |
download_full_json_btn.click( | |
fn=handle_download_full_json, | |
inputs=[chatbot_data], | |
outputs=[download_file_output] | |
) | |
download_filtered_json_btn.click( | |
fn=handle_download_filtered_json, | |
inputs=[filtered_chatbot_df_state], | |
outputs=[download_file_output] | |
) | |
gr.Markdown(""" | |
### 🌟 Features
- **Enhanced URL Scraping**: Extracts HTML text, title, meta description, links, and attempts parsing JSON/XML from URLs based on content type. Supports crawling links up to a specified depth. | |
- **Advanced File Processing**: Reads various text-based files, HTML, XML, CSV, and attempts text extraction from common documents (.pdf, .docx, .rtf, .odt). | |
- **Archive Support**: Extracts and processes supported files from .zip, .tar, .gz archives. | |
- **Data Chatbot**: Interact conversationally with the processed JSON data to ask questions, filter, and get insights. | |
- **Sequential QR Codes**: Chunks large data and embeds sequencing info for reconstruction. | |
- **QR Code Viewport**: A dedicated tab with a **fullscreen mode** for viewing the entire QR code collection. | |
- **Bulk Download**: Download all generated QR codes as a single ZIP file. | |
### 💡 Tips
1. **Layout**: The output is split into two columns: raw JSON on the left, and the QR Code gallery + status log on the right. This prevents the status log from hiding the QR codes. | |
2. **Fullscreen**: For the best viewing experience of all QR codes, navigate to the **"QR Code Viewport"** tab and click the **"View Fullscreen"** button. | |
3. **Download**: Use the **"Download All QR Codes as ZIP"** button located directly under the QR code gallery to save all images at once. | |
""") | |
return interface | |
def main(): | |
"""Initialize and launch the application""" | |
try: | |
mimetypes.init() | |
interface = create_modern_interface() | |
interface.launch(
share=False,
debug=False,
show_error=True,
show_api=False,
# Needed so the viewport's /file= image URLs can serve QR codes saved under the output directory.
allowed_paths=[str(OUTPUTS_DIR)]
)
except Exception as e: | |
logger.error(f"Application startup error: {e}") | |
print(f"\nFatal Error: {e}\nCheck the logs for details.") | |
raise | |
if __name__ == "__main__": | |
main() |