开发者

Thread-safe buffer of data to make batch inserts of controlled size

I have a simulation that generates data which must be saved to database.

ParallelLoopResult res = Parallel.For(0, 1000000, options, (r, state) =>
{
    ComplexDataSet cds = GenerateData(r);

    SaveDataToDatabase(cds);

});

The simulation generates a whole lot of data, so it wouldn't be practical to first generate it and then save it to database (up to 1 GB of data) and it also wouldn't make sense to save it to database one by one (too small transanctions to be practical). I want to insert them to database as a batch insert of controlled size (say 100 with one commit).

However, I think my knowledge of parallel computing is less that theoretical. I came up with this (which as you can see is very flawed):

DataBuffer buffer = new DataBuffer(...);

ParallelLoopResult res = Parallel.For(0, 10000000, options, (r, state) =>
{
    ComplexDataSet cds = GenerateData(r);

    buffer.SaveDataToBuffer(cds, i == r - 1);

});

public class DataBuffer
{
    int count = 0;
    int limit = 100

    object _locker = new object();

    ConcurrentQueue<ConcurrentBag<ComplexDataSet>> ComplexDataBagQueue{ get; set; }

    public void SaveDataToBuffer(ComplexDataSet data, bool isfinalcycle)
    {
            lock (_locker)
            {
                if(count >= limit)
                {
                    ConcurrentBag<ComplexDataSet> dequeueRef;
                    if(ComplexDataBagQueue.TryDequeue(out dequeueRef))
                    {
                        Commit(dequeueRef);
                    }

                    _lastItemRef = new ConcurrentBag<ComplexDataSet>{data};
                    ComplexDataSetsQueue.Enqueue(_lastItemRef);
                    count = 1;
                }
                else
                {
                    // First time
                    if(_lastItemRef == null)
                    {
                        _lastItemRef = new ConcurrentBag<ComplexDataSet>{data};
                        ComplexDataSetsQueue.Enqueue(_lastItemRef);
                        count = 1;
                    }
                    // If buffer isn't full
                    else
                    {
                        _lastItemRef.Add(data);
                        count++;
                    }
                }

                if(isfinalcycle)
                {
                        // Commit everything that hasn't been committed yet
                        ConcurrentBag<ComplexDataSet> dequeueRef;    
                    while (ComplexDataSetsQueue.TryDequeue(out dequeueRef))
                    {
                        Commit(dequeueRef);
                    }
                }
            }
    }

    public void Commit(ConcurrentBag<ComplexDataSet> data)
    {
        // Commit data to database..should this be somehow in another thread or something ?
    }
}

As you can see, I'm using queue to create a buffer and then manually decide when to commit. However I have a strong feeling that this isn'开发者_Go百科t very performing solution to my problem. First, I'm unsure whether I'm doing locking right. Second, I'm not sure even if this is fully thread-safe (or at all).

Can you please take a look for a moment and comment what should I do differently ? Or if there is a complitely better way of doing this (using somekind of Producer-Consumer technique or something) ?

Thanks and best wishes, D.


There is no need to use locks or expensive concurrency-safe data structures. The data is all independent, so introducing locking and sharing will only hurt performance and scalability.

Parallel.For has an overload that lets you specify per-thread data. In this you can store a private queue and private database connection.

Also: Parallel.For internally partitions your range into smaller chunks. It's perfectly efficient to pass it a huge range, so nothing to change there.

Parallel.For(0, 10000000, () => new ThreadState(),
    (i, loopstate, threadstate) =>
{
    ComplexDataSet data = GenerateData(i);

    threadstate.Add(data);

    return threadstate;
}, threadstate => threadstate.Dispose());

sealed class ThreadState : IDisposable
{
    readonly IDisposable db;
    readonly Queue<ComplexDataSet> queue = new Queue<ComplexDataSet>();

    public ThreadState()
    {
        // initialize db with a private MongoDb connection.
    }

    public void Add(ComplexDataSet cds)
    {
        queue.Enqueue(cds);

        if(queue.Count == 100)
        {
            Commit();
        }
    }

    void Commit()
    {
        db.Write(queue);
        queue.Clear();
    }

    public void Dispose()
    {
        try
        {
            if(queue.Count > 0)
            {
                Commit();
            }
        }
        finally
        {
            db.Dispose();
        }
    }
}

Now, MongoDb currently doesn't support truly concurrent inserts -- it holds some expensive locks in the server, so parallel commits won't gain you much (if any) speed. They want to fix this in the future, so you might get a free speed-up one day.

If you need to limit the number of database connections held, a producer/consumer setup is a good alternative. You can use a BlockingCollection queue to do this efficiently without using any locks:

// Specify a maximum of 1000 items in the collection so that we don't
// run out of memory if we get data faster than we can commit it.
// Add() will wait if it is full.

BlockingCollection<ComplexDataSet> commits =
    new BlockingCollection<ComplexDataSet>(1000);

Task consumer = Task.Factory.StartNew(() =>
    {
        // This is the consumer.  It processes the
        // "commits" queue until it signals completion.

        while(!commits.IsCompleted)
        {
            ComplexDataSet cds;

            // Timeout of -1 will wait for an item or IsCompleted == true.

            if(commits.TryTake(out cds, -1))
            {
                // Got at least one item, write it.
                db.Write(cds);

                // Continue dequeuing until the queue is empty, where it will
                // timeout instantly and return false, or until we've dequeued
                // 100 items.

                for(int i = 1; i < 100 && commits.TryTake(out cds, 0); ++i)
                {
                    db.Write(cds);
                }

                // Now that we're waiting for more items or have dequeued 100
                // of them, commit.  More can be continue to be added to the
                // queue by other threads while this commit is processing.

                db.Commit();
            }
        }
    }, TaskCreationOptions.LongRunning);

