# Onboarding / app.py
# (Hugging Face Hub page-header residue, not Python source:
#  "AdamJovine's picture / Update app.py / 034b241 / raw / history blame / 58.9 kB")
# Standard library
import datetime
import json
import logging
import os  # (was imported twice; deduplicated)
import sys
from datetime import datetime  # NOTE: shadows the `datetime` module imported above
from typing import List, Sequence  # List added: used by the annotations below

# Third-party
import camelot
import nest_asyncio  # added: nest_asyncio.apply() is called below but was never imported
import openai
import pandas as pd
from dateutil.relativedelta import relativedelta
from llama_index import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    ServiceContext,
    StorageContext,
    SQLDatabase,
    WikipediaReader,
)
from llama_index.query_engine import PandasQueryEngine
from llama_index.tools import BaseTool, FunctionTool
# NOTE(review): nest_asyncio is never imported above — this line raises NameError
# at import time; confirm the missing `import nest_asyncio`.
nest_asyncio.apply()
# Route INFO-and-above logs to stdout. The extra handler below duplicates what
# basicConfig already installed, so records appear twice — presumably for
# notebook visibility; confirm.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))
df = pd.read_excel("/content/NewUpdatedEmployeeRoster 2023-07-13 12_27 EDT.xlsx")
df.columns = df.iloc[0]
df = df.iloc[1:]
df = df[
~pd.isna(df["Employee ID"])
& (df["Employee ID"].apply(lambda x: str.isnumeric(str(x))))
]
df["Employee ID"] = df["Employee ID"].apply(int)
new_cols = []
def no_space(c):
    """Return *c* with every space character replaced by an underscore."""
    return "_".join(c.split(" "))
# Normalize every column name via no_space so columns can be used as identifiers.
for c in df.columns:
    new_cols.append(no_space(c))
df.columns = new_cols
df  # notebook-style bare expression; no effect when run as a script
def date_fix(d):
    """Best-effort parse of a roster date cell into a date/datetime.

    Strings are tried as "%m/%d/%y", then "%y-%m-%d", then as positional
    "dd?mm?yyyy" slices; non-strings get the positional-slice attempt on
    str(d). Unparseable values are returned unchanged so callers can filter
    on type afterwards; an in-range slice parse that fails the sanity check
    yields None (matching the original fall-through).
    """

    def _from_slices(s):
        # Interpret s positionally as dd?mm?yyyy: day = chars 0-2, month = 3-5, year = 6+.
        y = int(s[6:])
        m = int(s[3:5])
        day = int(s[0:2])
        if day <= 31 and m <= 12 and y <= 2100:
            # BUG FIX: the original called datetime.datetime(y, m, day), which
            # raises AttributeError because `datetime` here is the class imported
            # via `from datetime import datetime`, not the module.
            return datetime(y, m, day)
        return None

    if isinstance(d, str):
        for fmt in ("%m/%d/%y", "%y-%m-%d"):
            try:
                return datetime.strptime(d, fmt).date()
            except ValueError:
                pass
        try:
            return _from_slices(d)
        except ValueError:
            # BUG FIX: the original let this ValueError propagate (the slicing
            # ran inside an `except:` body with no further handler).
            return d
    else:
        try:
            return _from_slices(str(d))
        except (ValueError, TypeError):
            return d
df["Hire_Date"] = df["Hire_Date"].apply(date_fix)
df["Termination_Date"] = df["Termination_Date"].apply(date_fix)
df = df[df["Hire_Date"].apply(lambda x: isinstance(x, datetime))]
df = df[df["Termination_Date"].apply(lambda x: isinstance(x, datetime))]
df["Ethnicity"] = df["Ethnicity"].apply(
lambda x: "Declined to Answer"
if "Declined" in x
else "Hispanic or Latino"
if "Hispanic" in x
else x
)
df["Worker_Management_Level"] = df["Worker's_Management_Level"]
cat_cols = [
"Entity",
"Gender",
"Ethnicity",
"Time_Type",
"Worker_Management_Level",
"Worker_Sub_Type",
"Exclude_from_Turnover_Calculation",
]
df_no_space_cats = df[cat_cols]
for cat in cat_cols:
df_no_space_cats[cat] = df[cat].apply(
lambda x: no_space(x) if isinstance(x, str) else pd.NA
)
non_cat_cols = [
"Employee_ID",
"Job_Profile",
"Job_Family",
"Job_Family_Group",
"Hire_Date",
"Termination_Date",
"Tenure",
"Age_(Years)",
"VP",
]
dumb = pd.get_dummies(df_no_space_cats).dropna(how="any")
for c in dumb.columns:
dumb[c] = dumb[c].apply(bool)
fin_df = df[cat_cols + non_cat_cols].join(dumb)
# vals = cur_df[filter_column[i]].unique()
# orig_col = filter_column[i]
# if str(filter_value[i]) not in str(vals):
# print('in the if')
# nothing_matches = True
# j = 0
# columns = cur_df.columns
# while nothing_matches and j<(len(columns)-1):
# col = columns[j]
# c_vals = cur_df[col].unique()
# if str(filter_value[i]) in str(c_vals):
# nothing_matches = False
# j +=1
# filter_column[i] = col
# if nothing_matches:
# #print('which 5 of these values ' + vals + ' most closely matches ' + filter_value[i])
# #print('we did not understand the query, could you rephrase to be more specific')
# This was error-handling code that tried to find values that might match, but it
# meant that reporting which values caused errors didn't work, because the values
# would change due to this code.
# NOTE(review): MONTHS / return_docstring / functools appear unused below; kept to
# avoid changing module import side effects.
from pandas._libs.tslibs.ccalendar import MONTHS
from pandas.io.formats.format import return_docstring
import functools
# SECURITY FIX: an OpenAI API key was hard-coded here (a leaked secret committed
# to the repo). Read it from the environment instead; the host must provide
# OPENAI_API_KEY.
openai.api_key = os.environ["OPENAI_API_KEY"]
# Natural-language-to-pandas engine used by fancy_filter/fancy_category.
pandas_query_engine = PandasQueryEngine(df=df)
# Message prefixes appended to `states` by the *_check helpers when a query
# references a missing group/column/value. (The original `global` statements at
# module level were no-ops and have been dropped.)
group_by_missing = "group_by doesnt exist : "
filter_missing = "missing values in filter column: "
filter_val_missing = "couldn't find the specified value. was this the correct column: "
filter_missing_column = (
    # Typo fix: was "counl't".
    "couldn't find the specified column. Was this the correct column: "
)
# Error/status log consumed by the check helpers; "happy" means no problems yet.
states = ["happy"]
def date_to_string(time_period):
    """Format a date-like object (anything with .month/.year) as "m-yyyy"."""
    return f"{time_period.month}-{time_period.year}"
