从 Java 线程返回值

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/2314402/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-13 06:01:44  来源:igfitidea点击:

Return values from Java Threads

javamultithreadingconcurrency

提问by Langali

I have a Java Thread like the following:

我有一个如下所示的 Java 线程:

   public class MyThread extends Thread {
        MyService service;
        String id;
        public MyThread(String id) {
            this.id = node;
        }
        public void run() {
            User user = service.getUser(id)
        }
    }

I have about 300 ids, and every couple of seconds - I fire up threads to make a call for each of the id. Eg.

我有大约 300 个 id,每隔几秒钟 - 我启动线程来调用每个 id。例如。

for(String id: ids) {
    MyThread thread = new MyThread(id);
    thread.start();
}

Now, I would like to collect the results from each threads, and do a batch insert to the database, instead of making 300 database inserts every 2 seconds.

现在,我想从每个线程收集结果,并对数据库进行批量插入,而不是每 2 秒进行 300 次数据库插入。

Any idea how I can accomplish this?

知道我怎么能做到这一点吗?

采纳答案by erickson

If you want to collect all of the results before doing the database update, you can use the invokeAllmethod. This takes care of the bookkeeping that would be required if you submit tasks one at a time, like davebsuggests.

如果要在进行数据库更新之前收集所有结果,可以使用该invokeAll方法。如果您像daveb建议的那样一次提交一个任务,这将负责记账。

private static final ExecutorService workers = Executors.newCachedThreadPool();

...

Collection<Callable<User>> tasks = new ArrayList<Callable<User>>();
for (final String id : ids) {
  tasks.add(new Callable<User>()
  {

    public User call()
      throws Exception
    {
      return svc.getUser(id);
    }

  });
}
/* invokeAll blocks until all service requests complete, 
 * or a max of 10 seconds. */
List<Future<User>> results = workers.invokeAll(tasks, 10, TimeUnit.SECONDS);
for (Future<User> f : results) {
  User user = f.get();
  /* Add user to batch update. */
  ...
}
/* Commit batch. */
...

回答by daveb

The canonical approach is to use a Callableand an ExecutorService. submitting a Callableto an ExecutorServicereturns a (typesafe) Futurefrom which you can getthe result.

规范方法是使用 aCallable和 an ExecutorServicesubmitting a Callableto anExecutorService返回一个(类型安全)Future,您可以从中get获得结果。

class TaskAsCallable implements Callable<Result> {
    @Override
    public Result call() {
        return a new Result() // this is where the work is done.
    }
}

ExecutorService executor = Executors.newFixedThreadPool(300);
Future<Result> task = executor.submit(new TaskAsCallable());
Result result = task.get(); // this blocks until result is ready

In your case, you probably want to use invokeAllwhich returns a Listof Futures, or create that list yourself as you add tasks to the executor. To collect results, simply call geton each one.

在您的情况下,您可能希望使用invokeAllwhich 返回 a Listof Futures,或者在向执行程序添加任务时自己创建该列表。要收集结果,只需调用get每个结果。

回答by fastcodejava

You need to store the result in a something like singleton. This has to be properly synchronized.

您需要将结果存储在类似单例的东西中。这必须正确同步。

EDIT: I know this not the best advice as it is not good idea to handle raw Threads. But given the question this will work, won't it? I may not be upvoted, but why down vote?

编辑:我知道这不是最好的建议,因为处理 raw 不是一个好主意Threads。但考虑到这个问题会奏效,不是吗?我可能不会被赞成,但为什么反对?

回答by rsp

You could create a queue or list which you pass to the threads you create, the threads add their result to the list which gets emptied by a consumer which performs the batch insert.

您可以创建一个队列或列表,将其传递给您创建的线程,线程将其结果添加到列表中,该列表由执行批量插入的使用者清空。

回答by AndiDog

The simplest approach is to pass an object to each thread (one object per thread) that will contain the result later. The main thread should keep a reference to each result object. When all threads are joined, you can use the results.

最简单的方法是将一个对象传递给稍后将包含结果的每个线程(每个线程一个对象)。主线程应该保留对每个结果对象的引用。当所有线程都加入时,您可以使用结果。

回答by Oso

public class TopClass {
     List<User> users = new ArrayList<User>();
     void addUser(User user) {
         synchronized(users) {
             users.add(user);
         }
     }
     void store() throws SQLException {
        //storing code goes here
     }
     class MyThread extends Thread {
            MyService service;
            String id;
            public MyThread(String id) {
                this.id = node;
            }
            public void run() {
                User user = service.getUser(id)
                addUser(user);
            }
        }
}

回答by jonescb

You could make a class which extends Observable. Then your thread can call a method in the Observable class which would notify any classes that registered in that observer by calling Observable.notifyObservers(Object).

您可以创建一个扩展 Observable 的类。然后您的线程可以调用 Observable 类中的一个方法,该方法将通过调用 Observable.notifyObservers(Object) 通知在该观察者中注册的任何类。

The observing class would implement Observer, and register itself with the Observable. You would then implement an update(Observable, Object) method that gets called when Observerable.notifyObservers(Object) is called.

观察类将实现观察者,并向观察者注册自己。然后,您将实现一个 update(Observable, Object) 方法,该方法在 Observerable.notifyObservers(Object) 被调用时被调用。

回答by Bill K

Store your result in your object. When it completes, have it drop itself into a synchronized collection (a synchronized queue comes to mind).

将结果存储在对象中。当它完成时,让它自己放入一个同步集合中(想到一个同步队列)。

When you wish to collect your results to submit, grab everything from the queue and read your results from the objects. You might even have each object know how to "post" it's own results to the database, this way different classes can be submitted and all handled with the exact same tiny, elegant loop.

当您希望收集要提交的结果时,请从队列中获取所有内容并从对象中读取您的结果。您甚至可能让每个对象都知道如何将它自己的结果“发布”到数据库,这样可以提交不同的类,并且所有类都可以使用完全相同的微小、优雅的循环进行处理。

There are lots of tools in the JDK to help with this, but it is really easy once you start thinking of your thread as a true object and not just a bunch of crap around a "run" method. Once you start thinking of objects this way programming becomes much simpler and more satisfying.

JDK 中有很多工具可以帮助解决这个问题,但是一旦您开始将线程视为真正的对象,而不仅仅是围绕“run”方法的一堆废话,这真的很容易。一旦你开始以这种方式思考对象,编程就会变得更加简单和令人满意。

回答by Sleiman Jneidi

In Java8 there is better way for doing this using CompletableFuture. Say we have class that get's id from the database, for simplicity we can just return a number as below,

在 Java8 中,使用CompletableFuture有更好的方法来做到这一点。假设我们有一个从数据库中获取 id 的类,为了简单起见,我们可以只返回一个数字,如下所示,

static class GenerateNumber implements Supplier<Integer>{

    private final int number;

    GenerateNumber(int number){
        this.number = number;
    }
    @Override
    public Integer get() {
        try {
            TimeUnit.SECONDS.sleep(1);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return this.number;
    }
}

Now we can add the result to a concurrent collection once the results of every future is ready.

现在,一旦每个未来的结果准备就绪,我们就可以将结果添加到并发集合中。

Collection<Integer> results = new ConcurrentLinkedQueue<>();
int tasks = 10;
CompletableFuture<?>[] allFutures = new CompletableFuture[tasks];
for (int i = 0; i < tasks; i++) {
     int temp = i;
     CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()-> new GenerateNumber(temp).get(), executor);
     allFutures[i] = future.thenAccept(results::add);
 }

Now we can add a callback when all the futures are ready,

现在我们可以在所有期货准备就绪时添加回调,

CompletableFuture.allOf(allFutures).thenAccept(c->{
   System.out.println(results); // do something with result
});