java 可以递增的锁存器

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/14255019/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-31 15:37:40  来源:igfitidea点击:

Latch that can be incremented

javaconcurrency

提问by Razvi

Does anyone know if there is any latch implementation that does the following:

有谁知道是否有执行以下操作的闩锁实现:

  • has a method to decrement the latch value, or wait if the value is zero
  • has a method for waiting for the latch value to be zero
  • has a method for adding a number to the latch's value
  • 有一个方法来递减锁存器的值,或者如果值为零则等待
  • 有一个等待锁存值为零的方法
  • 有一种将数字添加到锁存器值的方法

采纳答案by assylias

Instead of starting back from AQS, you could use a simple implementation like below. It is somewhat naive (it is synchronized vs. AQS lock-free algorithms) but unless you expect to use it in a contented scenario it could be good enough.

您可以使用如下简单的实现,而不是从 AQS 开始。它有点幼稚(它是同步算法与 AQS 无锁算法),但除非您希望在满足的场景中使用它,否则它可能已经足够好了。

public class CountUpAndDownLatch {
    private CountDownLatch latch;
    private final Object lock = new Object();

    public CountUpAndDownLatch(int count) {
        this.latch = new CountDownLatch(count);
    }

    public void countDownOrWaitIfZero() throws InterruptedException {
        synchronized(lock) {
            while(latch.getCount() == 0) {
                lock.wait();
            }
            latch.countDown();
            lock.notifyAll();
        }
    }

    public void waitUntilZero() throws InterruptedException {
        synchronized(lock) {
            while(latch.getCount() != 0) {
                lock.wait();
            }
        }
    }

    public void countUp() { //should probably check for Integer.MAX_VALUE
        synchronized(lock) {
            latch = new CountDownLatch((int) latch.getCount() + 1);
            lock.notifyAll();
        }
    }

    public int getCount() {
        synchronized(lock) {
            return (int) latch.getCount();
        }
    }
}

Note: I have not tested it in depth but it seems to behave as expected:

注意:我还没有对其进行深入测试,但它的行为似乎符合预期:

public static void main(String[] args) throws InterruptedException {
    final CountUpAndDownLatch latch = new CountUpAndDownLatch(1);
    Runnable up = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("IN UP " + latch.getCount());
                latch.countUp();
                System.out.println("UP " + latch.getCount());
            } catch (InterruptedException ex) {
            }
        }
    };

    Runnable downOrWait = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("IN DOWN " + latch.getCount());
                latch.countDownOrWaitIfZero();
                System.out.println("DOWN " + latch.getCount());
            } catch (InterruptedException ex) {
            }
        }
    };

    Runnable waitFor0 = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("WAIT FOR ZERO " + latch.getCount());
                latch.waitUntilZero();
                System.out.println("ZERO " + latch.getCount());
            } catch (InterruptedException ex) {
            }
        }
    };
    new Thread(waitFor0).start();
    up.run();
    downOrWait.run();
    Thread.sleep(100);
    downOrWait.run();
    new Thread(up).start();
    downOrWait.run();
}

Output:

输出:

IN UP 1
UP 2
WAIT FOR ZERO 1
IN DOWN 2
DOWN 1
IN DOWN 1
ZERO 0
DOWN 0
IN DOWN 0
IN UP 0
DOWN 0
UP 0

回答by Michael-7

You could also use a Phaser (java.util.concurrent.Phaser)

您还可以使用Phaser (java.util.concurrent.Phaser)

final Phaser phaser = new Phaser(1); // register self
while (/* some condition */) {
    phaser.register(); // Equivalent to countUp
    // do some work asynchronously, invoking
    // phaser.arriveAndDeregister() (equiv to countDown) in a finally block
}
phaser.arriveAndAwaitAdvance(); // await any async tasks to complete

I hope this helps.

我希望这有帮助。

回答by Thilo

java.util.concurrent.Semaphoreseems to fit the bill.

