COMBINATORIAL_BLAS 1.6
 
pagerank.cpp
1/*
2//@HEADER
3// *****************************************************************************
4//
5// HPCGraph: Graph Computation on High Performance Computing Systems
6// Copyright (2016) Sandia Corporation
7//
8// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
9// the U.S. Government retains certain rights in this software.
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17//
18// 2. Redistributions in binary form must reproduce the above copyright
19// notice, this list of conditions and the following disclaimer in the
20// documentation and/or other materials provided with the distribution.
21//
22// 3. Neither the name of the Corporation nor the names of the
23// contributors may be used to endorse or promote products derived from
24// this software without specific prior written permission.
25//
26// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
27// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
28// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
29// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
30// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
31// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
32// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
33// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
34// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
35// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
36// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
37//
38// Questions? Contact George M. Slota (gmslota@sandia.gov)
39// Siva Rajamanickam (srajama@sandia.gov)
40// Kamesh Madduri (madduri@cse.psu.edu)
41//
42// *****************************************************************************
43//@HEADER
44*/
45
46
47#include <mpi.h>
48#include <omp.h>
49#include <stdio.h>
50#include <stdlib.h>
51#include <stdint.h>
52#include <fstream>
53
54#include "dist_graph.h"
55#include "comms.h"
56#include "util.h"
57#include "pagerank.h"
58
59#define DAMPING_FACTOR 0.85
60
61extern int procid, nprocs;
62extern bool verbose, debug, verify;
63
64
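// run_pagerank(): distributed PageRank with damping factor DAMPING_FACTOR.
// Each task owns vertices [0, g->n_local) plus ghost copies of remote
// neighbors up to g->n_total. Throughout, pageranks[v] holds the rank
// already divided by v's out-degree (or by g->n for sinks), so summing
// over in-neighbors needs no further division; sink mass is combined
// across tasks with MPI_Allreduce into sum_noouts.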
65int run_pagerank(dist_graph_t* g, mpi_data_t* comm,
66 double*& pageranks, uint32_t num_iter)
67{
68 if (debug) { printf("Task %d run_pagerank() start\n", procid); }
69 double elt = 0.0;
70 if (verbose) {
71 MPI_Barrier(MPI_COMM_WORLD);
72 elt = omp_get_wtime();
73 }
74
75 double* pageranks_next = (double*)malloc(g->n_total*sizeof(double));
76 double sum_noouts = 0.0;
77 double sum_noouts_next = 0.0;
78
79 for (int32_t i = 0; i < nprocs; ++i)
80 comm->sendcounts_temp[i] = 0;
81
82 comm->global_queue_size = 1;
83#pragma omp parallel default(shared)
84{
85 thread_comm_t tc;
86 init_thread_comm_flt(&tc);
87
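// Initialization: every rank starts at 1/n, stored pre-divided by
// out-degree; sink vertices store (1/n)/n and their mass is summed
// into sum_noouts_next for redistribution.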
88#pragma omp for reduction(+:sum_noouts_next)
89 for (uint64_t i = 0; i < g->n_local; ++i)
90 {
91 pageranks[i] = (1.0 / (double)g->n);
92 uint64_t out_degree = out_degree(g, i);
93 if (out_degree > 0)
94 pageranks[i] /= (double)out_degree;
95 else
96 {
97 pageranks[i] /= (double)g->n;
98 sum_noouts_next += pageranks[i];
99 }
100 }
101#pragma omp for
102 for (uint64_t i = g->n_local; i < g->n_total; ++i)
103 pageranks[i] = (1.0 / (double)g->n) / (double)g->n;
104#pragma omp for
105 for (uint64_t i = 0; i < g->n_total; ++i)
106 pageranks_next[i] = pageranks[i];
107
108#pragma omp for schedule(guided) nowait
109 for (uint64_t i = 0; i < g->n_local; ++i)
110 update_sendcounts_thread_out(g, &tc, i);
111
112 for (int32_t i = 0; i < nprocs; ++i)
113 {
114#pragma omp atomic
115 comm->sendcounts_temp[i] += tc.sendcounts_thread[i];
116
117 tc.sendcounts_thread[i] = 0;
118 }
119#pragma omp barrier
120
121#pragma omp single
122{
123 init_sendbuf_vid_data_flt(comm);
124 init_recvbuf_vid_data_flt(comm);
125}
126
127#pragma omp for schedule(guided) nowait
128 for (uint64_t i = 0; i < g->n_local; ++i)
129 update_vid_data_queues_out(g, &tc, comm, i, pageranks[i]);
130
131 empty_vid_data_flt(&tc, comm);
132#pragma omp barrier
133
134#pragma omp single
135{
136 exchange_verts(comm);
137 exchange_data_flt(comm);
138
139 sum_noouts = 0.0;
140 MPI_Allreduce(&sum_noouts_next, &sum_noouts, 1,
141 MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);
142 sum_noouts_next = 0.0;
143}
144
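// Store received ghost ranks, then remap the global vertex IDs held in
// the receive and send buffers to local indices so later iterations can
// gather and scatter ranks by direct indexing.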
145#pragma omp for
146 for (uint64_t i = 0; i < comm->total_recv; ++i)
147 {
148 uint64_t index = get_value(&g->map, comm->recvbuf_vert[i]);
149 pageranks[index] = comm->recvbuf_data_flt[i];
150 comm->recvbuf_vert[i] = index;
151 }
152
153#pragma omp for
154 for (uint64_t i = 0; i < comm->total_send; ++i)
155 {
156 uint64_t index = get_value(&g->map, comm->sendbuf_vert[i]);
157 comm->sendbuf_vert[i] = index;
158 }
159
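// Power iteration. In plain text, the update computed below is
//   PR'(v) = (1 - d)/n + d * ( S + sum over in-neighbors u of PR(u)/outdeg(u) )
// with d = DAMPING_FACTOR and S = sum over sink vertices u of PR(u)/n.
// The value stored is PR'(v)/outdeg(v), or PR'(v)/n for sinks.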
160 for (uint32_t iter = 0; iter < num_iter; ++iter)
161 {
162 if (debug && tc.tid == 0) {
163 printf("Task %d Iter %u run_pagerank() sink contribution sum %e\n", procid, iter, sum_noouts);
164 }
165
166#pragma omp for schedule(guided) reduction(+:sum_noouts_next) nowait
167 for (uint64_t i = 0; i < g->n_local; ++i)
168 {
169 uint64_t vert_index = i;
170 double vert_pagerank = sum_noouts;
171
172 uint64_t in_degree = in_degree(g, vert_index);
173 uint64_t* ins = in_vertices(g, vert_index);
174 for (uint64_t j = 0; j < in_degree; ++j)
175 vert_pagerank += pageranks[ins[j]];
176
177 vert_pagerank *= DAMPING_FACTOR;
178 vert_pagerank += ((1.0 - DAMPING_FACTOR) / (double)g->n);
179
180 uint64_t out_degree = out_degree(g, vert_index);
181 if (out_degree > 0)
182 vert_pagerank /= (double)out_degree;
183 else
184 {
185 vert_pagerank /= (double)g->n;
186 sum_noouts_next += vert_pagerank;
187 }
188
189 pageranks_next[vert_index] = vert_pagerank;
190 }
191
192#pragma omp for
193 for (uint64_t i = 0; i < comm->total_send; ++i)
194 comm->sendbuf_data_flt[i] = pageranks_next[comm->sendbuf_vert[i]];
195
196#pragma omp single
197{
198 exchange_data_flt(comm);
199 sum_noouts = 0.0;
200 MPI_Allreduce(&sum_noouts_next, &sum_noouts, 1,
201 MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);
202 sum_noouts_next = 0.0;
203}
204
205#pragma omp for
206 for (uint64_t i = 0; i < comm->total_recv; ++i)
207 pageranks_next[comm->recvbuf_vert[i]] = comm->recvbuf_data_flt[i];
208
209#pragma omp single
210{
211 double* temp = pageranks;
212 pageranks = pageranks_next;
213 pageranks_next = temp;
214}
215 } // end for loop
216
217 clear_thread_comm(&tc);
218} // end parallel
219
220 clear_allbuf_vid_data(comm);
221 free(pageranks_next);
222
223 if (verbose) {
224 elt = omp_get_wtime() - elt;
225 printf("Task %d, run_pagerank() time %9.6f (s)\n", procid, elt);
226 }
227 if (debug) { printf("Task %d run_pagerank() success\n", procid); }
228
229 return 0;
230}
231
232
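// pagerank_output(): gather all n ranks onto task 0 and write one value
// per line. Stored values are rank/out-degree (rank/n for sinks), so the
// division is undone first. Slots are initialized to -1.0 and combined
// with MPI_MAX, so each vertex's owning task supplies the surviving value.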
233int pagerank_output(dist_graph_t* g, double* pageranks, char* output_file)
234{
235 if (debug) printf("Task %d pageranks to %s\n", procid, output_file);
236
237 double* global_pageranks = (double*)malloc(g->n*sizeof(double));
238
239#pragma omp parallel for
240 for (uint64_t i = 0; i < g->n; ++i)
241 global_pageranks[i] = -1.0;
242
243#pragma omp parallel for
244 for (uint64_t i = 0; i < g->n_local; ++i)
245 {
246 uint64_t out_degree = out_degree(g, i);
247 assert(g->local_unmap[i] < g->n);
248 if (out_degree > 0)
249 global_pageranks[g->local_unmap[i]] = pageranks[i] * (double)out_degree;
250 else
251 global_pageranks[g->local_unmap[i]] = pageranks[i] * (double)g->n;
252 }
253
254 if (procid == 0)
255 MPI_Reduce(MPI_IN_PLACE, global_pageranks, (int32_t)g->n,
256 MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
257 else
258 MPI_Reduce(global_pageranks, global_pageranks, (int32_t)g->n,
259 MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
260
261 if (procid == 0)
262 {
263 if (debug)
264 for (uint64_t i = 0; i < g->n; ++i)
265 if (global_pageranks[i] == -1.0)
266 {
267 printf("Pagerank error: %lu not assigned\n", i);
268 global_pageranks[i] = 0.0;
269 }
270
271 std::ofstream outfile;
272 outfile.open(output_file);
273
274 for (uint64_t i = 0; i < g->n; ++i)
275 outfile << global_pageranks[i] << std::endl;
276
277 outfile.close();
278 }
279
280 free(global_pageranks);
281
282 if (debug) printf("Task %d done writing pageranks\n", procid);
283
284 return 0;
285}
286
287
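// pagerank_verify(): reassemble the global ranks the same way and check
// that they sum to 1.0; the uniform redistribution of sink mass keeps
// the total invariant across iterations.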
288int pagerank_verify(dist_graph_t* g, double* pageranks)
289{
290 if (debug) { printf("Task %d pagerank_verify() start\n", procid); }
291
292 double* global_pageranks = (double*)malloc(g->n*sizeof(double));
293
294#pragma omp parallel for
295 for (uint64_t i = 0; i < g->n; ++i)
296 global_pageranks[i] = -1.0;
297
298#pragma omp parallel for
299 for (uint64_t i = 0; i < g->n_local; ++i)
300 {
301 uint64_t out_degree = out_degree(g, i);
302 if (out_degree > 0)
303 global_pageranks[g->local_unmap[i]] = pageranks[i] * (double)out_degree;
304 else
305 global_pageranks[g->local_unmap[i]] = pageranks[i] * (double)g->n;
306 }
307
308 if (procid == 0)
309 MPI_Reduce(MPI_IN_PLACE, global_pageranks, (int32_t)g->n,
310 MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
311 else
312 MPI_Reduce(global_pageranks, global_pageranks, (int32_t)g->n,
313 MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
314
315 if (procid == 0)
316 {
317 double pr_sum = 0.0;
318 for (uint64_t i = 0; i < g->n; ++i)
319 pr_sum += global_pageranks[i];
320
321 printf("PageRanks sum (should be 1.0): %9.6lf\n", pr_sum);
322 }
323
324 free(global_pageranks);
325
326 if (debug) { printf("Task %d pagerank_verify() success\n", procid); }
327
328 return 0;
329}
330
331int pagerank_dist(dist_graph_t* g, mpi_data_t* comm,
332 uint32_t num_iter, char* output_file)
333{
334 if (debug) { printf("Task %d pagerank_dist() start\n", procid); }
335
336 MPI_Barrier(MPI_COMM_WORLD);
337 double elt = omp_get_wtime();
338
339 double* pageranks = (double*)malloc(g->n_total*sizeof(double));
340 run_pagerank(g, comm, pageranks, num_iter);
341
342 MPI_Barrier(MPI_COMM_WORLD);
343 elt = omp_get_wtime() - elt;
344 if (procid == 0) printf("PageRank time %9.6f (s)\n", elt);
345
346 if (output) {
347 pagerank_output(g, pageranks, output_file);
348 }
349
350 if (verify) {
351 pagerank_verify(g, pageranks);
352 }
353
354 free(pageranks);
355
356 if (debug) printf("Task %d pagerank_dist() success\n", procid);
357 return 0;
358}
359
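For reference, a minimal single-process sketch of the same iteration follows. The csr_graph type and pagerank_serial() are illustrative stand-ins rather than HPCGraph or Combinatorial BLAS APIs; the sketch keeps the listing's conventions of storing ranks pre-divided by out-degree and redistributing sink mass uniformly each iteration.

#include <stdint.h>
#include <vector>

struct csr_graph {
  uint64_t n;                        // number of vertices
  std::vector<uint64_t> in_offsets;  // size n+1, CSR offsets into in_ids
  std::vector<uint64_t> in_ids;      // concatenated in-neighbor lists
  std::vector<uint64_t> out_degree;  // size n
};

std::vector<double> pagerank_serial(const csr_graph& g, uint32_t num_iter,
                                    double d = 0.85)
{
  const double n = (double)g.n;
  std::vector<double> pr(g.n), pr_next(g.n);
  double sink_sum = 0.0;  // total rank/n contributed by sink vertices

  for (uint64_t v = 0; v < g.n; ++v) {
    pr[v] = 1.0 / n;
    if (g.out_degree[v] > 0) pr[v] /= (double)g.out_degree[v];
    else { pr[v] /= n; sink_sum += pr[v]; }
  }

  for (uint32_t iter = 0; iter < num_iter; ++iter) {
    double sink_next = 0.0;
    for (uint64_t v = 0; v < g.n; ++v) {
      double r = sink_sum;  // uniform contribution from all sinks
      for (uint64_t e = g.in_offsets[v]; e < g.in_offsets[v + 1]; ++e)
        r += pr[g.in_ids[e]];  // already divided by the source's out-degree
      r = d * r + (1.0 - d) / n;
      if (g.out_degree[v] > 0) r /= (double)g.out_degree[v];
      else { r /= n; sink_next += r; }
      pr_next[v] = r;
    }
    pr.swap(pr_next);
    sink_sum = sink_next;
  }
  return pr;  // multiply entry v by out_degree[v] (or n) to recover PR(v)
}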
Referenced symbols (definition locations):
 void init_sendbuf_vid_data_flt(mpi_data_t *comm) (comms.cpp:305)
 void init_thread_comm_flt(thread_comm_t *tc) (comms.cpp:209)
 void init_recvbuf_vid_data_flt(mpi_data_t *comm) (comms.cpp:332)
 bool output (comms.cpp:56)
 void clear_allbuf_vid_data(mpi_data_t *comm) (comms.cpp:370)
 void clear_thread_comm(thread_comm_t *tc) (comms.cpp:199)
 void update_vid_data_queues_out(dist_graph_t *g, thread_comm_t *tc, mpi_data_t *comm, uint64_t vert_index, uint64_t data) (comms.h:695)
 void exchange_data_flt(mpi_data_t *comm) (comms.h:495)
 void empty_vid_data_flt(thread_comm_t *tc, mpi_data_t *comm) (comms.h:820)
 void exchange_verts(dist_graph_t *g, mpi_data_t *comm, queue_data_t *q) (comms.h:184)
 void update_sendcounts_thread_out(dist_graph_t *g, thread_comm_t *tc, uint64_t vert_index) (comms.h:602)
 #define in_degree(g, n) (dist_graph.h:55)
 #define out_degree(g, n) (dist_graph.h:54)
 #define in_vertices(g, n) (dist_graph.h:57)
 uint64_t get_value(fast_map *map, uint64_t key) (fast_map.h:132)
 int pagerank_dist(dist_graph_t *g, mpi_data_t *comm, uint32_t num_iter, char *output_file) (pagerank.cpp:331)
 int pagerank_verify(dist_graph_t *g, double *pageranks) (pagerank.cpp:288)
 int procid (main.cpp:55)
 bool debug (pagerank.cpp:62)
 bool verify (pagerank.cpp:62)
 bool verbose (main.cpp:56)
 int pagerank_output(dist_graph_t *g, double *pageranks, char *output_file) (pagerank.cpp:233)
 int nprocs (pagerank.cpp:61)
 int run_pagerank(dist_graph_t *g, mpi_data_t *comm, double *&pageranks, uint32_t num_iter) (pagerank.cpp:65)
 #define DAMPING_FACTOR (pagerank.cpp:59)
 unsigned int uint32_t (stdint.h:80)
 signed int int32_t (stdint.h:77)
 unsigned __int64 uint64_t (stdint.h:90)
 uint64_t n_total (dist_graph.h:69)
 uint64_t *local_unmap (dist_graph.h:80)
 uint64_t n_local (dist_graph.h:66)
 uint64_t n (dist_graph.h:61)
 fast_map map (dist_graph.h:83)
 uint64_t *sendcounts_temp (comms.h:73)
 uint64_t total_send (comms.h:86)
 double *sendbuf_data_flt (comms.h:80)
 uint64_t *sendbuf_vert (comms.h:78)
 uint64_t total_recv (comms.h:85)
 uint64_t global_queue_size (comms.h:87)
 double *recvbuf_data_flt (comms.h:83)
 uint64_t *recvbuf_vert (comms.h:81)
 int32_t tid (comms.h:109)
 uint64_t *sendcounts_thread (comms.h:111)