# Most recent hire date in the roster; used as the default "current month"
# for every metric's time_period default below.
last_update = df["Hire_Date"].max()  # datetime.date(2023,7,1)#datetime.now() ()
# print(last_update)
def get_snapshot(cur_df, time_period):
    """Restrict *cur_df* to employees active at the start of *time_period*.

    *time_period* is "mm-yyyy" (falsy means "now"). A row survives when its
    hire date precedes the 1st of that month and its termination date is
    absent (NaT) or on/after it.
    """
    as_of = datetime.now() if not time_period else datetime.strptime(time_period, "%m-%Y")
    hired_before = cur_df["Hire_Date"] < as_of
    still_employed = (cur_df["Termination_Date"] >= as_of) | pd.isna(
        cur_df["Termination_Date"]
    )
    return cur_df[hired_before & still_employed]
def filter_by(col, val):
    """Return the global df filtered so df[col[i]] == val[i] for every i.

    A falsy/empty *col* means "no filtering": the unfiltered df is returned.
    """
    if not col:
        return df
    cur_df = df
    for i, column in enumerate(col):
        cur_df = cur_df[cur_df[column] == val[i]]
    return cur_df
def group_check(cur_df, group_by):
    """Validate *group_by* against cur_df's columns.

    Returns group_by unchanged when it is falsy or names a real column;
    otherwise records the problem in the global `states` list and raises
    ValueError.
    """
    global states
    if not group_by:
        return group_by
    if group_by in cur_df.columns:
        return group_by
    states.append(group_by_missing + group_by)
    raise ValueError("missing group")
def column_check(cur_df, filter_column, filter_value):
    """Validate the filter column/value lists against *cur_df*.

    Returns a (filter_column, filter_value) pair, repairing column-name case
    in place when possible. Problems are recorded in the global `states` list
    rather than raised, so the caller can report them later.
    """
    global states
    if not filter_column:
        return ([], [])
    if len(filter_column) != len(filter_value):
        states.append(
            filter_missing + str(filter_column) + "value: " + str(filter_value)
        )
        # BUG FIX: the original fell through and indexed filter_value past its
        # end (IndexError); with mismatched lists there is nothing safe to check.
        return ([], [])
    # Hoisted: the original rebuilt this list on every unmatched column.
    low_cols = [c.lower() for c in cur_df.columns]
    for i in range(len(filter_column)):
        if filter_column[i] in cur_df.columns:
            if str(filter_column[i]) != "nan":
                if filter_value[i] not in cur_df[filter_column[i]].unique():
                    if str(filter_value[i]) != "nan":
                        print("in the nan if, you have missing values ")
                        states.append(
                            filter_val_missing
                            + filter_column[i]
                            + " is this the correct value: "
                            + filter_value[i]
                        )
        else:
            # Case-insensitive fallback against cur_df.
            # BUG FIX: the original consulted the global df's columns here,
            # which can disagree with the frame actually being checked.
            low = filter_column[i].lower()
            if low in low_cols:
                filter_column[i] = list(cur_df.columns)[low_cols.index(low)]
            else:
                states.append(filter_missing_column + filter_column[i])
    return (filter_column, filter_value)
def headcount(
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    group_by: str = None,
    time_period: str = date_to_string(last_update),
) -> pd.core.frame.DataFrame:
    """Calculates the headcount or number of active employees
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    "group_by" is used to find the headcount for each element of the group_by column
    "time_period" is the month to consider, in the format "mm-yyyy". If None it is the most recent month
    """
    # Only employees marked "Keep" count toward headcount/turnover figures.
    cur_df = df[df["Exclude_from_Turnover_Calculation"] == "Keep"]
    filter_column, filter_value = column_check(cur_df, filter_column, filter_value)
    print("fc: ", filter_column)
    print("fv: ", filter_value)
    group_by = group_check(cur_df, group_by)  # BUG FIX: was redundantly called twice
    cur_df = filter_by(filter_column, filter_value)
    cur_df = get_snapshot(cur_df, time_period)
    cur_df = group(cur_df, group_by)
    return cur_df["Employee_ID"].count()
def event_time_period(cur_df, time_period, col):
    """Keep rows whose *col* date falls in the "mm-yyyy" month *time_period*.

    Each date is truncated to the first of its month before comparing; NaT
    values never match and are filtered out.
    """
    month_start = datetime.strptime(time_period, "%m-%Y")

    def to_month(x):
        # NaT passes through unchanged and then compares unequal below.
        if str(x) == "NaT":
            return x
        return datetime(int(x.year), int(x.month), 1)

    return cur_df[cur_df[col].apply(to_month) == month_start]
def group(cur_df, group_by):
    """Group *cur_df* by *group_by* when given; otherwise return it untouched."""
    return cur_df.groupby(by=group_by) if group_by else cur_df
def event_flow(
    operation,
    col,
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    group_by: str = None,
    time_period: str = date_to_string(last_update),
):
    """Shared pipeline for hires/terminations.

    Validates the filters, applies them, restricts to the event month via
    *operation* on date column *col*, groups, and counts employees.
    """
    checked_cols, checked_vals = column_check(df, filter_column, filter_value)
    group_by = group_check(df, group_by)
    frame = filter_by(checked_cols, checked_vals)
    frame = frame[frame["Exclude_from_Turnover_Calculation"] == "Keep"]
    frame = operation(frame, time_period, col)
    frame = group(frame, group_by)
    return frame["Employee_ID"].count()
def hires(
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    group_by: str = None,
    time_period: str = date_to_string(last_update),
) -> pd.core.frame.DataFrame:
    """Counts the number of hires
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    "group_by" is used to find the hires for each element of the group_by column
    "time_period" is the month to consider ("mm-yyyy"). If None it is the most recent date
    """
    return event_flow(
        event_time_period,
        "Hire_Date",
        filter_column=filter_column,
        filter_value=filter_value,
        group_by=group_by,
        time_period=time_period,
    )
def terminations(
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    group_by: str = None,
    time_period: str = date_to_string(last_update),
) -> pd.core.frame.DataFrame:
    """Counts the number of terminations
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    "group_by" is used to find the terminations for each element of the group_by column
    "time_period" is the month to consider ("mm-yyyy"). If None it is the most recent date
    """
    return event_flow(
        event_time_period,
        "Termination_Date",
        filter_column=filter_column,
        filter_value=filter_value,
        group_by=group_by,
        time_period=time_period,
    )
def thing_rate(
    terms: bool,
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    group_by: str = None,
    time_period: str = date_to_string(last_update),
) -> pd.core.frame.DataFrame:
    # Shared implementation of turnover_rate (terms=True) and hire_rate
    # (terms=False): the month's event count divided by the headcount snapshot.
    cur_df = df
    filter_column, filter_value = column_check(cur_df, filter_column, filter_value)
    group_by = group_check(cur_df, group_by)
    # Event counts (terminations or hires) for the month, possibly grouped.
    termination_df = (
        terminations(
            filter_column, filter_value, group_by=group_by, time_period=time_period
        )
        if terms
        else hires(
            filter_column, filter_value, group_by=group_by, time_period=time_period
        )
    )
    headcount_df = headcount(
        filter_column, filter_value, group_by=group_by, time_period=time_period
    )
    if group_by:
        # Patch both Series so every group has an entry before dividing:
        # a group with no headcount gets headcount 1 (avoids division by zero)
        # and a group with no events gets 0.
        # NOTE(review): the first branch also zeroes termination_df — presumably
        # intentional so a group absent from headcount reports rate 0; confirm.
        all_groups = df[group_by].unique()
        res = []
        for g in all_groups:
            if not g in headcount_df.index:
                termination_df.loc[g] = 0
                headcount_df.loc[g] = 1
            if not g in termination_df.index:
                termination_df.loc[g] = 0
            res.append(termination_df.loc[g] / headcount_df.loc[g])
        return pd.Series(data=res, index=all_groups)
    return termination_df / headcount_df
