开发者

MPI Asynchronous/One-Sided Communication

I have a situation similar to the code below: worker processes work on a subset of data and must send an unknown amount of data back to the master. Is it possible to have the master wait and receive an unknown number of sends from the worker processes? Is there a way to do it using one-sided communication? Thanks in advance!

#include <errno.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

/*
    sample run/output:
    $mpirun -np 5 practice.exe
    @[1]: i=30
    @[2]: i=0
    @[2]: i=75
    @[4]: i=40
    @[4]: i=55
    @[3]: i=85
    @[3]: i=65
*/
/*
 * Skeleton of the problem: each worker (rank > 0) walks its strided
 * share of the iteration space and must send an unknown number of
 * ints back to the master (rank 0).  The SEND/RECEIVE placeholders
 * mark where the communication should go.
 */
int main(int argc, char *argv[])
{
    int i, rank, np, nw, num;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    nw = np - 1;    /* number of worker ranks */

    /* add rank rather than multiply: time(NULL)*rank always seeds
     * rank 0 with 0, and addition still gives each rank a distinct seed */
    srand((unsigned)(time(NULL) + rank));

    if (rank > 0)
    {
        /* worker: strided loop so the nw workers partition 0..nw*10-1 */
        for (i=(rank-1); i<(nw*10); i+=nw)
        {
            num = rand() % 100;
            if (num % 5 == 0)
            {
                printf("@[%d]: i=%d\n", rank, num);
                // SEND num TO MASTER
            }
        }
    }
    else
    {
        // RECEIVE num FROM WORKER
    }

    MPI_Finalize();

    return EXIT_SUCCESS;
}


Sure, there's lots of ways to do this, but it doesn't really have anything to do with asynchronous communications. You can do it with 1-sided communications, but even that has its own problems with this (you still have to be able to guess how much total memory will be needed for the data).

One way to do it is simply to figure out how much data you have, send that ahead to the master so it knows how many messages to receive, and then send your data one at a time:

#include <errno.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>   /* was missing: needed for time() */

#define MAXPERWORKER 10
#define TAG_NUM_INCOMING 1
#define TAG_DATA 2

/*
 * Count-first master/worker pattern: each worker first tells the master
 * how many values it will send (TAG_NUM_INCOMING), then sends them one
 * at a time (TAG_DATA).  The worker loop runs at most 10 iterations and
 * stores at most one value per iteration, so MAXPERWORKER == 10 bounds
 * the per-worker buffer exactly.
 */
int main(int argc, char *argv[])
{
    int i, rank, np, nw, num;
    int mynums[MAXPERWORKER], numcount, total;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    nw = np - 1;    /* number of worker ranks */

    /* add rank rather than multiply, so rank 0 doesn't always seed with 0 */
    srand((unsigned)(time(NULL) + rank));

    if (rank > 0)
    {
        /* worker: buffer matching values locally, then report */
        numcount = 0;
        total    = 0;
        for (i=(rank-1); i<(nw*10); i+=nw)
        {
            num = rand() % 100;
            if (num % 3 == 0)
            {
                printf("@[%d]: i=%d\n", rank, num);
                mynums[numcount] = num;
                numcount++;
                total += num;
            }
        }
        /* of course, in this case we could just
         * do this in one message, but..
         */
        MPI_Send(&numcount, 1, MPI_INT, 0, TAG_NUM_INCOMING, MPI_COMM_WORLD);
        for (i=0; i<numcount; i++)
            MPI_Send(&(mynums[i]), 1, MPI_INT, 0, TAG_DATA, MPI_COMM_WORLD);

        printf("@[%d]: Total of all nums is %d\n", rank, total);
    }
    else
    {
        /* master: per-worker bookkeeping, indexed by (source rank - 1) */
        int *totals = malloc(sizeof(int)*nw);
        int *counts = malloc(sizeof(int)*nw);
        int *sofar  = malloc(sizeof(int)*nw);
        int **data  = malloc(sizeof(int *)*nw);
        int rcv;
        int totalcounts = 0;   /* was read uninitialized below -- UB */
        int j;
        int workernum;
        MPI_Status status;

        if (totals == NULL || counts == NULL || sofar == NULL || data == NULL)
            MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);

        for (i=0; i<nw; i++) {
            sofar[i] = 0;
            totals[i]= 0;
        }

        /* phase 1: learn how many TAG_DATA messages each worker will send */
        for (i=0; i<nw; i++) {
            MPI_Recv(&rcv, 1, MPI_INT, MPI_ANY_SOURCE, TAG_NUM_INCOMING, MPI_COMM_WORLD, &status);

            workernum = status.MPI_SOURCE-1;   /* workers are ranks 1..nw */
            counts[workernum] = rcv;
            totalcounts += rcv;
            data[workernum] = malloc(sizeof(int)*rcv);
            if (rcv > 0 && data[workernum] == NULL)
                MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
        }

        /* phase 2: receive exactly totalcounts data messages, any order */
        for (i=0; i<totalcounts; i++) {
            MPI_Recv(&rcv, 1, MPI_INT, MPI_ANY_SOURCE, TAG_DATA, MPI_COMM_WORLD, &status);
            workernum = status.MPI_SOURCE-1;
            data[ workernum ][ sofar[workernum]++ ] = rcv;
            totals[ workernum ] += rcv;
        }

        /* print results */
        for (i=0; i<nw; i++) {
            printf("From [%2d]:", i+1);
            for (j=0; j<counts[i]; j++)
                printf("%3d ", data[i][j]);
            printf("| %3d\n", totals[i]);
        }

        for (i=0; i<nw; i++)
            free(data[i]);
        free(data);
        free(totals);
        free(counts);
        free(sofar);
    }

    MPI_Finalize();

    return EXIT_SUCCESS;
}

