COMBINATORIAL_BLAS 1.6
 
Loading...
Searching...
No Matches
wcc.cpp
Go to the documentation of this file.
1/*
2//@HEADER
3// *****************************************************************************
4//
5// HPCGraph: Graph Computation on High Performance Computing Systems
6// Copyright (2016) Sandia Corporation
7//
8// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
9// the U.S. Government retains certain rights in this software.
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17//
18// 2. Redistributions in binary form must reproduce the above copyright
19// notice, this list of conditions and the following disclaimer in the
20// documentation and/or other materials provided with the distribution.
21//
22// 3. Neither the name of the Corporation nor the names of the
23// contributors may be used to endorse or promote products derived from
24// this software without specific prior written permission.
25//
26// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
27// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
28// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
29// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
30// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
31// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
32// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
33// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
34// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
35// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
36// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
37//
38// Questions? Contact George M. Slota (gmslota@sandia.gov)
39// Siva Rajamanickam (srajama@sandia.gov)
40// Kamesh Madduri (madduri@cse.psu.edu)
41//
42// *****************************************************************************
43//@HEADER
44*/
45
46
47#include <mpi.h>
48#include <omp.h>
49#include <stdio.h>
50#include <stdlib.h>
51#include <stdint.h>
52#include <fstream>
53
54#include "dist_graph.h"
55#include "comms.h"
56#include "util.h"
57
58#define WCC_NOT_VISITED 18446744073709551615
59#define WCC_VISITED 18446744073709551614
60
61extern int procid, nprocs;
62extern bool verbose, debug, verify, output;
63
64
66 uint64_t* wcc, uint64_t root)
67{
68 if (debug) { printf("procid %d wcc_bfs() start\n", procid); }
69 double elt = 0.0;
70 if (verbose) {
71 MPI_Barrier(MPI_COMM_WORLD);
72 elt = omp_get_wtime();
73 }
74
75 q->queue_size = 0;
76 q->next_size = 0;
77 q->send_size = 0;
78
79 uint64_t root_index = get_value(&g->map, root);
80 if (root_index != NULL_KEY && root_index < g->n_local)
81 {
82 q->queue[0] = root;
83 q->queue_size = 1;
84 }
85
86 uint64_t iter = 0;
87 comm->global_queue_size = 1;
88#pragma omp parallel default(shared)
89{
92
93#pragma omp for
94 for (uint64_t i = 0; i < g->n_local; ++i)
95 if (out_degree(g, i) == 0 && in_degree(g, i) == 0)
96 wcc[i] = g->local_unmap[i];
97 else
98 wcc[i] = WCC_NOT_VISITED;
99#pragma omp for
100 for (uint64_t i = g->n_local; i < g->n_total; ++i)
101 wcc[i] = WCC_NOT_VISITED;
102
103 while (comm->global_queue_size)
104 {
105 if (debug && tq.tid == 0) {
106 printf("Task: %d wcc_bfs() GQ: %lu, TQ: %lu\n",
107 procid, comm->global_queue_size, q->queue_size);
108 }
109
110#pragma omp for schedule(guided) nowait
111 for (uint64_t i = 0; i < q->queue_size; ++i)
112 {
113 uint64_t vert = q->queue[i];
114 uint64_t vert_index = get_value(&g->map, vert);
115 if (wcc[vert_index] == root)
116 continue;
117 wcc[vert_index] = root;
118
119 uint64_t out_degree = out_degree(g, vert_index);
120 uint64_t* outs = out_vertices(g, vert_index);
121 for (uint64_t j = 0; j < out_degree; ++j)
122 {
123 uint64_t out_index = outs[j];
124 if (wcc[out_index] == WCC_NOT_VISITED)
125 {
126 wcc[out_index] = WCC_VISITED;
127
128 if (out_index < g->n_local)
129 add_vid_to_queue(&tq, q, g->local_unmap[out_index]);
130 else
131 add_vid_to_send(&tq, q, out_index);
132 }
133 }
134
135 uint64_t in_degree = in_degree(g, vert_index);
136 uint64_t* ins = in_vertices(g, vert_index);
137 for (uint64_t j = 0; j < in_degree; ++j)
138 {
139 uint64_t in_index = ins[j];
140 if (wcc[in_index] == WCC_NOT_VISITED)
141 {
142 wcc[in_index] = WCC_VISITED;
143
144 if (in_index < g->n_local)
145 add_vid_to_queue(&tq, q, g->local_unmap[in_index]);
146 else
147 add_vid_to_send(&tq, q, in_index);
148 }
149 }
150 }
151
152 empty_queue(&tq, q);
153 empty_send(&tq, q);
154#pragma omp barrier
155
156#pragma omp single
157 {
158 exchange_verts(g, comm, q);
159 ++iter;
160 }
161 } // end while
162
164} // end parallel
165
166 if (verbose) {
167 elt = omp_get_wtime() - elt;
168 printf("Task %d wcc_bfs() time %9.6f (s)\n", procid, elt);
169 }
170 if (debug) { printf("Task %d wcc_bfs() success\n", procid); }
171
172 return 0;
173}
174
175
177 uint64_t* wcc)
178{
179 if (debug) { printf("Task %d wcc_color() start\n", procid); }
180 double elt = 0.0;
181 if (verbose) {
182 MPI_Barrier(MPI_COMM_WORLD);
183 elt = omp_get_wtime();
184 }
185
186 q->queue_size = 0;
187 q->next_size = 0;
188 q->send_size = 0;
189
190 for (int32_t i = 0; i < nprocs; ++i)
191 comm->sendcounts_temp[i] = 0;
192
193 uint64_t iter = 0;
194 comm->global_queue_size = 1;
195#pragma omp parallel default(shared)
196{
198 thread_comm_t tc;
200 init_thread_comm(&tc);
201
202#pragma omp for
203 for (uint64_t i = 0; i < g->n_local; ++i)
204 {
205 if (wcc[i] == WCC_NOT_VISITED)
206 {
207 wcc[i] = g->local_unmap[i];
208 add_vid_to_queue(&tq, q, i);
209 }
210 }
211
212#pragma omp for
213 for (uint64_t i = g->n_local; i < g->n_total; ++i)
214 if (wcc[i] == WCC_NOT_VISITED)
215 wcc[i] = g->ghost_unmap[i - g->n_local];
216
217 empty_queue(&tq, q);
218#pragma omp barrier
219
220#pragma omp single
221{
222 q->queue_size = q->next_size;
223 q->next_size = 0;
224
225 uint64_t* temp = q->queue;
226 q->queue = q->queue_next;
227 q->queue_next = temp;
228}
229
230 while (comm->global_queue_size)
231 {
232 if (debug && tq.tid == 0) {
233 printf("Task %d Iter %lu wcc_color() GQ: %lu, TQ: %lu\n",
234 procid, iter, comm->global_queue_size, q->queue_size);
235 }
236
237#pragma omp for schedule(guided) nowait
238 for (uint64_t i = 0; i < q->queue_size; ++i)
239 {
240 uint64_t vert_index = q->queue[i];
241 bool send = false;
242
243 uint64_t out_degree = out_degree(g, vert_index);
244 uint64_t* outs = out_vertices(g, vert_index);
245 for (uint64_t j = 0; j < out_degree; ++j)
246 {
247 uint64_t out_index = outs[j];
248 if (wcc[out_index] > wcc[vert_index])
249 {
250 wcc[vert_index] = wcc[out_index];
251 send = true;
252 }
253 }
254
255 uint64_t in_degree = in_degree(g, vert_index);
256 uint64_t* ins = in_vertices(g, vert_index);
257 for (uint64_t j = 0; j < in_degree; ++j)
258 {
259 uint64_t in_index = ins[j];
260 if (wcc[in_index] > wcc[vert_index])
261 {
262 wcc[vert_index] = wcc[in_index];
263 send = true;
264 }
265 }
266
267 if (send)
268 add_vid_to_send(&tq, q, vert_index);
269 }
270
271 empty_send(&tq, q);
272#pragma omp barrier
273
274 for (int32_t i = 0; i < nprocs; ++i)
275 tc.sendcounts_thread[i] = 0;
276
277#pragma omp for schedule(guided) nowait
278 for (uint64_t i = 0; i < q->send_size; ++i)
279 {
280 uint64_t vert_index = q->queue_send[i];
281 update_sendcounts_thread(g, &tc, vert_index);
282 }
283
284 for (int32_t i = 0; i < nprocs; ++i)
285 {
286#pragma omp atomic
287 comm->sendcounts_temp[i] += tc.sendcounts_thread[i];
288
289 tc.sendcounts_thread[i] = 0;
290 }
291#pragma omp barrier
292
293#pragma omp single
294{
296}
297
298#pragma omp for schedule(guided) nowait
299 for (uint64_t i = 0; i < q->send_size; ++i)
300 {
301 uint64_t vert_index = q->queue_send[i];
302 update_vid_data_queues(g, &tc, comm,
303 vert_index, wcc[vert_index]);
304 }
305
306 empty_vid_data(&tc, comm);
307#pragma omp barrier
308
309#pragma omp single
310{
311 exchange_vert_data(g, comm, q);
312} // end single
313
314
315#pragma omp for
316 for (uint64_t i = 0; i < comm->total_recv; ++i)
317 {
318 uint64_t index = get_value(&g->map, comm->recvbuf_vert[i]);
319 wcc[index] = comm->recvbuf_data[i];
320 }
321
322#pragma omp single
323{
325 ++iter;
326}
327 }// end while
328
331} // end parallel
332
333 if (verbose) {
334 elt = omp_get_wtime() - elt;
335 printf("Task %d, wcc_color() time %9.6f (s)\n", procid, elt);
336 }
337 if (debug) { printf("Task %d wcc_color() success\n", procid); }
338
339 return 0;
340}
341
343{
344 MPI_Barrier(MPI_COMM_WORLD);
345
346 uint64_t* counts = (uint64_t*)malloc(g->n*sizeof(uint64_t));
347 uint64_t unassigned = 0;
348
349 for (uint64_t i = 0; i < g->n; ++i)
350 counts[i] = 0;
351 for (uint64_t i = 0; i < g->n_local; ++i)
352 if (wcc[i] != WCC_NOT_VISITED)
353 ++counts[wcc[i]];
354 else
355 ++unassigned;
356
357 MPI_Allreduce(MPI_IN_PLACE, counts, (int32_t)g->n,
358 MPI_UINT64_T, MPI_SUM, MPI_COMM_WORLD);
359
360 uint64_t num_wccs = 0;
361 uint64_t max_wcc = 0;
362 for (uint64_t i = 0; i < g->n; ++i)
363 if (counts[i])
364 {
365 ++num_wccs;
366 if (counts[i] > max_wcc)
367 max_wcc = counts[i];
368 }
369
370 if (procid == 0)
371 printf("Num CCs: %lu, Max CC: %lu, Unassigned %lu\n",
372 num_wccs, max_wcc, unassigned);
373
374 free(counts);
375
376 return 0;
377}
378
379
380int wcc_output(dist_graph_t* g, uint64_t* wcc, char* output_file)
381{
382 if (verbose) printf("Task %d wcc assignments to %s\n", procid, output_file);
383
384 uint64_t* global_wcc = (uint64_t*)malloc(g->n*sizeof(uint64_t));
385
386#pragma omp parallel for
387 for (uint64_t i = 0; i < g->n; ++i)
388 global_wcc[i] = WCC_NOT_VISITED;
389
390#pragma omp parallel for
391 for (uint64_t i = 0; i < g->n_local; ++i)
392 global_wcc[g->local_unmap[i]] = wcc[i];
393
394 if (procid == 0)
395 MPI_Reduce(MPI_IN_PLACE, global_wcc, (int32_t)g->n,
396 MPI_UINT64_T, MPI_MIN, 0, MPI_COMM_WORLD);
397 else
398 MPI_Reduce(global_wcc, global_wcc, (int32_t)g->n,
399 MPI_UINT64_T, MPI_MIN, 0, MPI_COMM_WORLD);
400
401 if (procid == 0)
402 {
403 if (debug)
404 for (uint64_t i = 0; i < g->n; ++i)
405 if (global_wcc[i] == WCC_NOT_VISITED)
406 {
407 printf("WCC error: %lu not assigned\n", i);
408 global_wcc[i] = 0;
409 }
410
411 std::ofstream outfile;
412 outfile.open(output_file);
413
414 for (uint64_t i = 0; i < g->n; ++i)
415 outfile << global_wcc[i] << std::endl;
416
417 outfile.close();
418 }
419
420 free(global_wcc);
421
422 if (verbose) printf("Task %d done writing assignments\n", procid);
423
424 return 0;
425}
426
428 uint64_t root, char* output_file)
429{
430 if (debug) { printf("Task %d wcc_dist() start\n", procid); }
431
432 MPI_Barrier(MPI_COMM_WORLD);
433 double elt = omp_get_wtime();
434
435 uint64_t* wcc = (uint64_t*)malloc(g->n_total*sizeof(uint64_t));
436 wcc_bfs(g, comm, q, wcc, root);
437 wcc_color(g, comm, q, wcc);
438
439 MPI_Barrier(MPI_COMM_WORLD);
440 elt = omp_get_wtime() - elt;
441 if (procid == 0) printf("WCC time %9.6f (s)\n", elt);
442
443 if (output) {
444 wcc_output(g, wcc, output_file);
445 }
446
447 if (verify) {
448 wcc_verify(g, wcc);
449 }
450
451 free(wcc);
452
453 if (debug) printf("Task %d wcc_dist() success\n", procid);
454 return 0;
455}
456
Mac OS X ATTR com apple quarantine q
Definition ._remapper.cpp:1
void init_thread_comm(thread_comm_t *tc)
Definition comms.cpp:169
void init_thread_queue(thread_queue_t *tq)
Definition comms.cpp:136
void clear_thread_queue(thread_queue_t *tq)
Definition comms.cpp:157
void init_sendbuf_vid_data(mpi_data_t *comm)
Definition comms.cpp:251
void clear_allbuf_vid_data(mpi_data_t *comm)
Definition comms.cpp:370
void clear_thread_comm(thread_comm_t *tc)
Definition comms.cpp:199
void empty_vid_data(thread_comm_t *tc, mpi_data_t *comm)
Definition comms.h:792
void empty_queue(thread_queue_t *tq, queue_data_t *q)
Definition comms.h:730
void add_vid_to_queue(thread_queue_t *tq, queue_data_t *q, uint64_t vertex_id)
Definition comms.h:721
void exchange_verts(dist_graph_t *g, mpi_data_t *comm, queue_data_t *q)
Definition comms.h:184
void update_vid_data_queues(dist_graph_t *g, thread_comm_t *tc, mpi_data_t *comm, uint64_t vert_index, uint64_t data)
Definition comms.h:628
void add_vid_to_send(thread_queue_t *tq, queue_data_t *q, uint64_t vertex_id)
Definition comms.h:743
void exchange_vert_data(dist_graph_t *g, mpi_data_t *comm, queue_data_t *q)
Definition comms.h:258
void empty_send(thread_queue_t *tq, queue_data_t *q)
Definition comms.h:752
void update_sendcounts_thread(dist_graph_t *g, thread_comm_t *tc, uint64_t vert_index)
Definition comms.h:563
#define in_degree(g, n)
Definition dist_graph.h:55
#define out_vertices(g, n)
Definition dist_graph.h:56
#define out_degree(g, n)
Definition dist_graph.h:54
#define in_vertices(g, n)
Definition dist_graph.h:57
#define NULL_KEY
Definition fast_map.h:58
uint64_t get_value(fast_map *map, uint64_t key)
Definition fast_map.h:132
signed int int32_t
Definition stdint.h:77
unsigned __int64 uint64_t
Definition stdint.h:90
uint64_t n_total
Definition dist_graph.h:69
uint64_t * local_unmap
Definition dist_graph.h:80
uint64_t * ghost_unmap
Definition dist_graph.h:81
uint64_t n_local
Definition dist_graph.h:66
uint64_t n
Definition dist_graph.h:61
fast_map map
Definition dist_graph.h:83
uint64_t * sendcounts_temp
Definition comms.h:73
uint64_t * recvbuf_data
Definition comms.h:82
uint64_t total_recv
Definition comms.h:85
uint64_t global_queue_size
Definition comms.h:87
uint64_t * recvbuf_vert
Definition comms.h:81
uint64_t * sendcounts_thread
Definition comms.h:111
int32_t tid
Definition comms.h:101
int wcc_color(dist_graph_t *g, mpi_data_t *comm, queue_data_t *q, uint64_t *wcc)
Definition wcc.cpp:176
int wcc_output(dist_graph_t *g, uint64_t *wcc, char *output_file)
Definition wcc.cpp:380
int procid
Definition main.cpp:55
bool debug
Definition wcc.cpp:62
bool verify
Definition wcc.cpp:62
int wcc_verify(dist_graph_t *g, uint64_t *wcc)
Definition wcc.cpp:342
#define WCC_NOT_VISITED
Definition wcc.cpp:58
bool verbose
Definition main.cpp:56
int wcc_bfs(dist_graph_t *g, mpi_data_t *comm, queue_data_t *q, uint64_t *wcc, uint64_t root)
Definition wcc.cpp:65
bool output
Definition wcc.cpp:62
int nprocs
Definition wcc.cpp:61
int wcc_dist(dist_graph_t *g, mpi_data_t *comm, queue_data_t *q, uint64_t root, char *output_file)
Definition wcc.cpp:427
#define WCC_VISITED
Definition wcc.cpp:59