def turnover_rate(
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    group_by: str = None,
    time_period: str = date_to_string(last_update),
) -> pd.core.frame.DataFrame:
    """The termination rate, attrition or turnover
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    "group_by" is used to find the turnover_rate for each element of the group_by column
    "time_period" is the month to consider ("mm-yyyy"). If None it is the most recent date
    """
    return thing_rate(
        True,
        filter_column=filter_column,
        filter_value=filter_value,
        group_by=group_by,
        time_period=time_period,
    )
def hire_rate(
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    group_by: str = None,
    time_period: str = date_to_string(last_update),
) -> pd.core.frame.DataFrame:
    """The hire rate: the percentage of people who are new hires each month (monthly ins)
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    "group_by" is used to find the hire rate for each element of the group_by column
    "time_period" is the month to consider ("mm-yyyy"). If None it is the most recent date
    """
    return thing_rate(
        False,
        filter_column=filter_column,
        filter_value=filter_value,
        group_by=group_by,
        time_period=time_period,
    )
def roll(terms, filter_col, filter_val, group_by, time_period, roll_length):
    """Annualized rolling average of the monthly turnover (terms=True) or hire
    rate (terms=False) over the past *roll_length* months (default 12) ending
    at "mm-yyyy" *time_period* (default: now).
    """
    # TODO: reuse per-month calculations instead of recomputing headcounts.
    months = roll_length if roll_length else 12
    end = datetime.strptime(time_period, "%m-%Y") if time_period else datetime.now()
    rate = turnover_rate if terms else hire_rate

    acc = rate(
        filter_col, filter_val, group_by=group_by, time_period=date_to_string(end)
    )
    if group_by:
        acc = acc.values
    for m in range(1, months):
        month_rate = rate(
            filter_col,
            filter_val,
            group_by=group_by,
            time_period=date_to_string(end - relativedelta(months=m)),
        )
        # BUG FIX: the original added month_rate a second time unconditionally
        # (missing else), double-counting every month in the grouped case.
        if group_by:
            acc += month_rate.values
        else:
            acc += month_rate
    # Multiply by 12/months to annualize the average monthly rate.
    return acc * 12 / months
def rolling_turnover(
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    group_by: str = None,
    time_period: str = date_to_string(last_update),
    roll_length: int = None,
):
    """
    rolling monthly turnover or termination rate, using the past "roll_length" months
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    "group_by" is used to find the rolling turnover for each element of the group_by column
    "time_period" is the month the rolling calculation ends at; if not specified it is the most recent date
    "roll_length" is the number of months included in the rolling calculation
    """
    return roll(
        True, filter_column, filter_value, group_by, time_period, roll_length
    )
def rolling_hire_rate(
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    group_by: str = None,
    time_period: str = date_to_string(last_update),
    roll_length: int = None,
):
    """
    rolling monthly hire rate, using the past "roll_length" months
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    "group_by" is used to find the rolling hire rate for each element of the group_by column
    "time_period" is the month the rolling calculation ends at; if not specified it is the most recent date
    "roll_length" is the number of months included in the rolling calculation
    """
    return roll(
        False, filter_column, filter_value, group_by, time_period, roll_length
    )
## at some point we might want to add cat_col to everything but it might be unwanted
def average(
    feature: str,
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    group_by: str = None,
    time_period: str = date_to_string(last_update),
):
    """
    the average of the feature, use whenever asked to calculate an average
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    "group_by" is used to find the average for each element of the group_by column
    "time_period" is the month to consider ("mm-yyyy"). If None it is the most recent month
    """
    # BUG FIX: the original accepted filter_column/filter_value but silently
    # ignored them, and never validated group_by; apply them like every other
    # metric. (Annotation fix: time_period is a "mm-yyyy" string, not datetime.)
    filter_column, filter_value = column_check(df, filter_column, filter_value)
    group_by = group_check(df, group_by)
    cur_df = filter_by(filter_column, filter_value)
    cur_df = get_snapshot(cur_df, time_period)
    cur_df = group(cur_df, group_by)
    return cur_df[feature].mean()
def over_time(
    filter_column, filter_value, start, end, operation, cat_col, roll_length=None
):
    """Evaluate *operation* for every month from *start* to *end*.

    Returns a DataFrame indexed by month-end dates with one column per category
    of *cat_col* (or a single "value" column when cat_col is None). The window
    defaults to the past year. *roll_length* is forwarded to rolling operations
    when given.
    """
    cur_df = df
    filter_column, filter_value = column_check(cur_df, filter_column, filter_value)
    cat_col = group_check(cur_df, cat_col)
    starts = start if start else datetime.now() - relativedelta(years=1)
    ends = end if end else datetime.now()
    dates = pd.date_range(start=starts, end=ends, freq="M")
    result = pd.DataFrame(index=dates)
    filter_val = filter_value if filter_value else []
    filter_col = filter_column if filter_column else []

    def run(cols, vals, d):
        # One metric evaluation for month d.
        kwargs = {"time_period": date_to_string(d)}
        # BUG FIX: the original never forwarded roll_length, so rolling
        # operations always fell back to their default window.
        if roll_length is not None:
            kwargs["roll_length"] = roll_length
        return operation(cols, vals, **kwargs)

    if cat_col:
        # One column per category: filter each series down to that category.
        for c in cur_df[cat_col].unique():
            result[c] = [
                run(filter_col + [cat_col], filter_val + [c], d) for d in dates
            ]
    else:
        result["value"] = [run(filter_col, filter_val, d) for d in dates]
    return result
def hc_ovr_time(
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    start: str = None,
    end: str = date_to_string(datetime.now()),
    cat_col: str = None,
):
    """
    the headcount over time, giving the live headcount for each month from start to end
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    "start" is the starting month that headcount is measured from
    "end" is the final month that headcount is measured to
    "cat_col" is used to find the headcount for each category in the cat_col column
    """
    return over_time(filter_column, filter_value, start, end, headcount, cat_col)
def terms_ovr_time(
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    start: str = None,
    end: str = date_to_string(datetime.now()),
    cat_col: str = None,
):
    """
    the terminations over time, giving the live terminations for each month from start to end
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    "start" is the starting month that terminations are measured from
    "end" is the final month that terminations are measured to
    "cat_col" is used to find the terminations for each category of the cat_col column
    """
    return over_time(filter_column, filter_value, start, end, terminations, cat_col)
def turnover_ovr_time(
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    start: str = None,
    end: str = date_to_string(datetime.now()),
    cat_col: str = None,
):
    """
    the turnover over time, giving the live turnover for each month from start to end
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    "start" is the starting month that turnover is measured from
    "end" is the final month that turnover is measured to
    "cat_col" is used to find the turnover over time for each category of the cat_col column
    """
    return over_time(filter_column, filter_value, start, end, turnover_rate, cat_col)
