"Parallel.For" for Java?
Note: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/4010185/
Asked by Jamie
I was wondering whether Java has an equivalent of the .NET Parallel.For.
If there is, could someone please supply an example? Thanks!
Accepted answer by Matt Crinklaw-Vogt
I guess the closest thing would be:
ExecutorService exec = Executors.newFixedThreadPool(SOME_NUM_OF_THREADS);
try {
    for (final Object o : list) {
        exec.submit(new Runnable() {
            @Override
            public void run() {
                // do stuff with o.
            }
        });
    }
} finally {
    exec.shutdown();
}
Based on TheLQ's comments, you would set SOME_NUM_OF_THREADS to Runtime.getRuntime().availableProcessors().
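To make the sizing advice concrete, here is a minimal sketch of the loop above with the pool sized to the number of available processors. The class name PoolSizedToCores, the method totalLength, and the one-minute timeout are assumptions made up for this illustration; the awaitTermination call is added so the caller blocks until the work is done, which the snippet above does not do on its own.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolSizedToCores {
    // Hypothetical helper: sums the lengths of the words, one task per element.
    static int totalLength(List<String> words) {
        int threads = Runtime.getRuntime().availableProcessors();
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        final AtomicInteger total = new AtomicInteger();
        try {
            for (final String w : words) {
                exec.submit(new Runnable() {
                    @Override
                    public void run() {
                        total.addAndGet(w.length());
                    }
                });
            }
        } finally {
            exec.shutdown(); // stop accepting new tasks; queued tasks still run
        }
        try {
            // Block until the queued tasks have finished (timeout is an arbitrary choice).
            if (!exec.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(totalLength(Arrays.asList("a", "bb", "ccc"))); // prints 6
    }
}
```

The shared total is an AtomicInteger because several tasks update it concurrently.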
Edit: Decided to add a basic "Parallel.For" implementation
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Parallel {
    private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();
    // NamedThreadFactory is not a JDK class; it is a helper that is not shown here
    // (presumably a ThreadFactory that gives the pool's threads a name).
    private static final ExecutorService forPool = Executors.newFixedThreadPool(NUM_CORES * 2, new NamedThreadFactory("Parallel.For"));

    public static <T> void For(final Iterable<T> elements, final Operation<T> operation) {
        try {
            // invokeAll blocks for us until all submitted tasks in the call complete
            forPool.invokeAll(createCallables(elements, operation));
        } catch (InterruptedException e) {
            // restore the interrupt flag so callers can still see the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    public static <T> Collection<Callable<Void>> createCallables(final Iterable<T> elements, final Operation<T> operation) {
        List<Callable<Void>> callables = new LinkedList<Callable<Void>>();
        for (final T elem : elements) {
            callables.add(new Callable<Void>() {
                @Override
                public Void call() {
                    operation.perform(elem);
                    return null;
                }
            });
        }
        return callables;
    }

    public static interface Operation<T> {
        public void perform(T pParameter);
    }
}
Example usage of Parallel.For:
// Collection of items to process in parallel
Collection<Integer> elems = new LinkedList<Integer>();
for (int i = 0; i < 40; ++i) {
    elems.add(i);
}
Parallel.For(elems,
    // The operation to perform with each item
    new Parallel.Operation<Integer>() {
        public void perform(Integer param) {
            System.out.println(param);
        }
    });
I guess this implementation is really more similar to Parallel.ForEach.
Edit: I put this up on GitHub if anyone is interested: Parallel For on GitHub
Answer by Emil
The Fork/Join framework in Java 7 is for concurrency support, but I don't know of an exact equivalent of Parallel.For.
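For readers who have not used Fork/Join, here is a rough sketch of how an index-based parallel loop could be built on it. Everything named here (ForkJoinFor, IntBody, RangeAction, parallelFor, and the threshold of 1,000) is an assumption invented for this illustration, not part of the JDK or of this answer; only ForkJoinPool and RecursiveAction are real JDK classes.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class ForkJoinFor {
    /** Hypothetical callback type for the loop body. */
    interface IntBody {
        void run(int i);
    }

    /** Halves [lo, hi) recursively until a range is small enough to run directly. */
    static final class RangeAction extends RecursiveAction {
        private static final int THRESHOLD = 1_000; // arbitrary cut-off for this sketch
        private final int lo;
        private final int hi;
        private final IntBody body;

        RangeAction(int lo, int hi, IntBody body) {
            this.lo = lo;
            this.hi = hi;
            this.body = body;
        }

        @Override
        protected void compute() {
            if (hi - lo <= THRESHOLD) {
                for (int i = lo; i < hi; i++) {
                    body.run(i);
                }
                return;
            }
            int mid = lo + (hi - lo) / 2;
            // Runs both halves, possibly on different worker threads, and waits for both.
            invokeAll(new RangeAction(lo, mid, body), new RangeAction(mid, hi, body));
        }
    }

    static void parallelFor(int start, int stop, IntBody body) {
        ForkJoinPool pool = new ForkJoinPool(); // defaults to one worker per processor
        try {
            pool.invoke(new RangeAction(start, stop, body)); // blocks until the whole range is done
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        final int[] squares = new int[10_000];
        parallelFor(0, squares.length, new IntBody() {
            @Override
            public void run(int i) {
                squares[i] = i * i;
            }
        });
        System.out.println(squares[99]); // prints 9801
    }
}
```

Splitting the range recursively lets idle workers steal the larger pending halves, so uneven iteration costs tend to balance out without a fixed chunk size.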
Answer by Peter Lawrey
A simpler option would be:
// A thread pool which runs for the life of the application.
private static final ExecutorService EXEC =
        Executors.newFixedThreadPool(SOME_NUM_OF_THREADS);

// later
EXEC.invokeAll(tasks); // you can optionally specify a timeout.
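As an illustration of the timeout variant, here is a small sketch. The class InvokeAllWithTimeout and the method squareAll are hypothetical names for this example, and the sketch uses a local pool that it shuts down itself so that it is self-contained, rather than the application-lifetime pool suggested above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllWithTimeout {
    /** Writes k * k into out[k] for k in [0, n); returns how many tasks were not cancelled. */
    static int squareAll(int n, long timeoutMillis, final int[] out) {
        ExecutorService exec =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            List<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
            for (int i = 0; i < n; i++) {
                final int k = i;
                tasks.add(new Callable<Void>() {
                    @Override
                    public Void call() {
                        out[k] = k * k;
                        return null;
                    }
                });
            }
            // Blocks until every task finishes or the timeout expires;
            // tasks that have not finished by then are cancelled.
            List<Future<Void>> futures =
                    exec.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            int notCancelled = 0;
            for (Future<Void> f : futures) {
                if (!f.isCancelled()) {
                    notCancelled++;
                }
            }
            return notCancelled;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] out = new int[5];
        System.out.println(squareAll(5, 1000, out) + " tasks ran to completion");
    }
}
```

Checking isCancelled() on the returned futures is how a caller can tell which tasks were cut off by the timeout.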
Answer by Weimin Xiao
MLaw's solution is a very practical Parallel.ForEach. I made a small modification to turn it into a Parallel.For.
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Parallel
{
    static final int iCPU = Runtime.getRuntime().availableProcessors();

    public static <T> void ForEach(Iterable <T> parameters,
                                   final LoopBody<T> loopBody)
    {
        ExecutorService executor = Executors.newFixedThreadPool(iCPU);
        List<Future<?>> futures = new LinkedList<Future<?>>();
        for (final T param : parameters)
        {
            Future<?> future = executor.submit(new Runnable()
            {
                public void run() { loopBody.run(param); }
            });
            futures.add(future);
        }
        // wait for every task; report failures instead of silently dropping them
        for (Future<?> f : futures)
        {
            try { f.get(); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            catch (ExecutionException e) { e.printStackTrace(); }
        }
        executor.shutdown();
    }

    public static void For(int start,
                           int stop,
                           final LoopBody<Integer> loopBody)
    {
        ExecutorService executor = Executors.newFixedThreadPool(iCPU);
        List<Future<?>> futures = new LinkedList<Future<?>>();
        for (int i=start; i<stop; i++)
        {
            final Integer k = i;
            Future<?> future = executor.submit(new Runnable()
            {
                public void run() { loopBody.run(k); }
            });
            futures.add(future);
        }
        // wait for every task; report failures instead of silently dropping them
        for (Future<?> f : futures)
        {
            try { f.get(); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            catch (ExecutionException e) { e.printStackTrace(); }
        }
        executor.shutdown();
    }
}

public interface LoopBody <T>
{
    void run(T i);
}
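import java.util.concurrent.atomic.AtomicInteger;

public class ParallelTest
{
    // The loop body runs on several threads, so the sum is kept in an
    // AtomicInteger; a plain "k += i" on an int field would be a data race.
    final AtomicInteger k = new AtomicInteger();

    public ParallelTest()
    {
        Parallel.For(0, 10, new LoopBody <Integer>()
        {
            public void run(Integer i)
            {
                k.addAndGet(i);
                System.out.println(i);
            }
        });
        System.out.println("Sum = "+ k.get());
    }

    public static void main(String [] argv)
    {
        ParallelTest test = new ParallelTest();
    }
}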
Answer by Clippy
There is an equivalent of Parallel.For available as a Java extension. It is called Ateji PX, and they have a free version you can play with. http://www.ateji.com/px/index.html
It is the exact equivalent of Parallel.For and looks similar to:
For ||
More examples and explanation on Wikipedia: http://en.wikipedia.org/wiki/Ateji_PX
Closest thing in Java, IMO.
Answer by santiwk
Built upon mlaw's suggestion: add a CountDownLatch, and add a chunk size to reduce the number of submit() calls.
When tested with a 4-million-item array, this one gives a 5x speedup over a sequential for() on my Core i7 2630QM CPU.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Loop {
    public interface Each {
        void run(int i);
    }

    private static final int CPUs = Runtime.getRuntime().availableProcessors();

    public static void withIndex(int start, int stop, final Each body) {
        if (stop <= start) return; // empty range; also avoids a zero chunk size below
        int chunksize = (stop - start + CPUs - 1) / CPUs;
        int loops = (stop - start + chunksize - 1) / chunksize;
        ExecutorService executor = Executors.newFixedThreadPool(CPUs);
        final CountDownLatch latch = new CountDownLatch(loops);
        for (int i=start; i<stop;) {
            final int lo = i;
            i += chunksize;
            final int hi = (i<stop) ? i : stop;
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        for (int j=lo; j<hi; j++)
                            body.run(j);
                    } finally {
                        // count down even if body.run throws, so await() cannot hang
                        latch.countDown();
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
    }

    public static void main(String [] argv) {
        Loop.withIndex(0, 9, new Loop.Each() {
            public void run(int i) {
                System.out.println(i*10);
            }
        });
    }
}
Answer by Weimin Xiao
I have an updated Java Parallel class which can do Parallel.For, Parallel.ForEach, Parallel.Tasks, and partitioned parallel loops. Source code is as follows:
Examples of using those parallel loops are the following:
public static void main(String [] argv)
{
    //sample data
    final ArrayList<String> ss = new ArrayList<String>();
    String [] s = {"a", "b", "c", "d", "e", "f", "g"};
    for (String z : s) ss.add(z);
    int m = ss.size();

    //parallel-for loop
    System.out.println("Parallel.For loop:");
    Parallel.For(0, m, new LoopBody<Integer>()
    {
        public void run(Integer i)
        {
            System.out.println(i +"\t"+ ss.get(i));
        }
    });

    //parallel for-each loop
    System.out.println("Parallel.ForEach loop:");
    Parallel.ForEach(ss, new LoopBody<String>()
    {
        public void run(String p)
        {
            System.out.println(p);
        }
    });

    //partitioned parallel loop
    System.out.println("Partitioned Parallel loop:");
    Parallel.ForEach(Parallel.create(0, m), new LoopBody<Partition>()
    {
        public void run(Partition p)
        {
            for(int i=p.start; i<p.end; i++)
                System.out.println(i +"\t"+ ss.get(i));
        }
    });

    //parallel tasks
    System.out.println("Parallel Tasks:");
    Parallel.Tasks(new Task []
    {
        //task-1
        new Task() {public void run()
        {
            for(int i=0; i<3; i++)
                System.out.println(i +"\t"+ ss.get(i));
        }},
        //task-2
        new Task() {public void run()
        {
            for (int i=3; i<6; i++)
                System.out.println(i +"\t"+ ss.get(i));
        }}
    });
}
Answer by Pablo R. Mier
Here is my contribution to this topic: https://github.com/pablormier/parallel-loops. The usage is very simple:
Collection<String> upperCaseWords =
    Parallel.ForEach(words, new Parallel.F<String, String>() {
        public String apply(String s) {
            return s.toUpperCase();
        }
    });
It's also possible to change some aspects of the behaviour, like the number of threads (by default it uses a cached thread pool):
Collection<String> upperCaseWords =
    new Parallel.ForEach<String, String>(words)
        .withFixedThreads(4)
        .apply(new Parallel.F<String, String>() {
            public String apply(String s) {
                return s.toUpperCase();
            }
        }).values();
All the code is self-contained in one Java class and has no dependencies beyond the JDK. I also encourage you to check out the new way to parallelize in a functional style with Java 8.
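For comparison, the Java 8 functional style mentioned above could look roughly like the following with the standard streams API. The class and method names (StreamUpperCase, upperCase) are made up for this sketch; the stream calls themselves are plain JDK 8.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class StreamUpperCase {
    /** Upper-cases every word; the stream may process elements on several threads. */
    static List<String> upperCase(List<String> words) {
        return words.parallelStream()
                    .map(String::toUpperCase)
                    .collect(Collectors.toList()); // keeps the list's encounter order
    }

    public static void main(String[] args) {
        System.out.println(upperCase(Arrays.asList("foo", "bar", "baz"))); // prints [FOO, BAR, BAZ]
    }
}
```

Parallel streams run on the common ForkJoinPool by default, so there is no thread pool to create or shut down in user code.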
Answer by Chris
Synchronization often kills the speedup of parallel for-loops. Therefore, parallel for-loops often need thread-private data and a reduction mechanism that combines all the threads' private data into a single result.
So I've extended Weimin Xiao's Parallel.For version with a reduction mechanism.
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Parallel {
    public static interface IntLoopBody {
        void run(int i);
    }

    public static interface LoopBody<T> {
        void run(T i);
    }

    public static interface RedDataCreator<T> {
        T run();
    }

    public static interface RedLoopBody<T> {
        void run(int i, T data);
    }

    public static interface Reducer<T> {
        void run(T returnData, T addData);
    }

    private static class ReductionData<T> {
        Future<?> future;
        T data;
    }

    static final int nCPU = Runtime.getRuntime().availableProcessors();

    public static <T> void ForEach(Iterable <T> parameters, final LoopBody<T> loopBody) {
        ExecutorService executor = Executors.newFixedThreadPool(nCPU);
        List<Future<?>> futures = new LinkedList<Future<?>>();
        for (final T param : parameters) {
            futures.add(executor.submit(() -> loopBody.run(param) ));
        }
        for (Future<?> f : futures) {
            try {
                f.get();
            } catch (InterruptedException | ExecutionException e) {
                System.out.println(e);
            }
        }
        executor.shutdown();
    }

    public static void For(int start, int stop, final IntLoopBody loopBody) {
        if (stop <= start) return; // empty range; avoids a zero chunk size and a zero-sized pool
        final int chunkSize = (stop - start + nCPU - 1)/nCPU;
        final int loops = (stop - start + chunkSize - 1)/chunkSize;
        ExecutorService executor = Executors.newFixedThreadPool(loops);
        List<Future<?>> futures = new LinkedList<Future<?>>();
        for (int i=start; i < stop; ) {
            final int iStart = i;
            i += chunkSize;
            final int iStop = (i < stop) ? i : stop;
            futures.add(executor.submit(() -> {
                for (int j = iStart; j < iStop; j++)
                    loopBody.run(j);
            }));
        }
        for (Future<?> f : futures) {
            try {
                f.get();
            } catch (InterruptedException | ExecutionException e) {
                System.out.println(e);
            }
        }
        executor.shutdown();
    }

    public static <T> void For(int start, int stop, T result, final RedDataCreator<T> creator, final RedLoopBody<T> loopBody, final Reducer<T> reducer) {
        if (stop <= start) return; // empty range; avoids a zero chunk size and a zero-sized pool
        final int chunkSize = (stop - start + nCPU - 1)/nCPU;
        final int loops = (stop - start + chunkSize - 1)/chunkSize;
        ExecutorService executor = Executors.newFixedThreadPool(loops);
        List<ReductionData<T>> redData = new LinkedList<ReductionData<T>>();
        for (int i = start; i < stop; ) {
            final int iStart = i;
            i += chunkSize;
            final int iStop = (i < stop) ? i : stop;
            // each chunk gets its own private data object, so the loop body needs no locking
            final ReductionData<T> rd = new ReductionData<T>();
            rd.data = creator.run();
            rd.future = executor.submit(() -> {
                for (int j = iStart; j < iStop; j++) {
                    loopBody.run(j, rd.data);
                }
            });
            redData.add(rd);
        }
        // the reduction runs on the calling thread, one chunk result at a time
        for (ReductionData<T> rd : redData) {
            try {
                rd.future.get();
                if (rd.data != null) {
                    reducer.run(result, rd.data);
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        executor.shutdown();
    }
}
Here is a simple test example: a parallel character counter using a non-synchronized map.
import java.util.*;

// Run with "java -ea ParallelTest" so that the assert statements are checked.
public class ParallelTest {
    static class Counter {
        int cnt;

        Counter() {
            cnt = 1;
        }
    }

    public static void main(String[] args) {
        String text = "More formally, if this map contains a mapping from a key k to a " +
                "value v such that key compares equal to k according to the map's ordering, then " +
                "this method returns v; otherwise it returns null.";
        Map<Character, Counter> charCounter1 = new TreeMap<Character, Counter>();
        Map<Character, Counter> charCounter2 = new TreeMap<Character, Counter>();

        // first sequentially
        for(int i=0; i < text.length(); i++) {
            char c = text.charAt(i);
            Counter cnt = charCounter1.get(c);
            if (cnt == null) {
                charCounter1.put(c, new Counter());
            } else {
                cnt.cnt++;
            }
        }
        for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue().cnt);
        }

        // now parallel without synchronization
        Parallel.For(0, text.length(), charCounter2,
            // Creator
            () -> new TreeMap<Character, Counter>(),
            // Loop Body
            (i, map) -> {
                char c = text.charAt(i);
                Counter cnt = map.get(c);
                if (cnt == null) {
                    map.put(c, new Counter());
                } else {
                    cnt.cnt++;
                }
            },
            // Reducer
            (result, map) -> {
                for(Map.Entry<Character, Counter> entry: map.entrySet()) {
                    Counter cntR = result.get(entry.getKey());
                    if (cntR == null) {
                        result.put(entry.getKey(), entry.getValue());
                    } else {
                        cntR.cnt += entry.getValue().cnt;
                    }
                }
            }
        );

        // compare results
        assert charCounter1.size() == charCounter2.size() : "wrong size: " + charCounter1.size() + ", " + charCounter2.size();
        Iterator<Map.Entry<Character, Counter>> it2 = charCounter2.entrySet().iterator();
        for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) {
            Map.Entry<Character, Counter> entry2 = it2.next();
            // compare the boxed keys with equals(), not ==
            assert entry.getKey().equals(entry2.getKey()) && entry.getValue().cnt == entry2.getValue().cnt : "wrong content";
        }
        System.out.println("Well done!");
    }
}
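As a side note, the creator / loop body / reducer triple used here maps closely onto the three-argument collect of Java 8 streams (supplier, accumulator, combiner). The following is only a sketch of the same character count on that API, not part of the class above; StreamCharCount and countChars are names invented for the example.

```java
import java.util.Map;
import java.util.TreeMap;

class StreamCharCount {
    /** Counts characters using per-thread maps that are merged at the end. */
    static Map<Character, Integer> countChars(String text) {
        return text.chars().parallel().collect(
                // creator: a fresh private map for each chunk of work
                TreeMap::new,
                // loop body: update the private map, no synchronization needed
                (map, c) -> map.merge((char) c, 1, Integer::sum),
                // reducer: fold one private map into another
                (left, right) -> right.forEach((k, v) -> left.merge(k, v, Integer::sum)));
    }

    public static void main(String[] args) {
        System.out.println(countChars("abca")); // prints {a=2, b=1, c=1}
    }
}
```

As in the hand-written version, the accumulator only ever touches a map owned by the current task, so the maps themselves do not need to be thread-safe.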
Answer by Crammeur
This is what I use for Java 7 and below.
For Java 8 you can use forEach().
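One way to read the Java 8 suggestion, sketched under the assumption that an index-based loop is wanted: a parallel IntStream with forEach. The names StreamParallelFor and squares are hypothetical and exist only for this illustration.

```java
import java.util.stream.IntStream;

class StreamParallelFor {
    /** Fills out[i] = i * i for every i in [0, n), possibly on several threads. */
    static long[] squares(int n) {
        final long[] out = new long[n];
        // Each index is written by exactly one task, so no locking is needed.
        IntStream.range(0, n).parallel().forEach(i -> out[i] = (long) i * i);
        return out;
    }

    public static void main(String[] args) {
        long[] result = squares(10);
        System.out.println(result[9]); // prints 81
    }
}
```

The forEach call returns only after all indices have been processed, so the array is complete when squares returns.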
[UPDATE]
Parallel class:
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Parallel {
    private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();
    private static final int MAX_THREAD = NUM_CORES*2;

    public static <T2 extends T, T> void For(final Iterable<T2> elements, final Operation<T> operation) {
        if (elements != null) {
            final Iterator<T2> iterator = elements.iterator();
            if (iterator.hasNext()) {
                final Throwable[] throwable = new Throwable[1];
                final Callable<Void> callable = new Callable<Void>() {
                    boolean first = true;

                    @Override
                    public final Void call() throws Exception {
                        if ((first || operation.follow()) && iterator.hasNext()) {
                            T result;
                            result = iterator.next();
                            operation.perform(result);
                            if (first) {
                                synchronized (this) {
                                    first = false;
                                }
                            }
                        }
                        return null;
                    }
                };
                final Runnable runnable = new Runnable() {
                    @Override
                    public final void run() {
                        while (iterator.hasNext()) {
                            try {
                                // Note: perform() runs inside this lock, so elements
                                // are processed one at a time.
                                synchronized (callable) {
                                    callable.call();
                                }
                                if (!operation.follow()) {
                                    break;
                                }
                            } catch (Throwable t) {
                                t.printStackTrace();
                                synchronized (throwable) {
                                    throwable[0] = t;
                                }
                                throw new RuntimeException(t);
                            }
                        }
                    }
                };
                final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD);
                for (int threadIndex=0; threadIndex<MAX_THREAD && iterator.hasNext(); threadIndex++) {
                    executor.execute(runnable);
                }
                executor.shutdown();
                try {
                    // block until every worker has finished instead of polling isTerminated()
                    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
                if (throwable[0] != null) throw new RuntimeException(throwable[0]);
            }
        }
    }

    public interface Operation<T> {
        void perform(T pParameter);
        boolean follow();
    }
}
Example:
@Test
public void test() {
    List<Long> longList = new ArrayList<Long>();
    for (long i = 0; i < 1000000; i++) {
        longList.add(i);
    }
    final List<Integer> integerList = new LinkedList<>();
    Parallel.For((Iterable<? extends Number>) longList, new Parallel.Operation<Number>() {
        @Override
        public void perform(Number pParameter) {
            System.out.println(pParameter);
            integerList.add(pParameter.intValue());
        }

        @Override
        public boolean follow() {
            return true;
        }
    });
    for (Number num : integerList) {
        System.out.println(num);
    }
}