Java的“Parallel.For”?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/4010185/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-14 08:19:38  来源:igfitidea点击:

"Parallel.For" for Java?

javaparallel-processing

提问by Jamie

I was wondering if there is a Parallel.Forequivalent to the .net version for Java?

我想知道是否有Parallel.For相当于 Java 的 .net 版本?

If there is could someone please supply an example? thanks!

如果有人可以提供一个例子吗?谢谢!

采纳答案by Matt Crinklaw-Vogt

I guess the closest thing would be:

我想最接近的事情是:

ExecutorService exec = Executors.newFixedThreadPool(SOME_NUM_OF_THREADS);
try {
    for (final Object o : list) {
        exec.submit(new Runnable() {
            @Override
            public void run() {
                // do stuff with o.
            }
        });
    }
} finally {
    exec.shutdown();
}

Based on TheLQ's comments, you would set SUM_NUM_THREADS to Runtime.getRuntime().availableProcessors();

根据 TheLQ 的评论,您可以将 SUM_NUM_THREADS 设置为 Runtime.getRuntime().availableProcessors();

Edit: Decided to add a basic "Parallel.For" implementation

编辑:决定添加一个基本的“Parallel.For”实现

public class Parallel {
    private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();

    private static final ExecutorService forPool = Executors.newFixedThreadPool(NUM_CORES * 2, new NamedThreadFactory("Parallel.For"));

