"""FastAPI service that stores resumes and job descriptions and scores pairs.

Documents arrive either as uploaded files (converted to Markdown through
``DoclingLoader``) or as raw text. They are kept in module-level, in-memory
collections, and every resume/job pair is scored by a background task.
"""

import os
import shutil
import tempfile
import uuid
from pathlib import Path
from typing import List

from fastapi import BackgroundTasks, FastAPI, File, Form, HTTPException, UploadFile
from langchain_docling import DoclingLoader
from langchain_docling.loader import ExportType
from pydantic import BaseModel

from database import Base, engine
from embeddings import rank_jobs, rank_jobs_multi
from job_samples import job_list
from ranker import rank_resume, rank_resume_multi

Base.metadata.create_all(bind=engine)

app = FastAPI()

# In-memory stores; their contents are lost whenever the process restarts.
resumes = []
jobs = [
    {
        "id": str(uuid.uuid4()),
        "metadata": {"source": "built-in text"},
        "page_content": x,
    }
    for x in job_list
]
# Maps "<resume_id>_<job_id>" to the score record for that pair.
scoring = {}

UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)


@app.post("/upload")
async def upload_file(
    file: UploadFile = File(...),
    type: str = Form(...),  # name shadows the builtin, but it is the form field name
    task: BackgroundTasks = None,
):
    """Parse an uploaded document and store it as a resume or a job.

    The upload is spooled to a temporary file because ``DoclingLoader`` is
    given a file path, converted to Markdown, and the first resulting document
    is stored in ``resumes`` or ``jobs`` depending on ``type``.

    Args:
        file: The uploaded document.
        type: ``"resume"`` or ``"job"``. Any other value is parsed and
            returned but not stored.
        task: Background task queue used to schedule ``process_scoring``.

    Returns:
        A response envelope whose ``data`` is the stored document dict.

    Raises:
        HTTPException: 422 when the loader extracts no document from the file.
    """
    # `filename` may be missing on some uploads; fall back to the ".pdf" default.
    suffix = os.path.splitext(file.filename or "")[-1] or ".pdf"
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix, dir="/tmp") as tmp:
        shutil.copyfileobj(file.file, tmp)
        tmp_path = tmp.name

    try:
        # The handle is closed here, so the size on disk is final.
        size = os.path.getsize(tmp_path)
        print(f"Saved {file.filename} -> {tmp_path} ({size} bytes)")
        print("[TMP PATH]", str(tmp_path))

        loader = DoclingLoader(file_path=str(tmp_path), export_type=ExportType.MARKDOWN)
        docs = loader.load()
    finally:
        # delete=False keeps the file after the `with` block, so remove it
        # explicitly once parsing is done to avoid filling /tmp.
        Path(tmp_path).unlink(missing_ok=True)

    if not docs:
        raise HTTPException(
            status_code=422,
            detail="No content could be extracted from the uploaded file.",
        )

    result = docs[0].model_dump()
    result["id"] = str(uuid.uuid4())

    if type == "resume":
        resumes.append(result)
    elif type == "job":
        jobs.append(result)

    # `task` is None only when the handler is called directly, outside FastAPI.
    if task is not None:
        task.add_task(process_scoring)

    # NOTE(review): the body reports code 201 while the route uses FastAPI's
    # default HTTP status; left unchanged so existing clients keep working.
    return {
        "code": 201,
        "message": "Request was successful.",
        "data": result,
    }


@app.get("/jobs")
def get_jobs():
    """Return every stored job description."""
    return {
        "code": 200,
        "message": "Request was successful.",
        "data": jobs,
    }


@app.get("/resumes")
def get_resumes():
    """Return every stored resume."""
    return {
        "code": 200,
        "message": "Request was successful.",
        "data": resumes,
    }


def _first_score(result):
    """Return the first element of a scorer result, or None if it is a message.

    ``process_input`` returns a human-readable string instead of scores when
    its input is blank; indexing that string would store a single character
    as if it were a score.
    """
    if isinstance(result, str):
        return None
    return result[0]


def process_scoring():
    """Score every resume/job pair that has no entry in ``scoring`` yet.

    Pairs are keyed by ``"<resume_id>_<job_id>"``; already-scored pairs are
    skipped, so repeated runs only handle newly added documents.
    """
    for resume in resumes:
        for job in jobs:
            key = f"{resume['id']}_{job['id']}"
            if key in scoring:
                continue

            rank_score = process_input(job["page_content"], [resume["page_content"]])
            suggest_score = process_input_suggestion(
                resume["page_content"], [job["page_content"]]
            )
            scoring[key] = {
                "resume_id": resume["id"],
                "job_id": job["id"],
                "rank_score": _first_score(rank_score),
                "suggestion_score": _first_score(suggest_score),
            }


@app.get("/scoring")
async def get_scoring():
    """Return all score records computed so far."""
    return {
        "code": 200,
        "message": "Request was successful.",
        "data": list(scoring.values()),
    }


def process_input(job_description, resumes):
    """Rank ``resumes`` against ``job_description`` using ``rank_resume``.

    Blank resumes are dropped first.

    Returns:
        Element 1 of ``rank_resume``'s result (presumably the scores; confirm
        against ``ranker.rank_resume``), or an explanatory string when the job
        description is blank or no non-empty resume remains.
    """
    print("[JOB DESC]", job_description)
    print("[RESUMES]", resumes)
    resumes = [r for r in resumes if r and r.strip()]  # drop empty entries
    if not job_description.strip() or not resumes:
        return "Please provide both job description and at least one resume."
    return rank_resume(job_description, resumes)[1]


def process_input_suggestion(resume, job_descriptions):
    """Rank ``job_descriptions`` against ``resume`` using ``rank_jobs``.

    Returns:
        Element 1 of ``rank_jobs``'s result (presumably the scores; confirm
        against ``embeddings.rank_jobs``).
    """
    return rank_jobs(job_descriptions, resume)[1]


@app.get("/")
def read_root():
    """Return a static greeting."""
    return {"message": "Hello, World!"}


class InputData(BaseModel):
    """Request body for ``/rank/``: several resumes and one job description."""

    resumes: List[str]
    job_description: str


class InputData2(BaseModel):
    """Request body for ``/suggest/``: several job descriptions and one resume."""

    job_descriptions: List[str]
    resume: str


class InputData3(BaseModel):
    """Request body for ``/add_content``: raw text and its kind."""

    content: str
    type: str


@app.post("/rank/")
async def process_data(data: InputData):
    """Rank the submitted resumes against the submitted job description."""
    return dict(scores=process_input(data.job_description, data.resumes))


@app.post("/suggest/")
async def suggestion(data: InputData2):
    """Rank the submitted job descriptions against the submitted resume."""
    return {
        "scores": process_input_suggestion(data.resume, data.job_descriptions)
    }


@app.post("/add_content")
async def add_content(data: InputData3, task: BackgroundTasks):
    """Store raw text as a job or a resume and schedule scoring.

    ``data.type`` selects the store (``"job"`` or ``"resume"``); any other
    value is echoed back in the response without being stored.
    """
    result = {
        "id": str(uuid.uuid4()),
        "page_content": data.content,
        "metadata": {"source": "form input"},
    }
    if data.type == "job":
        jobs.append(result)
    elif data.type == "resume":
        resumes.append(result)

    task.add_task(process_scoring)

    return {
        "code": 201,
        "message": "Request was successful.",
        "data": result,
    }