开发者

Python threading: What am I missing? (task_done() called too many times)

My apologies for the long-ish post up front. Hopefully it'll give enough context for a solution. I've tried to create a utility function that will take any number of old classmethods and stick them into a multi-threaded queue:

class QueuedCall(threading.Thread):

    def __init__(self, name, queue, fn, args, cb):
        threading.Thread.__init__(self)
        self.name = name

        self._cb = cb
        self._fn = fn
        self._queue = queue
        self._args = args

        self.daemon = True
        self.start()

    def run(self):
        r = self._fn(*self._args) if self._args is not None \
            else self._fn()

        if self._cb is not None:
            self._cb(self.name, r)

            self._queue.task_done()

Here's what my calling code looks like (within a class)

data = {}
def __op_complete(name, r):
    data[name] = r

q = Queue.Queue()

socket.setdefaulttimeout(5)

q.put(QueuedCall('twitter', q, Twitter.get_status, [5,], __op_complete))
q.put(QueuedCall('so_answers', q, StackExchange.get_answers,
    ['api.stackoverflow.com', 534476, 5], __op_complete))
q.put(QueuedCall('so_user', q, StackExchange.get_user_info,
    ['api.stackoverflow.com', 534476], __op_complete))
q.put(QueuedCall('p_answers', q, StackExchange.get_answers,
    ['api.programmers.stackexchange.com', 23901, 5], __op_complete))
q.put(QueuedCall('p_user', q, StackExchange.get_user_info,
    ['api.programmers.stackexchange.com', 23901], __op_complete))
q.put(QueuedCall('fb_image', q, Facebook.get_latest_picture, None, __op_complete))

q.join()
return data

The problem that I'm running into here is that it seems to work every time on a fresh server restart, but fails every second or third request, with the error:

ValueError: task_done() called too many times

This error presents itself in a random thread every second or third request, so it's rather difficult to nail down exactly what the problem is.

Anyone have any ideas and/or suggestions?

Thanks.


Edit:

I had added prints in an effort to debug this (quick and dirty rather than logging). One print statement (print 'running thread: %s' % self.name) in the first line of run and another right before calling task_done() (print 'thread done: %s' % self.name).

The output of a successful request:

running thread: twitter
running thread: so_answers
running thread: so_user
running thread: p_answers
thread done: twitter
thread done: so_user
running thread: p_user
thread done: so_answers
running thread: fb_image
thread done: p_answers
thread done: p_user
thread done: fb_image

The output of an unsuccessful request:

running thread: twitter
running thread: so_answers
thread done: twitter
thread done: so_answers
running thread: so_user
thread done: so_user
running thread: p_answers
thread done: p_answers
Exception in thread p_answers:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/home/demian/src/www/projects/demianbrecht/demianbrecht/demianbrecht/helpers.py", line 37, in run
    self._queue.task_done()
  File "/usr/lib/python2.7/Queue.py", line 64, in task_done
    raise ValueError('task_done() called too many t开发者_运维百科imes')
ValueError: task_done() called too many times

running thread: p_user
thread done: p_user
running thread: fb_image
thread done: fb_image


Your approach to this problem is "unconventional". But ignoring that for now ... the issue is simply that in the code you have given

q.put(QueuedCall('twitter', q, Twitter.get_status, [5,], __op_complete))

it is clearly possible for the following workflow to occur

  1. A thread is constructed and started by QueuedCall.__init__
  2. It is then put into the queue q. However ... before the Queue completes its logic for inserting the item, the independent thread has already finished its work and attempted to call q.task_done(). Which causes the error you have (task_done() has been called before the object was safely put into the queue)

How it should be done? You don't insert threads into queues. Queues hold data that threads process. So instead you

  • Create a Queue. Insert into it jobs you want done (as eg functions, the args they want and the callback)
  • You create and start worker threads
  • A worker thread calls
    • q.get() to get the function to invoke
    • invokes it
    • calls q.task_done() to let the queue know the item was handled.


I may be misunderstanding here, but I'm not sure you're using the Queue correctly.

From a brief survey of the docs, it looks like the idea is that you can use the put method to put work into a Queue, then another thread can call get to get some work out of it, do the work, and then call task_done when it has finished.

What your code appears to do is put instances of QueuedCall into a queue. Nothing ever gets from the queue, but the QueuedCall instances are also passed a reference to the queue they're being inserted into, and they do their work (which they know about intrinsically, not because they get it from the queue) and then call task_done.

If my reading of all that is correct (and you don't call the get method from somewhere else I can't see), then I believe I understand the problem.

The issue is that the QueuedCall instances have to be created before they can be put on the queue, and the act of creating one starts its work in another thread. If the thread finishes its work and calls task_done before the main thread has managed to put the QueuedCall into the queue, then you can get the error you see.

I think it only works when you run it the first time by accident. The GIL 'helps' you a lot; it's not very likely that the QueuedCall thread will actually gain the GIL and begin running immediately. The fact that you don't actually care about the Queue other than as a counter also 'helps' this appear to work: it doesn't matter if the QueuedCall hasn't hit the queue yet so long as it's not empty (this QueuedCall can just task_done another element in the queue, and by the time that element calls task_done this one will hopefully be in the queue, and it can be marked as done by that). And adding sleep also makes the new threads wait a bit, giving the main thread time to make sure they're actually in the queue, which is why that masks the problem as well.

Also note that, as far as I can tell from some quick fiddling with an interactive shell, your queue is actually still full at the end, because you never actually get anything out of it. It's just received a number of task_done messages equal to the number of things that were put in it, so the join works.

I think you'll need to radically redesign the way your QueuedCall class works, or use a different synchronisation primitive than a Queue. A Queue is designed to be used to queue work for worker threads that already exist. Starting a thread from within a constructor for an object that you put on the queue isn't really a good fit.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