multithreading 保证线程池中的任务执行顺序

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/7192223/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): Stack Overflow

时间:2020-09-10 01:16:15  来源:igfitidea

Ensuring task execution order in threadpool

multithreading, design-patterns, concurrency, threadpool

提问 by nc3b

I have been reading about the thread-pool pattern and I can't seem to find the usual solution for the following problem.

我一直在阅读有关线程池模式的信息,但似乎无法找到以下问题的常用解决方案。

I sometimes want tasks to be executed serially. For example, I read chunks of text from a file and for some reason I need the chunks to be processed in that order. So basically I want to eliminate concurrency for some of the tasks.

我有时希望任务可以串行执行。例如,我从文件中读取文本块,出于某种原因,我需要按该顺序处理文本块。所以基本上我想消除一些任务的并发性。

Consider this scenario, where the tasks marked with * need to be processed in the order they were pushed in. The other tasks can be processed in any order.

考虑这种情况,其中带 * 的任务需要按照推送顺序处理,其他任务可以按任何顺序处理。

push task1
push task2
push task3   *
push task4   *
push task5
push task6   *
....
and so on

In the context of a thread-pool, without this constraint, a single queue of pending tasks works fine but clearly here it doesn't.

在线程池的上下文中,如果没有此约束,单个待处理任务队列可以正常工作,但在这里显然不行。

I thought about having some of the threads operate on a thread-specific queue and the others on the "global" queue. Then, in order to execute some of the tasks serially, I simply have to push them onto a queue that a single thread looks at. It does sound a bit clumsy.

我想过让一些线程在线程特定的队列上运行,而其他线程在“全局”队列上运行。然后,为了串行执行某些任务,我只需要将它们推送到单个线程所在的队列中。它确实听起来有点笨拙。

So, the real question in this long story: how would you solve this? How would you ensure those tasks are ordered?

所以,这个长篇故事里真正的问题是:你将如何解决这个问题?你如何确保这些任务是有序的?

EDIT

编辑

As a more general problem, suppose the scenario above becomes

作为一个更普遍的问题,假设上面的场景变成

push task1
push task2   **
push task3   *
push task4   *
push task5
push task6   *
push task7   **
push task8   *
push task9
....
and so on

What I mean is that the tasks within a group should be executed sequentially, but the groups themselves can mix. So you can have 3-2-5-4-7, for example.

我的意思是组内的任务应该按顺序执行,但组本身可以混合。例如,你可以得到 3-2-5-4-7 这样的执行顺序。

One other thing to note is that I don't have access to all the tasks in a group upfront (and I can't wait for all of them to arrive before starting the group).

另一件需要注意的事情是,我无法预先访问组中的所有任务(并且我不能在开始组之前等待所有任务都到达)。

Thank you for your time.

感谢您的时间。

采纳答案 by Tim Lloyd

Something like the following will allow serial and parallel tasks to be queued, where serial tasks will be executed one after the other, and parallel tasks will be executed in any order, but in parallel. This gives you the ability to serialize tasks where necessary while also having parallel tasks, and to do so as tasks are received, i.e. you do not need to know the entire sequence up-front; execution order is maintained dynamically.

类似于以下内容将允许串行和并行任务排队,其中串行任务将一个接一个执行,并行任务将以任何顺序执行,但并行执行。这使您能够在必要时序列化任务,也有并行任务,但在接收到任务时执行此操作,即您不需要预先了解整个序列,执行顺序是动态维护的。

internal class TaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Queue<QTask> _tasks = new Queue<QTask>();
    private int _runningTaskCount;

    public void Queue(bool isParallel, Action task)
    {
        lock (_syncObj)
        {
            _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
        }

        ProcessTaskQueue();
    }

    public int Count
    {
        get{lock (_syncObj){return _tasks.Count;}}
    }

    private void ProcessTaskQueue()
    {
        lock (_syncObj)
        {
            if (_runningTaskCount != 0) return;

            while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
            {
                QTask parallelTask = _tasks.Dequeue();

                QueueUserWorkItem(parallelTask);
            }

            if (_tasks.Count > 0 && _runningTaskCount == 0)
            {
                QTask serialTask = _tasks.Dequeue();

                QueueUserWorkItem(serialTask);
            }
        }
    }

    private void QueueUserWorkItem(QTask qTask)
    {
        Action completionTask = () =>
        {
            qTask.Task();

            OnTaskCompleted();
        };

        _runningTaskCount++;

        ThreadPool.QueueUserWorkItem(_ => completionTask());
    }

    private void OnTaskCompleted()
    {
        lock (_syncObj)
        {
            if (--_runningTaskCount == 0)
            {
                ProcessTaskQueue();
            }
        }
    }

    private class QTask
    {
        public Action Task { get; set; }
        public bool IsParallel { get; set; }
    }
}

Update

更新

To handle task groups with serial and parallel task mixes, a GroupedTaskQueue can manage a TaskQueue for each group. Again, you do not need to know about groups up-front, it is all dynamically managed as tasks are received.

要处理串行和并行任务混合的任务组,GroupedTaskQueue 可以为每个组管理一个 TaskQueue。同样,您不需要预先了解组,所有这些都是在接收任务时动态管理的。

internal class GroupedTaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
    private readonly string _defaultGroup = Guid.NewGuid().ToString();

    public void Queue(bool isParallel, Action task)
    {
        Queue(_defaultGroup, isParallel, task);
    }

    public void Queue(string group, bool isParallel, Action task)
    {
        TaskQueue queue;

        lock (_syncObj)
        {
            if (!_queues.TryGetValue(group, out queue))
            {
                queue = new TaskQueue();

                _queues.Add(group, queue);
            }
        }

        Action completionTask = () =>
        {
            task();

            OnTaskCompleted(group, queue);
        };

        queue.Queue(isParallel, completionTask);
    }

    private void OnTaskCompleted(string group, TaskQueue queue)
    {
        lock (_syncObj)
        {
            if (queue.Count == 0)
            {
                _queues.Remove(group);
            }
        }
    }
}

回答 by Anthony Williams

Thread pools are good for cases where the relative order of the tasks doesn't matter, provided they all get done. In particular, it must be OK for them all to be done in parallel.

线程池适用于任务的相对顺序无关紧要的情况,前提是它们都能完成。特别是,必须允许它们全部并行执行。

If your tasks must be done in a specific order, then they are not suitable for parallelism, so a thread pool is not appropriate.

如果你的任务必须按照特定的顺序完成,那么它们不适合并行化,因此线程池是不合适的。

If you want to move these serial tasks off the main thread, then a single background thread with a task queue would be appropriate for those tasks. You can continue to use a thread pool for the remaining tasks which are suitable for parallelism.

如果您想将这些串行任务移出主线程,那么带有任务队列的单个后台线程将适合这些任务。您可以继续将线程池用于适合并行的剩余任务。

Yes, it means you have to decide where to submit the task depending on whether it is an in-order task or a "may be parallelized" task, but this is not a big deal.

是的,这意味着您必须根据它是有序任务还是“可能并行化”的任务来决定在哪里提交任务,但这没什么大不了的。
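For illustration, here is a minimal Java sketch of that split (the class and method names are invented for this example). The in-order tasks go to a single-threaded executor, which is exactly the "single background thread with a task queue" described above, while everything else goes to an ordinary pool:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SplitSubmitter {
    // One worker thread: tasks submitted here run one after another, in submission order.
    private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();

    // An ordinary pool for tasks that may run in any order, in parallel.
    private final ExecutorService parallelExecutor = Executors.newFixedThreadPool(4);

    public void submit(Runnable task, boolean mustRunInOrder) {
        if (mustRunInOrder) {
            serialExecutor.submit(task);   // FIFO order on a single thread
        } else {
            parallelExecutor.submit(task); // order not guaranteed
        }
    }
}

The single-threaded executor maintains its own FIFO queue internally, so no extra locking is needed on the caller's side.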

If you have groups that must be serialized, but which can run in parallel with other tasks then you have multiple choices:

如果您的组必须序列化,但可以与其他任务并行运行,那么您有多种选择:

  1. Create a single task for each group, which does the relevant group tasks in order, and post this task to the thread pool.
  2. Have each task in a group explicitly wait for the previous task in the group, and post them to the thread pool. This requires that your thread pool can handle the case where a thread is waiting for a not-yet-scheduled task without deadlocking.
  3. Have a dedicated thread for each group, and post group tasks on the appropriate message queue.
  1. 为每个组创建单个任务,依次执行相关组任务,并将该任务发布到线程池中。
  2. 让组中的每个任务显式等待组中的前一个任务,并将它们发布到线程池中。这要求您的线程池可以处理线程正在等待尚未调度的任务而不会死锁的情况。
  3. 每个组都有一个专门的线程,并将组任务发布到适当的消息队列上。

回答 by pvoosten

Basically, there are a number of pending tasks. Some of the tasks can only be performed when one or more other pending tasks have finished executing.

基本上,有许多待处理的任务。某些任务只能在一个或多个其他待处理任务执行完毕后才能执行。

The pending tasks can be modeled in a dependency graph:

挂起的任务可以在依赖图中建模:

  • "task 1 -> task2" means "task 2 can be executed only after task 1 is finished." the arrows point in the direction of execution order.
  • the indegree of a task (the number of tasks pointing to it) determines whether the task is ready for execution. If the indegree is 0, it can be executed.
  • sometimes a task must wait for multiple tasks to finish, the indegree is then >1.
  • if a task doesn't have to wait for other tasks to finish anymore (its indegree is zero), it can be submitted to the thread pool with worker threads, or the queue with tasks waiting to be picked up by a worker thread. You know the submitted task will not cause deadlock, because the task isn't waiting for anything. As an optimization, you can use a priority queue, e.g. in which tasks that more tasks in the dependency graph depend on will be executed first. This also can't provoke deadlock, because all tasks in the thread pool can be executed. It can provoke starvation, however.
  • If a task finishes execution, it can be removed from the dependency graph, possibly reducing the indegree of other tasks, which can in turn be submitted to the pool of working threads.
  • “task 1 -> task2”的意思是“只有在task 1完成后才能执行task 2”。箭头指向执行顺序的方向。
  • 任务的入度(指向它的任务数)决定了任务是否准备好执行。如果入度为0,则可以执行。
  • 有时一个任务必须等待多个任务完成,那么indegree>1。
  • 如果一个任务不必再等待其他任务完成(其入度为零),则可以将其提交到带有工作线程的线程池,或者带有等待由工作线程拾取的任务的队列。您知道提交的任务不会导致死锁,因为任务不会等待任何事情。作为优化,您可以使用优先级队列,例如,依赖图中的更多任务所依赖的任务将首先执行。这也不会引起死锁,因为线程池中的所有任务都可以执行。然而,它可能会引起饥饿。
  • 如果一个任务完成执行,它可以从依赖图中删除,可能会减少其他任务的 indegree,这些任务可以依次提交到工作线程池。

So there is (at least) one thread used to add/remove pending tasks, and there is a thread pool of working threads.

所以有(至少)一个线程用于添加/删除挂起的任务,并且有一个工作线程的线程池。
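As a rough Java sketch of this bookkeeping (all names here are invented, and the cycle detection described below is omitted for brevity), each task is added together with the tasks it must wait for; when its indegree drops to zero it is handed to the pool:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DependencyScheduler {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    // tasks that are added but not yet finished, with their remaining indegree
    private final Map<Runnable, Integer> indegree = new HashMap<>();
    // edges: task -> tasks that are waiting for it to finish
    private final Map<Runnable, List<Runnable>> dependents = new HashMap<>();

    // Add a task together with the (possibly empty) list of tasks it must wait for.
    public synchronized void add(Runnable task, List<Runnable> prerequisites) {
        int pending = 0;
        for (Runnable pre : prerequisites) {
            if (indegree.containsKey(pre)) { // prerequisite not finished yet
                dependents.computeIfAbsent(pre, k -> new ArrayList<>()).add(task);
                pending++;
            }
        }
        indegree.put(task, pending);
        if (pending == 0) {
            submit(task); // indegree is zero: ready to run
        }
    }

    private void submit(Runnable task) {
        pool.execute(() -> {
            task.run();
            onFinished(task);
        });
    }

    // A finished task is removed from the graph, possibly releasing its dependents.
    private synchronized void onFinished(Runnable task) {
        indegree.remove(task);
        List<Runnable> waiting = dependents.remove(task);
        if (waiting == null) return;
        for (Runnable dep : waiting) {
            if (indegree.merge(dep, -1, Integer::sum) == 0) {
                submit(dep);
            }
        }
    }
}

The priority-queue optimization mentioned above would replace the plain pool submission with a queue ordered by how many other tasks depend on each entry.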

When a task is added to the dependency graph, you must check:

当一个任务被添加到依赖图中时,你必须检查:

  • how the task is connected in the dependency graph: what tasks must it wait for to finish and what tasks must wait for it to finish? Draw connections from and to the new task accordingly.
  • once the connections are drawn: did the new connections cause any cycles in the dependency graph? If so, there is a deadlock situation.
  • 任务在依赖图中是如何连接的:它必须等待哪些任务完成,哪些任务必须等待它完成?相应地绘制新任务之间的连接。
  • 绘制连接后:新连接是否导致依赖图中出现任何循环?如果是这样,则存在死锁情况。

Performance:

性能

  • this pattern is slower than sequential execution if parallel execution is in fact rarely possible, because you need extra administration to do everything almost sequentially anyway.
  • this pattern is fast if many tasks can be performed simultaneously in practice.
  • 如果实际上几乎不可能并行执行,则此模式比顺序执行慢,因为无论如何您都需要额外的管理来几乎按顺序执行所有操作。
  • 如果在实践中可以同时执行许多任务,则此模式很快。

Assumptions:

假设

As you may have read between the lines, you must design the tasks so that they don't interfere with other tasks. Also, there must be a way to determine the priority of the tasks, and that priority should take into account the data handled by each task. Two tasks may not alter the same object simultaneously; one of the tasks should get priority over the other one instead, or the operations performed on the object must be thread-safe.

正如您可能在字里行间读到的那样,您必须设计任务,使其不会干扰其他任务。此外,必须有一种方法来确定任务的优先级。任务优先级应该包括每个任务处理的数据。两个任务不能同时改变同一个对象;其中一个任务应该优先于另一个任务,或者对对象执行的操作必须是线程安全的。

回答 by Martin

To do what you want to do with a threadpool, you might have to create some kind of scheduler.

要使用线程池做您想做的事情,您可能必须创建某种调度程序。

Something like that:

类似的东西:

TaskQueue -> Scheduler -> Queue -> ThreadPool

任务队列 -> 调度程序 -> 队列 -> 线程池

The Scheduler runs in its own thread, keeping track of dependencies between jobs. When a job is ready to be done, the scheduler just pushes it into the queue for the threadpool.

调度程序在它自己的线程中运行,跟踪作业之间的依赖关系。当一个作业准备好完成时,调度程序只是将它推入线程池的队列中。

The ThreadPool might have to send signals to the Scheduler to indicate when a job is done, so the scheduler can put jobs that depend on that job into the Queue.

线程池可能必须向调度程序发送信号以指示作业何时完成,以便调度程序可以根据该作业将作业放入队列中。

In your case, the dependencies could probably be stored in a linked list.

在您的情况下,依赖项可能存储在链表中。

Let's say you have the following dependencies: 3 -> 4 -> 6 -> 8

假设您有以下依赖项:3 -> 4 -> 6 -> 8

Job 3 is running on the threadpool; you still have no idea that job 8 exists.

作业 3 正在线程池上运行,您仍然不知道作业 8 存在。

Job 3 ends. You remove 3 from the linked list and put job 4 on the queue for the threadpool.

作业 3 结束。您从链表中删除 3,将作业 4 放在线程池的队列中。

Job 8 arrives. You put it at the end of the linked list.

工作 8 到了。你把它放在链表的末尾。

The only constructs that have to be fully synchronized are the Queues before and after the scheduler.

唯一必须完全同步的结构是调度程序之前和之后的队列。
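A minimal Java sketch of such a scheduler for a single dependency chain might look like this (all names are invented; the "signal" from the pool back to the scheduler is simply a callback wrapped around each job):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ChainScheduler {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final Deque<Runnable> chain = new ArrayDeque<>(); // pending jobs, in dependency order
    private boolean running = false;                          // is the head of the chain executing?

    // New jobs may arrive at any time (like job 8 above); they go to the end of the chain.
    public synchronized void push(Runnable job) {
        chain.addLast(job);
        maybeSubmitNext();
    }

    // The "job done" signal from the threadpool: the next job may now be queued.
    private synchronized void onJobDone() {
        running = false;
        maybeSubmitNext();
    }

    private void maybeSubmitNext() {
        if (running || chain.isEmpty()) return;
        running = true;
        Runnable next = chain.removeFirst();
        pool.execute(() -> {
            next.run();
            onJobDone();
        });
    }
}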

回答 by Matt

If I understand the problem correctly, the jdk executors don't have this capability, but it's easy to roll your own. You basically need:

如果我正确理解了这个问题,jdk executors 没有这个能力,但是很容易推出你自己的。你基本上需要

  • a pool of worker threads, each of which has a dedicated queue
  • some abstraction over those queues to which you offer work (c.f. the ExecutorService)
  • some algorithm that deterministically selects a specific queue for each piece of work
  • each piece of work then gets offers to the right queue and hence gets processed in the right order
  • 一个工作线程池,每个线程都有一个专用队列
  • 对您提供工作的那些队列的一些抽象(参见 ExecutorService)
  • 某些算法为每个工作确定性地选择特定队列
  • 然后每件工作都会被提供到正确的队列中,从而以正确的顺序进行处理

The difference to the jdk executors is that they have 1 queue with n threads but you want n queues and m threads (where n may or may not equal m)

与 jdk 执行器的不同之处在于它们有 1 个包含 n 个线程的队列,但您需要 n 个队列和 m 个线程(其中 n 可能等于也可能不等于 m)

* edit after reading that each task has a key *

* 编辑:在读到每个任务都有一个键之后补充 *

In a bit more detail

详细一点

  • write some code that transforms a key into an index (an int) in a given range (0 to n-1, where n is the number of threads you want); this could be as simple as key.hashCode() % n, or it could be some static mapping of known key values to threads, or whatever you want
  • at startup
    • create n queues, put them in an indexed structure (array, list whatever)
    • start n threads, each thread just does a blocking take from the queue
    • when it receives some work, it knows how to execute work specific to that task/event (you can obviously have some mapping of tasks to actions if you have heterogenous events)
  • store this behind some facade that accepts the work items
  • when a task arrives, hand it to the facade
    • the facade finds the right queue for the task based on the key, offers it to that queue
  • 编写一些代码,将键转换为给定范围(0 到 n-1,其中 n 是您想要的线程数)内的索引(整数),这可以简单到 key.hashCode() % n,也可以是已知键值到线程的某种静态映射,或任何你想要的方式
  • 启动时
    • 创建 n 个队列,将它们放入带索引的结构中(数组、列表,随便什么)
    • 启动 n 个线程,每个线程只从队列中进行阻塞处理
    • 当它收到一些工作时,它知道如何执行特定于该任务/事件的工作(如果您有异构事件,您显然可以将任务映射到操作)
  • 将其存储在接受工作项的某个外观后面
  • 当任务到达时,将其交给门面
    • 外观根据键为任务找到正确的队列,并将其提供给该队列

It's easy enough to add auto-restarting worker threads to this scheme: you just need each worker thread to register with some manager to state "I own this queue", plus some housekeeping around that and detection of errors in the thread (which means it unregisters its ownership of that queue, returning the queue to a free pool of queues, which in turn is a trigger to start up a new thread).

将自动重新启动的工作线程添加到这个方案就足够容易了,然后你需要工作线程向某个管理器注册以声明“我拥有这个队列”,然后围绕该队列进行一些内务处理 + 检测线程中的错误(这意味着它取消注册该队列的所有权,将队列返回到空闲队列池,这是启动新线程的触发器)
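A compact Java sketch of the scheme described in this answer (the facade class is hypothetical; a single-threaded executor per slot stands in for each "dedicated queue + worker thread" pair, and keys are mapped with hashCode() % n as suggested):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KeyedWorkFacade {
    private final ExecutorService[] workers; // one queue and one worker thread per slot

    public KeyedWorkFacade(int n) {
        workers = new ExecutorService[n];
        for (int i = 0; i < n; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Work items with the same key always land on the same queue,
    // so they are processed in the order they were offered.
    public void offer(Object key, Runnable work) {
        int index = Math.floorMod(key.hashCode(), workers.length);
        workers[index].execute(work);
    }
}

Unrelated keys can still collide on the same slot, which serializes them unnecessarily but never violates the per-key ordering.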

回答 by Andriy Tylychko

I think a thread pool can be used effectively in this situation. The idea is to use a separate strand object for each group of dependent tasks. You add tasks to your queue with or without a strand object, and you use the same strand object for dependent tasks. Your scheduler checks whether the next task has a strand and whether that strand is locked. If not, it locks the strand and runs the task. If the strand is already locked, it keeps the task in the queue until the next scheduling event. When a task is done, its strand is unlocked.

我认为在这种情况下可以有效地使用线程池。这个想法是为每组相互依赖的任务使用一个单独的 strand 对象。您可以带着或不带 strand 对象将任务添加到队列中,相互依赖的任务使用同一个 strand 对象。您的调度程序会检查下一个任务是否带有 strand,以及该 strand 是否已被锁定。如果没有,就锁定该 strand 并运行此任务。如果该 strand 已被锁定,则将此任务保留在队列中,直到下一个调度事件。任务完成后解锁其 strand。

As a result you need only a single queue, you don't need any additional threads, no complicated groups, etc. The strand object can be very simple, with two methods: lock and unlock.

这样一来你只需要单个队列,不需要任何额外的线程,也没有复杂的组等。strand 对象可以很简单,只有 lock 和 unlock 两个方法。

I often meet the same design problem, e.g. for an asynchronous network server that handles multiple simultaneous sessions. Sessions are independent (this maps them to your independent tasks and groups of dependent tasks), while tasks inside a session are dependent (this maps session-internal tasks to your dependent tasks inside a group). Using the described approach I avoid explicit synchronization inside a session completely. Every session has its own strand object.

我经常遇到相同的设计问题,例如处理多个并发会话的异步网络服务器。会话之间是独立的(对应于您的独立任务和相互依赖的任务组),而会话内部的任务是相互依赖的(对应于组内的依赖任务)。使用所描述的方法,我完全避免了会话内部的显式同步。每个会话都有自己的 strand 对象。

What's more, I use an existing (great) implementation of this idea: the Boost Asio library (C++). I just used their term strand. The implementation is elegant: I wrap my async tasks into the corresponding strand object before scheduling them.

更重要的是,我使用了这个想法的一个现成的(很棒的)实现:Boost Asio 库(C++)。我只是借用了他们的术语 strand。实现很优雅:在调度异步任务之前,我先把它们包装进相应的 strand 对象中。
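The strand idea can be sketched in Java as a small wrapper that serializes the tasks handed to it while still running them on a shared pool (this is only an illustration of the concept described above, not Boost.Asio's actual API; the class name is invented):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;

// One Strand per group of dependent tasks; tasks submitted to the same
// strand never run concurrently and run in submission order.
public class Strand implements Executor {
    private final Executor pool;                     // shared underlying thread pool
    private final Deque<Runnable> queue = new ArrayDeque<>();
    private Runnable active;                         // the task currently "holding the lock"

    public Strand(Executor pool) {
        this.pool = pool;
    }

    @Override
    public synchronized void execute(Runnable task) {
        queue.addLast(() -> {
            try {
                task.run();
            } finally {
                next();                              // "unlock": release the next queued task
            }
        });
        if (active == null) {
            next();                                  // strand was idle: start immediately
        }
    }

    private synchronized void next() {
        active = queue.pollFirst();
        if (active != null) {
            pool.execute(active);
        }
    }
}

Independent groups each get their own Strand over the same pool, so unrelated work still runs in parallel.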

回答 by garik

Use two Active Objects. In short: the active object pattern consists of a priority queue and one or more worker threads that take tasks from the queue and process them.

使用两个活动对象。简而言之:活动对象模式由优先级队列和1个或多个可以从队列中获取任务并对其进行处理的工作线程组成。

So use one active object with a single worker thread: all tasks placed in its queue will be processed sequentially. Use a second active object with more than one worker thread; in that case the worker threads will take and process tasks from the queue in any order.

因此,使用一个活动对象和一个工作线程:所有要排队的任务都将按顺序处理。使用工作线程数大于 1 的第二个活动对象。在这种情况下,工作线程将以任何顺序从队列中获取和处理任务。

Good luck.

祝好运。

回答 by I GIVE CRAP ANSWERS

Option 1 - The complex one

选项 1 - 复杂的

Since you have sequential jobs, you can gather up those jobs in a chain and let the jobs themselves resubmit to the thread pool once they are done. Suppose we have a list of jobs:

由于您有顺序作业,您可以将这些作业收集到一个链中,并让作业在完成后重新提交到线程池。假设我们有一个工作列表:

 [Task1, ..., Task6]

like in your example. We have a sequential dependency, such that [Task3, Task4, Task6] is a dependency chain. We now make a job (Erlang pseudo-code):

就像在你的例子中一样。我们有一个顺序依赖,即 [Task3, Task4, Task6] 是一个依赖链。我们现在创建一个作业(Erlang 伪代码):

 Task4Job = fun() ->
               Task4(), % Exec the Task4 job
               push_job(Task6Job)
            end.
 Task3Job = fun() ->
               Task3(), % Execute the Task3 Job
               push_job(Task4Job)
            end.
 push_job(Task3Job).

That is, we alter the Task3 job by wrapping it into a job which, as a continuation, pushes the next job in the queue to the thread pool. There are strong similarities to a general continuation-passing style here, also seen in systems like Node.js or Python's Twisted framework.

也就是说,我们把 Task3 作业包装成另一个作业来改变它,这个作业作为延续(continuation)把队列中的下一个作业推送到线程池。这与一般的延续传递风格(continuation-passing style)有很强的相似性,在 Node.js 或 Python 的 Twisted 框架等系统中也能看到。

Generalizing, you make a system where you can define job chains which can defer further work and resubmit it.

概括地说,您创建了一个系统,可以在其中定义能够推迟(defer)进一步工作并重新提交这些工作的作业链。

Option 2 - The simple one

选项 2 - 简单的

Why do we even bother splitting up the jobs? I mean, since they are sequentially dependent, executing all of them on the same thread won't be faster or slower than taking that chain and spreading it out over multiple threads. Assuming "enough" workload, any thread will always have work to do anyway, so just bundling the jobs together is probably easiest:

为什么我们还要分工?我的意思是,由于它们是顺序依赖的,因此在同一个线程上执行所有这些不会比采用该链并将其分散到多个线程上更快或更慢。假设“足够”的工作负载,任何线程无论如何都会有工作,所以将作业捆绑在一起可能是最简单的:

  Task = fun() ->
            Task3(),
            Task4(), 
            Task6()  % Just build a new job, executing them in the order desired
         end,
  push_job(Task).

It is rather easy to do stuff like this if you have functions as first-class citizens, so you can build them in your language at whim, like you can in, say, any functional programming language, Python, Ruby (with blocks), and so on.

如果您将函数作为一等公民,那么做这样的事情是相当容易的,这样您就可以随心所欲地用您的语言构建它们,就像您可以使用任何函数式编程语言、Python、Ruby 块等一样.

I don't particularly like the idea of building a queue, or a continuation stack, as in "Option 1", though, and I would definitely go with the second option. In Erlang, we even have a program called jobs, written by Erlang Solutions and released as open source. jobs is built to execute and load-regulate job executions like these. I'd probably combine option 2 with jobs if I were to solve this problem.

不过,我并不特别喜欢像"选项 1"那样构建队列或延续栈的想法,我肯定会选择第二个选项。在 Erlang 中,我们甚至有一个名为 jobs 的程序,由 Erlang Solutions 编写并作为开源发布。jobs 就是为了执行并对此类作业的执行进行负载调节而构建的。如果我要解决这个问题,我可能会把选项 2 与 jobs 结合起来。

回答 by mdma

The answers suggesting not to use a thread pool amount to hard-coding the knowledge of task dependencies/execution order. Instead, I would create a CompositeTask that manages the start/end dependency between two tasks. By encapsulating the dependency behind the task interface, all tasks can be treated uniformly and added to the pool. This hides the execution details and allows the task dependencies to change without affecting whether or not you use a thread pool.

那些建议不使用线程池的答案,相当于把任务依赖/执行顺序的知识硬编码进去。相反,我会创建一个 CompositeTask 来管理两个任务之间的开始/结束依赖关系。通过把依赖封装在任务接口背后,可以统一对待所有任务,并将它们加入到池中。这隐藏了执行细节,并允许任务依赖发生变化而不影响您是否使用线程池。

The question doesn't specify a language - I'll use Java, which I hope is readable for most.

这个问题没有指定一种语言 - 我将使用 Java,我希望它对大多数人来说都是可读的。

// Task is assumed to be a simple one-method interface, e.g.:
// interface Task { void run(); }
class CompositeTask implements Task
{
    Task firstTask;
    Task secondTask;

    public void run() {
         firstTask.run();
         secondTask.run();
    }
}

This executes tasks sequentially and on the same thread. You can chain many CompositeTasks together to create a sequence of as many sequential tasks as needed.

这会在同一线程上按顺序执行任务。您可以将多个CompositeTasks 链接在一起,以根据需要创建一个由多个顺序任务组成的序列。

The downside here is that this ties up the thread for the duration of all tasks executing sequentially. You may have other tasks that you would prefer to execute inbetween the first and second tasks. So, rather than execute the second task directly, have the composite task schedule execution of the second task:

这里的缺点是,这会在所有任务顺序执行期间占用线程。您可能希望在第一个和第二个任务之间执行其他任务。因此,与其直接执行第二个任务,不如让复合任务调度执行第二个任务:

class CompositeTask implements Runnable
{
    Task firstTask;
    Task secondTask;
    ExecutorService executor;

    public void run() {
         firstTask.run();
         executor.submit(secondTask);
    }
}

This ensures that the second task doesn't run until after the first task is complete, and also allows the pool to execute other (possibly more urgent) tasks. Note that the first and second tasks may execute on separate threads, so although they do not execute concurrently, any shared data used by the tasks must be made visible to other threads (e.g. by making the variables volatile).

这确保第二个任务在第一个任务完成之后才运行,并且还允许池执行其他(可能更紧急的)任务。请注意,第一个和第二个任务可能在不同的线程上执行,因此尽管它们不会同时执行,但任务使用的任何共享数据都必须对其他线程可见(例如,将变量声明为 volatile)。

This is a simple, yet powerful and flexible approach, and allows the tasks themselves to define execution constraints, rather than doing it by using different thread pools.

这是一种简单但功能强大且灵活的方法,并允许任务本身定义执行约束,而不是通过使用不同的线程池来实现。
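As a usage sketch (hypothetical: it assumes Task is the one-method interface noted above, and that the first CompositeTask variant has the obvious two-argument constructor), a chain of sequential tasks is built by nesting composites and then submitted like any other task:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompositeTaskDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);

        Task task3 = () -> System.out.println("task3");
        Task task4 = () -> System.out.println("task4");
        Task task6 = () -> System.out.println("task6");

        // task3 -> task4 -> task6 must run in this order; the nested composites encode that,
        // while the pool remains free to run unrelated tasks in parallel.
        // (assumes a CompositeTask(Task first, Task second) constructor)
        Task ordered = new CompositeTask(task3, new CompositeTask(task4, task6));

        pool.execute(ordered::run); // the pool only sees an ordinary piece of work
        pool.execute(() -> System.out.println("independent task"));

        pool.shutdown();
    }
}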

回答 by yorkw

This is achievable, as far as I understand your scenario. Basically, what you need is to do something smart to coordinate your tasks in the main thread. The Java APIs you need are ExecutorCompletionService and Callable.

据我了解您的情况,这是可以实现的。基本上,你需要在主线程中做一些聪明的事情来协调你的任务。您需要的 Java API 是 ExecutorCompletionService 和 Callable。

First, implement your callable task:

首先,实现您的可调用任务:

public class MyAsyncTask implements Callable<MyAsyncTask> {
  // tells if I am a normal or dependent task
  private final boolean isDependent;

  public MyAsyncTask(boolean isDependent) {
    this.isDependent = isDependent;
  }

  public boolean isNormal() {
    return !isDependent;
  }

  public MyAsyncTask call() {
    // do your job here.
    return this;
  }
}

Then in your main thread, use an ExecutorCompletionService to coordinate the dependent task execution (i.e. a wait mechanism):

然后在你的主线程中,使用 CompletionService 协调依赖任务的执行(即等待机制):

ExecutorCompletionService<MyAsyncTask> completionExecutor = new 
  ExecutorCompletionService<MyAsyncTask>(Executors.newFixedThreadPool(5));
Future<MyAsyncTask> dependentFutureTask = null;
for (MyAsyncTask task : tasks) {
  if (task.isNormal()) {
    // if it is a normal task, submit it immediately.
    completionExecutor.submit(task);
  } else {
    if (dependentFutureTask == null) {
      // submit the first dependent task, get a reference 
      // of this dependent task for later use.
      dependentFutureTask = completionExecutor.submit(task);
    } else {
      // wait for last one completed, before submit a new one.
      dependentFutureTask.get();
      dependentFutureTask = completionExecutor.submit(task);
    }
  }
}

By doing this, you use a single executor (thread pool size 5) to execute both normal and dependent tasks. The normal tasks are executed immediately as soon as they are submitted; the dependent tasks are executed one by one (the wait is performed in the main thread by calling get() on the Future before submitting a new dependent task). So at any point in time, you always have a number of normal tasks and at most a single dependent task (if one exists) running in a single thread pool.

通过这样做,您使用单个执行器(线程池大小为 5)来执行普通任务和依赖任务。普通任务一提交就立即执行;依赖任务一个接一个地执行(在提交新的依赖任务之前,主线程通过对 Future 调用 get() 进行等待)。因此在任何时间点,单个线程池中总是运行着若干普通任务和最多一个依赖任务(如果存在的话)。

This is just a head start; by using ExecutorCompletionService, FutureTask and Semaphore, you can implement more complex thread coordination scenarios.

这只是一个开端,通过使用 ExecutorCompletionService、FutureTask 和 Semaphore,您可以实现更复杂的线程协调场景。