# CandidateExplorer/services/agentic/profile_scoring.py
# Author: ishaq101
# Commit fc8eb3c: [NOTICKET] Add field criteria_id in scoring v2 endpoint
import traceback
from uuid import uuid4
from typing import List
from langchain_core.prompts import ChatPromptTemplate
from config.constant import ProfileFieldTypes
# from externals.databases.pg_crud import get_criteria_id, create_cv_filter, create_cv_matching, get_matching_id, create_cv_score, get_scoring_id
from externals.databases.pg_models import CVProfile, CVWeight, CVFilter, CVScore, CVMatching
from services.llms.LLM import model_4o_2, model_5_1
# from services.base.BaseGenerator import BaseAIGenerator, MetadataObservability
from services.base.BaseGenerator_v2 import BaseAIGenerator, MetadataObservability
from services.models.data_model import AIProfile
from services.models.data_model import (
AIMatchProfile,
Criteria,
CriteriaWeight,
InputScoringBulk,
AIProfileTbScore,
LOGIC_NUMERIC,
LOGIC_CATEGORICAL,
# InputScoring,
# DataResponseMatchOne,
# ResponseMatchOne,
# InputScoringBulk,
# DataResponseMatchBulk,
# ResponseMatchBulk
)
# from services.knowledge.knowledge_setup import KnowledgeService
from services.agentic.weight import AgenticWeightService
from services.agentic.filter import AgenticFilterService
from services.knowledge.get_profile import KnowledgeGetProfileService
from sqlalchemy.ext.asyncio import AsyncSession
from externals.databases.pg_crud import get_profiles, create_matchings, create_scores, get_profiles_by_criteria_id, get_weight_by_id, get_profiles_by_user_id
from utils.logger import get_logger
logger = get_logger("profile scoring")
def helper_get_operator(col_name: str):
    """Return the comparison rule name used when matching a profile field.

    Args:
        col_name: profile/criteria column name (e.g. "gpa_edu_1", "domicile").

    Returns:
        "greater than or equal" for numeric columns, "similar" for text
        columns, and None for columns of neither type (previously this was an
        implicit fall-through; made explicit here for callers).
    """
    if col_name in ProfileFieldTypes.NUMERIC:
        op: LOGIC_NUMERIC = "greater than or equal"
        return op
    if col_name in ProfileFieldTypes.TEXT:
        op: LOGIC_CATEGORICAL = "similar"
        return op
    # Unknown column type: return None explicitly instead of falling off the end.
    return None
def helper_judge_scoring(a_profile, b_criteria, rules) -> int:
    """Evaluate a comparison rule between a profile value and a criteria value.

    Args:
        a_profile: the candidate's value for one field.
        b_criteria: the job criteria's value for the same field.
        rules: one of "greater than", "less than", "equal",
            "greater than or equal", "less than or equal".

    Returns:
        1 if the rule holds, 0 if it does not hold or the rule is unknown
        (e.g. "similar", which is judged by the LLM instead). Comparison
        failures (e.g. mismatched types) are logged and also return 0.

    Note: the return annotation was corrected from ``bool`` — the function
    has always returned an int (0/1).
    """
    # Dispatch table instead of an if/elif chain; unknown rules fall through to 0.
    comparisons = {
        "greater than": lambda a, b: a > b,
        "less than": lambda a, b: a < b,
        "equal": lambda a, b: a == b,
        "greater than or equal": lambda a, b: a >= b,
        "less than or equal": lambda a, b: a <= b,
    }
    try:
        judge = comparisons.get(rules)
        if judge is None:
            return 0
        return int(judge(a_profile, b_criteria))
    except Exception as E:
        logger.error(f"❌ error in helper_judge_scoring: {E}")
        logger.error(f"a_profile={a_profile}, b_criteria={b_criteria}")
        return 0
