
Worker/Timeslot permutation/constraint filtering algorithm

Hope you can help me out with this, guys. It's not help with work: it's for a charity of very hard-working volunteers, who could really use a less confusing/annoying timetable system than what they currently have.

If anyone knows of a good third-party app which can (reliably) automate this, that would be almost as good. Just... please don't suggest random timetabling stuff such as the ones for booking classrooms, as I don't think they can do this.

Thanks in advance for reading; I know it's a big post. I'm trying to do my best to document this clearly though, and to show that I've made efforts on my own.

Problem

I need a worker/timeslot scheduling algorithm which generates shifts for workers, which meets the following criteria:

Input Data

from datetime import datetime as dt

class DateRange:
    def __init__(self, start, end):
        self.start = start
        self.end   = end

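class Shift:
    def __init__(self, range, min, max):
        self.range = range
        self.min_workers = min
        self.max_workers = max

class Worker:
    def __init__(self, name, availabilities):
        self.name = name
        self.availabilities = availabilities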

tue_9th_10pm = dt(2009, 1, 9,   22, 0)
wed_10th_4am = dt(2009, 1, 10,   4, 0)
wed_10th_10am = dt(2009, 1, 10, 10, 0)
wed_10th_2pm = dt(2009, 1, 10, 14, 0)

shift_1_times = DateRange(tue_9th_10pm, wed_10th_4am)
shift_2_times = DateRange(wed_10th_4am, wed_10th_10am)
shift_3_times = DateRange(wed_10th_10am, wed_10th_2pm)

shift_1 = Shift(shift_1_times, 2,3)  # allows 3, requires 2, but only 2 available
shift_2 = Shift(shift_2_times, 2,2)  # allows 2
shift_3 = Shift(shift_3_times, 2,3)  # allows 3, requires 2, 3 available

shifts = ( shift_1, shift_2, shift_3 )

joe_avail = [ shift_1, shift_2 ]
bob_avail = [ shift_1, shift_3 ]
sam_avail = [ shift_2 ]
amy_avail = [ shift_2 ]
ned_avail = [ shift_2, shift_3 ]
max_avail = [ shift_3 ]
jim_avail = [ shift_3 ]

joe = Worker('joe', joe_avail)
bob = Worker('bob', bob_avail)
sam = Worker('sam', sam_avail)
ned = Worker('ned', ned_avail)
max = Worker('max', max_avail)
amy = Worker('amy', amy_avail)
jim = Worker('jim', jim_avail)

workers = ( joe, bob, sam, ned, max, amy, jim )

Processing

From above, shifts and workers are the two main input variables to process

Each shift has a minimum and maximum number of workers needed. Filling the minimum requirements for a shift is crucial to success, but if all else fails, a rota with gaps to be filled manually is better than "error" :) The main algorithmic issue is that there shouldn't be unnecessary gaps, when enough workers are available.

Ideally, the maximum number of workers for a shift would be filled, but this is the lowest priority relative to other constraints, so if anything has to give, it should be this.

Flexible constraints

These are a little flexible, and their boundaries can be pushed a little if a "perfect" solution can't be found. This flexibility should be a last resort though, rather than being exploited randomly. Ideally, the flexibility would be configurable with a "fudge_factor" variable, or similar.

  • There is a minimum time period between two shifts. So, a worker shouldn't be scheduled for two shifts in the same day, for instance.
  • There is a maximum number of shifts a worker can do in a given time period (say, a month).
  • There is a maximum number of certain shifts that can be done in a month (say, overnight shifts).
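
A minimal sketch of how the first constraint and a fudge factor might be checked, assuming each shift is a plain (start, end) pair of datetimes. The names gap_violations and min_gap, and the reading of fudge_factor as "fraction of the rest period that may be shaved off", are illustrative assumptions only; the code targets Python 3 rather than the 2.6 used further down.

```python
from datetime import datetime, timedelta

def gap_violations(assigned, min_gap=timedelta(hours=24), fudge_factor=0.0):
    """Return pairs of consecutive shifts (for ONE worker) with too little rest.

    assigned     -- list of (start, end) datetime pairs
    fudge_factor -- hypothetical knob: 0.0 is strict, 0.25 tolerates a rest
                    period up to 25% shorter than min_gap
    """
    required = timedelta(seconds=min_gap.total_seconds() * (1.0 - fudge_factor))
    ordered = sorted(assigned)  # tuples sort by start time first
    violations = []
    for earlier, later in zip(ordered, ordered[1:]):
        rest = later[0] - earlier[1]  # next start minus previous end
        if rest < required:
            violations.append((earlier, later))
    return violations

# Two of the shifts from the input data; they are 6 hours apart.
night = (datetime(2009, 1, 9, 22, 0), datetime(2009, 1, 10, 4, 0))
morning = (datetime(2009, 1, 10, 10, 0), datetime(2009, 1, 10, 14, 0))

strict = gap_violations([night, morning], min_gap=timedelta(hours=8))
relaxed = gap_violations([night, morning], min_gap=timedelta(hours=8),
                         fudge_factor=0.25)
```

With an 8-hour minimum the pair is flagged when checked strictly, and accepted once the fudge factor lowers the requirement to 6 hours.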

Nice to have, but not necessary

If you can come up with an algorithm which does the above and includes any/all of these, I'll be seriously impressed and grateful. Even an add-on script to do these bits separately would be great too.

  • Overlapping shifts. For instance, it would be good to be able to specify a "front desk" shift and a "back office" shift that both occur at the same time. This could be done with separate invocations of the program with different shift data, except that the constraints about scheduling people for multiple shifts in a given time period would be missed.

  • Minimum reschedule time period for workers specifiable on a per-worker (rather than global) basis. For instance, if Joe is feeling overworked or is dealing with personal issues, or is a beginner learning the ropes, we might want to schedule him less often than other workers.

  • Some automated/random/fair way of selecting staff to fill minimum shift numbers when no available workers fit.

  • Some way of handling sudden cancellations, and just filling the gaps without rearranging other shifts.

Output Test

Probably, the algorithm should generate as many matching Solutions as possible, where each Solution looks like this:

class Solution:
    def __init__(self, shifts_workers):
        """shifts_workers -- a dictionary with shift objects as keys, and
        lists of the workers filling each shift as values."""

        assert isinstance(shifts_workers, dict)
        self.shifts_workers = shifts_workers

Here's a test function for an individual solution, given the above data. I think this is right, but I'd appreciate some peer review on it too.

def check_solution(solution):
  assert isinstance(solution, Solution)

  def shift_check(shift, workers, workers_allowed):
      assert isinstance(shift, Shift)
      assert isinstance(workers, list)
      assert isinstance(workers_allowed, (list, tuple))

      num_workers = len(workers)
      assert num_workers >= shift.min_workers
      assert num_workers <= shift.max_workers

      for w in workers_allowed:
          assert w in workers

  shifts_workers = solution.shifts_workers

  # all shifts should be covered
  assert len(shifts_workers.keys()) == 3
  assert shift_1 in shifts_workers.keys()
  assert shift_2 in shifts_workers.keys()
  assert shift_3 in shifts_workers.keys()

  # shift_1 should be covered by 2 people - joe, and bob
  shift_check(shift_1, shifts_workers[shift_1], (joe, bob))

  # shift_2 should be covered by 2 people - sam and amy
  shift_check(shift_2, shifts_workers[shift_2], (sam, amy))

  # shift_3 should be covered by 3 people - ned, max, and jim
  shift_check(shift_3, shifts_workers[shift_3], (ned,max,jim))

Attempts

I've tried implementing this with a Genetic Algorithm, but can't seem to get it tuned quite right, so although the basic principle seems to work on single shifts, it can't solve even easy cases with a few shifts and a few workers.

My latest attempt is to generate every possible permutation as a solution, then whittle down the permutations that don't meet the constraints. This seems to work much more quickly, and has gotten me further, but I'm using Python 2.6's itertools.product() to help generate the permutations, and I can't quite get it right. It wouldn't surprise me if there are many bugs as, honestly, the problem doesn't fit in my head that well :)

Currently my code for this is in two files: models.py and rota.py. models.py looks like:

# -*- coding: utf-8 -*-
class Shift:
    def __init__(self, start_datetime, end_datetime, min_coverage, max_coverage):
        self.start = start_datetime
        self.end = end_datetime
        self.duration = self.end - self.start
        self.min_coverage = min_coverage
        self.max_coverage = max_coverage

    def __repr__(self):
        return "<Shift %s--%s (%r<x<%r)" % (self.start, self.end, self.min_coverage, self.max_coverage)

class Duty:
    def __init__(self, worker, shift, slot):
        self.worker = worker
        self.shift = shift
        self.slot = slot

    def __repr__(self):
        return "<Duty worker=%r shift=%r slot=%d>" % (self.worker, self.shift, self.slot)

    def dump(self,  indent=4,  depth=1):
        ind = " " * (indent * depth)
        print ind + "<Duty shift=%s slot=%s" % (self.shift, self.slot)
        self.worker.dump(indent=indent, depth=depth+1)
        print ind + ">"

class Avail:
    def __init__(self, start_time, end_time):
        self.start = start_time
        self.end = end_time

    def __repr__(self):
        return "<%s to %s>" % (self.start, self.end)

class Worker:
    def __init__(self, name, availabilities):
        self.name = name
        self.availabilities = availabilities

    def __repr__(self):
        return "<Worker %s Avail=%r>" % (self.name, self.availabilities)

    def dump(self,  indent=4,  depth=1):
        ind = " " * (indent * depth)
        print ind + "<Worker %s" % self.name
        for avail in self.availabilities:
            print ind + " " * indent + repr(avail)
        print ind + ">"

    def available_for_shift(self,  shift):
        for a in self.availabilities:
            if shift.start >= a.start and shift.end <= a.end:
                return True
        print "Worker %s not available for %r (Availability: %r)" % (self.name,  shift, self.availabilities)
        return False

class Solution:
    def __init__(self, shifts):
        self._shifts = list(shifts)

    def __repr__(self):
        return "<Solution: shifts=%r>" % self._shifts

    def duties(self):
        d = []
        for s in self._shifts:
            for x in s:
                yield x

    def shifts(self):
        return list(set([ d.shift for d in self.duties() ]))

    def dump_shift(self, s, indent=4, depth=1):
        ind = " " * (indent * depth)
        print ind + "<ShiftList"
        for duty in s:
            duty.dump(indent=indent, depth=depth+1)
        print ind + ">"

    def dump(self, indent=4, depth=1):
        ind = " " * (indent * depth)
        print ind + "<Solution"
        for s in self._shifts:
            self.dump_shift(s, indent=indent, depth=depth+1)
        print ind + ">"

class Env:
    def __init__(self, shifts, workers):
        self.shifts = shifts
        self.workers = workers
        self.fittest = None
        self.generation = 0

class DisplayContext:
    def __init__(self,  env):
        self.env = env

    def status(self, msg, *args):
        raise NotImplementedError()

    def cleanup(self):
        pass

    def update(self):
        pass

and rota.py looks like:

#!/usr/bin/env python2.6
# -*- coding: utf-8 -*-

from datetime import datetime as dt
am2 = dt(2009,  10,  1,  2, 0)
am8 = dt(2009,  10,  1,  8, 0)
pm12 = dt(2009,  10,  1,  12, 0)

def duties_for_all_workers(shifts, workers):
    from models import Duty

    duties = []

    # for all shifts
    for shift in shifts:
        # for all slots
        for cov in range(shift.min_coverage, shift.max_coverage):
            for slot in range(cov):
                # for all workers
                for worker in workers:
                    # generate a duty
                    duty = Duty(worker, shift, slot+1)
                    duties.append(duty)

    return duties

def filter_duties_for_shift(duties,  shift):
    matching_duties = [ d for d in duties if d.shift == shift ]
    for m in matching_duties:
        yield m

def duty_permutations(shifts,  duties):
    from itertools import product

    # build a list of shifts
    shift_perms = []
    for shift in shifts:
        shift_duty_perms = []
        for slot in range(shift.max_coverage):
            slot_duties = [ d for d in duties if d.shift == shift and d.slot == (slot+1) ]
            shift_duty_perms.append(slot_duties)
        shift_perms.append(shift_duty_perms)

    all_perms = ( shift_perms,  shift_duty_perms )

    # generate all possible duties for all shifts
    perms = list(product(*shift_perms))
    return perms

def solutions_for_duty_permutations(permutations):
    from models import Solution
    res = []
    for duties in permutations:
        sol = Solution(duties)
        res.append(sol)
    return res

def find_clashing_duties(duty, duties):
    """Find duties for the same worker that are too close together"""
    from datetime import timedelta
    one_day = timedelta(days=1)
    one_day_before = duty.shift.start - one_day
    one_day_after = duty.shift.end + one_day
    for d in [ ds for ds in duties if ds.worker == duty.worker ]:
        # skip the duty we're considering, as it can't clash with itself
        if duty == d:
            continue

        clashes = False

        # check if dates are too close to another shift
        if d.shift.start >= one_day_before and d.shift.start <= one_day_after:
            clashes = True

        # check if slots collide with another shift
        if d.slot == duty.slot:
            clashes = True

        if clashes:
            yield d

def filter_unwanted_shifts(solutions):
    from models import Solution

    print "possibly unwanted:",  solutions
    new_solutions = []
    new_duties = []

    for sol in solutions:
        for duty in sol.duties():
            duty_ok = True

            if not duty.worker.available_for_shift(duty.shift):
                duty_ok = False

            if duty_ok:
                print "duty OK:"
                duty.dump(depth=1)
                new_duties.append(duty)
            else:
                print "duty **NOT** OK:"
                duty.dump(depth=1)

        shifts = set([ d.shift for d in new_duties ])
        shift_lists = []
        for s in shifts:
            shift_duties = [ d for d in new_duties if d.shift == s ]
            shift_lists.append(shift_duties)

        new_solutions.append(Solution(shift_lists))

    return new_solutions

def filter_clashing_duties(solutions):
    new_solutions = []

    for sol in solutions:
        solution_ok = True

        for duty in sol.duties():
            num_clashing_duties = len(set(find_clashing_duties(duty, sol.duties())))

            # check if many duties collide with this one (and thus we should delete this one
            if num_clashing_duties > 0:
                solution_ok = False
                break

        if solution_ok:
            new_solutions.append(sol)

    return new_solutions

def filter_incomplete_shifts(solutions):
    new_solutions = []

    shift_duty_count = {}

    for sol in solutions:
        solution_ok = True

        for shift in set([ duty.shift for duty in sol.duties() ]):
            shift_duties = [ d for d in sol.duties() if d.shift == shift ]
            num_workers = len(set([ d.worker for d in shift_duties ]))

            if num_workers < shift.min_coverage:
                solution_ok = False

        if solution_ok:
            new_solutions.append(sol)

    return new_solutions

def filter_solutions(solutions,  workers):
    # filter permutations ############################
    # for each solution
    solutions = filter_unwanted_shifts(solutions)
    solutions = filter_clashing_duties(solutions)
    solutions = filter_incomplete_shifts(solutions)
    return solutions

def prioritise_solutions(solutions):
    # TODO: not implemented!
    return solutions

    # prioritise solutions ############################
    # for all solutions
        # score according to number of staff on a duty
        # score according to male/female staff
        # score according to skill/background diversity
        # score according to when staff last on shift

    # sort all solutions by score

def solve_duties(shifts, duties,  workers):
    # ramify all possible duties #########################
    perms = duty_permutations(shifts,  duties)
    solutions = solutions_for_duty_permutations(perms)
    solutions = filter_solutions(solutions,  workers)
    solutions = prioritise_solutions(solutions)
    return solutions

def load_shifts():
    from models import Shift
    shifts = [
        Shift(am2, am8, 2, 3),
        Shift(am8, pm12, 2, 3),
    ]
    return shifts

def load_workers():
    from models import Avail, Worker
    joe_avail = ( Avail(am2,  am8), )
    sam_avail = ( Avail(am2,  am8), )
    ned_avail = ( Avail(am2,  am8), )
    bob_avail = ( Avail(am8,  pm12), )
    max_avail = ( Avail(am8,  pm12), )
    joe = Worker("joe", joe_avail)
    sam = Worker("sam", sam_avail)
    ned = Worker("ned", ned_avail)
    bob = Worker("bob", bob_avail)
    max = Worker("max", max_avail)
    return (joe, sam, ned, bob, max)

def main():
    import sys

    shifts = load_shifts()
    workers = load_workers()
    duties = duties_for_all_workers(shifts, workers)
    solutions = solve_duties(shifts, duties, workers)

    if len(solutions) == 0:
        print "Sorry, can't solve this.  Perhaps you need more staff available, or"
        print "simpler duty constraints?"
        sys.exit(20)
    else:
        print "Solved.  Solutions found:"
        for sol in solutions:
            sol.dump()

if __name__ == "__main__":
    main()

Snipping the debugging output before the result, this currently gives:

Solved.  Solutions found:
    <Solution
        <ShiftList
            <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                <Worker joe
                    <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                >
            >
            <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                <Worker sam
                    <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                >
            >
            <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                <Worker ned
                    <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                >
            >
        >
        <ShiftList
            <Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
                <Worker bob
                    <2009-10-01 08:00:00 to 2009-10-01 12:00:00>
                >
            >
            <Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
                <Worker max
                    <2009-10-01 08:00:00 to 2009-10-01 12:00:00>
                >
            >
        >
    >


"I've tried implementing this with a Genetic Algorithm, but can't seem to get it tuned quite right, so although the basic principle seems to work on single shifts, it can't solve even easy cases with a few shifts and a few workers."

In short, don't! Unless you have lots of experience with genetic algorithms, you won't get this right.

  • They are approximate methods that do not guarantee converging to a workable solution.
  • They work only if you can reasonably well establish the quality of your current solution (i.e. number of criteria not met).
  • Their quality critically depends on the quality of operators you use to combine/mutate previous solutions into new ones.

It is a tough thing to get right in a small Python program if you have close to zero experience with GAs. If you have a small group of people, exhaustive search is not that bad an option. The problem is that it may work fine for n people, be slow for n+1 people and unbearably slow for n+2, and it may very well be that your n ends up as low as 10.
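
To get a feel for how quickly an exhaustive search grows, here is a rough back-of-the-envelope sketch (Python 3.8+, for math.comb). It only counts the ways to pick a crew for each shift independently, ignoring every other constraint; the function name search_space and the (available, min, max) tuple layout are made up for illustration.

```python
from math import comb

def search_space(shifts):
    """Count candidate rotas when each shift independently picks a crew.

    shifts -- list of (available_workers, min_workers, max_workers)
    """
    total = 1
    for available, lo, hi in shifts:
        # Number of crews of size lo..hi drawn from the available workers.
        total *= sum(comb(available, k) for k in range(lo, hi + 1))
    return total

# The three shifts from the question: 2, 4 and 4 people are available.
question = search_space([(2, 2, 3), (4, 2, 2), (4, 2, 3)])

# Ten available people per shift, 2 to 3 needed, over 1 and then 3 shifts.
one_shift = search_space([(10, 2, 3)])
three_shifts = search_space([(10, 2, 3)] * 3)
```

The count is a product over shifts, so every extra shift multiplies the work, and every extra available person raises each factor.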

You are working on an NP-complete problem and there are no easy-win solutions. If the fancy timetable scheduling program of your choice does not work well enough, it is very unlikely that you will get something better out of your own Python script.

If you insist on doing this via your own code, it is much easier to get some results with min-max or simulated annealing.


Okay, I don't know about a particular algorithm, but here is what I would take into consideration.

Evaluation

Whatever the method, you will need a function to evaluate how well your solution satisfies the constraints. You may take the 'comparison' approach (no global score, but a way to compare two solutions), but I would recommend evaluation.

What would be really good is if you could obtain a score for a shorter timespan, for example daily. It really helps an algorithm if you can 'predict' the range of the final score from a partial solution (e.g., just the first 3 days out of 7). That way you can interrupt the computation on this partial solution if its score is already too low to meet your expectations.
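
As a rough illustration of such an evaluation function (Python 3), here is a penalty-based score where lower is better. The weights 100/10/1 and the names shift_penalty and score are arbitrary assumptions, and shifts are referred to by plain string names rather than the question's Shift objects. Because the total is a sum of per-shift penalties, the same function can score a partial rota.

```python
def shift_penalty(name, crew, lo, hi, availability):
    """Penalty for one shift; 0 means fully staffed with available people."""
    n = len(crew)
    under = max(0, lo - n)            # required places left empty
    over = max(0, n - hi)             # people beyond the maximum
    spare = max(0, hi - max(n, lo))   # optional places left empty
    unavailable = sum(1 for w in crew if name not in availability.get(w, ()))
    return 100 * unavailable + 10 * (under + over) + spare

def score(assignment, shifts, availability):
    """Sum of per-shift penalties; shifts maps name -> (min, max)."""
    return sum(shift_penalty(name, assignment.get(name, []), lo, hi, availability)
               for name, (lo, hi) in shifts.items())

# Data transcribed from the question, using strings for shifts and workers.
shifts = {"shift_1": (2, 3), "shift_2": (2, 2), "shift_3": (2, 3)}
availability = {
    "joe": {"shift_1", "shift_2"}, "bob": {"shift_1", "shift_3"},
    "sam": {"shift_2"}, "amy": {"shift_2"}, "ned": {"shift_2", "shift_3"},
    "max": {"shift_3"}, "jim": {"shift_3"},
}
good = {"shift_1": ["joe", "bob"], "shift_2": ["sam", "amy"],
        "shift_3": ["ned", "max", "jim"]}
gappy = dict(good, shift_1=["joe"])          # one required place empty
wrong = dict(good, shift_1=["joe", "sam"])   # sam is not available for shift_1

scores = (score(good, shifts, availability),
          score(gappy, shifts, availability),
          score(wrong, shifts, availability))
```

The expected rota from the question is only penalised for the one optional place on shift_1 that nobody is available to fill.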

Symmetry

It is likely that among those 200 people you have similar profiles, i.e. people sharing the same characteristics (availability, experience, willingness, ...). If you take two people with the same profile, they are going to be interchangeable:

  • Solution 1: (shift 1: Joe)(shift 2: Jim) ...other workers only...
  • Solution 2: (shift 1: Jim)(shift 2: Joe) ...other workers only...

are actually the same solution from your point of view.

The good thing is that you usually have fewer profiles than people, which helps tremendously with the time spent in computation!

For example, imagine that you generated all the solutions based on Solution 1; then there is no need to compute anything based on Solution 2.
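
A small sketch of how profiles could be extracted (Python 3), assuming a profile is nothing more than the set of shifts a person is available for; in practice it would also include experience and so on. The helper name group_by_profile is hypothetical.

```python
from collections import defaultdict

def group_by_profile(availability):
    """Map each distinct availability set to the workers who share it."""
    profiles = defaultdict(list)
    for worker, shifts in sorted(availability.items()):
        profiles[frozenset(shifts)].append(worker)
    return dict(profiles)

# Availability transcribed from the question.
availability = {
    "joe": {"shift_1", "shift_2"}, "bob": {"shift_1", "shift_3"},
    "sam": {"shift_2"}, "amy": {"shift_2"}, "ned": {"shift_2", "shift_3"},
    "max": {"shift_3"}, "jim": {"shift_3"},
}
profiles = group_by_profile(availability)
```

On the question's data the 7 workers collapse into 5 profiles: sam and amy are interchangeable, and so are max and jim.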

Iterative

Instead of generating the whole schedule at once, you may consider generating it incrementally (say, 1 week at a time). The net gain is that the complexity for a week is reduced (there are fewer possibilities).

Then, once you have this week, you compute the second one, being careful to take the first into account for your constraints, of course.

The advantage is that you explicitly design your algorithm to take an already-used solution into account, so that the next schedule generation will make sure not to make a person work 24 hours straight!

Serialization

You should consider serializing your solution objects (take your pick; pickle is quite good for Python). You will need the previous schedule when generating a new one, and I bet you'd rather not enter it manually for the 200 people.
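
For instance, a minimal sketch with pickle (Python 3), assuming the schedule is a plain dictionary of shift names to worker names; save_schedule and load_schedule are illustrative helper names. Only unpickle files you created yourself, since loading a pickle can execute arbitrary code.

```python
import pickle

def save_schedule(schedule, path):
    """Write the schedule to disk so the next run can build on it."""
    with open(path, "wb") as fh:
        pickle.dump(schedule, fh)

def load_schedule(path):
    """Read back a schedule previously written by save_schedule."""
    with open(path, "rb") as fh:
        return pickle.load(fh)
```

A previous week's rota saved this way can be loaded at the start of the next run and fed into the constraint checks.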

Exhaustive

Now, after all that, I would actually favor an exhaustive search, since with symmetry and evaluation the possibilities might not be so numerous (the problem remains NP-complete though; there is no silver bullet).

You may want to try your hand at the backtracking algorithm.
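
A rough backtracking sketch for the question's data (Python 3), not a tuned implementation. It fills shifts one at a time, tries crews from largest to smallest, and abandons a branch as soon as a shift cannot reach its minimum. The function name solve and the conflicts argument (pairs of shifts too close together for one person) are assumptions made for this example.

```python
from itertools import combinations

def solve(shifts, availability, conflicts, index=0, rota=None):
    """Yield every rota that staffs each shift with min..max available workers.

    shifts    -- list of (name, min_workers, max_workers)
    conflicts -- set of frozenset({a, b}) shift pairs one person cannot both do
    """
    rota = {} if rota is None else rota
    if index == len(shifts):
        yield {name: list(crew) for name, crew in rota.items()}
        return
    name, lo, hi = shifts[index]
    # Workers already placed on a shift that clashes with this one.
    busy = {w for other, crew in rota.items()
            if frozenset((name, other)) in conflicts for w in crew}
    eligible = sorted(w for w, avail in availability.items()
                      if name in avail and w not in busy)
    # An empty range here means the minimum is unreachable: backtrack.
    for size in range(min(hi, len(eligible)), lo - 1, -1):
        for crew in combinations(eligible, size):
            rota[name] = crew
            yield from solve(shifts, availability, conflicts, index + 1, rota)
            del rota[name]

shifts = [("shift_1", 2, 3), ("shift_2", 2, 2), ("shift_3", 2, 3)]
availability = {
    "joe": {"shift_1", "shift_2"}, "bob": {"shift_1", "shift_3"},
    "sam": {"shift_2"}, "amy": {"shift_2"}, "ned": {"shift_2", "shift_3"},
    "max": {"shift_3"}, "jim": {"shift_3"},
}
# Assumption: all three shifts fall within a day of each other.
conflicts = {frozenset(p) for p in combinations([s[0] for s in shifts], 2)}

solutions = list(solve(shifts, availability, conflicts))
fullest = max(solutions, key=lambda r: sum(len(c) for c in r.values()))
```

Picking the rota with the most people assigned reproduces the one the question's check_solution expects (joe and bob, sam and amy, then ned, max and jim).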

Also, you should take a look at the following links, which deal with similar kinds of problems:

  • Python Sudoku Solver
  • Java Sudoku Solver using Knuth Dancing Links (uses the backtracking algorithm)

Both discuss the problems encountered during the implementation, so checking them out should help you.


I don't have an algorithm to recommend, but I can relate some practical considerations.

Since the algorithm deals with cancellations, it has to run whenever a scheduling exception occurs, to reschedule everyone.

Consider that some algorithms are not very linear and might radically reschedule everyone from that point forward. You probably want to avoid that; people like to know their schedules well in advance.

You can deal with some cancellations without rerunning the algorithm, because it can pre-schedule the next available person or two.

It might not be possible or desirable to always generate the optimal solution, but you can keep a running count of "less-than-optimal" events per worker, and always choose the worker with the lowest count when you have to assign another "bad choice". That's what people generally care about (being handed several "bad" scheduling decisions frequently or unfairly).
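
The running count could be as simple as the following sketch (Python 3); pick_fairest and the alphabetical tie-break are just one possible choice, not a prescribed method.

```python
from collections import Counter

def pick_fairest(candidates, bad_counts):
    """Choose the candidate with the fewest past 'bad' assignments.

    Ties are broken alphabetically; the chosen worker's count is incremented.
    """
    chosen = min(candidates, key=lambda w: (bad_counts[w], w))
    bad_counts[chosen] += 1
    return chosen

# Counter returns 0 for workers who have never had a bad assignment.
bad_counts = Counter({"joe": 2, "bob": 1})
first = pick_fairest(["joe", "bob", "sam"], bad_counts)
second = pick_fairest(["joe", "bob", "sam"], bad_counts)
```

Here sam is picked first (no bad assignments so far), then bob, since bob and sam are tied at one each and the tie is broken by name.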
