
How to synchronize/lock correctly when using CountDownLatch

It boils down to one thread submitting a job via some service. The job is executed in some TPExecutor. Afterwards the service checks the result and throws an exception in the original thread under certain conditions (the job exceeds the maximum number of retries, etc.). The code snippet below roughly illustrates this scenario in legacy code:

import java.util.concurrent.CountDownLatch;

public class IncorrectLockingExample {

private static class Request {

    private final CountDownLatch latch = new CountDownLatch(1);

    private Throwable throwable;

    public void await() {
        try {
            latch.await();
        } catch (InterruptedException ignoredForDemoPurposes) {
        }
    }

    public void countDown() {
        latch.countDown();
    }

    public Throwable getThrowable() {
        return throwable;
    }

    public void setThrowable(Throwable throwable) {
        this.throwable = throwable;
    }

}

private static final Request wrapper = new Request();

public static void main(String[] args) throws InterruptedException {

    final Thread blockedThread = new Thread() {
        public void run() {
            wrapper.await();
            synchronized (wrapper) {
                if (wrapper.getThrowable() != null)
                    throw new RuntimeException(wrapper.getThrowable());
            }
        }
    };

    final Thread workingThread = new Thread() {
        public void run() {
            wrapper.setThrowable(new RuntimeException());
            wrapper.countDown();

        }
    };

    blockedThread.start();
    workingThread.start();

    blockedThread.join();
    workingThread.join();
}

}

Sometimes (not reproducible on my box, but it happens on a 16-core server) the exception isn't reported to the original thread. I think this is because happens-before is not enforced (e.g. 'countDown' happens before 'setThrowable'), so the program continues to work when it should fail. I would appreciate any help on how to resolve this case. Constraints: release in a week, and minimal impact on the existing codebase.


The code above (as now updated) should work as you expect without any further synchronisation mechanisms. The memory barrier and its corresponding 'happens-before' relationship are enforced by the CountDownLatch await() and countDown() methods.

From the API docs:

Actions prior to "releasing" synchronizer methods such as Lock.unlock, Semaphore.release, and CountDownLatch.countDown happen-before actions subsequent to a successful "acquiring" method such as Lock.lock, Semaphore.acquire, Condition.await, and CountDownLatch.await on the same synchronizer object in another thread.
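
To make the quoted guarantee concrete, here is a minimal sketch of the same pattern with the write placed strictly before countDown(). The class name LatchVisibilityDemo and the methods fail(), awaitResult() and runOnce() are made up for illustration and are not part of the original code. The field is deliberately left non-volatile, since visibility here comes from the latch alone.

```java
import java.util.concurrent.CountDownLatch;

public class LatchVisibilityDemo {

    // Hypothetical stand-in for the question's Request class.
    private static class Request {
        private final CountDownLatch latch = new CountDownLatch(1);
        private Throwable throwable; // plain field on purpose

        void fail(Throwable t) {
            throwable = t;      // 1. write the result
            latch.countDown();  // 2. "release": publishes the write above
        }

        Throwable awaitResult() throws InterruptedException {
            latch.await();      // "acquire": returns normally only after countDown()
            return throwable;   // sees the write made before countDown()
        }
    }

    // Runs one worker/waiter round trip and returns what the waiter observed.
    public static Throwable runOnce() throws InterruptedException {
        final Request request = new Request();
        Thread worker = new Thread(new Runnable() {
            public void run() {
                request.fail(new IllegalStateException("job failed"));
            }
        });
        worker.start();
        Throwable seen = request.awaitResult();
        worker.join();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("observed: " + runOnce());
    }
}
```

Note that the guarantee only covers a successful await(). In the question's code, await() swallows InterruptedException, so a waiter that is interrupted would carry on before countDown() has been called and could read a null throwable. The sketch propagates the interrupt instead.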

If you deal with concurrency on a regular basis, get yourself a copy of 'Java Concurrency in Practice'; it's the Java concurrency bible and will be well worth its weight on your bookshelf :-).


I suspect you need

private volatile Throwable throwable

Have you tried using an ExecutorService? It is built in and does this for you. The following prints

future1 := result
future2  threw java.lang.IllegalStateException
future3  timed out

The code is

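// Imports needed by this snippet (place them at the top of the enclosing class file):
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public static void main(String... args)  {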
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<String> future1 = executor.submit(new Callable<String>() {
        public String call() throws Exception {
            return "result";
        }
    });

    Future<String> future2 = executor.submit(new Callable<String>() {
        public String call() throws Exception {
            throw new IllegalStateException();
        }
    });

    Future<String> future3 = executor.submit(new Callable<String>() {
        public String call() throws Exception {
            Thread.sleep(2000);
            throw new AssertionError();
        }
    });

    printResult("future1", future1);
    printResult("future2", future2);
    printResult("future3", future3);
    executor.shutdown();
}

private static void printResult(String description, Future<String> future) {
    try {
        System.out.println(description+" := "+future.get(1, TimeUnit.SECONDS));
    } catch (InterruptedException e) {
        System.out.println(description+"  interrupted");
    } catch (ExecutionException e) {
        System.out.println(description+"  threw "+e.getCause());
    } catch (TimeoutException e) {
        System.out.println(description+"  timed out");
    }
}

In the code for FutureTask, there is a comment.

/**
 * The thread running task. When nulled after set/cancel, this
 * indicates that the results are accessible.  Must be
 * volatile, to ensure visibility upon completion.
 */

Even if you are not going to re-use the code in the JDK, it is still worth reading so you can pick up on any tricks it uses.
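
Applied to the scenario in the question (the service rethrows the job's failure in the submitting thread), the Future-based approach might look roughly like the sketch below. The names RethrowInCallerDemo, submitAndWait and demo are hypothetical, and wrapping the cause in a RuntimeException simply mirrors what the question's blocked thread does. The legacy service may need a different exception type.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RethrowInCallerDemo {

    // Hypothetical helper: runs the job on the executor and rethrows
    // its failure in the calling (submitting) thread.
    public static String submitAndWait(ExecutorService executor, Callable<String> job)
            throws InterruptedException {
        Future<String> future = executor.submit(job);
        try {
            return future.get(); // blocks until the job finishes
        } catch (ExecutionException e) {
            // getCause() is whatever the job threw on the worker thread
            throw new RuntimeException(e.getCause());
        }
    }

    // Submits a job that always fails and reports what the caller observed.
    public static String demo() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            submitAndWait(executor, new Callable<String>() {
                public String call() {
                    throw new IllegalStateException("too many retries");
                }
            });
            return "no exception";
        } catch (RuntimeException e) {
            return "caller saw " + e.getCause();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

With this shape there is no shared mutable field to publish by hand: Future.get() either returns the result or throws, and the hand-off between threads is handled inside the JDK.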
