Java: wrapping an asynchronous computation into a synchronous (blocking) computation
Notice: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow
Original: http://stackoverflow.com/questions/2180419/
Wrapping an asynchronous computation into a synchronous (blocking) computation
Asked by Jason S
similar questions:
- Pattern for wrapping an Asynchronous JavaScript function to make it synchronous
- Wrapping an asynchronous method synchronously in C#
I have an object with a method I would like to expose to library clients (especially scripting clients) as something like:
interface MyNiceInterface
{
    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg);
    public Future<Baz> doSomething(Foo fooArg, Bar barArg);
    // doSomethingAndBlock is the straightforward way;
    // doSomething has more control but deals with
    // a Future and that might be too much hassle for
    // scripting clients
}
but the primitive "stuff" I have available is a set of event-driven classes:
interface BazComputationSink
{
    public void onBazResult(Baz result);
}
class ImplementingThing
{
    public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink) {
        // implementation not shown in the question
    }
}
where ImplementingThing takes inputs, does some arcane stuff like enqueueing things on a task queue, and then later, when a result is available, sink.onBazResult() gets called on a thread that may or may not be the same thread that called ImplementingThing.doSomethingAsync().
Is there a way I can use the event-driven functions I have, along with concurrency primitives, to implement MyNiceInterface so scripting clients can happily wait on a blocking thread?
edit: can I use FutureTask for this?
Accepted answer by Michael Barker
Using your own Future implementation:
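import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BazComputationFuture implements Future<Baz>, BazComputationSink {

    private volatile Baz result = null;
    private volatile boolean cancelled = false;
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    @Override
    public boolean cancel(final boolean mayInterruptIfRunning) {
        if (isDone()) {
            return false;
        }
        // Set the flag before releasing the latch so that woken waiters see it.
        // This only releases waiters; it does not stop the underlying computation.
        cancelled = true;
        countDownLatch.countDown();
        return true;
    }

    @Override
    public Baz get() throws InterruptedException, ExecutionException {
        countDownLatch.await();
        if (cancelled) {
            throw new CancellationException();
        }
        return result;
    }

    @Override
    public Baz get(final long timeout, final TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        // await() returns false if the timeout elapsed before the latch opened
        if (!countDownLatch.await(timeout, unit)) {
            throw new TimeoutException();
        }
        if (cancelled) {
            throw new CancellationException();
        }
        return result;
    }

    @Override
    public boolean isCancelled() {
        return cancelled;
    }

    @Override
    public boolean isDone() {
        return countDownLatch.getCount() == 0;
    }

    @Override
    public void onBazResult(final Baz result) {
        // Publish the result before opening the latch
        this.result = result;
        countDownLatch.countDown();
    }
}

// In the class that implements MyNiceInterface:
public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
    BazComputationFuture future = new BazComputationFuture();
    doSomethingAsync(fooArg, barArg, future);
    return future;
}

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
    try {
        return doSomething(fooArg, barArg).get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        throw new IllegalStateException("Interrupted while waiting for Baz", e);
    } catch (ExecutionException e) {
        throw new IllegalStateException(e.getCause());
    }
}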
The solution creates a CountDownLatch internally, which is counted down to zero once the callback is received. If the user calls get(), the CountDownLatch blocks the calling thread until the computation completes and calls the onBazResult callback. The CountDownLatch also ensures that if the callback occurs before get() is called, get() returns immediately with the result.
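To make the ordering guarantee concrete, here is a minimal sketch that isolates just the latch logic. ResultHolder, callbackBeforeGet and callbackAfterGet are hypothetical names invented for this illustration, and String stands in for Baz; it is not the answer's actual class.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchOrderingDemo {

    // Hypothetical result holder, reduced to the latch logic only.
    static final class ResultHolder {
        private volatile String result;
        private final CountDownLatch latch = new CountDownLatch(1);

        void onResult(String value) {
            result = value;      // publish the value first
            latch.countDown();   // then open the latch
        }

        // Returns the result, or null if the timeout elapses or the thread is interrupted.
        String get(long timeout, TimeUnit unit) {
            try {
                return latch.await(timeout, unit) ? result : null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    // Callback fires before get(): the latch is already open, so get() does not block.
    public static String callbackBeforeGet() {
        ResultHolder holder = new ResultHolder();
        holder.onResult("early");
        return holder.get(1, TimeUnit.SECONDS);
    }

    // Callback fires on another thread about 100 ms later: get() blocks until then.
    public static String callbackAfterGet() {
        ResultHolder holder = new ResultHolder();
        new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            holder.onResult("late");
        }).start();
        return holder.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        System.out.println(callbackBeforeGet());
        System.out.println(callbackAfterGet());
    }
}
```

Either ordering yields the result, because a latch that has reached zero stays open for every later await().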
Answer by Paul Wagland
Well, there is the simple solution of doing something like:
// requires: import java.util.concurrent.atomic.AtomicReference;
public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) throws InterruptedException {
    final AtomicReference<Baz> notifier = new AtomicReference<>();
    doSomethingAsync(fooArg, barArg, new BazComputationSink() {
        public void onBazResult(Baz result) {
            synchronized (notifier) {
                notifier.set(result);
                notifier.notify();
            }
        }
    });
    synchronized (notifier) {
        // Loop to guard against spurious wakeups
        while (notifier.get() == null)
            notifier.wait();
    }
    return notifier.get();
}
Of course, this assumes that your Baz result will never be null…
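If a null result is possible, one way around that assumption is to track completion separately from the value. The following is only a sketch: Sink, AsyncCall, Slot and nullResultCompletes are hypothetical names standing in for the types in the question, not part of the original answer.

```java
public class NullSafeBlocker {

    /** Hypothetical callback type standing in for BazComputationSink. */
    public interface Sink<T> {
        void onResult(T result);
    }

    /** Hypothetical async operation standing in for doSomethingAsync. */
    public interface AsyncCall<T> {
        void start(Sink<T> sink);
    }

    /** Mutable holder; all access is guarded by its own monitor. */
    private static final class Slot<T> {
        T value;
        boolean done;
    }

    public static <T> T block(AsyncCall<T> call) throws InterruptedException {
        final Slot<T> slot = new Slot<>();
        call.start(result -> {
            synchronized (slot) {
                slot.value = result;
                slot.done = true;   // completion is tracked separately from the value
                slot.notifyAll();
            }
        });
        synchronized (slot) {
            while (!slot.done) {    // loop also guards against spurious wakeups
                slot.wait();
            }
            return slot.value;      // may legitimately be null
        }
    }

    /** Demo helper: returns true if a null result is delivered without hanging. */
    public static boolean nullResultCompletes() {
        try {
            Object value = block(sink -> new Thread(() -> sink.onResult(null)).start());
            return value == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(nullResultCompletes());
    }
}
```

The waiting loop tests the done flag instead of the value, so a null result no longer leaves the caller blocked forever.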
Answer by kybernetikos
The Google Guava library has an easy-to-use SettableFuture that makes this problem very simple (around 10 lines of code).
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.Future;

public class ImplementingThing {

    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
        try {
            return doSomething(fooArg, barArg).get();
        } catch (Exception e) {
            // Keep the original exception as the cause
            throw new RuntimeException("Oh dear", e);
        }
    }

    public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
        // SettableFuture is created through its static factory method
        final SettableFuture<Baz> future = SettableFuture.create();
        doSomethingAsync(fooArg, barArg, new BazComputationSink() {
            @Override
            public void onBazResult(Baz result) {
                future.set(result);
            }
        });
        return future;
    }

    // Everything below here is just mock stuff to make the example work,
    // so you can copy it into your IDE and see it run.
    public static class Baz {}
    public static class Foo {}
    public static class Bar {}

    public static interface BazComputationSink {
        public void onBazResult(Baz result);
    }

    public void doSomethingAsync(Foo fooArg, Bar barArg, final BazComputationSink sink) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Baz baz = new Baz();
                sink.onBazResult(baz);
            }
        }).start();
    }

    public static void main(String[] args) {
        System.err.println("Starting Main");
        System.err.println((new ImplementingThing()).doSomethingAndBlock(null, null));
        System.err.println("Ending Main");
    }
}
Answer by AZ_
A very simple example, just to understand CountDownLatch without any extra code.
A java.util.concurrent.CountDownLatch is a concurrency construct that allows one or more threads to wait for a given set of operations to complete.
A CountDownLatch is initialized with a given count. This count is decremented by calls to the countDown() method. Threads waiting for this count to reach zero can call one of the await() methods. Calling await() blocks the thread until the count reaches zero.
Below is a simple example. After the Decrementer has called countDown() 3 times on the CountDownLatch, the waiting Waiter is released from the await() call.
You can also pass a timeout to await().
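As a small illustration of the timeout variant: await(long, TimeUnit) returns true if the count reached zero in time and false if the timeout elapsed first. The class and method names below are made up for this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AwaitTimeoutDemo {

    // Creates a latch with count 3, counts it down `countDowns` times,
    // then waits up to `timeoutMillis` for the count to reach zero.
    public static boolean awaitWithTimeout(int countDowns, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(3);
        for (int i = 0; i < countDowns; i++) {
            latch.countDown();
        }
        try {
            // true: count reached zero; false: the timeout elapsed first
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitWithTimeout(3, 100)); // all three countdowns happened
        System.out.println(awaitWithTimeout(2, 100)); // one countdown is missing
    }
}
```

Checking the boolean matters: ignoring it makes a timeout indistinguishable from a normal release.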
// Driver code (inside a method that may throw InterruptedException)
CountDownLatch latch = new CountDownLatch(3);
Waiter waiter = new Waiter(latch);
Decrementer decrementer = new Decrementer(latch);
new Thread(waiter).start();
new Thread(decrementer).start();
Thread.sleep(4000);

public class Waiter implements Runnable {
    CountDownLatch latch = null;

    public Waiter(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Waiter Released");
    }
}

//--------------

public class Decrementer implements Runnable {
    CountDownLatch latch = null;

    public Decrementer(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            Thread.sleep(1000);
            this.latch.countDown();
            Thread.sleep(1000);
            this.latch.countDown();
            Thread.sleep(1000);
            this.latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
If you don't want to use a CountDownLatch, or your requirement is similar to Facebook's like and unlike functionality, meaning that while one method is being called the other must not be called, you can declare a flag:
private volatile Boolean isInprocessOfLikeOrUnLike = false;
Then check it at the beginning of your method: if it is false, proceed with the call; otherwise return. The details depend on your implementation.
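One caveat with a plain volatile flag is that "check, then set" is two separate steps, so two threads can both see false and both proceed. A hedged sketch of the same guard using AtomicBoolean.compareAndSet, which does the check and the set atomically, might look like this. LikeGuard and tryRun are hypothetical names, and the sketch assumes the guarded work runs synchronously inside the call.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class LikeGuard {

    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    /**
     * Runs the action unless another guarded call is already in progress.
     * Returns true if the action ran, false if it was skipped.
     */
    public boolean tryRun(Runnable action) {
        // Atomically flip false -> true; fails if another call holds the flag
        if (!inProgress.compareAndSet(false, true)) {
            return false;
        }
        try {
            action.run();
            return true;
        } finally {
            inProgress.set(false); // always release, even if the action throws
        }
    }

    public static void main(String[] args) {
        LikeGuard guard = new LikeGuard();
        boolean[] nested = new boolean[1];
        boolean outer = guard.tryRun(() -> nested[0] = guard.tryRun(() -> { }));
        System.out.println(outer + " " + nested[0]);
    }
}
```

If the guarded operation itself is asynchronous, the flag would instead need to be reset in the completion callback.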
Answer by Emanuel Moecklin
Here's a more generic solution based on Paul Wagland's answer:
import java.util.concurrent.atomic.AtomicReference;

public abstract class AsyncRunnable<T> {

    protected abstract void run(AtomicReference<T> notifier);

    protected final void finish(AtomicReference<T> notifier, T result) {
        synchronized (notifier) {
            notifier.set(result);
            notifier.notify();
        }
    }

    public static <T> T wait(AsyncRunnable<T> runnable) {
        final AtomicReference<T> notifier = new AtomicReference<>();

        // run the asynchronous code
        runnable.run(notifier);

        // wait for the asynchronous code to finish
        synchronized (notifier) {
            while (notifier.get() == null) {
                try {
                    notifier.wait();
                } catch (InterruptedException ignore) {
                    // interruption is deliberately ignored; keep waiting for a result
                }
            }
        }

        // return the result of the asynchronous code
        return notifier.get();
    }
}
Here's an example of how to use it:
String result = AsyncRunnable.wait(new AsyncRunnable<String>() {
    @Override
    public void run(final AtomicReference<String> notifier) {
        // here goes your async code, e.g.:
        new Thread(new Runnable() {
            @Override
            public void run() {
                finish(notifier, "This was an asynchronous call!");
            }
        }).start();
    }
});
A more verbose version of the code can be found here: http://pastebin.com/hKHJUBqE
EDIT: The example related to the question would be:
public Baz doSomethingAndBlock(final Foo fooArg, final Bar barArg) {
    return AsyncRunnable.wait(new AsyncRunnable<Baz>() {
        @Override
        protected void run(final AtomicReference<Baz> notifier) {
            doSomethingAsync(fooArg, barArg, new BazComputationSink() {
                public void onBazResult(Baz result) {
                    synchronized (notifier) {
                        notifier.set(result);
                        notifier.notify();
                    }
                }
            });
        }
    });
}
Answer by Emanuel Moecklin
This is dead simple with RxJava 2.x:
try {
    Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
            doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
            .toFuture().get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
Or without Lambda notation:
Baz baz = Single.create(new SingleOnSubscribe<Baz>() {
    @Override
    public void subscribe(SingleEmitter<Baz> emitter) {
        doSomethingAsync(fooArg, barArg, new BazComputationSink() {
            @Override
            public void onBazResult(Baz result) {
                emitter.onSuccess(result);
            }
        });
    }
}).toFuture().get();
Even simpler:
Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
        doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
        .blockingGet();
Answer by Joel Shemtov
The simplest way (which works for me) is to:
- Create a blocking queue.
- Call the asynchronous method, using a handler that offers the result to that blocking queue.
- Poll the queue (that's where you block) for the result.
public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) throws InterruptedException {
    final BlockingQueue<Baz> blocker = new LinkedBlockingQueue<>();
    doSomethingAsync(fooArg, barArg, blocker::offer);
    // Now block until response or timeout (poll returns null on timeout)
    return blocker.poll(30, TimeUnit.SECONDS);
}
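The steps above can be sketched as a self-contained, generic helper. This is only an illustration under stated assumptions: QueueBlocker, block, demoResult and demoTimesOut are hypothetical names, and Consumer<Consumer<T>> stands in for "an async method that takes a callback". Since poll() returns null on timeout, the sketch turns that case into a TimeoutException so callers cannot mistake it for a result.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class QueueBlocker {

    // Starts the async call, then blocks until a result is offered or the timeout elapses.
    public static <T> T block(Consumer<Consumer<T>> asyncCall, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        BlockingQueue<T> queue = new LinkedBlockingQueue<>();
        asyncCall.accept(queue::offer);          // the handler offers the result to the queue
        T result = queue.poll(timeout, unit);    // this is where the caller blocks
        if (result == null) {
            throw new TimeoutException("no result within " + timeout + " " + unit);
        }
        return result;
    }

    // Demo: the callback fires on another thread and the caller receives its value.
    public static String demoResult() {
        try {
            return block(sink -> new Thread(() -> sink.accept("done")).start(),
                    5, TimeUnit.SECONDS);
        } catch (InterruptedException | TimeoutException e) {
            return "failed: " + e;
        }
    }

    // Demo: the callback never fires, so the call times out.
    public static boolean demoTimesOut() {
        try {
            QueueBlocker.<String>block(sink -> { }, 50, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demoResult());
        System.out.println(demoTimesOut());
    }
}
```

Note that LinkedBlockingQueue rejects null elements, so this approach also assumes the asynchronous result is never null.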