java.util.concurrent.Semaphore似乎符合要求。

  • acquire() or acquire(n)
  • also acquire() (not sure I understand what the difference is here)(*)
  • release() or release(n)
  • 获得()或获得(n)
  • 也acquire()(不确定我明白这里的区别是什么)(*)
  • 释放()或释放(n)

(*) Okay, there is no built-in method to wait until the semaphore becomes unavailable. I suppose you'd write your own wrapper for acquirethat does a tryAcquirefirst and if that fails triggers your "busy event" (and continues using the normal acquire). Everyone would need to call your wrapper. Maybe subclass Semaphore?

(*) 好的,没有内置方法可以等待信号量变为不可用。我想你会写自己的包装器acquire,做了tryAcquire第一,如果失败将触发你的“忙事件”(与使用正常继续acquire)。每个人都需要调用您的包装器。也许子类信号量?

回答by Michael-7

For those needing an AQS based solution, here's one that worked for me:

对于那些需要基于 AQS 的解决方案的人,这里有一个对我有用的:

public class CountLatch {

    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }

        @Override
        protected int tryAcquireShared(int arg) {
            return count.get() == releaseValue ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            return true;
        }
    }

    private final Sync sync;
    private final AtomicLong count;
    private volatile long releaseValue;

    public CountLatch(final long initial, final long releaseValue) {
        this.releaseValue = releaseValue;
        this.count = new AtomicLong(initial);
        this.sync = new Sync();
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public long countUp() {
        final long current = count.incrementAndGet();
        if (current == releaseValue) {
            sync.releaseShared(0);
        }
        return current;
    }

    public long countDown() {
        final long current = count.decrementAndGet();
        if (current == releaseValue) {
            sync.releaseShared(0);
        }
        return current;
    }

    public long getCount() {
        return count.get();
    }
}

You initialize the synchronizer with an initial and target value. Once the target value has been reached (by counting up and / or down), the waiting threads will be released.

您使用初始值和目标值初始化同步器。一旦达到目标值(通过向上和/或向下计数),等待的线程将被释放。

回答by mike rodent

This is a variation on CounterLatch, available from the Apache site.

这是 的变体CounterLatch,可从 Apache 站点获得。

Their version, for reasons best known to themselves, blocks the caller thread whilethe variable (AtomicInteger) is at a given value.

由于他们自己最清楚的原因,他们的版本在变量 ( AtomicInteger) 处于给定值阻塞调用者线程。

But it is the height of easiness to tweak this code so that you can choose either just what the Apache version does, or... to say "wait here untilthe counter reaches a certain value". Arguably the latter is going to have more applicability. In my particular case I rustled this up because I wanted to check that all "chunks" had been published in SwingWorker.process()... but I have since found other uses for it.

但是调整此代码非常容易,这样您就可以选择 Apache 版本的功能,或者……说“在这里等待,直到计数器达到某个值”。可以说后者将具有更多的适用性。在我的特殊情况下,我把它弄得一团糟,因为我想检查所有“块”是否已发布在SwingWorker.process()......但我已经找到了它的其他用途。

Here it is written in Jython, officially the best language in the world (TM). I am going to rustle up a Java version in due course.

它是用 Jython 编写的,官方称其为世界上最好的语言 (TM)。我将在适当的时候推出 Java 版本。

class CounterLatch():
    def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ):
        self.count = java.util.concurrent.atomic.AtomicLong( initial )
        self.signal = java.util.concurrent.atomic.AtomicLong( wait_value )

        class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ):
            def tryAcquireShared( sync_self, arg ):
                if lift_on_reached:
                    return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1
                else:
                    return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1
            def tryReleaseShared( self, args ):
                return True

        self.sync = Sync()
        self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False

    def await( self, *args ):
        if args:
            assert len( args ) == 2
            assert type( args[ 0 ] ) is int
            timeout = args[ 0 ]
            assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit
            unit = args[ 1 ]
            return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))
        else:
            self.sync.acquireSharedInterruptibly( 1 )

    def count_relative( self, n ):
        previous = self.count.addAndGet( n )
        if previous == self.signal.get():
            self.sync.releaseShared( 0 )
        return previous

