COMBINATORIAL_BLAS 1.6
 
Loading...
Searching...
No Matches
kcore.cpp
Go to the documentation of this file.
1/*
2//@HEADER
3// *****************************************************************************
4//
5// HPCGraph: Graph Computation on High Performance Computing Systems
6// Copyright (2016) Sandia Corporation
7//
8// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
9// the U.S. Government retains certain rights in this software.
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17//
18// 2. Redistributions in binary form must reproduce the above copyright
19// notice, this list of conditions and the following disclaimer in the
20// documentation and/or other materials provided with the distribution.
21//
22// 3. Neither the name of the Corporation nor the names of the
23// contributors may be used to endorse or promote products derived from
24// this software without specific prior written permission.
25//
26// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
27// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
28// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
29// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
30// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
31// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
32// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
33// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
34// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
35// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
36// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
37//
38// Questions? Contact George M. Slota (gmslota@sandia.gov)
39// Siva Rajamanickam (srajama@sandia.gov)
40// Kamesh Madduri (madduri@cse.psu.edu)
41//
42// *****************************************************************************
43//@HEADER
44*/
45
#include <mpi.h>
#include <omp.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <assert.h>
#include <fstream>

#include "dist_graph.h"
#include "comms.h"
#include "util.h"
#include "kcore.h"
57
/* Globals defined elsewhere in the project: MPI rank/size and runtime flags. */
extern int procid, nprocs;
extern bool verbose, debug, verify;

/* Sentinel meaning "no coreness assigned yet" (== 18446744073709551615).
 * The original spelled this as an unsuffixed decimal literal, which fits in
 * no signed integer type and therefore has an implementation-dependent type
 * (ill-formed in strict C++11); UINT64_MAX is the portable spelling of the
 * same value with the exact type uint64_t. */
