Updating a map from a high-rate data flow

I have a multi-threaded Java application where a method [update(key, value)] updates a ConcurrentHashMap. For each key, more values will be received than can be put in the map, so once a key has been updated, only the newest value among the waiting threads should be used for the next update. Or maybe there is some kind of lock where only one thread is ever waiting: the one that reached the lock last (effectively discarding the thread that was already waiting)? It is important that the whole map is not locked, which is why I haven't used a synchronized block around a normal HashMap. Even if there are threads waiting on key A, key B should still be updatable as long as no thread is already updating the value stored for B.

More succinctly, how do you update a map where key-value pairs are being received faster than updates can be made, using the last received value as the next update? So in the time A is updated to 1, values of 5, 3, 6, 8 are then received meaning the next update of A will be to 8.


This is a difficult problem, and the root of the difficulty is in capturing the order in which the updates arrive.

If the updates already have an associated (fine-grained) timestamp, then the solution is fairly straightforward:

  1. Define a Value class that holds the actual value and a timestamp. It needs a synchronized setIfNewer(ActualValue v, Timestamp t) method, which updates the actual value only if the supplied timestamp is more recent than the stored one.
  2. Define the map as ConcurrentHashMap<Key, Value>.
  3. Use putIfAbsent to put values into the map. If the putIfAbsent() returns a non-null value, use setIfNewer(...) to update it.

Note that this only works if the map updates can keep up in the long term; i.e. average data rate is not too high to cope with.
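The three steps above can be sketched roughly as follows. This is only an illustration under some assumptions: the class name TimestampedMap, the use of String for keys and actual values, and a plain long for the timestamp are all made up for the example, and the timestamp is assumed to be supplied by whoever produces the update.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical names throughout; String/long stand in for Key, ActualValue and Timestamp.
class TimestampedMap {

    /** Holds the actual value together with the timestamp of the update that set it. */
    static final class Value {
        private String actual;
        private long timestamp;

        Value(String actual, long timestamp) {
            this.actual = actual;
            this.timestamp = timestamp;
        }

        /** Stores v only if t is more recent than the stored timestamp. */
        synchronized boolean setIfNewer(String v, long t) {
            if (t > timestamp) {
                actual = v;
                timestamp = t;
                return true;
            }
            return false; // a stale update is silently dropped
        }

        synchronized String get() {
            return actual;
        }
    }

    private final ConcurrentHashMap<String, Value> map = new ConcurrentHashMap<>();

    /** Only the Value for this key is locked, never the whole map. */
    void update(String key, String value, long timestamp) {
        Value existing = map.putIfAbsent(key, new Value(value, timestamp));
        if (existing != null) {
            existing.setIfNewer(value, timestamp);
        }
    }

    String get(String key) {
        Value v = map.get(key);
        return v == null ? null : v.get();
    }
}
```

With this shape, updates that arrive out of order for the same key resolve to the one with the largest timestamp, and updates to different keys do not block each other.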

If the updates do not have an associated timestamp, then you've got a problem. If you are having difficulty keeping up with the updates, then you will also have difficulty adding a timestamp to the updates that accurately reflects the arrival time. That means there is a risk that updates will be (in effect) reordered. (If this is the case then I don't think the problem is solvable ... without changing the problem; see below.)

Some things that might work:

  • Do some profiling / performance analysis to figure out where the bottleneck really is. It might not be in doing the map updates at all. (After all ConcurrentHashMap is designed to be highly scalable.)

  • If there is strong affinity between the threads and the key values, then you could try 1) de-duping the updates in each thread using a per-thread LRU map, or 2) use a per-thread counter instead of a timestamp.

  • You could try partitioning the map based on the keyspace.

  • You could try adding more processors and/or more memory ... depending on what your profiling and monitoring are reporting.

  • You could try partitioning the entire application based on the keyspace. If the real problem is that the application cannot keep up, this may be the only possible approach.
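As a rough illustration of the keyspace-partitioning idea, here is one possible reading of it: route every key to a fixed single-threaded worker chosen by the key's hash, so that all updates for a given key are applied by one thread in the order they were submitted. The class name PartitionedUpdater, the String/Integer types and the partition count are assumptions for the example. Note that this sketch does not discard intermediate values; it only removes contention between keys that land in different partitions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one single-threaded executor per partition of the keyspace.
class PartitionedUpdater {
    private final ExecutorService[] workers;
    private final Map<String, Integer> map = new ConcurrentHashMap<>();

    PartitionedUpdater(int partitions) {
        workers = new ExecutorService[partitions];
        for (int i = 0; i < partitions; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** The same key always maps to the same worker, so its updates stay in submission order. */
    void update(String key, Integer value) {
        int p = Math.floorMod(key.hashCode(), workers.length);
        workers[p].execute(() -> map.put(key, value));
    }

    Integer get(String key) {
        return map.get(key);
    }

    /** Stops accepting work and waits for queued updates; returns false on timeout or interrupt. */
    boolean shutdownAndWait(long timeoutMillis) {
        for (ExecutorService w : workers) {
            w.shutdown();
        }
        try {
            boolean done = true;
            for (ExecutorService w : workers) {
                done &= w.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            return done;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Whether this helps depends on what the profiling shows; if the map itself is not the bottleneck, partitioning the work this way will not change much.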


How to do it?

There is a fairly simple solution: implement a sequencer. Each object you add needs a long field that is assigned on construction with something like AtomicLong.getAndIncrement().

The update then looks like this and doesn't need synchronization.

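import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

class Value {
    private static final AtomicLong sequencer = new AtomicLong();
    // Assigned once at construction; a larger seq means the object was created later.
    final long seq = sequencer.getAndIncrement();

    @Override
    public boolean equals(Object o) {
        // If you override equals, include seq as well: map.replace(key, cur, value) compares with equals.
        return o instanceof Value && ((Value) o).seq == seq; // plus the other fields
    }

    @Override
    public int hashCode() {
        return Long.hashCode(seq);
    }
    // ....
}

ConcurrentMap<Key, Value> map = new ConcurrentHashMap<>();

// Body of update(key, value): retry until the map holds this value or a newer one.
for (Value cur;;) {
    cur = map.get(key);
    if (cur == null) {
        cur = map.putIfAbsent(key, value);
        if (cur == null) {
            break; // no previous mapping, so our value is in
        }
    }
    if (cur.seq >= value.seq) {
        break; // the map already holds a newer (or the same) value
    }
    if (map.replace(key, cur, value)) {
        break; // swapped in atomically; if replace fails another thread won, so loop again
    }
}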
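On Java 8 and later, the same "keep the value with the highest sequence number" rule can also be written with ConcurrentHashMap.merge, which applies the remapping function atomically per key. This is a swapped-in alternative to the retry loop above, not the answer's original method; the class name SequencedMap and the int payload field are assumptions made for the sketch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: merge-based variant of the sequencer approach.
class SequencedMap {

    static final class Value {
        private static final AtomicLong SEQUENCER = new AtomicLong();
        final long seq = SEQUENCER.getAndIncrement(); // creation order
        final int payload;                            // stand-in for the real data

        Value(int payload) {
            this.payload = payload;
        }
    }

    private final ConcurrentMap<String, Value> map = new ConcurrentHashMap<>();

    /** Inserts value if the key is absent; otherwise keeps whichever Value was created later. */
    void update(String key, Value value) {
        map.merge(key, value, (old, incoming) -> incoming.seq > old.seq ? incoming : old);
    }

    Value get(String key) {
        return map.get(key);
    }
}
```

As with the loop version, the sequence number reflects when the Value object was constructed, so it only approximates arrival order if values are constructed as soon as they are received.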