开发者

True non-blocking two-way communication between parent and external child process

I have read around 50 posts and tutorials on this topic, I have copied, written and tested around 20 alternatives and done every possible research I can think of. Still, I have not seen a working solution for the following problem:

Parent process A wants to pass data to an external process B, let process B modify the data and pass it back to parent process A, then continue with parent process A. Process B is part of an external program suite that I have no influence over, and that is normally run like this on the UNIX command line:

< input_data program_B1 | program_B2 | program_B3 > output_data

...where

input_data, output_data: Some data that is processed in programs B1-B3

program_B1,B2,B3: Programs that read data from stdin (fread) and output to stdout (fwrite) and apply some processing to the data.

So, in sequence:

(1) Parent process A passes data to child process B

(2) Child process B reads data and modifies it

(3) Child process B passes data back to parent process A

(4) Parent process A reads data and continues (for example passing it further on to a process B2..).

(5) Parent process A passes another data set to child process B etc.

The problem is, whatever I do, the program almost always ends up hanging on a read/fread (or write/fwrite?) to or from a pipe.

One important thing to note is that the parent process cannot simply close the pipes after passing data on to the child process, because it works in a loop and wants to pass another set of data to the child process once it has finished processing the first set.

Here is a working set of parent/child programs (compile with g++ 开发者_C百科pipe_parent.cc -o pipe_parent, g++ pipe_child.cc -o pipe_child) illustrating the problem with unnamed pipes. I have also tried named pipes, but not as extensively. Each execution can have a slightly different outcome. If the sleep statement is omitted in the parent, or the fflush() statement is omitted in the child, the pipes will almost surely block. If the amount of data to be passed on is increased, it will always block independent of the sleep or fflush.

Parent program A:

#include <cstring>
#include <cstdio>
#include <cstdlib>

extern "C" {
  #include <unistd.h>
  #include <fcntl.h>
 }

using namespace std;

/*
 * Parent-child inter-communication
 * Child is external process
 */

int main() {
  int fd[2];
  if( pipe(fd) == -1 ) {
    fprintf(stderr,"Unable to create pipe\n");
  }
  int fd_parentWrite = fd[1];
  int fd_childRead   = fd[0];
  if( pipe(fd) == -1 ) {
    fprintf(stderr,"Unable to create pipe\n");
    exit(-1);
  }
  int fd_childWrite = fd[1];
  int fd_parentRead = fd[0];

  pid_t pid = fork();
  if( pid == -1 ) {
    fprintf(stderr,"Unable to fork new process\n");
    exit(-1);
  }

  if( pid == 0 ) { // Child process
    dup2( fd_childRead,  fileno(stdin)  );  // Redirect standard input(0) to child 'read pipe'
        dup2( fd_childWrite, fileno(stdout) );  // Redirect standard output(1) to child 'write pipe'

    close(fd_parentRead);
    close(fd_parentWrite);
    close(fd_childRead);
    close(fd_childWrite);
    // execl replaces child process with an external one
    int ret = execl("/disk/sources/pipe_test/pipe_child","pipe_child",NULL);
    fprintf(stderr,"External process failed, return code: %d...\n", ret);
    exit(-1);
    // Child process is done. Will not continue from here on
  }
  else { // Parent process
    // Nothing to set up
  }

  // ...more code...

  if( pid > 0 ) { // Parent process (redundant if statement)
    int numElements = 10000;
    int totalSize = numElements * sizeof(float);
    float* buffer = new float[numElements];
    for( int i = 0; i < numElements; i++ ) {
      buffer[i] = (float)i;
    }

    for( int iter = 0; iter < 5; iter++ ) {
      fprintf(stderr,"--------- Iteration #%d -----------\n", iter);
      int sizeWrite = (int)write( fd_parentWrite, buffer, totalSize );
      if( sizeWrite == -1 ) {
        fprintf(stderr,"Parent process write error\n");
        exit(-1);
      }
      fprintf(stderr,"Parent #%d: Wrote %d elements. Total size: %d\n", iter, sizeWrite, totalSize);
      sleep(1);   // <--- CHANGE!
      int sizeRead = (int)read( fd_parentRead, buffer, totalSize );
      if( sizeRead <= 0 ) {
        fprintf(stderr,"Parent process read error\n");
      }
      while( sizeRead < totalSize ) {
        fprintf(stderr,"Parent #%d: Read %d elements, continue reading...\n", iter, sizeRead);
        int sizeNew = (int)read( fd_parentRead, &buffer[sizeRead], totalSize-sizeRead );
        fprintf(stderr," ...newly read %d elements\n", sizeNew);
        if( sizeNew < 0 ) {
          exit(-1);
        }
        sizeRead += sizeNew;
      }
      fprintf(stderr,"Parent #%d: Read %d elements. Total size: %d\n", iter, sizeRead, totalSize);
      fprintf(stderr,"Examples :  %f  %f  %f\n", buffer[0], buffer[10], buffer[100]);
    }

    delete [] buffer;
  }

  close(fd_parentRead);
  close(fd_parentWrite);
  close(fd_childRead);
  close(fd_childWrite);

  return 0;
}

