
How to create a synchronized object with Python multiprocessing?

I am having trouble figuring out how to make a synchronized Python object. I have a class called Observation and a class called Variable that basically look like this (the code is simplified to show the essence):

class Observation:
    def __init__(self, date, time_unit, id, meta):
        self.date = date
        self.time_unit = time_unit
        self.id = id
        self.meta = meta
        self.count = 0
        self.data = 0

    def add(self, value):
        # List values are collected as rows; scalar values are summed.
        if isinstance(value, list):
            if self.count == 0:
                self.data = []
            self.data.append(value)
        else:
            self.data += value
        self.count += 1


class Variable:
    def __init__(self, name, time_unit, lock):
        self.name = name
        self.lock = lock
        self.obs = {}
        self.time_unit = time_unit

    def get_observation(self, id, date, meta):
        # Return the existing observation for this key, creating and
        # storing a new one if it does not exist yet.
        with self.lock:
            obs = self.obs.get(id, Observation(date, self.time_unit, id, meta))
            self.obs[id] = obs
        return obs

    def add(self, date, value, meta=None):
        # Observations are keyed by date, so concurrent adds for the same
        # date should all land in one Observation instance. The lock is an
        # RLock, so re-acquiring it in get_observation is fine.
        with self.lock:
            obs = self.get_observation(date, date, meta or {})
            obs.add(value)

This is how I set up the multiprocessing part:

plugin = ...  # a function defined somewhere else
tasks = JoinableQueue()
result = JoinableQueue()
mgr = Manager()
lock = mgr.RLock()
var = Variable('foobar', 'year', lock)

for person in persons:
    tasks.put(Task(plugin, var, person))

Example of how the code is supposed to work:

I have an instance of Variable called var and I want to add an observation to var:

today = datetime.datetime.today()  
var.add(today, 1)  

So, the add function of Variable checks whether an observation already exists for that date; if it does, it returns that observation, otherwise it creates a new instance of Observation. Having found an observation, the actual value is added by the call obs.add(value). My main concern is that I want to make sure that different processes do not create multiple instances of Observation for the same date; that's why I lock it.
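For instance, two adds for the same date should end up in a single Observation (a small illustration of the intended behaviour, not output from the real code):

today = datetime.datetime.today()
var.add(today, 1)
var.add(today, 1)
# var.obs should now hold one Observation for today, with count == 2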

One instance of Variable is created and shared between the different processes using the multiprocessing library, and it is the container for numerous instances of Observation. The above code does not work; I get the error:

RuntimeError: Lock objects should only be shared between processes through inheritance

However, if I instantiate a Lock object before launching the different processes and supply it to the constructor of Variable, then everything seems to deadlock: all processes appear to be waiting for each other.

The ultimate goal is that different processes can update the obs variable in the object Variable. I need this to be synchronized because I am not just modifying the dictionary in place but also adding new elements and incrementing existing ones. The obs variable is a dictionary that contains a bunch of instances of Observation.

How can I make this synchronized while sharing one single instance of Variable between numerous multiprocessing processes? Thanks so much for your cognitive surplus!

UPDATE 1:

* I am using multiprocessing Locks and I have changed the source code to show this.

* I have changed the title to more accurately capture the problem

* I have replaced threadsafe with synchronization where I was confusing the two terms.

Thanks to Dmitry Dvoinikov for pointing this out to me!

One question that I am still not sure about is where to instantiate the Lock. Should this happen inside the class, or before launching the processes, passing it in as an argument? ANSWER: it should happen outside the class.
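A minimal sketch of that arrangement (the worker function and the persons list are placeholders, not the actual code):

import datetime
from multiprocessing import Manager, Process

def worker(var, person):
    # hypothetical worker: records one observation per person
    var.add(datetime.datetime.today(), 1, {'person': person})

if __name__ == '__main__':
    mgr = Manager()
    lock = mgr.RLock()          # created outside the class, before the workers start
    var = Variable('foobar', 'year', lock)

    persons = ['alice', 'bob']  # placeholder data
    procs = [Process(target=worker, args=(var, person)) for person in persons]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # Note: with a plain dict for var.obs, each worker still updates only its own
    # pickled copy of var; see UPDATE 3 below for making obs itself a manager dict.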

UPDATE 2:

* I fixed the 'Lock objects should only be shared between processes through inheritance' error by moving the initialization of the Lock outside the class definition and using a manager.

* Final question: now everything works, except that when I put my Variable instance in the queue it does not get updated, and every time I get it from the queue it does not contain the observation I added in the previous iteration. This is the only thing that is confusing me :(

UPDATE 3:

The final solution was to set the var.obs dictionary to an instance of mgr.dict() and then to have a custom serializer. Happy to share the code with anybody who is struggling with this as well.
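A minimal sketch of the mgr.dict() idea (not the actual final code; the custom serializer is omitted, and add_shared is a hypothetical stand-in for Variable.add):

from multiprocessing import Manager

mgr = Manager()
lock = mgr.RLock()
var = Variable('foobar', 'year', lock)
var.obs = mgr.dict()  # the dict now lives in the manager process, shared by all workers

def add_shared(var, date, value, meta=None):
    # Fetch, mutate, then reassign: a manager dict does not notice in-place
    # mutation of the objects it holds, so the updated Observation must be
    # stored back under its key.
    with var.lock:
        obs = var.obs.get(date) or Observation(date, var.time_unit, date, meta or {})
        obs.add(value)
        var.obs[date] = obs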


You are talking not about thread safety but about synchronization between separate processes, and that's an entirely different thing. Anyway, to start:

different processes can update the obs variable in the object Variable.

implies that Variable is in shared memory, and you have to explicitly store objects there; no magic makes a local instance visible to a separate process. Quoting the multiprocessing documentation:

Data can be stored in a shared memory map using Value or Array
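For illustration, a minimal example of Value (not specific to the classes above):

from multiprocessing import Process, Value

def bump(counter):
    # Value wraps a ctypes int in shared memory; get_lock() guards the
    # read-modify-write so concurrent increments are not lost.
    with counter.get_lock():
        counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)
    procs = [Process(target=bump, args=(counter,)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # 4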

Then, your code snippet is missing the crucial import section. There is no way to tell whether you instantiate the right multiprocessing.Lock rather than threading.Lock. Your code doesn't show the way you create processes and pass data around.

Therefore, I'd suggest that you make sure you understand the difference between threads and processes, decide whether you truly need a shared memory model for an application that contains multiple processes, and examine the spec.
