Statistical accumulator in Python
A statistical accumulator lets you perform incremental calculations. For instance, to compute the arithmetic mean of a stream of numbers that arrive at arbitrary times, you could write an object that keeps track of the number of items seen so far, n, and their sum, sum. When the mean is requested, the object simply returns sum/n.
An accumulator like this allows you to compute incrementally in the sense that, when given a new number, you don't need to recompute the entire sum and count.
Similar accumulators can be written for other statistics (see the Boost.Accumulators library for a C++ implementation).
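To illustrate what an accumulator for another statistic might look like, here is a minimal sketch of a running variance using Welford's online method. This is not from the original post: the class name RunningVariance and its attributes are made up for illustration, and it assumes Python 3.

```python
class RunningVariance:
    """Accumulates mean and sample variance incrementally (Welford's method)."""

    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the mean

    def add(self, x):
        x = float(x)
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        # uses the *updated* mean, which is what keeps the method stable
        self._m2 += delta * (x - self.mean)

    @property
    def variance(self):
        # the sample variance is undefined for fewer than two items
        if self.n < 2:
            return None
        return self._m2 / (self.n - 1)
```

As with the mean, each new number costs a constant amount of work and no past items need to be stored.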
How would you implement accumulators in Python? The code I came up with is:
class Accumulator(object):
    """
    Used to accumulate the arithmetic mean of a stream of
    numbers. This implementation does not allow removing items
    already accumulated, but it could easily be modified to do
    so. Also, other statistics could be accumulated.
    """
    def __init__(self):
        # upon initialization, the number of items currently
        # accumulated (_n) and the total sum of the items accumulated
        # (_sum) are set to zero because nothing has been accumulated
        # yet.
        self._n = 0
        self._sum = 0.0

    def add(self, item):
        # 'add' is used to add an item to this accumulator
        try:
            # try to convert the item to a float. If you are
            # successful, add the float to the current sum and
            # increase the number of accumulated items
            self._sum += float(item)
            self._n += 1
        except ValueError:
            # if you fail to convert the item to a float, simply
            # ignore the exception (pass on it and do nothing)
            pass

    @property
    def mean(self):
        # the property 'mean' returns the current mean accumulated in
        # the object
        if self._n > 0:
            # if you have more than zero items accumulated, then return
            # their arithmetic average
            return self._sum / self._n
        else:
            # if you have no items accumulated, return None (you could
            # also raise an exception)
            return None
# using the object:
# Create an instance of the object "Accumulator"
my_accumulator = Accumulator()
print(my_accumulator.mean)
# prints None because there are no items accumulated
# add one (a number)
my_accumulator.add(1)
print(my_accumulator.mean)
# prints 1.0
# add two (a string - it will be converted to a float)
my_accumulator.add('2')
print(my_accumulator.mean)
# prints 1.5
# add a 'NA' (will be ignored because it cannot be converted to float)
my_accumulator.add('NA')
print(my_accumulator.mean)
# prints 1.5 (notice that it ignored the 'NA')
Interesting design questions arise:
- How to make the accumulator thread-safe?
- How to safely remove items?
- How to architect it so that other statistics can be plugged in easily (a factory for statistics)?
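On the removal question, one possible shape is sketched below. It is only an illustration under stated assumptions: the class name RemovableMean and the remove method are hypothetical, it assumes Python 3, and because the accumulator stores no items it cannot check that the removed value was ever added. It only refuses to drop below zero items.

```python
class RemovableMean:
    """Mean accumulator with a guarded remove (illustrative sketch)."""

    def __init__(self):
        self._n = 0
        self._sum = 0.0

    def add(self, item):
        self._sum += float(item)
        self._n += 1

    def remove(self, item):
        if self._n == 0:
            raise ValueError("nothing to remove")
        # convert first, so a bad item leaves the state untouched
        value = float(item)
        self._sum -= value
        self._n -= 1
        if self._n == 0:
            # reset to discard any accumulated rounding error
            self._sum = 0.0

    @property
    def mean(self):
        return self._sum / self._n if self._n else None
```

Unlike the original add, this version lets conversion errors propagate, since silently ignoring a failed removal would leave the count and the sum out of step with what the caller expects.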
For a generalized, thread-safe higher-level function, you could use something like the following in combination with the Queue.Queue class and some other bits:
from Queue import Empty

def Accumulator(f, q, storage):
    """Yields successive values of `f` over the accumulation of `q`.

    `f` should take a single iterable as its parameter.
    `q` is a Queue.Queue or derivative.
    `storage` is a persistent sequence that provides an `append` method.
    `collections.deque` may be particularly useful, but a `list` is quite acceptable.

    >>> from Queue import Queue
    >>> from collections import deque
    >>> from threading import Thread
    >>> def mean(it):
    ...     vals = tuple(it)
    ...     return sum(vals) / len(vals)
    >>> value_queue = Queue()
    >>> LastThreeAverage = Accumulator(mean, value_queue, deque((), 3))
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(LastThreeAverage)
    [0, 1, 2, 4, 6, 8]
    """
    try:
        while True:
            # block for up to 0.1 s waiting for the next value
            storage.append(q.get(timeout=0.1))
            q.task_done()
            yield f(storage)
    except Empty:
        # no value arrived within the timeout: stop iterating
        pass
This generator function evades most of its purported responsibility by delegating it to other entities:
- It relies on Queue.Queue to supply its source elements in a thread-safe manner.
- A collections.deque object can be passed in as the value of the storage parameter; this provides, among other things, a convenient way to use only the last n (in this case 3) values.
- The function itself (in this case mean) is passed as a parameter. This will result in less-than-optimally efficient code in some cases, but is readily applied to all sorts of situations.
Note that the accumulator may time out if your producer thread takes longer than 0.1 seconds per value. This is easily remedied by passing a longer timeout or by removing the timeout parameter entirely. In the latter case the function will block indefinitely at the end of the queue; this usage makes more sense when it runs in a sub-thread (usually a daemon thread). Of course, you can also parametrize the arguments that are passed to q.get as a fourth argument to Accumulator.
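A rough sketch of that last idea follows, assuming Python 3 (where the module is named queue rather than Queue). The function name accumulate and the get_kwargs parameter are hypothetical names chosen for this illustration; the dictionary is simply forwarded to q.get.

```python
from queue import Empty, Queue  # Python 3 name of the Queue module


def accumulate(f, q, storage, get_kwargs=None):
    """Like Accumulator above, but forwards `get_kwargs` to `q.get`."""
    if get_kwargs is None:
        get_kwargs = {"timeout": 0.1}  # same default as the original
    try:
        while True:
            storage.append(q.get(**get_kwargs))
            q.task_done()
            yield f(storage)
    except Empty:
        return


q = Queue()
for v in (1, 2, 3):
    q.put(v)

# block=False makes q.get raise Empty as soon as the queue is drained
sums = list(accumulate(sum, q, [], {"block": False}))
print(sums)  # prints [1, 3, 6]
```

Passing {"block": True} with no timeout gives the block-forever behaviour described above.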
If you want to communicate end of queue, i.e. that there are no more values to come, from the producer thread (here putting_thread), you can pass and check for a sentinel value or use some other method. There is more info in this thread; I opted to write a subclass of Queue.Queue called CloseableQueue that provides a close method.
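For the sentinel variant, a minimal sketch might look like the following. It assumes Python 3, and the names _DONE, running_means and produce are invented for this example; it is not the CloseableQueue approach mentioned above.

```python
from queue import Queue
from threading import Thread

_DONE = object()  # hypothetical sentinel: a unique object no producer value can equal


def running_means(q):
    """Yields the running mean of values from `q` until the sentinel arrives."""
    total = 0.0
    count = 0
    while True:
        value = q.get()  # blocks; no timeout is needed with a sentinel
        q.task_done()
        if value is _DONE:
            return
        total += value
        count += 1
        yield total / count


def produce(values, q):
    for v in values:
        q.put(v)
    q.put(_DONE)  # signal that no more values are coming


q = Queue()
producer = Thread(target=produce, args=(range(0, 12, 2), q))
producer.start()
result = list(running_means(q))
producer.join()
print(result)  # prints [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
```

With several consumers, each one would need its own sentinel (or would have to put the sentinel back), which is one reason a closeable queue can be more convenient.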
There are various other ways you could customize the behaviour of such a function, for example by limiting the queue size; this is just an example of usage.
edit
As mentioned above, this loses some efficiency because of the necessity of recalculation and also, I think, doesn't really answer your question.
A generator can also accept values through its send method. So you can write a mean generator function like:
def meangen():
    """Yields the accumulated mean of sent values.

    >>> g = meangen()
    >>> g.send(None) # Initialize the generator
    >>> g.send(4)
    4.0
    >>> g.send(10)
    7.0
    >>> g.send(-2)
    4.0
    """
    # `total` rather than `sum`, to avoid shadowing the builtin
    total = yield(None)
    count = 1
    while True:
        total += yield(total / float(count))
        count += 1
Here the yield expression both brings values (the arguments to send) into the function and passes the calculated values out as the return value of send.
You can pass the generator returned by a call to that function to a more optimizable accumulator generator function like this one:
from Queue import Empty

def EfficientAccumulator(g, q):
    """Similar to Accumulator but sends values to a generator `g`.

    >>> from Queue import Queue
    >>> from threading import Thread
    >>> value_queue = Queue()
    >>> g = meangen()
    >>> g.send(None)
    >>> mean_accumulator = EfficientAccumulator(g, value_queue)
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(mean_accumulator)
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
    """
    try:
        while True:
            # forward each queued value to `g` and yield what it returns
            yield(g.send(q.get(timeout=0.1)))
            q.task_done()
    except Empty:
        pass
If I were doing this in Python, there are two things I would do differently:
- Separate out the functionality of each accumulator.
- Not use @property the way you did.
For the first one, I would likely want to come up with an API for performing an accumulation, perhaps something like:
def add(self, num): pass  # add a number
def compute(self): pass   # compute the value of the accumulator
Then I would create an accumulator registry that holds on to these accumulators and lets the user add a number to all of them and compute any of them by name. The code may look like:
class Accumulators(object):
    # class-level registry mapping a name to an accumulator class
    _accumulator_library = {}

    def __init__(self):
        # create one fresh instance of every registered accumulator
        self.accumulator_library = {}
        for key, value in Accumulators._accumulator_library.items():
            self.accumulator_library[key] = value()

    @staticmethod
    def register(name, accumulator):
        Accumulators._accumulator_library[name] = accumulator

    def add(self, num):
        for accumulator in self.accumulator_library.values():
            accumulator.add(num)

    def compute(self, name):
        return self.accumulator_library[name].compute()

    @staticmethod
    def register_decorator(name):
        def _inner(cls):
            Accumulators.register(name, cls)
            return cls
        return _inner

@Accumulators.register_decorator("Mean")
class Mean(object):
    def __init__(self):
        self.total = 0
        self.count = 0

    def add(self, num):
        self.count += 1
        self.total += num

    def compute(self):
        return self.total / float(self.count)
I should probably speak to your thread-safety question. Python's GIL protects you from a lot of threading issues, but not all of them: a compound update such as self.total += num is not guaranteed to be atomic. There are a few things you may want to do to protect yourself:
- If these objects are localized to one thread, use threading.local.
- If not, you can wrap the operations in a lock, using the with statement to acquire and release the lock for you.
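As a rough sketch of the lock option, assuming Python 3 and using a hypothetical LockedMean class that follows the add/compute API suggested earlier:

```python
import threading


class LockedMean:
    """Mean accumulator whose state is guarded by a lock (illustrative sketch)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._total = 0.0
        self._count = 0

    def add(self, num):
        # `with` acquires the lock and releases it even if an exception occurs
        with self._lock:
            self._total += num
            self._count += 1

    def compute(self):
        with self._lock:
            if self._count == 0:
                return None
            return self._total / self._count


def worker(acc, values):
    for v in values:
        acc.add(v)


acc = LockedMean()
threads = [threading.Thread(target=worker, args=(acc, [1.0] * 1000))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(acc.compute())  # prints 1.0
```

Holding the lock in compute as well means a reader never sees a total that has been updated without its matching count.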