COMBINATORIAL_BLAS 1.6
 
Loading...
Searching...
No Matches
mtSpGEMM.h
Go to the documentation of this file.
1#ifndef _mtSpGEMM_h
2#define _mtSpGEMM_h
3
4#include "CombBLAS.h"
5
6namespace combblas {
/*
 Multithreaded prefix sum (exclusive scan)
 Inputs:
 in: an input array (read-only; not modified)
 size: the length of the input array "in"
 nthreads: number of threads used to compute the prefix sum;
           must be >= the number of threads of the enclosing
           OpenMP runtime, since each thread writes tsum[tid+1]

 Output:
 return an array of size "size+1"
 the memory of the output array is allocated internally;
 the caller owns it and must release it with delete []

 Example:

 in = [2, 1, 3, 5]
 out = [0, 2, 3, 6, 11]
 */
template <typename T>
T* prefixsum(const T* in, int size, int nthreads)
{
    // tsum[t+1] holds the local sum of thread t; tsum[0] is the identity.
    std::vector<T> tsum(nthreads+1);
    tsum[0] = 0;
    T* out = new T[size+1];
    out[0] = 0;            // exclusive scan: leading zero
    T* psum = &out[1];     // per-element running sums land here
#ifdef THREADED
#pragma omp parallel
#endif
    {
        int ithread = 0;
#ifdef THREADED
        ithread = omp_get_thread_num();
#endif

        // Pass 1: each thread scans its static chunk locally.
        T sum = 0;
#ifdef THREADED
#pragma omp for schedule(static)
#endif
        for (int i=0; i<size; i++)
        {
            sum += in[i];
            psum[i] = sum;
        }

        tsum[ithread+1] = sum;   // publish this thread's chunk total
#ifdef THREADED
#pragma omp barrier
#endif
        // Offset of this thread's chunk = sum of all preceding chunk totals.
        T offset = 0;
        for(int i=0; i<(ithread+1); i++)
        {
            offset += tsum[i];
        }

        // Pass 2: add the offset. schedule(static) with identical loop bounds
        // guarantees each thread revisits exactly the chunk it scanned above.
#ifdef THREADED
#pragma omp for schedule(static)
#endif
        for (int i=0; i<size; i++)
        {
            psum[i] += offset;
        }

    }
    return out;
}
71
72
// multithreaded HeapSpGEMM
//
// Computes C = A * B on semiring SR (SR::multiply / SR::add) for two local
// DCSC-format matrices. Parallelism is over the nonempty columns of B; each
// output column is produced by a k-way merge of the required columns of A
// using a binary heap of HeapEntry records, so each column's tuples come out
// in increasing row order.
//
// clearA / clearB: when true, the corresponding input is deleted before
// returning (it must therefore be heap-allocated by the caller).
// Returns a freshly allocated SpTuples<IT,NTO> that owns the output tuples.
template <typename SR, typename NTO, typename IT, typename NT1, typename NT2>
SpTuples<IT, NTO> * LocalSpGEMM
(const SpDCCols<IT, NT1> & A,
 const SpDCCols<IT, NT2> & B,
 bool clearA, bool clearB)
{
    IT mdim = A.getnrow();
    IT ndim = B.getncol();
    IT nnzA = A.getnnz();
    if(A.isZero() || B.isZero())
    {
        // Product is empty: return an empty tuple set of the right dimensions.
        return new SpTuples<IT, NTO>(0, mdim, ndim);
    }


    Dcsc<IT,NT1>* Adcsc = A.GetDCSC();
    Dcsc<IT,NT2>* Bdcsc = B.GetDCSC();
    IT nA = A.getncol();
    // aux accelerates locating A's nonempty columns; csize is the column-range
    // chunk size that aux indexes (see Dcsc::ConstructAux / FillColInds).
    float cf = static_cast<float>(nA+1) / static_cast<float>(Adcsc->nzc);
    IT csize = static_cast<IT>(ceil(cf)); // chunk size
    IT * aux;
    Adcsc->ConstructAux(nA, aux);


    int numThreads = 1;
#ifdef THREADED
#pragma omp parallel
    {
        numThreads = omp_get_num_threads();
    }
#endif

    // Symbolic phase: exact nnz per nonempty column of C, prefix-summed into
    // per-column write offsets so the numeric phase can write lock-free.
    IT* colnnzC = estimateNNZ(A, B, aux,false); // don't free aux
    IT* colptrC = prefixsum<IT>(colnnzC, Bdcsc->nzc, numThreads);
    delete [] colnnzC;
    IT nnzc = colptrC[Bdcsc->nzc];
    // Raw (uninitialized) storage; every slot is written exactly once below.
    std::tuple<IT,IT,NTO> * tuplesC = static_cast<std::tuple<IT,IT,NTO> *> (::operator new (sizeof(std::tuple<IT,IT,NTO>[nnzc])));

    // thread private space for heap and colinds
    std::vector<std::vector< std::pair<IT,IT>>> colindsVec(numThreads);
    std::vector<std::vector<HeapEntry<IT,NT1>>> globalheapVec(numThreads);

    for(int i=0; i<numThreads; i++) //inital allocation per thread, may be an overestimate, but does not require more memoty than inputs
    {
        colindsVec[i].resize(nnzA/numThreads);
        globalheapVec[i].resize(nnzA/numThreads);
    }

    size_t Bnzc = (size_t) Bdcsc->nzc;
#ifdef THREADED
#pragma omp parallel for
#endif
    for(size_t i=0; i < Bnzc; ++i)
    {
        size_t nnzcolB = Bdcsc->cp[i+1] - Bdcsc->cp[i]; //nnz in the current column of B
        int myThread = 0;
#ifdef THREADED
        myThread = omp_get_thread_num();
#endif
        if(colindsVec[myThread].size() < nnzcolB) //resize thread private vectors if needed
        {
            colindsVec[myThread].resize(nnzcolB);
            globalheapVec[myThread].resize(nnzcolB);
        }


        // colinds.first vector keeps indices to A.cp, i.e. it dereferences "colnums" vector (above),
        // colinds.second vector keeps the end indices (i.e. it gives the index to the last valid element of A.cpnack)
        Adcsc->FillColInds(Bdcsc->ir + Bdcsc->cp[i], nnzcolB, colindsVec[myThread], aux, csize);
        std::pair<IT,IT> * colinds = colindsVec[myThread].data();
        HeapEntry<IT,NT1> * wset = globalheapVec[myThread].data();
        IT hsize = 0;


        // Seed the heap with the first unconsumed entry of every needed A column.
        for(size_t j = 0; j < nnzcolB; ++j) // create the initial heap
        {
            if(colinds[j].first != colinds[j].second) // current != end
            {
                wset[hsize++] = HeapEntry< IT,NT1 > (Adcsc->ir[colinds[j].first], j, Adcsc->numx[colinds[j].first]);
            }
        }
        std::make_heap(wset, wset+hsize);

        // Numeric phase: pop the minimum row, multiply with the matching B
        // entry, and either accumulate into the previous tuple (same row) or
        // append a new tuple for this column.
        IT curptr = colptrC[i];
        while(hsize > 0)
        {
            std::pop_heap(wset, wset + hsize); // result is stored in wset[hsize-1]
            IT locb = wset[hsize-1].runr; // relative location of the nonzero in B's current column

            NTO mrhs = SR::multiply(wset[hsize-1].num, Bdcsc->numx[Bdcsc->cp[i]+locb]);
            if (!SR::returnedSAID()) // NOTE(review): SAID appears to mean "discard this product" — confirm semiring contract
            {
                if( (curptr > colptrC[i]) && std::get<0>(tuplesC[curptr-1]) == wset[hsize-1].key)
                {
                    // Same row as the previously emitted tuple: accumulate.
                    std::get<2>(tuplesC[curptr-1]) = SR::add(std::get<2>(tuplesC[curptr-1]), mrhs);
                }
                else
                {
                    tuplesC[curptr++]= std::make_tuple(wset[hsize-1].key, Bdcsc->jc[i], mrhs) ;
                }

            }

            // Advance the A column the popped entry came from.
            if( (++(colinds[locb].first)) != colinds[locb].second) // current != end
            {
                // runr stays the same !
                wset[hsize-1].key = Adcsc->ir[colinds[locb].first];
                wset[hsize-1].num = Adcsc->numx[colinds[locb].first];
                std::push_heap(wset, wset+hsize);
            }
            else
            {
                --hsize; // this A column is exhausted
            }
        }
    }

    if(clearA)
        delete const_cast<SpDCCols<IT, NT1> *>(&A);
    if(clearB)
        delete const_cast<SpDCCols<IT, NT2> *>(&B);

    delete [] colptrC;
    delete [] aux;

    // NOTE(review): the trailing (true, true) flags presumably indicate
    // "sorted" and "takes ownership" — confirm against the SpTuples ctor.
    SpTuples<IT, NTO>* spTuplesC = new SpTuples<IT, NTO> (nnzc, mdim, ndim, tuplesC, true, true);
    return spTuplesC;

}
203
204
205
206template <typename IT, typename NT>
207bool sort_less(const std::pair<IT, NT> &left, const std::pair<IT, NT> &right)
208{
209 return left.first < right.first;
210}
211
// Hybrid approach of multithreaded HeapSpGEMM and HashSpGEMM
//
// Computes C = A * B on semiring SR. For each nonempty column of C the
// compression ratio cr = flops/nnz is computed from the symbolic phase;
// columns with cr < 2 are built with a heap-based multiway merge, denser
// columns with a linear-probing hash-table accumulator.
//
// clearA / clearB: when true, the corresponding heap-allocated input is freed.
// aux: optional precomputed search-acceleration structure for A's columns;
//      when nullptr it is constructed here and freed before returning.
// Returns a freshly allocated SpTuples<IT,NTO> that owns the output tuples.
template <typename SR, typename NTO, typename IT, typename NT1, typename NT2>
SpTuples<IT, NTO> * LocalHybridSpGEMM
(const SpDCCols<IT, NT1> & A,
 const SpDCCols<IT, NT2> & B,
 bool clearA, bool clearB, IT * aux = nullptr)
{


    IT mdim = A.getnrow();
    IT ndim = B.getncol();
    IT nnzA = A.getnnz();
    if(A.isZero() || B.isZero())
    {
        // Product is empty: return an empty tuple set of the right dimensions.
        return new SpTuples<IT, NTO>(0, mdim, ndim);
    }


    Dcsc<IT,NT1>* Adcsc = A.GetDCSC();
    Dcsc<IT,NT2>* Bdcsc = B.GetDCSC();
    IT nA = A.getncol();
    // aux accelerates locating A's nonempty columns; csize is the chunk size
    // it indexes (see Dcsc::ConstructAux / FillColInds).
    float cf = static_cast<float>(nA+1) / static_cast<float>(Adcsc->nzc);
    IT csize = static_cast<IT>(ceil(cf)); // chunk size
    //IT * aux;
    bool deleteAux = false;
    if(aux==nullptr)
    {
        deleteAux = true;    // we own the aux structure we build here
        Adcsc->ConstructAux(nA, aux);
    }

    int numThreads = 1;
#ifdef THREADED
#pragma omp parallel
    {
        numThreads = omp_get_num_threads();
    }
#endif

    // std::cout << "numThreads: " << numThreads << std::endl;

    // Symbolic phase: per-column flop counts, then exact per-column nnz
    // counts, both prefix-summed into write offsets.
    IT* flopC = estimateFLOP(A, B, aux);
    //IT* flopptr = prefixsum<IT>(flopC, Bdcsc->nzc, numThreads);
    //IT flop = flopptr[Bdcsc->nzc];
    // std::cout << "FLOP of A * B is " << flop << std::endl;


    IT* colnnzC = estimateNNZ_Hash(A, B, flopC, aux);
    IT* flopptr = prefixsum<IT>(flopC, Bdcsc->nzc, numThreads);
    IT flop = flopptr[Bdcsc->nzc];
    IT* colptrC = prefixsum<IT>(colnnzC, Bdcsc->nzc, numThreads);
    delete [] colnnzC;
    delete [] flopC;
    IT nnzc = colptrC[Bdcsc->nzc];
    //double compression_ratio = (double)flop / nnzc;


    // std::cout << "NNZ of A * B is " << nnzc << std::endl;
    // std::cout << "Compression ratio is " << compression_ratio << std::endl;

    // Raw (uninitialized) storage; every slot is written exactly once below.
    std::tuple<IT,IT,NTO> * tuplesC = static_cast<std::tuple<IT,IT,NTO> *> (::operator new (sizeof(std::tuple<IT,IT,NTO>[nnzc])));
    //std::tuple<IT,IT,NTO> * tuplesC = new std::tuple<IT,IT,NTO>[nnzc];

    // thread private space for heap and colinds
    std::vector<std::vector< std::pair<IT,IT>>> colindsVec(numThreads);

    std::vector<std::vector< std::pair<IT,NTO>>> globalHashVecAll(numThreads);
    std::vector<std::vector< HeapEntry<IT,NT1>>> globalHeapVecAll(numThreads);
    /*
    for(int i=0; i<numThreads; i++) //inital allocation per thread, may be an overestimate, but does not require more memoty than inputs
    {
        colindsVec[i].resize(nnzA/numThreads);
    }*/

    // IT hashSelected = 0;


#ifdef THREADED
#pragma omp parallel for
#endif
    for(size_t i=0; i < Bdcsc->nzc; ++i)
    {
        size_t nnzcolB = Bdcsc->cp[i+1] - Bdcsc->cp[i]; //nnz in the current column of B
        int myThread = 0;

#ifdef THREADED
        myThread = omp_get_thread_num();
#endif
        if(colindsVec[myThread].size() < nnzcolB) //resize thread private vectors if needed
        {
            colindsVec[myThread].resize(nnzcolB);
        }

        // colinds.first vector keeps indices to A.cp, i.e. it dereferences "colnums" vector (above),
        // colinds.second vector keeps the end indices (i.e. it gives the index to the last valid element of A.cpnack)
        Adcsc->FillColInds(Bdcsc->ir + Bdcsc->cp[i], nnzcolB, colindsVec[myThread], aux, csize);
        std::pair<IT,IT> * colinds = colindsVec[myThread].data();

        // Per-column algorithm selection by compression ratio (flops per
        // output nonzero) of this column.
        double cr = static_cast<double>(flopptr[i+1] - flopptr[i]) / (colptrC[i+1] - colptrC[i]);
        if (cr < 2.0) // Heap Algorithm
        {
            if(globalHeapVecAll[myThread].size() < nnzcolB)
                globalHeapVecAll[myThread].resize(nnzcolB);
            //std::vector<HeapEntry<IT,NT1>> globalheapVec(nnzcolB);
            //HeapEntry<IT, NT1> * wset = globalheapVec.data();
            HeapEntry<IT, NT1> * wset = globalHeapVecAll[myThread].data();

            IT hsize = 0;

            // Seed the heap with the first entry of every needed A column.
            for(size_t j = 0; j < nnzcolB; ++j) // create the initial heap
            {
                if(colinds[j].first != colinds[j].second) // current != end
                {
                    wset[hsize++] = HeapEntry< IT,NT1 > (Adcsc->ir[colinds[j].first], j, Adcsc->numx[colinds[j].first]);
                }
            }
            std::make_heap(wset, wset+hsize);

            // Multiway merge: pop minimum row, multiply, accumulate or append.
            IT curptr = colptrC[i];
            while(hsize > 0)
            {
                std::pop_heap(wset, wset + hsize); // result is stored in wset[hsize-1]
                IT locb = wset[hsize-1].runr; // relative location of the nonzero in B's current column

                NTO mrhs = SR::multiply(wset[hsize-1].num, Bdcsc->numx[Bdcsc->cp[i]+locb]);
                if (!SR::returnedSAID()) // NOTE(review): SAID appears to mean "discard this product" — confirm semiring contract
                {
                    if( (curptr > colptrC[i]) && std::get<0>(tuplesC[curptr-1]) == wset[hsize-1].key)
                    {
                        // Same row as the previously emitted tuple: accumulate.
                        std::get<2>(tuplesC[curptr-1]) = SR::add(std::get<2>(tuplesC[curptr-1]), mrhs);
                    }
                    else
                    {
                        tuplesC[curptr++]= std::make_tuple(wset[hsize-1].key, Bdcsc->jc[i], mrhs) ;
                    }
                }
                // Advance the A column the popped entry came from.
                if( (++(colinds[locb].first)) != colinds[locb].second) // current != end
                {
                    // runr stays the same !
                    wset[hsize-1].key = Adcsc->ir[colinds[locb].first];
                    wset[hsize-1].num = Adcsc->numx[colinds[locb].first];
                    std::push_heap(wset, wset+hsize);
                }
                else
                {
                    --hsize; // this A column is exhausted
                }
            }
        } // Finish Heap

        else // Hash Algorithm
        {
            // #pragma omp atomic
            // hashSelected++;
            const IT minHashTableSize = 16;
            const IT hashScale = 107;
            size_t nnzcolC = colptrC[i+1] - colptrC[i]; //nnz in the current column of C (=Output)

            size_t ht_size = minHashTableSize;
            while(ht_size < nnzcolC) //ht_size is set as 2^n
            {
                ht_size <<= 1;
            }


            if(globalHashVecAll[myThread].size() < ht_size)
                globalHashVecAll[myThread].resize(ht_size);
            //std::vector<HeapEntry<IT,NT1>> globalheapVec(nnzcolB);
            //HeapEntry<IT, NT1> * wset = globalheapVec.data();
            //HeapEntry<IT, NT1> * wset = globalheapVecAll[myThread].data();

            //std::vector< std::pair<IT,NTO>> globalHashVec(ht_size);
            std::pair<IT,NTO>* globalHashVec = globalHashVecAll[myThread].data();
            // colinds.first vector keeps indices to A.cp, i.e. it dereferences "colnums" vector (above),
            // colinds.second vector keeps the end indices (i.e. it gives the index to the last valid element of A.cpnack)

            // Initialize hash tables
            // NOTE(review): -1 is the "empty slot" sentinel; assumes row
            // indices never equal IT(-1) — verify when IT is unsigned.
            for(size_t j=0; j < ht_size; ++j)
            {
                globalHashVec[j].first = -1;
            }

            // Multiply and add on Hash table
            for (size_t j=0; j < nnzcolB; ++j)
            {
                IT t_bcol = Bdcsc->ir[Bdcsc->cp[i] + j];
                NT2 t_bval = Bdcsc->numx[Bdcsc->cp[i] + j];
                for (IT k = colinds[j].first; k < colinds[j].second; ++k)
                {
                    NTO mrhs = SR::multiply(Adcsc->numx[k], t_bval);
                    IT key = Adcsc->ir[k];
                    IT hash = (key*hashScale) & (ht_size-1);  // ht_size is a power of two
                    while (1) //hash probing
                    {
                        if (globalHashVec[hash].first == key) //key is found in hash table
                        {
                            globalHashVec[hash].second = SR::add(mrhs, globalHashVec[hash].second);
                            break;
                        }
                        else if (globalHashVec[hash].first == -1) //key is not registered yet
                        {
                            globalHashVec[hash].first = key;
                            globalHashVec[hash].second = mrhs;
                            break;
                        }
                        else //key is not found
                        {
                            hash = (hash+1) & (ht_size-1);  // linear probing, wrap around
                        }
                    }
                }
            }
            // gather non-zero elements from hash table, and then sort them by row indices
            size_t index = 0;
            for (size_t j=0; j < ht_size; ++j)
            {
                if (globalHashVec[j].first != -1)
                {
                    globalHashVec[index++] = globalHashVec[j];
                }
            }
            //std::sort(globalHashVec.begin(), globalHashVec.begin() + index, sort_less<IT, NTO>);
            std::sort(globalHashVecAll[myThread].begin(), globalHashVecAll[myThread].begin() + index, sort_less<IT, NTO>);
            IT curptr = colptrC[i];
            for (size_t j=0; j < index; ++j)
            {
                tuplesC[curptr++]= std::make_tuple(globalHashVec[j].first, Bdcsc->jc[i], globalHashVec[j].second);
            }
        }
    }

    if(clearA)
        delete const_cast<SpDCCols<IT, NT1> *>(&A);
    if(clearB)
        delete const_cast<SpDCCols<IT, NT2> *>(&B);

    delete [] colptrC;
    delete [] flopptr;
    if(deleteAux)
        delete [] aux;

    // NOTE(review): the trailing (true, true) flags presumably indicate
    // "sorted" and "takes ownership" — confirm against the SpTuples ctor.
    SpTuples<IT, NTO>* spTuplesC = new SpTuples<IT, NTO> (nnzc, mdim, ndim, tuplesC, true, true);


    // std::cout << "localspgemminfo," << flop << "," << nnzc << "," << compression_ratio << "," << t1-t0 << std::endl;
    // std::cout << hashSelected << ", " << Bdcsc->nzc << ", " << (float)hashSelected / Bdcsc->nzc << std::endl;

    return spTuplesC;
}
461
// Hash-only multithreaded SpGEMM
//
// Computes C = A * B on semiring SR using a per-column linear-probing hash
// table for every output column (no heap path, unlike LocalHybridSpGEMM).
//
// clearA / clearB: when true, the corresponding heap-allocated input is freed.
// sort: when true, each output column's tuples are sorted by row index;
//       when false, tuples are emitted in hash-table order and the resulting
//       SpTuples is constructed with its "sorted" flag set to false.
// Returns a freshly allocated SpTuples<IT,NTO> that owns the output tuples.
template <typename SR, typename NTO, typename IT, typename NT1, typename NT2>
SpTuples<IT, NTO> * LocalSpGEMMHash
(const SpDCCols<IT, NT1> & A,
 const SpDCCols<IT, NT2> & B,
 bool clearA, bool clearB, bool sort=true)
{
    // Wall-clock timing; only used by the (commented-out) debug print below.
    double t0=MPI_Wtime();

    IT mdim = A.getnrow();
    IT ndim = B.getncol();
    IT nnzA = A.getnnz();
    if(A.isZero() || B.isZero())
    {
        // Product is empty: return an empty tuple set of the right dimensions.
        return new SpTuples<IT, NTO>(0, mdim, ndim);
    }


    Dcsc<IT,NT1>* Adcsc = A.GetDCSC();
    Dcsc<IT,NT2>* Bdcsc = B.GetDCSC();
    IT nA = A.getncol();
    // aux accelerates locating A's nonempty columns; csize is the chunk size
    // it indexes (see Dcsc::ConstructAux / FillColInds).
    float cf = static_cast<float>(nA+1) / static_cast<float>(Adcsc->nzc);
    IT csize = static_cast<IT>(ceil(cf)); // chunk size
    IT * aux;
    Adcsc->ConstructAux(nA, aux);


    int numThreads = 1;
#ifdef THREADED
#pragma omp parallel
    {
        numThreads = omp_get_num_threads();
    }
#endif

    // std::cout << "numThreads: " << numThreads << std::endl;

    // Symbolic phase: per-column flop counts and exact per-column nnz counts,
    // prefix-summed into per-column write offsets.
    IT* flopC = estimateFLOP(A, B);
    IT* flopptr = prefixsum<IT>(flopC, Bdcsc->nzc, numThreads);
    IT flop = flopptr[Bdcsc->nzc];
    // std::cout << "FLOP of A * B is " << flop << std::endl;

    IT* colnnzC = estimateNNZ_Hash(A, B, flopC);
    IT* colptrC = prefixsum<IT>(colnnzC, Bdcsc->nzc, numThreads);
    delete [] colnnzC;
    delete [] flopC;
    IT nnzc = colptrC[Bdcsc->nzc];
    double compression_ratio = (double)flop / nnzc;  // kept for the debug print below

    // std::cout << "NNZ of A * B is " << nnzc << std::endl;
    // std::cout << "Compression ratio is " << compression_ratio << std::endl;

    // std::tuple<IT,IT,NTO> * tuplesC = static_cast<std::tuple<IT,IT,NTO> *> (::operator new (sizeof(std::tuple<IT,IT,NTO>[nnzc])));
    std::tuple<IT,IT,NTO> * tuplesC = new std::tuple<IT,IT,NTO>[nnzc];

    // thread private space for heap and colinds
    std::vector<std::vector< std::pair<IT,IT>>> colindsVec(numThreads);

    for(int i=0; i<numThreads; i++) //inital allocation per thread, may be an overestimate, but does not require more memoty than inputs
    {
        colindsVec[i].resize(nnzA/numThreads);
    }

    // IT hashSelected = 0;

#ifdef THREADED
#pragma omp parallel for
#endif
    for(size_t i=0; i < Bdcsc->nzc; ++i)
    {
        size_t nnzcolB = Bdcsc->cp[i+1] - Bdcsc->cp[i]; //nnz in the current column of B
        int myThread = 0;

#ifdef THREADED
        myThread = omp_get_thread_num();
#endif
        if(colindsVec[myThread].size() < nnzcolB) //resize thread private vectors if needed
        {
            colindsVec[myThread].resize(nnzcolB);
        }

        // colinds.first vector keeps indices to A.cp, i.e. it dereferences "colnums" vector (above),
        // colinds.second vector keeps the end indices (i.e. it gives the index to the last valid element of A.cpnack)
        Adcsc->FillColInds(Bdcsc->ir + Bdcsc->cp[i], nnzcolB, colindsVec[myThread], aux, csize);
        std::pair<IT,IT> * colinds = colindsVec[myThread].data();




        // #pragma omp atomic
        // hashSelected++;
        const IT minHashTableSize = 16;
        const IT hashScale = 107;
        size_t nnzcolC = colptrC[i+1] - colptrC[i]; //nnz in the current column of C (=Output)

        size_t ht_size = minHashTableSize;
        while(ht_size < nnzcolC) //ht_size is set as 2^n
        {
            ht_size <<= 1;
        }
        std::vector< std::pair<IT,NTO>> globalHashVec(ht_size);

        // colinds.first vector keeps indices to A.cp, i.e. it dereferences "colnums" vector (above),
        // colinds.second vector keeps the end indices (i.e. it gives the index to the last valid element of A.cpnack)

        // Initialize hash tables
        // NOTE(review): -1 is the "empty slot" sentinel; assumes row indices
        // never equal IT(-1) — verify when IT is unsigned.
        for(size_t j=0; j < ht_size; ++j)
        {
            globalHashVec[j].first = -1;
        }

        // Multiply and add on Hash table
        for (size_t j=0; j < nnzcolB; ++j)
        {
            IT t_bcol = Bdcsc->ir[Bdcsc->cp[i] + j];
            NT2 t_bval = Bdcsc->numx[Bdcsc->cp[i] + j];
            for (IT k = colinds[j].first; k < colinds[j].second; ++k)
            {
                NTO mrhs = SR::multiply(Adcsc->numx[k], t_bval);
                IT key = Adcsc->ir[k];
                IT hash = (key*hashScale) & (ht_size-1);  // ht_size is a power of two
                while (1) //hash probing
                {
                    if (globalHashVec[hash].first == key) //key is found in hash table
                    {
                        globalHashVec[hash].second = SR::add(mrhs, globalHashVec[hash].second);
                        break;
                    }
                    else if (globalHashVec[hash].first == -1) //key is not registered yet
                    {
                        globalHashVec[hash].first = key;
                        globalHashVec[hash].second = mrhs;
                        break;
                    }
                    else //key is not found
                    {
                        hash = (hash+1) & (ht_size-1);  // linear probing, wrap around
                    }
                }
            }
        }

        if(sort)
        {
            // gather non-zero elements from hash table, and then sort them by row indices
            size_t index = 0;
            for (size_t j=0; j < ht_size; ++j)
            {
                if (globalHashVec[j].first != -1)
                {
                    globalHashVec[index++] = globalHashVec[j];
                }
            }
            std::sort(globalHashVec.begin(), globalHashVec.begin() + index, sort_less<IT, NTO>);

            IT curptr = colptrC[i];
            for (size_t j=0; j < index; ++j)
            {
                tuplesC[curptr++]= std::make_tuple(globalHashVec[j].first, Bdcsc->jc[i], globalHashVec[j].second);
            }
        }
        else
        {
            // Emit in hash-table order (unsorted within the column).
            IT curptr = colptrC[i];
            for (size_t j=0; j < ht_size; ++j)
            {
                if (globalHashVec[j].first != -1)
                {
                    tuplesC[curptr++]= std::make_tuple(globalHashVec[j].first, Bdcsc->jc[i], globalHashVec[j].second);
                }
            }
        }


    }

    if(clearA)
        delete const_cast<SpDCCols<IT, NT1> *>(&A);
    if(clearB)
        delete const_cast<SpDCCols<IT, NT2> *>(&B);

    delete [] colptrC;
    delete [] flopptr;
    delete [] aux;

    // NOTE(review): trailing flags (true, false) differ from LocalSpGEMM —
    // presumably ownership / sortedness markers; confirm against SpTuples ctor
    // (tuplesC was allocated with new[] here, not raw ::operator new).
    SpTuples<IT, NTO>* spTuplesC = new SpTuples<IT, NTO> (nnzc, mdim, ndim, tuplesC, true, false);

    double t1=MPI_Wtime();

    // std::cout << "localspgemminfo," << flop << "," << nnzc << "," << compression_ratio << "," << t1-t0 << std::endl;
    // std::cout << hashSelected << ", " << Bdcsc->nzc << ", " << (float)hashSelected / Bdcsc->nzc << std::endl;

    return spTuplesC;
}
657
658/*
659 * Estimates total flops necessary to multiply A and B
660 * Then returns the number
661 * */
662template <typename SR, typename IT, typename NT1, typename NT2>
664(const SpDCCols<IT, NT1> & A,
665 const SpDCCols<IT, NT2> & B,
666 bool clearA, bool clearB)
667{
668 Dcsc<IT,NT1>* Adcsc = A.GetDCSC();
669 Dcsc<IT,NT2>* Bdcsc = B.GetDCSC();
670 int numThreads = 1;
671#ifdef THREADED
672#pragma omp parallel
673 {
674 numThreads = omp_get_num_threads();
675 }
676#endif
677 IT* flopC = estimateFLOP(A, B);
678 IT* flopptr = prefixsum<IT>(flopC, Bdcsc->nzc, numThreads);
679 IT flop = flopptr[Bdcsc->nzc];
680 delete [] flopC;
681
682 if(clearA)
683 delete const_cast<SpDCCols<IT, NT1> *>(&A);
684 if(clearB)
685 delete const_cast<SpDCCols<IT, NT2> *>(&B);
686
687 delete [] flopptr;
688 return flop;
689}
690
// estimate space for result of SpGEMM
//
// Heap-based symbolic phase: for every nonempty column of B, counts the exact
// number of distinct row indices in the corresponding column of C = A * B
// (no numeric values are touched).
//
// aux: optional precomputed column-search structure for A; built here if null.
// freeaux: when true, aux is deleted before returning.
//          NOTE(review): if the caller passes its own aux and leaves
//          freeaux == true (the default), the caller's array is freed —
//          callers must pass freeaux = false in that case.
// Returns a new[]-allocated array of length Bdcsc->nzc (caller frees),
// or NULL when either input is empty.
template <typename IT, typename NT1, typename NT2>
IT* estimateNNZ(const SpDCCols<IT, NT1> & A,const SpDCCols<IT, NT2> & B, IT * aux = nullptr, bool freeaux = true)
{
    IT nnzA = A.getnnz();
    if(A.isZero() || B.isZero())
    {
        return NULL;
    }

    Dcsc<IT,NT1>* Adcsc = A.GetDCSC();
    Dcsc<IT,NT2>* Bdcsc = B.GetDCSC();

    // csize is the column-range chunk size indexed by aux (see ConstructAux).
    float cf = static_cast<float>(A.getncol()+1) / static_cast<float>(Adcsc->nzc);
    IT csize = static_cast<IT>(ceil(cf)); // chunk size
    if(aux == nullptr)
    {
        Adcsc->ConstructAux(A.getncol(), aux);
    }


    int numThreads = 1;
#ifdef THREADED
#pragma omp parallel
    {
        numThreads = omp_get_num_threads();
    }
#endif


    IT* colnnzC = new IT[Bdcsc->nzc]; // nnz in every nonempty column of C

#ifdef THREADED
#pragma omp parallel for
#endif
    for(IT i=0; i< Bdcsc->nzc; ++i)
    {
        colnnzC[i] = 0;
    }

    // thread private space for heap and colinds
    std::vector<std::vector< std::pair<IT,IT>>> colindsVec(numThreads);
    std::vector<std::vector<std::pair<IT,IT>>> globalheapVec(numThreads);


    for(int i=0; i<numThreads; i++) //inital allocation per thread, may be an overestimate, but does not require more memoty than inputs
    {
        colindsVec[i].resize(nnzA/numThreads);
        globalheapVec[i].resize(nnzA/numThreads);
    }

#ifdef THREADED
#pragma omp parallel for
#endif
    for(int i=0; i < Bdcsc->nzc; ++i)
    {
        size_t nnzcolB = Bdcsc->cp[i+1] - Bdcsc->cp[i]; //nnz in the current column of B
        int myThread = 0;
#ifdef THREADED
        myThread = omp_get_thread_num();
#endif
        if(colindsVec[myThread].size() < nnzcolB) //resize thread private vectors if needed
        {
            colindsVec[myThread].resize(nnzcolB);
            globalheapVec[myThread].resize(nnzcolB);
        }

        // colinds.first vector keeps indices to A.cp, i.e. it dereferences "colnums" vector (above),
        // colinds.second vector keeps the end indices (i.e. it gives the index to the last valid element of A.cpnack)
        Adcsc->FillColInds(Bdcsc->ir + Bdcsc->cp[i], nnzcolB, colindsVec[myThread], aux, csize);
        std::pair<IT,IT> * colinds = colindsVec[myThread].data();
        std::pair<IT,IT> * curheap = globalheapVec[myThread].data();
        IT hsize = 0;

        // create the initial heap
        // Heap entries are (row index in A, position within B's column);
        // a min-heap via std::greater pops rows in ascending order.
        for(IT j = 0; (unsigned)j < nnzcolB; ++j)
        {
            if(colinds[j].first != colinds[j].second)
            {
                curheap[hsize++] = std::make_pair(Adcsc->ir[colinds[j].first], j);
            }
        }
        std::make_heap(curheap, curheap+hsize, std::greater<std::pair<IT,IT>>());

        IT prevRow=-1; // previously popped row from heap

        // Count each distinct row exactly once per output column.
        while(hsize > 0)
        {
            std::pop_heap(curheap, curheap + hsize, std::greater<std::pair<IT,IT>>()); // result is stored in wset[hsize-1]
            IT locb = curheap[hsize-1].second;

            if( curheap[hsize-1].first != prevRow)
            {
                prevRow = curheap[hsize-1].first;
                colnnzC[i] ++;
            }

            // Advance the A column the popped entry came from.
            if( (++(colinds[locb].first)) != colinds[locb].second) // current != end
            {
                curheap[hsize-1].first = Adcsc->ir[colinds[locb].first];
                std::push_heap(curheap, curheap+hsize, std::greater<std::pair<IT,IT>>());
            }
            else
            {
                --hsize; // this A column is exhausted
            }
        }
    }

    if (freeaux) delete [] aux;
    return colnnzC;
}
803
804
// estimate space for result of SpGEMM with Hash
//
// Hash-based symbolic phase: for every nonempty column of B, counts the exact
// number of distinct row indices in the corresponding column of C = A * B by
// inserting row keys into a per-thread linear-probing hash table.
//
// flopC: per-column flop counts (from estimateFLOP); used as an upper bound
//        to size each column's hash table.
// aux: optional precomputed column-search structure for A; built here and
//      freed before returning when nullptr is passed.
// Returns a new[]-allocated array of length Bdcsc->nzc (caller frees),
// or NULL when either input is empty.
template <typename IT, typename NT1, typename NT2>
IT* estimateNNZ_Hash(const SpDCCols<IT, NT1> & A,const SpDCCols<IT, NT2> & B, IT *flopC, IT * aux=nullptr)
{
    IT nnzA = A.getnnz();
    if(A.isZero() || B.isZero())
    {
        return NULL;
    }

    Dcsc<IT,NT1>* Adcsc = A.GetDCSC();
    Dcsc<IT,NT2>* Bdcsc = B.GetDCSC();

    // csize is the column-range chunk size indexed by aux (see ConstructAux).
    float cf = static_cast<float>(A.getncol()+1) / static_cast<float>(Adcsc->nzc);
    IT csize = static_cast<IT>(ceil(cf)); // chunk size
    bool deleteAux = false;
    if(aux==nullptr)
    {
        deleteAux = true;    // we own the aux structure we build here
        Adcsc->ConstructAux(A.getncol(), aux);
    }

    int numThreads = 1;
#ifdef THREADED
#pragma omp parallel
    {
        numThreads = omp_get_num_threads();
    }
#endif


    IT* colnnzC = new IT[Bdcsc->nzc]; // nnz in every nonempty column of C



    /*
#ifdef THREADED
#pragma omp parallel for
#endif
    for(IT i=0; i< Bdcsc->nzc; ++i)
    {
        colnnzC[i] = 0;
    }
    */
    // thread private space for heap and colinds
    std::vector<std::vector< std::pair<IT,IT>>> colindsVec(numThreads);
    std::vector<std::vector< IT>> globalHashVecAll(numThreads);
    /*
    for(int i=0; i<numThreads; i++) //inital allocation per thread, may be an overestimate, but does not require more memoty than inputs
    {
        colindsVec[i].resize(nnzA/numThreads);
    }*/

#ifdef THREADED
#pragma omp parallel for
#endif
    for(int i=0; i < Bdcsc->nzc; ++i)
    {
        colnnzC[i] = 0;   // initialized here (instead of a separate loop)
        size_t nnzcolB = Bdcsc->cp[i+1] - Bdcsc->cp[i]; //nnz in the current column of B
        int myThread = 0;
#ifdef THREADED
        myThread = omp_get_thread_num();
#endif
        if(colindsVec[myThread].size() < nnzcolB) //resize thread private vectors if needed
        {
            colindsVec[myThread].resize(nnzcolB);
        }

        // colinds.first vector keeps indices to A.cp, i.e. it dereferences "colnums" vector (above),
        // colinds.second vector keeps the end indices (i.e. it gives the index to the last valid element of A.cpnack)
        Adcsc->FillColInds(Bdcsc->ir + Bdcsc->cp[i], nnzcolB, colindsVec[myThread], aux, csize);
        std::pair<IT,IT> * colinds = colindsVec[myThread].data();

        // Hash
        const IT minHashTableSize = 16;
        const IT hashScale = 107;

        // Initialize hash tables
        // Table is sized to the next power of two >= this column's flop count
        // (an upper bound on distinct keys), keeping the load factor low.
        IT ht_size = minHashTableSize;
        while(ht_size < flopC[i]) //ht_size is set as 2^n
        {
            ht_size <<= 1;
        }

        if(globalHashVecAll[myThread].size() < ht_size) //resize thread private vectors if needed
        {
            globalHashVecAll[myThread].resize(ht_size);
        }

        IT* globalHashVec = globalHashVecAll[myThread].data();

        // NOTE(review): -1 is the "empty slot" sentinel; assumes row indices
        // never equal IT(-1) — verify when IT is unsigned.
        for(IT j=0; (unsigned)j < ht_size; ++j)
        {
            globalHashVec[j] = -1;
        }

        // Insert every contributing row key; count only first-time insertions.
        for (IT j=0; (unsigned)j < nnzcolB; ++j)
        {
            IT t_bcol = Bdcsc->ir[Bdcsc->cp[i] + j];
            for (IT k = colinds[j].first; (unsigned)k < colinds[j].second; ++k)
            {
                IT key = Adcsc->ir[k];
                IT hash = (key*hashScale) & (ht_size-1);  // ht_size is a power of two
                while (1) //hash probing
                {
                    if (globalHashVec[hash] == key) //key is found in hash table
                    {
                        break;
                    }
                    else if (globalHashVec[hash] == -1) //key is not registered yet
                    {
                        globalHashVec[hash] = key;
                        colnnzC[i] ++;
                        break;
                    }
                    else //key is not found
                    {
                        hash = (hash+1) & (ht_size-1);  // linear probing, wrap around
                    }
                }
            }
        }
    }

    if(deleteAux)
        delete [] aux;
    return colnnzC;
}
934
935// sampling-based nnz estimation (within SUMMA)
936template <typename IT, typename NT1, typename NT2>
939 const SpDCCols<IT, NT1> &A,
940 const SpDCCols<IT, NT2> &B,
941 int nrounds = 5
942 )
943{
944 IT nnzA = A.getnnz();
945 if (A.isZero() || B.isZero())
946 return 0;
947
948 Dcsc<IT,NT1> *Adcsc = A.GetDCSC();
949 Dcsc<IT,NT2> *Bdcsc = B.GetDCSC();
950 float lambda = 1.0f;
951 float usedmem = 0.0f;
952 IT m = A.getnrow();
953 IT p = A.getncol();
954 float *samples_init, *samples_mid, *samples_final;
955 float *colest;
956
957 // samples
958 samples_init = (float *) malloc(m * nrounds * sizeof(*samples_init));
959 samples_mid = (float *) malloc(p * nrounds * sizeof(*samples_mid));
960
961 int nthds = 1;
962 #ifdef THREADED
963 #pragma omp parallel
964 #endif
965 {
966 nthds = omp_get_num_threads();
967 }
968
969 #ifdef THREADED
970 #pragma omp parallel
971 #endif
972 {
973 std::default_random_engine gen;
974 std::exponential_distribution<float> exp_dist(lambda);
975
976 #ifdef THREADED
977 #pragma omp parallel for
978 #endif
979 for (IT i = 0; i < m * nrounds; ++i)
980 samples_init[i] = exp_dist(gen);
981 }
982
983 #ifdef THREADED
984 #pragma omp parallel for
985 #endif
986 for (IT i = 0; i < p * nrounds; ++i)
987 samples_mid[i] = std::numeric_limits<float>::max();
988
989 #ifdef THREADED
990 #pragma omp parallel for
991 #endif
992 for (IT i = 0; i < Adcsc->nzc; ++i)
993 {
994 IT col = Adcsc->jc[i];
995 IT beg_mid = col * nrounds;
996 for (IT j = Adcsc->cp[i]; j < Adcsc->cp[i + 1]; ++j)
997 {
998 IT row = Adcsc->ir[j];
999 IT beg_init = row * nrounds;
1000 for (int k = 0; k < nrounds; ++k)
1001 {
1002 if (samples_init[beg_init + k] < samples_mid[beg_mid + k])
1003 samples_mid[beg_mid + k] = samples_init[beg_init + k];
1004 }
1005 }
1006 }
1007
1008 free(samples_init);
1009
1010 samples_final = (float *) malloc(B.getnzc() * nrounds *
1011 sizeof(*samples_final));
1012 colest = (float *) malloc(B.getnzc() * sizeof(*colest));
1013
1014 float nnzest = 0.0f;
1015
1016 #ifdef THREADED
1017 #pragma omp parallel for reduction (+:nnzest)
1018 #endif
1019 for (IT i = 0; i < Bdcsc->nzc; ++i)
1020 {
1021 int tid = 0;
1022 #ifdef THREADED
1023 tid = omp_get_thread_num();
1024 #endif
1025
1026 IT beg_final = i * nrounds;
1027 for (IT k = beg_final; k < beg_final + nrounds; ++k)
1028 samples_final[k] = std::numeric_limits<float>::max();
1029
1030 for (IT j = Bdcsc->cp[i]; j < Bdcsc->cp[i + 1]; ++j)
1031 {
1032 IT row = Bdcsc->ir[j];
1033 IT beg_mid = row * nrounds;
1034 for (int k = 0; k < nrounds; ++k)
1035 {
1036 if (samples_mid[beg_mid + k] < samples_final[beg_final + k])
1037 samples_final[beg_final + k] = samples_mid[beg_mid + k];
1038 }
1039 }
1040
1041 colest[i] = 0.0f;
1042 for (IT k = beg_final; k < beg_final + nrounds; ++k)
1043 colest[i] += samples_final[k];
1044 colest[i] = static_cast<float>(nrounds - 1) / colest[i];
1045
1046 nnzest += colest[i];
1047 }
1048
1049 free(samples_mid);
1050 free(samples_final);
1051 free(colest);
1052
1053 return static_cast<int64_t>(nnzest);
1054}
1055
// estimate the number of floating point operations of SpGEMM
//
// For every nonempty column of B, computes the number of scalar
// multiplications needed to form the corresponding column of C = A * B
// (the sum of the lengths of the matched A-column ranges).
//
// aux: optional precomputed column-search structure for A; built here and
//      freed before returning when nullptr is passed.
// Returns a new[]-allocated array of length Bdcsc->nzc (caller frees),
// or NULL when either input is empty.
template <typename IT, typename NT1, typename NT2>
IT* estimateFLOP(const SpDCCols<IT, NT1> & A,const SpDCCols<IT, NT2> & B, IT * aux = nullptr)
{
    IT nnzA = A.getnnz();
    if(A.isZero() || B.isZero())
    {
        return NULL;
    }

    Dcsc<IT,NT1>* Adcsc = A.GetDCSC();
    Dcsc<IT,NT2>* Bdcsc = B.GetDCSC();

    // csize is the column-range chunk size indexed by aux (see ConstructAux).
    float cf = static_cast<float>(A.getncol()+1) / static_cast<float>(Adcsc->nzc);
    IT csize = static_cast<IT>(ceil(cf)); // chunk size
    //IT * aux;
    bool deleteAux = false;
    if(aux==nullptr)
    {
        deleteAux = true;    // we own the aux structure we build here
        Adcsc->ConstructAux(A.getncol(), aux);
    }


    int numThreads = 1;
#ifdef THREADED
#pragma omp parallel
    {
        numThreads = omp_get_num_threads();
    }
#endif


    IT* colflopC = new IT[Bdcsc->nzc]; // flop in every nonempty column of C

#ifdef THREADED
#pragma omp parallel for
#endif
    for(IT i=0; i< Bdcsc->nzc; ++i)
    {
        colflopC[i] = 0;
    }


    // thread private space for heap and colinds
    std::vector<std::vector< std::pair<IT,IT>>> colindsVec(numThreads);

    /*
    for(int i=0; i<numThreads; i++) //inital allocation per thread, may be an overestimate, but does not require more memoty than inputs
    {
        colindsVec[i].resize(nnzA/numThreads);
    }*/

#ifdef THREADED
#pragma omp parallel for
#endif
    for(int i=0; i < Bdcsc->nzc; ++i)
    {
        size_t nnzcolB = Bdcsc->cp[i+1] - Bdcsc->cp[i]; //nnz in the current column of B
        int myThread = 0;
#ifdef THREADED
        myThread = omp_get_thread_num();
#endif
        if(colindsVec[myThread].size() < nnzcolB) //resize thread private vectors if needed
        {
            colindsVec[myThread].resize(nnzcolB);
        }

        // colinds.first vector keeps indices to A.cp, i.e. it dereferences "colnums" vector (above),
        // colinds.second vector keeps the end indices (i.e. it gives the index to the last valid element of A.cpnack)
        Adcsc->FillColInds(Bdcsc->ir + Bdcsc->cp[i], nnzcolB, colindsVec[myThread], aux, csize);
        // Each matched range [first, second) of an A column contributes one
        // multiplication per element.
        for (IT j = 0; (unsigned)j < nnzcolB; ++j) {
            colflopC[i] += colindsVec[myThread][j].second - colindsVec[myThread][j].first;
        }
    }
    if(deleteAux)
        delete [] aux;
    return colflopC;
}
1135
1136
1137
1138
1142template <typename SR,
1143 typename NTO,
1144 typename IT,
1145 typename NT1,
1146 typename NT2>
1147SpTuples <IT, NTO> *
1148LocalHybridSpGEMM (const SpCCols<IT, NT1> &A,
1149 const SpCCols<IT, NT2> &B,
1150 bool clearA,
1151 bool clearB
1152 )
1153{
1154 double t0 = MPI_Wtime();
1155
1156 IT mdim = A.getnrow();
1157 IT ndim = B.getncol();
1158 IT nnzA = A.getnnz();
1159
1160 if(A.isZero() || B.isZero())
1161 return new SpTuples<IT, NTO>(0, mdim, ndim);
1162
1163 Csc<IT, NT1> *Acsc = A.GetCSC();
1164 Csc<IT, NT2> *Bcsc = B.GetCSC();
1165
1166 int numThreads = 1;
1167 #ifdef THREADED
1168 #pragma omp parallel
1169 {
1170 numThreads = omp_get_num_threads();
1171 }
1172 #endif
1173
1174 IT *flopC = estimateFLOP(A, B);
1175 IT *flopptr = prefixsum<IT>(flopC, Bcsc->n, numThreads);
1176 IT flop = flopptr[Bcsc->n];
1177
1178 IT *colnnzC = estimateNNZ_Hash(A, B, flopC);
1179 IT *colptrC = prefixsum<IT>(colnnzC, Bcsc->n, numThreads);
1180 delete [] colnnzC;
1181 delete [] flopC;
1182 IT nnzc = colptrC[Bcsc->n];
1183 double compression_ratio = (double)flop / nnzc;
1184
1185 std::tuple<IT, IT, NTO> *tuplesC = static_cast<std::tuple<IT, IT, NTO> *>
1186 (::operator new (sizeof(std::tuple<IT, IT, NTO>[nnzc])));
1187
1188 #ifdef THREADED
1189 #pragma omp parallel for
1190 #endif
1191 for (size_t i = 0; i < Bcsc->n; ++i)
1192 {
1193 size_t nnzcolB = Bcsc->jc[i + 1] - Bcsc->jc[i];
1194 double cr = static_cast<double>
1195 (flopptr[i+1] - flopptr[i]) / (colptrC[i+1] - colptrC[i]);
1196 if (cr < 2.0) // Heap Algorithm
1197 {
1198 std::vector<IT> cnt(nnzcolB);
1199 std::vector<HeapEntry<IT, NT1>> globalheapVec(nnzcolB);
1200 HeapEntry<IT, NT1> *wset = globalheapVec.data();
1201 IT hsize = 0;
1202 for (IT j = Bcsc->jc[i]; j < Bcsc->jc[i + 1]; ++j)
1203 {
1204 IT ca = Bcsc->ir[j];
1205 cnt[j - Bcsc->jc[i]] = Acsc->jc[ca];
1206 if (Acsc->jc[ca] != Acsc->jc[ca + 1]) // col not empty
1207 wset[hsize++] = HeapEntry<IT, NT1>
1208 (Acsc->ir[Acsc->jc[ca]], j, Acsc->num[Acsc->jc[ca]]);
1209 }
1210
1211 std::make_heap(wset, wset + hsize);
1212
1213 IT curptr = colptrC[i];
1214 while (hsize > 0)
1215 {
1216 std::pop_heap(wset, wset + hsize);
1217 IT locb = wset[hsize - 1].runr;
1218 NTO mrhs = SR::multiply(wset[hsize - 1].num, Bcsc->num[locb]);
1219 if (!SR::returnedSAID())
1220 {
1221 if ((curptr > colptrC[i]) &&
1222 std::get<0>(tuplesC[curptr - 1]) == wset[hsize - 1].key)
1223 std::get<2>(tuplesC[curptr - 1]) =
1224 SR::add(std::get<2>(tuplesC[curptr - 1]), mrhs);
1225 else
1226 tuplesC[curptr++] = std::make_tuple(wset[hsize - 1].key,
1227 i, mrhs) ;
1228 }
1229
1230 IT locb_offset = locb - Bcsc->jc[i];
1231 IT ca = Bcsc->ir[locb];
1232 ++(cnt[locb_offset]);
1233 if (cnt[locb_offset] != Acsc->jc[ca + 1])
1234 {
1235 wset[hsize - 1].key = Acsc->ir[cnt[locb_offset]];
1236 wset[hsize - 1].num = Acsc->num[cnt[locb_offset]];
1237 std::push_heap(wset, wset + hsize);
1238 }
1239 else
1240 --hsize;
1241 }
1242 } // Finish heap
1243 else // Hash Algorithm
1244 {
1245 // Set up hash table
1246 const IT minHashTableSize = 16;
1247 const IT hashScale = 107;
1248 size_t nnzcolC = colptrC[i+1] - colptrC[i];
1249 size_t ht_size = minHashTableSize;
1250
1251 while (ht_size < nnzcolC)
1252 ht_size <<= 1;
1253 std::vector< std::pair<IT, NTO>> T(ht_size);
1254
1255 for (size_t j = 0; j < ht_size; ++j)
1256 T[j].first = std::numeric_limits<IT>::max();
1257
1258 // multiplication
1259 for (IT j = Bcsc->jc[i]; j < Bcsc->jc[i + 1]; ++j)
1260 {
1261 IT t_bcol = Bcsc->ir[j];
1262 NT2 t_bval = Bcsc->num[j];
1263 for (IT k = Acsc->jc[t_bcol]; k < Acsc->jc[t_bcol + 1]; ++k)
1264 {
1265 NTO mrhs = SR::multiply(Acsc->num[k], t_bval);
1266 IT key = Acsc->ir[k];
1267 IT hv = (key * hashScale) & (ht_size - 1);
1268
1269 repeat:
1270 if (T[hv].first == key)
1271 T[hv].second = SR::add(mrhs, T[hv].second);
1272 else if (T[hv].first == std::numeric_limits<IT>::max())
1273 {
1274 T[hv].first = key;
1275 T[hv].second = mrhs;
1276 }
1277 else
1278 {
1279 hv = (hv + 1) & (ht_size - 1);
1280 goto repeat;
1281 }
1282 }
1283 }
1284
1285 size_t index = 0;
1286 for (size_t j = 0; j < ht_size; ++j)
1287 {
1288 if (T[j].first != std::numeric_limits<IT>::max())
1289 T[index++] = T[j];
1290 }
1291
1292 std::sort(T.begin(), T.begin() + index, sort_less<IT, NTO>);
1293
1294 IT curptr = colptrC[i];
1295 for (size_t j = 0; j < index; ++j)
1296 tuplesC[curptr++] = std::make_tuple(T[j].first, i, T[j].second);
1297 }
1298 }
1299
1300 if (clearA)
1301 delete const_cast<SpCCols<IT, NT1> *>(&A);
1302 if (clearB)
1303 delete const_cast<SpCCols<IT, NT2> *>(&B);
1304
1305 delete [] colptrC;
1306 delete [] flopptr;
1307
1308 SpTuples<IT, NTO> *spTuplesC =
1309 new SpTuples<IT, NTO> (nnzc, mdim, ndim, tuplesC, true, true);
1310
1311 double t1 = MPI_Wtime();
1312
1313 return spTuplesC;
1314}
1315
1316
1317
1318template <typename IT,
1319 typename NT1,
1320 typename NT2>
1321IT *
1322estimateFLOP (const SpCCols<IT, NT1> &A,
1323 const SpCCols<IT, NT2> &B
1324 )
1325{
1326 IT nnzA = A.getnnz();
1327 if (A.isZero() || B.isZero())
1328 return NULL;
1329
1330 Csc<IT, NT1> *Acsc = A.GetCSC();
1331 Csc<IT, NT2> *Bcsc = B.GetCSC();
1332
1333 int numThreads = 1;
1334 #ifdef THREADED
1335 #pragma omp parallel
1336 {
1337 numThreads = omp_get_num_threads();
1338 }
1339 #endif
1340
1341 IT *colflopC = new IT[Bcsc->n];
1342
1343 #ifdef THREADED
1344 #pragma omp parallel for
1345 #endif
1346 for (IT i = 0; i < Bcsc->n; ++i)
1347 colflopC[i] = 0;
1348
1349 #ifdef THREADED
1350 #pragma omp parallel for
1351 #endif
1352 for (IT i = 0; i < Bcsc->n; ++i)
1353 {
1354 for (IT j = Bcsc->jc[i]; j < Bcsc->jc[i+1]; ++j)
1355 colflopC[i] += Acsc->jc[Bcsc->ir[j]+1] - Acsc->jc[Bcsc->ir[j]];
1356 }
1357
1358 return colflopC;
1359}
1360
1361
1362
1363template <typename IT,
1364 typename NT1,
1365 typename NT2>
1366IT *
1367estimateNNZ_Hash (const SpCCols<IT, NT1> &A,
1368 const SpCCols<IT, NT2> &B,
1369 const IT *flopC
1370 )
1371{
1372 IT nnzA = A.getnnz();
1373 if (A.isZero() || B.isZero())
1374 return NULL;
1375
1376 Csc<IT, NT1> *Acsc = A.GetCSC();
1377 Csc<IT, NT2> *Bcsc = B.GetCSC();
1378
1379 int numThreads = 1;
1380 #ifdef THREADED
1381 #pragma omp parallel
1382 {
1383 numThreads = omp_get_num_threads();
1384 }
1385 #endif
1386
1387 IT *colnnzC = new IT[Bcsc->n];
1388
1389 #ifdef THREADED
1390 #pragma omp parallel for
1391 #endif
1392 for (IT i = 0; i < Bcsc->n; ++i)
1393 colnnzC[i] = 0;
1394
1395 #ifdef THREADED
1396 #pragma omp parallel for
1397 #endif
1398 for (IT i = 0; i < Bcsc->n; ++i)
1399 {
1400 // init hash table
1401 const IT minHashTableSize = 16;
1402 const IT hashScale = 107;
1403 IT ht_size = minHashTableSize;
1404 while (ht_size < flopC[i]) // make size of hash table a power of 2
1405 ht_size <<= 1;
1406
1407 // IT can be unsigned
1408 std::vector<IT> T(ht_size);
1409 for (IT j = 0; (unsigned)j < ht_size; ++j)
1410 T[j] = std::numeric_limits<IT>::max();
1411
1412 for (IT j = Bcsc->jc[i]; j < Bcsc->jc[i + 1]; ++j)
1413 {
1414 for (IT k = Acsc->jc[Bcsc->ir[j]]; k < Acsc->jc[Bcsc->ir[j]+1]; ++k)
1415 {
1416 IT key = Acsc->ir[k];
1417 IT hv = (key * hashScale) & (ht_size - 1);
1418
1419 while (1)
1420 {
1421 if (T[hv] == key)
1422 break;
1423 else if (T[hv] == std::numeric_limits<IT>::max())
1424 {
1425 T[hv] = key;
1426 ++(colnnzC[i]);
1427 break;
1428 }
1429 else
1430 hv = (hv + 1) & (ht_size - 1);
1431 }
1432 }
1433 }
1434 }
1435
1436 return colnnzC;
1437}
1438
1439
1440}
1441
1442#endif
int64_t IT
SelectMaxSRing< bool, int64_t > SR
Definition SpMMError.cpp:18
Definition test.cpp:53
int size
Definition common.h:20
long int64_t
Definition compat.h:21
SpTuples< IT, NTO > * LocalSpGEMMHash(const SpDCCols< IT, NT1 > &A, const SpDCCols< IT, NT2 > &B, bool clearA, bool clearB, bool sort=true)
Definition mtSpGEMM.h:465
IT * estimateFLOP(const SpDCCols< IT, NT1 > &A, const SpDCCols< IT, NT2 > &B, IT *aux=nullptr)
Definition mtSpGEMM.h:1058
SpTuples< IT, NTO > * LocalSpGEMM(const SpDCCols< IT, NT1 > &A, const SpDCCols< IT, NT2 > &B, bool clearA, bool clearB)
Definition mtSpGEMM.h:76
IT EstimateLocalFLOP(const SpDCCols< IT, NT1 > &A, const SpDCCols< IT, NT2 > &B, bool clearA, bool clearB)
Definition mtSpGEMM.h:664
SpTuples< IT, NTO > * LocalHybridSpGEMM(const SpDCCols< IT, NT1 > &A, const SpDCCols< IT, NT2 > &B, bool clearA, bool clearB, IT *aux=nullptr)
Definition mtSpGEMM.h:215
bool sort_less(const std::pair< IT, NT > &left, const std::pair< IT, NT > &right)
Definition mtSpGEMM.h:207
T * prefixsum(T *in, int size, int nthreads)
Definition mtSpGEMM.h:24
int64_t estimateNNZ_sampling(const SpDCCols< IT, NT1 > &A, const SpDCCols< IT, NT2 > &B, int nrounds=5)
Definition mtSpGEMM.h:938
IT * estimateNNZ(const SpDCCols< IT, NT1 > &A, const SpDCCols< IT, NT2 > &B, IT *aux=nullptr, bool freeaux=true)
Definition mtSpGEMM.h:693
IT * estimateNNZ_Hash(const SpDCCols< IT, NT1 > &A, const SpDCCols< IT, NT2 > &B, IT *flopC, IT *aux=nullptr)
Definition mtSpGEMM.h:807
unsigned int hash(unsigned int a)
Definition utils.h:79
double A