import json
import os
import re
import time
import logging
import mimetypes
import zipfile
import tempfile
import chardet
import io
import csv
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import List, Dict, Optional, Union, Tuple, Any
from pathlib import Path
from urllib.parse import urlparse, urljoin
import requests
import validators
import gradio as gr
from diskcache import Cache
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean
import qrcode
from PIL import Image, ImageDraw, ImageFont
import numpy as np
import tarfile
import gzip
import math
import random
import pandas as pd
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Set up logging with detailed formatting to both the console and a log file.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)
# Conditional imports for document processing
try:
    from PyPDF2 import PdfReader
    PDF_SUPPORT = True
except ImportError:
    PDF_SUPPORT = False
    logger.warning("PyPDF2 not installed. PDF file processing will be limited.")
try:
    from docx import Document
    DOCX_SUPPORT = True
except ImportError:
    DOCX_SUPPORT = False
    logger.warning("python-docx not installed. DOCX file processing will be limited.")
try:
    from pyth.plugins.plaintext.writer import PlaintextWriter
    from pyth.plugins.rtf15.reader import Rtf15Reader
    RTF_SUPPORT = True
except ImportError:
    RTF_SUPPORT = False
    logger.warning("pyth not installed. RTF file processing will be limited.")
try:
    from odf.opendocument import OpenDocumentText
    from odf import text as odftext
    ODT_SUPPORT = True
except ImportError:
    ODT_SUPPORT = False
    logger.warning("odfpy not installed. ODT file processing will be limited.")
# Ensure the output directories exist.
OUTPUTS_DIR = Path('output')
QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes'
TEMP_DIR = OUTPUTS_DIR / 'temp'
for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR]:
    directory.mkdir(parents=True, exist_ok=True)
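# Resulting layout, created at import time (temp/ is intended for intermediate files):
#   output/
#     qr_codes/  - generated QR PNG files (see generate_stylish_qr below)
#     temp/      - scratch space for temporary artifacts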
class EnhancedURLProcessor: | |
"""Advanced URL processing with enhanced content extraction and recursive link following.""" | |
def __init__(self): | |
# Use a real requests session with retry strategy | |
self.session = requests.Session() | |
retry_strategy = Retry( | |
total=3, | |
backoff_factor=1, | |
status_forcelist=[429, 500, 502, 503, 504], | |
allowed_methods=["HEAD", "GET"] | |
) | |
adapter = HTTPAdapter(max_retries=retry_strategy) | |
self.session.mount("http://", adapter) | |
self.session.mount("https://", adapter) | |
self.user_agent = UserAgent() | |
self.timeout = 15 # seconds | |
def validate_url(self, url: str) -> Dict[str, Any]: | |
"""Enhanced URL validation with accessibility check.""" | |
if not validators.url(url): | |
return {'is_valid': False, 'message': 'Invalid URL format', 'details': 'URL must begin with http:// or https://'} | |
parsed = urlparse(url) | |
if not all([parsed.scheme, parsed.netloc]): | |
return {'is_valid': False, 'message': 'Incomplete URL', 'details': 'Missing scheme or domain'} | |
try: | |
# Use a HEAD request to check accessibility without downloading full content | |
headers = {'User-Agent': self.user_agent.random} | |
response = self.session.head(url, timeout=self.timeout, headers=headers, allow_redirects=True) | |
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) | |
# Check content type if available in HEAD response | |
content_type = response.headers.get('Content-Type', '').split(';')[0].strip() | |
if not content_type or not (content_type.startswith('text/') or 'json' in content_type or 'xml' in content_type): | |
# Basic check if content type seems relevant for text extraction | |
logger.warning(f"URL {url} returned potentially irrelevant content type: {content_type}") | |
# Decide if this should invalidate the URL or just add a note | |
# For now, we'll allow fetching but add a note. | |
return { | |
'is_valid': True, | |
'message': 'URL is valid and accessible', | |
'details': { | |
'final_url': response.url, # Capture final URL after redirects | |
'content_type': content_type, | |
'server': response.headers.get('Server', 'N/A'), | |
'size': response.headers.get('Content-Length', 'N/A') | |
} | |
} | |
except requests.exceptions.RequestException as e: | |
return {'is_valid': False, 'message': 'URL not accessible', 'details': str(e)} | |
except Exception as e: | |
logger.error(f"Unexpected error during URL validation for {url}: {e}") | |
return {'is_valid': False, 'message': 'Unexpected validation error', 'details': str(e)} | |
def fetch_content(self, url: str, retry_count: int = 0) -> Optional[Dict[str, Any]]: | |
"""Enhanced content fetcher with retry mechanism and complete character extraction.""" | |
try: | |
logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1})") | |
headers = {'User-Agent': self.user_agent.random} | |
response = self.session.get(url, timeout=self.timeout, headers=headers, allow_redirects=True) | |
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) | |
final_url = response.url # Capture potential redirects | |
content_type = response.headers.get('Content-Type', '').split(';')[0].strip() | |
# Attempt to detect encoding if not specified in headers | |
encoding = response.encoding # requests attempts to guess encoding | |
if encoding is None or encoding == 'ISO-8859-1': # Fallback if requests guess is default/uncertain | |
try: | |
encoding_detection = chardet.detect(response.content) | |
encoding = encoding_detection['encoding'] or 'utf-8' | |
logger.debug(f"Chardet detected encoding: {encoding} for {url}") | |
except Exception as e: | |
logger.warning(f"Chardet detection failed for {url}: {e}. Falling back to utf-8.") | |
encoding = 'utf-8' | |
raw_content = response.content.decode(encoding, errors='replace') | |
# Extract metadata | |
metadata = { | |
'original_url': url, | |
'final_url': final_url, | |
'timestamp': datetime.now().isoformat(), | |
'detected_encoding': encoding, | |
'content_type': content_type, | |
'content_length': len(response.content), | |
'headers': dict(response.headers), | |
'status_code': response.status_code | |
} | |
# Process based on content type | |
processed_extraction = self._process_web_content(raw_content, metadata['content_type'], final_url) | |
return { | |
'source': 'url', | |
'url': url, # Keep original URL as identifier for this step | |
'raw_content': raw_content, | |
'metadata': metadata, | |
'extracted_data': processed_extraction['data'], | |
'processing_notes': processed_extraction['notes'] | |
} | |
except requests.exceptions.RequestException as e: | |
logger.error(f"Failed to fetch content from {url}: {e}") | |
return { | |
'source': 'url', | |
'url': url, | |
'raw_content': None, | |
'metadata': {'original_url': url, 'timestamp': datetime.now().isoformat(), 'status_code': getattr(e.response, 'status_code', None)}, | |
'extracted_data': None, | |
'processing_notes': [f"Failed to fetch content: {str(e)}"] | |
} | |
except Exception as e: | |
logger.error(f"Unexpected error while fetching or processing URL {url}: {e}") | |
return { | |
'source': 'url', | |
'url': url, | |
'raw_content': raw_content if 'raw_content' in locals() else None, | |
'metadata': metadata if 'metadata' in locals() else {'original_url': url, 'timestamp': datetime.now().isoformat(), 'status_code': None}, | |
'extracted_data': None, | |
'processing_notes': [f"Unexpected processing error: {str(e)}"] | |
} | |
def _process_web_content(self, content: str, content_type: str, base_url: str) -> Dict[str, Any]: | |
"""Process content based on detected content type""" | |
lower_content_type = content_type.lower() | |
notes = [] | |
extracted_data: Any = None | |
try: | |
if 'text/html' in lower_content_type: | |
logger.debug(f"Processing HTML content from {base_url}") | |
extracted_data = self._process_html_content_enhanced(content, base_url) | |
notes.append("Processed as HTML") | |
elif 'application/json' in lower_content_type or 'text/json' in lower_content_type: | |
logger.debug(f"Processing JSON content from {base_url}") | |
try: | |
extracted_data = json.loads(content) | |
notes.append("Parsed as JSON") | |
except json.JSONDecodeError as e: | |
extracted_data = content | |
notes.append(f"Failed to parse as JSON: {e}") | |
logger.warning(f"Failed to parse JSON from {base_url}: {e}") | |
except Exception as e: | |
extracted_data = content | |
notes.append(f"Error processing JSON: {e}") | |
logger.error(f"Error processing JSON from {base_url}: {e}") | |
elif 'application/xml' in lower_content_type or 'text/xml' in lower_content_type or lower_content_type.endswith('+xml'): | |
logger.debug(f"Processing XML content from {base_url}") | |
try: | |
root = ET.fromstring(content) | |
xml_text = ET.tostring(root, encoding='unicode', method='xml') | |
extracted_data = xml_text | |
notes.append("Parsed as XML (text representation)") | |
except ET.ParseError as e: | |
extracted_data = content | |
notes.append(f"Failed to parse as XML: {e}") | |
logger.warning(f"Failed to parse XML from {base_url}: {e}") | |
except Exception as e: | |
extracted_data = content | |
notes.append(f"Error processing XML: {e}") | |
logger.error(f"Error processing XML from {base_url}: {e}") | |
elif 'text/plain' in lower_content_type or 'text/' in lower_content_type: | |
logger.debug(f"Processing Plain Text content from {base_url}") | |
extracted_data = content | |
notes.append("Processed as Plain Text") | |
else: | |
logger.debug(f"Unknown content type '{content_type}' from {base_url}. Storing raw content.") | |
extracted_data = content | |
notes.append(f"Unknown content type '{content_type}'. Stored raw text.") | |
except Exception as e: | |
logger.error(f"Unexpected error in _process_web_content for {base_url} ({content_type}): {e}") | |
extracted_data = content | |
notes.append(f"Unexpected processing error: {e}. Stored raw text.") | |
return {'data': extracted_data, 'notes': notes} | |
def _process_html_content_enhanced(self, content: str, base_url: str) -> Dict[str, Any]: | |
"""Process HTML content, preserving text, and extracting metadata and links.""" | |
extracted: Dict[str, Any] = { | |
'title': None, | |
'meta_description': None, | |
'full_text': "", | |
'links': [] | |
} | |
try: | |
soup = BeautifulSoup(content, 'html.parser') | |
if soup.title and soup.title.string: | |
extracted['title'] = soup.title.string.strip() | |
meta_desc = soup.find('meta', attrs={'name': 'description'}) | |
if meta_desc and meta_desc.get('content'): | |
extracted['meta_description'] = meta_desc['content'].strip() | |
unique_links = set() | |
for a_tag in soup.find_all('a', href=True): | |
href = a_tag['href'].strip() | |
if href and not href.startswith(('#', 'mailto:', 'tel:', 'javascript:')): | |
text = a_tag.get_text().strip() | |
try: | |
absolute_url = urljoin(base_url, href) | |
if absolute_url not in unique_links: | |
extracted['links'].append({'text': text, 'url': absolute_url}) | |
unique_links.add(absolute_url) | |
except Exception: | |
if validators.url(href) and href not in unique_links: | |
extracted['links'].append({'text': text, 'url': href}) | |
unique_links.add(href) | |
elif urlparse(href).netloc and href not in unique_links: | |
extracted['links'].append({'text': text, 'url': href}) | |
unique_links.add(href) | |
soup_copy = BeautifulSoup(content, 'html.parser') | |
for script_or_style in soup_copy(["script", "style"]): | |
script_or_style.extract() | |
text = soup_copy.get_text(separator='\n') | |
lines = text.splitlines() | |
cleaned_lines = [line.strip() for line in lines if line.strip()] | |
extracted['full_text'] = '\n'.join(cleaned_lines) | |
except Exception as e: | |
logger.error(f"Enhanced HTML processing error for {base_url}: {e}") | |
soup_copy = BeautifulSoup(content, 'html.parser') | |
for script_or_style in soup_copy(["script", "style"]): | |
script_or_style.extract() | |
extracted['full_text'] = soup_copy.get_text(separator='\n').strip() | |
extracted['processing_error'] = f"Enhanced HTML processing failed: {e}" | |
return extracted | |
def fetch_content_with_depth(self, url: str, max_steps: int = 0) -> Dict[str, Any]: | |
"""Fetches content from a URL and recursively follows links up to max_steps depth.""" | |
if not isinstance(max_steps, int) or not (0 <= max_steps <= 10): | |
logger.error(f"Invalid max_steps value: {max_steps}. Must be an integer between 0 and 10.") | |
return { | |
'url': url, | |
'level': 0, | |
'fetch_result': None, | |
'linked_extractions': [], | |
'processing_notes': [f"Invalid max_steps value: {max_steps}. Must be an integer between 0 and 10."] | |
} | |
validation_result = self.validate_url(url) | |
if not validation_result['is_valid']: | |
logger.error(f"Initial URL validation failed for {url}: {validation_result['message']}") | |
return { | |
'url': url, | |
'level': 0, | |
'fetch_result': None, | |
'linked_extractions': [], | |
'processing_notes': [f"Initial URL validation failed: {validation_result['message']}"] | |
} | |
# Use a set to keep track of visited URLs during the crawl to avoid infinite loops | |
visited_urls = set() | |
return self._fetch_content_recursive(url, max_steps, current_step=0, visited_urls=visited_urls) | |
def _fetch_content_recursive(self, url: str, max_steps: int, current_step: int, visited_urls: set) -> Dict[str, Any]: | |
"""Recursive helper function to fetch content and follow links.""" | |
if current_step > max_steps: | |
logger.debug(f"Depth limit ({max_steps}) reached for {url} at level {current_step}.") | |
return { | |
'url': url, | |
'level': current_step, | |
'fetch_result': None, | |
'linked_extractions': [], | |
'processing_notes': [f"Depth limit ({max_steps}) reached."] | |
} | |
# Normalize URL before checking visited set | |
normalized_url = url.rstrip('/') # Simple normalization | |
if normalized_url in visited_urls: | |
logger.debug(f"Skipping already visited URL: {url} at level {current_step}.") | |
return { | |
'url': url, | |
'level': current_step, | |
'fetch_result': None, # Indicate not fetched in this run | |
'linked_extractions': [], | |
'processing_notes': ["URL already visited in this crawl."] | |
} | |
visited_urls.add(normalized_url) # Mark as visited | |
logger.info(f"Processing URL: {url} at level {current_step}/{max_steps}") | |
fetch_result = self.fetch_content(url) | |
linked_extractions: List[Dict[str, Any]] = [] | |
if fetch_result and fetch_result.get('extracted_data') and 'text/html' in fetch_result.get('metadata', {}).get('content_type', '').lower(): | |
extracted_data = fetch_result['extracted_data'] | |
links = extracted_data.get('links', []) | |
logger.info(f"Found {len(links)} potential links on {url} at level {current_step}. Proceeding to depth {current_step + 1}.") | |
if current_step < max_steps: | |
for link_info in links: | |
linked_url = link_info.get('url') | |
if linked_url: | |
# Ensure linked URL is absolute and potentially within the same domain | |
# Simple same-domain check (can be made more sophisticated) | |
try: | |
base_domain = urlparse(url).netloc | |
linked_domain = urlparse(linked_url).netloc | |
if linked_domain and linked_domain != base_domain: | |
logger.debug(f"Skipping external link: {linked_url}") | |
continue # Skip external links | |
# Recursively call for linked URLs | |
linked_result = self._fetch_content_recursive(linked_url, max_steps, current_step + 1, visited_urls) | |
if linked_result: | |
linked_extractions.append(linked_result) | |
except Exception as e: | |
logger.warning(f"Error processing linked URL {linked_url} from {url}: {e}") | |
current_notes = fetch_result.get('processing_notes', []) if fetch_result else ['Fetch failed.'] | |
if f"Processed at level {current_step}" not in current_notes: | |
current_notes.append(f"Processed at level {current_step}") | |
return { | |
'url': url, | |
'level': current_step, | |
'fetch_result': fetch_result, | |
'linked_extractions': linked_extractions, | |
'processing_notes': current_notes | |
} | |
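# Minimal usage sketch for EnhancedURLProcessor (illustrative only; the Gradio
# callbacks below are the real entry points). It validates a URL first and only
# then crawls it, mirroring how fetch_content_with_depth guards invalid input.
def _example_crawl(url: str = "https://example.com") -> Dict[str, Any]:
    processor = EnhancedURLProcessor()
    validation = processor.validate_url(url)
    if not validation['is_valid']:
        return {'url': url, 'error': validation['message']}
    # max_steps=1 follows same-domain links found on the start page once.
    return processor.fetch_content_with_depth(url, max_steps=1)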
class EnhancedFileProcessor: | |
"""Advanced file processing with enhanced content extraction""" | |
def __init__(self, max_file_size: int = 5 * 1024 * 1024 * 1024): # 5GB default | |
self.max_file_size = max_file_size | |
self.supported_extensions = { | |
'.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm', | |
'.log', '.yml', '.yaml', '.ini', '.conf', '.cfg', | |
'.pdf', '.doc', '.docx', '.rtf', '.odt', | |
            '.zip', '.tar', '.tgz', '.gz', '.bz2', '.7z', '.rar',
        }
        self.archive_extensions = {'.zip', '.tar', '.tgz', '.gz', '.bz2', '.7z', '.rar'}
def process_file(self, file) -> List[Dict]: | |
"""Process uploaded file with enhanced error handling and complete extraction""" | |
if not file or not hasattr(file, 'name'): | |
logger.warning("Received invalid file object.") | |
return [] | |
dataset = [] | |
file_path = Path(file.name) | |
if not file_path.exists(): | |
logger.error(f"File path does not exist: {file_path}") | |
return [{ | |
'source': 'file', | |
'filename': file.name if hasattr(file, 'name') else 'unknown', | |
'file_size': None, | |
'extracted_data': None, | |
'processing_notes': ['File path does not exist.'] | |
}] | |
try: | |
file_size = file_path.stat().st_size | |
if file_size > self.max_file_size: | |
logger.warning(f"File '{file_path.name}' size ({file_size} bytes) exceeds maximum allowed size ({self.max_file_size} bytes).") | |
return [{ | |
'source': 'file', | |
'filename': file_path.name, | |
'file_size': file_size, | |
'extracted_data': None, | |
'processing_notes': ['File size exceeds limit.'] | |
}] | |
with tempfile.TemporaryDirectory() as temp_dir: | |
temp_dir_path = Path(temp_dir) | |
if file_path.suffix.lower() in self.archive_extensions: | |
dataset.extend(self._process_archive(file_path, temp_dir_path)) | |
elif file_path.suffix.lower() in self.supported_extensions: | |
dataset.extend(self._process_single_file(file_path)) | |
else: | |
logger.warning(f"Unsupported file type for processing: '{file_path.name}'. Attempting to read as plain text.") | |
try: | |
content_bytes = file_path.read_bytes() | |
encoding_detection = chardet.detect(content_bytes) | |
encoding = encoding_detection['encoding'] or 'utf-8' | |
raw_content = content_bytes.decode(encoding, errors='replace') | |
dataset.append({ | |
'source': 'file', | |
'filename': file_path.name, | |
'file_size': file_size, | |
'mime_type': mimetypes.guess_type(file_path.name)[0] or 'unknown/unknown', | |
'extracted_data': {'plain_text': raw_content}, | |
'processing_notes': ['Processed as plain text (unsupported extension).'] | |
}) | |
except Exception as e: | |
logger.error(f"Error reading or processing unsupported file '{file_path.name}' as text: {e}") | |
dataset.append({ | |
'source': 'file', | |
'filename': file_path.name, | |
'file_size': file_size, | |
'mime_type': mimetypes.guess_type(file_path.name)[0] or 'unknown/unknown', | |
'extracted_data': None, | |
'processing_notes': [f'Unsupported file type and failed to read as text: {e}'] | |
}) | |
except Exception as e: | |
logger.error(f"Error processing file '{file_path.name}': {str(e)}") | |
dataset.append({ | |
'source': 'file', | |
'filename': file_path.name, | |
'file_size': file_size if 'file_size' in locals() else None, | |
'extracted_data': None, | |
'processing_notes': [f'Overall file processing error: {str(e)}'] | |
}) | |
return dataset | |
def _is_archive(self, filepath: Union[str, Path]) -> bool: | |
"""Check if file is an archive""" | |
p = Path(filepath) if isinstance(filepath, str) else filepath | |
return p.suffix.lower() in self.archive_extensions | |
def _process_single_file(self, file_path: Path) -> List[Dict]: | |
"""Process a single file with enhanced character extraction and format-specific handling""" | |
dataset_entries = [] | |
filename = file_path.name | |
file_size = file_path.stat().st_size | |
mime_type, _ = mimetypes.guess_type(file_path) | |
mime_type = mime_type or 'unknown/unknown' | |
file_extension = file_path.suffix.lower() | |
logger.info(f"Processing single file: '{filename}' ({mime_type}, {file_size} bytes)") | |
raw_content: Optional[str] = None | |
extracted_data: Any = None | |
processing_notes: List[str] = [] | |
try: | |
content_bytes = file_path.read_bytes() | |
encoding_detection = chardet.detect(content_bytes) | |
encoding = encoding_detection['encoding'] or 'utf-8' | |
raw_content = content_bytes.decode(encoding, errors='replace') | |
is_explicit_json = mime_type == 'application/json' or file_extension == '.json' | |
looks_like_json = raw_content.strip().startswith('{') or raw_content.strip().startswith('[') | |
if is_explicit_json or looks_like_json: | |
try: | |
extracted_data = json.loads(raw_content) | |
processing_notes.append("Parsed as JSON.") | |
if not is_explicit_json: | |
processing_notes.append("Note: Content looked like JSON despite extension/mime.") | |
logger.warning(f"File '{filename}' identified as JSON content despite extension/mime.") | |
mime_type = 'application/json' | |
except json.JSONDecodeError as e: | |
processing_notes.append(f"Failed to parse as JSON: {e}.") | |
if is_explicit_json: | |
logger.error(f"Explicit JSON file '{filename}' has invalid format: {e}") | |
else: | |
logger.warning(f"Content of '{filename}' looks like JSON but failed to parse: {e}") | |
except Exception as e: | |
processing_notes.append(f"Error processing JSON: {e}.") | |
logger.error(f"Error processing JSON in '{filename}': {e}") | |
looks_like_xml = extracted_data is None and raw_content.strip().startswith('<') and raw_content.strip().endswith('>') | |
is_explicit_xml = extracted_data is None and (mime_type in ('application/xml', 'text/xml') or mime_type.endswith('+xml') or file_extension in ('.xml', '.xsd')) | |
if extracted_data is None and (is_explicit_xml or looks_like_xml): | |
try: | |
root = ET.fromstring(raw_content) | |
extracted_data = ET.tostring(root, encoding='unicode', method='xml') | |
processing_notes.append("Parsed as XML (text representation).") | |
if not is_explicit_xml: | |
processing_notes.append("Note: Content looked like XML despite extension/mime.") | |
if 'xml' not in mime_type: mime_type = 'application/xml' | |
except ET.ParseError as e: | |
processing_notes.append(f"Failed to parse as XML: {e}.") | |
if is_explicit_xml: | |
logger.error(f"Explicit XML file '{filename}' has invalid format: {e}") | |
else: | |
logger.warning(f"Content of '{filename}' looks like XML but failed to parse: {e}") | |
except Exception as e: | |
processing_notes.append(f"Error processing XML: {e}.") | |
logger.error(f"Error processing XML in '{filename}': {e}") | |
is_explicit_csv = extracted_data is None and (mime_type == 'text/csv' or file_extension == '.csv') | |
looks_like_csv = extracted_data is None and (',' in raw_content or ';' in raw_content) and ('\n' in raw_content or len(raw_content.splitlines()) > 1) | |
if extracted_data is None and (is_explicit_csv or looks_like_csv): | |
try: | |
dialect = 'excel' | |
try: | |
sample = '\n'.join(raw_content.splitlines()[:10]) | |
if sample: | |
dialect = csv.Sniffer().sniff(sample).name | |
logger.debug(f"Sniffer detected CSV dialect: {dialect} for '{filename}'") | |
except csv.Error: | |
logger.debug(f"Sniffer failed to detect dialect for '{filename}', using 'excel'.") | |
dialect = 'excel' | |
csv_reader = csv.reader(io.StringIO(raw_content), dialect=dialect) | |
rows = list(csv_reader) | |
if rows: | |
max_rows_preview = 100 | |
extracted_data = { | |
'headers': rows[0] if rows and rows[0] else None, | |
'rows': rows[1:max_rows_preview+1] if len(rows) > 1 else [] | |
} | |
if len(rows) > max_rows_preview + 1: | |
processing_notes.append(f"CSV data rows truncated to {max_rows_preview}.") | |
processing_notes.append("Parsed as CSV.") | |
if not is_explicit_csv: | |
processing_notes.append("Note: Content looked like CSV despite extension/mime.") | |
mime_type = 'text/csv' | |
else: | |
extracted_data = "Empty CSV" | |
processing_notes.append("Parsed as empty CSV.") | |
if not is_explicit_csv: | |
processing_notes.append("Note: Content looked like CSV but was empty.") | |
except Exception as e: | |
processing_notes.append(f"Failed to parse as CSV: {e}.") | |
logger.warning(f"Failed to parse CSV from '{filename}': {e}") | |
if extracted_data is None: | |
try: | |
extracted_text = None | |
if file_extension == '.pdf' and PDF_SUPPORT: | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: | |
tmp_file.write(content_bytes) | |
temp_path = Path(tmp_file.name) | |
try: | |
reader = PdfReader(temp_path) | |
text_content = "".join(page.extract_text() or "" for page in reader.pages) | |
extracted_text = text_content | |
processing_notes.append("Extracted text from PDF.") | |
finally: | |
if temp_path.exists(): temp_path.unlink() | |
elif file_extension == '.docx' and DOCX_SUPPORT: | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.docx') as tmp_file: | |
tmp_file.write(content_bytes) | |
temp_path = Path(tmp_file.name) | |
try: | |
document = Document(temp_path) | |
text_content = "\n".join(paragraph.text for paragraph in document.paragraphs) | |
extracted_text = text_content | |
processing_notes.append("Extracted text from DOCX.") | |
finally: | |
if temp_path.exists(): temp_path.unlink() | |
elif file_extension == '.rtf' and RTF_SUPPORT: | |
try: | |
doc = Rtf15Reader.read(io.StringIO(raw_content)) | |
text_content = PlaintextWriter.write(doc).getvalue() | |
extracted_text = text_content | |
processing_notes.append("Extracted text from RTF.") | |
except Exception as e: | |
processing_notes.append(f"RTF extraction error: {e}") | |
logger.warning(f"Failed to extract RTF text from '{filename}': {e}") | |
elif file_extension == '.odt' and ODT_SUPPORT: | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.odt') as tmp_file: | |
tmp_file.write(content_bytes) | |
temp_path = Path(tmp_file.name) | |
try: | |
text_doc = OpenDocumentText(temp_path) | |
paragraphs = text_doc.getElementsByType(odftext.P) | |
text_content = "\n".join("".join(node.text for node in p.childNodes) for p in paragraphs) | |
extracted_text = text_content | |
processing_notes.append("Extracted text from ODT.") | |
finally: | |
if temp_path.exists(): temp_path.unlink() | |
elif file_extension in ['.doc', '.ppt', '.pptx', '.xls', '.xlsx']: | |
processing_notes.append(f"Automatic text extraction for {file_extension.upper()} not fully implemented.") | |
logger.warning(f"Automatic text extraction for {file_extension.upper()} not fully implemented for '{filename}'.") | |
if extracted_text is not None: | |
max_extracted_text_size = 10000 | |
extracted_data = {'text': extracted_text[:max_extracted_text_size]} | |
if len(extracted_text) > max_extracted_text_size: | |
extracted_data['text'] += "..." | |
processing_notes.append("Extracted text truncated.") | |
except ImportError as e: | |
processing_notes.append(f"Missing dependency for document type ({e}). Cannot extract text.") | |
except Exception as e: | |
processing_notes.append(f"Error during document text extraction: {e}") | |
logger.warning(f"Error during document text extraction for '{filename}': {e}") | |
if extracted_data is None: | |
extracted_data = {'plain_text': raw_content} | |
processing_notes.append("Stored as plain text.") | |
                if mime_type in ['unknown/unknown', 'application/octet-stream']:
                    # The content was decoded as text, so report it as plain text.
                    mime_type = 'text/plain'
except Exception as e: | |
logger.error(f"Fatal error processing single file '{filename}': {e}") | |
processing_notes.append(f"Fatal processing error: {e}") | |
raw_content = None | |
extracted_data = None | |
entry = { | |
'source': 'file', | |
'filename': filename, | |
'file_size': file_size, | |
'mime_type': mime_type, | |
'created': datetime.fromtimestamp(file_path.stat().st_ctime).isoformat() if file_path.exists() else None, | |
'modified': datetime.fromtimestamp(file_path.stat().st_mtime).isoformat() if file_path.exists() else None, | |
'raw_content': raw_content, | |
'extracted_data': extracted_data, | |
'processing_notes': processing_notes | |
} | |
dataset_entries.append(entry) | |
return dataset_entries | |
def _process_archive(self, archive_path: Path, extract_to: Path) -> List[Dict]: | |
"""Process an archive file with enhanced extraction""" | |
dataset = [] | |
archive_extension = archive_path.suffix.lower() | |
logger.info(f"Processing archive: '{archive_path.name}'") | |
try: | |
if archive_extension == '.zip': | |
if zipfile.is_zipfile(archive_path): | |
with zipfile.ZipFile(archive_path, 'r') as zip_ref: | |
for file_info in zip_ref.infolist(): | |
if file_info.file_size > 0 and not file_info.filename.endswith('/'): | |
sanitized_filename = Path(file_info.filename).name | |
extracted_file_path = extract_to / sanitized_filename | |
try: | |
with zip_ref.open(file_info) as zf, open(extracted_file_path, 'wb') as outfile: | |
outfile.write(zf.read()) | |
if extracted_file_path.suffix.lower() in self.supported_extensions and not self._is_archive(extracted_file_path): | |
dataset.extend(self._process_single_file(extracted_file_path)) | |
elif extracted_file_path.suffix.lower() in self.archive_extensions: | |
logger.info(f"Found nested archive '{file_info.filename}', processing recursively.") | |
dataset.extend(self._process_archive(extracted_file_path, extract_to)) | |
else: | |
logger.debug(f"Skipping unsupported file in archive: '{file_info.filename}'") | |
except Exception as e: | |
logger.warning(f"Error extracting/processing file '{file_info.filename}' from zip '{archive_path.name}': {e}") | |
finally: | |
if extracted_file_path.exists(): | |
try: | |
extracted_file_path.unlink() | |
except OSError as e: | |
logger.warning(f"Failed to clean up extracted file {extracted_file_path}: {e}") | |
else: | |
logger.error(f"'{archive_path.name}' is not a valid zip file.") | |
            elif archive_extension in ('.tar', '.tgz') or archive_path.name.lower().endswith('.tar.gz'):
                try:
                    mode = 'r:*'  # let tarfile auto-detect compression (.tar, .tar.gz, .tgz)
with tarfile.open(archive_path, mode) as tar_ref: | |
for member in tar_ref.getmembers(): | |
if member.isfile(): | |
sanitized_filename = Path(member.name).name | |
extracted_file_path = extract_to / sanitized_filename | |
try: | |
if not str(extracted_file_path).startswith(str(extract_to)): | |
logger.warning(f"Skipping potentially malicious path in tar: {member.name}") | |
continue | |
with tar_ref.extractfile(member) as tf, open(extracted_file_path, 'wb') as outfile: | |
if tf: | |
outfile.write(tf.read()) | |
else: | |
logger.warning(f"Could not extract file-like object for {member.name} from tar.") | |
continue | |
if extracted_file_path.suffix.lower() in self.supported_extensions and not self._is_archive(extracted_file_path): | |
dataset.extend(self._process_single_file(extracted_file_path)) | |
elif extracted_file_path.suffix.lower() in self.archive_extensions: | |
logger.info(f"Found nested archive '{member.name}', processing recursively.") | |
dataset.extend(self._process_archive(extracted_file_path, extract_to)) | |
else: | |
logger.debug(f"Skipping unsupported file in archive: '{member.name}'") | |
except Exception as e: | |
logger.warning(f"Error extracting/processing file '{member.name}' from tar '{archive_path.name}': {e}") | |
finally: | |
if extracted_file_path.exists(): | |
try: | |
extracted_file_path.unlink() | |
except OSError as e: | |
logger.warning(f"Failed to clean up extracted file {extracted_file_path}: {e}") | |
except tarfile.TarError as e: | |
logger.error(f"Error processing TAR archive '{archive_path.name}': {e}") | |
elif archive_extension == '.gz': | |
extracted_name = archive_path.stem | |
extracted_path = extract_to / extracted_name | |
try: | |
with gzip.open(archive_path, 'rb') as gz_file, open(extracted_path, 'wb') as outfile: | |
outfile.write(gz_file.read()) | |
if extracted_path.suffix.lower() in self.supported_extensions and not self._is_archive(extracted_path): | |
dataset.extend(self._process_single_file(extracted_path)) | |
elif extracted_path.suffix.lower() in self.archive_extensions: | |
logger.info(f"Found nested archive '{extracted_name}', processing recursively.") | |
dataset.extend(self._process_archive(extracted_path, extract_to)) | |
else: | |
logger.debug(f"Skipping unsupported file (from gz): '{extracted_name}'") | |
                except gzip.BadGzipFile as e:
logger.error(f"Error processing GZIP file '{archive_path.name}': {e}") | |
except Exception as e: | |
logger.error(f"Error extracting/processing from GZIP '{archive_path.name}': {e}") | |
finally: | |
if extracted_path.exists(): | |
try: | |
extracted_path.unlink() | |
except OSError as e: | |
logger.warning(f"Failed to clean up extracted file {extracted_path}: {e}") | |
elif archive_extension in ('.bz2', '.7z', '.rar'): | |
logger.warning(f"Support for {archive_extension} archives is not yet fully implemented and requires external tools/libraries.") | |
except Exception as e: | |
logger.error(f"Overall archive processing error for '{archive_path.name}': {e}") | |
return dataset | |
    def chunk_data(self, data: Union[Dict, List], max_size: int = 2953) -> List[Dict]:
        """Enhanced data chunking with sequence metadata"""
        try:
            json_str = json.dumps(data, ensure_ascii=False, separators=(',', ':'))
            total_length = len(json_str)
            metadata_template = {
                "idx": 0,
                "tc": 1,
                "tl": total_length,
                "hash": 0,
                "data": ""
            }
            metadata_template_with_hash = {**metadata_template, "hash": 1234567890}
            overhead_estimate = len(json.dumps(metadata_template_with_hash, separators=(',', ':'))) + 50
            effective_chunk_size = max_size - overhead_estimate
            if effective_chunk_size <= 0:
                logger.error(f"Max QR size ({max_size}) is too small for metadata overhead ({overhead_estimate}). Cannot chunk.")
                return []
            if total_length <= effective_chunk_size:
                chunk_data = json_str
                chunk = {
                    "idx": 0,
                    "tc": 1,
                    "tl": total_length,
                    "hash": hash(chunk_data) & 0xFFFFFFFF,
                    "data": chunk_data
                }
                return [chunk]
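            # Note: Python's built-in hash() is randomized per process (PYTHONHASHSEED),
            # so the "hash" field is only a weak, same-process checksum.
            # Ceiling division below; the final chunk may be shorter than the others.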
            num_chunks = -(-total_length // effective_chunk_size)
            chunks = []
            current_pos = 0
            for i in range(num_chunks):
                end_pos = min(current_pos + effective_chunk_size, total_length)
                chunk_data_str = json_str[current_pos:end_pos]
                chunk = {
                    "idx": i,
                    "tc": num_chunks,
                    "tl": total_length,
                    "hash": hash(chunk_data_str) & 0xFFFFFFFF,
                    "data": chunk_data_str
                }
                chunks.append(chunk)
                current_pos = end_pos
            if current_pos < total_length:
                logger.error(f"Chunking logic error: Only processed {current_pos} of {total_length} characters.")
                return []
            logger.info(f"Chunked data into {num_chunks} chunks for QR codes.")
            return chunks
        except Exception as e:
            logger.error(f"Error chunking data: {e}")
            return []
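# Illustrative sketch of the inverse of chunk_data: reassembling a scanned QR
# sequence back into the original JSON. The field names (idx, tc, tl, data) match
# the chunk dictionaries built above; decoding the QR images themselves is out of
# scope here and assumed to have produced the chunk dicts already.
def _example_reassemble_chunks(chunks: List[Dict]) -> Optional[Any]:
    """Rebuild the original object from chunk dicts produced by chunk_data (illustrative only)."""
    if not chunks:
        return None
    ordered = sorted(chunks, key=lambda c: c["idx"])
    if len(ordered) != ordered[0]["tc"]:
        logger.warning(f"Chunk sequence incomplete: expected {ordered[0]['tc']}, got {len(ordered)}.")
        return None
    json_str = "".join(c["data"] for c in ordered)
    if len(json_str) != ordered[0]["tl"]:
        logger.warning(f"Reassembled length {len(json_str)} does not match declared total {ordered[0]['tl']}.")
        return None
    return json.loads(json_str)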
def generate_stylish_qr(data: Union[str, Dict],
                        filename: str,
                        size: int = 10,
                        border: int = 4,
                        fill_color: str = "#000000",
                        back_color: str = "#FFFFFF") -> str:
    """Generate a stylish QR code with enhanced visual appeal"""
    try:
        qr = qrcode.QRCode(
            version=None,
            error_correction=qrcode.constants.ERROR_CORRECT_M,
            box_size=size,
            border=border
        )
        if isinstance(data, dict):
            qr.add_data(json.dumps(data, ensure_ascii=False, separators=(',', ':')))
        else:
            qr.add_data(str(data))
        qr.make(fit=True)
        qr_image = qr.make_image(fill_color=fill_color, back_color=back_color)
        qr_image = qr_image.convert('RGBA')
        try:
            gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0))
            draw = ImageDraw.Draw(gradient)
            for i in range(qr_image.width):
                alpha = int(255 * (i / qr_image.width) * 0.05)
                draw.line([(i, 0), (i, qr_image.height)], fill=(0, 0, 0, alpha))
            final_image = Image.alpha_composite(qr_image, gradient)
        except Exception as e:
            logger.warning(f"Failed to add gradient overlay to QR code: {e}. Using plain QR.")
            final_image = qr_image
        output_path = QR_CODES_DIR / filename
        final_image.save(output_path, quality=90)
        return str(output_path)
    except Exception as e:
        logger.error(f"QR generation error: {e}")
        return ""
def generate_qr_codes(data: List[Any], combined: bool = True) -> List[str]:
    """Generate QR codes for a list of processed items, embedding sequence metadata in each chunk."""
    if not isinstance(data, list):
        logger.error("generate_qr_codes received data that is not a list.")
        return []
    try:
        file_processor = EnhancedFileProcessor()
        paths = []
        if combined:
            chunks = file_processor.chunk_data(data)
            if not chunks:
                logger.warning("No chunks generated for combined data.")
                return []
            for i, chunk in enumerate(chunks):
                filename = f'combined_qr_{int(time.time())}_{i+1}_of_{len(chunks)}.png'
                qr_path = generate_stylish_qr(
                    data=chunk,
                    filename=filename,
                    fill_color="#1a365d",
                    back_color="#ffffff"
                )
                if qr_path:
                    paths.append(qr_path)
                else:
                    logger.warning(f"Failed to generate QR for chunk {i+1}/{len(chunks)}.")
        else:
            if data:
                for idx, item in enumerate(data):
                    chunks = file_processor.chunk_data(item)
                    if not chunks:
                        logger.warning(f"No chunks generated for item {idx+1}.")
                        continue
                    for chunk_idx, chunk in enumerate(chunks):
                        filename = f'item_{idx+1}_chunk_{chunk_idx+1}_of_{len(chunks)}_{int(time.time())}.png'
                        qr_path = generate_stylish_qr(
                            data=chunk,
                            filename=filename,
                            fill_color="#1a365d",
                            back_color="#ffffff"
                        )
                        if qr_path:
                            paths.append(qr_path)
                        else:
                            logger.warning(f"Failed to generate QR for item {idx+1} chunk {chunk_idx+1}/{len(chunks)}.")
            else:
                logger.warning("No items in data list to process individually.")
        logger.info(f"Generated {len(paths)} QR codes.")
        return paths
    except Exception as e:
        logger.error(f"QR code generation error: {e}")
        return []
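# Sketch of reading generated QR PNGs back into chunk dicts, assuming the optional
# pyzbar package (and its zbar system library) is installed; pyzbar is not a
# dependency of this app, so this is illustrative only.
def _example_decode_qr_images(paths: List[str]) -> List[Dict]:
    from pyzbar.pyzbar import decode  # assumed optional dependency
    chunks: List[Dict] = []
    for path in paths:
        for symbol in decode(Image.open(path)):
            try:
                chunks.append(json.loads(symbol.data.decode('utf-8')))
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                logger.warning(f"Could not parse QR payload from {path}: {e}")
    return chunks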
# --- Chatbot Logic --- | |
def respond_to_chat(message: str, chat_history: List[Tuple[str, str]], chatbot_data: Optional[List[Dict]]) -> Tuple[List[Tuple[str, str]], List[Dict]]: | |
"""Responds to user chat messages based on the loaded JSON data.""" | |
if chatbot_data is None or not chatbot_data: | |
chat_history.append((message, "Please process some data first using the other tabs before chatting.")) | |
return chat_history, chatbot_data | |
chat_history.append((message, "")) | |
response = "" | |
lower_message = message.lower().strip() | |
try: | |
# Attempt to flatten the data structure for easier querying | |
flat_data = [] | |
def flatten_item(d, parent_key='', sep='_'): | |
items = [] | |
if isinstance(d, dict): | |
for k, v in d.items(): | |
new_key = parent_key + sep + k if parent_key else k | |
if isinstance(v, (dict, list)): | |
items.extend(flatten_item(v, new_key, sep=sep).items()) | |
else: | |
items.append((new_key, v)) | |
elif isinstance(d, list): | |
for i, elem in enumerate(d): | |
if isinstance(elem, (dict, list)): | |
items.extend(flatten_item(elem, f'{parent_key}_{i}' if parent_key else str(i), sep=sep).items()) | |
else: | |
items.append((f'{parent_key}_{i}' if parent_key else str(i), elem)) # Handle lists of non-dicts | |
# Note: If the top-level chatbot_data is NOT a list of dicts, this flattening might need adjustment. | |
# Assuming chatbot_data is a list of results, where each result is a dict. | |
return dict(items) | |
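        # Example of what flatten_item produces (illustrative): an extracted_data
        # dict like {"title": "Home", "links": [{"url": "https://example.com"}]}
        # flattened with parent_key='item_0_extracted_data' yields
        # {"item_0_extracted_data_title": "Home",
        #  "item_0_extracted_data_links_0_url": "https://example.com"},
        # and these flat keys become the DataFrame columns queried below.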
# Process each top-level item in chatbot_data | |
for i, item in enumerate(chatbot_data): | |
if isinstance(item, dict): | |
# Flatten the 'extracted_data' part if it exists and is a dict/list | |
extracted_data_part = item.get('extracted_data') | |
if isinstance(extracted_data_part, (dict, list)): | |
flat_item_data = flatten_item(extracted_data_part, parent_key=f'item_{i}_extracted_data') | |
# Include some top-level metadata if useful | |
metadata_part = {k: v for k, v in item.items() if k not in ['extracted_data', 'raw_content', 'linked_extractions']} | |
flat_data.append({**metadata_part, **flat_item_data}) | |
else: | |
# If extracted_data is not dict/list, just include top-level keys | |
flat_data.append({k: v for k, v in item.items() if k != 'raw_content'}) # Exclude raw_content | |
elif isinstance(item, list): | |
# If a top-level item is a list itself (less common for single file/URL results but possible), flatten it | |
                flat_data.append(flatten_item(item, parent_key=f'item_{i}'))
else: | |
# Handle cases where top-level item is not a dict or list | |
flat_data.append({f'item_{i}_value': item}) | |
df = None | |
if flat_data: | |
try: | |
df = pd.DataFrame(flat_data) | |
logger.debug(f"Created DataFrame with shape: {df.shape}") | |
logger.debug(f"DataFrame columns: {list(df.columns)}") | |
except Exception as e: | |
logger.warning(f"Could not create pandas DataFrame from processed data: {e}. Falling back to manual processing.") | |
df = None | |
# --- Complex Queries and Analysis --- | |
if df is not None: | |
# List available columns | |
if "what columns are available" in lower_message or "list columns" in lower_message: | |
response = f"The available columns in the data are: {', '.join(df.columns)}" | |
# Describe a specific column | |
match = re.search(r'describe column (\w+)', lower_message) | |
if match: | |
column_name = match.group(1) | |
if column_name in df.columns: | |
description = df[column_name].describe().to_string() | |
response = f"Description for column '{column_name}':\n```\n{description}\n```" | |
else: | |
response = f"I couldn't find a column named '{column_name}'. Available columns are: {', '.join(df.columns)}" | |
# How many unique values in a column? | |
match = re.search(r'how many unique values in (\w+)', lower_message) | |
if match: | |
column_name = match.group(1) | |
if column_name in df.columns: | |
unique_count = df[column_name].nunique() | |
response = f"There are {unique_count} unique values in the '{column_name}' column." | |
else: | |
response = f"I couldn't find a column named '{column_name}' in the data. Available columns are: {', '.join(df.columns)}" | |
# What is the average/sum/min/max of a numeric column? | |
match = re.search(r'what is the (average|sum|min|max) of (\w+)', lower_message) | |
if match: | |
operation, column_name = match.groups() | |
if column_name in df.columns: | |
try: | |
numeric_col = pd.to_numeric(df[column_name], errors='coerce') | |
numeric_col = numeric_col.dropna() | |
if not numeric_col.empty: | |
if operation == 'average': | |
result = numeric_col.mean() | |
response = f"The average of '{column_name}' is {result:.2f}." | |
elif operation == 'sum': | |
result = numeric_col.sum() | |
response = f"The sum of '{column_name}' is {result:.2f}." | |
elif operation == 'min': | |
result = numeric_col.min() | |
response = f"The minimum of '{column_name}' is {result}." | |
elif operation == 'max': | |
result = numeric_col.max() | |
response = f"The maximum of '{column_name}' is {result}." | |
else: | |
response = "I can calculate average, sum, min, or max." | |
else: | |
response = f"The column '{column_name}' does not contain numeric values that I can analyze." | |
except Exception as e: | |
response = f"An error occurred while calculating the {operation} of '{column_name}': {e}" | |
logger.error(f"Error calculating {operation} for column '{column_name}': {e}") | |
else: | |
response = f"I couldn't find a column named '{column_name}'. Available columns are: {', '.join(df.columns)}" | |
# Filter data based on a simple condition | |
match = re.search(r'show me items where (\w+)\s*([<>=!]+)\s*([\w"\']*)', lower_message) # Added quotes to value regex | |
if match: | |
column_name, operator, value_str = match.groups() | |
try: | |
# Attempt to infer value type (numeric, string, boolean) | |
value: Any | |
is_numeric_comparison = False | |
is_boolean_comparison = False | |
if value_str.lower() in ['true', 'false']: | |
value = value_str.lower() == 'true' | |
is_boolean_comparison = True | |
else: | |
try: | |
value = float(value_str.strip("'\"")) | |
is_numeric_comparison = True | |
except ValueError: | |
value = value_str.strip("'\"") | |
if column_name in df.columns: | |
if is_numeric_comparison: | |
numeric_col = pd.to_numeric(df[column_name], errors='coerce') | |
filtered_df = df.loc[pd.notna(numeric_col)] | |
if operator == '>': filtered_results = filtered_df[numeric_col > value] | |
elif operator == '<': filtered_results = filtered_df[numeric_col < value] | |
elif operator == '>=': filtered_results = filtered_df[numeric_col >= value] | |
elif operator == '<=': filtered_results = filtered_df[numeric_col <= value] | |
elif operator == '==': filtered_results = filtered_df[numeric_col == value] | |
elif operator == '!=': filtered_results = filtered_df[numeric_col != value] | |
else: | |
filtered_results = pd.DataFrame() | |
response = f"Unsupported numeric operator: {operator}. Try >, <, >=, <=, ==, !=." | |
if not filtered_results.empty: | |
preview = filtered_results.to_json(orient='records', indent=2)[:500] + "..." if len(filtered_results.to_json()) > 500 else filtered_results.to_json(orient='records', indent=2) | |
response = f"Here are the items where '{column_name}' {operator} {value_str}:\n```json\n{preview}\n```" | |
                            elif not response:
response = f"No items found where '{column_name}' {operator} {value_str}." | |
elif is_boolean_comparison: | |
# Ensure column is boolean or can be interpreted as boolean | |
boolean_col = df[column_name].astype(bool, errors='ignore') # Coerce errors, might need more robust check | |
if operator == '==': filtered_results = df[boolean_col == value] | |
elif operator == '!=': filtered_results = df[boolean_col != value] | |
else: | |
filtered_results = pd.DataFrame() | |
response = f"Unsupported boolean operator: {operator}. Try == or !=." | |
if not filtered_results.empty: | |
preview = filtered_results.to_json(orient='records', indent=2)[:500] + "..." if len(filtered_results.to_json()) > 500 else filtered_results.to_json(orient='records', indent=2) | |
response = f"Here are the items where '{column_name}' is {value_str}:\n```json\n{preview}\n```" | |
                            elif not response:
response = f"No items found where '{column_name}' is {value_str}." | |
elif operator == '==': | |
filtered_results = df[df[column_name] == value] | |
if not filtered_results.empty: | |
preview = filtered_results.to_json(orient='records', indent=2)[:500] + "..." if len(filtered_results.to_json()) > 500 else filtered_results.to_json(orient='records', indent=2) | |
response = f"Here are the items where '{column_name}' is '{value}':\n```json\n{preview}\n```" | |
else: | |
response = f"No items found where '{column_name}' is '{value}'." | |
elif operator == '!=': | |
filtered_results = df[df[column_name] != value] | |
if not filtered_results.empty: | |
preview = filtered_results.to_json(orient='records', indent=2)[:500] + "..." if len(filtered_results.to_json()) > 500 else filtered_results.to_json(orient='records', indent=2) | |
response = f"Here are the items where '{column_name}' is not '{value}':\n```json\n{preview}\n```" | |
else: | |
response = f"All items have '{column_name}' as '{value}' or the column doesn't exist." | |
else: | |
response = f"Unsupported operator for string comparison: {operator}. Try == or !=." | |
else: | |
response = f"I couldn't find a column named '{column_name}'. Available columns are: {', '.join(df.columns)}" | |
except Exception as e: | |
response = f"An error occurred while filtering data: {e}" | |
logger.error(f"Error filtering data based on condition: {e}") | |
# Request structured output (e.g., as CSV or simplified JSON) | |
if "output as csv" in lower_message or "export as csv" in lower_message: | |
if df is not None and not df.empty: | |
csv_output = df.to_csv(index=False) | |
response = f"Here is the data in CSV format:\n```csv\n{csv_output[:1000]}...\n```\n(Output truncated for chat display)" | |
else: | |
response = "There is no data available to output as CSV." | |
elif "output as json" in lower_message or "export as json" in lower_message: | |
if df is not None and not df.empty: | |
json_output = df.to_json(orient='records', indent=2) | |
response = f"Here is the data in JSON format:\n```json\n{json_output[:1000]}...\n```\n(Output truncated for chat display)" | |
else: | |
response = "There is no data available to output as JSON." | |
# --- General Queries (if no DataFrame or specific query matched) --- | |
if not response: | |
if "how many items" in lower_message or "number of items" in lower_message: | |
if isinstance(chatbot_data, list): | |
response = f"There are {len(chatbot_data)} top-level items in the processed data." | |
elif isinstance(chatbot_data, dict): | |
response = "The processed data is a single dictionary, not a list of items." | |
else: | |
response = "The processed data is not a standard list or dictionary structure." | |
elif "what is the structure" in lower_message or "tell me about the data" in lower_message: | |
if isinstance(chatbot_data, list) and chatbot_data: | |
sample_item = chatbot_data[0] | |
response = f"The data is a list containing {len(chatbot_data)} items. The first item has the following top-level keys: {list(sample_item.keys())}. I can try to tell you more about specific keys if you like." | |
elif isinstance(chatbot_data, dict): | |
response = f"The data is a dictionary with the following top-level keys: {list(chatbot_data.keys())}." | |
else: | |
response = "The processed data is not a standard list or dictionary structure that I can easily describe." | |
elif "show me" in lower_message or "get me" in lower_message or "extract" in lower_message: | |
parts = lower_message.split("show me") | |
if len(parts) > 1: | |
key_request = parts[1].strip().split(" ")[0] | |
extracted_values = [] | |
if isinstance(chatbot_data, list): | |
for item in chatbot_data: | |
if isinstance(item, dict) and key_request in item: | |
extracted_values.append(item[key_request]) | |
elif isinstance(chatbot_data, dict) and key_request in chatbot_data: | |
extracted_values.append(chatbot_data[key_request]) | |
if extracted_values: | |
preview = json.dumps(extracted_values, indent=2)[:500] + "..." if len(json.dumps(extracted_values)) > 500 else json.dumps(extracted_values, indent=2) | |
response = f"Here are the values for '{key_request}':\n```json\n{preview}\n```" | |
else: | |
response = f"I couldn't find a key named '{key_request}' in the top level of the data items." | |
else: | |
response = "What specifically would you like me to show or extract?" | |
# --- Speculation about Modifications --- | |
elif "how can i modify" in lower_message or "how to change" in lower_message or "can i add" in lower_message or "can i remove" in lower_message: | |
response = "I cannot directly modify the data here, but I can tell you how you *could* modify it. What kind of change are you considering (e.g., adding an item, changing a value, removing a field)?" | |
elif "add a field" in lower_message or "add a column" in lower_message: | |
response = "To add a field (or column if the data is tabular), you would typically iterate through each item (or row) in the data and add the new key-value pair. For example, adding a 'status' field with a default value." | |
elif "change a value" in lower_message or "update a field" in lower_message: | |
response = "To change a value, you would need to identify the specific item(s) and the field you want to update. You could use a condition (like filtering) to find the right items and then assign a new value to the field." | |
elif "remove a field" in lower_message or "delete a column" in lower_message: | |
response = "To remove a field, you would iterate through each item and delete the specified key. Be careful, as this is irreversible." | |
elif "restructure" in lower_message or "change the format" in lower_message: | |
response = "Restructuring data involves transforming it into a different shape. This could mean flattening nested objects, grouping items, or pivoting data. This often requires writing custom code to map the old structure to the new one." | |
elif "what if i" in lower_message or "if i changed" in lower_message: | |
response = "Tell me what specific change you're contemplating, and I can speculate on the potential impact or how you might approach it programmatically." | |
# --- General Conversation / Fallback --- | |
elif "hello" in lower_message or "hi" in lower_message: | |
response = random.choice(["Hello! How can I help you understand the processed data?", "Hi there! What's on your mind about this data?", "Hey! Ask me anything about the data you've loaded."]) | |
elif "thank you" in lower_message or "thanks" in lower_message: | |
response = random.choice(["You're welcome!", "Glad I could help.", "No problem! Let me know if you have more questions about the data."]) | |
elif "clear chat" in lower_message: | |
chat_history = [] | |
response = "Chat history cleared." | |
elif not response: | |
response = random.choice([ | |
"I can analyze the data you've processed. What would you like to know?", | |
"Ask me about the number of items, the structure, or values of specific fields.", | |
"I can perform basic analysis like counting unique values or calculating sums/averages if the data is suitable.", | |
"Tell me what you want to extract or filter from the data.", | |
"I'm still learning, but I can try to answer questions about the data structure and content." | |
]) | |
except Exception as e: | |
logger.error(f"Chatbot runtime error: {e}") | |
response = f"An internal error occurred while processing your request: {e}" | |
response += "\nPlease try rephrasing your question or clear the chat history." | |
if chat_history and chat_history[-1][1] == "": | |
chat_history[-1] = (chat_history[-1][0], response) | |
else: | |
chat_history.append(("", response)) | |
return chat_history, chatbot_data | |
# --- Gradio Interface Definition --- | |
def create_modern_interface(): | |
"""Create a modern and visually appealing Gradio interface""" | |
css = """ | |
/* Modern color scheme */ | |
:root { | |
--primary-color: #1a365d; | |
--secondary-color: #2d3748; | |
--accent-color: #4299e1; | |
--background-color: #f7fafc; | |
--success-color: #48bb78; | |
--error-color: #f56565; | |
--warning-color: #ed8936; | |
} | |
/* Container styling */ | |
.container { | |
max-width: 1200px; | |
margin: auto; | |
padding: 2rem; | |
background-color: var(--background-color); | |
border-radius: 1rem; | |
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); | |
} | |
/* Component styling */ | |
.input-container { | |
background-color: white; | |
padding: 1.5rem; | |
border-radius: 0.5rem; | |
border: 1px solid #e2e8f0; | |
margin-bottom: 1rem; | |
} | |
/* Button styling */ | |
.primary-button { | |
background-color: var(--primary-color); | |
color: white; | |
padding: 0.75rem 1.5rem; | |
border-radius: 0.375rem; | |
border: none; | |
cursor: pointer; | |
transition: all 0.2s; | |
} | |
.primary-button:hover { | |
background-color: var(--accent-color); | |
transform: translateY(-1px); | |
} | |
/* Status messages */ | |
.status { | |
padding: 1rem; | |
border-radius: 0.375rem; | |
margin: 1rem 0; | |
} | |
.status.success { background-color: #f0fff4; color: var(--success-color); } | |
.status.error { background-color: #fff5f5; color: var(--error-color); } | |
.status.warning { background-color: #fffaf0; color: var(--warning-color); } | |
/* Gallery styling */ | |
.gallery { | |
display: grid; | |
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); | |
gap: 1rem; | |
padding: 1rem; | |
background-color: white; | |
border-radius: 0.5rem; | |
border: 1px solid #e2e8f0; | |
} | |
.gallery img { | |
width: 100%; | |
height: auto; | |
border-radius: 0.375rem; | |
transition: transform 0.2s; | |
} | |
.gallery img:hover { | |
transform: scale(1.05); | |
} | |
/* QR Code Viewport Styling */ | |
.viewport-container { | |
display: grid; | |
gap: 0.5rem; | |
padding: 1rem; | |
background-color: white; | |
border-radius: 0.5rem; | |
border: 1px solid #e2e8f0; | |
margin-top: 1rem; | |
} | |
.viewport-item { | |
display: flex; | |
flex-direction: column; | |
align-items: center; | |
} | |
.viewport-item img { | |
width: 100%; | |
height: auto; | |
border-radius: 0.375rem; | |
transition: transform 0.2s; | |
max-width: 150px; | |
max-height: 150px; | |
} | |
""" | |
with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator") as interface: | |
        interface.head = (interface.head or "") + """
<script> | |
let enabledStates = []; | |
function updateEnabledStates(checkbox) { | |
const index = parseInt(checkbox.dataset.index); | |
if (checkbox.checked) { | |
if (!enabledStates.includes(index)) { | |
enabledStates.push(index); | |
} | |
} else { | |
enabledStates = enabledStates.filter(item => item !== index); | |
} | |
const enabled_qr_codes_component = document.querySelector('[data-component-type="state"][data-state-name="enabled_qr_codes"]'); | |
if (enabled_qr_codes_component) { | |
enabled_qr_codes_component.value = JSON.stringify(enabledStates); | |
enabled_qr_codes_component.dispatchEvent(new Event('input')); | |
} | |
console.log("Enabled QR Code Indices:", enabledStates); | |
} | |
</script> | |
""" | |
with gr.Row(): | |
crawl_depth_slider = gr.Slider( | |
label="Crawl Depth", | |
minimum=0, | |
maximum=10, | |
value=0, | |
step=1, | |
interactive=True, | |
info="Select the maximum depth for crawling links (0-10)." | |
) | |
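# A depth of 0 fetches only the submitted URL itself; higher values let | |
# fetch_content_with_depth follow discovered links that many levels deep. | |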
qr_code_paths = gr.State([]) | |
chatbot_data = gr.State(None) | |
gr.Markdown(""" | |
# 🌐 Advanced Data Processing & QR Code Generator | |
Process URLs, files, and direct JSON input into structured data and encode the results as sequenced QR codes. | |
""") | |
with gr.Tab("π URL Processing"): | |
url_input = gr.Textbox( | |
label="Enter URLs (comma or newline separated)", | |
lines=5, | |
placeholder="https://example1.com\nhttps://example2.com", | |
value="" | |
) | |
with gr.Tab("π File Input"): | |
file_input = gr.File( | |
label="Upload Files", | |
file_types=None, | |
file_count="multiple" | |
) | |
with gr.Tab("π JSON Input"): | |
text_input = gr.TextArea( | |
label="Direct JSON Input", | |
lines=15, | |
placeholder="Paste your JSON data here...", | |
value="" | |
) | |
with gr.Row(): | |
example_btn = gr.Button("π Load Example", variant="secondary") | |
clear_btn = gr.Button("ποΈ Clear", variant="secondary") | |
with gr.Row(): | |
combine_data = gr.Checkbox( | |
label="Combine all data into sequence", | |
value=True, | |
info="Generate sequential QR codes for combined data" | |
) | |
process_btn = gr.Button( | |
"π Process & Generate QR", | |
variant="primary" | |
) | |
output_json = gr.JSON(label="Processed Data") | |
output_gallery = gr.Gallery( | |
label="Generated QR Codes", | |
columns=3, | |
height=400, | |
show_label=True | |
) | |
output_text = gr.Textbox( | |
label="Processing Status", | |
interactive=False | |
) | |
with gr.Tab("πΌοΈ QR Code Viewport") as viewport_tab: | |
viewport_output = gr.HTML(label="QR Code Sequence Viewport") | |
enabled_qr_codes = gr.State([]) | |
with gr.Tab("π€ Chat with Data") as chat_tab: | |
chat_history = gr.State([]) | |
chatbot = gr.Chatbot(label="Data Chatbot") | |
with gr.Row(): | |
chat_input = gr.Textbox(label="Your Message", placeholder="Ask me about the processed data...") | |
send_msg_btn = gr.Button("Send") | |
clear_chat_btn = gr.Button("Clear Chat History") | |
def load_example(): | |
example = { | |
"type": "product_catalog", | |
"items": [ | |
{ | |
"id": "123", | |
"name": "Premium Widget", | |
"description": "High-quality widget with advanced features", | |
"price": 299.99, | |
"category": "electronics", | |
"tags": ["premium", "featured", "new"] | |
}, | |
{ | |
"id": "456", | |
"name": "Basic Widget", | |
"description": "Reliable widget for everyday use", | |
"price": 149.99, | |
"category": "electronics", | |
"tags": ["basic", "popular"] | |
} | |
], | |
"metadata": { | |
"timestamp": datetime.now().isoformat(), | |
"version": "2.0", | |
"source": "example" | |
} | |
} | |
return json.dumps(example, indent=2) | |
def clear_input(): | |
return "", None, "", None | |
def update_viewport(paths, enabled_states): | |
if not paths: | |
return "<p>No QR codes generated yet.</p>" | |
num_qr_codes = len(paths) | |
cols = math.ceil(math.sqrt(num_qr_codes)) | |
cols = max(1, min(cols, 6)) | |
viewport_html = f'<div class="viewport-container" style="grid-template-columns: repeat({cols}, 1fr);">' | |
if enabled_states is None or len(enabled_states) != num_qr_codes: | |
enabled_states = list(range(num_qr_codes)) | |
for i, path in enumerate(paths): | |
is_enabled = i in enabled_states | |
border = "border: 2px solid green;" if is_enabled else "border: 2px solid lightgray;" | |
opacity = "opacity: 1.0;" if is_enabled else "opacity: 0.5;" | |
viewport_html += f'<div class="viewport-item" id="qr_item_{i}">' | |
viewport_html += f'<img src="/file={path}" style="{border} {opacity}" alt="QR Code {i+1}">' | |
viewport_html += f'<label><input type="checkbox" data-index="{i}" {"checked" if is_enabled else ""} onchange="updateEnabledStates(this)"> Enable</label>' | |
viewport_html += '</div>' | |
viewport_html += '</div>' | |
return viewport_html | |
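# For example, with two generated codes the grid collapses to two columns and | |
# each cell renders roughly as follows (file names are illustrative): | |
#   <div class="viewport-item" id="qr_item_0"> | |
#     <img src="/file=output/qr_codes/qr_1.png" style="border: 2px solid green; opacity: 1.0;" alt="QR Code 1"> | |
#     <label><input type="checkbox" data-index="0" checked onchange="updateEnabledStates(this)"> Enable</label> | |
#   </div> | |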
def process_inputs(urls, files, text, combine, crawl_depth): | |
"""Process all inputs and generate QR codes""" | |
results = [] | |
processing_status_messages = [] | |
# Initialize before the try block so the final return is safe even if an | |
# exception is raised part-way through processing. | |
qr_paths = [] | |
final_json_output = None | |
url_processor = EnhancedURLProcessor() | |
file_processor = EnhancedFileProcessor() | |
try: | |
if text and text.strip(): | |
try: | |
json_data = json.loads(text) | |
results.append({ | |
'source': 'json_input', | |
'extracted_data': json_data, | |
'timestamp': datetime.now().isoformat(), | |
'processing_notes': ['Parsed from direct JSON input.'] | |
}) | |
processing_status_messages.append("β Successfully parsed direct JSON input.") | |
except json.JSONDecodeError as e: | |
processing_status_messages.append(f"β Invalid JSON format in text input: {str(e)}") | |
except Exception as e: | |
processing_status_messages.append(f"β Error processing direct JSON input: {str(e)}") | |
if urls and urls.strip(): | |
url_list = re.split(r'[,\n]', urls) | |
url_list = [url.strip() for url in url_list if url.strip()] | |
for url in url_list: | |
processing_status_messages.append(f"π Processing URL: {url} with crawl depth {crawl_depth}...") | |
content_result = url_processor.fetch_content_with_depth(url, max_steps=crawl_depth) | |
if content_result: # Check if a result dictionary was returned | |
results.append(content_result) | |
if content_result.get('fetch_result') is not None: | |
processing_status_messages.append(f"β Processed URL: {url} (Level 0)") | |
if content_result.get('processing_notes'): | |
processing_status_messages.append(f" Notes: {'; '.join(content_result['processing_notes'])}") | |
if content_result.get('linked_extractions'): | |
num_linked_processed = len([r for r in content_result['linked_extractions'] if r and r.get('fetch_result') is not None]) | |
processing_status_messages.append(f" Found and processed {num_linked_processed}/{len(content_result['linked_extractions'])} direct links.") | |
else: | |
processing_status_messages.append(f"β Failed to process URL: {url}") | |
if content_result.get('processing_notes'): | |
processing_status_messages.append(f" Notes: {'; '.join(content_result['processing_notes'])}") | |
else: | |
processing_status_messages.append(f"β Failed to process URL: {url} (No result returned)") | |
if files: | |
for file in files: | |
processing_status_messages.append(f"π Processing file: {file.name}...") | |
file_results = file_processor.process_file(file) | |
if file_results: | |
results.extend(file_results) | |
processing_status_messages.append(f"β Processed file: {file.name}") | |
for res in file_results: | |
if res.get('processing_notes'): | |
processing_status_messages.append(f" Notes for {res.get('filename', 'item')}: {'; '.join(res['processing_notes'])}") | |
else: | |
processing_status_messages.append(f"β Failed to process file: {file.name}") | |
qr_paths = [] | |
final_json_output = None | |
if results: | |
qr_paths = generate_qr_codes(results, combine) | |
final_json_output = results | |
if qr_paths: | |
processing_status_messages.append(f"β Successfully generated {len(qr_paths)} QR codes.") | |
else: | |
processing_status_messages.append("β Failed to generate QR codes.") | |
else: | |
processing_status_messages.append("β οΈ No valid content collected from inputs.") | |
except Exception as e: | |
logger.error(f"Overall processing error in process_inputs: {e}") | |
processing_status_messages.append(f"β An unexpected error occurred during processing: {str(e)}") | |
return ( | |
final_json_output, | |
[str(path) for path in qr_paths], | |
"\n".join(processing_status_messages), | |
final_json_output | |
) | |
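# Each entry in the returned list follows the shape assembled above; for direct | |
# JSON input it looks roughly like (timestamp illustrative): | |
#   {"source": "json_input", "extracted_data": {...}, | |
#    "timestamp": "2024-01-01T00:00:00", "processing_notes": ["Parsed from direct JSON input."]} | |
# URL entries additionally carry "fetch_result" and "linked_extractions" keys. | |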
def on_qr_generation(qr_paths_list): | |
"""Initialize the viewport state from the gallery contents after QR generation.""" | |
# Depending on the Gradio version, a Gallery used as an input may yield plain | |
# file paths or (image, caption) tuples; normalize to a flat list of paths. | |
if not qr_paths_list: | |
return [], [] | |
paths = [item[0] if isinstance(item, (tuple, list)) else item for item in qr_paths_list] | |
initial_enabled_states = list(range(len(paths))) | |
return paths, initial_enabled_states | |
example_btn.click(load_example, inputs=[], outputs=text_input) | |
clear_btn.click(clear_input, inputs=[], outputs=[url_input, file_input, text_input, chatbot_data]) | |
process_btn.click( | |
process_inputs, | |
inputs=[url_input, file_input, text_input, combine_data, crawl_depth_slider], | |
outputs=[output_json, output_gallery, output_text, chatbot_data] | |
).then( | |
on_qr_generation, | |
inputs=[output_gallery], | |
outputs=[qr_code_paths, enabled_qr_codes] | |
) | |
viewport_tab.select(update_viewport, inputs=[qr_code_paths, enabled_qr_codes], outputs=[viewport_output]) | |
# Pass the Chatbot component itself as the conversation history so that | |
# multi-turn context persists between messages. | |
send_msg_btn.click( | |
respond_to_chat, | |
inputs=[chat_input, chatbot, chatbot_data], | |
outputs=[chatbot, chatbot_data] | |
).then( | |
lambda: "", | |
inputs=None, | |
outputs=chat_input | |
) | |
chat_input.submit( | |
respond_to_chat, | |
inputs=[chat_input, chatbot, chatbot_data], | |
outputs=[chatbot, chatbot_data] | |
).then( | |
lambda: "", | |
inputs=None, | |
outputs=chat_input | |
) | |
clear_chat_btn.click( | |
lambda: [], | |
inputs=None, | |
outputs=chatbot | |
) | |
gr.Markdown(""" | |
### 📌 Features | |
- **Enhanced URL Scraping**: Extracts HTML text, title, meta description, links, and attempts parsing JSON/XML from URLs based on content type. Supports crawling links up to a specified depth. **(Now performs real fetching)** | |
- **Advanced File Processing**: Reads various text-based files (.txt, .md, .log, etc.), HTML, XML, CSV, and attempts text extraction from common document formats (.pdf, .docx, .rtf, .odt - *requires extra dependencies*). **(Now performs real file processing)** | |
- **Smart JSON Handling**: Parses valid JSON from direct input, files (.json or content), or URLs. | |
- **Archive Support**: Extracts and processes supported files from .zip, .tar, .gz archives. **(Now performs real extraction)** | |
- **Robust Encoding Detection**: Uses `chardet` for reliable character encoding identification. | |
- **Structured Output**: Provides a consistent JSON output format containing raw content (if applicable), extracted data, and processing notes for each processed item. | |
- **Sequential QR Codes**: Maintains data integrity across multiple codes by chunking the combined/individual processed data. | |
- **QR Code Viewport**: Visualize generated QR codes in a sequenced square grid with options to enable/disable individual codes for selective scanning/sharing. | |
- **Modern Design**: Clean, responsive interface with visual feedback. | |
- **Data Chatbot**: Interact conversationally with the processed JSON data to ask questions about its structure, content, or request specific information. | |
### 💡 Tips | |
1. **URLs**: Enter multiple URLs separated by commas or newlines. The processor will attempt to fetch and structure the content based on its type, following links up to the specified **Crawl Depth**. | |
2. **Files**: Upload any type of file. The processor will attempt to handle supported text-based files, archives (.zip, .tar, .gz), and specific document/structured formats. | |
3. **JSON**: Use the "Direct JSON Input" tab for pasting JSON data. The system also tries to detect JSON content in file uploads and URLs. Use the "Load Example" button to see a sample JSON structure. | |
4. **Dependencies**: Processing PDF, DOCX, RTF, and ODT files requires installing optional Python libraries (`PyPDF2`, `python-docx`, `pyth`, `odfpy`). Check the console logs for warnings if a library is missing. | |
5. **QR Codes**: Choose whether to "Combine all data into sequence" or generate separate sequences for each input item. | |
6. **Processing**: Monitor the "Processing Status" box for real-time updates and notes about errors or processing steps. | |
7. **Output**: The "Processed Data" JSON box shows the structured data extracted from your inputs. The "Generated QR Codes" gallery shows the QR code images. | |
8. **Chatbot**: After processing data, go to the "Chat with Data" tab to ask questions about the JSON output. | |
### ⚙️ QR Code Viewport Instructions | |
1. Navigate to the **QR Code Viewport** tab after generating QR codes. | |
2. The generated QR codes will be displayed in a grid based on their total count. | |
3. Use the checkboxes below each QR code to enable or disable it for visual selection. Enabled codes have a green border and full opacity. | |
4. This viewport is currently for visualization and selection *within the UI*; it doesn't change the generated files themselves. You would manually select which physical QR codes to scan based on this view. | |
""") | |
return interface | |
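# Sketch for embedding the interface from another module instead of running main(); | |
# the server_name and server_port values here are assumptions, not requirements: | |
#   app = create_modern_interface() | |
#   app.queue().launch(server_name="0.0.0.0", server_port=7860) | |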
def main(): | |
"""Initialize and launch the application""" | |
try: | |
mimetypes.init() | |
interface = create_modern_interface() | |
interface.launch( | |
share=False, | |
debug=False, | |
show_error=True, | |
show_api=False | |
) | |
except Exception as e: | |
logger.error(f"Application startup error: {e}") | |
print(f"\nFatal Error: {e}\nCheck the logs for details.") | |
raise | |
if __name__ == "__main__": | |
main() | |