Java ExecutorService(特别是ThreadPoolExecutor)线程安全吗?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/1702386/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-12 21:44:10  来源:igfitidea点击:

Is ExecutorService (specifically ThreadPoolExecutor) thread safe?

javamultithreadingexecutorservice

提问by leeeroy

Does the ExecutorServiceguarantee thread safety ?

是否ExecutorService保证线程安全?

I'll be submitting jobs from different threads to the same ThreadPoolExecutor, do I have to synchronize access to the executor before interacting/submitting tasks?

我将把来自不同线程的作业提交到同一个 ThreadPoolExecutor,在交互/提交任务之前,我是否必须同步对执行器的访问?

采纳答案by Kevin Bourrillion

It's true, the JDK classes in question don't seem to make an explicit guarantee of thread-safe task submission. However, in practice, all ExecutorService implementations in the library are indeed thread-safe in this way. I think it's reasonable to depend on this. Since all the code implementing these features was placed in the public domain, there's absolutely no motivation for anyone to completely rewrite it a different way.

确实,有问题的 JDK 类似乎没有明确保证线程安全的任务提交。但是,在实践中,库中的所有 ExecutorService 实现确实是线程安全的。我认为依赖这一点是合理的。由于实现这些功能的所有代码都放在公共领域中,因此任何人都绝对没有动机以不同的方式完全重写它。

回答by Adamski

Your question is rather open-ended: All the ExecutorServiceinterface does is guarantee that some thread somewhere will process the submitted Runnableor Callableinstance.

您的问题相当开放:ExecutorService接口所做的只是保证某个地方的某个线程将处理提交的RunnableCallable实例。

If the submitted Runnable/ Callablereferences a shared data structure that is accessible from other Runnable/ Callables instances (potentially being processed simulataneously by different threads), then it is your responsibilityto ensure thread safety across this data structure.

如果提交的Runnable/Callable引用了可从其他Runnable/ Callables 实例访问的共享数据结构(可能由不同的线程同时处理),则您有责任确保跨该数据结构的线程安全。

To answer the second part of your question, yes you will have access to the ThreadPoolExecutor before submitting any tasks; e.g.

要回答问题的第二部分,是的,您可以在提交任何任务之前访问 ThreadPoolExecutor;例如

BlockingQueue<Runnable> workQ = new LinkedBlockingQueue<Runnable>();
ExecutorService execService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.SECONDS, workQ);
...
execService.submit(new Callable(...));

EDIT

编辑