NB the Apache version uses the keyword volatilefor signaland released. In Jython I don't think this exists as such, but using AtomicIntegerand AtomicBooleanshould ensure that no values are "out of date" in any thread.

注意 Apache 版本使用关键字volatileforsignalreleased。在 Jython 中,我不认为这样存在,但是使用AtomicInteger并且AtomicBoolean应该确保在任何线程中没有值是“过时的”。

Example usage:

用法示例:

In the SwingWorker constructor:

在 SwingWorker 构造函数中:

self.publication_counter_latch = CounterLatch() 

In SW.publish:

在 SW.publish 中:

# increase counter value BEFORE publishing chunks
self.publication_counter_latch.count_relative( len( chunks ) )
self.super__publish( chunks )

In SW.process:

在 SW.process 中:

# ... do sthg [HERE] with the chunks!
# AFTER having done what you want to do with your chunks:
self.publication_counter_latch.count_relative( - len( chunks ) )

In the thread waiting for chunk processing to stop:

在等待块处理停止的线程中:

worker.publication_counter_latch.await()

回答by Guido Medina

I needed one and built it using the same strategy as CountDownLatch which uses AQS (non-blocking), this class is also very similar (If not exact) to one created for Apache Camel, I think it is also lighter than JDK Phaser, this will act just like CountDownLact from JDK, it won't let you count down below zero and will allow you count down and up:

我需要一个并使用与使用 AQS(非阻塞)的 CountDownLatch 相同的策略构建它,这个类也与为 Apache Camel 创建的类非常相似(如果不准确的话),我认为它也比 JDK Phaser 轻,这个就像 JDK 中的 CountDownLact 一样,它不会让你倒数到零以下,而是允许你倒数和倒数:

import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer;

导入 java.util.concurrent.TimeUnit; 导入 java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class CountingLatch
{
  /**
   * Synchronization control for CountingLatch.
   * Uses AQS state to represent count.
   */
  private static final class Sync extends AbstractQueuedSynchronizer
  {
    private Sync()
    {
    }

    private Sync(final int initialState)
    {
      setState(initialState);
    }

    int getCount()
    {
      return getState();
    }

    protected int tryAcquireShared(final int acquires)
    {
      return getState()==0 ? 1 : -1;
    }

    protected boolean tryReleaseShared(final int delta)
    {
      // Decrement count; signal when transition to zero
      for(; ; ){
        final int c=getState();
        final int nextc=c+delta;
        if(nextc<0){
          return false;
        }
        if(compareAndSetState(c,nextc)){
          return nextc==0;
        }
      }
    }
  }

  private final Sync sync;

  public CountingLatch()
  {
    sync=new Sync();
  }

  public CountingLatch(final int initialCount)
  {
    sync=new Sync(initialCount);
  }

  public void increment()
  {
    sync.releaseShared(1);
  }

  public int getCount()
  {
    return sync.getCount();
  }

  public void decrement()
  {
    sync.releaseShared(-1);
  }

  public void await() throws InterruptedException
  {
    sync.acquireSharedInterruptibly(1);
  }

  public boolean await(final long timeout) throws InterruptedException
  {
    return sync.tryAcquireSharedNanos(1,TimeUnit.MILLISECONDS.toNanos(timeout));
  }
}

回答by broc.seib

It seems a CountDownLatchwill do as you wish:

似乎CountDownLatch可以随心所欲:

A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.

CountDownLatch 使用给定的计数进行初始化。由于调用了 countDown() 方法, await 方法会阻塞直到当前计数达到零,之后所有等待线程都被释放并且任何后续的 await 调用都会立即返回。这是一种一次性现象——无法重置计数。如果您需要重置计数的版本,请考虑使用 CyclicBarrier。

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html