# Spaces: Sleeping  (this appears to be page-status residue from the hosting site, not part of the program)
| from fastapi import FastAPI, File, UploadFile, Form | |
| from fastapi.responses import JSONResponse | |
| from pathlib import Path | |
| import pytesseract | |
| from PIL import Image | |
| import PyPDF2 | |
| import docx | |
| import shutil | |
| import os | |
| import io | |
| from datetime import datetime | |
| import uvicorn | |
# LLM client (OpenAI chat model accessed through LangChain) used for content-based name generation
| from langchain_openai import ChatOpenAI | |
| from langchain.schema import HumanMessage | |
| from fastapi.middleware.cors import CORSMiddleware | |
app = FastAPI()

# Cross-origin access is fully open: every origin, HTTP method and request
# header is accepted, and credentialed requests are permitted.
# NOTE(review): wildcard origins combined with allow_credentials=True is the
# least restrictive CORS setup possible; restrict allow_origins to the known
# frontend(s) before exposing anything that relies on cookies or auth headers.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Upload storage location and validation settings.
UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'pdf', 'docx', 'txt'}
# 16 MB. NOTE(review): nothing in the code visible here enforces this limit.
MAX_CONTENT_LENGTH = 16 * 1024 * 1024

# exist_ok=True makes the creation idempotent and removes the window between
# "check whether it exists" and "create it" in which another process could
# create the directory and make makedirs() raise FileExistsError.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# The OpenAI API key is read from the environment so it never lives in the source.
openai_api_key = os.getenv("OPENAI_API_KEY")

# Fail fast at import time. An empty value is rejected as well as a missing
# one: an empty key would otherwise pass this check and only surface later as
# an authentication error on the first model call.
if not openai_api_key:
    raise ValueError("API key not found. Please set your OPENAI_API_KEY environment variable.")

# Shared chat-model client used by generate_name_based_on_content().
llm = ChatOpenAI(
    model_name="gpt-4o-mini",      # any chat model available to the key can be substituted
    temperature=0,                 # minimise randomness in the suggested names
    openai_api_key=openai_api_key,
)
# NOTE: no CLIP or other image model is loaded in this file; name generation relies only on the chat model configured above.
| # Function to generate a more appropriate name based on content | |
def generate_name_based_on_content(text, industry):
    """Ask the chat model to propose a file name for a document.

    Args:
        text: Text extracted from the document. Only the first 400 characters
            are embedded in the prompt, to keep the request small.
        industry: Industry label interpolated into the prompt.

    Returns:
        The model's reply with leading and trailing whitespace removed. The
        reply is not sanitised any further, so it may contain characters that
        are not valid in a file-system name.
    """
    excerpt = text[:400]
    prompt = f"Generate a meaningful file name for the following content: {excerpt} based on the given industry {industry}"
    # invoke() is the supported way to run a chat model; calling the model
    # object directly is deprecated in LangChain.
    reply = llm.invoke([HumanMessage(content=prompt)])
    return reply.content.strip()
| # Allowed file extensions check | |
def allowed_file(filename):
    """Return True when ``filename`` has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last ``.`` and is compared
    case-insensitively. A missing (``None``) or empty filename, or one without
    a dot, is reported as not allowed instead of raising.
    """
    if not filename:
        # An upload may arrive without a filename; treat it as not allowed.
        return False
    _, dot, extension = filename.rpartition('.')
    return dot == '.' and extension.lower() in ALLOWED_EXTENSIONS
| # Function to extract text from PDF | |
def extract_text_from_pdf(pdf_path):
    """Extract the text of every page of the PDF at ``pdf_path``.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The text of all pages joined with newlines, so the last word of one
        page does not fuse with the first word of the next. Pages that yield
        no text are skipped; the result is an empty string when no page has
        any extractable text.
    """
    with open(pdf_path, 'rb') as file:
        reader = PyPDF2.PdfReader(file)
        # Extraction happens while the file is still open. "or ''" guards
        # against a page for which extract_text() returns nothing.
        page_texts = [page.extract_text() or "" for page in reader.pages]
    return "\n".join(chunk for chunk in page_texts if chunk)
