import json
import os
import re
import time
import logging
import mimetypes
import zipfile
import tarfile
import tempfile
import chardet
from datetime import datetime
from typing import List, Dict, Optional, Union, Tuple
from pathlib import Path
from urllib.parse import urlparse, urljoin
import requests
import validators
import gradio as gr
from diskcache import Cache
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean
import qrcode
from PIL import Image, ImageDraw, ImageFont
import numpy as np
# Set up enhanced logging with detailed formatting
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)

# Ensure output directories exist
OUTPUTS_DIR = Path('output')
QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes'
TEMP_DIR = OUTPUTS_DIR / 'temp'
for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR]:
    directory.mkdir(parents=True, exist_ok=True)
class EnhancedURLProcessor:
    """Advanced URL processing with complete content extraction"""

    def __init__(self):
        self.session = requests.Session()
        self.timeout = 15  # Extended timeout for larger content
        self.max_retries = 3
        self.user_agent = UserAgent()
        # Enhanced headers for better site compatibility
        self.session.headers.update({
            'User-Agent': self.user_agent.random,
            'Accept': '*/*',  # Accept all content types
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'DNT': '1'
        })
    def validate_url(self, url: str) -> Dict:
        """Enhanced URL validation with detailed feedback"""
        try:
            if not validators.url(url):
                return {'is_valid': False, 'message': 'Invalid URL format', 'details': 'URL must begin with http:// or https://'}
            parsed = urlparse(url)
            if not all([parsed.scheme, parsed.netloc]):
                return {'is_valid': False, 'message': 'Incomplete URL', 'details': 'Missing scheme or domain'}
            # Try a HEAD request first to check accessibility
            try:
                response = self.session.head(url, timeout=5)
                response.raise_for_status()
            except requests.exceptions.RequestException:
                # If HEAD fails, fall back to GET: some servers don't support HEAD
                response = self.session.get(url, timeout=self.timeout)
                response.raise_for_status()
            return {
                'is_valid': True,
                'message': 'URL is valid and accessible',
                'details': {
                    'content_type': response.headers.get('Content-Type', 'unknown'),
                    'server': response.headers.get('Server', 'unknown'),
                    'size': response.headers.get('Content-Length', 'unknown')
                }
            }
        except Exception as e:
            return {'is_valid': False, 'message': f'URL validation failed: {str(e)}', 'details': str(e)}
    def fetch_content(self, url: str, retry_count: int = 0) -> Optional[Dict]:
        """Enhanced content fetcher with retry mechanism and complete character extraction"""
        try:
            logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1}/{self.max_retries})")
            # Rotate the User-Agent for each request
            self.session.headers.update({'User-Agent': self.user_agent.random})
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            # Detect encoding
            if response.encoding is None:
                encoding = chardet.detect(response.content)['encoding'] or 'utf-8'
            else:
                encoding = response.encoding
            # Decode content with fallback
            try:
                raw_content = response.content.decode(encoding, errors='replace')
            except (UnicodeDecodeError, LookupError):
                raw_content = response.content.decode('utf-8', errors='replace')
            # Extract metadata
            metadata = {
                'url': url,
                'timestamp': datetime.now().isoformat(),
                'encoding': encoding,
                'content_type': response.headers.get('Content-Type', ''),
                'content_length': len(response.content),
                'headers': dict(response.headers),
                'status_code': response.status_code
            }
            # Process based on content type
            content_type = response.headers.get('Content-Type', '').lower()
            if 'text/html' in content_type:
                processed_content = self._process_html_content(raw_content, url)
            else:
                processed_content = raw_content
            return {
                'content': processed_content,
                'raw_content': raw_content,
                'metadata': metadata
            }
        except requests.exceptions.RequestException as e:
            if retry_count < self.max_retries - 1:
                logger.warning(f"Retry {retry_count + 1}/{self.max_retries} for URL: {url}")
                time.sleep(2 ** retry_count)  # Exponential backoff
                return self.fetch_content(url, retry_count + 1)
            logger.error(f"Failed to fetch content after {self.max_retries} attempts: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error while fetching content: {e}")
            return None
    def _process_html_content(self, content: str, base_url: str) -> str:
        """Process HTML content while preserving all characters"""
        try:
            soup = BeautifulSoup(content, 'html.parser')
            # Convert relative URLs to absolute
            for tag in soup.find_all(['a', 'img', 'link', 'script']):
                for attr in ['href', 'src']:
                    if tag.get(attr):
                        try:
                            tag[attr] = urljoin(base_url, tag[attr])
                        except Exception:
                            pass
            # Extract all text content
            text_parts = []
            for element in soup.stripped_strings:
                text_parts.append(str(element))
            return '\n'.join(text_parts)
        except Exception as e:
            logger.error(f"HTML processing error: {e}")
            return content
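
# Usage sketch (illustrative, not part of the original app; the URL is
# hypothetical):
# processor = EnhancedURLProcessor()
# check = processor.validate_url("https://example.com")
# if check['is_valid']:
#     result = processor.fetch_content("https://example.com")
#     if result:
#         print(result['metadata']['content_type'], len(result['content']))
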
class EnhancedFileProcessor:
    """Advanced file processing with complete content extraction"""

    def __init__(self, max_file_size: int = 5 * 1024 * 1024 * 1024):  # 5 GB
        self.max_file_size = max_file_size
        self.supported_extensions = {
            '.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm',
            '.log', '.yml', '.yaml', '.ini', '.conf', '.cfg',
            '.zip', '.tar', '.gz', '.bz2', '.7z', '.rar',
            '.pdf', '.doc', '.docx', '.rtf', '.odt'
        }
    def process_file(self, file) -> List[Dict]:
        """Process an uploaded file with enhanced error handling and complete extraction"""
        if not file:
            return []
        dataset = []
        try:
            file_size = os.path.getsize(file.name)
            if file_size > self.max_file_size:
                logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
                return []
            with tempfile.TemporaryDirectory() as temp_dir:
                temp_dir_path = Path(temp_dir)
                # Handle different archive types
                if self._is_archive(file.name):
                    dataset.extend(self._process_archive(file.name, temp_dir_path))
                else:
                    dataset.extend(self._process_single_file(file))
        except Exception as e:
            logger.error(f"Error processing file: {str(e)}")
            return []
        return dataset
    def _is_archive(self, filepath: str) -> bool:
        """Check whether a file is an archive"""
        return any(filepath.lower().endswith(ext) for ext in [
            '.zip', '.tar', '.gz', '.bz2', '.7z', '.rar'
        ])
    def _process_single_file(self, file) -> List[Dict]:
        """Process a single file with enhanced character extraction"""
        try:
            file_stat = os.stat(file.name)
            file_size = file_stat.st_size
            # Initialize content storage
            content_parts = []
            # Process the file in chunks to handle large files
            chunk_size = 10 * 1024 * 1024  # 10MB chunks
            with open(file.name, 'rb') as f:
                while True:
                    chunk = f.read(chunk_size)
                    if not chunk:
                        break
                    # Detect encoding per chunk; a multi-byte character split
                    # across a chunk boundary may be replaced rather than
                    # decoded intact
                    encoding = chardet.detect(chunk)['encoding'] or 'utf-8'
                    try:
                        decoded_chunk = chunk.decode(encoding, errors='replace')
                        content_parts.append(decoded_chunk)
                    except (UnicodeDecodeError, LookupError):
                        decoded_chunk = chunk.decode('utf-8', errors='replace')
                        content_parts.append(decoded_chunk)
            # Combine all chunks
            complete_content = ''.join(content_parts)
            return [{
                'source': 'file',
                'filename': os.path.basename(file.name),
                'file_size': file_size,
                'mime_type': mimetypes.guess_type(file.name)[0],
                'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                'content': complete_content,
                'timestamp': datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"File processing error: {e}")
            return []
    def _process_archive(self, archive_path: str, extract_to: Path) -> List[Dict]:
        """Process an archive file with enhanced extraction"""
        dataset = []
        try:
            if zipfile.is_zipfile(archive_path):
                with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                    zip_ref.extractall(extract_to)
                    for file_info in zip_ref.infolist():
                        if file_info.file_size > 0 and not file_info.filename.endswith('/'):
                            extracted_path = extract_to / file_info.filename
                            if extracted_path.suffix.lower() in self.supported_extensions:
                                with open(extracted_path, 'rb') as f:
                                    dataset.extend(self._process_single_file(f))
            elif tarfile.is_tarfile(archive_path):
                with tarfile.open(archive_path, 'r') as tar_ref:
                    tar_ref.extractall(extract_to)
                    for member in tar_ref.getmembers():
                        if member.isfile():
                            extracted_path = extract_to / member.name
                            if extracted_path.suffix.lower() in self.supported_extensions:
                                with open(extracted_path, 'rb') as f:
                                    dataset.extend(self._process_single_file(f))
        except Exception as e:
            logger.error(f"Archive processing error: {e}")
        return dataset
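
    # Note: extractall() on untrusted archives can write outside extract_to via
    # path traversal ("zip slip"). On Python 3.12+ (and recent security releases
    # of 3.10/3.11), tarfile.extractall() accepts filter='data' to mitigate this
    # for tar archives; zip members would need manual path checks.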
    def chunk_data(self, data: Union[Dict, List], max_size: int = 2953) -> List[Dict]:
        """Split data into chunks small enough to fit in a single QR code.

        The default max_size of 2953 bytes is the binary capacity of a
        version-40 QR code at error correction level L.
        """
        try:
            # Convert data to JSON bytes
            json_str = json.dumps(data, ensure_ascii=False)
            json_bytes = json_str.encode('utf-8')
            total_length = len(json_bytes)
            # Calculate metadata overhead in bytes
            metadata_template = {
                "chunk_index": 0,
                "total_chunks": 1,
                "total_length": total_length,
                "chunk_hash": "",
                "data": ""
            }
            overhead_bytes = len(json.dumps(metadata_template).encode('utf-8')) + 20  # Add padding
            effective_chunk_size = max_size - overhead_bytes
            if effective_chunk_size <= 0:
                raise ValueError("Max size is too small after accounting for metadata overhead")
            chunks = []
            start = 0
            while start < total_length:
                end = start + effective_chunk_size
                # Decode to ensure valid Unicode; a multi-byte character split at
                # the boundary is replaced rather than preserved
                chunk_str = json_bytes[start:end].decode('utf-8', errors='replace')
                chunk = {
                    "chunk_index": len(chunks),
                    "total_chunks": -1,  # Set below once the count is known
                    "total_length": total_length,
                    "chunk_hash": hash(chunk_str) & 0xFFFFFFFF,
                    "data": chunk_str
                }
                chunks.append(chunk)
                start = end
            # Update total_chunks in each chunk
            for chunk in chunks:
                chunk["total_chunks"] = len(chunks)
            return chunks
        except Exception as e:
            logger.error(f"Error chunking data: {e}")
            return []
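
# A minimal reassembly sketch (an assumption, not part of the original app):
# it reverses chunk_data() by ordering on chunk_index and rejoining the "data"
# fields. Because chunk_data() splits on raw byte boundaries and decodes with
# errors='replace', multi-byte characters cut at a boundary may not round-trip.
def reassemble_chunks(chunks: List[Dict]) -> Optional[Union[Dict, List]]:
    """Rejoin chunk_data() output into the original JSON value (illustrative)."""
    if not chunks:
        return None
    ordered = sorted(chunks, key=lambda c: c["chunk_index"])
    if len(ordered) != ordered[0]["total_chunks"]:
        return None  # Incomplete chunk set
    try:
        return json.loads(''.join(c["data"] for c in ordered))
    except json.JSONDecodeError:
        return None
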
def generate_stylish_qr(data: Union[str, Dict],
                        filename: str,
                        size: int = 10,
                        border: int = 4,
                        fill_color: str = "#000000",
                        back_color: str = "#FFFFFF") -> str:
    """Generate a stylish QR code with enhanced visual appeal"""
    try:
        qr = qrcode.QRCode(
            version=None,
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=size,
            border=border
        )
        # Add data to the QR code
        if isinstance(data, dict):
            qr.add_data(json.dumps(data, ensure_ascii=False))
        else:
            qr.add_data(data)
        qr.make(fit=True)
        # Create the QR code image with custom colors
        qr_image = qr.make_image(fill_color=fill_color, back_color=back_color)
        # Convert to RGBA for transparency support
        qr_image = qr_image.convert('RGBA')
        # Add a subtle gradient overlay
        gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0))
        draw = ImageDraw.Draw(gradient)
        for i in range(qr_image.width):
            alpha = int(255 * (1 - i / qr_image.width) * 0.1)  # 10% maximum opacity
            draw.line([(i, 0), (i, qr_image.height)], fill=(255, 255, 255, alpha))
        # Combine images
        final_image = Image.alpha_composite(qr_image, gradient)
        # Save the image (the quality argument is ignored for PNG, so it is omitted)
        output_path = QR_CODES_DIR / filename
        final_image.save(output_path)
        return str(output_path)
    except Exception as e:
        logger.error(f"QR generation error: {e}")
        return ""
def generate_qr_codes(data: Union[str, Dict, List], combined: bool = True) -> List[str]:
    """Generate QR codes with enhanced visual appeal and metadata"""
    try:
        file_processor = EnhancedFileProcessor()
        paths = []
        if combined:
            # Process combined data
            chunks = file_processor.chunk_data(data)
            for i, chunk in enumerate(chunks):
                filename = f'combined_qr_{int(time.time())}_{i+1}_of_{len(chunks)}.png'
                qr_path = generate_stylish_qr(
                    data=chunk,
                    filename=filename,
                    fill_color="#1a365d",  # Deep blue
                    back_color="#ffffff"
                )
                if qr_path:
                    paths.append(qr_path)
        else:
            # Process individual items
            if isinstance(data, list):
                for idx, item in enumerate(data):
                    chunks = file_processor.chunk_data(item)
                    for chunk_idx, chunk in enumerate(chunks):
                        filename = f'item_{idx+1}_chunk_{chunk_idx+1}_of_{len(chunks)}_{int(time.time())}.png'
                        qr_path = generate_stylish_qr(
                            data=chunk,
                            filename=filename,
                            fill_color="#1a365d",  # Deep blue
                            back_color="#ffffff"
                        )
                        if qr_path:
                            paths.append(qr_path)
            else:
                chunks = file_processor.chunk_data(data)
                for i, chunk in enumerate(chunks):
                    filename = f'single_qr_{i+1}_of_{len(chunks)}_{int(time.time())}.png'
                    qr_path = generate_stylish_qr(
                        data=chunk,
                        filename=filename,
                        fill_color="#1a365d",  # Deep blue
                        back_color="#ffffff"
                    )
                    if qr_path:
                        paths.append(qr_path)
        return paths
    except Exception as e:
        logger.error(f"QR code generation error: {e}")
        return []
def create_modern_interface():
    """Create a modern and visually appealing Gradio interface"""
    # Modern CSS styling
    css = """
    /* Modern color scheme */
    :root {
        --primary-color: #1a365d;
        --secondary-color: #2d3748;
        --accent-color: #4299e1;
        --background-color: #f7fafc;
        --success-color: #48bb78;
        --error-color: #f56565;
        --warning-color: #ed8936;
    }
    /* Container styling */
    .container {
        max-width: 1200px;
        margin: auto;
        padding: 2rem;
        background-color: var(--background-color);
        border-radius: 1rem;
        box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
    }
    /* Component styling */
    .input-container {
        background-color: white;
        padding: 1.5rem;
        border-radius: 0.5rem;
        border: 1px solid #e2e8f0;
        margin-bottom: 1rem;
    }
    /* Button styling */
    .primary-button {
        background-color: var(--primary-color);
        color: white;
        padding: 0.75rem 1.5rem;
        border-radius: 0.375rem;
        border: none;
        cursor: pointer;
        transition: all 0.2s;
    }
    .primary-button:hover {
        background-color: var(--accent-color);
        transform: translateY(-1px);
    }
    /* Status messages */
    .status {
        padding: 1rem;
        border-radius: 0.375rem;
        margin: 1rem 0;
    }
    .status.success { background-color: #f0fff4; color: var(--success-color); }
    .status.error { background-color: #fff5f5; color: var(--error-color); }
    .status.warning { background-color: #fffaf0; color: var(--warning-color); }
    /* Gallery styling */
    .gallery {
        display: grid;
        grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
        gap: 1rem;
        padding: 1rem;
        background-color: white;
        border-radius: 0.5rem;
        border: 1px solid #e2e8f0;
    }
    .gallery img {
        width: 100%;
        height: auto;
        border-radius: 0.375rem;
        transition: transform 0.2s;
    }
    .gallery img:hover {
        transform: scale(1.05);
    }
    """
    # Create the interface with a modern design
    with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator") as interface:
        gr.Markdown("""
        # Advanced Data Processing & QR Code Generator
        Transform your data into beautifully designed, sequenced QR codes with our cutting-edge processor.
        """)
        with gr.Tab("URL Processing"):
            url_input = gr.Textbox(
                label="Enter URLs (comma or newline separated)",
                lines=5,
                placeholder="https://example1.com\nhttps://example2.com",
                value=""
            )
        with gr.Tab("File Input"):
            file_input = gr.File(
                label="Upload Files",
                file_types=["*"],  # Allow all file types
                file_count="multiple"
            )
        with gr.Tab("JSON Input"):
            text_input = gr.TextArea(
                label="Direct JSON Input",
                lines=15,
                placeholder="Paste your JSON data here...",
                value=""
            )
        with gr.Row():
            example_btn = gr.Button("Load Example", variant="secondary")
            clear_btn = gr.Button("Clear", variant="secondary")
        with gr.Row():
            combine_data = gr.Checkbox(
                label="Combine all data into sequence",
                value=True,
                info="Generate sequential QR codes for combined data"
            )
            process_btn = gr.Button(
                "Process & Generate QR",
                variant="primary"
            )
        # Output components
        output_json = gr.JSON(label="Processed Data")
        output_gallery = gr.Gallery(
            label="Generated QR Codes",
            columns=3,
            height=400,
            show_label=True
        )
        output_text = gr.Textbox(
            label="Processing Status",
            interactive=False
        )
        # Load example data
        def load_example():
            example = {
                "type": "product_catalog",
                "items": [
                    {
                        "id": "123",
                        "name": "Premium Widget",
                        "description": "High-quality widget with advanced features",
                        "price": 299.99,
                        "category": "electronics",
                        "tags": ["premium", "featured", "new"]
                    },
                    {
                        "id": "456",
                        "name": "Basic Widget",
                        "description": "Reliable widget for everyday use",
                        "price": 149.99,
                        "category": "electronics",
                        "tags": ["basic", "popular"]
                    }
                ],
                "metadata": {
                    "timestamp": datetime.now().isoformat(),
                    "version": "2.0",
                    "source": "example"
                }
            }
            return json.dumps(example, indent=2)

        def clear_input():
            return ""
        def process_inputs(urls, files, text, combine):
            """Process all inputs and generate QR codes"""
            try:
                results = []
                url_processor = EnhancedURLProcessor()
                file_processor = EnhancedFileProcessor()
                # Process JSON input
                if text and text.strip():
                    try:
                        json_data = json.loads(text)
                        if isinstance(json_data, list):
                            results.extend(json_data)
                        else:
                            results.append(json_data)
                    except json.JSONDecodeError as e:
                        return None, [], f"❌ Invalid JSON format: {str(e)}"
                # Process URLs
                if urls and urls.strip():
                    url_list = re.split(r'[,\n]', urls)
                    url_list = [url.strip() for url in url_list if url.strip()]
                    for url in url_list:
                        validation = url_processor.validate_url(url)
                        if validation['is_valid']:
                            content = url_processor.fetch_content(url)
                            if content:
                                results.append({
                                    'source': 'url',
                                    'url': url,
                                    'content': content,
                                    'timestamp': datetime.now().isoformat()
                                })
                # Process files
                if files:
                    for file in files:
                        file_results = file_processor.process_file(file)
                        if file_results:
                            results.extend(file_results)
                # Generate QR codes
                if results:
                    qr_paths = generate_qr_codes(results, combine)
                    if qr_paths:
                        return (
                            results,
                            [str(path) for path in qr_paths],
                            f"✅ Successfully processed {len(results)} items and generated {len(qr_paths)} QR codes!"
                        )
                    else:
                        return None, [], "❌ Failed to generate QR codes"
                else:
                    return None, [], "⚠️ No valid content to process"
            except Exception as e:
                logger.error(f"Processing error: {e}")
                return None, [], f"❌ Error: {str(e)}"
        # Set up event handlers
        example_btn.click(load_example, outputs=[text_input])
        clear_btn.click(clear_input, outputs=[text_input])
        process_btn.click(
            process_inputs,
            inputs=[url_input, file_input, text_input, combine_data],
            outputs=[output_json, output_gallery, output_text]
        )
        # Add helpful documentation
        gr.Markdown("""
        ### Features
        - **Complete URL Scraping**: Extracts every character from web pages
        - **Advanced File Processing**: Full content extraction from text files and archives
        - **Smart JSON Handling**: Processes any size JSON with automatic chunking
        - **Sequential QR Codes**: Maintains data integrity across multiple codes
        - **Modern Design**: Clean, responsive interface with visual feedback

        ### Tips
        1. **URLs**: Enter multiple URLs separated by commas or newlines
        2. **Files**: Upload text files or ZIP archives containing text files
        3. **JSON**: Use the example button to see the expected format
        4. **QR Codes**: Choose whether to combine data into sequential codes
        5. **Processing**: Monitor the status for real-time feedback

        ### Output
        - Generated QR codes are saved in the `output/qr_codes` directory
        - Each QR code contains metadata for proper sequencing
        - Hover over QR codes in the gallery to see details
        """)
    return interface
def main():
    """Initialize and launch the application"""
    try:
        # Configure system settings
        mimetypes.init()
        # Create and launch the interface
        interface = create_modern_interface()
        # Launch with configuration
        interface.launch(
            share=False,
            debug=False,
            show_error=True,
            show_api=False
        )
    except Exception as e:
        logger.error(f"Application startup error: {e}")
        raise

if __name__ == "__main__":
    main()