app engine DeadlineExceededError for cron jobs and task queue for wikipedia crawler
I am trying to build a wikipedia link crawler on google app engine. I wanted to store an index in the datastore. But I run into the DeadlineExceededError for both cron jobs and task queue.
for the cron job I have this code:
def buildTree(self):
start=time.time()
self.log.info(" Start Time: %f" % start)
nobranches=TreeNode.all()
for tree in nobranches:
if tree.branches==[]:
self.addBranches(tree)
time.sleep(1)
if (time.time()-start) > 10 :
break
self.log.info("Time Eclipsed: %f" % (time.time()-start))
self.log.info(" End Time:%f" % time.clock())
I don't understand why the for loop doesn't break after 10 seconds. It does on the dev server. Something must be wrong with the time.time() on the server. Is there another function I can use?
for the task queue I have this code:
def addNewBranch(self, keyword, level=0):
self.log.debug("Add Tree")
self.addBranches(keyword)
t=TreeNode.gql("WHERE name=:1", keyword).get()
branches=t.nodes
if level < 3:
for branch in branches:
if branch.branches == []:
taskqueue.add(url="/addTree/%s" % branch.name)
self.log.debug("url:%s" % "/addTree/%s" % branch.name)
The logs show that they both run into the DeadlineExceededError. Shouldn't background processing have a longer that the 30 seconds for the page request. Is there a way around the exception?
Here is the code for addBranch()
def addBranches(self, keyword):
tree=TreeNode.gql("WHERE name=:1", keyword).get()
if tree is None:
tree=TreeNode(name=keyword)
self.log.debug("in addBranches arguments: tree %s", tree.name)
t=urllib2.quote(tree.name.encode('utf8'))
s="http://en.wikipedia.org/w/api.php?action=query&titles=%s&prop=links&pllimit=500&format=xml" % t
self.log.debug(s)
try: 开发者_JAVA百科
usock = urllib2.urlopen(s)
except :
self.log.error( "Could not retrieve doc: %s" % tree.name)
usock=None
if usock is not None:
try:
xmldoc=minidom.parse(usock)
except Exception , error:
self.log.error("Parse Error: %s" % error)
return None
usock.close()
try:
pyNode= xmldoc.getElementsByTagName('pl')
self.log.debug("Nodes to be added: %d" % pyNode.length)
except Exception, e:
pyNode=None
self.log.error("Getting Nodes Error: %s" % e)
return None
newNodes=[]
if pyNode is not None:
for child in pyNode:
node=None
node= TreeNode.gql("WHERE name=:1", child.attributes["title"].value).get()
if node is None:
newNodes.append(TreeNode(name=child.attributes["title"].value))
else:
tree.branches.append(node.key())
db.put(newNodes)
for node in newNodes:
tree.branches.append(node.key())
self.log.debug("Node Added: %s" % node.name)
tree.put()
return tree.branches
I have had great success with datetimes on GAE.
from datetime import datetime, timedelta
time_start = datetime.now()
time_taken = datetime.now() - time_start
time_taken will be a timedelta. You can compare it against another timedelta that has the duration you are interested in.
ten_seconds = timedelta(seconds=10)
if time_taken > ten_seconds:
....do something quick.
It sounds like you would be far better served using mapreduce or Task Queues. Both are great fun for dealing with huge numbers of records.
A cleaner pattern for the code you have is to fetch only some records.
nobranches=TreeNode.all().fetch(100)
This code will only pull 100 records. If you have a full 100, when you are done, you can throw another item on the queue to launch off more.
-- Based on comment about needing trees without branches --
I do not see your model up there, but if I were trying to create a list of all of the trees without branches and process them, I would: Fetch the keys only for trees in blocks of 100 or so. Then, I would fetch all of the branches that belong to those trees using an In query. Order by the tree key. Scan the list of branches, the first time you find a tree's key, pull the key tree from the list. When done, you will have a list of "branchless" tree keys. Schedule each one of them for processing.
A simpler version is to use MapReduce on the trees. For each tree, find one branch that matches its ID. If you cannot, flag the tree for follow up. By default, this function will pull batches of trees (I think 25) with 8 simultaneous workers. And, it manages the job queues internally so you don't have to worry about timing out.
There is not a way "around" the deadline exception aside from making your code execute within the proper timeframe.
The problem here is that you're doing a query operation for every link in your document. Since wikipedia pages can contain a lot of links, this means a lot of queries - and hence, you run out of processing time. This approach is also going to consume your quota at a fantastic rate!
Instead, you should use the page name of the Wikipedia page as the key name of the entity. Then, you can collect up all the links from the document into a list, construct keys from them (which is an entirely local operation), and do a single batch db.get for all of them. Once you've updated and/or created them as appropriate, you can do a batch db.put to store them all to the datastore - reducing your total datastore operations from numlinks*2 to just 2!
When DeadlineExcededErrors happen you want the request to eventually succeed if called again. This may require that your crawling state is guaranteed to have made some progress that can be skipped the next time. (Not addressed here)
Parallelized calls can help tremendously.
- Urlfetch
- Datastore Put (mixed entities together using db.put)
- Datastore Query (queries in parallel - asynctools)
Urlfetch:
- When you make your urlfetch calls be sure to use the asynchronous mode to collapse your loop.
Datastore
Combine Entities being put into a single round trip call.
# put newNodes+tree at the same time db.put(newNodes+tree)
Pull TreeNode.gql from inside loop up into parallel query tool like asynctools http://asynctools.googlecode.com
Asynctools Example
if pyNode is not None:
runner = AsyncMultiTask()
for child in pyNode:
title = child.attributes["title"].value
query = db.GqlQuery("SELECT __key__ FROM TreeNode WHERE name = :1", title)
runner.append(QueryTask(query, limit=1, client_state=title))
# kick off the work
runner.run()
# peel out the results
treeNodes = []
for task in runner:
task_result = task.get_result() # will raise any exception that occurred for the given query
treeNodes.append(task_result)
for node in treeNodes:
if node is None:
newNodes.append(TreeNode(name=child.attributes["title"].value))
else:
tree.branches.append(node.key())
for node in newNodes:
tree.branches.append(node.key())
self.log.debug("Node Added: %s" % node.name)
# put newNodes+tree at the same time
db.put(newNodes+tree)
return tree.branches
DISCLOSURE: I am associated with asynctools.
精彩评论