Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| import math | |
| import re | |
| import pickle | |
| import os | |
# --- Data path configuration ---
# BASE_DIR is the parent of the directory that contains this file;
# the pickled data files are read from <BASE_DIR>/core/data.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DATA_DIR = os.path.join(BASE_DIR, "core", "data")

# --- Global data load ---
# Best-effort load: any failure below is printed as a warning and swallowed,
# so the names assigned inside the ``try`` block from the failing statement
# onward stay undefined and later code that uses them raises NameError.
# NOTE(review): pickle.load runs code embedded in the file; this presumably
# relies on the .pkl files being trusted, project-bundled data - confirm.
try:
    # Competency data: a mapping with the four keys read below.
    with open(os.path.join(DATA_DIR, 'sw_competency.pkl'), 'rb') as f:
        sw_competency = pickle.load(f)
    job_def = sw_competency['직무레벨']
    factor_def = sw_competency['평가요소']
    # Factor names are every column of the factor table after the first two
    # (presumably the first two are identifier columns - TODO confirm).
    factors = factor_def.columns.to_list()[2:]
    bars_df = sw_competency['평가지표']
    opts = sw_competency['opts']
    # Wage data: a mapping with 'raw', 'avg' and 'conds' entries.
    with open(os.path.join(DATA_DIR, 'sw_wage.pkl'), 'rb') as f:
        sw_wage = pickle.load(f)
    raw_df = sw_wage['raw']
    avg_df = sw_wage['avg']
    conds_df = sw_wage['conds']
except Exception as e:
    print(f"Warning: Could not load data files: {e}")
# --- Core business logic ---
def format_text_wrap(text: str, max_len: int = 45, delimiter: str = " ") -> str:
    """Wrap ``text`` into lines of roughly ``max_len`` characters.

    Each newline-separated paragraph is stripped and then cut greedily: the
    cut is placed right after the last ``delimiter`` that starts before
    position ``max_len`` or, when no delimiter is found there, exactly at
    ``max_len`` characters. Every piece is stripped of surrounding
    whitespace, and paragraphs that are empty after stripping are dropped.

    Args:
        text: Text to wrap. A falsy value yields an empty string.
        max_len: Width used to look for a break position; must be >= 1.
        delimiter: Preferred break marker. It stays at the end of the line it
            closes unless it is whitespace (which the strip removes).

    Returns:
        The wrapped lines joined with newline characters.

    Raises:
        ValueError: If ``text`` is non-empty and ``max_len`` is below 1.
    """
    if not text:
        return ""
    if max_len < 1:
        # With a non-positive width the cut position can be 0, so the loop
        # below would append empty pieces forever without consuming input.
        raise ValueError(f"max_len must be at least 1, got {max_len!r}")

    delimiter_len = len(delimiter)
    lines = []
    for paragraph in text.split("\n"):
        paragraph = paragraph.strip()
        while len(paragraph) > max_len:
            split_pos = paragraph.rfind(delimiter, 0, max_len)
            # No delimiter in range: hard cut at max_len. Otherwise cut just
            # after the delimiter so it stays with the preceding piece.
            split_pos = max_len if split_pos == -1 else split_pos + delimiter_len
            lines.append(paragraph[:split_pos].strip())
            paragraph = paragraph[split_pos:].strip()
        if paragraph:
            lines.append(paragraph)
    return "\n".join(lines)
def get_step_options(df, job=None):
    """Collect the choices offered for each selection step.

    Args:
        df: Frame with the columns 'ITSQF 직무(변환)' and 'BM'.
        job: Selected job, or ``None`` when no job has been chosen yet.

    Returns:
        A dict with the keys 'job_options', 'bm_options', 'sales_options',
        'emp_options' and 'base_options'. 'bm_options' is empty when ``job``
        is ``None``. The sales and employee options are read from the
        module-level ``opts`` mapping.
    """
    job_column = df['ITSQF 직무(변환)']
    job_options = job_column.dropna().drop_duplicates().tolist()

    bm_options = []
    if job is not None:
        bm_values = df.loc[job_column == job, 'BM'].dropna().drop_duplicates().tolist()
        bm_options = [value for value in bm_values if value != '전체']
        # The aggregate '전체' entry is only offered when the data contains
        # it and there are at least two concrete values to aggregate over.
        if '전체' in bm_values and len(bm_options) >= 2:
            bm_options = ['전체'] + bm_options

    return {
        'job_options': job_options,
        'bm_options': bm_options,
        'sales_options': ['전체'] + opts['매출규모'].tolist(),
        'emp_options': ['전체'] + opts['직원규모'].tolist(),
        'base_options': ['지급총액', '고정급'],
    }
