Spaces:
Runtime error
Runtime error
| from fastapi import FastAPI, File, UploadFile, Form, BackgroundTasks | |
| from pydantic import BaseModel | |
| from typing import List | |
| from pathlib import Path | |
| import shutil | |
| import tempfile | |
| import os | |
| import uuid | |
| from langchain_docling import DoclingLoader | |
| from langchain_docling.loader import ExportType | |
| from job_samples import job_list | |
| from ranker import rank_resume, rank_resume_multi | |
| from embeddings import rank_jobs, rank_jobs_multi | |
| from database import Base, engine | |
# Create the tables registered on ``Base.metadata`` using ``engine``
# (both come from the project's ``database`` module).
Base.metadata.create_all(bind=engine)

app = FastAPI()

# In-memory stores shared by the handlers below; nothing here is persisted.
resumes = []
jobs = [
    dict(
        id=str(uuid.uuid4()),
        metadata=dict(source="built-in text"),
        page_content=description,
    )
    for description in job_list
]
# Keyed by "<resume id>_<job id>"; filled in by process_scoring().
scoring = {}

UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
async def upload_file(file: UploadFile = File(...), type: str = Form(...), task: BackgroundTasks = None):
    """Parse an uploaded document and store it as a resume or a job.

    The upload is spooled to a temporary file, converted to Markdown with
    ``DoclingLoader``, given a fresh UUID and appended to ``resumes`` or
    ``jobs`` depending on ``type``. Scoring is then scheduled as a
    background task.

    Args:
        file: The uploaded document.
        type: ``"resume"`` or ``"job"``. Any other value is parsed and
            returned but not stored (unchanged from the original behaviour).
        task: Background task queue; scoring is skipped when it is ``None``.

    Returns:
        A dict with ``code``, ``message`` and the parsed document as ``data``.

    Raises:
        HTTPException: 422 when the loader extracts no document at all.

    NOTE(review): no route decorator is visible for this handler in this
    view of the file; confirm where it is registered.
    NOTE(review): ``loader.load()`` is a blocking call inside an ``async``
    handler; consider moving it to a worker thread.
    """
    # Local import so this block does not depend on the file-level import line.
    from fastapi import HTTPException

    # ``filename`` can be missing on an upload; fall back to ".pdf" as before.
    suffix = os.path.splitext(file.filename or "")[-1] or ".pdf"
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix, dir="/tmp") as tmp:
        shutil.copyfileobj(file.file, tmp)
        tmp_path = tmp.name

    try:
        # The file is closed here, so the size reflects everything written.
        size = os.path.getsize(tmp_path)
        print(f"Saved {file.filename} -> {tmp_path} ({size} bytes)")
        print("[TMP PATH]", str(tmp_path))
        loader = DoclingLoader(file_path=tmp_path, export_type=ExportType.MARKDOWN)
        docs = loader.load()
    finally:
        # delete=False means nobody else removes the file; without this
        # every upload leaves a copy behind in /tmp.
        try:
            os.remove(tmp_path)
        except OSError:
            pass  # best-effort cleanup; never mask the real error

    if not docs:
        raise HTTPException(
            status_code=422,
            detail="No content could be extracted from the uploaded file.",
        )

    result = docs[0].model_dump()
    result["id"] = str(uuid.uuid4())
    if type == "resume":
        resumes.append(result)
    elif type == "job":
        jobs.append(result)

    if task is not None:
        task.add_task(process_scoring)
    return {
        "code": 201,
        "message": "Request was successful.",
        "data": result,
    }
def get_jobs():
    """Return every stored job wrapped in the standard response envelope."""
    return dict(code=200, message="Request was successful.", data=jobs)
def get_resumes():
    """Return every stored resume wrapped in the standard response envelope."""
    return dict(code=200, message="Request was successful.", data=resumes)
def process_scoring():
    """Score every resume/job pair that has no entry in ``scoring`` yet.

    For each unscored pair, ``process_input`` supplies ``rank_score`` and
    ``process_input_suggestion`` supplies ``suggestion_score``; the first
    element of each result is stored under the key
    ``"<resume id>_<job id>"``. Pairs that are already present are left
    untouched, so the function is safe to call repeatedly.

    Pairs where either text is blank are skipped. ``process_input`` answers
    blank input with an error *string*, and indexing that string with
    ``[0]`` used to store its first character as the score.

    NOTE: scoring is done one pair at a time. A batched variant (one
    ranking call per resume across all jobs, and one per job across all
    resumes) was previously sketched here as commented-out code.
    """
    for resume in resumes:
        resume_text = resume["page_content"]
        if not resume_text or not resume_text.strip():
            continue
        for job in jobs:
            key = f"{resume['id']}_{job['id']}"
            if key in scoring:
                continue
            job_text = job["page_content"]
            if not job_text or not job_text.strip():
                continue
            rank_score = process_input(job_text, [resume_text])
            if isinstance(rank_score, str):
                # Defensive: a string here is a validation message, not scores.
                continue
            suggest_score = process_input_suggestion(resume_text, [job_text])
            scoring[key] = {
                "resume_id": resume["id"],
                "job_id": job["id"],
                "rank_score": rank_score[0],
                "suggestion_score": suggest_score[0],
            }
