urld / app.py
acecalisto3's picture
Update app.py
5909e94 verified
import json
import os
import re
import time
import logging
import mimetypes
import zipfile
import tempfile
import chardet
from datetime import datetime
from typing import List, Dict, Optional, Union, Tuple
from pathlib import Path
from urllib.parse import urlparse, urljoin
import requests
import validators
import gradio as gr
from diskcache import Cache
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean
import qrcode
from PIL import Image, ImageDraw, ImageFont
import numpy as np
# Setup enhanced logging with more detailed formatting
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('app.log', encoding='utf-8')
]
)
logger = logging.getLogger(__name__)
# Ensure output directories exist with modern structure
OUTPUTS_DIR = Path('output')
QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes'
TEMP_DIR = OUTPUTS_DIR / 'temp'
for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR]:
directory.mkdir(parents=True, exist_ok=True)
class EnhancedURLProcessor:
"""Advanced URL processing with complete content extraction"""
def __init__(self):
self.session = requests.Session()
self.timeout = 15 # Extended timeout for larger content
self.max_retries = 3
self.user_agent = UserAgent()
# Enhanced headers for better site compatibility
self.session.headers.update({
'User-Agent': self.user_agent.random,
'Accept': '*/*', # Accept all content types
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1',
'DNT': '1'
})
def validate_url(self, url: str) -> Dict:
"""Enhanced URL validation with detailed feedback"""
try:
if not validators.url(url):
return {'is_valid': False, 'message': 'Invalid URL format', 'details': 'URL must begin with http:// or https://'}
parsed = urlparse(url)
if not all([parsed.scheme, parsed.netloc]):
return {'is_valid': False, 'message': 'Incomplete URL', 'details': 'Missing scheme or domain'}
# Try HEAD request first to check accessibility
try:
head_response = self.session.head(url, timeout=5)
head_response.raise_for_status()
except requests.exceptions.RequestException:
# If HEAD fails, try GET as some servers don't support HEAD
response = self.session.get(url, timeout=self.timeout)
response.raise_for_status()
return {
'is_valid': True,
'message': 'URL is valid and accessible',
'details': {
'content_type': head_response.headers.get('Content-Type', 'unknown'),
'server': head_response.headers.get('Server', 'unknown'),
'size': head_response.headers.get('Content-Length', 'unknown')
}
}
except Exception as e:
return {'is_valid': False, 'message': f'URL validation failed: {str(e)}', 'details': str(e)}
def fetch_content(self, url: str, retry_count: int = 0) -> Optional[Dict]:
"""Enhanced content fetcher with retry mechanism and complete character extraction"""
try:
logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1}/{self.max_retries})")
# Update User-Agent randomly for each request
self.session.headers.update({'User-Agent': self.user_agent.random})
response = self.session.get(url, timeout=self.timeout)
response.raise_for_status()
# Detect encoding
if response.encoding is None:
encoding = chardet.detect(response.content)['encoding'] or 'utf-8'
else:
encoding = response.encoding
# Decode content with fallback
try:
raw_content = response.content.decode(encoding, errors='replace')
except (UnicodeDecodeError, LookupError):
raw_content = response.content.decode('utf-8', errors='replace')
# Extract metadata
metadata = {
'url': url,
'timestamp': datetime.now().isoformat(),
'encoding': encoding,
'content_type': response.headers.get('Content-Type', ''),
'content_length': len(response.content),
'headers': dict(response.headers),
'status_code': response.status_code
}
# Process based on content type
content_type = response.headers.get('Content-Type', '').lower()
if 'text/html' in content_type:
processed_content = self._process_html_content(raw_content, url)
else:
processed_content = raw_content
return {
'content': processed_content,
'raw_content': raw_content,
'metadata': metadata
}
except requests.exceptions.RequestException as e:
if retry_count < self.max_retries - 1:
logger.warning(f"Retry {retry_count + 1}/{self.max_retries} for URL: {url}")
time.sleep(2 ** retry_count) # Exponential backoff
return self.fetch_content(url, retry_count + 1)
logger.error(f"Failed to fetch content after {self.max_retries} attempts: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error while fetching content: {e}")
return None
def _process_html_content(self, content: str, base_url: str) -> str:
"""Process HTML content while preserving all characters"""
try:
soup = BeautifulSoup(content, 'html.parser')
# Convert relative URLs to absolute
for tag in soup.find_all(['a', 'img', 'link', 'script']):
for attr in ['href', 'src']:
if tag.get(attr):
try:
tag[attr] = urljoin(base_url, tag[attr])
except Exception:
pass
# Extract all text content
text_parts = []
for element in soup.stripped_strings:
text_parts.append(str(element))
return '\n'.join(text_parts)
except Exception as e:
logger.error(f"HTML processing error: {e}")
return content
class EnhancedFileProcessor:
"""Advanced file processing with complete content extraction"""
def __init__(self, max_file_size: int = 5 * 1024 * 1024 * 1024 * 1024 *1024):
self.max_file_size = max_file_size
self.supported_extensions = {
'.txt', '.md', '.csv', '.json', '.xml', '.html', '.html',
'.log', '.yml', '.yaml', '.ini', '.conf', '.cfg',
'.zip', '.tar', '.gz', '.bz2', '.7z', '.rar',
'.pdf', '.doc', '.docx', '.rtf', '.odt'
}
def process_file(self, file) -> List[Dict]:
"""Process uploaded file with enhanced error handling and complete extraction"""
if not file:
return []
dataset = []
try:
file_size = os.path.getsize(file.name)
if file_size > self.max_file_size:
logger.warning(f"File size ({{file_size}} bytes) exceeds maximum allowed size")
return []
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir_path = Path(temp_dir)
# Handle different archive types
if self._is_archive(file.name):
dataset.extend(self._process_archive(file.name, temp_dir_path))
else:
dataset.extend(self._process_single_file(file))
except Exception as e:
logger.error(f"Error processing file: {{str(e)}}")
return []
return dataset
def _is_archive(self, filepath: str) -> bool:
"""Check if file is an archive"""
return any(filepath.lower().endswith(ext) for ext in [
'.zip', '.tar', '.gz', '.bz2', '.7z', '.rar'
])
def _process_single_file(self, file) -> List[Dict]:
"""Process a single file with enhanced character extraction"""
try:
file_stat = os.stat(file.name)
file_size = file_stat.st_size
# Initialize content storage
content_parts = []
# Process file in chunks for large files
chunk_size = 10 * 1024 * 1024 # 10MB chunks
with open(file.name, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
# Detect encoding for each chunk
encoding = chardet.detect(chunk)['encoding'] or 'utf-8'
try:
decoded_chunk = chunk.decode(encoding, errors='replace')
content_parts.append(decoded_chunk)
except (UnicodeDecodeError, LookupError):
decoded_chunk = chunk.decode('utf-8', errors='replace')
content_parts.append(decoded_chunk)
# Combine all chunks
complete_content = ''.join(content_parts)
return [{
'source': 'file',
'filename': os.path.basename(file.name),
'file_size': file_size,
'mime_type': mimetypes.guess_type(file.name)[0],
'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
'content': complete_content,
'timestamp': datetime.now().isoformat()
}]
except Exception as e:
logger.error(f"File processing error: {{e}}")
return []
def _process_archive(self, archive_path: str, extract_to: Path) -> List[Dict]:
"""Process an archive file with enhanced extraction"""
dataset = []
try:
if zipfile.is_zipfile(archive_path):
with zipfile.ZipFile(archive_path, 'r') as zip_ref:
zip_ref.extractall(extract_to)
for file_info in zip_ref.infolist():
if file_info.file_size > 0 and not file_info.filename.endswith('/'):
extracted_path = extract_to / file_info.filename
if extracted_path.suffix.lower() in self.supported_extensions:
with open(extracted_path, 'rb') as f:
dataset.extend(self._process_single_file(f))
elif tarfile.is_tarfile(archive_path):
with tarfile.open(archive_path, 'r') as tar_ref:
tar_ref.extractall(extract_to)
for member in tar_ref.getmembers():
if member.isfile():
extracted_path = extract_to / member.name
if extracted_path.suffix.lower() in self.supported_extensions:
with open(extracted_path, 'rb') as f:
dataset.extend(self._process_single_file(f))
except Exception as e:
logger.error(f"Archive processing error: {e}")
return dataset
def chunk_data(self, data: Union[Dict, List], max_size: int = 2953) -> List[Dict]:
try:
# Convert data to JSON bytes
json_str = json.dumps(data, ensure_ascii=False)
json_bytes = json_str.encode('utf-8')
total_length = len(json_bytes)
# Calculate metadata overhead in bytes
metadata_template = {
"chunk_index": 0,
"total_chunks": 1,
"total_length": total_length,
"chunk_hash": "",
"data": ""
}
overhead_bytes = len(json.dumps(metadata_template).encode('utf-8')) + 20 # Add padding
effective_chunk_size = max_size - overhead_bytes
if effective_chunk_size <= 0:
raise ValueError("Max size is too small after accounting for metadata overhead")
chunks = []
start = 0
while start < total_length:
end = start + effective_chunk_size
# Ensure valid Unicode by decoding
chunk_str = json_bytes[start:end].decode('utf-8', errors='replace')
chunk = {
"chunk_index": len(chunks),
"total_chunks": -1, # To be set later
"total_length": total_length,
"chunk_hash": hash(chunk_str) & 0xFFFFFFFF,
"data": chunk_str
}
chunks.append(chunk)
start = end
# Update total_chunks in each chunk
for i, chunk in enumerate(chunks):
chunk["total_chunks"] = len(chunks)
return chunks
except Exception as e:
logger.error(f"Error chunking data: {{e}}")
return []
# Calculate number of chunks needed
num_chunks = -(-total_length // effective_chunk_size) # Ceiling division
chunk_size = -(-total_length // num_chunks) # Even distribution
chunks = []
for i in range(num_chunks):
start_idx = i * chunk_size
end_idx = min(start_idx + chunk_size, total_length)
chunk_data = json_str[start_idx:end_idx]
chunk = {
"chunk_index": i,
"total_chunks": num_chunks,
"total_length": total_length,
"chunk_hash": hash(chunk_data) & 0xFFFFFFFF,
"data": chunk_data
}
chunks.append(chunk)
return chunks
except Exception as e:
logger.error(f"Error chunking data: {{e}}")
return []
def generate_stylish_qr(data: Union[str, Dict],
filename: str,
size: int = 10,
border: int = 4,
fill_color: str = "#000000",
back_color: str = "#FFFFFF") -> str:
"""Generate a stylish QR code with enhanced visual appeal"""
try:
qr = qrcode.QRCode(
version=None,
error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=size,
border=border
)
# Add data to QR code
if isinstance(data, dict):
qr.add_data(json.dumps(data, ensure_ascii=False))
else:
qr.add_data(data)
qr.make(fit=True)
# Create QR code image with custom colors
qr_image = qr.make_image(fill_color=fill_color, back_color=back_color)
# Convert to RGBA for transparency support
qr_image = qr_image.convert('RGBA')
# Add subtle gradient overlay
gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0))
draw = ImageDraw.Draw(gradient)
for i in range(qr_image.width):
alpha = int(255 * (1 - i/qr_image.width) * 0.1) # 10% maximum opacity
draw.line([(i, 0), (i, qr_image.height)], fill=(255, 255, 255, alpha))
# Combine images
final_image = Image.alpha_composite(qr_image, gradient)
# Save the image
output_path = QR_CODES_DIR / filename
final_image.save(output_path, quality=95)
return str(output_path)
except Exception as e:
logger.error(f"QR generation error: {e}")
return ""
def generate_qr_codes(data: Union[str, Dict, List], combined: bool = True) -> List[str]:
"""Generate QR codes with enhanced visual appeal and metadata"""
try:
file_processor = EnhancedFileProcessor()
paths = []
if combined:
# Process combined data
chunks = file_processor.chunk_data(data)
for i, chunk in enumerate(chunks):
filename = f'combined_qr_{int(time.time())}_{i+1}_of_{len(chunks)}.png'
qr_path = generate_stylish_qr(
data=chunk,
filename=filename,
fill_color="#1a365d", # Deep blue
back_color="#ffffff"
)
if qr_path:
paths.append(qr_path)
else:
# Process individual items
if isinstance(data, list):
for idx, item in enumerate(data):
chunks = file_processor.chunk_data(item)
for chunk_idx, chunk in enumerate(chunks):
filename = f'item_{idx+1}_chunk_{chunk_idx+1}_of_{len(chunks)}_{int(time.time())}.png'
qr_path = generate_stylish_qr(
data=chunk,
filename=filename,
fill_color="#1a365d", # Deep blue
back_color="#ffffff"
)
if qr_path:
paths.append(qr_path)
else:
chunks = file_processor.chunk_data(data)
for i, chunk in enumerate(chunks):
filename = f'single_qr_{i+1}_of_{len(chunks)}_{int(time.time())}.png'
qr_path = generate_stylish_qr(
data=chunk,
filename=filename,
fill_color="#1a365d", # Deep blue
back_color="#ffffff"
)
if qr_path:
paths.append(qr_path)
return paths
except Exception as e:
logger.error(f"QR code generation error: {e}")
return []
def create_modern_interface():
"""Create a modern and visually appealing Gradio interface"""
# Modern CSS styling
css = """
/* Modern color scheme */
:root {
--primary-color: #1a365d;
--secondary-color: #2d3748;
--accent-color: #4299e1;
--background-color: #f7fafc;
--success-color: #48bb78;
--error-color: #f56565;
--warning-color: #ed8936;
}
/* Container styling */
.container {
max-width: 1200px;
margin: auto;
padding: 2rem;
background-color: var(--background-color);
border-radius: 1rem;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
}
/* Component styling */
.input-container {
background-color: white;
padding: 1.5rem;
border-radius: 0.5rem;
border: 1px solid #e2e8f0;
margin-bottom: 1rem;
}
/* Button styling */
.primary-button {
background-color: var(--primary-color);
color: white;
padding: 0.75rem 1.5rem;
border-radius: 0.375rem;
border: none;
cursor: pointer;
transition: all 0.2s;
}
.primary-button:hover {
background-color: var(--accent-color);
transform: translateY(-1px);
}
/* Status messages */
.status {
padding: 1rem;
border-radius: 0.375rem;
margin: 1rem 0;
}
.status.success { background-color: #f0fff4; color: var(--success-color); }
.status.error { background-color: #fff5f5; color: var(--error-color); }
.status.warning { background-color: #fffaf0; color: var(--warning-color); }
/* Gallery styling */
.gallery {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 1rem;
padding: 1rem;
background-color: white;
border-radius: 0.5rem;
border: 1px solid #e2e8f0;
}
.gallery img {
width: 100%;
height: auto;
border-radius: 0.375rem;
transition: transform 0.2s;
}
.gallery img:hover {
transform: scale(1.05);
}
"""
# Create interface with modern design
with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator") as interface:
gr.Markdown("""
# 🌐 Advanced Data Processing & QR Code Generator
Transform your data into beautifully designed, sequenced QR codes with our cutting-edge processor.
""")
with gr.Tab("πŸ“ URL Processing"):
url_input = gr.Textbox(
label="Enter URLs (comma or newline separated)",
lines=5,
placeholder="https://example1.com\nhttps://example2.com",
value=""
)
with gr.Tab("πŸ“ File Input"):
file_input = gr.File(
label="Upload Files",
file_types=["*"], # Allow all file types
file_count="multiple"
)
with gr.Tab("πŸ“‹ JSON Input"):
text_input = gr.TextArea(
label="Direct JSON Input",
lines=15,
placeholder="Paste your JSON data here...",
value=""
)
with gr.Row():
example_btn = gr.Button("πŸ“ Load Example", variant="secondary")
clear_btn = gr.Button("πŸ—‘οΈ Clear", variant="secondary")
with gr.Row():
combine_data = gr.Checkbox(
label="Combine all data into sequence",
value=True,
info="Generate sequential QR codes for combined data"
)
process_btn = gr.Button(
"πŸ”„ Process & Generate QR",
variant="primary"
)
# Output components
output_json = gr.JSON(label="Processed Data")
output_gallery = gr.Gallery(
label="Generated QR Codes",
columns=3,
height=400,
show_label=True
)
output_text = gr.Textbox(
label="Processing Status",
interactive=False
)
# Load example data
def load_example():
example = {
"type": "product_catalog",
"items": [
{
"id": "123",
"name": "Premium Widget",
"description": "High-quality widget with advanced features",
"price": 299.99,
"category": "electronics",
"tags": ["premium", "featured", "new"]
},
{
"id": "456",
"name": "Basic Widget",
"description": "Reliable widget for everyday use",
"price": 149.99,
"category": "electronics",
"tags": ["basic", "popular"]
}
],
"metadata": {
"timestamp": datetime.now().isoformat(),
"version": "2.0",
"source": "example"
}
}
return json.dumps(example, indent=2)
def clear_input():
return ""
def process_inputs(urls, files, text, combine):
"""Process all inputs and generate QR codes"""
try:
results = []
url_processor = EnhancedURLProcessor()
file_processor = EnhancedFileProcessor()
# Process JSON input
if text and text.strip():
try:
json_data = json.loads(text)
if isinstance(json_data, list):
results.extend(json_data)
else:
results.append(json_data)
except json.JSONDecodeError as e:
return None, [], f"❌ Invalid JSON format: {str(e)}"
# Process URLs
if urls and urls.strip():
url_list = re.split(r'[,\n]', urls)
url_list = [url.strip() for url in url_list if url.strip()]
for url in url_list:
validation = url_processor.validate_url(url)
if validation['is_valid']:
content = url_processor.fetch_content(url)
if content:
results.append({
'source': 'url',
'url': url,
'content': content,
'timestamp': datetime.now().isoformat()
})
# Process files
if files:
for file in files:
file_results = file_processor.process_file(file)
if file_results:
results.extend(file_results)
# Generate QR codes
if results:
qr_paths = generate_qr_codes(results, combine)
if qr_paths:
return (
results,
[str(path) for path in qr_paths],
f"βœ… Successfully processed {len(results)} items and generated {len(qr_paths)} QR codes!"
)
else:
return None, [], "❌ Failed to generate QR codes"
else:
return None, [], "⚠️ No valid content to process"
except Exception as e:
logger.error(f"Processing error: {e}")
return None, [], f"❌ Error: {str(e)}"
# Set up event handlers
example_btn.click(load_example, outputs=[text_input])
clear_btn.click(clear_input, outputs=[text_input])
process_btn.click(
process_inputs,
inputs=[url_input, file_input, text_input, combine_data],
outputs=[output_json, output_gallery, output_text]
)
# Add helpful documentation
gr.Markdown("""
### πŸš€ Features
- **Complete URL Scraping**: Extracts every character from web pages
- **Advanced File Processing**: Full content extraction from text files and archives
- **Smart JSON Handling**: Processes any size JSON with automatic chunking
- **Sequential QR Codes**: Maintains data integrity across multiple codes
- **Modern Design**: Clean, responsive interface with visual feedback
### πŸ’‘ Tips
1. **URLs**: Enter multiple URLs separated by commas or newlines
2. **Files**: Upload text files or ZIP archives containing text files
3. **JSON**: Use the example button to see the expected format
4. **QR Codes**: Choose whether to combine data into sequential codes
5. **Processing**: Monitor the status for real-time feedback
### 🎨 Output
- Generated QR codes are saved in the `output/qr_codes` directory
- Each QR code contains metadata for proper sequencing
- Hover over QR codes in the gallery to see details
""")
return interface
def main():
"""Initialize and launch the application"""
try:
# Configure system settings
mimetypes.init()
# Create and launch interface
interface = create_modern_interface()
# Launch with configuration
interface.launch(
share=False,
debug=False,
show_error=True,
show_api=False
)
except Exception as e:
logger.error(f"Application startup error: {e}")
raise
if __name__ == "__main__":
main()