def make_bars_table(bars_df, factors, job):
    """Build the per-level indicator tables for one job.

    Rows of ``bars_df`` whose '직무' equals ``job`` are pivoted into one row
    per '평가요소' and one column per '레벨' (cell = '지표정의'), and the rows
    are ordered like ``factors``.

    Args:
        bars_df: Frame with the columns '직무', '레벨', '평가요소', '지표정의'.
        factors: Factor names defining the row order.
        job: Job whose indicators are selected.

    Returns:
        A 3-tuple ``(table, indicators, level_range)``:
        * ``table`` - the pivoted frame with '평가요소' as an ordered
          categorical column.
        * ``indicators`` - columns 'id' (1-based row number), 'title'
          (zero-padded id, '. ', factor name) and one column per level whose
          cells are the definition split on newlines (empty list for
          non-string cells).
        * ``level_range`` - ``[last, first]`` of the sorted level labels.
    """
    job_rows = bars_df[bars_df['직무'] == job]
    level_cols = job_rows['레벨'].drop_duplicates().sort_values().tolist()

    pivoted = job_rows.pivot_table(
        index='평가요소', columns='레벨', values='지표정의', aggfunc='sum'
    )
    target_table = pivoted.reset_index()
    # Order the rows by the position of each factor in ``factors``.
    target_table['평가요소'] = pd.Categorical(
        target_table['평가요소'], categories=factors, ordered=True
    )
    target_table = target_table.sort_values('평가요소').reset_index(drop=True)

    def split_lines(cell):
        return cell.split('\n') if isinstance(cell, str) else []

    bars_indicator = target_table.reset_index(names='id')
    bars_indicator['id'] += 1
    id_labels = bars_indicator['id'].astype(str).str.zfill(2)
    bars_indicator['title'] = id_labels + '. ' + bars_indicator['평가요소'].astype(str)
    for level in level_cols:
        bars_indicator[level] = bars_indicator[level].apply(split_lines)

    highest, lowest = level_cols[-1], level_cols[0]
    return target_table, bars_indicator[['id', 'title'] + level_cols], [highest, lowest]
def get_final_level(user_score, level_range, factors):
    """Derive the overall level from per-factor scores and build guidance.

    The total score is compared with fixed thresholds for levels 7 down to 4
    (``7*L + 2*(L-1) + (L-2)``, i.e. 66 / 56 / 46 / 36); the first threshold
    reached gives the level, otherwise the minimum level is used. The result
    is then limited to ``[min_level, max_level]`` so it never names a level
    outside the range passed in.

    Args:
        user_score: Per-factor integer scores (anything ``pd.Series``
            accepts). The resulting index values are used to index
            ``factors``.
        level_range: ``[max_label, min_label]``; the first run of digits in
            each label is taken as the level number.
        factors: Factor names, indexed by the index of ``user_score``.

    Returns:
        ``(final_level, max_level, guides)`` where the first two are ints and
        ``guides`` is a DataFrame with columns 'left'/'right' and rows
        'guide'/'items'.
    """
    levels = [int(re.search(r'(\d+)', level).group(1)) for level in level_range]
    max_level, min_level = levels[0], levels[1]
    s = pd.Series(user_score, dtype="int")
    total = s.sum()

    # Thresholds are checked from the highest level downwards.
    level_cut = {lvl: lvl * 7 + (lvl - 1) * 2 + (lvl - 2) for lvl in range(7, 3, -1)}
    final_level = next(
        (lvl for lvl, cut in level_cut.items() if total >= cut),
        min_level,
    )
    # The thresholds know nothing about the range passed in; keep the result
    # inside it so callers never look up a level outside that range.
    final_level = max(min_level, min(final_level, max_level))

    low_set = s[s < final_level].sort_values().index.tolist()
    middle_set = s[(s >= final_level) & (s < final_level + 1)].sort_values().index.tolist()
    high_set = s[s >= final_level].sort_values(ascending=False).index.tolist()

    def trim_items(index_set, max_item=3):
        """Join up to ``max_item`` factor names; '-' when there are none."""
        if len(index_set) == 0:
            text = "-"
        else:
            text = ", ".join(factors[i] for i in index_set[:max_item])
            if len(index_set) > max_item:
                text += " 등"
        return format_text_wrap(text, max_len=33, delimiter=",")

    output = {
        'left': {
            'guide': "아래 역량은 현재 레벨 안착을 위해 보완해보면 좋겠습니다.",
            'items': trim_items(low_set),
        }
    }
    if final_level == max_level:
        # Already at the top of the range: show strengths instead of a
        # level-up hint.
        output['right'] = {
            'guide': "다음 역량은 현재 안정적으로 발휘되고 있는 강점입니다.",
            'items': trim_items(high_set),
        }
    else:
        output['right'] = {
            'guide': "다음 역량을 강화하면 Level-Up 성장을 기대할 수 있습니다.",
            'items': trim_items(middle_set),
        }
    return final_level, max_level, pd.DataFrame(output)
