GAE: Unit testing DeadlineExceededError
I've been using testbed, webtest, and nose to test my Python GAE app, and it is a great setup. I'm now implementing something similar to Nick's great example of using the deferred library, but I can't figure out a good way to test the parts of the code triggered by DeadlineExceededError.
Since this is in the context of a taskqueue, it would be painful to construct a test that took more than 10 minutes to run. Is there a way to temporarily set the taskqueue time limit to a few seconds for the purpose of testing? Or perhaps some other way to elegantly test the execution of code in the except DeadlineExceededError block?
Abstract the "GAE context" away from your code. In production, provide the real GAE implementation; for testing, provide a mock one that raises DeadlineExceededError. The test should not depend on any timeout and should be fast.
Sample abstraction (just glue):
class AbstractGAETaskContext(object):
    def task_expired(self):
        pass  # the mock implementation raises DeadlineExceededError here

    # Define here any method that calls into GAE, so it can be mocked.
    def deferred(self, *args, **kwargs):
        pass
If you don't like the abstraction, you can monkey-patch for testing only. You still need to define a task_expired function as your testing hook, and it should be called from within your task implementation function.
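A minimal sketch of the hook idea, under some assumptions: `DeadlineExceededError` below is a local stand-in for `google.appengine.runtime.DeadlineExceededError` so the example runs without the SDK, `TaskContext` and `process_items` are hypothetical names, and `unittest.mock` assumes Python 3 (on Python 2 the separate `mock` package offers the same API). The test makes the hook raise on its third call, so no real timeout is needed.

```python
from unittest import mock


class DeadlineExceededError(Exception):
    """Local stand-in for google.appengine.runtime.DeadlineExceededError."""


class TaskContext(object):
    """Hypothetical production context: the hook does nothing."""

    def task_expired(self):
        pass


def process_items(ctx, items, done):
    """Hypothetical task body that calls the hook once per item.

    Returns (resume_index, timed_out).
    """
    try:
        for item in items:
            ctx.task_expired()  # raises only when patched in a test
            done.append(item)
    except DeadlineExceededError:
        # Report where to resume so the caller can re-queue the work.
        return len(done), True
    return None, False


ctx = TaskContext()
done = []
# The first two hook calls return None; the third raises the error.
with mock.patch.object(
    ctx, "task_expired", side_effect=[None, None, DeadlineExceededError()]
):
    result = process_items(ctx, ["a", "b", "c"], done)
```

Here `result` reports a timeout together with the index to resume from, which is the shape of return value the except block would need to hand back to whatever re-queues the task.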
*UPDATED* This is the 3rd solution:
First, I want to mention that Nick's sample implementation is not so great: the Mapper class has too many responsibilities (deferring, querying data, updating in batches), which makes it hard to test because a lot of mocks need to be defined. So I extracted the deferring responsibility into a separate class. You only want to test the deferring mechanism here; what actually happens (the update, the query, etc.) should be covered by other tests.
Here is the deferring class, which no longer depends on GAE:
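class DeferredCall(object):
    def __init__(self, deferred):
        # Any callable with the signature deferred(callback, *args, **kwargs).
        self.deferred = deferred

    def run(self, long_execution_call, context, *args, **kwargs):
        '''long_execution_call should return a tuple (next_context, timed_out):
        the context at which the operation was abandoned, and whether it
        stopped because of a timeout.'''
        next_context, timed_out = long_execution_call(context, *args, **kwargs)
        if timed_out:
            # Pass long_execution_call again so the deferred run() receives
            # the same arguments as the original call.
            self.deferred(self.run, long_execution_call, next_context, *args, **kwargs)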
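To see the re-deferral chain run to completion without GAE, here is a small sketch. It carries its own copy of `DeferredCall` (one that re-passes `long_execution_call` when deferring) so it is self-contained; `FakeQueue` and `count_in_chunks` are hypothetical names standing in for the task queue and the long-running work.

```python
from collections import deque


class DeferredCall(object):
    """Self-contained copy of the deferring class."""

    def __init__(self, deferred):
        self.deferred = deferred

    def run(self, long_execution_call, context, *args, **kwargs):
        next_context, timed_out = long_execution_call(context, *args, **kwargs)
        if timed_out:
            # Re-pass the callable so the deferred run() sees the same signature.
            self.deferred(self.run, long_execution_call, next_context,
                          *args, **kwargs)


class FakeQueue(object):
    """Hypothetical in-memory stand-in for the task queue."""

    def __init__(self):
        self.pending = deque()
        self.executed = 0

    def defer(self, callback, *args, **kwargs):
        self.pending.append((callback, args, kwargs))

    def drain(self):
        # Execute queued calls in order, including ones queued while draining.
        while self.pending:
            callback, args, kwargs = self.pending.popleft()
            self.executed += 1
            callback(*args, **kwargs)


def count_in_chunks(start, total, chunk=3):
    """Pretend work: handles `chunk` items per call, then reports a timeout."""
    end = min(start + chunk, total)
    if end < total:
        return end, True  # more work left: simulate hitting the deadline
    return None, False


queue = FakeQueue()
DeferredCall(queue.defer).run(count_in_chunks, 0, 10)
queue.drain()
```

Each simulated timeout queues exactly one follow-up call that resumes from the returned context, and the chain stops once the work reports completion.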
Here is the test module:
import unittest

# DeferredCall is the class defined above; import it from wherever it lives.

class Test(unittest.TestCase):
    def test_defer(self):
        calls = []
        def mock_deferrer(callback, *args, **kwargs):
            calls.append((callback, args, kwargs))
        def interrupted(context):
            # Simulates a call that was cut short by the deadline.
            return "new_context", True
        d = DeferredCall(mock_deferrer)
        d.run(interrupted, "init_context")
        self.assertEqual(1, len(calls), 'a deferred call should be made')

    def test_no_defer(self):
        calls = []
        def mock_deferrer(callback, *args, **kwargs):
            calls.append((callback, args, kwargs))
        def completed(context):
            # Simulates a call that finished within the deadline.
            return None, False
        d = DeferredCall(mock_deferrer)
        d.run(completed, "init_context")
        self.assertEqual(0, len(calls), 'no deferred call should be made')
Here is how Nick's Mapper implementation would look:
class Mapper:
    ...
    def _continue(self, start_key, batch_size):
        ...  # same code as before, nothing was changed
        except DeadlineExceededError:
            # Write any unfinished updates to the datastore.
            self._batch_write()
            # Queue a new task to pick up where we left off.
            ##deferred.defer(self._continue, start_key, batch_size)
            return start_key, True  ## make it compatible with DeferredCall
        self.finish()
        return None, False  ## make it compatible with DeferredCall

    runner = _continue
Code where you register the long-running task; this depends only on the GAE deferred library:
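from google.appengine.ext import deferred

# Module names below are placeholders; import from wherever these classes live.
from deferred_call import DeferredCall
from person_mapper import PersonMapper  # this inherits from Mapper

mapper = PersonMapper()
# Pass deferred.defer (the callable), not the deferred module itself.
# None is the initial start_key; 100 is only an example batch_size.
DeferredCall(deferred.defer).run(mapper.runner, None, 100)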