Python: nonblocking read from stdout of threaded subprocess
I have a script (worker.py) that prints unbuffered output in the form...
1
2
3
.
.
.
n
where n is some constant number of iterations a loop in this script will make. In another script (service_controller.py) I start a number of threads, each of which starts a subprocess using subprocess.Popen(stdout=subprocess.PIPE, ...). Now, in my main thread (service_controller.py), I want to read the output of each thread's worker.py subprocess and use it to estimate the time remaining until completion.
I have all of the logic working that reads the stdout from worker.py and determines the last printed number. The problem is that I cannot figure out how to do this in a non-blocking way. If I read with a constant bufsize, each read ends up waiting for that much data from each of the workers. I have tried numerous approaches, including fcntl and select + os.read. What is my best option here? I can post my source if needed, but I figured the explanation describes the problem well enough.
Thanks for any help here.
EDIT
Adding sample code. I have a worker that starts a subprocess.
import os
import select
import subprocess
import threading

class WorkerThread(threading.Thread):
    def __init__(self):
        self.completed = 0
        self.data = ""          # accumulated stdout from the subprocess
        self.process = None
        self.lock = threading.RLock()
        threading.Thread.__init__(self)

    def run(self):
        cmd = ["/path/to/script", "arg1", "arg2"]
        self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, shell=False)
        #flags = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL)
        #fcntl.fcntl(self.process.stdout.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def get_completed(self):
        self.lock.acquire()
        ready = select.select([self.process.stdout.fileno()], [], [], 5)[0]
        if ready:
            self.data += os.read(ready[0], 1)
            try:
                self.completed = int(self.data.split("\n")[-2])
            except IndexError:
                pass
        self.lock.release()
        return self.completed
I then have a ThreadManager.
import time

class ThreadManager():
    def __init__(self):
        self.pool = []
        self.running = []
        self.lock = threading.Lock()

    def clean_pool(self, pool):
        # join and drop workers whose threads have finished
        for worker in [x for x in pool if not x.isAlive()]:
            worker.join()
            pool.remove(worker)
            del worker
        return pool
    def run(self, concurrent=5):
        while len(self.running) + len(self.pool) > 0:
            self.clean_pool(self.running)
            n = min(max(concurrent - len(self.running), 0), len(self.pool))
            if n > 0:
                for worker in self.pool[0:n]:
                    worker.start()
                self.running.extend(self.pool[0:n])
                del self.pool[0:n]
            time.sleep(.01)
        for worker in self.running + self.pool:
            worker.join()
and some code to run it.
threadManager = ThreadManager()
for i in xrange(0, 5):
    threadManager.pool.append(WorkerThread())
threadManager.run()
I have stripped out a lot of the other code in hopes of pinpointing the issue.
Instead of having your service_controller blocked by I/O, only each thread's loop should read the output of the process it controls.
You can then expose a method on the threaded object that returns the last polled output.
Of course, don't forget to use some locking mechanism to protect the buffer, since it is filled by the thread and read by the method the controller calls; see the sketch below.
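A minimal sketch of that pattern (assuming Python 2, as in the question, and a worker that prints one number per line; the class and method names here are only illustrative):
import subprocess
import threading

class PollingWorker(threading.Thread):
    def __init__(self, cmd):
        threading.Thread.__init__(self)
        self.cmd = cmd              # e.g. ["/path/to/script", "arg1", "arg2"]
        self.completed = 0
        self.lock = threading.RLock()

    def run(self):
        process = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, bufsize=1)
        # A blocking readline() is fine here, because only this thread waits on the pipe.
        for line in iter(process.stdout.readline, ""):
            try:
                value = int(line.strip())
            except ValueError:
                continue
            with self.lock:             # writer side of the shared counter
                self.completed = value
        process.wait()

    def get_completed(self):
        with self.lock:                 # reader side, called from the controller
            return self.completed
The controller can then call get_completed() on each worker at whatever interval it likes, without ever touching the pipes itself.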
Your method WorkerThread.run() launches the subprocess and then returns immediately. run() needs to keep polling and updating WorkerThread.completed until the subprocess completes.
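For example, run() could keep the question's select/os.read approach but loop until the child exits, updating self.completed under the lock (a sketch only, reusing the os, select, and subprocess imports from the code above and assuming Python 2; not tested against the real worker script):
    def run(self):
        cmd = ["/path/to/script", "arg1", "arg2"]
        self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, shell=False)
        data = ""
        while self.process.poll() is None:          # loop until the child exits
            ready = select.select([self.process.stdout], [], [], 1)[0]
            if ready:
                data += os.read(self.process.stdout.fileno(), 1024)
                lines = data.split("\n")
                if len(lines) > 1:                  # at least one complete line so far
                    with self.lock:
                        self.completed = int(lines[-2])
        self.process.stdout.close()
With run() doing the reading, get_completed() reduces to returning self.completed under the lock.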