def judge_level(user_score, level_range, level_def, job, factors):
    """Return the judged level label with level definitions and guidance.

    Args:
        user_score, level_range, factors: Forwarded to ``get_final_level``.
        level_def: Frame with the columns '직무', '수준' and '수준 정의'.
        job: Job whose level definitions are looked up.

    Returns:
        ``(label, definitions, guides)``: ``label`` is ``'L<n>'``,
        ``definitions`` is a DataFrame with columns 'left'/'right' and rows
        'title'/'definition', and ``guides`` is the frame produced by
        ``get_final_level``.
    """
    final_level, max_level, guides = get_final_level(user_score, level_range, factors)
    job_levels = level_def[level_def['직무'] == job]
    level_dict = job_levels.set_index('수준')['수준 정의']

    def panel(prefix, level_no):
        tag = f'L{level_no}'
        return {'title': prefix + tag, 'definition': level_dict[tag]}

    if final_level != max_level:
        # Below the top: current level next to the one above it.
        output = {
            'left': panel("현재 레벨: ", final_level),
            'right': panel("상위 레벨: ", final_level + 1),
        }
    else:
        # At the top: the level below next to the current one.
        output = {
            'left': panel("하위 레벨:", final_level - 1),
            'right': panel("현재 레벨:", final_level),
        }

    return f'L{final_level}', pd.DataFrame(output), guides
def describe_percentile(p):
    """Turn a percentile into display phrases.

    ``p`` is limited to 0..100 and rounded to one decimal; ``100 - p`` is
    reported as the "top" share. Brackets are always rounded up so the
    phrase never claims a tighter bracket than the value supports, and a
    bracket of 0% is never produced.

    Args:
        p: Percentile; anything ``float()`` accepts.

    Returns:
        ``(pos, desc, pos_text)``:
        * ``pos`` - bracket phrase: bottom N% in steps of 10 when the top
          share exceeds 55, top N% in steps of 10 when it exceeds 5,
          otherwise top N% in steps of 1 (minimum 1).
        * ``desc`` - qualitative label chosen by the thresholds 70/60/40/20.
        * ``pos_text`` - exact share with one decimal.
    """
    # NOTE: NaN fails both comparisons inside min/max and comes out as 100.
    p = max(0, min(100, float(p)))
    p = round(p, 1)
    top = round(100 - p, 1)

    if top > 55:
        # At least 10 so that p == 0 does not read "bottom 0%".
        bracket = max(10, int(math.ceil(p / 10.0) * 10))
        pos = f"하위 {bracket}% 이내"
        pos_text = f"하위 {p:.1f}% 수준"
    else:
        if top > 5:
            bracket = int(math.ceil(top / 10.0) * 10)
        else:
            # Round up (not truncate) and never below 1: a top share of
            # 0.6 is "within the top 1%", not "within the top 0%".
            bracket = max(1, int(math.ceil(top)))
        pos = f"상위 {bracket}% 이내"
        pos_text = f"상위 {top:.1f}% 수준"

    desc = "낮은"
    for threshold, label in ((70, "높은"), (60, "평균 이상"), (40, "평균"), (20, "다소 낮은")):
        if p >= threshold:
            desc = label
            break
    return pos, desc, pos_text
