Worker/Timeslot permutation/constraint filtering algorithm
Hope you can help me out with this guys. It's not help with work -- it's for a charity of very hard working volunteers, who could really use a less confusing/annoying timetable system than what they currently have.
If anyone knows of a good third-party app which (certainly) automate this, that would almost as good. Just... please don't suggest random timetabling stuff such as the ones for booking classrooms, as I don't think they can do this.
Thanks in advance for reading; I know it's a big post. I'm trying to do my best to document this clearly though, and to show that I've made efforts on my own.
Problem
I need a worker/timeslot scheduling algorithm which generates shifts for workers, which meets the following criteria:
Input Data
import datetime.datetime as dt
class DateRange:
def __init__(self, start, end):
self.start = start
self.end = end
class Shift:
def __init__(self, range, min, max):
self.range = range
self.min_workers = min
self.max_workers = max
tue_9th_10pm = dt(2009, 1, 9, 22, 0)
wed_10th_4am = dt(2009, 1, 10, 4, 0)
wed_10th_10am = dt(2009, 1, 10, 10, 0)
shift_1_times = Range(tue_9th_10pm, wed_10th_4am)
shift_2_times = Range(wed_10th_4am, wed_10th_10am)
shift_3_times = Range(wed_10th_10am, wed_10th_2pm)
shift_1 = Shift(shift_1_times, 2,3) # allows 3, requires 2, but only 2 available
shift_2 = Shift(shift_2_times, 2,2) # allows 2
shift_3 = Shift(shift_3_times, 2,3) # allows 3, requires 2, 3 available
shifts = ( shift_1, shift_2, shift_3 )
joe_avail = [ shift_1, shift_2 ]
bob_avail = [ shift_1, shift_3 ]
sam_avail = [ shift_2 ]
amy_avail = [ shift_2 ]
ned_avail = [ shift_2, shift_3 ]
max_avail = [ shift_3 ]
jim_avail = [ shift_3 ]
joe = Worker('joe', joe_avail)
bob = Worker('bob', bob_avail)
sam = Worker('sam', sam_avail)
ned = Worker('ned', ned_avail)
max = Worker('max', max_avail)
amy = Worker('amy', amy_avail)
jim = Worker('jim', jim_avail)
workers = ( joe, bob, sam, ned, max, amy, jim )
Processing
From above, shifts and workers are the two main input variables to process
Each shift has a minimum and maximum number of workers needed. Filling the minimum requirements for a shift is crucial to success, but if all else fails, a rota with gaps to be filled manually is better than "error" :) The main algorithmic issue is that there shouldn't be unnecessary gaps, when enough workers are available.
Ideally, the maximum number of workers for a shift would be filled, but this is the lowest priority relative to other constraints, so if anything has to give, it should be this.
Flexible constraints
These are a little flexible, and their boundaries can be pushed a little if a "perfect" solution can't be found. This flexibility should be a last resort though, rather than being exploited randomly. Ideally, the flexibility would be configurable with a "fudge_factor" variable, or similar.
- There is a minimum time period between two shifts. So, a worker shouldn't be scheduled for two shifts in the same day, for instance.
- There are a maximum number of shifts a worker can do in a given time period (say, a month)
- There are a maximum number of certain shifts that can be done in a month (say, overnight shifts)
Nice to have, but not necessary
If you can come up with an algorithm which does the above and includes any/all of these, I'll be seriously impressed and grateful. Even an add-on script to do these bits separately would be great too.
Overlapping shifts. For instance, it would be good to be able to specify a "front desk" shift and a "back office" shift that both occur at the same time. This could be done with separate invocations of the program with different shift data, except that the constraints about scheduling people for multiple shifts in a given time period would be missed.
Minimum reschedule time period for workers specifiable on a per-worker (rather than global) basis. For instance, if Joe is feeling overworked or is dealing with personal issues, or is a beginner learning the ropes, we might want to schedule him less often than other workers.
Some automated/random/fair way of selecting staff to fill minimum shift numbers when no available workers fit.
Some way of handling sudden cancellations, and just filling the gaps without rearranging other shifts.
Output Test
Probably, the algorithm should generate as many matching Solutions as possible, where each Solution looks like this:
class Solution:
def __init__(self, shifts_workers):
"""shifts_workers -- a dictionary of shift objects as keys, and a
a lists of workers filling the shift as values."""
assert isinstance(dict, shifts_workers)
self.shifts_workers = shifts_workers
Here's a test function for an individual solution, given the above data. I think this is right, but I'd appreciate some peer review on it too.
def check_solution(solution):
assert isinstance(Solution, solution)
def shift_check(shift, workers, workers_allowed):
assert isinstance(Shift, shift):
assert isinstance(list, workers):
assert isinstance(list, workers_allowed)
num_workers = len(workers)
assert num_workers >= shift.min_workers
assert num_workers <= shift.max_workers
for w in workers_allowed:
assert w in workers
shifts_workers = solution.shifts_workers
# all shifts should be covered
assert len(shifts_workers.keys()) == 3
assert shift1 in shifts_workers.keys()
assert shift2 in shifts_workers.keys()
assert shift3 in shifts_workers.keys()
# shift_1 should be covered by 2 people - joe, and bob
shift_check(shift_1, shifts_workers[shift_1], (joe, bob))
# shift_2 should be covered by 2 people - sam and amy
shift_check(shift_2, shifts_workers[shift_2], (sam, amy))
# shift_3 should be covered by 3 people - ned, max, and jim
shift_check(shift_3, shifts_workers[shift_3], (ned,max,jim))
Attempts
I've tried implementing this with a Genetic Algorithm, but can't seem to get it tuned quite right, so although the basic principle seems to work on single shifts, it can't solve even easy cases with a few shifts and a few workers.
My latest attempt is to generate every possible permutation as a solution, then whittle down the permutations that don't meet the constraints. This seems to work much more quickly, and has gotten me further, but I'm using python 2.6's itertools.product() to help generate the permutations, and I can't quite get it right. It wouldn't surprise me if there are many bugs as, honestly, the problem doesn't fit in my head that well :)
Currently my code for this is in two files: models.py and rota.py. models.py looks like:
# -*- coding: utf-8 -*-
class Shift:
def __init__(self, start_datetime, end_datetime, min_coverage, max_coverage):
self.start = start_datetime
self.end = end_datetime
self.duration = self.end - self.start
self.min_coverage = min_coverage
self.max_coverage = max_coverage
def __repr__(self):
return "<Shift %s--%s (%r<x<%r)" % (self.start, self.end, self.min_coverage, self.max_coverage)
class Duty:
def __init__(self, worker, shift, slot):
self.worker = worker
self.shift = shift
self.slot = slot
def __repr__(self):
return "<Duty worker=%r shift=%r slot=%d>" % (self.worker, self.shift, self.slot)
def dump(self, indent=4, depth=1):
ind = " " * (indent * depth)
print ind + "<Duty shift=%s slot=%s" % (self.shift, self.slot)
self.worker.dump(indent=indent, depth=depth+1)
print ind + ">"
class Avail:
def __init__(self, start_time, end_time):
self.start = start_time
self.end = end_time
def __repr__(self):
return "<%s to %s>" % (self.start, self.end)
class Worker:
def __init__(self, name, availabilities):
self.name = name
self.availabilities = availabilities
def __repr__(self):
return "<Worker %s Avail=%r>" % (self.name, self.availabilities)
def dump(self, indent=4, depth=1):
ind = " " * (indent * depth)
print ind + "<Worker %s" % self.name
for avail in self.availabilities:
print ind + " " * indent + repr(avail)
print ind + ">"
def available_for_shift(self, shift):
for a in self.availabilities:
if shift.start >= a.start and shift.end <= a.end:
return True
print "Worker %s not available for %r (Availability: %r)" % (self.name, shift, self.availabilities)
return False
class Solution:
def __init__(self, shifts):
self._shifts = list(shifts)
def __repr__(self):
return "<Solution: shifts=%r>" % self._shifts
def duties(self):
d = []
for s in self._shifts:
for x in s:
yield x
def shifts(self):
return list(set([ d.shift for d in self.duties() ]))
def dump_shift(self, s, indent=4, depth=1):
ind = " " * (indent * depth)
print ind + "<ShiftList"
for duty in s:
duty.dump(indent=indent, depth=depth+1)
print ind + ">"
def dump(self, indent=4, depth=1):
ind = " " * (indent * depth)
print ind + "<Solution"
for s in self._shifts:
self.dump_shift(s, indent=indent, depth=depth+1)
print ind + ">"
class Env:
def __init__(self, shifts, workers):
self.shifts = shifts
self.workers = workers
self.fittest = None
self.generation = 0
class DisplayContext:
def __init__(self, env):
self.env = env
def status(self, msg, *args):
raise NotImplementedError()
def cleanup(self):
pass
def update(self):
pass
and rota.py looks like:
#!/usr/bin/env python2.6
# -*- coding: utf-8 -*-
from datetime import datetime as dt
am2 = dt(2009, 10, 1, 2, 0)
am8 = dt(2009, 10, 1, 8, 0)
pm12 = dt(2009, 10, 1, 12, 0)
def duties_for_all_workers(shifts, workers):
from models import Duty
duties = []
# for all shifts
for shift in shifts:
# for all slots
for cov in range(shift.min_coverage, shift.max_coverage):
for slot in range(cov):
# for all workers
for worker in workers:
# generate a duty
duty = Duty(worker, shift, slot+1)
duties.append(duty)
return duties
def filter_duties_for_shift(duties, shift):
matching_duties = [ d for d in duties if d.shift == shift ]
for m in matching_duties:
yield m
def duty_permutations(shifts, duties):
from itertools import product
# build a list of shifts
shift_perms = []
for shift in shifts:
shift_duty_perms = []
for slot in range(shift.max_coverage):
slot_duties = [ d for d in duties if d.shift == shift and d.slot == (slot+1) ]
shift_duty_perms.append(slot_duties)
shift_perms.append(shift_duty_perms)
all_perms = ( shift_perms, shift_duty_perms )
# generate all possible duties for all shifts
perms = list(product(*shift_perms))
return perms
def solutions_for_duty_permutations(permutations):
from models import Solution
res = []
for duties in permutations:
sol = Solution(duties)
res.append(sol)
return res
def find_clashing_duties(duty, duties):
"""Find duties for the same worker that are too close together"""
from datetime import timedelta
one_day = timedelta(days=1)
one_day_before = duty.shift.start - one_day
one_day_after = duty.shift.end + one_day
for d in [ ds for ds in duties if ds.worker == duty.worker ]:
# skip the duty we're considering, as it can't clash with itself
if duty == d:
continue
clashes = False
# check if dates are too close to another shift
if d.shift.start >= one_day_before and d.shift.start <= one_day_after:
clashes = True
# check if slots collide with another shift
if d.slot == duty.slot:
clashes = True
if clashes:
yield d
def filter_unwanted_shifts(solutions):
from models import Solution
print "possibly unwanted:", solutions
new_solutions = []
new_duties = []
for sol in solutions:
for duty in sol.duties():
duty_ok = True
if not duty.worker.available_for_shift(duty.shift):
duty_ok = False
if duty_ok:
print "duty OK:"
duty.dump(depth=1)
new_duties.append(duty)
else:
print "duty **NOT** OK:"
duty.dump(depth=1)
shifts = set([ d.shift for d in new_duties ])
shift_lists = []
for s in shifts:
shift_duties = [ d for d in new_duties if d.shift == s ]
shift_lists.append(shift_duties)
new_solutions.append(Solution(shift_lists))
return new_solutions
def filter_clashing_duties(solutions):
new_solutions = []
for sol in solutions:
solution_ok = True
for dut开发者_C百科y in sol.duties():
num_clashing_duties = len(set(find_clashing_duties(duty, sol.duties())))
# check if many duties collide with this one (and thus we should delete this one
if num_clashing_duties > 0:
solution_ok = False
break
if solution_ok:
new_solutions.append(sol)
return new_solutions
def filter_incomplete_shifts(solutions):
new_solutions = []
shift_duty_count = {}
for sol in solutions:
solution_ok = True
for shift in set([ duty.shift for duty in sol.duties() ]):
shift_duties = [ d for d in sol.duties() if d.shift == shift ]
num_workers = len(set([ d.worker for d in shift_duties ]))
if num_workers < shift.min_coverage:
solution_ok = False
if solution_ok:
new_solutions.append(sol)
return new_solutions
def filter_solutions(solutions, workers):
# filter permutations ############################
# for each solution
solutions = filter_unwanted_shifts(solutions)
solutions = filter_clashing_duties(solutions)
solutions = filter_incomplete_shifts(solutions)
return solutions
def prioritise_solutions(solutions):
# TODO: not implemented!
return solutions
# prioritise solutions ############################
# for all solutions
# score according to number of staff on a duty
# score according to male/female staff
# score according to skill/background diversity
# score according to when staff last on shift
# sort all solutions by score
def solve_duties(shifts, duties, workers):
# ramify all possible duties #########################
perms = duty_permutations(shifts, duties)
solutions = solutions_for_duty_permutations(perms)
solutions = filter_solutions(solutions, workers)
solutions = prioritise_solutions(solutions)
return solutions
def load_shifts():
from models import Shift
shifts = [
Shift(am2, am8, 2, 3),
Shift(am8, pm12, 2, 3),
]
return shifts
def load_workers():
from models import Avail, Worker
joe_avail = ( Avail(am2, am8), )
sam_avail = ( Avail(am2, am8), )
ned_avail = ( Avail(am2, am8), )
bob_avail = ( Avail(am8, pm12), )
max_avail = ( Avail(am8, pm12), )
joe = Worker("joe", joe_avail)
sam = Worker("sam", sam_avail)
ned = Worker("ned", sam_avail)
bob = Worker("bob", bob_avail)
max = Worker("max", max_avail)
return (joe, sam, ned, bob, max)
def main():
import sys
shifts = load_shifts()
workers = load_workers()
duties = duties_for_all_workers(shifts, workers)
solutions = solve_duties(shifts, duties, workers)
if len(solutions) == 0:
print "Sorry, can't solve this. Perhaps you need more staff available, or"
print "simpler duty constraints?"
sys.exit(20)
else:
print "Solved. Solutions found:"
for sol in solutions:
sol.dump()
if __name__ == "__main__":
main()
Snipping the debugging output before the result, this currently gives:
Solved. Solutions found:
<Solution
<ShiftList
<Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
<Worker joe
<2009-10-01 02:00:00 to 2009-10-01 08:00:00>
>
>
<Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
<Worker sam
<2009-10-01 02:00:00 to 2009-10-01 08:00:00>
>
>
<Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
<Worker ned
<2009-10-01 02:00:00 to 2009-10-01 08:00:00>
>
>
>
<ShiftList
<Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
<Worker bob
<2009-10-01 08:00:00 to 2009-10-01 12:00:00>
>
>
<Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
<Worker max
<2009-10-01 08:00:00 to 2009-10-01 12:00:00>
>
>
>
>
I've tried implementing this with a Genetic Algorithm, but can't seem to get it tuned quite right, so although the basic principle seems to work on single shifts, it can't solve even easy cases with a few shifts and a few workers.
In short, don't! Unless you have lots of experience with genetic algorithms, you won't get this right.
- They are approximate methods that do not guarantee converging to a workable solution.
- They work only if you can reasonably well establish the quality of your current solution (i.e. number of criteria not met).
- Their quality critically depends on the quality of operators you use to combine/mutate previous solutions into new ones.
It is a tough thing to get right in small python program if you have close to zero experience with GA. If you have a small group of people exhaustive search is not that bad option. The problem is that it may work right for n
people, will be slow for n+1
people and will be unbearably slow for n+2
and it may very well be that your n
will end up as low as 10.
You are working on an NP-complete problem and there are no easy win solutions. If the fancy timetable scheduling problem of your choice does not work good enough, it is very unlikely you will have something better with your python script.
If you insist on doing this via your own code, it is much easier to get some results with min-max or simulated annealing.
Okay, I don't know about a particular algorithm, but here is what I would take into consideration.
Evaluation
Whatever the method you will need a function to evaluate how much your solution is satisfying the constraints. You may take the 'comparison' approach (no global score but a way to compare two solutions), but I would recommend evaluation.
What would be real good is if you could obtain a score for a shorter timespan, for example daily, it is really helpful with algorithms if you can 'predict' the range of the final score from a partial solution (eg, just the first 3 days out of 7). This way you can interrupt the computation based on this partial solution if it's already too low to meet your expectations.
Symmetry
It is likely that among those 200 people you have similar profiles: ie people sharing the same characteristics (availability, experience, willingness, ...). If you take two persons with the same profile, they are going to be interchangeable:
- Solution 1: (shift 1: Joe)(shift 2: Jim) ...other workers only...
- Solution 2: (shift 1: Jim)(shift 2: Joe) ...other workers only...
are actually the same solution from your point of view.
The good thing is that usually, you have less profiles than persons, which helps tremendously with the time spent in computation!
For example, imagine that you generated all the solutions based on Solution 1, then there is no need to compute anything based on Solution 2.
Iterative
Instead of generating the whole schedule at once, you may consider generating it incrementally (say 1 week at a time). The net gain is that the complexity for a week is reduced (there are less possibilities).
Then, once you have this week, you compute the second one, being careful of taking the first into account the first for your constraints of course.
The advantage is that you explicitly design you algorithm to take into account an already used solution, this way for the next schedule generation it will make sure not to make a person work 24hours straight!
Serialization
You should consider the serialization of your solution objects (pick up your choice, pickle is quite good for Python). You will need the previous schedule when generating a new one, and I bet you'd rather not enter it manually for the 200 people.
Exhaustive
Now, after all that, I would actually favor an exhaustive search since using symmetry and evaluation the possibilities might not be so numerous (the problem remains NP-complete though, there is no silver bullet).
You may be willing to try your hand at the Backtracking Algorithm.
Also, you should take a look at the following links which deal with similar kind of problems:
- Python Sudoku Solver
- Java Sudoku Solver using Knuth Dancing Links (uses the backtracking algorithm)
Both discuss the problems encountered during the implementation, so checking them out should help you.
I don't have an algorithm choice but I can relate some practical considerations.
Since the algorithm is dealing with cancellations, it has to run whenever a scheduling exception occurs to reschedule everyone.
Consider that some algorithms are not very linear and might radically reschedule everyone from that point forward. You probably want to avoid that, people like to know their schedules well in advance.
You can deal with some cancellations without rerunning the algorithm because it can pre-schedule the next available person or two.
It might not be possible or desirable to always generate the most optimal solution, but you can keep a running count of "less-than-optimal" events per worker, and always choose the worker with the lowest count when you have to assign another "bad choice". That's what people generally care about (several "bad" scheduling decisions frequently/unfairly).
精彩评论