Why is my multithreaded Python script that uses Queue, threading.Thread and subprocess so flaky?
I have three shell scripts P1, P2 and P3 which I am trying to chain. These three shell scripts need to be run in series, but at any given time multiple P1s, P2s and P3s can be running.
I need to run these on tens of files, and quickly, hence the desire to use threads and do things in parallel.
I am using the Python Thread, Queue and subprocess modules to achieve this.
My problem is that when I have a thread count of greater than one, the program behaves erratically and the threads don't hand off to each other in a reproducible manner. Sometimes all five threads work perfectly and run to completion.
This is my first attempt at doing something using threads, and I am certain this is because of the usual issues with threads involving race conditions. But I want to know how I can go about cleaning up my code.
The actual code is at (https://github.com/harijay/xtaltools/blob/master/process_multi.py). Pseudocode is given below. Sorry if the code is messy.
My question is: why do I have erratic behavior with this design? The threads are all accessing different files at any given time. Also, subprocess.call returns only when the shell script is finished and the file it produces has been written to disk.
What can I do differently? I have tried to explain my design here as succinctly as possible.
My Basic design:
from Queue import Queue   # "queue" in Python 3
from threading import Thread

P1_Queue = Queue()
P2_Queue = Queue()
P3_Queue = Queue()

class P1_Thread(Thread):
    def __init__(self, P1_Queue, P2_Queue):
        Thread.__init__(self)
        self.in_queue = P1_Queue
        self.out_queue = P2_Queue

    def run(self):
        while True:
            my_file_to_process = self.in_queue.get()
            if my_file_to_process is None:
                break
            p1_runner = P1_runner(my_file_to_process)
            p1_runner.run_p1_using_subprocess()
            self.out_queue.put(my_file_to_process)
The class P1_runner takes the input file handle and then calls subprocess.call() to run a shell script that uses the input file and produces a new output file, via its run_p1_using_subprocess method.
import os
import subprocess

class P1_runner(object):
    def __init__(self, inputfile):
        self.inputfile = inputfile
        self.my_shell_script = """#!/usr/bin/sh
prog_name <<eof
input 1
...
eof"""
        # write the generated script to disk and make it executable
        self.my_shell_script_file = "some_unique_p1_file_name.sh"
        script = open(self.my_shell_script_file, "w")
        script.write(self.my_shell_script)
        script.close()
        os.chmod(self.my_shell_script_file, 0755)

    def run_p1_using_subprocess(self):
        # returns only after the shell script has finished
        subprocess.call([self.my_shell_script_file])
I have essentially similar classes for P2 and P3, all of which call a custom-generated shell script.
The chaining is achieved using a series of thread pools.
p1_worker_list = []
p2_worker_list = []
p3_worker_list = []

for i in range(THREAD_COUNT):
    p1_worker = P1_Thread(P1_Queue, P2_Queue)
    p1_worker.start()
    p1_worker_list.append(p1_worker)

for worker in p1_worker_list:
    worker.join()
And then again the same code block for P2 and P3:

for i in range(THREAD_COUNT):
    p2_worker = P2_Thread(P2_Queue, P3_Queue)
    p2_worker.start()
    p2_worker_list.append(p2_worker)

for worker in p2_worker_list:
    worker.join()
Thanks a tonne for your help/advice
Well this is really bad:
runner.run()
You shouldn't ever call a thread's run() method manually; you start a thread with .start(). Your code is a HUGE mess and no one here is going to wade through it to find your error.
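For contrast, here is a minimal, self-contained illustration of the difference (the work function and names are made up for the example, not taken from your code):

import threading

def work():
    print "running in", threading.current_thread().name

t = threading.Thread(target=work)
t.run()      # WRONG: runs work() in the current (main) thread; no new thread is created

t = threading.Thread(target=work)
t.start()    # RIGHT: spawns a new thread that executes work()
t.join()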
The threads' exit condition makes them commit suicide when another thread empties their input queue:

my_file_to_process = self.in_queue.get()
if my_file_to_process is None:  # my sister ate faster than I did, so...
    break                       # ... I kill myself!
Threads are dying just because they didn't find work to do when they were ready for more.
You should instead make the threads go to sleep (wait) until an event on their input queue is signaled, and die only when the orchestrator (main program) signals that processing is done (set the suicide flag, and signal all queues).
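A minimal sketch of that shutdown pattern, using None as the "all done" marker the way your own code does (the worker function here is a stand-in, not your real P1 step):

import threading
from Queue import Queue   # the module is named "queue" in Python 3

def worker(in_queue):
    while True:
        item = in_queue.get()      # blocks (sleeps) until something arrives
        if item is None:           # the orchestrator's "processing is done" signal
            break
        print "processing", item   # stand-in for the real per-file work

q = Queue()
threads = [threading.Thread(target=worker, args=(q,)) for i in range(3)]
for t in threads:
    t.start()
for name in ["a.txt", "b.txt", "c.txt"]:
    q.put(name)
for t in threads:
    q.put(None)                    # one sentinel per worker, sent only after all the work
for t in threads:
    t.join()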
(I see you already changed the code).
What @Falmarri probably means in his note elsewhere is that your question is not about a specific problem (something others can answer), because the overall use of the threading library in your code is wrong, and your use of the programming language in general is awkward. For example:
- The call to worker.join() makes the main program wait for the termination of all P1 threads, in order, before launching the P2 threads, thus defeating any attempt at concurrency.
- You should either override Thread.run() or provide a callable to the constructor. There's no need for the Pn_runner classes.
- All the thread classes do the same thing. You don't need a different class per process stage (see the sketch after this list).
- If you are already using Python, then it makes no sense to call an external program (much less a shell script) unless you absolutely cannot do the work easily in pure Python.
- Because of the above, having your program write shell scripts to the file system is very odd, and almost certainly unnecessary.
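Putting those points together, here is a hedged sketch of how one generic worker function could drive all three stages, with the stages overlapping and the shutdown ordered by the main program. The step_p1/p2/p3 functions and the file list are placeholders for whatever your stages actually do; this is an illustration of the structure, not your program:

import threading
from Queue import Queue   # "queue" in Python 3

THREAD_COUNT = 5

def stage_worker(func, in_queue, out_queue):
    # Pull items from in_queue, apply func, and push the result to out_queue.
    while True:
        item = in_queue.get()
        if item is None:               # sentinel: this stage is being shut down
            break
        result = func(item)
        if out_queue is not None:
            out_queue.put(result)

# Hypothetical pure-Python replacements for the three shell scripts.
def step_p1(path): return path
def step_p2(path): return path
def step_p3(path): return path

q1, q2, q3 = Queue(), Queue(), Queue()
stages = []
for func, q_in, q_out in [(step_p1, q1, q2), (step_p2, q2, q3), (step_p3, q3, None)]:
    threads = [threading.Thread(target=stage_worker, args=(func, q_in, q_out))
               for i in range(THREAD_COUNT)]
    for t in threads:
        t.start()                      # all stages start up front, so they overlap
    stages.append((q_in, threads))

for path in ["file1.dat", "file2.dat"]:  # your real list of input files goes here
    q1.put(path)

# Shut down stage by stage: stage N+1 is told "done" only after stage N has finished,
# so no item is left behind in the pipeline.
for q_in, threads in stages:
    for t in threads:
        q_in.put(None)                 # one sentinel per worker in this stage
    for t in threads:
        t.join()

Joining a stage before sending sentinels to the next one does not hurt concurrency here, because every worker of every stage is already started and consuming; it only guarantees that a sentinel can never overtake work another worker of the same stage is still producing.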
What I suggest you do to solve this particular problem of yours is:
- Try to stick to 100% Python. If you can't, or it seems too difficult, you'll at least have found the specific functionality that has to be accessed externally.
- Build a solution that doesn't use concurrency.
- Measure the program's performance and try to improve it algorithmically.
- Avoid threading if you can. A program that is CPU-bound will eat all the available cycles without threading. A program that is too disk-bound (or bound to any external/remote resource) will end up waiting for the disk if it has nothing else to do. To benefit from threading, a program must have the right balance between calculation and external resource use (or must be able to service requests as they arrive even when otherwise busy).
- Do it the pythonic way: start simple, and gradually increase functionality and complexity while, at all times, avoiding anything that seems complicated.
If your intention is to teach yourself about threading in Python, then by all means seek a simple problem to experiment with. And if all you wanted was to run several shell scripts in parallel, then bash and the other shells already have provisions for that, and you don't need to use Python.