开发者

Dynamically loading modules in Python (+ multi processing question)

I am writing a Python package which reads the list of modules (along with ancillary data) from a configuration file.

I then want to iterate through each of the dynamically loaded modules and invoke a do_work() function in it which will spawn a new process, so that the code runs ASYNCHRONOUSLY in a separate process.

At the moment, I am importing the list of all known modules at the beginning of my main script - this is a nasty hack I feel, and is not very flexible, as well as being a maintenance pain.

This is the function that spawns the processes. I will like to modify it to dynamically load the module when it is encountered. The key in the dictionary is the name of the module containing the code:

def do_work(work_info):
  for (worker, dataset) in work_info.items():
    #import the module defined by variable worker here...

    # [Edit] NOT using threads anymore, want to spawn processes asynchronously here...

    #t = threading.Thread(target=worker.do_work, args=[dataset])
    # I'll NOT dameonize since spawned children need to clean up on shutdown
    # Since the threads will be holding resources
    #t.daemon = True
    #t.start()

Question 1

When I call the function in my script (as written above), I get the following error:

AttributeError: 'str' object has no attribute 'do_work'

Which makes sense, since the dictionary key is a string (name of the module to be imported).

When I add the statement:

import worker

before spawning the thread, I get the error:

ImportError: No module named worker

This is strange, since the variable name rather than the value it holds are being used - when I print the variable, I get the value (as I expect) whats going on?

Question 2

As I mentioned in the comments section, I realize that the do_work() function written in the spawned children needs to cleanup after itself. My understanding is to write a clean_up function that is called when do_work() has completed successfully, or an unhandled exception is caught - is there anything more I need to do to ensure resources don't leak or leave the OS in an unstable state?

Question 3

If I comment out the t.daemon flag statement, will the code stil run ASYNCHRONOUSLY?. The work carried out by the spawned children are pretty intensive, and I don't want to have to be waiting for one child to finish before spawning another child. BTW, I am aware that threading in Python is in reality, a kind of time sharing/slicing - thats ok

Lastly is there a better (more Pythonic) way of doing what I'm trying to do?

[Edit]

After reading a little more about Pythons GIL and the threading (ahem - hack) in Python, I think its best to use separate processes instead (at least IIUC, the scrip开发者_如何学编程t can take advantage of multiple processes if they are available), so I will be spawning new processes instead of threads.

I have some sample code for spawning processes, but it is a bit trivial (using lambad functions). I would like to know how to expand it, so that it can deal with running functions in a loaded module (like I am doing above).

This is a snippet of what I have:

def do_mp_bench():
    q = mp.Queue() # Not only thread safe, but "process safe"
    p1 = mp.Process(target=lambda: q.put(sum(range(10000000))))
    p2 = mp.Process(target=lambda: q.put(sum(range(10000000)))) 
    p1.start()
    p2.start()
    r1 = q.get()
    r2 = q.get()
    return r1 + r2

How may I modify this to process a dictionary of modules and run a do_work() function in each loaded module in a new process?


Question 1: use __import__().

Question 2: why not just do the cleanup at the end of the do_work() function?

Question 3: IIRC daemon thread just means that the program won't automatically wait for this thread to end.


This was revised to make use of import() documentation here: import and refactored to utilize the requested multiprocessing module as documented here: multiprocessing. This hasn't been tested.

def do_work(work_info):
    q = mp.Queue()
    for (worker, dataset) in work_info.items():
      xworker = __import__(worker)
      p = mp.Process(target=xworker.do_work, args=dataset).start()
      q.put(p)
    while not q.empty():
      r = q.get()
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