def hires_ovr_time(
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    start: str = None,
    end: str = date_to_string(datetime.now()),
    cat_col: str = None,
):
    """
    the hires over time, giving the live hires for each month from start to end
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    "start" is the starting month that hires are measured from
    "end" is the final month that hires are measured to
    "cat_col" is used to find the hires over time for each category of the cat_col column
    """
    return over_time(filter_column, filter_value, start, end, hires, cat_col)
def rolling_turnover_ovr_time(
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    start: str = None,
    end: str = date_to_string(datetime.now()),
    cat_col: str = None,
    roll_length=12,
):
    """
    the rolling turnover over time, giving the rolling termination rate for each month from start to end
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    "start" is the starting month measured from
    "end" is the final month measured to
    "cat_col" is used to find the rolling turnover over time for each category of the cat_col column
    "roll_length" is the number of past months used to calculate each month's rolling turnover
    """
    return over_time(
        filter_column, filter_value, start, end, rolling_turnover, cat_col, roll_length
    )
def rolling_hire_ovr_time(
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    start: str = None,
    end: str = date_to_string(datetime.now()),
    cat_col: str = None,
    roll_length=12,
):
    """
    the rolling hire rate over time, giving the rolling hire rate for each month from start to end
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    "start" is the starting month measured from
    "end" is the final month measured to
    "cat_col" is used to find the rolling hire rate over time for each element of the cat_col column
    "roll_length" is the number of past months used to calculate each month's rolling hire rate
    """
    # BUG FIX: the original passed rolling_turnover here (copy-paste error), so
    # this tool silently reported turnover instead of the hire rate.
    return over_time(
        filter_column, filter_value, start, end, rolling_hire_rate, cat_col, roll_length
    )
def FTE(filter_column: List[str] = None, filter_value: List[str] = None):
    """The Full Time Equivalent or FTE for employees; this is NOT the number of employees but the number of equivalent full-time workers
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    """
    checked_cols, checked_vals = column_check(df, filter_column, filter_value)
    return filter_by(checked_cols, checked_vals)["FTE"].sum()
def split_by_cat(cur_df, category_column, filter_column, filter_value, group_by):
    """One-hot *category_column* and return the (per-group) mean of each
    indicator column — i.e. the share of rows falling in each category."""
    filter_column, filter_value = column_check(cur_df, filter_column, filter_value)
    cur_df = filter_by(filter_column, filter_value)
    categories = [c for c in cur_df[category_column].unique() if str(c) != "nan"]
    dummies = pd.get_dummies(cur_df[category_column])
    combined = pd.concat([cur_df, dummies], axis=1, join="inner")
    grouped = group(combined, group_by)
    return grouped.mean()[list(categories)]
def perc_live_cat(
    category_column: str,
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    group_by: str = None,
):
    """this is used to find percentages/statistics about current employees. The percentage of current employees that belong to each category in category_column; only use group_by to consider two DIFFERENT percentages simultaneously, group_by should never equal category_column
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    "category_column" is the column or feature to split on; this is the default for grouping
    "group_by" compares a second column in relation to the first one
    """
    filter_column, filter_value = column_check(df, filter_column, filter_value)
    group_by = group_check(df, group_by)
    category_column = group_check(df, category_column)
    active = df[pd.isna(df["Termination_Date"])]
    return split_by_cat(active, category_column, filter_column, filter_value, group_by)
def perc_term_cat(
    category_column: str,
    filter_column: List[str] = None,
    filter_value: List[str] = None,
    group_by: str = None,
):
    """this is used to find percentages/statistics about former employees. The percentage of terminated employees that belong to each category in category_column; only use group_by to consider two DIFFERENT percentages simultaneously, group_by should never equal category_column
    "filter_column" is a list of columns/features to filter on; empty list or None means no filtering
    "filter_value" is the list of values those columns take, one per column (lists must be the same length)
    "category_column" is the column or feature to split on; this is the default for grouping
    "group_by" compares a second column in relation to the first one
    """
    cur_df = df
    filter_column, filter_value = column_check(cur_df, filter_column, filter_value)
    group_by = group_check(cur_df, group_by)
    # BUG FIX: the original assigned group_check's result to a misspelled name
    # ("catgeory_column"), discarding the validation/normalization result.
    category_column = group_check(cur_df, category_column)
    cur_df = cur_df[~pd.isna(cur_df["Termination_Date"])]
    return split_by_cat(cur_df, category_column, filter_column, filter_value, group_by)
# ADD THIS
def perc_cat_ovr_time():
    """Placeholder: category percentages over time are not implemented yet."""
    return ""
def beneath_count(name: str = None):
    # Count how many employees sit beneath a given manager anywhere in the
    # reporting chain (VP/Director/Manager plus every "Sup_Org" level column).
    # With name=None, return a DataFrame of that count for every known manager.
    # NOTE(review): matching is by substring on a concatenated string of all
    # manager names per row, so a name contained in another name over-counts.
    def nan_to_empt(x):
        # Render NaN cells as "" so the string concatenation below is safe.
        return x if str(x) != "nan" else ""
    cur_df = df
    # sups = filter(lambda x , lst : lst + [x] if 'Sup_Org' in x else lst, df.columns)
    # Collect every supervisory-organization column.
    sups = []
    for c in df.columns:
        if "Sup_Org" in c:
            sups.append(c)
    # "franken": one row-wise string holding every manager name above the employee.
    franken_boss = pd.DataFrame(
        {
            "franken": cur_df["VP"].apply(nan_to_empt)
            + cur_df["Director"].apply(nan_to_empt)
            + cur_df["Manager"].apply(nan_to_empt)
        }
    )
    for c in sups:
        franken_boss["franken"] = (
            franken_boss["franken"].values + cur_df[c].apply(nan_to_empt).values
        )
    # all_managers = [x if str(x)!='nan' else '' for x in all_managers]
    if name:
        # Rows whose management chain mentions `name` anywhere.
        return franken_boss[franken_boss["franken"].str.contains(str(name))].shape[0]
    else:
        # Build the set of every distinct manager name across all levels.
        all_managers = set()
        for col in sups + ["Manager", "VP", "Director"]:
            all_managers = all_managers.union(set(cur_df[col].unique()))
        all_managers_fin = []
        for man in all_managers:
            # prob get rid of this with filter idk where the nans come form
            if isinstance(man, str):
                all_managers_fin.append(man)
        all_managers_count = []
        for manager in all_managers_fin:
            all_managers_count.append(
                franken_boss[franken_boss["franken"].str.contains(str(manager))].shape[
                    0
                ]
            )
        return pd.DataFrame(data=all_managers_count, index=all_managers_fin)
