Java:通过多线程并行化快速排序

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/3425126/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-14 00:40:44  来源:igfitidea点击:

Java: Parallelizing quick sort via multi-threading

javamultithreadingparallel-processingquicksort

提问by Robz

I am experimenting with parallelizing algorithms in Java. I began with merge sort, and posted my attempt in this question. My revised attempt is in the code below, where I now try to parallelize quick sort.

我正在试验 Java 中的并行化算法。我从合并排序开始,并在这个问题中发布了我的尝试。我修改后的尝试在下面的代码中,我现在尝试并行化快速排序。

Are there any rookie mistakes in my multi-threaded implementation or approach to this problem? If not, shouldn't I expect more than a 32% speed increase between a sequential and a parallelized algorithm on a duel-core (see timings at bottom)?

我的多线程实现或解决此问题的方法中是否有任何菜鸟错误?如果不是,我不应该期望双核上的顺序算法和并行算法之间的速度提高超过 32%(请参阅底部的计时)?

Here is the multithreading algorithm:

这是多线程算法:

    public class ThreadedQuick extends Thread
    {
        final int MAX_THREADS = Runtime.getRuntime().availableProcessors();

        CountDownLatch doneSignal;
        static int num_threads = 1;

        int[] my_array;
        int start, end;

        public ThreadedQuick(CountDownLatch doneSignal, int[] array, int start, int end) {
            this.my_array = array;
            this.start = start;
            this.end = end;
            this.doneSignal = doneSignal;
        }

        public static void reset() {
            num_threads = 1;
        }

        public void run() {
            quicksort(my_array, start, end);
            doneSignal.countDown();
            num_threads--;
        }

        public void quicksort(int[] array, int start, int end) {
            int len = end-start+1;

            if (len <= 1)
                return;

            int pivot_index = medianOfThree(array, start, end);
            int pivotValue = array[pivot_index];

            swap(array, pivot_index, end);

            int storeIndex = start;
            for (int i = start; i < end; i++) {
               if (array[i] <= pivotValue) {
                   swap(array, i, storeIndex);
                   storeIndex++;
               }
            }

            swap(array, storeIndex, end);

            if (num_threads < MAX_THREADS) {
                num_threads++;

                CountDownLatch completionSignal = new CountDownLatch(1);

                new ThreadedQuick(completionSignal, array, start, storeIndex - 1).start();
                quicksort(array, storeIndex + 1, end);

                try {
                    completionSignal.await(1000, TimeUnit.SECONDS);
                } catch(Exception ex) {
                    ex.printStackTrace();
                }
            } else {
                quicksort(array, start, storeIndex - 1);
                quicksort(array, storeIndex + 1, end);
            }
        }
    }

Here is how I start it off:

这是我开始的方式:

ThreadedQuick.reset();
CountDownLatch completionSignal = new CountDownLatch(1);
new ThreadedQuick(completionSignal, array, 0, array.length-1).start();
try {
    completionSignal.await(1000, TimeUnit.SECONDS);
} catch(Exception ex){
    ex.printStackTrace();
}

I tested this against Arrays.sort and a similar sequential quick sort algorithm. Here are the timing results on an intel duel-core dell laptop, in seconds:

我针对 Arrays.sort 和类似的顺序快速排序算法对此进行了测试。以下是英特尔双核戴尔笔记本电脑的计时结果,以秒为单位:

Elements: 500,000, sequential: 0.068592, threaded: 0.046871, Arrays.sort: 0.079677

元素:500,000,顺序:0.068592,线程:0.046871,Arrays.sort:0.079677

Elements: 1,000,000, sequential: 0.14416, threaded: 0.095492, Arrays.sort: 0.167155

元素:1,000,000,顺序:0.14416,线程:0.095492,Arrays.sort:0.167155

Elements: 2,000,000, sequential: 0.301666, threaded: 0.205719, Arrays.sort: 0.350982

元素:2,000,000,顺序:0.301666,线程:0.205719,Arrays.sort:0.350982

Elements: 4,000,000, sequential: 0.623291, threaded: 0.424119, Arrays.sort: 0.712698

