carebuds / kids_main.py
quantresearch1's picture
first stab
06a79da
raw
history blame contribute delete
No virus
9.03 kB
import pandas as pd
import gurobipy as gp
from enum import Enum
def optimize_schedule_for_pool(customer_schedule: pd.DataFrame) -> pd.DataFrame:
"""This function assumes the user submitted a pool of 3 with one child per family and school, each with the same hourly needs"""
# customer schedule has names capitalized as it is front facing
customer_schedule.columns = map(str.lower, customer_schedule.columns)
# Import inside function are very bad. It can be fixed by having a proper database
from database import offer
offer = pd.concat([offer[customer_schedule.columns], customer_schedule]).set_index(
"family", verify_integrity=True
)
assert set(offer.values.reshape(-1)).issubset(
{0, 1}
), "Incorrect schedule format, only 0 or 1 is allowed as inputs"
m = gp.Model("Carebuds")
workers = offer.index.to_list()
# Number of workers required for each shift.
shifts, shiftRequirements = gp.multidict(
{"monday": 1, "tuesday": 1, "wednesday": 1, "thursday": 1, "friday": 1}
)
availability = []
for family, row in offer.iterrows():
availability.extend([(family, day) for day in offer.columns[row == 1]])
# Initialize assignment decision variables.
x = m.addVars(availability, vtype=gp.GRB.BINARY, name="x")
# Slack decision variables determine the number of extra workers required to satisfy the requirements
# of each shift
slacks = m.addVars(shifts, name="Slack")
# Auxiliary variable totSlack to represent the total number of extra workers required to satisfy the
# requirements of all the shifts.
totSlack = m.addVar(name="totSlack")
# Auxiliary variable totShifts counts the total shifts worked by each employed worker
totShifts = m.addVars(workers, name="TotShifts")
# Constraint: All shifts requirements most be satisfied.
shift_reqmts = m.addConstrs(
(x.sum("*", s) + slacks[s] == shiftRequirements[s] for s in shifts),
name="shiftRequirement",
)
# Constraint: set the auxiliary variable (totSlack) equal to the total number of extra workers
# required to satisfy shift requirements
num_temps = m.addConstr(totSlack == slacks.sum(), name="totSlack")
# Constraint: compute the total number of shifts for each worker
num_shifts = m.addConstrs(
(totShifts[w] == x.sum(w, "*") for w in workers), name="totShifts"
)
# Auxiliary variables.
# minShift is the minimum number of shifts allocated to workers
# maxShift is the maximum number of shifts allocated to workers
minShift = m.addVar(name="minShift")
maxShift = m.addVar(name="maxShift")
# Constraint:
# The addGenConstrMin() method of the model object m adds a new general constraint that
# determines the minimum value among a set of variables.
# The first argument is the variable whose value will be equal to the minimum of the other variables,
# minShift in this case.
# The second argument is the set variables over which the minimum will be taken, (totShifts) in
# this case.
# Recall that the totShifts variable is defined over the set of worker and determines the number of
# shifts that an employed worker will work. The third argument is the name of this constraint.
min_constr = m.addGenConstrMin(minShift, totShifts, name="minShift")
# Constraint:
# Similarly, the addGenConstrMax() method of the model object m adds a new general
# constraint that determines the maximum value among a set of variables.
max_constr = m.addGenConstrMax(maxShift, totShifts, name="maxShift")
# Set global sense for ALL objectives.
# This means that all objectives of the model object m are going to be minimized
m.ModelSense = gp.GRB.MINIMIZE
# set_single_objective(m=m, totSlack=totSlack)
set_objectives(m=m, totSlack=totSlack, maxShift=maxShift, minShift=minShift)
# Optimize
# This method runs the optimization engine to solve the MIP problem in the model object m
m.optimize()
check_status(m)
assignments_all = get_solution(workers, availability, x, totSlack, totShifts)
calendar = KPI(assignments_all, shifts)
calendar.columns = [c.capitalize() for c in calendar.columns]
return calendar
class Duty(Enum):
OnDuty = "On Duty"
OffDuty = "Off Duty"
def check_status(m: gp.Model) -> None:
# The Status attribute provides current optimization status of the model object m
# In workforce model, we check if the model is infeasible or unbounded and report this situation
status = m.Status
if (
status == gp.GRB.Status.INF_OR_UNBD
or status == gp.GRB.Status.INFEASIBLE
or status == gp.GRB.Status.UNBOUNDED
):
m.write("workforce.iis")
raise ValueError(
"The model cannot be solved because it is infeasible or unbounded"
)
# If the optimization status of the model is not optimal for some other reason, we report that
# situation.
if status != gp.GRB.Status.OPTIMAL:
print("Optimization was stopped with status " + str(status))
return None
def set_single_objective(m: gp.Model, totSlack: gp.Var) -> None:
m.setObjective(totSlack)
def set_objectives(
m: gp.Model, totSlack: gp.Var, maxShift: gp.Var, minShift: gp.Var
) -> None:
# The setObjectiveN() method of the model object m allows to define multiple objectives.
# The first argument is the linear expression defining the most important objective, called primary
# objective, in this case it is the minimization of extra workers required to satisfy shift
# requirements.
# The second argument is the index of the objective function, we set the index of the primary
# objective to be equal to 0.
# The third argument is the priority of the objective.
# The fourth argument is the relative tolerance to degrade this objective when a lower priority
# objective is optimized. The fifth argument is the name of this objective.
# A hierarchical or lexicographic approach assigns a priority to each objective, and optimizes
# for the objectives in decreasing priority order.
# For this problem, we have two objectives, and the primary objective has the highest priority
# which is equal to 2.
# When the secondary objective is minimized, since the relative tolerance is 0.2, we can only
# increase the minimum number of extra workers up to 20%.
# For example if the minimum number extra workers is 10, then when optimizing the secondary objective
# we can have up to 12 extra workers.
m.setObjectiveN(totSlack, index=0, priority=2, reltol=0.2, name="TotalSlack")
# Set up secondary objective.
# The secondary objective is called fairness and its goal is to balance the workload assigned
# to the employed workers.
# To balance the workload assigned to the employed workers, we can minimize the difference
# between the maximum number of shifts assigned to an employed worker and the minimum number
# of shifts assigned to an employed worker.
m.setObjectiveN(maxShift - minShift, index=1, priority=1, name="Fairness")
return None
def get_solution(workers, availability, x: gp.Var, totSlack: gp.Var, totShifts: gp.Var):
# Print total slack and the number of shifts worked for each worker
solution = {}
shifts_sol = {}
solution["Total slack required"] = str(totSlack.X)
assignments_all = {}
assignments = dict()
for [w, s] in availability:
if x[w, s].x == 1:
if w in assignments:
assignments[w].append(s)
else:
assignments[w] = [s]
print(pd.DataFrame.from_records(list(solution.items()), columns=["KPI", "Value"]))
print("-" * 50)
for w in workers:
shifts_sol[w] = totShifts[w].X
assignments_all[w] = assignments.get(w, [])
print("Shifts")
sol_df = pd.DataFrame.from_records(
list(shifts_sol.items()), columns=["Worker", "Number of shifts"]
)
print(sol_df)
return assignments_all
def KPI(assignments_all, shifts) -> pd.DataFrame:
# The KPIs for this optimization number is the number of extra worked required to satisfy
# demand and the number of shifts that each employed worker is working.
gant = {}
print("-" * 50)
for w in assignments_all:
gant[w] = [w]
for d in shifts:
gant[w].append(Duty.OnDuty if d in assignments_all[w] else Duty.OffDuty)
return pd.DataFrame.from_records(list(gant.values()), columns=["family"] + shifts)
if __name__ == "__main__":
solution = optimize_schedule_for_pool(
customer_schedule=pd.DataFrame(
# [("Doe", 1, 0, 1, 1, 1)],
[("Doe", 1, 0, 0, 0, 0)],
columns=["family", "monday", "tuesday", "wednesday", "thursday", "friday"],
)
)
print("Assigments")
print("Symbols: '-': not working, '*': working")
pd.set_option("display.width", 1000)
print(solution)