开发者

How do I pass a python object using a remote manager?

I'm developing a simple client-server application in python. I'm using a manager to set up shared queues, but I can't figure out how to pass an arbitrary object from the server to the client. I suspect it has something to do with the manager.register function, but it's not very well explained in the multiprocessing documentation. The only example there uses Queues and nothing else.

Here's my code:

#manager_demo.py
from multiprocessing import Process, Queue, managers
from multiprocessing.managers import SyncManager
import time

class MyObject:
    """Hold a parameter value together with the function applied with it."""

    def __init__(self, p, f):
        # Both values are stored unchanged as plain instance attributes.
        self.parameter, self.processor_function = p, f

# Server side of the demo: owns the task/result queues and a shared object,
# and publishes accessors for them through a BaseManager subclass whose
# server runs in a separate Process.
class MyServer():
    # server_info is unpacked as (ip, port, pw); pw is used as the manager's
    # authkey. obj is the object returned to clients by get_object().
    def __init__(self, server_info, obj):
        print '=== Launching Server ... ====='
        (ip, port, pw) = server_info
        self.object = obj       #Parameters for task processing

        #Define queues
        self._process_queue = Queue()       #Queue of tasks to be processed
        self._results_queue = Queue()       #Queue of processed tasks to be stored

        #Set up IS_Manager class and register server functions
        #Each typeid is backed by a bound method of this instance.
        class IS_Manager(managers.BaseManager): pass
        IS_Manager.register('get_processQ', callable=self.get_process_queue)
        IS_Manager.register('get_resultsQ', callable=self.get_results_queue)
        IS_Manager.register('get_object', callable=self.get_object)

        #Initialize manager and server
        self.manager = IS_Manager(address=(ip, port), authkey=pw)
        self.server = self.manager.get_server()

        #Run the manager's serve_forever loop in its own process so that
        #__init__ returns and runUntilDone() can be called afterwards.
        self.server_process = Process( target=self.server.serve_forever )
        self.server_process.start()

    #Accessors used as the callables for the registered typeids above.
    def get_process_queue(self): return self._process_queue
    def get_results_queue(self): return self._results_queue
    def get_object(self): return self.object

    # Put every item of task_list on the process queue, then poll every half
    # second, printing both queue sizes and at most one result per poll.
    def runUntilDone(self, task_list):
        #Fill the initial queue
        for t in task_list:
            self._process_queue.put(t)

        #Main loop
        total_tasks = len(task_list)
        # NOTE(review): the get() below removes results from the queue, so
        # qsize() only equals total_tasks if every result is present at the
        # moment of a single check -- confirm this exit condition is intended.
        while not self._results_queue.qsize()==total_tasks:
            time.sleep(.5)
            print self._process_queue.qsize(), '\t', self._results_queue.qsize()
            if not self._results_queue.empty():
                print '\t', self._results_queue.get()
            #Do stuff
            pass

class MyClient():
    def __init__(self, server_info):
        (ip, port, pw) = server_info
        print '=== Launching Client ... ====='

        class IS_Manager(managers.BaseManager): pass

        IS_Manager.register('get_processQ')
        IS_Manager.register('get_resultsQ')
        IS_Manager.register('get_object')

        #Set up manager, pool
        print '\tConnecting to server...'
        manager = IS_Manager(address=(ip, port), authkey=pw)
        manager.connect()

        self._process_queue = manager.get_processQ()
        self._results_queue = manager.get_resultsQ()
        self.object = manager.get_object()

        print '\tConnected.'

    def runUntilDone(self):#, parameters):
        print 'Starting client main loop...'

        #Main loop
        while 1:
            if self._process_queue.empty():
                print 'I\'m bored here!'
                time.sleep(.5)
            else:
                task = self._process_queue.get()
                print task, '\t', self.object.processor_f开发者_如何学Pythonunction( task, self.object.parameter )

        print 'Client process is quitting.  Bye!'
        self._clients_queue.get()

And a simple server...

from manager_demo import *

def myProcessor(x, parameter):
    """Return the task value *x* combined with *parameter* using ``+``."""
    result = x + parameter
    return result

if __name__ == '__main__':
    # Address, port and authkey the manager server listens with.
    server_info = ('127.0.0.1', 8081, 'my_pw')

    # Object handed to clients: the parameter 100 plus the processing function.
    shared_object = MyObject(100, myProcessor)
    crawl_server = MyServer(server_info, shared_object)

    # Queue the task values 1 through 19 and poll for results.
    crawl_server.runUntilDone(range(1, 20))

And a simple client...

from manager_demo import *
if __name__ == '__main__':
    # Same address, port and authkey as in the server script.
    my_client = MyClient(('127.0.0.1', 8081, 'my_pw'))
    my_client.runUntilDone()

When I run this it crashes on:

erin@Erin:~/Desktop$ python client.py 
=== Launching Client ... =====
    Connecting to server...
    Connected.
Starting client main loop...
2   Traceback (most recent call last):
  File "client.py", line 5, in <module>
    my_client.runUntilDone()
  File "/home/erin/Desktop/manager_demo.py", line 84, in runUntilDone
    print task, '\t', self.object.processor_function( task, self.object.parameter )
AttributeError: 'AutoProxy[get_object]' object has no attribute 'parameter'

Why does python have no trouble with Queues or the processor_function, but choke on the object parameter? Thanks!


You're encountering this issue because the parameter attribute on your MyObject() class is not a callable.

The documentation for BaseManager.register() states that exposed is used to specify a sequence of method names which proxies for this typeid should be allowed to access using BaseProxy._callmethod(). In the case where no exposed list is specified, all “public methods” of the shared object will be accessible. (Here a “public method” means any attribute which has a __call__() method and whose name does not begin with '_'.)

So you will need to expose the parameter attribute of MyObject explicitly (for example, as a method) by changing MyObject to:

class MyObject:
    """Shared object whose parameter can be read through a manager proxy."""

    def __init__(self, p, f):
        self.processor_function = f
        # Kept under a leading underscore and read back via parameter().
        self._parameter = p

    def parameter(self):
        """Return the stored parameter value.

        This is a method rather than a plain attribute because, without an
        explicit exposed list, a proxy only gives access to attributes that
        are callable and whose names do not begin with '_'.
        """
        return self._parameter

You will also need to change the call in the client's main loop to:

 self.object.processor_function(task, self.object.parameter())

HTH.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