元素:4,000,000,顺序:0.623291,线程:0.424119,Arrays.sort:0.712698

Elements: 8,000,000, sequential: 1.279374, threaded: 0.859363, Arrays.sort: 1.487671

元素:8,000,000,顺序:1.279374,线程:0.859363,Arrays.sort:1.487671

Each number above is the average time of 100 tests, throwing out the 3 lowest and 3 highest cases. I used Random.nextInt(Integer.MAX_VALUE) to generate an array for each test, which was initialized once every 10 tests with the same seed. Each test consisted of timing the given algorithm with System.nanoTime. I rounded to six decimal places after averaging. And obviously, I did check to see if each sort worked.

上面的每个数字都是 100 次测试的平均时间,剔除 3 个最低和 3 个最高案例。我使用 Random.nextInt(Integer.MAX_VALUE) 为每个测试生成一个数组,该数组每 10 个测试使用相同的种子初始化一次。每个测试都包括使用 System.nanoTime 对给定算法进行计时。平均后我四舍五入到小数点后六位。显然,我确实检查了每种排序是否有效

As you can see, there is about a 32% increase in speed between the sequential and threaded cases in every set of tests. As I asked above, shouldn't I expect more than that?

如您所见,在每组测试中,顺序和线程情况之间的速度提高了约 32%。正如我上面所问的,我不应该期望更多吗?

采纳答案by Michael Barker

Making numThreads static can cause problems, it is highly likely that you will end up with more than MAX_THREADS running at some point.

将 numThreads 设为静态可能会导致问题,很可能您最终会在某个时刻运行超过 MAX_THREADS。

Probably the reason why you don't get a full double up in performance is that your quick sort can not be fully parallelised. Note that the first call to quicksort will do a pass through the whole array in the initial thread before it starts to really run in parallel. There is also an overhead in parallelising an algorithm in the form of context switching and mode transitions when farming off to separate threads.

性能没有完全翻倍的原因可能是您的快速排序无法完全并行化。请注意,在开始真正并行运行之前,对快速排序的第一次调用将在初始线程中遍历整个数组。在分离线程时,以上下文切换和模式转换的形式并行化算法也会产生开销。

Have a look at the Fork/Join framework, this problem would probably fit quite neatly there.

看看 Fork/Join 框架,这个问题可能非常适合那里。

A couple of points on the implementation. Implement Runnable rather than extending Thread. Extending a Thread should be used only when you create some new version of Thread class. When you just want to do some job to be run in parallel you are better off with Runnable. While iplementing a Runnable you can also still extend another class which gives you more flexibility in OO design. Use a thread pool that is restricted to the number of threads you have available in the system. Also don't use numThreads to make the decision on whether to fork off a new thread or not. You can calculate this up front. Use a minimum partition size which is the size of the total array divided by the number of processors available. Something like:

关于实施的几点。实现 Runnable 而不是扩展 Thread。仅当您创建一些新版本的 Thread 类时才应使用扩展线程。当你只想做一些并行运行的工作时,你最好使用 Runnable。在实现 Runnable 的同时,您还可以扩展另一个类,这使您在 OO 设计中具有更大的灵活性。使用受系统中可用线程数限制的线程池。也不要使用 numThreads 来决定是否分叉新线程。你可以预先计算这个。使用最小分区大小,即总数组大小除以可用处理器数。就像是:

public class ThreadedQuick implements Runnable {

