具有节流/吞吐量控制的 Java Executor

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/19819837/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-12 20:27:42  来源:igfitidea点击:

Java Executor with throttling/throughput control

javamultithreadingexecutors

提问by mrip

I'm looking for a Java Executor that allows me to specify throttling/throughput/pacing limitations, for example, no more than say 100 tasks can be processed in a second -- if more tasks get submitted they should get queued and executed later. The main purpose of this is to avoid running into limits when hitting foreign APIs or servers.

我正在寻找一个 Java Executor,它允许我指定节流/吞吐量/步调限制,例如,一秒钟内最多可以处理 100 个任务——如果提交更多任务,它们应该排队并稍后执行。这样做的主要目的是避免在访问外部 API 或服务器时遇到限制。

I'm wondering whether either base Java (which I doubt, because I checked) or somewhere else reliable (e.g. Apache Commons) provides this, or if I have to write my own. Preferably something lightweight. I don't mind writing it myself, but if there's a "standard" version out there somewhere I'd at least like to look at it first.

我想知道是基本 Java(我怀疑,因为我检查过)还是其他可靠的地方(例如 Apache Commons)提供了这个,或者我是否必须自己编写。最好是轻量级的。我不介意自己写,但如果有一个“标准”版本,我至少想先看看它。

采纳答案by Ortwin Angermeier

Take a look at guavas RateLimiter:

看看番石榴RateLimiter

