MPI: Canceling Non-blocking Send
I'm using the Open MPI library to implement the following algorithm: we have two processes, p1 and p2. They both execute some iterations and, at the end of each iteration, they communicate their results. The problem is that the execution is not necessarily balanced, so p1 may execute 10 iterations in the time p2 executes 1. Even so, I want p2 to read the latest result from the last iteration executed by p1.
Thus, my idea is that p1 sends its results at each iteration. But, before sending the result from an iteration i, it should check if p2 actually read the information from iteration i-1. If not, it should cancel the previous send so that when p2 reads from p1, it will read the most recent result.
Unfortunately, I'm not sure how to do that. I've tried using MPI_Cancel, as in the following code:
#include <stdio.h>
#include <unistd.h>
#include <mpi.h>

int main (int argc, char *argv[]) {
    int myrank, numprocs;
    MPI_Status status;
    MPI_Request request;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

    if (myrank == 0) {
        int send_buf = 1, flag;

        /* post the first send, then try to cancel it */
        MPI_Isend(&send_buf, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, &request);
        MPI_Cancel(&request);
        MPI_Wait(&request, &status);
        MPI_Test_cancelled(&status, &flag);
        if (flag) printf("Send cancelled\n");
        else      printf("Send NOT cancelled\n");

        /* send the newer value */
        send_buf = 2;
        MPI_Isend(&send_buf, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, &request);
    }
    else {
        int msg;
        sleep(5);
        MPI_Recv(&msg, 1, MPI_INT, 0, 123, MPI_COMM_WORLD, &status);
        printf("%d\n", msg);
    }

    MPI_Finalize();
    return 0;
}
But when I execute it, it reports that the send could not be cancelled, and p2 prints 1 instead of 2.
I'd like to know if there's any way to achieve what I'm proposing, or if there's an alternative way to code the behaviour between p1 and p2.
I would reverse the control of communications. Instead of p1 sending unnecessary messages that it has to cancel, p2 should signal that it is ready to receive a message, and p1 would send only then. In the meantime, p1 simply overwrites its send buffer with the latest results.
In (untested) code:
if ( rank == 0 )
{
    int ready;
    MPI_Request p2_request;
    MPI_Status p2_status;

    // initial request
    MPI_Irecv(&ready, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, &p2_request);

    for (int i=0; true; i++)
    {
        sleep(1);
        MPI_Test(&p2_request, &ready, &p2_status);
        if ( ready )
        {
            // blocking send: p2 is ready to receive
            MPI_Send(&i, 1, MPI_INT, 1, 123, MPI_COMM_WORLD);
            // post new request
            MPI_Irecv(&ready, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, &p2_request);
        }
    }
}
else
{
    int msg;
    MPI_Status status;

    while (true)
    {
        sleep(5);
        // actual message content doesn't matter, just let p1 know we're ready
        MPI_Send(&msg, 1, MPI_INT, 0, 123, MPI_COMM_WORLD);
        // receive message
        MPI_Recv(&msg, 1, MPI_INT, 0, 123, MPI_COMM_WORLD, &status);
    }
}
Now like I said, that's untested code, but you can probably see what I'm getting at there. MPI_Cancel
should only be used when things go horribly wrong: no message should be cancelled during normal execution.
Another approach entirely would be to use MPI one-sided communications. Note, however, that doing passive-target communication, which is what you really want here, is fairly tricky (although the pairwise pattern with mpi_win_post and mpi_win_start is easier), and the one-sided stuff will hopefully all change in MPI-3, so I don't know how far down that road I would advise you to go.
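If you do want to experiment with that, here's a rough, untested sketch of the pairwise post/start pattern (the variable name latest and the value 42.0 are just made up for illustration): rank 1 exposes a single double in a window, and rank 0 pushes its newest value into it during an access epoch.

#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv)
{
    int rank, peer;
    double latest = 0.0;                            /* window memory, lives on rank 1 */
    MPI_Win win;
    MPI_Group world_group, peer_group;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* each rank needs a group containing only its partner */
    peer = (rank == 0) ? 1 : 0;
    MPI_Comm_group(MPI_COMM_WORLD, &world_group);
    MPI_Group_incl(world_group, 1, &peer, &peer_group);

    /* rank 1 exposes one double; rank 0 exposes nothing */
    MPI_Win_create(&latest, (rank == 1) ? sizeof(double) : 0,
                   sizeof(double), MPI_INFO_NULL, MPI_COMM_WORLD, &win);

    if (rank == 0) {
        double result = 42.0;                       /* stand-in for the latest iteration result */
        MPI_Win_start(peer_group, 0, win);          /* open access epoch at rank 1 */
        MPI_Put(&result, 1, MPI_DOUBLE, 1, 0, 1, MPI_DOUBLE, win);
        MPI_Win_complete(win);                      /* the put is done when this returns */
    } else {
        MPI_Win_post(peer_group, 0, win);           /* expose the window to rank 0 */
        MPI_Win_wait(win);                          /* rank 0's epoch has closed */
        printf("rank 1 sees %f\n", latest);
    }

    MPI_Win_free(&win);
    MPI_Group_free(&peer_group);
    MPI_Group_free(&world_group);
    MPI_Finalize();
    return 0;
}

The appeal here is that rank 1 only ever looks at whatever value was last put into the window before its exposure epoch closed; it never has to match one receive per update from rank 0.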
More directly related to your first attempt: rather than cancelling messages (which, as suggested above, is pretty drastic), it's probably much easier just to go through all queued messages. MPI guarantees that messages will not overtake each other; the only caveat is if you are using MPI_THREAD_MULTIPLE and have multiple threads sending within one MPI task, in which case the order is poorly defined:
#include <stdio.h>
#include <mpi.h>
#include <stdlib.h>
#include <unistd.h>
#include <math.h>

void compute() {
    const int maxusecs=500;
    unsigned long sleepytime=(unsigned long)round(((float)rand()/RAND_MAX)*maxusecs);
    usleep(sleepytime);
}

int main(int argc, char** argv)
{
    int rank, size, i;
    int otherrank;
    const int niters=10;
    const int tag=5;
    double newval;
    double sentvals[niters+1];
    double othernewval;
    MPI_Request reqs[niters+1];
    MPI_Status stat;
    int ready;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (size != 2) {
        fprintf(stderr,"This assumes 2 processes\n");
        MPI_Finalize();
        exit(-1);
    }

    otherrank = (rank == 0 ? 1 : 0);
    srand(rank);

    /* first iteration: compute, send, and block on the partner's first value */
    compute();
    newval = rank * 100. + 0;
    sentvals[0] = newval;
    MPI_Isend(&(sentvals[0]), 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &(reqs[0]));
    MPI_Recv (&othernewval, 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &stat);

    for (i=0; i<niters; i++) {
        /* drain any backlog so we end up with the partner's most recent value */
        MPI_Iprobe(otherrank, tag, MPI_COMM_WORLD, &ready, &stat);
        while (ready) {
            MPI_Recv(&othernewval, 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &stat);
            printf("%s[%d]: Reading queued data %lf:\n",
                   (rank == 0 ? "" : "\t\t\t\t"), rank, othernewval);
            MPI_Iprobe(otherrank, tag, MPI_COMM_WORLD, &ready, &stat);
        }

        printf("%s[%d]: Got data %lf, computing:\n",
               (rank == 0 ? "" : "\t\t\t\t"), rank, othernewval);
        compute();

        /* update my data */
        newval = rank * 100. + i + 1;
        printf("%s[%d]: computed %lf, sending:\n",
               (rank == 0 ? "" : "\t\t\t\t"), rank, newval);
        sentvals[i+1] = newval;
        /* keep each request (reqs[i+1]) so they can all be completed at the end */
        MPI_Isend(&(sentvals[i+1]), 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &(reqs[i+1]));
    }

    MPI_Finalize();
    return 0;
}
Running this gives you (notice that just because data is sent doesn't mean it's been received at the time of printing):
[0]: Got data 100.000000, computing:
[1]: Got data 0.000000, computing:
[0]: computed 1.000000, sending:
[0]: Got data 100.000000, computing:
[1]: computed 101.000000, sending:
[1]: Got data 0.000000, computing:
[0]: computed 2.000000, sending:
[0]: Got data 100.000000, computing:
[1]: computed 102.000000, sending:
[1]: Reading queued data 1.000000:
[1]: Got data 1.000000, computing:
[0]: computed 3.000000, sending:
[0]: Reading queued data 101.000000:
[0]: Got data 101.000000, computing:
[1]: computed 103.000000, sending:
[1]: Reading queued data 2.000000:
[1]: Got data 2.000000, computing:
[0]: computed 4.000000, sending:
[1]: computed 104.000000, sending:
[0]: Reading queued data 102.000000:
[1]: Reading queued data 3.000000:
[1]: Got data 3.000000, computing:
[0]: Got data 102.000000, computing:
[0]: computed 5.000000, sending:
[0]: Reading queued data 103.000000:
[0]: Got data 103.000000, computing:
[1]: computed 105.000000, sending:
[1]: Reading queued data 4.000000:
[1]: Got data 4.000000, computing:
[0]: computed 6.000000, sending:
[0]: Reading queued data 104.000000:
[0]: Got data 104.000000, computing:
[1]: computed 106.000000, sending:
[1]: Reading queued data 5.000000:
[1]: Got data 5.000000, computing:
[0]: computed 7.000000, sending:
[0]: Reading queued data 105.000000:
[0]: Got data 105.000000, computing:
[1]: computed 107.000000, sending:
[1]: Reading queued data 6.000000:
[1]: Got data 6.000000, computing:
[0]: computed 8.000000, sending:
[0]: Reading queued data 106.000000:
[0]: Got data 106.000000, computing:
[1]: computed 108.000000, sending:
[1]: Reading queued data 7.000000:
[1]: Got data 7.000000, computing:
[0]: computed 9.000000, sending:
[0]: Reading queued data 107.000000:
[0]: Got data 107.000000, computing:
[1]: computed 109.000000, sending:
[1]: Reading queued data 8.000000:
[1]: Got data 8.000000, computing:
[0]: computed 10.000000, sending:
[1]: computed 110.000000, sending:
Note that this is just demonstration code; a final version really needs to do waitalls and more iprobes at the end to free any pending requests and flush any waiting messages.
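For completeness, here is one rough way that cleanup could look. It's a sketch only: it assumes the loop stored each send request in reqs[i+1] as above, and that you also kept a counter, say nrecvd, of how many values you have already received from the partner (the demo doesn't actually track that):

/* sketch: each rank sends exactly niters+1 values in total */
MPI_Status stats[niters+1];
while (nrecvd < niters+1) {   /* nrecvd: hypothetical receive counter */
    MPI_Recv(&othernewval, 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &stat);
    nrecvd++;
}
/* every send now has a matching receive, so completing them cannot hang */
MPI_Waitall(niters+1, reqs, stats);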
Does your environment and MPI distribution support multi-threading? If so, you could create a thread in P1 that calculates the value and stores each iteration's result in a variable shared with P1's main thread (writes protected via a semaphore). Then, as suggested by suszterpatt above, have P2 send an "I'm ready" message to P1 and have P1 respond with the value from the most recent iteration.
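A minimal sketch of that idea, assuming POSIX threads and using a pthread mutex in place of the semaphore (the names compute_loop, latest, and latest_lock are invented for illustration; since only the main thread makes MPI calls here, MPI_THREAD_FUNNELED is enough):

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <mpi.h>

static double latest = 0.0;                         /* most recent result on P1 */
static pthread_mutex_t latest_lock = PTHREAD_MUTEX_INITIALIZER;

static void *compute_loop(void *arg)
{
    /* stand-in for P1's real iterations: keep overwriting the shared result */
    for (int i = 1; i <= 1000; i++) {
        usleep(1000);                               /* "work" */
        pthread_mutex_lock(&latest_lock);
        latest = i;
        pthread_mutex_unlock(&latest_lock);
    }
    return NULL;
}

int main(int argc, char **argv)
{
    int provided, rank;

    MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided);
    if (provided < MPI_THREAD_FUNNELED) {
        fprintf(stderr, "MPI library lacks the required thread support\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0) {
        /* P1: worker thread computes, main thread answers P2's pings */
        pthread_t worker;
        pthread_create(&worker, NULL, compute_loop, NULL);

        for (int r = 0; r < 5; r++) {
            int ping;
            double snapshot;
            MPI_Recv(&ping, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            pthread_mutex_lock(&latest_lock);
            snapshot = latest;                      /* copy under the lock */
            pthread_mutex_unlock(&latest_lock);
            MPI_Send(&snapshot, 1, MPI_DOUBLE, 1, 123, MPI_COMM_WORLD);
        }
        pthread_join(worker, NULL);
    } else {
        /* P2: whenever it is ready, ask P1 for the freshest value */
        for (int r = 0; r < 5; r++) {
            int ping = 1;
            double value;
            sleep(1);                               /* P2 is the slow side */
            MPI_Send(&ping, 1, MPI_INT, 0, 123, MPI_COMM_WORLD);
            MPI_Recv(&value, 1, MPI_DOUBLE, 0, 123, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            printf("P2 got %f\n", value);
        }
    }

    MPI_Finalize();
    return 0;
}

Compile with something like mpicc -pthread; the exact flags depend on your MPI installation.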