Original question: http://stackoverflow.com/questions/14255019/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): Stack Overflow
Latch that can be incremented
Asked by Razvi
Does anyone know if there is any latch implementation that does the following:
- has a method to decrement the latch value, or wait if the value is zero
- has a method for waiting for the latch value to be zero
- has a method for adding a number to the latch's value
Accepted answer by assylias
Instead of starting from scratch with AQS, you could use a simple implementation like the one below. It is somewhat naive (it relies on synchronized blocks rather than the lock-free algorithms that AQS enables), but unless you expect to use it in a contended scenario it could be good enough.
import java.util.concurrent.CountDownLatch;

public class CountUpAndDownLatch {
    private CountDownLatch latch; // replaced on every countUp(); guarded by lock
    private final Object lock = new Object();

    public CountUpAndDownLatch(int count) {
        this.latch = new CountDownLatch(count);
    }

    public void countDownOrWaitIfZero() throws InterruptedException {
        synchronized (lock) {
            while (latch.getCount() == 0) {
                lock.wait();
            }
            latch.countDown();
            lock.notifyAll();
        }
    }

    public void waitUntilZero() throws InterruptedException {
        synchronized (lock) {
            while (latch.getCount() != 0) {
                lock.wait();
            }
        }
    }

    public void countUp() { // should probably check for Integer.MAX_VALUE
        synchronized (lock) {
            latch = new CountDownLatch((int) latch.getCount() + 1);
            lock.notifyAll();
        }
    }

    public int getCount() {
        synchronized (lock) {
            return (int) latch.getCount();
        }
    }
}
Note: I have not tested it in depth but it seems to behave as expected:
public static void main(String[] args) throws InterruptedException {
    final CountUpAndDownLatch latch = new CountUpAndDownLatch(1);
    Runnable up = new Runnable() {
        @Override
        public void run() {
            // countUp() never blocks, so there is no InterruptedException to catch here
            System.out.println("IN UP " + latch.getCount());
            latch.countUp();
            System.out.println("UP " + latch.getCount());
        }
    };
    Runnable downOrWait = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("IN DOWN " + latch.getCount());
                latch.countDownOrWaitIfZero();
                System.out.println("DOWN " + latch.getCount());
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // keep the interrupt status
            }
        }
    };
    Runnable waitFor0 = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("WAIT FOR ZERO " + latch.getCount());
                latch.waitUntilZero();
                System.out.println("ZERO " + latch.getCount());
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // keep the interrupt status
            }
        }
    };
    new Thread(waitFor0).start();
    up.run();
    downOrWait.run();
    Thread.sleep(100);
    downOrWait.run();
    new Thread(up).start();
    downOrWait.run();
}
Output:
IN UP 1
UP 2
WAIT FOR ZERO 1
IN DOWN 2
DOWN 1
IN DOWN 1
ZERO 0
DOWN 0
IN DOWN 0
IN UP 0
DOWN 0
UP 0
Answer by Michael-7
You could also use a Phaser (java.util.concurrent.Phaser):
final Phaser phaser = new Phaser(1); // register self
while (/* some condition */) {
    phaser.register(); // Equivalent to countUp
    // do some work asynchronously, invoking
    // phaser.arriveAndDeregister() (equiv to countDown) in a finally block
}
phaser.arriveAndAwaitAdvance(); // await any async tasks to complete
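To make the fragment above concrete, here is a minimal runnable sketch of the same pattern. The class name PhaserLatchDemo, the method runTasks, the thread-pool size, and the counter standing in for real work are all assumptions made for illustration; they are not part of the original answer.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

class PhaserLatchDemo {
    // Runs `tasks` asynchronous jobs and returns how many had finished
    // by the time the phaser let the calling thread through.
    static int runTasks(int tasks) {
        final Phaser phaser = new Phaser(1);               // the caller registers itself
        final AtomicInteger finished = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is arbitrary
        try {
            for (int i = 0; i < tasks; i++) {
                phaser.register();                         // "count up" before handing off the task
                pool.execute(() -> {
                    try {
                        finished.incrementAndGet();        // stand-in for real work
                    } finally {
                        phaser.arriveAndDeregister();      // "count down", even if the work throws
                    }
                });
            }
            phaser.arriveAndAwaitAdvance();                // blocks until every registered task has arrived
            return finished.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("finished tasks: " + runTasks(10));
    }
}
```

Because the calling thread stays registered until it arrives itself, the phase cannot advance early while tasks are still being submitted.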
I hope this helps.
Answer by Thilo
java.util.concurrent.Semaphore seems to fit the bill.
- acquire() or acquire(n)
- also acquire() (not sure I understand what the difference is here) (*)
- release() or release(n)
(*) Okay, there is no built-in method to wait until the semaphore becomes unavailable. I suppose you'd write your own wrapper for acquire that does a tryAcquire first and, if that fails, triggers your "busy event" (and then continues using the normal acquire). Everyone would need to call your wrapper. Maybe subclass Semaphore?
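A rough sketch of the subclass idea might look like this. The class name NotifyingSemaphore and the onExhausted callback (standing in for the "busy event") are hypothetical, and only the no-argument acquire() is wrapped, so acquire(n) and the other variants would bypass the callback.

```java
import java.util.concurrent.Semaphore;

class NotifyingSemaphore extends Semaphore {
    private static final long serialVersionUID = 1L;

    // Hypothetical "busy event" hook, run when no permit is immediately available.
    private final transient Runnable onExhausted;

    NotifyingSemaphore(int permits, Runnable onExhausted) {
        super(permits);
        this.onExhausted = onExhausted;
    }

    @Override
    public void acquire() throws InterruptedException {
        // Fast path: take a permit if one is free right now.
        // Note that tryAcquire() does not honor fairness ordering.
        if (!tryAcquire()) {
            onExhausted.run();   // nothing free: fire the callback...
            super.acquire();     // ...then block in the usual way
        }
    }

    public static void main(String[] args) throws InterruptedException {
        NotifyingSemaphore s = new NotifyingSemaphore(1, () -> System.out.println("busy"));
        s.acquire();             // a permit is available, so the callback is not fired
        System.out.println("permits left: " + s.availablePermits());
        s.release();
    }
}
```

The callback runs on the acquiring thread before it blocks, so it should be short and must not itself wait on the same semaphore.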
Answer by Michael-7
For those needing an AQS-based solution, here's one that worked for me:
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class CountLatch {
    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }

        @Override
        protected int tryAcquireShared(int arg) {
            // Let waiters through only while the count equals the release value
            return count.get() == releaseValue ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            return true;
        }
    }

    private final Sync sync;
    private final AtomicLong count;
    private volatile long releaseValue;

    public CountLatch(final long initial, final long releaseValue) {
        this.releaseValue = releaseValue;
        this.count = new AtomicLong(initial);
        this.sync = new Sync();
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public long countUp() {
        final long current = count.incrementAndGet();
        if (current == releaseValue) {
            sync.releaseShared(0);
        }
        return current;
    }

    public long countDown() {
        final long current = count.decrementAndGet();
        if (current == releaseValue) {
            sync.releaseShared(0);
        }
        return current;
    }

    public long getCount() {
        return count.get();
    }
}
You initialize the synchronizer with an initial and target value. Once the target value has been reached (by counting up and / or down), the waiting threads will be released.
Answer by mike rodent
This is a variation on CounterLatch, available from the Apache site.
Their version, for reasons best known to themselves, blocks the caller thread while the variable (AtomicInteger) is at a given value.
But it is the height of easiness to tweak this code so that you can choose either to do just what the Apache version does, or to say "wait here until the counter reaches a certain value". Arguably the latter has wider applicability. In my particular case I rustled this up because I wanted to check that all "chunks" had been published in SwingWorker.process()... but I have since found other uses for it.
Here it is written in Jython, officially the best language in the world (TM). I am going to rustle up a Java version in due course.
class CounterLatch():
    def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ):
        self.count = java.util.concurrent.atomic.AtomicLong( initial )
        self.signal = java.util.concurrent.atomic.AtomicLong( wait_value )

        class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ):
            def tryAcquireShared( sync_self, arg ):
                if lift_on_reached:
                    return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1
                else:
                    return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1
            def tryReleaseShared( self, args ):
                return True

        self.sync = Sync()
        self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False

    def await( self, *args ):
        if args:
            assert len( args ) == 2
            assert type( args[ 0 ] ) is int
            timeout = args[ 0 ]
            assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit
            unit = args[ 1 ]
            return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))
        else:
            self.sync.acquireSharedInterruptibly( 1 )

    def count_relative( self, n ):
        previous = self.count.addAndGet( n )
        if previous == self.signal.get():
            self.sync.releaseShared( 0 )
        return previous
NB: the Apache version uses the keyword volatile for signal and released. In Jython I don't think this exists as such, but using AtomicLong and AtomicBoolean should ensure that no values are "out of date" in any thread.
Example usage:
In the SwingWorker constructor:
self.publication_counter_latch = CounterLatch()
In SW.publish:
# increase counter value BEFORE publishing chunks
self.publication_counter_latch.count_relative( len( chunks ) )
self.super__publish( chunks )
In SW.process:
# ... do something [HERE] with the chunks!
# AFTER having done what you want to do with your chunks:
self.publication_counter_latch.count_relative( - len( chunks ) )
In the thread waiting for chunk processing to stop:
worker.publication_counter_latch.await()
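For readers who prefer Java, the Jython class above could be ported roughly as follows. This is only a sketch, not the author's promised Java version: the names CounterLatchSketch, countRelative and waitValue are made up here, only the "wait until the counter reaches a value" mode is covered, and the released flag is left out because the code shown never sets it.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class CounterLatchSketch {
    private final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        @Override
        protected int tryAcquireShared(int ignored) {
            // Waiters pass only while the counter sits at the wait value.
            return count.get() == waitValue ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            return true; // always let queued waiters re-check the condition
        }
    }

    private final AtomicLong count;
    private final long waitValue;
    private final Sync sync = new Sync();

    CounterLatchSketch(long initial, long waitValue) {
        this.count = new AtomicLong(initial);
        this.waitValue = waitValue;
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // Adds delta (which may be negative) and wakes waiters if the wait value is hit.
    long countRelative(long delta) {
        long updated = count.addAndGet(delta);
        if (updated == waitValue) {
            sync.releaseShared(0);
        }
        return updated;
    }

    long getCount() {
        return count.get();
    }

    public static void main(String[] args) throws InterruptedException {
        CounterLatchSketch latch = new CounterLatchSketch(0, 0);
        latch.countRelative(3);                      // e.g. three chunks published
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                latch.countRelative(-1);             // one chunk processed
            }
        });
        consumer.start();
        latch.await();                               // returns once the counter is back at 0
        System.out.println("count = " + latch.getCount());
    }
}
```

In Java the wait value can simply be a final field, which sidesteps the volatile question raised in the note above.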
Answer by Guido Medina
I needed one and built it using the same strategy as CountDownLatch, which uses AQS (non-blocking). This class is also very similar (if not identical) to one created for Apache Camel, and I think it is also lighter than the JDK Phaser. It acts just like CountDownLatch from the JDK: it won't let you count down below zero, and it lets you count both down and up:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class CountingLatch
{
    /**
     * Synchronization control for CountingLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer
    {
        private Sync()
        {
        }

        private Sync(final int initialState)
        {
            setState(initialState);
        }

        int getCount()
        {
            return getState();
        }

        @Override
        protected int tryAcquireShared(final int acquires)
        {
            return getState()==0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(final int delta)
        {
            // Apply delta to the count; signal on the transition to zero
            for(; ; ){
                final int c=getState();
                final int nextc=c+delta;
                if(nextc<0){
                    return false; // refuse to go below zero
                }
                if(compareAndSetState(c,nextc)){
                    return nextc==0;
                }
            }
        }
    }

    private final Sync sync;

    public CountingLatch()
    {
        sync=new Sync();
    }

    public CountingLatch(final int initialCount)
    {
        sync=new Sync(initialCount);
    }

    public void increment()
    {
        sync.releaseShared(1);
    }

    public int getCount()
    {
        return sync.getCount();
    }

    public void decrement()
    {
        sync.releaseShared(-1);
    }

    public void await() throws InterruptedException
    {
        sync.acquireSharedInterruptibly(1);
    }

    // timeout is in milliseconds
    public boolean await(final long timeout) throws InterruptedException
    {
        return sync.tryAcquireSharedNanos(1,TimeUnit.MILLISECONDS.toNanos(timeout));
    }
}
Answer by broc.seib
It seems a CountDownLatch will do as you wish:
A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html
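As a small illustration of the quoted behavior, the sketch below (the class name OneShotLatchDemo and the method demo are made up for this example) shows await blocking until the count hits zero and the latch staying open afterwards. Note that CountDownLatch has no method to raise the count, so on its own it covers only the "wait until zero" part of the question.

```java
import java.util.concurrent.CountDownLatch;

class OneShotLatchDemo {
    // Returns the latch count after it has been released.
    static long demo() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        new Thread(latch::countDown).start();
        new Thread(latch::countDown).start();
        latch.await();      // blocks until both threads have counted down
        latch.countDown();  // no effect: the count is already zero
        latch.await();      // returns immediately from now on
        return latch.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("count after release: " + demo());
    }
}
```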