def fancy_filter(q):
    """allows you to pass in your own condition eg for finding headcount of employees with tenure greater than 5 years, ie conditions that cannot be expressed in terms of equality to a column"""
    # for now assume the string passed in is just the request
    # SECURITY NOTE(review): this eval()s pandas code generated by the LLM from
    # the user's query — arbitrary code execution if the query is untrusted.
    cur_df = eval(
        pandas_query_engine.query(
            "convert this query into a pandas expression, where you return the whole dataframe but with the desired filtering applied: "
            + q
        ).metadata["pandas_instruction_str"]
    )
    return cur_df
def fancy_category(q):
    """Ask the pandas query engine to add a new category column to df per *q*, then return df."""
    # SECURITY NOTE(review): this exec()s pandas code generated by the LLM from
    # the user's query — arbitrary code execution if the query is untrusted.
    # NOTE(review): presumably the generated code mutates the global df in place
    # (e.g. df["new_col"] = ...); plain assignments inside exec would not persist
    # — confirm the engine's output shape.
    exec(
        pandas_query_engine.query(
            "convert this query into a pandas expression, where you create a single new column that creates the desired categories specified in this query: "
            + q
        ).metadata["pandas_instruction_str"]
    )
    return df
# have 2 layer chatbot where the first converts the query into all the necessary preprocessing
# then the second calls the correct function
# Wrap each metric function as a llama_index FunctionTool; each function's
# docstring becomes the tool description the LLM sees.
headcount_tool = FunctionTool.from_defaults(fn=headcount, name="headcount")
terminations_tool = FunctionTool.from_defaults(fn=terminations, name="terminations")
turnover_tool = FunctionTool.from_defaults(fn=turnover_rate, name="turnover")
turnover_ovr_time_tool = FunctionTool.from_defaults(
    fn=turnover_ovr_time, name="turnover_over_time"
)
avg_tool = FunctionTool.from_defaults(fn=average, name="average")
rolling_turnover_tool = FunctionTool.from_defaults(
    fn=rolling_turnover, name="rolling_turnover"
)
head_over_tool = FunctionTool.from_defaults(fn=hc_ovr_time, name="headcount_over_time")
terms_over_tool = FunctionTool.from_defaults(fn=terms_ovr_time, name="terms_over_time")
FTE_tool = FunctionTool.from_defaults(fn=FTE, name="fte")
perc_live_cat_tool = FunctionTool.from_defaults(fn=perc_live_cat, name="perc_live_cat")
perc_terms_cat_tool = FunctionTool.from_defaults(fn=perc_term_cat, name="perc_term_cat")
rolling_turnover_ovr_time_tool = FunctionTool.from_defaults(
    fn=rolling_turnover_ovr_time, name="rolling_turnover_over_time"
)
rolling_hire_rate_tool = FunctionTool.from_defaults(
    fn=rolling_hire_rate, name="rolling_hire_rate"
)
hire_rate_tool = FunctionTool.from_defaults(fn=hire_rate, name="hire_rate")
hires_tool = FunctionTool.from_defaults(fn=hires, name="hires")
hires_ovr_time_tool = FunctionTool.from_defaults(
    fn=hires_ovr_time, name="hires_over_time"
)
rolling_hire_ovr_time_tool = FunctionTool.from_defaults(
    fn=rolling_hire_ovr_time, name="rolling_hire_over_time"
)
# Full tool set handed to the retriever-backed agent below.
all_tools = [
    headcount_tool,
    terminations_tool,
    turnover_tool,
    avg_tool,
    rolling_turnover_tool,
    head_over_tool,
    terms_over_tool,
    FTE_tool,
    hires_ovr_time_tool,
    perc_live_cat_tool,
    perc_terms_cat_tool,
    turnover_ovr_time_tool,
    rolling_turnover_ovr_time_tool,
    rolling_hire_rate_tool,
    hire_rate_tool,
    hires_tool,
    rolling_hire_ovr_time_tool,
]
# Name -> tool lookup for dispatching by tool name.
all_tools_map = {t.metadata.name: t for t in all_tools}
# resp.sources[0].raw_input
# Import-time smoke test: computes the default headcount once (prints fc/fv).
headcount()
import matplotlib.pyplot as plt
# NOTE(review): `global` at module level is a no-op; presumably intended to
# share the last user input across functions — confirm whether it can be removed.
global global_input
def group_compare(final_df, most_n=False, least_n=False, hc_min=None):
    """Bar-plot a grouped metric (a group_by result, not an over-time series).

    Args:
        final_df: pandas Series indexed by group, e.g. the output of
            headcount(..., group_by=...).
        most_n: if truthy, keep only the most_n groups with the LARGEST values.
        least_n: if truthy, keep only the least_n groups with the SMALLEST values.
        hc_min: if given and > 0, drop groups whose headcount is below hc_min.
    """
    cur_df = final_df
    if most_n:
        # fixed: "most" means largest -- the original sorted ascending and
        # therefore returned the smallest groups
        cur_df = cur_df.sort_values(ascending=False).iloc[:most_n]
    if least_n:
        # fixed: symmetric to most_n -- keep the smallest values
        cur_df = cur_df.sort_values(ascending=True).iloc[:least_n]
    # fixed: the original compared `hc_min > 0` directly, which raises
    # TypeError for the default hc_min=None
    if hc_min is not None and hc_min > 0:
        # keep only groups whose headcount meets the minimum
        kept = []
        for group in cur_df.index:
            filter_column_new = [cur_df.index.name]
            # NOTE(review): filter_value passes two values for a single
            # filter column -- this mirrors the Clinical Nurse example call
            # below; confirm headcount()'s expected column/value arity.
            if (
                headcount(
                    filter_column=filter_column_new,
                    filter_value=[group, "Clinical Nurse"],
                )
                >= hc_min
            ):
                kept.append(group)
        print(kept)
        cur_df = cur_df.loc[kept]
    cur_df.plot(kind="bar")
# Demo: bar chart of Clinical Nurse headcount per Cost_Center, keeping only
# cost centers with a headcount of at least 15.
group_compare(
    headcount(
        filter_column=["Job_Family"],
        filter_value=["Clinical Nurse"],
        group_by="Cost_Center",
    ),
    hc_min=15,
)
from llama_index import VectorStoreIndex
from llama_index.objects import ObjectIndex, SimpleToolNodeMapping
# Index all tools so the agent can retrieve the relevant one per query
# instead of being given the full tool list every turn.
tool_mapping = SimpleToolNodeMapping.from_objects(all_tools)
obj_index = ObjectIndex.from_objects(
    all_tools,
    tool_mapping,
    VectorStoreIndex,
)
from llama_index.agent import FnRetrieverOpenAIAgent
# Agent that retrieves candidate tools from obj_index before each call.
agent = FnRetrieverOpenAIAgent.from_retriever(obj_index.as_retriever(), verbose=True)
# System prompt prepended to every user query. Fixed: the original adjacent
# string literals lacked separators, so the prompt contained the run-ons
# "filter_valueif grouping" and "cat_colUse", garbling the instructions.
system = (
    "Make sure to use Tools, always write dates in the format 'mm-yyyy' the year is 2023, "
    "if for each is in the query then use group_by or cat_col NOT filter_column or filter_value. "
    "if grouping is in the query use group_by, if categorizing is used in the query use cat_col. "
    "Use filter_col and filter_val if the query asks for employees where {column} is {value} : "
)
def get_answer(query):
    """Send the user's query, prefixed with the system prompt, to the agent."""
    full_prompt = system + query
    return agent.chat(full_prompt)
