开发者

My first multithreading application - Strange behavior of Boost:Threads?

I wrote a multithreading application for parsing logfiles. It's based on the mutex buffer example from http://drdobbs.com/cpp/184401518?pgno=5.

The idea is having a buffer class that has functions to put items into the buffer and take items out of it. The Synchronization of the reading and the writing threads is handled using conditions. While buffer is not full, new items are written into the buffer and while it's not empty, items are being read from it. Otherwise threads will wait.

The example uses a fixed amount of items to process so I changed the reading thread to run while there is input from file and the processing threads run while there's input or while there are items left in buffer.

My Problem is if I use 1 reading and 1 processing thread, everything is ok and stable. When I add another processing thread there's a big boost in performance and it's still stable even after 10.000 testruns.

Now when I add another processor thread (1 read, 3 processing), programm seems to hang (deadlock?) periodically (but not every time) and is either waiting for buffer to fill or to get empty.

How come 2 threads doing the same thing synchronize stable and 3 of them crash?

I'm a newb to C++ threading so perhaps any of you more experienced c开发者_Go百科oders know what can cause this behavior?

Here's my code:

Buffer Class:

#include "StringBuffer.h"

void StringBuffer::put(string str)
  {
    scoped_lock lock(mutex);
    if (full == BUF_SIZE)
    {
      {
        //boost::mutex::scoped_lock lock(io_mutex);
        //std::cout << "Buffer is full. Waiting..." << std::endl;
      }
      while (full == BUF_SIZE)
        cond.wait(lock);
    }
    str_buffer[p] = str;
    p = (p+1) % BUF_SIZE;
    ++full;
    cond.notify_one();
  }

string StringBuffer::get()
  {
    scoped_lock lk(mutex);
    if (full == 0)
    {
      {
        //boost::mutex::scoped_lock lock(io_mutex);
        //std::cout << "Buffer is empty. Waiting..." << std::endl;
      }
      while (full == 0)
        cond.wait(lk);
    }
    string test = str_buffer[c];
    c = (c+1) % BUF_SIZE;
    --full;
    cond.notify_one();
    return test;
  }

and here's the main:

Parser p;
StringBuffer buf;
Report report;
string transfer;
ifstream input;
vector <boost::regex> regs;

int proc_count = 0;
int push_count = 0;
bool pusher_done = false;

//  Show filter configuration and init report by dimensioning counter vectors

void setup_report() {
    for (int k = 0; k < p.filters(); k++) {
        std::cout << "SID(NUM):" << k << "  Name(TXT):\"" << p.name_at(k) << "\"" << "  Filter(REG):\"" << p.filter_at(k) << "\"" << endl;
        regs.push_back(boost::regex(p.filter_at(k)));
        report.hits_filters.push_back(0);
        report.names.push_back(p.name_at(k));
        report.filters.push_back(p.filter_at(k));
    }
}

// Read strings from sourcefiles and put them into buffer

void pusher() {

    // as long as another string could be red, ...

    while (input) {

        // put it into buffer

        buf.put(transfer);

        // and get another string from source file

        getline(input, transfer);
        push_count++;
    }
    pusher_done = true;
}

// Get strings from buffer and check RegEx filters. Pass matches to report

void processor()
{
    while (!pusher_done || buf.get_rest()) {
        string n = buf.get();
        for (unsigned sid = 0; sid < regs.size(); sid++) {
            if (boost::regex_search(n, regs[sid])) report.report_hit(sid);
        }
        boost::mutex::scoped_lock lk(buf.count_mutex);
        {
            proc_count++;
        }
    }
}