| # Function to extract text from DOCX | |
def extract_text_from_docx(docx_path):
    """Extract the paragraph text of the DOCX document at ``docx_path``.

    Args:
        docx_path: Path of the .docx file to read.

    Returns:
        The text of all non-empty paragraphs joined with newlines, so adjacent
        paragraphs do not run together into one word. Returns an empty string
        when the document has no paragraph text.
    """
    doc = docx.Document(docx_path)
    return "\n".join(para.text for para in doc.paragraphs if para.text)
| # Function to process files | |
def process_files(files, industry):
    """Store each accepted upload and generate a content-based name for it.

    Every entry of ``files`` that passes ``allowed_file`` is written to
    UPLOAD_FOLDER under the base name of its client-supplied filename, its
    text is extracted, and the chat model is asked for a descriptive name.
    Entries that are falsy or have a disallowed extension are skipped, so the
    result may be shorter than ``files``.

    Args:
        files: Iterable of upload objects exposing ``filename`` and a binary
            file-like ``file`` attribute.
        industry: Industry label forwarded to the name generator.

    Returns:
        A list with one generated name per accepted file, in input order;
        ``'Untitled'`` is used when no text could be extracted.
    """
    directories = []
    for file in files:
        if not (file and allowed_file(file.filename)):
            continue

        # The filename comes from the client and is untrusted: keep only its
        # final path component so it cannot escape UPLOAD_FOLDER.
        filename = os.path.basename(file.filename)
        file_path = os.path.join(UPLOAD_FOLDER, filename)

        # Stream the upload to disk instead of loading it fully into memory.
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Compare the extension case-insensitively, as allowed_file() does,
        # so that e.g. "REPORT.PDF" is handled as a PDF.
        extension = filename.rsplit('.', 1)[-1].lower()
        if extension == 'pdf':
            text = extract_text_from_pdf(file_path)
        elif extension == 'docx':
            text = extract_text_from_docx(file_path)
        elif extension == 'txt':
            # Plain-text uploads are accepted by allowed_file(); read them
            # directly and tolerate undecodable bytes.
            with open(file_path, encoding="utf-8", errors="replace") as handle:
                text = handle.read()
        else:
            print("Invalid")
            text = ""

        # The stored file stays in UPLOAD_FOLDER under its base name; only the
        # generated name is reported back to the caller.
        content_name = generate_name_based_on_content(text, industry) if text else 'Untitled'
        directories.append(content_name)
    return directories
async def upload_files(industry: str = Form(...), files: list[UploadFile] = File(...)):
    """Validate an upload request and return the generated names as JSON.

    Responds with HTTP 400 and a ``message`` when ``industry`` or ``files`` is
    empty; otherwise runs ``process_files`` and returns its result under the
    ``directories`` key together with a success message.

    NOTE(review): no route decorator (e.g. ``@app.post(...)``) is visible on
    this handler in the code shown - confirm it is registered with ``app``.
    NOTE(review): ``process_files`` is a synchronous call made from an
    ``async`` handler, so the event loop waits for it to finish.
    """
    rejection = None
    if not industry:
        rejection = "Industry is required."
    elif not files:
        rejection = "No files selected."
    if rejection is not None:
        return JSONResponse(content={"message": rejection}, status_code=400)

    payload = {
        "message": "Files successfully uploaded and organized.",
        "directories": process_files(files, industry),
    }
    return JSONResponse(content=payload)
if __name__ == "__main__":
    # Start the server when this file is executed directly. The "main:app"
    # import string presumably requires this module to be importable as
    # ``main`` - confirm against the actual filename.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=7860,
        reload=True,
    )