import html
import os
import shutil
import uuid
from typing import List
from urllib.parse import urlparse

import requests
import validators
from bs4 import BeautifulSoup
from fastapi import APIRouter, File, Form, HTTPException, UploadFile
from PyPDF2 import PdfReader

router = APIRouter(tags=["files"])

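# Maps each session ID to the path of its stored document. This registry lives
# in process memory, so registered sessions are lost when the server restarts.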
session_files = {}

UPLOAD_ROOT = "uploaded_files"
os.makedirs(UPLOAD_ROOT, exist_ok=True)

# Minimum number of characters a document must contain to be accepted.
MIN_FILE_LENGTH = 500

# Limits applied when fetching remote content.
MAX_CONTENT_SIZE = 5 * 1024 * 1024  # 5 MB
REQUEST_TIMEOUT = 10  # seconds

# An empty list disables the allow-list check, permitting every domain.
ALLOWED_DOMAINS: List[str] = []
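# For example, setting ALLOWED_DOMAINS = ["en.wikipedia.org", "arxiv.org"]
# (illustrative values) would restrict /upload-url fetches to those hosts.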

BLOCKED_EXTENSIONS = ['.exe', '.sh', '.bat', '.dll', '.jar', '.msi']


def validate_pdf(file_path: str) -> bool:
    """Validate that the file is a readable PDF with enough extractable text."""
    try:
        reader = PdfReader(file_path)
        if len(reader.pages) == 0:
            return False

        text = ""
        for page in reader.pages:
            # extract_text() may return None for pages without a text layer.
            text += page.extract_text() or ""

        return len(text) >= MIN_FILE_LENGTH
    except Exception:
        return False


def validate_markdown(file_path: str) -> bool:
    """Validate that the file is a plausible Markdown document."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        return len(content) >= MIN_FILE_LENGTH and any(marker in content for marker in ['#', '-', '*', '`', '[', '>'])
    except Exception:
        return False


def validate_html(file_path: str) -> bool:
    """Validate that the file is parseable HTML of sufficient length."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        if len(content) < MIN_FILE_LENGTH:
            return False
        BeautifulSoup(content, 'html.parser')
        return True
    except Exception:
        return False


def validate_txt(file_path: str) -> bool:
    """Validate that the file is a text file of sufficient length."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        return len(content.strip()) >= MIN_FILE_LENGTH
    except Exception:
        return False
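
# Note: these helpers mirror the inline per-format checks in upload_file below;
# they return a bare pass/fail, while the endpoint builds detailed error messages.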


# Register pre-calculated documents so they can be used without an upload.
precalculated_docs = ["the-bitter-lesson", "hurricane-faq", "pokemon-guide"]

for doc_id in precalculated_docs:
    doc_dir = os.path.join(UPLOAD_ROOT, doc_id)
    if os.path.exists(doc_dir):
        doc_files_dir = os.path.join(doc_dir, "uploaded_files")
        if os.path.exists(doc_files_dir):
            for filename in os.listdir(doc_files_dir):
                if filename.endswith((".pdf", ".txt", ".html", ".md")):
                    file_path = os.path.join(doc_files_dir, filename)
                    session_files[doc_id] = file_path
                    print(f"Added pre-calculated document to session_files: {doc_id} -> {file_path}")
                    break
        else:
            # Fall back to documents stored at the top level of the directory.
            for filename in os.listdir(doc_dir):
                if filename.endswith((".pdf", ".txt", ".html", ".md")):
                    file_path = os.path.join(doc_dir, filename)
                    session_files[doc_id] = file_path
                    print(f"Added pre-calculated document to session_files: {doc_id} -> {file_path}")
                    break


@router.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    """
    Upload a file to the server and generate a session ID.

    Args:
        file: The file to upload

    Returns:
        Dictionary with filename, status and session_id
    """
    if not file.filename.endswith(('.pdf', '.txt', '.html', '.md')):
        raise HTTPException(status_code=400, detail="Only PDF, TXT, HTML and MD files are accepted")

    file_extension = os.path.splitext(file.filename)[1].lower()

    # Each upload gets its own session directory keyed by a fresh UUID.
    session_id = str(uuid.uuid4())
    session_dir = os.path.join(UPLOAD_ROOT, session_id)
    uploaded_files_dir = os.path.join(session_dir, "uploaded_files")
    os.makedirs(uploaded_files_dir, exist_ok=True)

    # Store the upload under a standardized name so downstream steps can find it.
    standardized_filename = f"document{file_extension}"
    file_path = os.path.join(uploaded_files_dir, standardized_filename)

    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    # Validate the stored file according to its extension.
    is_valid = False
    error_detail = ""

    if file_extension == '.pdf':
        try:
            reader = PdfReader(file_path)
            if len(reader.pages) == 0:
                error_detail = "PDF must contain at least one page"
            else:
                text = ""
                for page in reader.pages:
                    # extract_text() may return None for pages without a text layer.
                    text += page.extract_text() or ""

                if len(text) < MIN_FILE_LENGTH:
                    error_detail = f"PDF contains {len(text)} characters but must contain at least {MIN_FILE_LENGTH}"
                else:
                    is_valid = True
        except Exception:
            error_detail = "Invalid PDF format"
    elif file_extension == '.md':
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            if len(content) < MIN_FILE_LENGTH:
                error_detail = f"Markdown file contains {len(content)} characters but must contain at least {MIN_FILE_LENGTH}"
            elif not any(marker in content for marker in ['#', '-', '*', '`', '[', '>']):
                error_detail = "Markdown file does not contain any valid Markdown elements"
            else:
                is_valid = True
        except Exception:
            error_detail = "Invalid Markdown format"
    elif file_extension == '.html':
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            if len(content) < MIN_FILE_LENGTH:
                error_detail = f"HTML file contains {len(content)} characters but must contain at least {MIN_FILE_LENGTH}"
            else:
                BeautifulSoup(content, 'html.parser')
                is_valid = True
        except Exception:
            error_detail = "Invalid HTML format"
    elif file_extension == '.txt':
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            content_length = len(content.strip())

            if content_length < MIN_FILE_LENGTH:
                error_detail = f"Text file contains {content_length} characters but must contain at least {MIN_FILE_LENGTH}"
            else:
                is_valid = True
        except Exception:
            error_detail = "Invalid text format"

    if not is_valid:
        # Remove the rejected file before reporting the error.
        os.remove(file_path)
        raise HTTPException(status_code=400, detail=error_detail or f"Invalid {file_extension[1:].upper()} file")

    session_files[session_id] = file_path

    return {"filename": standardized_filename, "status": "uploaded", "session_id": session_id}


@router.post("/upload-url")
async def upload_url(url: str = Form(...)):
    """
    Upload content from a URL, extract text and store it as a document.

    Args:
        url: The URL to download content from

    Returns:
        Dictionary with status and session_id
    """
    try:
        if not validators.url(url):
            raise HTTPException(status_code=400, detail="Invalid URL format")

        # Reject URLs whose path points at a blocked file type.
        parsed_url = urlparse(url)
        path = parsed_url.path.lower()
        if any(path.endswith(ext) for ext in BLOCKED_EXTENSIONS):
            raise HTTPException(status_code=400, detail="This file type is not allowed")

        # Enforce the domain allow-list when one is configured.
        domain = parsed_url.netloc
        if ALLOWED_DOMAINS and domain not in ALLOWED_DOMAINS:
            raise HTTPException(status_code=403, detail="This domain is not in the allowed list")

        headers = {
            'User-Agent': 'Mozilla/5.0 (compatible; YourBenchBot/1.0; +https://yourbench.example.com)',
            'Accept': 'text/html,application/xhtml+xml',
            'Accept-Language': 'en-US,en;q=0.5',
        }

        response = requests.get(
            url,
            timeout=REQUEST_TIMEOUT,
            headers=headers,
            stream=True
        )
        response.raise_for_status()

        content_type = response.headers.get('Content-Type', '')
        if not content_type.startswith(('text/html', 'text/plain', 'application/xhtml+xml')):
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported content type: {content_type}. Only HTML and text formats are supported."
            )

        # Reject oversized responses up front when the server declares a length.
        content_length = int(response.headers.get('Content-Length', 0))
        if content_length > MAX_CONTENT_SIZE:
            raise HTTPException(
                status_code=400,
                detail=f"Content too large ({content_length} bytes). Maximum size: {MAX_CONTENT_SIZE} bytes."
            )

        # Stream the body, enforcing the size limit even without a Content-Length header.
        content = ""
        bytes_read = 0
        for chunk in response.iter_content(chunk_size=8192, decode_unicode=True):
            bytes_read += len(chunk.encode('utf-8') if isinstance(chunk, str) else chunk)
            if bytes_read > MAX_CONTENT_SIZE:
                raise HTTPException(
                    status_code=400,
                    detail=f"Content exceeded maximum allowed size of {MAX_CONTENT_SIZE} bytes"
                )
            content += chunk if isinstance(chunk, str) else chunk.decode('utf-8', errors='replace')

        soup = BeautifulSoup(content, 'html.parser')

        # Drop elements that carry no readable text.
        for element in soup(['script', 'style', 'iframe', 'object', 'embed', 'noscript']):
            element.extract()

        # Strip inline event-handler attributes (onclick, onload, ...).
        for tag in soup.find_all(True):
            for attr in list(tag.attrs):
                if attr.startswith('on'):
                    del tag[attr]

        text = soup.get_text()

        # Collapse whitespace: strip each line, split run-on phrases, drop blanks.
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        text = '\n'.join(chunk for chunk in chunks if chunk)

        # Escape any markup characters that survived text extraction.
        text = html.escape(text)

        # Cap the stored document at 25,000 characters.
        if len(text) > 25000:
            text = text[:25000]

        if len(text.strip()) < MIN_FILE_LENGTH:
            raise HTTPException(
                status_code=400,
                detail=f"The content is too short ({len(text.strip())} characters). Minimum required: {MIN_FILE_LENGTH} characters."
            )

        # Store the extracted text in a fresh session directory.
        session_id = str(uuid.uuid4())
        session_dir = os.path.join(UPLOAD_ROOT, session_id)
        uploaded_files_dir = os.path.join(session_dir, "uploaded_files")
        os.makedirs(uploaded_files_dir, exist_ok=True)

        file_path = os.path.join(uploaded_files_dir, "document.txt")
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(text)

        session_files[session_id] = file_path

        return {
            "status": "uploaded",
            "session_id": session_id,
            "filename": "document.txt",
            "text_length": len(text),
            "source_url": url
        }

    except HTTPException:
        # Let deliberate HTTP errors propagate instead of being rewrapped as 500s.
        raise
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=400, detail=f"Error retrieving the URL: {str(e)}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing the URL: {str(e)}")
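
# Example client call (a sketch: assumes the router is mounted at the app root):
#
#   import requests
#   resp = requests.post("http://localhost:8000/upload-url",
#                        data={"url": "https://example.com/article"})
#   print(resp.json()["session_id"], resp.json()["text_length"])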