a simple boss-worker model using pthreads

I'm an amateur programmer experimenting with pthreads to see how much a multi-threaded program can speed up a rather long computation I'm working on. The computation runs through a std::list< string > object, popping the first element off the list and farming it out to a thread that computes something with it. The program keeps track of the active threads and ensures there's always a certain number of them running. Once the list is empty, the program sorts the resulting data, dumps a data file and terminates.

The multi-threaded version of the program currently does not work. It gets 20 or 40 or 200 or so elements down the list (depending on which list I give it) and then segfaults. The segfault seems to happen on particular elements of the list, so the crashes don't appear to be random.

BUT the strange thing is, if I compile with debug symbols and run the program through gdb, the program does not segfault. It runs perfectly. Slowly, of course, but it runs and does everything the way I expect it to.

After playing around with everyone's suggestions for a while, using (among other things) valgrind's tools to try and sort out what's happening, I've noticed that the simplified code below (with no calls outside the std library or the pthread library) causes trouble for helgrind, and this is likely the source of my problems. So here is just the simplified code, followed by helgrind's complaints.

#include <cstdlib>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <list>
#include <iostream>
#include <signal.h>
#include <sys/select.h>

struct thread_detail {
 pthread_t *threadID; 
 unsigned long num;
};

pthread_mutex_t coutLock;

void *ThreadToSpawn(void *threadarg)
{
   struct thread_detail *my_data;
   my_data = (struct thread_detail *) threadarg;
   int taskid = my_data->num;

   struct timeval timeout;
   for (unsigned long i=0; i < 10; i++)
    { 
     timeout.tv_sec = 0;  timeout.tv_usec = 500000; // half-second 
     select( 0, NULL, NULL, NULL, & timeout );
     pthread_mutex_lock(&coutLock);
     std::cout << taskid << " "; std::cout.flush();
     pthread_mutex_unlock(&coutLock);
    }
   pthread_exit(NULL);
}


int main (int argc, char *argv[])
{
  unsigned long comp_DONE=0; 
  unsigned long comp_START=0;
  unsigned long ms_LAG=10000; // microsecond lag between polling of threads

  // set-up the mutexes
  pthread_mutex_init( &coutLock, NULL );

  if (argc != 3) { std::cout << "Program requires two arguments: (1) number of threads to use,"
                               " and (2) tasks to accomplish. \n"; exit(1); }
  unsigned long NUM_THREADS(atoi( argv[1] ));
  unsigned long comp_TODO(atoi(argv[2]));
  std::cout << "Program will have " << NUM_THREADS << " threads. \n";
  std::list < thread_detail > thread_table;

   while (comp_DONE != comp_TODO) // main loop to set-up and track threads
    {
     // poll stack of computations to see if any have finished, 
     // extract data and remove completed ones from stack
     std::list < thread_detail >::iterator i(thread_table.begin());
     while (i!=thread_table.end())
      {
       if (pthread_kill(*i->threadID,0)!=0) // thread is dead
        { // if there was relevant info in *i we'd extract it here
         if (pthread_join(*i->threadID, NULL)!=0) { std::cout << "Thread join error!\n"; exit(1); }
         pthread_mutex_lock(&coutLock);
         std::cout << i->num << " done. "; std::cout.flush();
         pthread_mutex_unlock(&coutLock);
         delete i->threadID;
         thread_table.erase(i++);  
         comp_DONE++;
        }
       else (i++);
      }
     // if list not full, toss another on the pile
     while ( (thread_table.size() < NUM_THREADS) && (comp_TODO > comp_START) )
      {
        pthread_t *tId( new pthread_t );
        thread_detail Y; Y.threadID=tId; Y.num=comp_START;
        thread_table.push_back(Y);
        int rc( pthread_create( tId, NULL, ThreadToSpawn, (void *)(&(thread_table.back() )) ) );
        if (rc) { printf("ERROR; return code from pthread_create() is %d\n", rc); exit(-1); }
        pthread_mutex_lock(&coutLock);
       std::cout << comp_START << " start. "; std::cout.flush();
        pthread_mutex_unlock(&coutLock);
        comp_START++;
      }

     // wait a specified amount of time
     struct timeval timeout;
     timeout.tv_sec = 0;  timeout.tv_usec = ms_LAG; 
     select( 0, NULL, NULL, NULL, & timeout );
    } // the big while loop

   pthread_exit(NULL);
}

Helgrind output


==2849== Helgrind, a thread error detector
==2849== Copyright (C) 2007-2009, and GNU GPL'd, by OpenWorks LLP et al.
==2849== Using Valgrind-3.6.0.SVN-Debian and LibVEX; rerun with -h for copyright info
==2849== Command: ./thread2 2 6
==2849== 
Program will have 2 threads. 
==2849== Thread #2 was created
==2849==    at 0x64276BE: clone (clone.S:77)
==2849==    by 0x555E172: pthread_create@@GLIBC_2.2.5 (createthread.c:75)
==2849==    by 0x4C2D42C: pthread_create_WRK (hg_intercepts.c:230)
==2849==    by 0x4C2D4CF: pthread_create@* (hg_intercepts.c:257)
==2849==    by 0x401374: main (in /home/rybu/prog/regina/exercise/thread2)
==2849== 
==2849== Thread #1 is the program's root thread
==2849== 
==2849== Possible data race during write of size 8 at 0x7feffffe0 by thread #2
==2849==    at 0x4C2D54C: mythread_wrapper (hg_intercepts.c:200)
==2849==  This conflicts with a previous read of size 8 by thread #1
==2849==    at 0x4C2D440: pthread_create_WRK (hg_intercepts.c:235)
==2849==    by 0x4C2D4CF: pthread_create@* (hg_intercepts.c:257)
==2849==    by 0x401374: main (in /home/rybu/prog/regina/exercise/thread2)
==2849== 
 [0 start.]  [1 start.] 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1  [0 done.]  [1 done.]  [2 start.]  [3 start.] 2 3 2 3 2 3 2 3 2 3 2 3 2 3 2 3 2 3 2 3  [2 done.]  [3 done.]  [4 start.]  [5 start.] 4 5 4 5 4 5 4 5 4 5 4 5 4 5 4 5 4 5 4 5  [4 done.]  [5 done.] ==2849== 
