COMBINATORIAL_BLAS 1.6
 
FastSV.h
#include <mpi.h>

// These macros should be defined before stdint.h is included
#ifndef __STDC_CONSTANT_MACROS
#define __STDC_CONSTANT_MACROS
#endif
#ifndef __STDC_LIMIT_MACROS
#define __STDC_LIMIT_MACROS
#endif
#include <stdint.h>

#include <sys/time.h>
#include <algorithm>
#include <iostream>
#include <string>
#include "CombBLAS/CombBLAS.h"
#include "CombBLAS/SpHelper.h"

namespace combblas {

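// Semiring used with SpMV below: multiply() discards the matrix value and
// returns the vector element, while add() takes the minimum. SpMV(A, x) over
// this semiring therefore gives, for every vertex, the minimum x value held
// by any of its neighbors.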
template <typename T1, typename T2>
struct Select2ndMinSR
{
    typedef typename promote_trait<T1, T2>::T_promote T_promote;
    static T_promote id(){ return std::numeric_limits<T_promote>::max(); };
    static bool returnedSAID() { return false; }
    static MPI_Op mpi_op() { return MPI_MIN; };

    static T_promote add(const T_promote & arg1, const T_promote & arg2) {
        return std::min(arg1, arg2);
    }

    static T_promote multiply(const T1 & arg1, const T2 & arg2) {
        return static_cast<T_promote> (arg2);
    }

    static void axpy(const T1 a, const T2 & x, T_promote & y) {
        y = add(y, multiply(a, x));
    }
};

template<typename T>
class BinaryMin {
public:
    BinaryMin() = default;
    T operator()(const T &a, const T &b) {
        return std::min(a, b);
    }
};

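// LabelCC: relabel the components with consecutive ids. Roots (vertices that
// are their own father) are marked with -1, numbered 0,1,2,... by nziota, and
// the final indexing step cclabel = cclabel(father) copies each root's number
// to every vertex of its tree. Returns the number of components.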
template <typename IT>
IT LabelCC(FullyDistVec<IT, IT> & father, FullyDistVec<IT, IT> & cclabel)
{
    cclabel = father;
    cclabel.ApplyInd([](IT val, IT ind){return val==ind ? -1 : val;});
    FullyDistSpVec<IT, IT> roots (cclabel, bind2nd(std::equal_to<IT>(), -1));
    roots.nziota(0);
    cclabel.Set(roots);
    cclabel = cclabel(father);
    return roots.getnnz();
}

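// ReduceAssign: helper for Assign(). Each process estimates the bandwidth cost
// of reducing its whole local range (MyLocLength * log2(p)) and compares it
// with the volume of point-to-point updates it would otherwise receive; if the
// reduction is cheaper, it announces a nonzero reducecnt and the contributions
// destined for it are folded into reduceBuffer instead of being sent directly.
// Returns the number of processes that opted for the reduction path.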
template <class IT, class NT>
int ReduceAssign(FullyDistVec<IT, IT> & ind, FullyDistVec<IT, NT> & val,
                 std::vector<std::vector<NT>> &reduceBuffer, NT MAX_FOR_REDUCE)
{
    auto commGrid = ind.getcommgrid();
    MPI_Comm World = commGrid->GetWorld();
    int nprocs = commGrid->GetSize();
    int myrank;
    MPI_Comm_rank(World,&myrank);

    std::vector<int> sendcnt (nprocs,0);
    std::vector<int> recvcnt (nprocs);
    std::vector<std::vector<IT>> indBuf(nprocs);
    std::vector<std::vector<NT>> valBuf(nprocs);

    int loclen = ind.LocArrSize();
    const IT *indices = ind.GetLocArr();
    const IT *values = val.GetLocArr();
    for(IT i = 0; i < loclen; ++i) {
        IT locind;
        int owner = ind.Owner(indices[i], locind);
        if(reduceBuffer[owner].size() == 0) {
            indBuf[owner].push_back(locind);
            valBuf[owner].push_back(values[i]);
            sendcnt[owner]++;
        }
    }

    MPI_Alltoall(sendcnt.data(), 1, MPI_INT, recvcnt.data(), 1, MPI_INT, World);
    IT totrecv = std::accumulate(recvcnt.begin(),recvcnt.end(), static_cast<IT>(0));
    double reduceCost = ind.MyLocLength() * log2(nprocs); // bandwidth cost
    IT reducesize = 0;
    std::vector<IT> reducecnt(nprocs,0);

    int nreduce = 0;
    if(reduceCost < totrecv)
        reducesize = ind.MyLocLength();
    MPI_Allgather(&reducesize, 1, MPIType<IT>(), reducecnt.data(), 1, MPIType<IT>(), World);

    for(int i = 0; i < nprocs; ++i)
        if (reducecnt[i] > 0) nreduce++;

    if(nreduce > 0) {
        MPI_Request* requests = new MPI_Request[nreduce];
        MPI_Status* statuses = new MPI_Status[nreduce];

        int ireduce = 0;
        for (int i = 0; i < nprocs; ++i) {
            if(reducecnt[i] > 0) {
                reduceBuffer[i].resize(reducecnt[i], MAX_FOR_REDUCE); // this is specific to LACC
                for (int j = 0; j < sendcnt[i]; j++)
                    reduceBuffer[i][indBuf[i][j]] = std::min(reduceBuffer[i][indBuf[i][j]], valBuf[i][j]);
                if (myrank == i) // recv
                    MPI_Ireduce(MPI_IN_PLACE, reduceBuffer[i].data(), reducecnt[i], MPIType<NT>(), MPI_MIN, i, World, &requests[ireduce++]);
                else // send
                    MPI_Ireduce(reduceBuffer[i].data(), NULL, reducecnt[i], MPIType<NT>(), MPI_MIN, i, World, &requests[ireduce++]);
            }
        }
        MPI_Waitall(nreduce, requests, statuses);
        delete [] requests;
        delete [] statuses;
    }
    return nreduce;
}

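// Assign: distributed scatter-with-minimum. Builds a sparse vector r such that
// r[ind[i]] = min of all val[i] routed to that position, using direct
// point-to-point exchange for lightly loaded processes and the per-process
// reductions prepared by ReduceAssign for heavily loaded ones.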
template <class IT, class NT>
FullyDistSpVec<IT, NT> Assign(FullyDistVec<IT, IT> & ind, FullyDistVec<IT, NT> & val)
{
    IT globallen = ind.TotalLength();
    auto commGrid = ind.getcommgrid();
    MPI_Comm World = commGrid->GetWorld();
    int nprocs = commGrid->GetSize();
    int * rdispls = new int[nprocs+1];
    int * recvcnt = new int[nprocs];
    int * sendcnt = new int[nprocs](); // initialize to 0
    int * sdispls = new int[nprocs+1];

    std::vector<std::vector<NT> > reduceBuffer(nprocs);
    NT MAX_FOR_REDUCE = static_cast<NT>(globallen);
    ReduceAssign(ind, val, reduceBuffer, MAX_FOR_REDUCE);

    std::vector<std::vector<IT> > indBuf(nprocs);
    std::vector<std::vector<NT> > valBuf(nprocs);

    int loclen = ind.LocArrSize();
    const IT *indices = ind.GetLocArr();
    const IT *values = val.GetLocArr();
    for(IT i = 0; i < loclen; ++i) {
        IT locind;
        int owner = ind.Owner(indices[i], locind);
        if(reduceBuffer[owner].size() == 0) {
            indBuf[owner].push_back(locind);
            valBuf[owner].push_back(values[i]);
            sendcnt[owner]++;
        }
    }

    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);
    sdispls[0] = 0;
    rdispls[0] = 0;
    for(int i = 0; i < nprocs; ++i) {
        sdispls[i + 1] = sdispls[i] + sendcnt[i];
        rdispls[i + 1] = rdispls[i] + recvcnt[i];
    }
    IT totsend = sdispls[nprocs];
    IT totrecv = rdispls[nprocs];

    std::vector<IT> sendInd(totsend);
    std::vector<NT> sendVal(totsend);
    for(int i=0; i < nprocs; ++i) {
        std::copy(indBuf[i].begin(), indBuf[i].end(), sendInd.begin()+sdispls[i]);
        std::vector<IT>().swap(indBuf[i]);
        std::copy(valBuf[i].begin(), valBuf[i].end(), sendVal.begin()+sdispls[i]);
        std::vector<NT>().swap(valBuf[i]);
    }
    std::vector<IT> recvInd(totrecv);
    std::vector<NT> recvVal(totrecv);

    MPI_Alltoallv(sendInd.data(), sendcnt, sdispls, MPIType<IT>(), recvInd.data(), recvcnt, rdispls, MPIType<IT>(), World);
    MPI_Alltoallv(sendVal.data(), sendcnt, sdispls, MPIType<NT>(), recvVal.data(), recvcnt, rdispls, MPIType<NT>(), World);
    DeleteAll(sdispls, rdispls, sendcnt, recvcnt);

    int myrank;
    MPI_Comm_rank(World, &myrank);
    if(reduceBuffer[myrank].size() > 0)
        for(int i = 0; i < reduceBuffer[myrank].size(); i++)
            if(reduceBuffer[myrank][i] < MAX_FOR_REDUCE) {
                recvInd.push_back(i);
                recvVal.push_back(reduceBuffer[myrank][i]);
            }

    FullyDistSpVec<IT, NT> indexed(commGrid, globallen, recvInd, recvVal, false, false);
    return indexed;
}

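// replicate: helper for Extract(). The mirror image of ReduceAssign: a process
// that would have to serve more gather requests than the estimated cost of
// broadcasting its local chunk (LocArrSize * log2(p)) publishes that chunk in
// bcastBuffer so requesters can read it locally. Returns the number of
// broadcasting processes.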
template <class IT, class NT>
int replicate(const FullyDistVec<IT, NT> &dense, const FullyDistVec<IT, IT> &ri, std::vector<std::vector<NT> > &bcastBuffer)
{
    auto commGrid = dense.getcommgrid();
    MPI_Comm World = commGrid->GetWorld();
    int nprocs = commGrid->GetSize();

    std::vector<int> sendcnt (nprocs, 0);
    std::vector<int> recvcnt (nprocs, 0);
    IT length = ri.LocArrSize();
    const IT *p = ri.GetLocArr();
    for(IT i = 0; i < length; ++i) {
        IT locind;
        int owner = dense.Owner(p[i], locind);
        sendcnt[owner]++;
    }
    MPI_Alltoall(sendcnt.data(), 1, MPI_INT, recvcnt.data(), 1, MPI_INT, World);
    IT totrecv = std::accumulate(recvcnt.begin(), recvcnt.end(), static_cast<IT>(0));

    double broadcast_cost = dense.LocArrSize() * log2(nprocs); // bandwidth cost
    IT bcastsize = 0;
    std::vector<IT> bcastcnt(nprocs, 0);

    int nbcast = 0;
    if (broadcast_cost < totrecv)
        bcastsize = dense.LocArrSize();
    MPI_Allgather(&bcastsize, 1, MPIType<IT>(), bcastcnt.data(), 1, MPIType<IT>(), World);

    for (int i = 0; i < nprocs; i++)
        if (bcastcnt[i] > 0) nbcast++;

    if (nbcast > 0) {
        MPI_Request* requests = new MPI_Request[nbcast];
        MPI_Status* statuses = new MPI_Status[nbcast];
        int ibcast = 0;
        const NT * arr = dense.GetLocArr();
        for(int i = 0; i < nprocs; i++) {
            if (bcastcnt[i] > 0) {
                bcastBuffer[i].resize(bcastcnt[i]);
                std::copy(arr, arr + bcastcnt[i], bcastBuffer[i].begin());
                MPI_Ibcast(bcastBuffer[i].data(), bcastcnt[i], MPIType<NT>(), i, World, &requests[ibcast++]);
            }
        }
        MPI_Waitall(nbcast, requests, statuses);
        delete [] requests;
        delete [] statuses;
    }
    return nbcast;
}

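// Extract: distributed gather. Returns a dense vector q with q[i] = dense[ri[i]],
// answering each request either from a chunk broadcast by replicate() or through
// an explicit request/response exchange. SV() uses it as gp = Extract(D, D),
// i.e. to compute every vertex's grandparent.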
template <class IT, class NT>
FullyDistVec<IT, IT> Extract(const FullyDistVec<IT, NT> &dense, const FullyDistVec<IT, IT> &ri)
{
    auto commGrid = ri.getcommgrid();
    MPI_Comm World = commGrid->GetWorld();
    int nprocs = commGrid->GetSize();

    std::vector<std::vector<NT> > bcastBuffer(nprocs);
    replicate(dense, ri, bcastBuffer);

    std::vector<std::vector<IT> > data_req(nprocs);
    std::vector<std::vector<IT> > revr_map(nprocs); // to put the incoming data to the correct location
    const NT * arr = dense.GetLocArr();

    IT length = ri.LocArrSize();
    const IT *p = ri.GetLocArr();

    std::vector<IT> q(length);
    for(IT i = 0; i < length; ++i) {
        IT locind;
        int owner = dense.Owner(p[i], locind);
        if(bcastBuffer[owner].size() == 0) {
            data_req[owner].push_back(locind);
            revr_map[owner].push_back(i);
        } else {
            q[i] = bcastBuffer[owner][locind];
        }
    }
    int *sendcnt = new int[nprocs];
    int *sdispls = new int[nprocs];
    for(int i = 0; i < nprocs; ++i)
        sendcnt[i] = (int) data_req[i].size();

    int *rdispls = new int[nprocs];
    int *recvcnt = new int[nprocs];

    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the request counts

    sdispls[0] = 0;
    rdispls[0] = 0;
    for(int i = 0; i < nprocs - 1; ++i) {
        sdispls[i + 1] = sdispls[i] + sendcnt[i];
        rdispls[i + 1] = rdispls[i] + recvcnt[i];
    }
    IT totsend = std::accumulate(sendcnt, sendcnt + nprocs, static_cast<IT>(0));
    IT totrecv = std::accumulate(recvcnt, recvcnt + nprocs, static_cast<IT>(0));

    IT *sendbuf = new IT[totsend];
    for(int i = 0; i < nprocs; ++i) {
        std::copy(data_req[i].begin(), data_req[i].end(), sendbuf + sdispls[i]);
        std::vector<IT>().swap(data_req[i]);
    }
    IT *reversemap = new IT[totsend];
    for(int i = 0; i < nprocs; ++i) {
        std::copy(revr_map[i].begin(), revr_map[i].end(), reversemap + sdispls[i]); // reversemap array is unique
        std::vector<IT>().swap(revr_map[i]);
    }
    IT *recvbuf = new IT[totrecv];
    MPI_Alltoallv(sendbuf, sendcnt, sdispls, MPIType<IT>(), recvbuf, recvcnt, rdispls, MPIType<IT>(), World);
    delete[] sendbuf;
    // access requested data
    NT *databack = new NT[totrecv];

#ifdef THREADED
#pragma omp parallel for
#endif
    for(int i = 0; i < totrecv; ++i)
        databack[i] = arr[recvbuf[i]];
    delete[] recvbuf;

    // communicate requested data
    NT *databuf = new NT[totsend];
    // the response counts are the same as the request counts
    MPI_Alltoallv(databack, recvcnt, rdispls, MPIType<NT>(), databuf, sendcnt, sdispls, MPIType<NT>(), World);
    // Create the output from databuf
    for(int i = 0; i < totsend; ++i)
        q[reversemap[i]] = databuf[i];

    DeleteAll(rdispls, recvcnt, databack);
    DeleteAll(sdispls, sendcnt, databuf, reversemap);
    return FullyDistVec<IT, IT>(q, commGrid);
}

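// SV: FastSV-style connected components of the graph represented by A.
// D is the parent (father) vector, gp the grandparent, and mngp the minimum
// grandparent among each vertex's neighbors. While many vertices still change,
// mngp is computed with a dense SpMV; once few vertices change, a sparse SpMV
// restricted to the changed vertices is used instead. Each iteration hooks
// trees onto smaller labels, shortcuts through grandparents, and stops when no
// grandparent changes; LabelCC then renumbers the components 0..nCC-1 and nCC
// is returned through the output parameter.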
template<typename IT, typename NT, typename DER>
FullyDistVec<IT, IT> SV(SpParMat<IT, NT, DER> & A, IT & nCC)
{
    FullyDistVec<IT, IT> D(A.getcommgrid());
    D.iota(A.getnrow(), 0); // D[i] <- i
    FullyDistVec<IT, IT> gp(D); // grandparent
    FullyDistVec<IT, IT> dup(D); // duplication of grandparent
    FullyDistVec<IT, IT> mngp(D); // minimum neighbor grandparent
    FullyDistVec<IT, IT> mod(D.getcommgrid(), A.getnrow(), 1);
    IT diff = D.TotalLength();
    for (int iter = 1; diff != 0; iter++) {
        if (diff * 50 > A.getnrow()) {
            mngp = SpMV<Select2ndMinSR<NT, IT> >(A, gp); // minimum of neighbors' grandparent
        } else {
            FullyDistSpVec<IT, IT> SpMod(mod, [](IT m){ return m; });
            FullyDistSpVec<IT, IT> SpGp = EWiseApply<IT>(SpMod, gp,
                [](IT m, IT p) { return p; },
                [](IT m, IT p) { return true; },
                false, static_cast<IT>(0));
            FullyDistSpVec<IT, IT> hooks(A.getcommgrid(), A.getnrow());
            SpMV<Select2ndMinSR<NT, IT> >(A, SpGp, hooks, false);
            mngp.EWiseApply(hooks, BinaryMin<IT>(),
                [](IT a, IT b){ return true; }, false, A.getnrow());
        }
        FullyDistSpVec<IT, IT> finalhooks = Assign(D, mngp); // hook onto the parent's position
        D.Set(finalhooks);
        D.EWiseApply(gp, BinaryMin<IT>());
        D.EWiseApply(mngp, BinaryMin<IT>());
        gp = Extract(D, D);
        dup.EWiseOut(gp, [](IT a, IT b) { return static_cast<IT>(a != b); }, mod);
        diff = static_cast<IT>(mod.Reduce(std::plus<IT>(), static_cast<IT>(0)));
        dup = gp;
        char out[100];
        sprintf(out, "Iteration %d: diff %ld\n", iter, diff);
        SpParHelper::Print(out);
    }
    FullyDistVec<IT, IT> cc(D.getcommgrid());
    nCC = LabelCC(gp, cc);
    return cc;
} /* SV() */

} /* namespace combblas */

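A minimal driver sketch (not part of FastSV.h) showing how SV() might be called. It assumes the header is on the include path, that the Matrix Market file named on the command line describes a symmetric (undirected) graph, and that the job runs on a square number of MPI processes as required by the 2D CommGrid; the file name fastsv_driver.cpp and the int64_t element types are illustrative choices only.

// fastsv_driver.cpp -- hypothetical usage example, not part of the library
#include <mpi.h>
#include <memory>
#include <string>
#include "CombBLAS/CombBLAS.h"
#include "FastSV.h" // assumes this header is visible on the include path

int main(int argc, char* argv[])
{
    MPI_Init(&argc, &argv);
    if (argc < 2) { // expects the Matrix Market file name as the only argument
        MPI_Finalize();
        return 1;
    }
    {
        typedef int64_t IT;
        typedef int64_t NT;
        typedef combblas::SpDCCols<IT, NT> DER;

        // one square process grid over all ranks (requires a square process count)
        std::shared_ptr<combblas::CommGrid> fullWorld;
        fullWorld.reset(new combblas::CommGrid(MPI_COMM_WORLD, 0, 0));

        // read a 1-based Matrix Market file; keep the larger value on duplicate entries
        combblas::SpParMat<IT, NT, DER> A(fullWorld);
        A.ParallelReadMM(std::string(argv[1]), true, combblas::maximum<NT>());

        IT nCC = 0;
        combblas::FullyDistVec<IT, IT> cc = combblas::SV(A, nCC); // cc[i] holds the component label of vertex i
        combblas::SpParHelper::Print("Number of components: " + std::to_string(nCC) + "\n");
    }
    MPI_Finalize();
    return 0;
}

Compile against CombBLAS and MPI and run with, for example, mpirun -np 4 ./fastsv_driver graph.mtx.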