def get_data(query):
    """Chat with the agent and return the raw output of its first tool call."""
    chat_result = agent.chat(system + query)
    first_source = chat_result.sources[0]
    return first_source.raw_output
"""
#Instructions:
While the analytics bot is general purpose, and will do its best to answer any question you ask, there are ways to ensure you get the desired calculation.
## Basics:
This chatbot can answer questions about headcount, terminations, turnover, rolling turnover, FTE, and statistics like Ethnic, Gender, Time Type percentages. These calculations can be done for a specific month, for the current employees or over time.
For now you should use the exact Column name and value. The column names have had spaces replaced with underscores, e.g. Job Family Group is Job_Family_Group
#### Over Time
If you request a calculation without specifying the date, the result will be the current employee data, if not it will use historic data.
> ""
> ""
If you would like to see the evolution of a datapoint over time just add the words over time. If you don't specify a time frame the calculations will start from 12 months ago. You can ask for the calculation to be rolling; the default roll is 6 months, but you can specify the rolling period.
##Filtering:
If you want to only consider a specific VP, Job_Family, Entity, or any other category found in the Headcount dataset's columns, you will want the chatbot to use: filter_column and filter_value.
These can be used to create an unlimited number of conditions. To be sure you get the desired filtering, phrase the query like this:
>"what is the {desired calculation eg rolling Turnover} for employees where {column_1} is {value_1} and ... {column_n} is {value_n}"
For more tailored filtering ### IMPLEMENT
## Grouping:
If you want to do a calculation for each VP, Job_Family, Entity, etc there are 2 ways to go about it.
####group_by:
this will give each group a row
####cat_col:
this will give each group a column
For calculations where you are grouping on just one column you can use either.
In this case you should write a query like this:
> "what is the {desired calculation eg rolling Turnover} for each {column_1}"
if you would like to ensure it groups by rows:
>"what is the {desired calculation eg rolling Turnover} grouping by {column_1}"
and if you would like to ensure it groups by columns:
> "what is the {desired calculation eg rolling Turnover} categorizing by {column_1}"
if you would like to use both follow the same pattern
eg to find headcount for each VP for each Ethnicity
> "what is the {headcount} for each {VP} for each {Entity}"
> or " what is the {headcount} grouping by {VP} categorizing by {Entity}"
> or " what is the {headcount} categorizing by {VP} grouping by {Entity}"
#### Percentage
"""
from llama_index import VectorStoreIndex, SimpleDirectoryReader, ServiceContext
from llama_index.llms import OpenAI, Anthropic
from llama_index import Document
from llama_index.langchain_helpers.text_splitter import TokenTextSplitter
from llama_index import VectorStoreIndex, ServiceContext, LLMPredictor
from langchain.chat_models import ChatOpenAI
from llama_index.node_parser import SimpleNodeParser
from llama_index.tools import QueryEngineTool
from llama_index.tools import QueryEngineTool, ToolMetadata
from llama_index.agent import ReActAgent
from llama_index.schema import Document
from llama_index.agent import ContextRetrieverOpenAIAgent
from llama_index.tools import QueryPlanTool
from llama_index import get_response_synthesizer
from llama_index.tools import QueryEngineTool
from llama_index.agent import OpenAIAgent
from llama_index.schema import Document
from llama_index.agent import ContextRetrieverOpenAIAgent
from llama_index.tools import QueryPlanTool
from llama_index import get_response_synthesizer
from llama_index.tools import QueryEngineTool
from llama_index.agent import OpenAIAgent
import json
from typing import Sequence, List
from llama_index.llms import OpenAI, ChatMessage
from llama_index.tools import BaseTool, FunctionTool
import nest_asyncio
# Patch the event loop to allow nested asyncio.run calls (needed when
# llama_index runs inside a notebook/Gradio environment).
nest_asyncio.apply()
def rep(c):
    """Return the unique values of df column c as a string, capped at 250 chars."""
    uniques = str(df[c].unique())
    return uniques[:250] if len(uniques) > 250 else uniques
