开发者

Parallelizing quicksort makes it slower

I'm quicksorting over a very large amount of data, and for fun am trying to parallelize it to speed up the sort. However, in it's current form, the multithreaded version is slower than the singlethreaded version due to synchronization chokepoints.

Every time I spawn a thread, I get a lock on an int and increment it, and every time the thread finishes I again get a lock and decrement, in addition to checking if there are any threads still running (int > 0). If not, I wake up my main thread and do work with the sorted data.

I'm sure there's a better way to do this. Not sure what it is though. Help is greatly appreciated.

EDIT: I guess I didn't provide enough info.

This is a Java code on an oct开发者_开发知识库o-core Opteron. I can't switch languages.

The amount I'm sorting fits into memory, and it already exists in memory at the point when quicksort is called, so there's no reason to write it to disk only to read it back into memory.

By "get a lock" I mean having a synchronized block on the integer.


Without knowing more about the implementation here are my suggestions and/or comments:

  1. Limit the number of threads that can run at any give time. Pergaps 8 or 10 (perhaps to give the scheduler more leeway, although it would best to put one per core/hw thread). There isn't really a point running more threads for "throughput" on a CPU-bound problem if the affinity doesn't support it.

  2. Don't thread near the leaves!!! Only thread on the larger branches. There is no point spawning a thread to sort a relatively few number of items and at this level there are many many little branches! Threading would add much more relative overhead here. (This is similar to switching to a "simple sort" for the leaves).

  3. Make sure each thread can work in isolation -- should not stomp on another thread during work -> no locks, just wait for join. Divide and conquer.

  4. Possibly look at performing a "breadth-first" approach to spawn threads.

  5. Consider a mergesort over a quicksort (I am biased towards mergesort :-) Remember there are numerous different kinds of mergesorts including bottom-up.

Edit

  1. Make sure it actually works. Remember to correctly utilize memory barriers between threads -- required even if no two threads modify the same data at once to ensure the correct visibility.

Edit (proof-of-concept):

I threw together this simple demonstration. On my Intel Core2 Duo @ 2Ghz I could get it run in about 2/3 to 3/4 the time which is definitely some improvement :) (Settings: DATA_SIZE = 3000000, MAX_THREADS = 4, MIN_PARALLEL = 1000). This is with the basic in-place quicksort code ripped from Wikipedia that does not take advantage of any other basic optimizations.

The method in which it determines if a new thread can/should start is also very primitive -- if no new thread is available it just chugs right along (because, you know, why wait?)

This code should also (hopefully) fan-out breadth-wise with the threads. This may be less efficient for data-locality than keeping it depth-wise, but the model seemed simple enough if my head.

An executor service is also used to simplify the design and to be able to reuse the same threads (vs. spawning new threads). MIN_PARALLEL can become quite small (say, about 20) before the executor overhead starts to show -- the maximum number of threads and only-using-a-new-thread-if-possible likely keeps this in check as well.

qsort average seconds: 0.6290541056
pqsort average seconds: 0.4513915392

I make absolutely no guarantees about the usefulness or correctness of this code but it "seems to work" here. Pay heed to the warning next to the ThreadPoolExecutor as it clearly shows I'm not entirely certain what is happening :-) I am fairly certain the design is somewhat flawed in under-utilizing the threads.

package psq;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;

public class Main {

    int[] genData (int len) {
        Random r = new Random();
        int[] newData = new int[len];
        for (int i = 0; i < newData.length; i++) {
            newData[i] = r.nextInt();
        }
        return newData;
    }      

    boolean check (int[] arr) {
        if (arr.length == 0) {
            return true;
        }
        int lastValue = arr[0];
        for (int i = 1; i < arr.length; i++) {
            //System.out.println(arr[i]);
            if (arr[i] < lastValue) {
                return false;
            }
            lastValue = arr[i];
        }
        return true;
    }

    int partition (int[] arr, int left, int right, int pivotIndex) {
        // pivotValue := array[pivotIndex]
        int pivotValue = arr[pivotIndex];
        {
            // swap array[pivotIndex] and array[right] // Move pivot to end
            int t = arr[pivotIndex];
            arr[pivotIndex] = arr[right];
            arr[right] = t;
        }
        // storeIndex := left
        int storeIndex = left;
        // for i  from  left to right - 1 // left ≤ i < right
        for (int i = left; i < right; i++) {
            //if array[i] ≤ pivotValue
            if (arr[i] <= pivotValue) {
                //swap array[i] and array[storeIndex]
                //storeIndex := storeIndex + 1            
                int t = arr[i];
                arr[i] = arr[storeIndex];
                arr[storeIndex] = t;
                storeIndex++;                   
            }
        }
        {
            // swap array[storeIndex] and array[right] // Move pivot to its final place
            int t = arr[storeIndex];
            arr[storeIndex] = arr[right];
            arr[right] = t;
        }
        // return storeIndex
        return storeIndex;
    }

