Reading from a pipe in C

Scenario: Say I have 8 files of numbers, and I want to sort all of that data from least to greatest. Only leaf processes can sort the numbers that a file contains. These leaf processes must send the sorted data to a parent process via pipes. The parent process compares the data it receives and sends whichever number is smaller to the next process up. It does this until its pipes are empty.

So think of it as a tree. We have one master process. With 8 files to sort, the Master process will spawn 2 processes off of it (a left and a right). Those two new processes will spawn their own processes. This will happen until there are 8 leaf processes at the bottom. Internal nodes can only hold onto one number. These will pass their number along a series of pipes until they reach the master process. The master process will output its piped contents to a file.
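The compare-and-forward step at an internal node amounts to a two-way merge of two sorted streams. A minimal sketch of that idea, working on in-memory arrays rather than pipes; `merge_sorted` is a hypothetical name used only for illustration and is not part of the program:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper: merges two ascending arrays into out, which must
 * have room for na + nb ints. Returns the number of values written. */
size_t merge_sorted(const int *a, size_t na, const int *b, size_t nb, int *out)
{
    size_t i = 0, j = 0, k = 0;

    /* While both inputs have data, forward whichever head is smaller */
    while (i < na && j < nb)
        out[k++] = (a[i] <= b[j]) ? a[i++] : b[j++];

    /* One input is exhausted: drain the other */
    while (i < na)
        out[k++] = a[i++];
    while (j < nb)
        out[k++] = b[j++];

    assert(k == na + nb);
    return k;
}
```

With pipes, each `a[i++]` would become a read from the left child and each `out[k++]` a write to the parent, but the control flow would stay the same.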

I've included the code here (as it is a bit lengthy but straightforward).

This works if I have 2 files to sort. So we have 1 master process and then two children. The two children sort their file's numbers and then pass them up. The master process then prints out the data in order from the pipes. However, if I add some complexity (4 files), the leaf processes still send their data up, but when the master process begins to read from the internal nodes' pipes, it thinks they are empty and finishes the program without any data.

Any idea why the master process thinks that its left and right pipes are empty? Like I said, it works great when there is one parent and 2 children. Any more processes and it fails (assuming that processing will happen in powers of 2).

NOTE: perror is being used for debugging purposes.

Full program here (very messy, as I have been doing a lot with it, but it will compile).


The updated code in Pastebin is not a compilable function - let alone the complete program. That makes it hard to say what's really wrong.

However, one of the problems is in the code fragment:

if (pipe(upPipe) < 0 || pipe(leftPipe) < 0 || pipe(rightPipe) < 0)
    ...error exit...
if ((leftPID = fork()) < 0)
    ...error exit...

if(leftPID == 0){
    fMax = ((fMax)/2);
    dup2(leftPipe[WRITE], upPipe[WRITE]);
    pipe(leftPipe);
    pipe(rightPipe);

The call to dup2() is odd; you carefully map the write channel of the left pipe to the write channel of the up pipe.

The two pipe() calls after the dup2() fairly promptly screw up everything in the left child, opening 4 more file descriptors but losing the previous values stored in leftPipe and rightPipe.
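To see why the reader then finds its pipe empty, here is a minimal single-process sketch. It uses no fork(), and `bytes_seen_on_original_pipe` is a hypothetical name that does not come from your code. It assumes the reader still holds the descriptor from the first pipe() call while the writer uses the descriptors produced by the second call on the same array:

```c
#include <assert.h>
#include <unistd.h>

enum { READ = 0, WRITE = 1 };

/* Hypothetical demo: writes one int after re-creating the pipe and reports
 * how many bytes arrive at the ORIGINAL read end.
 * Returns 0 (EOF) when the data went into the new pipe; -1 on error. */
long bytes_seen_on_original_pipe(void)
{
    int p[2];
    int value = 42;
    int got = 0;

    if (pipe(p) < 0)
        return -1;
    int old_read = p[READ];
    int old_write = p[WRITE];

    /* Second call on the same array: p now names a brand-new pipe */
    if (pipe(p) < 0)
    {
        close(old_read);
        close(old_write);
        return -1;
    }
    assert(p[READ] != old_read && p[WRITE] != old_write);

    /* The writer believes it is feeding the reader, but uses the new pipe */
    if (write(p[WRITE], &value, sizeof(value)) != (ssize_t)sizeof(value))
        return -1;
    close(p[WRITE]);
    close(old_write);   /* no writers remain on the original pipe */

    /* The reader sees immediate EOF: nothing was ever written here */
    ssize_t n = read(old_read, &got, sizeof(got));
    close(old_read);
    close(p[READ]);
    return (long)n;
}
```

The value sits unread in the second pipe, while the original read end reports end-of-file as soon as its last write end is closed.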

You need to make your problem statement clearer. I cannot fathom from what you've got what you're supposed to have. There's a call to convertToInt() which takes no arguments and returns no value; what on earth is that doing? There's a call to freeMem(); it is not clear what that does. Compiling the fragment gives:

z.c:42: error: ‘numberChar’ undeclared (first use in this function)
z.c:42: error: ‘sizeNumbers’ undeclared (first use in this function)
z.c:43: warning: implicit declaration of function ‘readFile’
z.c:43: error: ‘fileNames’ undeclared (first use in this function)
z.c:45: warning: implicit declaration of function ‘convertToInt’
z.c:46: error: ‘i’ undeclared (first use in this function)
z.c:46: error: ‘numbs’ undeclared (first use in this function)
z.c:47: error: ‘numbers’ undeclared (first use in this function)
z.c:48: warning: implicit declaration of function ‘freeMem’

Sorry, your question is unanswerable because you are not giving us:

  1. The accurate requirements.
  2. The code you've actually got compiling.

Your code does not have a good clean break up of the functions. Do you use a VCS (version control system - such as git)? If not, you should. I made the changed version below - which is essentially a complete rewrite - in 9 check-ins, and should probably have made more smaller check-ins than that. But using a VCS was crucial to me; it allowed me to make changes with confidence, knowing I would not lose anything valuable. And I didn't have to comment code out; I removed the stuff I didn't want. The solution below is 261 lines; the original was about 687 lines in total, including a lot of commented out code; when I'd finished stripping out the comments, etc, it came down to 469 lines.

When I got your code running (and reporting on which files were being opened by each child), I found that there were 2 processes opening each of files 2 and 3 (and since the data files didn't exist at the time, they failed at that point).
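One way to guarantee that each file is handled by exactly one leaf is to hand each child a disjoint half of the current range. The sketch below checks that property without any processes or pipes; `split_range` and `each_file_once` are hypothetical names, and the 64-file limit is an arbitrary assumption for the demo. The midpoint rule is the same `number / 2` split that sortMergeFiles() uses:

```c
#include <assert.h>

enum { MAX_FILES = 64 };   /* arbitrary limit for this demo */

/* Hypothetical helper: recursively splits the range [start, start + number)
 * the way a binary process tree would, counting visits to each leaf index. */
void split_range(int start, int number, int *visits)
{
    assert(number >= 0);
    if (number == 0)
        return;
    if (number == 1)
    {
        visits[start]++;    /* a leaf would open file 'start' here */
        return;
    }
    int mid = number / 2;
    split_range(start, mid, visits);                 /* left child  */
    split_range(start + mid, number - mid, visits);  /* right child */
}

/* Returns 1 if every index in 0..number-1 is visited exactly once */
int each_file_once(int number)
{
    int visits[MAX_FILES] = {0};
    assert(number >= 0 && number <= MAX_FILES);
    split_range(0, number, visits);
    for (int i = 0; i < number; i++)
    {
        if (visits[i] != 1)
            return 0;
    }
    return 1;
}
```

Because the left half ends where the right half begins, the check holds for counts that are not powers of 2 as well.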

The revised code has an almost clean structure; the odd bit is the 'convertToString()' phase which reads binary integers off a pipe and converts them to ASCII output again. It works; I'm not convinced it is elegant. Instead of using an array of hard-coded file names, it takes an arbitrary list of file names from the command line (it does not have to be 8; it has been tested with 0 through 8, and I've no reason to think it won't handle 20 or more). I did a fair amount of testing with:

./piped-merge-sort [1-8]
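On the binary integers travelling through the pipes: read() is allowed to return fewer bytes than requested. With int-sized writes on a pipe that is unlikely in practice, so treat the following as an optional defensive sketch rather than something the program requires. `read_full` and `round_trip` are hypothetical names that do not appear in the code:

```c
#include <assert.h>
#include <errno.h>
#include <unistd.h>

/* Hypothetical helper: keeps calling read() until len bytes have arrived,
 * EOF is reached, or a real error occurs. Returns bytes read, or -1. */
ssize_t read_full(int fd, void *buf, size_t len)
{
    char *p = buf;
    size_t done = 0;

    assert(fd >= 0);
    while (done < len)
    {
        ssize_t n = read(fd, p + done, len - done);
        if (n == 0)
            break;              /* EOF: every write end has been closed */
        if (n < 0)
        {
            if (errno == EINTR)
                continue;       /* interrupted by a signal: retry */
            return -1;
        }
        done += (size_t)n;
    }
    return (ssize_t)done;
}

/* Hypothetical self-check: sends one int through a pipe in two pieces and
 * returns what read_full() reassembles, or -1 on failure. */
int round_trip(int value)
{
    int fds[2];
    int out = 0;
    const char *bytes = (const char *)&value;
    ssize_t rest = (ssize_t)sizeof(value) - 1;

    if (pipe(fds) < 0)
        return -1;
    if (write(fds[1], bytes, 1) != 1 || write(fds[1], bytes + 1, (size_t)rest) != rest)
    {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }
    close(fds[1]);

    ssize_t n = read_full(fds[0], &out, sizeof(out));
    close(fds[0]);
    return (n == (ssize_t)sizeof(out)) ? out : -1;
}
```

A return of 0 from `read_full` means clean end-of-file, and a value between 1 and `len - 1` means the stream ended in the middle of an integer.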

There is copious diagnostic output. I've used two functions that I find vastly helpful in my work - I have them packaged up with some other related code in a more complex package, but the simple versions of err_error() and err_remark() functions really help me. Note that these versions report the PID of the reporting process for each call. They're also careful to pre-format the message into a string and then write the string in one print to standard error; otherwise, I was getting a lot of interleaved output which was confusing at best.

'Nuff said - here's the code:

#include <assert.h>
#include <errno.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <unistd.h>

/* Error reporting */
static void err_vremark(char *fmt, va_list args)
{
    char buffer[256];
    int errnum = errno;
    /* Format the whole message first so it reaches stderr in one print */
    int buflen = snprintf(buffer, sizeof(buffer), "%d: ", (int)getpid());
    buflen += vsnprintf(buffer + buflen, sizeof(buffer) - buflen, fmt, args);
    /* vsnprintf returns the untruncated length; skip the suffix if full */
    if (errnum != 0 && buflen < (int)sizeof(buffer))
        snprintf(buffer + buflen, sizeof(buffer) - buflen,
                 ": errno = %d (%s)", errnum, strerror(errnum));
    fprintf(stderr, "%s\n", buffer);
}

static void err_error(char *fmt, ...)
{
    va_list args;
    va_start(args, fmt);
    err_vremark(fmt, args);
    va_end(args);
    exit(1);
}

static void err_remark(char *fmt, ...)
{
    va_list args;
    va_start(args, fmt);
    err_vremark(fmt, args);
    va_end(args);
}

enum { READ = 0 };
enum { WRITE = 1 };
enum { BUFFER_SIZE = 256 };

static int *a_data = 0;
static int  a_used = 0;
static int  a_size = 0;

void readFile(char const *fileName);
void freeMem(void);
void sortArray(void);
int  intcmp(void const *n1, void const *n2);

static void sortMergeFiles(int fd, int number, char **names);
static void sortOneFile(int fd, const char *file);
static void convertToString(int fd, FILE *fp);

int main(int argc, char **argv)
{
    int m_pipe[2];
    pid_t pid;
    if (pipe(m_pipe) < 0)
        err_error("Failed to create master pipe");
    if ((pid = fork()) < 0)
        err_error("Failed to fork master");
    else if (pid == 0)
    {
        close(m_pipe[READ]);
        sortMergeFiles(m_pipe[WRITE], argc - 1, &argv[1]);
        close(m_pipe[WRITE]);
    }
    else
    {
        close(m_pipe[WRITE]);
        convertToString(m_pipe[READ], stdout);
        close(m_pipe[READ]);
    }
    return 0;
}

static void convertToString(int fd, FILE *fp)
{
    int value;
    while (read(fd, &value, sizeof(int)) == sizeof(int))
        fprintf(fp, "%d\n", value);
}

static int readInteger(int fd, int *value)
{
    if (read(fd, value, sizeof(int)) != sizeof(int))
        return EOF;
    return 0;
}

static void writeInteger(int fd, int value)
{
    if (write(fd, &value, sizeof(int)) != sizeof(int))
        err_error("Failed to write integer to fd %d", fd);
}

static void mergeFiles(int fd_in1, int fd_in2, int fd_out)
{
    int value_1;
    int value_2;
    int eof_1 = readInteger(fd_in1, &value_1);
    int eof_2 = readInteger(fd_in2, &value_2);

    while (eof_1 != EOF && eof_2 != EOF)
    {
        err_remark("v1: %d; v2: %d", value_1, value_2);
        if (value_1 <= value_2)
        {
            writeInteger(fd_out, value_1);
            eof_1 = readInteger(fd_in1, &value_1);
        }
        else
        {
            writeInteger(fd_out, value_2);
            eof_2 = readInteger(fd_in2, &value_2);
        }
    }

    while (eof_1 != EOF)
    {
        err_remark("v1: %d", value_1);
        writeInteger(fd_out, value_1);
        eof_1 = readInteger(fd_in1, &value_1);
    }

    while (eof_2 != EOF)
    {
        err_remark("v2: %d", value_2);
        writeInteger(fd_out, value_2);
        eof_2 = readInteger(fd_in2, &value_2);
    }
}

static void sortMergeFiles(int fd, int number, char **names)
{
    assert(number >= 0);
    if (number == 0)
        return;
    else if (number == 1)
        sortOneFile(fd, names[0]);
    else
    {
        err_remark("Non-Leaf: processing %d files (%s .. %s)", number, names[0], names[number-1]);
        int mid = number / 2;
        int l_pipe[2];
        int r_pipe[2];
        pid_t l_pid;
        pid_t r_pid;
        if (pipe(l_pipe) < 0 || pipe(r_pipe) < 0)
            err_error("Failed to create pipes");
        if ((l_pid = fork()) < 0)
            err_error("Failed to fork left child");
        else if (l_pid == 0)
        {
            close(l_pipe[READ]);
            close(r_pipe[READ]);
            close(r_pipe[WRITE]);
            sortMergeFiles(l_pipe[WRITE], mid, names);
            close(l_pipe[WRITE]);
            exit(0);
        }
        else if ((r_pid = fork()) < 0)
            err_error("Failed to fork right child");
        else if (r_pid == 0)
        {
            close(r_pipe[READ]);
            close(l_pipe[READ]);
            close(l_pipe[WRITE]);
            sortMergeFiles(r_pipe[WRITE], number - mid, names + mid);
            close(r_pipe[WRITE]);
            exit(0);
        }
        else
        {
            close(l_pipe[WRITE]);
            close(r_pipe[WRITE]);
            mergeFiles(l_pipe[READ], r_pipe[READ], fd);
            close(l_pipe[READ]);
            close(r_pipe[READ]);
            err_remark("Non-Leaf: finished %d files (%s .. %s)", number, names[0], names[number-1]);
        }
    }
}

static void addNumberToArray(int number)
{
    assert(a_used >= 0 && a_used <= a_size);
    if (a_used == a_size)
    {
        int  n_size = (a_size + 1) * 2;
        int *n_data = realloc(a_data, sizeof(*n_data) * n_size);
        if (n_data == 0)
            err_error("Failed to allocate space for %d numbers", n_size);
        a_data = n_data;
        a_size = n_size;
    }
    a_data[a_used++] = number;
}

/* Could be compressed to write(fd, a_data, a_used * sizeof(int)); */
/* Arguably should check for write errors - but not SIGPIPE */
static void writeArray(int fd)
{
    for (int i = 0; i < a_used; i++)
    {
        err_remark("Write: %d", a_data[i]);
        write(fd, &a_data[i], sizeof(int));
    }
}

void readFile(char const *fileName)
{
    char buffer[BUFFER_SIZE];
    FILE *fp;
    fp = fopen(fileName, "r");

    if (fp == NULL)
        err_error("Failed to open file %s for reading", fileName);

    while (fgets(buffer, sizeof(buffer), fp) != NULL)
    {
        char *nl = strchr(buffer, '\n');
        if (nl != 0)
            *nl = '\0';
        err_remark("Line: %s", buffer);
        addNumberToArray(atoi(buffer));
    }

    fclose(fp);
}

int intcmp(const void *n1, const void *n2)
{
    const int num1 = *(const int *) n1;
    const int num2 = *(const int *) n2;
    return (num1 < num2) ? -1 : (num1 > num2);
}

void sortArray(void)
{
    qsort(a_data, a_used, sizeof(int), intcmp);
}

void freeMem(void)
{
    free(a_data);
}

static void sortOneFile(int fd, const char *file)
{
    err_remark("Leaf: processing file %s", file);
    readFile(file);
    sortArray();
    writeArray(fd);
    freeMem();
    err_remark("Leaf: finished file %s", file);
}