    public static <T> void For(final Iterable<T> elements, final Operation<T> operation) {
        try {
            // invokeAll blocks for us until all submitted tasks in the call complete
            forPool.invokeAll(createCallables(elements, operation));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static <T> Collection<Callable<Void>> createCallables(final Iterable<T> elements, final Operation<T> operation) {
        List<Callable<Void>> callables = new LinkedList<Callable<Void>>();
        for (final T elem : elements) {
            callables.add(new Callable<Void>() {
                @Override
                public Void call() {
                    operation.perform(elem);
                    return null;
                }
            });
        }

        return callables;
    }

    public static interface Operation<T> {
        public void perform(T pParameter);
    }
}

Example Usage of Parallel.For

Parallel.For 的示例用法

// Collection of items to process in parallel
Collection<Integer> elems = new LinkedList<Integer>();
for (int i = 0; i < 40; ++i) {
    elems.add(i);
}
Parallel.For(elems, 
 // The operation to perform with each item
 new Parallel.Operation<Integer>() {
    public void perform(Integer param) {
        System.out.println(param);
    };
});

I guess this implementation is really more similar to Parallel.ForEach

我猜这个实现真的更类似于Parallel.ForEach

EditI put this up on GitHub if anyone is interested. Parallel For on GitHub

编辑如果有人感兴趣,我把它放在 GitHub 上。GitHub 上的 Parallel For

回答by Emil

Fork join framework in Java 7is for concurrency support. But I don't know about an exact equivalent for Parallel.For.

Java 7 中的 Fork join 框架用于并发支持。但我不知道Parallel.For.

回答by Peter Lawrey

A simpler option would be

一个更简单的选择是

// A thread pool which runs for the life of the application.
private static final ExecutorService EXEC = 
    Executors.newFixedThreadPool(SOME_NUM_OF_THREADS); 

//later 
EXEC.invokeAll(tasks); // you can optionally specify a timeout.

回答by Weimin Xiao

MLaw's solution is a very practical Parallel.ForEach. I added a bit modification to make a Parallel.For.

MLaw 的解决方案是一个非常实用的Parallel.ForEach。我添加了一些修改来制作 Parallel.For。

public class Parallel
{
static final int iCPU = Runtime.getRuntime().availableProcessors();

public static <T> void ForEach(Iterable <T> parameters,
                   final LoopBody<T> loopBody)
{
    ExecutorService executor = Executors.newFixedThreadPool(iCPU);
    List<Future<?>> futures  = new LinkedList<Future<?>>();

    for (final T param : parameters)
    {
        Future<?> future = executor.submit(new Runnable()
        {
            public void run() { loopBody.run(param); }
        });

        futures.add(future);
    }

    for (Future<?> f : futures)
    {
        try   { f.get(); }
        catch (InterruptedException e) { } 
        catch (ExecutionException   e) { }         
    }

    executor.shutdown();     
}

public static void For(int start,
                   int stop,
               final LoopBody<Integer> loopBody)
{
    ExecutorService executor = Executors.newFixedThreadPool(iCPU);
    List<Future<?>> futures  = new LinkedList<Future<?>>();

    for (int i=start; i<stop; i++)
    {
        final Integer k = i;
        Future<?> future = executor.submit(new Runnable()
        {
            public void run() { loopBody.run(k); }
        });     
        futures.add(future);
    }

    for (Future<?> f : futures)
    {
        try   { f.get(); }
        catch (InterruptedException e) { } 
        catch (ExecutionException   e) { }         
    }

    executor.shutdown();     
}
}

public interface LoopBody <T>
{
    void run(T i);
}

public class ParallelTest
{
int k;  

public ParallelTest()
{
    k = 0;
    Parallel.For(0, 10, new LoopBody <Integer>()
    {
        public void run(Integer i)
        {
            k += i;
            System.out.println(i);          
        }
    });
    System.out.println("Sum = "+ k);
}

public static void main(String [] argv)
{
    ParallelTest test = new ParallelTest();
}
}

回答by Clippy

There is a equivalent for Parallel.For available as a java extension. It is called Ateji PX, they have a free version you can play with. http://www.ateji.com/px/index.html

Parallel.For 有一个等价物,可用作 java 扩展。它被称为 Ateji PX,他们有一个免费版本你可以玩。http://www.ateji.com/px/index.html

It is the exact equivalent of parallel.for and looks similar to.

它完全等同于parallel.for 并且看起来类似。

For ||

For ||

More examples and explaination on wikipedia: http://en.wikipedia.org/wiki/Ateji_PX

更多关于维基百科的例子和解释:http: //en.wikipedia.org/wiki/Ateji_PX

Closed thing in Java IMO

Java IMO 中的封闭事物

回答by santiwk

Built upon mlaw suggestion, add CountDownLatch. Add chunksize to reduce submit().

根据 mlaw 建议,添加 CountDownLatch。添加块大小以减少提交()。

When tested with 4 million items array, this one gives 5X speed up over sequential for() on my Core i7 2630QM CPU.

当使用 400 万个项目数组进行测试时,在我的 Core i7 2630QM CPU 上,这个数组比顺序 for() 的速度提高了 5 倍。

public class Loop {
    public interface Each {
        void run(int i);
    }

    private static final int CPUs = Runtime.getRuntime().availableProcessors();

    public static void withIndex(int start, int stop, final Each body) {
        int chunksize = (stop - start + CPUs - 1) / CPUs;
        int loops = (stop - start + chunksize - 1) / chunksize;
        ExecutorService executor = Executors.newFixedThreadPool(CPUs);
        final CountDownLatch latch = new CountDownLatch(loops);
        for (int i=start; i<stop;) {
            final int lo = i;
            i += chunksize;
            final int hi = (i<stop) ? i : stop;
            executor.submit(new Runnable() {
                public void run() {
                    for (int i=lo; i<hi; i++)
                        body.run(i);
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {}
        executor.shutdown();
    }

    public static void main(String [] argv) {
        Loop.withIndex(0, 9, new Loop.Each() {
            public void run(int i) {
                System.out.println(i*10);
            }
        });
    }
}

回答by Weimin Xiao

I have an updated Java Parallel class which can do Parallel.For, Parallel.ForEach, Parallel.Tasks, and partitioned parallel loop. Source code is as follows:

我有一个更新的 Java Parallel 类,它可以执行 Parallel.For、Parallel.ForEach、Parallel.Tasks 和分区并行循环。源代码如下:

Examples of using those parallel loops are the following:

使用这些并行循环的示例如下:

public static void main(String [] argv)
{
    //sample data
    final ArrayList<String> ss = new ArrayList<String>();

    String [] s = {"a", "b", "c", "d", "e", "f", "g"};
    for (String z : s) ss.add(z);
    int m = ss.size();

    //parallel-for loop
    System.out.println("Parallel.For loop:");
    Parallel.For(0, m, new LoopBody<Integer>()
    {
        public void run(Integer i)
        {
           System.out.println(i +"\t"+ ss.get(i));   
        }       
    });   

   //parallel for-each loop
   System.out.println("Parallel.ForEach loop:");
   Parallel.ForEach(ss, new LoopBody<String>()
   {
       public void run(String p)
       {
           System.out.println(p);               
       }       
   });

   //partitioned parallel loop
   System.out.println("Partitioned Parallel loop:");
   Parallel.ForEach(Parallel.create(0, m), new LoopBody<Partition>()
   {
       public void run(Partition p)
       {
           for(int i=p.start; i<p.end; i++)
               System.out.println(i +"\t"+ ss.get(i));
       }
   });

   //parallel tasks
   System.out.println("Parallel Tasks:");
   Parallel.Tasks(new Task []
   {
       //task-1
       new Task() {public void run()
       {
           for(int i=0; i<3; i++)
               System.out.println(i +"\t"+ ss.get(i));
       }},

       //task-2
       new Task() {public void run()
       {
           for (int i=3; i<6; i++)
               System.out.println(i +"\t"+ ss.get(i));
       }}   
   });
}

回答by Pablo R. Mier

Here is my contribution to this topic https://github.com/pablormier/parallel-loops. The usage is very simple:

这是我对这个主题的贡献https://github.com/pablormier/parallel-loops。用法很简单:

Collection<String> upperCaseWords = 
    Parallel.ForEach(words, new Parallel.F<String, String>() {
        public String apply(String s) {
            return s.toUpperCase();
        }
    });

It's also possible to change some behaviour aspects, like the number of threads (by default it uses a cached thread pool):

还可以更改某些行为方面,例如线程数(默认情况下它使用缓存线程池):

Collection<String> upperCaseWords = 
            new Parallel.ForEach<String, String>(words)
                .withFixedThreads(4)
                .apply(new Parallel.F<String, String>() {
                    public String apply(String s) {
                        return s.toUpperCase();
                    }
                }).values();

All the code is self-contained in one java classand has no more dependencies than the JDK. I also encourage you to check the new way to parallelizein a functional-style way with Java 8

所有代码都自包含在一个 java 类中,并且没有比 JDK 多的依赖项。我还鼓励您检查使用 Java 8 以函数式方式并行化的新方法

回答by Chris

Synchronization often kills the speedup of parallel for-loops. Therefore, parallel for-loops often need their private data and a reduction mechanism to reduce all threads private data to comprise a single result.

同步通常会扼杀并行 for 循环的加速。因此,并行 for 循环通常需要它们的私有数据和减少机制来减少所有线程私有数据以包含单个结果。

So I've extended the Parallel.For version of Weimin Xiaoby a reduction mechanism.

所以我Weimin Xiao通过减少机制扩展了 Parallel.For 版本。

public class Parallel {
public static interface IntLoopBody {
    void run(int i);
}

public static interface LoopBody<T> {
    void run(T i);
}

public static interface RedDataCreator<T> {
    T run();
}

public static interface RedLoopBody<T> {
    void run(int i, T data);
}

public static interface Reducer<T> {
    void run(T returnData, T addData);
}

private static class ReductionData<T> {
    Future<?> future;
    T data;
}

static final int nCPU = Runtime.getRuntime().availableProcessors();

public static <T> void ForEach(Iterable <T> parameters, final LoopBody<T> loopBody) {
    ExecutorService executor = Executors.newFixedThreadPool(nCPU);
    List<Future<?>> futures  = new LinkedList<Future<?>>();

    for (final T param : parameters) {
        futures.add(executor.submit(() -> loopBody.run(param) ));
    }

    for (Future<?> f : futures) {
        try { 
            f.get();
        } catch (InterruptedException | ExecutionException e) { 
            System.out.println(e); 
        }
    }
    executor.shutdown();     
}

public static void For(int start, int stop, final IntLoopBody loopBody) {
    final int chunkSize = (stop - start + nCPU - 1)/nCPU;
    final int loops = (stop - start + chunkSize - 1)/chunkSize;
    ExecutorService executor = Executors.newFixedThreadPool(loops);
    List<Future<?>> futures  = new LinkedList<Future<?>>();

    for (int i=start; i < stop; ) {
        final int iStart = i;
        i += chunkSize;
        final int iStop = (i < stop) ? i : stop;

        futures.add(executor.submit(() -> {
            for (int j = iStart; j < iStop; j++) 
                loopBody.run(j);
        }));     
    }

    for (Future<?> f : futures) {
        try { 
            f.get();
        } catch (InterruptedException | ExecutionException e) { 
            System.out.println(e); 
        }
    }
    executor.shutdown();     
}

public static <T> void For(int start, int stop, T result, final RedDataCreator<T> creator, final RedLoopBody<T> loopBody, final Reducer<T> reducer) {
    final int chunkSize = (stop - start + nCPU - 1)/nCPU;
    final int loops = (stop - start + chunkSize - 1)/chunkSize;
    ExecutorService executor = Executors.newFixedThreadPool(loops);
    List<ReductionData<T>> redData  = new LinkedList<ReductionData<T>>();

    for (int i = start; i < stop; ) {
        final int iStart = i;
        i += chunkSize;
        final int iStop = (i < stop) ? i : stop;
        final ReductionData<T> rd = new ReductionData<T>();

        rd.data = creator.run();
        rd.future = executor.submit(() -> {
            for (int j = iStart; j < iStop; j++) {
                loopBody.run(j, rd.data);
            }
        });
        redData.add(rd);
    }

    for (ReductionData<T> rd : redData) {
        try { 
            rd.future.get();
            if (rd.data != null) {
                reducer.run(result, rd.data);
            }
        } catch (InterruptedException | ExecutionException e) { 
            e.printStackTrace();
        }
    }
    executor.shutdown();     
}
}

Here is a simple test example: a parallel character counter using a non-synchronized map.

这是一个简单的测试示例:使用非同步映射的并行字符计数器。

import java.util.*;

public class ParallelTest {
static class Counter {
    int cnt;

    Counter() {
        cnt = 1;
    }
}

public static void main(String[] args) {
    String text = "More formally, if this map contains a mapping from a key k to a " + 
            "value v such that key compares equal to k according to the map's ordering, then " +
            "this method returns v; otherwise it returns null.";
    Map<Character, Counter> charCounter1 = new TreeMap<Character, Counter>();
    Map<Character, Counter> charCounter2 = new TreeMap<Character, Counter>();

    // first sequentially
    for(int i=0; i < text.length(); i++) {
        char c = text.charAt(i);
        Counter cnt = charCounter1.get(c);
        if (cnt == null) {
            charCounter1.put(c, new Counter());
        } else {
            cnt.cnt++;
        }
    }
    for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) {
        System.out.println(entry.getKey() + ": " + entry.getValue().cnt);
    }

    // now parallel without synchronization
    Parallel.For(0, text.length(), charCounter2,
        // Creator
        () -> new TreeMap<Character, Counter>(), 
        // Loop Body
        (i, map) -> {
            char c = text.charAt(i);
            Counter cnt = map.get(c);
            if (cnt == null) {
                map.put(c, new Counter());
            } else {
                cnt.cnt++;
            }
        }, 
        // Reducer
        (result, map) -> {
            for(Map.Entry<Character, Counter> entry: map.entrySet()) {
                Counter cntR = result.get(entry.getKey());
                if (cntR == null) {
                    result.put(entry.getKey(), entry.getValue());
                } else {
                    cntR.cnt += entry.getValue().cnt;
                }
            }
        }
    );

    // compare results
    assert charCounter1.size() == charCounter2.size() : "wrong size: " + charCounter1.size() + ", " + charCounter2.size();
    Iterator<Map.Entry<Character, Counter>> it2 = charCounter2.entrySet().iterator();
    for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) {
        Map.Entry<Character, Counter> entry2 = it2.next();
        assert entry.getKey() == entry2.getKey() && entry.getValue().cnt == entry2.getValue().cnt : "wrong content";
    }

    System.out.println("Well done!");
}
}

回答by Crammeur

This is what I use for Java 7 and less.

这是我在 Java 7 及以下版本中使用的。

For Java 8 you can use forEach()

对于 Java 8,您可以使用forEach()

[UPDATE ]

[更新 ]

Parallel class :

平行类:

private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();
private static final int MAX_THREAD = NUM_CORES*2;  

public static <T2 extends T, T> void For(final Iterable<T2> elements, final Operation<T> operation) {
    if (elements != null) {
        final Iterator<T2> iterator = elements.iterator();
        if (iterator.hasNext()) {
            final Throwable[] throwable = new Throwable[1];
            final Callable<Void> callable = new Callable<Void>() {
                boolean first = true;
                @Override
                public final Void call() throws Exception {
                    if ((first || operation.follow()) && iterator.hasNext()) {
                        T result;
                        result = iterator.next();
                        operation.perform(result);
                        if (first) {
                            synchronized (this) {
                                first = false;
                            }
                        }
                    }
                    return null;
                }
            };
            final Runnable runnable = new Runnable() {
                @Override
                public final void run() {
                    while (iterator.hasNext()) {
                        try {
                            synchronized (callable) {
                                callable.call();
                            }
                            if (!operation.follow()) {
                                break;
                            }
                        } catch (Throwable t) {
                            t.printStackTrace();
                            synchronized (throwable) {
                                throwable[0] = t;
                            }
                            throw new RuntimeException(t);
                        }
                    }
                }
            };
            final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD);
            for (int threadIndex=0; threadIndex<MAX_THREAD && iterator.hasNext(); threadIndex++) {
                executor.execute(runnable);
            }
            executor.shutdown();
            while (!executor.isTerminated()) {
                try {
                    Thread.sleep(0,1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
            if (throwable[0] != null) throw new RuntimeException(throwable[0]);
        }
    }
}

public interface Operation<T> {
    void perform(T pParameter);
    boolean follow();
}

Example

例子

@Test
public void test() {
    List<Long> longList = new ArrayList<Long>();
    for (long i = 0; i < 1000000; i++) {
        longList.add(i);
    }
    final List<Integer> integerList = new LinkedList<>();
    Parallel.For((Iterable<? extends Number>) longList, new Parallel.Operation<Number>() {

        @Override
        public void perform(Number pParameter) {
            System.out.println(pParameter);
            integerList.add(pParameter.intValue());
        }

        @Override
        public boolean follow() {
            return true;
        }
    });
    for (Number num : integerList) {
        System.out.println(num);
    }
}

Monitoring

监控

javaparallelmultithreading

java并行多线程