Child program B:

#include <cstdio>

using namespace std;

int main() {

  int numElements = 10000;
  int totalSize = numElements * sizeof(float);
  float* buffer = new float[numElements];

  int counter = 0;
  int sizeRead = 0;
  do {
    sizeRead = fread( buffer, 1, totalSize, stdin);
    fprintf(stderr,"Child  #%d: Read %d elements, buffer100: %f\n", counter, sizeRead, buffer[100]);
    if( sizeRead > 0 ) {
      for( int i = 0; i < numElements; i++ ) {
        buffer[i] += numElements;
      }
      int sizeWrite = fwrite( buffer, 1, totalSize, stdout);
      fflush(stdout);  // <--- CHANGE!

      fprintf(stderr,"Child  #%d: Wrote %d elements\n", counter, sizeWrite);
      counter += 1;
    }
  } while( sizeRead > 0 );

  return 0;
}

Is there any way to check when the pipe has enough data to be read? Or is there an alternative way to resolve the above problem, with or without pipes?

Please help!


Possibly the best solution when reading is to check with select whether you can read from the pipe. You can even pass a timeout. The alternative might be setting the O_NONBLOCK flag on file descriptor 0 (stdin) with fcntl, though I think the select way is better.

As with ensuring non-blocking write: that's a bit harder as you don't know how much you can write before the pipe blocks. One way (that I feel is very ugly) would be to only write 1 byte chunks and again check with select whether you can write. But that would be a performance killer, so use only if performance in communication is not an issue.


The first answer (using select to find out whether a pipe is ready to be read from) was good but didn't really solve my issue, see also my previous comments. Sooner or later I always ended up with a "race condition" where the program kept hanging either on a read or write.

The solution (maybe not be the only one?) is to run the child-to-parent data transfer in a different thread. I also went back and implemented the pipes as named pipes. It would probably also work with unnamed pipes but I didn't check that.

The final code is below. Note that no explicit flushing is required; the parent-to-child and child-to-parent data transfers are now decoupled. Any comments how this can be improved welcome! One residual problem I can see is that the pipes may fill up depending on how long time the child needs to process the data. I'm not sure how likely this is to happen. And by the way this worked fine with my external programs, not only with the provided child program.

Parent program A:

#include <cstring>
#include <cstdio>
#include <cstdlib>
#include <string>
#include <iostream>

extern "C" {
  #include <unistd.h>
  #include <fcntl.h>
  #include <sys/stat.h>
  #include <sys/types.h>
  #include <errno.h>
  #include <signal.h>
  #include <sys/wait.h>
  #include <pthread.h>
}

using namespace std;

static int const READING  = -1;
static int const BUFFER_READY = 1;
static int const FINISHED = 0;

/*
 * Parent-child inter-communication
 * Child is external process
 */

struct threadStruct {
  FILE*  file_c2p;
  int    sizeBuffer;
  float* buffer;
  int    io_flag;
};