def judge_wage(
    raw_df: pd.DataFrame,
    job: str,
    bm: str,
    sales: str,
    emp: str,
    base_type: str,
    target_wage: int,
    final_level: str,
    k_std: float = 20.0,
    k_shrink: float = 20.0,
    z_clip: float = 2.5,
    n_switch: int = 20,
    alpha_denominator: int = 30,
):
    """Estimate where ``target_wage`` sits among comparable wage records.

    The job pool is every row of ``raw_df`` with the given job and level.
    The cohort narrows the pool by BM and, when enough rows remain, by sales
    and employee size. A z-score against the cohort mean is computed with a
    variance blended from cohort and pool, shrunk towards 0 for small
    cohorts, clipped, and mapped to a percentile with the normal CDF. For
    cohorts of at least ``n_switch`` values this is blended with the
    empirical percentile.

    Args:
        raw_df: Wage records with the columns 'ITSQF 직무(변환)',
            'ITSQF 수준', 'BM', '매출규모', '직원규모' and ``base_type``.
        job: Job to compare against.
        bm: Value matched against the 'BM' column.
        sales: Sales-size filter, or '전체' for no filter.
        emp: Employee-size filter, or '전체' for no filter.
        base_type: Name of the wage column to use.
        target_wage: The user's wage in units of 10,000 (it is multiplied by
            10000 before comparison with the column values).
        final_level: Level label matched against 'ITSQF 수준'.
        k_std: Larger values give the pool variance more weight
            (cohort weight is ``n / (n + k_std)``).
        k_shrink: Larger values shrink the z-score more strongly
            (factor ``n / (n + k_shrink)``).
        z_clip: Absolute bound applied to the shrunk z-score.
        n_switch: Minimum cohort size for using the empirical percentile.
        alpha_denominator: Cohort size at which the empirical percentile
            fully replaces the normal-based one.

    Returns:
        ``(head_message, block1, block2, p, chart3, table3, badges)``: three
        text blocks, the percentile rounded to one decimal, a gauge-chart
        DataFrame, a one-row comparison DataFrame and the list of labels
        describing the comparison that was actually applied.

    Raises:
        ValueError: If no numeric ``base_type`` value exists for the given
            job and level, since no comparison is possible then.
    """
    x = float(target_wage) * 10000.0

    in_pool = (raw_df['ITSQF 직무(변환)'] == job) & (raw_df['ITSQF 수준'] == final_level)
    job_pool_df = raw_df[in_pool]
    job_pool_vals = (
        pd.to_numeric(job_pool_df[base_type], errors="coerce").dropna().to_numpy(dtype=float)
    )
    if job_pool_vals.size == 0:
        # Without any reference value every statistic below would be NaN and
        # leak into the messages; fail with a clear reason instead.
        raise ValueError(
            f"no numeric {base_type!r} values for job {job!r} at level {final_level!r}"
        )
    mean_job = float(np.mean(job_pool_vals))
    # The sample standard deviation needs two values; fall back to 0.0 the
    # same way the cohort statistic does.
    std_pool = float(np.std(job_pool_vals, ddof=1)) if job_pool_vals.size >= 2 else 0.0

    def get_cohort_df(df, bm, sales, emp, min_n=5):
        """Filter by BM, then by sales/employee size while more than ``min_n`` rows remain."""
        applied_sales = sales
        applied_emp = emp
        result = df[df['BM'] == bm].copy()
        if sales != '전체':
            sales_filtered = result[result['매출규모'] == sales].copy()
            if len(sales_filtered) > min_n:
                result = sales_filtered
            else:
                # Too few rows: drop this filter and report it as '전체'.
                applied_sales = '전체'
        if emp != '전체':
            emp_filtered = result[result['직원규모'] == emp].copy()
            if len(emp_filtered) > min_n:
                result = emp_filtered
            else:
                applied_emp = '전체'
        return result, applied_sales, applied_emp

    cohort_df, applied_sales, applied_emp = get_cohort_df(job_pool_df, bm, sales, emp)
    cohort_vals = (
        pd.to_numeric(cohort_df[base_type], errors="coerce").dropna().to_numpy(dtype=float)
    )
    n = int(cohort_vals.size)
    mean_cohort = float(np.mean(cohort_vals)) if n >= 1 else mean_job
    std_cohort = float(np.std(cohort_vals, ddof=1)) if n >= 2 else 0.0

    # Blend cohort and pool variance; small cohorts lean on the pool.
    w_std = (n / (n + k_std)) if n > 0 else 0.0
    var_eff = w_std * (std_cohort ** 2) + (1.0 - w_std) * (std_pool ** 2)
    std_eff = math.sqrt(max(var_eff, 1e-9))  # floor avoids division by zero
    z_raw = (x - mean_cohort) / std_eff

    # Shrink the z-score towards 0 for small cohorts, then bound it.
    w_n = (n / (n + k_shrink)) if n > 0 else 0.0
    z_adj = float(np.clip(w_n * z_raw, -z_clip, z_clip))

    def normal_cdf(z: float) -> float:
        return 0.5 * (1.0 + math.erf(z / math.sqrt(2.0)))

    def percentile_of_score(arr: np.ndarray, x: float) -> float:
        if arr.size == 0:
            return float("nan")
        return float((arr <= x).mean() * 100.0)

    p_z = normal_cdf(z_adj) * 100.0
    p_raw = percentile_of_score(cohort_vals, x) if n >= 3 else float("nan")
    if n >= n_switch and not math.isnan(p_raw):
        alpha = min(1.0, n / float(alpha_denominator))
        p_final = alpha * p_raw + (1.0 - alpha) * p_z
    else:
        p_final = p_z
    p = round(float(p_final), 1)

    pos, desc, pos_text = describe_percentile(p)
    head_message = f"""진단 결과, 현재 귀하의 직무 역량 수준은 {final_level}에 가장 가까운 것으로 보입니다.
동일 직무 레벨 및 조건 대비 보상 경쟁력은 {pos}로 {desc} 수준입니다."""

    diff = x - mean_cohort
    sign = "+" if diff >= 0 else "-"
    gap = f"{sign}{abs(diff)/10000:,.0f}"
    if diff == 0:
        comp_text = "시장 평균과 동일한 수준으로 나타났습니다."
    else:
        direction = "더 높게" if diff > 0 else "더 낮게"
        comp_text = f"시장 평균 대비 {gap}만원 {direction} 나타났습니다."
    block1 = f"""현재 보상 경쟁력은 시장 {pos_text}으로,
{comp_text}"""
    block2 = f"""직무: {job}
레벨: {final_level}
보상수준: ({base_type}) {target_wage:,.0f}만원
준거집단:
- (BM) {bm}
- (매출규모) {applied_sales}
- (직원규모) {applied_emp}
"""
    table3 = pd.DataFrame({
        "user": f"{target_wage:,.0f}",
        "marketAverage": f"{mean_cohort/10000:,.0f}",
        "gap": gap
    }, index=["값"])

    def make_gauge_chart(p):
        """Map the percentile onto a 180-unit scale and return the three chart values."""
        value = round(p / 100 * 180, 1)
        gauge = [180, value, 180 - value]
        return pd.DataFrame({"값": gauge}, index=["항목1", "항목2", "항목3"])

    chart3 = make_gauge_chart(p)
    badges = [job, final_level, base_type, bm, applied_sales, applied_emp]
    return head_message, block1, block2, p, chart3, table3, badges
