#include <parallel/algorithm>
#include <parallel/numeric>
#include "usort/parUtils.h"
template <class IT, class NT>
template <class IT, class NT>
FullyDistSpVec<IT, NT>::FullyDistSpVec (std::shared_ptr<CommGrid> grid, IT globallen)
template <class IT, class NT>
FullyDistSpVec<IT,NT>::FullyDistSpVec ()
template <class IT, class NT>
FullyDistSpVec<IT,NT>::FullyDistSpVec (IT globallen)
template <class IT, class NT>
FullyDistSpVec<IT,NT> & FullyDistSpVec<IT,NT>::operator=(const FullyDistSpVec< IT,NT > & rhs)
    FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>::operator= (rhs);
template <class IT, class NT>
FullyDistSpVec<IT,NT>::FullyDistSpVec (const FullyDistVec<IT,NT> & rhs)
template <class IT, class NT>
template <typename _UnaryOperation>
FullyDistSpVec<IT,NT>::FullyDistSpVec (const FullyDistVec<IT,NT> & rhs, _UnaryOperation unop)
    std::vector<IT>().swap(ind);
    std::vector<NT>().swap(num);
    IT vecsize = rhs.LocArrSize();
    for(IT i=0; i< vecsize; ++i)
    {
        if(unop(rhs.arr[i]))        // keep only the entries that satisfy the predicate
        {
            ind.push_back(i);
            num.push_back(rhs.arr[i]);
        }
    }
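// Constructor from local (index, value) arrays: unless the caller guarantees
// sorted input, the pairs are sorted locally first; duplicate indices are then
// either skipped or accumulated, depending on SumDuplicates.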
template <class IT, class NT>
FullyDistSpVec<IT,NT>::FullyDistSpVec (std::shared_ptr<CommGrid> grid, IT globallen, const std::vector<IT>& indvec, const std::vector<NT> & numvec, bool SumDuplicates, bool sorted)
    assert(indvec.size() == numvec.size());
    IT vecsize = indvec.size();

    // unsorted input: sort the (index, value) pairs locally, then merge duplicates
    std::vector< std::pair<IT,NT> > tosort(vecsize);
#pragma omp parallel for
    for(IT i=0; i<vecsize; ++i)
        tosort[i] = std::make_pair(indvec[i], numvec[i]);
#if defined(GNU_PARALLEL) && defined(_OPENMP)
    __gnu_parallel::sort(tosort.begin(), tosort.end());
#else
    std::sort(tosort.begin(), tosort.end());
#endif
    ind.reserve(vecsize);
    num.reserve(vecsize);
    IT lastIndex = static_cast<IT>(-1);     // sentinel: no index seen yet
    for(auto itr = tosort.begin(); itr != tosort.end(); ++itr)
    {
        if(lastIndex != itr->first)         // a new index
        {
            ind.push_back(itr->first);
            num.push_back(itr->second);
            lastIndex = itr->first;
        }
        else if(SumDuplicates)
        {
            num.back() += itr->second;
        }
    }

    // already-sorted input: merge duplicates in a single pass
    ind.reserve(vecsize);
    num.reserve(vecsize);
    for(IT i=0; i< vecsize; ++i)
    {
        if(lastIndex != indvec[i])          // a new index
        {
            ind.push_back(indvec[i]);
            num.push_back(numvec[i]);
            lastIndex = indvec[i];
        }
        else if(SumDuplicates)
        {
            num.back() += numvec[i];
        }
    }
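// Assignment from a dense FullyDistVec: every local entry of the dense vector
// becomes an explicit nonzero of the sparse vector.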
template <class IT, class NT>
FullyDistSpVec<IT,NT> & FullyDistSpVec<IT,NT>::operator=(const FullyDistVec< IT,NT > & rhs)
    FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>::operator= (rhs);
    std::vector<IT>().swap(ind);
    std::vector<NT>().swap(num);
    IT vecsize = rhs.LocArrSize();
    for(IT i=0; i< vecsize; ++i)
    {
        ind.push_back(i);
        num.push_back(rhs.arr[i]);
    }
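// Constructor from distributed index and value vectors: each process routes its
// (index, value) pairs to their owner processes with an all-to-all exchange, then
// sorts what it received and merges duplicate indices (summing them if requested).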
template <class IT, class NT>
FullyDistSpVec<IT,NT>::FullyDistSpVec (IT globallen, const FullyDistVec<IT,IT> & inds, const FullyDistVec<IT,NT> & vals, bool SumDuplicates)
: FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(inds.commGrid, globallen)
    if(*(inds.commGrid) != *(vals.commGrid))
        SpParHelper::Print("Grids are not comparable, FullyDistSpVec() fails !");
    if(inds.TotalLength() != vals.TotalLength())
        SpParHelper::Print("Index and value vectors have different sizes, FullyDistSpVec() fails !");
    IT maxind = inds.Reduce(maximum<IT>(), (IT) 0);
    if(maxind >= globallen)
        SpParHelper::Print("At least one index is greater than globallen, FullyDistSpVec() fails !");
    MPI_Comm World = commGrid->GetWorld();
    int nprocs = commGrid->GetSize();
    int * rdispls = new int[nprocs];
    int * recvcnt = new int[nprocs];
    int * sendcnt = new int[nprocs]();  // zero initialized
    int * sdispls = new int[nprocs];

    IT locsize = inds.LocArrSize();
    for(IT i=0; i<locsize; ++i)
    {
        IT locind;
        int owner = Owner(inds.arr[i], locind);
        ++sendcnt[owner];               // count how many pairs go to each owner
    }
    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);

    sdispls[0] = 0;
    rdispls[0] = 0;
    for(int i=0; i<nprocs-1; ++i)
    {
        sdispls[i+1] = sdispls[i] + sendcnt[i];
        rdispls[i+1] = rdispls[i] + recvcnt[i];
    }
    NT * datbuf = new NT[locsize];
    IT * indbuf = new IT[locsize];
    int * count = new int[nprocs]();    // running offset within each destination block
    for(IT i=0; i < locsize; ++i)
    {
        IT locind;
        int owner = Owner(inds.arr[i], locind);
        int id = sdispls[owner] + count[owner];
        datbuf[id] = vals.arr[i];
        indbuf[id] = locind;
        ++count[owner];
    }
    IT totrecv = std::accumulate(recvcnt, recvcnt+nprocs, static_cast<IT>(0));
    NT * recvdatbuf = new NT[totrecv];
    MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
    IT * recvindbuf = new IT[totrecv];
    MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
    std::vector< std::pair<IT,NT> > tosort;
    tosort.resize(totrecv);
    for(int i=0; i<totrecv; ++i)
        tosort[i] = std::make_pair(recvindbuf[i], recvdatbuf[i]);
    DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
    std::sort(tosort.begin(), tosort.end());

    ind.reserve(totrecv);
    num.reserve(totrecv);
    IT lastIndex = static_cast<IT>(-1);     // sentinel: no index seen yet
    for(auto itr = tosort.begin(); itr != tosort.end(); ++itr)
    {
        if(lastIndex != itr->first)         // a new index
        {
            ind.push_back(itr->first);
            num.push_back(itr->second);
            lastIndex = itr->first;
        }
        else if(SumDuplicates)
        {
            num.back() += itr->second;
        }
    }
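// FindVals: returns a dense vector holding the values of the nonzeros that satisfy
// the predicate, load-balanced across processes with an all-to-all exchange.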
template <class IT, class NT>
template <typename _Predicate>
FullyDistVec<IT,NT> FullyDistSpVec<IT,NT>::FindVals(_Predicate pred) const
    FullyDistVec<IT,NT> found(commGrid);
    MPI_Comm World = commGrid->GetWorld();
    int nprocs = commGrid->GetSize();
    int rank = commGrid->GetRank();

    IT sizelocal = getlocnnz();
    for(IT i=0; i<sizelocal; ++i)
    {
        if(pred(num[i]))                // keep the values that satisfy the predicate
            found.arr.push_back(num[i]);
    }
    IT nsize = found.arr.size();
    IT * dist = new IT[nprocs];
    dist[rank] = nsize;
    MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
    IT lengthuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
    found.glen = std::accumulate(dist, dist+nprocs, static_cast<IT>(0));

    // redistribute found.arr so that it follows the FullyDistVec distribution
    int * sendcnt = new int[nprocs];
    std::fill(sendcnt, sendcnt+nprocs, 0);
    for(IT i=0; i<nsize; ++i)
    {
        IT locind;
        int owner = found.Owner(i+lengthuntil, locind);
        ++sendcnt[owner];
    }
    int * recvcnt = new int[nprocs];
    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);  // share the counts

    int * sdispls = new int[nprocs];
    int * rdispls = new int[nprocs];
    sdispls[0] = 0;
    rdispls[0] = 0;
    for(int i=0; i<nprocs-1; ++i)
    {
        sdispls[i+1] = sdispls[i] + sendcnt[i];
        rdispls[i+1] = rdispls[i] + recvcnt[i];
    }
    IT totrecv = std::accumulate(recvcnt, recvcnt+nprocs, static_cast<IT>(0));
    std::vector<NT> recvbuf(totrecv);
    MPI_Alltoallv(found.arr.data(), sendcnt, sdispls, MPIType<NT>(), recvbuf.data(), recvcnt, rdispls, MPIType<NT>(), World);
    found.arr.swap(recvbuf);
    DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
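// FindInds: like FindVals, but returns the global indices of the nonzeros that
// satisfy the predicate.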
template <class IT, class NT>
template <typename _Predicate>
FullyDistVec<IT,IT> FullyDistSpVec<IT,NT>::FindInds(_Predicate pred) const
    FullyDistVec<IT,IT> found(commGrid);
    MPI_Comm World = commGrid->GetWorld();
    int nprocs = commGrid->GetSize();
    int rank = commGrid->GetRank();

    IT sizelocal = getlocnnz();
    IT sizesofar = LengthUntil();
    for(IT i=0; i<sizelocal; ++i)
    {
        if(pred(num[i]))                // keep the global indices of matching entries
            found.arr.push_back(ind[i]+sizesofar);
    }
    IT nsize = found.arr.size();
    IT * dist = new IT[nprocs];
    dist[rank] = nsize;
    MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
    IT lengthuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
    found.glen = std::accumulate(dist, dist+nprocs, static_cast<IT>(0));

    // redistribute found.arr so that it follows the FullyDistVec distribution
    int * sendcnt = new int[nprocs];
    std::fill(sendcnt, sendcnt+nprocs, 0);
    for(IT i=0; i<nsize; ++i)
    {
        IT locind;
        int owner = found.Owner(i+lengthuntil, locind);
        ++sendcnt[owner];
    }
    int * recvcnt = new int[nprocs];
    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);  // share the counts

    int * sdispls = new int[nprocs];
    int * rdispls = new int[nprocs];
    sdispls[0] = 0;
    rdispls[0] = 0;
    for(int i=0; i<nprocs-1; ++i)
    {
        sdispls[i+1] = sdispls[i] + sendcnt[i];
        rdispls[i+1] = rdispls[i] + recvcnt[i];
    }
    IT totrecv = std::accumulate(recvcnt, recvcnt+nprocs, static_cast<IT>(0));
    std::vector<IT> recvbuf(totrecv);
    MPI_Alltoallv(found.arr.data(), sendcnt, sdispls, MPIType<IT>(), recvbuf.data(), recvcnt, rdispls, MPIType<IT>(), World);
    found.arr.swap(recvbuf);
    DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
template <class IT, class NT>
void FullyDistSpVec<IT,NT>::stealFrom(FullyDistSpVec<IT,NT> & victim)
    FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>::operator= (victim);
    ind.swap(victim.ind);
    num.swap(victim.num);
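// operator[]: the owner of the global index looks its value up with a binary
// search over the sorted local indices, then broadcasts the result (and whether
// it was found) to all processes.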
template <class IT, class NT>
NT FullyDistSpVec<IT,NT>::operator[](IT indx)
    NT val = NT();
    IT locind;
    int found = 0;
    int owner = Owner(indx, locind);
    if(commGrid->GetRank() == owner)
    {
        typename std::vector<IT>::const_iterator it = std::lower_bound(ind.begin(), ind.end(), locind);     // ind is kept sorted
        if(it != ind.end() && locind == (*it))      // found locally
        {
            val = num[it-ind.begin()];
            found = 1;
        }
    }
    MPI_Bcast(&found, 1, MPI_INT, owner, commGrid->GetWorld());
    MPI_Bcast(&val, 1, MPIType<NT>(), owner, commGrid->GetWorld());
template <class IT, class NT>
NT FullyDistSpVec<IT,NT>::GetLocalElement(IT indx)
    NT val = NT();
    IT locind;
    int owner = Owner(indx, locind);
    typename std::vector<IT>::const_iterator it = std::lower_bound(ind.begin(), ind.end(), locind);
    if(commGrid->GetRank() == owner)
    {
        if(it != ind.end() && locind == (*it))      // found locally
            val = num[it-ind.begin()];
    }
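// SetElement: the owner inserts (or overwrites) the value at the given global
// index, keeping the local index array sorted.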
template <class IT, class NT>
void FullyDistSpVec<IT,NT>::SetElement (IT indx, NT numx)
    if(glen == 0)
        SpParHelper::Print("WARNING: SetElement() called on a vector with zero length\n");

    IT locind;
    int owner = Owner(indx, locind);
    if(commGrid->GetRank() == owner)
    {
        typename std::vector<IT>::iterator iter = std::lower_bound(ind.begin(), ind.end(), locind);
        if(iter == ind.end())       // beyond the last entry: append
        {
            ind.push_back(locind);
            num.push_back(numx);
        }
        else if (locind < *iter)    // not present: insert while keeping ind sorted
        {
            num.insert(num.begin() + (iter-ind.begin()), numx);
            ind.insert(iter, locind);
        }
        else                        // present: overwrite the existing value
            *(num.begin() + (iter-ind.begin())) = numx;
    }
template <class IT, class NT>
void FullyDistSpVec<IT,NT>::DelElement (IT indx)
    IT locind;
    int owner = Owner(indx, locind);
    if(commGrid->GetRank() == owner)
    {
        typename std::vector<IT>::iterator iter = std::lower_bound(ind.begin(), ind.end(), locind);
        if(iter != ind.end() && !(locind < *iter))  // present: erase value and index
        {
            num.erase(num.begin() + (iter-ind.begin()));
            ind.erase(iter);
        }
    }
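// operator()(ri): subscripting with a dense index vector. Requested global indices
// are routed to their owners, looked up against the local nonzeros, and the
// matching values are shipped back to the requesting processes.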
template <class IT, class NT>
FullyDistVec<IT,NT> FullyDistSpVec<IT,NT>::operator() (const FullyDistVec<IT,IT> & ri) const
    MPI_Comm World = commGrid->GetWorld();
    FullyDistVec<IT,NT> Indexed(ri.commGrid, ri.glen, NT());
    int nprocs;
    MPI_Comm_size(World, &nprocs);
    std::unordered_map<IT, IT> revr_map;        // maps requested local index -> position in ri.arr
    std::vector< std::vector<IT> > data_req(nprocs);
    IT locnnz = ri.LocArrSize();

    // sanity check: every requested index must lie in [0, glen)
    int local = 1, whole;
    for(IT i=0; i < locnnz; ++i)
        if(ri.arr[i] >= glen || ri.arr[i] < 0)
            local = 0;
    MPI_Allreduce( &local, &whole, 1, MPI_INT, MPI_BAND, World);
    if(whole == 0)
        throw outofrangeexception();

    for(IT i=0; i < locnnz; ++i)
    {
        IT locind;
        int owner = Owner(ri.arr[i], locind);
        data_req[owner].push_back(locind);
        revr_map.insert(typename std::unordered_map<IT, IT>::value_type(locind, i));
    }
    IT * sendbuf = new IT[locnnz];
    int * sendcnt = new int[nprocs];
    int * sdispls = new int[nprocs];
    for(int i=0; i<nprocs; ++i)
        sendcnt[i] = data_req[i].size();

    int * rdispls = new int[nprocs];
    int * recvcnt = new int[nprocs];
    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);  // share the request counts

    sdispls[0] = 0;
    rdispls[0] = 0;
    for(int i=0; i<nprocs-1; ++i)
    {
        sdispls[i+1] = sdispls[i] + sendcnt[i];
        rdispls[i+1] = rdispls[i] + recvcnt[i];
    }
    IT totrecv = std::accumulate(recvcnt, recvcnt+nprocs, 0);
    IT * recvbuf = new IT[totrecv];

    for(int i=0; i<nprocs; ++i)
    {
        std::copy(data_req[i].begin(), data_req[i].end(), sendbuf+sdispls[i]);
        std::vector<IT>().swap(data_req[i]);
    }
    MPI_Alltoallv(sendbuf, sendcnt, sdispls, MPIType<IT>(), recvbuf, recvcnt, rdispls, MPIType<IT>(), World);

    // look the requested indices up against the local nonzeros
    IT * indsback = new IT[totrecv];
    NT * databack = new NT[totrecv];
    int * ddispls = new int[nprocs];
    std::copy(rdispls, rdispls+nprocs, ddispls);
    for(int i=0; i<nprocs; ++i)
    {
        IT * it = std::set_intersection(recvbuf+rdispls[i], recvbuf+rdispls[i]+recvcnt[i], ind.begin(), ind.end(), indsback+rdispls[i]);
        recvcnt[i] = (it - (indsback+rdispls[i]));      // update to the number of matches
        IT vi = 0;
        for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j)
        {
            while(indsback[j] > ind[vi])
                ++vi;
            databack[j] = num[vi++];
        }
    }

    // ship the matching (index, value) pairs back to the requesting processes
    NT * databuf = new NT[ri.LocArrSize()];
    MPI_Alltoall(recvcnt, 1, MPI_INT, sendcnt, 1, MPI_INT, World);
    MPI_Alltoallv(indsback, recvcnt, rdispls, MPIType<IT>(), sendbuf, sendcnt, sdispls, MPIType<IT>(), World);
    MPI_Alltoallv(databack, recvcnt, rdispls, MPIType<NT>(), databuf, sendcnt, sdispls, MPIType<NT>(), World);
    DeleteAll(rdispls, recvcnt, indsback, databack);

    for(int i=0; i<nprocs; ++i)
    {
        for(int j=sdispls[i]; j< sdispls[i]+sendcnt[i]; ++j)
        {
            typename std::unordered_map<IT,IT>::iterator it = revr_map.find(sendbuf[j]);
            Indexed.arr[it->second] = databuf[j];
        }
    }
    DeleteAll(sdispls, sendcnt, sendbuf, databuf);
template <class IT, class NT>
void FullyDistSpVec<IT,NT>::iota(IT globalsize, NT first)
    glen = globalsize;
    IT length = MyLocLength();
    ind.resize(length);
    num.resize(length);
    SpHelper::iota(ind.begin(), ind.end(), 0);                          // local indices, offset within each processor
    SpHelper::iota(num.begin(), num.end(), LengthUntil() + first);      // values are global across processors
template <class IT, class NT>
void FullyDistSpVec<IT,NT>::nziota(NT first)
    std::iota(num.begin(), num.end(), NnzUntil() + first);     // fill the existing nonzeros with consecutive values
template <class IT, class NT>
IT FullyDistSpVec<IT,NT>::NnzUntil() const
    IT mynnz = ind.size();
    IT prevnnz = 0;
    MPI_Scan(&mynnz, &prevnnz, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());    // inclusive prefix sum
    return (prevnnz - mynnz);       // convert to the number of nonzeros before this processor
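// sort(): performs a parallel key-value sort with the values as keys; the returned
// FullyDistSpVec<IT,IT> holds, in sorted order, the original global indices of the values.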
template <class IT, class NT>
FullyDistSpVec<IT, IT> FullyDistSpVec<IT, NT>::sort()
    MPI_Comm World = commGrid->GetWorld();
    FullyDistSpVec<IT,IT> temp(commGrid);
    if(getnnz()==0)
        return temp;
    IT nnz = getlocnnz();
    std::pair<NT,IT> * vecpair = new std::pair<NT,IT>[nnz];

    int nprocs, rank;
    MPI_Comm_size(World, &nprocs);
    MPI_Comm_rank(World, &rank);

    IT * dist = new IT[nprocs];
    dist[rank] = nnz;
    MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
    IT until = LengthUntil();
#pragma omp parallel for
    for(IT i=0; i< nnz; ++i)
    {
        vecpair[i].first = num[i];              // sort key: the value
        vecpair[i].second = ind[i] + until;     // payload: the global index
    }
    std::vector<std::pair<NT,IT>> sorted = SpParHelper::KeyValuePSort(vecpair, nnz, dist, World);

    temp.num.resize(nnz);
    temp.ind.resize(nnz);
#pragma omp parallel for
    for(IT i=0; i< nnz; ++i)
        temp.num[i] = sorted[i].second;         // original global indices, now in sorted-value order
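// UniqAll2All: keeps one nonzero per distinct value. Values are hashed to a
// destination processor so that equal values meet on the same process, where
// duplicates are removed; the survivors are then sent back to the owners of
// their original indices.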
template <class IT, class NT>
template <typename _BinaryOperation>
FullyDistSpVec<IT,NT> FullyDistSpVec<IT, NT>::UniqAll2All(_BinaryOperation __binary_op, MPI_Op mympiop)
    MPI_Comm World = commGrid->GetWorld();
    int nprocs = commGrid->GetSize();

    std::vector< std::vector< NT > > datsent(nprocs);
    std::vector< std::vector< IT > > indsent(nprocs);

    size_t locvec = num.size();
    IT lenuntil = LengthUntil();
    for(size_t i=0; i< locvec; ++i)
    {
        // myhash is the 64-bit MurmurHash3 of the value num[i]; hashing sends
        // equal values to the same destination processor
        double range = static_cast<double>(myhash) * static_cast<double>(glen);
        NT mapped = range / static_cast<double>(std::numeric_limits<uint64_t>::max());  // in [0, glen)
        IT locind;
        int owner = Owner(mapped, locind);
        datsent[owner].push_back(num[i]);
        indsent[owner].push_back(ind[i]+lenuntil);      // global index
    }
    int * sendcnt = new int[nprocs];
    int * sdispls = new int[nprocs];
    for(int i=0; i<nprocs; ++i)
        sendcnt[i] = (int) datsent[i].size();

    int * rdispls = new int[nprocs];
    int * recvcnt = new int[nprocs];
    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);  // share the counts
    sdispls[0] = 0;
    rdispls[0] = 0;
    for(int i=0; i<nprocs-1; ++i)
    {
        sdispls[i+1] = sdispls[i] + sendcnt[i];
        rdispls[i+1] = rdispls[i] + recvcnt[i];
    }
    NT * datbuf = new NT[locvec];
    for(int i=0; i<nprocs; ++i)
    {
        std::copy(datsent[i].begin(), datsent[i].end(), datbuf+sdispls[i]);
        std::vector<NT>().swap(datsent[i]);
    }
    IT * indbuf = new IT[locvec];
    for(int i=0; i<nprocs; ++i)
    {
        std::copy(indsent[i].begin(), indsent[i].end(), indbuf+sdispls[i]);
        std::vector<IT>().swap(indsent[i]);
    }
    IT totrecv = std::accumulate(recvcnt, recvcnt+nprocs, static_cast<IT>(0));
    NT * recvdatbuf = new NT[totrecv];
    MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
    IT * recvindbuf = new IT[totrecv];
    MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
    std::vector< std::pair<NT,IT> > tosort;
    for(int i=0; i<nprocs; ++i)
    {
        for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j)
            tosort.push_back(std::make_pair(recvdatbuf[j], recvindbuf[j]));
    }
    std::sort(tosort.begin(), tosort.end());

    // after sorting by value, equal values are adjacent; unique() keeps one of each
    typename std::vector< std::pair<NT,IT> >::iterator last;
    last = std::unique (tosort.begin(), tosort.end(), equal_first<NT,IT>());

    // route the surviving (value, global index) pairs back to the owners of their indices
    std::vector< std::vector< NT > > datback(nprocs);
    std::vector< std::vector< IT > > indback(nprocs);
    for(typename std::vector< std::pair<NT,IT> >::iterator itr = tosort.begin(); itr != last; ++itr)
    {
        IT locind;
        int owner = Owner(itr->second, locind);
        datback[owner].push_back(itr->first);
        indback[owner].push_back(locind);
    }
    for(int i=0; i<nprocs; ++i)
        sendcnt[i] = (int) datback[i].size();
    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);
    for(int i=0; i<nprocs-1; ++i)
    {
        sdispls[i+1] = sdispls[i] + sendcnt[i];
        rdispls[i+1] = rdispls[i] + recvcnt[i];
    }
    datbuf = new NT[tosort.size()];
    for(int i=0; i<nprocs; ++i)
    {
        std::copy(datback[i].begin(), datback[i].end(), datbuf+sdispls[i]);
        std::vector<NT>().swap(datback[i]);
    }
    indbuf = new IT[tosort.size()];
    for(int i=0; i<nprocs; ++i)
    {
        std::copy(indback[i].begin(), indback[i].end(), indbuf+sdispls[i]);
        std::vector<IT>().swap(indback[i]);
    }
    totrecv = std::accumulate(recvcnt, recvcnt+nprocs, static_cast<IT>(0));
    recvdatbuf = new NT[totrecv];
    MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
    recvindbuf = new IT[totrecv];
    MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);

    FullyDistSpVec<IT,NT> Indexed(commGrid, glen);
    std::vector< std::pair<IT,NT> > back2sort;
    for(int i=0; i<nprocs; ++i)
    {
        for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j)
            back2sort.push_back(std::make_pair(recvindbuf[j], recvdatbuf[j]));
    }
    std::sort(back2sort.begin(), back2sort.end());
    for(typename std::vector< std::pair<IT,NT> >::iterator itr = back2sort.begin(); itr != back2sort.end(); ++itr)
    {
        Indexed.ind.push_back(itr->first);
        Indexed.num.push_back(itr->second);
    }
    DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
template <class IT, class NT>
template <typename _BinaryOperation>
FullyDistSpVec<IT,NT> FullyDistSpVec<IT, NT>::Uniq(_BinaryOperation __binary_op, MPI_Op mympiop)
    return UniqAll2All(__binary_op, mympiop);
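// operator+=: element-wise addition of two sparse vectors of the same length;
// the two sorted local index lists are merged like a sorted-list union.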
template <class IT, class NT>
FullyDistSpVec<IT,NT> & FullyDistSpVec<IT, NT>::operator+=(const FullyDistSpVec<IT,NT> & rhs)
    if(glen != rhs.glen)
        std::cerr << "Vector dimensions don't match for addition\n";

    IT lsize = getlocnnz();
    IT rsize = rhs.getlocnnz();

    std::vector< IT > nind;
    std::vector< NT > nnum;
    nind.reserve(lsize+rsize);
    nnum.reserve(lsize+rsize);

    IT i = 0, j = 0;
    while(i < lsize && j < rsize)       // merge the two sorted index lists
    {
        if(ind[i] > rhs.ind[j])         // present only in rhs
        {
            nind.push_back( rhs.ind[j] );
            nnum.push_back( rhs.num[j++] );
        }
        else if(ind[i] < rhs.ind[j])    // present only in *this
        {
            nind.push_back( ind[i] );
            nnum.push_back( num[i++] );
        }
        else                            // present in both: add the values
        {
            nind.push_back( ind[i] );
            nnum.push_back( num[i++] + rhs.num[j++] );
        }
    }
    while(i < lsize)                    // leftovers of *this
    {
        nind.push_back( ind[i] );
        nnum.push_back( num[i++] );
    }
    while(j < rsize)                    // leftovers of rhs
    {
        nind.push_back( rhs.ind[j] );
        nnum.push_back( rhs.num[j++] );
    }
    // self-addition case (rhs aliases *this): double every stored value in place
    typename std::vector<NT>::iterator it;
    for(it = num.begin(); it != num.end(); ++it)
        (*it) *= 2;
template <class IT, class NT>
FullyDistSpVec<IT,NT> & FullyDistSpVec<IT, NT>::operator-=(const FullyDistSpVec<IT,NT> & rhs)
    if(glen != rhs.glen)
        std::cerr << "Vector dimensions don't match for subtraction\n";

    IT lsize = getlocnnz();
    IT rsize = rhs.getlocnnz();
    std::vector< IT > nind;
    std::vector< NT > nnum;
    nind.reserve(lsize+rsize);
    nnum.reserve(lsize+rsize);

    IT i = 0, j = 0;
    while(i < lsize && j < rsize)       // merge the two sorted index lists
    {
        if(ind[i] > rhs.ind[j])         // present only in rhs: negate it
        {
            nind.push_back( rhs.ind[j] );
            nnum.push_back( -static_cast<NT>(rhs.num[j++]) );
        }
        else if(ind[i] < rhs.ind[j])    // present only in *this
        {
            nind.push_back( ind[i] );
            nnum.push_back( num[i++] );
        }
        else                            // present in both: subtract the values
        {
            nind.push_back( ind[i] );
            nnum.push_back( num[i++] - rhs.num[j++] );
        }
    }
    while(i < lsize)                    // leftovers of *this
    {
        nind.push_back( ind[i] );
        nnum.push_back( num[i++] );
    }
    while(j < rsize)                    // leftovers of rhs: negated
    {
        nind.push_back( rhs.ind[j] );
        nnum.push_back( NT() - (rhs.num[j++]) );
    }
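// SparseCommon: shared back end for the parallel readers. Each process sends its
// buffered (local index, value) pairs to their owners with MPI_Alltoallv, sorts
// what it receives, and merges duplicate indices with the supplied binary operation.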
template <class IT, class NT>
template <typename _BinaryOperation>
void FullyDistSpVec<IT,NT>::SparseCommon(std::vector< std::vector < std::pair<IT,NT> > > & data, _BinaryOperation BinOp)
    int nprocs = commGrid->GetSize();
    int * sendcnt = new int[nprocs];
    int * recvcnt = new int[nprocs];
    for(int i=0; i<nprocs; ++i)
        sendcnt[i] = data[i].size();

    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld());   // share the counts
    int * sdispls = new int[nprocs]();
    int * rdispls = new int[nprocs]();
    std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
    std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
    IT totsend = std::accumulate(sendcnt, sendcnt+nprocs, static_cast<IT>(0));
    IT totrecv = std::accumulate(recvcnt, recvcnt+nprocs, static_cast<IT>(0));

    std::pair<IT,NT> * senddata = new std::pair<IT,NT>[totsend];
    for(int i=0; i<nprocs; ++i)
    {
        std::copy(data[i].begin(), data[i].end(), senddata+sdispls[i]);
        std::vector< std::pair<IT,NT> >().swap(data[i]);    // free the buffered data
    }
    MPI_Datatype MPI_pair;
    MPI_Type_contiguous(sizeof(std::pair<IT,NT>), MPI_CHAR, &MPI_pair);
    MPI_Type_commit(&MPI_pair);

    std::pair<IT,NT> * recvdata = new std::pair<IT,NT>[totrecv];
    MPI_Alltoallv(senddata, sendcnt, sdispls, MPI_pair, recvdata, recvcnt, rdispls, MPI_pair, commGrid->GetWorld());

    DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);
    MPI_Type_free(&MPI_pair);

    if(!is_sorted(recvdata, recvdata+totrecv))
        std::sort(recvdata, recvdata+totrecv);

    // merge duplicates: entries with the same local index are combined with BinOp
    ind.push_back(recvdata[0].first);
    num.push_back(recvdata[0].second);
    for(IT i=1; i< totrecv; ++i)
    {
        if(ind.back() == recvdata[i].first)
        {
            num.back() = BinOp(num.back(), recvdata[i].second);
        }
        else
        {
            ind.push_back(recvdata[i].first);
            num.push_back(recvdata[i].second);
        }
    }
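// ParallelRead: every process reads a disjoint byte range of the input file,
// parses "index value" lines, routes each entry to its owner, and finally calls
// SparseCommon to assemble the distributed vector; duplicates are combined with BinOp.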
template <class IT, class NT>
template <typename _BinaryOperation>
void FullyDistSpVec<IT,NT>::ParallelRead (const std::string & filename, bool onebased, _BinaryOperation BinOp)
    int myrank = commGrid->GetRank();
    int nprocs = commGrid->GetSize();

    // parse the "length nnz" header and broadcast it
    FILE * f;
    int64_t gnnz;
    if((f = fopen(filename.c_str(), "r")) == NULL)
    {
        std::cout << "File failed to open\n";
        MPI_Abort(commGrid->commWorld, NOFILE);
    }
    fscanf(f, "%lld %lld\n", &glen, &gnnz);
    std::cout << "Total number of nonzeros expected across all processors is " << gnnz << std::endl;
    MPI_Bcast(&glen, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
    MPI_Bcast(&gnnz, 1, MPIType<int64_t>(), 0, commGrid->commWorld);

    struct stat st;
    if (stat(filename.c_str(), &st) == -1)
        MPI_Abort(commGrid->commWorld, NOFILE);
    int64_t file_size = st.st_size;
    MPI_Offset fpos, end_fpos;
    std::cout << "File is " << file_size << " bytes" << std::endl;

    // split the file into one contiguous byte range per process
    fpos = myrank * file_size / nprocs;
    if(myrank != (nprocs-1))
        end_fpos = (myrank + 1) * file_size / nprocs;
    else
        end_fpos = file_size;
    MPI_Barrier(commGrid->commWorld);

    MPI_File mpi_fh;
    MPI_File_open (commGrid->commWorld, const_cast<char*>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fh);

    std::vector< std::vector < std::pair<IT,NT> > > data(nprocs);   // entries destined for each processor
    std::vector<std::string> lines;

    SpParHelper::Print("Fetching first piece\n");
    MPI_Barrier(commGrid->commWorld);
    bool finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, true, lines, myrank);
    int64_t entriesread = lines.size();
    SpParHelper::Print("Fetched first piece\n");
    MPI_Barrier(commGrid->commWorld);

    int64_t ii;
    NT vv;
    IT locind;
    for (auto itr=lines.begin(); itr != lines.end(); ++itr)
    {
        std::stringstream ss(*itr);
        ss >> ii >> vv;
        if(onebased)
            --ii;               // convert 1-based file indices to 0-based
        int owner = Owner(ii, locind);
        data[owner].push_back(std::make_pair(locind, vv));
    }
    while(!finished)            // keep fetching until this process's byte range is exhausted
    {
        finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, false, lines, myrank);
        entriesread += lines.size();
        for (auto itr=lines.begin(); itr != lines.end(); ++itr)
        {
            std::stringstream ss(*itr);
            ss >> ii >> vv;
            if(onebased)
                --ii;
            int owner = Owner(ii, locind);
            data[owner].push_back(std::make_pair(locind, vv));
        }
    }
    int64_t allentriesread;
    MPI_Reduce(&entriesread, &allentriesread, 1, MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
#ifdef COMBBLAS_DEBUG
    std::cout << "Reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
#endif
    SparseCommon(data, BinOp);
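// ParallelWrite: each process formats its nonzeros as text, the byte offsets are
// agreed on with an allgather, and every process then writes its chunk into the
// shared output file at its own offset.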
template <class IT, class NT>
template <class HANDLER>
void FullyDistSpVec<IT,NT>::ParallelWrite(const std::string & filename, bool onebased, HANDLER handler, bool includeindices, bool includeheader)
    int myrank = commGrid->GetRank();
    int nprocs = commGrid->GetSize();
    IT totalLength = TotalLength();
    IT totalNNZ = getnnz();

    std::stringstream ss;
    if(includeheader && myrank == 0)
        ss << totalLength << '\t' << totalNNZ << '\n';

    IT entries = getlocnnz();
    IT sizeuntil = 0;
    MPI_Exscan( &entries, &sizeuntil, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld() );
    if(myrank == 0)
        sizeuntil = 0;      // MPI_Exscan leaves the first process undefined

    if(includeindices)
    {
        if(onebased)
            sizeuntil += 1;     // shift to 1-based indices in the output
        for(IT i=0; i< entries; ++i)
        {
            ss << ind[i]+sizeuntil << '\t';
            handler.save(ss, num[i], ind[i]+sizeuntil);
            ss << '\n';
        }
    }
    else    // indices are omitted; pass a placeholder index to the handler
    {
        IT dummy = 0;
        for(IT i=0; i< entries; ++i)
        {
            handler.save(ss, num[i], dummy);
            ss << '\n';
        }
    }

    std::string text = ss.str();

    // agree on each process's byte offset into the output file
    int64_t * bytes = new int64_t[nprocs];
    bytes[myrank] = text.size();
    MPI_Allgather(MPI_IN_PLACE, 1, MPIType<int64_t>(), bytes, 1, MPIType<int64_t>(), commGrid->GetWorld());
    int64_t bytesuntil = std::accumulate(bytes, bytes+myrank, static_cast<int64_t>(0));
    int64_t bytestotal = std::accumulate(bytes, bytes+nprocs, static_cast<int64_t>(0));

    // preallocate the file by seeking to its last byte
    std::ofstream ofs(filename.c_str(), std::ios::binary | std::ios::out);
#ifdef COMBBLAS_DEBUG
    std::cout << "Creating file with " << bytestotal << " bytes" << std::endl;
#endif
    ofs.seekp(bytestotal - 1);

    MPI_Barrier(commGrid->GetWorld());

    struct stat st;
    if (stat(filename.c_str(), &st) == -1)
        MPI_Abort(commGrid->GetWorld(), NOFILE);
#ifdef COMBBLAS_DEBUG
    std::cout << "File is actually " << st.st_size << " bytes seen from process " << myrank << std::endl;
#endif

    FILE * ffinal;
    if ((ffinal = fopen(filename.c_str(), "rb+")) == NULL)
    {
        printf("COMBBLAS: Vector output file %s failed to open at process %d\n", filename.c_str(), myrank);
        MPI_Abort(commGrid->GetWorld(), NOFILE);
    }
    fseek (ffinal, bytesuntil, SEEK_SET);
    fwrite(text.c_str(), 1, bytes[myrank], ffinal);
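// ReadDistribute: the master process reads the file and scatters buffered batches
// of (index, value) pairs to their owner processes, which keep appending them
// until the master signals the end of the input with a sentinel count.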
template <class IT, class NT>
template <class HANDLER>
std::ifstream& FullyDistSpVec<IT,NT>::ReadDistribute (std::ifstream& infile, int master, HANDLER handler)
    MPI_Comm World = commGrid->GetWorld();
    int neighs = commGrid->GetSize();   // the vector is distributed over all processes

    int * displs = new int[neighs];
    for (int i=0; i<neighs; ++i)
        displs[i] = i*buffperneigh;

    int * curptrs = NULL;
    int recvcount = 0;
    IT * inds = NULL;
    NT * vals = NULL;
    int rank = commGrid->GetRank();

    // the master allocates one buffer slot of buffperneigh entries per neighbor
    inds = new IT [ buffperneigh * neighs ];
    vals = new NT [ buffperneigh * neighs ];
    curptrs = new int[neighs];
    std::fill_n(curptrs, neighs, 0);
    if (infile.is_open())
    {
        IT numrows, numcols;
        IT total_nnz = 0;
        IT cnz = 0;
        bool indIsRow = true;
        infile >> numrows >> numcols >> total_nnz;

        MPI_Bcast(&glen, 1, MPIType<IT>(), master, World);

        IT temprow, tempcol;
        IT tempind, locind;
        while ( (!infile.eof()) && cnz < total_nnz)
        {
            infile >> temprow >> tempcol;
            // tempind is the 0-based global index derived from temprow/tempcol
            int rec = Owner(tempind, locind);   // recipient (owner) processor
            inds[ rec * buffperneigh + curptrs[rec] ] = locind;
            vals[ rec * buffperneigh + curptrs[rec] ] = handler.read(infile, tempind);
            ++curptrs[rec];

            if(curptrs[rec] == buffperneigh || (cnz == (total_nnz-1)) )     // a buffer is full or the input is exhausted
            {
                // first scatter the counts, then the data itself
                MPI_Scatter(curptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, master, World);

                IT * tempinds = new IT[recvcount];
                NT * tempvals = new NT[recvcount];
                MPI_Scatterv(inds, curptrs, displs, MPIType<IT>(), tempinds, recvcount, MPIType<IT>(), master, World);
                MPI_Scatterv(vals, curptrs, displs, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), master, World);

                // the master keeps its own share
                for(IT i=0; i< recvcount; ++i)
                {
                    ind.push_back( tempinds[i] );
                    num.push_back( tempvals[i] );
                }
                std::fill_n(curptrs, neighs, 0);    // reset the buffer pointers
            }
            ++cnz;
        }
        assert (cnz == total_nnz);

        // signal the end of the input with a sentinel count
        std::fill_n(curptrs, neighs, std::numeric_limits<int>::max());
        MPI_Scatter(curptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, master, World);
    }
    else    // the file could not be opened: broadcast a zero length
    {
        glen = 0;
        MPI_Bcast(&glen, 1, MPIType<IT>(), master, World);
    }
    // all other processes: receive the global length, then receive scattered
    // batches of (index, value) pairs until the master sends the sentinel count
    MPI_Bcast(&glen, 1, MPIType<IT>(), master, World);
    MPI_Scatter(curptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, master, World);
    if( recvcount != std::numeric_limits<int>::max())   // not the end-of-input sentinel
    {
        IT * tempinds = new IT[recvcount];
        NT * tempvals = new NT[recvcount];
        MPI_Scatterv(inds, curptrs, displs, MPIType<IT>(), tempinds, recvcount, MPIType<IT>(), master, World);
        MPI_Scatterv(vals, curptrs, displs, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), master, World);
        for(IT i=0; i< recvcount; ++i)
        {
            ind.push_back( tempinds[i] );
            num.push_back( tempvals[i] );
        }
    }
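// SaveGathered: writes all (index, value) pairs into a temporary binary file with
// MPI-IO, then reads that file back and prints it to the given output stream, one
// "index 1 value" line per nonzero.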
template <class IT, class NT>
template <class HANDLER>
void FullyDistSpVec<IT,NT>::SaveGathered(std::ofstream& outfile, int master, HANDLER handler, bool printProcSplits)
    MPI_Comm World = commGrid->GetWorld();
    int rank, nprocs;
    MPI_Comm_rank(World, &rank);
    MPI_Comm_size(World, &nprocs);

    MPI_File thefile;
    char _fn[] = "temp_fullydistspvec";
    int mpi_err = MPI_File_open(World, _fn, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);
    if(mpi_err != MPI_SUCCESS)
    {
        char mpi_err_str[MPI_MAX_ERROR_STRING];
        int mpi_err_strlen;
        MPI_Error_string(mpi_err, mpi_err_str, &mpi_err_strlen);
        printf("MPI_File_open failed (%s)\n", mpi_err_str);
        MPI_Abort(World, 1);
    }

    IT * dist = new IT[nprocs];
    dist[rank] = getlocnnz();
    MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
    IT sizeuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
    IT totalLength = TotalLength();
    IT totalNNZ = getnnz();

    struct mystruct
    {
        IT ind;
        NT num;
    };

    MPI_Datatype datatype;
    MPI_Type_contiguous(sizeof(mystruct), MPI_CHAR, &datatype );
    MPI_Type_commit(&datatype);
    int dsize;
    MPI_Type_size(datatype, &dsize);

    // each process writes its own chunk at an offset determined by sizeuntil
    char native[] = "native";
    MPI_File_set_view(thefile, static_cast<int>(sizeuntil * dsize), datatype, datatype, native, MPI_INFO_NULL);

    int count = ind.size();
    mystruct * packed = new mystruct[count];
    for(int i=0; i<count; ++i)
    {
        packed[i].ind = ind[i] + sizeuntil;
        packed[i].num = num[i];
    }
    mpi_err = MPI_File_write(thefile, packed, count, datatype, MPI_STATUS_IGNORE);
    if(mpi_err != MPI_SUCCESS)
    {
        char mpi_err_str[MPI_MAX_ERROR_STRING];
        int mpi_err_strlen;
        MPI_Error_string(mpi_err, mpi_err_str, &mpi_err_strlen);
        printf("MPI_File_write failed (%s)\n", mpi_err_str);
        MPI_Abort(World, 1);
    }
    MPI_File_close(&thefile);
    // read the binary file back and stream it to the output file as text
    FILE * f = fopen("temp_fullydistspvec", "r");
    if(!f)
    {
        std::cerr << "Problem reading binary input file\n";
        return;
    }
    IT maxd = *std::max_element(dist, dist+nprocs);
    mystruct * data = new mystruct[maxd];

    std::streamsize oldPrecision = outfile.precision();
    outfile.precision(21);
    outfile << totalLength << "\t1\t" << totalNNZ << std::endl;
    for(int i=0; i<nprocs; ++i)
    {
        // read the chunk written by processor i
        if (fread(data, dsize, dist[i], f) < static_cast<size_t>(dist[i]))
        {
            std::cout << "fread 660 failed! attempting to continue..." << std::endl;
        }
        if (printProcSplits)
            outfile << "Elements stored on proc " << i << ":" << std::endl;
        for (int j = 0; j < dist[i]; j++)
        {
            outfile << data[j].ind+1 << "\t1\t";
            handler.save(outfile, data[j].num, data[j].ind);
            outfile << std::endl;
        }
    }
    outfile.precision(oldPrecision);
    remove("temp_fullydistspvec");
template <class IT, class NT>
template <typename _Predicate>
IT FullyDistSpVec<IT,NT>::Count(_Predicate pred) const
    IT local = std::count_if( num.begin(), num.end(), pred );
    IT whole = 0;
    MPI_Allreduce( &local, &whole, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
    return whole;
template <class IT, class NT>
template <typename _BinaryOperation>
NT FullyDistSpVec<IT,NT>::Reduce(_BinaryOperation __binary_op, NT init) const
    // local reduction over the nonzeros, followed by a global all-reduce
    NT localsum = std::accumulate( num.begin(), num.end(), init, __binary_op);
    NT totalsum = init;
    MPI_Allreduce( &localsum, &totalsum, 1, MPIType<NT>(), MPIOp<_BinaryOperation, NT>::op(), commGrid->GetWorld());
    return totalsum;
template <class IT, class NT>
template <typename OUT, typename _BinaryOperation, typename _UnaryOperation>
OUT FullyDistSpVec<IT,NT>::Reduce(_BinaryOperation __binary_op, OUT default_val, _UnaryOperation __unary_op) const
    // apply the unary operation to each nonzero, reduce locally, then all-reduce
    OUT localsum = default_val;
    typename std::vector< NT >::const_iterator iter = num.begin();
    while (iter < num.end())
    {
        localsum = __binary_op(localsum, __unary_op(*iter));
        ++iter;
    }
    OUT totalsum = default_val;
    MPI_Allreduce( &localsum, &totalsum, 1, MPIType<OUT>(), MPIOp<_BinaryOperation, OUT>::op(), commGrid->GetWorld());
    return totalsum;
template <class IT, class NT>
void FullyDistSpVec<IT,NT>::PrintInfo(std::string vectorname) const
    IT nznz = getnnz();
    if (commGrid->GetRank() == 0)
        std::cout << "As a whole, " << vectorname << " has: " << nznz << " nonzeros and length " << glen << std::endl;
template <class IT, class NT>
void FullyDistSpVec<IT,NT>::DebugPrint()
    MPI_Comm World = commGrid->GetWorld();
    int rank, nprocs;
    MPI_Comm_rank(World, &rank);
    MPI_Comm_size(World, &nprocs);

    MPI_File thefile;
    char tfilename[32] = "temp_fullydistspvec";
    MPI_File_open(World, tfilename, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);

    IT * dist = new IT[nprocs];
    dist[rank] = getlocnnz();
    MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
    IT sizeuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));

    struct mystruct
    {
        IT ind;
        NT num;
    };

    MPI_Datatype datatype;
    MPI_Type_contiguous(sizeof(mystruct), MPI_CHAR, &datatype );
    MPI_Type_commit(&datatype);
    int dsize;
    MPI_Type_size(datatype, &dsize);

    // each process writes its nonzeros at an offset determined by sizeuntil
    char openmode[32] = "native";
    MPI_File_set_view(thefile, static_cast<int>(sizeuntil * dsize), datatype, datatype, openmode, MPI_INFO_NULL);

    int count = ind.size();
    mystruct * packed = new mystruct[count];
    for(int i=0; i<count; ++i)
    {
        packed[i].ind = ind[i];
        packed[i].num = num[i];
    }
    MPI_File_write(thefile, packed, count, datatype, MPI_STATUS_IGNORE);
    MPI_File_close(&thefile);

    // read the binary file back and print it to stdout
    FILE * f = fopen("temp_fullydistspvec", "r");
    if(!f)
    {
        std::cerr << "Problem reading binary input file\n";
        return;
    }
    IT maxd = *std::max_element(dist, dist+nprocs);
    mystruct * data = new mystruct[maxd];

    for(int i=0; i<nprocs; ++i)
    {
        // read the chunk written by processor i
        if (fread(data, dsize, dist[i], f) < static_cast<size_t>(dist[i]))
        {
            std::cout << "fread 802 failed! attempting to continue..." << std::endl;
        }
        std::cout << "Elements stored on proc " << i << ": {";
        for (int j = 0; j < dist[i]; j++)
        {
            std::cout << "(" << data[j].ind << "," << data[j].num << "), ";
        }
        std::cout << "}" << std::endl;
    }
    remove("temp_fullydistspvec");
template <class IT, class NT>
void FullyDistSpVec<IT,NT>::Reset()
template <class IT, class NT>
void FullyDistSpVec<IT,NT>::BulkSet(IT inds[], int count) {
    ind.resize(count);
    num.resize(count);
    std::copy(inds, inds+count, ind.data());
    std::copy(inds, inds+count, num.data());    // the values are set to the indices themselves
}
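// Invert: builds the inverse mapping of the vector, i.e. a new sparse vector in
// which the old values become indices and the old global indices become values.
// Entries are routed to their new owners with an all-to-all, then sorted; when
// several entries collide on the same index, only the first one is kept.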
template <class IT, class NT>
FullyDistSpVec<IT,NT> FullyDistSpVec<IT,NT>::Invert (IT globallen)
    FullyDistSpVec<IT,NT> Inverted(commGrid, globallen);
    IT max_entry = Reduce(maximum<IT>(), (IT) 0 );
    if(max_entry >= globallen)
    {
        std::cout << "Sparse vector has entries (" << max_entry << ") larger than requested global vector length " << globallen << std::endl;
    }

    MPI_Comm World = commGrid->GetWorld();
    int nprocs = commGrid->GetSize();
    int * rdispls = new int[nprocs+1];
    int * recvcnt = new int[nprocs];
    int * sendcnt = new int[nprocs]();  // zero initialized
    int * sdispls = new int[nprocs+1];

    IT ploclen = getlocnnz();
#pragma omp parallel for
    for(IT k=0; k < ploclen; ++k)
    {
        IT locind;
        int owner = Inverted.Owner(num[k], locind);     // the value becomes the new index
        __sync_fetch_and_add(&sendcnt[owner], 1);
    }

    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);

    sdispls[0] = 0;
    rdispls[0] = 0;
    for(int i=0; i<nprocs; ++i)
    {
        sdispls[i+1] = sdispls[i] + sendcnt[i];
        rdispls[i+1] = rdispls[i] + recvcnt[i];
    }

    NT * datbuf = new NT[ploclen];
    IT * indbuf = new IT[ploclen];
    int *count = new int[nprocs]();     // running offset within each destination block
#pragma omp parallel for
    for(IT i=0; i < ploclen; ++i)
    {
        IT locind;
        int owner = Inverted.Owner(num[i], locind);
        int id;
#ifdef _OPENMP
        id = sdispls[owner] + __sync_fetch_and_add(&count[owner], 1);
#else
        id = sdispls[owner] + count[owner];
        count[owner]++;
#endif
        datbuf[id] = ind[i] + LengthUntil();    // the old global index becomes the new value
        indbuf[id] = locind;
    }

    IT totrecv = rdispls[nprocs];
    NT * recvdatbuf = new NT[totrecv];
    MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
    IT * recvindbuf = new IT[totrecv];
    MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);

    std::vector< std::pair<IT,NT> > tosort;
    tosort.resize(totrecv);
#pragma omp parallel for
    for(int i=0; i<totrecv; ++i)
        tosort[i] = std::make_pair(recvindbuf[i], recvdatbuf[i]);
    DeleteAll(sdispls, rdispls, sendcnt, recvcnt);

#if defined(GNU_PARALLEL) && defined(_OPENMP)
    __gnu_parallel::sort(tosort.begin(), tosort.end());
#else
    std::sort(tosort.begin(), tosort.end());
#endif

    Inverted.ind.reserve(totrecv);
    Inverted.num.reserve(totrecv);
    IT lastIndex = static_cast<IT>(-1);     // sentinel: no index seen yet
    for(typename std::vector<std::pair<IT,NT>>::iterator itr = tosort.begin(); itr != tosort.end(); ++itr)
    {
        if(lastIndex != itr->first)         // keep only the first pair for each index
        {
            Inverted.ind.push_back(itr->first);
            Inverted.num.push_back(itr->second);
            lastIndex = itr->first;
        }
    }
template <class IT, class NT>
template <typename _BinaryOperationIdx, typename _BinaryOperationVal, typename _BinaryOperationDuplicate>
FullyDistSpVec<IT,NT> FullyDistSpVec<IT,NT>::Invert (IT globallen, _BinaryOperationIdx __binopIdx, _BinaryOperationVal __binopVal, _BinaryOperationDuplicate __binopDuplicate)
    FullyDistSpVec<IT,NT> Inverted(commGrid, globallen);

    // compute the largest index the inverse would need, and make sure it fits
    IT localmax = (IT) 0;
    for(size_t k=0; k < num.size(); ++k)
        localmax = std::max(localmax, __binopIdx(num[k], ind[k] + LengthUntil()));
    IT globalmax = (IT) 0;
    MPI_Allreduce( &localmax, &globalmax, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
    if(globalmax >= globallen)
    {
        std::cout << "Sparse vector has entries (" << globalmax << ") larger than requested global vector length " << globallen << std::endl;
    }

    MPI_Comm World = commGrid->GetWorld();
    int nprocs = commGrid->GetSize();
    int * rdispls = new int[nprocs+1];
    int * recvcnt = new int[nprocs];
    int * sendcnt = new int[nprocs]();  // zero initialized
    int * sdispls = new int[nprocs+1];

    IT ploclen = getlocnnz();
#pragma omp parallel for
    for(IT k=0; k < ploclen; ++k)
    {
        IT locind;
        IT globind = __binopIdx(num[k], ind[k] + LengthUntil());   // the index of the inverse
        int owner = Inverted.Owner(globind, locind);
        __sync_fetch_and_add(&sendcnt[owner], 1);
    }

    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);

    sdispls[0] = 0;
    rdispls[0] = 0;
    for(int i=0; i<nprocs; ++i)
    {
        sdispls[i+1] = sdispls[i] + sendcnt[i];
        rdispls[i+1] = rdispls[i] + recvcnt[i];
    }

    NT * datbuf = new NT[ploclen];
    IT * indbuf = new IT[ploclen];
    int *count = new int[nprocs]();     // running offset within each destination block
#pragma omp parallel for
    for(IT i=0; i < ploclen; ++i)
    {
        IT locind;
        IT globind = __binopIdx(num[i], ind[i] + LengthUntil());
        int owner = Inverted.Owner(globind, locind);
        int id;
#ifdef _OPENMP
        id = sdispls[owner] + __sync_fetch_and_add(&count[owner], 1);
#else
        id = sdispls[owner] + count[owner];
        count[owner]++;
#endif
        datbuf[id] = __binopVal(num[i], ind[i] + LengthUntil());    // the value of the inverse
        indbuf[id] = locind;
    }

    IT totrecv = rdispls[nprocs];
    NT * recvdatbuf = new NT[totrecv];
    MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
    IT * recvindbuf = new IT[totrecv];
    MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);

    std::vector< std::pair<IT,NT> > tosort;
    tosort.resize(totrecv);
#pragma omp parallel for
    for(int i=0; i<totrecv; ++i)
        tosort[i] = std::make_pair(recvindbuf[i], recvdatbuf[i]);
    DeleteAll(sdispls, rdispls, sendcnt, recvcnt);

#if defined(GNU_PARALLEL) && defined(_OPENMP)
    __gnu_parallel::sort(tosort.begin(), tosort.end());
#else
    std::sort(tosort.begin(), tosort.end());
#endif

    Inverted.ind.reserve(totrecv);
    Inverted.num.reserve(totrecv);

    // merge entries that collide on the same index with __binopDuplicate
    for(typename std::vector<std::pair<IT,NT>>::iterator itr = tosort.begin(); itr != tosort.end(); )
    {
        IT ind = itr->first;
        NT val = itr->second;
        ++itr;
        while(itr != tosort.end() && itr->first == ind)
        {
            val = __binopDuplicate(val, itr->second);
            ++itr;
        }
        Inverted.ind.push_back(ind);
        Inverted.num.push_back(val);
    }
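// InvertRMA: the same inversion as above, but the counts, displacements, and data
// are exchanged with one-sided MPI (MPI_Put into windows) instead of MPI_Alltoallv.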
template <class IT, class NT>
template <typename _BinaryOperationIdx, typename _BinaryOperationVal>
FullyDistSpVec<IT,NT> FullyDistSpVec<IT,NT>::InvertRMA (IT globallen, _BinaryOperationIdx __binopIdx, _BinaryOperationVal __binopVal)
    FullyDistSpVec<IT,NT> Inverted(commGrid, globallen);
    int myrank;
    MPI_Comm_rank(commGrid->GetWorld(), &myrank);

    // compute the largest index the inverse would need, and make sure it fits
    IT localmax = (IT) 0;
    for(size_t k=0; k < num.size(); ++k)
        localmax = std::max(localmax, __binopIdx(num[k], ind[k] + LengthUntil()));
    IT globalmax = (IT) 0;
    MPI_Allreduce( &localmax, &globalmax, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
    if(globalmax >= globallen)
    {
        std::cout << "Sparse vector has entries (" << globalmax << ") larger than requested global vector length " << globallen << std::endl;
    }

    MPI_Comm World = commGrid->GetWorld();
    int nprocs = commGrid->GetSize();
    int * rdispls = new int[nprocs+1];
    int * recvcnt = new int[nprocs]();
    int * sendcnt = new int[nprocs]();  // zero initialized
    int * sdispls = new int[nprocs+1];

    IT ploclen = getlocnnz();
#pragma omp parallel for
    for(IT k=0; k < ploclen; ++k)
    {
        IT locind;
        IT globind = __binopIdx(num[k], ind[k] + LengthUntil());   // the index of the inverse
        int owner = Inverted.Owner(globind, locind);
        __sync_fetch_and_add(&sendcnt[owner], 1);
    }
    // exchange the send counts with one-sided puts instead of MPI_Alltoall
    MPI_Win win2;
    MPI_Win_create(recvcnt, nprocs * sizeof(MPI_INT), sizeof(MPI_INT), MPI_INFO_NULL, World, &win2);
    for(int i=0; i<nprocs; ++i)
    {
        MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win2);
        MPI_Put(&sendcnt[i], 1, MPI_INT, i, myrank, 1, MPI_INT, win2);
        MPI_Win_unlock(i, win2);
    }
    MPI_Win_free(&win2);

    sdispls[0] = 0;
    rdispls[0] = 0;
    for(int i=0; i<nprocs; ++i)
    {
        sdispls[i+1] = sdispls[i] + sendcnt[i];
        rdispls[i+1] = rdispls[i] + recvcnt[i];
    }

    // tell every destination where our block starts in its receive buffers
    int * rmadispls = new int[nprocs+1];
    MPI_Win win3;
    MPI_Win_create(rmadispls, nprocs * sizeof(MPI_INT), sizeof(MPI_INT), MPI_INFO_NULL, World, &win3);
    for(int i=0; i<nprocs; ++i)
    {
        MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win3);
        MPI_Put(&rdispls[i], 1, MPI_INT, i, myrank, 1, MPI_INT, win3);
        MPI_Win_unlock(i, win3);
    }
    MPI_Win_free(&win3);
    NT * datbuf = new NT[ploclen];
    IT * indbuf = new IT[ploclen];
    int *count = new int[nprocs]();     // running offset within each destination block
#pragma omp parallel for
    for(IT i=0; i < ploclen; ++i)
    {
        IT locind;
        IT globind = __binopIdx(num[i], ind[i] + LengthUntil());
        int owner = Inverted.Owner(globind, locind);
        int id;
#ifdef _OPENMP
        id = sdispls[owner] + __sync_fetch_and_add(&count[owner], 1);
#else
        id = sdispls[owner] + count[owner];
        count[owner]++;
#endif
        datbuf[id] = __binopVal(num[i], ind[i] + LengthUntil());
        indbuf[id] = locind;
    }

    IT totrecv = rdispls[nprocs];
    NT * recvdatbuf = new NT[totrecv];
    IT * recvindbuf = new IT[totrecv];

    // expose the receive buffers and let every process put its block directly
    MPI_Win win, win1;
    MPI_Win_create(recvdatbuf, totrecv * sizeof(NT), sizeof(NT), MPI_INFO_NULL, commGrid->GetWorld(), &win);
    MPI_Win_create(recvindbuf, totrecv * sizeof(IT), sizeof(IT), MPI_INFO_NULL, commGrid->GetWorld(), &win1);
    for(int i=0; i<nprocs; ++i)
    {
        MPI_Win_lock(MPI_LOCK_SHARED, i, 0, win);
        MPI_Put(&datbuf[sdispls[i]], sendcnt[i], MPIType<NT>(), i, rmadispls[i], sendcnt[i], MPIType<NT>(), win);
        MPI_Win_unlock(i, win);

        MPI_Win_lock(MPI_LOCK_SHARED, i, 0, win1);
        MPI_Put(&indbuf[sdispls[i]], sendcnt[i], MPIType<IT>(), i, rmadispls[i], sendcnt[i], MPIType<IT>(), win1);
        MPI_Win_unlock(i, win1);
    }
    MPI_Win_free(&win);
    MPI_Win_free(&win1);
    delete [] rmadispls;

    std::vector< std::pair<IT,NT> > tosort;
    tosort.resize(totrecv);
#pragma omp parallel for
    for(int i=0; i<totrecv; ++i)
        tosort[i] = std::make_pair(recvindbuf[i], recvdatbuf[i]);
    DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
#if defined(GNU_PARALLEL) && defined(_OPENMP)
    __gnu_parallel::sort(tosort.begin(), tosort.end());
#else
    std::sort(tosort.begin(), tosort.end());
#endif

    Inverted.ind.reserve(totrecv);
    Inverted.num.reserve(totrecv);
    IT lastIndex = static_cast<IT>(-1);     // sentinel: no index seen yet
    for(typename std::vector<std::pair<IT,NT>>::iterator itr = tosort.begin(); itr != tosort.end(); ++itr)
    {
        if(lastIndex != itr->first)         // keep only the first pair for each index
        {
            Inverted.ind.push_back(itr->first);
            Inverted.num.push_back(itr->second);
            lastIndex = itr->first;
        }
    }
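// Select: keeps only those nonzeros whose corresponding entry in the aligned dense
// vector satisfies the predicate; the surviving entries are compacted in place.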
template <typename IT, typename NT>
template <typename NT1, typename _UnaryOperation>
void FullyDistSpVec<IT,NT>::Select (const FullyDistVec<IT,NT1> & denseVec, _UnaryOperation __unop)
    if(*commGrid == *(denseVec.commGrid))
    {
        if(TotalLength() != denseVec.TotalLength())
        {
            std::ostringstream outs;
            outs << "Vector dimensions don't match (" << TotalLength() << " vs " << denseVec.TotalLength() << ") for Select\n";
            SpParHelper::Print(outs.str());
        }

        IT spsize = getlocnnz();
        IT k = 0;
        // keep only the nonzeros whose aligned dense entry satisfies the predicate
        for(IT i=0; i< spsize; ++i)
        {
            if(__unop(denseVec.arr[ind[i]]))
            {
                ind[k] = ind[i];
                num[k++] = num[i];
            }
        }
        ind.resize(k);
        num.resize(k);
    }
    else
    {
        std::ostringstream outs;
        outs << "Grids are not comparable for Select" << std::endl;
        SpParHelper::Print(outs.str());
    }
template <typename IT, typename NT>
template <typename NT1>
void FullyDistSpVec<IT,NT>::Setminus (const FullyDistSpVec<IT,NT1> & other)
    if(*commGrid == *(other.commGrid))
    {
        if(TotalLength() != other.TotalLength())
        {
            std::ostringstream outs;
            outs << "Vector dimensions don't match (" << TotalLength() << " vs " << other.TotalLength() << ") for Setminus\n";
            SpParHelper::Print(outs.str());
        }

        IT mysize = getlocnnz();
        IT othersize = other.getlocnnz();
        IT i = 0, j = 0, k = 0;
        for(; i< mysize && j < othersize;)      // walk the two sorted index lists together
        {
            if(other.ind[j] == ind[i])          // the entry also exists in other: drop it
            {
                ++i;
                ++j;
            }
            else if(other.ind[j] > ind[i])      // the entry survives
            {
                ind[k] = ind[i];
                num[k++] = num[i++];
            }
            else
                ++j;
        }
        while(i < mysize)                       // leftovers of *this all survive
        {
            ind[k] = ind[i];
            num[k++] = num[i++];
        }
        ind.resize(k);
        num.resize(k);
    }
    else
    {
        std::ostringstream outs;
        outs << "Grids are not comparable for Setminus" << std::endl;
        SpParHelper::Print(outs.str());
    }
template <typename IT, typename NT>
template <typename NT1, typename _UnaryOperation, typename _BinaryOperation>
void FullyDistSpVec<IT,NT>::SelectApply (const FullyDistVec<IT,NT1> & denseVec, _UnaryOperation __unop, _BinaryOperation __binop)
    if(*commGrid == *(denseVec.commGrid))
    {
        if(TotalLength() != denseVec.TotalLength())
        {
            std::ostringstream outs;
            outs << "Vector dimensions don't match (" << TotalLength() << " vs " << denseVec.TotalLength() << ") for SelectApply\n";
            SpParHelper::Print(outs.str());
        }

        IT spsize = getlocnnz();
        IT k = 0;
        for(IT i=0; i< spsize; ++i)
        {
            if(__unop(denseVec.arr[ind[i]]))    // keep this nonzero, combining it with the dense entry
            {
                ind[k] = ind[i];
                num[k++] = __binop(num[i], denseVec.arr[ind[i]]);
            }
        }
        ind.resize(k);
        num.resize(k);
    }
    else
    {
        std::ostringstream outs;
        outs << "Grids are not comparable for SelectApply" << std::endl;
        SpParHelper::Print(outs.str());
    }
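// FilterByVal: gathers the selector's entries (global indices or values, depending
// on filterByIndex) onto every process, sorts them, and filters the local nonzeros
// by binary searching their transformed values against that gathered set.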
template <class IT, class NT>
template <typename _UnaryOperation>
void FullyDistSpVec<IT,NT>::FilterByVal (FullyDistSpVec<IT,IT> Selector, _UnaryOperation __unop, bool filterByIndex)
    if(*commGrid != *(Selector.commGrid))
    {
        std::ostringstream outs;
        outs << "Grids are not comparable for Filter" << std::endl;
        SpParHelper::Print(outs.str());
    }
    int nprocs = commGrid->GetSize();
    MPI_Comm World = commGrid->GetWorld();

    // gather the selector entries onto every process
    int * rdispls = new int[nprocs];
    int sendcnt = Selector.ind.size();
    int * recvcnt = new int[nprocs];
    MPI_Allgather(&sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);

    rdispls[0] = 0;
    for(int i=0; i<nprocs-1; ++i)
    {
        rdispls[i+1] = rdispls[i] + recvcnt[i];
    }
    IT * sendbuf = new IT[sendcnt];
#pragma omp parallel for
    for(int k=0; k<sendcnt; k++)
    {
        if(filterByIndex)
            sendbuf[k] = Selector.ind[k] + Selector.LengthUntil();  // global indices of the selector
        else
            sendbuf[k] = Selector.num[k];                           // values of the selector
    }
    IT totrecv = std::accumulate(recvcnt, recvcnt+nprocs, static_cast<IT>(0));
    std::vector<IT> recvbuf;
    recvbuf.resize(totrecv);
    MPI_Allgatherv(sendbuf, sendcnt, MPIType<IT>(), recvbuf.data(), recvcnt, rdispls, MPIType<IT>(), World);

#if defined(GNU_PARALLEL) && defined(_OPENMP)
    __gnu_parallel::sort(recvbuf.begin(), recvbuf.end());
#else
    std::sort(recvbuf.begin(), recvbuf.end());
#endif

    // filter the local nonzeros by binary searching their transformed values
    for(IT i=0; i<num.size(); i++)
    {
        IT val = __unop(num[i]);
        if(!std::binary_search(recvbuf.begin(), recvbuf.end(), val))