#define KCORE_NOT_ASSIGNED UINT64_MAX
/* Iteration cap applied when an exact (non-approximate) run is requested. */
#define MAX_ITER 10000
63
// NOTE(review): this file was recovered from a generated source listing. The
// numbers fused onto each line are the original file's line numbers, and
// lines that carried hyperlinks are missing entirely:
//   line 64      : first line of the run_kcore() signature (per the index:
//                  int run_kcore(dist_graph_t* g, mpi_data_t* comm,
//                  queue_data_t* q, ...))
//   lines 83-84  : presumably the thread_comm_t tc declaration and
//                  init_thread_comm(&tc) -- tc is used below; confirm
//   lines 105-106: presumably init_sendbuf_vid_data(comm) and
//                  init_recvbuf_vid_data(comm) inside the omp single
//   lines 212,215: presumably clear_thread_comm(&tc) and
//                  clear_allbuf_vid_data(comm)
// Restore these from the repository before compiling.
//
// run_kcore: distributed iterative coreness computation. Each owned vertex
// starts at out_degree + in_degree; every round each vertex lowers its value
// to the largest k it can still support given its neighbors' current values,
// then ghost copies are refreshed via the comm send/recv buffers. Loops until
// no rank reports a change, or for num_iter rounds when run_approx is true.
65 uint64_t* kcores, uint32_t num_iter, bool run_approx)
66{
67 if (debug) { printf("Task %d run_kcore() start\n", procid); }
68 double elt = 0.0;
// Timing only in verbose mode; the barrier aligns ranks before the clock.
69 if (verbose) {
70 MPI_Barrier(MPI_COMM_WORLD);
71 elt = omp_get_wtime();
72 }
73
// Seed the convergence loop: treat every local vertex as "changed".
74 uint64_t global_changes = g->n_local;
75 uint32_t iter = 0;
// An exact run ignores the caller's bound and iterates toward a fixed point
// (still capped at MAX_ITER).
76 if (!run_approx) num_iter = MAX_ITER;
77
78 for (int32_t i = 0; i < nprocs; ++i)
79 comm->sendcounts_temp[i] = 0;
80
81#pragma omp parallel default(shared)
82{
85
// Initial coreness estimate: total (in + out) degree of each owned vertex.
86#pragma omp for
87 for (uint64_t i = 0; i < g->n_local; ++i)
88 kcores[i] = out_degree(g, i) + in_degree(g, i);
89
// Per-thread tally of how many values this rank must send to each peer...
90#pragma omp for schedule(guided) nowait
91 for (uint64_t i = 0; i < g->n_local; ++i)
92 update_sendcounts_thread(g, &tc, i);
93
// ...folded into the shared per-rank counts under an atomic.
94 for (int32_t i = 0; i < nprocs; ++i)
95 {
96#pragma omp atomic
97 comm->sendcounts_temp[i] += tc.sendcounts_thread[i];
98
99 tc.sendcounts_thread[i] = 0;
100 }
101#pragma omp barrier
102
// (missing lines 105-106 here: send/recv buffer initialization)
103#pragma omp single
104{
107}
108
// Queue each owned vertex id + current coreness for its ghost copies.
109#pragma omp for schedule(guided) nowait
110 for (uint64_t i = 0; i < g->n_local; ++i)
111 update_vid_data_queues(g, &tc, comm, i, kcores[i]);
112
113 empty_vid_data(&tc, comm);
114#pragma omp barrier
115
// One thread performs the all-to-all vertex-id and data exchange.
116#pragma omp single
117{
118 exchange_verts(comm);
119 exchange_data(comm);
120}
121
// Translate received global ids to local indices once, caching the local
// index back into the recv buffer so later rounds skip the map lookup.
122#pragma omp for
123 for (uint64_t i = 0; i < comm->total_recv; ++i)
124 {
125 uint64_t index = get_value(&g->map, comm->recvbuf_vert[i]);
126 kcores[index] = comm->recvbuf_data[i];
127 comm->recvbuf_vert[i] = index;
128 }
129
// Same id-to-index caching for the send side.
130#pragma omp for
131 for (uint64_t i = 0; i < comm->total_send; ++i)
132 {
133 uint64_t index = get_value(&g->map, comm->sendbuf_vert[i]);
134 comm->sendbuf_vert[i] = index;
135 }
136
// Main relaxation loop: repeat while any rank changed a value last round.
137 while (global_changes && iter < num_iter)
138 {
139 if (debug && tc.tid == 0) {
// NOTE(review): conversion specifiers are swapped -- iter is uint32_t (%u)
// and global_changes is uint64_t (%lu); as written this is undefined
// behavior on most ABIs. Should read "... iter %u changes %lu ...".
140 printf("Task %d iter %lu changes %u run_kcore()\n", procid, iter, global_changes);
141 }
142
143#pragma omp barrier
144#pragma omp single
145{
146 global_changes = 0;
147 ++iter;
148}
149
150
// Recompute each vertex's coreness from its neighbors' current values.
// counts[k] = number of neighbors whose (clamped) value is exactly k.
// NOTE(review): per-vertex malloc/free inside the hot loop; a reusable
// per-thread scratch buffer would avoid the allocator traffic.
151#pragma omp for schedule(guided) reduction(+:global_changes)
152 for (uint64_t v = 0; v < g->n_local; ++v)
153 {
154 uint64_t vert_index = v;
155 uint64_t vert_kcore = kcores[vert_index];
156 uint64_t* counts = (uint64_t*)malloc((vert_kcore+1)*sizeof(uint64_t));
157 for (uint64_t j = 0; j < (vert_kcore+1); ++j)
158 counts[j] = 0;
159
// Out-neighbors: clamp each neighbor's value to this vertex's own value.
160 uint64_t out_degree = out_degree(g, vert_index);
161 uint64_t* outs = out_vertices(g, vert_index);
162 for (uint64_t j = 0; j < out_degree; ++j)
163 {
164 uint64_t out_index = outs[j];
165 uint64_t kcore_out = kcores[out_index] < vert_kcore ?
166 kcores[out_index] : vert_kcore;
167 ++counts[kcore_out];
168 }
// In-neighbors, with the same clamping.
169 uint64_t in_degree = in_degree(g, vert_index);
170 uint64_t* ins = in_vertices(g, vert_index);
171 for (uint64_t j = 0; j < in_degree; ++j)
172 {
173 uint64_t in_index = ins[j];
174 uint64_t kcore_in = kcores[in_index] < vert_kcore ?
175 kcores[in_index] : vert_kcore;
176 ++counts[kcore_in];
177 }
178
// Suffix-sum: counts[k] becomes the number of neighbors with value >= k.
179 for (uint64_t j = vert_kcore; j > 0; --j)
180 counts[j - 1] = counts[j - 1] + counts[j];
181
// New coreness: largest k (floored at 2) with at least k neighbors >= k.
182 uint64_t new_kcore = vert_kcore;
183 while (new_kcore > 2 && counts[new_kcore] < new_kcore)
184 --new_kcore;
185
186 if (new_kcore != vert_kcore)
187 ++global_changes;
188
// Values only ever decrease (monotone relaxation).
// NOTE(review): assert() is used but no <assert.h>/<cassert> include is
// visible in this listing -- confirm a project header pulls it in.
189 assert(new_kcore <= vert_kcore);
190
191 kcores[vert_index] = new_kcore;
192 free(counts);
193 }
194
// Push updated values for all ghosted vertices and re-synchronize.
195#pragma omp for
196 for (uint64_t i = 0; i < comm->total_send; ++i)
197 comm->sendbuf_data[i] = kcores[comm->sendbuf_vert[i]];
198
199#pragma omp single
200{
201 exchange_data(comm);
// Global convergence check: sum of the per-rank change counts.
202 MPI_Allreduce(MPI_IN_PLACE, &global_changes, 1,
203 MPI_UINT64_T, MPI_SUM, MPI_COMM_WORLD);
204}
205
206#pragma omp for
207 for (uint64_t i = 0; i < comm->total_recv; ++i)
208 kcores[comm->recvbuf_vert[i]] = comm->recvbuf_data[i];
209
210 } // end for loop
211
// (missing lines 212 and 215 here: per-thread and comm buffer cleanup)
213} // end parallel
214
216
217 if (verbose) {
218 elt = omp_get_wtime() - elt;
219 printf("Task %d, run_kcore() time %9.6f (s)\n", procid, elt);
220 }
221 if (debug) { printf("Task %d run_kcore() success\n", procid); }
222
223 return 0;
224}
225
226int kcore_output(dist_graph_t* g, uint64_t* kcores, char* output_file)
227{
228 if (debug) printf("Task %d kcores to %s\n", procid, output_file);
229
230 uint64_t* global_kcores = (uint64_t*)malloc(g->n*sizeof(uint64_t));
231
232#pragma omp parallel for
233 for (uint64_t i = 0; i < g->n; ++i)
234 global_kcores[i] = KCORE_NOT_ASSIGNED;
235
236#pragma omp parallel for
237 for (uint64_t i = 0; i < g->n_local; ++i)
238 global_kcores[g->local_unmap[i]] = kcores[i];
239
240
241 if (procid == 0)
242 MPI_Reduce(MPI_IN_PLACE, global_kcores, (int32_t)g->n,
243 MPI_UINT64_T, MPI_MIN, 0, MPI_COMM_WORLD);
244 else
245 MPI_Reduce(global_kcores, global_kcores, (int32_t)g->n,
246 MPI_UINT64_T, MPI_MIN, 0, MPI_COMM_WORLD);
247
248 if (procid == 0)
249 {
250 if (debug)
251 for (uint64_t i = 0; i < g->n; ++i)
252 if (global_kcores[i] == KCORE_NOT_ASSIGNED)
253 {
254 printf("Kcores error: %lu not assigned\n", i);
255 global_kcores[i] = 0;
256 }
257
258 std::ofstream outfile;
259 outfile.open(output_file);
260
261 for (uint64_t i = 0; i < g->n; ++i)
262 outfile << global_kcores[i] << std::endl;
263
264 outfile.close();
265 }
266
267 free(global_kcores);
268
269 if (debug) printf("Task %d done writing kcores\n", procid);
270
271 return 0;
272}
273
274int kcore_verify(dist_graph_t* g, uint64_t* kcores, uint64_t num_to_output)
275{
276 if (debug) { printf("Task %d kcore_verify() start\n", procid); }
277
278 uint64_t* global_kcores = (uint64_t*)malloc(g->n*sizeof(uint64_t));
279 uint64_t* kcores_counts = (uint64_t*)malloc(g->n*sizeof(uint64_t));
280
281#pragma omp parallel for
282 for (uint64_t i = 0; i < g->n; ++i)
283 global_kcores[i] = KCORE_NOT_ASSIGNED;
284
285#pragma omp parallel for
286 for (uint64_t i = 0; i < g->n_local; ++i)
287 global_kcores[g->local_unmap[i]] = kcores[i];
288
289 if (procid == 0)
290 MPI_Reduce(MPI_IN_PLACE, global_kcores, (int32_t)g->n,
291 MPI_UINT64_T, MPI_MIN, 0, MPI_COMM_WORLD);
292 else
293 MPI_Reduce(global_kcores, global_kcores, (int32_t)g->n,
294 MPI_UINT64_T, MPI_MIN, 0, MPI_COMM_WORLD);
295
296 if (procid == 0)
297 {
298#pragma omp parallel for
299 for (uint64_t i = 0; i < g->n; ++i)
300 kcores_counts[i] = 0;
301
302 uint64_t max_k = 0;
303 uint64_t max_v = 0;
304
305 for (uint64_t i = 0; i < g->n; ++i)
306 {
307 ++kcores_counts[global_kcores[i]];
308 if (global_kcores[i] > max_k)
309 {
310 max_k = global_kcores[i];
311 max_v = i;
312 }
313 }
314
315 printf("KC MAX K: %lu, vert: %lu\n", max_k, max_v);
316
317 for (uint64_t i = 0; i < num_to_output; ++i)
318 printf("KC VERIFY: coreness: %lu, number: %lu\n", i, kcores_counts[i]);
319 }
320
321 free(global_kcores);
322 free(kcores_counts);
323
324 if (debug) { printf("Task %d kcore_verify() success\n", procid); }
325
326 return 0;
327}
328
// NOTE(review): the first line of this signature (original line 329) is
// missing from the recovered listing; per the generated index it reads:
//   int kcore_dist(dist_graph_t* g, mpi_data_t* comm, queue_data_t* q,
//
// kcore_dist: driver routine -- allocates the per-vertex coreness array
// (sized n_total so ghost vertices are included), runs the computation,
// reports wall time on rank 0, then optionally writes results (global
// 'output' flag) and prints a verification summary (global 'verify' flag).
330 uint32_t num_iter, char* output_file, bool run_approx)
331{
332 if (debug) { printf("Task %d kcore_dist() start\n", procid); }
333
334 MPI_Barrier(MPI_COMM_WORLD);
335 double elt = omp_get_wtime();
336
// n_total = owned + ghost vertices; run_kcore() stores ghost values too.
337 uint64_t* kcores = (uint64_t*)malloc(g->n_total*sizeof(uint64_t));
338 run_kcore(g, comm, q, kcores, num_iter, run_approx);
339
340 MPI_Barrier(MPI_COMM_WORLD);
341 elt = omp_get_wtime() - elt;
342 if (procid == 0) printf("Kcore time %9.6f (s)\n", elt);
343
// 'output' is a global flag (declared elsewhere) selecting file output.
344 if (output) {
345 kcore_output(g, kcores, output_file);
346 }
347
// Prints the max coreness and the counts for coreness levels 0..19.
348 if (verify) {
349 kcore_verify(g, kcores, 20);
350 }
351
352 free(kcores);
353
354 if (debug) printf("Task %d kcore_dist() success\n", procid);
355 return 0;
356}
357
Mac OS X ATTR com apple quarantine q
Definition ._remapper.cpp:1
void init_thread_comm(thread_comm_t *tc)
Definition comms.cpp:169
void init_recvbuf_vid_data(mpi_data_t *comm)
Definition comms.cpp:278
bool output
Definition comms.cpp:56
void init_sendbuf_vid_data(mpi_data_t *comm)
Definition comms.cpp:251
void clear_allbuf_vid_data(mpi_data_t *comm)
Definition comms.cpp:370
void clear_thread_comm(thread_comm_t *tc)
Definition comms.cpp:199
void empty_vid_data(thread_comm_t *tc, mpi_data_t *comm)
Definition comms.h:792
void exchange_verts(dist_graph_t *g, mpi_data_t *comm, queue_data_t *q)
Definition comms.h:184
void exchange_data(mpi_data_t *comm)
Definition comms.h:428
void update_vid_data_queues(dist_graph_t *g, thread_comm_t *tc, mpi_data_t *comm, uint64_t vert_index, uint64_t data)
Definition comms.h:628
void update_sendcounts_thread(dist_graph_t *g, thread_comm_t *tc, uint64_t vert_index)
Definition comms.h:563
#define in_degree(g, n)
Definition dist_graph.h:55
#define out_vertices(g, n)
Definition dist_graph.h:56
#define out_degree(g, n)
Definition dist_graph.h:54
#define in_vertices(g, n)
Definition dist_graph.h:57
uint64_t get_value(fast_map *map, uint64_t key)
Definition fast_map.h:132
int procid
Definition main.cpp:55
bool debug
Definition kcore.cpp:59
bool verify
Definition kcore.cpp:59
int kcore_verify(dist_graph_t *g, uint64_t *kcores, uint64_t num_to_output)
Definition kcore.cpp:274
bool verbose
Definition main.cpp:56
int kcore_dist(dist_graph_t *g, mpi_data_t *comm, queue_data_t *q, uint32_t num_iter, char *output_file, bool run_approx)
Definition kcore.cpp:329
#define MAX_ITER
Definition kcore.cpp:62
int nprocs
Definition kcore.cpp:58
#define KCORE_NOT_ASSIGNED
Definition kcore.cpp:61
int kcore_output(dist_graph_t *g, uint64_t *kcores, char *output_file)
Definition kcore.cpp:226
int run_kcore(dist_graph_t *g, mpi_data_t *comm, queue_data_t *q, uint64_t *kcores, uint32_t num_iter, bool run_approx)
Definition kcore.cpp:64
unsigned int uint32_t
Definition stdint.h:80
signed int int32_t
Definition stdint.h:77
unsigned __int64 uint64_t
Definition stdint.h:90
uint64_t n_total
Definition dist_graph.h:69
uint64_t * local_unmap
Definition dist_graph.h:80
uint64_t n_local
Definition dist_graph.h:66
uint64_t n
Definition dist_graph.h:61
fast_map map
Definition dist_graph.h:83
uint64_t * sendbuf_data
Definition comms.h:79
uint64_t * sendcounts_temp
Definition comms.h:73
uint64_t total_send
Definition comms.h:86
uint64_t * recvbuf_data
Definition comms.h:82
uint64_t * sendbuf_vert
Definition comms.h:78
uint64_t total_recv
Definition comms.h:85
uint64_t * recvbuf_vert
Definition comms.h:81
int32_t tid
Definition comms.h:109
uint64_t * sendcounts_thread
Definition comms.h:111