COMBINATORIAL_BLAS 1.6
 
Loading...
Searching...
No Matches
SpParHelper.cpp
Go to the documentation of this file.
1/****************************************************************/
2/* Parallel Combinatorial BLAS Library (for Graph Computations) */
3/* version 1.6 -------------------------------------------------*/
4/* date: 6/15/2017 ---------------------------------------------*/
5/* authors: Ariful Azad, Aydin Buluc --------------------------*/
6/****************************************************************/
7/*
8 Copyright (c) 2010-2017, The Regents of the University of California
9
10 Permission is hereby granted, free of charge, to any person obtaining a copy
11 of this software and associated documentation files (the "Software"), to deal
12 in the Software without restriction, including without limitation the rights
13 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 copies of the Software, and to permit persons to whom the Software is
15 furnished to do so, subject to the following conditions:
16
17 The above copyright notice and this permission notice shall be included in
18 all copies or substantial portions of the Software.
19
20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 THE SOFTWARE.
27 */
28
29
30#include "usort/parUtils.h"
31
32namespace combblas {
33
34template <typename IT>
35void SpParHelper::ReDistributeToVector(int* & map_scnt, std::vector< std::vector< IT > > & locs_send, std::vector< std::vector< std::string > > & data_send,
36 std::vector<std::array<char, MAXVERTNAME>> & distmapper_array, const MPI_Comm & comm)
37{
38 int nprocs, myrank;
39 MPI_Comm_size(comm, &nprocs);
40 MPI_Comm_rank(comm, &myrank);
41
42 int * map_rcnt = new int[nprocs];
43 MPI_Alltoall(map_scnt, 1, MPI_INT, map_rcnt, 1, MPI_INT, comm);
44 int * map_sdspl = new int[nprocs]();
45 int * map_rdspl = new int[nprocs]();
46 std::partial_sum(map_scnt, map_scnt+nprocs-1, map_sdspl+1);
47 std::partial_sum(map_rcnt, map_rcnt+nprocs-1, map_rdspl+1);
48 IT totmapsend = map_sdspl[nprocs-1] + map_scnt[nprocs-1];
49 IT totmaprecv = map_rdspl[nprocs-1] + map_rcnt[nprocs-1];
50
51 // sendbuf is a pointer to array of MAXVERTNAME chars.
52 // Explicit grouping syntax is due to precedence of [] over *
53 // char* sendbuf[MAXVERTNAME] would have declared a MAXVERTNAME-length array of char pointers
54 char (*sendbuf)[MAXVERTNAME]; // each sendbuf[i] is type char[MAXVERTNAME]
55 sendbuf = (char (*)[MAXVERTNAME]) malloc(sizeof(char[MAXVERTNAME])* totmapsend); // notice that this is allocating a contiguous block of memory
56
57 IT * sendinds = new IT[totmapsend];
58 for(int i=0; i<nprocs; ++i)
59 {
60 int loccnt = 0;
61 for(std::string s:data_send[i])
62 {
63 std::strcpy(sendbuf[map_sdspl[i]+loccnt], s.c_str());
64 loccnt++;
65 }
66 std::vector<std::string>().swap(data_send[i]); // free memory
67 }
68 for(int i=0; i<nprocs; ++i) // sanity check: received indices should be sorted by definition
69 {
70 std::copy(locs_send[i].begin(), locs_send[i].end(), sendinds+map_sdspl[i]);
71 std::vector<IT>().swap(locs_send[i]); // free memory
72 }
73
74 char (*recvbuf)[MAXVERTNAME]; // recvbuf is of type char (*)[MAXVERTNAME]
75 recvbuf = (char (*)[MAXVERTNAME]) malloc(sizeof(char[MAXVERTNAME])* totmaprecv);
76
77 MPI_Datatype MPI_STRING; // this is not necessary (we could just use char) but easier for bookkeeping
78 MPI_Type_contiguous(sizeof(char[MAXVERTNAME]), MPI_CHAR, &MPI_STRING);
79 MPI_Type_commit(&MPI_STRING);
80
81 MPI_Alltoallv(sendbuf, map_scnt, map_sdspl, MPI_STRING, recvbuf, map_rcnt, map_rdspl, MPI_STRING, comm);
82 free(sendbuf); // can't delete[] so use free
83 MPI_Type_free(&MPI_STRING);
84
85 IT * recvinds = new IT[totmaprecv];
86 MPI_Alltoallv(sendinds, map_scnt, map_sdspl, MPIType<IT>(), recvinds, map_rcnt, map_rdspl, MPIType<IT>(), comm);
87 DeleteAll(sendinds, map_scnt, map_sdspl, map_rcnt, map_rdspl);
88
89 if(!std::is_sorted(recvinds, recvinds+totmaprecv))
90 std::cout << "Assertion failed at proc " << myrank << ": Received indices are not sorted, this is unexpected" << std::endl;
91
92 for(IT i=0; i< totmaprecv; ++i)
93 {
94 assert(i == recvinds[i]);
95 std::copy(recvbuf[i], recvbuf[i]+MAXVERTNAME, distmapper_array[i].begin());
96 }
97 free(recvbuf);
98 delete [] recvinds;
99}
100
101
102template<typename KEY, typename VAL, typename IT>
103void SpParHelper::MemoryEfficientPSort(std::pair<KEY,VAL> * array, IT length, IT * dist, const MPI_Comm & comm)
104{
105 int nprocs, myrank;
106 MPI_Comm_size(comm, &nprocs);
107 MPI_Comm_rank(comm, &myrank);
108 int nsize = nprocs / 2; // new size
109 if(nprocs < 10000)
110 {
111 bool excluded = false;
112 if(dist[myrank] == 0) excluded = true;
113
114 int nreals = 0;
115 for(int i=0; i< nprocs; ++i)
116 if(dist[i] != 0) ++nreals;
117
118 //SpParHelper::MemoryEfficientPSort(vecpair, nnz, dist, World);
119
120 if(nreals == nprocs) // general case
121 {
122 long * dist_in = new long[nprocs];
123 for(int i=0; i< nprocs; ++i) dist_in[i] = (long) dist[i];
124 vpsort::parallel_sort (array, array+length, dist_in, comm);
125 delete [] dist_in;
126 }
127 else
128 {
129 long * dist_in = new long[nreals];
130 int * dist_out = new int[nprocs-nreals]; // ranks to exclude
131 int indin = 0;
132 int indout = 0;
133 for(int i=0; i< nprocs; ++i)
134 {
135 if(dist[i] == 0)
136 dist_out[indout++] = i;
137 else
138 dist_in[indin++] = (long) dist[i];
139 }
140
141 #ifdef DEBUG
142 std::ostringstream outs;
143 outs << "To exclude indices: ";
144 std::copy(dist_out, dist_out+indout, std::ostream_iterator<int>(outs, " ")); outs << std::endl;
145 SpParHelper::Print(outs.str());
146 #endif
147
148 MPI_Group sort_group, real_group;
149 MPI_Comm_group(comm, &sort_group);
150 MPI_Group_excl(sort_group, indout, dist_out, &real_group);
151 MPI_Group_free(&sort_group);
152
153 // The Create() function should be executed by all processes in comm,
154 // even if they do not belong to the new group (in that case MPI_COMM_NULL is returned as real_comm?)
155 // MPI::Intracomm MPI::Intracomm::Create(const MPI::Group& group) const;
156 MPI_Comm real_comm;
157 MPI_Comm_create(comm, real_group, &real_comm);
158 if(!excluded)
159 {
160 vpsort::parallel_sort (array, array+length, dist_in, real_comm);
161 MPI_Comm_free(&real_comm);
162 }
163 MPI_Group_free(&real_group);
164 delete [] dist_in;
165 delete [] dist_out;
166 }
167 }
168 else
169 {
170 IT gl_median = std::accumulate(dist, dist+nsize, static_cast<IT>(0)); // global rank of the first element of the median processor
171 sort(array, array+length); // re-sort because we might have swapped data in previous iterations
172 int color = (myrank < nsize)? 0: 1;
173
174 std::pair<KEY,VAL> * low = array;
175 std::pair<KEY,VAL> * upp = array;
176 GlobalSelect(gl_median, low, upp, array, length, comm);
177 BipartiteSwap(low, array, length, nsize, color, comm);
178
179 if(color == 1) dist = dist + nsize; // adjust for the second half of processors
180
181 // recursive call; two implicit 'spawn's where half of the processors execute different paramaters
182 // MPI::Intracomm MPI::Intracomm::Split(int color, int key) const;
183
184 MPI_Comm halfcomm;
185 MPI_Comm_split(comm, color, myrank, &halfcomm); // split into two communicators
186 MemoryEfficientPSort(array, length, dist, halfcomm);
187 }
188
189}
190
191
192/*
193 TODO: This function is just a hack at this moment.
194 The payload (VAL) can only be integer at this moment.
195 FIX this.
196 */
197template<typename KEY, typename VAL, typename IT>
198std::vector<std::pair<KEY,VAL>> SpParHelper::KeyValuePSort(std::pair<KEY,VAL> * array, IT length, IT * dist, const MPI_Comm & comm)
199{
200 int nprocs, myrank;
201 MPI_Comm_size(comm, &nprocs);
202 MPI_Comm_rank(comm, &myrank);
203 int nsize = nprocs / 2; // new size
204
205
206
207 bool excluded = false;
208 if(dist[myrank] == 0) excluded = true;
209
210 int nreals = 0;
211 for(int i=0; i< nprocs; ++i)
212 if(dist[i] != 0) ++nreals;
213
214 std::vector<IndexHolder<KEY>> in(length);
215#ifdef THREADED
216#pragma omp parallel for
217#endif
218 for(int i=0; i< length; ++i)
219 {
220 in[i] = IndexHolder<KEY>(array[i].first, static_cast<unsigned long>(array[i].second));
221 }
222
223 if(nreals == nprocs) // general case
224 {
225 par::sampleSort(in, comm);
226 }
227 else
228 {
229 long * dist_in = new long[nreals];
230 int * dist_out = new int[nprocs-nreals]; // ranks to exclude
231 int indin = 0;
232 int indout = 0;
233 for(int i=0; i< nprocs; ++i)
234 {
235 if(dist[i] == 0)
236 dist_out[indout++] = i;
237 else
238 dist_in[indin++] = (long) dist[i];
239 }
240
241#ifdef DEBUG
242 std::ostringstream outs;
243 outs << "To exclude indices: ";
244 std::copy(dist_out, dist_out+indout, std::ostream_iterator<int>(outs, " ")); outs << std::endl;
245 SpParHelper::Print(outs.str());
246#endif
247
248 MPI_Group sort_group, real_group;
249 MPI_Comm_group(comm, &sort_group);
250 MPI_Group_excl(sort_group, indout, dist_out, &real_group);
251 MPI_Group_free(&sort_group);
252
253 // The Create() function should be executed by all processes in comm,
254 // even if they do not belong to the new group (in that case MPI_COMM_NULL is returned as real_comm?)
255 // MPI::Intracomm MPI::Intracomm::Create(const MPI::Group& group) const;
256 MPI_Comm real_comm;
257 MPI_Comm_create(comm, real_group, &real_comm);
258 if(!excluded)
259 {
260 par::sampleSort(in, real_comm);
261 MPI_Comm_free(&real_comm);
262 }
263 MPI_Group_free(&real_group);
264 delete [] dist_in;
265 delete [] dist_out;
266 }
267
268 std::vector<std::pair<KEY,VAL>> sorted(in.size());
269 for(int i=0; i<in.size(); i++)
270 {
271 sorted[i].second = static_cast<VAL>(in[i].index);
272 sorted[i].first = in[i].value;
273 }
274 return sorted;
275}
276
277
278template<typename KEY, typename VAL, typename IT>
279void SpParHelper::GlobalSelect(IT gl_rank, std::pair<KEY,VAL> * & low, std::pair<KEY,VAL> * & upp, std::pair<KEY,VAL> * array, IT length, const MPI_Comm & comm)
280{
281 int nprocs, myrank;
282 MPI_Comm_size(comm, &nprocs);
283 MPI_Comm_rank(comm, &myrank);
284 IT begin = 0;
285 IT end = length; // initially everyone is active
286 std::pair<KEY, double> * wmminput = new std::pair<KEY,double>[nprocs]; // (median, #{actives})
287
288 MPI_Datatype MPI_sortType;
289 MPI_Type_contiguous (sizeof(std::pair<KEY,double>), MPI_CHAR, &MPI_sortType);
290 MPI_Type_commit (&MPI_sortType);
291
292 KEY wmm; // our median pick
293 IT gl_low, gl_upp;
294 IT active = end-begin; // size of the active range
295 IT nacts = 0;
296 bool found = 0;
297 int iters = 0;
298
299 /* changes by shan : to add begin0 and end0 for exit condition */
300 IT begin0, end0;
301 do
302 {
303 iters++;
304 begin0 = begin; end0 = end;
305 KEY median = array[(begin + end)/2].first; // median of the active range
306 wmminput[myrank].first = median;
307 wmminput[myrank].second = static_cast<double>(active);
308 MPI_Allgather(MPI_IN_PLACE, 0, MPI_sortType, wmminput, 1, MPI_sortType, comm);
309 double totact = 0; // total number of active elements
310 for(int i=0; i<nprocs; ++i)
311 totact += wmminput[i].second;
312
313 // input to weighted median of medians is a set of (object, weight) pairs
314 // the algorithm computes the first set of elements (according to total
315 // order of "object"s), whose sum is still less than or equal to 1/2
316 for(int i=0; i<nprocs; ++i)
317 wmminput[i].second /= totact ; // normalize the weights
318
319 sort(wmminput, wmminput+nprocs); // sort w.r.t. medians
320 double totweight = 0;
321 int wmmloc=0;
322 while( wmmloc<nprocs && totweight < 0.5 )
323 {
324 totweight += wmminput[wmmloc++].second;
325 }
326
327 wmm = wmminput[wmmloc-1].first; // weighted median of medians
328
329 std::pair<KEY,VAL> wmmpair = std::make_pair(wmm, VAL());
330 low =std::lower_bound (array+begin, array+end, wmmpair);
331 upp =std::upper_bound (array+begin, array+end, wmmpair);
332 IT loc_low = low-array; // #{elements smaller than wmm}
333 IT loc_upp = upp-array; // #{elements smaller or equal to wmm}
334
335 MPI_Allreduce( &loc_low, &gl_low, 1, MPIType<IT>(), MPI_SUM, comm);
336 MPI_Allreduce( &loc_upp, &gl_upp, 1, MPIType<IT>(), MPI_SUM, comm);
337
338 if(gl_upp < gl_rank)
339 {
340 // our pick was too small; only recurse to the right
341 begin = (low - array);
342 }
343 else if(gl_rank < gl_low)
344 {
345 // our pick was too big; only recurse to the left
346 end = (upp - array);
347 }
348 else
349 {
350 found = true;
351 }
352 active = end-begin;
353 MPI_Allreduce(&active, &nacts, 1, MPIType<IT>(), MPI_SUM, comm);
354 if (begin0 == begin && end0 == end) break; // ABAB: Active range did not shrink, so we break (is this kosher?)
355 }
356 while((nacts > 2*nprocs) && (!found));
357 delete [] wmminput;
358
359 MPI_Datatype MPI_pairType;
360 MPI_Type_contiguous (sizeof(std::pair<KEY,VAL>), MPI_CHAR, &MPI_pairType);
361 MPI_Type_commit (&MPI_pairType);
362
363 int * nactives = new int[nprocs];
364 nactives[myrank] = static_cast<int>(active); // At this point, actives are small enough
365 MPI_Allgather(MPI_IN_PLACE, 0, MPI_INT, nactives, 1, MPI_INT, comm);
366 int * dpls = new int[nprocs](); // displacements (zero initialized pid)
367 std::partial_sum(nactives, nactives+nprocs-1, dpls+1);
368 std::pair<KEY,VAL> * recvbuf = new std::pair<KEY,VAL>[nacts];
369 low = array + begin; // update low to the beginning of the active range
370 MPI_Allgatherv(low, active, MPI_pairType, recvbuf, nactives, dpls, MPI_pairType, comm);
371
372 std::pair<KEY,int> * allactives = new std::pair<KEY,int>[nacts];
373 int k = 0;
374 for(int i=0; i<nprocs; ++i)
375 {
376 for(int j=0; j<nactives[i]; ++j)
377 {
378 allactives[k] = std::make_pair(recvbuf[k].first, i);
379 k++;
380 }
381 }
382 DeleteAll(recvbuf, dpls, nactives);
383 sort(allactives, allactives+nacts);
384 MPI_Allreduce(&begin, &gl_low, 1, MPIType<IT>(), MPI_SUM, comm); // update
385 int diff = gl_rank - gl_low;
386 for(int k=0; k < diff; ++k)
387 {
388 if(allactives[k].second == myrank)
389 ++low; // increment the local pointer
390 }
391 delete [] allactives;
392 begin = low-array;
393 MPI_Allreduce(&begin, &gl_low, 1, MPIType<IT>(), MPI_SUM, comm); // update
394}
395
396template<typename KEY, typename VAL, typename IT>
397void SpParHelper::BipartiteSwap(std::pair<KEY,VAL> * low, std::pair<KEY,VAL> * array, IT length, int nfirsthalf, int color, const MPI_Comm & comm)
398{
399 int nprocs, myrank;
400 MPI_Comm_size(comm, &nprocs);
401 MPI_Comm_rank(comm, &myrank);
402
403 IT * firsthalves = new IT[nprocs];
404 IT * secondhalves = new IT[nprocs];
405 firsthalves[myrank] = low-array;
406 secondhalves[myrank] = length - (low-array);
407
408 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), firsthalves, 1, MPIType<IT>(), comm);
409 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), secondhalves, 1, MPIType<IT>(), comm);
410
411 int * sendcnt = new int[nprocs](); // zero initialize
412 int totrecvcnt = 0;
413
414 std::pair<KEY,VAL> * bufbegin = NULL;
415 if(color == 0) // first processor half, only send second half of data
416 {
417 bufbegin = low;
418 totrecvcnt = length - (low-array);
419 IT beg_oftransfer = std::accumulate(secondhalves, secondhalves+myrank, static_cast<IT>(0));
420 IT spaceafter = firsthalves[nfirsthalf];
421 int i=nfirsthalf+1;
422 while(i < nprocs && spaceafter < beg_oftransfer)
423 {
424 spaceafter += firsthalves[i++]; // post-incremenet
425 }
426 IT end_oftransfer = beg_oftransfer + secondhalves[myrank]; // global index (within second half) of the end of my data
427 IT beg_pour = beg_oftransfer;
428 IT end_pour = std::min(end_oftransfer, spaceafter);
429 sendcnt[i-1] = end_pour - beg_pour;
430 while( i < nprocs && spaceafter < end_oftransfer ) // find other recipients until I run out of data
431 {
432 beg_pour = end_pour;
433 spaceafter += firsthalves[i];
434 end_pour = std::min(end_oftransfer, spaceafter);
435 sendcnt[i++] = end_pour - beg_pour; // post-increment
436 }
437 }
438 else if(color == 1) // second processor half, only send first half of data
439 {
440 bufbegin = array;
441 totrecvcnt = low-array;
442 // global index (within the second processor half) of the beginning of my data
443 IT beg_oftransfer = std::accumulate(firsthalves+nfirsthalf, firsthalves+myrank, static_cast<IT>(0));
444 IT spaceafter = secondhalves[0];
445 int i=1;
446 while( i< nfirsthalf && spaceafter < beg_oftransfer)
447 {
448 //spacebefore = spaceafter;
449 spaceafter += secondhalves[i++]; // post-increment
450 }
451 IT end_oftransfer = beg_oftransfer + firsthalves[myrank]; // global index (within second half) of the end of my data
452 IT beg_pour = beg_oftransfer;
453 IT end_pour = std::min(end_oftransfer, spaceafter);
454 sendcnt[i-1] = end_pour - beg_pour;
455 while( i < nfirsthalf && spaceafter < end_oftransfer ) // find other recipients until I run out of data
456 {
457 beg_pour = end_pour;
458 spaceafter += secondhalves[i];
459 end_pour = std::min(end_oftransfer, spaceafter);
460 sendcnt[i++] = end_pour - beg_pour; // post-increment
461 }
462 }
463 DeleteAll(firsthalves, secondhalves);
464 int * recvcnt = new int[nprocs];
465 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, comm); // get the recv counts
466 // Alltoall is actually unnecessary, because sendcnt = recvcnt
467 // If I have n_mine > n_yours data to send, then I can send you only n_yours
468 // as this is your space, and you'll send me identical amount.
469 // Then I can only receive n_mine - n_yours from the third processor and
470 // that processor can only send n_mine - n_yours to me back.
471 // The proof follows from induction
472
473 MPI_Datatype MPI_valueType;
474 MPI_Type_contiguous(sizeof(std::pair<KEY,VAL>), MPI_CHAR, &MPI_valueType);
475 MPI_Type_commit(&MPI_valueType);
476
477 std::pair<KEY,VAL> * receives = new std::pair<KEY,VAL>[totrecvcnt];
478 int * sdpls = new int[nprocs](); // displacements (zero initialized pid)
479 int * rdpls = new int[nprocs]();
480 std::partial_sum(sendcnt, sendcnt+nprocs-1, sdpls+1);
481 std::partial_sum(recvcnt, recvcnt+nprocs-1, rdpls+1);
482
483 MPI_Alltoallv(bufbegin, sendcnt, sdpls, MPI_valueType, receives, recvcnt, rdpls, MPI_valueType, comm); // sparse swap
484
485 DeleteAll(sendcnt, recvcnt, sdpls, rdpls);
486 std::copy(receives, receives+totrecvcnt, bufbegin);
487 delete [] receives;
488}
489
490
491template<typename KEY, typename VAL, typename IT>
492void SpParHelper::DebugPrintKeys(std::pair<KEY,VAL> * array, IT length, IT * dist, MPI_Comm & World)
493{
494 int rank, nprocs;
495 MPI_Comm_rank(World, &rank);
496 MPI_Comm_size(World, &nprocs);
497 MPI_File thefile;
498
499 char _fn[] = "temp_sortedkeys"; // AL: this is to avoid the problem that C++ string literals are const char* while C string literals are char*, leading to a const warning (technically error, but compilers are tolerant)
500 MPI_File_open(World, _fn, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);
501
502 // The cast in the last parameter is crucial because the signature of the function is
503 // T accumulate ( InputIterator first, InputIterator last, T init )
504 // Hence if init if of type "int", the output also becomes an it (remember C++ signatures are ignorant of return value)
505 IT sizeuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
506
507 MPI_Offset disp = sizeuntil * sizeof(KEY); // displacement is in bytes
508 MPI_File_set_view(thefile, disp, MPIType<KEY>(), MPIType<KEY>(), "native", MPI_INFO_NULL);
509
510 KEY * packed = new KEY[length];
511 for(int i=0; i<length; ++i)
512 {
513 packed[i] = array[i].first;
514 }
515 MPI_File_write(thefile, packed, length, MPIType<KEY>(), NULL);
516 MPI_File_close(&thefile);
517 delete [] packed;
518
519 // Now let processor-0 read the file and print
520 if(rank == 0)
521 {
522 FILE * f = fopen("temp_sortedkeys", "r");
523 if(!f)
524 {
525 std::cerr << "Problem reading binary input file\n";
526 return;
527 }
528 IT maxd = *std::max_element(dist, dist+nprocs);
529 KEY * data = new KEY[maxd];
530
531 for(int i=0; i<nprocs; ++i)
532 {
533 // read n_per_proc integers and print them
534 fread(data, sizeof(KEY), dist[i],f);
535
536 std::cout << "Elements stored on proc " << i << ": " << std::endl;
537 std::copy(data, data+dist[i], std::ostream_iterator<KEY>(std::cout, "\n"));
538 }
539 delete [] data;
540 }
541}
542
543
551template <class IT, class NT, class DER>
552void SpParHelper::FetchMatrix(SpMat<IT,NT,DER> & MRecv, const std::vector<IT> & essentials, std::vector<MPI_Win> & arrwin, int ownind)
553{
554 MRecv.Create(essentials); // allocate memory for arrays
555
556 Arr<IT,NT> arrinfo = MRecv.GetArrays();
557 assert( (arrwin.size() == arrinfo.totalsize()));
558
559 // C-binding for MPI::Get
560 // int MPI_Get(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp,
561 // int target_count, MPI_Datatype target_datatype, MPI_Win win)
562
563 IT essk = 0;
564 for(int i=0; i< arrinfo.indarrs.size(); ++i) // get index arrays
565 {
566 //arrwin[essk].Lock(MPI::LOCK_SHARED, ownind, 0);
567 MPI_Get( arrinfo.indarrs[i].addr, arrinfo.indarrs[i].count, MPIType<IT>(), ownind, 0, arrinfo.indarrs[i].count, MPIType<IT>(), arrwin[essk++]);
568 }
569 for(int i=0; i< arrinfo.numarrs.size(); ++i) // get numerical arrays
570 {
571 //arrwin[essk].Lock(MPI::LOCK_SHARED, ownind, 0);
572 MPI_Get(arrinfo.numarrs[i].addr, arrinfo.numarrs[i].count, MPIType<NT>(), ownind, 0, arrinfo.numarrs[i].count, MPIType<NT>(), arrwin[essk++]);
573 }
574}
575
576
582template<typename IT, typename NT, typename DER>
583void SpParHelper::BCastMatrix(MPI_Comm & comm1d, SpMat<IT,NT,DER> & Matrix, const std::vector<IT> & essentials, int root)
584{
585 int myrank;
586 MPI_Comm_rank(comm1d, &myrank);
587 if(myrank != root)
588 {
589 Matrix.Create(essentials); // allocate memory for arrays
590 }
591
592 Arr<IT,NT> arrinfo = Matrix.GetArrays();
593 for(unsigned int i=0; i< arrinfo.indarrs.size(); ++i) // get index arrays
594 {
595 MPI_Bcast(arrinfo.indarrs[i].addr, arrinfo.indarrs[i].count, MPIType<IT>(), root, comm1d);
596 }
597 for(unsigned int i=0; i< arrinfo.numarrs.size(); ++i) // get numerical arrays
598 {
599 MPI_Bcast(arrinfo.numarrs[i].addr, arrinfo.numarrs[i].count, MPIType<NT>(), root, comm1d);
600 }
601}
602
608template<typename IT, typename NT, typename DER>
609void SpParHelper::IBCastMatrix(MPI_Comm & comm1d, SpMat<IT,NT,DER> & Matrix, const std::vector<IT> & essentials, int root, std::vector<MPI_Request> & indarrayReq , std::vector<MPI_Request> & numarrayReq)
610{
611 int myrank;
612 MPI_Comm_rank(comm1d, &myrank);
613 if(myrank != root)
614 {
615 Matrix.Create(essentials); // allocate memory for arrays
616 }
617
618 Arr<IT,NT> arrinfo = Matrix.GetArrays();
619 for(unsigned int i=0; i< arrinfo.indarrs.size(); ++i) // get index arrays
620 {
621 MPI_Ibcast(arrinfo.indarrs[i].addr, arrinfo.indarrs[i].count, MPIType<IT>(), root, comm1d, &indarrayReq[i]);
622 }
623 for(unsigned int i=0; i< arrinfo.numarrs.size(); ++i) // get numerical arrays
624 {
625 MPI_Ibcast(arrinfo.numarrs[i].addr, arrinfo.numarrs[i].count, MPIType<NT>(), root, comm1d, &numarrayReq[i]);
626 }
627}
628
636template<typename IT, typename NT, typename DER>
637void SpParHelper::GatherMatrix(MPI_Comm & comm1d, SpMat<IT,NT,DER> & Matrix, int root)
638{
639 int myrank, nprocs;
640 MPI_Comm_rank(comm1d, &myrank);
641 MPI_Comm_size(comm1d,&nprocs);
642
643 /*
644 if(myrank != root)
645 {
646 Matrix.Create(essentials); // allocate memory for arrays
647 }
648 */
649
650 Arr<IT,NT> arrinfo = Matrix.GetArrays();
651 std::vector<std::vector<int>> recvcnt_ind(arrinfo.indarrs.size());
652 std::vector<std::vector<int>> recvcnt_num(arrinfo.numarrs.size());
653 for(unsigned int i=0; i< arrinfo.indarrs.size(); ++i) // get index arrays
654 {
655 recvcnt_ind[i].resize(nprocs);
656 int lcount = (int)arrinfo.indarrs[i].count;
657 MPI_Gather(&lcount, 1, MPI_INT, recvcnt_ind[i].data(),1, MPI_INT, root, comm1d);
658 }
659 for(unsigned int i=0; i< arrinfo.numarrs.size(); ++i) // get numerical arrays
660 {
661 recvcnt_num[i].resize(nprocs);
662 int lcount = (int) arrinfo.numarrs[i].count;
663 MPI_Gather(&lcount, 1, MPI_INT, recvcnt_num[i].data(),1, MPI_INT, root, comm1d);
664 }
665
666 // now gather the actual vector
667 std::vector<std::vector<int>> recvdsp_ind(arrinfo.indarrs.size());
668 std::vector<std::vector<int>> recvdsp_num(arrinfo.numarrs.size());
669 std::vector<std::vector<IT>> recvind(arrinfo.indarrs.size());
670 std::vector<std::vector<IT>> recvnum(arrinfo.numarrs.size());
671 for(unsigned int i=0; i< arrinfo.indarrs.size(); ++i) // get index arrays
672 {
673 recvdsp_ind[i].resize(nprocs);
674 recvdsp_ind[i][0] = 0;
675 for(int j=1; j<nprocs; j++)
676 recvdsp_ind[i][j] = recvdsp_ind[i][j-1] + recvcnt_ind[i][j-1];
677 recvind[i].resize(recvdsp_ind[i][nprocs-1] + recvcnt_ind[i][nprocs-1]);
678 MPI_Gatherv(arrinfo.indarrs[i].addr, arrinfo.indarrs[i].count, MPIType<IT>(), recvind[i].data(),recvcnt_ind[i].data(), recvdsp_ind[i].data(), MPIType<IT>(), root, comm1d);
679 }
680
681
682 for(unsigned int i=0; i< arrinfo.numarrs.size(); ++i) // gather num arrays
683 {
684 recvdsp_num[i].resize(nprocs);
685 recvdsp_num[i][0] = 0;
686 for(int j=1; j<nprocs; j++)
687 recvdsp_num[i][j] = recvdsp_num[i][j-1] + recvcnt_num[i][j-1];
688 recvnum[i].resize(recvdsp_num[i][nprocs-1] + recvcnt_num[i][nprocs-1]);
689 MPI_Gatherv(arrinfo.numarrs[i].addr, arrinfo.numarrs[i].count, MPIType<NT>(), recvnum[i].data(),recvcnt_num[i].data(), recvdsp_num[i].data(), MPIType<NT>(), root, comm1d);
690 }
691}
692
693
694template <class IT, class NT, class DER>
695void SpParHelper::SetWindows(MPI_Comm & comm1d, const SpMat< IT,NT,DER > & Matrix, std::vector<MPI_Win> & arrwin)
696{
697 Arr<IT,NT> arrs = Matrix.GetArrays();
698
699 // static MPI::Win MPI::Win::create(const void *base, MPI::Aint size, int disp_unit, MPI::Info info, const MPI_Comm & comm);
700 // The displacement unit argument is provided to facilitate address arithmetic in RMA operations
701 // *** COLLECTIVE OPERATION ***, everybody exposes its own array to everyone else in the communicator
702
703 for(int i=0; i< arrs.indarrs.size(); ++i)
704 {
705 MPI_Win nWin;
706 MPI_Win_create(arrs.indarrs[i].addr,
707 arrs.indarrs[i].count * sizeof(IT), sizeof(IT), MPI_INFO_NULL, comm1d, &nWin);
708 arrwin.push_back(nWin);
709 }
710 for(int i=0; i< arrs.numarrs.size(); ++i)
711 {
712 MPI_Win nWin;
713 MPI_Win_create(arrs.numarrs[i].addr,
714 arrs.numarrs[i].count * sizeof(NT), sizeof(NT), MPI_INFO_NULL, comm1d, &nWin);
715 arrwin.push_back(nWin);
716 }
717}
718
719inline void SpParHelper::LockWindows(int ownind, std::vector<MPI_Win> & arrwin)
720{
721 for(std::vector<MPI_Win>::iterator itr = arrwin.begin(); itr != arrwin.end(); ++itr)
722 {
723 MPI_Win_lock(MPI_LOCK_SHARED, ownind, 0, *itr);
724 }
725}
726
727inline void SpParHelper::UnlockWindows(int ownind, std::vector<MPI_Win> & arrwin)
728{
729 for(std::vector<MPI_Win>::iterator itr = arrwin.begin(); itr != arrwin.end(); ++itr)
730 {
731 MPI_Win_unlock( ownind, *itr);
732 }
733}
734
735
740inline void SpParHelper::StartAccessEpoch(int owner, std::vector<MPI_Win> & arrwin, MPI_Group & group)
741{
742 /* Now start using the whole comm as a group */
743 int acc_ranks[1];
744 acc_ranks[0] = owner;
745 MPI_Group access;
746 MPI_Group_incl(group, 1, acc_ranks, &access); // take only the owner
747
748 // begin the ACCESS epochs for the arrays of the remote matrices A and B
749 // Start() *may* block until all processes in the target group have entered their exposure epoch
750 for(unsigned int i=0; i< arrwin.size(); ++i)
751 MPI_Win_start(access, 0, arrwin[i]);
752
753 MPI_Group_free(&access);
754}
755
759inline void SpParHelper::PostExposureEpoch(int self, std::vector<MPI_Win> & arrwin, MPI_Group & group)
760{
761 // begin the EXPOSURE epochs for the arrays of the local matrices A and B
762 for(unsigned int i=0; i< arrwin.size(); ++i)
763 MPI_Win_post(group, MPI_MODE_NOPUT, arrwin[i]);
764}
765
766template <class IT, class DER>
767void SpParHelper::AccessNFetch(DER * & Matrix, int owner, std::vector<MPI_Win> & arrwin, MPI_Group & group, IT ** sizes)
768{
769 StartAccessEpoch(owner, arrwin, group); // start the access epoch to arrwin of owner
770
771 std::vector<IT> ess(DER::esscount); // pack essentials to a vector
772 for(int j=0; j< DER::esscount; ++j)
773 ess[j] = sizes[j][owner];
774
775 Matrix = new DER(); // create the object first
776 FetchMatrix(*Matrix, ess, arrwin, owner); // then start fetching its elements
777}
778
779template <class IT, class DER>
780void SpParHelper::LockNFetch(DER * & Matrix, int owner, std::vector<MPI_Win> & arrwin, MPI_Group & group, IT ** sizes)
781{
782 LockWindows(owner, arrwin);
783
784 std::vector<IT> ess(DER::esscount); // pack essentials to a vector
785 for(int j=0; j< DER::esscount; ++j)
786 ess[j] = sizes[j][owner];
787
788 Matrix = new DER(); // create the object first
789 FetchMatrix(*Matrix, ess, arrwin, owner); // then start fetching its elements
790}
791
797template <class IT, class NT, class DER>
798void SpParHelper::GetSetSizes(const SpMat<IT,NT,DER> & Matrix, IT ** & sizes, MPI_Comm & comm1d)
799{
800 std::vector<IT> essentials = Matrix.GetEssentials();
801 int index;
802 MPI_Comm_rank(comm1d, &index);
803
804 for(IT i=0; (unsigned)i < essentials.size(); ++i)
805 {
806 sizes[i][index] = essentials[i];
807 MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), sizes[i], 1, MPIType<IT>(), comm1d);
808 }
809}
810
811inline void SpParHelper::PrintFile(const std::string & s, const std::string & filename)
812{
813 int myrank;
814 MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
815 if(myrank == 0)
816 {
817 std::ofstream out(filename.c_str(), std::ofstream::app);
818 out << s;
819 out.close();
820 }
821}
822
823inline void SpParHelper::PrintFile(const std::string & s, const std::string & filename, MPI_Comm & world)
824{
825 int myrank;
826 MPI_Comm_rank(world, &myrank);
827 if(myrank == 0)
828 {
829 std::ofstream out(filename.c_str(), std::ofstream::app);
830 out << s;
831 out.close();
832 }
833}
834
835
836inline void SpParHelper::Print(const std::string & s)
837{
838 int myrank;
839 MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
840 if(myrank == 0)
841 {
842 std::cerr << s;
843 }
844}
845
846inline void SpParHelper::Print(const std::string & s, MPI_Comm & world)
847{
848 int myrank;
849 MPI_Comm_rank(world, &myrank);
850 if(myrank == 0)
851 {
852 std::cerr << s;
853 }
854}
855
856
857inline void SpParHelper::check_newline(int *bytes_read, int bytes_requested, char *buf)
858{
859 if ((*bytes_read) < bytes_requested) {
860 // fewer bytes than expected, this means EOF
861 if (buf[(*bytes_read) - 1] != '\n') {
862 // doesn't terminate with a newline, add one to prevent infinite loop later
863 buf[(*bytes_read) - 1] = '\n';
864 std::cout << "Error in Matrix Market format, appending missing newline at end of file" << std::endl;
865 (*bytes_read)++;
866 }
867 }
868}
869
870
871inline bool SpParHelper::FetchBatch(MPI_File & infile, MPI_Offset & curpos, MPI_Offset end_fpos, bool firstcall, std::vector<std::string> & lines, int myrank)
872{
873 size_t bytes2fetch = ONEMILLION; // we might read more than needed but no problem as we won't process them
874 MPI_Status status;
875 int bytes_read;
876 if(firstcall && myrank != 0)
877 {
878 curpos -= 1; // first byte is to check whether we started at the beginning of a line
879 bytes2fetch += 1;
880 }
881 char * buf = new char[bytes2fetch]; // needs to happen **after** bytes2fetch is updated
882 char * originalbuf = buf; // so that we can delete it later because "buf" will move
883
884 MPI_File_read_at(infile, curpos, buf, bytes2fetch, MPI_CHAR, &status);
885 MPI_Get_count(&status, MPI_CHAR, &bytes_read); // MPI_Get_Count can only return 32-bit integers
886 if(!bytes_read)
887 {
888 delete [] originalbuf;
889 return true; // done
890 }
891 SpParHelper::check_newline(&bytes_read, bytes2fetch, buf);
892 if(firstcall && myrank != 0)
893 {
894 if(buf[0] == '\n') // we got super lucky and hit the line break
895 {
896 buf += 1;
897 bytes_read -= 1;
898 curpos += 1;
899 }
900 else // skip to the next line and let the preceeding processor take care of this partial line
901 {
902 char *c = (char*)memchr(buf, '\n', MAXLINELENGTH); // return a pointer to the matching byte or NULL if the character does not occur
903 if (c == NULL) {
904 std::cout << "Unexpected line without a break" << std::endl;
905 }
906 int n = c - buf + 1;
907 bytes_read -= n;
908 buf += n;
909 curpos += n;
910 }
911 }
912 while(bytes_read > 0 && curpos < end_fpos) // this will also finish the last line
913 {
914 char *c = (char*)memchr(buf, '\n', bytes_read); // return a pointer to the matching byte or NULL if the character does not occur
915 if (c == NULL) {
916 delete [] originalbuf;
917 return false; // if bytes_read stops in the middle of a line, that line will be re-read next time since curpos has not been moved forward yet
918 }
919 int n = c - buf + 1;
920
921 // string constructor from char * buffer: copies the first n characters from the array of characters pointed by s
922 lines.push_back(std::string(buf, n-1)); // no need to copy the newline character
923 bytes_read -= n; // reduce remaining bytes
924 buf += n; // move forward the buffer
925 curpos += n;
926 }
927 delete [] originalbuf;
928 if (curpos >= end_fpos) return true; // don't call it again, nothing left to read
929 else return false;
930}
931
932
933inline void SpParHelper::WaitNFree(std::vector<MPI_Win> & arrwin)
934{
935 // End the exposure epochs for the arrays of the local matrices A and B
936 // The Wait() call matches calls to Complete() issued by ** EACH OF THE ORIGIN PROCESSES **
937 // that were granted access to the window during this epoch.
938 for(unsigned int i=0; i< arrwin.size(); ++i)
939 {
940 MPI_Win_wait(arrwin[i]);
941 }
942 FreeWindows(arrwin);
943}
944
945inline void SpParHelper::FreeWindows(std::vector<MPI_Win> & arrwin)
946{
947 for(unsigned int i=0; i< arrwin.size(); ++i)
948 {
949 MPI_Win_free(&arrwin[i]);
950 }
951}
952
953}
int64_t IT
A small helper class used while sorting a pair of value and its index in an array/vector.
Definition indexHolder.h:20
static void FreeWindows(std::vector< MPI_Win > &arrwin)
static void StartAccessEpoch(int owner, std::vector< MPI_Win > &arrwin, MPI_Group &group)
static void LockWindows(int ownind, std::vector< MPI_Win > &arrwin)
static bool FetchBatch(MPI_File &infile, MPI_Offset &curpos, MPI_Offset end_fpos, bool firstcall, std::vector< std::string > &lines, int myrank)
static void GetSetSizes(const SpMat< IT, NT, DER > &Matrix, IT **&sizes, MPI_Comm &comm1d)
static void GlobalSelect(IT gl_rank, std::pair< KEY, VAL > *&low, std::pair< KEY, VAL > *&upp, std::pair< KEY, VAL > *array, IT length, const MPI_Comm &comm)
static void UnlockWindows(int ownind, std::vector< MPI_Win > &arrwin)
static void SetWindows(MPI_Comm &comm1d, const SpMat< IT, NT, DER > &Matrix, std::vector< MPI_Win > &arrwin)
static void BipartiteSwap(std::pair< KEY, VAL > *low, std::pair< KEY, VAL > *array, IT length, int nfirsthalf, int color, const MPI_Comm &comm)
static std::vector< std::pair< KEY, VAL > > KeyValuePSort(std::pair< KEY, VAL > *array, IT length, IT *dist, const MPI_Comm &comm)
static void PostExposureEpoch(int self, std::vector< MPI_Win > &arrwin, MPI_Group &group)
static void GatherMatrix(MPI_Comm &comm1d, SpMat< IT, NT, DER > &Matrix, int root)
static void PrintFile(const std::string &s, const std::string &filename)
static void BCastMatrix(MPI_Comm &comm1d, SpMat< IT, NT, DER > &Matrix, const std::vector< IT > &essentials, int root)
static void Print(const std::string &s)
static void WaitNFree(std::vector< MPI_Win > &arrwin)
static void FetchMatrix(SpMat< IT, NT, DER > &MRecv, const std::vector< IT > &essentials, std::vector< MPI_Win > &arrwin, int ownind)
static void ReDistributeToVector(int *&map_scnt, std::vector< std::vector< IT > > &locs_send, std::vector< std::vector< std::string > > &data_send, std::vector< std::array< char, MAXVERTNAME > > &distmapper_array, const MPI_Comm &comm)
static void DebugPrintKeys(std::pair< KEY, VAL > *array, IT length, IT *dist, MPI_Comm &World)
static void MemoryEfficientPSort(std::pair< KEY, VAL > *array, IT length, IT *dist, const MPI_Comm &comm)
static void LockNFetch(DER *&Matrix, int owner, std::vector< MPI_Win > &arrwin, MPI_Group &group, IT **sizes)
static void AccessNFetch(DER *&Matrix, int owner, std::vector< MPI_Win > &arrwin, MPI_Group &group, IT **sizes)
static void check_newline(int *bytes_read, int bytes_requested, char *buf)
static void IBCastMatrix(MPI_Comm &comm1d, SpMat< IT, NT, DER > &Matrix, const std::vector< IT > &essentials, int root, std::vector< MPI_Request > &indarrayReq, std::vector< MPI_Request > &numarrayReq)
int rank
int nprocs
Definition comms.cpp:55
#define MAXLINELENGTH
Definition SpDefs.h:61
#define MAXVERTNAME
Definition SpDefs.h:68
#define ONEMILLION
Definition SpDefs.h:60
void DeleteAll(A arr1)
Definition Deleter.h:48
T median(const T &a, const T &b, const T &c, Pred comp)
Definition sort.timpl.h:49
int sampleSort(std::vector< T > &in, std::vector< T > &out, MPI_Comm comm)
A parallel sample sort implementation. In our implementation, we do not pose any restriction on the i...
void parallel_sort(_RandomAccessIter first, _RandomAccessIter last, _Compare comp, long *dist_in, SeqSort< _SeqSortType > &mysort, Split< _SplitType > &mysplit, Merge< _MergeType > &mymerge, MPI_Comm comm)
Definition psort.h:38