A rate limiter. Conceptually, a rate limiter distributes permits at a configurable rate. Each acquire() blocks if necessary until a permit is available, and then takes it. Once acquired, permits need not be released. Rate limiters are often used to restrict the rate at which some physical or logical resource is accessed. This is in contrast to Semaphore which restricts the number of concurrent accesses instead of the rate (note though that concurrency and rate are closely related, e.g. see Little's Law).

速率限制器。从概念上讲,速率限制器以可配置的速率分发许可证。如有必要,每个acquire() 都会阻塞,直到获得许可为止,然后获取它。获得许可后,无需发放许可证。速率限制器通常用于限制访问某些物理或逻辑资源的速率。这与限制并发访问数量而不是速率的信号量形成对比(请注意,尽管并发性和速率密切相关,例如参见利特尔定律)。

Its threadsafe, but still @Beta. Might be worth a try anyway.

它的线程安全,但仍然是@Beta. 无论如何可能值得一试。

You would have to wrap each call to the Executorwith respect to the rate limiter. For a more clean solution you could create some kind of wrapper for the ExecutorService.

您必须Executor根据速率限制器包装每个对 的调用。对于更干净的解决方案,您可以为ExecutorService.

From the javadoc:

从javadoc:

 final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"
  void submitTasks(List<Runnable> tasks, Executor executor) {
    for (Runnable task : tasks) {
      rateLimiter.acquire(); // may wait
      executor.execute(task);
    }
  }

回答by Little Child

no more than say 100 tasks can be processed in a second -- if more tasks get submitted they should get queued and executed later

一秒钟内最多只能处理 100 个任务——如果提交了更多任务,它们应该排入队列并稍后执行

You need to look into Executors.newFixedThreadPool(int limit). This will allow you to limit the number of threads that can be executed simultaneously. If you submit more than one thread, they will be queued and executed later.

你需要研究一下Executors.newFixedThreadPool(int limit)。这将允许您限制可以同时执行的线程数。如果您提交多个线程,它们将被排队并稍后执行。

ExecutorService threadPool = Executors.newFixedThreadPool(100);
Future<?> result1 =  threadPool.submit(runnable1);
Future<?> result2 = threadPool.submit(runnable2);
Futurte<SomeClass> result3 = threadPool.submit(callable1);  
...  

Snippet above shows how you would work with an ExecutorServicethat allows no more than 100 threads to be executed simultaneously.

上面的片段显示了如何使用ExecutorService允许同时执行不超过 100 个线程的线程。

Update:
After going over the comments, here is what I have come up with (kinda stupid). How about manually keeping a track of threads that are to be executed ? How about storing them first in an ArrayListand then submitting them to the Executorbased on how many threads have already been executed in the last one second.
So, lets say 200 tasks have been submitted into our maintained ArrayList, We can iterate and add 100 to the Executor. When a second passes, we can add few more threads based on how many have completed in theExecutorand so on

更新:
看完评论后,这是我想出的(有点愚蠢)。如何手动跟踪要执行的线程?如何首先将它们存储在 an 中ArrayList,然后Executor根据最后一秒内已执行的线程数将它们提交给。
因此,假设 200 个任务已提交到我们的维护中ArrayList,我们可以迭代并将 100 个添加到Executor. 当一秒钟过去后,我们可以根据已完成的线程数添加更多线程Executor,依此类推

回答by TwoThe

The Java Executor doesn't offer such a limitation, only limitation by amount of threads, which is not what you are looking for.

Java Executor 没有提供这样的限制,只有线程数量的限制,这不是你想要的。

In general the Executor is the wrong place to limit such actions anyway, it should be at the moment where the Thread tries to call the outside server. You can do this for example by having a limiting Semaphorethat threads wait on before they submit their requests.

一般来说,无论如何,Executor 是限制此类操作的错误位置,它应该在 Thread 尝试调用外部服务器的那一刻。例如,您可以通过让线程在提交请求之前等待的限制信号量来做到这一点。

Calling Thread:

调用线程:

public void run() {
  // ...
  requestLimiter.acquire();
  connection.send();
  // ...
 }

While at the same time you schedule a (single) secondary thread to periodically (like every 60 seconds) releases acquired resources:

同时您安排一个(单个)辅助线程定期(如每 60 秒)释放获取的资源:

 public void run() {
  // ...
  requestLimiter.drainPermits();  // make sure not more than max are released by draining the Semaphore empty
  requestLimiter.release(MAX_NUM_REQUESTS);
  // ...
 }

回答by Jaime Casero

Personally I found this scenario quite interesting. In my case, I wanted to stress that the interesting phase to throttle is the consuming side one, as in classical Producer/Consumer concurrent theory. That's the opposite of some of the suggested answers before. This is, we don't want to block the submitting thread, but block the consuming threads based in a rate (tasks/second) policy. So, even if there are tasks ready in the queue, executing/consuming Threads may block waiting to meet the throtle policy.

我个人觉得这个场景很有趣。就我而言,我想强调的是,有趣的节流阶段是消费方,就像在经典的生产者/消费者并发理论中一样。这与之前的一些建议答案相反。也就是说,我们不想阻塞提交线程,而是基于速率(任务/秒)策略阻塞消费线程。因此,即使队列中有准备好的任务,执行/消耗线程可能会阻塞等待满足节流策略。

That said, I think a good candidate would be the Executors.newScheduledThreadPool(int corePoolSize). This way you would need a simple queue in front of the executor (a simple LinkedBlockingQueue would suit), and then schedule a periodic task to pick actual tasks from the queue (ScheduledExecutorService.scheduleAtFixedRate). So, is not an straightforward solution, but it should perform goog enough if you try to throttle the consumers as discussed before.

也就是说,我认为一个很好的候选者是 Executors.newScheduledThreadPool(int corePoolSize)。通过这种方式,您将需要在执行程序前面有一个简单的队列(一个简单的 LinkedBlockingQueue 适合),然后安排一个周期性任务从队列中挑选实际任务(ScheduledExecutorService.scheduleAtFixedRate)。因此,这不是一个简单的解决方案,但如果您像之前讨论的那样尝试限制消费者,它应该足够执行 goog。

回答by Joseph M. Dion

Depending on the scenario, and as suggested in one of the previous responses, the basic functionalities of a ThreadPoolExecutor may do the trick.

根据场景,并且如之前的一个响应中所建议的, ThreadPoolExecutor 的基本功能可能会起作用。

But if the threadpool is shared by multiple clients and you want to throttle, to restrict the usage of each one of them, making sure that one client won't use all the threads, then a BoundedExecutor will do the work.

但是,如果线程池由多个客户端共享,并且您想要节流,以限制每个客户端的使用,确保一个客户端不会使用所有线程,那么 BoundedExecutor 将完成这项工作。

More details can be found in the following example:

可以在以下示例中找到更多详细信息:

http://jcip.net/listings/BoundedExecutor.java

http://jcip.net/listings/BoundedExecutor.java

回答by benbai123

Can limit it inside Runnable:

可以将其限制在 Runnable 内:

public static Runnable throttle (Runnable realRunner, long delay) {
    Runnable throttleRunner = new Runnable() {
        // whether is waiting to run
        private boolean _isWaiting = false;
        // target time to run realRunner
        private long _timeToRun;
        // specified delay time to wait
        private long _delay = delay;
        // Runnable that has the real task to run
        private Runnable _realRunner = realRunner;
        @Override
        public void run() {
            // current time
            long now;
            synchronized (this) {
                // another thread is waiting, skip
                if (_isWaiting) return;
                now = System.currentTimeMillis();
                // update time to run
                // do not update it each time since
                // you do not want to postpone it unlimited
                _timeToRun = now+_delay;
                // set waiting status
                _isWaiting = true;
            }
            try {
                Thread.sleep(_timeToRun-now);

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // clear waiting status before run
                _isWaiting = false;
                // do the real task
                _realRunner.run();
            }
        }};
    return throttleRunner;
}

Take from JAVA Thread Debounce and Throttle

取自JAVA Thread Debounce 和 Throttle