"""FlipBook Space server: FastAPI app that stores, caches, and serves PDFs.

This first section holds imports, logging setup, the FastAPI application
object, and the on-disk directory layout used by the rest of the module.
"""

import asyncio
import base64
import concurrent.futures
import io
import json
import logging
import os
import pathlib
import shutil
import tempfile
import threading
import time
import urllib.parse
import uuid
from typing import Any, Dict, List, Optional

import docx2txt
import fitz  # PyMuPDF
import uvicorn
from fastapi import FastAPI, BackgroundTasks, UploadFile, File, Form, Request, Query
from fastapi.responses import HTMLResponse, JSONResponse, Response, RedirectResponse
from fastapi.staticfiles import StaticFiles
from openai import OpenAI
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.pdfgen import canvas
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

BASE = pathlib.Path(__file__).parent
app = FastAPI()
# NOTE(review): this serves every file under BASE (including this source file
# and the local cache/metadata folders) at /static. Consider mounting a
# dedicated assets directory instead.
app.mount("/static", StaticFiles(directory=BASE), name="static")


def _ensure_dir(path: pathlib.Path) -> pathlib.Path:
    """Create *path* (and parents) if missing and return it.

    ``exist_ok=True`` avoids the check-then-create race of
    ``if not path.exists(): path.mkdir()``.
    """
    path.mkdir(parents=True, exist_ok=True)
    return path


# "/data" is used for persistent storage when it exists; otherwise everything
# lives next to this file.
_HAS_DATA_VOLUME = os.path.exists("/data")

# PDF directory (main directory)
PDF_DIR = _ensure_dir(BASE / "pdf")

# Permanent PDF directory (Hugging Face persistent disk)
PERMANENT_PDF_DIR = _ensure_dir(
    pathlib.Path("/data/pdfs") if _HAS_DATA_VOLUME else BASE / "permanent_pdfs"
)

# Cache directory
CACHE_DIR = _ensure_dir(BASE / "cache")

# PDF metadata directory and file
METADATA_DIR = _ensure_dir(
    pathlib.Path("/data/metadata") if _HAS_DATA_VOLUME else BASE / "metadata"
)
PDF_METADATA_FILE = METADATA_DIR / "pdf_metadata.json"

# Embedding cache directory
EMBEDDING_DIR = _ensure_dir(
    pathlib.Path("/data/embeddings") if _HAS_DATA_VOLUME else BASE / "embeddings"
)

# Admin password: retrieved from the environment; the default is for testing.
ADMIN_PASSWORD = os.getenv("PASSWORD", "admin")
if "PASSWORD" not in os.environ:
    logger.warning("PASSWORD is not set; using the insecure default admin password.")
# OpenAI API key
OPENAI_API_KEY = os.getenv("LLM_API", "")

# Bound unconditionally so later checks can never hit an unbound name, even
# when client construction fails.
openai_client = None

# Flag indicating if we have a valid API key
HAS_VALID_API_KEY = bool(OPENAI_API_KEY and OPENAI_API_KEY.strip())

if HAS_VALID_API_KEY:
    try:
        openai_client = OpenAI(api_key=OPENAI_API_KEY, timeout=30.0)
        logger.info("OpenAI client initialized successfully.")
    except Exception as e:
        logger.error(f"Failed to initialize OpenAI client: {e}")
        HAS_VALID_API_KEY = False
else:
    logger.warning("No valid OpenAI API key found. AI features will be limited.")

# Global cache object: PDF stem -> {"status", "progress", "pages", ...}
pdf_cache: Dict[str, Dict[str, Any]] = {}
# Cache locks: PDF stem -> threading.Lock
cache_locks = {}
# PDF metadata (ID -> path)
pdf_metadata: Dict[str, str] = {}
# PDF embedding cache
pdf_embeddings: Dict[str, Dict[str, Any]] = {}

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-run.
_background_tasks = set()