# Column-glossary prompt given to the retrieval engines. Fixed typos that
# degraded the prompt text: "typer of Worker", "Full time of Part time",
# and "heirarcy".
column_info = (
    "you are helping me analyse a dataset which is used for Human Resources. You have access to a database with data about every employee on each row. Always convert dates to be of the form mm-yyyy "
    "Turnover refers to the percentage of employees who are terminated, so you must include Termination Dates "
    "ins or Monthly ins refers to the percentage of employees who are new hires "
    "These are the columns of the database: "
    "These are the columns that you have access to. "
    "Worker_ID: is the unique identifier for each employee. "
    "Job_Code: is the unique number assigned to each Job. "
    "Job_Profile: is the most specific Job Title of each employee. "
    "Job_Family: is the category of Job that the employee belongs to. "
    "Job_Family_Group: is the broadest class of Job. "
    "Worker_Sub_Type: This is the type of Worker, like Per Diem, Temporary. "
    "Time_Type: If the worker is Full time or Part time. "
    "Exclude_from_Turnover_Calculation is Keep if the employee should be included and Exclude if they should not be "
    "FTE: stands for Full Time Equivalent and is the ratio of how many hours they are expected to work compared to 40 hours "
    "Entity: is each Organization within the Tufts Medicine Network, it includes all the hospitals and corporate "
    "Cost_Center: is the Department the employee belongs to "
    "Is_Rehire: is whether or not the employee is a rehire "
    "Hire_Date: is when each employee was hired "
    "Termination_Date: is when each employee was terminated or null if they are still an employee "
    "Termination_Type: is whether or not the termination was Voluntary or Involuntary "
    "Termination_Reason: is the reason the employee was terminated "
    "Tenure: is how many Years the employee has worked at Tufts Medicine "
    "DOB: is the Date of Birth of the employee "
    "Age_(Years): is the Age in Years of the employee "
    "Ethnicity: is the Ethnicity of the employee "
    "Gender: is the Gender of the employee "
    "Veteran_Status: Is whether or not the employee is a Veteran of the armed services "
    "Disability_Status: is whether or not the employee has disclosed a disability "
    "Worker's_Management_Level: is what level the employee is in the company hierarchy "
    "Worker's_Sup_Org: is the employees Department leader "
    "VP: is the VP of the employee "
    "Director: is the director of the employee "
)
# CHECK THESE MATCH EXACTLY and explain EVERY ARGUMENT for each function
# Catalogue of the calculations the agent may perform, used by the calc
# retrieval engine. Fixed: the hires_ovr_time entry instructed the model to
# "Always use turnover_ovr_time" (copy-paste error) and described hires as a
# rate; also fixed the "catgory" typo.
calculation_info = (
    "These are the calculations you are allowed to perform: "
    "headcount: the number of active employees for a specific month."
    "terminations: the total number of terminations for a specific month. NOT termination rate. "
    "turnover: the termination rate or percentage of employees who were terminated. "
    "average: the average value a particular column takes on."
    "rolling_turnover: the average turnover when considering multiple months."
    "headcount_ovr_time: the headcount over time. refers to the total headcount, Use headcount_ovr_time if the query specifies an interval longer than a month or uses phrases like since 2023, from 2023, starting in 2023"
    "termination_ovr_time: the number of termination over time. Always use termination_ovr_time if the query specifies an interval longer than a month or uses phrases like since 2023, from 2023, starting in 2023"
    "turnover_ovr_time: the turnover rate over time, Always use turnover_ovr_time if the query specifies an interval longer than a month or uses phrases like since 2023, from 2023, starting in 2023"
    "hires_ovr_time: the number of hires over time, Always use hires_ovr_time if the query specifies an interval longer than a month or uses phrases like since 2023, from 2023, starting in 2023"
    "FTE: the sum of the Full Time Equivalent column, which should only be used if Full Time Equivalent or FTE is requested."
    "percentage headcount category: this should only be used when trying to understand what percentage of current employees that belong to each category in a specific column."
    "percentage termination category: this should only be used when trying to understand what percentage of former employees that belong to each category in a specific column"
    "rolling_turnover_ovr_time: the average value turnover for multiple months. Always use rolling_turnover_ovr_time if the query specifies an interval longer than a month or uses phrases like since 2023, from 2023, starting in 2023 "
    "rolling_hires_ovr_time: the average value hires for multiple months. Always use rolling_hires_ovr_time if the query specifies an interval longer than a month or uses phrases like since 2023, from 2023, starting in 2023 "
    "hires: the number of hires in a specific month"
    "hire_rate: the percentage of employees that are new hires"
    "rolling_hire_rate_overtime: the average value of hire rate over multiple months Always use rolling_hire_rate_overtime if the query specifies an interval longer than a month or uses phrases like since 2023, from 2023, starting in 2023"
)
value_texts = []
import math
import numpy as np
from llama_index.agent import OpenAIAgent
from llama_index.llms import OpenAI
import json
from typing import Sequence
from llama_index.tools import BaseTool, FunctionTool
from llama_index import VectorStoreIndex
from llama_index.objects import ObjectIndex, SimpleToolNodeMapping
from llama_index.agent import FnRetrieverOpenAIAgent
import gradio as gr
# Build one retrievable text per column describing the values it takes on,
# chunking long value lists into groups of 30 so each document stays small.
for c in df.columns:
    vals = sorted([str(x) for x in df[c].unique()])
    l = len(vals)
    if l > 30:
        # fixed: the original split boundaries started at 0, producing an
        # empty first chunk; range(30, l, 30) yields chunks of <= 30 values
        ind = list(range(30, l, 30))
        value_texts += [
            "column " + c + " takes on the following values: " + str(chunk)
            for chunk in np.split(vals, ind)
        ]
    else:
        # fixed: missing space after "column", and the full value list was
        # appended even when the column had already been chunked above
        value_texts.append(
            "column " + c + " takes on the following values: " + str(vals)
        )
# Build retrieval engines over the column glossary, calculation catalogue,
# and column-value listings, then a retriever-backed OpenAI agent.
service_context = ServiceContext.from_defaults(llm=OpenAI())
data = [Document(text=t) for t in value_texts]
instructions = [Document(text=column_info), Document(text=calculation_info)] + data
column_index = VectorStoreIndex.from_documents(
    [Document(text=column_info)], service_context=service_context
)
calc_index = VectorStoreIndex.from_documents(
    [Document(text=calculation_info)], service_context=service_context
)
data_index = VectorStoreIndex.from_documents(data, service_context=service_context)
cols_and_vals_index = VectorStoreIndex.from_documents(
    data + [Document(text=column_info)], service_context=service_context
)
col_engine = column_index.as_query_engine(similarity_top_k=2)
calc = calc_index.as_query_engine(similarity_top_k=5)
data_engine = data_index.as_query_engine(similarity_top_k=10)
# fixed: the keyword was misspelled "simialrity_top_k", so it was silently
# swallowed by **kwargs and the engine used the default top_k instead of 5.
# NOTE(review): the original also rebinds cols_and_vals_index to the engine
# object; preserved in case later code relies on it.
c_v = cols_and_vals_index = cols_and_vals_index.as_query_engine(similarity_top_k=5)
agent = FnRetrieverOpenAIAgent.from_retriever(obj_index.as_retriever(), verbose=True)
# ADD ALIASING and add FIRST PRIORITY error catching -> explain what you are doing
agent.chat("what is the termination rate")
user_input = "number of terminations for Clinical nurses over time "  # 'what is the number of hires for each race for care at home'
# WRITE VERY DETAILED INSTRUCTIONS and change the context above
# MAKE SURE THIS ALWAYS HAS THE SAME RETURN TYPE ->
def query(user_input):
    """Run the user query through a fresh tool-retrieval agent.

    Returns:
        (bug_fixing, actual_response): bug_fixing is a human-readable string
        of hints built from every failure marker the tools recorded in the
        global `states` list; actual_response is the agent's chat response
        (or a canned failure message if the first chat raised).
    """
    agent = FnRetrieverOpenAIAgent.from_retriever(
        obj_index.as_retriever(), verbose=True
    )
    message = ""
    try:
        actual_response = agent.chat(message + user_input)
    except Exception:
        # fixed: a bare `except:` also swallowed KeyboardInterrupt/SystemExit;
        # keep the best-effort fallback but only for ordinary exceptions
        actual_response = agent.chat("please say: the calculation failed")
    global states
    try:
        states
    except NameError:
        # fixed: query() can run before chatbot_response() ever initialises
        # `states`, which raised NameError on the first call
        states = []
    print("the state is : " + str(states))
    bug_fixing = "\n "
    for state in states:
        if filter_missing in state:
            cols = state[len(filter_missing):]
            # fixed: the original indexed a single character at the marker
            # position; extract the text AFTER the "value: " marker instead
            vals = state[state.index("value: ") + len("value: "):]
            bug_fixing += (
                "the filtering is requiring that "
                + cols
                + "take on the following values: "
                + vals
                + " the number of columns and values must be the same, please rephrase to be more explicit"
            )
        if filter_missing_column in state:
            # fixed: skip the ":" itself when extracting the column name
            col = state[state.index(":") + 1:]
            salvage = col_engine.query(
                "which five columns most closely matche "
                + col
                + ". Only give the five column names"
            )
            bug_fixing += (
                "please specify the column you are refering to for filtering. The model guessed: "
                + col
                + " .These columns might be useful: "
                + salvage.response
            )
        if group_by_missing in state:
            col = state[len(group_by_missing):]
            salvage = col_engine.query(
                "which five columns are most similar to or the same as "
                + col
                + ". only give the column names, choose randomly if unsure"
            )
            bug_fixing += (
                "The column "
                + col
                + " you are using for grouping does not exist. The model guessed"
                + col
                + ". These columns might be useful: "
                + salvage.response
            )
        if filter_val_missing in state:
            col = state[len(filter_val_missing):]
            # fixed: drop the "value: " marker from the extracted value
            val = state[state.index("value: ") + len("value: "):]
            # fixed: the original used `or`, which is always True; only try
            # to salvage when the value is neither nan nor NaT
            if str(val) != "nan" and str(val) != "NaT":
                salvage = data_engine.query(
                    "which five values in column "
                    + col
                    + "most closely match"
                    + val
                    + ", only list the similar values, write NOTHING else"
                )
                bug_fixing += (
                    "The value "
                    + val
                    + " does not exist in the column: "
                    + col
                    + " these are the most similar values: "
                    + salvage.response
                )
    return bug_fixing, actual_response