async def get_scoring():
    """Return all score records computed so far as a list."""
    records = [*scoring.values()]
    return dict(code=200, message="Request was successful.", data=records)
| # class InputResume(BaseModel): | |
| # content: str | |
| # @app.post("/suggest/") | |
| # async def suggestion(data: InputResume): | |
| # return { | |
| # "code":201, | |
| # "message":"Request was successful.", | |
| # "data": InputResume.model_dump_json() | |
| # } | |
# Function to wrap the existing rank_resume
def process_input(job_description, resumes):
    """Rank ``resumes`` against ``job_description`` via ``rank_resume``.

    Empty and whitespace-only resumes are dropped first. A missing
    (``None``) job description or resume list is treated the same as an
    empty one instead of raising.

    Args:
        job_description: Text of the job description.
        resumes: Iterable of resume texts.

    Returns:
        Element ``[1]`` of ``rank_resume``'s result (presumably the list of
        scores; confirm in ``ranker``), or a validation message string when
        the job description is blank or no non-blank resume remains.
    """
    print("[JOB DESC]", job_description)
    print("[RESUMES]", resumes)
    usable = [r for r in (resumes or []) if r and r.strip()]  # Remove empty
    if not job_description or not job_description.strip() or not usable:
        return "Please provide both job description and at least one resume."
    return rank_resume(job_description, usable)[1]
def process_input_suggestion(resume, job_descriptions):
    """Rank ``job_descriptions`` against ``resume`` via ``rank_jobs``.

    No input validation is performed here. Returns element ``[1]`` of the
    ``rank_jobs`` result (presumably the scores; confirm in ``embeddings``).
    """
    ranked = rank_jobs(job_descriptions, resume)
    return ranked[1]
| # results = zip(*rank_jobs(resumes, job_description)) | |
| # formatted_output = "" | |
| # for i, (resume, score) in enumerate(results, 1): | |
| # formatted_output += f"Job #{i}:\nScore: {score:.2f}\nJob Description Snippet: {resume[:200]}...\n\n-------\n\n" | |
| # return formatted_output | |
# Without the leading "@" the expression only builds a decorator and throws
# it away, so the handler was never registered on the app.
@app.get("/")
def read_root():
    """Return a static greeting for the root path."""
    return {"message": "Hello, World!"}
# Request body for process_data: several resumes ranked against one job.
class InputData(BaseModel):
    resumes: List[str]  # resume texts to rank
    job_description: str  # job description they are ranked against
# Request body for suggestion: several job descriptions ranked against one resume.
class InputData2(BaseModel):
    job_descriptions: List[str]  # job description texts to rank
    resume: str  # resume they are ranked against
# Request body for add_content: raw text to store as a job or a resume.
class InputData3(BaseModel):
    content: str  # text stored as the item's page_content
    type: str  # add_content stores the item only for "job" or "resume"
async def process_data(data: InputData):
    """Rank the submitted resumes against the submitted job description."""
    scores = process_input(data.job_description, data.resumes)
    return {"scores": scores}
async def suggestion(data: InputData2):
    """Rank the submitted job descriptions against the submitted resume."""
    scores = process_input_suggestion(data.resume, data.job_descriptions)
    return dict(scores=scores)
async def add_content(data: InputData3, task: BackgroundTasks):
    """Store raw text as a job or a resume and schedule scoring.

    The text is wrapped in a record with a fresh UUID. It is appended to
    ``jobs`` or ``resumes`` according to ``data.type``; any other type is
    not stored. Scoring is queued and the record is returned either way.
    """
    result = dict(
        id=str(uuid.uuid4()),
        page_content=data.content,
        metadata=dict(source="form input"),
    )

    # Pick the target collection by type; unknown types match nothing.
    store = {"job": jobs, "resume": resumes}.get(data.type)
    if store is not None:
        store.append(result)

    task.add_task(process_scoring)
    return dict(code=201, message="Request was successful.", data=result)
| # return { | |
| # "scores":process_input_suggestion(data.resume, data.job_descriptions) | |
| # } | |