How do I give a deferred AMP reply without blocking the system?
(I'm very open to suggestions for a better title.)
I am using the AMP protocol over Twisted to create a scheduler that feeds out jobs to its agents. The agents pull jobs from the scheduler, so the scheduler is an AMP server and the agents connect as clients.
The idea is for an agent to connect, pick up a job from the top of the scheduler's internal job queue and then go on its way executing it. However, that queue is not guaranteed to always be non-empty. Thus, I am looking to take advantage of Twisted's Deferred mechanism in order to simply have a Deferred fire on the agent's side once the scheduler has managed to pop a job off the queue.
Implementing this on the scheduler side is proving a bit tricky, though. The way that AMP works is by assigning a function to each (predefined by me) command that the agent can send, with the function taking all the arguments that the command has and returning a dictionary of all the values it returns. This means that I need to do this all from within one function. Normally, this would not be an issue, but here twisted appears to get in my way: I need to have the function pause for a bit, without pausing the twisted event loop, thus allowing it to actually add more jobs to the queue, so one can be popped off. (This is the reason I don't think the usual sleep() will have the desired effect.) More importantly, it means I can't think of a way to use some twisted functionality, e.g. deferToThread(), because I would have to handle the results from that (and only have access to them) in the separate function that I would assign as that deferred's callback, so I wouldn't know what to return in the AMP responder function after launching off the separate thread and assigning its callback. This illustrates what I mean a bit more clearly:
def assignJob(agentID):
    # We expect the agentID, so we can store who we've given a job to.
    # Get a job without blocking even if the queue is originally empty.
    job = None
    while job is None:
        try:
            job = jobqueue.pop(0)
        except IndexError:
            # Imagine getJob simply tries to get a job every 5 seconds
            # (using sleep() safely because it's in a separate thread)
            # until it eventually gets one, which it returns
            d = deferToThread(getJob)
            # We would then need to have a separate function,
            # e.g. jobReturn(), pick up the firing deferred and do
            # something with the result...
            d.addCallback(jobReturn)
    # But if we do... We don't (necessarily) have a job to return here
    # because for all we know, the deferred from that thread hasn't even
    # fired yet.
    return {'job': ???}
(This is obviously not the actual full code for the function - for one, it's a method of a subclass of amp.AMP, as required.)
The reactor method callInThread() also seems useful at first (since it doesn't return a deferred), but it doesn't offer a way to get the return value of the callable that it executes (as far as I can see) and, even if it did, that would mean waiting for the thread to finish, which would block this method for as long, which makes using a separate thread pointless.
So how do I block this method until I have a job, but not the whole Twisted event loop or, alternatively, how do I return an AMP reply outside of its immediate responder method?
One thing you might have missed is that an AMP responder method is itself allowed to return a Deferred (search for "may also return Deferreds" in the AMP API docs). As long as the Deferred eventually fires with a dictionary that matches the command's response definition, everything will work fine. The reply is only sent to the agent once the Deferred fires, and the event loop keeps running in the meantime.
Also somewhat related, if you want to avoid using threads, you might want to take a look at twisted.internet.defer.DeferredQueue, a queue data structure which knows about Deferreds natively.