# query(user_input)
def many_sub(user_input):
    """Run three preparatory engine queries (calculation choice, filtering,
    grouping) and feed their combined guidance into query()."""
    calc_choice = calc.query(
        "which calculation would be most useful for answering this question: "
        + user_input
    )
    filter_choice = c_v.query(
        "does the query concern exactly one subset of the employees or a type of Employee? If yes which columns and values should we filter on. You can only use the following columns"
        + str(df.columns)
        + ". Never mention Hire_Date, Employee_ID, Termination_Dates or Age_(Years) unless specifically requested. There is never more than one subset requested meaning each column takes on exactly one value. Please use as few columns as possible. If nothing is specified or unsure then say no filters are needed."
        + user_input
    )
    group_choice = calc.query(
        "group_by is used to find the result of the calculation for each element of the group_by column, would a group_by column be useful in answering this query: "
        + user_input
        + ". You are only allowed to use columns with these names "
        + str(df.columns)
        + "if yes only say yes and the column if no only say No"
    )
    # trace the three intermediate decisions
    print(calc_choice, " calc")
    print(filter_choice, " filter")
    print(group_choice, " group")
    # combine everything into one instruction for the main agent
    combined = (
        f"use function : {calc_choice.response}"
        f"should you have any filters {filter_choice.response}"
        f"should you group by{group_choice.response}"
        f"to answer the following query: {user_input}"
    )
    return query(combined)
# One document per tool argument, so the desc engine can explain what a
# generated function call means.
desc_index = VectorStoreIndex.from_documents(
    [
        Document(text=t)
        for t in (
            "filter_column is a column dataset, it is the feature we are filtering on, if it is blank we are looking at all of Tufts Medicine. ",
            "filter_value is the value which our column takes on, it is the specific type that we are filtering on, if it is blank we are looking at all types of employees. ",
            "filter_column and filter_value must be the same lenght, if they arent then the function has been generated incorrectly",
            "group_by is a column in the dataset for which we are doing the calculation for each unique value in the column, this shows each value as a row ",
            "cat_col is a column in the dataset for which we are doing a calculation for each unique value in the column, however, this is only used for over time calculations, this shows each value as a column ",
            "start is the start date, the first time the datapoint will be measured. If it is blank then it is 12 months ago. It is a date in the format mm-yyyy",
            "end is the end date, the final time the datapoint will be measure. If it is blank then it is this month. It is a date in the format mm-yyyy",
        )
    ],
    service_context=service_context,
)
desc = desc_index.as_query_engine()
def remove_filter_args(args):
    """Strip filter_column / filter_value from a raw tool input's kwargs.

    Fixed: the original tested the TOP-LEVEL keys of `args` but popped from
    args["kwargs"], so it either missed the filters entirely or raised a
    KeyError -- the filter arguments live inside the "kwargs" dict of the
    agent's raw tool input.

    Args:
        args: raw tool-input dict (mutated in place).
    Returns:
        The same dict, with the filter entries removed from its "kwargs".
    """
    kwargs = args.get("kwargs", {})
    kwargs.pop("filter_column", None)
    kwargs.pop("filter_value", None)
    return args
def details(r):
    """Return a one-line summary of which tool the agent invoked and with
    what raw arguments (also printed for tracing)."""
    print(r)
    source = r.sources[0]
    # prompt describing the tool; currently unused downstream but kept
    instruc = (
        "what is being calculated by the function with this description :"
        + all_tools_map[source.tool_name].metadata.description
    )
    arg = source.raw_input
    # when the pipeline finished "happy", keep the args untouched; otherwise
    # strip the (misunderstood) filter arguments
    args = arg if ["happy"] == states else remove_filter_args(arg)
    fmt_template = "the function calculated {function} with arguments {arguments}. {if filter_column and filter_value are not none then: where filter_column is filter value}, {if group_by is not none: for each group_by}, {if start or end not none then: from start to end}"
    fail_info = ""
    summary = (
        "the function: "
        + source.tool_name
        + " with arguments"
        + str(arg)
        + fail_info
    )
    print("dets", summary)
    return summary
# Seed /tmp/result.xlsx with a placeholder so the Gradio file output always
# has something to serve, even before the first successful query.
x = pd.DataFrame(index=[1], data={"result": 1})
x.to_excel("/tmp/result.xlsx")
def chatbot_response(user_input):
    """Gradio callback: answer the query and export the result to Excel.

    Args:
        user_input: free-text question from the UI.
    Returns:
        (response_text, excel_path) -- the agent's answer plus debug hints,
        and the path of the Excel file holding the raw tool output.
    """
    res, resp = query(user_input)
    sources = resp.sources
    global states
    states = ["happy"]  # reset the failure-state list for the next turn
    filename = "/tmp/result.xlsx"
    if len(sources) > 0:
        # renamed from `df`, which shadowed the module-level employee frame
        result = sources[0].raw_output
        if not isinstance(result, pd.DataFrame):
            # scalar tool output -> wrap in a one-cell frame for export
            result = pd.DataFrame(index=[1], data={"result": result})
        # Use openpyxl as the engine to write Excel files
        result.to_excel(filename, engine="openpyxl")
        return resp.response + "\n" + details(resp) + "\n" + res, filename
    # fixed: the original concatenated the literal "n" instead of a newline
    return resp.response + "\n" + res, filename
# Earlier single-output version, kept for reference:
# iface = gr.Interface(
#     fn=chatbot_response,
#     inputs=gr.inputs.Textbox(lines=5, placeholder="Type here..."),
#     outputs=gr.outputs.File(label="Download Response"),  # Change this line
# )
# Gradio UI: one textbox in, chatbot text + downloadable Excel file out.
# NOTE(review): gr.inputs.Textbox is the legacy (pre-4.x) gradio namespace;
# confirm the pinned gradio version supports it.
iface = gr.Interface(
    fn=chatbot_response,
    inputs=gr.inputs.Textbox(lines=5, placeholder="Type here..."),
    outputs=[
        gr.components.Textbox(label="Chatbot Response"),
        gr.components.File(label="Download Response in Excel"),
    ],
    live=False,  # Set live to False to include a Submit button
)
# share=True exposes a public gradio link; debug=True blocks and streams logs.
iface.launch(debug=True, share=True)
# FUTURE FEATURES
# visualizations -> it's possible but it's super hard because you need to be able to filter it in many ways, like by headcount, by max,min, etc cuz otherwise you can have way to many data points
# multiple columns -> idk how to do this but could be useful
# fancy filters -> maybe have a prequery that asks if fanccy filtering is necessary, then if it is modify the global df