from enum import Enum

import gurobipy as gp
import pandas as pd


def optimize_schedule_for_pool(customer_schedule: pd.DataFrame) -> pd.DataFrame:
    """Build and solve the weekly duty-assignment model for one pool.

    This function assumes the user submitted a pool of 3 with one child per
    family and school, each with the same hourly needs.

    Args:
        customer_schedule: Customer-facing availability table with a
            ``family`` column and ``monday`` .. ``friday`` columns (any
            capitalisation) holding 1 (available) or 0 (not available).
            The frame passed in is not modified.

    Returns:
        A frame with a ``Family`` column followed by one capitalised column
        per shift; every shift cell is a ``Duty`` member.

    Raises:
        AssertionError: If any availability cell is something other than 0 or 1.
        ValueError: If the model is infeasible or unbounded (raised by
            ``check_status``), or if a family name occurs more than once
            (raised by ``set_index(..., verify_integrity=True)``).
    """
    # Customer schedule has names capitalized as it is front facing. Work on a
    # renamed copy so the caller's DataFrame keeps its original column labels.
    customer_schedule = customer_schedule.rename(columns=str.lower)

    # Imports inside functions are very bad. It can be fixed by having a proper
    # database.
    from database import offer as stored_offer

    offer = pd.concat(
        [stored_offer[customer_schedule.columns], customer_schedule]
    ).set_index("family", verify_integrity=True)

    # Raised explicitly (instead of via an `assert` statement) so the check
    # still runs under `python -O`; the exception type and message are the same.
    if not set(offer.values.reshape(-1)).issubset({0, 1}):
        raise AssertionError(
            "Incorrect schedule format, only 0 or 1 is allowed as inputs"
        )

    m = gp.Model("Carebuds")
    workers = offer.index.to_list()

    # Number of workers required for each shift.
    shifts, shiftRequirements = gp.multidict(
        {"monday": 1, "tuesday": 1, "wednesday": 1, "thursday": 1, "friday": 1}
    )

    # (family, day) pairs for every day a family marked itself as available.
    availability = [
        (family, day)
        for family, row in offer.iterrows()
        for day in offer.columns[row == 1]
    ]

    # Assignment decision variables: x[family, day] == 1 when the family is on
    # duty that day.
    x = m.addVars(availability, vtype=gp.GRB.BINARY, name="x")

    # Slack variables: the part of each shift's requirement that is not
    # covered by an assigned worker.
    slacks = m.addVars(shifts, name="Slack")

    # Auxiliary variable: total slack over all shifts.
    totSlack = m.addVar(name="totSlack")

    # Auxiliary variables: total number of shifts worked by each worker.
    totShifts = m.addVars(workers, name="TotShifts")

    # Constraint: all shift requirements must be satisfied (by workers or slack).
    m.addConstrs(
        (x.sum("*", s) + slacks[s] == shiftRequirements[s] for s in shifts),
        name="shiftRequirement",
    )

    # Constraint: tie totSlack to the sum of the per-shift slacks.
    m.addConstr(totSlack == slacks.sum(), name="totSlack")

    # Constraint: compute the total number of shifts for each worker.
    m.addConstrs(
        (totShifts[w] == x.sum(w, "*") for w in workers), name="totShifts"
    )

    # minShift / maxShift are the minimum / maximum number of shifts allocated
    # to any worker, defined through Gurobi general constraints over totShifts.
    minShift = m.addVar(name="minShift")
    maxShift = m.addVar(name="maxShift")
    m.addGenConstrMin(minShift, totShifts, name="minShift")
    m.addGenConstrMax(maxShift, totShifts, name="maxShift")

    # Global sense for ALL objectives of the model: minimize.
    m.ModelSense = gp.GRB.MINIMIZE

    # set_single_objective(m=m, totSlack=totSlack) is the slack-only
    # alternative to the hierarchical objectives used here.
    set_objectives(m=m, totSlack=totSlack, maxShift=maxShift, minShift=minShift)

    # Run the optimization engine on the MIP.
    m.optimize()
    check_status(m)

    assignments_all = get_solution(workers, availability, x, totSlack, totShifts)
    calendar = KPI(assignments_all, shifts)
    calendar.columns = [c.capitalize() for c in calendar.columns]
    return calendar


class Duty(Enum):
    """Cell values of the calendar returned by ``KPI``."""

    OnDuty = "On Duty"
    OffDuty = "Off Duty"