int main(int argc, const char* argv[], char* envp[])
{

    if (argc == 3)
    {
        //  first add sourcefile from argv[1] filepath, ...

        p.addSource(argv[1]);
        std::cout << "Source File: *** Ok\n";

        //  then read configuration from argv[2] filepath, ...

        p.readPipes(envp, argv[2]);
        std::cout << "Configuration: *** Ok\n\n";

        // and setup the Report Object.

        setup_report();

        // For all sourcefiles that have been parsed, ...

        for (int i = 0; i < p.sources(); i++) {
            input.close();
            input.clear();

            // open the sourcefile in a filestream.

            input.open(p.source_at(i).c_str());

            // check if file exist, otherwise throw error and exit

            if (!input)
            {
                std::cout << "\nError! File not found: " <<  p.source_at(i);
                exit(1);
            }

            // get start time

            std::cout << "\n- started:  ";
            ptime start(second_clock::local_time());
            cout << start << endl;

            // read a first string into transfer to get the loops going

            getline(input, transfer);

            // create threads and pass a reference to functions

            boost::thread push1(&pusher);
            boost::thread proc1(&processor);
            boost::thread proc2(&processor);


            // start all the threads and wait for them to complete.

            push1.join();
            proc1.join();
            proc2.join();


            // calculate and output runtime and lines per second

            ptime end(second_clock::local_time());
            time_duration runtime = end - start;
            std::cout << "- finished: " << ptime(second_clock::local_time()) << endl;
            cout << "- processed lines: " << push_count << endl;
            cout << "- runtime: " << to_simple_string(runtime) << endl;
            float processed = push_count;
            float lines_per_second = processed/runtime.total_seconds();
            cout << "- lines per second: " << lines_per_second << endl;

            // write report to file

            report.create_filereport();         // after all threads finished write reported data to file
            cout << "\nReport saved as: ./report.log\n\nBye!" << endl;
        }
    }
    else std::cout << "Usage: \"./Speed-Extract [source][config]\"\n\n";
    return 0;
}

Edit 1:

Thanks alot for your help. By adding some counters and thread id's to output I figured out what is the problem:

I noticed that several threads could remain waiting for buffer to fill.

My processing threads run while there are new sourcestrings left that haven't been read yet OR while buffer is not empty. This is not good.

Say I have 2 threads waiting for buffer to fill. As soon as reader reads a new line (maybe the last few lines) theres 6 other threads that try to get this line(s) and lock the item so the 2 waiting threads maybe don't even have a chance to try to unlock it.

As soon as they check a line is taken by another thread and they keep waiting. The Reading thread doesn't notify them when it reaches eof and then it stops. The both waiting threads wait forever.

My Reading function aditionally has to notify all threads that it reached eof, so threads shall only remain waiting if buffer is empty and file is not EOF.


As @Martin I can not see any obvious problem in your code. The only idea I have is that you could try to use separate condition variables for writing to the buffer and reading from it. As it is now, each time a thread is finished getting an item, other threads that were waiting in the get method are potentially signaled as well.

Consider the following. The buffer is full, so the writer waits for a the cond signal. Now, the readers empty the queue, without the writer being signaled even once. This is possible, since they use the same condition variable, and becomes more likely the more readers there are. Every time a reader removes an item from the buffer, it calls notify_one. This can wake up the writer, but it can also wake up a reader. Suppose by chance all notifications end up waking up readers. The writer will never be released. In the end, all threads will wait on a signal and you have a deadlock.

If that is correct, then you have two possible fixes:

  1. Use different signals to prevent readers from "stealing" notifications intended for the writer.
  2. Use notify_all instead of notify_one to make sure the reader gets a chance every time an item is removed.


I can't actually see a problem.

But one thing to remember is that just because a thread is released from the condition variable with a signal does not mean it begins running.

Once released it must acquire the mutex before continuing, but every time the thread is scheduled to run somebody else may potentially have locked the mutex (given how tight those loops are that would not be a surprise) thus it suspends waiting for its next scheduling slot. This may be where your conflict is.

The trouble is putting print statements into the code is not going to help as the print statements affect the timing (they are expensive) and thus you will get a different behavior. Something cheap like keeping a count of what each thread action may be cheap enough so that it does not affect the timing but helps you determine the problem. Note: print the results only after you finish.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