61#define KCORE_NOT_ASSIGNED 18446744073709551615
67 if (
debug) { printf(
"Task %d run_kcore() start\n",
procid); }
70 MPI_Barrier(MPI_COMM_WORLD);
71 elt = omp_get_wtime();
76 if (!run_approx) num_iter =
MAX_ITER;
81#pragma omp parallel default(shared)
90#pragma omp for schedule(guided) nowait
109#pragma omp for schedule(guided) nowait
137 while (global_changes && iter < num_iter)
140 printf(
"Task %d iter %lu changes %u run_kcore()\n",
procid, iter, global_changes);
151#pragma omp for schedule(guided) reduction(+:global_changes)
155 uint64_t vert_kcore = kcores[vert_index];
157 for (
uint64_t j = 0; j < (vert_kcore+1); ++j)
165 uint64_t kcore_out = kcores[out_index] < vert_kcore ?
166 kcores[out_index] : vert_kcore;
174 uint64_t kcore_in = kcores[in_index] < vert_kcore ?
175 kcores[in_index] : vert_kcore;
179 for (
uint64_t j = vert_kcore; j > 0; --j)
180 counts[j - 1] = counts[j - 1] + counts[j];
183 while (new_kcore > 2 && counts[new_kcore] < new_kcore)
186 if (new_kcore != vert_kcore)
189 assert(new_kcore <= vert_kcore);
191 kcores[vert_index] = new_kcore;
202 MPI_Allreduce(MPI_IN_PLACE, &global_changes, 1,
203 MPI_UINT64_T, MPI_SUM, MPI_COMM_WORLD);
218 elt = omp_get_wtime() - elt;
219 printf(
"Task %d, run_kcore() time %9.6f (s)\n",
procid, elt);
221 if (
debug) { printf(
"Task %d run_kcore() success\n",
procid); }
228 if (
debug) printf(
"Task %d kcores to %s\n",
procid, output_file);
232#pragma omp parallel for
236#pragma omp parallel for
242 MPI_Reduce(MPI_IN_PLACE, global_kcores, (
int32_t)g->
n,
243 MPI_UINT64_T, MPI_MIN, 0, MPI_COMM_WORLD);
245 MPI_Reduce(global_kcores, global_kcores, (
int32_t)g->
n,
246 MPI_UINT64_T, MPI_MIN, 0, MPI_COMM_WORLD);
254 printf(
"Kcores error: %lu not assigned\n", i);
255 global_kcores[i] = 0;
258 std::ofstream outfile;
259 outfile.open(output_file);
262 outfile << global_kcores[i] << std::endl;
269 if (
debug) printf(
"Task %d done writing kcores\n",
procid);
276 if (
debug) { printf(
"Task %d kcore_verify() start\n",
procid); }
281#pragma omp parallel for
285#pragma omp parallel for
290 MPI_Reduce(MPI_IN_PLACE, global_kcores, (
int32_t)g->
n,
291 MPI_UINT64_T, MPI_MIN, 0, MPI_COMM_WORLD);
293 MPI_Reduce(global_kcores, global_kcores, (
int32_t)g->
n,
294 MPI_UINT64_T, MPI_MIN, 0, MPI_COMM_WORLD);
298#pragma omp parallel for
300 kcores_counts[i] = 0;
307 ++kcores_counts[global_kcores[i]];
308 if (global_kcores[i] > max_k)
310 max_k = global_kcores[i];
315 printf(
"KC MAX K: %lu, vert: %lu\n", max_k, max_v);
317 for (
uint64_t i = 0; i < num_to_output; ++i)
318 printf(
"KC VERIFY: coreness: %lu, number: %lu\n", i, kcores_counts[i]);
324 if (
debug) { printf(
"Task %d kcore_verify() success\n",
procid); }
330 uint32_t num_iter,
char* output_file,
bool run_approx)
332 if (
debug) { printf(
"Task %d kcore_dist() start\n",
procid); }
334 MPI_Barrier(MPI_COMM_WORLD);
335 double elt = omp_get_wtime();
338 run_kcore(g, comm,
q, kcores, num_iter, run_approx);
340 MPI_Barrier(MPI_COMM_WORLD);
341 elt = omp_get_wtime() - elt;
342 if (
procid == 0) printf(
"Kcore time %9.6f (s)\n", elt);
354 if (
debug) printf(
"Task %d kcore_dist() success\n",
procid);
Mac OS X ATTR com apple quarantine q
void init_thread_comm(thread_comm_t *tc)
void init_recvbuf_vid_data(mpi_data_t *comm)
void init_sendbuf_vid_data(mpi_data_t *comm)
void clear_allbuf_vid_data(mpi_data_t *comm)
void clear_thread_comm(thread_comm_t *tc)
void empty_vid_data(thread_comm_t *tc, mpi_data_t *comm)
void exchange_verts(dist_graph_t *g, mpi_data_t *comm, queue_data_t *q)
void exchange_data(mpi_data_t *comm)
void update_vid_data_queues(dist_graph_t *g, thread_comm_t *tc, mpi_data_t *comm, uint64_t vert_index, uint64_t data)
void update_sendcounts_thread(dist_graph_t *g, thread_comm_t *tc, uint64_t vert_index)
#define out_vertices(g, n)
#define in_vertices(g, n)
uint64_t get_value(fast_map *map, uint64_t key)
int kcore_verify(dist_graph_t *g, uint64_t *kcores, uint64_t num_to_output)
int kcore_dist(dist_graph_t *g, mpi_data_t *comm, queue_data_t *q, uint32_t num_iter, char *output_file, bool run_approx)
#define KCORE_NOT_ASSIGNED
int kcore_output(dist_graph_t *g, uint64_t *kcores, char *output_file)
int run_kcore(dist_graph_t *g, mpi_data_t *comm, queue_data_t *q, uint64_t *kcores, uint32_t num_iter, bool run_approx)
unsigned __int64 uint64_t
uint64_t * sendcounts_temp
uint64_t * sendcounts_thread