from fastapi import FastAPI, BackgroundTasks, UploadFile, File, Form, Request, Query
from fastapi.responses import HTMLResponse, JSONResponse, Response, RedirectResponse
from fastapi.staticfiles import StaticFiles
import pathlib, os, uvicorn, base64, json, shutil, uuid, time, urllib.parse
from typing import Dict, List, Any, Optional
import asyncio
import logging
import threading
import concurrent.futures
from openai import OpenAI
import fitz # PyMuPDF
import tempfile
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet
import io
import docx2txt
# Logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
BASE = pathlib.Path(__file__).parent
app = FastAPI()
app.mount("/static", StaticFiles(directory=BASE), name="static")
# PDF directory (main directory)
PDF_DIR = BASE / "pdf"
if not PDF_DIR.exists():
PDF_DIR.mkdir(parents=True)
# Permanent PDF directory (Hugging Face persistent disk)
PERMANENT_PDF_DIR = pathlib.Path("/data/pdfs") if os.path.exists("/data") else BASE / "permanent_pdfs"
if not PERMANENT_PDF_DIR.exists():
PERMANENT_PDF_DIR.mkdir(parents=True)
# Cache directory
CACHE_DIR = BASE / "cache"
if not CACHE_DIR.exists():
CACHE_DIR.mkdir(parents=True)
# PDF metadata directory and file
METADATA_DIR = pathlib.Path("/data/metadata") if os.path.exists("/data") else BASE / "metadata"
if not METADATA_DIR.exists():
METADATA_DIR.mkdir(parents=True)
PDF_METADATA_FILE = METADATA_DIR / "pdf_metadata.json"
# Embedding cache directory
EMBEDDING_DIR = pathlib.Path("/data/embeddings") if os.path.exists("/data") else BASE / "embeddings"
if not EMBEDDING_DIR.exists():
EMBEDDING_DIR.mkdir(parents=True)
# Admin password
ADMIN_PASSWORD = os.getenv("PASSWORD", "admin") # Retrieved from environment variable; default is for testing
# OpenAI API key
OPENAI_API_KEY = os.getenv("LLM_API", "")
# Flag indicating if we have a valid API key
HAS_VALID_API_KEY = bool(OPENAI_API_KEY and OPENAI_API_KEY.strip())
if HAS_VALID_API_KEY:
try:
openai_client = OpenAI(api_key=OPENAI_API_KEY, timeout=30.0)
logger.info("OpenAI client initialized successfully.")
except Exception as e:
logger.error(f"Failed to initialize OpenAI client: {e}")
HAS_VALID_API_KEY = False
else:
logger.warning("No valid OpenAI API key found. AI features will be limited.")
openai_client = None
# Global cache object
pdf_cache: Dict[str, Dict[str, Any]] = {}
# Cache locks
cache_locks = {}
# PDF metadata (ID -> path)
pdf_metadata: Dict[str, str] = {}
# PDF embedding cache
pdf_embeddings: Dict[str, Dict[str, Any]] = {}
# Load PDF metadata from file
def load_pdf_metadata():
global pdf_metadata
if PDF_METADATA_FILE.exists():
try:
with open(PDF_METADATA_FILE, "r") as f:
pdf_metadata = json.load(f)
logger.info(f"PDF metadata loaded successfully: {len(pdf_metadata)} entries")
except Exception as e:
logger.error(f"Error loading metadata: {e}")
pdf_metadata = {}
else:
pdf_metadata = {}
# Save PDF metadata to file
def save_pdf_metadata():
try:
with open(PDF_METADATA_FILE, "w") as f:
json.dump(pdf_metadata, f)
except Exception as e:
logger.error(f"Error saving metadata: {e}")
# Generate a PDF ID (based on filename + timestamp)
def generate_pdf_id(filename: str) -> str:
import re
base_name = os.path.splitext(filename)[0]
safe_name = re.sub(r'[^\w\-_]', '_', base_name.replace(" ", "_"))
timestamp = int(time.time())
random_suffix = uuid.uuid4().hex[:6]
return f"{safe_name}_{timestamp}_{random_suffix}"
# Retrieve list of PDF files in main directory
def get_pdf_files():
pdf_files = []
if PDF_DIR.exists():
pdf_files = [f for f in PDF_DIR.glob("*.pdf")]
return pdf_files
# Retrieve list of PDF files in permanent directory
def get_permanent_pdf_files():
pdf_files = []
if PERMANENT_PDF_DIR.exists():
pdf_files = [f for f in PERMANENT_PDF_DIR.glob("*.pdf")]
return pdf_files
# Generate PDF project data (thumbnails, etc.)
def generate_pdf_projects():
projects_data = []
# Get files from both main and permanent directories
pdf_files = get_pdf_files()
permanent_pdf_files = get_permanent_pdf_files()
# Combine both sets of files (remove duplicates by filename)
unique_files = {}
# Add from main directory first
for file in pdf_files:
unique_files[file.name] = file
# Then add from permanent directory (overwrite if same filename)
for file in permanent_pdf_files:
unique_files[file.name] = file
for pdf_file in unique_files.values():
# Find the PDF ID for this file
pdf_id = None
for pid, path in pdf_metadata.items():
if os.path.basename(path) == pdf_file.name:
pdf_id = pid
break
# If the file has no ID, generate one and add it to metadata
if not pdf_id:
pdf_id = generate_pdf_id(pdf_file.name)
pdf_metadata[pdf_id] = str(pdf_file)
save_pdf_metadata()
projects_data.append({
"path": str(pdf_file),
"name": pdf_file.stem,
"id": pdf_id,
"cached": pdf_file.stem in pdf_cache and pdf_cache[pdf_file.stem].get("status") == "completed"
})
return projects_data
# Get path for cache file
def get_cache_path(pdf_name: str):
return CACHE_DIR / f"{pdf_name}_cache.json"
# Get path for embedding cache file
def get_embedding_path(pdf_id: str):
return EMBEDDING_DIR / f"{pdf_id}_embedding.json"
# Extract text from a PDF
def extract_pdf_text(pdf_path: str) -> List[Dict[str, Any]]:
try:
doc = fitz.open(pdf_path)
chunks = []
for page_num in range(len(doc)):
page = doc[page_num]
text = page.get_text()
# Only add if the text is non-empty
if text.strip():
chunks.append({
"page": page_num + 1,
"text": text,
"chunk_id": f"page_{page_num + 1}"
})
return chunks
except Exception as e:
logger.error(f"Error extracting text from PDF: {e}")
return []
# Get or create PDF embedding by PDF ID
async def get_pdf_embedding(pdf_id: str) -> Dict[str, Any]:
try:
# Check embedding cache file
embedding_path = get_embedding_path(pdf_id)
if embedding_path.exists():
try:
with open(embedding_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
logger.error(f"Error loading embedding cache: {e}")
# Find the actual PDF path
pdf_path = get_pdf_path_by_id(pdf_id)
if not pdf_path:
raise ValueError(f"Could not find a file corresponding to PDF ID {pdf_id}")
# Extract text
chunks = extract_pdf_text(pdf_path)
if not chunks:
raise ValueError(f"No text could be extracted from PDF: {pdf_path}")
# Here, you'd normally create or fetch embeddings. For now, we just store chunks.
embedding_data = {
"pdf_id": pdf_id,
"pdf_path": pdf_path,
"chunks": chunks,
"created_at": time.time()
}
# Save embedding data to cache
with open(embedding_path, "w", encoding="utf-8") as f:
json.dump(embedding_data, f, ensure_ascii=False)
return embedding_data
except Exception as e:
logger.error(f"Error creating PDF embedding: {e}")
return {"error": str(e), "pdf_id": pdf_id}
# Query a PDF using its content (simple approach)
async def query_pdf(pdf_id: str, query: str) -> Dict[str, Any]:
try:
# If there's no valid API key
if not HAS_VALID_API_KEY or not openai_client:
return {
"error": "OpenAI API key not set.",
"answer": "Sorry, the AI feature is currently disabled. Please contact the system administrator."
}
# Get embedding data
embedding_data = await get_pdf_embedding(pdf_id)
if "error" in embedding_data:
return {"error": embedding_data["error"]}
# For simplicity, gather all text from the PDF
all_text = "\n\n".join([f"Page {chunk['page']}: {chunk['text']}" for chunk in embedding_data["chunks"]])
# Truncate context if too long
max_context_length = 60000 # roughly by characters
if len(all_text) > max_context_length:
all_text = all_text[:max_context_length] + "...(truncated)"
# System prompt
system_prompt = """
The default language is English. However, please respond in the language used in the user's prompt (e.g., English, Korean, Japanese, Chinese, etc.).
You are an assistant that answers questions based solely on the provided PDF content. Use only the information from the PDF content to respond. If the relevant information is not available in the PDF, respond with: "The requested information could not be found in the provided PDF."
Provide clear, concise answers and cite relevant page numbers. Always remain polite and courteous.
"""
# Attempting to call the openai_client
try:
# Retry logic
for attempt in range(3):
try:
response = openai_client.chat.completions.create(
model="gpt-4.1-mini",
messages=[
{"role": "system", "content": system_prompt},
{
"role": "user",
"content": (
f"The default language is English."
f"Please answer the following question using the PDF content below.\n\n"
f"PDF Content:\n{all_text}\n\n"
f"Question: {query}"
),
},
],
temperature=0.7,
max_tokens=2048,
timeout=30.0
)
answer = response.choices[0].message.content
return {
"answer": answer,
"pdf_id": pdf_id,
"query": query
}
except Exception as api_error:
logger.error(f"OpenAI API call error (attempt {attempt+1}/3): {api_error}")
if attempt == 2:
raise api_error
await asyncio.sleep(1 * (attempt + 1))
raise Exception("All retry attempts for API call failed.")
except Exception as api_error:
logger.error(f"Final OpenAI API call error: {api_error}")
error_message = str(api_error)
if "Connection" in error_message:
return {"error": "Could not connect to the OpenAI server. Please check your internet connection."}
elif "Unauthorized" in error_message or "Authentication" in error_message:
return {"error": "Invalid API key."}
elif "Rate limit" in error_message:
return {"error": "API rate limit exceeded. Please try again later."}
else:
return {"error": f"An error occurred while generating the AI response: {error_message}"}
except Exception as e:
logger.error(f"Error in query_pdf: {e}")
return {"error": str(e)}
# Summarize PDF
async def summarize_pdf(pdf_id: str) -> Dict[str, Any]:
try:
# If there's no valid API key
if not HAS_VALID_API_KEY or not openai_client:
return {
"error": "OpenAI API key not set. Check 'LLM_API' environment variable.",
"summary": "Cannot generate summary without an API key. Please contact the system administrator."
}
# Get embedding data
embedding_data = await get_pdf_embedding(pdf_id)
if "error" in embedding_data:
return {"error": embedding_data["error"], "summary": "Cannot extract text from the PDF."}
all_text = "\n\n".join([f"Page {chunk['page']}: {chunk['text']}" for chunk in embedding_data["chunks"]])
# Truncate if too long
max_context_length = 60000
if len(all_text) > max_context_length:
all_text = all_text[:max_context_length] + "...(truncated)"
try:
# Retry logic
for attempt in range(3):
try:
response = openai_client.chat.completions.create(
model="gpt-4.1-mini",
messages=[
{
"role": "system",
"content": (
"The default language is English. Please summarize the following PDF content "
"concisely, including key topics and main points, in less than 500 characters."
),
},
{"role": "user", "content": f"PDF Content:\n{all_text}"}
],
temperature=0.7,
max_tokens=1024,
timeout=30.0
)
summary = response.choices[0].message.content
return {
"summary": summary,
"pdf_id": pdf_id
}
except Exception as api_error:
logger.error(f"OpenAI API call error (attempt {attempt+1}/3): {api_error}")
if attempt == 2:
raise api_error
await asyncio.sleep(1 * (attempt + 1))
raise Exception("All retry attempts for API call failed.")
except Exception as api_error:
logger.error(f"Final OpenAI API error: {api_error}")
error_message = str(api_error)
if "Connection" in error_message:
return {"error": "Could not connect to the OpenAI server. Check your internet connection.", "pdf_id": pdf_id}
elif "Unauthorized" in error_message or "Authentication" in error_message:
return {"error": "Invalid API key.", "pdf_id": pdf_id}
elif "Rate limit" in error_message:
return {"error": "API rate limit exceeded. Please try again later.", "pdf_id": pdf_id}
else:
return {"error": f"An error occurred while generating the summary: {error_message}", "pdf_id": pdf_id}
except Exception as e:
logger.error(f"Error summarizing PDF: {e}")
return {
"error": str(e),
"summary": "An error occurred while summarizing the PDF. The PDF may be too large or in an unsupported format."
}
# Optimized PDF page caching
async def cache_pdf(pdf_path: str):
try:
import fitz
pdf_file = pathlib.Path(pdf_path)
pdf_name = pdf_file.stem
# Create a lock for this PDF (avoid concurrent caching)
if pdf_name not in cache_locks:
cache_locks[pdf_name] = threading.Lock()
# If it's already being cached or completed, skip
if pdf_name in pdf_cache and pdf_cache[pdf_name].get("status") in ["processing", "completed"]:
logger.info(f"PDF {pdf_name} is already cached or in progress.")
return
with cache_locks[pdf_name]:
# Double check after lock acquisition
if pdf_name in pdf_cache and pdf_cache[pdf_name].get("status") in ["processing", "completed"]:
return
pdf_cache[pdf_name] = {"status": "processing", "progress": 0, "pages": []}
# Check if there's an existing cache file
cache_path = get_cache_path(pdf_name)
if cache_path.exists():
try:
with open(cache_path, "r") as cache_file:
cached_data = json.load(cache_file)
if cached_data.get("status") == "completed" and cached_data.get("pages"):
pdf_cache[pdf_name] = cached_data
pdf_cache[pdf_name]["status"] = "completed"
logger.info(f"Loaded {pdf_name} from cache file.")
return
except Exception as e:
logger.error(f"Failed to load cache file: {e}")
# Open the PDF
doc = fitz.open(pdf_path)
total_pages = doc.page_count
# Generate a small thumbnail for the first page in advance (fast UI loading)
if total_pages > 0:
page = doc[0]
pix_thumb = page.get_pixmap(matrix=fitz.Matrix(0.2, 0.2))
thumb_data = pix_thumb.tobytes("png")
b64_thumb = base64.b64encode(thumb_data).decode('utf-8')
thumb_src = f"data:image/png;base64,{b64_thumb}"
pdf_cache[pdf_name]["pages"] = [{"thumb": thumb_src, "src": ""}]
pdf_cache[pdf_name]["progress"] = 1
pdf_cache[pdf_name]["total_pages"] = total_pages
# Adjust resolution and quality to optimize performance
scale_factor = 1.0
jpeg_quality = 80
# Worker function for parallel page processing
def process_page(page_num):
try:
page = doc[page_num]
pix = page.get_pixmap(matrix=fitz.Matrix(scale_factor, scale_factor))
img_data = pix.tobytes("jpeg", jpeg_quality)
b64_img = base64.b64encode(img_data).decode('utf-8')
img_src = f"data:image/jpeg;base64,{b64_img}"
# First page gets the thumbnail, others empty
thumb_src = "" if page_num > 0 else pdf_cache[pdf_name]["pages"][0]["thumb"]
return {
"page_num": page_num,
"src": img_src,
"thumb": thumb_src
}
except Exception as e:
logger.error(f"Error processing page {page_num}: {e}")
return {
"page_num": page_num,
"src": "",
"thumb": "",
"error": str(e)
}
pages = [None] * total_pages
processed_count = 0
# Batch processing
batch_size = 5
for batch_start in range(0, total_pages, batch_size):
batch_end = min(batch_start + batch_size, total_pages)
current_batch = list(range(batch_start, batch_end))
with concurrent.futures.ThreadPoolExecutor(max_workers=min(5, batch_size)) as executor:
batch_results = list(executor.map(process_page, current_batch))
for result in batch_results:
page_num = result["page_num"]
pages[page_num] = {
"src": result["src"],
"thumb": result["thumb"]
}
processed_count += 1
progress = round(processed_count / total_pages * 100)
pdf_cache[pdf_name]["progress"] = progress
pdf_cache[pdf_name]["pages"] = pages
try:
with open(cache_path, "w") as cache_file:
json.dump({
"status": "processing",
"progress": pdf_cache[pdf_name]["progress"],
"pages": pdf_cache[pdf_name]["pages"],
"total_pages": total_pages
}, cache_file)
except Exception as e:
logger.error(f"Failed to save intermediate cache: {e}")
pdf_cache[pdf_name] = {
"status": "completed",
"progress": 100,
"pages": pages,
"total_pages": total_pages
}
# Final save
try:
with open(cache_path, "w") as cache_file:
json.dump(pdf_cache[pdf_name], cache_file)
logger.info(f"PDF {pdf_name} cached successfully with {total_pages} pages.")
except Exception as e:
logger.error(f"Failed to save final cache: {e}")
except Exception as e:
import traceback
logger.error(f"Error caching PDF: {str(e)}\n{traceback.format_exc()}")
if pdf_name in pdf_cache:
pdf_cache[pdf_name]["status"] = "error"
pdf_cache[pdf_name]["error"] = str(e)
# Retrieve PDF path by PDF ID
def get_pdf_path_by_id(pdf_id: str) -> str:
logger.info(f"Searching for PDF by ID: {pdf_id}")
# 1. Directly check in metadata
if pdf_id in pdf_metadata:
path = pdf_metadata[pdf_id]
if os.path.exists(path):
return path
# If file was moved, try searching by filename
filename = os.path.basename(path)
# Check permanent directory
perm_path = PERMANENT_PDF_DIR / filename
if perm_path.exists():
pdf_metadata[pdf_id] = str(perm_path)
save_pdf_metadata()
return str(perm_path)
# Check main directory
main_path = PDF_DIR / filename
if main_path.exists():
pdf_metadata[pdf_id] = str(main_path)
save_pdf_metadata()
return str(main_path)
# 2. Fallback: search by partial filename
try:
name_part = pdf_id.split('_')[0] if '_' in pdf_id else pdf_id
for file_path in get_pdf_files() + get_permanent_pdf_files():
file_basename = os.path.basename(file_path)
if file_basename.startswith(name_part) or file_path.stem.startswith(name_part):
pdf_metadata[pdf_id] = str(file_path)
save_pdf_metadata()
return str(file_path)
except Exception as e:
logger.error(f"Error searching by filename: {e}")
# 3. As a last resort, compare with existing metadata
for pid, path in pdf_metadata.items():
if os.path.exists(path):
file_basename = os.path.basename(path)
if pdf_id in pid or pid in pdf_id:
pdf_metadata[pdf_id] = path
save_pdf_metadata()
return path
return None
# Initialize caching for all PDFs on startup
async def init_cache_all_pdfs():
logger.info("Starting PDF caching process.")
load_pdf_metadata()
pdf_files = get_pdf_files() + get_permanent_pdf_files()
unique_pdf_paths = set(str(p) for p in pdf_files)
pdf_files = [pathlib.Path(p) for p in unique_pdf_paths]
# Update metadata for all files
for pdf_file in pdf_files:
found = False
for pid, path in pdf_metadata.items():
if os.path.basename(path) == pdf_file.name:
found = True
if not os.path.exists(path):
pdf_metadata[pid] = str(pdf_file)
break
if not found:
pdf_id = generate_pdf_id(pdf_file.name)
pdf_metadata[pdf_id] = str(pdf_file)
save_pdf_metadata()
# Load existing cache for a quick start
for cache_file in CACHE_DIR.glob("*_cache.json"):
try:
pdf_name = cache_file.stem.replace("_cache", "")
with open(cache_file, "r") as f:
cached_data = json.load(f)
if cached_data.get("status") == "completed" and cached_data.get("pages"):
pdf_cache[pdf_name] = cached_data
pdf_cache[pdf_name]["status"] = "completed"
logger.info(f"Loaded existing cache: {pdf_name}")
except Exception as e:
logger.error(f"Error loading cache file: {str(e)}")
# Cache non-cached files in parallel
await asyncio.gather(*[
asyncio.create_task(cache_pdf(str(pdf_file)))
for pdf_file in pdf_files
if pdf_file.stem not in pdf_cache or pdf_cache[pdf_file.stem].get("status") != "completed"
])
@app.on_event("startup")
async def startup_event():
# Load PDF metadata
load_pdf_metadata()
# Create IDs for missing files
for pdf_file in get_pdf_files() + get_permanent_pdf_files():
found = False
for pid, path in pdf_metadata.items():
if os.path.basename(path) == pdf_file.name:
found = True
if not os.path.exists(path):
pdf_metadata[pid] = str(pdf_file)
break
if not found:
pdf_id = generate_pdf_id(pdf_file.name)
pdf_metadata[pdf_id] = str(pdf_file)
save_pdf_metadata()
# Start background caching task
asyncio.create_task(init_cache_all_pdfs())
# API endpoint: List PDF projects
@app.get("/api/pdf-projects")
async def get_pdf_projects_api():
return generate_pdf_projects()
# API endpoint: List permanently stored PDF projects
@app.get("/api/permanent-pdf-projects")
async def get_permanent_pdf_projects_api():
pdf_files = get_permanent_pdf_files()
projects_data = []
for pdf_file in pdf_files:
pdf_id = None
for pid, path in pdf_metadata.items():
if os.path.basename(path) == pdf_file.name:
pdf_id = pid
break
if not pdf_id:
pdf_id = generate_pdf_id(pdf_file.name)
pdf_metadata[pdf_id] = str(pdf_file)
save_pdf_metadata()
projects_data.append({
"path": str(pdf_file),
"name": pdf_file.stem,
"id": pdf_id,
"cached": pdf_file.stem in pdf_cache and pdf_cache[pdf_file.stem].get("status") == "completed"
})
return projects_data
# API endpoint: Get PDF info by ID
@app.get("/api/pdf-info-by-id/{pdf_id}")
async def get_pdf_info_by_id(pdf_id: str):
pdf_path = get_pdf_path_by_id(pdf_id)
if pdf_path:
pdf_file = pathlib.Path(pdf_path)
return {
"path": pdf_path,
"name": pdf_file.stem,
"id": pdf_id,
"exists": True,
"cached": pdf_file.stem in pdf_cache and pdf_cache[pdf_file.stem].get("status") == "completed"
}
return {"exists": False, "error": "Could not find the specified PDF."}
# API endpoint: Get PDF thumbnail (optimized)
@app.get("/api/pdf-thumbnail")
async def get_pdf_thumbnail(path: str):
try:
pdf_file = pathlib.Path(path)
pdf_name = pdf_file.stem
# If cached, return the thumbnail from cache
if pdf_name in pdf_cache and pdf_cache[pdf_name].get("pages"):
if pdf_cache[pdf_name]["pages"][0].get("thumb"):
return {"thumbnail": pdf_cache[pdf_name]["pages"][0]["thumb"]}
# If not cached, generate a quick thumbnail (smaller resolution)
import fitz
doc = fitz.open(path)
if doc.page_count > 0:
page = doc[0]
pix = page.get_pixmap(matrix=fitz.Matrix(0.2, 0.2))
img_data = pix.tobytes("jpeg", 70)
b64_img = base64.b64encode(img_data).decode('utf-8')
# Start background caching
asyncio.create_task(cache_pdf(path))
return {"thumbnail": f"data:image/jpeg;base64,{b64_img}"}
return {"thumbnail": None}
except Exception as e:
logger.error(f"Error generating thumbnail: {str(e)}")
return {"error": str(e), "thumbnail": None}
# API endpoint: Cache status
@app.get("/api/cache-status")
async def get_cache_status(path: str = None):
if path:
pdf_file = pathlib.Path(path)
pdf_name = pdf_file.stem
if pdf_name in pdf_cache:
return pdf_cache[pdf_name]
return {"status": "not_cached"}
else:
return {
name: {"status": info["status"], "progress": info.get("progress", 0)}
for name, info in pdf_cache.items()
}
# API endpoint: Query PDF content with AI
@app.post("/api/ai/query-pdf/{pdf_id}")
async def api_query_pdf(pdf_id: str, query: Dict[str, str]):
try:
user_query = query.get("query", "")
if not user_query:
return JSONResponse(content={"error": "No question provided."}, status_code=400)
pdf_path = get_pdf_path_by_id(pdf_id)
if not pdf_path:
return JSONResponse(content={"error": f"No file found for PDF ID {pdf_id}"}, status_code=404)
result = await query_pdf(pdf_id, user_query)
if "error" in result:
return JSONResponse(content={"error": result["error"]}, status_code=500)
return result
except Exception as e:
logger.error(f"Error in AI query endpoint: {e}")
return JSONResponse(content={"error": str(e)}, status_code=500)
# API endpoint: Summarize PDF
@app.get("/api/ai/summarize-pdf/{pdf_id}")
async def api_summarize_pdf(pdf_id: str):
try:
pdf_path = get_pdf_path_by_id(pdf_id)
if not pdf_path:
return JSONResponse(content={"error": f"No file found for PDF ID {pdf_id}"}, status_code=404)
result = await summarize_pdf(pdf_id)
if "error" in result:
return JSONResponse(content={"error": result["error"]}, status_code=500)
return result
except Exception as e:
logger.error(f"Error in PDF summary endpoint: {e}")
return JSONResponse(content={"error": str(e)}, status_code=500)
# API endpoint: Provide cached PDF content (progressive loading)
@app.get("/api/cached-pdf")
async def get_cached_pdf(path: str, background_tasks: BackgroundTasks):
try:
pdf_file = pathlib.Path(path)
pdf_name = pdf_file.stem
if pdf_name in pdf_cache:
status = pdf_cache[pdf_name].get("status", "")
if status == "completed":
return pdf_cache[pdf_name]
elif status == "processing":
progress = pdf_cache[pdf_name].get("progress", 0)
pages = pdf_cache[pdf_name].get("pages", [])
total_pages = pdf_cache[pdf_name].get("total_pages", 0)
return {
"status": "processing",
"progress": progress,
"pages": pages,
"total_pages": total_pages,
"available_pages": len([p for p in pages if p and p.get("src")])
}
# If no cache exists, start caching in the background
background_tasks.add_task(cache_pdf, path)
return {"status": "started", "progress": 0}
except Exception as e:
logger.error(f"Error providing cached PDF: {str(e)}")
return {"error": str(e), "status": "error"}
# API endpoint: Provide original PDF content (if not cached)
@app.get("/api/pdf-content")
async def get_pdf_content(path: str, background_tasks: BackgroundTasks):
try:
pdf_file = pathlib.Path(path)
if not pdf_file.exists():
return JSONResponse(content={"error": f"File not found: {path}"}, status_code=404)
pdf_name = pdf_file.stem
# If already cached or partially cached, redirect
if pdf_name in pdf_cache and (
pdf_cache[pdf_name].get("status") == "completed"
or (
pdf_cache[pdf_name].get("status") == "processing"
and pdf_cache[pdf_name].get("progress", 0) > 10
)
):
return JSONResponse(content={"redirect": f"/api/cached-pdf?path={path}"})
with open(path, "rb") as pdf_file_handle:
content = pdf_file_handle.read()
import urllib.parse
filename = pdf_file.name
encoded_filename = urllib.parse.quote(filename)
# Start caching in the background
background_tasks.add_task(cache_pdf, path)
headers = {
"Content-Type": "application/pdf",
"Content-Disposition": f'inline; filename="{encoded_filename}"; filename*=UTF-8\'\'{encoded_filename}'
}
return Response(content=content, media_type="application/pdf", headers=headers)
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Error loading PDF content: {str(e)}\n{error_details}")
return JSONResponse(content={"error": str(e)}, status_code=500)
# API endpoint: Upload PDF to permanent storage
@app.post("/api/upload-pdf")
async def upload_pdf(file: UploadFile = File(...)):
try:
if not file.filename.lower().endswith('.pdf'):
return JSONResponse(content={"success": False, "message": "Only PDF files are allowed."}, status_code=400)
file_path = PERMANENT_PDF_DIR / file.filename
content = await file.read()
with open(file_path, "wb") as buffer:
buffer.write(content)
# Also copy to main directory to be automatically displayed
with open(PDF_DIR / file.filename, "wb") as buffer:
buffer.write(content)
pdf_id = generate_pdf_id(file.filename)
pdf_metadata[pdf_id] = str(file_path)
save_pdf_metadata()
asyncio.create_task(cache_pdf(str(file_path)))
return JSONResponse(
content={
"success": True,
"path": str(file_path),
"name": file_path.stem,
"id": pdf_id,
"viewUrl": f"/view/{pdf_id}"
},
status_code=200
)
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Error uploading PDF: {str(e)}\n{error_details}")
return JSONResponse(content={"success": False, "message": str(e)}, status_code=500)
# Convert text file to PDF
async def convert_text_to_pdf(text_content: str, title: str) -> str:
try:
import re
safe_title = re.sub(r'[^\w\-_\. ]', '_', title)
if not safe_title:
safe_title = "aibook"
timestamp = int(time.time())
filename = f"{safe_title}_{timestamp}.pdf"
file_path = PERMANENT_PDF_DIR / filename
# Registering a Korean font. If not found, fallback to Helvetica.
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
font_path = BASE / "MaruBuri-SemiBold.ttf"
font_name = "MaruBuri"
if font_path.exists():
pdfmetrics.registerFont(TTFont(font_name, str(font_path)))
logger.info(f"Successfully registered the Korean font: {font_path}")
else:
font_name = "Helvetica"
logger.warning(f"Could not find the Korean font file: {font_path}. Using a default font.")
pdf_buffer = io.BytesIO()
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.enums import TA_CENTER, TA_LEFT
doc = SimpleDocTemplate(pdf_buffer, pagesize=letter, encoding='utf-8')
title_style = ParagraphStyle(
name='CustomTitle',
fontName=font_name,
fontSize=18,
leading=22,
alignment=TA_CENTER,
spaceAfter=20
)
normal_style = ParagraphStyle(
name='CustomNormal',
fontName=font_name,
fontSize=12,
leading=15,
alignment=TA_LEFT,
spaceBefore=6,
spaceAfter=6
)
content = []
# Add title
content.append(Paragraph(title, title_style))
content.append(Spacer(1, 20))
paragraphs = text_content.split('\n\n')
for para in paragraphs:
if para.strip():
from xml.sax.saxutils import escape
safe_para = escape(para.replace('\n', '
'))
p = Paragraph(safe_para, normal_style)
content.append(p)
content.append(Spacer(1, 10))
doc.build(content)
with open(file_path, 'wb') as f:
f.write(pdf_buffer.getvalue())
# Copy to main directory
with open(PDF_DIR / filename, 'wb') as f:
f.write(pdf_buffer.getvalue())
pdf_id = generate_pdf_id(filename)
pdf_metadata[pdf_id] = str(file_path)
save_pdf_metadata()
asyncio.create_task(cache_pdf(str(file_path)))
return {
"path": str(file_path),
"filename": filename,
"id": pdf_id
}
except Exception as e:
logger.error(f"Error converting text to PDF: {e}")
raise e
# AI-based text enhancement stub (placeholder)
async def enhance_text_with_ai(text_content: str, title: str) -> str:
# Currently returns the original text (AI enhancement disabled)
return text_content
# API endpoint: Convert uploaded text file to PDF
@app.post("/api/text-to-pdf")
async def text_to_pdf(file: UploadFile = File(...)):
try:
filename = file.filename.lower()
if not (filename.endswith('.txt') or filename.endswith('.docx') or filename.endswith('.doc')):
return JSONResponse(
content={"success": False, "message": "Supported file formats are .txt, .docx, and .doc only."},
status_code=400
)
content = await file.read()
# Extract text depending on file type
if filename.endswith('.txt'):
encodings = ['utf-8', 'euc-kr', 'cp949', 'latin1']
text_content = None
for encoding in encodings:
try:
text_content = content.decode(encoding, errors='strict')
logger.info(f"Detected text file encoding: {encoding}")
break
except UnicodeDecodeError:
continue
if text_content is None:
text_content = content.decode('utf-8', errors='replace')
logger.warning("Could not detect text file encoding; defaulting to UTF-8.")
elif filename.endswith('.docx') or filename.endswith('.doc'):
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(filename)[1]) as temp_file:
temp_file.write(content)
temp_path = temp_file.name
try:
text_content = docx2txt.process(temp_path)
finally:
os.unlink(temp_path)
title = os.path.splitext(filename)[0]
# Optional AI enhancement
enhanced_text = await enhance_text_with_ai(text_content, title)
# Convert the final text to PDF
pdf_info = await convert_text_to_pdf(enhanced_text, title)
return JSONResponse(
content={
"success": True,
"path": pdf_info["path"],
"name": os.path.splitext(pdf_info["filename"])[0],
"id": pdf_info["id"],
"viewUrl": f"/view/{pdf_info['id']}"
},
status_code=200
)
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Error converting text to PDF: {str(e)}\n{error_details}")
return JSONResponse(content={"success": False, "message": str(e)}, status_code=500)
# Admin authentication endpoint
@app.post("/api/admin-login")
async def admin_login(password: str = Form(...)):
if password == ADMIN_PASSWORD:
return {"success": True}
return {"success": False, "message": "Authentication failed."}
# Admin: Delete PDF
@app.delete("/api/admin/delete-pdf")
async def delete_pdf(path: str):
try:
pdf_file = pathlib.Path(path)
if not pdf_file.exists():
return {"success": False, "message": "File not found."}
filename = pdf_file.name
# Delete from permanent storage
pdf_file.unlink()
# Also delete from main directory if exists
main_file_path = PDF_DIR / filename
if main_file_path.exists():
main_file_path.unlink()
# Delete related cache
pdf_name = pdf_file.stem
cache_path = get_cache_path(pdf_name)
if cache_path.exists():
cache_path.unlink()
if pdf_name in pdf_cache:
del pdf_cache[pdf_name]
# Remove from metadata
to_remove = []
for pid, fpath in pdf_metadata.items():
if os.path.basename(fpath) == filename:
to_remove.append(pid)
for pid in to_remove:
del pdf_metadata[pid]
save_pdf_metadata()
return {"success": True}
except Exception as e:
logger.error(f"Error deleting PDF: {str(e)}")
return {"success": False, "message": str(e)}
# Admin: Feature PDF (copy to main directory)
@app.post("/api/admin/feature-pdf")
async def feature_pdf(path: str):
try:
pdf_file = pathlib.Path(path)
if not pdf_file.exists():
return {"success": False, "message": "File not found."}
target_path = PDF_DIR / pdf_file.name
shutil.copy2(pdf_file, target_path)
return {"success": True}
except Exception as e:
logger.error(f"Error featuring PDF: {str(e)}")
return {"success": False, "message": str(e)}
# Admin: Unfeature PDF (remove from main directory only)
@app.delete("/api/admin/unfeature-pdf")
async def unfeature_pdf(path: str):
try:
pdf_name = pathlib.Path(path).name
target_path = PDF_DIR / pdf_name
if target_path.exists():
target_path.unlink()
return {"success": True}
except Exception as e:
logger.error(f"Error unfeaturing PDF: {str(e)}")
return {"success": False, "message": str(e)}
@app.get("/view/{pdf_id}")
async def view_pdf_by_id(pdf_id: str):
pdf_path = get_pdf_path_by_id(pdf_id)
if not pdf_path:
# Reload metadata and retry
load_pdf_metadata()
pdf_path = get_pdf_path_by_id(pdf_id)
if not pdf_path:
# As a final fallback, try scanning all files for a match
for file_path in get_pdf_files() + get_permanent_pdf_files():
name_part = pdf_id.split('_')[0] if '_' in pdf_id else pdf_id
if file_path.stem.startswith(name_part):
pdf_metadata[pdf_id] = str(file_path)
save_pdf_metadata()
pdf_path = str(file_path)
break
if not pdf_path:
return HTMLResponse(
content=(
f"
ID: {pdf_id}
Go back to home" ), status_code=404 ) # Redirect to the main page with PDF ID parameter return get_html_content(pdf_id=pdf_id) def get_html_content(pdf_id: str = None): html_path = BASE / "flipbook_template.html" content = "" if html_path.exists(): with open(html_path, "r", encoding="utf-8") as f: content = f.read() else: content = HTML # fallback if no local template if pdf_id: auto_load_script = f""" """ content = content.replace("HTML_TEMPLATE secret is not configured.
Please set the HTML_TEMPLATE in your Hugging Face Space secrets.
", auto_load_script + "") return HTMLResponse(content=content) @app.get("/", response_class=HTMLResponse) async def root(request: Request, pdf_id: Optional[str] = Query(None)): if pdf_id: return RedirectResponse(url=f"/view/{pdf_id}") return get_html_content() import os HTML = os.getenv("HTML_TEMPLATE", "") if not HTML: logger.warning("HTML_TEMPLATE secret is not set. Using default HTML.") HTML = """