开发者

Multiple processes connected via socketpair sometimes hang

I am trying to implement something that will give me a solution for:

       | --> cmd3 --> cmd4 -->
cmd2-->|
       | --> cmd5 --> cmd6 -->

and so on...

The idea is to run several processes and pipe the output of one through chains of other processes, with each chain of commands running in its own thread. I chose socketpair for the IPC because pipes have a bottleneck: their buffer size is limited to 64K. When I test the program with a single chain it works as expected, but when I run the master command and send its output via socketpairs to the read ends of several chains (one chain per thread), the program gets stuck (it looks like a deadlock).

What am I doing wrong?

#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/types.h>
#include <fcntl.h>
#include <pthread.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

/*
 * One node of the command tree.
 *
 * A plain pipeline stage uses argv/next/fd.  A fan-out node instead uses
 * master_cmd (the command whose output is duplicated) together with
 * chains/num_children (the pipelines that each receive a copy).
 */
typedef struct command {
    char **argv;                 /* NULL-terminated argument vector handed to execvp */
    int num_children;            /* number of entries in chains (main uses -1 when none) */
    struct command *master_cmd;  /* command whose stdout is fanned out to chains */
    struct command **chains;     /* first stage of each pipeline fed by master_cmd */
    struct command *next;        /* following stage of this pipeline, or NULL */
    int fd;                      /* read end that becomes this stage's stdin */
} command;

/* Runs one chain (cmd -> cmd->next -> ...) fed from cmd->fd; used as a thread body. */
void be_child(command* cmd);
/* Runs cmd->master_cmd and copies its output to every chain in cmd->chains. */
int execute_master_command_and_pipe_to_childs(command* cmd, int input);
/* Starts the pipeline beginning at cmd; returns the read end of its final output. */
int run_pipeline_sockets(command *cmd, int input);
/* Diagnostic: reports whether fd becomes readable within a short timeout. */
void waitfor(int fd);
/*
 * Installs the SIGSEGV reporter.  main() calls this before the definition
 * appears, so a prototype is required (implicit declarations are invalid
 * since C99 and would conflict with the void definition).
 */
void handle_segfault(void);

int main(int argc, char* argv[]) {

    handle_segfault();

    command* cmd1 = (command*) malloc(sizeof(command));
    command* cmd2 = (command*) malloc(sizeof(command));
    command* cmd3 = (command*) malloc(sizeof(command));
    command* cmd4 = (command*) malloc(sizeof(command));
    command* cmd5 = (command*) malloc(sizeof(command));
    command* cmd6 = (command*) malloc(sizeof(command));

    command* chains1[2];

    chains1[0] = cmd3;
    chains1[1] = cmd5;

    char* args1[] = { "cat", "/tmp/test.log", NULL };
    char* args3[] = { "sort", NULL, NULL };
    char* args4[] = { "wc", "-l", NULL };
    char* args5[] = { "wc", "-l", NULL };
    char* args6[] = { "wc", "-l", NULL };

    cmd1->argv = args1;
    cmd2->argv = NULL;
    cmd3->argv = args3;
    cmd4->argv = args4;
    cmd5->argv = args5;
    cmd6->argv = args6;

    cmd1->master_cmd = NULL;
    cmd1->next = NULL;
    cmd1->chains = NULL;
    cmd1->num_children = -1;

    cmd2->master_cmd = cmd1;
    cmd2->chains = chains1;
    cmd2->next = NULL;
    cmd2->num_children = 2;

    cmd3->master_cmd = NULL;
    cmd3->next = cmd4;
    cmd3->chains = NULL;
    cmd3->num_children = -1;

    cmd4->master_cmd = NULL;
    cmd4->next = NULL;
    cmd4->chains = NULL;
    cmd4->num_children = -1;

    cmd5->master_cmd = NULL;
    cmd5->next = cmd6;
    cmd5->chains = NULL;
    cmd5->num_children = -1;

    cmd6->master_cmd = NULL;
    cmd6->next = NULL;
    cmd6->chains = NULL;
    cmd6->num_children = -1;

    int rc = execute_master_command_and_pipe_to_childs(cmd2, -1);

    return 0;
}

int execute_master_command_and_pipe_to_childs(command* cmd, int input) {

    int num_children = cmd->num_children;
    int write_pipes[num_children];
    pthread_t threads[num_children];
    command* master_cmd = cmd->master_cmd;

    pid_t pid;
    int i;

    for (i = 0; i < num_children; i++) {
        int new_pipe[2];
        if (socketpair(AF_LOCAL, SOCK_STREAM, 0, new_pipe) < 0) {
            int errnum = errno;
            fprintf(STDERR_FILENO, "ERROR (%d: %s)\n", errnum,
                    strerror(errnum));
            return EXIT_FAILURE;
        }

        if (cmd->chains[i] != NULL) {
            cmd->chains[i]->fd = new_pipe[0];

            if (pthread_create(&threads[i], NULL, (void *) be_child,
                    cmd->chains[i]) != 0) {
                perror("pthread_create"), exit(1);
            }

            write_pipes[i] = new_pipe[1];
        } else {
            perror("ERROR\n");
        }
    }

    if (input != -1) {
        waitfor(input);
    }

    int pipefd = run_pipeline_sockets(master_cmd, input);

    int buffer[1024];

    int len = 0;
    while ((len = read(pipefd, buffer, sizeof(buffer))) != 0) {
        int j;
        for (j = 0; j < num_children; j++) {
            if (write(write_pipes[j], &buffer, len) != len) {
                fprintf(STDERR_FILENO, "Write failed (child %d)\n", j);
                exit(1);
            }

        }

    }

    close(pipefd);

    for (i = 0; i < num_children; i++) {
        close(write_pipes[i]);
    }

    for (i = 0; i < num_children; i++) {
        if (pthread_join(threads[i], NULL) != 0) {
            perror("pthread_join"), exit(1);
        }
    }

}

/*
 * Diagnostic helper: waits up to 0.5 s for fd to become readable and prints
 * whether data is available.  It does not consume any data.
 */
void waitfor(int fd) {
    /* FD_SET on a negative descriptor, or on one >= FD_SETSIZE, is undefined. */
    if (fd < 0 || fd >= FD_SETSIZE) {
        fprintf(stderr, "waitfor: descriptor %d cannot be used with select()\n", fd);
        return;
    }

    fd_set rfds;
    FD_ZERO(&rfds);
    FD_SET(fd, &rfds);

    struct timeval tv;
    tv.tv_sec = 0;
    tv.tv_usec = 500000;

    int retval = select(fd + 1, &rfds, NULL, NULL, &tv);

    if (retval == -1) {
        perror("select()");
    } else if (retval) {
        printf("Data is available now on: %d\n", fd);
    } else {
        printf("No data on: %d\n", fd);
    }
}

/*
 * Thread body for one chain.  It runs the pipeline that starts at cmd, with
 * its input fed from cmd->fd, and copies the pipeline's final output to stderr.
 *
 * Ownership of cmd->fd passes to run_pipeline_sockets(), which closes it in
 * this process after handing it to the first stage.
 */
void be_child(command* cmd) {
    const char *arg1 = cmd->argv[1] != NULL ? cmd->argv[1] : "(null)";

    /* Pointers are printed with %p as void *; %d on a pointer is undefined. */
    printf(
            "fd = %d , argv = %s , args = %s , next = %p , master_cmd = %p , next_chain = %p\n",
            cmd->fd, cmd->argv[0], arg1, (void *) cmd->next,
            (void *) cmd->master_cmd, (void *) cmd->chains);

    waitfor(cmd->fd);

    int fd = run_pipeline_sockets(cmd, cmd->fd);
    /* cmd->fd must not be closed again here.  Its number may already have
       been reused by another thread for an unrelated descriptor. */
    if (fd < 0) {
        fprintf(stderr, "be_child: could not start pipeline for %s\n", cmd->argv[0]);
        return;
    }

    waitfor(fd);

    char buffer[4096];
    for (;;) {
        ssize_t len = read(fd, buffer, sizeof buffer);
        if (len == 0)
            break;
        if (len < 0) {
            if (errno == EINTR)
                continue;
            perror("read");
            break;
        }
        /* Best effort: a failed stderr write is skipped so the pipeline is still drained. */
        for (ssize_t off = 0; off < len; ) {
            ssize_t n = write(STDERR_FILENO, buffer + off, (size_t) (len - off));
            if (n < 0) {
                if (errno == EINTR)
                    continue;
                break;
            }
            off += n;
        }
    }

    close(fd);
}

/*
 * Start the pipeline cmd -> cmd->next -> ..., connecting consecutive stages
 * with AF_LOCAL stream socket pairs.
 *
 * input: descriptor for the first stage's stdin, or -1 to inherit ours.  It
 *        is closed in this process once handed to the child (and on failure).
 *
 * Returns the read end that carries the last stage's stdout, or -1 on
 * failure.  If cmd is NULL, input is returned unchanged.  The children are
 * not waited for here.
 */
int run_pipeline_sockets(command *cmd, int input) {
    for (; cmd != NULL; cmd = cmd->next) {
        int pfds[2];
        int sp;

        /* Close-on-exec stops commands exec'd by other threads from
           inheriting these ends and holding the pipeline open forever.
           The fcntl fallback still leaves a small race window. */
#ifdef SOCK_CLOEXEC
        sp = socketpair(AF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC, 0, pfds);
#else
        sp = socketpair(AF_LOCAL, SOCK_STREAM, 0, pfds);
        if (sp == 0) {
            fcntl(pfds[0], F_SETFD, FD_CLOEXEC);
            fcntl(pfds[1], F_SETFD, FD_CLOEXEC);
        }
#endif
        if (sp < 0) {
            int errnum = errno;
            fprintf(stderr, "socketpair failed (%d: %s)\n", errnum,
                    strerror(errnum));
            if (input != -1)
                close(input);
            return -1;
        }

        pid_t pid = fork();
        if (pid < 0) {
            int errnum = errno;
            fprintf(stderr, "fork failed (%d: %s)\n", errnum, strerror(errnum));
            close(pfds[0]);
            close(pfds[1]);
            if (input != -1)
                close(input);
            return -1;
        }

        if (pid == 0) { /* child */
            /* dup2 clears FD_CLOEXEC on the copy, so stdin/stdout survive exec. */
            if (input != -1 && input != STDIN_FILENO) {
                dup2(input, STDIN_FILENO);
                close(input);
            }
            dup2(pfds[1], STDOUT_FILENO);
            close(pfds[1]);
            close(pfds[0]);
            execvp(cmd->argv[0], cmd->argv);
            /* exec failed.  _exit avoids flushing stdio buffers copied from the parent. */
            _exit(1);
        }

        /* parent */
        if (input != -1)
            close(input);
        close(pfds[1]);
        input = pfds[0];   /* next stage reads what this stage writes */
    }

    return input;
}

/*
 * SIGSEGV handler (installed with SA_SIGINFO).  It reports the faulting
 * address and si_errno, then terminates the process with a failure status.
 *
 * printf() is not async-signal-safe.  It is tolerated here only as a
 * last-gasp debugging aid, because the process is about to die anyway.
 */
void segfault_sigaction(int signal, siginfo_t *si, void *arg) {
    (void) signal;
    (void) arg;
    printf("Caught segfault at address %p\n", si->si_addr);
    /* si_errno is an int, so it is printed with %d. */
    printf("Caught segfault errno %d\n", si->si_errno);
    fflush(stdout);
    /* A crash must not look like success (exit status 0).  _exit also skips
       atexit handlers, which could touch corrupted state. */
    _exit(EXIT_FAILURE);
}

/* Install segfault_sigaction as the SIGSEGV handler, with siginfo delivery. */
void handle_segfault(void) {
    struct sigaction sa;

    /* Size the object itself.  A bare `sizeof(sigaction)` would name the
       function sigaction(), not struct sigaction (GCC evaluates it as 1). */
    memset(&sa, 0, sizeof sa);
    sigemptyset(&sa.sa_mask);
    sa.sa_sigaction = segfault_sigaction;
    sa.sa_flags = SA_SIGINFO;

    if (sigaction(SIGSEGV, &sa, NULL) == -1)
        perror("sigaction");
}


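I would come at this problem from a very different angle: rather than building a large data structure to manage the pipe tree and using threads (where an I/O blockage in a process may block its threads), I would use only processes. (The hang itself most likely comes from descriptor inheritance: every child forked for one chain inherits the write ends of the other chains' sockets, so those chains never see EOF. Marking the sockets close-on-exec avoids this.)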

I also fail to see how a 64K buffer is your bottleneck when you're only reading 1024 ints (4 KiB) at a time.

Two simple functions should guide this. (Error handling is omitted for brevity, and parsecmd() is a pseudocode function that turns a space-separated string into an argument vector.)

/*
 * Spawn cmd with its stdout connected to outfd and its stdin connected to a
 * new pipe.  Returns the write end of that pipe: whatever the caller writes
 * there becomes the command's input.  Returns -1 if pipe() or fork() fails.
 *
 * The caller keeps its own copy of outfd and should close it once it is no
 * longer needed, otherwise the process reading from outfd never sees EOF.
 */
int mkproc(char *cmd, int outfd)
{
    Command c = parsecmd(cmd);
    int pipeleft[2];
    if (pipe(pipeleft) == -1)
        return -1;

    pid_t pid = fork();
    if (pid == -1) {
        close(pipeleft[0]);
        close(pipeleft[1]);
        return -1;
    }
    if (pid == 0) {
        close(pipeleft[1]);
        dup2(pipeleft[0], STDIN_FILENO);
        if (pipeleft[0] != STDIN_FILENO)
            close(pipeleft[0]);
        if (outfd != STDOUT_FILENO) {
            dup2(outfd, STDOUT_FILENO);
            if (outfd > STDERR_FILENO)
                close(outfd);
        }
        execvp(c.name, c.argv);
        /* Never fall back into the caller's code in the child. */
        _exit(127);
    }

    close(pipeleft[0]);
    /* Processes exec'd later must not inherit this write end. */
    fcntl(pipeleft[1], F_SETFD, FD_CLOEXEC);
    return pipeleft[1];
}

mkproc takes the fd that the new process will write its output to, and returns the fd you write to in order to feed its input. This way chains are really easy to initialize:

/* Build the chain back to front.  After each spawn, drop the parent's copy of
   the fd that the new child now writes to, or its reader never sees EOF. */
int wc_in = mkproc("wc -l", 1);
int sort_in = mkproc("sort", wc_in);
close(wc_in);
int chain_in = mkproc("cat foo.txt", sort_in);
close(sort_in);

The next one is:

/*
 * Spawn cmd and copy its stdout to every descriptor in the -1-terminated list
 * ofd0, ...  Returns the write end of cmd's stdin (as mkproc does), or -1 if
 * a pipe or fork fails.
 *
 * A forked forwarder process copies cmd's output to each target and exits
 * when that output reaches EOF.  The caller keeps its own copies of the
 * target descriptors and should close them once the tree is built.
 */
int mktree(char *cmd, int ofd0, ...)
{
    int piperight[2];
    if (pipe(piperight) == -1)
        return -1;

    int cmdin = mkproc(cmd, piperight[1]);
    close(piperight[1]);            /* cmd's process now holds the write end */
    if (cmdin == -1) {
        close(piperight[0]);
        return -1;
    }

    pid_t pid = fork();
    if (pid == -1) {
        close(piperight[0]);
        close(cmdin);
        return -1;
    }
    if (pid == 0) {
        /* If the forwarder held cmd's stdin open, cmd would never see EOF. */
        close(cmdin);

        unsigned char buf[4096];
        ssize_t n;
        while ((n = read(piperight[0], buf, sizeof buf)) != 0) {
            if (n < 0) {
                if (errno == EINTR)
                    continue;
                break;
            }
            va_list ap;
            va_start(ap, ofd0);
            for (int fd = ofd0; fd != -1; fd = va_arg(ap, int)) {
                ssize_t off = 0;
                while (off < n) {
                    ssize_t w = write(fd, buf + off, (size_t) (n - off));
                    if (w < 0) {
                        if (errno == EINTR)
                            continue;
                        break;      /* skip this target for this chunk */
                    }
                    off += w;
                }
            }
            va_end(ap);
        }
        close(piperight[0]);
        /* Never return into the caller's code from the forwarder. */
        _exit(0);
    }

    close(piperight[0]);
    return cmdin;
}

Between the two of these, it is very easy to construct trees of arbitrary complexity, like so:

/* Build the tree leaves-first.  After each spawn, close the parent's copy of
   every descriptor a child now holds, otherwise its reader never sees EOF. */
int wc_stdout_in = mkproc("wc -l", 1);
int uniq_in = mkproc("uniq", wc_stdout_in);
close(wc_stdout_in);
/* O_CREAT (with a mode) creates out.txt if missing; O_TRUNC drops old contents. */
int out_fd = open("out.txt", O_WRONLY | O_CREAT | O_TRUNC, 0644);
int wc_file_in = mkproc("wc -l", out_fd);
close(out_fd);
int rot13_in = mktree("rot13", uniq_in, wc_file_in, -1);
close(uniq_in);
close(wc_file_in);
int sort_in = mkproc("sort", 2);
int tree_in = mktree("cat foo.txt", rot13_in, sort_in, -1);
close(rot13_in);
close(sort_in);

This would output a sorted foo.txt to stderr, the number of lines in rot13'd foo.txt to out.txt, and the number of lines of rot13'd foo.txt left after uniq collapses adjacent duplicate lines to stdout.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