    public static final int MAX_THREADS = Runtime.getRuntime().availableProcessors();
    static final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);

    final int[] my_array;
    final int start, end;

    private final int minParitionSize;

    public ThreadedQuick(int minParitionSize, int[] array, int start, int end) {
        this.minParitionSize = minParitionSize;
        this.my_array = array;
        this.start = start;
        this.end = end;
    }

    public void run() {
        quicksort(my_array, start, end);
    }

    public void quicksort(int[] array, int start, int end) {
        int len = end - start + 1;

        if (len <= 1)
            return;

        int pivot_index = medianOfThree(array, start, end);
        int pivotValue = array[pivot_index];

        swap(array, pivot_index, end);

        int storeIndex = start;
        for (int i = start; i < end; i++) {
            if (array[i] <= pivotValue) {
                swap(array, i, storeIndex);
                storeIndex++;
            }
        }

        swap(array, storeIndex, end);

        if (len > minParitionSize) {

            ThreadedQuick quick = new ThreadedQuick(minParitionSize, array, start, storeIndex - 1);
            Future<?> future = executor.submit(quick);
            quicksort(array, storeIndex + 1, end);

            try {
                future.get(1000, TimeUnit.SECONDS);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        } else {
            quicksort(array, start, storeIndex - 1);
            quicksort(array, storeIndex + 1, end);
        }
    }    
}

You can kick it off by doing:

您可以通过执行以下操作来启动它:

ThreadedQuick quick = new ThreadedQuick(array / ThreadedQuick.MAX_THREADS, array, 0, array.length - 1);
quick.run();

This will start the sort in the same thread, which avoids an unnecessary thread hop at start up.

这将在同一线程中开始排序,从而避免启动时不必要的线程跳跃。

Caveat: Not sure the above implementation will actually be faster as I haven't benchmarked it.

警告:不确定上述实现实际上会更快,因为我没有对其进行基准测试。

回答by wheaties

Couple of comments if I understand your code right:

如果我正确理解您的代码,请提出几点意见:

  1. I don't see a lock around the numthreads object even though it could be accessed via multiple threads. Perhaps you should make it an AtomicInteger.

  2. Use a thread pool and arrange the tasks, i.e. a single call to quicksort, to take advantange of a thread pool. Use Futures.

  1. 即使可以通过多个线程访问,我也没有看到 numthreads 对象周围的锁。也许您应该将其设为 AtomicInteger。

  2. 使用线程池并安排任务,即对快速排序的单个调用,以利用线程池。使用期货。

Your current method of dividing things the way you're doing could leave a smaller division with a thread and a larger division without a thread. That is to say, it doesn't prioritize larger segments with their own threads.

您目前按照自己的方式划分事物的方法可能会留下一个带有线程的较小划分和一个没有线程的较大划分。也就是说,它不会优先考虑具有自己线程的较大段。

回答by Peter Lawrey

This uses a combination of quick sort and merge sort.

这使用了快速排序和归并排序的组合。

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelSortMain {
    public static void main(String... args) throws InterruptedException {
        Random rand = new Random();
        final int[] values = new int[100*1024*1024];
        for (int i = 0; i < values.length; i++)
            values[i] = rand.nextInt();

        int threads = Runtime.getRuntime().availableProcessors();
        ExecutorService es = Executors.newFixedThreadPool(threads);
        int blockSize = (values.length + threads - 1) / threads;
        for (int i = 0; i < values.length; i += blockSize) {
            final int min = i;
            final int max = Math.min(min + blockSize, values.length);
            es.submit(new Runnable() {
                @Override
                public void run() {
                    Arrays.sort(values, min, max);
                }
            });
        }
        es.shutdown();
        es.awaitTermination(10, TimeUnit.MINUTES);
        for (int blockSize2 = blockSize; blockSize2 < values.length / 2; blockSize2 *= 2) {
            for (int i = 0; i < values.length; i += blockSize2) {
                final int min = i;
                final int mid = Math.min(min + blockSize2, values.length);
                final int max = Math.min(min + blockSize2 * 2, values.length);
                mergeSort(values, min, mid, max);
            }
        }
    }

    private static boolean mergeSort(int[] values, int left, int mid, int end) {
        int[] results = new int[end - left];
        int l = left, r = mid, m = 0;
        for (; l < left && r < mid; m++) {
            int lv = values[l];
            int rv = values[r];
            if (lv < rv) {
                results[m] = lv;
                l++;
            } else {
                results[m] = rv;
                r++;
            }
        }
        while (l < mid)
            results[m++] = values[l++];
        while (r < end)
            results[m++] = values[r++];
        System.arraycopy(results, 0, values, left, results.length);
        return false;
    }
}