def _spawn_background(coro):
    """Schedule *coro* on the running loop and keep it alive until it finishes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


def load_pdf_metadata():
    """Replace the in-memory ``pdf_metadata`` with the JSON file's content.

    A missing or unreadable file results in an empty mapping.
    """
    global pdf_metadata
    if not PDF_METADATA_FILE.exists():
        pdf_metadata = {}
        return
    try:
        with open(PDF_METADATA_FILE, "r", encoding="utf-8") as f:
            pdf_metadata = json.load(f)
        logger.info(f"PDF metadata loaded successfully: {len(pdf_metadata)} entries")
    except Exception as e:
        logger.error(f"Error loading metadata: {e}")
        pdf_metadata = {}


def save_pdf_metadata():
    """Write ``pdf_metadata`` to the metadata JSON file (errors are logged)."""
    try:
        with open(PDF_METADATA_FILE, "w", encoding="utf-8") as f:
            json.dump(pdf_metadata, f)
    except Exception as e:
        logger.error(f"Error saving metadata: {e}")


def generate_pdf_id(filename: str) -> str:
    """Return a new ID of the form ``<safe name>_<unix time>_<6 hex chars>``."""
    import re

    base_name = os.path.splitext(filename)[0]
    safe_name = re.sub(r'[^\w\-_]', '_', base_name.replace(" ", "_"))
    timestamp = int(time.time())
    random_suffix = uuid.uuid4().hex[:6]
    return f"{safe_name}_{timestamp}_{random_suffix}"


def get_pdf_files():
    """Return the ``*.pdf`` paths found directly in the main directory."""
    return list(PDF_DIR.glob("*.pdf")) if PDF_DIR.exists() else []


def get_permanent_pdf_files():
    """Return the ``*.pdf`` paths found directly in the permanent directory."""
    return list(PERMANENT_PDF_DIR.glob("*.pdf")) if PERMANENT_PDF_DIR.exists() else []


def _find_pdf_id_by_filename(filename: str) -> Optional[str]:
    """Return the first metadata ID whose stored path ends in *filename*."""
    for pid, path in pdf_metadata.items():
        if os.path.basename(path) == filename:
            return pid
    return None


def _ensure_pdf_id(pdf_file: pathlib.Path, refresh_stale_path: bool = False) -> str:
    """Return the ID registered for *pdf_file*, registering a new one if needed.

    With ``refresh_stale_path`` an existing entry whose stored path no longer
    exists is re-pointed at *pdf_file*. Only the in-memory mapping is changed;
    the caller decides when to call ``save_pdf_metadata``.
    """
    pdf_id = _find_pdf_id_by_filename(pdf_file.name)
    if pdf_id is None:
        pdf_id = generate_pdf_id(pdf_file.name)
        pdf_metadata[pdf_id] = str(pdf_file)
    elif refresh_stale_path and not os.path.exists(pdf_metadata[pdf_id]):
        pdf_metadata[pdf_id] = str(pdf_file)
    return pdf_id


def _is_cached(pdf_name: str) -> bool:
    """Return True when *pdf_name* has a completed entry in ``pdf_cache``."""
    return pdf_name in pdf_cache and pdf_cache[pdf_name].get("status") == "completed"


def _project_entries(pdf_files) -> List[Dict[str, Any]]:
    """Build the project dicts for *pdf_files*, registering IDs where missing."""
    known_ids = len(pdf_metadata)
    entries = []
    for pdf_file in pdf_files:
        entries.append({
            "path": str(pdf_file),
            "name": pdf_file.stem,
            "id": _ensure_pdf_id(pdf_file),
            "cached": _is_cached(pdf_file.stem),
        })
    # Persist once, and only when new IDs were actually registered.
    if len(pdf_metadata) != known_ids:
        save_pdf_metadata()
    return entries


def generate_pdf_projects():
    """Return project data for every PDF in the main and permanent directories.

    Files are de-duplicated by filename; a permanent-directory file wins over
    a main-directory file with the same name.
    """
    unique_files = {}
    for file in get_pdf_files():
        unique_files[file.name] = file
    for file in get_permanent_pdf_files():
        unique_files[file.name] = file
    return _project_entries(unique_files.values())


def get_cache_path(pdf_name: str):
    """Return the page-cache JSON path for the PDF stem *pdf_name*."""
    return CACHE_DIR / f"{pdf_name}_cache.json"


def get_embedding_path(pdf_id: str):
    """Return the embedding-cache JSON path for *pdf_id*."""
    return EMBEDDING_DIR / f"{pdf_id}_embedding.json"


def _resolve_managed_pdf(path: str) -> Optional[pathlib.Path]:
    """Return the resolved path if *path* is a PDF directly inside a managed dir.

    Client-supplied paths are only honoured when they point at a ``.pdf`` file
    located directly in ``PDF_DIR`` or ``PERMANENT_PDF_DIR``; anything else
    (other files, traversal via ``..``) yields ``None``. Existence is not
    checked here.
    """
    try:
        candidate = pathlib.Path(path).resolve()
        managed_dirs = (PDF_DIR.resolve(), PERMANENT_PDF_DIR.resolve())
    except (OSError, RuntimeError, ValueError):
        return None
    if candidate.suffix.lower() != ".pdf":
        return None
    if candidate.parent not in managed_dirs:
        return None
    return candidate


def extract_pdf_text(pdf_path: str) -> List[Dict[str, Any]]:
    """Return one ``{"page", "text", "chunk_id"}`` dict per non-empty page.

    Page numbers are 1-based. Any failure is logged and yields ``[]``.
    """
    try:
        doc = fitz.open(pdf_path)
        try:
            chunks = []
            for page_num in range(len(doc)):
                text = doc[page_num].get_text()
                # Only add if the text is non-empty
                if text.strip():
                    chunks.append({
                        "page": page_num + 1,
                        "text": text,
                        "chunk_id": f"page_{page_num + 1}",
                    })
            return chunks
        finally:
            doc.close()
    except Exception as e:
        logger.error(f"Error extracting text from PDF: {e}")
        return []


async def get_pdf_embedding(pdf_id: str) -> Dict[str, Any]:
    """Return the cached text chunks for *pdf_id*, creating the cache if needed.

    On failure a dict with ``"error"`` and ``"pdf_id"`` keys is returned
    instead of raising.
    """
    try:
        # Check embedding cache file
        embedding_path = get_embedding_path(pdf_id)
        if embedding_path.exists():
            try:
                with open(embedding_path, "r", encoding="utf-8") as f:
                    return json.load(f)
            except Exception as e:
                logger.error(f"Error loading embedding cache: {e}")

        # Find the actual PDF path
        pdf_path = get_pdf_path_by_id(pdf_id)
        if not pdf_path:
            raise ValueError(f"Could not find a file corresponding to PDF ID {pdf_id}")

        # Extract text
        chunks = extract_pdf_text(pdf_path)
        if not chunks:
            raise ValueError(f"No text could be extracted from PDF: {pdf_path}")

        # Here, you'd normally create or fetch embeddings. For now, we just store chunks.
        embedding_data = {
            "pdf_id": pdf_id,
            "pdf_path": pdf_path,
            "chunks": chunks,
            "created_at": time.time(),
        }

        # Save embedding data to cache
        with open(embedding_path, "w", encoding="utf-8") as f:
            json.dump(embedding_data, f, ensure_ascii=False)
        return embedding_data
    except Exception as e:
        logger.error(f"Error creating PDF embedding: {e}")
        return {"error": str(e), "pdf_id": pdf_id}


def _build_pdf_context(chunks, max_context_length: int = 60000) -> str:
    """Join page chunks into one prompt string, truncated by character count."""
    all_text = "\n\n".join(f"Page {chunk['page']}: {chunk['text']}" for chunk in chunks)
    if len(all_text) > max_context_length:
        all_text = all_text[:max_context_length] + "...(truncated)"
    return all_text


async def _chat_completion_with_retry(messages, max_tokens: int, attempts: int = 3) -> str:
    """Call the chat completion API with retries and return the reply text.

    The OpenAI client used here is synchronous, so each call runs in the
    default executor to keep the event loop responsive. The wait between
    attempts grows linearly (1s, 2s, ...). The last failure is re-raised.
    """
    import functools

    call = functools.partial(
        openai_client.chat.completions.create,
        model="gpt-4.1-mini",
        messages=messages,
        temperature=0.7,
        max_tokens=max_tokens,
        timeout=30.0,
    )
    loop = asyncio.get_running_loop()
    for attempt in range(attempts):
        try:
            response = await loop.run_in_executor(None, call)
            return response.choices[0].message.content
        except Exception as api_error:
            logger.error(f"OpenAI API call error (attempt {attempt+1}/{attempts}): {api_error}")
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(1 * (attempt + 1))
    raise Exception("All retry attempts for API call failed.")


async def query_pdf(pdf_id: str, query: str) -> Dict[str, Any]:
    """Answer *query* using the text of the PDF identified by *pdf_id*.

    Returns ``{"answer", "pdf_id", "query"}`` on success, otherwise a dict
    containing an ``"error"`` message. Never raises.
    """
    try:
        # If there's no valid API key
        if not HAS_VALID_API_KEY or not openai_client:
            return {
                "error": "OpenAI API key not set.",
                "answer": "Sorry, the AI feature is currently disabled. Please contact the system administrator.",
            }

        # Get embedding data
        embedding_data = await get_pdf_embedding(pdf_id)
        if "error" in embedding_data:
            return {"error": embedding_data["error"]}

        # For simplicity, gather all text from the PDF (truncated if too long)
        all_text = _build_pdf_context(embedding_data["chunks"])

        # System prompt
        system_prompt = """
        The default language is English. However, please respond in the language used in the user's prompt (e.g., English, Korean, Japanese, Chinese, etc.).
        You are an assistant that answers questions based solely on the provided PDF content.
        Use only the information from the PDF content to respond.
        If the relevant information is not available in the PDF, respond with: "The requested information could not be found in the provided PDF."
        Provide clear, concise answers and cite relevant page numbers.
        Always remain polite and courteous.
        """

        messages = [
            {"role": "system", "content": system_prompt},
            {
                "role": "user",
                "content": (
                    f"The default language is English."
                    f"Please answer the following question using the PDF content below.\n\n"
                    f"PDF Content:\n{all_text}\n\n"
                    f"Question: {query}"
                ),
            },
        ]

        try:
            answer = await _chat_completion_with_retry(messages, max_tokens=2048)
        except Exception as api_error:
            logger.error(f"Final OpenAI API call error: {api_error}")
            error_message = str(api_error)
            if "Connection" in error_message:
                return {"error": "Could not connect to the OpenAI server. Please check your internet connection."}
            elif "Unauthorized" in error_message or "Authentication" in error_message:
                return {"error": "Invalid API key."}
            elif "Rate limit" in error_message:
                return {"error": "API rate limit exceeded. Please try again later."}
            else:
                return {"error": f"An error occurred while generating the AI response: {error_message}"}

        return {"answer": answer, "pdf_id": pdf_id, "query": query}
    except Exception as e:
        logger.error(f"Error in query_pdf: {e}")
        return {"error": str(e)}


async def summarize_pdf(pdf_id: str) -> Dict[str, Any]:
    """Summarize the PDF identified by *pdf_id*.

    Returns ``{"summary", "pdf_id"}`` on success, otherwise a dict containing
    an ``"error"`` message. Never raises.
    """
    try:
        # If there's no valid API key
        if not HAS_VALID_API_KEY or not openai_client:
            return {
                "error": "OpenAI API key not set. Check 'LLM_API' environment variable.",
                "summary": "Cannot generate summary without an API key. Please contact the system administrator.",
            }

        # Get embedding data
        embedding_data = await get_pdf_embedding(pdf_id)
        if "error" in embedding_data:
            return {"error": embedding_data["error"], "summary": "Cannot extract text from the PDF."}

        all_text = _build_pdf_context(embedding_data["chunks"])

        messages = [
            {
                "role": "system",
                "content": (
                    "The default language is English. Please summarize the following PDF content "
                    "concisely, including key topics and main points, in less than 500 characters."
                ),
            },
            {"role": "user", "content": f"PDF Content:\n{all_text}"},
        ]

        try:
            summary = await _chat_completion_with_retry(messages, max_tokens=1024)
        except Exception as api_error:
            logger.error(f"Final OpenAI API error: {api_error}")
            error_message = str(api_error)
            if "Connection" in error_message:
                return {"error": "Could not connect to the OpenAI server. Check your internet connection.", "pdf_id": pdf_id}
            elif "Unauthorized" in error_message or "Authentication" in error_message:
                return {"error": "Invalid API key.", "pdf_id": pdf_id}
            elif "Rate limit" in error_message:
                return {"error": "API rate limit exceeded. Please try again later.", "pdf_id": pdf_id}
            else:
                return {"error": f"An error occurred while generating the summary: {error_message}", "pdf_id": pdf_id}

        return {"summary": summary, "pdf_id": pdf_id}
    except Exception as e:
        logger.error(f"Error summarizing PDF: {e}")
        return {
            "error": str(e),
            "summary": "An error occurred while summarizing the PDF. The PDF may be too large or in an unsupported format.",
        }


def _render_page(doc, page_num: int, first_page_thumb: str,
                 scale_factor: float = 1.0, jpeg_quality: int = 80) -> Dict[str, str]:
    """Render one page of *doc* to a JPEG data URL.

    Returns ``{"src", "thumb"}``; only page 0 carries the thumbnail. A page
    that fails to render is logged and returned with empty strings.
    """
    try:
        pix = doc[page_num].get_pixmap(matrix=fitz.Matrix(scale_factor, scale_factor))
        img_data = pix.tobytes("jpeg", jpeg_quality)
        b64_img = base64.b64encode(img_data).decode('utf-8')
        return {
            "src": f"data:image/jpeg;base64,{b64_img}",
            "thumb": first_page_thumb if page_num == 0 else "",
        }
    except Exception as e:
        logger.error(f"Error processing page {page_num}: {e}")
        return {"src": "", "thumb": ""}


async def cache_pdf(pdf_path: str):
    """Render every page of *pdf_path* into ``pdf_cache`` and the cache file.

    The entry is keyed by the file stem. While rendering, its status is
    ``"processing"`` and ``progress``/``pages`` are updated after each batch;
    on success it becomes ``"completed"`` and is written to the cache file;
    on failure it becomes ``"error"``. Calls for a PDF that is already
    processing or completed return immediately.
    """
    import traceback

    pdf_name = pathlib.Path(pdf_path).stem
    lock = cache_locks.setdefault(pdf_name, threading.Lock())

    def already_handled() -> bool:
        return pdf_name in pdf_cache and pdf_cache[pdf_name].get("status") in ["processing", "completed"]

    # If it's already being cached or completed, skip
    if already_handled():
        logger.info(f"PDF {pdf_name} is already cached or in progress.")
        return

    # Never block the event loop waiting for the lock: if another caller
    # holds it, that caller is already doing this work.
    if not lock.acquire(blocking=False):
        return

    doc = None
    try:
        # Double check after lock acquisition
        if already_handled():
            return

        pdf_cache[pdf_name] = {"status": "processing", "progress": 0, "pages": []}

        # Check if there's an existing cache file
        cache_path = get_cache_path(pdf_name)
        if cache_path.exists():
            try:
                with open(cache_path, "r") as cache_file:
                    cached_data = json.load(cache_file)
                if cached_data.get("status") == "completed" and cached_data.get("pages"):
                    pdf_cache[pdf_name] = cached_data
                    pdf_cache[pdf_name]["status"] = "completed"
                    logger.info(f"Loaded {pdf_name} from cache file.")
                    return
            except Exception as e:
                logger.error(f"Failed to load cache file: {e}")

        # Open the PDF
        doc = fitz.open(pdf_path)
        total_pages = doc.page_count

        # Generate a small thumbnail for the first page in advance (fast UI loading)
        thumb_src = ""
        if total_pages > 0:
            pix_thumb = doc[0].get_pixmap(matrix=fitz.Matrix(0.2, 0.2))
            b64_thumb = base64.b64encode(pix_thumb.tobytes("png")).decode('utf-8')
            thumb_src = f"data:image/png;base64,{b64_thumb}"
            pdf_cache[pdf_name]["pages"] = [{"thumb": thumb_src, "src": ""}]
            pdf_cache[pdf_name]["progress"] = 1
            pdf_cache[pdf_name]["total_pages"] = total_pages

        pages = [None] * total_pages
        batch_size = 5
        for batch_start in range(0, total_pages, batch_size):
            batch_end = min(batch_start + batch_size, total_pages)
            # Pages are rendered one after another: PyMuPDF documents must
            # not be used from several threads at once.
            for page_num in range(batch_start, batch_end):
                pages[page_num] = _render_page(doc, page_num, thumb_src)

            pdf_cache[pdf_name]["progress"] = round(batch_end / total_pages * 100)
            pdf_cache[pdf_name]["pages"] = pages
            # Let pending requests run so clients can observe the progress.
            await asyncio.sleep(0)

        pdf_cache[pdf_name] = {
            "status": "completed",
            "progress": 100,
            "pages": pages,
            "total_pages": total_pages,
        }

        # Final save. No intermediate files are written: the loaders only
        # accept files whose status is "completed".
        try:
            with open(cache_path, "w") as cache_file:
                json.dump(pdf_cache[pdf_name], cache_file)
            logger.info(f"PDF {pdf_name} cached successfully with {total_pages} pages.")
        except Exception as e:
            logger.error(f"Failed to save final cache: {e}")
    except Exception as e:
        logger.error(f"Error caching PDF: {str(e)}\n{traceback.format_exc()}")
        if pdf_name in pdf_cache:
            pdf_cache[pdf_name]["status"] = "error"
            pdf_cache[pdf_name]["error"] = str(e)
    finally:
        if doc is not None:
            doc.close()
        lock.release()


def get_pdf_path_by_id(pdf_id: str) -> Optional[str]:
    """Return the file path registered for *pdf_id*, or ``None`` if not found.

    Lookup order: (1) the metadata entry, re-located by filename in the
    permanent/main directories if the stored path is gone; (2) any PDF whose
    name starts with the part of the ID before the first underscore;
    (3) any existing metadata entry whose ID contains, or is contained in,
    *pdf_id*. Every successful fallback is written back to the metadata.
    """
    logger.info(f"Searching for PDF by ID: {pdf_id}")

    def remember(found_path: str) -> str:
        pdf_metadata[pdf_id] = found_path
        save_pdf_metadata()
        return found_path

    # 1. Directly check in metadata
    if pdf_id in pdf_metadata:
        path = pdf_metadata[pdf_id]
        if os.path.exists(path):
            return path
        # If file was moved, try searching by filename
        filename = os.path.basename(path)
        for directory in (PERMANENT_PDF_DIR, PDF_DIR):
            relocated = directory / filename
            if relocated.exists():
                return remember(str(relocated))

    # 2. Fallback: search by partial filename
    try:
        name_part = pdf_id.split('_')[0] if '_' in pdf_id else pdf_id
        for file_path in get_pdf_files() + get_permanent_pdf_files():
            if file_path.name.startswith(name_part) or file_path.stem.startswith(name_part):
                return remember(str(file_path))
    except Exception as e:
        logger.error(f"Error searching by filename: {e}")

    # 3. As a last resort, compare with existing metadata
    for pid, path in list(pdf_metadata.items()):
        if os.path.exists(path) and (pdf_id in pid or pid in pdf_id):
            return remember(path)

    return None


async def init_cache_all_pdfs():
    """Reload metadata, load finished cache files, and cache the remaining PDFs."""
    logger.info("Starting PDF caching process.")
    load_pdf_metadata()

    unique_pdf_paths = {str(p) for p in get_pdf_files() + get_permanent_pdf_files()}
    pdf_files = [pathlib.Path(p) for p in sorted(unique_pdf_paths)]

    # Update metadata for all files
    for pdf_file in pdf_files:
        _ensure_pdf_id(pdf_file, refresh_stale_path=True)
    save_pdf_metadata()

    # Load existing cache for a quick start
    suffix = "_cache"
    for cache_file in CACHE_DIR.glob("*_cache.json"):
        try:
            # Strip only the trailing "_cache"; the PDF name itself may
            # legitimately contain that text.
            pdf_name = cache_file.stem[:-len(suffix)]
            with open(cache_file, "r") as f:
                cached_data = json.load(f)
            if cached_data.get("status") == "completed" and cached_data.get("pages"):
                pdf_cache[pdf_name] = cached_data
                pdf_cache[pdf_name]["status"] = "completed"
                logger.info(f"Loaded existing cache: {pdf_name}")
        except Exception as e:
            logger.error(f"Error loading cache file: {str(e)}")

    # Cache non-cached files
    await asyncio.gather(*[
        asyncio.create_task(cache_pdf(str(pdf_file)))
        for pdf_file in pdf_files
        if not _is_cached(pdf_file.stem)
    ])


@app.on_event("startup")
async def startup_event():
    """Load metadata, register IDs for unknown files, and start caching."""
    load_pdf_metadata()

    # Create IDs for missing files
    for pdf_file in get_pdf_files() + get_permanent_pdf_files():
        _ensure_pdf_id(pdf_file, refresh_stale_path=True)
    save_pdf_metadata()

    # Start background caching task
    _spawn_background(init_cache_all_pdfs())


# API endpoint: List PDF projects
@app.get("/api/pdf-projects")
async def get_pdf_projects_api():
    """Return project data for all PDFs (main and permanent directories)."""
    return generate_pdf_projects()


# API endpoint: List permanently stored PDF projects
@app.get("/api/permanent-pdf-projects")
async def get_permanent_pdf_projects_api():
    """Return project data for the PDFs in the permanent directory."""
    return _project_entries(get_permanent_pdf_files())


# API endpoint: Get PDF info by ID
@app.get("/api/pdf-info-by-id/{pdf_id}")
async def get_pdf_info_by_id(pdf_id: str):
    """Return path/name/cache information for *pdf_id*, or ``exists: False``."""
    pdf_path = get_pdf_path_by_id(pdf_id)
    if not pdf_path:
        return {"exists": False, "error": "Could not find the specified PDF."}
    pdf_file = pathlib.Path(pdf_path)
    return {
        "path": pdf_path,
        "name": pdf_file.stem,
        "id": pdf_id,
        "exists": True,
        "cached": _is_cached(pdf_file.stem),
    }


# API endpoint: Get PDF thumbnail (optimized)
@app.get("/api/pdf-thumbnail")
async def get_pdf_thumbnail(path: str):
    """Return a data-URL thumbnail of the first page of the PDF at *path*.

    A cached thumbnail is used when available; otherwise a low-resolution
    JPEG is rendered and full caching is started in the background. Only
    PDFs inside the managed directories are opened.
    """
    try:
        pdf_name = pathlib.Path(path).stem

        # If cached, return the thumbnail from cache
        cached_pages = pdf_cache.get(pdf_name, {}).get("pages")
        if cached_pages:
            first_page = cached_pages[0] or {}
            if first_page.get("thumb"):
                return {"thumbnail": first_page["thumb"]}

        pdf_file = _resolve_managed_pdf(path)
        if pdf_file is None:
            return {"error": f"File not found: {path}", "thumbnail": None}

        # If not cached, generate a quick thumbnail (smaller resolution)
        doc = fitz.open(str(pdf_file))
        try:
            if doc.page_count == 0:
                return {"thumbnail": None}
            pix = doc[0].get_pixmap(matrix=fitz.Matrix(0.2, 0.2))
            img_data = pix.tobytes("jpeg", 70)
        finally:
            doc.close()

        b64_img = base64.b64encode(img_data).decode('utf-8')
        # Start background caching
        _spawn_background(cache_pdf(str(pdf_file)))
        return {"thumbnail": f"data:image/jpeg;base64,{b64_img}"}
    except Exception as e:
        logger.error(f"Error generating thumbnail: {str(e)}")
        return {"error": str(e), "thumbnail": None}


# API endpoint: Cache status
@app.get("/api/cache-status")
async def get_cache_status(path: Optional[str] = None):
    """Return the cache entry for *path*, or a status summary of all entries."""
    if path:
        pdf_name = pathlib.Path(path).stem
        if pdf_name in pdf_cache:
            return pdf_cache[pdf_name]
        return {"status": "not_cached"}
    return {
        name: {"status": info["status"], "progress": info.get("progress", 0)}
        for name, info in pdf_cache.items()
    }


# API endpoint: Query PDF content with AI
@app.post("/api/ai/query-pdf/{pdf_id}")
async def api_query_pdf(pdf_id: str, query: Dict[str, str]):
    """Answer the ``"query"`` field of the JSON body against the given PDF."""
    try:
        user_query = query.get("query", "")
        if not user_query:
            return JSONResponse(content={"error": "No question provided."}, status_code=400)

        if not get_pdf_path_by_id(pdf_id):
            return JSONResponse(content={"error": f"No file found for PDF ID {pdf_id}"}, status_code=404)

        result = await query_pdf(pdf_id, user_query)
        if "error" in result:
            return JSONResponse(content={"error": result["error"]}, status_code=500)
        return result
    except Exception as e:
        logger.error(f"Error in AI query endpoint: {e}")
        return JSONResponse(content={"error": str(e)}, status_code=500)


# API endpoint: Summarize PDF
@app.get("/api/ai/summarize-pdf/{pdf_id}")
async def api_summarize_pdf(pdf_id: str):
    """Return an AI-generated summary of the given PDF."""
    try:
        if not get_pdf_path_by_id(pdf_id):
            return JSONResponse(content={"error": f"No file found for PDF ID {pdf_id}"}, status_code=404)

        result = await summarize_pdf(pdf_id)
        if "error" in result:
            return JSONResponse(content={"error": result["error"]}, status_code=500)
        return result
    except Exception as e:
        logger.error(f"Error in PDF summary endpoint: {e}")
        return JSONResponse(content={"error": str(e)}, status_code=500)


# API endpoint: Provide cached PDF content (progressive loading)
@app.get("/api/cached-pdf")
async def get_cached_pdf(path: str, background_tasks: BackgroundTasks):
    """Return cached pages for *path*, or start caching if nothing is cached."""
    try:
        pdf_name = pathlib.Path(path).stem

        if pdf_name in pdf_cache:
            entry = pdf_cache[pdf_name]
            status = entry.get("status", "")
            if status == "completed":
                return entry
            elif status == "processing":
                pages = entry.get("pages", [])
                return {
                    "status": "processing",
                    "progress": entry.get("progress", 0),
                    "pages": pages,
                    "total_pages": entry.get("total_pages", 0),
                    "available_pages": len([p for p in pages if p and p.get("src")]),
                }

        # Only files inside the managed directories may be opened.
        pdf_file = _resolve_managed_pdf(path)
        if pdf_file is None:
            return {"error": f"File not found: {path}", "status": "error"}

        # If no cache exists, start caching in the background
        background_tasks.add_task(cache_pdf, str(pdf_file))
        return {"status": "started", "progress": 0}
    except Exception as e:
        logger.error(f"Error providing cached PDF: {str(e)}")
        return {"error": str(e), "status": "error"}


# API endpoint: Provide original PDF content (if not cached)
@app.get("/api/pdf-content")
async def get_pdf_content(path: str, background_tasks: BackgroundTasks):
    """Return the raw PDF bytes for *path*, or a redirect hint to the cache.

    Only PDFs inside the managed directories are served; any other path is
    answered with 404.
    """
    try:
        pdf_file = _resolve_managed_pdf(path)
        if pdf_file is None or not pdf_file.exists():
            return JSONResponse(content={"error": f"File not found: {path}"}, status_code=404)

        pdf_name = pdf_file.stem

        # If already cached or partially cached, redirect
        entry = pdf_cache.get(pdf_name)
        if entry is not None and (
            entry.get("status") == "completed"
            or (entry.get("status") == "processing" and entry.get("progress", 0) > 10)
        ):
            # The path is a query-string value, so it must be URL-encoded.
            encoded_path = urllib.parse.quote(path, safe="")
            return JSONResponse(content={"redirect": f"/api/cached-pdf?path={encoded_path}"})

        content = pdf_file.read_bytes()
        encoded_filename = urllib.parse.quote(pdf_file.name)

        # Start caching in the background
        background_tasks.add_task(cache_pdf, str(pdf_file))

        headers = {
            "Content-Type": "application/pdf",
            "Content-Disposition": f'inline; filename="{encoded_filename}"; filename*=UTF-8\'\'{encoded_filename}',
        }
        return Response(content=content, media_type="application/pdf", headers=headers)
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error loading PDF content: {str(e)}\n{error_details}")
        return JSONResponse(content={"error": str(e)}, status_code=500)


def _client_basename(filename: Optional[str]) -> str:
    """Return only the final path component of a client-supplied filename."""
    return os.path.basename((filename or "").replace("\\", "/"))


# API endpoint: Upload PDF to permanent storage
@app.post("/api/upload-pdf")
async def upload_pdf(file: UploadFile = File(...)):
    """Store an uploaded PDF in the permanent and main directories.

    The client filename is reduced to its final component, so directory
    parts such as ``../`` cannot place the file outside the storage
    directories.
    """
    try:
        safe_name = _client_basename(file.filename)
        if not safe_name.lower().endswith('.pdf'):
            return JSONResponse(content={"success": False, "message": "Only PDF files are allowed."}, status_code=400)

        file_path = PERMANENT_PDF_DIR / safe_name
        content = await file.read()
        file_path.write_bytes(content)
        # Also copy to main directory to be automatically displayed
        (PDF_DIR / safe_name).write_bytes(content)

        pdf_id = generate_pdf_id(safe_name)
        pdf_metadata[pdf_id] = str(file_path)
        save_pdf_metadata()

        _spawn_background(cache_pdf(str(file_path)))

        return JSONResponse(
            content={
                "success": True,
                "path": str(file_path),
                "name": file_path.stem,
                "id": pdf_id,
                "viewUrl": f"/view/{pdf_id}",
            },
            status_code=200,
        )
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error uploading PDF: {str(e)}\n{error_details}")
        return JSONResponse(content={"success": False, "message": str(e)}, status_code=500)


async def convert_text_to_pdf(text_content: str, title: str) -> Dict[str, str]:
    """Render *text_content* as a PDF titled *title* and register it.

    The PDF is written to the permanent and main directories. Returns
    ``{"path", "filename", "id"}``. Errors are logged and re-raised.
    """
    try:
        import re
        from xml.sax.saxutils import escape

        from reportlab.lib.enums import TA_CENTER, TA_LEFT
        from reportlab.lib.pagesizes import letter
        from reportlab.lib.styles import ParagraphStyle
        from reportlab.pdfbase import pdfmetrics
        from reportlab.pdfbase.ttfonts import TTFont
        from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer

        safe_title = re.sub(r'[^\w\-_\. ]', '_', title)
        if not safe_title:
            safe_title = "aibook"
        timestamp = int(time.time())
        filename = f"{safe_title}_{timestamp}.pdf"
        file_path = PERMANENT_PDF_DIR / filename

        # Registering a Korean font. If not found, fallback to Helvetica.
        font_path = BASE / "MaruBuri-SemiBold.ttf"
        font_name = "MaruBuri"
        if font_path.exists():
            pdfmetrics.registerFont(TTFont(font_name, str(font_path)))
            logger.info(f"Successfully registered the Korean font: {font_path}")
        else:
            font_name = "Helvetica"
            logger.warning(f"Could not find the Korean font file: {font_path}. Using a default font.")

        pdf_buffer = io.BytesIO()
        doc = SimpleDocTemplate(pdf_buffer, pagesize=letter, encoding='utf-8')

        title_style = ParagraphStyle(
            name='CustomTitle',
            fontName=font_name,
            fontSize=18,
            leading=22,
            alignment=TA_CENTER,
            spaceAfter=20,
        )
        normal_style = ParagraphStyle(
            name='CustomNormal',
            fontName=font_name,
            fontSize=12,
            leading=15,
            alignment=TA_LEFT,
            spaceBefore=6,
            spaceAfter=6,
        )

        # Paragraph text is parsed as XML-like markup, so user text must be
        # escaped; the title included.
        content = [Paragraph(escape(title), title_style), Spacer(1, 20)]

        for para in text_content.split('\n\n'):
            if para.strip():
                # Escape first, then add the line-break tags, so the tags
                # themselves stay markup.
                # NOTE(review): the break tag was lost from the extracted
                # source; '<br/>' is the reconstructed value - confirm.
                safe_para = escape(para).replace('\n', '<br/>')
                content.append(Paragraph(safe_para, normal_style))
                content.append(Spacer(1, 10))

        doc.build(content)

        pdf_bytes = pdf_buffer.getvalue()
        file_path.write_bytes(pdf_bytes)
        # Copy to main directory
        (PDF_DIR / filename).write_bytes(pdf_bytes)

        pdf_id = generate_pdf_id(filename)
        pdf_metadata[pdf_id] = str(file_path)
        save_pdf_metadata()

        _spawn_background(cache_pdf(str(file_path)))

        return {"path": str(file_path), "filename": filename, "id": pdf_id}
    except Exception as e:
        logger.error(f"Error converting text to PDF: {e}")
        raise


# AI-based text enhancement stub (placeholder)
async def enhance_text_with_ai(text_content: str, title: str) -> str:
    """Return *text_content* unchanged (AI enhancement is currently disabled)."""
    return text_content


def _decode_text_upload(content: bytes) -> str:
    """Decode uploaded text, trying UTF-8, EUC-KR, CP949, then Latin-1."""
    for encoding in ('utf-8', 'euc-kr', 'cp949'):
        try:
            text = content.decode(encoding, errors='strict')
        except UnicodeDecodeError:
            continue
        logger.info(f"Detected text file encoding: {encoding}")
        return text
    # Latin-1 maps every byte to a character, so this final step cannot fail.
    logger.info("Detected text file encoding: latin1")
    return content.decode('latin1')


# API endpoint: Convert uploaded text file to PDF
@app.post("/api/text-to-pdf")
async def text_to_pdf(file: UploadFile = File(...)):
    """Convert an uploaded ``.txt``/``.docx``/``.doc`` file into a stored PDF."""
    try:
        original_name = _client_basename(file.filename)
        filename = original_name.lower()
        if not filename.endswith(('.txt', '.docx', '.doc')):
            return JSONResponse(
                content={"success": False, "message": "Supported file formats are .txt, .docx, and .doc only."},
                status_code=400,
            )

        content = await file.read()

        # Extract text depending on file type
        if filename.endswith('.txt'):
            text_content = _decode_text_upload(content)
        else:
            # NOTE(review): docx2txt is documented for .docx; confirm that
            # legacy .doc uploads are really expected to work here.
            with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(filename)[1]) as temp_file:
                temp_file.write(content)
                temp_path = temp_file.name
            try:
                text_content = docx2txt.process(temp_path)
            finally:
                os.unlink(temp_path)

        # Use the name as uploaded (not lower-cased) for the document title.
        title = os.path.splitext(original_name)[0]

        # Optional AI enhancement
        enhanced_text = await enhance_text_with_ai(text_content, title)

        # Convert the final text to PDF
        pdf_info = await convert_text_to_pdf(enhanced_text, title)

        return JSONResponse(
            content={
                "success": True,
                "path": pdf_info["path"],
                "name": os.path.splitext(pdf_info["filename"])[0],
                "id": pdf_info["id"],
                "viewUrl": f"/view/{pdf_info['id']}",
            },
            status_code=200,
        )
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error converting text to PDF: {str(e)}\n{error_details}")
        return JSONResponse(content={"success": False, "message": str(e)}, status_code=500)


# Admin authentication endpoint
@app.post("/api/admin-login")
async def admin_login(password: str = Form(...)):
    """Check the submitted admin password."""
    import hmac

    # Constant-time comparison; bytes are used so non-ASCII input is accepted.
    if hmac.compare_digest(password.encode("utf-8"), ADMIN_PASSWORD.encode("utf-8")):
        return {"success": True}
    return {"success": False, "message": "Authentication failed."}


# NOTE(review): the /api/admin/* endpoints below perform no authentication;
# /api/admin-login only reports whether a password matches. Enforcing auth
# would change their request interface, so it is flagged rather than changed.

# Admin: Delete PDF
@app.delete("/api/admin/delete-pdf")
async def delete_pdf(path: str):
    """Delete a managed PDF together with its caches and metadata entries.

    Paths outside the managed PDF directories are reported as not found.
    """
    try:
        pdf_file = _resolve_managed_pdf(path)
        if pdf_file is None or not pdf_file.exists():
            return {"success": False, "message": "File not found."}

        filename = pdf_file.name

        # Delete from permanent storage
        pdf_file.unlink()

        # Also delete from main directory if exists
        main_file_path = PDF_DIR / filename
        if main_file_path.exists():
            main_file_path.unlink()

        # Delete related cache
        pdf_name = pdf_file.stem
        cache_path = get_cache_path(pdf_name)
        if cache_path.exists():
            cache_path.unlink()
        pdf_cache.pop(pdf_name, None)

        # Remove from metadata, along with the text caches keyed by those IDs
        to_remove = [pid for pid, fpath in pdf_metadata.items() if os.path.basename(fpath) == filename]
        for pid in to_remove:
            del pdf_metadata[pid]
            embedding_path = get_embedding_path(pid)
            if embedding_path.exists():
                embedding_path.unlink()
        save_pdf_metadata()

        return {"success": True}
    except Exception as e:
        logger.error(f"Error deleting PDF: {str(e)}")
        return {"success": False, "message": str(e)}


# Admin: Feature PDF (copy to main directory)
@app.post("/api/admin/feature-pdf")
async def feature_pdf(path: str):
    """Copy a managed PDF into the main directory.

    Only managed PDFs are accepted, so arbitrary files cannot be copied into
    the main directory.
    """
    try:
        pdf_file = _resolve_managed_pdf(path)
        if pdf_file is None or not pdf_file.exists():
            return {"success": False, "message": "File not found."}

        target_path = PDF_DIR / pdf_file.name
        # A file that already lives in the main directory is already featured.
        if target_path.resolve() != pdf_file:
            shutil.copy2(pdf_file, target_path)
        return {"success": True}
    except Exception as e:
        logger.error(f"Error featuring PDF: {str(e)}")
        return {"success": False, "message": str(e)}


# Admin: Unfeature PDF (remove from main directory only)
@app.delete("/api/admin/unfeature-pdf")
async def unfeature_pdf(path: str):
    """Remove the file named like *path* from the main directory, if present."""
    try:
        pdf_name = pathlib.Path(path).name
        target_path = PDF_DIR / pdf_name
        # An empty name would make target_path the directory itself.
        if pdf_name and target_path.is_file():
            target_path.unlink()
        return {"success": True}
    except Exception as e:
        logger.error(f"Error unfeaturing PDF: {str(e)}")
        return {"success": False, "message": str(e)}


@app.get("/view/{pdf_id}")
async def view_pdf_by_id(pdf_id: str):
    """Serve the viewer page for *pdf_id*, or a 404 page if it is unknown.

    The lookup is retried once after reloading the metadata file. The
    filename-prefix scan lives in ``get_pdf_path_by_id`` and is not repeated
    here.
    """
    import html

    pdf_path = get_pdf_path_by_id(pdf_id)
    if not pdf_path:
        # Reload metadata and retry
        load_pdf_metadata()
        pdf_path = get_pdf_path_by_id(pdf_id)

    if not pdf_path:
        # pdf_id comes from the URL, so it is escaped before being echoed.
        # NOTE(review): the tags of this page were stripped from the
        # extracted source; the markup is a reconstruction around the
        # surviving text.
        safe_id = html.escape(pdf_id)
        return HTMLResponse(
            content=(
                f"<html><body>"
                f"<h1>Could not find the requested PDF</h1>"
                f"<p>ID: {safe_id}</p>"
                f"<a href='/'>Go back to home</a>"
                f"</body></html>"
            ),
            status_code=404,
        )

    # Serve the main page with the PDF ID embedded
    return get_html_content(pdf_id=pdf_id)


def get_html_content(pdf_id: str = None):
    """Return the viewer page, optionally primed with *pdf_id*.

    The page comes from ``flipbook_template.html`` next to this file, falling
    back to the module-level ``HTML`` string.
    """
    html_path = BASE / "flipbook_template.html"
    if html_path.exists():
        with open(html_path, "r", encoding="utf-8") as f:
            content = f.read()
    else:
        content = HTML  # fallback if no local template

    if pdf_id:
        # NOTE(review): the original auto-load <script> body and the replace()
        # target were stripped from the extracted source. This placeholder
        # only exposes the ID to the page; restore the original script from
        # the real template before relying on automatic loading.
        id_literal = json.dumps(pdf_id).replace("<", "\\u003c")
        auto_load_script = f"<script>window.AUTO_LOAD_PDF_ID = {id_literal};</script>"
        if "</body>" in content:
            content = content.replace("</body>", auto_load_script + "</body>", 1)
        else:
            content += auto_load_script

    return HTMLResponse(content=content)


@app.get("/", response_class=HTMLResponse)
async def root(request: Request, pdf_id: Optional[str] = Query(None)):
    """Serve the main page, or redirect to ``/view/<pdf_id>`` when given."""
    if pdf_id:
        return RedirectResponse(url=f"/view/{pdf_id}")
    return get_html_content()


import os

HTML = os.getenv("HTML_TEMPLATE", "")
if not HTML:
    logger.warning("HTML_TEMPLATE secret is not set. Using default HTML.")
    # NOTE(review): the tags of this fallback page were stripped from the
    # extracted source; the markup is a reconstruction around the surviving
    # text.
    HTML = """<!doctype html>
<html>
<head><meta charset="utf-8"><title>FlipBook Space</title></head>
<body>
<h1>Could not load the HTML template</h1>
<p>HTML_TEMPLATE secret is not configured.</p>
<p>Please set the HTML_TEMPLATE in your Hugging Face Space secrets.</p>
</body>
</html>
"""

if __name__ == "__main__":
    uvicorn.run("app:app", host="0.0.0.0", port=int(os.getenv("PORT", 7860)))