
Can the thread safety of the class below ever be broken? I am fairly sure it can't be, but I wanted to be doubly sure because it is not easy to test.

public class ThreadSafe implements ITaskCompletionListener {

private final Set<String> taskIds = new HashSet<String>();
private final Set<String> successfulIds = new HashSet<String>();
private final Set<String> cancelledIds = new HashSet<String>();
private final Set<String> errorIds = new HashSet<String>();

public ThreadSafe() {

}

// invoked concurrently
@Override
public void onCancel(String pTaskId) {
    remove(pTaskId);
    cancelledIds.add(pTaskId);
}

// invoked concurrently
@Override
public void onError(String pTaskId) {
    remove(pTaskId);
    errorIds.add(pTaskId);
}

// invoked concurrently
@Override
public void onSuccess(String pTaskId) {
    remove(pTaskId);
    successfulIds.add(pTaskId);
}

private void remove(String pTaskId) {
    taskIds.remove(pTaskId);
}

}


From the HashSet docs:

Note that this implementation is not synchronized. If multiple threads access a hash set concurrently, and at least one of the threads modifies the set, it must be synchronized externally.

So no, your code is not thread-safe. Concurrent calls to any of your methods could corrupt the sets or produce inconsistent results.
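As a minimal sketch of one way to satisfy that requirement, the set can be replaced with a concurrent one. The class and method names below (ConcurrentSetDemo, addConcurrently) are made up for illustration, and the example assumes Java 8 or later for ConcurrentHashMap.newKeySet(). Note that this only makes each individual set safe; it does not make the "remove from one set, add to another" sequence atomic.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original code.
class ConcurrentSetDemo {

    // Adds threads * perThread distinct ids from several threads and
    // returns the final size of the set.
    static int addConcurrently(int threads, int perThread) {
        // Thread-safe Set view backed by a ConcurrentHashMap (Java 8+).
        final Set<String> ids = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread;
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    ids.add("task-" + (base + i));
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ids.size();
    }
}
```

With a plain HashSet in place of the concurrent set, the same loop may lose entries or fail unpredictably, which is why the problem is hard to reproduce reliably in a test.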


Rather than having lots of sets and passing ids between collections, you can use a single collection which is thread safe.

private final ConcurrentMap<String, State> idState = new ConcurrentHashMap<String, State>();
enum State { TASK, SUCCESS, CANCELLED, ERROR }

public void onSuccess(String taskId) {
    idState.put(taskId, State.SUCCESS);
}

public void onCancel(String taskId) {
    idState.put(taskId, State.CANCELLED);
}

public void onError(String taskId) {
    idState.put(taskId, State.ERROR);
}

public void remove(String taskId) {
    idState.remove(taskId);
}


This code is riddled with thread-safety problems.

Firstly, HashSet is not thread-safe (it is backed by the non-thread-safe HashMap, in which race conditions can end up causing infinite loops).

Secondly, there is no atomicity between an ID being removed from the taskIds set and it being added to one of the others, so there is a window in which a task is in none of the sets.

Thirdly, the code implicitly assumes that the task state is simply inProgress -> success|error|cancel and that there is no concurrent task execution. If this is not true then the code fails.
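One way to address the second and third points is to keep a single map and make each state change an atomic compare-and-set. The following is only a sketch under that assumption; the TaskStates class, its method names and the IN_PROGRESS state are hypothetical and do not come from the original code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: one map holds the state of every task.
class TaskStates {

    enum State { IN_PROGRESS, SUCCESS, CANCELLED, ERROR }

    private final ConcurrentMap<String, State> states =
            new ConcurrentHashMap<String, State>();

    // Registers a task; returns false if the id is already known.
    boolean register(String taskId) {
        return states.putIfAbsent(taskId, State.IN_PROGRESS) == null;
    }

    // Atomically moves a task from IN_PROGRESS to a terminal state.
    // Returns false if the task is unknown or has already completed,
    // so only the first completion callback for a task wins.
    boolean complete(String taskId, State terminal) {
        return states.replace(taskId, State.IN_PROGRESS, terminal);
    }

    State stateOf(String taskId) {
        return states.get(taskId);
    }
}
```

Because replace(key, oldValue, newValue) is a single atomic operation, a task is never observable in "no state", and a second, conflicting callback for the same task is rejected rather than silently overwriting the first.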


Methods onError, onSuccess, and onCancel can be invoked in parallel by two or more threads, which could eventually invoke taskIds.remove in parallel, which is NOT thread-safe.

Just mark these three methods as synchronized and you should be done.


Peter is right. "Rather than having lots of sets and passing ids between collections, you can use a single collection which is thread safe."

This would make the code less complex and harder to break.


I think you should use a synchronized block in the methods that are called concurrently.

For example:

public void onCancel(String pTaskId) {
    synchronized (this) {
        remove(pTaskId);
        cancelledIds.add(pTaskId);
    }
}


Thanks to each one of you for responding and also for pointing out the obvious bloopers in the code I pasted. I will give a context of the problem I am solving.

I will receive requests, each containing a set of task IDs, and I have a thread pool to which the batch of tasks for each request is submitted. Clients will give me listeners to be invoked after task completion. In the current context, I could have used an ExecutorCompletionService to wait for all the tasks belonging to a request to complete; however, I do not wish to make any blocking calls.

Instead, I want to subclass FutureTask (overriding its done() method) and instantiate it in newTaskFor() of my executor service subclass. All the future tasks belonging to one request will share a TaskCompletionListener, and the overridden done() method will invoke one of onSuccess(), onError() or onCancel(). The TaskCompletionListener will be initialized with the initial set of task IDs (that is, the number of tasks) and will decrement the count by one on each invocation; once the count reaches zero, completion is signaled.
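To make the plan concrete, here is a rough sketch of how the FutureTask subclass and the executor could fit together. Everything except FutureTask, ThreadPoolExecutor and their done()/newTaskFor() hooks is an assumption made for illustration: the shape of ITaskCompletionListener is guessed from the listener code in this thread, and IdentifiedTask, NotifyingFutureTask, NotifyingExecutor and NotifyingDemo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Assumed shape of the listener interface used in this thread.
interface ITaskCompletionListener {
    void onSuccess(String taskId);
    void onError(String taskId);
    void onCancel(String taskId);
}

// Hypothetical: a Callable that carries its task id and listener.
final class IdentifiedTask<T> implements Callable<T> {
    final String taskId;
    final ITaskCompletionListener listener;
    private final Callable<T> body;

    IdentifiedTask(String taskId, ITaskCompletionListener listener, Callable<T> body) {
        this.taskId = taskId;
        this.listener = listener;
        this.body = body;
    }

    @Override
    public T call() throws Exception {
        return body.call();
    }
}

// Hypothetical: notifies the listener from done() instead of blocking.
class NotifyingFutureTask<T> extends FutureTask<T> {
    private final String taskId;
    private final ITaskCompletionListener listener;

    NotifyingFutureTask(IdentifiedTask<T> task) {
        super(task);
        this.taskId = task.taskId;
        this.listener = task.listener;
    }

    @Override
    protected void done() {
        if (isCancelled()) {
            listener.onCancel(taskId);
            return;
        }
        try {
            get(); // returns immediately: done() runs only after completion
            listener.onSuccess(taskId);
        } catch (ExecutionException e) {
            listener.onError(taskId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            listener.onError(taskId);
        }
    }
}

// Hypothetical executor that wraps IdentifiedTask submissions.
class NotifyingExecutor extends ThreadPoolExecutor {
    NotifyingExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof IdentifiedTask) {
            return new NotifyingFutureTask<T>((IdentifiedTask<T>) callable);
        }
        return super.newTaskFor(callable);
    }
}

class NotifyingDemo {
    // Runs one succeeding and one failing task; returns the callbacks, sorted.
    static List<String> run() {
        final List<String> events = Collections.synchronizedList(new ArrayList<String>());
        ITaskCompletionListener listener = new ITaskCompletionListener() {
            public void onSuccess(String id) { events.add("success:" + id); }
            public void onError(String id) { events.add("error:" + id); }
            public void onCancel(String id) { events.add("cancel:" + id); }
        };
        NotifyingExecutor executor = new NotifyingExecutor(2);
        executor.submit(new IdentifiedTask<String>("t1", listener, () -> "ok"));
        executor.submit(new IdentifiedTask<String>("t2", listener,
                () -> { throw new IllegalStateException("boom"); }));
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> sorted = new ArrayList<String>(events);
        Collections.sort(sorted);
        return sorted;
    }
}
```

The listener callbacks run on the pool's worker threads (or on the cancelling thread, for cancellations), so whatever the listener does must itself be thread-safe. The demo waits for the pool to terminate rather than calling Future.get(), because get() can return slightly before done() has finished running.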

Here is a modified version of the code. I hope I haven't made too many mistakes this time around (and yes, thanks once again for all the answers to the earlier version).

public class TaskCompletionListener implements ITaskCompletionListener {

private final AtomicInteger taskCount;

private final IProcessCompletionListener completionListener;

public TaskCompletionListener(final HashSet<String> pTaskIds, IProcessCompletionListener pCompletionListener) {
    taskCount = new AtomicInteger(pTaskIds.size());
    completionListener = pCompletionListener;
}

// invoked concurrently
@Override
public void onCancel(String pTaskId) {
    checkCompletion();
}

// invoked concurrently
@Override
public void onError(String pTaskId) {
    checkCompletion();
}

// invoked concurrently
@Override
public void onSuccess(String pTaskId) {
    checkCompletion();
}

private void checkCompletion() {
    // method-level confinement: currentCount is local to the calling thread
    int currentCount = taskCount.decrementAndGet();
    if (currentCount == 0) {
        completionListener.onProcessComplete();
    }
}

interface IProcessCompletionListener {
    void onProcessComplete();
}
}
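Since the original concern was that this is hard to test, a small stress check can at least exercise the counting logic. The sketch below is only an illustration with hypothetical names (CompletionCounterCheck, countCompletions); it reproduces the decrement-and-compare step from checkCompletion() rather than calling the listener class itself, and a passing run is evidence, not proof, of correctness.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stress check for the countdown logic.
class CompletionCounterCheck {

    // Fires `tasks` concurrent notifications and returns how many times
    // the "all tasks complete" branch was taken.
    static int countCompletions(int tasks, int threads) {
        final AtomicInteger remaining = new AtomicInteger(tasks);
        final AtomicInteger completions = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                // Same check as checkCompletion(): decrementAndGet() is
                // atomic, so exactly one caller can observe zero.
                if (remaining.decrementAndGet() == 0) {
                    completions.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return completions.get();
    }
}
```

The result should be 1 for any positive number of tasks, because each decrement returns a distinct value and only one of them can be zero.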