32#include "PBBS/radixSort.h"
33#include "Tommy/tommyhashdyn.h"
// NOTE(review): this chunk is an elided extraction — the fused original line
// numbers (56, 57, 65, 73, 103, 134) jump, so most statements of this body are
// missing here. Comments describe only what the visible fragments establish.
//
// Sparse matrix (DCSC) x sparse vector product over semiring SR:
// consumes the input vector as parallel index/value arrays (indx/numx of
// length veclen) and appends the result to indy/numy.
//   Adcsc  - matrix in doubly compressed sparse column form
//   mA     - presumably the row dimension of A — TODO confirm (body elided)
//   indy/numy - output sparse vector, index and value parts
56template <
class SR,
class IT,
class NUM,
class IVT,
class OVT>
57void SpImpl<SR,IT,NUM,IVT,OVT>::SpMXSpV(
const Dcsc<IT,NUM> & Adcsc,
int32_t mA,
const int32_t * indx,
const IVT * numx,
int32_t veclen,
58 std::vector<int32_t> & indy, std::vector< OVT > & numy)
// Branches on whether the matrix value type is wider than the output value
// type; the two elided code paths that follow presumably differ in how
// intermediate products are stored — TODO confirm against the full source.
65 if(
sizeof(
NUM) >
sizeof(
OVT))
// SR::returnedSAID() guards: results the semiring flagged via its "SAID"
// mechanism are filtered out — exact semantics depend on SR, verify against
// the semiring definition.
73 if(SR::returnedSAID())
103 if(!SR::returnedSAID())
134 if (!SR::returnedSAID())
// NOTE(review): elided extraction — original lines 170-210 are mostly missing;
// comments cover only the visible fragments.
//
// Boolean-matrix specialization of SpMXSpV (Dcsc<IT,bool>): values of A are
// implicit, so outputs come from the vector values via the semiring.
// The visible fragments show a heap-based merge: results are drained from
// sHeap in order and duplicate row indices are combined with SR::add.
167template <
class SR,
class IT,
class IVT,
class OVT>
168void SpImpl<SR,IT,bool,IVT,OVT>::SpMXSpV(
const Dcsc<IT,bool> & Adcsc,
int32_t mA,
const int32_t * indx,
const IVT * numx,
int32_t veclen,
169 std::vector<int32_t> & indy, std::vector<OVT> & numy)
// Index-range sentinels; presumably used by the (elided) column lookup to
// bound searches in the DCSC structure — TODO confirm.
171 IT inf = std::numeric_limits<IT>::min();
172 IT sup = std::numeric_limits<IT>::max();
// Emit the first heap entry unconditionally (elided lines push its index),
// then merge the rest: equal-row entries are accumulated into numy.back()
// with SR::add rather than pushed again.
194 if(
sHeap.getSize() > 0)
198 numy.push_back( num );
200 while(
sHeap.getSize() > 0)
205 numy.back() = SR::add(
numy.back(), num);
// NOTE(review): elided extraction — most of original lines 224-275 are
// missing; comments cover only the visible fragments.
//
// SpMXSpV overload writing into caller-provided raw arrays, with the output
// partitioned into p_c pieces (cnts = per-piece counts, dspls = per-piece
// displacements — presumably for a later MPI-style exchange, TODO confirm).
222template <
typename SR,
typename IT,
typename IVT,
class OVT>
223void SpImpl<SR,IT,bool,IVT,OVT>::SpMXSpV(
const Dcsc<IT,bool> & Adcsc,
int32_t mA,
const int32_t * indx,
const IVT * numx,
int32_t veclen,
224 int32_t * indy, OVT * numy,
int * cnts,
int * dspls,
int p_c)
// One index list per output piece; rows are routed to a piece by elided logic.
228 std::vector< std::vector<int32_t> >
nzinds(p_c);
// Bitmap-based de-duplication: a row contributes a new entry only the first
// time it is seen; subsequent hits are (per the elided code, presumably)
// accumulated with the semiring's add — TODO confirm.
242 if(!isthere.get_bit(rowid))
247 isthere.set_bit(rowid);
// Final pass: for each piece p, copy its cnts[p] results into the output
// arrays (placement uses dspls in the elided lines).
259 for(
int p = 0; p< p_c; ++p)
265 for(
int i=0; i<
cnts[p]; ++i)
// NOTE(review): elided extraction — lines between the fused original numbers
// are missing. This overload is a convenience wrapper: it allocates the
// scratch index buffer (nzinds) locally and forwards to the extended
// SpMXSpV_ForThreading overload below. The localy and isthere arguments it
// forwards are declared on elided lines (original 279-283) — presumably a
// dense accumulator and a BitMap, matching the callee's signature; confirm
// against the full source.
276template <
typename SR,
typename IT,
typename IVT,
typename OVT>
277void SpImpl<SR,IT,bool,IVT,OVT>::SpMXSpV_ForThreading(
const Dcsc<IT,bool> & Adcsc,
int32_t mA,
const int32_t * indx,
const IVT * numx,
int32_t veclen,
278 std::vector<int32_t> & indy, std::vector<OVT> & numy,
int32_t offset)
282 std::vector<uint32_t>
nzinds;
284 SpMXSpV_ForThreading(
Adcsc,
mA,
indx, numx,
veclen,
indy,
numy,
offset,
localy, isthere,
nzinds);
// NOTE(review): elided extraction — original lines 292-340 are mostly
// missing; comments cover only the visible fragments.
//
// Worker variant of SpMXSpV intended for per-thread row ranges: the caller
// supplies reusable scratch state so threads avoid reallocation:
//   localy  - dense value accumulator indexed by row
//   isthere - BitMap marking which rows already hold a value
//   nzinds  - list of rows touched (drained into indy/numy at the end)
//   offset  - presumably this thread's starting row — TODO confirm (elided)
290template <
typename SR,
typename IT,
typename IVT,
typename OVT>
291void SpImpl<SR,IT,bool,IVT,OVT>::SpMXSpV_ForThreading(
const Dcsc<IT,bool> & Adcsc,
int32_t mA,
const int32_t * indx,
const IVT * numx,
int32_t veclen, std::vector<int32_t> & indy, std::vector<OVT> & numy,
int32_t offset, std::vector<OVT> & localy, BitMap & isthere, std::vector<uint32_t> & nzinds)
// First touch of a row sets its bit; repeat touches take the (elided)
// accumulate path instead of creating a duplicate entry.
305 if(!isthere.get_bit(rowid))
309 isthere.set_bit(rowid);
// Output pass over the nnzy collected nonzeros (nnzy computed on elided lines).
323 for(
int i=0; i<
nnzy; ++i)
// NOTE(review): elided extraction — the function's signature line (original
// ~345-346) is itself missing here. Based on the trailing declaration list in
// this chunk, this is presumably SpMXSpV_HeapSort over a Csc<IT,NT> matrix —
// confirm against the full source.
//
// Visible algorithm: for every input-vector entry k, multiply it against its
// matrix column (Acsc.jc column pointers, Acsc.ir row indices, Acsc.num
// values) via SR::multiply and push each (row, product) into sHeap; then
// drain the heap in row order, combining equal-row products with SR::add so
// each output row appears once in indy/numy.
344template <
typename SR,
typename IT,
typename NT,
typename IVT,
typename OVT>
// Index-range sentinels; usage is on elided lines — presumably bounds for a
// column search, TODO confirm.
347 IT inf = std::numeric_limits<IT>::min();
348 IT sup = std::numeric_limits<IT>::max();
// Phase 1: generate all products into the heap.
352 for (
int32_t k = 0; k < veclen; ++k)
355 for(
IT j=Acsc.jc[colid]; j < Acsc.jc[colid+1]; ++j)
357 OVT val = SR::multiply( Acsc.num[j], numx[k]);
358 sHeap.insert(Acsc.ir[j], val);
// Phase 2: drain in sorted row order; first entry is emitted directly,
// later entries either merge into the last output (same row) or append.
364 if(sHeap.getSize() > 0)
366 sHeap.deleteMin(&row, &num);
368 indy.push_back( (
int32_t) row);
369 numy.push_back( num );
371 while(sHeap.getSize() > 0)
373 sHeap.deleteMin(&row, &num);
375 if(indy.back() == row)
377 numy.back() = SR::add(numy.back(), num);
381 indy.push_back( (
int32_t) row);
// NOTE(review): elided extraction — the function's signature line (original
// ~390) is missing; from the trailing declaration list this is presumably
// SpMXSpV_Bucket(const Csc<IT,NT>&, mA, indx, numx, veclen, indy, numy, SPA).
// Many interior lines are elided; comments below annotate only the visible
// stages. Timing uses MPI_Wtime and threading uses OpenMP pragmas, as shown.
//
// Visible pipeline (each stage bracketed by BENCHMARK_SPMSPV timers):
//   1. per-thread estimate of bucket sizes        (t1, "Estimate buckets")
//   2. scatter products into row buckets           (t2, "Bucketing")
//   3. SPA merge + per-bucket integer sort         (t3, "SPA-merge")
//   4. gather sorted results into indy/numy        (t4, "Output")
389template <
typename SR,
typename IT,
typename NT,
typename IVT,
typename OVT>
391 std::vector<int32_t> & indy, std::vector< OVT > & numy, PreAllocatedSPA<OVT> & SPA)
396 double tstart = MPI_Wtime();
// Bucket count comes from the preallocated SPA; rows are split evenly into
// rowSplits ranges of rowPerSplit rows each.
398 int rowSplits = SPA.buckets;
402 nthreads = omp_get_num_threads();
// Warn (message assembled into outs; emission is on elided lines) when fewer
// buckets than threads would limit the dynamic schedule's load balance.
405 if(rowSplits < nthreads)
407 std::ostringstream outs;
408 outs <<
"Warning in SpMXSpV_Bucket: " << rowSplits <<
" buckets are supplied for " << nthreads <<
" threads\n";
409 outs <<
"4 times the number of threads are recommended when creating PreAllocatedSPA\n";
413 int32_t rowPerSplit = mA / rowSplits;
// bSize[b][k] / bOffset[b][k]: items that input-slice b contributes to row
// bucket k, and its write offset inside that bucket.
422 std::vector<std::vector<int32_t>> bSize(rowSplits, std::vector<int32_t> ( rowSplits, 0));
423 std::vector<std::vector<int32_t>> bOffset(rowSplits, std::vector<int32_t> ( rowSplits, 0));
424 std::vector<int32_t> sendSize(rowSplits);
425 double t0, t1, t2, t3, t4;
426#ifdef BENCHMARK_SPMSPV
// ---- Stage 1: count, per input-slice b, how many products land in each
// row bucket. The input vector is split into near-equal slices; the first
// `spill` slices get one extra element via the std::min(spill, b) terms.
431#pragma omp parallel for schedule(dynamic, 1)
433 for(
int b=0; b<rowSplits; b++)
436 int perBucket = veclen/rowSplits;
437 int spill = veclen%rowSplits;
438 int32_t xstart = b*perBucket + std::min(spill, b);
439 int32_t xend = (b+1)*perBucket + std::min(spill, b+1);
440 std::vector<int32_t> temp(rowSplits,0);
441 for (
int32_t i = xstart; i < xend; ++i)
444 for(
IT j=Acsc.jc[colid]; j < Acsc.jc[colid+1]; ++j)
// Row -> bucket mapping; the last bucket absorbs the remainder rows.
448 if(rowPerSplit!=0) splitId = (rowid/rowPerSplit > rowSplits-1) ? rowSplits-1 : rowid/rowPerSplit;
454 for(
int k=0; k<rowSplits; k++)
456 bSize[b][k] = temp[k];
459 sendSize[b] = totSend;
463#ifdef BENCHMARK_SPMSPV
464 t1 = MPI_Wtime() - t0;
// Column-wise exclusive prefix sums turn counts into write offsets.
471 for(
int i=1; i<rowSplits; i++)
473 for(
int j=0; j<rowSplits; j++)
475 bOffset[i][j] = bOffset[i-1][j] + bSize[i-1][j];
// disp[j] = start of bucket j in the packed SPA arrays; also track the
// largest bucket to size the per-thread staging buffers below.
480 std::vector<uint32_t> disp(rowSplits+1);
481 int maxBucketSize = -1;
483 for(
int j=0; j<rowSplits; j++)
485 int thisBucketSize = bOffset[rowSplits-1][j] + bSize[rowSplits-1][j];
486 disp[j+1] = disp[j] + thisBucketSize;
487 bSize[rowSplits-1][j] = 0;
488 maxBucketSize = std::max(thisBucketSize, maxBucketSize);
493#ifdef BENCHMARK_SPMSPV
494 double tseq = MPI_Wtime() - t0;
// ---- Stage 2: scatter (rowid, value) pairs into the shared SPA arrays via
// small per-thread staging buffers so writes to shared memory are batched.
// Buffer length is tuned against the L2 budget (bufferMem, computed from the
// per-item size) and clamped by the largest bucket.
502#define L2_CACHE_SIZE 256000
504 int THREAD_BUF_LEN = 256;
505 int itemsize =
sizeof(
int32_t) +
sizeof(OVT);
508 int bufferMem = THREAD_BUF_LEN * rowSplits * itemsize + 8 * rowSplits;
512 THREAD_BUF_LEN = std::min(maxBucketSize+1,THREAD_BUF_LEN);
514#ifdef BENCHMARK_SPMSPV
// Per-thread staging: tIndSplitA (allocated on an elided line) / tNumSplitA
// hold THREAD_BUF_LEN pending items per bucket.
523 OVT* tNumSplitA =
new OVT[rowSplits*THREAD_BUF_LEN];
524 std::vector<int32_t> tBucketSize(rowSplits);
525 std::vector<int32_t> tOffset(rowSplits);
527#pragma omp for schedule(dynamic,1)
529 for(
int b=0; b<rowSplits; b++)
532 std::fill(tBucketSize.begin(), tBucketSize.end(), 0);
533 std::fill(tOffset.begin(), tOffset.end(), 0);
534 int perBucket = veclen/rowSplits;
535 int spill = veclen%rowSplits;
536 int32_t xstart = b*perBucket + std::min(spill, b);
537 int32_t xend = (b+1)*perBucket + std::min(spill, b+1);
539 for (
int32_t i = xstart; i < xend; ++i)
542 for(
IT j=Acsc.jc[colid]; j < Acsc.jc[colid+1]; ++j)
544 OVT val = SR::multiply( Acsc.num[j], numx[i]);
547 if(rowPerSplit!=0) splitId = (rowid/rowPerSplit > rowSplits-1) ? rowSplits-1 : rowid/rowPerSplit;
// Append to the staging buffer; on overflow, flush the full buffer to the
// SPA at this slice's reserved offset, then restart the buffer with the
// current item.
548 if (tBucketSize[splitId] < THREAD_BUF_LEN)
550 tIndSplitA[splitId*THREAD_BUF_LEN + tBucketSize[splitId]] = rowid;
551 tNumSplitA[splitId*THREAD_BUF_LEN + tBucketSize[splitId]++] = val;
555 std::copy(tIndSplitA + splitId*THREAD_BUF_LEN, tIndSplitA + (splitId+1)*THREAD_BUF_LEN, &SPA.indSplitA[disp[splitId] + bOffset[b][splitId]] + tOffset[splitId]);
556 std::copy(tNumSplitA + splitId*THREAD_BUF_LEN, tNumSplitA + (splitId+1)*THREAD_BUF_LEN, &SPA.numSplitA[disp[splitId] + bOffset[b][splitId]] + tOffset[splitId]);
557 tIndSplitA[splitId*THREAD_BUF_LEN] = rowid;
558 tNumSplitA[splitId*THREAD_BUF_LEN] = val;
559 tOffset[splitId] += THREAD_BUF_LEN ;
560 tBucketSize[splitId] = 1;
// Flush each bucket's partially filled staging buffer.
565 for(
int splitId=0; splitId<rowSplits; ++splitId)
567 if(tBucketSize[splitId]>0)
569 std::copy(tIndSplitA + splitId*THREAD_BUF_LEN, tIndSplitA + splitId*THREAD_BUF_LEN + tBucketSize[splitId], &SPA.indSplitA[disp[splitId] + bOffset[b][splitId]] + tOffset[splitId]);
570 std::copy(tNumSplitA + splitId*THREAD_BUF_LEN, tNumSplitA + splitId*THREAD_BUF_LEN + tBucketSize[splitId], &SPA.numSplitA[disp[splitId] + bOffset[b][splitId]] + tOffset[splitId]);
574 delete [] tIndSplitA;
575 delete [] tNumSplitA;
578#ifdef BENCHMARK_SPMSPV
579 t2 = MPI_Wtime() - t0;
// ---- Stage 3: one thread per row bucket merges duplicates. V_isthereBool
// is first cleared for exactly the rows this bucket touches, then the first
// hit of a row stores the value and records the row in nzinds; repeat hits
// accumulate with SR::add. Finally the bucket's row list is integer-sorted.
582 std::vector<uint32_t> nzInRowSplits(rowSplits);
586#pragma omp parallel for schedule(dynamic,1)
588 for(
int rs=0; rs<rowSplits; ++rs)
591 for(
int i=disp[rs]; i<disp[rs+1] ; i++)
593 int32_t lrowid = SPA.indSplitA[i] - rs * rowPerSplit;
594 SPA.V_isthereBool[rs][lrowid] =
false;
597 for(
int i=disp[rs]; i<disp[rs+1] ; i++)
599 int32_t rowid = SPA.indSplitA[i];
600 int32_t lrowid = rowid - rs * rowPerSplit;
601 if(!SPA.V_isthereBool[rs][lrowid])
603 SPA.V_localy[0][rowid] = SPA.numSplitA[i];
604 nzinds[tMergeDisp++] = rowid;
605 SPA.V_isthereBool[rs][lrowid]=
true;
609 SPA.V_localy[0][rowid] = SR::add(SPA.V_localy[0][rowid], SPA.numSplitA[i]);
613 integerSort(nzinds + disp[rs], tMergeDisp - disp[rs]);
614 nzInRowSplits[rs] = tMergeDisp - disp[rs];
618#ifdef BENCHMARK_SPMSPV
619 t3 = MPI_Wtime() - t0;
// Prefix sum of per-bucket nonzero counts gives each bucket's slot in the
// final output; nnzy is the total output size.
622 std::vector<uint32_t> dispRowSplits(rowSplits+1);
623 dispRowSplits[0] = 0;
624 for(
int i=0; i<rowSplits; i++)
626 dispRowSplits[i+1] = dispRowSplits[i] + nzInRowSplits[i];
629#ifdef BENCHMARK_SPMSPV
632 int nnzy = dispRowSplits[rowSplits];
635#ifdef BENCHMARK_SPMSPV
636 tseq = MPI_Wtime() - t0;
// ---- Stage 4: gather values into numy through a per-thread buffer (tnumy),
// again batching writes; indices are bulk-copied from the sorted nzinds.
640 int maxNnzInSplit = *std::max_element(nzInRowSplits.begin(),nzInRowSplits.end());
641 THREAD_BUF_LEN = std::min(maxNnzInSplit+1,256);
646 OVT* tnumy =
new OVT [THREAD_BUF_LEN];
650#pragma omp for schedule(dynamic,1)
652 for(
int rs=0; rs<rowSplits; rs++)
656 uint32_t * thisind = nzinds + disp[rs];
657 std::copy(nzinds+disp[rs], nzinds+disp[rs]+nzInRowSplits[rs], indy.begin()+dispRowSplits[rs]);
658 for(
int j=0; j<nzInRowSplits[rs]; j++)
661 if ( curSize < THREAD_BUF_LEN)
663 tnumy[curSize++] = SPA.V_localy[0][thisind[j]];
667 std::copy(tnumy, tnumy+curSize, numy.begin()+dispRowSplits[rs]+tdisp);
669 tnumy[0] = SPA.V_localy[0][thisind[j]];
// Flush the remaining buffered values for this bucket.
675 std::copy(tnumy, tnumy+curSize, numy.begin()+dispRowSplits[rs]+tdisp);
683#ifdef BENCHMARK_SPMSPV
684 t4 = MPI_Wtime() - t0;
// Optional timing report (printing is on elided lines).
691#ifdef BENCHMARK_SPMSPV
692 double tall = MPI_Wtime() - tstart;
693 std::ostringstream outs1;
694 outs1 <<
"Time breakdown of SpMSpV-bucket." << std::endl;
695 outs1 <<
"Estimate buckets: "<< t1 <<
" Bucketing: " << t2 <<
" SPA-merge: " << t3 <<
" Output: " << t4 <<
" Total: "<< tall << std::endl;
static void Print(const std::string &s)
void integerSort(std::pair< uint32_t, T > *A, int n)
void SpMXSpV_Bucket(const Csc< IT, NT > &Acsc, int32_t mA, const int32_t *indx, const IVT *numx, int32_t veclen, std::vector< int32_t > &indy, std::vector< OVT > &numy, PreAllocatedSPA< OVT > &SPA)
void SpMXSpV_ForThreading(const Dcsc< IT, NUM > &Adcsc, int32_t mA, const int32_t *indx, const IVT *numx, int32_t veclen, std::vector< int32_t > &indy, std::vector< OVT > &numy, int32_t offset)
Overload #3: operates on a DCSC (doubly compressed sparse column) matrix.
void SpMXSpV_HeapSort(const Csc< IT, NT > &Acsc, int32_t mA, const int32_t *indx, const IVT *numx, int32_t veclen, std::vector< int32_t > &indy, std::vector< OVT > &numy, int32_t offset)
static void SpMXSpV(const Dcsc< IT, NUM > &Adcsc, int32_t mA, const int32_t *indx, const IVT *numx, int32_t veclen, std::vector< int32_t > &indy, std::vector< OVT > &numy)
static void SpMXSpV_ForThreading(const Dcsc< IT, NUM > &Adcsc, int32_t mA, const int32_t *indx, const IVT *numx, int32_t veclen, std::vector< int32_t > &indy, std::vector< OVT > &numy, int32_t offset)