try
{
    // This is the producer.

    Parallel.For(0, 1000000, i =>
        {
            ComplexDataSet data = GenerateData(i);
            commits.Add(data);
        });
}
finally // put in a finally to ensure the task closes down.
{
    commits.CompleteAdding(); // set commits.IsFinished = true.
    consumer.Wait(); // wait for task to finish committing all the items.
}


In your example you have 10 000 000 packages of work. Each of this needs to be distributed to a thread. Assuming you don't have a really large number of cpu cores this is not optimal. You also have to synchronize your threads on every buffer.SaveDataToBuffer call (by using locks). Additionally you should be aware that the variable r isn't necessarly increased by one in a chronology view (example: Thread1 executes r with 1,2,3 and Thread2 with 4,5,6. Chronological this would lead to the following sequence of r passed to SaveDataToBuffer 1,4,2,5,3,6 (approximately)).

I would make the packages of work larger and then commit each package at once. This has also the benefit that you don't have to lock/synchronize all to often.

Here's an example:

int total = 10000000;
int step = 1000;

Parallel.For(0, total / step, (r, state) =>
{
    int start = r * start;
    int end = start + step;

    ComplexDataSet[] result = new ComplexDataSet[step];

    for (int i = start; i < end; i++)
    {
        result[i - start] = GenerateData(i);
    }

    Commit(result);
});

In this example the whole work is split into 10 000 packages (which are executed in parallel) and every package generates 1000 data items and commits them to the database.

With this solution the Commit method might be a bottleneck, if not wisely designed. Best would be to make it thread safe without using any locks. This can be accomplished, if you don't use common objects between threads which need synchronization.

E.g. for a sql server backend that would mean creating an own sql connection in the context of every Commit() call:

private void Commit(ComplexDataSet[] data)
{
    using (var connection = new SqlConnection("connection string..."))
    {
        connection.Open();

        // insert your data here...
    }
}


Instead of increasing complexity of software, rather consider simplification. You can refactor the code into three parts:

  1. Workers that enqueue

    This is concurrent GenerateData in Parallel.For that does some heavy computation and produce ComplexDataSet.

  2. Actual queue

    A concurrent queue that stores the results from [1] - so many ComplexDataSet. Here I assumed that one instance of ComplexDataSet is actually not really resource consuming and fairly light. As long as the queue is concurrent it will support parallel "inserts" and "deletes".

  3. Workers that dequeue

    Code that takes one instance of the ComplexDataSet from processing queue [2] and puts it into the concurrent bag (or other storage). Once the bag has N number of items you block, stop dequeueing, flush the content of the bag into the database and clear it. Finally, you unblock and resume dequeueing.

Here is some metacode (it still compiles, but needs improvements)

[1]

// [1] - Class is responsible for generating complex data sets and 
// adding them to processing queue
class EnqueueWorker
{
    //generate data and add to queue
    internal void ParrallelEnqueue(ConcurrentQueue<ComplexDataSet> resultQueue)
    {
        Parallel.For(1, 10000, (i) =>
        {
            ComplexDataSet cds = GenerateData(i);
            resultQueue.Enqueue(cds);

        });
    }

    //generate data
    ComplexDataSet GenerateData(int i)
    {
        return new ComplexDataSet();
    }
}

[3]

//[3] This guy takes sets from the processing queue and flush results when 
// N items have been generated
class DequeueWorker
{
    //buffer that holds processed dequeued data
    private static ConcurrentBag<ComplexDataSet> buffer;

    //lock to flush the data to the db once in a while
    private static object syncRoot = new object();

    //take item from processing queue and add it to internal buffer storage
    //once buffer is full - flush it to the database
    internal void ParrallelDequeue(ConcurrentQueue<ComplexDataSet> resultQueue)
    {
        buffer = new ConcurrentBag<ComplexDataSet>();
        int N = 100;

        Parallel.For(1, 10000, (i) =>
        {
            //try dequeue
            ComplexDataSet cds = null;

            var spinWait = new SpinWait();

            while (cds == null)
            {
                resultQueue.TryDequeue(out cds);
                spinWait.SpinOnce();
            }

            //add to buffer
            buffer.Add(cds);

            //flush to database if needed
            if (buffer.Count == N)
            {
                lock (syncRoot)
                {
                    IEnumerable<ComplexDataSet> data = buffer.ToArray();

                    // flush data to database

                    buffer = new ConcurrentBag<ComplexDataSet>();
                }
            }

        });
    }        
}

[2] and usage

class ComplexDataSet { }

class Program
{
    //processing queueu - [2]
    private static ConcurrentQueue<ComplexDataSet> processingQueue;

    static void Main(string[] args)
    {
        // create new processing queue - single instance for whole app
        processingQueue = new ConcurrentQueue<ComplexDataSet>();

        //enqueue worker
        Task enqueueTask = Task.Factory.StartNew(() =>
            {
                EnqueueWorker enqueueWorker = new EnqueueWorker();
                enqueueWorker.ParrallelEnqueue(processingQueue);
            });

        //dequeue worker
        Task dequeueTask = Task.Factory.StartNew(() =>
        {
            DequeueWorker dequeueWorker = new DequeueWorker();
            dequeueWorker.ParrallelDequeue(processingQueue);
        });            
    }
}
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