def check_status(m: gp.Model) -> None:
    """Validate the optimization status of ``m`` after ``m.optimize()``.

    Args:
        m: The model that has just been optimized.

    Raises:
        ValueError: If the status is INF_OR_UNBD, INFEASIBLE or UNBOUNDED.

    Any other non-optimal status is only reported on stdout.
    """
    status = m.Status
    unsolvable = (
        gp.GRB.Status.INF_OR_UNBD,
        gp.GRB.Status.INFEASIBLE,
        gp.GRB.Status.UNBOUNDED,
    )
    if status in unsolvable:
        # The dump is a best-effort diagnostic: a failure to write it must not
        # hide the ValueError below, which is what callers are meant to see.
        # NOTE(review): Gurobi documents ".ilp" (after computeIIS()) as the
        # suffix for IIS files; ".iis" may be rejected as an unknown file
        # type -- confirm and switch if so.
        try:
            m.write("workforce.iis")
        except gp.GurobiError as err:
            print(f"Could not write workforce.iis: {err}")
        raise ValueError(
            "The model cannot be solved because it is infeasible or unbounded"
        )

    # If the optimization status of the model is not optimal for some other
    # reason, we report that situation.
    if status != gp.GRB.Status.OPTIMAL:
        print(f"Optimization was stopped with status {status}")
    return None


def set_single_objective(m: gp.Model, totSlack: gp.Var) -> None:
    """Make the total slack the only objective of ``m``."""
    m.setObjective(totSlack)


def set_objectives(
    m: gp.Model, totSlack: gp.Var, maxShift: gp.Var, minShift: gp.Var
) -> None:
    """Register the two hierarchical objectives on ``m``.

    Args:
        m: Model to configure.
        totSlack: Variable holding the total slack (primary objective).
        maxShift: Variable holding the largest per-worker shift count.
        minShift: Variable holding the smallest per-worker shift count.
    """
    # A hierarchical (lexicographic) approach assigns a priority to each
    # objective and optimizes them in decreasing priority order.
    #
    # Primary objective (index 0, priority 2): the total slack, i.e. the
    # number of extra workers required to satisfy the shift requirements.
    # reltol=0.2 is the relative tolerance by which this objective may be
    # degraded while a lower-priority objective is optimized: if the minimum
    # number of extra workers is 10, the secondary pass may use up to 12.
    m.setObjectiveN(totSlack, index=0, priority=2, reltol=0.2, name="TotalSlack")

    # Secondary objective (index 1, priority 1): fairness. To balance the
    # workload, minimize the difference between the maximum and the minimum
    # number of shifts assigned to a worker.
    m.setObjectiveN(maxShift - minShift, index=1, priority=1, name="Fairness")
    return None


def get_solution(
    workers,
    availability,
    x: gp.tupledict,
    totSlack: gp.Var,
    totShifts: gp.tupledict,
) -> dict:
    """Print the solution summary and collect the shifts given to each worker.

    Args:
        workers: Worker identifiers, in the order they should be reported.
        availability: ``(worker, shift)`` pairs for which ``x`` has a variable.
        x: Binary assignment variables keyed by ``(worker, shift)``.
        totSlack: Variable holding the total slack.
        totShifts: Per-worker shift-count variables keyed by worker.

    Returns:
        A dict mapping every worker to the list of shifts assigned to them
        (an empty list for workers without any assignment).
    """
    solution = {"Total slack required": str(totSlack.X)}

    assignments = {}
    for w, s in availability:
        # Solver values are floats that are only integral up to a tolerance
        # (a chosen binary may come back as 0.9999...), so test against 0.5
        # instead of comparing with exactly 1.
        if x[w, s].X > 0.5:
            assignments.setdefault(w, []).append(s)

    print(pd.DataFrame.from_records(list(solution.items()), columns=["KPI", "Value"]))
    print("-" * 50)

    shifts_sol = {}
    assignments_all = {}
    for w in workers:
        shifts_sol[w] = totShifts[w].X
        assignments_all[w] = assignments.get(w, [])

    print("Shifts")
    sol_df = pd.DataFrame.from_records(
        list(shifts_sol.items()), columns=["Worker", "Number of shifts"]
    )
    print(sol_df)
    return assignments_all


def KPI(assignments_all, shifts) -> pd.DataFrame:
    """Turn per-worker assignments into an on/off-duty calendar.

    Args:
        assignments_all: Mapping of worker to the list of shifts assigned.
        shifts: Ordered shift names; one output column is produced per shift.

    Returns:
        A frame with a ``family`` column followed by one column per shift,
        each cell being ``Duty.OnDuty`` or ``Duty.OffDuty``.
    """
    print("-" * 50)
    gantt_rows = []
    for w, assigned in assignments_all.items():
        duties = [Duty.OnDuty if d in assigned else Duty.OffDuty for d in shifts]
        gantt_rows.append([w] + duties)
    return pd.DataFrame.from_records(gantt_rows, columns=["family"] + list(shifts))


if __name__ == "__main__":
    solution = optimize_schedule_for_pool(
        customer_schedule=pd.DataFrame(
            [("Doe", 1, 0, 0, 0, 0)],
            columns=["family", "monday", "tuesday", "wednesday", "thursday", "friday"],
        )
    )
    print("Assigments")
    # NOTE(review): the calendar cells are Duty members, not '-' / '*';
    # this legend looks stale -- confirm before changing the output text.
    print("Symbols: '-': not working, '*': working")
    pd.set_option("display.width", 1000)
    print(solution)