How do I pass a python object using a remote manager?
I'm developing a simple client-server application in Python. I'm using a manager to set up shared queues, but I can't figure out how to pass an arbitrary object from the server to the client. I suspect it has something to do with the `manager.register` function, but it's not very well explained in the multiprocessing documentation. The only example there uses Queues and nothing else.
Here's my code:
#manager_demo.py
from multiprocessing import Process, Queue, managers
from multiprocessing.managers import SyncManager
import time
class MyObject():
    """Bundle a task parameter with the function used to process tasks."""

    def __init__( self, p, f ):
        # Plain data attribute (not a callable).
        self.parameter = p
        # Callable; the client invokes it as f(task, parameter).
        self.processor_function = f
class MyServer():
def __init__(self, server_info, obj):
print '=== Launching Server ... ====='
(ip, port, pw) = server_info
self.object = obj #Parameters for task processing
#Define queues
self._process_queue = Queue() #Queue of tasks to be processed
self._results_queue = Queue() #Queue of processed tasks to be stored
#Set up IS_Manager class and register server functions
class IS_Manager(managers.BaseManager): pass
IS_Manager.register('get_processQ', callable=self.get_process_queue)
IS_Manager.register('get_resultsQ', callable=self.get_results_queue)
IS_Manager.register('get_object', callable=self.get_object)
#Initialize manager and server
self.manager = IS_Manager(address=(ip, port), authkey=pw)
self.server = self.manager.get_server()
self.server_process = Process( target=self.server.serve_forever )
self.server_process.start()
def get_process_queue(self): return self._process_queue
def get_results_queue(self): return self._results_queue
def get_object(self): return self.object
def runUntilDone(self, task_list):
#Fill the initial queue
for t in task_list:
self._process_queue.put(t)
#Main loop
total_tasks = len(task_list)
while not self._results_queue.qsize()==total_tasks:
time.sleep(.5)
print self._process_queue.qsize(), '\t', self._results_queue.qsize()
if not self._results_queue.empty():
print '\t', self._results_queue.get()
#Do stuff
pass
class MyClient():
def __init__(self, server_info):
(ip, port, pw) = server_info
print '=== Launching Client ... ====='
class IS_Manager(managers.BaseManager): pass
IS_Manager.register('get_processQ')
IS_Manager.register('get_resultsQ')
IS_Manager.register('get_object')
#Set up manager, pool
print '\tConnecting to server...'
manager = IS_Manager(address=(ip, port), authkey=pw)
manager.connect()
self._process_queue = manager.get_processQ()
self._results_queue = manager.get_resultsQ()
self.object = manager.get_object()
print '\tConnected.'
def runUntilDone(self):#, parameters):
print 'Starting client main loop...'
#Main loop
while 1:
if self._process_queue.empty():
print 'I\'m bored here!'
time.sleep(.5)
else:
task = self._process_queue.get()
print task, '\t', self.object.processor_f开发者_如何学Pythonunction( task, self.object.parameter )
print 'Client process is quitting. Bye!'
self._clients_queue.get()
And a simple server...
from manager_demo import *
def myProcessor( x, parameter ):
    """Return the task value x plus parameter."""
    return x + parameter
if __name__ == '__main__':
    # Shared object: parameter 100 plus the function clients should apply.
    my_object = MyObject( 100, myProcessor )
    my_task_list = range(1,20)
    my_server_info = ('127.0.0.1', 8081, 'my_pw')  # (ip, port, password)
    my_crawl_server = MyServer( my_server_info, my_object )
    my_crawl_server.runUntilDone( my_task_list )
And a simple client...
from manager_demo import *
if __name__ == '__main__':
    # Must match the (ip, port, password) the server was started with.
    my_server_info = ('127.0.0.1', 8081, 'my_pw')
    my_client = MyClient( my_server_info )
    my_client.runUntilDone()
When I run this it crashes on:
erin@Erin:~/Desktop$ python client.py
=== Launching Client ... =====
Connecting to server...
Connected.
Starting client main loop...
2 Traceback (most recent call last):
File "client.py", line 5, in <module>
my_client.runUntilDone()
File "/home/erin/Desktop/manager_demo.py", line 84, in runUntilDone
print task, '\t', self.object.processor_function( task, self.object.parameter )
AttributeError: 'AutoProxy[get_object]' object has no attribute 'parameter'
Why does Python have no trouble with Queues or the `processor_function`, but choke on the object's `parameter`? Thanks!
You're encountering this issue because the `parameter` attribute on your `MyObject` class is not a callable.
The documentation states that `exposed` is used to specify a sequence of method names which proxies for this typeid should be allowed to access (if `exposed` is not given, the proxy type's `_exposed_` attribute is used instead, if it exists). In the case where no exposed list is specified, all “public methods” of the shared object will be accessible. (Here a “public method” means any attribute which has a `__call__()` method and whose name does not begin with '_'.)
So you will need to expose the `parameter` attribute on `MyObject` manually, presumably as a method, by changing your `MyObject` class:
class MyObject():
    """Variant of MyObject that exposes its parameter through a method."""

    def __init__(self, p, f):
        # Stored under a private name; read it through parameter().
        self._parameter = p
        self.processor_function = f

    def parameter(self):
        """Return the stored parameter."""
        return self._parameter
Also, you will need to change the call in your client's task loop to:
self.object.processor_function(task, self.object.parameter())
HTH.
精彩评论