COMBINATORIAL_BLAS 1.6
 
Loading...
Searching...
No Matches
Reductions.h
Go to the documentation of this file.
1#ifndef _REDUCTIONS_H_
2#define _REDUCTIONS_H_
3
4
5#include <mpi.h>
6#include <sys/time.h>
7#include <iostream>
8#include <iomanip>
9#include <functional>
10#include <algorithm>
11#include <vector>
12#include <string>
13#include <sstream>
14
15
16#include "CombBLAS/CombBLAS.h"
17#include "Glue.h"
18#include "CCGrid.h"
19
20namespace combblas {
21
22
23/***************************************************************************
24 * Distribute a local m/sqrt(p) x n/sqrt(p) matrix (represented by a list of tuples) across layers
25 * so that each processor along the third dimension receives an m/sqrt(p) x n/(c*sqrt(p)) submatrix.
26 * After receiving c submatrices, they are merged to create one m/sqrt(p) x n/(c*sqrt(p)) matrix.
27 * Assumption: input tuples are deleted
28 * Inputs:
29 * fibWorld: Communicator along the third dimension
30 * localmerged: input array of tuples, which will be distributed across layers
31 * Output: output array of tuples, after distributing across layers
32 and merging them locally on the receiving processor
33 *
34 ***************************************************************************/
35
36template <typename SR, typename IT, typename NT>
// NOTE(review): the Doxygen listing dropped original line 37, which (per the index
// at the bottom of the page) is the signature:
//   SpTuples<IT,NT>* ParallelReduce_Alltoall_threaded(MPI_Comm& fibWorld,
//                                                     SpTuples<IT,NT>*& localmerged)
// Several other original lines are also missing below (39, 41-42, 52, 56, 59, 61,
// 65-68, 70, 74, 78, 85, 87, 91, 93, 98-99, 101-102, 119-123); comments on those
// gaps are hedged reconstructions — verify against the real header before editing.
38{
// Missing line 39 presumably queries fibWorld for fprocs (layer count) and
// fibrank (this process's layer index) via MPI_Comm_size/MPI_Comm_rank — confirm.
40 int fprocs, fibrank;
43 IT mdim = localmerged->getnrow();
44 IT ndim = localmerged->getncol();
// Single layer: nothing to redistribute, hand ownership straight back to the caller.
45 if(fprocs == 1)
46 {
47 return localmerged;
48 }
49
50
51 // ------------ find splitters to distribute across layers -----------
// send_sizes[i]/recv_sizes[i]: tuple counts exchanged with layer i;
// recv_offsets[i]: element offset of layer i's data inside recvbuf.
// NOTE(review): a matching send_offsets vector (used by MPI_Alltoallv below)
// must be declared in one of the missing lines — confirm.
53 std::vector<int> send_sizes(fprocs);
54 std::vector<int> recv_sizes(fprocs);
55 std::vector<int> recv_offsets(fprocs);
// Loop body (missing line 59) presumably computes the per-layer column splitters
// and fills send_sizes/send_offsets from the locally merged tuples — confirm.
57 for(int i=0; i<fprocs; i++)
58 {
60 }
62
63
64 // ------------ Communicate counts -----------
// Treat each (row, col, value) tuple as an opaque byte blob for MPI transport.
// The MPI_Datatype variable MPI_triple and its MPI_Type_commit call, plus the
// MPI_Alltoall that fills recv_sizes from send_sizes, are in the missing lines
// 65-68/70 — confirm.
69 MPI_Type_contiguous(sizeof(std::tuple<IT,IT,NT>), MPI_CHAR, &MPI_triple);
71
72
73 // ------------ Allocate memory to receive data -----------
// Total incoming tuples across all layers; loop body (missing line 78)
// presumably accumulates recv_sizes[i] into recv_count — confirm.
75 int recv_count = 0;
76 for( int i = 0; i < fprocs; i++ )
77 {
79 }
// Raw storage via operator new: tuples are copied in by MPI, so no per-element
// construction is wanted here (released with ::operator delete below).
80 std::tuple<IT,IT,NT> * recvbuf = static_cast<std::tuple<IT, IT, NT>*> (::operator new (sizeof(std::tuple<IT, IT, NT>[recv_count])));
81
// Exclusive prefix sum of recv_sizes; missing line 85 presumably is
// recv_offsets[i] = recv_offsets[i-1] + recv_sizes[i-1] — confirm.
82 recv_offsets[0] = 0;
83 for( int i = 1; i < fprocs; i++ )
84 {
86 }
88
89
90 // ------------ Communicate split tuples -----------
// Personalized exchange: each layer sends its column slice of the local tuples
// and receives the slices destined for it. send_offsets comes from a missing
// declaration above. The original author's overflow concern (int counts/
// displacements in MPI_Alltoallv) is still open.
92 MPI_Alltoallv( localmerged->tuples, send_sizes.data(), send_offsets.data(), MPI_triple, recvbuf, recv_sizes.data(), recv_offsets.data(), MPI_triple, fibWorld); // WARNING: is this big enough?
94
95
96
97 // -------- update column indices of split tuples ----------
// Rebase global column indices to this layer's local column range.
// coloffset and ndimSplit are computed in missing lines 98-99/101-102;
// presumably coloffset = fibrank * (ndim / fprocs) and the last layer keeps
// the remainder columns (hence the special case below) — confirm.
100 if(fibrank==(fprocs-1))
103#pragma omp parallel for
104 for(int k=0; k<recv_count; k++)
105 {
106 std::get<1>(recvbuf[k]) = std::get<1>(recvbuf[k]) - coloffset;
107 }
108
109
110 // -------- create vector of SpTuples for MultiwayMerge ----------
// Wrap each layer's contiguous slice of recvbuf in a non-owning(?) SpTuples view;
// the final 'true' flag's meaning (sorted? shallow copy?) is not visible here —
// check the SpTuples constructor before relying on it.
111 std::vector< SpTuples<IT,NT>* > lists;
112 for(int i=0; i< fprocs; ++i)
113 {
114 SpTuples<IT, NT>* spTuples = new SpTuples<IT, NT> (recv_sizes[i], mdim, ndimSplit, &recvbuf[recv_offsets[i]], true); // If needed pass an empty object of proper dimension
115 lists.push_back(spTuples);
116 }
117
118 // -------- merge received tuples ----------
// Missing lines 119-123 presumably call MultiwayMerge<SR>(lists, ...) to produce
// 'globalmerged' and delete the per-layer SpTuples wrappers in 'lists' — confirm.
120
124
125
// Matches the raw ::operator new above (no destructors run; tuples are trivial here).
126 ::operator delete(recvbuf);
127 delete localmerged; // not sure if we can call ::operator delete here
128
129 return globalmerged;
130}
131
132
133template <typename NT, typename IT>
// NOTE(review): the Doxygen listing dropped original line 134, which (per the
// index at the bottom of the page) is the signature:
//   SpDCCols<IT,NT>* ReduceAll_threaded(std::vector<SpTuples<IT,NT>*>& unreducedC,
//                                       CCGrid& CMG)
// Original lines 136, 143-144, 147, 149 and 152 are also missing; the comments on
// those gaps below are hedged reconstructions — verify against the real header.
135{
// Dimensions of the (unreduced) product, taken from the first SUMMA-stage result.
// NOTE(review): unreducedC is indexed without an emptiness check — presumably the
// caller guarantees at least one stage; confirm before hardening.
137 IT mdim = unreducedC[0]->getnrow();
138 IT ndim = unreducedC[0]->getncol();
139
140 // ------ merge list of tuples from n/sqrt(p) stages of SUMMA -------
// loc_beg1 starts a timer; the missing lines 143-144 presumably perform the
// multiway merge of unreducedC into 'localmerged' and accumulate the elapsed
// time into comp_reduce/comp_result (declared elsewhere) — confirm.
141 double loc_beg1 = MPI_Wtime();
142 //SpTuples<IT, NT>* localmerged = multiwayMerge(unreducedC, true);
145
146 // scatter local tuples across layers
// Missing line 147/149 presumably calls ParallelReduce_Alltoall_threaded with a
// fiber communicator from CMG, yielding 'mergedSpTuples' — confirm.
148
150 // TODO: this is not a good constructor. Change it back to SpTuple-based constructor
// Builds the final per-layer sparse matrix; the trailing 'false' flag's meaning
// (ownership? sortedness?) is not visible here — check the SpDCCols constructor.
151 SpDCCols<IT,NT> * reducedC = new SpDCCols<IT,NT>(mergedSpTuples->getnrow(), mergedSpTuples->getncol(), mergedSpTuples->getnnz(), mergedSpTuples->tuples, false);
153 delete mergedSpTuples; // too expensive
154 return reducedC;
155}
156
157}
158
159#endif
160
161
int64_t IT
double comm_reduce
double comp_reduce
double comp_result
double comp_reduce_layer
SpTuples< IT, NT > * ParallelReduce_Alltoall_threaded(MPI_Comm &fibWorld, SpTuples< IT, NT > *&localmerged)
Definition Reductions.h:37
SpDCCols< IT, NT > * ReduceAll_threaded(std::vector< SpTuples< IT, NT > * > &unreducedC, CCGrid &CMG)
Definition Reductions.h:134