COMBINATORIAL_BLAS 1.6
 
psort_samplesort.h
/*
Copyright (c) 2009, David Cheng, Viral B. Shah.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/

#ifndef PSORT_SAMPLE_H

#define PSORT_SAMPLE_H

#include "psort_util.h"

#define MPI_PSORT_TAG 31337

namespace vpsort {
  using namespace std;

  // sample n_out elements from array in _without_ replacement
  // done with two sweeps of size n_out
  // in is touched, but restored
  template<typename _RandomAccessIter, typename _ValueType, typename _Distance>
  static void sample_splitters(_RandomAccessIter in, _Distance n_in,
                               _ValueType *out, long n_out) {
    if (n_out >= n_in) {
      // TODO
    } else {
      _Distance *picks = new _Distance[n_out];

      for (int i=0; i<n_out; ++i) {
        picks[i] = static_cast<_Distance> ((n_in - i) * drand48());
        out[i] = *(in + picks[i]);
        *(in + picks[i]) = *(in + (n_out - i - 1));
      }

      for (int i=n_out-1; i>=0; --i) {
        *(in + (n_out - i - 1)) = *(in + picks[i]);
        *(in + picks[i]) = out[i];
      }

      delete [] picks;
    }
  }

  static inline void set_splitters(long *part_gsize, long n_parts, int nproc, long *dist, long *out) {
    // impl 1: get all but last about correct size; last might pick up slack
    long ind = 0, cum = 0;
    for (long i=0; i<n_parts; ++i) {
      cum += part_gsize[i];
      if (cum > dist[ind]) {
        out[ind] = i;
        if (cum - dist[ind] > dist[ind] - cum + part_gsize[i]) {
          --out[ind];
          cum = part_gsize[i];
        } else {
          cum = 0;
        }
        if (++ind >= nproc - 1) break;
      }
    }
  }
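
  // Illustrative note (not part of the original source): with nproc = 2,
  // target dist = {7, 7} and global partition sizes part_gsize = {3, 4, 2, 5},
  // the running sum first exceeds dist[0] = 7 at i = 2 (cum = 9); since the
  // overshoot (9 - 7 = 2) is larger than stopping one partition earlier
  // (7 - 7 = 0), the boundary is pulled back to out[0] = 1, so partitions
  // {0, 1} go to rank 0 and partitions {2, 3} go to rank 1.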

  template<typename _ValueType>
  static void redist (int *from_dist, _ValueType *current,
                      int *to_dist, _ValueType *final,
                      int rank, int nproc,
                      MPI_Datatype &MPI_valueType, MPI_Comm comm)
  {
    int ** starch = new int*[nproc];

    for (int i = 0; i < nproc; ++i) {
      starch[i] = new int[nproc];
      for (int j = 0; j < nproc; ++j) {
        starch[i][j] = 0;
      }
    }

    int i = 0, j = 0;
    int from_delta = from_dist[0], to_delta = to_dist[0];

    while ((i < nproc) && (j < nproc)) {

      int diff = from_delta - to_delta;

      if (diff == 0) {
        starch[i][j] = from_delta;
        ++i;
        ++j;
        from_delta = from_dist[i];
        to_delta = to_dist[j];
      } else if (diff > 0) {
        starch[i][j] = to_delta;
        ++j;
        from_delta -= to_delta;
        to_delta = to_dist[j];
      } else {
        starch[i][j] = from_delta;
        ++i;
        to_delta -= from_delta;
        from_delta = from_dist[i];
      }
    }

    MPI_Request send_req[nproc], recv_req[nproc];
    MPI_Status status;

    int sendl = 0, recvl = 0;

    for (int p = 0; p <= rank; ++p) {

      int torecv = starch[p][rank];
      if (torecv) {
        MPI_Irecv (&final[recvl], torecv,
                   MPI_valueType, p, MPI_PSORT_TAG,
                   comm, &recv_req[p]);
        recvl += torecv;
      }

      int tosend = starch[rank][p];
      if (tosend) {
        MPI_Isend (&current[sendl], tosend,
                   MPI_valueType, p, MPI_PSORT_TAG,
                   comm, &send_req[p]);
        sendl += tosend;
      }

    }

    int sendr = 0, recvr = 0;

    for (int p = nproc-1; p > rank; --p) {

      int torecv = starch[p][rank];
      if (torecv) {
        recvr += torecv;
        MPI_Irecv (&final[to_dist[rank]-recvr], torecv,
                   MPI_valueType, p, MPI_PSORT_TAG,
                   comm, &recv_req[p]);
      }

      int tosend = starch[rank][p];
      if (tosend) {
        sendr += tosend;
        MPI_Isend (&current[from_dist[rank]-sendr], tosend,
                   MPI_valueType, p, MPI_PSORT_TAG,
                   comm, &send_req[p]);
      }
    }

    for (int p = 0; p < nproc; ++p) {
      if ( starch[rank][p] )
        MPI_Wait(&send_req[p], &status);
      if ( starch[p][rank] )
        MPI_Wait(&recv_req[p], &status);
    }

    for (int i = 0; i < nproc; ++i)
      delete [] starch[i];
    delete [] starch;

  }
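
  // Illustrative note (not part of the original source): starch[i][j] is the
  // number of elements process i sends to process j when moving from
  // from_dist to to_dist while preserving the global order. For example,
  // with from_dist = {5, 3} and to_dist = {4, 4}, the sweep above yields
  // starch[0][0] = 4, starch[0][1] = 1, starch[1][1] = 3: rank 0 keeps its
  // first 4 elements and forwards 1 to rank 1, and rank 1 keeps its 3.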

  template<typename _RandomAccessIter, typename _Compare,
           typename _SeqSortType>
  void parallel_samplesort(_RandomAccessIter first, _RandomAccessIter last,
                           _Compare comp, long *dist,
                           SeqSort<_SeqSortType> &mysort,
                           long s, long k,
                           MPI_Comm comm) {

    typedef typename iterator_traits<_RandomAccessIter>::value_type _ValueType;
    typedef typename iterator_traits<_RandomAccessIter>::difference_type _Distance;

    int nproc, rank;
    MPI_Comm_size(comm, &nproc);
    MPI_Comm_rank(comm, &rank);

    MPI_Datatype MPI_valueType, MPI_distanceType;
    MPI_Type_contiguous (sizeof(_ValueType), MPI_CHAR, &MPI_valueType);
    MPI_Type_commit (&MPI_valueType);
    MPI_Type_contiguous (sizeof(_Distance), MPI_CHAR, &MPI_distanceType);
    MPI_Type_commit (&MPI_distanceType);

    _Distance n_loc = last - first;

    progress (rank, 0, "Sample splitters", comm);

    // randomly pick s*k sample splitters
    long n_samp = s*k;
    _ValueType *s_splitters = new _ValueType[n_samp];
    sample_splitters(first, n_loc, s_splitters, n_samp);

    // send the sample splitters to the root
    long n_parts = nproc * k - 1;
    _ValueType *p_splitters = new _ValueType[n_parts];
    if (rank != 0) {
      MPI_Gather(s_splitters, n_samp, MPI_valueType,
                 NULL, 0, MPI_valueType, 0, comm);
    } else {
      _ValueType *gath_splitters = new _ValueType[nproc * n_samp];
      MPI_Gather(s_splitters, n_samp, MPI_valueType,
                 gath_splitters, n_samp, MPI_valueType,
                 0, comm);

      // root sorts sample splitters serially
      sort(gath_splitters, gath_splitters + (nproc * n_samp), comp);

      // root picks pk-1 splitters, broadcasts
      double stride = (double) (nproc * n_samp) / (nproc * k);
      for (int i=0; i<n_parts; ++i) {
        p_splitters[i] = gath_splitters[(int) (stride * (double) (i + 1))];
      }
      delete [] gath_splitters;
    }

    MPI_Bcast(p_splitters, n_parts, MPI_valueType, 0, comm);

    delete [] s_splitters;

    progress (rank, 1, "Partition", comm);

    // apply the splitters
    // impl1: binary search each element over the splitters
    // TODO: impl2: recurse, partitioning dfs order over splitters
    _Distance *part_ind = new _Distance[n_loc];
    long *part_lsize = new long[n_parts + 1];
    for (long i=0; i<=n_parts; ++i) part_lsize[i] = 0;

    for (long i=0; i<n_loc; ++i) {
      // TODO: load balance when splitters have same value
      part_ind[i] = lower_bound(p_splitters, p_splitters + n_parts, *(first + i))
                    - p_splitters;
      ++part_lsize[part_ind[i]];
    }

    delete [] p_splitters;

    long *boundaries = new long[nproc - 1];
    if (rank != 0) {
      MPI_Reduce(part_lsize, NULL, n_parts + 1, MPI_LONG, MPI_SUM, 0, comm);
    } else {
      // determine global partition sizes, starch-minimizing boundaries
      long *part_gsize = new long[n_parts + 1];
      MPI_Reduce(part_lsize, part_gsize, n_parts + 1, MPI_LONG, MPI_SUM, 0, comm);
      set_splitters (part_gsize, n_parts + 1, nproc, dist, boundaries);
      delete [] part_gsize;
    }
    MPI_Bcast(boundaries, nproc - 1, MPI_LONG, 0, comm);

    // send data according to boundaries
    // part_proc[i] says which processor partition i goes to
    long *part_proc = new long[n_parts + 1];
    int out_counts[nproc];
    for (int i=0; i<nproc; ++i) out_counts[i] = 0;
    long cur = 0;
    for (long i=0; i<=n_parts; ++i) {
      if (cur < nproc - 1 && i > boundaries[cur]) {
        ++cur;
      }
      part_proc[i] = cur;
      out_counts[cur] += part_lsize[i];
    }

    delete [] boundaries;
    delete [] part_lsize;

    long curs[nproc];
    _ValueType **s_bufs = new _ValueType*[nproc];
    for (int i=0; i<nproc; ++i) {
      curs[i] = 0;
      s_bufs[i] = new _ValueType[out_counts[i]];
    }
    for (int i=0; i<n_loc; ++i) {
      int dest = part_proc[part_ind[i]];
      s_bufs[dest][curs[dest]++] = *(first + i);
    }

    delete [] part_ind;
    delete [] part_proc;

    progress (rank, 2, "Alltoall", comm);

    // tell each processor how many elements to expect from each of the others
    int in_counts[nproc];
    MPI_Alltoall(out_counts, 1, MPI_INT, in_counts, 1, MPI_INT, comm);

    int offsets[nproc + 1];
    offsets[0] = 0;
    for (int i=1; i<=nproc; ++i) {
      offsets[i] = offsets[i-1] + in_counts[i-1];
    }
    int n_loct = offsets[nproc];

    MPI_Request send_reqs[nproc];
    MPI_Request recv_reqs[nproc];
    _ValueType *trans_data = new _ValueType[n_loct];

    for (int i=0; i<nproc; ++i) {
      MPI_Irecv(&trans_data[offsets[i]], in_counts[i],
                MPI_valueType, i, MPI_PSORT_TAG, comm, &recv_reqs[i]);
    }
    for (int i=1; i<=nproc; ++i) {
      int dest = (rank + i) % nproc;
      MPI_Isend(s_bufs[dest], out_counts[dest], MPI_valueType,
                dest, MPI_PSORT_TAG, comm, &send_reqs[dest]);
    }

    MPI_Waitall(nproc, send_reqs, MPI_STATUSES_IGNORE);
    for (int i=0; i < nproc; ++i) {
      delete [] s_bufs[i];
    }
    delete [] s_bufs;

    MPI_Waitall (nproc, recv_reqs, MPI_STATUSES_IGNORE);

    progress (rank, 3, "Sequential sort", comm);

    mysort.seqsort (trans_data, trans_data + n_loct, comp);

    progress (rank, 4, "Adjust boundaries", comm);

    // starch
    int trans_dist[nproc], dist_out[nproc];
    MPI_Allgather(&n_loct, 1, MPI_INT, trans_dist, 1, MPI_INT, comm);
    for (int i=0; i<nproc; ++i) dist_out[i] = dist[i];
    redist(trans_dist, trans_data, dist_out, first, rank, nproc, MPI_valueType, comm);
    delete [] trans_data;

    progress (rank, 5, "Finish", comm);

    MPI_Type_free (&MPI_valueType);
  }

  template<typename _RandomAccessIter>
  void parallel_samplesort(_RandomAccessIter first, _RandomAccessIter last,
                           long *dist, MPI_Comm comm) {

    typedef typename iterator_traits<_RandomAccessIter>::value_type _ValueType;

    STLSort stl_sort;

    int nproc;
    MPI_Comm_size (comm, &nproc);
    long s = nproc, k = nproc;
    parallel_samplesort (first, last, less<_ValueType>(), dist, stl_sort, s, k, comm);

  }

} /* namespace vpsort */

#endif /* PSORT_SAMPLE_H */
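
A minimal usage sketch (not part of the header above), assuming the psort headers are on the include path, that each rank's local block is a contiguous array of a trivially copyable type, and that dist[i] gives the number of elements rank i should hold after the sort; the element counts and key type here are illustrative.

#include <mpi.h>
#include <cstdlib>
#include <vector>
#include "psort_samplesort.h"

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);

  int nproc, rank;
  MPI_Comm_size(MPI_COMM_WORLD, &nproc);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  // each rank fills its local block with random keys; n_loc should be large
  // enough that the nproc*nproc sample splitters can be drawn locally
  long n_loc = 1000;
  std::vector<double> data(n_loc);
  srand48(rank + 1);
  for (long i = 0; i < n_loc; ++i) data[i] = drand48();

  // target distribution: rank i ends up holding dist[i] elements
  std::vector<long> dist(nproc, n_loc);

  // sorts the distributed array in place, using STL sort locally and
  // s = k = nproc sample splitters per rank (the convenience overload)
  vpsort::parallel_samplesort(data.data(), data.data() + n_loc,
                              dist.data(), MPI_COMM_WORLD);

  MPI_Finalize();
  return 0;
}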