    void quicksort (int[] arr, int left, int right) {
        // if right > left
        if (right > left) {            
            // select a pivot index //(e.g. pivotIndex := left + (right - left)/2)
            int pivotIndex = left + (right - left) / 2;
            // pivotNewIndex := partition(array, left, right, pivotIndex)
            int pivotNewIndex = partition(arr, left, right, pivotIndex);
            // quicksort(array, left, pivotNewIndex - 1)
            // quicksort(array, pivotNewIndex + 1, right)
            quicksort(arr, left, pivotNewIndex - 1);
            quicksort(arr, pivotNewIndex + 1, right);
        }
    }

    static int DATA_SIZE = 3000000;
    static int MAX_THREADS = 4;
    static int MIN_PARALLEL = 1000;

    // NOTE THAT THE THREAD POOL EXECUTER USES A LINKEDBLOCKINGQUEUE
    // That is, because it's possible to OVER SUBMIT with this code,
    // even with the semaphores!
    ThreadPoolExecutor tp = new ThreadPoolExecutor(
            MAX_THREADS,
            MAX_THREADS,
            Long.MAX_VALUE,
            TimeUnit.NANOSECONDS,
            new LinkedBlockingQueue<Runnable>());
    // if there are no semaphore available then then we just continue
    // processing from the same thread and "deal with it"
    Semaphore sem = new Semaphore(MAX_THREADS, false); 

    class QuickSortAction implements Runnable {
        int[] arr;
        int left;
        int right;

        public QuickSortAction (int[] arr, int left, int right) {
            this.arr = arr;
            this.left = left;
            this.right = right;
        }

        public void run () {
            try {
                //System.out.println(">>[" + left + "|" + right + "]");
                pquicksort(arr, left, right);
                //System.out.println("<<[" + left + "|" + right + "]");
            } catch (Exception ex) {
                // I got nothing for this
                throw new RuntimeException(ex); 
            }
        }

    }

    // pquicksort
    // threads will [hopefully] fan-out "breadth-wise"
    // this is because it's likely that the 2nd executer (if needed)
    // will be submitted prior to the 1st running and starting its own executors
    // of course this behavior is not terribly well-define
    void pquicksort (int[] arr, int left, int right) throws ExecutionException, InterruptedException {
        if (right > left) {
            // memory barrier -- pquicksort is called from different threads
            synchronized (arr) {}

            int pivotIndex = left + (right - left) / 2;
            int pivotNewIndex = partition(arr, left, right, pivotIndex);

            Future<?> f1 = null;
            Future<?> f2 = null;

            if ((pivotNewIndex - 1) - left > MIN_PARALLEL) {
                if (sem.tryAcquire()) {
                    f1 = tp.submit(new QuickSortAction(arr, left, pivotNewIndex - 1));
                } else {
                    pquicksort(arr, left, pivotNewIndex - 1);
                }
            } else {
                quicksort(arr, left, pivotNewIndex - 1);
            }
            if (right - (pivotNewIndex + 1) > MIN_PARALLEL) {
                if (sem.tryAcquire()) {
                    f2 = tp.submit(new QuickSortAction(arr, pivotNewIndex + 1, right));
                } else {
                    pquicksort(arr, pivotNewIndex + 1, right);
                }
            } else {
                quicksort(arr, pivotNewIndex + 1, right);
            }

            // join back up
            if (f1 != null) {
                f1.get();
                sem.release();
            }
            if (f2 != null) {
                f2.get();
                sem.release();
            }
        }        
    }

    long qsort_call (int[] origData) throws Exception {
        int[] data = Arrays.copyOf(origData, origData.length);
        long start = System.nanoTime();
        quicksort(data, 0, data.length - 1);
        long duration = System.nanoTime() - start;
        if (!check(data)) {
            throw new Exception("qsort not sorted!");
        }
        return duration;
    }

    long pqsort_call (int[] origData) throws Exception {
        int[] data = Arrays.copyOf(origData, origData.length);
        long start = System.nanoTime();
        pquicksort(data, 0, data.length - 1);
        long duration = System.nanoTime() - start;
        if (!check(data)) {
            throw new Exception("pqsort not sorted!");
        }
        return duration;
    }

