Python threading and queues for infinite data input (stream)
I would like to use threads to process a streaming input.
How can I make the code below work for an infinite input, generated for example by itertools.count?
It works if 'for i in itertools.count():' is replaced by 'for i in xrange(5):'.
from threading import Thread
from Queue import Queue, Empty
import itertools

def do_work(q):
    while True:
        try:
            x = q.get(block=False)
            print (x)
        except Empty:
            # the queue is empty right now, so this worker stops
            break

if __name__ == "__main__":
    work_queue = Queue()
    # itertools.count() never ends, so this loop never finishes
    # and the threads below are never started
    for i in itertools.count():
        work_queue.put(i)
    threads = [Thread(target=do_work, args=(work_queue,)) for i in range(8)]
    for t in threads: t.start()
    for t in threads: t.join()
The problem is that itertools.count generates an infinite sequence. This means the for loop never ends, so the worker threads are never even created. Put that loop in its own function and run it in a separate thread. That way the queue keeps growing while the worker threads take data off it.
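A minimal sketch of that restructuring, written for Python 3 (where the Queue module is called queue). The names producer, worker and run, the stop event, the maxsize=100 bound and the None sentinel are illustrative assumptions, not part of the original code; the time.sleep call stands in for however long the real stream should run.

```python
import itertools
import queue
import threading
import time

SENTINEL = None  # assumed marker meaning "no more work" for a worker


def producer(q, stop, num_workers):
    """Feed the endless itertools.count() stream into q from its own thread."""
    for n in itertools.count():
        if stop.is_set():
            break
        q.put(n)  # blocks while the bounded queue is full
    for _ in range(num_workers):
        q.put(SENTINEL)  # one per worker, so every worker wakes up and exits


def worker(q, seen, lock):
    """Take items off q until the sentinel arrives."""
    while True:
        item = q.get()  # blocks until the producer supplies something
        if item is SENTINEL:
            break
        with lock:
            seen.append(item)  # stand-in for real processing


def run(num_workers=4, seconds=0.1):
    q = queue.Queue(maxsize=100)  # bounded so the producer cannot run far ahead
    stop = threading.Event()
    seen, lock = [], threading.Lock()
    threads = [threading.Thread(target=producer, args=(q, stop, num_workers))]
    threads += [threading.Thread(target=worker, args=(q, seen, lock))
                for _ in range(num_workers)]
    for t in threads:
        t.start()
    time.sleep(seconds)  # let the "infinite" stream run for a while
    stop.set()           # then ask the producer to finish
    for t in threads:
        t.join()
    return seen
```

Because the sentinels are queued after the last number, every item the producer put is consumed before the workers exit.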
You need to fill the queue from a thread, and you need to manage the queue size, especially if the workers take time to process items. You also need to mark queue items as done. If this is related to your other question about Twitter and "extremely fast" input, then you have a lot more to do with regard to database inserts.
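A minimal Python 3 sketch of managing the queue size and marking items done. The names worker and process, the doubling "processing step" and the maxsize=5 bound are hypothetical. A bounded queue.Queue(maxsize=...) makes put() block while the queue is full, so a fast producer cannot outrun slow workers, and calling task_done() once per item lets q.join() report when everything put so far has been processed.

```python
import queue
import threading


def worker(q, out, lock):
    while True:
        item = q.get()  # blocks while the queue is empty
        try:
            with lock:
                out.append(item * 2)  # hypothetical processing step
        finally:
            q.task_done()  # mark this item finished so q.join() can return


def process(items, num_workers=3, maxsize=5):
    q = queue.Queue(maxsize=maxsize)  # bounded: put() waits when maxsize items are pending
    out, lock = [], threading.Lock()
    for _ in range(num_workers):
        # daemon threads do not keep the interpreter alive after the main thread ends
        threading.Thread(target=worker, args=(q, out, lock), daemon=True).start()
    for item in items:
        q.put(item)  # back-pressure: the producer slows down to the workers' pace
    q.join()  # wait until task_done() has been called for every item put
    return out
```

This version drains a finite batch; with a truly endless input, q.join() would never be reached, so shutdown would need a sentinel value or a stop flag instead.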
Your questions have been too vague on fairly complicated topics. You don't seem to understand what you're trying to achieve well enough to know that it's not easy. I recommend being a little more specific about what you are trying to do.
Here's an example of filling and consuming a queue with threads. The queue size isn't managed here (the queue is unbounded).
from threading import Thread
from Queue import Queue, Empty, Full
import itertools
from time import sleep

def do_work(q, wkr):
    while True:
        try:
            # wait up to 10 s for an item; give up if none arrives
            x = q.get(block=True, timeout=10)
        except Empty:
            print "Wkr %s exiting, timeout/empty" % (wkr)
            break
        print "Wkr %s: Consuming %s" % (wkr, x)
        q.task_done()  # mark the item done only after it has been processed
        sleep(0.01)

def fill_queue(q, limit=1000):
    count = itertools.count()
    while True:
        n = next(count)
        try:
            # wait up to 10 s for a free slot (only matters for a bounded queue)
            q.put(n, block=True, timeout=10)
        except Full:
            print "Filler exiting, timeout/full"
            break
        if n >= limit:
            print "Filler exiting, reached limit - %s" % limit
            break
        sleep(0.01)

work_queue = Queue()  # unbounded: its size is not managed
threads = [Thread(target=do_work, args=(work_queue, i)) for i in range(2)]
# the filler thread runs alongside the two workers
threads.insert(0, Thread(target=fill_queue, args=(work_queue, 100)))
for t in threads:
    t.start()
for t in threads:
    t.join()
Sample output:
Wkr 0: Consuming 0
Wkr 1: Consuming 1
Wkr 0: Consuming 2
Wkr 1: Consuming 3
....
Wkr 1: Consuming 99
Filler exiting, reached limit - 100
Wkr 0: Consuming 100
Wkr 1 exiting, timeout/empty
Wkr 0 exiting, timeout/empty
Maybe I'm missing something, but isn't it as simple as creating and starting the threads before the for loop?
Also, having your threads terminate when there's no work seems like a bad idea, as there may be more work showing up in future. Surely you want them to block until some work is available?
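A minimal Python 3 sketch of that suggestion: the worker threads are started before the producing loop, and each worker calls q.get() with no timeout, so it simply waits while the stream is quiet. The bursts, the pause length, the function name run_bursts and the None shutdown sentinel are assumptions for illustration. A worker written like the question's do_work (get(block=False) plus break on Empty) would likely exit during such a pause and miss the later items.

```python
import queue
import threading
import time


def worker(q, out, lock):
    while True:
        item = q.get()  # no timeout: an idle worker waits instead of quitting
        if item is None:  # sentinel, used only for a deliberate shutdown
            break
        with lock:
            out.append(item)  # stand-in for real processing


def run_bursts(bursts=((0, 1, 2), (3, 4, 5)), pause=0.2, num_workers=2):
    q = queue.Queue()
    out, lock = [], threading.Lock()
    threads = [threading.Thread(target=worker, args=(q, out, lock))
               for _ in range(num_workers)]
    for t in threads:  # start the workers *before* the producing loop
        t.start()
    for burst in bursts:  # stand-in for a stream that goes quiet between bursts
        for item in burst:
            q.put(item)
        time.sleep(pause)  # the queue may sit empty here, but the workers stay alive
    for _ in threads:
        q.put(None)  # one sentinel per worker
    for t in threads:
        t.join()
    return out
```

With a real endless stream the for loop over bursts would never finish, and the sentinels would only be sent when the program decides to shut down.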