# @deprecated
# def comparison_parser(input_scoring: InputScoring):
# comparison = "| parameter | candidate_profile | criteria | true_if_rules | draft_matching_score |"
# comparison += "\n| --- | --- | --- | --- | --- |"
# criteria = input_scoring.get("criteria")
# excluded = ["criteria_id"]
# for k, v in criteria.items():
# if v and not k.startswith("w_") and k not in excluded:
# op = helper_get_operator(k)
# if 'gpa' in k or 'yoe' in k:
# # logger.info(f"👁️ a_profile={input_scoring.get(k)} | b_criteria={v}")
# judges = helper_judge_scoring(a_profile=input_scoring.get(k),
# b_criteria=v,
# rules=op)
# else:
# judges = '???'
# comparison += f"\n| {k} | {input_scoring.get(k)} | {v} | {op} | {judges} |"
# return comparison
def comparison_parser(profile:AIProfileTbScore, criteria:Criteria):
    """Render a markdown comparison table between a profile and job criteria.

    Numeric fields (keys containing 'gpa' or 'yoe') that are present on both
    sides get a pre-computed 0/1 draft score via helper_judge_scoring; all
    other rows are emitted with the literal rule "related" and draft score
    "False", leaving the actual judgement to the LLM.

    Args:
        profile: candidate profile mapping (column name -> value).
        criteria: job criteria mapping (column name -> value).

    Returns:
        str: markdown table with columns parameter / candidate_profile /
        job criteria / true_if_rules / draft_matching_score.
    """
    comparison = "| parameter | candidate_profile | job criteria | true_if_rules | draft_matching_score |"
    comparison += "\n| --- | --- | --- | --- | --- |"
    for k, v in criteria.items():
        if ('gpa' in k or 'yoe' in k) and (profile.get(k) is not None) and (v is not None):
            # op is only needed on this branch; hoisted out of the common path.
            op = helper_get_operator(k)
            judges = helper_judge_scoring(a_profile=profile.get(k),
                                          b_criteria=v,
                                          rules=op)
            comparison += f"\n| {k} | {profile.get(k)} | {v} | {op} | {judges} |"
        else:
            # Removed dead assignment `judges = '???'` — the row below always
            # renders the literals "related" / False for the LLM to resolve.
            comparison += f"\n| {k} | {profile.get(k)} | {v} | related | False |"
    return comparison
