COMBINATORIAL_BLAS 1.6
 
OldReductions.h
/*
 * This file contains currently obsolete functions.
 * Kept for use during future needs (if any).
 */

#ifndef _OLD_REDUCTIONS_H_
#define _OLD_REDUCTIONS_H_

#include <mpi.h>
#include <tuple>
#include <limits>

// Relies on the including CombBLAS code for comm_reduce (a global
// communication timer), SpParHelper, and the std names used unqualified below.

// localmerged is invalidated in all processes after this recursive function
// globalmerged is valid only in fibWorld root (0) upon exit
template <typename SR>
void ParallelReduce(MPI_Comm & fibWorld, tuple<int32_t,int32_t,double> * & localmerged,
					MPI_Datatype & MPI_triple, tuple<int32_t,int32_t,double> * & globalmerged,
					int inputnnz, int & outputnnz)
{
	int fprocs, frank;
	MPI_Comm_size(fibWorld,&fprocs);
	MPI_Comm_rank(fibWorld,&frank);
	if(fprocs == 1)
	{
		globalmerged = localmerged;
		localmerged = NULL;
		outputnnz = inputnnz;
		return;
	}
	else if(fprocs % 2 != 0)
	{
		SpParHelper::Print("Number of processes is odd; can't merge pairwise\n");
		return;
	}

	int color = frank % 2;
	int key = frank / 2;
	MPI_Comm halfWorld;
	MPI_Comm_split(fibWorld, color, key, &halfWorld);	// odd-even split

	if(color == 0)	// even numbered - receiver
	{
		MPI_Status status;
		int hissize = 0;
		MPI_Recv(&hissize, 1, MPI_INT, frank+1, 1, fibWorld, &status);

		tuple<int32_t,int32_t,double> * recvdata = new tuple<int32_t,int32_t,double>[hissize];

		double reduce_beg = MPI_Wtime();
		MPI_Recv(recvdata, hissize, MPI_triple, frank+1, 1, fibWorld, &status);
		comm_reduce += (MPI_Wtime() - reduce_beg);

		int i=0, j=0, k=0;
		tuple<int32_t,int32_t,double> * mergeddata = new tuple<int32_t,int32_t,double>[inputnnz + hissize];

		while(i < inputnnz && j < hissize)
		{
			// both inputs are in ascending order w.r.t. columns first, then rows
			if(get<1>(localmerged[i]) > get<1>(recvdata[j]))
			{
				mergeddata[k] = recvdata[j++];
			}
			else if(get<1>(localmerged[i]) < get<1>(recvdata[j]))
			{
				mergeddata[k] = localmerged[i++];
			}
			else // columns are equal
			{
				if(get<0>(localmerged[i]) > get<0>(recvdata[j]))
				{
					mergeddata[k] = recvdata[j++];
				}
				else if(get<0>(localmerged[i]) < get<0>(recvdata[j]))
				{
					mergeddata[k] = localmerged[i++];
				}
				else // row and column both equal: combine the values
				{
					mergeddata[k] = make_tuple(get<0>(localmerged[i]), get<1>(recvdata[j]), SR::add(get<2>(recvdata[j]), get<2>(localmerged[i])));
					++i; ++j;
				}
			}
			++k;	// in any case, one more entry added to result
		}
		// copy whichever input still has entries left (the original version
		// dropped this tail, losing nonzeros whenever the inputs were unbalanced)
		while(i < inputnnz)	mergeddata[k++] = localmerged[i++];
		while(j < hissize)	mergeddata[k++] = recvdata[j++];

		delete [] recvdata;
		delete [] localmerged;
		localmerged = NULL;
		return ParallelReduce<SR>(halfWorld, mergeddata, MPI_triple, globalmerged, k, outputnnz);	// k is the new input nnz
	}
	else	// odd numbered - sender (does not recurse further)
	{
		MPI_Send(&inputnnz, 1, MPI_INT, frank-1, 1, fibWorld);
		MPI_Send(localmerged, inputnnz, MPI_triple, frank-1, 1, fibWorld);
		delete [] localmerged;
		localmerged = NULL;
	}
}
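
// ----------------------------------------------------------------------------
// Illustrative usage sketch (an addition for documentation purposes, not part
// of the original CombBLAS API). It shows one plausible way to drive
// ParallelReduce: the semiring "PlusSR", the byte-blob datatype registration,
// and the driver function below are all assumptions for illustration; within
// CombBLAS the MPI_triple datatype is constructed by the caller. Call between
// MPI_Init and MPI_Finalize, with a power-of-two number of processes (each
// halving step needs an even communicator size); the global timer comm_reduce
// must be defined at link time.
#ifdef OLD_REDUCTIONS_USAGE_EXAMPLE
#include <iostream>

struct PlusSR	// hypothetical minimal semiring: only add() is needed here
{
	static double add(const double & a, const double & b) { return a + b; }
};

inline void ExampleParallelReduceUsage()
{
	int rank;
	MPI_Comm_rank(MPI_COMM_WORLD, &rank);

	// Register the triple as an opaque byte blob; a simplification that works
	// because tuple<int32_t,int32_t,double> is memcpy-able in practice.
	MPI_Datatype MPI_triple;
	MPI_Type_contiguous(sizeof(std::tuple<int32_t,int32_t,double>), MPI_CHAR, &MPI_triple);
	MPI_Type_commit(&MPI_triple);

	// One locally sorted triple per process; identical (row, col) everywhere,
	// so the reduction should sum the values into a single entry.
	int inputnnz = 1;
	std::tuple<int32_t,int32_t,double> * localmerged = new std::tuple<int32_t,int32_t,double>[1];
	localmerged[0] = std::make_tuple(0, 0, 1.0);

	std::tuple<int32_t,int32_t,double> * globalmerged = NULL;
	int outputnnz = 0;
	MPI_Comm world = MPI_COMM_WORLD;	// the function takes a non-const reference
	ParallelReduce<PlusSR>(world, localmerged, MPI_triple, globalmerged, inputnnz, outputnnz);

	if(rank == 0)	// the result is valid only at the root of fibWorld
		std::cout << outputnnz << " entries; value at root = " << std::get<2>(globalmerged[0]) << std::endl;
	delete [] globalmerged;	// safe: still NULL on non-root ranks
	MPI_Type_free(&MPI_triple);
}
#endif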


// localmerged is invalidated in all processes after this recursive function
// globalmerged is valid on all processes upon exit
template <typename SR, typename IT, typename NT>
void ParallelReduce_Alltoall(MPI_Comm & fibWorld, tuple<IT,IT,NT> * & localmerged,
							 MPI_Datatype & MPI_triple, tuple<IT,IT,NT> * & globalmerged,
							 IT inputnnz, IT & outputnnz, IT ncols)
{
	int fprocs;
	MPI_Comm_size(fibWorld,&fprocs);
	if(fprocs == 1)
	{
		globalmerged = localmerged;
		localmerged = NULL;
		outputnnz = inputnnz;
		return;
	}
	int send_sizes[fprocs];		// note: variable-length arrays are a compiler extension in C++
	int recv_sizes[fprocs];
	// this could be made more efficient, either by a binary search or by guessing then correcting
	//MPI_Barrier(MPI_COMM_WORLD);
	double loc_beg1 = MPI_Wtime();
	int target = 0;
	int cols_per_proc = (ncols + fprocs - 1) / fprocs;
	int split_point = cols_per_proc;
	int send_offsets[fprocs+1];
	send_offsets[0] = 0;

	for( int i = 0; i < inputnnz; i++ )
	{
		// advance past every boundary this column crosses (a plain "if" here
		// misplaces entries whenever some process owns an empty column range)
		while( std::get<1>(localmerged[i]) >= split_point )
		{
			send_offsets[++target] = i;
			split_point += cols_per_proc;
		}
	}
	while(target < fprocs) send_offsets[++target] = inputnnz;
	for(int i=0; i<fprocs; i++)
	{
		send_sizes[i] = send_offsets[i+1] - send_offsets[i];
	}
	/*
	// earlier version of the splitter computation, kept commented out
	for( int i = 0; i < inputnnz; i++ ) {
		if( std::get<1>(localmerged[i]) >= split_point ) {
			if( target == 0 )
				send_sizes[target] = i;
			else {
				send_sizes[target] = i-send_offsets[target];
			}
			send_offsets[target+1] = i;
			target++;
			split_point += cols_per_proc;
		}
	}
	send_sizes[fprocs-1] = inputnnz - send_offsets[fprocs-1];
	*/
	//MPI_Barrier(MPI_COMM_WORLD);
	//comp_reduce += (MPI_Wtime() - loc_beg1);

	double reduce_beg = MPI_Wtime();
	MPI_Alltoall( send_sizes, 1, MPI_INT, recv_sizes, 1, MPI_INT, fibWorld);
	//MPI_Barrier(MPI_COMM_WORLD);
	comm_reduce += (MPI_Wtime() - reduce_beg);

	int recv_count = 0;
	for( int i = 0; i < fprocs; i++ )
		recv_count += recv_sizes[i];
	tuple<IT,IT,NT> * recvbuf = new tuple<IT,IT,NT>[recv_count];

	int recv_offsets[fprocs];
	recv_offsets[0] = 0;
	for( int i = 1; i < fprocs; i++ ) {
		recv_offsets[i] = recv_offsets[i-1] + recv_sizes[i-1];
	}
	//MPI_Barrier(MPI_COMM_WORLD);
	reduce_beg = MPI_Wtime();
	MPI_Alltoallv( localmerged, send_sizes, send_offsets, MPI_triple, recvbuf, recv_sizes, recv_offsets, MPI_triple, fibWorld);
	//MPI_Barrier(MPI_COMM_WORLD);
	comm_reduce += (MPI_Wtime() - reduce_beg);
	loc_beg1 = MPI_Wtime();

	int pos[fprocs];
	for( int i = 0; i < fprocs; i++ )
		pos[i] = recv_offsets[i];
	outputnnz = 0;
	globalmerged = new tuple<IT,IT,NT>[recv_count];

	while( true ) {
		// find the next entry in (column, row) order across all fprocs sorted runs
		int nexti = -1;
		IT r = std::numeric_limits<IT>::max();	// was "int ... = INT_MAX", which truncates 64-bit IT values
		IT c = std::numeric_limits<IT>::max();
		for( int i = 0; i < fprocs; i++ ) {
			if( pos[i] < recv_offsets[i]+recv_sizes[i] ) {
				if( std::get<1>(recvbuf[pos[i]]) < c ) {
					c = std::get<1>(recvbuf[pos[i]]);
					r = std::get<0>(recvbuf[pos[i]]);
					nexti = i;
				} else if( (std::get<1>(recvbuf[pos[i]]) == c) && (std::get<0>(recvbuf[pos[i]]) < r) ) {
					r = std::get<0>(recvbuf[pos[i]]);
					nexti = i;
				}
			}
		}
		if( nexti == -1 )	// merge is finished
			break;

		if( outputnnz > 0 && std::get<0>(globalmerged[outputnnz-1]) == std::get<0>(recvbuf[pos[nexti]])
						  && std::get<1>(globalmerged[outputnnz-1]) == std::get<1>(recvbuf[pos[nexti]]) ) {
			// duplicate (row, column): combine with the previous output entry
			std::get<2>(globalmerged[outputnnz-1]) = SR::add( std::get<2>(globalmerged[outputnnz-1]), std::get<2>(recvbuf[pos[nexti]]) );
		} else {
			// make this the next entry in the output
			globalmerged[outputnnz] = recvbuf[pos[nexti]];
			outputnnz++;
		}

		pos[nexti]++;	// advance the cursor only now (doing it before the duplicate check was a bug)
	}
	//MPI_Barrier(MPI_COMM_WORLD);
	//comp_reduce += (MPI_Wtime() - loc_beg1);

	delete [] recvbuf;
	delete [] localmerged;
	localmerged = NULL;
}
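
// ----------------------------------------------------------------------------
// Possible improvement (a sketch, not used above): the merge loop in
// ParallelReduce_Alltoall scans all fprocs cursors for every output entry,
// costing O(p) per entry. A min-heap keyed on (column, row) brings that down
// to O(log p). The helper below assumes the same recvbuf / recv_offsets /
// recv_sizes layout that ParallelReduce_Alltoall builds; "HeapMerge" is a
// hypothetical name, not a CombBLAS function.
#include <queue>
#include <vector>
#include <functional>
#include <utility>

template <typename SR, typename IT, typename NT>
IT HeapMerge(std::tuple<IT,IT,NT> * recvbuf, const int * recv_offsets, const int * recv_sizes,
			 int fprocs, std::tuple<IT,IT,NT> * out)
{
	typedef std::pair<std::pair<IT,IT>, int> Entry;	// ((col, row), source run)
	std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry> > heap;	// min-heap
	std::vector<int> pos(fprocs);
	for(int i = 0; i < fprocs; ++i)
	{
		pos[i] = recv_offsets[i];
		if(recv_sizes[i] > 0)	// seed the heap with the head of each nonempty run
			heap.push(std::make_pair(std::make_pair(std::get<1>(recvbuf[pos[i]]), std::get<0>(recvbuf[pos[i]])), i));
	}
	IT outputnnz = 0;
	while(!heap.empty())
	{
		int i = heap.top().second;
		heap.pop();
		const std::tuple<IT,IT,NT> & cur = recvbuf[pos[i]];
		if(outputnnz > 0 && std::get<0>(out[outputnnz-1]) == std::get<0>(cur)
						 && std::get<1>(out[outputnnz-1]) == std::get<1>(cur))
			std::get<2>(out[outputnnz-1]) = SR::add(std::get<2>(out[outputnnz-1]), std::get<2>(cur));	// duplicate: combine
		else
			out[outputnnz++] = cur;	// new (row, col): append
		if(++pos[i] < recv_offsets[i] + recv_sizes[i])	// refill from the same run
			heap.push(std::make_pair(std::make_pair(std::get<1>(recvbuf[pos[i]]), std::get<0>(recvbuf[pos[i]])), i));
	}
	return outputnnz;
}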

#endif