Wrapping an asynchronous computation into a synchronous (blocking) computation
Similar questions:
- Pattern for wrapping an Asynchronous JavaScript function to make it synchronous
- Wrapping an asynchronous method synchronously in C#
I have an object with a method I would like to expose to library clients (especially scripting clients) as something like:
interface MyNiceInterface
{
public Baz doSomethingAndBlock(Foo fooArg, Bar barArg);
public Future<Baz> doSomething(Foo fooArg, Bar barArg);
// doSomethingAndBlock is the straightforward way;
// doSomething has more control but deals with
// a Future and that might be too much hassle for
// scripting clients
}
but the primitive "stuff" I have available is a set of event-driven classes:
interface BazComputationSink
{
public void onBazResult(Baz result);
}
class ImplementingThing
{
public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink);
}
where ImplementingThing takes inputs, does some arcane stuff like enqueueing things on a task queue, and then later, when a result occurs, sink.onBazResult() gets called on a thread that may or may not be the same thread on which ImplementingThing.doSomethingAsync() was called.
Is there a way I can use the event-driven functions I have, along with concurrency primitives, to implement MyNiceInterface so scripting clients can happily wait on a blocking thread?
edit: can I use FutureTask for this?
Using your own Future implementation:
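import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BazComputationFuture implements Future<Baz>, BazComputationSink {
    private volatile Baz result = null;
    private volatile boolean cancelled = false;
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    @Override
    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
        if (isDone()) {
            return false;
        }
        // Set the flag before opening the latch so released waiters see the cancellation.
        cancelled = true;
        countDownLatch.countDown();
        return true;
    }

    @Override
    public Baz get() throws InterruptedException, ExecutionException {
        countDownLatch.await();
        return resultOrThrow();
    }

    @Override
    public Baz get(final long timeout, final TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        // await() returns false if the timeout elapsed before the latch opened.
        if (!countDownLatch.await(timeout, unit)) {
            throw new TimeoutException();
        }
        return resultOrThrow();
    }

    // Future.get() must throw CancellationException if the computation was cancelled.
    private Baz resultOrThrow() {
        if (cancelled) {
            throw new CancellationException();
        }
        return result;
    }

    @Override
    public boolean isCancelled() {
        return cancelled;
    }

    @Override
    public boolean isDone() {
        return countDownLatch.getCount() == 0;
    }

    @Override
    public synchronized void onBazResult(final Baz result) {
        if (isDone()) {
            return; // already completed or cancelled; ignore the late result
        }
        this.result = result;
        countDownLatch.countDown();
    }
}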
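public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
    BazComputationFuture future = new BazComputationFuture();
    doSomethingAsync(fooArg, barArg, future);
    return future;
}

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
    try {
        return doSomething(fooArg, barArg).get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        throw new IllegalStateException("Interrupted while waiting for Baz", e);
    } catch (ExecutionException e) {
        throw new IllegalStateException("Baz computation failed", e.getCause());
    }
}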
The solution creates a CountDownLatch internally, which is released once the callback is received. If the user calls get(), the latch blocks the calling thread until the computation completes and onBazResult() is called. If the callback occurs before get() is called, the latch is already open and get() returns immediately with the result.
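To make the ordering guarantee concrete, here is a minimal sketch that reduces the idea to its latch logic. ResultHolder, onResult and the two helper methods are hypothetical names invented for this illustration (they stand in for BazComputationFuture and onBazResult), and a String replaces Baz so the example compiles on its own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchOrderDemo {

    // Hypothetical stand-in for BazComputationFuture, reduced to the latch logic.
    static final class ResultHolder {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String result;

        // Plays the role of onBazResult(): publish the value, then open the latch.
        void onResult(String value) {
            result = value;
            latch.countDown();
        }

        // Blocks until onResult() has run; returns null if the timeout elapses first.
        String get(long timeout, TimeUnit unit) {
            try {
                return latch.await(timeout, unit) ? result : null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    // The callback fires first, so get() finds the latch already open.
    public static String callbackBeforeGet() {
        ResultHolder holder = new ResultHolder();
        holder.onResult("early");
        return holder.get(1, TimeUnit.SECONDS);
    }

    // The callback fires about 100 ms later on another thread; get() blocks until then.
    public static String callbackAfterGet() {
        ResultHolder holder = new ResultHolder();
        new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            holder.onResult("late");
        }).start();
        return holder.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        System.out.println(callbackBeforeGet());
        System.out.println(callbackAfterGet());
    }
}
```

Both orderings hand back the value passed to the callback, which is the property the blocking wrapper relies on.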
Well, there is the simple solution of doing something like:
public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) throws InterruptedException {
    final AtomicReference<Baz> notifier = new AtomicReference<>();
    doSomethingAsync(fooArg, barArg, new BazComputationSink() {
        public void onBazResult(Baz result) {
            synchronized (notifier) {
                notifier.set(result);
                notifier.notify();
            }
        }
    });
    synchronized (notifier) {
        // loop to guard against spurious wakeups
        while (notifier.get() == null)
            notifier.wait();
    }
    return notifier.get();
}
Of course, this assumes that your Baz result will never be null…
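If null is a legitimate result, one possible workaround is to track completion with a separate flag instead of testing the value itself. The sketch below is only an illustration under that assumption: ResultBox, Sink, computeAsync and computeAndBlock are hypothetical names, with Sink mirroring BazComputationSink and computeAsync standing in for doSomethingAsync.

```java
public class NullSafeBlockingCall {

    // Hypothetical callback type mirroring BazComputationSink.
    public interface Sink<T> {
        void onResult(T result);
    }

    // Holds the result plus an explicit "done" flag, so null is a legal result.
    static final class ResultBox<T> {
        private T value;
        private boolean done;

        synchronized void complete(T v) {
            value = v;
            done = true;
            notifyAll();
        }

        synchronized T await() throws InterruptedException {
            while (!done) { // the loop guards against spurious wakeups
                wait();
            }
            return value;
        }
    }

    // Hypothetical asynchronous API: hands 'value' to the sink from another thread.
    static <T> void computeAsync(T value, Sink<T> sink) {
        new Thread(() -> sink.onResult(value)).start();
    }

    // Blocking wrapper: waits on the flag rather than on a non-null value.
    public static <T> T computeAndBlock(T value) {
        ResultBox<T> box = new ResultBox<>();
        computeAsync(value, box::complete);
        try {
            return box.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        String some = computeAndBlock("hello");
        String none = NullSafeBlockingCall.<String>computeAndBlock(null);
        System.out.println(some);
        System.out.println(none);
    }
}
```

Because the waiting thread checks the done flag under the same monitor that the callback uses, a null result no longer leaves it waiting forever.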
The Google Guava library has an easy-to-use SettableFuture that makes this problem very simple (around 10 lines of code).
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.Future;

public class ImplementingThing {

    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
        try {
            return doSomething(fooArg, barArg).get();
        } catch (Exception e) {
            // keep the original exception as the cause
            throw new RuntimeException("Oh dear", e);
        }
    }

    public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
        // SettableFuture is created through its static factory method
        final SettableFuture<Baz> future = SettableFuture.create();
        doSomethingAsync(fooArg, barArg, new BazComputationSink() {
            @Override
            public void onBazResult(Baz result) {
                future.set(result);
            }
        });
        return future;
    }

    // Everything below here is just mock stuff to make the example work,
    // so you can copy it into your IDE and see it run.
    public static class Baz {}
    public static class Foo {}
    public static class Bar {}

    public static interface BazComputationSink {
        public void onBazResult(Baz result);
    }

    public void doSomethingAsync(Foo fooArg, Bar barArg, final BazComputationSink sink) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Baz baz = new Baz();
                sink.onBazResult(baz);
            }
        }).start();
    }

    public static void main(String[] args) {
        System.err.println("Starting Main");
        System.err.println((new ImplementingThing()).doSomethingAndBlock(null, null));
        System.err.println("Ending Main");
    }
}
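If adding Guava is not an option, the JDK's own java.util.concurrent.CompletableFuture (Java 8 and later) can play the same role as SettableFuture. This is a swapped-in alternative rather than part of the answer above, and the mock types below exist only to keep the sketch self-contained; CompletableFutureBridge is a hypothetical class name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class CompletableFutureBridge {

    // Mock types so the sketch compiles on its own; they mirror the question's names.
    public static class Foo {}
    public static class Bar {}
    public static class Baz {}

    public interface BazComputationSink {
        void onBazResult(Baz result);
    }

    // Mock of the event-driven primitive: reports the result from another thread.
    public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink) {
        new Thread(() -> sink.onBazResult(new Baz())).start();
    }

    public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
        CompletableFuture<Baz> future = new CompletableFuture<>();
        // complete() acts as the callback: it stores the value and releases waiters.
        doSomethingAsync(fooArg, barArg, future::complete);
        return future;
    }

    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
        try {
            return doSomething(fooArg, barArg).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("computation failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        Baz baz = new CompletableFutureBridge().doSomethingAndBlock(new Foo(), new Bar());
        System.out.println(baz != null);
    }
}
```

If the underlying API also had an error callback, completeExceptionally() would be the matching way to propagate a failure to the waiting thread.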
This is dead simple with RxJava 2.x:
try {
Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
.toFuture().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
Or without Lambda notation:
Baz baz = Single.create(new SingleOnSubscribe<Baz>() {
@Override
public void subscribe(SingleEmitter<Baz> emitter) {
doSomethingAsync(fooArg, barArg, new BazComputationSink() {
@Override
public void onBazResult(Baz result) {
emitter.onSuccess(result);
}
});
}
}).toFuture().get();
Even simpler:
Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
.blockingGet();
Kotlin Version:
val baz = Single.create<Baz> { emitter ->
doSomethingAsync(fooArg, barArg) { result -> emitter.onSuccess(result) }
}.blockingGet()
A very simple example, just to understand CountDownLatch without any extra code.
A java.util.concurrent.CountDownLatch is a concurrency construct that allows one or more threads to wait for a given set of operations to complete.
A CountDownLatch is initialized with a given count. This count is decremented by calls to the countDown() method. Threads waiting for this count to reach zero can call one of the await() methods. Calling await() blocks the thread until the count reaches zero.
Below is a simple example. After the Decrementer has called countDown() 3 times on the CountDownLatch, the waiting Waiter is released from the await() call.
You can also pass a timeout to await().
CountDownLatch latch = new CountDownLatch(3);
Waiter waiter = new Waiter(latch);
Decrementer decrementer = new Decrementer(latch);
new Thread(waiter).start();
new Thread(decrementer).start();
Thread.sleep(4000);
public class Waiter implements Runnable{
CountDownLatch latch = null;
public Waiter(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Waiter Released");
}
}
//--------------
public class Decrementer implements Runnable {
CountDownLatch latch = null;
public Decrementer(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
Thread.sleep(1000);
this.latch.countDown();
Thread.sleep(1000);
this.latch.countDown();
Thread.sleep(1000);
this.latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
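The timed variant, await(long, TimeUnit), returns a boolean instead of blocking indefinitely: true if the count reached zero, false if the timeout elapsed first. A small sketch of that behaviour follows; LatchTimeoutDemo and awaitWithTimeout are hypothetical names, and the delays are arbitrary values chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchTimeoutDemo {

    // Starts a thread that calls countDown() 'count' times, once every stepMillis,
    // then waits up to timeoutMillis for the latch to open.
    // Returns true if the latch opened in time, false if the wait timed out.
    public static boolean awaitWithTimeout(int count, long stepMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(count);
        Thread decrementer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    Thread.sleep(stepMillis);
                    latch.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        decrementer.setDaemon(true); // do not keep the JVM alive after a timeout
        decrementer.start();
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Three countdowns 50 ms apart fit easily into a 2 s timeout.
        System.out.println(awaitWithTimeout(3, 50, 2000));
        // Three countdowns 500 ms apart cannot finish within 100 ms.
        System.out.println(awaitWithTimeout(3, 500, 100));
    }
}
```

Checking the return value matters: ignoring it makes a timeout indistinguishable from a successful wait.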
If you don't want to use a CountDownLatch, or your requirement is something like Facebook's like and unlike functionality, meaning that while one method is being called the other must not be called, you can declare a flag:
private volatile Boolean isInprocessOfLikeOrUnLike = false;
and then check it at the beginning of your method: if it is false, go ahead with the call; otherwise return. The details depend on your implementation.
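One caveat with a plain volatile flag is that reading it and then setting it are two separate steps, so two threads can both see false and both proceed. A possible variation, shown here only as a sketch, swaps the volatile Boolean for an AtomicBoolean whose compareAndSet() performs the check and the update atomically. LikeGuard and tryRun are hypothetical names, not part of the answer above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class LikeGuard {

    // Hypothetical flag playing the role of isInprocessOfLikeOrUnLike.
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    // Runs 'action' unless another like/unlike call is already in progress.
    // Returns true if the action ran, false if it was skipped.
    public boolean tryRun(Runnable action) {
        // Atomically flip false -> true; fails if another call holds the flag.
        if (!inProgress.compareAndSet(false, true)) {
            return false;
        }
        try {
            action.run();
            return true;
        } finally {
            inProgress.set(false); // always release the flag, even on exceptions
        }
    }

    public static void main(String[] args) {
        LikeGuard guard = new LikeGuard();
        boolean[] nested = new boolean[1];
        // The nested call happens while the outer one is in progress, so it is skipped.
        boolean outer = guard.tryRun(() -> nested[0] = guard.tryRun(() -> {}));
        boolean afterwards = guard.tryRun(() -> {});
        System.out.println(outer + " " + nested[0] + " " + afterwards);
    }
}
```

Note that this guards against overlapping calls; it does not by itself turn an asynchronous call into a blocking one.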
Here's a more generic solution based on Paul Wagland's answer:
public abstract class AsyncRunnable<T> {
protected abstract void run(AtomicReference<T> notifier);
protected final void finish(AtomicReference<T> notifier, T result) {
synchronized (notifier) {
notifier.set(result);
notifier.notify();
}
}
public static <T> T wait(AsyncRunnable<T> runnable) {
final AtomicReference<T> notifier = new AtomicReference<>();
// run the asynchronous code
runnable.run(notifier);
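// wait for the asynchronous code to finish
boolean interrupted = false;
synchronized (notifier) {
while (notifier.get() == null) {
try {
notifier.wait();
} catch (InterruptedException e) {
// remember the interrupt and keep waiting for the result
interrupted = true;
}
}
}
if (interrupted) {
// restore the interrupt status for the caller
Thread.currentThread().interrupt();
}
// return the result of the asynchronous code
return notifier.get();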
}
}
Here's an example of how to use it:
String result = AsyncRunnable.wait(new AsyncRunnable<String>() {
@Override
public void run(final AtomicReference<String> notifier) {
// here goes your async code, e.g.:
new Thread(new Runnable() {
@Override
public void run() {
finish(notifier, "This was a asynchronous call!");
}
}).start();
}
});
A more verbose version of the code can be found here: http://pastebin.com/hKHJUBqE
EDIT: The example related to the question would be:
public Baz doSomethingAndBlock(final Foo fooArg, final Bar barArg) {
return AsyncRunnable.wait(new AsyncRunnable<Baz>() {
@Override
protected void run(final AtomicReference<Baz> notifier) {
doSomethingAsync(fooArg, barArg, new BazComputationSink() {
public void onBazResult(Baz result) {
synchronized (notifier) {
notifier.set(result);
notifier.notify();
}
}
});
}
});
}
The simplest way (which works for me) is to
- Create a blocking queue
- Call the asynchronous method, using a handler that offers the result to that blocking queue
- Poll the queue (that's where you block) for the result
public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) throws InterruptedException {
    final BlockingQueue<Baz> blocker = new LinkedBlockingQueue<>();
    doSomethingAsync(fooArg, barArg, blocker::offer);
    // Now block until response or timeout (poll returns null on timeout)
    return blocker.poll(30, TimeUnit.SECONDS);
}
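Two details of the queue approach are worth spelling out: poll() with a timeout returns null when nothing arrives in time, and LinkedBlockingQueue rejects null elements, so a null result cannot be passed through it. The following sketch makes the timeout case explicit by wrapping the outcome in an Optional. QueueBridge, computeAsync and computeAndBlock are hypothetical names, and the String payload and delays are placeholders.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class QueueBridge {

    // Hypothetical async API: delivers "done" to the handler after delayMillis.
    static void computeAsync(long delayMillis, Consumer<String> handler) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                handler.accept("done");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive after a timeout
        worker.start();
    }

    // Blocks for at most timeoutMillis; an empty Optional means the wait timed out.
    public static Optional<String> computeAndBlock(long delayMillis, long timeoutMillis) {
        BlockingQueue<String> blocker = new LinkedBlockingQueue<>();
        computeAsync(delayMillis, blocker::offer);
        try {
            // poll() returns null if no element arrived before the timeout
            return Optional.ofNullable(blocker.poll(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(computeAndBlock(50, 2000));  // result arrives in time
        System.out.println(computeAndBlock(1000, 100)); // wait times out first
    }
}
```

Whether a timeout should surface as an empty value, a null, or an exception depends on what the scripting clients can handle most easily.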