Reductions.h
#ifndef _REDUCTIONS_H_
#define _REDUCTIONS_H_

#include <mpi.h>
#include <sys/time.h>
#include <iostream>
#include <iomanip>
#include <functional>
#include <algorithm>
#include <vector>
#include <string>
#include <sstream>

#include "CombBLAS/CombBLAS.h"
#include "Glue.h"
#include "CCGrid.h"

namespace combblas
{

/***************************************************************************
 * Distribute a local m/sqrt(p) x n/sqrt(p) matrix (represented by a list of tuples)
 * across layers, so that each processor along the third dimension receives
 * m/sqrt(p) x n/(c*sqrt(p)) submatrices.
 * After receiving c submatrices, they are merged into one m/sqrt(p) x n/(c*sqrt(p)) matrix.
 * Side effect: the input tuples are deleted.
 * Inputs:
 *   fibWorld: communicator along the third dimension
 *   localmerged: input array of tuples, which will be distributed across layers
 * Output: array of tuples, after distributing across layers
 *         and merging locally on the receiving processor
 ***************************************************************************/

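/*
 * Illustrative example (hypothetical sizes, not taken from the code): with
 * c = fprocs = 4 layers and a local m/sqrt(p) x n/sqrt(p) tile of 2000 x 2000
 * per layer, findColSplitters cuts each tile into four 2000 x 500 column slabs.
 * Fiber rank k then receives the k-th slab from all 4 layers via MPI_Alltoallv,
 * and MultiwayMerge adds up duplicate entries, leaving one 2000 x 500 SpTuples
 * object per fiber rank.
 */
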
template <typename SR, typename IT, typename NT>
SpTuples<IT,NT> * ParallelReduce_Alltoall_threaded(MPI_Comm & fibWorld, SpTuples<IT,NT> * & localmerged)
{
    double comp_begin, comm_begin, comp_time = 0, comm_time = 0;
    int fprocs, fibrank;
    MPI_Comm_size(fibWorld, &fprocs);
    MPI_Comm_rank(fibWorld, &fibrank);
    IT mdim = localmerged->getnrow();
    IT ndim = localmerged->getncol();
    if(fprocs == 1)
    {
        return localmerged;
    }

    // ------------ find splitters to distribute across layers -----------
    comp_begin = MPI_Wtime();
    std::vector<int> send_sizes(fprocs);
    std::vector<int> recv_sizes(fprocs);
    std::vector<int> recv_offsets(fprocs);
    std::vector<int> send_offsets = findColSplitters<int>(localmerged, fprocs);
    for(int i = 0; i < fprocs; i++)
    {
        send_sizes[i] = send_offsets[i+1] - send_offsets[i];
    }
    comp_time += (MPI_Wtime() - comp_begin);

    // ------------ Communicate counts -----------
    comm_begin = MPI_Wtime();
    MPI_Alltoall(send_sizes.data(), 1, MPI_INT, recv_sizes.data(), 1, MPI_INT, fibWorld);
    comm_time += (MPI_Wtime() - comm_begin);

    // send each std::tuple<IT,IT,NT> as a contiguous block of raw bytes
    MPI_Datatype MPI_triple;
    MPI_Type_contiguous(sizeof(std::tuple<IT,IT,NT>), MPI_CHAR, &MPI_triple);
    MPI_Type_commit(&MPI_triple);

    // ------------ Allocate memory to receive data -----------
    comp_begin = MPI_Wtime();
    int recv_count = 0;
    for(int i = 0; i < fprocs; i++)
    {
        recv_count += recv_sizes[i];
    }
    std::tuple<IT,IT,NT> * recvbuf = static_cast< std::tuple<IT,IT,NT>* >(::operator new(sizeof(std::tuple<IT,IT,NT>[recv_count])));

    recv_offsets[0] = 0;
    for(int i = 1; i < fprocs; i++)
    {
        recv_offsets[i] = recv_offsets[i-1] + recv_sizes[i-1];
    }
    comp_time += (MPI_Wtime() - comp_begin);

    // ------------ Communicate split tuples -----------
    comm_begin = MPI_Wtime();
    MPI_Alltoallv(localmerged->tuples, send_sizes.data(), send_offsets.data(), MPI_triple,
                  recvbuf, recv_sizes.data(), recv_offsets.data(), MPI_triple, fibWorld); // WARNING: is this big enough?
    comm_time += (MPI_Wtime() - comm_begin);
    MPI_Type_free(&MPI_triple);  // the derived datatype is no longer needed

    // -------- update column indices of split tuples ----------
    comp_begin = MPI_Wtime();
    IT ndimSplit = ndim / fprocs;
    IT coloffset = fibrank * ndimSplit;    // first global column owned by this fiber rank
    if(fibrank == (fprocs-1))              // the last rank takes the leftover columns
        ndimSplit = ndim - ndimSplit * fibrank;
#pragma omp parallel for
    for(int k = 0; k < recv_count; k++)
    {
        std::get<1>(recvbuf[k]) = std::get<1>(recvbuf[k]) - coloffset;
    }

    // -------- create vector of SpTuples for MultiwayMerge ----------
    std::vector< SpTuples<IT,NT>* > lists;
    for(int i = 0; i < fprocs; ++i)
    {
        SpTuples<IT,NT> * spTuples = new SpTuples<IT,NT>(recv_sizes[i], mdim, ndimSplit, &recvbuf[recv_offsets[i]], true); // If needed pass an empty object of proper dimension
        lists.push_back(spTuples);
    }

    // -------- merge received tuples ----------
    SpTuples<IT,NT> * globalmerged = MultiwayMerge<SR>(lists, mdim, ndimSplit, false);

    comp_time += (MPI_Wtime() - comp_begin);
    comp_reduce_layer += comp_time;
    comm_reduce += comm_time;

    ::operator delete(recvbuf);
    delete localmerged;    // not sure if we can call ::operator delete here

    return globalmerged;
}

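/***************************************************************************
 * Reduce the per-stage partial products of a 3D SpGEMM into a single local
 * SpDCCols matrix:
 *   1) merge the list of SpTuples produced by the SUMMA stages with MultiwayMerge,
 *   2) redistribute and reduce the merged tuples across layers via
 *      ParallelReduce_Alltoall_threaded,
 *   3) wrap the result in an SpDCCols object.
 * Timing is accumulated into the comp_reduce and comp_result globals.
 ***************************************************************************/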
template <typename NT, typename IT>
SpDCCols<IT,NT> * ReduceAll_threaded(std::vector< SpTuples<IT,NT>* > & unreducedC, CCGrid & CMG)
{
    typedef PlusTimesSRing<double, double> PTDD;
    IT mdim = unreducedC[0]->getnrow();
    IT ndim = unreducedC[0]->getncol();

    // ------ merge list of tuples from n/sqrt(p) stages of SUMMA -------
    double loc_beg1 = MPI_Wtime();
    //SpTuples<IT, NT>* localmerged = multiwayMerge(unreducedC, true);
    SpTuples<IT, NT> * localmerged = MultiwayMerge<PTDD>(unreducedC, mdim, ndim, true);
    comp_reduce += (MPI_Wtime() - loc_beg1);

    // scatter local tuples across layers
    SpTuples<IT,NT> * mergedSpTuples = ParallelReduce_Alltoall_threaded<PTDD>(CMG.fiberWorld, localmerged);

    loc_beg1 = MPI_Wtime();
    // TODO: this is not a good constructor. Change it back to SpTuples-based constructor
    SpDCCols<IT,NT> * reducedC = new SpDCCols<IT,NT>(mergedSpTuples->getnrow(), mergedSpTuples->getncol(),
                                                     mergedSpTuples->getnnz(), mergedSpTuples->tuples, false);
    comp_result += (MPI_Wtime() - loc_beg1);
    delete mergedSpTuples;    // too expensive
    return reducedC;
}
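
/*
 * Typical call site (a minimal sketch; the driver code and variable names below
 * are illustrative, not part of this header): after the SUMMA stages of a 3D
 * SpGEMM have each produced an SpTuples partial product,
 *
 *   std::vector< SpTuples<int64_t,double>* > unreducedC;  // filled by the SUMMA stages
 *   SpDCCols<int64_t,double> * reducedC = ReduceAll_threaded(unreducedC, CMG);
 *
 * where CMG is the CCGrid describing the 3D process grid and CMG.fiberWorld is
 * the communicator along the third dimension.
 */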

}

#endif