def format_table(table, user_score, level, factors):
    """Attach below/meets/exceeds markers to a two-column factor table.

    Each score is compared with the numeric part of ``level`` and coded as
    2 (below), 3 (equal) or 4 (above); the target code is always 3.

    Args:
        table: Two-column frame (factor name, definition). It is not
            modified; the returned table is a renamed, merged copy.
        user_score: One score per factor, in the order of ``factors``.
        level: Level label such as ``'L5'``.
        factors: Factor names; any length is accepted.

    Returns:
        ``(table, chart)``: ``table`` has the columns '평가요소',
        '<level> 수준', '부족', '충족', '초과' (a dot marks the matching
        column); ``chart`` has the columns '평가요소', 'Target', 'User'.
    """
    cut = int(level.replace("L", ''))
    scores = pd.DataFrame({
        "평가요소": factors,
        # One target entry per factor rather than a fixed count of ten.
        "Target": [3] * len(factors),
        "User": user_score,
    })
    scores['User'] = scores['User'].apply(lambda x: 2 if x < cut else (3 if x == cut else 4))
    for column, code in (('부족', 2), ('충족', 3), ('초과', 4)):
        scores[column] = scores['User'].apply(
            lambda value, code=code: "●" if value == code else ""
        )

    # Work on a copy so the caller's frame keeps its original column names.
    table = table.copy()
    table.columns = ["평가요소", f"{level} 수준"]
    table = table.merge(scores[['평가요소', '부족', '충족', '초과']], on='평가요소', how='left')
    chart = scores[['평가요소', "Target", "User"]]
    return table, chart