Fast set union of integer
I need to make lots of unions of ordered set of integers (I would like to avoid duplicates, but it is okay if there are).
This is the code with the best performance so far :
// some code added for better understanding
std::vector< std::pair<std::string, std::vector<unsigned int> > vec_map;
vec_map.push_back(std::make_pair("hi", std::vector<unsigned int>({1, 12, 1450});
vec_map.push_back(std::make_pair("stackoverflow", std::vector<unsigned int>({42, 1200, 14500});
std::vector<unsigned int> match(const std::string & token){
auto lower = std::lower_bound(vec_map.begin(), vec_map.end(), tok开发者_Go百科en, comp2());
auto upper = std::upper_bound(vec_map.begin(), vec_map.end(), token, comp());
std::vector<unsigned int> result;
for(; lower != upper; ++lower){
std::vector<unsigned int> other = lower->second;
result.insert(result.end(), other.begin(), other.end());
}
std::sort(result.begin(), result.end()); // This function eats 99% of my running time
return result;
}
valgrind (using the tool callgrind) tells me that I spend 99% of the time doing the sort.
This is what I tried so far :
- Using std::set (very bad performance)
- Using std::set_union (bad performance)
- maintaining a heap with std::push_heap (50% slower)
Is there any hope to gain somehow some performance? I can change my containers and use boost, and maybe some other lib (depending on its licence).
EDIT integers can be as big a 10 000 000 EDIT 2 gave some example of how I use it, because of some confusion
This looks like an instance of multi-way merge. Depending on the input (profile and time!), the best algorithm might be what you have or something that builds the result incrementally by selecting the smallest integer from all the containers or something more complex.
A custom merge sort may give a tiny amount of help.
#include <string>
#include <vector>
#include <algorithm>
#include <map>
#include <iostream>
#include <climits>
typedef std::multimap<std::string, std::vector<unsigned int> > vec_map_type;
vec_map_type vec_map;
struct comp {
bool operator()(const std::string& lhs, const std::pair<std::string, std::vector<unsigned int> >& rhs) const
{ return lhs < rhs.first; }
bool operator()(const std::pair<std::string, std::vector<unsigned int> >& lhs, const std::string& rhs) const
{ return lhs.first < rhs; }
};
typedef comp comp2;
std::vector<unsigned int> match(const std::string & token){
auto lower = std::lower_bound(vec_map.begin(), vec_map.end(), token, comp2());
auto upper = std::upper_bound(vec_map.begin(), vec_map.end(), token, comp());
unsigned int num_vecs = std::distance(lower, upper);
typedef std::vector<unsigned int>::const_iterator iter_type;
std::vector<iter_type> curs;
curs.reserve(num_vecs);
std::vector<iter_type> ends;
ends.reserve(num_vecs);
std::vector<unsigned int> result;
unsigned int result_count = 0;
//keep track of current position and ends
for(; lower != upper; ++lower){
const std::vector<unsigned int> &other = lower->second;
curs.push_back(other.cbegin());
ends.push_back(other.cend());
result_count += other.size();
}
result.reserve(result_count);
//merge sort
unsigned int last = UINT_MAX;
if (result_count) {
while(true) {
//find which current position points to lowest number
unsigned int least=0;
for(unsigned int i=0; i< num_vecs; ++i ){
if (curs[i] != ends[i] && (curs[least]==ends[least] || *curs[i]<*curs[least]))
least = i;
}
if (curs[least] == ends[least])
break;
//push back lowest number and increase that vectors current position
if( *curs[least] != last || result.size()==0) {
last = *curs[least];
result.push_back(last);
}
++curs[least];
}
}
return result;
}
int main() {
vec_map.insert(vec_map_type::value_type("apple", std::vector<unsigned int>(10, 10)));
std::vector<unsigned int> t;
t.push_back(1); t.push_back(2); t.push_back(11); t.push_back(12);
vec_map.insert(vec_map_type::value_type("apple", t));
vec_map.insert(vec_map_type::value_type("apple", std::vector<unsigned int>()));
std::vector<unsigned int> res = match("apple");
for(unsigned int i=0; i<res.size(); ++i)
std::cout << res[i] << ' ';
return 0;
}
http://ideone.com/1rYTi
ALTERNATIVE SOLUTION:
Method std::sort (if it is based on quick-sort) is very good sorting non-sorted vectorsO(logN), is even better with sorted vector, but if your vector is inverted sorted have O(N^2). It may happen that when you perform the union you have a lot of operands, with the first one containing bigger values than the followers.
I would try the following (I suppose elements from input vectors are already sorted):
As suggested by other people here you should reseve the required size on the results vector before start filling it.
In case std::distance(lower, upper) == 1 there is no reason to union, just copy the contento of the single operand.
Sort the operands of your union, maybe by size (bigger first), or if the ranges don't overlap or partially overlap just by first value), so that you maximize the number of elements that are already sorted on the next step. Probably the best is a strategy that keep in consideration both things SIZE and RANGE of every operand of your union. A lot depends on the real data.
If there are few operands with a lot of elements each, keep appending the elements on the back of the result vector, but after appending each vector (from the second) you can try to merge (std::inplace_merge) the old content with the appendend one, this also deduplicates elements for you.
If the number of operands is big (comparing to the total number of elements) then you should stay with your former strategy of sorting afterward but call std::unique to deduplicate after sort. In this case you should sort by the range of elements contained.
If the number of elements is relatively large percentage of range of possible int
s, you might get a decent performance out of what is essentially a simplified "hash join" (to use DB terminology).
(If there is a relatively small number of integers compared to range of possible values, this is probably not the best approach.)
Essentially, we make a giant bitmap, then set flags only on indexes corresponding to the input int
s and finally reconstruct the result based on these flags:
#include <vector>
#include <algorithm>
#include <iostream>
#include <time.h>
template <typename ForwardIterator>
std::vector<int> IntSetUnion(
ForwardIterator begin1,
ForwardIterator end1,
ForwardIterator begin2,
ForwardIterator end2
) {
int min = std::numeric_limits<int>::max();
int max = std::numeric_limits<int>::min();
for (auto i = begin1; i != end1; ++i) {
min = std::min(*i, min);
max = std::max(*i, max);
}
for (auto i = begin2; i != end2; ++i) {
min = std::min(*i, min);
max = std::max(*i, max);
}
if (min < std::numeric_limits<int>::max() && max > std::numeric_limits<int>::min()) {
std::vector<int>::size_type result_size = 0;
std::vector<bool> bitmap(max - min + 1, false);
for (auto i = begin1; i != end1; ++i) {
const std::vector<bool>::size_type index = *i - min;
if (!bitmap[index]) {
++result_size;
bitmap[index] = true;
}
}
for (auto i = begin2; i != end2; ++i) {
const std::vector<bool>::size_type index = *i - min;
if (!bitmap[index]) {
++result_size;
bitmap[index] = true;
}
}
std::vector<int> result;
result.reserve(result_size);
for (std::vector<bool>::size_type index = 0; index != bitmap.size(); ++index)
if (bitmap[index])
result.push_back(index + min);
return result;
}
return std::vector<int>();
}
void main() {
// Basic sanity test...
{
std::vector<int> v1;
v1.push_back(2);
v1.push_back(2000);
v1.push_back(229013);
v1.push_back(-2243);
v1.push_back(-530);
std::vector<int> v2;
v1.push_back(2);
v2.push_back(243);
v2.push_back(90120);
v2.push_back(329013);
v2.push_back(-530);
auto result = IntSetUnion(v1.begin(), v1.end(), v2.begin(), v2.end());
for (auto i = result.begin(); i != result.end(); ++i)
std::cout << *i << std::endl;
}
// Benchmark...
{
const auto count = 10000000;
std::vector<int> v1(count);
std::vector<int> v2(count);
for (int i = 0; i != count; ++i) {
v1[i] = i;
v2[i] = i - count / 2;
}
std::random_shuffle(v1.begin(), v1.end());
std::random_shuffle(v2.begin(), v2.end());
auto start_time = clock();
auto result = IntSetUnion(v1.begin(), v1.end(), v2.begin(), v2.end());
auto end_time = clock();
std::cout << "Time: " << (((double)end_time - start_time) / CLOCKS_PER_SEC) << std::endl;
std::cout << "Union element count: " << result.size() << std::endl;
}
}
This prints...
Time: 0.402
...on my machine.
If you want to get your input int
s from something else other than std::vector<int>
, you can implement your own iterator and pass it to IntSetUnion
.
You're sorting integers with a limited range, this is one of those rare circumstances where a radix sort could be used. Unfortunately the only way to know if this beats the generalized sort is to try it.
精彩评论