Running this on 4 processes, I get:

$ mpirun -np 4 ./masterworker1

@[1]: i=39
@[1]: i=81
@[3]: i=9
@[3]: i=45
@[3]: i=0
@[3]: i=57
@[3]: Total of all nums is 111
@[1]: Total of all nums is 120
From [ 1]: 39  81 | 120
From [ 2]: 24   6  39 |  69
From [ 3]:  9  45   0  57 | 111
@[2]: i=24
@[2]: i=6
@[2]: i=39
@[2]: Total of all nums is 69

However, this might not be feasible -- you might not want to buffer all your data like this (and if you could, you could just send it in one message).

Another approach is to send data, and then send a special message when you're done sending data, and the master just keeps receiving until it's heard one of these "Done" messages from each worker:

#include <errno.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>   /* was missing: needed for time() */

#define MAXPERWORKER 10
#define TAG_DATA 2
#define TAG_DONE 1

/*
 * Sentinel-message master/worker pattern: workers stream TAG_DATA
 * messages as results are produced and finish with one TAG_DONE
 * message; the master keeps receiving with MPI_ANY_SOURCE/MPI_ANY_TAG
 * until it has seen TAG_DONE from every worker.  Each worker sends at
 * most MAXPERWORKER (10) data messages, bounding the master's buffers.
 */
int main(int argc, char *argv[])
{
    int i, rank, np, nw, num;
    int total;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    nw = np - 1;    /* number of worker ranks */

    /* add rank rather than multiply, so rank 0 doesn't always seed with 0 */
    srand((unsigned)(time(NULL) + rank));

    if (rank > 0)
    {
        total = 0;
        for (i=(rank-1); i<(nw*10); i+=nw)
        {
            num = rand() % 100;
            if (num % 3 == 0)
            {
                printf("@[%d]: i=%d\n", rank, num);
                total += num;
                MPI_Send(&num, 1, MPI_INT, 0, TAG_DATA, MPI_COMM_WORLD);
            }
        }
        /* payload is ignored by the master (only the tag matters), so
         * send a well-defined dummy rather than the stale last num */
        int done = 0;
        MPI_Send(&done, 1, MPI_INT, 0, TAG_DONE, MPI_COMM_WORLD);

        printf("@[%d]: Total of all nums is %d\n", rank, total);
    }
    else
    {
        /* master: per-worker bookkeeping, indexed by (source rank - 1) */
        int *totals = malloc(sizeof(int)*nw);
        int *counts = malloc(sizeof(int)*nw);
        int **data  = malloc(sizeof(int *)*nw);
        int rcv;
        int j;
        int workernum;
        int stillsending;
        MPI_Status status;

        if (totals == NULL || counts == NULL || data == NULL)
            MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);

        for (i=0; i<nw; i++) {
            totals[i]= 0;
            counts[i]= 0;
            data[i] = malloc(sizeof(int)*MAXPERWORKER);
            if (data[i] == NULL)
                MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
        }
        stillsending = nw;

        /* get data until every worker has sent its TAG_DONE sentinel */
        while (stillsending > 0) {
            MPI_Recv(&rcv, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

            workernum = status.MPI_SOURCE-1;   /* workers are ranks 1..nw */
            if (status.MPI_TAG == TAG_DONE) {
                stillsending--;
            } else if (status.MPI_TAG == TAG_DATA) {
                data[workernum][counts[workernum]] = rcv;
                totals[workernum] += rcv;
                counts[workernum]++;
            }
        }

        /* print results */
        for (i=0; i<nw; i++) {
            printf("From [%2d]:", i+1);
            for (j=0; j<counts[i]; j++)
                printf("%3d ", data[i][j]);
            printf("| %3d\n", totals[i]);
        }

        for (i=0; i<nw; i++)
            free(data[i]);
        free(data);
        free(totals);
        free(counts);
    }

    MPI_Finalize();

    return EXIT_SUCCESS;
}

Again on 4 tasks, I get:

$ mpirun -np 4 ./masterworker2

@[1]: i=63
@[1]: i=99
@[1]: i=60
@[1]: i=69
@[1]: i=21
@[1]: i=48
@[1]: i=24
@[1]: Total of all nums is 384
@[2]: i=39
@[2]: i=84
@[2]: i=63
@[2]: Total of all nums is 186
@[3]: i=3
@[3]: i=51
@[3]: i=36
@[3]: Total of all nums is 90
From [ 1]: 63  99  60  69  21  48  24 | 384
From [ 2]: 39  84  63 | 186
From [ 3]:  3  51  36 |  90

Note in both of these cases I've relied on some MAXPERWORKER size array to preallocate things; you don't really need this though, you could malloc an array and realloc as necessary, or use a std::vector thing if you're willing to use C++.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