Java 8 并行 forEach 进度指示

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/26805966/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-02 10:39:13  来源:igfitidea点击:

Java 8 parallel forEach progress indication

javaconcurrencylambdajavafx

提问by Michael Stauffer

For performance reason I would like to use a forEach loop of a parallel Lambda stream in order to process an instance of a Collectionin Java. As this runs in a background ServiceI would like to use the updateProgress(double,double)method in order to inform the user about the current progress.

出于性能原因,我想使用并行 Lambda 流的 forEach 循环来处理CollectionJava 中的 a 实例。由于它在后台运行,Service我想使用该updateProgress(double,double)方法来通知用户当前的进度。

In order to indicate the current progress I need a certain progress indicator in form of a Integercounter. However, this is not possible as I can only access finalvariables within the Lambda expression.

为了指示当前进度,我需要一个Integer计数器形式的特定进度指示器。但是,这是不可能的,因为我只能访问finalLambda 表达式中的变量。

Code example see below, Collectionis only a place holder for any possible instance of a Collection:

代码示例见下文,Collection只是 a 的任何可能实例的占位符Collection

int progress = 0;
Collection.parallelStream().forEach(signer -> {
   progress++;
   updateProgress(progress, Collection.size());     
});

I'm aware that I can solve this problem by using a simple for-loop. However, for performance reason it would nice to solve it in this way.

我知道我可以通过使用一个简单的 for 循环来解决这个问题。但是,出于性能原因,以这种方式解决它会很好。

Does anybody know a more or less neat solution to this?

有没有人知道一个或多或少的巧妙解决方案?

回答by Jens-Peter Haack

As proposed by markspace, using an AtomicInteger is a good solution:

正如 markspace 所提议的,使用 AtomicInteger 是一个很好的解决方案:

AtomicInteger progress = new AtomicInteger();
Collection.parallelStream().forEach(signer -> {
    progress.incrementAndGet();
    // do some other useful work
});

I would not use the runLater() variant as your goal is a high performance, and if many parallel threads will generte JavaFX 'runLater' tasks, you will again create a bottleneck...

我不会使用 runLater() 变体,因为您的目标是高性能,如果许多并行线程将生成 JavaFX 'runLater' 任务,您将再次创建瓶颈...

For the same reason I would NOT call an update to the ProgressBar each time, but use a seaparte JavaFX Timeline to update the progress bar in regular intervals independently from the processing threads.

出于同样的原因,我不会每次都调用 ProgressBar 的更新,而是使用 seaparte JavaFX Timeline 独立于处理线程定期更新进度条。

Here is a full code comparing sequential and parallel processing with ProgressBar. If you remove the sleep(1) and set the number of items to 10 million it will still work concurrently and efficiently...

这是比较顺序和并行处理与 ProgressBar 的完整代码。如果您删除 sleep(1) 并将项目数设置为 1000 万,它仍将同时有效地工作......

public class ParallelProgress extends Application {
static class ParallelProgressBar extends ProgressBar {
    AtomicInteger myDoneCount = new AtomicInteger();
    int           myTotalCount;
    Timeline      myWhatcher = new Timeline(new KeyFrame(Duration.millis(10), e -> update()));

    public void update() {
        setProgress(1.0*myDoneCount.get()/myTotalCount);
        if (myDoneCount.get() >= myTotalCount) {
            myWhatcher.stop();
            myTotalCount = 0;
        }
    }

    public boolean isRunning() { return myTotalCount > 0; }

    public void start(int totalCount) {
        myDoneCount.set(0);
        myTotalCount = totalCount;
        setProgress(0.0);
        myWhatcher.setCycleCount(Timeline.INDEFINITE);
        myWhatcher.play();
    }

    public void add(int n) {
        myDoneCount.addAndGet(n);
    }
}

HBox testParallel(HBox box) {
    ArrayList<String> myTexts = new ArrayList<String>();

    for (int i = 1; i < 10000; i++) {
        myTexts.add("At "+System.nanoTime()+" ns");
    }

    Button runp = new Button("parallel");
    Button runs = new Button("sequential");
    ParallelProgressBar progress = new ParallelProgressBar();

    Label result = new Label("-");

    runp.setOnAction(e -> {
        if (progress.isRunning()) return;
        result.setText("...");
        progress.start(myTexts.size());

        new Thread() {
            public void run() {
                long ms = System.currentTimeMillis();
                myTexts.parallelStream().forEach(text -> {
                    progress.add(1);
                    try { Thread.sleep(1);} catch (Exception e1) { }
                });
                Platform.runLater(() -> result.setText(""+(System.currentTimeMillis()-ms)+" ms"));
            }
        }.start();
    });

    runs.setOnAction(e -> {
        if (progress.isRunning()) return;
        result.setText("...");
        progress.start(myTexts.size());
        new Thread() {
            public void run() {
                final long ms = System.currentTimeMillis();
                myTexts.forEach(text -> {
                    progress.add(1);
                    try { Thread.sleep(1);} catch (Exception e1) { }
                });
                Platform.runLater(() -> result.setText(""+(System.currentTimeMillis()-ms)+" ms"));
            }
        }.start();
    });

    box.getChildren().addAll(runp, runs, progress, result);
    return box;
}


@Override
public void start(Stage primaryStage) throws Exception {        
    primaryStage.setTitle("ProgressBar's");

    HBox box = new HBox();
    Scene scene = new Scene(box,400,80,Color.WHITE);
    primaryStage.setScene(scene);

    testParallel(box);

    primaryStage.show();   
}

public static void main(String[] args) { launch(args); }
}

回答by Tomas Mikula

The naive solution would be to have progress as a field of some surrounding object; then referring to progressfrom a lambda closure would actually mean this.progress, where thisis final, thus the compiler would not complain. However, the resulting code would access the progressfield from multiple threads concurrently, which could cause race conditions. I suggest restricting access to the progressfield to the JavaFX application thread, by using Platform.runLater. The whole solution then looks like this:

天真的解决方案是将进展作为一些周围物体的场;那么progress从 lambda 闭包中引用实际上意味着this.progress, where thisis final,因此编译器不会抱怨。但是,生成的代码将progress同时从多个线程访问该字段,这可能会导致竞争条件。我建议progress使用Platform.runLater. 整个解决方案如下所示:

// accessed only on JavaFX application thread
private int progress = 0;

// invoked only on the JavaFX application thread
private void increaseProgress() {
    progress++;
    updateProgress(progress, collection.size());
}

private void processCollection() {
    collection.parallelStream().forEach(signer -> {
        // do the work (on any thread)
        // ...

        // when done, update progress on the JavaFX thread
        Platfrom.runLater(this::increaseProgress);
    });
}