COMBINATORIAL_BLAS 1.6
SpParMat.cpp

/****************************************************************/
/* Parallel Combinatorial BLAS Library (for Graph Computations) */
/* version 1.6 -------------------------------------------------*/
/* date: 6/15/2017 ---------------------------------------------*/
/* authors: Ariful Azad, Aydin Buluc --------------------------*/
/****************************************************************/
/*
 Copyright (c) 2010-2017, The Regents of the University of California

 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:

 The above copyright notice and this permission notice shall be included in
 all copies or substantial portions of the Software.

 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 THE SOFTWARE.
 */


#include "SpParMat.h"
#include "ParFriends.h"
#include "Operations.h"
#include "FileHeader.h"
extern "C" {
#include "mmio.h"
}
#include <sys/types.h>
#include <sys/stat.h>

#include <mpi.h>
#include <fstream>
#include <algorithm>
#include <set>
#include <stdexcept>

namespace combblas {

template <class IT, class NT, class DER>
SpParMat< IT,NT,DER >::SpParMat (std::ifstream & input, MPI_Comm & world)
{
    assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
    if(!input.is_open())
    {
        perror("Input file doesn't exist\n");
        exit(-1);
    }
    commGrid.reset(new CommGrid(world, 0, 0));
    input >> (*spSeq);
}

template <class IT, class NT, class DER>
SpParMat< IT,NT,DER >::SpParMat (DER * myseq, MPI_Comm & world): spSeq(myseq)
{
    assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
    commGrid.reset(new CommGrid(world, 0, 0));
}

template <class IT, class NT, class DER>
SpParMat< IT,NT,DER >::SpParMat (DER * myseq, std::shared_ptr<CommGrid> grid): spSeq(myseq)
{
    assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
    commGrid = grid;
}

template <class IT, class NT, class DER>
SpParMat< IT,NT,DER >::SpParMat (std::shared_ptr<CommGrid> grid)
{
    assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
    spSeq = new DER();
    commGrid = grid;
}

template <class IT, class NT, class DER>
SpParMat< IT,NT,DER >::SpParMat ()
{
    SpParHelper::Print("COMBBLAS Warning: It is dangerous to create (matrix) objects without specifying the communicator, are you sure you want to create this object in MPI_COMM_WORLD?\n");
    assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
    spSeq = new DER();
    commGrid.reset(new CommGrid(MPI_COMM_WORLD, 0, 0));
}

template <class IT, class NT, class DER>
SpParMat< IT,NT,DER >::SpParMat (MPI_Comm world)
{
    assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
    spSeq = new DER();
    commGrid.reset(new CommGrid(world, 0, 0));
}

template <class IT, class NT, class DER>
SpParMat< IT,NT,DER >::~SpParMat ()
{
    if(spSeq != NULL) delete spSeq;
}

template <class IT, class NT, class DER>
void SpParMat< IT,NT,DER >::FreeMemory ()
{
    if(spSeq != NULL) delete spSeq;
    spSeq = NULL;
}

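// ------------------------------------------------------------------
// Usage sketch (hypothetical driver code, not part of this file): creating a
// distributed matrix on an explicit communicator, which the constructors above
// implement. SpDCCols is the usual local storage type; the header path and the
// exact reader call are illustrative assumptions and may differ by version.
//
//   #include "CombBLAS/CombBLAS.h"
//   using namespace combblas;
//
//   int main(int argc, char* argv[])
//   {
//       MPI_Init(&argc, &argv);
//       {   // scope ensures A is destructed before MPI_Finalize
//           std::shared_ptr<CommGrid> grid(new CommGrid(MPI_COMM_WORLD, 0, 0));  // 0,0: deduce a square grid
//           SpParMat< int64_t, double, SpDCCols<int64_t,double> > A(grid);
//           // ... fill A, e.g. from a Matrix Market file
//       }
//       MPI_Finalize();
//       return 0;
//   }
// ------------------------------------------------------------------
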
template <class IT, class NT, class DER>
template <typename VT, typename GIT>  // GIT: global index type of vector
void SpParMat<IT,NT,DER>::TopKGather(std::vector<NT> & all_medians, std::vector<IT> & nnz_per_col, int & thischunk, int & chunksize,
                                     const std::vector<NT> & activemedians, const std::vector<IT> & activennzperc, int itersuntil,
                                     std::vector< std::vector<NT> > & localmat, const std::vector<IT> & actcolsmap, std::vector<IT> & klimits,
                                     std::vector<IT> & toretain, std::vector<std::vector<std::pair<IT,NT>>> & tmppair, IT coffset, const FullyDistVec<GIT,VT> & rvec) const
{
    int rankincol = commGrid->GetRankInProcCol();
    int colneighs = commGrid->GetGridRows();
    int nprocs = commGrid->GetSize();
    std::vector<double> finalWeightedMedians(thischunk, 0.0);

    MPI_Gather(activemedians.data() + itersuntil*chunksize, thischunk, MPIType<NT>(), all_medians.data(), thischunk, MPIType<NT>(), 0, commGrid->GetColWorld());
    MPI_Gather(activennzperc.data() + itersuntil*chunksize, thischunk, MPIType<IT>(), nnz_per_col.data(), thischunk, MPIType<IT>(), 0, commGrid->GetColWorld());

    if(rankincol == 0)
    {
        std::vector<double> columnCounts(thischunk, 0.0);
        std::vector< std::pair<NT, double> > mediansNweights(colneighs);  // (median, weight) pairs [to be reused at each iteration]

        for(int j = 0; j < thischunk; ++j)  // for each column
        {
            for(int k = 0; k < colneighs; ++k)
            {
                size_t fetchindex = k*thischunk+j;
                columnCounts[j] += static_cast<double>(nnz_per_col[fetchindex]);
            }
            for(int k = 0; k < colneighs; ++k)
            {
                size_t fetchindex = k*thischunk+j;
                mediansNweights[k] = std::make_pair(all_medians[fetchindex], static_cast<double>(nnz_per_col[fetchindex]) / columnCounts[j]);
            }
            std::sort(mediansNweights.begin(), mediansNweights.end());  // sort by median

            double sumofweights = 0;
            int k = 0;
            while( k < colneighs && sumofweights < 0.5)
            {
                sumofweights += mediansNweights[k++].second;
            }
            finalWeightedMedians[j] = mediansNweights[k-1].first;
        }
    }
    MPI_Bcast(finalWeightedMedians.data(), thischunk, MPIType<double>(), 0, commGrid->GetColWorld());

    std::vector<IT> larger(thischunk, 0);
    std::vector<IT> smaller(thischunk, 0);
    std::vector<IT> equal(thischunk, 0);

#ifdef THREADED
#pragma omp parallel for
#endif
    for(int j = 0; j < thischunk; ++j)  // for each active column
    {
        size_t fetchindex = actcolsmap[j+itersuntil*chunksize];
        for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
        {
            // count those above/below/equal to the median
            if(localmat[fetchindex][k] > finalWeightedMedians[j])
                larger[j]++;
            else if(localmat[fetchindex][k] < finalWeightedMedians[j])
                smaller[j]++;
            else
                equal[j]++;
        }
    }
    MPI_Allreduce(MPI_IN_PLACE, larger.data(), thischunk, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
    MPI_Allreduce(MPI_IN_PLACE, smaller.data(), thischunk, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
    MPI_Allreduce(MPI_IN_PLACE, equal.data(), thischunk, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());

    int numThreads = 1;  // default case
#ifdef THREADED
    omp_lock_t lock[nprocs];  // a lock per recipient
    for (int i=0; i<nprocs; i++)
        omp_init_lock(&(lock[i]));
#pragma omp parallel
    {
        numThreads = omp_get_num_threads();
    }
#endif

    std::vector < std::vector<IT> > perthread2retain(numThreads);

#ifdef THREADED
#pragma omp parallel for
#endif
    for(int j = 0; j < thischunk; ++j)  // for each active column
    {
#ifdef THREADED
        int myThread = omp_get_thread_num();
#else
        int myThread = 0;
#endif

        // both clmapindex and fetchindex are unique for a given j (hence not shared among threads)
        size_t clmapindex = j+itersuntil*chunksize;  // klimits is of the same length as actcolsmap
        size_t fetchindex = actcolsmap[clmapindex];  // localmat can only be dereferenced using the original indices

        // the following if/else checks give the same result on every processor along ColWorld
        // (because the klimits/larger/equal vectors are mirrored)
        if(klimits[clmapindex] <= larger[j])  // entries larger than the weighted median are plentiful; we can discard all the smaller/equal entries
        {
            std::vector<NT> survivors;
            for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
            {
                if(localmat[fetchindex][k] > finalWeightedMedians[j])  // keep only the large entries (even equal entries go)
                    survivors.push_back(localmat[fetchindex][k]);
            }
            localmat[fetchindex].swap(survivors);
            perthread2retain[myThread].push_back(clmapindex);  // items to retain in actcolsmap
        }
        else if (klimits[clmapindex] > larger[j] + equal[j])  // elements that are either larger or equal are surely keepers, no need to reprocess them
        {
            std::vector<NT> survivors;
            for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
            {
                if(localmat[fetchindex][k] < finalWeightedMedians[j])  // keep only the small entries (even equal entries go)
                    survivors.push_back(localmat[fetchindex][k]);
            }
            localmat[fetchindex].swap(survivors);

            klimits[clmapindex] -= (larger[j] + equal[j]);  // update the k limit for this column only
            perthread2retain[myThread].push_back(clmapindex);  // items to retain in actcolsmap
        }
        else  // larger[j] < klimits[clmapindex] && klimits[clmapindex] <= larger[j] + equal[j]
        {     // we will always have equal[j] > 0 because the weighted median is part of the dataset, so it has to be equal to itself
            std::vector<NT> survivors;
            for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
            {
                if(localmat[fetchindex][k] >= finalWeightedMedians[j])  // keep the larger and equal entries (might exceed the k-limit, but that's fine according to MCL)
                    survivors.push_back(localmat[fetchindex][k]);
            }
            localmat[fetchindex].swap(survivors);

            // We found it: the kth largest element in column (coffset + fetchindex) is finalWeightedMedians[j].
            // Everyone in the same processor column has this information; only one of them should send it.
            IT n_perproc = getlocalcols() / colneighs;  // find a typical processor's share
            int assigned = std::min(static_cast<int>(fetchindex/n_perproc), colneighs-1);  // clamp to the last processor in the column
            if( assigned == rankincol)
            {
                IT locid;
                int owner = rvec.Owner(coffset + fetchindex, locid);

            #ifdef THREADED
                omp_set_lock(&(lock[owner]));
            #endif
                tmppair[owner].emplace_back(std::make_pair(locid, finalWeightedMedians[j]));
            #ifdef THREADED
                omp_unset_lock(&(lock[owner]));
            #endif
            }
        }  // end_else
    }  // end_for
    // ------ concatenate toretain "indices" processed by threads ------
    std::vector<IT> tdisp(numThreads+1);
    tdisp[0] = 0;
    for(int i=0; i<numThreads; ++i)
    {
        tdisp[i+1] = tdisp[i] + perthread2retain[i].size();
    }
    toretain.resize(tdisp[numThreads]);

#ifdef THREADED
#pragma omp parallel for
#endif
    for(int i=0; i< numThreads; i++)
    {
        std::copy(perthread2retain[i].data(), perthread2retain[i].data() + perthread2retain[i].size(), toretain.data() + tdisp[i]);
    }

#ifdef THREADED
    for (int i=0; i<nprocs; i++)  // destroy the locks
        omp_destroy_lock(&(lock[i]));
#endif
}

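// Illustrative sketch (not part of the library): the weighted median-of-medians
// rule the rank-0 block above applies to each column. Every processor in a
// column contributes its local median and the fraction of the column's nonzeros
// it holds; the weighted median is the first median whose cumulative weight
// crosses 0.5. The helper name below is hypothetical.
namespace {
inline double WeightedMedianSketch(std::vector< std::pair<double,double> > medianWeightPairs)
{
    std::sort(medianWeightPairs.begin(), medianWeightPairs.end());  // sort by median value
    double sumofweights = 0.0;
    size_t k = 0;
    while(k < medianWeightPairs.size() && sumofweights < 0.5)  // weights sum to 1 by construction
        sumofweights += medianWeightPairs[k++].second;
    return medianWeightPairs[k-1].first;  // first median that crosses the halfway mark
}
}
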

template <class IT, class NT, class DER>
template <typename VT, typename GIT>  // GIT: global index type of vector
bool SpParMat<IT,NT,DER>::Kselect2(FullyDistVec<GIT,VT> & rvec, IT k_limit) const
{
    if(*rvec.commGrid != *commGrid)
    {
        SpParHelper::Print("Grids are not comparable, SpParMat::Kselect() fails!", commGrid->GetWorld());
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }

    int rankincol = commGrid->GetRankInProcCol();
    int rankinrow = commGrid->GetRankInProcRow();
    int rowneighs = commGrid->GetGridCols();  // get # of processors on the row
    int colneighs = commGrid->GetGridRows();
    int myrank = commGrid->GetRank();
    int nprocs = commGrid->GetSize();

    FullyDistVec<GIT,IT> colcnt(commGrid);
    Reduce(colcnt, Column, std::plus<IT>(), (IT) 0, [](NT i){ return (IT) 1;});

    // <begin> gather vector along columns (logic copied from DimApply)
    int xsize = (int) colcnt.LocArrSize();
    int trxsize = 0;
    int diagneigh = colcnt.commGrid->GetComplementRank();
    MPI_Status status;
    MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, commGrid->GetWorld(), &status);

    IT * trxnums = new IT[trxsize];
    MPI_Sendrecv(const_cast<IT*>(SpHelper::p2a(colcnt.arr)), xsize, MPIType<IT>(), diagneigh, TRX, trxnums, trxsize, MPIType<IT>(), diagneigh, TRX, commGrid->GetWorld(), &status);

    int * colsize = new int[colneighs];
    colsize[rankincol] = trxsize;
    MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, commGrid->GetColWorld());
    int * dpls = new int[colneighs]();  // displacements (zero initialized)
    std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
    int accsize = std::accumulate(colsize, colsize+colneighs, 0);
    std::vector<IT> percsum(accsize);  // per-column sum of the number of entries

    MPI_Allgatherv(trxnums, trxsize, MPIType<IT>(), percsum.data(), colsize, dpls, MPIType<IT>(), commGrid->GetColWorld());  // recv type must match percsum's IT
    DeleteAll(trxnums, colsize, dpls);
    // <end> gather vector along columns

    IT locm = getlocalcols();  // length (number of columns) assigned to this processor (and processor column)
    std::vector< std::vector<NT> > localmat(locm);  // a minimal local copy of the matrix

#ifdef COMBBLAS_DEBUG
    if(accsize != locm) std::cout << "Gather vector along columns logic is wrong" << std::endl;
#endif

    for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)  // iterate over columns
    {
        if(percsum[colit.colid()] >= k_limit)  // don't make a copy of an inactive column
        {
            localmat[colit.colid()].reserve(colit.nnz());
            for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit < spSeq->endnz(colit); ++nzit)
            {
                localmat[colit.colid()].push_back(nzit.value());
            }
        }
    }

    int64_t activecols = std::count_if(percsum.begin(), percsum.end(), [k_limit](IT i){ return i >= k_limit;});
    int64_t activennz = std::accumulate(percsum.begin(), percsum.end(), (int64_t) 0);

    int64_t totactcols, totactnnzs;
    MPI_Allreduce(&activecols, &totactcols, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
    if(myrank == 0) std::cout << "Number of initial active columns is " << totactcols << std::endl;

    MPI_Allreduce(&activennz, &totactnnzs, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
    if(myrank == 0) std::cout << "Number of initial nonzeros is " << totactnnzs << std::endl;

#ifdef COMBBLAS_DEBUG
    IT glactcols = colcnt.Count([k_limit](IT i){ return i >= k_limit;});
    if(myrank == 0) std::cout << "Number of initial active columns is " << glactcols << std::endl;
    if(glactcols != totactcols && myrank == 0) std::cout << "Wrong number of active columns computed" << std::endl;
#endif

    rvec = FullyDistVec<GIT,VT> ( rvec.getcommgrid(), getncol(), std::numeric_limits<NT>::min());  // set length of rvec correctly

#ifdef COMBBLAS_DEBUG
    PrintInfo();
    rvec.PrintInfo("rvector");
#endif

    if(totactcols == 0)
    {
        std::ostringstream ss;
        ss << "TopK: k_limit (" << k_limit << ") >= maxNnzInColumn. Returning the result of Reduce(Column, minimum<NT>()) instead..." << std::endl;
        SpParHelper::Print(ss.str());
        return false;  // no prune needed
    }

    std::vector<IT> actcolsmap(activecols);  // the map that gives the original index of an active column (this map will shrink over iterations)
    for (IT i=0, j=0; i< locm; ++i) {
        if(percsum[i] >= k_limit)
            actcolsmap[j++] = i;
    }

    std::vector<NT> all_medians;
    std::vector<IT> nnz_per_col;
    std::vector<IT> klimits(activecols, k_limit);  // is distributed management of this vector needed?
    int activecols_lowerbound = 10*colneighs;

    IT * locncols = new IT[rowneighs];
    locncols[rankinrow] = locm;
    MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), locncols, 1, MPIType<IT>(), commGrid->GetRowWorld());
    IT coffset = std::accumulate(locncols, locncols+rankinrow, static_cast<IT>(0));
    delete [] locncols;

    /* create/allocate variables for vector assignment */
    MPI_Datatype MPI_pair;
    MPI_Type_contiguous(sizeof(std::pair<IT,NT>), MPI_CHAR, &MPI_pair);
    MPI_Type_commit(&MPI_pair);

    int * sendcnt = new int[nprocs];
    int * recvcnt = new int[nprocs];
    int * sdispls = new int[nprocs]();
    int * rdispls = new int[nprocs]();

    while(totactcols > 0)
    {
        int chunksize, iterations, lastchunk;
        if(activecols > activecols_lowerbound)
        {
            // two reasons for chunking:
            // (1) keep memory limited to activecols (<= n/sqrt(p))
            // (2) avoid overflow in sentcount
            chunksize = static_cast<int>(activecols/colneighs);  // invariant: chunksize >= 10 (by activecols_lowerbound)
            iterations = std::max(static_cast<int>(activecols/chunksize), 1);
            lastchunk = activecols - (iterations-1)*chunksize;  // lastchunk >= chunksize by construction
        }
        else
        {
            chunksize = activecols;
            iterations = 1;
            lastchunk = activecols;
        }
        std::vector<NT> activemedians(activecols);  // one per "active" column
        std::vector<IT> activennzperc(activecols);

#ifdef THREADED
#pragma omp parallel for
#endif
        for(IT i=0; i< activecols; ++i)  // recompute the medians and nnzperc
        {
            size_t orgindex = actcolsmap[i];  // assert: no two threads will share the same "orgindex"
            if(localmat[orgindex].empty())
            {
                activemedians[i] = (NT) 0;
                activennzperc[i] = 0;
            }
            else
            {
                // nth_element reorders the vector, so work on a temporary copy;
                // a partial ordering suffices since we only need the median, not a general nth element
                auto entriesincol(localmat[orgindex]);
                std::nth_element(entriesincol.begin(), entriesincol.begin() + entriesincol.size()/2, entriesincol.end());
                activemedians[i] = entriesincol[entriesincol.size()/2];
                activennzperc[i] = entriesincol.size();
            }
        }

        percsum.resize(activecols, 0);
        MPI_Allreduce(activennzperc.data(), percsum.data(), activecols, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
        activennz = std::accumulate(percsum.begin(), percsum.end(), (int64_t) 0);

#ifdef COMBBLAS_DEBUG
        MPI_Allreduce(&activennz, &totactnnzs, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
        if(myrank == 0) std::cout << "Number of active nonzeros is " << totactnnzs << std::endl;
#endif

        std::vector<IT> toretain;
        if(rankincol == 0)
        {
            all_medians.resize(lastchunk*colneighs);
            nnz_per_col.resize(lastchunk*colneighs);
        }
        std::vector< std::vector< std::pair<IT,NT> > > tmppair(nprocs);
        for(int i=0; i< iterations-1; ++i)  // this loop should not be parallelized if we want to keep storage small
        {
            TopKGather(all_medians, nnz_per_col, chunksize, chunksize, activemedians, activennzperc, i, localmat, actcolsmap, klimits, toretain, tmppair, coffset, rvec);
        }
        TopKGather(all_medians, nnz_per_col, lastchunk, chunksize, activemedians, activennzperc, iterations-1, localmat, actcolsmap, klimits, toretain, tmppair, coffset, rvec);

        /* set the newly found vector entries */
        IT totsend = 0;
        for(int i=0; i<nprocs; ++i)
        {
            sendcnt[i] = tmppair[i].size();
            totsend += tmppair[i].size();
        }

        MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld());

        std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
        std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
        IT totrecv = std::accumulate(recvcnt, recvcnt+nprocs, static_cast<IT>(0));
        assert((totsend < std::numeric_limits<int>::max()));
        assert((totrecv < std::numeric_limits<int>::max()));

        std::pair<IT,NT> * sendpair = new std::pair<IT,NT>[totsend];
        for(int i=0; i<nprocs; ++i)
        {
            std::copy(tmppair[i].begin(), tmppair[i].end(), sendpair+sdispls[i]);
            std::vector< std::pair<IT,NT> >().swap(tmppair[i]);  // clear memory
        }
        std::vector< std::pair<IT,NT> > recvpair(totrecv);
        MPI_Alltoallv(sendpair, sendcnt, sdispls, MPI_pair, recvpair.data(), recvcnt, rdispls, MPI_pair, commGrid->GetWorld());
        delete [] sendpair;

        IT updated = 0;
        for(auto & update : recvpair)  // now, write these to rvec
        {
            updated++;
            rvec.arr[update.first] = update.second;
        }
#ifdef COMBBLAS_DEBUG
        MPI_Allreduce(MPI_IN_PLACE, &updated, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
        if(myrank == 0) std::cout << "Total vector entries updated: " << updated << std::endl;
#endif

        /* end of setting the newly found vector entries */

        std::vector<IT> newactivecols(toretain.size());
        std::vector<IT> newklimits(toretain.size());
        IT newindex = 0;
        for(auto & retind : toretain)
        {
            newactivecols[newindex] = actcolsmap[retind];
            newklimits[newindex++] = klimits[retind];
        }
        actcolsmap.swap(newactivecols);
        klimits.swap(newklimits);
        activecols = actcolsmap.size();

        MPI_Allreduce(&activecols, &totactcols, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
#ifdef COMBBLAS_DEBUG
        if(myrank == 0) std::cout << "Number of active columns is " << totactcols << std::endl;
#endif
    }
    DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
    MPI_Type_free(&MPI_pair);

#ifdef COMBBLAS_DEBUG
    if(myrank == 0) std::cout << "Exiting Kselect2" << std::endl;
#endif
    return true;  // prune needed
}

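// Usage sketch (hypothetical driver code): Kselect2 fills rvec with the k-th
// largest entry of every column holding at least k nonzeros, as MCL-style
// pruning requires. When no column qualifies it returns false without pruning,
// and the message above suggests the caller fall back to a column-minimum
// reduction:
//
//   FullyDistVec<int64_t,double> kthLargest(A.getcommgrid());
//   if(!A.Kselect2(kthLargest, (int64_t) 5))   // 5th largest per column
//       A.Reduce(kthLargest, Column, minimum<double>(), std::numeric_limits<double>::max());
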

template <class IT, class NT, class DER>
void SpParMat< IT,NT,DER >::Dump(std::string filename) const
{
    MPI_Comm World = commGrid->GetWorld();
    int rank = commGrid->GetRank();
    int nprocs = commGrid->GetSize();

    MPI_File thefile;
    char * _fn = const_cast<char*>(filename.c_str());
    MPI_File_open(World, _fn, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);

    int rankinrow = commGrid->GetRankInProcRow();
    int rankincol = commGrid->GetRankInProcCol();
    int rowneighs = commGrid->GetGridCols();  // get # of processors on the row
    int colneighs = commGrid->GetGridRows();

    IT * colcnts = new IT[rowneighs];
    IT * rowcnts = new IT[colneighs];
    rowcnts[rankincol] = getlocalrows();
    colcnts[rankinrow] = getlocalcols();

    MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), colcnts, 1, MPIType<IT>(), commGrid->GetRowWorld());
    IT coloffset = std::accumulate(colcnts, colcnts+rankinrow, static_cast<IT>(0));

    MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), rowcnts, 1, MPIType<IT>(), commGrid->GetColWorld());
    IT rowoffset = std::accumulate(rowcnts, rowcnts+rankincol, static_cast<IT>(0));
    DeleteAll(colcnts, rowcnts);

    IT * prelens = new IT[nprocs];
    prelens[rank] = 2*getlocalnnz();
    MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
    IT lengthuntil = std::accumulate(prelens, prelens+rank, static_cast<IT>(0));

    // the disp displacement argument specifies the position
    // (absolute offset in bytes from the beginning of the file)
    MPI_Offset disp = lengthuntil * sizeof(uint32_t);
    char native[] = "native";
    MPI_File_set_view(thefile, disp, MPI_UNSIGNED, MPI_UNSIGNED, native, MPI_INFO_NULL);
    uint32_t * gen_edges = new uint32_t[prelens[rank]];

    IT k = 0;
    for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)  // iterate over columns
    {
        for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
        {
            gen_edges[k++] = (uint32_t) (nzit.rowid() + rowoffset);
            gen_edges[k++] = (uint32_t) (colit.colid() + coloffset);
        }
    }
    assert(k == prelens[rank]);
    MPI_File_write(thefile, gen_edges, prelens[rank], MPI_UNSIGNED, MPI_STATUS_IGNORE);  // MPI_STATUS_IGNORE rather than NULL, which is not a valid status argument
    MPI_File_close(&thefile);

    delete [] prelens;
    delete [] gen_edges;
}

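// Reader sketch for the format Dump() produces: a flat stream of uint32_t
// pairs (rowid, colid), one pair per nonzero, with no header and 0-based
// global indices. Illustrative only; the helper name is hypothetical and
// error handling is minimal.
namespace {
inline std::vector< std::pair<uint32_t,uint32_t> > ReadDumpedEdgesSketch(const std::string & filename)
{
    std::ifstream in(filename, std::ios::binary);
    std::vector< std::pair<uint32_t,uint32_t> > edges;
    uint32_t buf[2];
    while(in.read(reinterpret_cast<char*>(buf), sizeof(buf)))
        edges.emplace_back(buf[0], buf[1]);  // (row, col), matching the write order above
    return edges;
}
}
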

template <class IT, class NT, class DER>
void SpParMat< IT,NT,DER >::ParallelBinaryWrite(std::string filename) const
{
    int myrank = commGrid->GetRank();
    int nprocs = commGrid->GetSize();
    IT totalm = getnrow();
    IT totaln = getncol();
    IT totnnz = getnnz();

    const int64_t headersize = 52;  // size of the header: 4 characters + 6*8 bytes of integers
    int64_t elementsize = 2*sizeof(IT)+sizeof(NT);
    int64_t localentries = getlocalnnz();
    int64_t localbytes = localentries*elementsize;  // local size in bytes
    if(myrank == 0)
        localbytes += headersize;

    int64_t bytesuntil = 0;
    MPI_Exscan( &localbytes, &bytesuntil, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetWorld());
    if(myrank == 0) bytesuntil = 0;  // because MPI_Exscan says the recvbuf on process 0 is undefined
    int64_t bytestotal;
    MPI_Allreduce(&localbytes, &bytestotal, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetWorld());

    size_t writtensofar = 0;
    char * localdata = new char[localbytes];
    if(myrank == 0)
    {
        char start[5] = "HKDT";
        uint64_t hdr[6];
        hdr[0] = 2;           // version: 2.0
        hdr[1] = sizeof(NT);  // object size
        hdr[2] = 0;           // format: binary
        hdr[3] = totalm;      // number of rows
        hdr[4] = totaln;      // number of columns
        hdr[5] = totnnz;      // number of nonzeros

        std::memmove(localdata, start, 4);
        std::memmove(localdata+4, hdr, sizeof(hdr));
        writtensofar = headersize;
    }

    IT roffset = 0;
    IT coffset = 0;
    GetPlaceInGlobalGrid(roffset, coffset);
    roffset += 1;  // increment by 1 (binary format is 1-based)
    coffset += 1;

    for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)  // iterate over nonempty subcolumns
    {
        for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
        {
            IT glrowid = nzit.rowid() + roffset;
            IT glcolid = colit.colid() + coffset;
            NT glvalue = nzit.value();
            std::memmove(localdata+writtensofar, &glrowid, sizeof(IT));
            std::memmove(localdata+writtensofar+sizeof(IT), &glcolid, sizeof(IT));
            std::memmove(localdata+writtensofar+2*sizeof(IT), &glvalue, sizeof(NT));
            writtensofar += (2*sizeof(IT) + sizeof(NT));
        }
    }
#ifdef IODEBUG
    if(myrank == 0)
        std::cout << "local move happened..., writing to file\n";
#endif

    MPI_File thefile;
    MPI_File_open(commGrid->GetWorld(), (char*) filename.c_str(), MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);
    MPI_File_set_view(thefile, bytesuntil, MPI_CHAR, MPI_CHAR, (char*)"native", MPI_INFO_NULL);

    int64_t batchSize = 256 * 1024 * 1024;  // 256 MB (per processor)
    size_t localfileptr = 0;
    int64_t remaining = localbytes;
    int64_t totalremaining = bytestotal;

    while(totalremaining > 0)
    {
#ifdef IODEBUG
        if(myrank == 0)
            std::cout << "Remaining " << totalremaining << " bytes to write in aggregate" << std::endl;
#endif
        MPI_Status status;
        int curBatch = (int) std::min(batchSize, remaining);  // safe narrowing: batchSize fits in an int
        MPI_File_write_all(thefile, localdata+localfileptr, curBatch, MPI_CHAR, &status);
        int count;
        MPI_Get_count(&status, MPI_CHAR, &count);  // known bug: https://github.com/pmodels/mpich/issues/2332
        assert( (curBatch == 0) || (count == curBatch) );  // count can return the previous/wrong value when 0 elements are written
        localfileptr += curBatch;
        remaining -= curBatch;
        MPI_Allreduce(&remaining, &totalremaining, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetWorld());
    }
    MPI_File_close(&thefile);

    delete [] localdata;
}

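// Worked layout of the "HKDT" binary format the function above emits (52-byte
// header, as reconstructed from the writer code):
//
//   bytes 0..3   : magic "HKDT" (no NUL terminator on disk)
//   bytes 4..11  : version            (uint64_t, currently 2)
//   bytes 12..19 : sizeof(NT)         (uint64_t, object size)
//   bytes 20..27 : format             (uint64_t, 0 = binary)
//   bytes 28..35 : number of rows     (uint64_t)
//   bytes 36..43 : number of columns  (uint64_t)
//   bytes 44..51 : number of nonzeros (uint64_t)
//
// followed by nnz records of (IT row, IT col, NT value) with 1-based indices.
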
template <class IT, class NT, class DER>
SpParMat< IT,NT,DER >::SpParMat (const SpParMat< IT,NT,DER > & rhs)
{
    if(rhs.spSeq != NULL)
        spSeq = new DER(*(rhs.spSeq));  // deep copy of local block

    commGrid = rhs.commGrid;
}

template <class IT, class NT, class DER>
SpParMat< IT,NT,DER > & SpParMat< IT,NT,DER >::operator=(const SpParMat< IT,NT,DER > & rhs)
{
    if(this != &rhs)
    {
        if(spSeq != NULL) delete spSeq;
        if(rhs.spSeq != NULL)
            spSeq = new DER(*(rhs.spSeq));  // deep copy of local block

        commGrid = rhs.commGrid;
    }
    return *this;
}

template <class IT, class NT, class DER>
SpParMat< IT,NT,DER > & SpParMat< IT,NT,DER >::operator+=(const SpParMat< IT,NT,DER > & rhs)
{
    if(this != &rhs)
    {
        if(*commGrid == *rhs.commGrid)
        {
            (*spSeq) += (*(rhs.spSeq));
        }
        else
        {
            std::cout << "Grids are not comparable for parallel addition (A+B)" << std::endl;
        }
    }
    else
    {
        std::cout << "Missing feature (A+A): use multiplication by 2 instead!" << std::endl;
    }
    return *this;
}

template <class IT, class NT, class DER>
float SpParMat< IT,NT,DER >::LoadImbalance() const
{
    IT totnnz = getnnz();  // collective call
    IT maxnnz = 0;
    IT localnnz = (IT) spSeq->getnnz();
    MPI_Allreduce( &localnnz, &maxnnz, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
    if(totnnz == 0) return 1;
    return static_cast<float>(commGrid->GetSize() * maxnnz) / static_cast<float>(totnnz);
}

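// Worked example (illustrative): on p = 4 processes holding {10, 10, 10, 70}
// local nonzeros, totnnz = 100 and maxnnz = 70, so
// LoadImbalance() = (4 * 70) / 100 = 2.8; a perfectly balanced matrix gives 1.0.
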
template <class IT, class NT, class DER>
IT SpParMat< IT,NT,DER >::getnnz() const
{
    IT totalnnz = 0;
    IT localnnz = spSeq->getnnz();
    MPI_Allreduce( &localnnz, &totalnnz, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
    return totalnnz;
}

template <class IT, class NT, class DER>
IT SpParMat< IT,NT,DER >::getnrow() const
{
    IT totalrows = 0;
    IT localrows = spSeq->getnrow();
    MPI_Allreduce( &localrows, &totalrows, 1, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
    return totalrows;
}

template <class IT, class NT, class DER>
IT SpParMat< IT,NT,DER >::getncol() const
{
    IT totalcols = 0;
    IT localcols = spSeq->getncol();
    MPI_Allreduce( &localcols, &totalcols, 1, MPIType<IT>(), MPI_SUM, commGrid->GetRowWorld());
    return totalcols;
}

template <class IT, class NT, class DER>
template <typename _BinaryOperation>
void SpParMat<IT,NT,DER>::DimApply(Dim dim, const FullyDistVec<IT, NT>& x, _BinaryOperation __binary_op)
{
    if(!(*commGrid == *(x.commGrid)))
    {
        std::cout << "Grids are not comparable for SpParMat::DimApply" << std::endl;
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }

    MPI_Comm World = x.commGrid->GetWorld();
    MPI_Comm ColWorld = x.commGrid->GetColWorld();
    MPI_Comm RowWorld = x.commGrid->GetRowWorld();
    switch(dim)
    {
        case Column:  // scale each column
        {
            int xsize = (int) x.LocArrSize();
            int trxsize = 0;
            int diagneigh = x.commGrid->GetComplementRank();
            MPI_Status status;
            MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, World, &status);

            NT * trxnums = new NT[trxsize];
            MPI_Sendrecv(const_cast<NT*>(SpHelper::p2a(x.arr)), xsize, MPIType<NT>(), diagneigh, TRX, trxnums, trxsize, MPIType<NT>(), diagneigh, TRX, World, &status);

            int colneighs, colrank;
            MPI_Comm_size(ColWorld, &colneighs);
            MPI_Comm_rank(ColWorld, &colrank);
            int * colsize = new int[colneighs];
            colsize[colrank] = trxsize;

            MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, ColWorld);
            int * dpls = new int[colneighs]();  // displacements (zero initialized)
            std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
            int accsize = std::accumulate(colsize, colsize+colneighs, 0);
            NT * scaler = new NT[accsize];

            MPI_Allgatherv(trxnums, trxsize, MPIType<NT>(), scaler, colsize, dpls, MPIType<NT>(), ColWorld);
            DeleteAll(trxnums, colsize, dpls);

            for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)  // iterate over columns
            {
                for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
                {
                    nzit.value() = __binary_op(nzit.value(), scaler[colit.colid()]);
                }
            }
            delete [] scaler;
            break;
        }
        case Row:
        {
            int xsize = (int) x.LocArrSize();
            int rowneighs, rowrank;
            MPI_Comm_size(RowWorld, &rowneighs);
            MPI_Comm_rank(RowWorld, &rowrank);
            int * rowsize = new int[rowneighs];
            rowsize[rowrank] = xsize;
            MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, rowsize, 1, MPI_INT, RowWorld);
            int * dpls = new int[rowneighs]();  // displacements (zero initialized)
            std::partial_sum(rowsize, rowsize+rowneighs-1, dpls+1);
            int accsize = std::accumulate(rowsize, rowsize+rowneighs, 0);
            NT * scaler = new NT[accsize];

            MPI_Allgatherv(const_cast<NT*>(SpHelper::p2a(x.arr)), xsize, MPIType<NT>(), scaler, rowsize, dpls, MPIType<NT>(), RowWorld);
            DeleteAll(rowsize, dpls);

            for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
            {
                for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
                {
                    nzit.value() = __binary_op(nzit.value(), scaler[nzit.rowid()]);
                }
            }
            delete [] scaler;
            break;
        }
        default:
        {
            std::cout << "Unknown scaling dimension, returning..." << std::endl;
            break;
        }
    }
}

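// Usage sketch (hypothetical driver code): row- and column-scaling with
// DimApply. Here A is an SpParMat<int64_t,double,...> and deg is a
// FullyDistVec<int64_t,double> on the same CommGrid.
//
//   A.DimApply(Row, deg, [](double aij, double di){ return aij / di; });  // divide each row i by deg[i]
//   A.DimApply(Column, deg, std::multiplies<double>());                   // scale each column j by deg[j]
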
template <class IT, class NT, class DER>
template <typename _BinaryOperation, typename _UnaryOperation >
FullyDistVec<IT,NT> SpParMat<IT,NT,DER>::Reduce(Dim dim, _BinaryOperation __binary_op, NT id, _UnaryOperation __unary_op) const
{
    IT length = 0;  // stays 0 (empty vector) if the dimension is unknown
    switch(dim)
    {
        case Column:
        {
            length = getncol();
            break;
        }
        case Row:
        {
            length = getnrow();
            break;
        }
        default:
        {
            std::cout << "Unknown reduction dimension, returning empty vector" << std::endl;
            break;
        }
    }
    FullyDistVec<IT,NT> parvec(commGrid, length, id);
    Reduce(parvec, dim, __binary_op, id, __unary_op);
    return parvec;
}

template <class IT, class NT, class DER>
template <typename _BinaryOperation>
FullyDistVec<IT,NT> SpParMat<IT,NT,DER>::Reduce(Dim dim, _BinaryOperation __binary_op, NT id) const
{
    IT length = 0;  // stays 0 (empty vector) if the dimension is unknown
    switch(dim)
    {
        case Column:
        {
            length = getncol();
            break;
        }
        case Row:
        {
            length = getnrow();
            break;
        }
        default:
        {
            std::cout << "Unknown reduction dimension, returning empty vector" << std::endl;
            break;
        }
    }
    FullyDistVec<IT,NT> parvec(commGrid, length, id);
    Reduce(parvec, dim, __binary_op, id, myidentity<NT>());  // myidentity<NT>() is a no-op function
    return parvec;
}

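// Usage sketch (hypothetical driver code): common reductions expressed with
// the overloads above; maximum<> comes from Operations.h.
//
//   FullyDistVec<int64_t,double> colsums = A.Reduce(Column, std::plus<double>(), 0.0);
//   FullyDistVec<int64_t,double> rowmax  = A.Reduce(Row, maximum<double>(), std::numeric_limits<double>::lowest());
//   // nonzero counts per column: map every entry to 1, then add (uses the void overload below)
//   FullyDistVec<int64_t,int64_t> nnzpercol(A.getcommgrid());
//   A.Reduce(nnzpercol, Column, std::plus<int64_t>(), (int64_t) 0, [](double v){ return (int64_t) 1; });
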

template <class IT, class NT, class DER>
template <typename VT, typename GIT, typename _BinaryOperation>
void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id) const
{
    Reduce(rvec, dim, __binary_op, id, myidentity<NT>() );
}

template <class IT, class NT, class DER>
template <typename VT, typename GIT, typename _BinaryOperation, typename _UnaryOperation>  // GIT: global index type of vector
void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op) const
{
    Reduce(rvec, dim, __binary_op, id, __unary_op, MPIOp<_BinaryOperation, VT>::op() );
}

template <class IT, class NT, class DER>
template <typename VT, typename GIT, typename _BinaryOperation, typename _UnaryOperation>  // GIT: global index type of vector
void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op, MPI_Op mympiop) const
{
    if(*rvec.commGrid != *commGrid)
    {
        SpParHelper::Print("Grids are not comparable, SpParMat::Reduce() fails!", commGrid->GetWorld());
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }
    switch(dim)
    {
        case Column:  // pack along the columns, result is a vector of size n
        {
            // We can't use rvec's distribution (rows first, columns later) here
            // because the ownership model of the vector has the order P(0,0), P(0,1), ...,
            // whereas column reduction first generates the vector distribution in P(0,0), P(1,0), ... order.

            IT n_thiscol = getlocalcols();  // length assigned to this processor column
            int colneighs = commGrid->GetGridRows();  // including oneself
            int colrank = commGrid->GetRankInProcCol();

            GIT * loclens = new GIT[colneighs];
            GIT * lensums = new GIT[colneighs+1]();  // begin/end points of local lengths

            GIT n_perproc = n_thiscol / colneighs;  // length on a typical processor
            if(colrank == colneighs-1)
                loclens[colrank] = n_thiscol - (n_perproc*colrank);
            else
                loclens[colrank] = n_perproc;

            MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetColWorld());
            std::partial_sum(loclens, loclens+colneighs, lensums+1);  // loclens and lensums are different, but both would fit in 32 bits

            std::vector<VT> trarr;
            typename DER::SpColIter colit = spSeq->begcol();
            for(int i=0; i< colneighs; ++i)
            {
                VT * sendbuf = new VT[loclens[i]];
                std::fill(sendbuf, sendbuf+loclens[i], id);  // fill with identity

                for(; colit != spSeq->endcol() && colit.colid() < lensums[i+1]; ++colit)  // iterate over a portion of columns
                {
                    for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)  // all nonzeros in this column
                    {
                        sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
                    }
                }

                VT * recvbuf = NULL;
                if(colrank == i)
                {
                    trarr.resize(loclens[i]);
                    recvbuf = SpHelper::p2a(trarr);
                }
                MPI_Reduce(sendbuf, recvbuf, loclens[i], MPIType<VT>(), mympiop, i, commGrid->GetColWorld());  // root = i
                delete [] sendbuf;
            }
            DeleteAll(loclens, lensums);

            GIT reallen;  // now we have to transpose the vector
            GIT trlen = trarr.size();
            int diagneigh = commGrid->GetComplementRank();
            MPI_Status status;
            MPI_Sendrecv(&trlen, 1, MPIType<GIT>(), diagneigh, TRNNZ, &reallen, 1, MPIType<GIT>(), diagneigh, TRNNZ, commGrid->GetWorld(), &status);  // GIT type: trlen/reallen are GIT

            rvec.arr.resize(reallen);
            MPI_Sendrecv(SpHelper::p2a(trarr), trlen, MPIType<VT>(), diagneigh, TRX, SpHelper::p2a(rvec.arr), reallen, MPIType<VT>(), diagneigh, TRX, commGrid->GetWorld(), &status);
            rvec.glen = getncol();  // ABAB: put a sanity check here
            break;
        }
        case Row:  // pack along the rows, result is a vector of size m
        {
            rvec.glen = getnrow();
            int rowneighs = commGrid->GetGridCols();
            int rowrank = commGrid->GetRankInProcRow();
            GIT * loclens = new GIT[rowneighs];
            GIT * lensums = new GIT[rowneighs+1]();  // begin/end points of local lengths
            loclens[rowrank] = rvec.MyLocLength();
            MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetRowWorld());
            std::partial_sum(loclens, loclens+rowneighs, lensums+1);
            try
            {
                rvec.arr.resize(loclens[rowrank], id);

                // keeping track of all nonzero iterators within columns at once is unscalable w.r.t. memory (due to sqrt(p) scaling),
                // thus we do batches of columns as opposed to all columns at once. 5 million columns take 80MB (two pointers per column)
                #define MAXCOLUMNBATCH (5 * 1024 * 1024)
                typename DER::SpColIter begfinger = spSeq->begcol();  // beginning finger to columns

                // each processor on the same processor row should execute the SAME number of reduce calls
                int numreducecalls = (int) ceil(static_cast<float>(spSeq->getnzc()) / static_cast<float>(MAXCOLUMNBATCH));
                int maxreducecalls;
                MPI_Allreduce( &numreducecalls, &maxreducecalls, 1, MPI_INT, MPI_MAX, commGrid->GetRowWorld());

                for(int k=0; k< maxreducecalls; ++k)
                {
                    std::vector<typename DER::SpColIter::NzIter> nziters;
                    typename DER::SpColIter curfinger = begfinger;
                    for(; curfinger != spSeq->endcol() && nziters.size() < MAXCOLUMNBATCH ; ++curfinger)
                    {
                        nziters.push_back(spSeq->begnz(curfinger));
                    }
                    for(int i=0; i< rowneighs; ++i)  // step by step to save memory
                    {
                        VT * sendbuf = new VT[loclens[i]];
                        std::fill(sendbuf, sendbuf+loclens[i], id);  // fill with identity

                        typename DER::SpColIter colit = begfinger;
                        IT colcnt = 0;  // "processed column" counter
                        for(; colit != curfinger; ++colit, ++colcnt)  // iterate over this batch of columns until curfinger
                        {
                            typename DER::SpColIter::NzIter nzit = nziters[colcnt];
                            for(; nzit != spSeq->endnz(colit) && nzit.rowid() < lensums[i+1]; ++nzit)  // a portion of nonzeros in this column
                            {
                                sendbuf[nzit.rowid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[nzit.rowid()-lensums[i]]);
                            }
                            nziters[colcnt] = nzit;  // set the new finger
                        }

                        VT * recvbuf = NULL;
                        if(rowrank == i)
                        {
                            for(int j=0; j< loclens[i]; ++j)
                            {
                                sendbuf[j] = __binary_op(rvec.arr[j], sendbuf[j]);  // rvec.arr will be overwritten by MPI_Reduce, so fold its contents in first
                            }
                            recvbuf = SpHelper::p2a(rvec.arr);
                        }
                        MPI_Reduce(sendbuf, recvbuf, loclens[i], MPIType<VT>(), mympiop, i, commGrid->GetRowWorld());  // root = i
                        delete [] sendbuf;
                    }
                    begfinger = curfinger;  // set the next begfinger
                }
                DeleteAll(loclens, lensums);
            }
            catch (std::length_error& le)
            {
                std::cerr << "Length error: " << le.what() << std::endl;
            }
            break;
        }
        default:
        {
            std::cout << "Unknown reduction dimension, returning empty vector" << std::endl;
            break;
        }
    }
}

#ifndef KSELECTLIMIT
#define KSELECTLIMIT 10000
#endif

template <class IT, class NT, class DER>
template <typename VT, typename GIT>
bool SpParMat<IT,NT,DER>::Kselect(FullyDistSpVec<GIT,VT> & kth, IT k_limit, int kselectVersion) const
{
#ifdef COMBBLAS_DEBUG
    FullyDistVec<GIT,VT> test1(kth.getcommgrid());
    FullyDistVec<GIT,VT> test2(kth.getcommgrid());
    Kselect1(test1, k_limit, myidentity<NT>());
    Kselect2(test2, k_limit);
    if(test1 == test2)
        SpParHelper::Print("Kselect1 and Kselect2 producing same results\n");
    else
    {
        SpParHelper::Print("WARNING: Kselect1 and Kselect2 producing DIFFERENT results\n");
        test1.PrintToFile("test1");
        test2.PrintToFile("test2");
    }
#endif

    if(kselectVersion==1 || k_limit < KSELECTLIMIT)
    {
        return Kselect1(kth, k_limit, myidentity<NT>());
    }
    else
    {
        FullyDistVec<GIT,VT> kthAll ( getcommgrid());
        bool ret = Kselect2(kthAll, k_limit);
        FullyDistSpVec<GIT,VT> temp = EWiseApply<VT>(kth, kthAll,
                                                     [](VT spval, VT dval){return dval;},
                                                     [](VT spval, VT dval){return true;},
                                                     false, NT());
        kth = temp;
        return ret;
    }
}

template <class IT, class NT, class DER>
template <typename VT, typename GIT>
bool SpParMat<IT,NT,DER>::Kselect(FullyDistVec<GIT,VT> & rvec, IT k_limit, int kselectVersion) const
{
#ifdef COMBBLAS_DEBUG
    FullyDistVec<GIT,VT> test1(rvec.getcommgrid());
    FullyDistVec<GIT,VT> test2(rvec.getcommgrid());
    Kselect1(test1, k_limit, myidentity<NT>());
    Kselect2(test2, k_limit);
    if(test1 == test2)
        SpParHelper::Print("Kselect1 and Kselect2 producing same results\n");
    else
    {
        SpParHelper::Print("WARNING: Kselect1 and Kselect2 producing DIFFERENT results\n");
        //test1.PrintToFile("test1");
        //test2.PrintToFile("test2");
    }
#endif

    if(kselectVersion==1 || k_limit < KSELECTLIMIT)
        return Kselect1(rvec, k_limit, myidentity<NT>());
    else
        return Kselect2(rvec, k_limit);
}

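// Usage sketch (hypothetical driver code): version dispatch for k-selection.
// kselectVersion == 1 (or a small k_limit, below KSELECTLIMIT) chooses the
// memory-hungry but simple Kselect1; otherwise the iterative Kselect2 runs.
//
//   FullyDistVec<int64_t,double> kth(A.getcommgrid());
//   A.Kselect(kth, (int64_t) 100, 2);   // 100th largest per column, via Kselect2
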
/* Identify the k-th maximum element in each column of a matrix.
** If the number of nonzeros in a column is less than or equal to k, return the minimum entry.
** Caution: this is a preliminary implementation that needs 3*(n/sqrt(p))*k memory per processor;
** this memory requirement is too high for larger k.
*/
template <class IT, class NT, class DER>
template <typename VT, typename GIT, typename _UnaryOperation>  // GIT: global index type of vector
bool SpParMat<IT,NT,DER>::Kselect1(FullyDistVec<GIT,VT> & rvec, IT k, _UnaryOperation __unary_op) const
{
    if(*rvec.commGrid != *commGrid)
    {
        SpParHelper::Print("Grids are not comparable, SpParMat::Kselect() fails!", commGrid->GetWorld());
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }

    std::cerr << "Dense kselect is called!!" << std::endl;
    FullyDistVec<IT, IT> nnzPerColumn (getcommgrid());
    Reduce(nnzPerColumn, Column, std::plus<IT>(), (IT)0, [](NT val){return (IT)1;});
    IT maxnnzPerColumn = nnzPerColumn.Reduce(maximum<IT>(), (IT)0);
    if(k > maxnnzPerColumn)
    {
        SpParHelper::Print("Kselect: k is greater than maxNnzInColumn. Calling Reduce instead...\n");
        Reduce(rvec, Column, minimum<NT>(), static_cast<NT>(0));
        return false;
    }

    IT n_thiscol = getlocalcols();  // length (number of columns) assigned to this processor (and processor column)

    // memory should be min(n_thiscol*k, local nnz),
    // hence we will not overflow for very large k
    std::vector<VT> sendbuf(n_thiscol*k);
    std::vector<IT> send_coldisp(n_thiscol+1, 0);
    std::vector<IT> local_coldisp(n_thiscol+1, 0);

    // displacement of local columns:
    // local_coldisp is the displacement of all nonzeros per column,
    // send_coldisp is the displacement of k nonzeros per column
    IT nzc = 0;
    if(spSeq->getnnz() > 0)
    {
        typename DER::SpColIter colit = spSeq->begcol();
        for(IT i=0; i<n_thiscol; ++i)
        {
            local_coldisp[i+1] = local_coldisp[i];
            send_coldisp[i+1] = send_coldisp[i];
            if((colit != spSeq->endcol()) && (i==colit.colid()))
            {
                local_coldisp[i+1] += colit.nnz();
                if(colit.nnz() >= k)
                    send_coldisp[i+1] += k;
                else
                    send_coldisp[i+1] += colit.nnz();
                colit++;
                nzc++;
            }
        }
    }
    assert(local_coldisp[n_thiscol] == spSeq->getnnz());

    // a copy of the local part of the matrix;
    // this can be avoided if we write our own local kselect function instead of using partial_sort
    std::vector<VT> localmat(spSeq->getnnz());

#ifdef THREADED
#pragma omp parallel for
#endif
    for(IT i=0; i<nzc; i++)  // iterate over nonempty columns
    {
        typename DER::SpColIter colit = spSeq->begcol() + i;
        IT colid = colit.colid();
        IT idx = local_coldisp[colid];
        for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit < spSeq->endnz(colit); ++nzit)
        {
            localmat[idx++] = static_cast<VT>(__unary_op(nzit.value()));
        }

        if(colit.nnz() <= k)
        {
            std::sort(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid+1], std::greater<VT>());
            std::copy(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid+1], sendbuf.begin()+send_coldisp[colid]);
        }
        else
        {
            std::partial_sort(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid]+k, localmat.begin()+local_coldisp[colid+1], std::greater<VT>());
            std::copy(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid]+k, sendbuf.begin()+send_coldisp[colid]);
        }
    }

    std::vector<VT>().swap(localmat);
    std::vector<IT>().swap(local_coldisp);

    std::vector<VT> recvbuf(n_thiscol*k);
    std::vector<VT> tempbuf(n_thiscol*k);
    std::vector<IT> recv_coldisp(n_thiscol+1);
    std::vector<IT> templen(n_thiscol);

    int colneighs = commGrid->GetGridRows();
    int colrank = commGrid->GetRankInProcCol();

    for(int p=2; p <= colneighs; p*=2)  // binary-tree reduction along the processor column
    {
        if(colrank%p == p/2)  // this processor is a sender in this round
        {
            int receiver = colrank - ceil(p/2);
            MPI_Send(send_coldisp.data(), n_thiscol+1, MPIType<IT>(), receiver, 0, commGrid->GetColWorld());
            MPI_Send(sendbuf.data(), send_coldisp[n_thiscol], MPIType<VT>(), receiver, 1, commGrid->GetColWorld());
        }
        else if(colrank%p == 0)  // this processor is a receiver in this round
        {
            int sender = colrank + ceil(p/2);
            if(sender < colneighs)
            {
                MPI_Recv(recv_coldisp.data(), n_thiscol+1, MPIType<IT>(), sender, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
                MPI_Recv(recvbuf.data(), recv_coldisp[n_thiscol], MPIType<VT>(), sender, 1, commGrid->GetColWorld(), MPI_STATUS_IGNORE);

#ifdef THREADED
#pragma omp parallel for
#endif
                for(IT i=0; i<n_thiscol; ++i)
                {
                    // partial merge until the first k elements
                    IT j = send_coldisp[i], l = recv_coldisp[i];
                    IT offset = k*i;
                    IT lid = 0;
                    for(; j < send_coldisp[i+1] && l < recv_coldisp[i+1] && lid < k;)
                    {
                        if(sendbuf[j] > recvbuf[l])  // decision
                            tempbuf[offset+lid++] = sendbuf[j++];
                        else
                            tempbuf[offset+lid++] = recvbuf[l++];
                    }
                    while(j < send_coldisp[i+1] && lid < k) tempbuf[offset+lid++] = sendbuf[j++];
                    while(l < recv_coldisp[i+1] && lid < k) tempbuf[offset+lid++] = recvbuf[l++];
                    templen[i] = lid;
                }

                send_coldisp[0] = 0;
                for(IT i=0; i<n_thiscol; i++)
                {
                    send_coldisp[i+1] = send_coldisp[i] + templen[i];
                }

#ifdef THREADED
#pragma omp parallel for
#endif
                for(IT i=0; i<n_thiscol; i++)  // direct copy
                {
                    IT offset = k*i;
                    std::copy(tempbuf.begin()+offset, tempbuf.begin()+offset+templen[i], sendbuf.begin() + send_coldisp[i]);
                }
            }
        }
    }
    MPI_Barrier(commGrid->GetWorld());
    std::vector<VT> kthItem(n_thiscol);

    int root = commGrid->GetDiagOfProcCol();
    if(root==0 && colrank==0)  // rank 0
    {
#ifdef THREADED
#pragma omp parallel for
#endif
        for(IT i=0; i<n_thiscol; i++)
        {
            IT nitems = send_coldisp[i+1]-send_coldisp[i];
            if(nitems >= k)
                kthItem[i] = sendbuf[send_coldisp[i]+k-1];
            else
                kthItem[i] = std::numeric_limits<VT>::min();  // return the minimum possible value if a column is empty or has fewer than k elements
        }
    }
    else if(root>0 && colrank==0)  // send to the diagonal processor of this processor column
    {
#ifdef THREADED
#pragma omp parallel for
#endif
        for(IT i=0; i<n_thiscol; i++)
        {
            IT nitems = send_coldisp[i+1]-send_coldisp[i];
            if(nitems >= k)
                kthItem[i] = sendbuf[send_coldisp[i]+k-1];
            else
                kthItem[i] = std::numeric_limits<VT>::min();  // return the minimum possible value if a column is empty or has fewer than k elements
        }
        MPI_Send(kthItem.data(), n_thiscol, MPIType<VT>(), root, 0, commGrid->GetColWorld());
    }
    else if(root>0 && colrank==root)
    {
        MPI_Recv(kthItem.data(), n_thiscol, MPIType<VT>(), 0, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
    }

    std::vector<int> sendcnts;
    std::vector<int> dpls;
    if(colrank==root)
    {
        int proccols = commGrid->GetGridCols();
        IT n_perproc = n_thiscol / proccols;
        sendcnts.resize(proccols);
        std::fill(sendcnts.data(), sendcnts.data()+proccols-1, n_perproc);
        sendcnts[proccols-1] = n_thiscol - (n_perproc * (proccols-1));
        dpls.resize(proccols, 0);  // displacements (zero initialized)
        std::partial_sum(sendcnts.data(), sendcnts.data()+proccols-1, dpls.data()+1);
    }

    int rowroot = commGrid->GetDiagOfProcRow();
    int recvcnts = 0;
    // scatter received data size
    MPI_Scatter(sendcnts.data(), 1, MPI_INT, &recvcnts, 1, MPI_INT, rowroot, commGrid->GetRowWorld());

    rvec.arr.resize(recvcnts);
    MPI_Scatterv(kthItem.data(), sendcnts.data(), dpls.data(), MPIType<VT>(), rvec.arr.data(), rvec.arr.size(), MPIType<VT>(), rowroot, commGrid->GetRowWorld());
    rvec.glen = getncol();
    return true;
}

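// Illustrative sketch (not part of the library): the per-column tournament
// step the binary-tree reduction above performs, i.e., merging two
// descending-sorted top-k lists into a single top-k list. The helper name
// is hypothetical.
namespace {
template <typename VT>
std::vector<VT> MergeTopKSketch(const std::vector<VT> & a, const std::vector<VT> & b, size_t k)
{
    std::vector<VT> out;
    out.reserve(std::min(k, a.size()+b.size()));
    size_t i = 0, j = 0;
    while(out.size() < k && i < a.size() && j < b.size())
        out.push_back(a[i] > b[j] ? a[i++] : b[j++]);  // take the larger head
    while(out.size() < k && i < a.size()) out.push_back(a[i++]);
    while(out.size() < k && j < b.size()) out.push_back(b[j++]);
    return out;
}
}
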
1409
1410
1411template <class IT, class NT, class DER>
1412template <typename VT, typename GIT, typename _UnaryOperation> // GIT: global index type of vector
1413bool SpParMat<IT,NT,DER>::Kselect1(FullyDistSpVec<GIT,VT> & rvec, IT k, _UnaryOperation __unary_op) const
1414{
1415 int myrank;
1416 MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
1417
1418 if(*rvec.commGrid != *commGrid)
1419 {
1420 SpParHelper::Print("Grids are not comparable, SpParMat::Kselect() fails!", commGrid->GetWorld());
1421 MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1422 }
1423
1424 /*
1425 FullyDistVec<IT, IT> nnzPerColumn (getcommgrid());
1426 Reduce(nnzPerColumn, Column, plus<IT>(), (IT)0, [](NT val){return (IT)1;});
1427 IT maxnnzPerColumn = nnzPerColumn.Reduce(maximum<IT>(), (IT)0);
1428 if(k>maxnnzPerColumn)
1429 {
1430 SpParHelper::Print("Kselect: k is greater then maxNnzInColumn. Calling Reduce instead...\n");
1431 Reduce(rvec, Column, minimum<NT>(), static_cast<NT>(0));
1432 return false;
1433 }
1434 */
1435
1436
1437
1438 IT n_thiscol = getlocalcols(); // length (number of columns) assigned to this processor (and processor column)
1439 MPI_Comm World = rvec.commGrid->GetWorld();
1440 MPI_Comm ColWorld = rvec.commGrid->GetColWorld();
1441 MPI_Comm RowWorld = rvec.commGrid->GetRowWorld();
1442 int colneighs = commGrid->GetGridRows();
1443 int colrank = commGrid->GetRankInProcCol();
1444 int coldiagrank = commGrid->GetDiagOfProcCol();
1445
1446 //double memk = 3 * (double)n_thiscol*k*sizeof(VT)/1000000000;
1447 //double maxmemk =0.0; // nnz in a process column
1448 //MPI_Allreduce(&memk, &maxmemk , 1, MPIType<double>(), MPI_MAX, MPI_COMM_WORLD);
1449
1450 //int myrank;
1451 //MPI_Comm_rank( MPI_COMM_WORLD, &myrank ) ;
1452 //if(myrank==0)
1453// std::cerr << "Actual kselect memory: " << maxmemk << "GB " << " columns " << n_thiscol << " activecol: " << nActiveCols << " \n";
1454 // MPI_Barrier(MPI_COMM_WORLD);
1455
1456 //replicate sparse indices along processor column
1457 int accnz;
1458 int32_t trxlocnz;
1459 GIT lenuntil;
1460 int32_t *trxinds, *activeCols;
1461 VT *trxnums, *numacc=NULL;
1462 TransposeVector(World, rvec, trxlocnz, lenuntil, trxinds, trxnums, true);
1463
1464 if(rvec.commGrid->GetGridRows() > 1)
1465 {
1466 //TODO: we only need to communicate indices
1467 AllGatherVector(ColWorld, trxlocnz, lenuntil, trxinds, trxnums, activeCols, numacc, accnz, true); // trxindS/trxnums deallocated, indacc/numacc allocated, accnz set
1468 }
1469 else
1470 {
1471 accnz = trxlocnz;
1472 activeCols = trxinds; //aliasing ptr
1473 // since indexisvalue is set true in TransposeVector(), trxnums is never allocated
1474 //numacc = trxnums; //aliasing ptr
1475 }
1476
1477 std::vector<bool> isactive(n_thiscol,false);
1478 for(int i=0; i<accnz ; i++)
1479 {
1480 isactive[activeCols[i]] = true;
1481 }
1482 IT nActiveCols = accnz;//count_if(isactive.begin(), isactive.end(), [](bool ac){return ac;});
1483
1484
1485 int64_t lannz = getlocalnnz();
1486 int64_t nnzColWorld=0; // nnz in a process column
1487 MPI_Allreduce(&lannz, &nnzColWorld, 1, MPIType<int64_t>(), MPI_SUM, ColWorld);
1488 int64_t maxPerProcMemory = std::min(nnzColWorld, (int64_t)nActiveCols*k) * sizeof(VT);
1489
1490 // hence we will not overflow for very large k
1491 std::vector<IT> send_coldisp(n_thiscol+1,0);
1492 std::vector<IT> local_coldisp(n_thiscol+1,0);
1493 //vector<VT> sendbuf(nActiveCols*k);
1494 //VT * sendbuf = static_cast<VT *> (::operator new (n_thiscol*k*sizeof(VT)));
1495 //VT * sendbuf = static_cast<VT *> (::operator new (nActiveCols*k*sizeof(VT)));
1496 VT * sendbuf = static_cast<VT *> (::operator new (maxPerProcMemory));
1497
1498 //displacement of local columns
1499 //local_coldisp is the displacement of all nonzeros per column
1500 //send_coldisp is the displacement of k nonzeros per column
1501 IT nzc = 0;
1502 if(spSeq->getnnz()>0)
1503 {
1504 typename DER::SpColIter colit = spSeq->begcol();
1505 for(IT i=0; i<n_thiscol; ++i)
1506 {
1507 local_coldisp[i+1] = local_coldisp[i];
1508 send_coldisp[i+1] = send_coldisp[i];
1509 if( (colit != spSeq->endcol()) && (i==colit.colid()) )
1510 {
1511 if(isactive[i])
1512 {
1513 local_coldisp[i+1] += colit.nnz();
1514 if(colit.nnz()>=k)
1515 send_coldisp[i+1] += k;
1516 else
1517 send_coldisp[i+1] += colit.nnz();
1518 }
1519 colit++;
1520 nzc++;
1521 }
1522 }
1523 }
1524
1525 // a copy of local part of the matrix
1526 // this can be avoided if we write our own local kselect function instead of using partial_sort
1527 //vector<VT> localmat(local_coldisp[n_thiscol]);
1528 VT * localmat = static_cast<VT *> (::operator new (local_coldisp[n_thiscol]*sizeof(VT)));
1529
1530
1531#ifdef THREADED
1532#pragma omp parallel for
1533#endif
1534 for(IT i=0; i<nzc; i++)
1535 //for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
1536 {
1537 typename DER::SpColIter colit = spSeq->begcol() + i;
1538 IT colid = colit.colid();
1539 if(isactive[colid])
1540 {
1541 IT idx = local_coldisp[colid];
1542 for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit < spSeq->endnz(colit); ++nzit)
1543 {
1544 localmat[idx++] = static_cast<VT>(__unary_op(nzit.value()));
1545 }
1546
1547 if(colit.nnz()<=k)
1548 {
1549 std::sort(localmat+local_coldisp[colid], localmat+local_coldisp[colid+1], std::greater<VT>());
1550 std::copy(localmat+local_coldisp[colid], localmat+local_coldisp[colid+1], sendbuf+send_coldisp[colid]);
1551 }
1552 else
1553 {
1554 std::partial_sort(localmat+local_coldisp[colid], localmat+local_coldisp[colid]+k, localmat+local_coldisp[colid+1], std::greater<VT>());
1555 std::copy(localmat+local_coldisp[colid], localmat+local_coldisp[colid]+k, sendbuf+send_coldisp[colid]);
1556 }
1557 }
1558 }
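// Illustrative sketch (not part of the library): the per-column top-k
// selection above in isolation. std::partial_sort with std::greater leaves
// the k largest values, in descending order, at the front of the range.
// The helper name and the use of double are assumptions for the example.
//
//   #include <algorithm>
//   #include <functional>
//   #include <vector>
//
//   std::vector<double> topk(std::vector<double> col, size_t k)
//   {
//       if(col.size() <= k)
//           std::sort(col.begin(), col.end(), std::greater<double>());
//       else
//           std::partial_sort(col.begin(), col.begin()+k, col.end(), std::greater<double>());
//       if(col.size() > k) col.resize(k);
//       return col; // at most k values, largest first
//   }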
1559
1560 //vector<VT>().swap(localmat);
1561 ::operator delete(localmat);
1562 std::vector<IT>().swap(local_coldisp);
1563
1564 //VT * recvbuf = static_cast<VT *> (::operator new (n_thiscol*k*sizeof(VT)));
1565 //VT * tempbuf = static_cast<VT *> (::operator new (n_thiscol*k*sizeof(VT)));
1566
1567 //VT * recvbuf = static_cast<VT *> (::operator new ( nActiveCols*k*sizeof(VT)));
1568 //VT * tempbuf = static_cast<VT *> (::operator new ( nActiveCols*k*sizeof(VT)));
1569
1570
1571 VT * recvbuf = static_cast<VT *> (::operator new (maxPerProcMemory));
1572 VT * tempbuf = static_cast<VT *> (::operator new (maxPerProcMemory));
1573 //vector<VT> recvbuf(n_thiscol*k);
1574 //vector<VT> tempbuf(n_thiscol*k);
1575 std::vector<IT> recv_coldisp(n_thiscol+1);
1576 std::vector<IT> temp_coldisp(n_thiscol+1);
1577 //std::vector<IT> templen(n_thiscol);
1578
1580
1581 for(int p=2; p <= colneighs; p*=2)
1582 {
1583
1584 if(colrank%p == p/2) // this processor is a sender in this round
1585 {
1586 int receiver = colrank - p/2; // p is a power of two, so integer division is exact
1587 MPI_Send(send_coldisp.data(), n_thiscol+1, MPIType<IT>(), receiver, 0, commGrid->GetColWorld());
1588 MPI_Send(sendbuf, send_coldisp[n_thiscol], MPIType<VT>(), receiver, 1, commGrid->GetColWorld());
1589 //break;
1590 }
1591 else if(colrank%p == 0) // this processor is a receiver in this round
1592 {
1593 int sender = colrank + p/2;
1594 if(sender < colneighs)
1595 {
1596
1597 MPI_Recv(recv_coldisp.data(), n_thiscol+1, MPIType<IT>(), sender, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1598 MPI_Recv(recvbuf, recv_coldisp[n_thiscol], MPIType<VT>(), sender, 1, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1599
1600 temp_coldisp[0] = 0;
1601 for(IT i=0; i<n_thiscol; ++i)
1602 {
1603 IT sendlen = send_coldisp[i+1] - send_coldisp[i];
1604 IT recvlen = recv_coldisp[i+1] - recv_coldisp[i];
1605 IT templen = std::min((sendlen+recvlen), k);
1606 temp_coldisp[i+1] = temp_coldisp[i] + templen;
1607 }
1608
1609
1610#ifdef THREADED
1611#pragma omp parallel for
1612#endif
1613 for(IT i=0; i<n_thiscol; ++i)
1614 {
1615 // partial merge until first k elements
1616 IT j=send_coldisp[i], l=recv_coldisp[i];
1617 //IT templen[i] = k*i;
1618 //IT offset = k*i;
1619 IT offset = temp_coldisp[i];
1620 IT lid = 0;
1621 for(; j<send_coldisp[i+1] && l<recv_coldisp[i+1] && lid<k;)
1622 {
1623 if(sendbuf[j] > recvbuf[l]) // decision
1624 tempbuf[offset+lid++] = sendbuf[j++];
1625 else
1626 tempbuf[offset+lid++] = recvbuf[l++];
1627 }
1628 while(j<send_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = sendbuf[j++];
1629 while(l<recv_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = recvbuf[l++];
1630 //templen[i] = lid;
1631 }
1632
1633 std::copy(temp_coldisp.begin(), temp_coldisp.end(), send_coldisp.begin());
1634 std::copy(tempbuf, tempbuf+temp_coldisp[n_thiscol], sendbuf);
1635
1636 /*
1637 send_coldisp[0] = 0;
1638 for(IT i=0; i<n_thiscol; i++)
1639 {
1640 send_coldisp[i+1] = send_coldisp[i] + templen[i];
1641 assert(send_coldisp[i+1] == temp_coldisp[i+1]);
1642 }
1643
1644
1645#ifdef THREADED
1646#pragma omp parallel for
1647#endif
1648 for(IT i=0; i<n_thiscol; i++) // direct copy
1649 {
1650 //IT offset = k*i;
1651 IT offset = temp_coldisp[i];
1652 std::copy(tempbuf+offset, tempbuf+offset+templen[i], sendbuf + send_coldisp[i]);
1653 }
1654
1655 */
1656 }
1657 }
1658 }
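 // Worked example of the reduction tree above for colneighs = 4:
 //   round p=2: ranks 1->0 and 3->2 (colrank%2 == 1 sends to colrank-1)
 //   round p=4: rank 2->0 (colrank%4 == 2 sends to colrank-2)
 // After ceil(log2(colneighs)) rounds, rank 0 of every processor column
 // holds the merged top-k candidates of each of its columns.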
1659 MPI_Barrier(commGrid->GetWorld());
1661
1662 /*--------------------------------------------------------
1663 At this point, top k elements in every active column
1664 are gathered on the first processor row, P(0,:).
1665
1666 Next step: At P(0,i) find the kth largest element in
1667 active columns belonging to P(0,i).
1668 If nnz in a column is less than k, keep the largest nonzero.
1669 If a column is empty, keep the lowest numeric value.
1670 --------------------------------------------------------*/
1671
1672 std::vector<VT> kthItem(nActiveCols); // kth elements of local active columns
1673 if(colrank==0)
1674 {
1675#ifdef THREADED
1676#pragma omp parallel for
1677#endif
1678 for(IT i=0; i<nActiveCols; i++)
1679 {
1680 IT ai = activeCols[i]; // active column index
1681 IT nitems = send_coldisp[ai+1]-send_coldisp[ai];
1682 if(nitems >= k)
1683 kthItem[i] = sendbuf[send_coldisp[ai]+k-1];
1684 else if (nitems==0)
1685 kthItem[i] = std::numeric_limits<VT>::min(); // empty column: note that for floating-point VT, min() is the smallest positive value, not the most negative
1686 else
1687 kthItem[i] = sendbuf[send_coldisp[ai+1]-1]; // returning the last entry if nnz in this column is less than k
1688
1689 }
1690 }
1691
1692 /*--------------------------------------------------------
1693 At this point, kth largest elements in every active column
1694 are gathered on the first processor row, P(0,:).
1695
1696 Next step: Send the kth largest elements from P(0,i) to P(i,i)
1697 Nothing to do for P(0,0)
1698 --------------------------------------------------------*/
1699 if(coldiagrank>0 && colrank==0)
1700 {
1701 MPI_Send(kthItem.data(), nActiveCols, MPIType<VT>(), coldiagrank, 0, commGrid->GetColWorld());
1702 }
1703 else if(coldiagrank>0 && colrank==coldiagrank) // receive in the diagonal processor
1704 {
1705 MPI_Recv(kthItem.data(), nActiveCols, MPIType<VT>(), 0, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1706 }
1707
1709
1710 /*--------------------------------------------------------
1711 At this point, kth largest elements in every active column
1712 are gathered on the diagonal processors P(i,i).
1713
1714 Next step: Scatter the kth largest elements from P(i,i)
1715 to all processors in the ith row, P(i,:).
1716 Each processor receives exactly the local nnz of rvec entries
1717 so that the received data can be directly put in rvec.
1718 --------------------------------------------------------*/
1719 int rowroot = commGrid->GetDiagOfProcRow();
1720 int proccols = commGrid->GetGridCols();
1721 std::vector <int> sendcnts(proccols,0);
1722 std::vector <int> dpls(proccols,0);
1723 int lsize = rvec.ind.size();
1724 // local sizes of the input vector will be sent from the diagonal processor
1725 MPI_Gather(&lsize,1, MPI_INT, sendcnts.data(), 1, MPI_INT, rowroot, RowWorld);
1726 std::partial_sum(sendcnts.data(), sendcnts.data()+proccols-1, dpls.data()+1);
1727 MPI_Scatterv(kthItem.data(),sendcnts.data(), dpls.data(), MPIType<VT>(), rvec.num.data(), rvec.num.size(), MPIType<VT>(),rowroot, RowWorld);
1728
1729 delete [] activeCols;
1730 delete [] numacc;
1731
1732 ::operator delete(sendbuf);
1733 ::operator delete(recvbuf);
1734 ::operator delete(tempbuf);
1735 //delete [] activeCols;
1736 //delete [] numacc;
1737
1738 return true;
1739}
1740
1741// only defined for symmetric matrix
1742template <class IT, class NT, class DER>
1743IT SpParMat<IT,NT,DER>::Bandwidth() const
1744{
1745 IT upperlBW = -1;
1746 IT lowerlBW = -1;
1747 IT m_perproc = getnrow() / commGrid->GetGridRows();
1748 IT n_perproc = getncol() / commGrid->GetGridCols();
1749 IT moffset = commGrid->GetRankInProcCol() * m_perproc;
1750 IT noffset = commGrid->GetRankInProcRow() * n_perproc;
1751
1752 for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
1753 {
1754 IT diagrow = colit.colid() + noffset;
1755 typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit);
1756 if(nzit != spSeq->endnz(colit)) // nonempty column
1757 {
1758 IT firstrow = nzit.rowid() + moffset;
1759 IT lastrow = (nzit+ colit.nnz()-1).rowid() + moffset;
1760
1761 if(firstrow <= diagrow) // upper diagonal
1762 {
1763 IT dev = diagrow - firstrow;
1764 if(upperlBW < dev) upperlBW = dev;
1765 }
1766 if(lastrow >= diagrow) // lower diagonal
1767 {
1768 IT dev = lastrow - diagrow;
1769 if(lowerlBW < dev) lowerlBW = dev;
1770 }
1771 }
1772 }
1773 IT upperBW;
1774 //IT lowerBW;
1775 MPI_Allreduce( &upperlBW, &upperBW, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
1776 //MPI_Allreduce( &lowerlBW, &lowerBW, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
1777
1778 //return (upperBW + lowerBW + 1);
1779 return (upperBW);
1780}
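// Usage sketch (illustrative; the typedef is an assumption, substitute the
// DER type of your build):
//
//   typedef SpParMat<int64_t, double, SpDCCols<int64_t, double>> PSpMat;
//   PSpMat A(fullWorld); // assume a square, symmetric A was loaded on this grid
//   int64_t bw = A.Bandwidth(); // largest distance of a nonzero above the diagonal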
1781
1782
1783
1784// only defined for symmetric matrix
1785template <class IT, class NT, class DER>
1786IT SpParMat<IT,NT,DER>::Profile() const
1787{
1788 int colrank = commGrid->GetRankInProcRow();
1789 IT cols = getncol();
1790 IT rows = getnrow();
1791 IT m_perproc = rows / commGrid->GetGridRows();
1792 IT n_perproc = cols / commGrid->GetGridCols();
1793 IT moffset = commGrid->GetRankInProcCol() * m_perproc;
1794 IT noffset = colrank * n_perproc;
1795
1796
1797 int pc = commGrid->GetGridCols();
1798 IT n_thisproc;
1799 if(colrank!=pc-1 ) n_thisproc = n_perproc;
1800 else n_thisproc = cols - (pc-1)*n_perproc;
1801
1802
1803 std::vector<IT> firstRowInCol(n_thisproc,getnrow());
1804 std::vector<IT> lastRowInCol(n_thisproc,-1);
1805
1806 for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
1807 {
1808 IT diagrow = colit.colid() + noffset;
1809 typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit);
1810 if(nzit != spSeq->endnz(colit)) // nonempty column
1811 {
1812 IT firstrow = nzit.rowid() + moffset;
1813 IT lastrow = (nzit+ colit.nnz()-1).rowid() + moffset;
1814 if(firstrow <= diagrow) // upper diagonal
1815 {
1816 firstRowInCol[colit.colid()] = firstrow;
1817 }
1818 if(lastrow >= diagrow) // lower diagonal
1819 {
1820 lastRowInCol[colit.colid()] = lastrow;
1821 }
1822 }
1823 }
1824
1825 std::vector<IT> firstRowInCol_global(n_thisproc,getnrow());
1826 //vector<IT> lastRowInCol_global(n_thisproc,-1);
1827 MPI_Allreduce( firstRowInCol.data(), firstRowInCol_global.data(), n_thisproc, MPIType<IT>(), MPI_MIN, commGrid->colWorld);
1828 //MPI_Allreduce( lastRowInCol.data(), lastRowInCol_global.data(), n_thisproc, MPIType<IT>(), MPI_MAX, commGrid->GetColWorld());
1829
1830 IT profile = 0;
1831 for(IT i=0; i<n_thisproc; i++)
1832 {
1833 if(firstRowInCol_global[i]==rows) // empty column
1834 profile++;
1835 else
1836 profile += (i + noffset - firstRowInCol_global[i]);
1837 }
1838
1839 IT profile_global = 0;
1840 MPI_Allreduce( &profile, &profile_global, 1, MPIType<IT>(), MPI_SUM, commGrid->rowWorld);
1841
1842 return (profile_global);
1843}
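// In matrix terms, the quantity computed above is
//   profile(A) = sum_j ( j - min{ i : A(i,j) != 0 and i <= j } ),
// with each column whose recorded first row is empty contributing 1
// (see the loop above); for a symmetric matrix this is the envelope size
// of the upper triangle.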
1844
1845
1846
1847template <class IT, class NT, class DER>
1848template <typename VT, typename GIT, typename _BinaryOperation>
1849void SpParMat<IT,NT,DER>::MaskedReduce(FullyDistVec<GIT,VT> & rvec, FullyDistSpVec<GIT,VT> & mask, Dim dim, _BinaryOperation __binary_op, VT id, bool exclude) const
1850{
1851 if (dim!=Column)
1852 {
1853 SpParHelper::Print("SpParMat::MaskedReduce() is only implemented for Column\n");
1854 MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1855 }
1856 MaskedReduce(rvec, mask, dim, __binary_op, id, myidentity<NT>(), exclude);
1857}
1858
1868template <class IT, class NT, class DER>
1869template <typename VT, typename GIT, typename _BinaryOperation, typename _UnaryOperation> // GIT: global index type of vector
1870void SpParMat<IT,NT,DER>::MaskedReduce(FullyDistVec<GIT,VT> & rvec, FullyDistSpVec<GIT,VT> & mask, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op, bool exclude) const
1871{
1872 MPI_Comm World = commGrid->GetWorld();
1873 MPI_Comm ColWorld = commGrid->GetColWorld();
1874 MPI_Comm RowWorld = commGrid->GetRowWorld();
1875
1876 if (dim!=Column)
1877 {
1878 SpParHelper::Print("SpParMat::MaskedReduce() is only implemented for Column\n");
1879 MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1880 }
1881 if(*rvec.commGrid != *commGrid)
1882 {
1883 SpParHelper::Print("Grids are not comparable, SpParMat::MaskedReduce() fails!", commGrid->GetWorld());
1884 MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1885 }
1886
1887 int rowneighs = commGrid->GetGridCols();
1888 int rowrank = commGrid->GetRankInProcRow();
1889 std::vector<int> rownz(rowneighs);
1890 int locnnzMask = static_cast<int> (mask.getlocnnz());
1891 rownz[rowrank] = locnnzMask;
1892 MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, rownz.data(), 1, MPI_INT, RowWorld);
1893 std::vector<int> dpls(rowneighs+1,0);
1894 std::partial_sum(rownz.begin(), rownz.end(), dpls.data()+1);
1895 int accnz = std::accumulate(rownz.begin(), rownz.end(), 0);
1896 std::vector<GIT> sendInd(locnnzMask);
1897 std::transform(mask.ind.begin(), mask.ind.end(), sendInd.begin(), std::bind2nd(std::plus<GIT>(), mask.RowLenUntil()));
1898
1899 std::vector<GIT> indMask(accnz);
1900 MPI_Allgatherv(sendInd.data(), rownz[rowrank], MPIType<GIT>(), indMask.data(), rownz.data(), dpls.data(), MPIType<GIT>(), RowWorld);
1901
1902
1903 // We can't use rvec's distribution (rows first, columns later) here
1904 IT n_thiscol = getlocalcols(); // length assigned to this processor column
1905 int colneighs = commGrid->GetGridRows(); // including oneself
1906 int colrank = commGrid->GetRankInProcCol();
1907
1908 GIT * loclens = new GIT[colneighs];
1909 GIT * lensums = new GIT[colneighs+1](); // begin/end points of local lengths
1910
1911 GIT n_perproc = n_thiscol / colneighs; // length on a typical processor
1912 if(colrank == colneighs-1)
1913 loclens[colrank] = n_thiscol - (n_perproc*colrank);
1914 else
1915 loclens[colrank] = n_perproc;
1916
1917 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetColWorld());
1918 std::partial_sum(loclens, loclens+colneighs, lensums+1); // loclens and lensums are different, but both would fit in 32-bits
1919
1920 std::vector<VT> trarr;
1921 typename DER::SpColIter colit = spSeq->begcol();
1922 for(int i=0; i< colneighs; ++i)
1923 {
1924 VT * sendbuf = new VT[loclens[i]];
1925 std::fill(sendbuf, sendbuf+loclens[i], id); // fill with identity
1926
1927 for(; colit != spSeq->endcol() && colit.colid() < lensums[i+1]; ++colit) // iterate over a portion of columns
1928 {
1929 int k=0;
1930 typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit);
1931
1932 for(; nzit != spSeq->endnz(colit) && k < indMask.size(); ) // all nonzeros in this column
1933 {
1934 if(nzit.rowid() < indMask[k])
1935 {
1936 if(exclude)
1937 {
1938 sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
1939 }
1940 ++nzit;
1941 }
1942 else if(nzit.rowid() > indMask[k]) ++k;
1943 else
1944 {
1945 if(!exclude)
1946 {
1947 sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
1948 }
1949 ++k;
1950 ++nzit;
1951 }
1952
1953 }
1954 if(exclude)
1955 {
1956 while(nzit != spSeq->endnz(colit))
1957 {
1958 sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
1959 ++nzit;
1960 }
1961 }
1962 }
1963
1964 VT * recvbuf = NULL;
1965 if(colrank == i)
1966 {
1967 trarr.resize(loclens[i]);
1968 recvbuf = SpHelper::p2a(trarr);
1969 }
1970 MPI_Reduce(sendbuf, recvbuf, loclens[i], MPIType<VT>(), MPIOp<_BinaryOperation, VT>::op(), i, commGrid->GetColWorld()); // root = i
1971 delete [] sendbuf;
1972 }
1973 DeleteAll(loclens, lensums);
1974
1975 GIT reallen; // Now we have to transpose the vector
1976 GIT trlen = trarr.size();
1977 int diagneigh = commGrid->GetComplementRank();
1978 MPI_Status status;
1979 MPI_Sendrecv(&trlen, 1, MPIType<GIT>(), diagneigh, TRNNZ, &reallen, 1, MPIType<GIT>(), diagneigh, TRNNZ, commGrid->GetWorld(), &status);
1980
1981 rvec.arr.resize(reallen);
1982 MPI_Sendrecv(SpHelper::p2a(trarr), trlen, MPIType<VT>(), diagneigh, TRX, SpHelper::p2a(rvec.arr), reallen, MPIType<VT>(), diagneigh, TRX, commGrid->GetWorld(), &status);
1983 rvec.glen = getncol(); // ABAB: Put a sanity check here
1984
1985}
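// Usage sketch (illustrative; names are assumptions): column sums of A
// restricted to the rows listed in a sparse mask.
//
//   FullyDistVec<int64_t, double> colsums(A.getcommgrid());
//   // given FullyDistSpVec<int64_t, double> mask whose indices are the rows to include:
//   A.MaskedReduce(colsums, mask, Column, std::plus<double>(), 0.0, false); // exclude = false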
1986
1987
1988
1989
1990template <class IT, class NT, class DER>
1991template <typename NNT,typename NDER>
1992SpParMat<IT,NT,DER>::operator SpParMat<IT,NNT,NDER> () const
1993{
1994 NDER * convert = new NDER(*spSeq);
1995 return SpParMat<IT,NNT,NDER> (convert, commGrid);
1996}
1997
1999template <class IT, class NT, class DER>
2000template <typename NIT, typename NNT,typename NDER>
2001SpParMat<IT,NT,DER>::operator SpParMat<NIT,NNT,NDER> () const
2002{
2003 NDER * convert = new NDER(*spSeq);
2004 return SpParMat<NIT,NNT,NDER> (convert, commGrid);
2005}
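// Usage sketch (illustrative): the conversion operators above change the
// value type (and optionally the index type) by rebuilding the local matrix.
//
//   typedef SpParMat<int64_t, double, SpDCCols<int64_t, double>> PSpMat_d;
//   typedef SpParMat<int64_t, bool, SpDCCols<int64_t, bool>> PSpMat_b;
//   PSpMat_b B = static_cast<PSpMat_b>(A); // A is a PSpMat_d on the same grid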
2006
2011template <class IT, class NT, class DER>
2012SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::SubsRefCol (const std::vector<IT> & ci) const
2013{
2014 std::vector<IT> ri;
2015 DER * tempseq = new DER((*spSeq)(ri, ci));
2016 return SpParMat<IT,NT,DER> (tempseq, commGrid);
2017}
2018
2026template <class IT, class NT, class DER>
2027template <typename PTNTBOOL, typename PTBOOLNT>
2028SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::SubsRef_SR (const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci, bool inplace)
2029{
2030 typedef typename DER::LocalIT LIT;
2031
2032 // infer the concrete type SpMat<LIT,LIT>
2033 typedef typename create_trait<DER, LIT, bool>::T_inferred DER_IT;
2034
2035 if((*(ri.commGrid) != *(commGrid)) || (*(ci.commGrid) != *(commGrid)))
2036 {
2037 SpParHelper::Print("Grids are not comparable, SpRef fails !");
2038 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2039 }
2040
2041 // Safety check
2042 IT locmax_ri = 0;
2043 IT locmax_ci = 0;
2044 if(!ri.arr.empty())
2045 locmax_ri = *std::max_element(ri.arr.begin(), ri.arr.end());
2046 if(!ci.arr.empty())
2047 locmax_ci = *std::max_element(ci.arr.begin(), ci.arr.end());
2048
2049 IT totalm = getnrow();
2050 IT totaln = getncol();
2051 if(locmax_ri > totalm || locmax_ci > totaln)
2052 {
2053 throw outofrangeexception();
2054 }
2055
2056 // The indices for FullyDistVec are offset'd to 1/p pieces
2057 // The matrix indices are offset'd to 1/sqrt(p) pieces
2058 // Add the corresponding offset before sending the data
2059 IT roffset = ri.RowLenUntil();
2060 IT rrowlen = ri.MyRowLength();
2061 IT coffset = ci.RowLenUntil();
2062 IT crowlen = ci.MyRowLength();
2063
2064 // We create two boolean matrices P and Q
2065 // Dimensions: P is size(ri) x m
2066 // Q is n x size(ci)
2067 // Range(ri) = {0,...,m-1}
2068 // Range(ci) = {0,...,n-1}
2069
2070 IT rowneighs = commGrid->GetGridCols(); // number of neighbors along this processor row (including oneself)
2071 IT m_perproccol = totalm / rowneighs;
2072 IT n_perproccol = totaln / rowneighs;
2073
2074 // Get the right local dimensions
2075 IT diagneigh = commGrid->GetComplementRank();
2076 IT mylocalrows = getlocalrows();
2077 IT mylocalcols = getlocalcols();
2078 IT trlocalrows;
2079 MPI_Status status;
2080 MPI_Sendrecv(&mylocalrows, 1, MPIType<IT>(), diagneigh, TRROWX, &trlocalrows, 1, MPIType<IT>(), diagneigh, TRROWX, commGrid->GetWorld(), &status);
2081 // we don't need trlocalcols because Q.Transpose() will take care of it
2082
2083 std::vector< std::vector<IT> > rowid(rowneighs); // reuse for P and Q
2084 std::vector< std::vector<IT> > colid(rowneighs);
2085
2086 // Step 1: Create P
2087 IT locvec = ri.arr.size(); // nnz in local vector
2088 for(typename std::vector<IT>::size_type i=0; i< (unsigned)locvec; ++i)
2089 {
2090 // numerical values (permutation indices) are 0-based
2091 // recipient along this processor row
2092 IT rowrec = (m_perproccol!=0) ? std::min(ri.arr[i] / m_perproccol, rowneighs-1) : (rowneighs-1);
2093
2094 // ri's numerical values give the colids and its local indices give rowids
2095 rowid[rowrec].push_back( i + roffset);
2096 colid[rowrec].push_back(ri.arr[i] - (rowrec * m_perproccol));
2097 }
2098
2099 int * sendcnt = new int[rowneighs]; // reuse in Q as well
2100 int * recvcnt = new int[rowneighs];
2101 for(IT i=0; i<rowneighs; ++i)
2102 sendcnt[i] = rowid[i].size();
2103
2104 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetRowWorld()); // share the counts
2105 int * sdispls = new int[rowneighs]();
2106 int * rdispls = new int[rowneighs]();
2107 std::partial_sum(sendcnt, sendcnt+rowneighs-1, sdispls+1);
2108 std::partial_sum(recvcnt, recvcnt+rowneighs-1, rdispls+1);
2109 IT p_nnz = std::accumulate(recvcnt,recvcnt+rowneighs, static_cast<IT>(0));
2110
2111 // create space for incoming data ...
2112 IT * p_rows = new IT[p_nnz];
2113 IT * p_cols = new IT[p_nnz];
2114 IT * senddata = new IT[locvec]; // re-used for both rows and columns
2115 for(int i=0; i<rowneighs; ++i)
2116 {
2117 std::copy(rowid[i].begin(), rowid[i].end(), senddata+sdispls[i]);
2118 std::vector<IT>().swap(rowid[i]); // clear memory of rowid
2119 }
2120 MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), p_rows, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
2121
2122 for(int i=0; i<rowneighs; ++i)
2123 {
2124 std::copy(colid[i].begin(), colid[i].end(), senddata+sdispls[i]);
2125 std::vector<IT>().swap(colid[i]); // clear memory of colid
2126 }
2127 MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), p_cols, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
2128 delete [] senddata;
2129
2130 std::tuple<LIT,LIT,bool> * p_tuples = new std::tuple<LIT,LIT,bool>[p_nnz];
2131 for(IT i=0; i< p_nnz; ++i)
2132 {
2133 p_tuples[i] = std::make_tuple(p_rows[i], p_cols[i], 1); // here we can convert to local indices
2134 }
2135 DeleteAll(p_rows, p_cols);
2136
2137 DER_IT * PSeq = new DER_IT();
2138 PSeq->Create( p_nnz, rrowlen, trlocalrows, p_tuples); // deletion of tuples[] is handled by SpMat::Create
2139
2140 SpParMat<IT,NT,DER> PA(commGrid);
2141 if(&ri == &ci) // Symmetric permutation
2142 {
2143 DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
2144 #ifdef SPREFDEBUG
2145 SpParHelper::Print("Symmetric permutation\n", commGrid->GetWorld());
2146 #endif
2147 SpParMat<IT,bool,DER_IT> P (PSeq, commGrid);
2148 if(inplace)
2149 {
2150 #ifdef SPREFDEBUG
2151 SpParHelper::Print("In place multiplication\n", commGrid->GetWorld());
2152 #endif
2153 *this = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P, *this, false, true); // clear the memory of *this
2154
2155 //ostringstream outb;
2156 //outb << "P_after_" << commGrid->myrank;
2157 //ofstream ofb(outb.str().c_str());
2158 //P.put(ofb);
2159
2160 P.Transpose();
2161 *this = Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(*this, P, true, true); // clear the memory of both *this and P
2162 return SpParMat<IT,NT,DER>(commGrid); // dummy return to match signature
2163 }
2164 else
2165 {
2166 PA = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P,*this);
2167 P.Transpose();
2168 return Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, P);
2169 }
2170 }
2171 else
2172 {
2173 // Intermediate step (to save memory): Form PA and store it in P
2174 // Distributed matrix generation (collective call)
2175 SpParMat<IT,bool,DER_IT> P (PSeq, commGrid);
2176
2177 // Do parallel matrix-matrix multiply
2178 PA = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P, *this);
2179 } // P is destructed here
2180#ifndef NDEBUG
2181 PA.PrintInfo();
2182#endif
2183 // Step 2: Create Q (use the same row-wise communication and transpose at the end)
2184 // This temporary to-be-transposed Q is size(ci) x n
2185 locvec = ci.arr.size(); // nnz in local vector (reset variable)
2186 for(typename std::vector<IT>::size_type i=0; i< (unsigned)locvec; ++i)
2187 {
2188 // numerical values (permutation indices) are 0-based
2189 IT rowrec = (n_perproccol!=0) ? std::min(ci.arr[i] / n_perproccol, rowneighs-1) : (rowneighs-1);
2190
2191 // ri's numerical values give the colids and its local indices give rowids
2192 rowid[rowrec].push_back( i + coffset);
2193 colid[rowrec].push_back(ci.arr[i] - (rowrec * n_perproccol));
2194 }
2195
2196 for(IT i=0; i<rowneighs; ++i)
2197 sendcnt[i] = rowid[i].size(); // update with new sizes
2198
2199 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetRowWorld()); // share the counts
2200 std::fill(sdispls, sdispls+rowneighs, 0); // reset
2201 std::fill(rdispls, rdispls+rowneighs, 0);
2202 std::partial_sum(sendcnt, sendcnt+rowneighs-1, sdispls+1);
2203 std::partial_sum(recvcnt, recvcnt+rowneighs-1, rdispls+1);
2204 IT q_nnz = std::accumulate(recvcnt,recvcnt+rowneighs, static_cast<IT>(0));
2205
2206 // create space for incoming data ...
2207 IT * q_rows = new IT[q_nnz];
2208 IT * q_cols = new IT[q_nnz];
2209 senddata = new IT[locvec];
2210 for(int i=0; i<rowneighs; ++i)
2211 {
2212 std::copy(rowid[i].begin(), rowid[i].end(), senddata+sdispls[i]);
2213 std::vector<IT>().swap(rowid[i]); // clear memory of rowid
2214 }
2215 MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), q_rows, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
2216
2217 for(int i=0; i<rowneighs; ++i)
2218 {
2219 std::copy(colid[i].begin(), colid[i].end(), senddata+sdispls[i]);
2220 std::vector<IT>().swap(colid[i]); // clear memory of colid
2221 }
2222 MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), q_cols, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
2223 DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);
2224
2225 std::tuple<LIT,LIT,bool> * q_tuples = new std::tuple<LIT,LIT,bool>[q_nnz]; // here we can convert to local indices (2018 note by Aydin)
2226 for(IT i=0; i< q_nnz; ++i)
2227 {
2228 q_tuples[i] = std::make_tuple(q_rows[i], q_cols[i], 1);
2229 }
2230 DeleteAll(q_rows, q_cols);
2231 DER_IT * QSeq = new DER_IT();
2232 QSeq->Create( q_nnz, crowlen, mylocalcols, q_tuples); // Creating Q' instead
2233
2234 // Step 3: Form PAQ
2235 // Distributed matrix generation (collective call)
2236 SpParMat<IT,bool,DER_IT> Q (QSeq, commGrid);
2237 Q.Transpose();
2238 if(inplace)
2239 {
2240 *this = Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, Q, true, true); // clear the memory of both PA and Q
2241 return SpParMat<IT,NT,DER>(commGrid); // dummy return to match signature
2242 }
2243 else
2244 {
2245 return Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, Q);
2246 }
2247}
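// Usage sketch (illustrative): SubsRef_SR is normally reached through
// SpParMat::operator(), which supplies the two boolean semirings used for
// the P*A and (PA)*Q multiplies.
//
//   // given FullyDistVec<int64_t,int64_t> ri and ci holding 0-based global indices:
//   PSpMat Asub = A(ri, ci); // extracts the |ri|-by-|ci| submatrix A[ri, ci]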
2248
2249
2250
2251template<class IT,
2252 class NT,
2253 class DER>
2254template<typename PTNTBOOL,
2255 typename PTBOOLNT>
2256SpParMat<IT, NT, DER>
2257SpParMat<IT, NT, DER>::SubsRef_SR (
2258 const FullyDistVec<IT, IT> &v,
2259 Dim dim,
2260 bool inplace
2261 )
2262{
2263 typedef typename DER::LocalIT LIT;
2264 typedef typename create_trait<DER, LIT, bool>::T_inferred DER_IT;
2265
2266 if (*(v.commGrid) != *commGrid)
2267 {
2268 SpParHelper::Print("Grids are not comparable, SpRef fails!");
2269 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2270 }
2271
2272 IT locmax = 0;
2273 if (!v.arr.empty())
2274 locmax = *std::max_element(v.arr.begin(), v.arr.end());
2275
2276 IT offset = v.RowLenUntil();
2277 IT rowlen = v.MyRowLength();
2278 IT totalm = getnrow();
2279 IT totaln = getncol();
2280 IT rowneighs = commGrid->GetGridCols();
2281 IT perproccol = -1;
2282 IT dimy = -1;
2283 IT diagneigh, tmp;
2284
2285 switch(dim)
2286 {
2287 case Row:
2288 if (locmax > totalm)
2289 throw outofrangeexception();
2290
2291 perproccol = totalm / rowneighs;
2292
2293 diagneigh = commGrid->GetComplementRank();
2294 MPI_Status status;
2295 tmp = getlocalrows();
2296 MPI_Sendrecv(&tmp, 1, MPIType<IT>(), diagneigh, TRROWX,
2297 &dimy, 1, MPIType<IT>(), diagneigh, TRROWX,
2298 commGrid->GetWorld(), &status);
2299
2300 break;
2301
2302
2303 case Column:
2304 if (locmax > totaln)
2305 throw outofrangeexception();
2306
2307 perproccol = totaln / rowneighs;
2308 dimy = getlocalcols();
2309
2310 break;
2311
2312
2313 default:
2314 break;
2315 }
2316
2317
2318 // find owner processes and fill in the vectors
2319 std::vector<std::vector<IT>> rowid(rowneighs);
2320 std::vector<std::vector<IT>> colid(rowneighs);
2321 IT locvec = v.arr.size();
2322 for(typename std::vector<IT>::size_type i = 0; i < (unsigned)locvec; ++i)
2323 {
2324 IT rowrec = (perproccol != 0)
2325 ? std::min(v.arr[i] / perproccol, rowneighs - 1)
2326 : (rowneighs - 1);
2327
2328 rowid[rowrec].push_back(i + offset);
2329 colid[rowrec].push_back(v.arr[i] - (rowrec * perproccol));
2330 }
2331
2332
2333 // exchange data
2334 int *sendcnt = new int[rowneighs];
2335 int *recvcnt = new int[rowneighs];
2336 for (IT i = 0; i < rowneighs; ++i)
2337 sendcnt[i] = rowid[i].size();
2338
2339 MPI_Alltoall(sendcnt, 1, MPI_INT,
2340 recvcnt, 1, MPI_INT,
2341 commGrid->GetRowWorld());
2342
2343 int *sdispls = new int[rowneighs]();
2344 int *rdispls = new int[rowneighs]();
2345 std::partial_sum(sendcnt, sendcnt+rowneighs-1, sdispls+1);
2346 std::partial_sum(recvcnt, recvcnt+rowneighs-1, rdispls+1);
2347 IT v_nnz = std::accumulate(recvcnt, recvcnt+rowneighs, static_cast<IT>(0));
2348
2349 IT *v_rows = new IT[v_nnz];
2350 IT *v_cols = new IT[v_nnz];
2351 IT *senddata = new IT[locvec];
2352
2353 for(int i = 0; i < rowneighs; ++i)
2354 {
2355 std::copy(rowid[i].begin(), rowid[i].end(), senddata + sdispls[i]);
2356 std::vector<IT>().swap(rowid[i]); // free memory
2357 }
2358
2359 MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(),
2360 v_rows, recvcnt, rdispls, MPIType<IT>(),
2361 commGrid->GetRowWorld());
2362
2363 for(int i = 0; i < rowneighs; ++i)
2364 {
2365 std::copy(colid[i].begin(), colid[i].end(), senddata + sdispls[i]);
2366 std::vector<IT>().swap(colid[i]); // free memory
2367 }
2368
2369 MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(),
2370 v_cols, recvcnt, rdispls, MPIType<IT>(),
2371 commGrid->GetRowWorld());
2372
2373 delete [] senddata;
2374
2375
2376 // form tuples and create the permutation matrix
2377 std::tuple<LIT, LIT, bool> *v_tuples =
2378 new std::tuple<LIT, LIT, bool>[v_nnz];
2379 for(IT i = 0; i < v_nnz; ++i)
2380 v_tuples[i] = std::make_tuple(v_rows[i], v_cols[i], 1);
2381
2382 DeleteAll(v_rows, v_cols);
2383
2384 DER_IT *vseq = new DER_IT();
2385 vseq->Create(v_nnz, rowlen, dimy, v_tuples);
2386 SpParMat<IT, bool, DER_IT> V(vseq, commGrid); // permutation matrix
2387
2388
2389 // generate the final matrix
2390 switch(dim)
2391 {
2392 case Row:
2393 if (inplace)
2394 {
2395 *this = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(V, *this, true, true);
2396 return SpParMat<IT, NT, DER>(commGrid); // dummy
2397 }
2398 else
2399 return Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(V, *this);
2400
2401 break;
2402
2403
2404 case Column:
2405 V.Transpose();
2406 if (inplace)
2407 {
2408 *this = Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(*this, V, true, true);
2409 return SpParMat<IT, NT, DER>(commGrid); // dummy
2410 }
2411 else
2412 return Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(*this, V);
2413
2414
2415 default:
2416 break;
2417 }
2418
2419
2420 // should not reach this point
2421 return SpParMat<IT, NT, DER>(commGrid); // dummy
2422}
2423
2424
2425
2426template <class IT, class NT, class DER>
2427void SpParMat<IT,NT,DER>::SpAsgn(const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci, SpParMat<IT,NT,DER> & B)
2428{
2429 typedef PlusTimesSRing<NT, NT> PTRing;
2430
2431 if((*(ri.commGrid) != *(B.commGrid)) || (*(ci.commGrid) != *(B.commGrid)))
2432 {
2433 SpParHelper::Print("Grids are not comparable, SpAsgn fails !", commGrid->GetWorld());
2434 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2435 }
2436 IT total_m_A = getnrow();
2437 IT total_n_A = getncol();
2438 IT total_m_B = B.getnrow();
2439 IT total_n_B = B.getncol();
2440
2441 if(total_m_B != ri.TotalLength())
2442 {
2443 SpParHelper::Print("First dimension of B does NOT match the length of ri, SpAsgn fails !", commGrid->GetWorld());
2444 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2445 }
2446 if(total_n_B != ci.TotalLength())
2447 {
2448 SpParHelper::Print("Second dimension of B does NOT match the length of ci, SpAsgn fails !", commGrid->GetWorld());
2449 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2450 }
2451 Prune(ri, ci); // make a hole
2452
2453 // embed B to the size of A
2454 FullyDistVec<IT,IT> * rvec = new FullyDistVec<IT,IT>(ri.commGrid);
2455 rvec->iota(total_m_B, 0); // sparse() expects a zero based index
2456
2457 SpParMat<IT,NT,DER> R(total_m_A, total_m_B, ri, *rvec, 1);
2458 delete rvec; // free memory
2459 SpParMat<IT,NT,DER> RB = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(R, B, true, false); // clear memory of R but not B
2460
2461 FullyDistVec<IT,IT> * qvec = new FullyDistVec<IT,IT>(ri.commGrid);
2462 qvec->iota(total_n_B, 0);
2463 SpParMat<IT,NT,DER> Q(total_n_B, total_n_A, *qvec, ci, 1);
2464 delete qvec; // free memory
2465 SpParMat<IT,NT,DER> RBQ = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(RB, Q, true, true); // clear memory of RB and Q
2466 *this += RBQ; // extend-add
2467}
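// Algebraically, the routine above implements A(ri,ci) = B as
//   A <- (A with the A[ri,ci] block zeroed out by Prune) + R * B * Q,
// where R is total_m_A x total_m_B with R(ri[k], k) = 1 and
// Q is total_n_B x total_n_A with Q(k, ci[k]) = 1, so the product R*B*Q
// embeds B into the shape of A at the requested rows and columns.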
2468
2469// this only prunes the submatrix A[ri,ci] in matlab notation
2470// or if the input is an adjacency matrix and ri=ci, it removes the connections
2471// between the subgraph induced by ri
2472// if you need to remove all contents of rows in ri and columns in ci:
2473// then call PruneFull
2474template <class IT, class NT, class DER>
2475void SpParMat<IT,NT,DER>::Prune(const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci)
2476{
2477 if((*(ri.commGrid) != *(commGrid)) || (*(ci.commGrid) != *(commGrid)))
2478 {
2479 SpParHelper::Print("Grids are not comparable, Prune fails!\n", commGrid->GetWorld());
2480 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2481 }
2482
2483 // Safety check
2484 IT locmax_ri = 0;
2485 IT locmax_ci = 0;
2486 if(!ri.arr.empty())
2487 locmax_ri = *std::max_element(ri.arr.begin(), ri.arr.end());
2488 if(!ci.arr.empty())
2489 locmax_ci = *std::max_element(ci.arr.begin(), ci.arr.end());
2490
2491 IT total_m = getnrow();
2492 IT total_n = getncol();
2493 if(locmax_ri > total_m || locmax_ci > total_n)
2494 {
2495 throw outofrangeexception();
2496 }
2497
2498 // infer the concrete types to replace the value with bool
2499 typedef typename DER::LocalIT LIT;
2500 typedef typename create_trait<DER, LIT, bool>::T_inferred DER_BOOL;
2501 typedef typename create_trait<DER, LIT, IT>::T_inferred DER_IT;
2502
2503 // create with IT values and downcast to boolean: this constructor cannot build a boolean matrix directly because FullyDistVec cannot be boolean
2504 SpParMat<IT,bool,DER_BOOL> S = SpParMat<IT, IT, DER_IT> (total_m, total_m, ri, ri, 1);
2505 SpParMat<IT,NT,DER> SA = Mult_AnXBn_DoubleBuff< BoolCopy2ndSRing<NT> , NT, DER>(S, *this, true, false); // clear memory of S but not *this
2506
2507 SpParMat<IT,bool,DER_BOOL> T = SpParMat<IT, IT, DER_IT> (total_n, total_n, ci, ci, 1);
2508 SpParMat<IT,NT,DER> SAT = Mult_AnXBn_DoubleBuff< BoolCopy1stSRing<NT> , NT, DER>(SA, T, true, true); // clear memory of SA and T
2509
2510
2511 // the type of the SAT matrix does not matter when calling set difference
2512 // because it just copies the non-excluded values from (*this) matrix, without touching values in SAT
2513 SetDifference(SAT);
2514}
2515
2516
2517// removes all nonzeros of rows in ri and columns in ci
2518// if A is an adjacency matrix and ri=ci,
2519// this prunes *all* (and not just the induced) connections of vertices in ri
2520// effectively rendering the vertices in ri *disconnected*
2521template <class IT, class NT, class DER>
2522void SpParMat<IT,NT,DER>::PruneFull(const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci)
2523{
2524 if((*(ri.commGrid) != *(commGrid)) || (*(ci.commGrid) != *(commGrid)))
2525 {
2526 SpParHelper::Print("Grids are not comparable, Prune fails!\n", commGrid->GetWorld());
2527 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2528 }
2529
2530 // Safety check
2531 IT locmax_ri = 0;
2532 IT locmax_ci = 0;
2533 if(!ri.arr.empty())
2534 locmax_ri = *std::max_element(ri.arr.begin(), ri.arr.end());
2535 if(!ci.arr.empty())
2536 locmax_ci = *std::max_element(ci.arr.begin(), ci.arr.end());
2537
2538 IT total_m = getnrow();
2539 IT total_n = getncol();
2540 if(locmax_ri > total_m || locmax_ci > total_n)
2541 {
2542 throw outofrangeexception();
2543 }
2544
2545 // infer the concrete types to replace the value with bool
2546 typedef typename DER::LocalIT LIT;
2547 typedef typename create_trait<DER, LIT, bool>::T_inferred DER_BOOL;
2548 typedef typename create_trait<DER, LIT, IT>::T_inferred DER_IT;
2549
2550 // create with IT values and downcast to boolean: this constructor cannot build a boolean matrix directly because FullyDistVec cannot be boolean
2551 SpParMat<IT,bool,DER_BOOL> S = SpParMat<IT, IT, DER_IT> (total_m, total_m, ri, ri, 1);
2552 SpParMat<IT,NT,DER> SA = Mult_AnXBn_DoubleBuff< BoolCopy2ndSRing<NT> , NT, DER>(S, *this, true, false); // clear memory of S, but not *this
2553
2554 SpParMat<IT,bool,DER_BOOL> T = SpParMat<IT, IT, DER_IT> (total_n, total_n, ci, ci, 1);
2555 SpParMat<IT,NT,DER> AT = Mult_AnXBn_DoubleBuff< BoolCopy1stSRing<NT> , NT, DER>(*this, T, false, true); // clear memory of T, but not *this
2556
2557 // SA extracted rows of A in ri
2558 // AT extracted columns of A in ci
2559
2560 SetDifference(SA);
2561 SetDifference(AT);
2562}
2563
2565template <class IT, class NT, class DER>
2566template <typename _BinaryOperation>
2567SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::PruneColumn(const FullyDistVec<IT,NT> & pvals, _BinaryOperation __binary_op, bool inPlace)
2568{
2569 int myrank;
2570 MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
2571 //MPI_Barrier(MPI_COMM_WORLD);
2572 MPI_Comm World = pvals.commGrid->GetWorld();
2573 MPI_Barrier(World);
2574 if(getncol() != pvals.TotalLength())
2575 {
2576 std::ostringstream outs;
2577 outs << "Can not prune column-by-column, dimensions does not match"<< std::endl;
2578 outs << getncol() << " != " << pvals.TotalLength() << std::endl;
2579 SpParHelper::Print(outs.str());
2580 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2581 }
2582 if(! ( *(getcommgrid()) == *(pvals.getcommgrid())) )
2583 {
2584 std::cout << "Grids are not comparable for PurneColumn" << std::endl;
2585 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2586 }
2587
2588 MPI_Comm ColWorld = pvals.commGrid->GetColWorld();
2589
2590 int xsize = (int) pvals.LocArrSize();
2591 int trxsize = 0;
2592
2593
2594 int diagneigh = pvals.commGrid->GetComplementRank();
2595 MPI_Status status;
2596 MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, World, &status);
2597
2598
2599 NT * trxnums = new NT[trxsize];
2600 MPI_Sendrecv(const_cast<NT*>(SpHelper::p2a(pvals.arr)), xsize, MPIType<NT>(), diagneigh, TRX, trxnums, trxsize, MPIType<NT>(), diagneigh, TRX, World, &status);
2601
2602 int colneighs, colrank;
2603 MPI_Comm_size(ColWorld, &colneighs);
2604 MPI_Comm_rank(ColWorld, &colrank);
2605 int * colsize = new int[colneighs];
2606 colsize[colrank] = trxsize;
2607 MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, ColWorld);
2608 int * dpls = new int[colneighs](); // displacements (zero-initialized)
2609 std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
2610 int accsize = std::accumulate(colsize, colsize+colneighs, 0);
2611 std::vector<NT> numacc(accsize);
2612
2613#ifdef COMBBLAS_DEBUG
2614 std::ostringstream outs2;
2615 outs2 << "PruneColumn displacements: ";
2616 for(int i=0; i< colneighs; ++i)
2617 {
2618 outs2 << dpls[i] << " ";
2619 }
2620 outs2 << std::endl;
2621 SpParHelper::Print(outs2.str());
2622 MPI_Barrier(World);
2623#endif
2624
2625
2626 MPI_Allgatherv(trxnums, trxsize, MPIType<NT>(), numacc.data(), colsize, dpls, MPIType<NT>(), ColWorld);
2627 delete [] trxnums;
2628 delete [] colsize;
2629 delete [] dpls;
2630
2631 //sanity check
2632 if(accsize != getlocalcols()){
2633 fprintf(stderr, "[PruneColumn]\tmyrank:%d\taccsize:%d\tgetlocalcols():%lld\n", myrank, accsize, (long long) getlocalcols());
2634 }
2635 assert(accsize == getlocalcols());
2636 if (inPlace)
2637 {
2638 spSeq->PruneColumn(numacc.data(), __binary_op, inPlace);
2639 return SpParMat<IT,NT,DER>(getcommgrid()); // return blank to match signature
2640 }
2641 else
2642 {
2643 return SpParMat<IT,NT,DER>(spSeq->PruneColumn(numacc.data(), __binary_op, inPlace), commGrid);
2644 }
2645}
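// Note on the communication pattern above (shared by several methods here):
// a FullyDistVec is distributed along processor rows, but column pruning
// needs its values replicated along every processor column. Hence each
// process first swaps its local piece with its transpose ("complement")
// partner via MPI_Sendrecv, then MPI_Allgatherv combines the received
// pieces over ColWorld, after which numacc[j] holds the value for local
// column j.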
2646
2647template <class IT, class NT, class DER>
2648template <class IRRELEVANT_NT>
2649void SpParMat<IT,NT,DER>::PruneColumnByIndex(const FullyDistSpVec<IT,IRRELEVANT_NT>& ci)
2650{
2651 MPI_Comm World = ci.commGrid->GetWorld();
2652 MPI_Barrier(World);
2653
2654 if (getncol() != ci.TotalLength())
2655 {
2656 std::ostringstream outs;
2657 outs << "Cannot prune column-by-column, dimensions do not match" << std::endl;
2658 outs << getncol() << " != " << ci.TotalLength() << std::endl;
2659 SpParHelper::Print(outs.str());
2660 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2661 }
2662
2663 if (!(*(getcommgrid()) == *(ci.getcommgrid())))
2664 {
2665 std::cout << "Grids are not comparable for PruneColumnByIndex" << std::endl;
2666 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2667 }
2668
2669 MPI_Comm ColWorld = ci.commGrid->GetColWorld();
2670 int diagneigh = ci.commGrid->GetComplementRank();
2671
2672 IT xlocnz = ci.getlocnnz();
2673 IT xrofst = ci.RowLenUntil();
2674 IT trxrofst;
2675 IT trxlocnz = 0;
2676
2677 MPI_Sendrecv(&xrofst, 1, MPIType<IT>(), diagneigh, TROST, &trxrofst, 1, MPIType<IT>(), diagneigh, TROST, World, MPI_STATUS_IGNORE);
2678 MPI_Sendrecv(&xlocnz, 1, MPIType<IT>(), diagneigh, TRNNZ, &trxlocnz, 1, MPIType<IT>(), diagneigh, TRNNZ, World, MPI_STATUS_IGNORE);
2679
2680 std::vector<IT> trxinds(trxlocnz);
2681
2682 MPI_Sendrecv(ci.ind.data(), xlocnz, MPIType<IT>(), diagneigh, TRI, trxinds.data(), trxlocnz, MPIType<IT>(), diagneigh, TRI, World, MPI_STATUS_IGNORE);
2683
2684 std::transform(trxinds.data(), trxinds.data() + trxlocnz, trxinds.data(), std::bind2nd(std::plus<IT>(), trxrofst));
2685
2686 int colneighs, colrank;
2687 MPI_Comm_size(ColWorld, &colneighs);
2688 MPI_Comm_rank(ColWorld, &colrank);
2689
2690 std::vector<int> colnz(colneighs);
2691 colnz[colrank] = trxlocnz;
2692 MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colnz.data(), 1, MPI_INT, ColWorld);
2693 std::vector<int> dpls(colneighs, 0);
2694 std::partial_sum(colnz.begin(), colnz.end()-1, dpls.begin()+1);
2695 IT accnz = std::accumulate(colnz.begin(), colnz.end(), 0);
2696
2697 std::vector<IT> indacc(accnz);
2698 MPI_Allgatherv(trxinds.data(), trxlocnz, MPIType<IT>(), indacc.data(), colnz.data(), dpls.data(), MPIType<IT>(), ColWorld);
2699
2700 std::sort(indacc.begin(), indacc.end());
2701
2702 spSeq->PruneColumnByIndex(indacc);
2703}
2704
2705
2708template <class IT, class NT, class DER>
2709template <typename _BinaryOperation>
2710SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::PruneColumn(const FullyDistSpVec<IT,NT> & pvals, _BinaryOperation __binary_op, bool inPlace)
2711{
2712 //MPI_Barrier(MPI_COMM_WORLD);
2713 MPI_Comm World = pvals.commGrid->GetWorld();
2714 MPI_Barrier(World);
2715 if(getncol() != pvals.TotalLength())
2716 {
2717 std::ostringstream outs;
2718 outs << "Can not prune column-by-column, dimensions does not match"<< std::endl;
2719 outs << getncol() << " != " << pvals.TotalLength() << std::endl;
2720 SpParHelper::Print(outs.str());
2721 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2722 }
2723 if(! ( *(getcommgrid()) == *(pvals.getcommgrid())) )
2724 {
2725 std::cout << "Grids are not comparable for PruneColumn" << std::endl;
2726 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2727 }
2728
2729 MPI_Comm ColWorld = pvals.commGrid->GetColWorld();
2730 int diagneigh = pvals.commGrid->GetComplementRank();
2731
2732 IT xlocnz = pvals.getlocnnz();
2733 IT roffst = pvals.RowLenUntil();
2734 IT roffset;
2735 IT trxlocnz = 0;
2736
2737 MPI_Status status;
2738 MPI_Sendrecv(&roffst, 1, MPIType<IT>(), diagneigh, TROST, &roffset, 1, MPIType<IT>(), diagneigh, TROST, World, &status);
2739 MPI_Sendrecv(&xlocnz, 1, MPIType<IT>(), diagneigh, TRNNZ, &trxlocnz, 1, MPIType<IT>(), diagneigh, TRNNZ, World, &status);
2740
2741 std::vector<IT> trxinds (trxlocnz);
2742 std::vector<NT> trxnums (trxlocnz);
2743 MPI_Sendrecv(pvals.ind.data(), xlocnz, MPIType<IT>(), diagneigh, TRI, trxinds.data(), trxlocnz, MPIType<IT>(), diagneigh, TRI, World, &status);
2744 MPI_Sendrecv(pvals.num.data(), xlocnz, MPIType<NT>(), diagneigh, TRX, trxnums.data(), trxlocnz, MPIType<NT>(), diagneigh, TRX, World, &status);
2745 std::transform(trxinds.data(), trxinds.data()+trxlocnz, trxinds.data(), std::bind2nd(std::plus<IT>(), roffset));
2746
2747 int colneighs, colrank;
2748 MPI_Comm_size(ColWorld, &colneighs);
2749 MPI_Comm_rank(ColWorld, &colrank);
2750 int * colnz = new int[colneighs];
2751 colnz[colrank] = trxlocnz;
2752 MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colnz, 1, MPI_INT, ColWorld);
2753 int * dpls = new int[colneighs](); // displacements (zero-initialized)
2754 std::partial_sum(colnz, colnz+colneighs-1, dpls+1);
2755 IT accnz = std::accumulate(colnz, colnz+colneighs, 0);
2756
2757 std::vector<IT> indacc(accnz);
2758 std::vector<NT> numacc(accnz);
2759 MPI_Allgatherv(trxinds.data(), trxlocnz, MPIType<IT>(), indacc.data(), colnz, dpls, MPIType<IT>(), ColWorld);
2760 MPI_Allgatherv(trxnums.data(), trxlocnz, MPIType<NT>(), numacc.data(), colnz, dpls, MPIType<NT>(), ColWorld);
2761
2762 delete [] colnz;
2763 delete [] dpls;
2764
2765
2766 if (inPlace)
2767 {
2768 spSeq->PruneColumn(indacc.data(), numacc.data(), __binary_op, inPlace);
2769 return SpParMat<IT,NT,DER>(getcommgrid()); // return blank to match signature
2770 }
2771 else
2772 {
2773 return SpParMat<IT,NT,DER>(spSeq->PruneColumn(indacc.data(), numacc.data(), __binary_op, inPlace), commGrid);
2774 }
2775}
2776
2777
2778
2779// In-place version where rhs type is the same (no need for type promotion)
2780template <class IT, class NT, class DER>
2781void SpParMat<IT,NT,DER>::EWiseMult (const SpParMat< IT,NT,DER > & rhs, bool exclude)
2782{
2783 if(*commGrid == *rhs.commGrid)
2784 {
2785 spSeq->EWiseMult(*(rhs.spSeq), exclude); // Dimension compatibility check performed by sequential function
2786 }
2787 else
2788 {
2789 std::cout << "Grids are not comparable, EWiseMult() fails !" << std::endl;
2790 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2791 }
2792}
2793
2794
2795// Aydin (June 2021):
2796// This currently duplicates the work of EWiseMult with exclude = true
2797// However, this is the right way of implementing it because it allows set difference when
2798 // the types of the two matrices do not have a valid multiplication operator defined.
2799 // Set difference should not require such an operator, so all call sites that use
2800 // EWiseMult(..., exclude=true) will eventually migrate to this function.
2801template <class IT, class NT, class DER>
2802void SpParMat<IT,NT,DER>::SetDifference(const SpParMat<IT,NT,DER> & rhs)
2803{
2804 if(*commGrid == *rhs.commGrid)
2805 {
2806 spSeq->SetDifference(*(rhs.spSeq)); // Dimension compatibility check performed by sequential function
2807 }
2808 else
2809 {
2810 std::cout << "Grids are not comparable, SetDifference() fails !" << std::endl;
2811 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2812 }
2813}
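// Worked example: let A = [1 2; 0 3] and let B have nonzeros at positions
// (0,1) and (1,1). A.SetDifference(B) keeps exactly the entries of A whose
// positions are zero in B, yielding [1 0; 0 0]; the values of B are never
// read, only its nonzero pattern.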
2814
2815
2816template <class IT, class NT, class DER>
2817void SpParMat<IT,NT,DER>::EWiseScale(const DenseParMat<IT, NT> & rhs)
2818{
2819 if(*commGrid == *rhs.commGrid)
2820 {
2821 spSeq->EWiseScale(rhs.array, rhs.m, rhs.n); // Dimension compatibility check performed by sequential function
2822 }
2823 else
2824 {
2825 std::cout << "Grids are not comparable, EWiseScale() fails !" << std::endl;
2826 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2827 }
2828}
2829
2830template <class IT, class NT, class DER>
2831template <typename _BinaryOperation>
2832void SpParMat<IT,NT,DER>::UpdateDense(DenseParMat<IT, NT> & rhs, _BinaryOperation __binary_op) const
2833{
2834 if(*commGrid == *rhs.commGrid)
2835 {
2836 if(getlocalrows() == rhs.m && getlocalcols() == rhs.n)
2837 {
2838 spSeq->UpdateDense(rhs.array, __binary_op);
2839 }
2840 else
2841 {
2842 std::cout << "Matrices have different dimensions, UpdateDense() fails !" << std::endl;
2843 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2844 }
2845 }
2846 else
2847 {
2848 std::cout << "Grids are not comparable, UpdateDense() fails !" << std::endl;
2849 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2850 }
2851}
2852
2853template <class IT, class NT, class DER>
2854void SpParMat<IT,NT,DER>::PrintInfo() const
2855{
2856 IT mm = getnrow();
2857 IT nn = getncol();
2858 IT nznz = getnnz();
2859
2860 if (commGrid->myrank == 0)
2861 std::cout << "As a whole: " << mm << " rows and "<< nn <<" columns and "<< nznz << " nonzeros" << std::endl;
2862
2863#ifdef DEBUG
2864 IT allprocs = commGrid->grrows * commGrid->grcols;
2865 for(IT i=0; i< allprocs; ++i)
2866 {
2867 if (commGrid->myrank == i)
2868 {
2869 std::cout << "Processor (" << commGrid->GetRankInProcRow() << "," << commGrid->GetRankInProcCol() << ")'s data: " << std::endl;
2870 spSeq->PrintInfo();
2871 }
2872 MPI_Barrier(commGrid->GetWorld());
2873 }
2874#endif
2875}
2876
2877template <class IT, class NT, class DER>
2878bool SpParMat<IT,NT,DER>::operator== (const SpParMat<IT,NT,DER> & rhs) const
2879{
2880 int local = static_cast<int>((*spSeq) == (*(rhs.spSeq)));
2881 int whole = 1;
2882 MPI_Allreduce( &local, &whole, 1, MPI_INT, MPI_BAND, commGrid->GetWorld());
2883 return static_cast<bool>(whole);
2884}
2885
2886
2891template <class IT, class NT, class DER>
2892template <typename _BinaryOperation, typename LIT>
2893void SpParMat< IT,NT,DER >::SparseCommon(std::vector< std::vector < std::tuple<LIT,LIT,NT> > > & data, LIT locsize, IT total_m, IT total_n, _BinaryOperation BinOp)
2894//void SpParMat< IT,NT,DER >::SparseCommon(std::vector< std::vector < std::tuple<typename DER::LocalIT,typename DER::LocalIT,NT> > > & data, typename DER::LocalIT locsize, IT total_m, IT total_n, _BinaryOperation BinOp)
2895{
2896 //typedef typename DER::LocalIT LIT;
2897 int nprocs = commGrid->GetSize();
2898 int * sendcnt = new int[nprocs];
2899 int * recvcnt = new int[nprocs];
2900 for(int i=0; i<nprocs; ++i)
2901 sendcnt[i] = data[i].size(); // one send count per destination process
2902
2903 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // share the counts
2904 int * sdispls = new int[nprocs]();
2905 int * rdispls = new int[nprocs]();
2906 std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
2907 std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
2908 IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
2909 IT totsent = std::accumulate(sendcnt,sendcnt+nprocs, static_cast<IT>(0));
2910
2911 assert((totsent < std::numeric_limits<int>::max()));
2912 assert((totrecv < std::numeric_limits<int>::max()));
2913
2914
2915#if 0
2916 ofstream oput;
2917 commGrid->OpenDebugFile("Displacements", oput);
2918 copy(sdispls, sdispls+nprocs, ostream_iterator<int>(oput, " ")); oput << endl;
2919 copy(rdispls, rdispls+nprocs, ostream_iterator<int>(oput, " ")); oput << endl;
2920 oput.close();
2921
2922 IT * gsizes;
2923 if(commGrid->GetRank() == 0) gsizes = new IT[nprocs];
2924 MPI_Gather(&totrecv, 1, MPIType<IT>(), gsizes, 1, MPIType<IT>(), 0, commGrid->GetWorld());
2925 if(commGrid->GetRank() == 0) { std::copy(gsizes, gsizes+nprocs, std::ostream_iterator<IT>(std::cout, " ")); std::cout << std::endl; }
2926 MPI_Barrier(commGrid->GetWorld());
2927 MPI_Gather(&totsent, 1, MPIType<IT>(), gsizes, 1, MPIType<IT>(), 0, commGrid->GetWorld());
2928 if(commGrid->GetRank() == 0) { copy(gsizes, gsizes+nprocs, ostream_iterator<IT>(cout, " ")); cout << endl; }
2929 MPI_Barrier(commGrid->GetWorld());
2930 if(commGrid->GetRank() == 0) delete [] gsizes;
2931#endif
2932
2933 std::tuple<LIT,LIT,NT> * senddata = new std::tuple<LIT,LIT,NT>[locsize]; // one contiguous send buffer of triples
2934 for(int i=0; i<nprocs; ++i)
2935 {
2936 std::copy(data[i].begin(), data[i].end(), senddata+sdispls[i]);
2937 data[i].clear(); // clear memory
2938 data[i].shrink_to_fit();
2939 }
2940 MPI_Datatype MPI_triple;
2941 MPI_Type_contiguous(sizeof(std::tuple<LIT,LIT,NT>), MPI_CHAR, &MPI_triple);
2942 MPI_Type_commit(&MPI_triple);
2943
2944 std::tuple<LIT,LIT,NT> * recvdata = new std::tuple<LIT,LIT,NT>[totrecv];
2945 MPI_Alltoallv(senddata, sendcnt, sdispls, MPI_triple, recvdata, recvcnt, rdispls, MPI_triple, commGrid->GetWorld());
2946
2947 DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);
2948 MPI_Type_free(&MPI_triple);
2949
2950 int r = commGrid->GetGridRows();
2951 int s = commGrid->GetGridCols();
2952 IT m_perproc = total_m / r;
2953 IT n_perproc = total_n / s;
2954 int myprocrow = commGrid->GetRankInProcCol();
2955 int myproccol = commGrid->GetRankInProcRow();
2956 IT locrows, loccols;
2957 if(myprocrow != r-1) locrows = m_perproc;
2958 else locrows = total_m - myprocrow * m_perproc;
2959 if(myproccol != s-1) loccols = n_perproc;
2960 else loccols = total_n - myproccol * n_perproc;
2961
2962 SpTuples<LIT,NT> A(totrecv, locrows, loccols, recvdata); // It is ~SpTuples's job to deallocate
2963
2964 // the previous constructor sorts based on columns-first (but that doesn't matter as long as they are sorted one way or another)
2965 A.RemoveDuplicates(BinOp);
2966
2967 spSeq = new DER(A,false); // Convert SpTuples to DER
2968}
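// SparseCommon is the workhorse behind the triple-based constructors below:
// each process scatters its locally generated (row, col, val) triples to
// their owners with a single MPI_Alltoallv over a contiguous tuple datatype,
// and each owner then sorts the received triples, merges duplicates with
// BinOp, and materializes its local submatrix.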
2969
2970
2971
2972template <class IT, class NT, class DER>
2973std::vector<std::vector<SpParMat<IT, NT, DER>>>
2974SpParMat<IT, NT, DER>::BlockSplit (int br, int bc)
2975{
2976 IT g_nr = this->getnrow();
2977 IT g_nc = this->getncol();
2978
2979 if ((br == 1 && bc == 1) || (br > g_nr || bc > g_nc))
2980 return std::vector<std::vector<SpParMat<IT, NT, DER>>>
2981 (1, std::vector<SpParMat<IT, NT, DER>>(1, *this));
2982
2983 int np = commGrid->GetSize();
2984 int rank = commGrid->GetRank();
2985
2986 std::vector<std::vector<SpParMat<IT, NT, DER>>>
2987 bmats(br,
2988 std::vector<SpParMat<IT, NT, DER>>
2989 (bc, SpParMat<IT, NT, DER>(commGrid)));
2990 std::vector<std::vector<std::vector<std::vector<std::tuple<IT, IT, NT>>>>>
2991 btuples(br,
2992 std::vector<std::vector<std::vector<std::tuple<IT, IT, NT>>>>
2993 (bc, std::vector<std::vector<std::tuple<IT, IT, NT>>>
2994 (np, std::vector<std::tuple<IT, IT, NT>>())));
2995
2996 assert(spSeq != NULL);
2997
2998 SpTuples<IT, NT> tuples(*spSeq);
2999 IT g_rbeg = (g_nr/commGrid->GetGridRows()) * commGrid->GetRankInProcCol();
3000 IT g_cbeg = (g_nc/commGrid->GetGridCols()) * commGrid->GetRankInProcRow();
3001 IT br_sz = g_nr / br;
3002 IT br_r = g_nr % br;
3003 IT bc_sz = g_nc / bc;
3004 IT bc_r = g_nc % bc;
3005
3006 std::vector<IT> br_sizes(br, br_sz);
3007 std::vector<IT> bc_sizes(bc, bc_sz);
3008 for (IT i = 0; i < br_r; ++i)
3009 ++br_sizes[i];
3010 for (IT i = 0; i < bc_r; ++i)
3011 ++bc_sizes[i];
3012
3013 auto get_block = [](IT x, IT sz, IT r, IT &bid, IT &idx)
3014 {
3015 if (x < (r*(sz+1)))
3016 {
3017 bid = x / (sz+1);
3018 idx = x % (sz+1);
3019 }
3020 else
3021 {
3022 bid = (x-r) / sz;
3023 idx = (x-r) % sz;
3024 }
3025 };
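 // Worked example for get_block: g_nr = 10, br = 3 gives sz = 3, r = 1,
 // so br_sizes = {4, 3, 3}. For global row x = 5: the first r*(sz+1) = 4
 // rows live in block 0, so bid = (5-1)/3 = 1 and idx = (5-1)%3 = 1,
 // i.e. row 5 is the second row of the middle block.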
3026
3027
3028 // gather tuples
3029 for (int64_t i = 0; i < tuples.getnnz(); ++i)
3030 {
3031 IT g_ridx = g_rbeg + tuples.rowindex(i);
3032 IT g_cidx = g_cbeg + tuples.colindex(i);
3033
3034 IT rbid, ridx, ridx_l, cbid, cidx, cidx_l;
3035 get_block(g_ridx, br_sz, br_r, rbid, ridx);
3036 get_block(g_cidx, bc_sz, bc_r, cbid, cidx);
3037 int owner = Owner(br_sizes[rbid], bc_sizes[cbid], ridx, cidx,
3038 ridx_l, cidx_l);
3039
3040 btuples[rbid][cbid][owner].push_back({ridx_l, cidx_l, tuples.numvalue(i)});
3041 }
3042
3043
3044 // form matrices
3045 for (int i = 0; i < br; ++i)
3046 {
3047 for (int j = 0; j < bc; ++j)
3048 {
3049 IT locsize = 0;
3050 for (auto &el : btuples[i][j])
3051 locsize += el.size();
3052
3053 auto &M = bmats[i][j];
3054 M.SparseCommon(btuples[i][j], locsize, br_sizes[i], bc_sizes[j],
3055 maximum<NT>()); // there are no duplicates
3056 }
3057 }
3058
3059 return bmats;
3060}
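// Usage sketch (illustrative):
//
//   auto blocks = A.BlockSplit(2, 2); // a 2x2 grid of SpParMat blocks
//   // blocks[i][j] lives on the same CommGrid as A and holds the (i,j)
//   // block; any row/column remainder goes to the leading blocks,
//   // matching br_sizes/bc_sizes above.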
3061
3062
3063
3065template <class IT, class NT, class DER>
3066SpParMat< IT,NT,DER >::SpParMat (IT total_m, IT total_n, const FullyDistVec<IT,IT> & distrows,
3067 const FullyDistVec<IT,IT> & distcols, const FullyDistVec<IT,NT> & distvals, bool SumDuplicates)
3068{
3069 if((*(distrows.commGrid) != *(distcols.commGrid)) || (*(distcols.commGrid) != *(distvals.commGrid)))
3070 {
3071 SpParHelper::Print("Grids are not comparable, Sparse() fails!\n"); // commGrid is not initialized yet
3072 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
3073 }
3074 if((distrows.TotalLength() != distcols.TotalLength()) || (distcols.TotalLength() != distvals.TotalLength()))
3075 {
3076 SpParHelper::Print("Vectors have different sizes, Sparse() fails!");
3077 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
3078 }
3079
3080 commGrid = distrows.commGrid;
3081 int nprocs = commGrid->GetSize();
3082 std::vector< std::vector < std::tuple<IT,IT,NT> > > data(nprocs);
3083
3084 IT locsize = distrows.LocArrSize();
3085 for(IT i=0; i<locsize; ++i)
3086 {
3087 IT lrow, lcol;
3088 int owner = Owner(total_m, total_n, distrows.arr[i], distcols.arr[i], lrow, lcol);
3089 data[owner].push_back(std::make_tuple(lrow,lcol,distvals.arr[i]));
3090 }
3091 if(SumDuplicates)
3092 {
3093 SparseCommon(data, locsize, total_m, total_n, std::plus<NT>());
3094 }
3095 else
3096 {
3097 SparseCommon(data, locsize, total_m, total_n, maximum<NT>());
3098 }
3099}
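
// Usage sketch (illustrative; assumes SpDCCols, the library's DCSC-based local
// storage, is visible here, and the 4x4 diagonal content is an editor
// assumption chosen for demonstration): building a matrix from distributed
// (row, column, value) triples with the constructor above.
inline SpParMat<int64_t, double, SpDCCols<int64_t, double> >
DiagonalFromTriples(std::shared_ptr<CommGrid> grid)
{
    FullyDistVec<int64_t, int64_t> ri(grid), ci(grid);
    ri.iota(4, 0);  // global rows 0,1,2,3
    ci.iota(4, 0);  // global cols 0,1,2,3
    FullyDistVec<int64_t, double> vi(grid, 4, 1.0);  // all values 1.0
    // SumDuplicates = true: repeated (i,j) entries would be added together
    return SpParMat<int64_t, double, SpDCCols<int64_t, double> >(4, 4, ri, ci, vi, true);
}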
3100
3101
3102
3103template <class IT, class NT, class DER>
3104SpParMat< IT,NT,DER >::SpParMat (IT total_m, IT total_n, const FullyDistVec<IT,IT> & distrows,
3105 const FullyDistVec<IT,IT> & distcols, const NT & val, bool SumDuplicates)
3106{
3107 if((*(distrows.commGrid) != *(distcols.commGrid)) )
3108 {
3109 SpParHelper::Print("Grids are not comparable, Sparse() fails!\n");
3110 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
3111 }
3112 if((distrows.TotalLength() != distcols.TotalLength()) )
3113 {
3114 SpParHelper::Print("Vectors have different sizes, Sparse() fails!\n");
3115 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
3116 }
3117 commGrid = distrows.commGrid;
3118 int nprocs = commGrid->GetSize();
3119 std::vector< std::vector < std::tuple<IT,IT,NT> > > data(nprocs);
3120
3121 IT locsize = distrows.LocArrSize();
3122 for(IT i=0; i<locsize; ++i)
3123 {
3124 IT lrow, lcol;
3125 int owner = Owner(total_m, total_n, distrows.arr[i], distcols.arr[i], lrow, lcol);
3126 data[owner].push_back(std::make_tuple(lrow,lcol,val));
3127 }
3128 if(SumDuplicates)
3129 {
3130 SparseCommon(data, locsize, total_m, total_n, std::plus<NT>());
3131 }
3132 else
3133 {
3134 SparseCommon(data, locsize, total_m, total_n, maximum<NT>());
3135 }
3136}
3137
3138template <class IT, class NT, class DER>
3139template <class DELIT>
3140SpParMat< IT,NT,DER >::SpParMat (const DistEdgeList<DELIT> & DEL, bool removeloops)
3141{
3142 commGrid = DEL.commGrid;
3143 typedef typename DER::LocalIT LIT;
3144
3145 int nprocs = commGrid->GetSize();
3146 int gridrows = commGrid->GetGridRows();
3147 int gridcols = commGrid->GetGridCols();
3148 std::vector< std::vector<LIT> > data(nprocs); // entries are pre-converted to local indices before getting pushed into "data"
3149
3150 LIT m_perproc = DEL.getGlobalV() / gridrows;
3151 LIT n_perproc = DEL.getGlobalV() / gridcols;
3152
3153 if(sizeof(LIT) < sizeof(DELIT))
3154 {
3155 std::ostringstream outs;
3156 outs << "Warning: Using smaller indices for the matrix than DistEdgeList\n";
3157 outs << "Local matrices are " << m_perproc << "-by-" << n_perproc << std::endl;
3158 SpParHelper::Print(outs.str(), commGrid->GetWorld()); // commgrid initialized
3159 }
3160
3161 LIT stages = MEM_EFFICIENT_STAGES; // to lower memory consumption, form sparse matrix in stages
3162
3163 // even if local indices (LIT) are 32-bits, we should work with 64-bits for global info
3164 int64_t perstage = DEL.nedges / stages;
3165 LIT totrecv = 0;
3166 std::vector<LIT> alledges;
3167
3168 for(LIT s=0; s< stages; ++s)
3169 {
3170 int64_t n_before = s*perstage;
3171 int64_t n_after = ((s==(stages-1))? DEL.nedges : ((s+1)*perstage));
3172
3173 // edges whose source vertex was cleared (set to -1) are skipped below
3174 int realedges = 0; // number of "local" edges that survive the filter
3175
3176 if(DEL.pedges)
3177 {
3178 for (int64_t i = n_before; i < n_after; i++)
3179 {
3180 int64_t fr = get_v0_from_edge(&(DEL.pedges[i]));
3181 int64_t to = get_v1_from_edge(&(DEL.pedges[i]));
3182
3183 if(fr >= 0 && to >= 0) // otherwise skip
3184 {
3185 IT lrow, lcol;
3186 int owner = Owner(DEL.getGlobalV(), DEL.getGlobalV(), fr, to, lrow, lcol);
3187 data[owner].push_back(lrow); // row_id
3188 data[owner].push_back(lcol); // col_id
3189 ++realedges;
3190 }
3191 }
3192 }
3193 else
3194 {
3195 for (int64_t i = n_before; i < n_after; i++)
3196 {
3197 if(DEL.edges[2*i+0] >= 0 && DEL.edges[2*i+1] >= 0) // otherwise skip
3198 {
3199 IT lrow, lcol;
3200 int owner = Owner(DEL.getGlobalV(), DEL.getGlobalV(), DEL.edges[2*i+0], DEL.edges[2*i+1], lrow, lcol);
3201 data[owner].push_back(lrow);
3202 data[owner].push_back(lcol);
3203 ++realedges;
3204 }
3205 }
3206 }
3207
3208 LIT * sendbuf = new LIT[2*realedges];
3209 int * sendcnt = new int[nprocs];
3210 int * sdispls = new int[nprocs];
3211 for(int i=0; i<nprocs; ++i)
3212 sendcnt[i] = data[i].size();
3213
3214 int * rdispls = new int[nprocs];
3215 int * recvcnt = new int[nprocs];
3216 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT,commGrid->GetWorld()); // share the counts
3217
3218 sdispls[0] = 0;
3219 rdispls[0] = 0;
3220 for(int i=0; i<nprocs-1; ++i)
3221 {
3222 sdispls[i+1] = sdispls[i] + sendcnt[i];
3223 rdispls[i+1] = rdispls[i] + recvcnt[i];
3224 }
3225 for(int i=0; i<nprocs; ++i)
3226 std::copy(data[i].begin(), data[i].end(), sendbuf+sdispls[i]);
3227
3228 // clear memory
3229 for(int i=0; i<nprocs; ++i)
3230 std::vector<LIT>().swap(data[i]);
3231
3232 // ABAB: Total number of edges received might not be LIT-addressible
3233 // However, each edge_id is LIT-addressible
3234 IT thisrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0)); // thisrecv = 2*locedges
3235 LIT * recvbuf = new LIT[thisrecv];
3236 totrecv += thisrecv;
3237
3238 MPI_Alltoallv(sendbuf, sendcnt, sdispls, MPIType<LIT>(), recvbuf, recvcnt, rdispls, MPIType<LIT>(), commGrid->GetWorld());
3239 DeleteAll(sendcnt, recvcnt, sdispls, rdispls,sendbuf);
3240 std::copy (recvbuf,recvbuf+thisrecv,std::back_inserter(alledges)); // copy to all edges
3241 delete [] recvbuf;
3242 }
3243
3244 int myprocrow = commGrid->GetRankInProcCol();
3245 int myproccol = commGrid->GetRankInProcRow();
3246 LIT locrows, loccols;
3247 if(myprocrow != gridrows-1) locrows = m_perproc;
3248 else locrows = DEL.getGlobalV() - myprocrow * m_perproc;
3249 if(myproccol != gridcols-1) loccols = n_perproc;
3250 else loccols = DEL.getGlobalV() - myproccol * n_perproc;
3251
3252 SpTuples<LIT,NT> A(totrecv/2, locrows, loccols, alledges, removeloops); // alledges is empty upon return
3253 spSeq = new DER(A,false); // Convert SpTuples to DER
3254}
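
// Editor sketch: the stage-boundary arithmetic used by the constructor above.
// Processing the edge list in MEM_EFFICIENT_STAGES rounds bounds the peak size
// of the all-to-all buffers; the last stage absorbs the division remainder.
inline void StageBounds(int64_t nedges, int64_t stages, int64_t s, int64_t & lo, int64_t & hi)
{
    int64_t perstage = nedges / stages;
    lo = s * perstage;                                   // first edge of stage s
    hi = (s == stages - 1) ? nedges : (s + 1) * perstage; // one past the last edge
}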
3255
3256template <class IT, class NT, class DER>
3257IT SpParMat<IT,NT,DER>::RemoveLoops()
3258{
3259 MPI_Comm DiagWorld = commGrid->GetDiagWorld();
3260 IT totrem;
3261 IT removed = 0;
3262 if(DiagWorld != MPI_COMM_NULL) // Diagonal processors only
3263 {
3264 typedef typename DER::LocalIT LIT;
3265 SpTuples<LIT,NT> tuples(*spSeq);
3266 delete spSeq;
3267 removed = tuples.RemoveLoops();
3268 spSeq = new DER(tuples, false); // Convert to DER
3269 }
3270 MPI_Allreduce( &removed, & totrem, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
3271 return totrem;
3272}
3273
3274
3275
3276template <class IT, class NT, class DER>
3277void SpParMat<IT,NT,DER>::AddLoops(NT loopval, bool replaceExisting)
3278{
3279 MPI_Comm DiagWorld = commGrid->GetDiagWorld();
3280 if(DiagWorld != MPI_COMM_NULL) // Diagonal processors only
3281 {
3282 typedef typename DER::LocalIT LIT;
3283 SpTuples<LIT,NT> tuples(*spSeq);
3284 delete spSeq;
3285 tuples.AddLoops(loopval, replaceExisting);
3286 tuples.SortColBased();
3287 spSeq = new DER(tuples, false); // Convert to DER
3288 }
3289}
3290
3291
3292// Different values on the diagonal
3293template <class IT, class NT, class DER>
3294void SpParMat<IT,NT,DER>::AddLoops(FullyDistVec<IT,NT> loopvals, bool replaceExisting)
3295{
3296
3297
3298 if(*loopvals.commGrid != *commGrid)
3299 {
3300 SpParHelper::Print("Grids are not comparable, SpParMat::AddLoops() fails!\n", commGrid->GetWorld());
3301 MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
3302 }
3303 if (getncol()!= loopvals.TotalLength())
3304 {
3305 SpParHelper::Print("The number of entries in loopvals is not equal to the number of diagonal entries.\n");
3306 MPI_Abort(MPI_COMM_WORLD,DIMMISMATCH);
3307 }
3308
3309 // Gather data on the diagonal processor
3310 int locsize = (int) loopvals.LocArrSize(); // gathered as MPI_INT below, so store it as an int
3311 int rowProcs = commGrid->GetGridCols();
3312 std::vector<int> recvcnt(rowProcs, 0);
3313 std::vector<int> rdpls(rowProcs, 0);
3314 MPI_Gather(&locsize, 1, MPI_INT, recvcnt.data(), 1, MPI_INT, commGrid->GetDiagOfProcRow(), commGrid->GetRowWorld());
3315 std::partial_sum(recvcnt.data(), recvcnt.data()+rowProcs-1, rdpls.data()+1);
3316
3317 IT totrecv = rdpls[rowProcs-1] + recvcnt[rowProcs-1];
3318 assert((totrecv < std::numeric_limits<int>::max()));
3319
3320 std::vector<NT> rowvals(totrecv);
3321 MPI_Gatherv(loopvals.arr.data(), locsize, MPIType<NT>(), rowvals.data(), recvcnt.data(), rdpls.data(),
3322 MPIType<NT>(), commGrid->GetDiagOfProcRow(), commGrid->GetRowWorld());
3323
3324
3325 MPI_Comm DiagWorld = commGrid->GetDiagWorld();
3326 if(DiagWorld != MPI_COMM_NULL) // Diagonal processors only
3327 {
3328 typedef typename DER::LocalIT LIT;
3329 SpTuples<LIT,NT> tuples(*spSeq);
3330 delete spSeq;
3331 tuples.AddLoops(rowvals, replaceExisting);
3332 tuples.SortColBased();
3333 spSeq = new DER(tuples, false); // Convert to DER
3334 }
3335}
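
// Editor sketch: the count/displacement idiom used by the gathers above (and
// throughout this file) is an exclusive prefix sum over the receive counts.
// Assumes a nonempty counts vector; std::partial_sum is already in use here.
inline std::vector<int> ExclusiveDisplacements(const std::vector<int> & counts)
{
    std::vector<int> displs(counts.size(), 0);
    std::partial_sum(counts.begin(), counts.end() - 1, displs.begin() + 1);
    return displs;  // e.g. counts {3,1,2} -> displs {0,3,4}
}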
3336
3337
3341template <class IT, class NT, class DER>
3342template <typename LIT, typename OT>
3343void SpParMat<IT,NT,DER>::OptimizeForGraph500(OptBuf<LIT,OT> & optbuf)
3344{
3345 if(spSeq->getnsplit() > 0)
3346 {
3347 SpParHelper::Print("Cannot declare preallocated buffers for multithreaded execution\n", commGrid->GetWorld());
3348 return;
3349 }
3350
3351 typedef typename DER::LocalIT LocIT; // ABAB: should match the type of LIT. Check?
3352
3353 // Set up communication buffers, one for all
3354 LocIT mA = spSeq->getnrow();
3355 LocIT nA = spSeq->getncol();
3356
3357 int p_c = commGrid->GetGridCols();
3358 int p_r = commGrid->GetGridRows();
3359
3360 LocIT rwperproc = mA / p_c; // entries per processor in row-wise communication
3361 LocIT cwperproc = nA / p_r; // entries per processor in column-wise communication
3362
3363#ifdef GATHERVOPT
3364 LocIT * colinds = spSeq->GetDCSC()->jc; // local nonzero column ids
3365 LocIT locnzc = spSeq->getnzc();
3366 LocIT cci = 0; int myrank = commGrid->GetRankInProcCol(); // cci: current column index; the gathers below are column-local
3367 int * gsizes = NULL;
3368 IT * recvbuf = NULL; // gather target, allocated on the root only
3369 IT * dpls = NULL;
3370 std::vector<LocIT> pack2send;
3371
3372 FullyDistSpVec<IT,IT> dummyRHS ( commGrid, getncol()); // dummy RHS vector to estimate index start position
3373 IT recveclen;
3374
3375 for(int pid = 1; pid <= p_r; pid++)
3376 {
3377 IT diagoffset;
3378 MPI_Status status;
3379 IT offset = dummyRHS.RowLenUntil(pid-1);
3380 int diagneigh = commGrid->GetComplementRank();
3381 MPI_Sendrecv(&offset, 1, MPIType<IT>(), diagneigh, TRTAGNZ, &diagoffset, 1, MPIType<IT>(), diagneigh, TRTAGNZ, commGrid->GetWorld(), &status);
3382
3383 LocIT endind = (pid == p_r)? nA : static_cast<LocIT>(pid) * cwperproc; // the last one might have a larger share (is this fitting to the vector boundaries?)
3384 while(cci < locnzc && colinds[cci] < endind)
3385 {
3386 pack2send.push_back(colinds[cci++]-diagoffset);
3387 }
3388 if(pid-1 == myrank) gsizes = new int[p_r];
3389 int mysize = (int) pack2send.size(); MPI_Gather(&mysize, 1, MPI_INT, gsizes, 1, MPI_INT, pid-1, commGrid->GetColWorld());
3390 if(pid-1 == myrank)
3391 {
3392 IT colcnt = std::accumulate(gsizes, gsizes+p_r, static_cast<IT>(0));
3393 recvbuf = new IT[colcnt];
3394 dpls = new IT[p_r](); // displacements (zero initialized)
3395 std::partial_sum(gsizes, gsizes+p_r-1, dpls+1);
3396 }
3397
3398 // int MPI_Gatherv (void* sbuf, int scount, MPI_Datatype stype, void* rbuf, int *rcount, int* displs, MPI_Datatype rtype, int root, MPI_Comm comm)
3399 MPI_Gatherv(SpHelper::p2a(pack2send), mysize, MPIType<LocIT>(), recvbuf, gsizes, dpls, MPIType<LocIT>(), pid-1, commGrid->GetColWorld());
3400 std::vector<LocIT>().swap(pack2send);
3401
3402 if(pid-1 == myrank)
3403 {
3404 recveclen = dummyRHS.MyLocLength();
3405 std::vector< std::vector<LocIT> > service(recveclen);
3406 for(int i=0; i< p_r; ++i)
3407 {
3408 for(int j=0; j< gsizes[i]; ++j)
3409 {
3410 IT colid2update = recvbuf[dpls[i]+j];
3411 if(service[colid2update].size() < GATHERVNEIGHLIMIT)
3412 {
3413 service[colid2update].push_back(i);
3414 }
3415 // else don't increase any further and mark it done after the iterations are complete
3416 }
3417 }
3418 }
3419 }
3420#endif
3421
3422
3423 std::vector<bool> isthere(mA, false); // perhaps the only appropriate use of this crippled data structure
3424 std::vector<int> maxlens(p_c,0); // maximum data size to be sent to any neighbor along the processor row
3425
3426 for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
3427 {
3428 for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
3429 {
3430 LocIT rowid = nzit.rowid();
3431 if(!isthere[rowid])
3432 {
3433 LocIT owner = std::min(nzit.rowid() / rwperproc, (LocIT) p_c-1);
3434 maxlens[owner]++;
3435 isthere[rowid] = true;
3436 }
3437 }
3438 }
3439 SpParHelper::Print("Optimization buffers set\n", commGrid->GetWorld());
3440 optbuf.Set(maxlens,mA);
3441}
3442
3443template <class IT, class NT, class DER>
3444void SpParMat<IT,NT,DER>::ActivateThreading(int numsplits)
3445{
3446 spSeq->RowSplit(numsplits);
3447}
3448
3449
3454template <class IT, class NT, class DER>
3455template <typename SR>
3456void SpParMat<IT,NT,DER>::Square ()
3457{
3458 int stages, dummy; // last two parameters of productgrid are ignored for synchronous multiplication
3459 std::shared_ptr<CommGrid> Grid = ProductGrid(commGrid.get(), commGrid.get(), stages, dummy, dummy);
3460
3461 typedef typename DER::LocalIT LIT;
3462
3463 LIT AA_m = spSeq->getnrow();
3464 LIT AA_n = spSeq->getncol();
3465
3466 DER seqTrn = spSeq->TransposeConst(); // will be automatically discarded after going out of scope
3467
3468 MPI_Barrier(commGrid->GetWorld());
3469
3470 LIT ** NRecvSizes = SpHelper::allocate2D<LIT>(DER::esscount, stages);
3471 LIT ** TRecvSizes = SpHelper::allocate2D<LIT>(DER::esscount, stages);
3472
3473 SpParHelper::GetSetSizes( *spSeq, NRecvSizes, commGrid->GetRowWorld());
3474 SpParHelper::GetSetSizes( seqTrn, TRecvSizes, commGrid->GetColWorld());
3475
3476 // Remotely fetched matrices are stored as pointers
3477 DER * NRecv;
3478 DER * TRecv;
3479 std::vector< SpTuples<LIT,NT> *> tomerge;
3480
3481 int Nself = commGrid->GetRankInProcRow();
3482 int Tself = commGrid->GetRankInProcCol();
3483
3484 for(int i = 0; i < stages; ++i)
3485 {
3486 std::vector<LIT> ess;
3487 if(i == Nself) NRecv = spSeq; // shallow-copy
3488 else
3489 {
3490 ess.resize(DER::esscount);
3491 for(int j=0; j< DER::esscount; ++j)
3492 ess[j] = NRecvSizes[j][i]; // essentials of the ith matrix in this row
3493 NRecv = new DER(); // first, create the object
3494 }
3495
3496 SpParHelper::BCastMatrix(Grid->GetRowWorld(), *NRecv, ess, i); // then, broadcast its elements
3497 ess.clear();
3498
3499 if(i == Tself) TRecv = &seqTrn; // shallow-copy
3500 else
3501 {
3502 ess.resize(DER::esscount);
3503 for(int j=0; j< DER::esscount; ++j)
3504 ess[j] = TRecvSizes[j][i];
3505 TRecv = new DER();
3506 }
3507 SpParHelper::BCastMatrix(Grid->GetColWorld(), *TRecv, ess, i);
3508
3509 SpTuples<LIT,NT> * AA_cont = MultiplyReturnTuples<SR, NT>(*NRecv, *TRecv, false, true);
3510 if(!AA_cont->isZero())
3511 tomerge.push_back(AA_cont);
3512
3513 if(i != Nself) delete NRecv;
3514 if(i != Tself) delete TRecv;
3515 }
3516
3517 SpHelper::deallocate2D(NRecvSizes, DER::esscount);
3518 SpHelper::deallocate2D(TRecvSizes, DER::esscount);
3519
3520 delete spSeq;
3521 spSeq = new DER(MergeAll<SR>(tomerge, AA_m, AA_n), false); // first get the result in SpTuples, then convert to DER
3522 for(unsigned int i=0; i<tomerge.size(); ++i)
3523 delete tomerge[i];
3524}
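
// Usage sketch (illustrative): squaring over the conventional (+,*) semiring.
// PlusTimesSRing ships with the library's semiring headers (an assumption that
// it is visible here); any semiring with the expected interface can be
// substituted, e.g. a min-plus semiring for path problems.
template <class MATIT, class MATNT, class MATDER>
inline void SquareExample(SpParMat<MATIT, MATNT, MATDER> & A)
{
    A.template Square< PlusTimesSRing<MATNT, MATNT> >();  // A <- A * A
}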
3525
3526
3527template <class IT, class NT, class DER>
3528void SpParMat<IT,NT,DER>::Transpose()
3529{
3530 if(commGrid->myproccol == commGrid->myprocrow) // Diagonal
3531 {
3532 spSeq->Transpose();
3533 }
3534 else
3535 {
3536 typedef typename DER::LocalIT LIT;
3537 SpTuples<LIT,NT> Atuples(*spSeq);
3538 LIT locnnz = Atuples.getnnz();
3539 LIT * rows = new LIT[locnnz];
3540 LIT * cols = new LIT[locnnz];
3541 NT * vals = new NT[locnnz];
3542 for(LIT i=0; i < locnnz; ++i)
3543 {
3544 rows[i] = Atuples.colindex(i); // swap (i,j) here
3545 cols[i] = Atuples.rowindex(i);
3546 vals[i] = Atuples.numvalue(i);
3547 }
3548 LIT locm = getlocalcols();
3549 LIT locn = getlocalrows();
3550 delete spSeq;
3551
3552 LIT remotem, remoten, remotennz;
3553 std::swap(locm,locn);
3554 int diagneigh = commGrid->GetComplementRank();
3555
3556 MPI_Status status;
3557 MPI_Sendrecv(&locnnz, 1, MPIType<LIT>(), diagneigh, TRTAGNZ, &remotennz, 1, MPIType<LIT>(), diagneigh, TRTAGNZ, commGrid->GetWorld(), &status);
3558 MPI_Sendrecv(&locn, 1, MPIType<LIT>(), diagneigh, TRTAGM, &remotem, 1, MPIType<LIT>(), diagneigh, TRTAGM, commGrid->GetWorld(), &status);
3559 MPI_Sendrecv(&locm, 1, MPIType<LIT>(), diagneigh, TRTAGN, &remoten, 1, MPIType<LIT>(), diagneigh, TRTAGN, commGrid->GetWorld(), &status);
3560
3561 LIT * rowsrecv = new LIT[remotennz];
3562 MPI_Sendrecv(rows, locnnz, MPIType<LIT>(), diagneigh, TRTAGROWS, rowsrecv, remotennz, MPIType<LIT>(), diagneigh, TRTAGROWS, commGrid->GetWorld(), &status);
3563 delete [] rows;
3564
3565 LIT * colsrecv = new LIT[remotennz];
3566 MPI_Sendrecv(cols, locnnz, MPIType<LIT>(), diagneigh, TRTAGCOLS, colsrecv, remotennz, MPIType<LIT>(), diagneigh, TRTAGCOLS, commGrid->GetWorld(), &status);
3567 delete [] cols;
3568
3569 NT * valsrecv = new NT[remotennz];
3570 MPI_Sendrecv(vals, locnnz, MPIType<NT>(), diagneigh, TRTAGVALS, valsrecv, remotennz, MPIType<NT>(), diagneigh, TRTAGVALS, commGrid->GetWorld(), &status);
3571 delete [] vals;
3572
3573 std::tuple<LIT,LIT,NT> * arrtuples = new std::tuple<LIT,LIT,NT>[remotennz];
3574 for(LIT i=0; i< remotennz; ++i)
3575 {
3576 arrtuples[i] = std::make_tuple(rowsrecv[i], colsrecv[i], valsrecv[i]);
3577 }
3578 DeleteAll(rowsrecv, colsrecv, valsrecv);
3579 ColLexiCompare<LIT,NT> collexicogcmp;
3580 std::sort(arrtuples, arrtuples+remotennz, collexicogcmp); // sort w.r.t. columns here
3581
3582 spSeq = new DER();
3583 spSeq->Create( remotennz, remotem, remoten, arrtuples); // the deletion of arrtuples[] is handled by SpMat::Create
3584 }
3585}
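
// Editor sketch (an assumption about GetComplementRank(), which is defined in
// CommGrid): on the processor grid, the transpose of the block owned by
// processor (i, j) lives on the "complement" processor (j, i). With row-major
// rank numbering, that mapping would be:
inline int ComplementOf(int myprocrow, int myproccol, int gridcols)
{
    return myproccol * gridcols + myprocrow;  // diagonal ranks map to themselves
}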
3586
3587
3588template <class IT, class NT, class DER>
3589template <class HANDLER>
3590void SpParMat< IT,NT,DER >::SaveGathered(std::string filename, HANDLER handler, bool transpose) const
3591{
3592 int proccols = commGrid->GetGridCols();
3593 int procrows = commGrid->GetGridRows();
3594 IT totalm = getnrow();
3595 IT totaln = getncol();
3596 IT totnnz = getnnz();
3597 int flinelen = 0;
3598 std::ofstream out;
3599 if(commGrid->GetRank() == 0)
3600 {
3601 std::string s;
3602 std::stringstream strm;
3603 strm << "%%MatrixMarket matrix coordinate real general" << std::endl;
3604 strm << totalm << " " << totaln << " " << totnnz << std::endl;
3605 s = strm.str();
3606 out.open(filename.c_str(),std::ios_base::trunc);
3607 flinelen = s.length();
3608 out.write(s.c_str(), flinelen);
3609 out.close();
3610 }
3611 int colrank = commGrid->GetRankInProcCol();
3612 int colneighs = commGrid->GetGridRows();
3613 IT * locnrows = new IT[colneighs]; // number of rows is calculated by a reduction among the processor column
3614 locnrows[colrank] = (IT) getlocalrows();
3615 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
3616 IT roffset = std::accumulate(locnrows, locnrows+colrank, static_cast<IT>(0)); // IT-typed init avoids int overflow
3617 delete [] locnrows;
3618
3619 MPI_Datatype datatype;
3620 MPI_Type_contiguous(sizeof(std::pair<IT,NT>), MPI_CHAR, &datatype);
3621 MPI_Type_commit(&datatype);
3622
3623 for(int i = 0; i < procrows; i++) // for all processor row (in order)
3624 {
3625 if(commGrid->GetRankInProcCol() == i) // only the ith processor row
3626 {
3627 IT localrows = spSeq->getnrow(); // same along the processor row
3628 std::vector< std::vector< std::pair<IT,NT> > > csr(localrows);
3629 if(commGrid->GetRankInProcRow() == 0) // get the head of processor row
3630 {
3631 IT localcols = spSeq->getncol(); // might be different on the last processor on this processor row
3632 MPI_Bcast(&localcols, 1, MPIType<IT>(), 0, commGrid->GetRowWorld());
3633 for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over nonempty subcolumns
3634 {
3635 for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
3636 {
3637 csr[nzit.rowid()].push_back( std::make_pair(colit.colid(), nzit.value()) );
3638 }
3639 }
3640 }
3641 else // get the rest of the processors
3642 {
3643 IT n_perproc;
3644 MPI_Bcast(&n_perproc, 1, MPIType<IT>(), 0, commGrid->GetRowWorld());
3645 IT noffset = commGrid->GetRankInProcRow() * n_perproc;
3646 for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over nonempty subcolumns
3647 {
3648 for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
3649 {
3650 csr[nzit.rowid()].push_back( std::make_pair(colit.colid() + noffset, nzit.value()) );
3651 }
3652 }
3653 }
3654 std::pair<IT,NT> * ents = NULL;
3655 int * gsizes = NULL, * dpls = NULL;
3656 if(commGrid->GetRankInProcRow() == 0) // only the head of processor row
3657 {
3658 out.open(filename.c_str(),std::ios_base::app);
3659 gsizes = new int[proccols];
3660 dpls = new int[proccols](); // displacements (zero initialized)
3661 }
3662 for(int j = 0; j < localrows; ++j)
3663 {
3664 IT rowcnt = 0;
3665 std::sort(csr[j].begin(), csr[j].end());
3666 int mysize = csr[j].size();
3667 MPI_Gather(&mysize, 1, MPI_INT, gsizes, 1, MPI_INT, 0, commGrid->GetRowWorld());
3668 if(commGrid->GetRankInProcRow() == 0)
3669 {
3670 rowcnt = std::accumulate(gsizes, gsizes+proccols, static_cast<IT>(0));
3671 std::partial_sum(gsizes, gsizes+proccols-1, dpls+1);
3672 ents = new std::pair<IT,NT>[rowcnt]; // nonzero entries in the j'th local row
3673 }
3674
3675 // int MPI_Gatherv (void* sbuf, int scount, MPI_Datatype stype,
3676 // void* rbuf, int *rcount, int* displs, MPI_Datatype rtype, int root, MPI_Comm comm)
3677 MPI_Gatherv(SpHelper::p2a(csr[j]), mysize, datatype, ents, gsizes, dpls, datatype, 0, commGrid->GetRowWorld());
3678 if(commGrid->GetRankInProcRow() == 0)
3679 {
3680 for(int k=0; k< rowcnt; ++k)
3681 {
3682 //out << j + roffset + 1 << "\t" << ents[k].first + 1 <<"\t" << ents[k].second << endl;
3683 if (!transpose)
3684 // regular
3685 out << j + roffset + 1 << "\t" << ents[k].first + 1 << "\t";
3686 else
3687 // transpose row/column
3688 out << ents[k].first + 1 << "\t" << j + roffset + 1 << "\t";
3689 handler.save(out, ents[k].second, j + roffset, ents[k].first);
3690 out << std::endl;
3691 }
3692 delete [] ents;
3693 }
3694 }
3695 if(commGrid->GetRankInProcRow() == 0)
3696 {
3697 DeleteAll(gsizes, dpls);
3698 out.close();
3699 }
3700 } // end_if the ith processor row
3701 MPI_Barrier(commGrid->GetWorld()); // signal the end of ith processor row iteration (so that all processors block)
3702 }
3703}
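
// Editor sketch of a minimal HANDLER for SaveGathered: only a save() member
// that streams the numerical value is required, since the row and column
// indices are written by SaveGathered itself. The struct name is an editor
// assumption, not a library type.
struct PlainTextSaveHandler
{
    void save(std::ofstream & os, const double & v, int64_t /*row*/, int64_t /*col*/)
    {
        os << v;
    }
};
// e.g.  A.SaveGathered("A.txt", PlainTextSaveHandler(), false);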
3704
3705
3708template <class IT, class NT, class DER>
3709MPI_File SpParMat< IT,NT,DER >::TupleRead1stPassNExchange (const std::string & filename, TYPE2SEND * & senddata, IT & totsend,
3710 FullyDistVec<IT,STRASARRAY> & distmapper, uint64_t & totallength)
3711{
3712 int myrank = commGrid->GetRank();
3713 int nprocs = commGrid->GetSize();
3714
3715 MPI_Offset fpos, end_fpos;
3716 struct stat st; // get file size
3717 if (stat(filename.c_str(), &st) == -1)
3718 {
3719 MPI_Abort(MPI_COMM_WORLD, NOFILE);
3720 }
3721 int64_t file_size = st.st_size;
3722 if(myrank == 0)
3723 {
3724 std::cout << "File is " << file_size << " bytes" << std::endl;
3725 }
3726 fpos = myrank * file_size / nprocs;
3727
3728 if(myrank != (nprocs-1)) end_fpos = (myrank + 1) * file_size / nprocs;
3729 else end_fpos = file_size;
3730
3731 MPI_File mpi_fh;
3732 MPI_File_open (commGrid->commWorld, const_cast<char*>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fh);
3733
3734 typedef std::map<std::string, uint64_t> KEYMAP; // due to potential (but extremely unlikely) collisions in MurmurHash, make the key to the std::map the string itself
3735 std::vector< KEYMAP > allkeys(nprocs); // the map keeps the outgoing data unique; we could have applied this to HipMer too
3736
3737 std::vector<std::string> lines;
3738 bool finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, true, lines, myrank);
3739 int64_t entriesread = lines.size();
3740 SpHelper::ProcessLinesWithStringKeys(allkeys, lines,nprocs);
3741
3742 while(!finished)
3743 {
3744 finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, false, lines, myrank);
3745
3746 entriesread += lines.size();
3747 SpHelper::ProcessLinesWithStringKeys(allkeys, lines,nprocs);
3748 }
3749 int64_t allentriesread;
3750 MPI_Reduce(&entriesread, &allentriesread, 1, MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
3751#ifdef COMBBLAS_DEBUG
3752 if(myrank == 0)
3753 std::cout << "Initial reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
3754#endif
3755
3756 int * sendcnt = new int[nprocs];
3757 int * recvcnt = new int[nprocs];
3758 for(int i=0; i<nprocs; ++i)
3759 sendcnt[i] = allkeys[i].size();
3760
3761 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // share the counts
3762 int * sdispls = new int[nprocs]();
3763 int * rdispls = new int[nprocs]();
3764 std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
3765 std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
3766 totsend = std::accumulate(sendcnt,sendcnt+nprocs, static_cast<IT>(0));
3767 IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
3768
3769 assert((totsend < std::numeric_limits<int>::max()));
3770 assert((totrecv < std::numeric_limits<int>::max()));
3771
3772 // The following are declared in SpParMat.h
3773 // typedef std::array<char, MAXVERTNAME> STRASARRAY;
3774 // typedef std::pair< STRASARRAY, uint64_t> TYPE2SEND;
3775 senddata = new TYPE2SEND[totsend];
3776
3777 #pragma omp parallel for
3778 for(int i=0; i<nprocs; ++i)
3779 {
3780 size_t j = 0;
3781 for(auto pobj:allkeys[i])
3782 {
3783 // A naked C-style array is neither copyable nor assignable, but std::pair requires both; hence std::array is used
3784 std::array<char, MAXVERTNAME> vname;
3785 std::copy( pobj.first.begin(), pobj.first.end(), vname.begin() );
3786 if(pobj.first.length() < MAXVERTNAME) vname[pobj.first.length()] = '\0'; // null termination
3787
3788 senddata[sdispls[i]+j] = TYPE2SEND(vname, pobj.second);
3789 j++;
3790 }
3791 }
3792 allkeys.clear(); // allkeys is no longer needed after this point
3793
3794 MPI_Datatype MPI_HASH;
3795 MPI_Type_contiguous(sizeof(TYPE2SEND), MPI_CHAR, &MPI_HASH);
3796 MPI_Type_commit(&MPI_HASH);
3797
3798 TYPE2SEND * recvdata = new TYPE2SEND[totrecv];
3799 MPI_Alltoallv(senddata, sendcnt, sdispls, MPI_HASH, recvdata, recvcnt, rdispls, MPI_HASH, commGrid->GetWorld());
3800 // do not delete send buffers yet as we will use them to recv back the data
3801
3802 std::set< std::pair<uint64_t, std::string> > uniqsorted;
3803 for(IT i=0; i< totrecv; ++i)
3804 {
3805 auto locnull = std::find(recvdata[i].first.begin(), recvdata[i].first.end(), '\0'); // find the null character (or string::end)
3806 std::string strtmp(recvdata[i].first.begin(), locnull); // range constructor
3807
3808 uniqsorted.insert(std::make_pair(recvdata[i].second, strtmp));
3809 }
3810 uint64_t uniqsize = uniqsorted.size();
3811
3812#ifdef COMBBLAS_DEBUG
3813 if(myrank == 0)
3814 std::cout << "out of " << totrecv << " vertices received, " << uniqsize << " were unique" << std::endl;
3815#endif
3816 uint64_t sizeuntil = 0;
3817 totallength = 0;
3818 MPI_Exscan( &uniqsize, &sizeuntil, 1, MPIType<uint64_t>(), MPI_SUM, commGrid->GetWorld() );
3819 MPI_Allreduce(&uniqsize, &totallength, 1, MPIType<uint64_t>(), MPI_SUM, commGrid->GetWorld());
3820 if(myrank == 0) sizeuntil = 0; // because MPI_Exscan says the recvbuf in process 0 is undefined
3821
3822 distmapper = FullyDistVec<IT,STRASARRAY>(commGrid, totallength,STRASARRAY{});
3823
3824 // invindex does not conform to FullyDistVec boundaries, otherwise its contents are essentially the same as distmapper
3825 KEYMAP invindex; // KEYMAP is map<string, uint64_t>.
3826 uint64_t locindex = 0;
3827 std::vector< std::vector< IT > > locs_send(nprocs);
3828 std::vector< std::vector< std::string > > data_send(nprocs);
3829 int * map_scnt = new int[nprocs](); // send counts for this map only (not to be confused with the other sendcnt)
3830 for(auto itr = uniqsorted.begin(); itr != uniqsorted.end(); ++itr)
3831 {
3832 uint64_t globalindex = sizeuntil + locindex;
3833 invindex.insert(std::make_pair(itr->second, globalindex));
3834
3835 IT newlocid;
3836 int owner = distmapper.Owner(globalindex, newlocid);
3837
3838 //if(myrank == 0)
3839 // std::cout << "invindex received " << itr->second << " with global index " << globalindex << " to be owned by " << owner << " with index " << newlocid << std::endl;
3840
3841 locs_send[owner].push_back(newlocid);
3842 data_send[owner].push_back(itr->second);
3843 map_scnt[owner]++;
3844 locindex++;
3845 }
3846 uniqsorted.clear(); // clear memory
3847
3848
3849 /* BEGIN: Redistributing the permutation vector to fit the FullyDistVec semantics */
3850 SpParHelper::ReDistributeToVector(map_scnt, locs_send, data_send, distmapper.arr, commGrid->GetWorld()); // map_scnt is deleted here
3851 /* END: Redistributing the permutation vector to fit the FullyDistVec semantics */
3852
3853 for(IT i=0; i< totrecv; ++i)
3854 {
3855 auto locnull = std::find(recvdata[i].first.begin(), recvdata[i].first.end(), '\0');
3856 std::string searchstr(recvdata[i].first.begin(), locnull); // range constructor
3857
3858 auto resp = invindex.find(searchstr); // recvdata[i] is of type pair< STRASARRAY, uint64_t>
3859 if (resp != invindex.end())
3860 {
3861 recvdata[i].second = resp->second; // now instead of random numbers, recvdata's second entry will be its new index
3862 }
3863 else
3864 std::cout << "Assertion failed at proc " << myrank << ": the absence of the entry in invindex is unexpected!!!" << std::endl;
3865 }
3866 MPI_Alltoallv(recvdata, recvcnt, rdispls, MPI_HASH, senddata, sendcnt, sdispls, MPI_HASH, commGrid->GetWorld());
3867 DeleteAll(recvdata, sendcnt, recvcnt, sdispls, rdispls);
3868 MPI_Type_free(&MPI_HASH);
3869
3870 // senddata now carries the assigned global indices back to the processors that requested them
3871 return mpi_fh;
3872}
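
// Editor sketch: the fixed-width vertex names above are either null-terminated
// (short names) or occupy the full MAXVERTNAME characters. This helper captures
// the conversion that appears twice in the function body:
inline std::string VertexArrayToString(const std::array<char, MAXVERTNAME> & a)
{
    auto locnull = std::find(a.begin(), a.end(), '\0');  // a.end() if full width
    return std::string(a.begin(), locnull);
}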
3873
3874
3875
3880template <class IT, class NT, class DER>
3881template <typename _BinaryOperation>
3882FullyDistVec<IT,std::array<char, MAXVERTNAME> > SpParMat< IT,NT,DER >::ReadGeneralizedTuples (const std::string & filename, _BinaryOperation BinOp)
3883{
3884 int myrank = commGrid->GetRank();
3885 int nprocs = commGrid->GetSize();
3886 TYPE2SEND * senddata;
3887 IT totsend;
3888 uint64_t totallength;
3889 FullyDistVec<IT,STRASARRAY> distmapper(commGrid); // choice of array<char, MAXVERTNAME> over string = array is required to be a contiguous container and an aggregate
3890
3891 MPI_File mpi_fh = TupleRead1stPassNExchange(filename, senddata, totsend, distmapper, totallength);
3892
3893 typedef std::map<std::string, uint64_t> KEYMAP;
3894 KEYMAP ultimateperm; // the ultimate permutation
3895 for(IT i=0; i< totsend; ++i)
3896 {
3897 auto locnull = std::find(senddata[i].first.begin(), senddata[i].first.end(), '\0');
3898
3899 std::string searchstr(senddata[i].first.begin(), locnull);
3900 auto ret = ultimateperm.emplace(std::make_pair(searchstr, senddata[i].second));
3901 if(!ret.second) // the second is the boolean that tells success
3902 {
3903 // remember, we only sent unique vertex ids in the first place so we are expecting unique values in return
3904 std::cout << "the duplication in ultimateperm is unexpected!!!" << std::endl;
3905 }
3906 }
3907 delete [] senddata;
3908
3909 // rename the data now, first reset file pointers
3910 MPI_Offset fpos, end_fpos;
3911 struct stat st; // get file size
3912 if (stat(filename.c_str(), &st) == -1)
3913 {
3914 MPI_Abort(MPI_COMM_WORLD, NOFILE);
3915 }
3916 int64_t file_size = st.st_size;
3917
3918 fpos = myrank * file_size / nprocs;
3919
3920 if(myrank != (nprocs-1)) end_fpos = (myrank + 1) * file_size / nprocs;
3921 else end_fpos = file_size;
3922
3923 typedef typename DER::LocalIT LIT;
3924 std::vector<LIT> rows;
3925 std::vector<LIT> cols;
3926 std::vector<NT> vals;
3927
3928 std::vector<std::string> lines;
3929 bool finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, true, lines, myrank);
3930 int64_t entriesread = lines.size();
3931
3932 SpHelper::ProcessStrLinesNPermute(rows, cols, vals, lines, ultimateperm);
3933
3934 while(!finished)
3935 {
3936 finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, false, lines, myrank);
3937 entriesread += lines.size();
3938 SpHelper::ProcessStrLinesNPermute(rows, cols, vals, lines, ultimateperm);
3939 }
3940 int64_t allentriesread;
3941 MPI_Reduce(&entriesread, &allentriesread, 1, MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
3942#ifdef COMBBLAS_DEBUG
3943 if(myrank == 0)
3944 std::cout << "Second reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
3945#endif
3946
3947 MPI_File_close(&mpi_fh);
3948 std::vector< std::vector < std::tuple<LIT,LIT,NT> > > data(nprocs);
3949
3950 LIT locsize = rows.size(); // remember: locsize != entriesread when the input is symmetric (each entry then yields two tuples)
3951 for(LIT i=0; i<locsize; ++i)
3952 {
3953 LIT lrow, lcol;
3954 int owner = Owner(totallength, totallength, rows[i], cols[i], lrow, lcol);
3955 data[owner].push_back(std::make_tuple(lrow,lcol,vals[i]));
3956 }
3957 std::vector<LIT>().swap(rows);
3958 std::vector<LIT>().swap(cols);
3959 std::vector<NT>().swap(vals);
3960
3961#ifdef COMBBLAS_DEBUG
3962 if(myrank == 0)
3963 std::cout << "Packing to recipients finished, about to send..." << std::endl;
3964#endif
3965
3966 if(spSeq) delete spSeq;
3967 SparseCommon(data, locsize, totallength, totallength, BinOp);
3968 // PrintInfo();
3969 // distmapper.ParallelWrite("distmapper.mtx", 1, CharArraySaveHandler());
3970 return distmapper;
3971}
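
// Usage sketch (illustrative; the file name is an editor assumption): reading
// an edge list whose vertices are arbitrary strings. The returned vector maps
// each assigned global index back to its original string label.
template <class MATIT, class MATNT, class MATDER>
inline FullyDistVec<MATIT, std::array<char, MAXVERTNAME> >
ReadNamedTuples(SpParMat<MATIT, MATNT, MATDER> & A, const std::string & fname)
{
    return A.ReadGeneralizedTuples(fname, maximum<MATNT>());  // keep the max of duplicates
}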
3972
3973
3974
3978template <class IT, class NT, class DER>
3979template <typename _BinaryOperation>
3980void SpParMat< IT,NT,DER >::ParallelReadMM (const std::string & filename, bool onebased, _BinaryOperation BinOp)
3981{
3982 int32_t type = -1;
3983 int32_t symmetric = 0;
3984 int64_t nrows, ncols, nonzeros;
3985 int64_t linesread = 0;
3986
3987 FILE *f;
3988 int myrank = commGrid->GetRank();
3989 int nprocs = commGrid->GetSize();
3990 if(myrank == 0)
3991 {
3992 MM_typecode matcode;
3993 if ((f = fopen(filename.c_str(), "r")) == NULL)
3994 {
3995 printf("COMBBLAS: Matrix-market file %s cannot be found\n", filename.c_str());
3996 MPI_Abort(MPI_COMM_WORLD, NOFILE);
3997 }
3998 if (mm_read_banner(f, &matcode) != 0)
3999 {
4000 printf("Could not process Matrix Market banner.\n");
4001 exit(1);
4002 }
4003 linesread++;
4004
4005 if (mm_is_complex(matcode))
4006 {
4007 printf("Sorry, this application does not support complex types\n");
4008 printf("Matrix Market type: [%s]\n", mm_typecode_to_str(matcode));
4009 }
4010 else if(mm_is_real(matcode))
4011 {
4012 std::cout << "Matrix is Float" << std::endl;
4013 type = 0;
4014 }
4015 else if(mm_is_integer(matcode))
4016 {
4017 std::cout << "Matrix is Integer" << std::endl;
4018 type = 1;
4019 }
4020 else if(mm_is_pattern(matcode))
4021 {
4022 std::cout << "Matrix is Boolean" << std::endl;
4023 type = 2;
4024 }
4025 if(mm_is_symmetric(matcode) || mm_is_hermitian(matcode))
4026 {
4027 std::cout << "Matrix is symmetric" << std::endl;
4028 symmetric = 1;
4029 }
4030 int ret_code;
4031 if ((ret_code = mm_read_mtx_crd_size(f, &nrows, &ncols, &nonzeros, &linesread)) !=0) // ABAB: mm_read_mtx_crd_size made 64-bit friendly
4032 exit(1);
4033
4034 std::cout << "Total number of nonzeros expected across all processors is " << nonzeros << std::endl;
4035
4036 }
4037 MPI_Bcast(&type, 1, MPI_INT, 0, commGrid->commWorld);
4038 MPI_Bcast(&symmetric, 1, MPI_INT, 0, commGrid->commWorld);
4039 MPI_Bcast(&nrows, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
4040 MPI_Bcast(&ncols, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
4041 MPI_Bcast(&nonzeros, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
4042
4043 // determine the file size so that each processor can compute its own byte range
4044 struct stat st; // get file size
4045 if (stat(filename.c_str(), &st) == -1)
4046 {
4047 MPI_Abort(MPI_COMM_WORLD, NOFILE);
4048 }
4049 int64_t file_size = st.st_size;
4050 MPI_Offset fpos, end_fpos, endofheader;
4051 if(commGrid->GetRank() == 0) // the offset needs to be for this rank
4052 {
4053 std::cout << "File is " << file_size << " bytes" << std::endl;
4054 fpos = ftell(f);
4055 endofheader = fpos;
4056 MPI_Bcast(&endofheader, 1, MPIType<MPI_Offset>(), 0, commGrid->commWorld);
4057 fclose(f);
4058 }
4059 else
4060 {
4061 MPI_Bcast(&endofheader, 1, MPIType<MPI_Offset>(), 0, commGrid->commWorld); // receive the file loc at the end of header
4062 fpos = endofheader + myrank * (file_size-endofheader) / nprocs;
4063 }
4064 if(myrank != (nprocs-1)) end_fpos = endofheader + (myrank + 1) * (file_size-endofheader) / nprocs;
4065 else end_fpos = file_size;
4066
4067 MPI_File mpi_fh;
4068 MPI_File_open (commGrid->commWorld, const_cast<char*>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fh);
4069
4070
4071 typedef typename DER::LocalIT LIT;
4072 std::vector<LIT> rows;
4073 std::vector<LIT> cols;
4074 std::vector<NT> vals;
4075
4076 std::vector<std::string> lines;
4077 bool finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, true, lines, myrank);
4078 int64_t entriesread = lines.size();
4079 SpHelper::ProcessLines(rows, cols, vals, lines, symmetric, type, onebased);
4080 MPI_Barrier(commGrid->commWorld);
4081
4082 while(!finished)
4083 {
4084 finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, false, lines, myrank);
4085 entriesread += lines.size();
4086 SpHelper::ProcessLines(rows, cols, vals, lines, symmetric, type, onebased);
4087 }
4088 int64_t allentriesread;
4089 MPI_Reduce(&entriesread, &allentriesread, 1, MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
4090#ifdef COMBBLAS_DEBUG
4091 if(myrank == 0)
4092 std::cout << "Reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
4093#endif
4094
4095 std::vector< std::vector < std::tuple<LIT,LIT,NT> > > data(nprocs);
4096
4097 LIT locsize = rows.size(); // remember: locsize != entriesread when the input is symmetric (each entry then yields two tuples)
4098 for(LIT i=0; i<locsize; ++i)
4099 {
4100 LIT lrow, lcol;
4101 int owner = Owner(nrows, ncols, rows[i], cols[i], lrow, lcol);
4102 data[owner].push_back(std::make_tuple(lrow,lcol,vals[i]));
4103 }
4104 std::vector<LIT>().swap(rows);
4105 std::vector<LIT>().swap(cols);
4106 std::vector<NT>().swap(vals);
4107
4108#ifdef COMBBLAS_DEBUG
4109 if(myrank == 0)
4110 std::cout << "Packing to recipients finished, about to send..." << std::endl;
4111#endif
4112
4113 if(spSeq) delete spSeq;
4114 SparseCommon(data, locsize, nrows, ncols, BinOp);
4115}
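
// Usage sketch (illustrative; assumes SpDCCols is visible and "input.mtx"
// exists): collective Matrix Market read. onebased = true matches the standard
// 1-indexed format; duplicates and the mirrored entries of symmetric inputs
// are combined with the supplied binary operation.
inline void ParallelReadExample(std::shared_ptr<CommGrid> grid)
{
    SpParMat<int64_t, double, SpDCCols<int64_t, double> > A(grid);
    A.ParallelReadMM("input.mtx", true, maximum<double>());
}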
4116
4117
4118template <class IT, class NT, class DER>
4119template <class HANDLER>
4120void SpParMat< IT,NT,DER >::ParallelWriteMM(const std::string & filename, bool onebased, HANDLER handler)
4121{
4122 int myrank = commGrid->GetRank();
4123 int nprocs = commGrid->GetSize();
4124 IT totalm = getnrow();
4125 IT totaln = getncol();
4126 IT totnnz = getnnz();
4127
4128 std::stringstream ss;
4129 if(myrank == 0)
4130 {
4131 ss << "%%MatrixMarket matrix coordinate real general" << std::endl;
4132 ss << totalm << " " << totaln << " " << totnnz << std::endl;
4133 }
4134
4135 IT entries = getlocalnnz();
4136 IT sizeuntil = 0;
4137 MPI_Exscan( &entries, &sizeuntil, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld() );
4138 if(myrank == 0) sizeuntil = 0; // because MPI_Exscan says the recvbuf in process 0 is undefined
4139
4140 IT roffset = 0;
4141 IT coffset = 0;
4142 GetPlaceInGlobalGrid(roffset, coffset);
4143 if(onebased)
4144 {
4145 roffset += 1; // increment by 1
4146 coffset += 1;
4147 }
4148
4149 for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over nonempty subcolumns
4150 {
4151 for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
4152 {
4153 IT glrowid = nzit.rowid() + roffset;
4154 IT glcolid = colit.colid() + coffset;
4155 ss << glrowid << '\t';
4156 ss << glcolid << '\t';
4157 handler.save(ss, nzit.value(), glrowid, glcolid);
4158 ss << '\n';
4159 }
4160 }
4161 std::string text = ss.str();
4162
4163 int64_t * bytes = new int64_t[nprocs];
4164 bytes[myrank] = text.size();
4165 MPI_Allgather(MPI_IN_PLACE, 1, MPIType<int64_t>(), bytes, 1, MPIType<int64_t>(), commGrid->GetWorld());
4166 int64_t bytesuntil = std::accumulate(bytes, bytes+myrank, static_cast<int64_t>(0));
4167 int64_t bytestotal = std::accumulate(bytes, bytes+nprocs, static_cast<int64_t>(0));
4168
4169
4170 MPI_File thefile;
4171 MPI_File_open(commGrid->GetWorld(), (char*) filename.c_str(), MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile) ;
4172 int mpi_err = MPI_File_set_view(thefile, bytesuntil, MPI_CHAR, MPI_CHAR, (char*)"external32", MPI_INFO_NULL);
4173 if (mpi_err == 51) {
4174 // external32 datarep is not supported, use native instead
4175 MPI_File_set_view(thefile, bytesuntil, MPI_CHAR, MPI_CHAR, (char*)"native", MPI_INFO_NULL);
4176 }
4177
4178 int64_t batchSize = 256 * 1024 * 1024;
4179 size_t localfileptr = 0;
4180 int64_t remaining = bytes[myrank];
4181 int64_t totalremaining = bytestotal;
4182
4183 while(totalremaining > 0)
4184 {
4185 #ifdef COMBBLAS_DEBUG
4186 if(myrank == 0)
4187 std::cout << "Remaining " << totalremaining << " bytes to write in aggregate" << std::endl;
4188 #endif
4189 MPI_Status status;
4190 int curBatch = (int) std::min(batchSize, remaining); // safe: batchSize caps this at 256 MB, which fits in an int
4191 MPI_File_write_all(thefile, text.c_str()+localfileptr, curBatch, MPI_CHAR, &status);
4192 int count;
4193 MPI_Get_count(&status, MPI_CHAR, &count); // known bug: https://github.com/pmodels/mpich/issues/2332
4194 assert( (curBatch == 0) || (count == curBatch) ); // count can return the previous/wrong value when 0 elements are written
4195 localfileptr += curBatch;
4196 remaining -= curBatch;
4197 MPI_Allreduce(&remaining, &totalremaining, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetWorld());
4198 }
4199 MPI_File_close(&thefile);
4200
4201 delete [] bytes;
4202}
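
// Editor sketch of a minimal HANDLER for ParallelWriteMM: save() receives the
// stringstream being assembled plus the global row/column ids. The struct
// name is an editor assumption, not a library type.
struct MMValueSaveHandler
{
    void save(std::stringstream & ss, const double & v, int64_t /*row*/, int64_t /*col*/)
    {
        ss << v;
    }
};
// e.g.  A.ParallelWriteMM("output.mtx", true, MMValueSaveHandler());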
4203
4204
4205
4209template <class IT, class NT, class DER>
4210template <class HANDLER>
4211void SpParMat< IT,NT,DER >::ReadDistribute (const std::string & filename, int master, bool nonum, HANDLER handler, bool transpose, bool pario)
4212{
4213#ifdef TAU_PROFILE
4214 TAU_PROFILE_TIMER(rdtimer, "ReadDistribute", "void SpParMat::ReadDistribute (const string & , int, bool, HANDLER, bool)", TAU_DEFAULT);
4215 TAU_PROFILE_START(rdtimer);
4216#endif
4217
4218 std::ifstream infile;
4219 FILE * binfile = NULL; // points to "past header" if the file is binary
4220 int seeklength = 0;
4221 HeaderInfo hfile;
4222 if(commGrid->GetRank() == master) // 1 processor
4223 {
4224 hfile = ParseHeader(filename, binfile, seeklength);
4225 }
4226 MPI_Bcast(&seeklength, 1, MPI_INT, master, commGrid->commWorld);
4227
4228 IT total_m, total_n, total_nnz;
4229 IT m_perproc = 0, n_perproc = 0;
4230
4231 int colneighs = commGrid->GetGridRows(); // number of neighbors along this processor column (including oneself)
4232 int rowneighs = commGrid->GetGridCols(); // number of neighbors along this processor row (including oneself)
4233
4234 IT buffpercolneigh = MEMORYINBYTES / (colneighs * (2 * sizeof(IT) + sizeof(NT)));
4235 IT buffperrowneigh = MEMORYINBYTES / (rowneighs * (2 * sizeof(IT) + sizeof(NT)));
4236 if(pario)
4237 {
4238 // since all colneighs will be reading the data at the same time,
4239 // they might all read data destined for a single processor;
4240 // in that case buffperrowneigh must exceed colneighs * buffpercolneigh
4241 // in order not to overflow, so shrink the per-column buffers accordingly
4242 buffpercolneigh /= colneighs;
4243 if(seeklength == 0)
4244 SpParHelper::Print("COMBBLAS: Parallel I/O requested but binary header is corrupted\n", commGrid->GetWorld());
4245 }
4246
4247 // make sure that buffperrowneigh >= buffpercolneigh to cover for this pathological case:
4248 // -- all data received by a given column head (by vertical communication) are headed to a single processor along the row
4249 // -- then making sure buffperrowneigh >= buffpercolneigh guarantees that the horizontal buffer will never overflow
4250 buffperrowneigh = std::max(buffperrowneigh, buffpercolneigh);
4251 if(std::max(buffpercolneigh * colneighs, buffperrowneigh * rowneighs) > std::numeric_limits<int>::max())
4252 {
4253 SpParHelper::Print("COMBBLAS: MPI doesn't support sending int64_t send/recv counts or displacements\n", commGrid->GetWorld());
4254 }
4255
4256 int * cdispls = new int[colneighs];
4257 for (IT i=0; i<colneighs; ++i) cdispls[i] = i*buffpercolneigh;
4258 int * rdispls = new int[rowneighs];
4259 for (IT i=0; i<rowneighs; ++i) rdispls[i] = i*buffperrowneigh;
4260
4261 int *ccurptrs = NULL, *rcurptrs = NULL;
4262 int recvcount = 0;
4263 IT * rows = NULL;
4264 IT * cols = NULL;
4265 NT * vals = NULL;
4266
4267 // Note: all other column heads that initiate the horizontal communication have the same "rankinrow" as the master
4268 int rankincol = commGrid->GetRankInProcCol(master); // get master's rank in its processor column
4269 int rankinrow = commGrid->GetRankInProcRow(master);
4270 std::vector< std::tuple<IT, IT, NT> > localtuples;
4271
4272 if(commGrid->GetRank() == master) // 1 processor
4273 {
4274 if( !hfile.fileexists )
4275 {
4276 SpParHelper::Print( "COMBBLAS: Input file doesn't exist\n", commGrid->GetWorld());
4277 total_n = 0; total_m = 0;
4278 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
4279 return;
4280 }
4281 if (hfile.headerexists && hfile.format == 1)
4282 {
4283 SpParHelper::Print("COMBBLAS: Ascii input with binary headers is not supported\n", commGrid->GetWorld());
4284 total_n = 0; total_m = 0;
4285 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
4286 return;
4287 }
4288 if ( !hfile.headerexists ) // no header - ascii file (at this point, file exists)
4289 {
4290 infile.open(filename.c_str());
4291 char comment[256];
4292 infile.getline(comment,256);
4293 while(comment[0] == '%')
4294 {
4295 infile.getline(comment,256);
4296 }
4297 std::stringstream ss;
4298 ss << std::string(comment);
4299 ss >> total_m >> total_n >> total_nnz;
4300 if(pario)
4301 {
4302 SpParHelper::Print("COMBBLAS: Parallel I/O requested on a headerless (ascii) file, aborting\n", commGrid->GetWorld());
4303 total_n = 0; total_m = 0;
4304 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
4305 return;
4306 }
4307 }
4308 else // hfile.headerexists && hfile.format == 0
4309 {
4310 total_m = hfile.m;
4311 total_n = hfile.n;
4312 total_nnz = hfile.nnz;
4313 }
4314 m_perproc = total_m / colneighs;
4315 n_perproc = total_n / rowneighs;
4316 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
4317 AllocateSetBuffers(rows, cols, vals, rcurptrs, ccurptrs, rowneighs, colneighs, buffpercolneigh);
4318
4319 if(seeklength > 0 && pario) // sqrt(p) processors also do parallel binary i/o
4320 {
4321 IT entriestoread = total_nnz / colneighs;
4322 #ifdef IODEBUG
4323 std::ofstream oput;
4324 commGrid->OpenDebugFile("Read", oput);
4325 oput << "Total nnz: " << total_nnz << " entries to read: " << entriestoread << std::endl;
4326 oput.close();
4327 #endif
4328 ReadAllMine(binfile, rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
4329 rowneighs, colneighs, buffperrowneigh, buffpercolneigh, entriestoread, handler, rankinrow, transpose);
4330 }
4331 else // only this (master) is doing I/O (text or binary)
4332 {
4333 IT temprow, tempcol;
4334 NT tempval;
4335 IT ntrow = 0, ntcol = 0; // not transposed row and column index
4336 char line[1024];
4337 bool nonumline = nonum;
4338 IT cnz = 0;
4339 for(; cnz < total_nnz; ++cnz)
4340 {
4341 int colrec;
4342 size_t commonindex;
4343 std::stringstream linestream;
4344 if( (!hfile.headerexists) && (!infile.eof()))
4345 {
4346 // read one line at a time so that missing numerical values can be detected
4347 infile.getline(line, 1024);
4348 linestream << line;
4349 linestream >> temprow >> tempcol;
4350 if (!nonum)
4351 {
4352 // see if this line has a value
4353 linestream >> std::skipws;
4354 nonumline = linestream.eof();
4355 }
4356 --temprow; // file is 1-based whereas C arrays are 0-based
4357 --tempcol;
4358 ntrow = temprow;
4359 ntcol = tempcol;
4360 }
4361 else if(hfile.headerexists && (!feof(binfile)) )
4362 {
4363 handler.binaryfill(binfile, temprow , tempcol, tempval);
4364 }
4365 if (transpose)
4366 {
4367 IT swap = temprow;
4368 temprow = tempcol;
4369 tempcol = swap;
4370 }
4371 colrec = std::min(static_cast<int>(temprow / m_perproc), colneighs-1); // recipient processor along the column
4372 commonindex = colrec * buffpercolneigh + ccurptrs[colrec];
4373
4374 rows[ commonindex ] = temprow;
4375 cols[ commonindex ] = tempcol;
4376 if( (!hfile.headerexists) && (!infile.eof()))
4377 {
4378 vals[ commonindex ] = nonumline ? handler.getNoNum(ntrow, ntcol) : handler.read(linestream, ntrow, ntcol); //tempval;
4379 }
4380 else if(hfile.headerexists && (!feof(binfile)) )
4381 {
4382 vals[ commonindex ] = tempval;
4383 }
4384 ++ (ccurptrs[colrec]);
4385 if(ccurptrs[colrec] == buffpercolneigh || (cnz == (total_nnz-1)) ) // one buffer is full, or file is done !
4386 {
4387 MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld); // first, send the receive counts
4388
4389 // generate space for own recv data ... (use arrays because vector<bool> is crippled, if NT=bool)
4390 IT * temprows = new IT[recvcount];
4391 IT * tempcols = new IT[recvcount];
4392 NT * tempvals = new NT[recvcount];
4393
4394 // then, send all buffers to their recipients ...
4395 MPI_Scatterv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
4396 MPI_Scatterv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
4397 MPI_Scatterv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankincol, commGrid->colWorld);
4398
4399 std::fill_n(ccurptrs, colneighs, 0); // finally, reset current pointers !
4400 DeleteAll(rows, cols, vals);
4401
4402 HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls,
4403 buffperrowneigh, rowneighs, recvcount, m_perproc, n_perproc, rankinrow);
4404
4405 if( cnz != (total_nnz-1) ) // otherwise the loop will exit with no one left to claim the memory back
4406 {
4407 // reuse these buffers for the next vertical communication
4408 rows = new IT [ buffpercolneigh * colneighs ];
4409 cols = new IT [ buffpercolneigh * colneighs ];
4410 vals = new NT [ buffpercolneigh * colneighs ];
4411 }
4412 } // end_if for "send buffer is full" case
4413 } // end_for for "cnz < total_nnz" case
4414 assert (cnz == total_nnz);
4415
4416 // Signal the end of file to other processors along the column
4417 std::fill_n(ccurptrs, colneighs, std::numeric_limits<int>::max());
4418 MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld);
4419
4420 // And along the row ...
4421 std::fill_n(rcurptrs, rowneighs, std::numeric_limits<int>::max());
4422 MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
4423 } // end of "else" (only one processor reads) block
4424 } // end_if for "master processor" case
4425 else if( commGrid->OnSameProcCol(master) ) // (r-1) processors
4426 {
4427 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
4428 m_perproc = total_m / colneighs;
4429 n_perproc = total_n / rowneighs;
4430
4431 if(seeklength > 0 && pario) // these processors also do parallel binary i/o
4432 {
4433 binfile = fopen(filename.c_str(), "rb");
4434 IT entrysize = handler.entrylength();
4435 int myrankincol = commGrid->GetRankInProcCol();
4436 IT perreader = total_nnz / colneighs;
4437 IT read_offset = entrysize * static_cast<IT>(myrankincol) * perreader + seeklength;
4438 IT entriestoread = perreader;
4439 if (myrankincol == colneighs-1)
4440 entriestoread = total_nnz - static_cast<IT>(myrankincol) * perreader;
4441 fseek(binfile, read_offset, SEEK_SET);
4442
4443 #ifdef IODEBUG
4444 std::ofstream oput;
4445 commGrid->OpenDebugFile("Read", oput);
4446 oput << "Total nnz: " << total_nnz << " OFFSET : " << read_offset << " entries to read: " << entriestoread << std::endl;
4447 oput.close();
4448 #endif
4449
4450 AllocateSetBuffers(rows, cols, vals, rcurptrs, ccurptrs, rowneighs, colneighs, buffpercolneigh);
4451 ReadAllMine(binfile, rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
4452 rowneighs, colneighs, buffperrowneigh, buffpercolneigh, entriestoread, handler, rankinrow, transpose);
4453 }
4454 else // only master does the I/O
4455 {
4456 while(total_n > 0 || total_m > 0) // otherwise input file does not exist !
4457 {
4458 // void MPI::Comm::Scatterv(const void* sendbuf, const int sendcounts[], const int displs[], const MPI::Datatype& sendtype,
4459 // void* recvbuf, int recvcount, const MPI::Datatype & recvtype, int root) const
4460 // The outcome is as if the root executed n send operations,
4461 // MPI_Send(sendbuf + displs[i] * extent(sendtype), sendcounts[i], sendtype, i, ...)
4462 // and each process executed a receive,
4463 // MPI_Recv(recvbuf, recvcount, recvtype, root, ...)
4464 // The send buffer is ignored for all nonroot processes.
4465
4466 MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld); // first receive the receive counts ...
4467 if( recvcount == std::numeric_limits<int>::max()) break;
4468
4469 // create space for incoming data ...
4470 IT * temprows = new IT[recvcount];
4471 IT * tempcols = new IT[recvcount];
4472 NT * tempvals = new NT[recvcount];
4473
4474 // receive actual data ... (first 4 arguments are ignored on the receiver side)
4475 MPI_Scatterv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
4476 MPI_Scatterv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
4477 MPI_Scatterv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankincol, commGrid->colWorld);
4478
4479 // now, send the data along the horizontal
4480 if(rcurptrs == NULL) rcurptrs = new int[rowneighs]; // allocate once instead of leaking a buffer per iteration
4481 std::fill_n(rcurptrs, rowneighs, 0);
4482
4483 // HorizontalSend frees the temp_xxx arrays, and creates and frees the memory of all six buffers itself
4484 HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls,
4485 buffperrowneigh, rowneighs, recvcount, m_perproc, n_perproc, rankinrow);
4486 }
4487 }
4488
4489 // Signal the end of file to other processors along the row
4490 if(rcurptrs == NULL) rcurptrs = new int[rowneighs]; std::fill_n(rcurptrs, rowneighs, std::numeric_limits<int>::max()); // guard: the loop above may never have executed
4491 MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
4492 delete [] rcurptrs;
4493 }
4494 else // r * (s-1) processors that only participate in the horizontal communication step
4495 {
4496 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
4497
4498 m_perproc = total_m / colneighs;
4499 n_perproc = total_n / rowneighs;
4500 while(total_n > 0 || total_m > 0) // otherwise input file does not exist !
4501 {
4502 // receive the receive count
4503 MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
4504 if( recvcount == std::numeric_limits<int>::max())
4505 break;
4506
4507 #ifdef IODEBUG
4508 std::ofstream oput;
4509 commGrid->OpenDebugFile("Read", oput);
4510 oput << "Total nnz: " << total_nnz << " total_m : " << total_m << " recvcount: " << recvcount << std::endl;
4511 oput.close();
4512 #endif
4513
4514 // create space for incoming data ...
4515 IT * temprows = new IT[recvcount];
4516 IT * tempcols = new IT[recvcount];
4517 NT * tempvals = new NT[recvcount];
4518
4519 MPI_Scatterv(rows, rcurptrs, rdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
4520 MPI_Scatterv(cols, rcurptrs, rdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
4521 MPI_Scatterv(vals, rcurptrs, rdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankinrow, commGrid->rowWorld);
4522
4523 // now push what is ours to tuples
4524 IT moffset = commGrid->myprocrow * m_perproc;
4525 IT noffset = commGrid->myproccol * n_perproc;
4526
4527 for(IT i=0; i< recvcount; ++i)
4528 {
4529 localtuples.push_back( std::make_tuple(temprows[i]-moffset, tempcols[i]-noffset, tempvals[i]) );
4530 }
4531 DeleteAll(temprows,tempcols,tempvals);
4532 }
4533 }
4534 DeleteAll(cdispls, rdispls);
4535 std::tuple<IT,IT,NT> * arrtuples = new std::tuple<IT,IT,NT>[localtuples.size()]; // the vector will go out of scope, make it stick !
4536 std::copy(localtuples.begin(), localtuples.end(), arrtuples);
4537
4538 IT localm = (commGrid->myprocrow != (commGrid->grrows-1))? m_perproc: (total_m - (m_perproc * (commGrid->grrows-1)));
4539 IT localn = (commGrid->myproccol != (commGrid->grcols-1))? n_perproc: (total_n - (n_perproc * (commGrid->grcols-1)));
4540 spSeq->Create( localtuples.size(), localm, localn, arrtuples); // the deletion of arrtuples[] is handled by SpMat::Create
4541
4542#ifdef TAU_PROFILE
4543 TAU_PROFILE_STOP(rdtimer);
4544#endif
4545 return;
4546}
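/* Flow sketch for the reader above: triples travel in two hops that match the
 * 2D grid, first down a processor column (scattered from rankincol over
 * colWorld), then across a processor row (scattered from rankinrow over
 * rowWorld via HorizontalSend); each process keeps only the triples that fall
 * inside its local block before the final spSeq->Create(). The end-of-stream
 * sentinel on both axes is a scatter of std::numeric_limits<int>::max():
 *
 *     std::fill_n(rcurptrs, rowneighs, std::numeric_limits<int>::max());
 *     MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
 *     // receivers see recvcount == INT_MAX and break out of their read loop
 */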
4547
4548template <class IT, class NT, class DER>
4549void SpParMat<IT,NT,DER>::AllocateSetBuffers(IT * & rows, IT * & cols, NT * & vals, int * & rcurptrs, int * & ccurptrs, int rowneighs, int colneighs, IT buffpercolneigh)
4550{
4551 // allocate buffers on the heap as stack space is usually limited
4552 rows = new IT [ buffpercolneigh * colneighs ];
4553 cols = new IT [ buffpercolneigh * colneighs ];
4554 vals = new NT [ buffpercolneigh * colneighs ];
4555
4556 ccurptrs = new int[colneighs];
4557 rcurptrs = new int[rowneighs];
4558 std::fill_n(ccurptrs, colneighs, 0); // fill with zero
4559 std::fill_n(rcurptrs, rowneighs, 0);
4560}
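/* Buffer layout sketch (with hypothetical sizes): the flat arrays allocated
 * above are partitioned into colneighs fixed-width slots of buffpercolneigh
 * entries each, and ccurptrs[i] tracks the fill level of slot i. A triple
 * destined for processor-row colrec is therefore written at
 *
 *     size_t idx = colrec * buffpercolneigh + ccurptrs[colrec]; // e.g. 2*1024 + 7
 *     rows[idx] = temprow; cols[idx] = tempcol; vals[idx] = tempval;
 *     ++ccurptrs[colrec];
 *
 * which is exactly the indexing ReadAllMine uses below.
 */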
4561
4562template <class IT, class NT, class DER>
4563void SpParMat<IT,NT,DER>::BcastEssentials(MPI_Comm & world, IT & total_m, IT & total_n, IT & total_nnz, int master)
4564{
4565 MPI_Bcast(&total_m, 1, MPIType<IT>(), master, world);
4566 MPI_Bcast(&total_n, 1, MPIType<IT>(), master, world);
4567 MPI_Bcast(&total_nnz, 1, MPIType<IT>(), master, world);
4568}
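/* Usage sketch: every rank, reader or not, makes the matching call so the
 * three broadcasts pair up across the communicator:
 *
 *     IT total_m = 0, total_n = 0, total_nnz = 0;   // only master has real values
 *     BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
 *     // afterwards all ranks agree on the global dimensions and nonzero count
 */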
4569
4570/*
4571 * @post {rows, cols, vals are freshly allocated on the heap when this call returns}
4572 * @post {ccurptrs are reset to zero, so a subsequent call that does not refill ccurptrs sends no data from this processor}
4573 */
4574template <class IT, class NT, class DER>
4575void SpParMat<IT,NT,DER>::VerticalSend(IT * & rows, IT * & cols, NT * & vals, std::vector< std::tuple<IT,IT,NT> > & localtuples, int * rcurptrs, int * ccurptrs, int * rdispls, int * cdispls,
4576 IT m_perproc, IT n_perproc, int rowneighs, int colneighs, IT buffperrowneigh, IT buffpercolneigh, int rankinrow)
4577{
4578 // first, send/recv the counts ...
4579 int * colrecvdispls = new int[colneighs];
4580 int * colrecvcounts = new int[colneighs];
4581 MPI_Alltoall(ccurptrs, 1, MPI_INT, colrecvcounts, 1, MPI_INT, commGrid->colWorld); // share the request counts
4582 int totrecv = std::accumulate(colrecvcounts,colrecvcounts+colneighs,0);
4583 colrecvdispls[0] = 0; // receive displacements are exact whereas send displacements have slack
4584 for(int i=0; i<colneighs-1; ++i)
4585 colrecvdispls[i+1] = colrecvdispls[i] + colrecvcounts[i];
4586
4587 // generate space for own recv data ... (use arrays because std::vector<bool> is crippled when NT = bool)
4588 IT * temprows = new IT[totrecv];
4589 IT * tempcols = new IT[totrecv];
4590 NT * tempvals = new NT[totrecv];
4591
4592 // then, exchange all buffers with their recipients ...
4593 MPI_Alltoallv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, colrecvcounts, colrecvdispls, MPIType<IT>(), commGrid->colWorld);
4594 MPI_Alltoallv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, colrecvcounts, colrecvdispls, MPIType<IT>(), commGrid->colWorld);
4595 MPI_Alltoallv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, colrecvcounts, colrecvdispls, MPIType<NT>(), commGrid->colWorld);
4596
4597 // finally, reset current pointers !
4598 std::fill_n(ccurptrs, colneighs, 0);
4599 DeleteAll(colrecvdispls, colrecvcounts);
4600 DeleteAll(rows, cols, vals);
4601
4602 // rcurptrs/rdispls are zero initialized scratch space
4603 HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls, buffperrowneigh, rowneighs, totrecv, m_perproc, n_perproc, rankinrow);
4604
4605 // allocate fresh buffers for the next round of vertical communication
4606 rows = new IT [ buffpercolneigh * colneighs ];
4607 cols = new IT [ buffpercolneigh * colneighs ];
4608 vals = new NT [ buffpercolneigh * colneighs ];
4609}
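/* Count/displacement sketch for the column exchange above, assuming a
 * hypothetical 3-process column with buffpercolneigh = 4: send slots have a
 * fixed stride, so the send displacements carry slack, while the receive
 * displacements are packed prefix sums of the exchanged counts:
 *
 *     int ccurptrs[3]      = {2, 4, 1};   // entries actually filled per slot
 *     int cdispls[3]       = {0, 4, 8};   // stride = buffpercolneigh
 *     int colrecvcounts[3] = {3, 0, 2};   // from MPI_Alltoall of the counts
 *     int colrecvdispls[3] = {0, 3, 3};   // packed prefix sums, no slack
 */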
4610
4611
4618template <class IT, class NT, class DER>
4619template <class HANDLER>
4620void SpParMat<IT,NT,DER>::ReadAllMine(FILE * binfile, IT * & rows, IT * & cols, NT * & vals, std::vector< std::tuple<IT,IT,NT> > & localtuples, int * rcurptrs, int * ccurptrs, int * rdispls, int * cdispls,
4621 IT m_perproc, IT n_perproc, int rowneighs, int colneighs, IT buffperrowneigh, IT buffpercolneigh, IT entriestoread, HANDLER handler, int rankinrow, bool transpose)
4622{
4623 assert(entriestoread != 0);
4624 IT cnz = 0;
4625 IT temprow, tempcol;
4626 NT tempval;
4627 int finishedglobal = 1;
4628 while(cnz < entriestoread && !feof(binfile)) // this loop will execute at least once
4629 {
4630 handler.binaryfill(binfile, temprow , tempcol, tempval);
4631
4632 if (transpose)
4633 {
4634 IT swap = temprow;
4635 temprow = tempcol;
4636 tempcol = swap;
4637 }
4638 int colrec = std::min(static_cast<int>(temprow / m_perproc), colneighs-1); // recipient processor along the column
4639 size_t commonindex = colrec * buffpercolneigh + ccurptrs[colrec];
4640 rows[ commonindex ] = temprow;
4641 cols[ commonindex ] = tempcol;
4642 vals[ commonindex ] = tempval;
4643 ++ (ccurptrs[colrec]);
4644 if(ccurptrs[colrec] == buffpercolneigh || (cnz == (entriestoread-1)) ) // one buffer is full, or this processor's share is done !
4645 {
4646 #ifdef IODEBUG
4647 std::ofstream oput;
4648 commGrid->OpenDebugFile("Read", oput);
4649 oput << "To column neighbors: ";
4650 std::copy(ccurptrs, ccurptrs+colneighs, std::ostream_iterator<int>(oput, " ")); oput << std::endl;
4651 oput.close();
4652 #endif
4653
4654 VerticalSend(rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
4655 rowneighs, colneighs, buffperrowneigh, buffpercolneigh, rankinrow);
4656
4657 if(cnz == (entriestoread-1)) // last execution of the outer loop
4658 {
4659 int finishedlocal = 1; // I am done, but let me check others
4660 MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
4661 while(!finishedglobal)
4662 {
4663 #ifdef IODEBUG
4664 std::ofstream oput;
4665 commGrid->OpenDebugFile("Read", oput);
4666 oput << "To column neighbors: ";
4667 std::copy(ccurptrs, ccurptrs+colneighs, std::ostream_iterator<int>(oput, " ")); oput << std::endl;
4668 oput.close();
4669 #endif
4670
4671 // postcondition of VerticalSend: ccurptrs are reset to zero, so calling
4672 // it again without refilling ccurptrs sends no data from this processor
4673 VerticalSend(rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
4674 rowneighs, colneighs, buffperrowneigh, buffpercolneigh, rankinrow);
4675
4676 MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
4677 }
4678 }
4679 else // the outer loop will continue executing
4680 {
4681 int finishedlocal = 0;
4682 MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
4683 }
4684 } // end_if for "send buffer is full" case
4685 ++cnz;
4686 }
4687
4688 // signal the end to row neighbors
4689 std::fill_n(rcurptrs, rowneighs, std::numeric_limits<int>::max());
4690 int recvcount;
4691 MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
4692}
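/* Termination sketch: the flush loop above reduces {0,1} flags with MPI_BAND,
 * so finishedglobal becomes 1 only when every process in the column has read
 * its whole share. A process that finishes early keeps issuing empty
 * VerticalSends (its ccurptrs stay zero) until the reduction reports global
 * completion:
 *
 *     int finishedlocal = done ? 1 : 0, finishedglobal = 0;
 *     MPI_Allreduce(&finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, colWorld);
 *     while(!finishedglobal) { VerticalSend(...); MPI_Allreduce(...); }
 */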
4693
4694
4701template <class IT, class NT, class DER>
4702void SpParMat<IT,NT,DER>::HorizontalSend(IT * & rows, IT * & cols, NT * & vals, IT * & temprows, IT * & tempcols, NT * & tempvals, std::vector < std::tuple <IT,IT,NT> > & localtuples,
4703 int * rcurptrs, int * rdispls, IT buffperrowneigh, int rowneighs, int recvcount, IT m_perproc, IT n_perproc, int rankinrow)
4704{
4705 rows = new IT [ buffperrowneigh * rowneighs ];
4706 cols = new IT [ buffperrowneigh * rowneighs ];
4707 vals = new NT [ buffperrowneigh * rowneighs ];
4708
4709 // prepare to send the data along the horizontal
4710 for(int i=0; i< recvcount; ++i)
4711 {
4712 int rowrec = std::min(static_cast<int>(tempcols[i] / n_perproc), rowneighs-1);
4713 rows[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = temprows[i];
4714 cols[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = tempcols[i];
4715 vals[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = tempvals[i];
4716 ++ (rcurptrs[rowrec]);
4717 }
4718
4719 #ifdef IODEBUG
4720 std::ofstream oput;
4721 commGrid->OpenDebugFile("Read", oput);
4722 oput << "To row neighbors: ";
4723 std::copy(rcurptrs, rcurptrs+rowneighs, std::ostream_iterator<int>(oput, " ")); oput << std::endl;
4724 oput << "Row displacements were: ";
4725 std::copy(rdispls, rdispls+rowneighs, std::ostream_iterator<int>(oput, " ")); oput << std::endl;
4726 oput.close();
4727 #endif
4728
4729 MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld); // Send the receive counts for horizontal communication
4730
4731 // the data is now staged in rows/cols/vals, so the temporaries can be
4732 // reallocated to hold exactly the recvcount entries destined for this process
4733 DeleteAll(temprows, tempcols, tempvals);
4734 temprows = new IT[recvcount];
4735 tempcols = new IT[recvcount];
4736 tempvals = new NT[recvcount];
4737
4738 // then, send all buffers to their recipients ...
4739 MPI_Scatterv(rows, rcurptrs, rdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
4740 MPI_Scatterv(cols, rcurptrs, rdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
4741 MPI_Scatterv(vals, rcurptrs, rdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankinrow, commGrid->rowWorld);
4742
4743 // now push what is ours to tuples
4744 IT moffset = commGrid->myprocrow * m_perproc;
4745 IT noffset = commGrid->myproccol * n_perproc;
4746
4747 for(int i=0; i< recvcount; ++i)
4748 {
4749 localtuples.push_back( std::make_tuple(temprows[i]-moffset, tempcols[i]-noffset, tempvals[i]) );
4750 }
4751
4752 std::fill_n(rcurptrs, rowneighs, 0);
4753 DeleteAll(rows, cols, vals, temprows, tempcols, tempvals);
4754}
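/* Recipient sketch with hypothetical numbers: given n_perproc = 100 and
 * rowneighs = 4, column indices map to row-neighbors as
 *
 *     cols 0..99 -> 0,   100..199 -> 1,   200..299 -> 2,   >= 300 -> 3
 *
 * because rowrec = std::min(static_cast<int>(tempcols[i] / n_perproc),
 * rowneighs-1); the last neighbor absorbs the remainder, mirroring the
 * column-side formula in ReadAllMine.
 */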
4755
4756
4759template <class IT, class NT, class DER>
4760void SpParMat<IT,NT,DER>::Find (FullyDistVec<IT,IT> & distrows, FullyDistVec<IT,IT> & distcols, FullyDistVec<IT,NT> & distvals) const
4761{
4762 if((*(distrows.commGrid) != *(distcols.commGrid)) || (*(distcols.commGrid) != *(distvals.commGrid)))
4763 {
4764 SpParHelper::Print("Grids are not comparable, Find() fails!", commGrid->GetWorld());
4765 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
4766 }
4767 IT globallen = getnnz();
4768 SpTuples<IT,NT> Atuples(*spSeq);
4769
4770 FullyDistVec<IT,IT> nrows ( distrows.commGrid, globallen, 0);
4771 FullyDistVec<IT,IT> ncols ( distcols.commGrid, globallen, 0);
4772 FullyDistVec<IT,NT> nvals ( distvals.commGrid, globallen, NT());
4773
4774 IT prelen = Atuples.getnnz();
4775 //IT postlen = nrows.MyLocLength();
4776
4777 int rank = commGrid->GetRank();
4778 int nprocs = commGrid->GetSize();
4779 IT * prelens = new IT[nprocs];
4780 prelens[rank] = prelen;
4781 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
4782 IT prelenuntil = std::accumulate(prelens, prelens+rank, static_cast<IT>(0));
4783
4784 int * sendcnt = new int[nprocs](); // zero initialize
4785 IT * rows = new IT[prelen];
4786 IT * cols = new IT[prelen];
4787 NT * vals = new NT[prelen];
4788
4789 int rowrank = commGrid->GetRankInProcRow();
4790 int colrank = commGrid->GetRankInProcCol();
4791 int rowneighs = commGrid->GetGridCols();
4792 int colneighs = commGrid->GetGridRows();
4793 IT * locnrows = new IT[colneighs]; // number of rows is calculated by a reduction among the processor column
4794 IT * locncols = new IT[rowneighs];
4795 locnrows[colrank] = getlocalrows();
4796 locncols[rowrank] = getlocalcols();
4797
4798 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
4799 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locncols, 1, MPIType<IT>(), commGrid->GetRowWorld());
4800
4801 IT roffset = std::accumulate(locnrows, locnrows+colrank, static_cast<IT>(0));
4802 IT coffset = std::accumulate(locncols, locncols+rowrank, static_cast<IT>(0));
4803
4804 DeleteAll(locnrows, locncols);
4805 for(int i=0; i< prelen; ++i)
4806 {
4807 IT locid; // ignore local id, data will come in order
4808 int owner = nrows.Owner(prelenuntil+i, locid);
4809 sendcnt[owner]++;
4810
4811 rows[i] = Atuples.rowindex(i) + roffset; // need the global row index
4812 cols[i] = Atuples.colindex(i) + coffset; // need the global col index
4813 vals[i] = Atuples.numvalue(i);
4814 }
4815
4816 int * recvcnt = new int[nprocs];
4817 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // get the recv counts
4818
4819 int * sdpls = new int[nprocs](); // displacements (zero-initialized)
4820 int * rdpls = new int[nprocs]();
4821 std::partial_sum(sendcnt, sendcnt+nprocs-1, sdpls+1);
4822 std::partial_sum(recvcnt, recvcnt+nprocs-1, rdpls+1);
4823
4824 MPI_Alltoallv(rows, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(nrows.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4825 MPI_Alltoallv(cols, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(ncols.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4826 MPI_Alltoallv(vals, sendcnt, sdpls, MPIType<NT>(), SpHelper::p2a(nvals.arr), recvcnt, rdpls, MPIType<NT>(), commGrid->GetWorld());
4827
4828 DeleteAll(sendcnt, recvcnt, sdpls, rdpls);
4829 DeleteAll(prelens, rows, cols, vals);
4830 distrows = nrows;
4831 distcols = ncols;
4832 distvals = nvals;
4833}
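/* Usage sketch (hypothetical matrix A on an existing grid): Find() fills
 * three aligned distributed vectors of length nnz(A), in the spirit of
 * MATLAB's [I,J,V] = find(A). Assuming FullyDistVec is constructible from
 * the matrix's grid:
 *
 *     FullyDistVec<IT,IT> I(A.getcommgrid()), J(A.getcommgrid());
 *     FullyDistVec<IT,NT> V(A.getcommgrid());
 *     A.Find(I, J, V);   // I[k], J[k], V[k] describe the k-th nonzero
 */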
4834
4837template <class IT, class NT, class DER>
4838void SpParMat<IT,NT,DER>::Find (FullyDistVec<IT,IT> & distrows, FullyDistVec<IT,IT> & distcols) const
4839{
4840 if((*(distrows.commGrid) != *(distcols.commGrid)) )
4841 {
4842 SpParHelper::Print("Grids are not comparable, Find() fails!", commGrid->GetWorld());
4843 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
4844 }
4845 IT globallen = getnnz();
4846 SpTuples<IT,NT> Atuples(*spSeq);
4847
4848 FullyDistVec<IT,IT> nrows ( distrows.commGrid, globallen, 0);
4849 FullyDistVec<IT,IT> ncols ( distcols.commGrid, globallen, 0);
4850
4851 IT prelen = Atuples.getnnz();
4852
4853 int rank = commGrid->GetRank();
4854 int nprocs = commGrid->GetSize();
4855 IT * prelens = new IT[nprocs];
4856 prelens[rank] = prelen;
4857 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
4858 IT prelenuntil = std::accumulate(prelens, prelens+rank, static_cast<IT>(0));
4859
4860 int * sendcnt = new int[nprocs](); // zero initialize
4861 IT * rows = new IT[prelen];
4862 IT * cols = new IT[prelen];
4863 NT * vals = new NT[prelen]; // never populated in this index-only overload
4864
4865 int rowrank = commGrid->GetRankInProcRow();
4866 int colrank = commGrid->GetRankInProcCol();
4867 int rowneighs = commGrid->GetGridCols();
4868 int colneighs = commGrid->GetGridRows();
4869 IT * locnrows = new IT[colneighs]; // number of rows is calculated by a reduction among the processor column
4870 IT * locncols = new IT[rowneighs];
4871 locnrows[colrank] = getlocalrows();
4872 locncols[rowrank] = getlocalcols();
4873
4874 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
4875 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locncols, 1, MPIType<IT>(), commGrid->GetRowWorld()); // local column counts are gathered along the processor row
4876 IT roffset = std::accumulate(locnrows, locnrows+colrank, static_cast<IT>(0));
4877 IT coffset = std::accumulate(locncols, locncols+rowrank, static_cast<IT>(0));
4878
4879 DeleteAll(locnrows, locncols);
4880 for(int i=0; i< prelen; ++i)
4881 {
4882 IT locid; // ignore local id, data will come in order
4883 int owner = nrows.Owner(prelenuntil+i, locid);
4884 sendcnt[owner]++;
4885
4886 rows[i] = Atuples.rowindex(i) + roffset; // need the global row index
4887 cols[i] = Atuples.colindex(i) + coffset; // need the global col index
4888 }
4889
4890 int * recvcnt = new int[nprocs];
4891 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // get the recv counts
4892
4893 int * sdpls = new int[nprocs](); // displacements (zero-initialized)
4894 int * rdpls = new int[nprocs]();
4895 std::partial_sum(sendcnt, sendcnt+nprocs-1, sdpls+1);
4896 std::partial_sum(recvcnt, recvcnt+nprocs-1, rdpls+1);
4897
4898 MPI_Alltoallv(rows, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(nrows.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4899 MPI_Alltoallv(cols, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(ncols.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4900
4901 DeleteAll(sendcnt, recvcnt, sdpls, rdpls);
4902 DeleteAll(prelens, rows, cols, vals);
4903 distrows = nrows;
4904 distcols = ncols;
4905}
4906
4907template <class IT, class NT, class DER>
4908DER SpParMat<IT,NT,DER>::InducedSubgraphs2Procs(const FullyDistVec<IT,IT>& Assignments, std::vector<IT>& LocalIdxs) const
4909{
4910 int nprocs = commGrid->GetSize();
4911 int myrank = commGrid->GetRank();
4912 IT nverts = getnrow(); // use IT, not int: vertex counts may not fit in 32 bits
4913
4914 if (nverts != getncol()) {
4915 SpParHelper::Print("Number of rows and columns differ, not allowed for graphs!\n");
4916 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
4917 }
4918
4919 if (nverts != Assignments.TotalLength()) {
4920 SpParHelper::Print("Assignments vector length does not match number of vertices!\n");
4921 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
4922 }
4923
4924 IT maxproc = Assignments.Reduce(maximum<IT>(), static_cast<IT>(0));
4925
4926 if (maxproc >= static_cast<IT>(nprocs)) {
4927 SpParHelper::Print("Assignments vector assigns to a process not in this group!\n");
4928 MPI_Abort(MPI_COMM_WORLD, INVALIDPARAMS);
4929 }
4930
4931 MPI_Comm World = commGrid->GetWorld();
4932 MPI_Comm RowWorld = commGrid->GetRowWorld();
4933 MPI_Comm ColWorld = commGrid->GetColWorld();
4934
4935 int rowneighs, rowrank;
4936 MPI_Comm_size(RowWorld, &rowneighs);
4937 MPI_Comm_rank(RowWorld, &rowrank);
4938
4939
4940 int mylocsize = Assignments.LocArrSize();
4941 std::vector<int> rowvecs_counts(rowneighs, 0);
4942 std::vector<int> rowvecs_displs(rowneighs, 0);
4943
4944 rowvecs_counts[rowrank] = mylocsize;
4945
4946 MPI_Allgather(MPI_IN_PLACE, 0, MPI_INT, rowvecs_counts.data(), 1, MPI_INT, RowWorld);
4947
4948 /* TODO GRGR: make this resilient if displacements don't fit in 32 bits */
4949
4950 std::partial_sum(rowvecs_counts.begin(), rowvecs_counts.end()-1, rowvecs_displs.begin()+1);
4951 size_t rowvecs_size = std::accumulate(rowvecs_counts.begin(), rowvecs_counts.end(), static_cast<size_t>(0));
4952
4953 std::vector<IT> rowvecs(rowvecs_size);
4954
4955 MPI_Allgatherv(Assignments.GetLocArr(), mylocsize, MPIType<IT>(), rowvecs.data(), rowvecs_counts.data(), rowvecs_displs.data(), MPIType<IT>(), RowWorld);
4956
4957 int complement_rank = commGrid->GetComplementRank();
4958 int complement_rowvecs_size = 0;
4959 int rowvecs_size_int = static_cast<int>(rowvecs_size); // MPI_INT expects an int-typed buffer, not size_t
4960 MPI_Sendrecv(&rowvecs_size_int, 1, MPI_INT,
4961 complement_rank, TRX,
4962 &complement_rowvecs_size, 1, MPI_INT,
4963 complement_rank, TRX,
4964 World, MPI_STATUS_IGNORE);
4965
4966 std::vector<IT> complement_rowvecs(complement_rowvecs_size);
4967
4968 MPI_Sendrecv(rowvecs.data(), rowvecs_size_int, MPIType<IT>(),
4969 complement_rank, TRX,
4970 complement_rowvecs.data(), complement_rowvecs_size, MPIType<IT>(),
4971 complement_rank, TRX,
4972 World, MPI_STATUS_IGNORE);
4973
4974 std::vector<std::vector<std::tuple<IT,IT,NT>>> svec(nprocs);
4975
4976 std::vector<int> sendcounts(nprocs, 0);
4977 std::vector<int> recvcounts(nprocs, 0);
4978 std::vector<int> sdispls(nprocs, 0);
4979 std::vector<int> rdispls(nprocs, 0);
4980
4981 int sbuflen = 0;
4982
4983 IT row_offset, col_offset;
4984 GetPlaceInGlobalGrid(row_offset, col_offset);
4985
4986 for (auto colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) {
4987 IT destproc = complement_rowvecs[colit.colid()];
4988 if (destproc != -1)
4989 for (auto nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit) {
4990 if (destproc == rowvecs[nzit.rowid()]) {
4991 svec[destproc].emplace_back(row_offset + nzit.rowid(), col_offset + colit.colid(), nzit.value());
4992 sendcounts[destproc]++;
4993 sbuflen++;
4994 }
4995 }
4996 }
4997
4998 MPI_Alltoall(sendcounts.data(), 1, MPI_INT, recvcounts.data(), 1, MPI_INT, World);
4999
5000 size_t rbuflen = std::accumulate(recvcounts.begin(), recvcounts.end(), static_cast<size_t>(0));
5001
5002 std::partial_sum(sendcounts.begin(), sendcounts.end()-1, sdispls.begin()+1);
5003 std::partial_sum(recvcounts.begin(), recvcounts.end()-1, rdispls.begin()+1);
5004
5005 std::tuple<IT,IT,NT> *sbuf = new std::tuple<IT,IT,NT>[sbuflen];
5006 std::tuple<IT,IT,NT> *rbuf = new std::tuple<IT,IT,NT>[rbuflen];
5007
5008 for (int i = 0; i < nprocs; ++i)
5009 std::copy(svec[i].begin(), svec[i].end(), sbuf + sdispls[i]);
5010
5011
5012 MPI_Alltoallv(sbuf, sendcounts.data(), sdispls.data(), MPIType<std::tuple<IT,IT,NT>>(), rbuf, recvcounts.data(), rdispls.data(), MPIType<std::tuple<IT,IT,NT>>(), World);
5013
5014 delete[] sbuf;
5015
5016 LocalIdxs.clear();
5017 LocalIdxs.shrink_to_fit();
5018
5019 std::unordered_map<IT, IT> locmap;
5020
5021 IT new_id = 0;
5022 IT global_ids[2];
5023
5024 for (size_t i = 0; i < rbuflen; ++i) {
5025 global_ids[0] = std::get<0>(rbuf[i]);
5026 global_ids[1] = std::get<1>(rbuf[i]);
5027 for (int j = 0; j < 2; ++j) {
5028 if (locmap.find(global_ids[j]) == locmap.end()) {
5029 locmap.insert(std::make_pair(global_ids[j], new_id++));
5030 LocalIdxs.push_back(global_ids[j]);
5031 }
5032 }
5033 std::get<0>(rbuf[i]) = locmap[global_ids[0]];
5034 std::get<1>(rbuf[i]) = locmap[global_ids[1]];
5035 }
5036
5037 IT localdim = LocalIdxs.size();
5038
5039 DER LocalMat;
5040 LocalMat.Create(rbuflen, localdim, localdim, rbuf);
5041
5042 return LocalMat;
5043}
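/* Usage sketch (hypothetical partition): given Assignments[v] = owning rank
 * of vertex v (or -1 to drop v), each rank receives its induced subgraph as a
 * purely local sequential matrix, with LocalIdxs mapping local ids back to
 * global vertex ids:
 *
 *     std::vector<IT> locals;
 *     DER sub = G.InducedSubgraphs2Procs(parts, locals);
 *     // locals[i] is the global id of the vertex with local id i in sub
 */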
5044
5045template <class IT, class NT, class DER>
5046std::ofstream& SpParMat<IT,NT,DER>::put(std::ofstream& outfile) const
5047{
5048 outfile << (*spSeq) << std::endl;
5049 return outfile;
5050}
5051
5052template <class IU, class NU, class UDER>
5053std::ofstream& operator<<(std::ofstream& outfile, const SpParMat<IU, NU, UDER> & s)
5054{
5055 return s.put(outfile); // use the right put() function
5056
5057}
5058
5066template <class IT, class NT,class DER>
5067template <typename LIT>
5068int SpParMat<IT,NT,DER>::Owner(IT total_m, IT total_n, IT grow, IT gcol, LIT & lrow, LIT & lcol) const
5069{
5070 int procrows = commGrid->GetGridRows();
5071 int proccols = commGrid->GetGridCols();
5072 IT m_perproc = total_m / procrows;
5073 IT n_perproc = total_n / proccols;
5074
5075 int own_procrow; // owner's processor row
5076 if(m_perproc != 0)
5077 {
5078 own_procrow = std::min(static_cast<int>(grow / m_perproc), procrows-1); // owner's processor row
5079 }
5080 else // all owned by the last processor row
5081 {
5082 own_procrow = procrows -1;
5083 }
5084 int own_proccol;
5085 if(n_perproc != 0)
5086 {
5087 own_proccol = std::min(static_cast<int>(gcol / n_perproc), proccols-1);
5088 }
5089 else
5090 {
5091 own_proccol = proccols-1;
5092 }
5093 lrow = grow - (own_procrow * m_perproc);
5094 lcol = gcol - (own_proccol * n_perproc);
5095 return commGrid->GetRank(own_procrow, own_proccol);
5096}
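/* Worked example with hypothetical sizes: total_m = 10 and procrows = 3 give
 * m_perproc = 3, so processor rows own global rows {0..2}, {3..5}, {6..9};
 * the last processor row absorbs the remainder, which is why the clamp to
 * procrows-1 is needed. For grow = 7:
 *
 *     own_procrow = std::min(7 / 3, 2) = 2;    lrow = 7 - 2*3 = 1;
 */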
5097
5102template <class IT, class NT,class DER>
5103void SpParMat<IT,NT,DER>::GetPlaceInGlobalGrid(IT& rowOffset, IT& colOffset) const
5104{
5105 IT total_rows = getnrow();
5106 IT total_cols = getncol();
5107
5108 int procrows = commGrid->GetGridRows();
5109 int proccols = commGrid->GetGridCols();
5110 IT rows_perproc = total_rows / procrows;
5111 IT cols_perproc = total_cols / proccols;
5112
5113 rowOffset = commGrid->GetRankInProcCol()*rows_perproc;
5114 colOffset = commGrid->GetRankInProcRow()*cols_perproc;
5115}
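/* Offset sketch: under the same block distribution Owner() assumes, the
 * process at grid position (pr, pc) owns the submatrix whose top-left corner
 * is (pr * rows_perproc, pc * cols_perproc), so adding these offsets to a
 * local index recovers the global one (as done in InducedSubgraphs2Procs):
 *
 *     IT roff, coff;
 *     A.GetPlaceInGlobalGrid(roff, coff);   // hypothetical matrix A
 *     IT gi = roff + li, gj = coff + lj;    // local (li,lj) -> global (gi,gj)
 */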
5116
5117}