# async def match_one_profile(input_scoring: InputScoring) -> ResponseMatchOne:
# """Matching between one profile to a criteria"""
# try:
# # 1. CHECKING CRITERIA EXISTED
# logger.info("👁️ 1. CHECKING CRITERIA EXISTED")
# status_criteria_id = await get_criteria_id(input_scoring.get("criteria")) # pg crud
# criteria_id = status_criteria_id.get("criteria_id")
# if status_criteria_id.get("status") == "existing":
# # just get the criteria_id
# pass
# elif status_criteria_id.get("status") == "new":
# # get the criteria_id and insert the criteria
# status_insert = await create_cv_filter(
# {
# **input_scoring.get("criteria"),
# "criteria_id": criteria_id
# }
# )
# if status_insert.get("status") == "success":
# logger.info(f"""status_insert: {status_insert}""")
# else:
# logger.error(f"""❌ error when create_cv_filter. {create_cv_filter.get("message")}""")
# return ResponseMatchOne(
# status=create_cv_filter.get("status"),
# message=create_cv_filter.get("message"),
# data=DataResponseMatchOne(
# profile_id=input_scoring.get("profile_id"),
# )
# )
# else:
# logger.error(f"""❌ error when get_criteria_id. {get_criteria_id.get("message")}""")
# return ResponseMatchOne(
# status=get_criteria_id.get("status"),
# message=get_criteria_id.get("message"),
# data=DataResponseMatchOne(
# profile_id=input_scoring.get("profile_id"),
# )
# )
# # 2. CHECKING MATCHING EXISTED
# logger.info("👁️ 2. CHECKING MATCHING EXISTED")
# status_matching_id = await get_matching_id(profile_id=input_scoring.get("profile_id"),
# criteria_id=criteria_id)
# matching_id = status_matching_id.get("matching_id")
# # logger.info(f"👁️ status_matching_id: {status_matching_id}")
# if status_matching_id.get("status") == "existing":
# pass
# elif status_matching_id.get("status") == "new":
# matching_result = await ai_matching(input_scoring=input_scoring)
# # logger.info(f"👁️ matching_result: {matching_result}")
# status_insert_matching = await create_cv_matching(
# profile_id = input_scoring.get("profile_id"),
# criteria_id = criteria_id,
# matching_id = matching_id,
# **matching_result
# )
# # logger.info(f"👁️ status_insert_matching: {status_insert_matching}")
# if status_insert_matching.get("status") == "success":
# # logger.info(f"👁️ matching_result: {matching_result}")
# # logger.info(f"""👁️ criteria_weight: {input_scoring.get("criteria_weight")}""")
# score = helper_calculate_score(comparison_result=matching_result,
# criteria_weight=input_scoring.get("criteria_weight"))
# status_scoring_id = await get_scoring_id(matching_id=matching_id)
# scoring_id = status_scoring_id.get("scoring_id")
# if status_scoring_id.get("status") in ["existing", "new"]:
# status_insert_score = await create_cv_score(scoring_id=scoring_id,
# matching_id=matching_id,
# score=score,
# **input_scoring.get("criteria_weight")
# )
# if status_insert_score.get("status") == "success":
# response = ResponseMatchOne(
# status="success",
# message="success",
# data=DataResponseMatchOne(
# profile_id=input_scoring.get("profile_id"),
# criteria_id=criteria_id,
# matching_id=matching_id,
# scoring_id=scoring_id,
# score=score,
# )
# )
# return response
# else:
# return ResponseMatchOne(
# status=status_insert_score.get("status"),
# message=status_insert_score.get("message"),
# data=DataResponseMatchOne(
# profile_id=input_scoring.get("profile_id"),
# criteria_id=criteria_id,
# matching_id=matching_id,
# scoring_id=scoring_id,
# score=score,
# )
# )
# else:
# return ResponseMatchOne(
# status=status_scoring_id.get("status"),
# message=status_scoring_id.get("message"),
# data=DataResponseMatchOne(
# profile_id=input_scoring.get("profile_id"),
# criteria_id=criteria_id,
# matching_id=matching_id,
# )
# )
# else:
# logger.error(f"""❌ error when create_cv_matching. {status_matching_id.get("message")}""")
# return ResponseMatchOne(
# status=status_matching_id.get("status"),
# message=status_matching_id.get("message"),
# data=DataResponseMatchOne(
# profile_id=input_scoring.get("profile_id"),
# criteria_id=criteria_id,
# )
# )
# else:
# logger.error(f"""❌ error when get_matching_id. {get_matching_id.get("message")}""")
# return ResponseMatchOne(
# status=get_matching_id.get("status"),
# message=get_matching_id.get("message"),
# data=DataResponseMatchOne(
# profile_id=input_scoring.get("profile_id"),
# )
# )
# except Exception as E:
# logger.error(f"❌ match one profile error, {E}")
# response = ResponseMatchOne(
# status="failed",
# message=f"match_one_profile error, {E}",
# data=DataResponseMatchOne(
# profile_id=input_scoring.get("profile_id"),
# )
# )
# return response
# async def match_bulk_profile(input_scorings: InputScoringBulk,
# knowledge_service:KnowledgeService):
# """Generate list of profile matching """
# try:
# status_ids = {}
# # 1. GET PROFILE GENERATOR FOR EACH PROFILE_ID
# logger.info("👁️ 1. GET PROFILE GENERATORS")
# profile_generators = knowledge_service.profile.get_profiles_generator(input_scorings.get("profile_ids"))
# # 2. CHECKING CRITERIA EXISTED
# criteria_id = ""
# logger.info("👁️ 2. CHECKING CRITERIA EXISTED")
# status_criteria_id = await get_criteria_id(input_scorings.get("criteria")) # pg crud
# criteria_id = status_criteria_id.get("criteria_id")
# if status_criteria_id.get("status") == "existing":
# # just get the criteria_id
# pass
# elif status_criteria_id.get("status") == "new":
# # get the criteria_id and insert the criteria
# status_insert = await create_cv_filter(
# {
# **input_scorings.get("criteria"),
# "criteria_id": criteria_id
# }
# )
# if status_insert.get("status") == "success":
# logger.info(f"""status_insert: {status_insert}""")
# else:
# logger.error(f"""❌ error when create_cv_filter. {create_cv_filter.get("message")}""")
# return ResponseMatchBulk(
# status=create_cv_filter.get("status"),
# message=create_cv_filter.get("message"),
# )
# else:
# logger.error(f"""❌ error when get_criteria_id. {get_criteria_id.get("message")}""")
# return ResponseMatchBulk(
# status=get_criteria_id.get("status"),
# message=get_criteria_id.get("message"),
# )
# # 3. CHECKING MATCHING EXISTED
# logger.info("👁️ 3. CHECKING MATCHING EXISTED")
# for profile_id, profile_gen in profile_generators.items():
# status_matching_id = await get_matching_id(profile_id=profile_id,
# criteria_id=criteria_id)
# matching_id = status_matching_id.get("matching_id")
# if status_matching_id.get("status") == "existing":
# pass
# elif status_matching_id.get("status") == "new":
# profile = await anext(profile_gen)
# input_scoring = InputScoring(
# profile_id=profile_id,
# criteria=input_scorings.get("criteria"),
# criteria_weight=input_scorings.get("criteria_weight"),
# **profile,
# )
# matching_result = await ai_matching(input_scoring=input_scoring)
# status_insert_matching = await create_cv_matching(
# profile_id = input_scoring.get("profile_id"),
# criteria_id = criteria_id,
# matching_id = matching_id,
# **matching_result
# )
# # logger.info(f"👁️ status_insert_matching: {status_insert_matching}")
# if status_insert_matching.get("status") == "success":
# # logger.info(f"👁️ matching_result: {matching_result}")
# # logger.info(f"""👁️ criteria_weight: {input_scoring.get("criteria_weight")}""")
# score = helper_calculate_score(comparison_result=matching_result,
# criteria_weight=input_scoring.get("criteria_weight"))
# status_scoring_id = await get_scoring_id(matching_id=matching_id)
# scoring_id = status_scoring_id.get("scoring_id")
# if status_scoring_id.get("status") in ["existing", "new"]:
# status_insert_score = await create_cv_score(scoring_id=scoring_id,
# matching_id=matching_id,
# score=score,
# **input_scoring.get("criteria_weight")
# )
# if status_insert_score.get("status") == "success":
# status_ids.append({profile_id: "success"})
# else:
# status_ids.append({profile_id: "failed, insert score data"})
# else:
# status_ids.append({profile_id: "failed, get scoring id"})
# else:
# status_ids.append({profile_id: "failed, insert matching data"})
# else:
# status_ids.append({profile_id: "failed, get matching id"})
# n_success = sum([1 if v == "success" else 0 for k, v in status_ids.items()])
# if len(status_ids) == n_success:
# status = "success"
# elif n_success == 0:
# status = "failed"
# else:
# status = "partial-success"
# return ResponseMatchBulk(
# status=status,
# message=f"total success {n_success} from {len(status_ids)} profile_id",
# data=DataResponseMatchBulk(
# status_ids=status_ids,
# criteria_id=criteria_id
# )
# )
# except Exception as E:
# logger.error(f"❌ match bulk profile error, {E}")
# return ResponseMatchBulk(
# status="failed",
# message=f"match bulk profile error, {E}",
# )
logger = get_logger("weight agentic service")
class AgenticScoringService:
    """Scores candidate CV profiles against a job criteria + weight set.

    Flow: resolve criteria/weight -> snapshot profiles into AI data models ->
    run one structured-output LLM match per profile -> persist matchings ->
    compute weighted 0-100 scores -> persist and return scores.
    """
    def __init__(self, db: AsyncSession, user):
        # NOTE(review): `user` is assumed to expose .full_name and .email
        # (read by the observability metadata in _ai_matching) — confirm
        # against the auth/user model.
        self.db = db
        self.user = user
        self.criteria_service = AgenticFilterService(db=db, user=user)
        self.weight_service = AgenticWeightService(db=db, user=user)
        self.profile_service = KnowledgeGetProfileService(db=db, user=user)
    async def _ai_matching(self, profile:AIProfileTbScore, criteria:Criteria) -> AIMatchProfile:
        """Match one profile against the criteria via a structured-output LLM call.

        The markdown table from comparison_parser is embedded into the prompt;
        the model returns an AIMatchProfile of per-criterion match flags.
        """
        comparison_text = comparison_parser(profile=profile, criteria=criteria)
        # logger.info(f"👁️ comparison_text: {comparison_text}")
        match_one_prompt = """
**Goals:**
Determine if the 'candidate profile' is match to 'criteria' for each parameter based on 'true if rules'. \
**Framework matching**:
1. Give 1 if 'candidate profile' results true based on 'true if rules' and 'criteria', else give 0.
2. Beware of typo, please consider further for logical operator "similar".
3. Under column draft_matching_score, there is sample of how to giving the score.
Here is table comparison between candidate profile and lookup criteria:
{comparison}
""".strip()
        prompt = ChatPromptTemplate.from_template(match_one_prompt)
        input_llm = {
            "comparison": comparison_text
        }
        # llm = model_4o_2.with_structured_output(AIMatchProfile)
        llm = model_5_1.with_structured_output(AIMatchProfile)
        gen_ai = BaseAIGenerator(
            task_name="ai matching",
            prompt=prompt,
            llm=llm,
            input_llm=input_llm,
            metadata_observability=MetadataObservability(
                fullname=self.user.full_name,
                task_id=str(uuid4()),
                agent=self._ai_matching.__name__,
                user_id=self.user.email
            )
        )
        matching_result = await gen_ai.agenerate()
        return matching_result
    async def _ai_matching_bulk(self,
                                profiles:List[AIProfileTbScore],
                                criteria:Criteria) -> List:
        """Sequentially run _ai_matching for every profile.

        Returns match results in the same order as `profiles`; logs and
        re-raises on the first failure (no partial results are returned).
        """
        try:
            results = []
            for i, p in enumerate(profiles):
                tmp_matching:AIMatchProfile = await self._ai_matching(profile=p, criteria=criteria)
                print(f"==> {i+1} profile: {p} vs criteria: {criteria}")
                logger.info(f">>> _ai_matching_bulk/tmp_matching: {tmp_matching}")
                results.append(tmp_matching)
            return results
        except Exception as E:
            logger.error(f"❌ error in _ai_matching_bulk: {E}")
            raise
    def _calculate_score(self, match_result:AIMatchProfile, weight_data:CriteriaWeight) -> float:
        """Return the weighted matching score, in the range 0-100.0.

        Args:
            match_result: per-criterion 0/1 match flags. NOTE(review): despite
                the AIMatchProfile annotation, the body reads
                ``match_result.__table__.columns``, i.e. it expects a
                SQLAlchemy row such as CVMatching — which is what
                _calculate_score_bulk actually passes. The annotation looks stale.
            weight_data: mapping of criterion name -> weight. If the weights
                sum above 1.0 they are normalized so the score cannot exceed 100.

        Returns:
            score (float)
        """
        score = 0
        total_weight = 0
        norm_criteria = {} # weight
        # weight_data = self.weight_service.get_weight_by_weight_id(weight_id)
        # NOTE(review): assumes every weight value is numeric (not None) —
        # a None weight would raise TypeError here; verify upstream payloads.
        for k, v in weight_data.items():
            total_weight += v
        # logger.info(f"👁️ helper_calculate_score/total_weight: {total_weight}")
        if total_weight > 1.0:
            # normalized weight
            for k, v in weight_data.items():
                norm_criteria[k] = v/total_weight
        else:
            norm_criteria = weight_data.copy()
        # logger.info(f"👁️ helper_calculate_score/norm_criteria: {norm_criteria}")
        # Materialize the SQLAlchemy row as a plain dict of column -> value.
        match_dict = {
            column.name: getattr(match_result, column.name)
            for column in match_result.__table__.columns
        }
        # Only weighted criteria with a truthy (matched) flag contribute.
        for k, value_comparison in match_dict.items():
            if k in norm_criteria and value_comparison:
                temp_score = norm_criteria[k] * value_comparison * 100
                score += temp_score
        # logger.info(f"👁️ helper_calculate_score/score: {score}")
        return score
    def _calculate_score_bulk(self, match_results:List[CVMatching], weight_data:CriteriaWeight) -> List[CVScore]:
        """Build one CVScore row per CVMatching row using _calculate_score."""
        scores = []
        for i, match_result in enumerate(match_results):
            temp = CVScore()
            temp.profile_id = match_result.profile_id
            temp.matching_id = match_result.matching_id
            temp.score = self._calculate_score(match_result=match_result, weight_data=weight_data)
            match_result_dict = {c.name: getattr(match_result, c.name) for c in CVMatching.__table__.columns}
            print(f"{i+1} match_result: {match_result_dict} vs weight_data: {weight_data}")
            print(f"temp.score: {temp.score}")
            scores.append(temp)
        return scores
    async def scoring(self, weight_id: str):
        """Score every profile attached to the given weight's criteria.

        Loads the weight and its criteria, snapshots each profile into an
        AIProfile, runs the bulk LLM matching, persists matchings then scores,
        and returns the created score rows. Logs and re-raises on failure.
        """
        try:
            # Get profile data all
            # all_profiles = await get_profiles(self.db)
            weight = await get_weight_by_id(self.db, weight_id=weight_id)
            all_profiles = await get_profiles_by_criteria_id(db=self.db, criteria_id=weight.criteria_id, current_user=self.user)
            print(f"🫡 Found {len(all_profiles)} profiles to be scored")
            _weight:CVWeight = await self.weight_service.get_weight_by_weight_id(weight_id=weight_id)
            # print(f"Found weight: {_weight}")
            # print(f"--> criteria id: {_weight.criteria_id}")
            _criteria:CVFilter = await self.criteria_service.get_filter_by_id(criteria_id=_weight.criteria_id)
            all_tobe_scored = []
            for p in all_profiles:
                tmp_profile = AIProfile(
                    fullname=p.fullname,
                    gpa_edu_1=p.gpa_edu_1,
                    univ_edu_1=p.univ_edu_1,
                    major_edu_1=p.major_edu_1,
                    gpa_edu_2=p.gpa_edu_2,
                    univ_edu_2=p.univ_edu_2,
                    major_edu_2=p.major_edu_2,
                    gpa_edu_3=p.gpa_edu_3,
                    univ_edu_3=p.univ_edu_3,
                    major_edu_3=p.major_edu_3,
                    domicile=p.domicile,
                    yoe=p.yoe,
                    hardskills=p.hardskills,
                    softskills=p.softskills,
                    certifications=p.certifications,
                    business_domain=p.business_domain
                )
                all_tobe_scored.append(tmp_profile)
            criteria = Criteria(
                gpa_edu_1=_criteria.gpa_edu_1,
                univ_edu_1=_criteria.univ_edu_1,
                major_edu_1=_criteria.major_edu_1,
                gpa_edu_2=_criteria.gpa_edu_2,
                univ_edu_2=_criteria.univ_edu_2,
                major_edu_2=_criteria.major_edu_2,
                gpa_edu_3=_criteria.gpa_edu_3,
                univ_edu_3=_criteria.univ_edu_3,
                major_edu_3=_criteria.major_edu_3,
                domicile=_criteria.domicile,
                yoe=_criteria.yoe,
                hardskills=_criteria.hardskills,
                softskills=_criteria.softskills,
                certifications=_criteria.certifications,
                business_domain=_criteria.business_domain
            )
            # NOTE(review): rebinding `weight` here shadows the CVWeight row
            # fetched above; only its criteria_id was used before this point.
            weight = CriteriaWeight(
                gpa_edu_1=_weight.gpa_edu_1,
                univ_edu_1=_weight.univ_edu_1,
                major_edu_1=_weight.major_edu_1,
                gpa_edu_2=_weight.gpa_edu_2,
                univ_edu_2=_weight.univ_edu_2,
                major_edu_2=_weight.major_edu_2,
                gpa_edu_3=_weight.gpa_edu_3,
                univ_edu_3=_weight.univ_edu_3,
                major_edu_3=_weight.major_edu_3,
                domicile=_weight.domicile,
                yoe=_weight.yoe,
                hardskills=_weight.hardskills,
                softskills=_weight.softskills,
                certifications=_weight.certifications,
                business_domain=_weight.business_domain
            )
            match_results:List[AIMatchProfile] = await self._ai_matching_bulk(all_tobe_scored, criteria)
            # match_results still lack profile_id and criteria_id — attach them.
            # NOTE(review): `**match_results[i]` assumes AIMatchProfile unpacks
            # like a mapping (e.g. a TypedDict) — confirm in data_model.
            match_results = [{"profile_id": p.profile_id, "weight_id": weight_id, **match_results[i]} for i, p in enumerate(all_profiles)]
            # Insert Match Result to DB
            matchings = await create_matchings(self.db, match_results)
            score_results = self._calculate_score_bulk(match_results=matchings, weight_data=weight)
            # Insert Score Result to DB
            scores = await create_scores(self.db, score_results)
            return scores
        except Exception as E:
            logger.error(f"profile scoring error, {E}")
            traceback.print_exc()
            raise
    async def scoring_v2(self, requirements: InputScoringBulk):
        """Create criteria and weight rows from the request payload, then
        score all of the current user's profiles against them.

        Returns:
            dict with the new "criteria_id" and the created "scores" rows.
        Logs and re-raises on failure.
        """
        try:
            # Create criteria_id
            filter_svc = AgenticFilterService(db=self.db, user=self.user)
            criteria = requirements.get("criteria")
            weight = requirements.get("criteria_weight")
            criteria_id = await filter_svc.create_filter_v2(filter=criteria)
            weight_svc = AgenticWeightService(db=self.db, user=self.user)
            cv_weight = CVWeight(
                criteria_id=criteria_id,
                weight_id=uuid4(),
                gpa_edu_1=weight.get("gpa_edu_1"),
                gpa_edu_2=weight.get("gpa_edu_2"),
                gpa_edu_3=weight.get("gpa_edu_3"),
                univ_edu_1=weight.get("univ_edu_1"),
                univ_edu_2=weight.get("univ_edu_2"),
                univ_edu_3=weight.get("univ_edu_3"),
                major_edu_1=weight.get("major_edu_1"),
                major_edu_2=weight.get("major_edu_2"),
                major_edu_3=weight.get("major_edu_3"),
                domicile=weight.get("domicile"),
                yoe=weight.get("yoe"),
                hardskills=weight.get("hardskills"),
                softskills=weight.get("softskills"),
                certifications=weight.get("certifications"),
                business_domain=weight.get("business_domain")
            )
            weight_output = await weight_svc.create_weight(weight=cv_weight)
            weight_id = weight_output.get("weight_id")
            all_profiles = await get_profiles_by_user_id(db=self.db, current_user=self.user)
            print(f"🫡 Found {len(all_profiles)} profiles to be scored")
            all_tobe_scored:list[AIProfileTbScore] = []
            for p in all_profiles:
                tmp_profile = AIProfileTbScore(
                    fullname=p.fullname,
                    gpa_edu_1=p.gpa_edu_1,
                    univ_edu_1=p.univ_edu_1,
                    major_edu_1=p.major_edu_1,
                    gpa_edu_2=p.gpa_edu_2,
                    univ_edu_2=p.univ_edu_2,
                    major_edu_2=p.major_edu_2,
                    gpa_edu_3=p.gpa_edu_3,
                    univ_edu_3=p.univ_edu_3,
                    major_edu_3=p.major_edu_3,
                    domicile=p.domicile,
                    yoe=p.yoe,
                    hardskills=p.hardskills,
                    softskills=p.softskills,
                    certifications=p.certifications,
                    business_domain=p.business_domain
                )
                all_tobe_scored.append(tmp_profile)
            match_results:List[AIMatchProfile] = await self._ai_matching_bulk(all_tobe_scored, criteria) # TODO: refactor, use non LLM solution
            # match_results still lack profile_id and criteria_id — attach them.
            match_results = [{"profile_id": p.profile_id, "weight_id": weight_id, **match_results[i]} for i, p in enumerate(all_profiles)]
            # Insert Match Result to DB
            matchings = await create_matchings(self.db, match_results)
            # NOTE(review): `weight` here is the raw criteria_weight mapping
            # from the request (not a CriteriaWeight instance as in scoring()).
            score_results = self._calculate_score_bulk(match_results=matchings, weight_data=weight)
            # Insert Score Result to DB
            scores = await create_scores(self.db, score_results)
            return {
                "criteria_id":criteria_id,
                "scores":scores
            }
        except Exception as E:
            logger.error(f"profile scoring v2 error, {E}")
            traceback.print_exc()
            raise