MIDAPACK - MIcrowave Data Analysis PACKage  1.1b
Parallel software tools for high performance CMB DA analysis
ring.c
#ifdef W_MPI

#include "mapmat.h"
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int ring_init(int *indices, int count, int **R, int *nR, int **S, int *nS,
              int steps, MPI_Comm comm) {
    int         p, tag;
    int         size, rank, sp, rp;
    int        *buf, nbuf;
    MPI_Request s_request, r_request;

    MPI_Comm_size(comm, &size);
    MPI_Comm_rank(comm, &rank);
    MPI_Allreduce(&count, &nbuf, 1, MPI_INT, MPI_MAX,
                  comm);                       // buffer size = max(count) over comm
    buf = (int *) malloc(nbuf * sizeof(int));  // allocate the exchange buffer
    tag = 0;
    for (p = 1; p < steps; p++) { // communication phase to get the number of
                                  // shared indices between pairs of processes
        sp = (rank + p) % size;
        rp = (rank + size - p) % size;
        MPI_Isend(&count, 1, MPI_INT, sp, 0, comm,
                  &s_request);                 // send my number of indices
        MPI_Irecv(&nbuf, 1, MPI_INT, rp, 0, comm,
                  &r_request);                 // receive a number of indices
        tag++;
        MPI_Wait(&r_request, MPI_STATUS_IGNORE);
        MPI_Irecv(buf, nbuf, MPI_INT, rp, tag, comm,
                  &r_request);                 // receive the indices table
        MPI_Wait(&s_request, MPI_STATUS_IGNORE);
        MPI_Isend(indices, count, MPI_INT, sp, tag, comm,
                  &s_request);                 // send my indices table
        tag++;

        MPI_Wait(&r_request, MPI_STATUS_IGNORE);
        nR[p] = card_and(indices, count, buf,
                         nbuf);                // number of shared indices
        nS[steps - p] = nR[p];
        R[p] = (int *) malloc(nR[p] * sizeof(int));  // allocate the shared-indices table
        map_and(indices, count, buf, nbuf, R[p]);    // fill it with the intersection
        S[steps - p] = R[p];  // the send and receive sets are identical, so the
                              // sending table shares the same storage
        MPI_Wait(&s_request, MPI_STATUS_IGNORE);     // complete the indices send
                                                     // before reusing the request
    }
    free(buf);
    nS[0] = 0;  // a process exchanges nothing with itself (step 0)
    nR[0] = 0;
    return 0;
}

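/* Usage sketch (hypothetical caller, not part of the MIDAPACK API; the name
   build_ring_tables is illustrative only): the R/S pointer tables and the
   nR/nS count tables passed to ring_init are simply sized by the number of
   steps, and the ring scheme assumes steps equals the number of processes
   in comm. */
int build_ring_tables(int *indices, int count, MPI_Comm comm, int ***R,
                      int **nR, int ***S, int **nS, int *steps) {
    MPI_Comm_size(comm, steps);  // one ring step per process
    *R  = (int **) malloc(*steps * sizeof(int *));
    *S  = (int **) malloc(*steps * sizeof(int *));
    *nR = (int *) malloc(*steps * sizeof(int));
    *nS = (int *) malloc(*steps * sizeof(int));
    return ring_init(indices, count, *R, *nR, *S, *nS, *steps, comm);
}
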
int ring_reduce(int **R, int *nR, int nRmax, int **S, int *nS, int nSmax,
                double *val, double *res_val, int steps, MPI_Comm comm) {
    int         tag, rank, size, p;
    MPI_Request s_request, r_request;
    int         sp, rp;
    double     *sbuf, *rbuf;

    MPI_Comm_size(comm, &size);
    MPI_Comm_rank(comm, &rank);
    tag = 0;

    rbuf = (double *) malloc(nRmax * sizeof(double));
    sbuf = (double *) malloc(nSmax * sizeof(double));

    for (p = 1; p < steps; p++) {
        rp = (rank + size - p) % size;
        MPI_Irecv(rbuf, nR[p], MPI_DOUBLE, rp, tag, comm, &r_request);
        sp = (rank + p) % size;
        m2s(val, sbuf, S[p], nS[p]);  // fill the sending buffer
        MPI_Isend(sbuf, nS[p], MPI_DOUBLE, sp, tag, comm, &s_request);

        tag++;

        MPI_Wait(&r_request, MPI_STATUS_IGNORE);
        s2m_sum(res_val, rbuf, R[p], nR[p]);  // sum the received buffer into res_val

        MPI_Wait(&s_request, MPI_STATUS_IGNORE);
    }
    free(sbuf);
    free(rbuf);
    return 0;
}

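/* Usage sketch (hypothetical caller): nRmax and nSmax are the largest per-step
   counts, so one pair of buffers can be reused at every step. ring_reduce only
   adds the remote contributions, so res_val is assumed to be seeded with the
   local values beforehand. */
int call_ring_reduce(int **R, int *nR, int **S, int *nS, int steps, double *val,
                     double *res_val, MPI_Comm comm) {
    int p, nRmax = 0, nSmax = 0;
    for (p = 1; p < steps; p++) {  // step 0 carries no data
        if (nR[p] > nRmax) nRmax = nR[p];
        if (nS[p] > nSmax) nSmax = nS[p];
    }
    return ring_reduce(R, nR, nRmax, S, nS, nSmax, val, res_val, steps, comm);
}
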
int alltoallv_reduce(int **R, int *nR, int nRtot, int **S, int *nS, int nStot,
                     double *val, double *res_val, int steps, MPI_Comm comm) {
    int     rank, size, p;
    int     sp, rp, *rindx, *sindx, *rdisp, *sdisp;
    double *sbuf, *rbuf;

    MPI_Comm_size(comm,
                  &size);  // N.B. size and steps must be equal, shall we check
                           // for this ?! -- rs
    MPI_Comm_rank(comm, &rank);

    rbuf = (double *) malloc(nRtot * sizeof(double));
    sbuf = (double *) malloc(nStot * sizeof(double));

    rindx = (int *) calloc(size, sizeof(int));
    sindx = (int *) calloc(size, sizeof(int));

    rdisp = (int *) calloc(size, sizeof(int));
    sdisp = (int *) calloc(size, sizeof(int));

    // compute per-destination counts and displacements ...

    for (p = 0; p < steps; p++) {  // starts at 0 !
        rp        = (rank + size - p) % size;
        rindx[rp] = nR[p];
        sp        = (rank + p) % size;
        sindx[sp] = nS[p];
    }

    for (p = 1; p < size; p++) {
        sdisp[p] = sdisp[p - 1] + sindx[p - 1];
        rdisp[p] = rdisp[p - 1] + rindx[p - 1];
    }

    // prepare data to send ...

    for (p = 0; p < steps; p++) {
        sp = (rank + p) % size;
        m2s(val, &sbuf[sdisp[sp]], S[p], nS[p]);  // fill the sending buffer
    }

    MPI_Alltoallv(sbuf, sindx, sdisp, MPI_DOUBLE, rbuf, rindx, rdisp,
                  MPI_DOUBLE, comm);

    // accumulate contributions ...

    for (p = 0; p < steps; p++) {
        rp = (rank + size - p) % size;
        s2m_sum(res_val, &rbuf[rdisp[rp]], R[p],
                nR[p]);  // sum the received buffer into res_val
    }

    free(sdisp);
    free(rdisp);
    free(sindx);
    free(rindx);
    free(sbuf);
    free(rbuf);

    return 0;
}

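/* Usage sketch (hypothetical caller): nRtot and nStot are the total numbers of
   values received and sent over all steps, i.e. the sizes of the packed
   receive and send buffers used by MPI_Alltoallv. */
int call_alltoallv_reduce(int **R, int *nR, int **S, int *nS, int steps,
                          double *val, double *res_val, MPI_Comm comm) {
    int p, nRtot = 0, nStot = 0;
    for (p = 0; p < steps; p++) {
        nRtot += nR[p];
        nStot += nS[p];
    }
    return alltoallv_reduce(R, nR, nRtot, S, nS, nStot, val, res_val, steps,
                            comm);
}
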
int ring_nonblocking_reduce(int **R, int *nR, int **S, int *nS, double *val,
                            double *res_val, int steps, MPI_Comm comm) {
    int          tag, rank, size, p;
    MPI_Request *s_request, *r_request;
    int          sp, rp;
    double     **sbuf, **rbuf;

    MPI_Comm_size(comm, &size);
    MPI_Comm_rank(comm, &rank);

    s_request = (MPI_Request *) malloc((steps - 1) * sizeof(MPI_Request));
    r_request = (MPI_Request *) malloc((steps - 1) * sizeof(MPI_Request));

    rbuf = (double **) malloc((steps - 1) * sizeof(double *));
    sbuf = (double **) malloc((steps - 1) * sizeof(double *));

    for (p = 1; p < steps; p++) {
        rbuf[p - 1] = (double *) malloc(nR[p] * sizeof(double));
        sbuf[p - 1] = (double *) malloc(nS[p] * sizeof(double));
        m2s(val, sbuf[p - 1], S[p], nS[p]);  // fill the sending buffer
    }

    tag = 0;
    for (p = 1; p < steps; p++) {
        sp = (rank + p) % size;
        rp = (rank + size - p) % size;

        MPI_Irecv(rbuf[p - 1], nR[p], MPI_DOUBLE, rp, tag, comm,
                  &r_request[p - 1]);
        MPI_Isend(sbuf[p - 1], nS[p], MPI_DOUBLE, sp, tag, comm,
                  &s_request[p - 1]);
        tag++;
    }
    MPI_Waitall(steps - 1, r_request,
                MPI_STATUSES_IGNORE);  // the request arrays hold steps - 1 entries

    for (p = 1; p < steps; p++) {
        s2m_sum(res_val, rbuf[p - 1], R[p],
                nR[p]);  // sum the received buffer into res_val
    }
    MPI_Waitall(steps - 1, s_request, MPI_STATUSES_IGNORE);
    for (p = 1; p < steps; p++) {  // release the per-step buffers
        free(rbuf[p - 1]);
        free(sbuf[p - 1]);
    }
    free(r_request);
    free(s_request);
    free(sbuf);
    free(rbuf);
    return 0;
}

int ring_noempty_reduce(int **R, int *nR, int nneR, int **S, int *nS, int nneS,
                        double *val, double *res_val, int steps,
                        MPI_Comm comm) {
    int          tag, rank, size, p;
    MPI_Request *s_request, *r_request;
    int          sp, rp, nesi, neri;
    double     **sbuf, **rbuf;

    MPI_Comm_size(comm, &size);
    MPI_Comm_rank(comm, &rank);

    s_request = (MPI_Request *) malloc(nneS * sizeof(MPI_Request));
    r_request = (MPI_Request *) malloc(nneR * sizeof(MPI_Request));

    rbuf = (double **) malloc(nneR * sizeof(double *));
    sbuf = (double **) malloc(nneS * sizeof(double *));

    nesi = 0;
    for (p = 1; p < steps; p++) {
        if (nS[p] != 0) {
            sbuf[nesi] = (double *) malloc(nS[p] * sizeof(double));
            m2s(val, sbuf[nesi], S[p], nS[p]);  // fill the sending buffer
            nesi++;
        }
    }

    tag  = 0;
    nesi = 0;
    neri = 0;
    for (p = 1; p < steps; p++) {
        sp = (rank + p) % size;
        rp = (rank + size - p) % size;
        if (nR[p] != 0) {
            rbuf[neri] = (double *) malloc(nR[p] * sizeof(double));
            MPI_Irecv(rbuf[neri], nR[p], MPI_DOUBLE, rp, tag, comm,
                      &r_request[neri]);
            neri++;
        }
        if (nS[p] != 0) {
            MPI_Isend(sbuf[nesi], nS[p], MPI_DOUBLE, sp, tag, comm,
                      &s_request[nesi]);
            nesi++;
        }
        tag++;
    }
    MPI_Waitall(nneR, r_request, MPI_STATUSES_IGNORE);

    neri = 0;
    for (p = 1; p < steps; p++) {
        if (nR[p] != 0) {
            s2m_sum(res_val, rbuf[neri], R[p],
                    nR[p]);  // sum the received buffer into res_val
            neri++;
        }
    }
    MPI_Waitall(nneS, s_request, MPI_STATUSES_IGNORE);
    for (p = 0; p < nneR; p++) free(rbuf[p]);  // release the per-step buffers
    for (p = 0; p < nneS; p++) free(sbuf[p]);
    free(r_request);
    free(s_request);
    free(sbuf);
    free(rbuf);
    return 0;
}

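/* Usage sketch (hypothetical caller): nneR and nneS count the steps that
   actually carry data; they size the request and buffer arrays inside
   ring_noempty_reduce. */
int call_ring_noempty_reduce(int **R, int *nR, int **S, int *nS, int steps,
                             double *val, double *res_val, MPI_Comm comm) {
    int p, nneR = 0, nneS = 0;
    for (p = 1; p < steps; p++) {
        if (nR[p] != 0) nneR++;
        if (nS[p] != 0) nneS++;
    }
    return ring_noempty_reduce(R, nR, nneR, S, nS, nneS, val, res_val, steps,
                               comm);
}
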
//============================================================================
// Modification added by Sebastien Cayrols : 01/09/2015 ; Berkeley

int ring_noempty_step_reduce(int **R, int *nR, int nRmax, int **S, int *nS,
                             int nSmax, double *val, double *res_val, int steps,
                             MPI_Comm comm) {
    int         tag, rank, size, p;
    MPI_Request s_request, r_request;
    int         sp, rp;
    double     *sbuf, *rbuf;

    MPI_Comm_size(comm, &size);
    MPI_Comm_rank(comm, &rank);
    tag = 0;

    rbuf = (double *) malloc(nRmax * sizeof(double));
    sbuf = (double *) malloc(nSmax * sizeof(double));

    for (p = 1; p < steps; p++) {
        rp = (rank + size - p) % size;
        if (nR[p] != 0)
            MPI_Irecv(rbuf, nR[p], MPI_DOUBLE, rp, tag, comm, &r_request);
        sp = (rank + p) % size;
        if (nS[p] != 0) {
            m2s(val, sbuf, S[p], nS[p]);  // fill the sending buffer
            MPI_Isend(sbuf, nS[p], MPI_DOUBLE, sp, tag, comm, &s_request);
        }
        tag++;

        if (nR[p] != 0) {
            MPI_Wait(&r_request, MPI_STATUS_IGNORE);
            s2m_sum(res_val, rbuf, R[p],
                    nR[p]);  // sum the received buffer into res_val
        }
        if (nS[p] != 0) MPI_Wait(&s_request, MPI_STATUS_IGNORE);
    }
    free(sbuf);
    free(rbuf);
    return 0;
}

#endif
void m2s(double *mapval, double *submapval, int *subset, int count)
Definition: alm.c:27
void s2m_sum(double *mapval, double *submapval, int *subset, int count)
Sum submap values into the map values array.
Definition: alm.c:54
int card_and(int *A1, int n1, int *A2, int n2)
Definition: als.c:111
int map_and(int *A1, int n1, int *A2, int n2, int *mapA1andA2)
Compute map A1 and A2 / A1.
Definition: als.c:210
int ring_noempty_step_reduce(int **R, int *nR, int nRmax, int **S, int *nS, int nSmax, double *val, double *res_val, int steps, MPI_Comm comm)
Perform a sparse sum reduction (or mapped reduction) using a ring-like communication scheme.
Definition: ring.c:380
int alltoallv_reduce(int **R, int *nR, int nRtot, int **S, int *nS, int nStot, double *val, double *res_val, int steps, MPI_Comm comm)
Perform a sparse sum reduction (or mapped reduction) using an MPI_Alltoallv call.
Definition: ring.c:157
int ring_nonblocking_reduce(int **R, int *nR, int **S, int *nS, double *val, double *res_val, int steps, MPI_Comm comm)
Perform a sparse sum reduction (or mapped reduction) using a ring-like non-blocking communication scheme.
Definition: ring.c:234
int ring_noempty_reduce(int **R, int *nR, int nneR, int **S, int *nS, int nneS, double *val, double *res_val, int steps, MPI_Comm comm)
Perform a sparse sum reduction (or mapped reduction) using a ring-like non-blocking no-empty communication scheme.
Definition: ring.c:298
int ring_init(int *indices, int count, int **R, int *nR, int **S, int *nS, int steps, MPI_Comm comm)
Initialize tables for the ring-like communication scheme.
Definition: ring.c:48
int ring_reduce(int **R, int *nR, int nRmax, int **S, int *nS, int nSmax, double *val, double *res_val, int steps, MPI_Comm comm)
Perform a sparse sum reduction (or mapped reduction) using a ring-like communication scheme.
Definition: ring.c:109