==2849== For counts of detected and suppressed errors, rerun with: -v
==2849== Use --history-level=approx or =none to gain increased speed, at
==2849== the cost of reduced accuracy of conflicting-access information
==2849== ERROR SUMMARY: 6 errors from 1 contexts (suppressed: 675 from 37)

Presumably I'm using pthreads incorrectly, but it's not clear to me what I'm doing wrong. Moreover, I'm not sure what to make of the helgrind output. Earlier, helgrind was complaining because I had not called pthread_join on threads that, for other reasons, the code knew were dead. Adding the pthread_join took care of those complaints.

Reading various pthread tutorials on-line I've discovered that it's probably pointless to have so much thread creation and destruction going on, as in the above code. It's probably more efficient to have N threads running simultaneously, and use mutexes and shared memory to pass data between the "BOSS" thread and the "WORKER" threads, only killing the WORKER threads once, at the end of the program. So that's something I'll eventually have to adjust for but is there anything obviously wrong with the above code?

Edit: I'm noticing some keywords more and more often. The terminology for the thing I'm trying to create is apparently a thread pool. Moreover, there are various proposals for standard implementations of this, for example in the boost library there's boost::threadpool, boost::task, boost::thread. Some of these appear to be only proposals. I come across threads here where people mention you can combine ASIO and boost::thread to accomplish what I'm looking for. Similarly there's a message queue class.

Hmm, so it seems like I'm scratching at the surface of a topic many people are thinking about nowadays, but it seems kind of germinal, like OOP was in 1989 or something.


Try enabling core dumps (ulimit -c unlimited), then run your program without gdb. When it crashes, it should leave behind a core file, which you can then open up with gdb and start investigating (gdb <executable-file> <core-file>).


Regarding top, how many threads are you using? I don't see DATA in my top output but have seen the Virtual column balloon when using threads. My understanding (and perhaps I should ask to be sure) is that each thread has its own memory space that it may potentially use. That memory isn't actually being used, it's just available if needed, which is why that number can get quite high without really causing problems. In and of itself the memory probably isn't catastrophic. You should see if the DATA utilization scales linearly with the number of threads you're using.

Regarding gdb: as you noted, gdb won't fix your code, but if you're corrupting memory it may move where the errors occur. If the corruption lands in an area that you never go back to, or that you've already released and never try to reuse, the symptoms of your problem will go away. That is, they go away until you need to demo the code or use it somewhere critical.

Also, you'll want to take a look at helgrind, part of valgrind. This kind of thing is its bread and butter if you have a lock problem:

Helgrind is a Valgrind tool for detecting synchronisation errors in C, C++ and Fortran programs that use the POSIX pthreads threading primitives.

Just do:

valgrind --tool=helgrind {your program}


Are you sure it's complete code? I don't see where you're creating threads or where BuildKCData is being called from.

You ought to have a memory barrier after pthread_kill(), even though I doubt it makes a difference in this case.

EDIT: You're confusing in-order execution and cache-consistency.

Cache consistency: x86 (currently) guarantees that aligned 4-byte accesses are atomic, so a[0]=123 in thread A and a[1]=456 in thread B will work — thread C will eventually see "123,456". There are various cache-consistency protocols around, but I believe it's approximately a MRSW lock.

Out-of-order execution: x86 doesn't guarantee ordering of reads (and possibly writes; there was a debate about whether sfence was needed in the Linux kernel). This lets the CPU prefetch data more effectively, but it means that if thread A does a[0]=123 and then reads a[1], while thread B does a[1]=456 and then reads a[0], both reads could return 0, because the read of a[1] can happen before the write to a[0] becomes visible. There are two general ways to fix this:

  • Only access shared data when you're holding the lock. In particular, do not read shared data outside of the lock. Whether this means a lock for each entry or a lock for the entire array is up to you, and what you think lock contention is likely to be like (tip: it's not usually very big).
  • Stick a memory barrier between things that need to be in order. This is difficult to get right (pthread doesn't even have memory barriers; pthread_barrier is more like a sync point).

Memory barriers are all the rage in some circles, but locking is far easier to get right (I am holding the lock, therefore nobody else is allowed to change the data under my feet). With barriers there's a lot more to get right (I hope this read is atomic, I hope other threads write atomically, I hope other threads use a barrier, and oh yeah, I need to use a barrier too).

And if locking's too slow, reducing contention will be much more effective than replacing locks with barriers and hoping you got it right.
