Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
from fastapi import FastAPI, BackgroundTasks, UploadFile, File, Form, Request, Query | |
from fastapi.responses import HTMLResponse, JSONResponse, Response, RedirectResponse | |
from fastapi.staticfiles import StaticFiles | |
import pathlib, os, uvicorn, base64, json, shutil, uuid, time, urllib.parse | |
from typing import Dict, List, Any, Optional | |
import asyncio | |
import logging | |
import threading | |
import concurrent.futures | |
from openai import OpenAI | |
import fitz # PyMuPDF | |
import tempfile | |
from reportlab.lib.pagesizes import letter | |
from reportlab.pdfgen import canvas | |
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer | |
from reportlab.lib.styles import getSampleStyleSheet | |
import io | |
import docx2txt | |
# Logging configuration | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
BASE = pathlib.Path(__file__).parent | |
app = FastAPI() | |
app.mount("/static", StaticFiles(directory=BASE), name="static") | |
# PDF directory (main directory) | |
PDF_DIR = BASE / "pdf" | |
if not PDF_DIR.exists(): | |
PDF_DIR.mkdir(parents=True) | |
# Permanent PDF directory (Hugging Face persistent disk) | |
PERMANENT_PDF_DIR = pathlib.Path("/data/pdfs") if os.path.exists("/data") else BASE / "permanent_pdfs" | |
if not PERMANENT_PDF_DIR.exists(): | |
PERMANENT_PDF_DIR.mkdir(parents=True) | |
# Cache directory | |
CACHE_DIR = BASE / "cache" | |
if not CACHE_DIR.exists(): | |
CACHE_DIR.mkdir(parents=True) | |
# PDF metadata directory and file | |
METADATA_DIR = pathlib.Path("/data/metadata") if os.path.exists("/data") else BASE / "metadata" | |
if not METADATA_DIR.exists(): | |
METADATA_DIR.mkdir(parents=True) | |
PDF_METADATA_FILE = METADATA_DIR / "pdf_metadata.json" | |
# Embedding cache directory | |
EMBEDDING_DIR = pathlib.Path("/data/embeddings") if os.path.exists("/data") else BASE / "embeddings" | |
if not EMBEDDING_DIR.exists(): | |
EMBEDDING_DIR.mkdir(parents=True) | |
# Admin password | |
ADMIN_PASSWORD = os.getenv("PASSWORD", "admin") # Retrieved from environment variable; default is for testing | |
# OpenAI API key | |
OPENAI_API_KEY = os.getenv("LLM_API", "") | |
# Flag indicating if we have a valid API key | |
HAS_VALID_API_KEY = bool(OPENAI_API_KEY and OPENAI_API_KEY.strip()) | |
if HAS_VALID_API_KEY: | |
try: | |
openai_client = OpenAI(api_key=OPENAI_API_KEY, timeout=30.0) | |
logger.info("OpenAI client initialized successfully.") | |
except Exception as e: | |
logger.error(f"Failed to initialize OpenAI client: {e}") | |
HAS_VALID_API_KEY = False | |
else: | |
logger.warning("No valid OpenAI API key found. AI features will be limited.") | |
openai_client = None | |
# Global cache object | |
pdf_cache: Dict[str, Dict[str, Any]] = {} | |
# Cache locks | |
cache_locks = {} | |
# PDF metadata (ID -> path) | |
pdf_metadata: Dict[str, str] = {} | |
# PDF embedding cache | |
pdf_embeddings: Dict[str, Dict[str, Any]] = {} | |
# Load PDF metadata from file | |
def load_pdf_metadata(): | |
global pdf_metadata | |
if PDF_METADATA_FILE.exists(): | |
try: | |
with open(PDF_METADATA_FILE, "r") as f: | |
pdf_metadata = json.load(f) | |
logger.info(f"PDF metadata loaded successfully: {len(pdf_metadata)} entries") | |
except Exception as e: | |
logger.error(f"Error loading metadata: {e}") | |
pdf_metadata = {} | |
else: | |
pdf_metadata = {} | |
# Save PDF metadata to file | |
def save_pdf_metadata(): | |
try: | |
with open(PDF_METADATA_FILE, "w") as f: | |
json.dump(pdf_metadata, f) | |
except Exception as e: | |
logger.error(f"Error saving metadata: {e}") | |
# Generate a PDF ID (based on filename + timestamp) | |
def generate_pdf_id(filename: str) -> str: | |
import re | |
base_name = os.path.splitext(filename)[0] | |
safe_name = re.sub(r'[^\w\-_]', '_', base_name.replace(" ", "_")) | |
timestamp = int(time.time()) | |
random_suffix = uuid.uuid4().hex[:6] | |
return f"{safe_name}_{timestamp}_{random_suffix}" | |
# Retrieve list of PDF files in main directory | |
def get_pdf_files(): | |
pdf_files = [] | |
if PDF_DIR.exists(): | |
pdf_files = [f for f in PDF_DIR.glob("*.pdf")] | |
return pdf_files | |
# Retrieve list of PDF files in permanent directory | |
def get_permanent_pdf_files(): | |
pdf_files = [] | |
if PERMANENT_PDF_DIR.exists(): | |
pdf_files = [f for f in PERMANENT_PDF_DIR.glob("*.pdf")] | |
return pdf_files | |
# Generate PDF project data (thumbnails, etc.) | |
def generate_pdf_projects(): | |
projects_data = [] | |
# Get files from both main and permanent directories | |
pdf_files = get_pdf_files() | |
permanent_pdf_files = get_permanent_pdf_files() | |
# Combine both sets of files (remove duplicates by filename) | |
unique_files = {} | |
# Add from main directory first | |
for file in pdf_files: | |
unique_files[file.name] = file | |
# Then add from permanent directory (overwrite if same filename) | |
for file in permanent_pdf_files: | |
unique_files[file.name] = file | |
for pdf_file in unique_files.values(): | |
# Find the PDF ID for this file | |
pdf_id = None | |
for pid, path in pdf_metadata.items(): | |
if os.path.basename(path) == pdf_file.name: | |
pdf_id = pid | |
break | |
# If the file has no ID, generate one and add it to metadata | |
if not pdf_id: | |
pdf_id = generate_pdf_id(pdf_file.name) | |
pdf_metadata[pdf_id] = str(pdf_file) | |
save_pdf_metadata() | |
projects_data.append({ | |
"path": str(pdf_file), | |
"name": pdf_file.stem, | |
"id": pdf_id, | |
"cached": pdf_file.stem in pdf_cache and pdf_cache[pdf_file.stem].get("status") == "completed" | |
}) | |
return projects_data | |
# Get path for cache file | |
def get_cache_path(pdf_name: str): | |
return CACHE_DIR / f"{pdf_name}_cache.json" | |
# Get path for embedding cache file | |
def get_embedding_path(pdf_id: str): | |
return EMBEDDING_DIR / f"{pdf_id}_embedding.json" | |
# Extract text from a PDF | |
def extract_pdf_text(pdf_path: str) -> List[Dict[str, Any]]: | |
try: | |
doc = fitz.open(pdf_path) | |
chunks = [] | |
for page_num in range(len(doc)): | |
page = doc[page_num] | |
text = page.get_text() | |
# Only add if the text is non-empty | |
if text.strip(): | |
chunks.append({ | |
"page": page_num + 1, | |
"text": text, | |
"chunk_id": f"page_{page_num + 1}" | |
}) | |
return chunks | |
except Exception as e: | |
logger.error(f"Error extracting text from PDF: {e}") | |
return [] | |
# Get or create PDF embedding by PDF ID | |
async def get_pdf_embedding(pdf_id: str) -> Dict[str, Any]: | |
try: | |
# Check embedding cache file | |
embedding_path = get_embedding_path(pdf_id) | |
if embedding_path.exists(): | |
try: | |
with open(embedding_path, "r", encoding="utf-8") as f: | |
return json.load(f) | |
except Exception as e: | |
logger.error(f"Error loading embedding cache: {e}") | |
# Find the actual PDF path | |
pdf_path = get_pdf_path_by_id(pdf_id) | |
if not pdf_path: | |
raise ValueError(f"Could not find a file corresponding to PDF ID {pdf_id}") | |
# Extract text | |
chunks = extract_pdf_text(pdf_path) | |
if not chunks: | |
raise ValueError(f"No text could be extracted from PDF: {pdf_path}") | |
# Here, you'd normally create or fetch embeddings. For now, we just store chunks. | |
embedding_data = { | |
"pdf_id": pdf_id, | |
"pdf_path": pdf_path, | |
"chunks": chunks, | |
"created_at": time.time() | |
} | |
# Save embedding data to cache | |
with open(embedding_path, "w", encoding="utf-8") as f: | |
json.dump(embedding_data, f, ensure_ascii=False) | |
return embedding_data | |
except Exception as e: | |
logger.error(f"Error creating PDF embedding: {e}") | |
return {"error": str(e), "pdf_id": pdf_id} | |
# Query a PDF using its content (simple approach) | |
async def query_pdf(pdf_id: str, query: str) -> Dict[str, Any]: | |
try: | |
# If there's no valid API key | |
if not HAS_VALID_API_KEY or not openai_client: | |
return { | |
"error": "OpenAI API key not set.", | |
"answer": "Sorry, the AI feature is currently disabled. Please contact the system administrator." | |
} | |
# Get embedding data | |
embedding_data = await get_pdf_embedding(pdf_id) | |
if "error" in embedding_data: | |
return {"error": embedding_data["error"]} | |
# For simplicity, gather all text from the PDF | |
all_text = "\n\n".join([f"Page {chunk['page']}: {chunk['text']}" for chunk in embedding_data["chunks"]]) | |
# Truncate context if too long | |
max_context_length = 60000 # roughly by characters | |
if len(all_text) > max_context_length: | |
all_text = all_text[:max_context_length] + "...(truncated)" | |
# System prompt | |
system_prompt = """ | |
The default language is English. However, please respond in the language used in the user's prompt (e.g., English, Korean, Japanese, Chinese, etc.). | |
You are an assistant that answers questions based solely on the provided PDF content. Use only the information from the PDF content to respond. If the relevant information is not available in the PDF, respond with: "The requested information could not be found in the provided PDF." | |
Provide clear, concise answers and cite relevant page numbers. Always remain polite and courteous. | |
""" | |
# Attempting to call the openai_client | |
try: | |
# Retry logic | |
for attempt in range(3): | |
try: | |
response = openai_client.chat.completions.create( | |
model="gpt-4.1-mini", | |
messages=[ | |
{"role": "system", "content": system_prompt}, | |
{ | |
"role": "user", | |
"content": ( | |
f"The default language is English." | |
f"Please answer the following question using the PDF content below.\n\n" | |
f"PDF Content:\n{all_text}\n\n" | |
f"Question: {query}" | |
), | |
}, | |
], | |
temperature=0.7, | |
max_tokens=2048, | |
timeout=30.0 | |
) | |
answer = response.choices[0].message.content | |
return { | |
"answer": answer, | |
"pdf_id": pdf_id, | |
"query": query | |
} | |
except Exception as api_error: | |
logger.error(f"OpenAI API call error (attempt {attempt+1}/3): {api_error}") | |
if attempt == 2: | |
raise api_error | |
await asyncio.sleep(1 * (attempt + 1)) | |
raise Exception("All retry attempts for API call failed.") | |
except Exception as api_error: | |
logger.error(f"Final OpenAI API call error: {api_error}") | |
error_message = str(api_error) | |
if "Connection" in error_message: | |
return {"error": "Could not connect to the OpenAI server. Please check your internet connection."} | |
elif "Unauthorized" in error_message or "Authentication" in error_message: | |
return {"error": "Invalid API key."} | |
elif "Rate limit" in error_message: | |
return {"error": "API rate limit exceeded. Please try again later."} | |
else: | |
return {"error": f"An error occurred while generating the AI response: {error_message}"} | |
except Exception as e: | |
logger.error(f"Error in query_pdf: {e}") | |
return {"error": str(e)} | |
# Summarize PDF | |
async def summarize_pdf(pdf_id: str) -> Dict[str, Any]: | |
try: | |
# If there's no valid API key | |
if not HAS_VALID_API_KEY or not openai_client: | |
return { | |
"error": "OpenAI API key not set. Check 'LLM_API' environment variable.", | |
"summary": "Cannot generate summary without an API key. Please contact the system administrator." | |
} | |
# Get embedding data | |
embedding_data = await get_pdf_embedding(pdf_id) | |
if "error" in embedding_data: | |
return {"error": embedding_data["error"], "summary": "Cannot extract text from the PDF."} | |
all_text = "\n\n".join([f"Page {chunk['page']}: {chunk['text']}" for chunk in embedding_data["chunks"]]) | |
# Truncate if too long | |
max_context_length = 60000 | |
if len(all_text) > max_context_length: | |
all_text = all_text[:max_context_length] + "...(truncated)" | |
try: | |
# Retry logic | |
for attempt in range(3): | |
try: | |
response = openai_client.chat.completions.create( | |
model="gpt-4.1-mini", | |
messages=[ | |
{ | |
"role": "system", | |
"content": ( | |
"The default language is English. Please summarize the following PDF content " | |
"concisely, including key topics and main points, in less than 500 characters." | |
), | |
}, | |
{"role": "user", "content": f"PDF Content:\n{all_text}"} | |
], | |
temperature=0.7, | |
max_tokens=1024, | |
timeout=30.0 | |
) | |
summary = response.choices[0].message.content | |
return { | |
"summary": summary, | |
"pdf_id": pdf_id | |
} | |
except Exception as api_error: | |
logger.error(f"OpenAI API call error (attempt {attempt+1}/3): {api_error}") | |
if attempt == 2: | |
raise api_error | |
await asyncio.sleep(1 * (attempt + 1)) | |
raise Exception("All retry attempts for API call failed.") | |
except Exception as api_error: | |
logger.error(f"Final OpenAI API error: {api_error}") | |
error_message = str(api_error) | |
if "Connection" in error_message: | |
return {"error": "Could not connect to the OpenAI server. Check your internet connection.", "pdf_id": pdf_id} | |
elif "Unauthorized" in error_message or "Authentication" in error_message: | |
return {"error": "Invalid API key.", "pdf_id": pdf_id} | |
elif "Rate limit" in error_message: | |
return {"error": "API rate limit exceeded. Please try again later.", "pdf_id": pdf_id} | |
else: | |
return {"error": f"An error occurred while generating the summary: {error_message}", "pdf_id": pdf_id} | |
except Exception as e: | |
logger.error(f"Error summarizing PDF: {e}") | |
return { | |
"error": str(e), | |
"summary": "An error occurred while summarizing the PDF. The PDF may be too large or in an unsupported format." | |
} | |
# Optimized PDF page caching | |
async def cache_pdf(pdf_path: str): | |
try: | |
import fitz | |
pdf_file = pathlib.Path(pdf_path) | |
pdf_name = pdf_file.stem | |
# Create a lock for this PDF (avoid concurrent caching) | |
if pdf_name not in cache_locks: | |
cache_locks[pdf_name] = threading.Lock() | |
# If it's already being cached or completed, skip | |
if pdf_name in pdf_cache and pdf_cache[pdf_name].get("status") in ["processing", "completed"]: | |
logger.info(f"PDF {pdf_name} is already cached or in progress.") | |
return | |
with cache_locks[pdf_name]: | |
# Double check after lock acquisition | |
if pdf_name in pdf_cache and pdf_cache[pdf_name].get("status") in ["processing", "completed"]: | |
return | |
pdf_cache[pdf_name] = {"status": "processing", "progress": 0, "pages": []} | |
# Check if there's an existing cache file | |
cache_path = get_cache_path(pdf_name) | |
if cache_path.exists(): | |
try: | |
with open(cache_path, "r") as cache_file: | |
cached_data = json.load(cache_file) | |
if cached_data.get("status") == "completed" and cached_data.get("pages"): | |
pdf_cache[pdf_name] = cached_data | |
pdf_cache[pdf_name]["status"] = "completed" | |
logger.info(f"Loaded {pdf_name} from cache file.") | |
return | |
except Exception as e: | |
logger.error(f"Failed to load cache file: {e}") | |
# Open the PDF | |
doc = fitz.open(pdf_path) | |
total_pages = doc.page_count | |
# Generate a small thumbnail for the first page in advance (fast UI loading) | |
if total_pages > 0: | |
page = doc[0] | |
pix_thumb = page.get_pixmap(matrix=fitz.Matrix(0.2, 0.2)) | |
thumb_data = pix_thumb.tobytes("png") | |
b64_thumb = base64.b64encode(thumb_data).decode('utf-8') | |
thumb_src = f"data:image/png;base64,{b64_thumb}" | |
pdf_cache[pdf_name]["pages"] = [{"thumb": thumb_src, "src": ""}] | |
pdf_cache[pdf_name]["progress"] = 1 | |
pdf_cache[pdf_name]["total_pages"] = total_pages | |
# Adjust resolution and quality to optimize performance | |
scale_factor = 1.0 | |
jpeg_quality = 80 | |
# Worker function for parallel page processing | |
def process_page(page_num): | |
try: | |
page = doc[page_num] | |
pix = page.get_pixmap(matrix=fitz.Matrix(scale_factor, scale_factor)) | |
img_data = pix.tobytes("jpeg", jpeg_quality) | |
b64_img = base64.b64encode(img_data).decode('utf-8') | |
img_src = f"data:image/jpeg;base64,{b64_img}" | |
# First page gets the thumbnail, others empty | |
thumb_src = "" if page_num > 0 else pdf_cache[pdf_name]["pages"][0]["thumb"] | |
return { | |
"page_num": page_num, | |
"src": img_src, | |
"thumb": thumb_src | |
} | |
except Exception as e: | |
logger.error(f"Error processing page {page_num}: {e}") | |
return { | |
"page_num": page_num, | |
"src": "", | |
"thumb": "", | |
"error": str(e) | |
} | |
pages = [None] * total_pages | |
processed_count = 0 | |
# Batch processing | |
batch_size = 5 | |
for batch_start in range(0, total_pages, batch_size): | |
batch_end = min(batch_start + batch_size, total_pages) | |
current_batch = list(range(batch_start, batch_end)) | |
with concurrent.futures.ThreadPoolExecutor(max_workers=min(5, batch_size)) as executor: | |
batch_results = list(executor.map(process_page, current_batch)) | |
for result in batch_results: | |
page_num = result["page_num"] | |
pages[page_num] = { | |
"src": result["src"], | |
"thumb": result["thumb"] | |
} | |
processed_count += 1 | |
progress = round(processed_count / total_pages * 100) | |
pdf_cache[pdf_name]["progress"] = progress | |
pdf_cache[pdf_name]["pages"] = pages | |
try: | |
with open(cache_path, "w") as cache_file: | |
json.dump({ | |
"status": "processing", | |
"progress": pdf_cache[pdf_name]["progress"], | |
"pages": pdf_cache[pdf_name]["pages"], | |
"total_pages": total_pages | |
}, cache_file) | |
except Exception as e: | |
logger.error(f"Failed to save intermediate cache: {e}") | |
pdf_cache[pdf_name] = { | |
"status": "completed", | |
"progress": 100, | |
"pages": pages, | |
"total_pages": total_pages | |
} | |
# Final save | |
try: | |
with open(cache_path, "w") as cache_file: | |
json.dump(pdf_cache[pdf_name], cache_file) | |
logger.info(f"PDF {pdf_name} cached successfully with {total_pages} pages.") | |
except Exception as e: | |
logger.error(f"Failed to save final cache: {e}") | |
except Exception as e: | |
import traceback | |
logger.error(f"Error caching PDF: {str(e)}\n{traceback.format_exc()}") | |
if pdf_name in pdf_cache: | |
pdf_cache[pdf_name]["status"] = "error" | |
pdf_cache[pdf_name]["error"] = str(e) | |
# Retrieve PDF path by PDF ID | |
def get_pdf_path_by_id(pdf_id: str) -> str: | |
logger.info(f"Searching for PDF by ID: {pdf_id}") | |
# 1. Directly check in metadata | |
if pdf_id in pdf_metadata: | |
path = pdf_metadata[pdf_id] | |
if os.path.exists(path): | |
return path | |
# If file was moved, try searching by filename | |
filename = os.path.basename(path) | |
# Check permanent directory | |
perm_path = PERMANENT_PDF_DIR / filename | |
if perm_path.exists(): | |
pdf_metadata[pdf_id] = str(perm_path) | |
save_pdf_metadata() | |
return str(perm_path) | |
# Check main directory | |
main_path = PDF_DIR / filename | |
if main_path.exists(): | |
pdf_metadata[pdf_id] = str(main_path) | |
save_pdf_metadata() | |
return str(main_path) | |
# 2. Fallback: search by partial filename | |
try: | |
name_part = pdf_id.split('_')[0] if '_' in pdf_id else pdf_id | |
for file_path in get_pdf_files() + get_permanent_pdf_files(): | |
file_basename = os.path.basename(file_path) | |
if file_basename.startswith(name_part) or file_path.stem.startswith(name_part): | |
pdf_metadata[pdf_id] = str(file_path) | |
save_pdf_metadata() | |
return str(file_path) | |
except Exception as e: | |
logger.error(f"Error searching by filename: {e}") | |
# 3. As a last resort, compare with existing metadata | |
for pid, path in pdf_metadata.items(): | |
if os.path.exists(path): | |
file_basename = os.path.basename(path) | |
if pdf_id in pid or pid in pdf_id: | |
pdf_metadata[pdf_id] = path | |
save_pdf_metadata() | |
return path | |
return None | |
# Initialize caching for all PDFs on startup | |
async def init_cache_all_pdfs(): | |
logger.info("Starting PDF caching process.") | |
load_pdf_metadata() | |
pdf_files = get_pdf_files() + get_permanent_pdf_files() | |
unique_pdf_paths = set(str(p) for p in pdf_files) | |
pdf_files = [pathlib.Path(p) for p in unique_pdf_paths] | |
# Update metadata for all files | |
for pdf_file in pdf_files: | |
found = False | |
for pid, path in pdf_metadata.items(): | |
if os.path.basename(path) == pdf_file.name: | |
found = True | |
if not os.path.exists(path): | |
pdf_metadata[pid] = str(pdf_file) | |
break | |
if not found: | |
pdf_id = generate_pdf_id(pdf_file.name) | |
pdf_metadata[pdf_id] = str(pdf_file) | |
save_pdf_metadata() | |
# Load existing cache for a quick start | |
for cache_file in CACHE_DIR.glob("*_cache.json"): | |
try: | |
pdf_name = cache_file.stem.replace("_cache", "") | |
with open(cache_file, "r") as f: | |
cached_data = json.load(f) | |
if cached_data.get("status") == "completed" and cached_data.get("pages"): | |
pdf_cache[pdf_name] = cached_data | |
pdf_cache[pdf_name]["status"] = "completed" | |
logger.info(f"Loaded existing cache: {pdf_name}") | |
except Exception as e: | |
logger.error(f"Error loading cache file: {str(e)}") | |
# Cache non-cached files in parallel | |
await asyncio.gather(*[ | |
asyncio.create_task(cache_pdf(str(pdf_file))) | |
for pdf_file in pdf_files | |
if pdf_file.stem not in pdf_cache or pdf_cache[pdf_file.stem].get("status") != "completed" | |
]) | |
async def startup_event(): | |
# Load PDF metadata | |
load_pdf_metadata() | |
# Create IDs for missing files | |
for pdf_file in get_pdf_files() + get_permanent_pdf_files(): | |
found = False | |
for pid, path in pdf_metadata.items(): | |
if os.path.basename(path) == pdf_file.name: | |
found = True | |
if not os.path.exists(path): | |
pdf_metadata[pid] = str(pdf_file) | |
break | |
if not found: | |
pdf_id = generate_pdf_id(pdf_file.name) | |
pdf_metadata[pdf_id] = str(pdf_file) | |
save_pdf_metadata() | |
# Start background caching task | |
asyncio.create_task(init_cache_all_pdfs()) | |
# API endpoint: List PDF projects | |
async def get_pdf_projects_api(): | |
return generate_pdf_projects() | |
# API endpoint: List permanently stored PDF projects | |
async def get_permanent_pdf_projects_api(): | |
pdf_files = get_permanent_pdf_files() | |
projects_data = [] | |
for pdf_file in pdf_files: | |
pdf_id = None | |
for pid, path in pdf_metadata.items(): | |
if os.path.basename(path) == pdf_file.name: | |
pdf_id = pid | |
break | |
if not pdf_id: | |
pdf_id = generate_pdf_id(pdf_file.name) | |
pdf_metadata[pdf_id] = str(pdf_file) | |
save_pdf_metadata() | |
projects_data.append({ | |
"path": str(pdf_file), | |
"name": pdf_file.stem, | |
"id": pdf_id, | |
"cached": pdf_file.stem in pdf_cache and pdf_cache[pdf_file.stem].get("status") == "completed" | |
}) | |
return projects_data | |
# API endpoint: Get PDF info by ID | |
async def get_pdf_info_by_id(pdf_id: str): | |
pdf_path = get_pdf_path_by_id(pdf_id) | |
if pdf_path: | |
pdf_file = pathlib.Path(pdf_path) | |
return { | |
"path": pdf_path, | |
"name": pdf_file.stem, | |
"id": pdf_id, | |
"exists": True, | |
"cached": pdf_file.stem in pdf_cache and pdf_cache[pdf_file.stem].get("status") == "completed" | |
} | |
return {"exists": False, "error": "Could not find the specified PDF."} | |
# API endpoint: Get PDF thumbnail (optimized) | |
async def get_pdf_thumbnail(path: str): | |
try: | |
pdf_file = pathlib.Path(path) | |
pdf_name = pdf_file.stem | |
# If cached, return the thumbnail from cache | |
if pdf_name in pdf_cache and pdf_cache[pdf_name].get("pages"): | |
if pdf_cache[pdf_name]["pages"][0].get("thumb"): | |
return {"thumbnail": pdf_cache[pdf_name]["pages"][0]["thumb"]} | |
# If not cached, generate a quick thumbnail (smaller resolution) | |
import fitz | |
doc = fitz.open(path) | |
if doc.page_count > 0: | |
page = doc[0] | |
pix = page.get_pixmap(matrix=fitz.Matrix(0.2, 0.2)) | |
img_data = pix.tobytes("jpeg", 70) | |
b64_img = base64.b64encode(img_data).decode('utf-8') | |
# Start background caching | |
asyncio.create_task(cache_pdf(path)) | |
return {"thumbnail": f"data:image/jpeg;base64,{b64_img}"} | |
return {"thumbnail": None} | |
except Exception as e: | |
logger.error(f"Error generating thumbnail: {str(e)}") | |
return {"error": str(e), "thumbnail": None} | |
# API endpoint: Cache status | |
async def get_cache_status(path: str = None): | |
if path: | |
pdf_file = pathlib.Path(path) | |
pdf_name = pdf_file.stem | |
if pdf_name in pdf_cache: | |
return pdf_cache[pdf_name] | |
return {"status": "not_cached"} | |
else: | |
return { | |
name: {"status": info["status"], "progress": info.get("progress", 0)} | |
for name, info in pdf_cache.items() | |
} | |
# API endpoint: Query PDF content with AI | |
async def api_query_pdf(pdf_id: str, query: Dict[str, str]): | |
try: | |
user_query = query.get("query", "") | |
if not user_query: | |
return JSONResponse(content={"error": "No question provided."}, status_code=400) | |
pdf_path = get_pdf_path_by_id(pdf_id) | |
if not pdf_path: | |
return JSONResponse(content={"error": f"No file found for PDF ID {pdf_id}"}, status_code=404) | |
result = await query_pdf(pdf_id, user_query) | |
if "error" in result: | |
return JSONResponse(content={"error": result["error"]}, status_code=500) | |
return result | |
except Exception as e: | |
logger.error(f"Error in AI query endpoint: {e}") | |
return JSONResponse(content={"error": str(e)}, status_code=500) | |
# API endpoint: Summarize PDF | |
async def api_summarize_pdf(pdf_id: str): | |
try: | |
pdf_path = get_pdf_path_by_id(pdf_id) | |
if not pdf_path: | |
return JSONResponse(content={"error": f"No file found for PDF ID {pdf_id}"}, status_code=404) | |
result = await summarize_pdf(pdf_id) | |
if "error" in result: | |
return JSONResponse(content={"error": result["error"]}, status_code=500) | |
return result | |
except Exception as e: | |
logger.error(f"Error in PDF summary endpoint: {e}") | |
return JSONResponse(content={"error": str(e)}, status_code=500) | |
# API endpoint: Provide cached PDF content (progressive loading) | |
async def get_cached_pdf(path: str, background_tasks: BackgroundTasks): | |
try: | |
pdf_file = pathlib.Path(path) | |
pdf_name = pdf_file.stem | |
if pdf_name in pdf_cache: | |
status = pdf_cache[pdf_name].get("status", "") | |
if status == "completed": | |
return pdf_cache[pdf_name] | |
elif status == "processing": | |
progress = pdf_cache[pdf_name].get("progress", 0) | |
pages = pdf_cache[pdf_name].get("pages", []) | |
total_pages = pdf_cache[pdf_name].get("total_pages", 0) | |
return { | |
"status": "processing", | |
"progress": progress, | |
"pages": pages, | |
"total_pages": total_pages, | |
"available_pages": len([p for p in pages if p and p.get("src")]) | |
} | |
# If no cache exists, start caching in the background | |
background_tasks.add_task(cache_pdf, path) | |
return {"status": "started", "progress": 0} | |
except Exception as e: | |
logger.error(f"Error providing cached PDF: {str(e)}") | |
return {"error": str(e), "status": "error"} | |
# API endpoint: Provide original PDF content (if not cached) | |
async def get_pdf_content(path: str, background_tasks: BackgroundTasks): | |
try: | |
pdf_file = pathlib.Path(path) | |
if not pdf_file.exists(): | |
return JSONResponse(content={"error": f"File not found: {path}"}, status_code=404) | |
pdf_name = pdf_file.stem | |
# If already cached or partially cached, redirect | |
if pdf_name in pdf_cache and ( | |
pdf_cache[pdf_name].get("status") == "completed" | |
or ( | |
pdf_cache[pdf_name].get("status") == "processing" | |
and pdf_cache[pdf_name].get("progress", 0) > 10 | |
) | |
): | |
return JSONResponse(content={"redirect": f"/api/cached-pdf?path={path}"}) | |
with open(path, "rb") as pdf_file_handle: | |
content = pdf_file_handle.read() | |
import urllib.parse | |
filename = pdf_file.name | |
encoded_filename = urllib.parse.quote(filename) | |
# Start caching in the background | |
background_tasks.add_task(cache_pdf, path) | |
headers = { | |
"Content-Type": "application/pdf", | |
"Content-Disposition": f'inline; filename="{encoded_filename}"; filename*=UTF-8\'\'{encoded_filename}' | |
} | |
return Response(content=content, media_type="application/pdf", headers=headers) | |
except Exception as e: | |
import traceback | |
error_details = traceback.format_exc() | |
logger.error(f"Error loading PDF content: {str(e)}\n{error_details}") | |
return JSONResponse(content={"error": str(e)}, status_code=500) | |
# API endpoint: Upload PDF to permanent storage | |
async def upload_pdf(file: UploadFile = File(...)): | |
try: | |
if not file.filename.lower().endswith('.pdf'): | |
return JSONResponse(content={"success": False, "message": "Only PDF files are allowed."}, status_code=400) | |
file_path = PERMANENT_PDF_DIR / file.filename | |
content = await file.read() | |
with open(file_path, "wb") as buffer: | |
buffer.write(content) | |
# Also copy to main directory to be automatically displayed | |
with open(PDF_DIR / file.filename, "wb") as buffer: | |
buffer.write(content) | |
pdf_id = generate_pdf_id(file.filename) | |
pdf_metadata[pdf_id] = str(file_path) | |
save_pdf_metadata() | |
asyncio.create_task(cache_pdf(str(file_path))) | |
return JSONResponse( | |
content={ | |
"success": True, | |
"path": str(file_path), | |
"name": file_path.stem, | |
"id": pdf_id, | |
"viewUrl": f"/view/{pdf_id}" | |
}, | |
status_code=200 | |
) | |
except Exception as e: | |
import traceback | |
error_details = traceback.format_exc() | |
logger.error(f"Error uploading PDF: {str(e)}\n{error_details}") | |
return JSONResponse(content={"success": False, "message": str(e)}, status_code=500) | |
# Convert text file to PDF | |
async def convert_text_to_pdf(text_content: str, title: str) -> str: | |
try: | |
import re | |
safe_title = re.sub(r'[^\w\-_\. ]', '_', title) | |
if not safe_title: | |
safe_title = "aibook" | |
timestamp = int(time.time()) | |
filename = f"{safe_title}_{timestamp}.pdf" | |
file_path = PERMANENT_PDF_DIR / filename | |
# Registering a Korean font. If not found, fallback to Helvetica. | |
from reportlab.pdfbase import pdfmetrics | |
from reportlab.pdfbase.ttfonts import TTFont | |
font_path = BASE / "MaruBuri-SemiBold.ttf" | |
font_name = "MaruBuri" | |
if font_path.exists(): | |
pdfmetrics.registerFont(TTFont(font_name, str(font_path))) | |
logger.info(f"Successfully registered the Korean font: {font_path}") | |
else: | |
font_name = "Helvetica" | |
logger.warning(f"Could not find the Korean font file: {font_path}. Using a default font.") | |
pdf_buffer = io.BytesIO() | |
from reportlab.lib.pagesizes import letter | |
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer | |
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
from reportlab.lib.enums import TA_CENTER, TA_LEFT | |
doc = SimpleDocTemplate(pdf_buffer, pagesize=letter, encoding='utf-8') | |
title_style = ParagraphStyle( | |
name='CustomTitle', | |
fontName=font_name, | |
fontSize=18, | |
leading=22, | |
alignment=TA_CENTER, | |
spaceAfter=20 | |
) | |
normal_style = ParagraphStyle( | |
name='CustomNormal', | |
fontName=font_name, | |
fontSize=12, | |
leading=15, | |
alignment=TA_LEFT, | |
spaceBefore=6, | |
spaceAfter=6 | |
) | |
content = [] | |
# Add title | |
content.append(Paragraph(title, title_style)) | |
content.append(Spacer(1, 20)) | |
paragraphs = text_content.split('\n\n') | |
for para in paragraphs: | |
if para.strip(): | |
from xml.sax.saxutils import escape | |
safe_para = escape(para.replace('\n', '<br/>')) | |
p = Paragraph(safe_para, normal_style) | |
content.append(p) | |
content.append(Spacer(1, 10)) | |
doc.build(content) | |
with open(file_path, 'wb') as f: | |
f.write(pdf_buffer.getvalue()) | |
# Copy to main directory | |
with open(PDF_DIR / filename, 'wb') as f: | |
f.write(pdf_buffer.getvalue()) | |
pdf_id = generate_pdf_id(filename) | |
pdf_metadata[pdf_id] = str(file_path) | |
save_pdf_metadata() | |
asyncio.create_task(cache_pdf(str(file_path))) | |
return { | |
"path": str(file_path), | |
"filename": filename, | |
"id": pdf_id | |
} | |
except Exception as e: | |
logger.error(f"Error converting text to PDF: {e}") | |
raise e | |
# AI-based text enhancement stub (placeholder) | |
async def enhance_text_with_ai(text_content: str, title: str) -> str: | |
# Currently returns the original text (AI enhancement disabled) | |
return text_content | |
# API endpoint: Convert uploaded text file to PDF | |
async def text_to_pdf(file: UploadFile = File(...)): | |
try: | |
filename = file.filename.lower() | |
if not (filename.endswith('.txt') or filename.endswith('.docx') or filename.endswith('.doc')): | |
return JSONResponse( | |
content={"success": False, "message": "Supported file formats are .txt, .docx, and .doc only."}, | |
status_code=400 | |
) | |
content = await file.read() | |
# Extract text depending on file type | |
if filename.endswith('.txt'): | |
encodings = ['utf-8', 'euc-kr', 'cp949', 'latin1'] | |
text_content = None | |
for encoding in encodings: | |
try: | |
text_content = content.decode(encoding, errors='strict') | |
logger.info(f"Detected text file encoding: {encoding}") | |
break | |
except UnicodeDecodeError: | |
continue | |
if text_content is None: | |
text_content = content.decode('utf-8', errors='replace') | |
logger.warning("Could not detect text file encoding; defaulting to UTF-8.") | |
elif filename.endswith('.docx') or filename.endswith('.doc'): | |
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(filename)[1]) as temp_file: | |
temp_file.write(content) | |
temp_path = temp_file.name | |
try: | |
text_content = docx2txt.process(temp_path) | |
finally: | |
os.unlink(temp_path) | |
title = os.path.splitext(filename)[0] | |
# Optional AI enhancement | |
enhanced_text = await enhance_text_with_ai(text_content, title) | |
# Convert the final text to PDF | |
pdf_info = await convert_text_to_pdf(enhanced_text, title) | |
return JSONResponse( | |
content={ | |
"success": True, | |
"path": pdf_info["path"], | |
"name": os.path.splitext(pdf_info["filename"])[0], | |
"id": pdf_info["id"], | |
"viewUrl": f"/view/{pdf_info['id']}" | |
}, | |
status_code=200 | |
) | |
except Exception as e: | |
import traceback | |
error_details = traceback.format_exc() | |
logger.error(f"Error converting text to PDF: {str(e)}\n{error_details}") | |
return JSONResponse(content={"success": False, "message": str(e)}, status_code=500) | |
# Admin authentication endpoint | |
async def admin_login(password: str = Form(...)): | |
if password == ADMIN_PASSWORD: | |
return {"success": True} | |
return {"success": False, "message": "Authentication failed."} | |
# Admin: Delete PDF | |
async def delete_pdf(path: str): | |
try: | |
pdf_file = pathlib.Path(path) | |
if not pdf_file.exists(): | |
return {"success": False, "message": "File not found."} | |
filename = pdf_file.name | |
# Delete from permanent storage | |
pdf_file.unlink() | |
# Also delete from main directory if exists | |
main_file_path = PDF_DIR / filename | |
if main_file_path.exists(): | |
main_file_path.unlink() | |
# Delete related cache | |
pdf_name = pdf_file.stem | |
cache_path = get_cache_path(pdf_name) | |
if cache_path.exists(): | |
cache_path.unlink() | |
if pdf_name in pdf_cache: | |
del pdf_cache[pdf_name] | |
# Remove from metadata | |
to_remove = [] | |
for pid, fpath in pdf_metadata.items(): | |
if os.path.basename(fpath) == filename: | |
to_remove.append(pid) | |
for pid in to_remove: | |
del pdf_metadata[pid] | |
save_pdf_metadata() | |
return {"success": True} | |
except Exception as e: | |
logger.error(f"Error deleting PDF: {str(e)}") | |
return {"success": False, "message": str(e)} | |
# Admin: Feature PDF (copy to main directory) | |
async def feature_pdf(path: str): | |
try: | |
pdf_file = pathlib.Path(path) | |
if not pdf_file.exists(): | |
return {"success": False, "message": "File not found."} | |
target_path = PDF_DIR / pdf_file.name | |
shutil.copy2(pdf_file, target_path) | |
return {"success": True} | |
except Exception as e: | |
logger.error(f"Error featuring PDF: {str(e)}") | |
return {"success": False, "message": str(e)} | |
# Admin: Unfeature PDF (remove from main directory only) | |
async def unfeature_pdf(path: str): | |
try: | |
pdf_name = pathlib.Path(path).name | |
target_path = PDF_DIR / pdf_name | |
if target_path.exists(): | |
target_path.unlink() | |
return {"success": True} | |
except Exception as e: | |
logger.error(f"Error unfeaturing PDF: {str(e)}") | |
return {"success": False, "message": str(e)} | |
async def view_pdf_by_id(pdf_id: str): | |
pdf_path = get_pdf_path_by_id(pdf_id) | |
if not pdf_path: | |
# Reload metadata and retry | |
load_pdf_metadata() | |
pdf_path = get_pdf_path_by_id(pdf_id) | |
if not pdf_path: | |
# As a final fallback, try scanning all files for a match | |
for file_path in get_pdf_files() + get_permanent_pdf_files(): | |
name_part = pdf_id.split('_')[0] if '_' in pdf_id else pdf_id | |
if file_path.stem.startswith(name_part): | |
pdf_metadata[pdf_id] = str(file_path) | |
save_pdf_metadata() | |
pdf_path = str(file_path) | |
break | |
if not pdf_path: | |
return HTMLResponse( | |
content=( | |
f"<html><body><h1>Could not find the requested PDF</h1>" | |
f"<p>ID: {pdf_id}</p><a href='/'>Go back to home</a></body></html>" | |
), | |
status_code=404 | |
) | |
# Redirect to the main page with PDF ID parameter | |
return get_html_content(pdf_id=pdf_id) | |
def get_html_content(pdf_id: str = None): | |
html_path = BASE / "flipbook_template.html" | |
content = "" | |
if html_path.exists(): | |
with open(html_path, "r", encoding="utf-8") as f: | |
content = f.read() | |
else: | |
content = HTML # fallback if no local template | |
if pdf_id: | |
auto_load_script = f""" | |
<script> | |
document.addEventListener('DOMContentLoaded', async function() {{ | |
try {{ | |
const response = await fetch('/api/pdf-info-by-id/{pdf_id}'); | |
const pdfInfo = await response.json(); | |
if (pdfInfo.exists && pdfInfo.path) {{ | |
setTimeout(() => {{ | |
openPdfById('{pdf_id}', pdfInfo.path, pdfInfo.cached); | |
}}, 500); | |
}} else {{ | |
showError("The requested PDF could not be found."); | |
}} | |
}} catch (e) {{ | |
console.error("Auto-load PDF error:", e); | |
}} | |
}}); | |
</script> | |
""" | |
content = content.replace("</body>", auto_load_script + "</body>") | |
return HTMLResponse(content=content) | |
async def root(request: Request, pdf_id: Optional[str] = Query(None)): | |
if pdf_id: | |
return RedirectResponse(url=f"/view/{pdf_id}") | |
return get_html_content() | |
import os | |
HTML = os.getenv("HTML_TEMPLATE", "") | |
if not HTML: | |
logger.warning("HTML_TEMPLATE secret is not set. Using default HTML.") | |
HTML = """ | |
<!doctype html> | |
<html lang="en"> | |
<head> | |
<meta charset="utf-8"> | |
<title>FlipBook Space</title> | |
<style> | |
body { font-family: Arial, sans-serif; text-align: center; padding: 50px; } | |
.error { color: red; } | |
</style> | |
</head> | |
<body> | |
<h1>Could not load the HTML template</h1> | |
<p class="error">HTML_TEMPLATE secret is not configured.</p> | |
<p>Please set the HTML_TEMPLATE in your Hugging Face Space secrets.</p> | |
</body> | |
</html> | |
""" | |
if __name__ == "__main__": | |
uvicorn.run("app:app", host="0.0.0.0", port=int(os.getenv("PORT", 7860))) | |