// Custom sleep function
void mini_sleep( int millisec ) {
  struct timespec req={0},rem={0};
  time_t sec = (int)(millisec/1000);
  millisec    = (int)(millisec-(sec*1000));
  req.tv_sec  = sec;
  req.tv_nsec = millisec*1000000L;
  nanosleep(&req,&rem);
}

// Function to be executed within separate thread: Reads in data from file pointer
// Hand-shaking with main thread is done via the flag 'io_flag'
void *threadFunction( void *arg ) {
  threadStruct* ptr = (threadStruct*)arg;

  ptr->io_flag = READING;
  while( ptr->io_flag != FINISHED ) {
    if( ptr->io_flag == READING ) {
      int sizeRead = fread( ptr->buffer, 1, ptr->sizeBuffer, ptr->file_c2p );
      if( sizeRead <= 0 ) {
        ptr->io_flag = FINISHED;
        return NULL;
      }
      ptr->io_flag = BUFFER_READY;
    }
    else {
      mini_sleep(10);
    }
  }
  return NULL;
}

//--------------------------------------------------
int main() {
  std::string filename_p2c("/tmp/fifo11_p2c");
  std::string filename_c2p("/tmp/fifo11_c2p");

  fprintf(stderr,"..started\n");

  int status = mknod(filename_p2c.c_str(), S_IRUSR | S_IWUSR | S_IFIFO, 0);
  if( (status == -1) && (errno != EEXIST) ) {
    fprintf(stderr,"Error creating named pipe: %s\n", strerror(errno));
    exit(-1);
  }
  status = mknod(filename_c2p.c_str(), S_IRUSR | S_IWUSR | S_IFIFO, 0);
  if( (status == -1) && (errno != EEXIST) ) {
    fprintf(stderr,"Error creating named pipe: %s\n", strerror(errno));
    exit(-1);
  }

  FILE* file_dump = fopen("parent_dump","w");

  int fd_p2c;
  int fd_c2p;
  FILE* file_c2p = NULL;

  //--------------------------------------------------
  // Set up parent/child processes
  //
  pid_t pid = fork();
  if( pid == -1 ) {
    fprintf(stderr,"Unable to fork new process\n");
  }

  if( pid == 0 ) { // Child process
    fd_p2c = open( filename_p2c.c_str(), O_RDONLY );
    if( fd_p2c < 0 ) {
      fprintf(stderr,"Child: Error opening the named pipe: %d %d '%s'\n", fd_p2c, errno, strerror(errno));
      exit(-1);
    }
    fd_c2p = open( filename_c2p.c_str(), O_WRONLY );
    if( fd_c2p < 0 ) {
      fprintf(stderr,"Child: Error opening the named pipe: %d %d '%s'\n", fd_c2p, errno, strerror(errno));
      exit(-1);
    }

    dup2(fd_p2c,fileno(stdin));    // Redirect standard input(0) to child 'read pipe'
    dup2(fd_c2p,fileno(stdout));  // Redirect standard output(1) to child 'write pipe'
    close(fd_p2c);
    close(fd_c2p);

    int ret = execl("/disk/sources/pipe_test/pipe_child","pipe_child",NULL);
    fprintf(stderr,"External process failed, return code: %d...\n", ret);
    kill( getppid(), 9 );  // Kill parent process
    exit(-1);
  }
  else { // Parent process
    fd_p2c = open( filename_p2c.c_str(), O_WRONLY );
    if( fd_p2c < 0 ) {
      fprintf(stderr,"Parent: Error opening the named pipe: %d %d '%s'\n", fd_p2c, errno, strerror(errno));
      exit(-1);
    }
    file_c2p = fopen( filename_c2p.c_str(), "r");
    fd_c2p = fileno( file_c2p );
    if( fd_c2p < 0 ) {
      fprintf(stderr,"Parent: Error opening the named pipe: %d %d '%s'\n", fd_c2p, errno, strerror(errno));
      exit(-1);
    }
  }

  int numElements = 10000;
  int sizeBuffer = numElements * sizeof(float);
  float* bufferIn  = new float[numElements];
  float* bufferOut = new float[numElements];
  for( int i = 0; i < numElements; i++ ) {
    bufferIn[i]  = 0.0;
  }
  int numIterations = 5;
  int numBytesAll = numElements * sizeof(float) * numIterations;

  pthread_t thread;
  threadStruct* threadParam = new threadStruct();
  threadParam->file_c2p   = file_c2p;
  threadParam->sizeBuffer = sizeBuffer;
  threadParam->buffer     = bufferIn;
  threadParam->io_flag    = READING;

  int thread_stat = pthread_create( &thread, NULL, threadFunction, threadParam );
  if( thread_stat < 0 ) {
    fprintf(stderr,"Error when creating thread\n");
    exit(-1);
  }

  int readCounter  = 0;
  int numBytesWrite = 0;
  int numBytesRead  = 0;
  for( int iter = 0; iter < numIterations; iter++ ) {
    for( int i = 0; i < numElements; i++ ) {
      bufferOut[i] = (float)i + iter*numElements*10;
    }

    int sizeWrite = (int)write( fd_p2c, bufferOut, sizeBuffer );
    if( sizeWrite == -1 ) {
      fprintf(stderr,"Parent process write error\n");
      exit(-1);
    }
    numBytesWrite += sizeWrite;
    fprintf(file_dump,"Parent #%d: Wrote %d/%d bytes.\n", iter, numBytesWrite, numBytesAll);

    if( iter == numIterations-1 ) close(fd_p2c);  // Closing output pipe makes sure child receives EOF

    if( threadParam->io_flag != READING ) {
          numBytesRead += sizeBuffer;
      fprintf(file_dump,"Parent #%d: Read  %d/%d bytes. Examples: %f %f\n",
              readCounter, numBytesRead, numBytesAll, bufferIn[1], bufferIn[numElements-1] );
      readCounter += 1;
      if( threadParam->io_flag != FINISHED ) threadParam->io_flag = READING;
    }
  }
  //********************************************************************************
  //

  fprintf(file_dump,"------------------------------\n");

  while( threadParam->io_flag != FINISHED ) {
    if( threadParam->io_flag == BUFFER_READY ) {
      numBytesRead += sizeBuffer;
      fprintf(file_dump,"Parent #%d: Read  %d/%d bytes. Examples: %f %f\n",
              readCounter, numBytesRead, numBytesAll, bufferIn[1], bufferIn[numElements-1] );
      readCounter += 1;
      if( threadParam->io_flag != FINISHED ) threadParam->io_flag = READING;
    }
    else {
      mini_sleep(10);
    }
  }

  // wait for thread to finish before continuing
  pthread_join( thread, NULL );


  fclose(file_dump);
  fclose(file_c2p);
  waitpid(pid, &status, 0); // clean up any children
  fprintf(stderr,"..finished\n");

  delete [] bufferIn;
  delete [] bufferOut;

  return 0;
}

Child program B:

#include <cstdio>

using namespace std;

int main() {

  int numElements = 10000;
  int totalSize = numElements * sizeof(float);
  float* buffer = new float[numElements];

  FILE* file_dump = fopen("child_dump","w");

  int counter = 0;
  int sizeRead = 0;
  do {
    sizeRead = fread( buffer, 1, totalSize, stdin);
    if( sizeRead > 0 ) {
      fprintf(file_dump,"Child  #%d: Read  %d bytes, examples:  %f  %f\n", counter, sizeRead, buffer[1], buffer[numElements-1]);
      for( int i = 0; i < numElements; i++ ) {
        buffer[i] += numElements;
      }
      int sizeWrite = fwrite( buffer, 1, totalSize, stdout);
      fprintf(file_dump,"Child  #%d: Wrote %d bytes, examples:  %f  %f\n", counter, sizeRead, buffer[1], buffer[numElements-1]);
      counter += 1;
    }
  } while( sizeRead > 0 );
  fprintf(file_dump,"Child is finished\n");
  fclose(file_dump);
  fclose(stdout);

  return 0;
}
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