java 生产者/消费者工作队列

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/2233561/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-29 20:11:34  来源:igfitidea点击:

producer/consumer work queues

javaconcurrencyexecutorserviceproducer-consumerblockingqueue

提问by Jolly Roger

I'm wrestling with the best way to implement my processing pipeline.

我正在努力寻找实现我的处理管道的最佳方式。

My producers feed work to a BlockingQueue. On the consumer side, I poll the queue, wrap what I get in a Runnable task, and submit it to an ExecutorService.

我的制作人将工作提供给 BlockingQueue。在消费者方面,我轮询队列,将我得到的内容包装在一个 Runnable 任务中,并将其提交给一个 ExecutorService。

while (!isStopping())
{
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
    if (work == null)
    {
        break;
    }
    executorService.execute(new Worker(work));   // needs to block if no threads!
}

This is not ideal; the ExecutorService has its own queue, of course, so what's really happening is that I'm always fully draining my work queue and filling the task queue, which slowly empties as the tasks complete.

这并不理想;ExecutorService 有它自己的队列,当然,所以真正发生的是我总是完全排空我的工作队列并填充任务队列,随着任务的完成它慢慢地清空。

I realize that I could queue tasks at the producer end, but I'd really rather not do that - I like the indirection/isolation of my work queue being dumb strings; it really isn't any business of the producer what's going to happen to them. Forcing the producer to queue a Runnable or Callable breaks an abstraction, IMHO.

我意识到我可以在生产者端对任务进行排队,但我真的不想这样做 - 我喜欢我的工作队列的间接/隔离是哑字符串;这真的不是生产者的任何事情,他们会发生什么。强制生产者将 Runnable 或 Callable 排队会破坏抽象,恕我直言。

But I do want the shared work queue to represent the current processing state. I want to be able to block the producers if the consumers aren't keeping up.

但我确实希望共享工作队列代表当前的处理状态。如果消费者没有跟上,我希望能够阻止生产者。

I'd love to use Executors, but I feel like I'm fighting their design. Can I partially drink the Kool-ade, or do I have to gulp it? Am I being wrong-headed in resisting queueing tasks? (I suspect I could set up ThreadPoolExecutor to use a 1-task queue and override it's execute method to block rather than reject-on-queue-full, but that feels gross.)

我很想使用 Executors,但我觉得我正在与他们的设计作斗争。我可以部分饮用 Kool-ade,还是必须一口一口?我在抵制排队任务方面是错误的吗?(我怀疑我可以设置 ThreadPoolExecutor 以使用 1-task 队列并覆盖它的 execute 方法以阻止而不是拒绝队列满,但这感觉很糟糕。)

Suggestions?

建议?

回答by Kevin

I want the shared work queue to represent the current processing state.

我希望共享工作队列代表当前的处理状态。

Try using a shared BlockingQueueand have a pool of Worker threads taking work items off of the Queue.

尝试使用共享BlockingQueue并让一个工作线程池从队列中取出工作项。

I want to be able to block the producers if the consumers aren't keeping up.

如果消费者没有跟上,我希望能够阻止生产者。

Both ArrayBlockingQueueand LinkedBlockingQueuesupport bounded queues such that they will block on put when full. Using the blocking put()methods ensures that producers are blocked if the queue is full.

无论ArrayBlockingQueue的LinkedBlockingQueue支持有界队列,使得它们在放置块满时。使用阻塞put()方法可确保在队列已满时阻塞生产者。

Here is a rough start. You can tune the number of workers and queue size:

这是一个艰难的开始。您可以调整工作人员的数量和队列大小:

public class WorkerTest<T> {

    private final BlockingQueue<T> workQueue;
    private final ExecutorService service;

    public WorkerTest(int numWorkers, int workQueueSize) {
        workQueue = new LinkedBlockingQueue<T>(workQueueSize);
        service = Executors.newFixedThreadPool(numWorkers);

        for (int i=0; i < numWorkers; i++) {
            service.submit(new Worker<T>(workQueue));
        }
    }

    public void produce(T item) {
        try {
            workQueue.put(item);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }


    private static class Worker<T> implements Runnable {
        private final BlockingQueue<T> workQueue;

        public Worker(BlockingQueue<T> workQueue) {
            this.workQueue = workQueue;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    T item = workQueue.take();
                    // Process item
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

回答by Melissa

"find an available existing worker thread if one exists, create one if necessary, kill them if they go idle."

“找到一个可用的现有工作线程(如果存在),必要时创建一个,如果它们空闲则杀死它们。”

Managing all those worker states is as unnecessary as it is perilous. I would create one monitor thread that constantly runs in the background, who's only task is to fill up the queue and spawn consumers... why not make the worker threads daemonsso they die as soon as they complete? If you attach them all to one ThreadGroup you can dynamically re-size the pool... for example:

管理所有这些工人状态既不必要又危险。我会创建一个在后台不断运行的监视器线程,谁的唯一任务就是填满队列并产生消费者......为什么不让工作线程守护进程让它们在完成后立即死亡?如果您将它们全部附加到一个线程组,您可以动态地重新调整池的大小...例如:

  **for(int i=0; i<queue.size()&&ThreadGroup.activeCount()<UPPER_LIMIT;i++ { 
         spawnDaemonWorkers(queue.poll());
  }**

回答by D.Shawley

You could have your consumer execute Runnable::rundirectly instead of starting a new thread up. Combine this with a blocking queue with a maximum size and I think that you will get what you want. Your consumer becomes a worker that is executing tasks inline based on the work items on the queue. They will only dequeue items as fast as they process them so your producer when your consumers stop consuming.

您可以让您的消费者Runnable::run直接执行,而不是启动一个新线程。将此与具有最大大小的阻塞队列相结合,我认为您会得到您想要的。您的使用者将成为根据队列上的工作项内联执行任务的工作人员。当您的消费者停止消费时,他们只会在处理项目时尽快出列项目,以便您的生产者。