Java 如何使用固定数量的工作线程实现简单线程

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/125333/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-11 08:35:53  来源:igfitidea点击:

How to implement simple threading with a fixed number of worker threads

javamultithreadingconcurrency

提问by shsteimer

I'm looking for the simplest, most straightforward way to implement the following:

我正在寻找最简单、最直接的方法来实现以下内容:

  • The main program instantiates worker threads to do a task.
  • Only ntasks can be running at once.
  • When nis reached, no more workers are started until the count of running threads drops back below n.
  • 主程序实例化工作线程以执行任务。
  • 只有n任务可以同时运行。
  • n到达,没有更多的工人开始,直到运行的线程数降回到低于n

采纳答案by erickson

I think that Executors.newFixedThreadPoolfits your requirements. There are a number of different ways to use the resulting ExecutorService, depending on whether you want a result returned to the main thread, or whether the task is totally self-contained, and whether you have a collection of tasks to perform up front, or whether tasks are queued in response to some event.

我认为Executors.newFixedThreadPool符合您的要求。使用生成的 ExecutorService 有多种不同的方法,这取决于您是否希望将结果返回到主线程,或者该任务是否完全独立,以及您是否有一组要预先执行的任务,或者任务是否排队以响应某个事件。

  Collection<YourTask> tasks = new ArrayList<YourTask>();
  YourTask yt1 = new YourTask();
  ...
  tasks.add(yt1);
  ...
  ExecutorService exec = Executors.newFixedThreadPool(5);
  List<Future<YourResultType>> results = exec.invokeAll(tasks);

Alternatively, if you have a new asynchronous task to perform in response to some event, you probably just want to use the ExecutorService's simple execute(Runnable)method.

或者,如果您有一个新的异步任务要执行以响应某个事件,您可能只想使用 ExecutorService 的简单execute(Runnable)方法。

回答by hazzen

Use the Executor framework; namely newFixedThreadPool(N)

使用Executor框架;即newFixedThreadPool(N)

回答by Matt

Executors.newFixedThreadPool(int)

Executors.newFixedThreadPool(int)

Executor executor = Executors.newFixedThreadPool(n);

Runnable runnable = new Runnable() {
 public void run() {
  // do your thing here
 }
}

executor.execute(runnable);

回答by rjohnston

If you want to roll your own:

如果你想推出自己的:

private static final int MAX_WORKERS = n;
private List<Worker> workers = new ArrayList<Worker>(MAX_WORKERS);

private boolean roomLeft() {
    synchronized (workers) {
        return (workers.size() < MAX_WORKERS);
    }
}

private void addWorker() {
    synchronized (workers) {
        workers.add(new Worker(this));
    }
}

public void removeWorker(Worker worker) {
    synchronized (workers) {
        workers.remove(worker);
    }
}

public Example() {
    while (true) {
        if (roomLeft()) {
            addWorker();
        } 
    }
}

Where Worker is your class that extends Thread. Each worker will call this class's removeWorker method, passing itself in as a parameter, when it's finished doing it's thing.

其中 Worker 是您扩展 Thread 的类。每个worker都会调用这个类的removeWorker方法,将自己作为参数传入,当它完成它的事情时。

With that said, the Executor framework looks a lot better.

话虽如此,Executor 框架看起来好多了。

Edit: Anyone care to explain why this is so bad, instead of just downmodding it?

编辑:有人愿意解释为什么这如此糟糕,而不仅仅是对其进行向下修改吗?

回答by Eli Courtwright

As others here have mentioned, your best bet is to make a thread pool with the Executorsclass:

正如这里的其他人所提到的,最好的办法是使用Executors类创建一个线程池:

However, if you want to roll your own, this code should give you an idea how to proceed. Basically, just add every new thread to a thread group and make sure that you never have more than N active threads in the group:

但是,如果您想推出自己的产品,此代码应该可以让您了解如何继续。基本上,只需将每个新线程添加到线程组中,并确保组中的活动线程永远不会超过 N:

Task[] tasks = getTasks(); // array of tasks to complete
ThreadGroup group = new ThreadGroup();
int i=0;
while( i<tasks.length || group.activeCount()>0 ) {
    if( group.activeCount()<N && i<tasks.length ) {
        new TaskThread(group, tasks[i]).start();
        i++;
    } else {
        Thread.sleep(100);
    }
}

回答by Fabian Steeg

/* Get an executor service that will run a maximum of 5 threads at a time: */
ExecutorService exec = Executors.newFixedThreadPool(5);
/* For all the 100 tasks to be done altogether... */
for (int i = 0; i < 100; i++) {
    /* ...execute the task to run concurrently as a runnable: */
    exec.execute(new Runnable() {
        public void run() {
            /* do the work to be done in its own thread */
            System.out.println("Running in: " + Thread.currentThread());
        }
    });
}
/* Tell the executor that after these 100 steps above, we will be done: */
exec.shutdown();
try {
    /* The tasks are now running concurrently. We wait until all work is done, 
     * with a timeout of 50 seconds: */
    boolean b = exec.awaitTermination(50, TimeUnit.SECONDS);
    /* If the execution timed out, false is returned: */
    System.out.println("All done: " + b);
} catch (InterruptedException e) { e.printStackTrace(); }

回答by Ravindra babu

  1. If your task queue is not going to be unbounded and tasks can complete in shorter time intervals, you can use Executors.newFixedThreadPool(n); as suggests by experts.

    The only drawback in this solution is unbounded task queue size. You don't have control over it. The huge pile-up in task queue will degrade performance of application and may cause out of memory in some scenarios.

  2. If you want to use ExecutorServiceand enable work stealingmechanism where idle worker threads share the work load from busy worker threads by stealing tasks in task queue. It will return ForkJoinPool type of Executor Service.

    public static ExecutorService newWorkStealingPool(int parallelism)

    Creates a thread pool that maintains enough threads to support the given parallelism level, and may use multiple queues to reduce contention. The parallelism level corresponds to the maximum number of threads actively engaged in, or available to engage in, task processing. The actual number of threads may grow and shrink dynamically. A work-stealing pool makes no guarantees about the order in which submitted tasks are executed.

  3. I prefer ThreadPoolExecutordue to flexibility in APIs to control many paratmeters, which controls the flow task execution.

    ThreadPoolExecutor(int corePoolSize, 
                           int maximumPoolSize, 
                           long keepAliveTime, 
                           TimeUnit unit, 
                           BlockingQueue<Runnable> workQueue, 
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler)
    
  1. 如果您的任务队列不会是无限的并且任务可以在更短的时间间隔内完成,您可以使用Executors.newFixedThreadPool(n); 正如专家所建议的那样。

    此解决方案的唯一缺点是无限的任务队列大小。你无法控制它。任务队列中的大量堆积会降低应用程序的性能,并且在某些情况下可能会导致内存不足。

  2. 如果您想使用ExecutorService和启用work stealing空闲工作线程通过窃取任务队列中的任务来共享繁忙工作线程的工作负载的机制。它将返回 ForkJoinPool 类型的 Executor Service。

    公共静态ExecutorService newWorkStealingPool(int 并行性)

    创建一个线程池,维护足够的线程以支持给定的并行度级别,并且可以使用多个队列来减少争用。并行度级别对应于主动参与或可用于参与任务处理的最大线程数。线程的实际数量可能会动态增长和收缩。工作窃取池不保证提交任务的执行顺序。

  3. ThreadPoolExecutor由于 API 的灵活性,我更喜欢控制许多参数,这些参数控制流任务的执行。

    ThreadPoolExecutor(int corePoolSize, 
                           int maximumPoolSize, 
                           long keepAliveTime, 
                           TimeUnit unit, 
                           BlockingQueue<Runnable> workQueue, 
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler)
    

in your case, set both corePoolSize and maximumPoolSize as N. Here you can control task queue size, define your own custom thread factory and rejection handler policy.

在你的情况下,设置两者corePoolSize and maximumPoolSize as N。在这里您可以控制任务队列大小,定义您自己的自定义线程工厂和拒绝处理程序策略。

Have a look at related SE question to control the pool size dynamically:

查看相关的 SE 问题以动态控制池大小:

Dynamic Thread Pool

动态线程池