Based on Brian's comment and in case I've misunderstood your question: Submission of tasks from multiple producer threads to the ExecutorServicewill typically be thread-safe (despite not being mentioned explicitly in the interface's API as far as I can tell). Any implementation that didn't offer thread safety would be useless in a multi-threaded environment (as multiple producers / multiple consumers is a fairly common paradigm), and this is specifically what ExecutorService(and the rest of java.util.concurrent) was designed for.

基于 Brian 的评论,以防万一我误解了您的问题:从多个生产者线程提交任务到ExecutorService通常是线程安全的(尽管据我所知在接口的 API 中没有明确提及)。任何不提供线程安全的实现在多线程环境中都是无用的(因为多个生产者/多个消费者是一个相当常见的范例),而这正是ExecutorService(以及其余部分java.util.concurrent)的设计目的。

回答by Scott S. McCoy

For ThreadPoolExecutorthe answer is simply yes. ExecutorServicedoes notmandate or otherwise guarantee that all implementations are thread-safe, and it cannot as it is an interface. These types of contracts are outside of the scope of a Java interface. However, ThreadPoolExecutorboth is and is clearly documented as being thread-safe. Moreover, ThreadPoolExecutormanages it's job queue using java.util.concurrent.BlockingQueuewhich is an interface that requests all implementations are thread-safe. Any java.util.concurrent.*implementation of BlockingQueuecan be safely assumed to be thread-safe. Any non-standard implementation may not, although that would be outright silly if someone were to provide a BlockingQueueimplementing queue which was not thread-safe.

对于ThreadPoolExecutor简单的答案是肯定的ExecutorService没有强制要求或以其他方式保证所有的实现是线程安全的,它不能因为它是一个接口。这些类型的契约超出了 Java 接口的范围。但是,ThreadPoolExecutor两者都是并且明确记录为线程安全的。此外,ThreadPoolExecutor管理它的作业队列使用java.util.concurrent.BlockingQueue它是一个接口,请求所有实现都是线程安全的。可以安全地假定 的任何java.util.concurrent.*实现BlockingQueue是线程安全的。任何非标准的实现都可能不会,尽管如果有人要提供一个BlockingQueue不是线程安全的实现队列,那将是完全愚蠢的。

So the answer to your title question is clearly yes. The answer to the subsequent body of your question is probably, as there are some discrepancies between the two.

所以你的标题问题的答案显然是肯定的。您问题的后续正文的答案可能是,因为两者之间存在一些差异。

回答by Luke Usherwood

(Contrary to other answers) the thread-safety contract isdocumented: look in the interfacejavadocs (as opposed to javadoc of methods). For example, at the bottom of the ExecutorServicejavadoc you find:

(相反,其他的答案)线程安全性合同记载:看在interfaceJavadoc中(而不是方法的javadoc)。例如,在ExecutorServicejavadoc的底部,您可以找到:

Memory consistency effects: Actions in a thread prior to the submission of a Runnable or Callable task to an ExecutorService happen-beforeany actions taken by that task, which in turn happen-beforethe result is retrieved via Future.get().

内存一致性影响:在将 Runnable 或 Callable 任务提交给 ExecutorService之前线程中的操作 发生在该任务采取的任何操作之前,而后者又 发生在通过 Future.get() 检索结果之前

This is sufficient to answer this:

这足以回答这个问题:

"do I have to synchronize access to the executor before interacting/submitting tasks?"

“在交互/提交任务之前,我是否必须同步对执行程序的访问?”

No you don't. It is fine to construct and submit jobs to any (correctly implemented) ExecutorServicewithout external synchronisation. This is one of the main design goals.

不,你没有。可以在ExecutorService没有外部同步的情况下构建作业并将其提交给任何(正确实施)。这是主要的设计目标之一。

ExecutorServiceis a concurrentutility, which is to say that it is designed to operate to the greatest extent without requiring synchronisation, for performance. (Synchronisation causes thread-contention, which can degrade multi-threading efficiency - particularly when scaling up to a large number of threads.)

ExecutorService是一个并发实用程序,也就是说它旨在最大程度地运行而无需同步,以提高性能。(同步会导致线程争用,这会降低多线程效率 - 特别是在扩展到大量线程时。)

There is no guarantee about at what time in the future the tasks will execute or complete (some may even execute immediately on same thread that submitted them) however the worker thread is guaranteed to have seen all effects that the submitting thread has performed up to the point of submission. Therefore (the thread that runs) your task can also safely read any data created for its use without synchronisation, thread-safe classes or any other forms of "safe publication". The act of submitting the task is itself sufficient for "safe publication" of the input data to the task. You just need to ensure that the input data won't be modified in any way while the task is running.

无法保证任务将在未来的什么时间执行或完成(有些甚至可能在提交它们的同一个线程上立即执行)但是工作线程保证已经看到提交线程执行的所有效果,直到提交点。因此(运行的线程)您的任务也可以安全地读取任何为其使用而创建的数据,而无需同步、线程安全类或任何其他形式的“安全发布”。提交任务的行为本身就足以“安全地发布”任务的输入数据。您只需要确保在任务运行时不会以任何方式修改输入数据。

Similarly, when you fetch the result of the task back via Future.get(), the retrieving thread will be guaranteed to see all effects made by the executor's worker thread (in both the returned result, plus any side-effect changes the worker-thread may have made).

类似地,当您通过 取回任务的结果时Future.get(),检索线程将保证看到执行程序的工作线程产生的所有效果(在返回的结果中,加上工作线程可能已经做出的任何副作用更改) .

This contract also implies that it is fine for the tasks themselves to submit more tasks.

该合约还暗示任务本身可以提交更多任务。

"Does the ExecutorService guarantee thread safety ?"

“ExecutorService 是否保证线程安全?”

Now this part of the question is much more general. For example could not find any statement of a thread-safety contract about the method shutdownAndAwaitTermination- although I note that the code sample in the Javadoc does not use synchronisation. (Although perhaps there's a hidden assumption that the shutdown is instigated by the same thread that created the Executor, and not for example a worker thread?)

现在这部分问题更加笼统。例如,找不到任何关于该方法的线程安全合同的声明 shutdownAndAwaitTermination- 尽管我注意到 Javadoc 中的代码示例不使用同步。(尽管可能有一个隐藏的假设,即关闭是由创建 Executor 的同一线程引发的,而不是例如工作线程?)

BTW I'd recommend the book "Java Concurrency In Practice" for a good grounder on the world of concurrent programming.

顺便说一句,我推荐《Java 并发实践》一书,作为并发编程世界的良好基础。

回答by user2357

Contrary to what the answer by Luke Usherwoodclaims, it is not implied by the documentation that ExecutorServiceimplementations are guaranteed to be thread-safe. As to the question of ThreadPoolExecutorspecifically, see other answers.

Luke Usherwood 所声称的答案相反,文档并未暗示ExecutorService实现是线程安全的。至于ThreadPoolExecutor具体的问题,见其他回答。

Yes, a happens-beforerelationship is specified, but this does not imply anything about the thread-safety of the methods themselves, as commented by Miles. In Luke Usherwood's answer it is stated that the former is sufficient to prove the latter but no actual argument is made.

是的,指定了先发生关系,但这并不意味着方法本身的线程安全性,正如Miles所评论的那样。在Luke Usherwood的回答中,指出前者足以证明后者,但没有进行实际论证。

"Thread-safety" can mean various things, but here is a simple counter-example of an Executor(not ExecutorServicebut it makes no difference) that trivially meets the required happens-beforerelationship but is not thread-safe because of unsynchronized access to the countfield.

“线程安全”可能意味着各种事情,但这里有一个简单的反例Executor(不是,ExecutorService但没有区别),它微不足道地满足了所需的发生前关系,但由于对count字段的访问不同步,因此不是线程安全的.

class CountingDirectExecutor implements Executor {

    private int count = 0;

    public int getExecutedTaskCount() {
        return count;
    }

    public void execute(Runnable command) {
        command.run();
    }
}

Disclaimer: I'm no expert and I found this question because I was searching for the answer myself.

免责声明:我不是专家,我发现这个问题是因为我自己也在寻找答案。

回答by salexinx

For ThreadPoolExecutor, it submit is thread safe. You can see the source code in jdk8. When adding a new task, it uses a mainLock to ensure the thread safe.

对于 ThreadPoolExecutor,它提交是线程安全的。你可以在jdk8中看到源代码。添加新任务时,它使用 mainLock 来确保线程安全。

private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);

                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;

                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }

            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());

                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }