
Sending distributed chunks of a 2D array to the root process in MPI

I have a 2D array which is distributed across an MPI process grid (3 x 2 processes in this example). The values of the array are generated within the process that the corresponding chunk of the array is distributed to, and I want to gather all of those chunks together at the root process to display them.

So far, I have the code below. This generates a Cartesian communicator, finds out the co-ordinates of the MPI process and works out how much of the array it should get based on that (as the array dimensions need not be a multiple of the Cartesian grid size). I then create a new MPI derived datatype which will send the whole of that process's subarray as one item (that is, the stride, blocklength and count are different for each process, as each process has a different-sized array). However, when I come to gather the data together with MPI_Gather, I get a segmentation fault.

I think this is because I shouldn't be using the same datatype for sending and receiving in the MPI_Gather call. The data type is fine for sending the data, as it has the right count, stride and blocklength, but when it gets to the other end it'll need a very different derived datatype. I'm not sure how to calculate the parameters for this datatype - does anyone have any ideas?

Also, if I'm approaching this from completely the wrong angle then please let me know!

#include<stdio.h>
#include<array_alloc.h>
#include<math.h>
#include<mpi.h>

int main(int argc, char ** argv)
{
    int size, rank;
    int dim_size[2];
    int periods[2];
    int A = 2;
    int B = 3;
    MPI_Comm cart_comm;
    MPI_Datatype block_type;
    int coords[2];

    float **array;
    float **whole_array;

    int n = 10;
    int rows_per_core;
    int cols_per_core;
    int i, j;

    int x_start, x_finish;
    int y_start, y_finish;

    /* Initialise MPI */
    MPI_Init(&argc, &argv);

    /* Get the rank for this process, and the number of processes */
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0)
    {
        /* If we're the master process */
        whole_array = alloc_2d_float(n, n);

        /* Initialise whole array to silly values */
        for (i = 0; i < n; i++)
        {
            for (j = 0; j < n; j++)
            {
                whole_array[i][j] = 9999.99;
            }
        }

        for (j = 0; j < n; j ++)
        {
            for (i = 0; i < n; i++)
            {
                printf("%f ", whole_array[j][i]);
            }
            printf("\n");
        }
    }

    /* Create the cartesian communicator */
    dim_size[0] = B;
    dim_size[1] = A;
    periods[0] = 1;
    periods[1] = 1;

    MPI_Cart_create(MPI_COMM_WORLD, 2, dim_size, periods, 1, &cart_comm);

    /* Get our co-ordinates within that communicator */
    MPI_Cart_coords(cart_comm, rank, 2, coords);

    rows_per_core = ceil(n / (float) A);
    cols_per_core = ceil(n / (float) B);

    if (coords[0] == (B - 1))
    {
        /* We're at the far end of a row */
        cols_per_core = n - (cols_per_core * (B - 1));
    }
    if (coords[1] == (A - 1))
    {
        /* We're at the bottom of a col */
        rows_per_core = n - (rows_per_core * (A - 1));
    }

    printf("X: %d, Y: %d, RpC: %d, CpC:开发者_如何学JAVA %d\n", coords[0], coords[1], rows_per_core, cols_per_core);

    MPI_Type_vector(rows_per_core, cols_per_core, cols_per_core + 1, MPI_FLOAT, &block_type);
    MPI_Type_commit(&block_type);

    array = alloc_2d_float(rows_per_core, cols_per_core);

    if (array == NULL)
    {
        printf("Problem with array allocation.\nExiting\n");
        return 1;
    }

    for (j = 0; j < rows_per_core; j++)
    {
        for (i = 0; i < cols_per_core; i++)
        {
            array[j][i] = (float) (i + 1);
        }
    }

    MPI_Barrier(MPI_COMM_WORLD);

    MPI_Gather(array, 1, block_type, whole_array, 1, block_type, 0, MPI_COMM_WORLD);

    /*
    if (rank == 0)
    {
        for (j = 0; j < n; j ++)
        {
            for (i = 0; i < n; i++)
            {
                printf("%f ", whole_array[j][i]);
            }
            printf("\n");
        }
    }
    */
    /* Close down the MPI environment */
    MPI_Finalize();
}

The 2D array allocation routine I have used above is implemented as:

float **alloc_2d_float( int ndim1, int ndim2 ) {

  float **array2 = malloc( ndim1 * sizeof( float * ) );

  int i;

  if( array2 != NULL ){

    array2[0] = malloc( ndim1 * ndim2 * sizeof( float ) );

    if( array2[ 0 ] != NULL ) {

    for( i = 1; i < ndim1; i++ )
      array2[i] = array2[0] + i * ndim2;

    }

    else {
      free( array2 );
      array2 = NULL;
    }

  }

  return array2;

}


This is a tricky one. You're on the right track, and yes, you will need different types for sending and receiving.

The sending part is easy -- if you're sending the whole subarray array, then you don't even need the vector type; you can send the entire (rows_per_core)*(cols_per_core) contiguous floats starting at &(array[0][0]) (or array[0], if you prefer).

It's the receiving that's the tricky part, as you've gathered. Let's start with the simplest case -- assuming that everything divides evenly so all the blocks have the same size. Then you can use the very helpful MPI_Type_create_subarray (you could always cobble this together with vector types, but for higher-dimensional arrays this becomes tedious, as you need to create one intermediate type for each dimension of the array except the last).

Also, rather than hardcoding the decomposition, you can use the also-helpful MPI_Dims_create to create an as-square-as-possible decomposition of your ranks. Note that this doesn't necessarily have anything to do with MPI_Cart_create, although you can use it for the requested dimensions. I'm going to skip the cart_create stuff here, not because it's not useful, but because I want to focus on the gather stuff.

So if everyone has the same size of array, then root is receiving the same data type from everyone, and one can use a very simple subarray type to get their data:

MPI_Type_create_subarray(2, whole_array_size, sub_array_size, starts,
                         MPI_ORDER_C, MPI_FLOAT, &block_type);
MPI_Type_commit(&block_type);

where sub_array_size[] = {rows_per_core, cols_per_core}, whole_array_size[] = {n,n}, and for here, starts[]={0,0} -- that is, we'll just assume that every block starts at the origin. The reason for this is that we can then use Gatherv to explicitly set the displacements into the array:

for (int i=0; i<size; i++) {
    counts[i] = 1;   /* one block_type per rank */

    int row = (i % A);
    int col = (i / A);
    /* displacement into the whole_array */
    disps[i] = (col*cols_per_core + row*(rows_per_core)*n);
}

MPI_Gatherv(array[0], rows_per_core*cols_per_core, MPI_FLOAT,
            recvptr, counts, disps, resized_type, 0, MPI_COMM_WORLD);

So now everyone sends their data in one chunk, and it's received into the right part of the array on the root. For this to work, I've resized the type so that its extent is just one float, so the displacements can be calculated in that unit:

MPI_Type_create_resized(block_type, 0, 1*sizeof(float), &resized_type);
MPI_Type_commit(&resized_type);

The whole code is below:

#include<stdio.h>
#include<stdlib.h>
#include<math.h>
#include<mpi.h>

float **alloc_2d_float( int ndim1, int ndim2 ) {
    float **array2 = malloc( ndim1 * sizeof( float * ) );
    int i;

    if( array2 != NULL ){
        array2[0] = malloc( ndim1 * ndim2 * sizeof( float ) );
        if( array2[ 0 ] != NULL ) {
            for( i = 1; i < ndim1; i++ )
                array2[i] = array2[0] + i * ndim2;
        }

        else {
            free( array2 );
            array2 = NULL;
        }
    }
    return array2;
}

void free_2d_float( float **array ) {
    if (array != NULL) {
        free(array[0]);
        free(array);
    }
    return;
}

void init_array2d(float **array, int ndim1, int ndim2, float data) {
    for (int i=0; i<ndim1; i++) 
        for (int j=0; j<ndim2; j++)
            array[i][j] = data;
    return;
}

void print_array2d(float **array, int ndim1, int ndim2) {
    for (int i=0; i<ndim1; i++) {
        for (int j=0; j<ndim2; j++) {
            printf("%6.2f ", array[i][j]);
        }
        printf("\n");
    }
    return;
}


int main(int argc, char ** argv)
{
    int size, rank;
    int dim_size[2] = {0, 0};   /* zeroed so MPI_Dims_create chooses both dimensions */
    int periods[2];
    MPI_Datatype block_type, resized_type;

    float **array;
    float **whole_array;
    float *recvptr = NULL;      /* only meaningful on rank 0 */

    int *counts, *disps;

    int n = 10;
    int rows_per_core;
    int cols_per_core;
    int i, j;

    int whole_array_size[2];
    int sub_array_size[2];
    int starts[2];
    int A, B;

    /* Initialise MPI */
    MPI_Init(&argc, &argv);

    /* Get the rank for this process, and the number of processes */
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0)
    {
        /* If we're the master process */
        whole_array = alloc_2d_float(n, n);
        recvptr = &(whole_array[0][0]);

        /* Initialise whole array to silly values */
        for (i = 0; i < n; i++)
        {
            for (j = 0; j < n; j++)
            {
                whole_array[i][j] = 9999.99;
            }
        }

        print_array2d(whole_array, n, n);
        puts("\n\n");
    }

    /* Create the cartesian communicator */
    MPI_Dims_create(size, 2, dim_size);
    A = dim_size[1];
    B = dim_size[0];
    periods[0] = 1;
    periods[1] = 1;

    rows_per_core = ceil(n / (float) A);
    cols_per_core = ceil(n / (float) B);
    if (rows_per_core*A != n) {
        if (rank == 0) fprintf(stderr,"Aborting: rows %d don't divide by %d evenly\n", n, A);
        MPI_Abort(MPI_COMM_WORLD,1);
    }
    if (cols_per_core*B != n) {
        if (rank == 0) fprintf(stderr,"Aborting: cols %d don't divide by %d evenly\n", n, B);
        MPI_Abort(MPI_COMM_WORLD,2);
    }

    array = alloc_2d_float(rows_per_core, cols_per_core);
    printf("%d, RpC: %d, CpC: %d\n", rank, rows_per_core, cols_per_core);

    whole_array_size[0] = n;             
    sub_array_size  [0] = rows_per_core; 
    whole_array_size[1] = n;
    sub_array_size  [1] = cols_per_core;
    starts[0] = 0; starts[1] = 0;

    MPI_Type_create_subarray(2, whole_array_size, sub_array_size, starts, 
                             MPI_ORDER_C, MPI_FLOAT, &block_type);
    MPI_Type_commit(&block_type);
    MPI_Type_create_resized(block_type, 0, 1*sizeof(float), &resized_type);
    MPI_Type_commit(&resized_type);

    if (array == NULL)
    {
        printf("Problem with array allocation.\nExiting\n");
        MPI_Abort(MPI_COMM_WORLD,3);
    }

    init_array2d(array,rows_per_core,cols_per_core,(float)rank);

    counts = (int *)malloc(size * sizeof(int));
    disps  = (int *)malloc(size * sizeof(int));
    /* note -- we're just using MPI_COMM_WORLD rank here to
     * determine location, not the cart_comm for now... */
    for (int i=0; i<size; i++) {
        counts[i] = 1;   /* one block_type per rank */

        int row = (i % A);
        int col = (i / A);
        /* displacement into the whole_array */
        disps[i] = (col*cols_per_core + row*(rows_per_core)*n);
    }

    MPI_Gatherv(array[0], rows_per_core*cols_per_core, MPI_FLOAT, 
                recvptr, counts, disps, resized_type, 0, MPI_COMM_WORLD);

    free_2d_float(array);
    if (rank == 0) print_array2d(whole_array, n, n);
    if (rank == 0) free_2d_float(whole_array);
    MPI_Finalize();
}

Minor thing -- you don't need the barrier before the gather. In fact, you hardly ever really need a barrier: they're expensive operations for a few reasons, and can hide problems -- my rule of thumb is to never, ever use barriers unless you know exactly why the rule needs to be broken in this particular case. In this case in particular, the collective gather routine does exactly the same synchronization as the barrier, so just use that.

Now, moving onto the harder stuff. If things don't divide evenly, you have a few options. The simplest, though not necessarily the best, is just to pad the array so that it does divide evenly, even if just for this operation.
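
For example, here is a rough, untested sketch of the padding idea, written to slot into the program above (pad_rows, pad_cols, padded and whole_padded are new names I'm introducing; everything else comes from that code):

/* Pad every local block up to the ceil-based sizes, gather into an
 * oversized array on root, then ignore the padding when reading the
 * result. */
int pad_rows = (n + A - 1) / A;        /* ceil(n/A), same on every rank */
int pad_cols = (n + B - 1) / B;        /* ceil(n/B), same on every rank */

/* copy the real data into the top-left corner of a padded local block,
 * marking the padding with a sentinel value */
float **padded = alloc_2d_float(pad_rows, pad_cols);
for (i = 0; i < pad_rows; i++)
    for (j = 0; j < pad_cols; j++)
        padded[i][j] = (i < rows_per_core && j < cols_per_core)
                           ? array[i][j] : -1.0f;

float **whole_padded = NULL;
if (rank == 0)
    whole_padded = alloc_2d_float(pad_rows*A, pad_cols*B);

/* every block is now pad_rows x pad_cols, so the subarray/resized-type
 * Gatherv above works unchanged -- build block_type against the padded
 * global size {pad_rows*A, pad_cols*B}, send padded[0], and compute
 * disps[i] against the padded row length pad_cols*B */

/* the padding only appears in the last block row and last block column,
 * so the real result on root is simply the top-left n x n corner of
 * whole_padded */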

If you can arrange it so that the number of columns divides evenly, even if the number of rows doesn't, then you can still use the gatherv: create a vector type for each part of the row, and gatherv the appropriate number of rows from each processor. That would work fine.

If you definitely have the case where neither can be counted on to divide, and you can't pad the data for sending, then there are three sub-options I can see:

  • As susterpatt suggests, do point-to-point. For small numbers of tasks, this is fine, but as it gets larger, this will be significantly less efficient than the collective operations.
  • Create a communicator consisting of all the processors not on the outer edges, and use exactly the code above to gather their data; and then point-to-point the edge tasks' data.
  • Don't gather to process 0 at all; use the Distributed array type to describe the layout of the array, and use MPI-IO to write all the data to a file; once that's done, you can have process zero display the data in some way if you like (a rough sketch of this follows the list).
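
For that third option, here's a rough, untested sketch of what the MPI-IO route could look like, using MPI_Type_create_darray with the block sizes and A x B grid from above; the filename is made up, and note that the darray constructor uses its own row-major rank-to-block mapping, which need not match the coordinate ordering used earlier:

/* Each rank writes its block of the global n x n array straight into one
 * shared file, so nothing is ever gathered on rank 0. */
int gsizes[2]   = { n, n };                        /* global array size */
int distribs[2] = { MPI_DISTRIBUTE_BLOCK, MPI_DISTRIBUTE_BLOCK };
int dargs[2]    = { MPI_DISTRIBUTE_DFLT_DARG, MPI_DISTRIBUTE_DFLT_DARG };
int psizes[2]   = { A, B };                        /* process grid */

MPI_Datatype filetype;
MPI_Type_create_darray(size, rank, 2, gsizes, distribs, dargs, psizes,
                       MPI_ORDER_C, MPI_FLOAT, &filetype);
MPI_Type_commit(&filetype);

MPI_File fh;
MPI_File_open(MPI_COMM_WORLD, "array.dat",
              MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
MPI_File_set_view(fh, 0, MPI_FLOAT, filetype, "native", MPI_INFO_NULL);

/* the local block is contiguous, so a plain count of floats is enough */
MPI_File_write_all(fh, &(array[0][0]), rows_per_core*cols_per_core,
                   MPI_FLOAT, MPI_STATUS_IGNORE);

MPI_File_close(&fh);
MPI_Type_free(&filetype);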


It looks like the first argument to your MPI_Gather call should probably be array[0], and not array.

Also, if you need to get different amounts of data from each rank, you might be better off using MPI_Gatherv.

Finally, note that gathering all your data in one place to do output is not scalable in many circumstances. As the amount of data grows, eventually it will exceed the memory available to rank 0. You might be much better off distributing the output work (if you are writing to a file, using MPI-IO or other library calls) or doing point-to-point sends to rank 0 one at a time, to limit the total memory consumption.
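
As an illustration of the "one at a time" idea, here is a rough, untested sketch in terms of the variables in your code; the single staging buffer, the maxblock bound and the tag value 0 are my own choices:

/* Rank 0 never holds more than one remote block at a time: each rank
 * sends its block, and rank 0 receives them one by one into a single
 * staging buffer, writing or printing each before receiving the next. */
int maxblock = ((n + A - 1) / A) * ((n + B - 1) / B);   /* largest block */

if (rank != 0) {
    MPI_Send(&(array[0][0]), rows_per_core*cols_per_core, MPI_FLOAT,
             0, 0, MPI_COMM_WORLD);
} else {
    float *staging = malloc(maxblock * sizeof(float)); /* needs <stdlib.h> */
    int src, nrecvd;
    MPI_Status status;

    /* ... handle rank 0's own block directly here ... */

    for (src = 1; src < size; src++) {
        MPI_Recv(staging, maxblock, MPI_FLOAT, src, 0, MPI_COMM_WORLD,
                 &status);
        MPI_Get_count(&status, MPI_FLOAT, &nrecvd);
        /* ... write out or print the nrecvd floats from rank src ... */
    }
    free(staging);
}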

On the other hand, I would not recommend coordinating each of your ranks printing to standard output, one after another, because some major MPI implementations don't guarantee that standard output will be produced in order. Cray's MPI, in particular, jumbles up standard output pretty thoroughly if multiple ranks print.


According to this (emphasis mine):

The type-matching conditions for the collective operations are more strict than the corresponding conditions between sender and receiver in point-to-point. Namely, for collective operations, the amount of data sent must exactly match the amount of data specified by the receiver. Distinct type maps between sender and receiver are still allowed.

Sounds to me like you have two options:

  1. Pad smaller submatrices so that all processes send the same amount of data, then crop the matrix back to its original size after the Gather. If you're feeling adventurous, you might try defining the receiving typemap so that paddings are automatically overwritten during the Gather operation, thus eliminating the need for the crop afterwards. This could get a bit complicated though.
  2. Fall back to point-to-point communication. Much more straightforward, but possibly higher communication costs.

Personally, I'd go with option 2.
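
If it helps, here is roughly what option 2 might look like with the variables from your code (untested; the 4-int header describing where each block belongs and the tag values 0 and 1 are my own invention):

/* Point-to-point gather of unequal blocks: each rank sends a header
 * (first global row, first global col, rows, cols) followed by its data,
 * and rank 0 copies each block into whole_array as it arrives. */
int hdr[4] = { coords[1] * (int) ceil(n / (float) A),   /* first global row */
               coords[0] * (int) ceil(n / (float) B),   /* first global col */
               rows_per_core, cols_per_core };

if (rank != 0) {
    MPI_Send(hdr, 4, MPI_INT, 0, 0, MPI_COMM_WORLD);
    MPI_Send(&(array[0][0]), rows_per_core*cols_per_core, MPI_FLOAT,
             0, 1, MPI_COMM_WORLD);
} else {
    /* rank 0's own block needs no message */
    for (i = 0; i < rows_per_core; i++)
        for (j = 0; j < cols_per_core; j++)
            whole_array[hdr[0] + i][hdr[1] + j] = array[i][j];

    for (int src = 1; src < size; src++) {
        int h[4];
        MPI_Recv(h, 4, MPI_INT, src, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

        float *buf = malloc(h[2] * h[3] * sizeof(float)); /* needs <stdlib.h> */
        MPI_Recv(buf, h[2]*h[3], MPI_FLOAT, src, 1, MPI_COMM_WORLD,
                 MPI_STATUS_IGNORE);

        for (i = 0; i < h[2]; i++)
            for (j = 0; j < h[3]; j++)
                whole_array[h[0] + i][h[1] + j] = buf[i*h[3] + j];
        free(buf);
    }
}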