    public Main () throws Exception {
        long qsort_duration = 0;
        long pqsort_duration = 0;
        int ITERATIONS = 10;
        for (int i = 0; i < ITERATIONS; i++) {
            System.out.println("Iteration# " + i);
            int[] data = genData(DATA_SIZE);
            if ((i & 1) == 0) {
                qsort_duration += qsort_call(data);
                pqsort_duration += pqsort_call(data);
            } else {
                pqsort_duration += pqsort_call(data);
                qsort_duration += qsort_call(data);
            }
        }
        System.out.println("====");
        System.out.println("qsort average seconds: " + (float)qsort_duration / (ITERATIONS * 1E9));
        System.out.println("pqsort average seconds: " + (float)pqsort_duration / (ITERATIONS * 1E9));
    }

    public static void main(String[] args) throws Exception {
        new Main();
    }

}

YMMV. Happy coding.

(Also, I'd like to know how this -- or similar -- code fairs on your 8-core box. Wikipedia claims a linear speedup by number-of-cpus is possible :)

Edit (better numbers)

Removed use of futures which caused a minor "jam" and switched to a single final-wait-semaphore: less useless waiting. Now runs in just 55% of non-threaded time :-)

qsort average seconds: 0.5999702528
pqsort average seconds: 0.3346969088

