Multithreaded Synchronised List<T>

I have a requirement to store a simple cache of a list of items. I was using List<T> for this purpose, but we have now changed the design to accommodate multiple threads.

The architecture of the system is driven by events, therefore it's quite likely that a read and write operation could collide. Since the vast majority of access will be read-only I thought that the ReaderWriterLockSlim might be a good candidate. The cache only needs to be accurate at the point of reading for that moment in time.

I have written the code below and it seems to work ok, but are there some potential pain points?

UPDATE: Whilst the code below does fix some synchronisation problems, it's not 100% perfect. I have since decided to implement a much simpler class that doesn't expose all of the IList<T> operations and is therefore 'safer' to re-use.

public class SynchronisedList<T> : IList<T>
{
    private ReaderWriterLockSlim cacheLock = new ReaderWriterLockSlim();
    private IList<T> innerCache = new List<T>();

    private U ReadReturn<U>(Func<U> function)
    {
        cacheLock.EnterReadLock();
        try { return function(); }
        finally { cacheLock.ExitReadLock(); }
    }

    private void Read(Action action)
    {
        cacheLock.EnterReadLock();
        try { action(); }
        finally { cacheLock.ExitReadLock(); }
    }

    private U WriteReturn<U>(Func<U> function)
    {
        cacheLock.EnterWriteLock();
        try { return function(); }
        finally { cacheLock.ExitWriteLock(); }
    }

    private void Write(Action action)
    {
        cacheLock.EnterWriteLock();
        try { action(); }
        finally { cacheLock.ExitWriteLock(); }
    }

    public T this[int index]
    {
        get { return ReadReturn(() => innerCache[index]); }
        set { Write(() => innerCache[index] = value); }
    }

    public int IndexOf(T item) { return ReadReturn(() => innerCache.IndexOf(item)); }
    public void Insert(int index, T item) { Write(() => innerCache.Insert(index, item)); }
    public void RemoveAt(int index) { Write(() => innerCache.RemoveAt(index)); }
    public void Add(T item) { Write(() => innerCache.Add(item)); }
    public void Clear() { Write(() => innerCache.Clear()); }
    public bool Contains(T item) { return ReadReturn(() => innerCache.Contains(item)); }
    public int Count { get { return ReadReturn(() => innerCache.Count); } }
    public bool IsReadOnly { get { return ReadReturn(() => innerCache.IsReadOnly); } }
    public void CopyTo(T[] array, int arrayIndex) { Read(() => innerCache.CopyTo(array, arrayIndex)); }
    public bool Remove(T item) { return WriteReturn(() => innerCache.Remove(item)); }
    public IEnumerator<T> GetEnumerator() { return ReadReturn(() => innerCache.GetEnumerator()); }
    IEnumerator IEnumerable.GetEnumerator() { return ReadReturn(() => (innerCache as IEnumerable).GetEnumerator()); }
}

internal class Program
{
    private static SynchronisedList<int> list = new SynchronisedList<int>();

    private static void Main()
    {          
        for (int i = 0; i < 500000; i++)
        {
            var index = i;
            ThreadPool.QueueUserWorkItem((state) =>
            {
                var threadNum = (int)state;
                if (threadNum % 10 == 0)
                {
                    list.Add(threadNum);
                }
                else
                {
                    Console.WriteLine(list.Count);
                }
            }, index);
        }
        Console.ReadKey();
    }
}


Are you aware of the built-in SynchronizedCollection<T> class?

It uses standard Monitor-based locking rather than ReaderWriterLockSlim. You'd need to profile to determine whether this makes a significant performance difference in your particular usage scenarios.


There are a couple of thread issues here.

1. I think the GetEnumerator functions expose a threading issue. They hand out an enumerator over innerCache, and once the method returns that enumerator is no longer protected by your locks.

For example, it may break down if one thread is doing a foreach over the list while another thread is removing or inserting elements.

The solution would be to copy the list and return an enumerator on that newly cloned list instead. The drawback would be memory usage if the list is long.

2. The Contains() and IndexOf() functions are more or less useless unless you have another locking mechanism outside of the synchronised list.

Example: Thread A gets the index of an object, Thread B inserts, removes or updates that object, and Thread A's index is now stale.
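
To make the "copy the list" suggestion from point 1 concrete, here is a minimal sketch. The class name SnapshotList and the ToSnapshot method are made up for illustration and are not part of the question's code; only the members needed to show the idea are included. The copy is taken while the read lock is held, so a foreach runs over a private array that no writer can touch.

```csharp
using System.Collections;
using System.Collections.Generic;
using System.Threading;

// Hypothetical, reduced wrapper: just enough members to show snapshot enumeration.
public class SnapshotList<T> : IEnumerable<T>
{
    private readonly ReaderWriterLockSlim cacheLock = new ReaderWriterLockSlim();
    private readonly List<T> innerCache = new List<T>();

    public void Add(T item)
    {
        cacheLock.EnterWriteLock();
        try { innerCache.Add(item); }
        finally { cacheLock.ExitWriteLock(); }
    }

    public bool Remove(T item)
    {
        cacheLock.EnterWriteLock();
        try { return innerCache.Remove(item); }
        finally { cacheLock.ExitWriteLock(); }
    }

    // Copies the contents while the read lock is held, so the copy is consistent.
    public T[] ToSnapshot()
    {
        cacheLock.EnterReadLock();
        try { return innerCache.ToArray(); }
        finally { cacheLock.ExitReadLock(); }
    }

    // The enumerator walks the private copy, never the shared inner list.
    public IEnumerator<T> GetEnumerator()
    {
        return ((IEnumerable<T>)ToSnapshot()).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
```

The cost is one array allocation per enumeration, which is the memory drawback mentioned above. The enumeration also only reflects the list as it was when the snapshot was taken.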


I don't think a fully synchronised list is a great idea, really. Write a customised version instead with limited functionality.

If you only need a queue or stack, implement that one with only the two or three necessary methods that are fully synchronised. If you need more functionality than that, use a List and have the different threads do the synchronisation.
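
As a rough sketch of what "only the two or three necessary methods" could look like for a queue (the class name SynchronisedQueue is made up for illustration, and a plain lock is assumed instead of ReaderWriterLockSlim to keep it short):

```csharp
using System.Collections.Generic;

// Hypothetical minimal queue: every public member is a single atomic operation.
public class SynchronisedQueue<T>
{
    private readonly object gate = new object();
    private readonly Queue<T> items = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (gate) { items.Enqueue(item); }
    }

    // Check-and-remove happens under one lock, so callers never need a
    // separate "is it empty?" test that could go stale.
    public bool TryDequeue(out T item)
    {
        lock (gate)
        {
            if (items.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = items.Dequeue();
            return true;
        }
    }

    public int Count
    {
        get { lock (gate) { return items.Count; } }
    }
}
```

Because there is no indexer and no enumerator, there is nothing for callers to combine in an unsafe way. If you are on .NET 4 or later, ConcurrentQueue<T> in System.Collections.Concurrent already covers this particular case.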


This class lets you make your list access thread-safe, provided that compound operations (including enumeration) are wrapped in a scope.

Race conditions are avoided by using scopes that work much like transactions in databases.

Client code

List<T> unsafeList = ... 
var threadSafeList = new SyncronisedList<T>(unsafeList);
using (threadSafeList.EnterReadScope()) {
   // all your sequential read operations are thread-safe
}
using (threadSafeList.EnterWriteScope()) {
   // all sequential read/write operations are thread-safe
}

Class code

public class SyncronisedList<T> : IList<T> {
    private readonly ReaderWriterLockSlim _threadLock;
    private readonly IList<T> _internalList;

    public SyncronisedList() : this(new List<T>()) {
    }

    public SyncronisedList(IList<T> internalList) {
        _internalList = internalList;
        _threadLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
    }


    private U Read<U>(Func<U> function) {
        using (EnterReadScope())
            return function();
    }

    private void Read(Action action) {
        using (EnterReadScope())
            action();
    }

    private U Write<U>(Func<U> function) {
        using (EnterWriteScope())
            return function();
    }

    private void Write(Action action) {
        using (EnterWriteScope())
            action();
    }

    public IDisposable EnterReadScope() {
        return new Scope<T>(this, false);
    }

    public IDisposable EnterWriteScope() {
        return new Scope<T>(this, true);
    }

    public T this[int index] {
        get { return Read(() => _internalList[index]); }
        set { Write(() => _internalList[index] = value); }
    }

    public int IndexOf(T item) { return Read(() => _internalList.IndexOf(item)); }
    public void Insert(int index, T item) { Write(() => _internalList.Insert(index, item)); }
    public void RemoveAt(int index) { Write(() => _internalList.RemoveAt(index)); }
    public void Add(T item) { Write(() => _internalList.Add(item)); }
    public void Clear() { Write(() => _internalList.Clear()); }
    public bool Contains(T item) { return Read(() => _internalList.Contains(item)); }
    public int Count { get { return Read(() => _internalList.Count); } }
    public bool IsReadOnly { get { return Read(() => _internalList.IsReadOnly); } }
    public void CopyTo(T[] array, int arrayIndex) { Read(() => _internalList.CopyTo(array, arrayIndex)); }
    public bool Remove(T item) { return Write(() => _internalList.Remove(item)); }
    public IEnumerator<T> GetEnumerator() { return Read(() => _internalList.GetEnumerator()); }
    IEnumerator IEnumerable.GetEnumerator() { return Read(() => (_internalList as IEnumerable).GetEnumerator()); }


    private class Scope<U> : IDisposable {
        private readonly SyncronisedList<U> _owner;
        private readonly bool _write;

        internal Scope(SyncronisedList<U> owner, bool write) {
            _owner = owner;
            _write = write;
            if (_write)
                _owner._threadLock.EnterWriteLock();
            else
                _owner._threadLock.EnterReadLock();
        }

        public void Dispose() {
            if (_write)
                _owner._threadLock.ExitWriteLock();
            else
                _owner._threadLock.ExitReadLock();
        }

    }

}


Your implementation is OK, but you still have to care about synchronization problems.

Given a list {"foo"}:

int index = list.IndexOf("foo");
Console.WriteLine(list[index]);

Now, what if another thread does a list.Clear() between those two lines? Your reader-writer lock should be publicly accessible to handle those situations. Of course, the same goes for the enumerator.
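
One way to handle this without handing out the lock object itself is to let callers run a whole compound operation under a single read lock. The sketch below is only an illustration under that assumption; the class name LockedList and the Read method are made up and are not part of the question's code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical wrapper: compound reads run inside one lock acquisition.
public class LockedList<T>
{
    private readonly ReaderWriterLockSlim cacheLock = new ReaderWriterLockSlim();
    private readonly List<T> innerCache = new List<T>();

    public void Add(T item)
    {
        cacheLock.EnterWriteLock();
        try { innerCache.Add(item); }
        finally { cacheLock.ExitWriteLock(); }
    }

    public void Clear()
    {
        cacheLock.EnterWriteLock();
        try { innerCache.Clear(); }
        finally { cacheLock.ExitWriteLock(); }
    }

    // Runs the whole query under the read lock. The callback receives a
    // read-only view and must not keep a reference to it after returning.
    public TResult Read<TResult>(Func<IList<T>, TResult> query)
    {
        cacheLock.EnterReadLock();
        try { return query(innerCache.AsReadOnly()); }
        finally { cacheLock.ExitReadLock(); }
    }
}
```

The two lines from the example then become one atomic read, and a concurrent Clear() has to wait until it completes:

```csharp
string value = list.Read(items =>
{
    int index = items.IndexOf("foo");
    return index >= 0 ? items[index] : null;
});
```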


Trying to make a list be all things to all people and to be thread safe is very hard.

I think you should look at the operations that your application needs and then design a class that exposes them in a thread-safe way. (A list is too low-level.)

Your design is not likely to be thread safe in real life, as the code calling the list is likely to combine operations in an unsafe way.

Just exposing an enumerator opens up a lot of problems: what does it mean to visit all items in a list while another thread is changing the list?


Certain operations on your list cannot be meaningfully thread-safe with regard to other operations. For example, suppose one thread is about to write to element 5 while another thread deletes element 3. What used to be element 5 moves to 4 and what used to be 6 moves to 5, so the first thread ends up overwriting an element other than the one it was expecting.

It's certainly possible to have a useful thread-safe collection which exposes access by index, if one puts certain restrictions on its behavior and has it implement the same subset of IList<T> as an array would.

The most useful style of ThreadSafeList I can envision would support the following operations, in addition to those that would be available on an array-implemented IList<T>:

  1. int Add(T item) -- Returns index of the new item
  2. AccessItem(index, ActionByRef proc) -- Call proc with list item as a `ref` parameter
  3. AccessItem(index, ActionByRef proc, ref XP1 xparam1) -- Call proc with list item as one `ref` parameter and `xparam1` as a second
  4. AccessItem(index, ActionByRef proc, ref XP1 xparam1, ref XP2 xparam2) -- As above, with another pass-through `ref` parameter
  5. Enumeration -- Returns the "live" state of each item that has been created prior to the start of enumeration.

The list would support adding items, but not inserting or deleting them. Since the Add method would return the index of the newly-created item, whatever thread added an item would have exclusive control over it. Further, the AccessItem primitives would allow two or more threads to use atomic primitives on list items in any way they saw fit (if T is a struct type with an exposed field, atomic primitives could also be used on those fields).

Such a type should not be built on List<T>, but instead on a jagged array, T[][] arr, with 32 slots. Start by setting arr[0] to a T[16]; if a 17th item is added, initialize arr[1] to a T[32]; if that gets full, initialize arr[2] to a T[64], etc. Any given index will always be represented by the same array element, even as the list expands, so accesses to an existing list element will not be affected by the expansion of the list.

An append-only list could be a useful thread-safe type, but I'm as yet unaware of any such types in any version of .NET.
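
A minimal sketch of that segmented layout follows. The class name AppendOnlyList and the Locate helper are made up for illustration. The sketch assumes .NET 4.5 or later for the Volatile class, serialises Add with a plain lock, and leaves out the AccessItem primitives and enumeration. It also ignores overflow for indexes close to int.MaxValue.

```csharp
using System;
using System.Threading;

// Hypothetical append-only list: segment k holds 16 << k items and is never
// reallocated, so an index always maps to the same array element.
public class AppendOnlyList<T>
{
    private const int FirstSegmentSize = 16;
    private readonly T[][] segments = new T[32][];
    private readonly object addLock = new object();
    private int count;

    public int Count
    {
        get { return Volatile.Read(ref count); }
    }

    // Appends an item and returns its index.
    public int Add(T item)
    {
        lock (addLock)
        {
            int index = count;
            int segment, offset;
            Locate(index, out segment, out offset);
            if (segments[segment] == null)
                segments[segment] = new T[FirstSegmentSize << segment];
            segments[segment][offset] = item;
            // Publish the new count only after the item has been stored.
            Volatile.Write(ref count, index + 1);
            return index;
        }
    }

    public T this[int index]
    {
        get
        {
            if (index < 0 || index >= Volatile.Read(ref count))
                throw new ArgumentOutOfRangeException("index");
            int segment, offset;
            Locate(index, out segment, out offset);
            return segments[segment][offset];
        }
    }

    // Maps a list index to (segment, offset): indexes 0..15 live in segment 0,
    // 16..47 in segment 1, 48..111 in segment 2, and so on.
    public static void Locate(int index, out int segment, out int offset)
    {
        int shifted = index + FirstSegmentSize;
        segment = 0;
        while ((shifted >> (segment + 5)) != 0)
            segment++;
        offset = shifted - (FirstSegmentSize << segment);
    }
}
```

Readers never take the lock: they check the published count and then read from a segment that, once allocated, is never moved or resized.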
