Java并行流
在本教程中,我们将看到Java中的并行流。
Java并行流介绍
Java 8介绍了并行流的概念来进行并行处理。
由于我们现在拥有更多廉价的硬件成本,因此,并行处理可用于更快地执行操作。
让我们看个简单的例子
package org.igi.theitroad.java8;
import java.util.Arrays;
import java.util.stream.IntStream;
public class Java8ParallelStreamMain {
public static void main(String[] args) {
System.out.println("=================================");
System.out.println("Using Sequential Stream");
System.out.println("=================================");
int[] array= {1,2,3,4,5,6,7,8,9,10};
IntStream intArrStream=Arrays.stream(array);
intArrStream.forEach(s->
{
System.out.println(s+" "+Thread.currentThread().getName());
}
);
System.out.println("=================================");
System.out.println("Using Parallel Stream");
System.out.println("=================================");
IntStream intParallelStream=Arrays.stream(array).parallel();
intParallelStream.forEach(s->
{
System.out.println(s+" "+Thread.currentThread().getName());
}
);
}
}
运行上面的程序时,我们将得到以下输出
================================= Using Sequential Stream ================================= 1 main 2 main 3 main 4 main 5 main 6 main 7 main 8 main 9 main 10 main ================================= Using Parallel Stream ================================= 7 main 6 ForkJoinPool.commonPool-worker-3 3 ForkJoinPool.commonPool-worker-1 9 ForkJoinPool.commonPool-worker-2 2 ForkJoinPool.commonPool-worker-3 5 ForkJoinPool.commonPool-worker-1 10 ForkJoinPool.commonPool-worker-2 1 ForkJoinPool.commonPool-worker-3 8 ForkJoinPool.commonPool-worker-2 4 ForkJoinPool.commonPool-worker-1
如果我们注意到输出,则主线程在顺序流时执行所有工作。
它等待当前的迭代完成,然后在下一次迭代时进行工作。
在并行流的情况下,将同时生成4个线程,并使用Fork并加入池在内部生成和管理线程.Parallel Streams Create ForkJoinPool实例通过静态 ForkJoinPool.commonPool()方法。
并行流占据所有可用的好处 CPU cores并并行处理任务。
如果任务数超出核心的数量,则剩余任务等待当前运行的任务完成。
并行流很酷,所以你应该总是用吗?
不是,只需添加即可轻松地将顺序流转换为并行流。
并行,并不意味着我们应该始终使用它。
使用并行流时需要考虑许多因素,否则我们将遭受并行流的负面影响。
并行流的开销比顺序流更高,并且在线程之间坐标需要良好的时间。
如果且仅当才能考虑并行流,则只需:
- 我们有大型数据集要处理。
- 如我们所知,Java使用
ForkJoinPool要实现并行性,ForkjoInpool叉源流并提交执行,因此源流应分类。
例如:ArrayList非常易于拆分,因为我们可以通过其索引找到一个中间元素并将其拆分,但LinkedList非常困难拆分,并且在大多数情况下都不会很好地表现得很好。 - 我们实际上患有性能问题。
- 我们需要确保线程之间的所有共享资源都需要正确同步,否则可能会产生意外结果。
衡量并行性的最简单公式是Brian Goetz在他的演示文稿中提供的"NQ"模型。
NQ模型:
n x q> 10000
其中,n =数据集中的项目数q =每个项目的工作量
这意味着如果我们有大量数据集和每个项目的工作(例如:和),则并行性可能会更快运行程序,反之亦然也是如此。
因此,如果数据集数量少以及每件商品的更多工作(做一些计算工作),那么也是如此 parallelism可能会更快地实现结果。
让我们在另一个例子的帮助下看看。
在此示例中,我们将在并行流和顺序流的情况下执行长度计算时,我们将如何表现为多个计算。
我们正在进行一些arbit计算以使CPU忙。
package org.igi.theitroad.java8;
import java.util.ArrayList;
import java.util.List;
public class PerformanceComparisonMain {
public static void main(String[] args) {
long currentTime=System.currentTimeMillis();
List<Integer> data=new ArrayList<Integer>();
for (int i = 0; i < 100000; i++) {
data.add(i);
}
long sum=data.stream()
.map(i ->(int)Math.sqrt(i))
.map(number->performComputation(number))
.reduce(0,Integer::sum);
System.out.println(sum);
long endTime=System.currentTimeMillis();
System.out.println("Time taken to complete:"+(endTime-currentTime)/(1000*60)+" minutes");
}
public static int performComputation(int number)
{
int sum=0;
for (int i = 1; i < 1000000; i++) {
int div=(number/i);
sum+=div;
}
return sum;
}
}
但我们对此处的输出不感兴趣,但在执行上述操作时CPU如何表现。
正如我们在顺序流的情况下,可以看到CPU未充分利用。
让我们在16行没有改变。
并使流并行并再次运行程序。
long sum=data.stream() .parallel() .map(i ->(int)Math.sqrt(i)) .map(number->performComputation(number)) .reduce(0,Integer::sum);
我们使用并行流运行程序时检查CPU历史记录。
如我们所见,并行流使用所有4个CPU核心以执行计算。
并行流中的自定义线程池
默认使用并行流 ForkJoinPool.commonPool它具有比处理器数量更少的线程。
这意味着并行流使用所有可用的处理器,因为它也使用主线程。
如果我们使用多个并行流,然后它们将共享相同 ForkJoinPool.commonPool这意味着我们可能无法使用分配给每个并行流的所有处理器。
要解决此问题,我们可以在处理流时创建自己的线程池。
ForkJoinPool fjp = new ForkJoinPool(parallelism);
这将是创造的 ForkJoinPool与目标 parallelism等级。
如果我们不通过并行性,则默认情况下它将等于处理器的数量。
现在,我们可以将并行流提交到此自定义forkjoinpool。
ForkJoinPool fjp1 = new ForkJoinPool(5);
Callable<Integer> callable1 = () -> data.parallelStream()
.map(i -> (int) Math.sqrt(i))
.map(number -> performComputation(number))
.peek( (i) -> {
System.out.println("Processing with "+Thread.currentThread()+" "+ i);
})
.reduce(0, Integer::sum);
try {
sumFJ1 = fjp1.submit(callable1).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
例子如下:
package org.igi.theitroad;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
public class PerformanceComparisonMain {
public static void main(String[] args) {
List<Integer> data = new ArrayList<Integer>();
for (int i = 0; i < 10; i++) {
data.add(i);
}
System.out.println("================");
System.out.println("Parallel stream 1");
System.out.println("================");
long sum1 =data.parallelStream()
.map(i -> (int) Math.sqrt(i))
.map(number -> performComputation(number))
.peek( (i) -> {
System.out.println("Processing with "+Thread.currentThread()+" "+ i);
})
.reduce(0, Integer::sum);
System.out.println("Sum: "+sum1);
System.out.println("================");
System.out.println("Parallel stream 2");
System.out.println("================");
long sum2 = data.parallelStream()
.map(i -> ((int) Math.sqrt(i)*10))
.map(number -> performComputation(number))
.peek( (i) -> {
System.out.println("Processing with "+Thread.currentThread()+" "+ i);
})
.reduce(0, Integer::sum);
System.out.println("Sum: "+sum2);
System.out.println("================");
System.out.println("Parallel stream with custom thread pool 1");
System.out.println("================");
ForkJoinPool fjp1 = new ForkJoinPool(5);
long sumFJ1 = 0;
Callable<Integer> callable1 = () -> data.parallelStream()
.map(i -> (int) Math.sqrt(i))
.map(number -> performComputation(number))
.peek( (i) -> {
System.out.println("Processing with "+Thread.currentThread()+" "+ i);
})
.reduce(0, Integer::sum);
try {
sumFJ1 = fjp1.submit(callable1).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("Sum: "+sumFJ1);
System.out.println("================");
System.out.println("Parallel stream with custom thread pool 2");
System.out.println("================");
Callable<Integer> callable2 = () -> data.parallelStream()
.map(i -> (int) Math.sqrt(i)*10)
.map(number -> performComputation(number))
.peek( (i) -> {
System.out.println("Processing with "+Thread.currentThread()+" "+ i);
})
.reduce(0, Integer::sum);
long sumFJ2 = 0;
ForkJoinPool fjp2 = new ForkJoinPool(4);
try {
sumFJ2 = fjp2.submit(callable2).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("Sum: "+sumFJ2);
}
public static int performComputation(int number) {
int sum = 0;
for (int i = 1; i < 100000; i++) {
int div = (number/i);
sum += div;
}
return sum;
}
}
如我们所见,前两个并行流正在使用 ForkJoinPool.commonPool和接下来的2是使用自定义线程池,例如: ForkJoinPool-1和 ForkJoinPool-2
使用并行流时,我们应该记住的事情
有状态的lambda表达
我们应该避免使用Stream操作中的状态Lambda表达式。
一个状态lambda表达式是一个输出取决于在执行流操作期间可能会改变的任何状态。
package org.igi.theitroad;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class ListOfIntegersStatefulLambda {
public static void main(String[] args) {
List<Integer> listOfIntegers = Arrays.asList(new Integer[] {40,34,21,37,20});
List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());
listOfIntegers.parallelStream()
//You shou! It uses a stateful lambda expression.
.map(e -> {
syncList.add(e);
return e;
})
.forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");
syncList.stream().forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");
}
}
.map(e -> {syncList.add(e); return e;})是有状态的lambda和订单 .map(e -> {syncList.add(e); return e;})添加元素到 syncList可以不同,因此我们不应该在使用并行流时使用状态lambda操作。
干涉
在流操作lambda表达式不应修改流的源。
以下代码尝试将元素添加到整数列表并抛出并发映射异常。
package org.igi.theitroad;
import java.util.ArrayList;
import java.util.List;
public class ListOfIntegersStatefulLambda {
public static void main(String[] args) {
List<Integer> listOfIntegers = new ArrayList<>();
Integer[] intArray =new Integer[] {40,34,21,37,20};
for(Integer in:intArray)
{
listOfIntegers.add(in);
}
listOfIntegers.parallelStream()
.peek( i -> listOfIntegers.add(7))
.forEach(e -> System.out.print(e + " "));
System.out.println("");
}
}