(

package psq;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;

public class Main {

    int[] genData (int len) {
        Random r = new Random();
        int[] newData = new int[len];
        for (int i = 0; i < newData.length; i++) {
            newData[i] = r.nextInt();
        }
        return newData;
    }      

    boolean check (int[] arr) {
        if (arr.length == 0) {
            return true;
        }
        int lastValue = arr[0];
        for (int i = 1; i < arr.length; i++) {
            //System.out.println(arr[i]);
            if (arr[i] < lastValue) {
                return false;
            }
            lastValue = arr[i];
        }
        return true;
    }

    int partition (int[] arr, int left, int right, int pivotIndex) {
        // pivotValue := array[pivotIndex]
        int pivotValue = arr[pivotIndex];
        {
            // swap array[pivotIndex] and array[right] // Move pivot to end
            int t = arr[pivotIndex];
            arr[pivotIndex] = arr[right];
            arr[right] = t;
        }
        // storeIndex := left
        int storeIndex = left;
        // for i  from  left to right - 1 // left ≤ i < right
        for (int i = left; i < right; i++) {
            //if array[i] ≤ pivotValue
            if (arr[i] <= pivotValue) {
                //swap array[i] and array[storeIndex]
                //storeIndex := storeIndex + 1            
                int t = arr[i];
                arr[i] = arr[storeIndex];
                arr[storeIndex] = t;
                storeIndex++;                   
            }
        }
        {
            // swap array[storeIndex] and array[right] // Move pivot to its final place
            int t = arr[storeIndex];
            arr[storeIndex] = arr[right];
            arr[right] = t;
        }
        // return storeIndex
        return storeIndex;
    }

    void quicksort (int[] arr, int left, int right) {
        // if right > left
        if (right > left) {            
            // select a pivot index //(e.g. pivotIndex := left + (right - left)/2)
            int pivotIndex = left + (right - left) / 2;
            // pivotNewIndex := partition(array, left, right, pivotIndex)
            int pivotNewIndex = partition(arr, left, right, pivotIndex);
            // quicksort(array, left, pivotNewIndex - 1)
            // quicksort(array, pivotNewIndex + 1, right)
            quicksort(arr, left, pivotNewIndex - 1);
            quicksort(arr, pivotNewIndex + 1, right);
        }
    }

    static int DATA_SIZE = 3000000;
    static int MAX_EXTRA_THREADS = 7;
    static int MIN_PARALLEL = 500;

    // To get to reducePermits
    @SuppressWarnings("serial")
    class Semaphore2 extends Semaphore {
        public Semaphore2(int permits, boolean fair) {
            super(permits, fair);
        }
        public void removePermit() {
            super.reducePermits(1);
        }
    }

    class QuickSortAction implements Runnable {
        final int[] arr;
        final int left;
        final int right;
        final SortState ss;

        public QuickSortAction (int[] arr, int left, int right, SortState ss) {
            this.arr = arr;
            this.left = left;
            this.right = right;
            this.ss = ss;
        }

        public void run () {
            try {
                //System.out.println(">>[" + left + "|" + right + "]");
                pquicksort(arr, left, right, ss);
                //System.out.println("<<[" + left + "|" + right + "]");
                ss.limit.release();
                ss.countdown.release();
            } catch (Exception ex) {
                // I got nothing for this
                throw new RuntimeException(ex); 
            }
        }

    }

    class SortState {
        final public ThreadPoolExecutor pool = new ThreadPoolExecutor(
            MAX_EXTRA_THREADS,
            MAX_EXTRA_THREADS,
            Long.MAX_VALUE,
            TimeUnit.NANOSECONDS,
            new LinkedBlockingQueue<Runnable>());
        // actual limit: executor may actually still have "active" things to process
        final public Semaphore limit = new Semaphore(MAX_EXTRA_THREADS, false); 
        final public Semaphore2 countdown = new Semaphore2(1, false); 
    }

    void pquicksort (int[] arr) throws Exception {
        SortState ss = new SortState();
        pquicksort(arr, 0, arr.length - 1, ss);
        ss.countdown.acquire();
    }

    // pquicksort
    // threads "fork" if available.
    void pquicksort (int[] arr, int left, int right, SortState ss) throws ExecutionException, InterruptedException {
        if (right > left) {
            // memory barrier -- pquicksort is called from different threads
            // and those threads may be created because they are in an executor
            synchronized (arr) {}

            int pivotIndex = left + (right - left) / 2;
            int pivotNewIndex = partition(arr, left, right, pivotIndex);

            {
                int newRight = pivotNewIndex - 1;
                if (newRight - left > MIN_PARALLEL) {
                    if (ss.limit.tryAcquire()) {
                        ss.countdown.removePermit();
                        ss.pool.submit(new QuickSortAction(arr, left, newRight, ss));
                    } else {
                        pquicksort(arr, left, newRight, ss);
                    }
                } else {
                    quicksort(arr, left, newRight);
                }
            }

            {
                int newLeft = pivotNewIndex + 1;
                if (right - newLeft > MIN_PARALLEL) {
                    if (ss.limit.tryAcquire()) {
                        ss.countdown.removePermit();
                        ss.pool.submit(new QuickSortAction(arr, newLeft, right, ss));
                    } else {
                        pquicksort(arr, newLeft, right, ss);
                    }
                } else {
                    quicksort(arr, newLeft, right);
                }
            }

        }        
    }

    long qsort_call (int[] origData) throws Exception {
        int[] data = Arrays.copyOf(origData, origData.length);
        long start = System.nanoTime();
        quicksort(data, 0, data.length - 1);
        long duration = System.nanoTime() - start;
        if (!check(data)) {
            throw new Exception("qsort not sorted!");
        }
        return duration;
    }

    long pqsort_call (int[] origData) throws Exception {
        int[] data = Arrays.copyOf(origData, origData.length);
        long start = System.nanoTime();
        pquicksort(data);
        long duration = System.nanoTime() - start;
        if (!check(data)) {            
            throw new Exception("pqsort not sorted!");
        }
        return duration;
    }

    public Main () throws Exception {
        long qsort_duration = 0;
        long pqsort_duration = 0;
        int ITERATIONS = 10;
        for (int i = 0; i < ITERATIONS; i++) {
            System.out.println("Iteration# " + i);
            int[] data = genData(DATA_SIZE);
            if ((i & 1) == 0) {
                qsort_duration += qsort_call(data);
                pqsort_duration += pqsort_call(data);
            } else {
                pqsort_duration += pqsort_call(data);
                qsort_duration += qsort_call(data);
            }
        }
        System.out.println("====");
        System.out.println("qsort average seconds: " + (float)qsort_duration / (ITERATIONS * 1E9));
        System.out.println("pqsort average seconds: " + (float)pqsort_duration / (ITERATIONS * 1E9));
    }

    public static void main(String[] args) throws Exception {
        new Main();
    }

}


By "get a lock" I mean having a synchronized block on the integer. If I understand you correctly: you're locking on every element you actually sort, which sounds like it's going to be VERY slow!

It sounds like you're spawning too many threads... you haven't told us how many threads you're actually spawning, but if you're doing one thread per integer then it will almost certainly be slower (where almost certainly is an understatement). What you would want to do is spawn 8 threads, since you have 8 cores, and "partition" your array into 8 sections which you will quicksort separately and then concatenate just as you would have in the original algorithm.

Here are some examples on how to achieve it: Multithreaded quicksort or mergesort


Threads are expensive. Don't use threads if you not having tons of data to sort. Or you could use a language that has better design for concurrency. E.g. Erlang has very light-weight threads that can be useful for sorting.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