Rolling variance algorithm
I'm trying to find an efficient, numerically stable algorithm to calculate a rolling variance (for instance, a variance over a 20-period rolling window). I'm aware of the Welford algorithm that efficiently computes the running v开发者_开发知识库ariance for a stream of numbers (it requires only one pass), but am not sure if this can be adapted for a rolling window. I would also like the solution to avoid the accuracy problems discussed at the top of this article by John D. Cook. A solution in any language is fine.
I've run across this problem as well. There are some great posts out there in computing the running cumulative variance such as John Cooke's Accurately computing running variance post and the post from Digital explorations, Python code for computing sample and population variances, covariance and correlation coefficient. Just could not find any that were adapted to a rolling window.
The Running Standard Deviations post by Subluminal Messages was critical in getting the rolling window formula to work. Jim takes the power sum of the squared differences of the values versus Welford’s approach of using the sum of the squared differences of the mean. Formula as follows:
PSA today = PSA(yesterday) + (((x today * x today) - x yesterday)) / n
- x = value in your time series
- n = number of values you've analyzed so far.
But, to convert the Power Sum Average formula to a windowed variety you need tweak the formula to the following:
PSA today = PSA yesterday + (((x today * x today) - (x yesterday * x Yesterday) / n
- x = value in your time series
- n = number of values you've analyzed so far.
You'll also need the Rolling Simple Moving Average formula:
SMA today = SMA yesterday + ((x today - x today - n) / n
- x = value in your time series
- n = period used for your rolling window.
From there you can compute the Rolling Population Variance:
Population Var today = (PSA today * n - n * SMA today * SMA today) / n
Or the Rolling Sample Variance:
Sample Var today = (PSA today * n - n * SMA today * SMA today) / (n - 1)
I've covered this topic along with sample Python code in a blog post a few years back, Running Variance.
Hope this helps.
Please note: I provided links to all the blog posts and math formulas in Latex (images) for this answer. But, due to my low reputation (< 10); I'm limited to only 2 hyperlinks and absolutely no images. Sorry about this. Hope this doesn't take away from the content.
I have been dealing with the same issue.
Mean is simple to compute iteratively, but you need to keep the complete history of values in a circular buffer.
next_index = (index + 1) % window_size; // oldest x value is at next_index, wrapping if necessary.
new_mean = mean + (x_new - xs[next_index])/window_size;
I have adapted Welford's algorithm and it works for all the values that I have tested with.
varSum = var_sum + (x_new - mean) * (x_new - new_mean) - (xs[next_index] - mean) * (xs[next_index] - new_mean);
xs[next_index] = x_new;
index = next_index;
To get the current variance just divide varSum by the window size: variance = varSum / window_size;
If you prefer code over words (heavily based on DanS' post): http://calcandstuff.blogspot.se/2014/02/rolling-variance-calculation.html
public IEnumerable RollingSampleVariance(IEnumerable data, int sampleSize)
{
double mean = 0;
double accVar = 0;
int n = 0;
var queue = new Queue(sampleSize);
foreach(var observation in data)
{
queue.Enqueue(observation);
if (n < sampleSize)
{
// Calculating first variance
n++;
double delta = observation - mean;
mean += delta / n;
accVar += delta * (observation - mean);
}
else
{
// Adjusting variance
double then = queue.Dequeue();
double prevMean = mean;
mean += (observation - then) / sampleSize;
accVar += (observation - prevMean) * (observation - mean) - (then - prevMean) * (then - mean);
}
if (n == sampleSize)
yield return accVar / (sampleSize - 1);
}
}
Actually Welfords algorithm can AFAICT easily be adapted to compute weighted Variance. And by setting weights to -1, you should be able to effectively cancel out elements. I havn't checked the math whether it allows negative weights though, but at a first look it should!
I did perform a small experiment using ELKI:
void testSlidingWindowVariance() {
MeanVariance mv = new MeanVariance(); // ELKI implementation of weighted Welford!
MeanVariance mc = new MeanVariance(); // Control.
Random r = new Random();
double[] data = new double[1000];
for (int i = 0; i < data.length; i++) {
data[i] = r.nextDouble();
}
// Pre-roll:
for (int i = 0; i < 10; i++) {
mv.put(data[i]);
}
// Compare to window approach
for (int i = 10; i < data.length; i++) {
mv.put(data[i-10], -1.); // Remove
mv.put(data[i]);
mc.reset(); // Reset statistics
for (int j = i - 9; j <= i; j++) {
mc.put(data[j]);
}
assertEquals("Variance does not agree.", mv.getSampleVariance(),
mc.getSampleVariance(), 1e-14);
}
}
I get around ~14 digits of precision compared to the exact two-pass algorithm; this is about as much as can be expected from doubles. Note that Welford does come at some computational cost because of the extra divisions - it takes about twice as long as the exact two-pass algorithm. If your window size is small, it may be much more sensible to actually recompute the mean and then in a second pass the variance every time.
I have added this experiment as unit test to ELKI, you can see the full source here: http://elki.dbs.ifi.lmu.de/browser/elki/trunk/test/de/lmu/ifi/dbs/elki/math/TestSlidingVariance.java it also compares to the exact two-pass variance.
However, on skewed data sets, the behaviour might be different. This data set obviously is uniform distributed; but I've also tried a sorted array and it worked.
Update: we published a paper with details on differentweighting schemes for (co-)variance:
Schubert, Erich, and Michael Gertz. "Numerically stable parallel computation of (co-) variance." Proceedings of the 30th International Conference on Scientific and Statistical Database Management. ACM, 2018. (Won the SSDBM best-paper award.)
This also discusses how weighting can be used to parallelize the computation, e.g., with AVX, GPUs, or on clusters.
Here's a divide and conquer approach that has O(log k)
-time updates, where k
is the number of samples. It should be relatively stable for the same reasons that pairwise summation and FFTs are stable, but it's a bit complicated and the constant isn't great.
Suppose we have a sequence A
of length m
with mean E(A)
and variance V(A)
, and a sequence B
of length n
with mean E(B)
and variance V(B)
. Let C
be the concatenation of A
and B
. We have
p = m / (m + n)
q = n / (m + n)
E(C) = p * E(A) + q * E(B)
V(C) = p * (V(A) + (E(A) + E(C)) * (E(A) - E(C))) + q * (V(B) + (E(B) + E(C)) * (E(B) - E(C)))
Now, stuff the elements in a red-black tree, where each node is decorated with mean and variance of the subtree rooted at that node. Insert on the right; delete on the left. (Since we're only accessing the ends, a splay tree might be O(1)
amortized, but I'm guessing amortized is a problem for your application.) If k
is known at compile-time, you could probably unroll the inner loop FFTW-style.
I know this question is old, but in case someone else is interested here follows the python code. It is inspired by johndcook blog post, @Joachim's, @DanS's code and @Jaime comments. The code below still gives small imprecisions for small data windows sizes. Enjoy.
from __future__ import division
import collections
import math
class RunningStats:
def __init__(self, WIN_SIZE=20):
self.n = 0
self.mean = 0
self.run_var = 0
self.WIN_SIZE = WIN_SIZE
self.windows = collections.deque(maxlen=WIN_SIZE)
def clear(self):
self.n = 0
self.windows.clear()
def push(self, x):
self.windows.append(x)
if self.n <= self.WIN_SIZE:
# Calculating first variance
self.n += 1
delta = x - self.mean
self.mean += delta / self.n
self.run_var += delta * (x - self.mean)
else:
# Adjusting variance
x_removed = self.windows.popleft()
old_m = self.mean
self.mean += (x - x_removed) / self.WIN_SIZE
self.run_var += (x + x_removed - old_m - self.mean) * (x - x_removed)
def get_mean(self):
return self.mean if self.n else 0.0
def get_var(self):
return self.run_var / (self.WIN_SIZE - 1) if self.n > 1 else 0.0
def get_std(self):
return math.sqrt(self.get_var())
def get_all(self):
return list(self.windows)
def __str__(self):
return "Current window values: {}".format(list(self.windows))
I look forward to be proven wrong on this but I don't think this can be done "quickly." That said, a large part of the calculation is keeping track of the EV over the window which can be done easily.
I'll leave with the question: are you sure you need a windowed function? Unless you are working with very large windows it is probably better to just use a well known predefined algorithm.
I guess keeping track of your 20 samples, Sum(X^2 from 1..20), and Sum(X from 1..20) and then successively recomputing the two sums at each iteration isn't efficient enough? It's possible to recompute the new variance without adding up, squaring, etc., all of the samples each time.
As in:
Sum(X^2 from 2..21) = Sum(X^2 from 1..20) - X_1^2 + X_21^2
Sum(X from 2..21) = Sum(X from 1..20) - X_1 + X_21
Here's another O(log k)
solution: find squares the original sequence, then sum pairs, then quadruples, etc.. (You'll need a bit of a buffer to be able to find all of these efficiently.) Then add up those values that you need to to get your answer. For example:
||||||||||||||||||||||||| // Squares
| | | | | | | | | | | | | // Sum of squares for pairs
| | | | | | | // Pairs of pairs
| | | | // (etc.)
| |
^------------------^ // Want these 20, which you can get with
| | // one...
| | | | // two, three...
| | // four...
|| // five stored values.
Now you use your standard E(x^2)-E(x)^2 formula and you're done. (Not if you need good stability for small sets of numbers; this was assuming that it was only accumulation of rolling error that was causing issues.)
That said, summing 20 squared numbers is very fast these days on most architectures. If you were doing more--say, a couple hundred--a more efficient method would clearly be better. But I'm not sure that brute force isn't the way to go here.
For only 20 values, it's trivial to adapt the method exposed here (I didn't say fast, though).
You can simply pick up an array of 20 of these RunningStat
classes.
The first 20 elements of the stream are somewhat special, however once this is done, it's much more simple:
- when a new element arrives, clear the current
RunningStat
instance, add the element to all 20 instances, and increment the "counter" (modulo 20) which identifies the new "full"RunningStat
instance - at any given moment, you can consult the current "full" instance to get your running variant.
You will obviously note that this approach isn't really scalable...
You can also note that there is some redudancy in the numbers we keep (if you go with the RunningStat
full class). An obvious improvement would be to keep the 20 lasts Mk
and Sk
directly.
I cannot think of a better formula using this particular algorithm, I am afraid that its recursive formulation somewhat ties our hands.
This is just a minor addition to the excellent answer provided by DanS. The following equations are for removing the oldest sample from the window and updating the mean and variance. This is useful, for example, if you want to take smaller windows near the right edge of your input data stream (i.e. just remove the oldest window sample without adding a new sample).
window_size -= 1; % decrease window size by 1 sample
new_mean = prev_mean + (prev_mean - x_old) / window_size
varSum = varSum - (prev_mean - x_old) * (new_mean - x_old)
Here, x_old is the oldest sample in the window you wish to remove.
For those coming here now, here's a reference containing the full derivation, with proofs, of DanS's answer and Jaime's related comment.
DanS and Jaime's response in concise C.
typedef struct {
size_t n, i;
float *samples, mean, var;
} rolling_var_t;
void rolling_var_init(rolling_var_t *c, size_t window_size) {
size_t ss;
memset(c, 0, sizeof(*c));
c->n = window_size;
c->samples = (float *) malloc(ss = sizeof(float)*window_size);
memset(c->samples, 0, ss);
}
void rolling_var_add(rolling_var_t *c, float x) {
float nmean; // new mean
float xold; // oldest x
float dx;
c->i = (c->i + 1) % c->n;
xold = c->samples[c->i];
dx = x - xold;
nmean = c->mean + dx / (float) c->n; // walk mean
//c->var += ((x - c->mean)*(x - nmean) - (xold - c->mean) * (xold - nmean)) / (float) c->n;
c->var += ((x + xold - c->mean - nmean) * dx) / (float) c->n;
c->mean = nmean;
c->samples[c->i] = x;
}
精彩评论