COMBINATORIAL_BLAS 1.6
 
Loading...
Searching...
No Matches
comms.h
Go to the documentation of this file.
1/*
2//@HEADER
3// *****************************************************************************
4//
5// HPCGraph: Graph Computation on High Performance Computing Systems
6// Copyright (2016) Sandia Corporation
7//
8// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
9// the U.S. Government retains certain rights in this software.
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17//
18// 2. Redistributions in binary form must reproduce the above copyright
19// notice, this list of conditions and the following disclaimer in the
20// documentation and/or other materials provided with the distribution.
21//
22// 3. Neither the name of the Corporation nor the names of the
23// contributors may be used to endorse or promote products derived from
24// this software without specific prior written permission.
25//
26// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
27// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
28// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
29// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
30// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
31// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
32// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
33// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
34// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
35// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
36// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
37//
38// Questions? Contact George M. Slota (gmslota@sandia.gov)
39// Siva Rajamanickam (srajama@sandia.gov)
40// Kamesh Madduri (madduri@cse.psu.edu)
41//
42// *****************************************************************************
43//@HEADER
44*/
45
46#ifndef _COMMS_H_
47#define _COMMS_H_
48
49#include <mpi.h>
50#include <omp.h>
51#include <stdio.h>
52#include <stdlib.h>
53#include <stdint.h>
54#include <assert.h>
55
56#include "dist_graph.h"
57#include "util.h"
58
// Globals defined in another translation unit (the symbol index attributes
// procid to main.cpp:55 and verbose to main.cpp:56). Within this header,
// procid is passed to throw_err() and printed in debug traces, nprocs bounds
// every per-rank loop, and debug gates the trace printf() calls.
extern int procid, nprocs;
extern bool verbose, debug, verify, output;

// Element-count threshold (2^31) used to split an exchange into rounds: the
// exchange routines below run global_queue_size / MAX_SEND_SIZE + 1 rounds
// of MPI_Alltoallv so the int32_t counts/displacements stay bounded.
#define MAX_SEND_SIZE 2147483648
// NOTE(review): presumably the capacity of the per-thread staging buffers in
// thread_queue_t / thread_comm_t; those buffers are allocated in comms.cpp,
// which is not shown here -- confirm.
#define THREAD_QUEUE_SIZE 1024
64
89
99
107
119
// Set up / release the buffers held by an mpi_data_t. Both are defined in
// comms.cpp (per the symbol index), which is not part of this header.
void init_comm_data(mpi_data_t* comm);
void clear_comm_data(mpi_data_t* comm);
124
131
138
// Collective exchange routines (defined later in this header). Each one
// issues MPI_Alltoall / MPI_Alltoallv on MPI_COMM_WORLD, and some also
// MPI_Allreduce, so every rank must call the same routine the same number
// of times.
inline void exchange_verts(dist_graph_t* g, mpi_data_t* comm, queue_data_t* q);
inline void exchange_verts(mpi_data_t* comm);
inline void exchange_data(mpi_data_t* comm);
inline void exchange_data_flt(mpi_data_t* comm);
inline void exchange_vert_data(dist_graph_t* g, mpi_data_t* comm,
                               queue_data_t* q);
146
147
// NOTE(review): the opening line (return type, name, first parameter) of
// several declarations below is missing from this listing. Matching them by
// order and parameters against the definitions further down, they are
// presumably update_sendcounts_thread and update_sendcounts_thread_out --
// TODO restore.
                                     thread_comm_t* tc, uint64_t vert_index);
                                     thread_comm_t* tc, uint64_t vert_index);

// NOTE(review): presumably update_vid_data_queues (uint64_t data),
// update_vid_data_queues_out (uint64_t data), and a double-valued variant
// whose name is not visible anywhere in this listing -- TODO restore.
                                   thread_comm_t* tc, mpi_data_t* comm,
                                   uint64_t vert_index, uint64_t data);
                                   thread_comm_t* tc, mpi_data_t* comm,
                                   uint64_t vert_index, uint64_t data);
                                   thread_comm_t* tc, mpi_data_t* comm,
                                   uint64_t vert_index, double data);


// NOTE(review): presumably the tail of
// add_vid_to_queue(thread_queue_t* tq, queue_data_t* q, ...) -- TODO restore.
                             uint64_t vertex_id);
// Flushes a thread's staged next-frontier entries into q.
inline void empty_queue(thread_queue_t* tq, queue_data_t* q);


// NOTE(review): presumably the tail of
// add_vid_to_send(thread_queue_t* tq, queue_data_t* q, ...) -- TODO restore.
                            uint64_t vertex_id);
// Flushes a thread's staged send entries into q.
inline void empty_send(thread_queue_t* tq, queue_data_t* q);


// Stage one (vertex, value) message for send_rank in a thread's buffers.
inline void add_vid_data_to_send(thread_comm_t* tc, mpi_data_t* comm,
  uint64_t vertex_id, uint64_t data_val, int32_t send_rank);
// NOTE(review): presumably the tail of
// add_vid_data_to_send_flt(thread_comm_t* tc, mpi_data_t* comm, ...) --
// TODO restore.
  uint64_t vertex_id, double data_val, int32_t send_rank);

// Flush a thread's staged messages into comm's shared send buffers.
inline void empty_vid_data(thread_comm_t* tc, mpi_data_t* comm);
inline void empty_vid_data_flt(thread_comm_t* tc, mpi_data_t* comm);
181
182
183
185{
186 comm->global_queue_size = 0;
187 uint64_t task_queue_size = q->next_size + q->send_size;
188 MPI_Allreduce(&task_queue_size, &comm->global_queue_size, 1,
189 MPI_UINT64_T, MPI_SUM, MPI_COMM_WORLD);
190
191 uint64_t num_comms = comm->global_queue_size / (uint64_t)MAX_SEND_SIZE + 1;
192 uint64_t sum_recv = 0;
193 for (uint64_t c = 0; c < num_comms; ++c)
194 {
195 uint64_t send_begin = (q->send_size * c) / num_comms;
196 uint64_t send_end = (q->send_size * (c + 1)) / num_comms;
197 if (c == (num_comms-1))
198 send_end = q->send_size;
199
200 for (int32_t i = 0; i < nprocs; ++i)
201 {
202 comm->sendcounts[i] = 0;
203 comm->recvcounts[i] = 0;
204 }
205 for (uint64_t i = send_begin; i < send_end; ++i)
206 {
207 uint64_t ghost_index = q->queue_send[i] - g->n_local;
208 uint64_t ghost_task = g->ghost_tasks[ghost_index];
209 ++comm->sendcounts[ghost_task];
210 }
211
212 MPI_Alltoall(comm->sendcounts, 1, MPI_INT32_T,
213 comm->recvcounts, 1, MPI_INT32_T, MPI_COMM_WORLD);
214
215 comm->sdispls[0] = 0;
216 comm->sdispls_cpy[0] = 0;
217 comm->rdispls[0] = 0;
218 for (int32_t i = 1; i < nprocs; ++i)
219 {
220 comm->sdispls[i] = comm->sdispls[i-1] + comm->sendcounts[i-1];
221 comm->rdispls[i] = comm->rdispls[i-1] + comm->recvcounts[i-1];
222 comm->sdispls_cpy[i] = comm->sdispls[i];
223 }
224
225 int32_t cur_send = comm->sdispls[nprocs-1] + comm->sendcounts[nprocs-1];
226 int32_t cur_recv = comm->rdispls[nprocs-1] + comm->recvcounts[nprocs-1];
227 comm->sendbuf_vert = (uint64_t*)malloc((uint64_t)(cur_send+1)*sizeof(uint64_t));
228 if (comm->sendbuf_vert == NULL)
229 throw_err("exchange_verts(), unable to allocate comm buffers", procid);
230
231 for (uint64_t i = send_begin; i < send_end; ++i)
232 {
233 uint64_t ghost_index = q->queue_send[i] - g->n_local;
234 uint64_t ghost_task = g->ghost_tasks[ghost_index];
235 uint64_t vert = g->ghost_unmap[ghost_index];
236 comm->sendbuf_vert[comm->sdispls_cpy[ghost_task]++] = vert;
237 }
238
239 MPI_Alltoallv(comm->sendbuf_vert,
240 comm->sendcounts, comm->sdispls, MPI_UINT64_T,
241 q->queue_next+q->next_size+sum_recv,
242 comm->recvcounts, comm->rdispls, MPI_UINT64_T,
243 MPI_COMM_WORLD);
244 free(comm->sendbuf_vert);
245 sum_recv += cur_recv;
246 }
247
248 q->queue_size = q->next_size + sum_recv;
249 q->next_size = 0;
250 q->send_size = 0;
251 uint64_t* temp = q->queue;
252 q->queue = q->queue_next;
253 q->queue_next = temp;
254}
255
256
257
260{
261 for (int32_t i = 0; i < nprocs; ++i)
262 comm->recvcounts_temp[i] = 0;
263
264 MPI_Alltoall(comm->sendcounts_temp, 1, MPI_UINT64_T,
265 comm->recvcounts_temp, 1, MPI_UINT64_T, MPI_COMM_WORLD);
266
267 comm->total_recv = 0;
268 for (int i = 0; i < nprocs; ++i)
269 comm->total_recv += comm->recvcounts_temp[i];
270
271 comm->recvbuf_vert = (uint64_t*)malloc(comm->total_recv*sizeof(uint64_t));
272 comm->recvbuf_data = (uint64_t*)malloc(comm->total_recv*sizeof(uint64_t));
273 comm->recvbuf_data_flt = NULL;
274 if (comm->recvbuf_vert == NULL || comm->recvbuf_data == NULL)
275 throw_err("exchange_vert_data() unable to allocate comm buffers", procid);
276
277
278 comm->global_queue_size = 0;
279 uint64_t task_queue_size = comm->total_send;
280 MPI_Allreduce(&task_queue_size, &comm->global_queue_size, 1,
281 MPI_UINT64_T, MPI_SUM, MPI_COMM_WORLD);
282
283 uint64_t num_comms = comm->global_queue_size / (uint64_t)MAX_SEND_SIZE + 1;
284 uint64_t sum_recv = 0;
285 uint64_t sum_send = 0;
286 for (uint64_t c = 0; c < num_comms; ++c)
287 {
288 for (int32_t i = 0; i < nprocs; ++i)
289 {
290 uint64_t send_begin = (comm->sendcounts_temp[i] * c) / num_comms;
291 uint64_t send_end = (comm->sendcounts_temp[i] * (c + 1)) / num_comms;
292 if (c == (num_comms-1))
293 send_end = comm->sendcounts_temp[i];
294 comm->sendcounts[i] = (int32_t)(send_end - send_begin);
295 assert(comm->sendcounts[i] >= 0);
296 }
297
298 MPI_Alltoall(comm->sendcounts, 1, MPI_INT32_T,
299 comm->recvcounts, 1, MPI_INT32_T, MPI_COMM_WORLD);
300
301 comm->sdispls[0] = 0;
302 comm->sdispls_cpy[0] = 0;
303 comm->rdispls[0] = 0;
304 for (int32_t i = 1; i < nprocs; ++i)
305 {
306 comm->sdispls[i] = comm->sdispls[i-1] + comm->sendcounts[i-1];
307 comm->rdispls[i] = comm->rdispls[i-1] + comm->recvcounts[i-1];
308 comm->sdispls_cpy[i] = comm->sdispls[i];
309 }
310
311 int32_t cur_send = comm->sdispls[nprocs-1] + comm->sendcounts[nprocs-1];
312 int32_t cur_recv = comm->rdispls[nprocs-1] + comm->recvcounts[nprocs-1];
313 uint64_t* buf_v = (uint64_t*)malloc((uint64_t)(cur_send)*sizeof(uint64_t));
314 uint64_t* buf_d = (uint64_t*)malloc((uint64_t)(cur_send)*sizeof(uint64_t));
315 if (buf_v == NULL || buf_d == NULL)
316 throw_err("exchange_verts(), unable to allocate comm buffers", procid);
317
318 for (int32_t i = 0; i < nprocs; ++i)
319 {
320 uint64_t send_begin = (comm->sendcounts_temp[i] * c) / num_comms;
321 uint64_t send_end = (comm->sendcounts_temp[i] * (c + 1)) / num_comms;
322 if (c == (num_comms-1))
323 send_end = comm->sendcounts_temp[i];
324
325 for (uint64_t j = send_begin; j < send_end; ++j)
326 {
327 uint64_t vert = comm->sendbuf_vert[comm->sdispls_temp[i]+j];
328 uint64_t data = comm->sendbuf_data[comm->sdispls_temp[i]+j];
329 buf_v[comm->sdispls_cpy[i]] = vert;
330 buf_d[comm->sdispls_cpy[i]++] = data;
331 }
332 }
333
334 MPI_Alltoallv(buf_v, comm->sendcounts,
335 comm->sdispls, MPI_UINT64_T,
336 comm->recvbuf_vert+sum_recv, comm->recvcounts,
337 comm->rdispls, MPI_UINT64_T, MPI_COMM_WORLD);
338 MPI_Alltoallv(buf_d, comm->sendcounts,
339 comm->sdispls, MPI_UINT64_T,
340 comm->recvbuf_data+sum_recv, comm->recvcounts,
341 comm->rdispls, MPI_UINT64_T, MPI_COMM_WORLD);
342 free(buf_v);
343 free(buf_d);
344 sum_recv += cur_recv;
345 sum_send += cur_send;
346 }
347
348 assert(sum_recv == comm->total_recv);
349 assert(sum_send == comm->total_send);
350
351 comm->global_queue_size = 0;
352 task_queue_size = comm->total_recv + q->next_size;
353 MPI_Allreduce(&task_queue_size, &comm->global_queue_size, 1,
354 MPI_UINT64_T, MPI_SUM, MPI_COMM_WORLD);
355
356 q->send_size = 0;
357}
358
359
360inline void exchange_verts(mpi_data_t* comm)
361{
362 if (debug) { printf("Task %d exchange_verts() start\n", procid); }
363
364 uint64_t num_comms = comm->global_queue_size / (uint64_t)MAX_SEND_SIZE + 1;
365 uint64_t sum_recv = 0;
366 uint64_t sum_send = 0;
367 for (uint64_t c = 0; c < num_comms; ++c)
368 {
369 for (int32_t i = 0; i < nprocs; ++i)
370 {
371 uint64_t send_begin = (comm->sendcounts_temp[i] * c) / num_comms;
372 uint64_t send_end = (comm->sendcounts_temp[i] * (c + 1)) / num_comms;
373 if (c == (num_comms-1))
374 send_end = comm->sendcounts_temp[i];
375 comm->sendcounts[i] = (int32_t)(send_end - send_begin);
376 assert(comm->sendcounts[i] >= 0);
377 }
378
379 MPI_Alltoall(comm->sendcounts, 1, MPI_INT32_T,
380 comm->recvcounts, 1, MPI_INT32_T, MPI_COMM_WORLD);
381
382 comm->sdispls[0] = 0;
383 comm->sdispls_cpy[0] = 0;
384 comm->rdispls[0] = 0;
385 for (int32_t i = 1; i < nprocs; ++i)
386 {
387 comm->sdispls[i] = comm->sdispls[i-1] + comm->sendcounts[i-1];
388 comm->rdispls[i] = comm->rdispls[i-1] + comm->recvcounts[i-1];
389 comm->sdispls_cpy[i] = comm->sdispls[i];
390 }
391
392 int32_t cur_send = comm->sdispls[nprocs-1] + comm->sendcounts[nprocs-1];
393 int32_t cur_recv = comm->rdispls[nprocs-1] + comm->recvcounts[nprocs-1];
394 uint64_t* buf_v = (uint64_t*)malloc((uint64_t)(cur_send)*sizeof(uint64_t));
395 if (buf_v == NULL)
396 throw_err("exchange_verts(), unable to allocate comm buffers", procid);
397
398 for (int32_t i = 0; i < nprocs; ++i)
399 {
400 uint64_t send_begin = (comm->sendcounts_temp[i] * c) / num_comms;
401 uint64_t send_end = (comm->sendcounts_temp[i] * (c + 1)) / num_comms;
402 if (c == (num_comms-1))
403 send_end = comm->sendcounts_temp[i];
404
405 for (uint64_t j = send_begin; j < send_end; ++j)
406 {
407 uint64_t vert = comm->sendbuf_vert[comm->sdispls_temp[i]+j];
408 buf_v[comm->sdispls_cpy[i]++] = vert;
409 }
410 }
411
412 MPI_Alltoallv(buf_v, comm->sendcounts,
413 comm->sdispls, MPI_UINT64_T,
414 comm->recvbuf_vert+sum_recv, comm->recvcounts,
415 comm->rdispls, MPI_UINT64_T, MPI_COMM_WORLD);
416 free(buf_v);
417 sum_recv += cur_recv;
418 sum_send += cur_send;
419 }
420
421 assert(sum_recv == comm->total_recv);
422 assert(sum_send == comm->total_send);
423
424 if (debug) { printf("Task %d exchange_verts() success\n", procid); }
425
426}
427
428inline void exchange_data(mpi_data_t* comm)
429{
430 if (debug) { printf("Task %d exchange_data() start\n", procid); }
431
432 uint64_t num_comms = comm->global_queue_size / (uint64_t)MAX_SEND_SIZE + 1;
433 uint64_t sum_recv = 0;
434 uint64_t sum_send = 0;
435 for (uint64_t c = 0; c < num_comms; ++c)
436 {
437 for (int32_t i = 0; i < nprocs; ++i)
438 {
439 uint64_t send_begin = (comm->sendcounts_temp[i] * c) / num_comms;
440 uint64_t send_end = (comm->sendcounts_temp[i] * (c + 1)) / num_comms;
441 if (c == (num_comms-1))
442 send_end = comm->sendcounts_temp[i];
443 comm->sendcounts[i] = (int32_t)(send_end - send_begin);
444 assert(comm->sendcounts[i] >= 0);
445 }
446
447 MPI_Alltoall(comm->sendcounts, 1, MPI_INT32_T,
448 comm->recvcounts, 1, MPI_INT32_T, MPI_COMM_WORLD);
449
450 comm->sdispls[0] = 0;
451 comm->sdispls_cpy[0] = 0;
452 comm->rdispls[0] = 0;
453 for (int32_t i = 1; i < nprocs; ++i)
454 {
455 comm->sdispls[i] = comm->sdispls[i-1] + comm->sendcounts[i-1];
456 comm->rdispls[i] = comm->rdispls[i-1] + comm->recvcounts[i-1];
457 comm->sdispls_cpy[i] = comm->sdispls[i];
458 }
459
460 int32_t cur_send = comm->sdispls[nprocs-1] + comm->sendcounts[nprocs-1];
461 int32_t cur_recv = comm->rdispls[nprocs-1] + comm->recvcounts[nprocs-1];
462 uint64_t* buf_d = (uint64_t*)malloc((uint64_t)(cur_send)*sizeof(uint64_t));
463 if (buf_d == NULL)
464 throw_err("exchange_data(), unable to allocate comm buffers", procid);
465
466 for (int32_t i = 0; i < nprocs; ++i)
467 {
468 uint64_t send_begin = (comm->sendcounts_temp[i] * c) / num_comms;
469 uint64_t send_end = (comm->sendcounts_temp[i] * (c + 1)) / num_comms;
470 if (c == (num_comms-1))
471 send_end = comm->sendcounts_temp[i];
472
473 for (uint64_t j = send_begin; j < send_end; ++j)
474 {
475 uint64_t data = comm->sendbuf_data[comm->sdispls_temp[i]+j];
476 buf_d[comm->sdispls_cpy[i]++] = data;
477 }
478 }
479
480 MPI_Alltoallv(buf_d, comm->sendcounts,
481 comm->sdispls, MPI_UINT64_T,
482 comm->recvbuf_data+sum_recv, comm->recvcounts,
483 comm->rdispls, MPI_UINT64_T, MPI_COMM_WORLD);
484 free(buf_d);
485 sum_recv += cur_recv;
486 sum_send += cur_send;
487 }
488
489 assert(sum_recv == comm->total_recv);
490 assert(sum_send == comm->total_send);
491
492 if (debug) { printf("Task %d exchange_data() success\n", procid); }
493}
494
496{
497 if (debug) { printf("Task %d exchange_data_flt() start\n", procid); }
498
499 uint64_t num_comms = comm->global_queue_size / (uint64_t)MAX_SEND_SIZE + 1;
500 uint64_t sum_recv = 0;
501 uint64_t sum_send = 0;
502 for (uint64_t c = 0; c < num_comms; ++c)
503 {
504 for (int32_t i = 0; i < nprocs; ++i)
505 {
506 uint64_t send_begin = (comm->sendcounts_temp[i] * c) / num_comms;
507 uint64_t send_end = (comm->sendcounts_temp[i] * (c + 1)) / num_comms;
508 if (c == (num_comms-1))
509 send_end = comm->sendcounts_temp[i];
510 comm->sendcounts[i] = (int32_t)(send_end - send_begin);
511 assert(comm->sendcounts[i] >= 0);
512 }
513
514 MPI_Alltoall(comm->sendcounts, 1, MPI_INT32_T,
515 comm->recvcounts, 1, MPI_INT32_T, MPI_COMM_WORLD);
516
517 comm->sdispls[0] = 0;
518 comm->sdispls_cpy[0] = 0;
519 comm->rdispls[0] = 0;
520 for (int32_t i = 1; i < nprocs; ++i)
521 {
522 comm->sdispls[i] = comm->sdispls[i-1] + comm->sendcounts[i-1];
523 comm->rdispls[i] = comm->rdispls[i-1] + comm->recvcounts[i-1];
524 comm->sdispls_cpy[i] = comm->sdispls[i];
525 }
526
527 int32_t cur_send = comm->sdispls[nprocs-1] + comm->sendcounts[nprocs-1];
528 int32_t cur_recv = comm->rdispls[nprocs-1] + comm->recvcounts[nprocs-1];
529 double* buf_d = (double*)malloc((double)(cur_send)*sizeof(double));
530 if (buf_d == NULL)
531 throw_err("exchange_data_flt(), unable to allocate comm buffers", procid);
532
533 for (int32_t i = 0; i < nprocs; ++i)
534 {
535 uint64_t send_begin = (comm->sendcounts_temp[i] * c) / num_comms;
536 uint64_t send_end = (comm->sendcounts_temp[i] * (c + 1)) / num_comms;
537 if (c == (num_comms-1))
538 send_end = comm->sendcounts_temp[i];
539
540 for (uint64_t j = send_begin; j < send_end; ++j)
541 {
542 double data = comm->sendbuf_data_flt[comm->sdispls_temp[i]+j];
543 buf_d[comm->sdispls_cpy[i]++] = data;
544 }
545 }
546
547 MPI_Alltoallv(buf_d, comm->sendcounts,
548 comm->sdispls, MPI_DOUBLE,
549 comm->recvbuf_data_flt+sum_recv, comm->recvcounts,
550 comm->rdispls, MPI_DOUBLE, MPI_COMM_WORLD);
551 free(buf_d);
552 sum_recv += cur_recv;
553 sum_send += cur_send;
554 }
555
556 assert(sum_recv == comm->total_recv);
557 assert(sum_send == comm->total_send);
558
559 if (debug) { printf("Task %d exchange_data_flt() success\n", procid); }
560}
561
562
564 thread_comm_t* tc,
565 uint64_t vert_index)
566{
567 for (int32_t i = 0; i < nprocs; ++i)
568 tc->v_to_rank[i] = false;
569
570 uint64_t out_degree = out_degree(g, vert_index);
571 uint64_t* outs = out_vertices(g, vert_index);
572 for (uint64_t j = 0; j < out_degree; ++j)
573 {
574 uint64_t out_index = outs[j];
575 if (out_index >= g->n_local)
576 {
577 int32_t out_rank = g->ghost_tasks[out_index-g->n_local];
578 if (!tc->v_to_rank[out_rank])
579 {
580 tc->v_to_rank[out_rank] = true;
581 ++tc->sendcounts_thread[out_rank];
582 }
583 }
584 }
585 uint64_t in_degree = in_degree(g, vert_index);
586 uint64_t* ins = in_vertices(g, vert_index);
587 for (uint64_t j = 0; j < in_degree; ++j)
588 {
589 uint64_t in_index = ins[j];
590 if (in_index >= g->n_local)
591 {
592 int32_t in_rank = g->ghost_tasks[in_index-g->n_local];
593 if (!tc->v_to_rank[in_rank])
594 {
595 tc->v_to_rank[in_rank] = true;
596 ++tc->sendcounts_thread[in_rank];
597 }
598 }
599 }
600}
601
603 thread_comm_t* tc,
604 uint64_t vert_index)
605{
606 for (int32_t i = 0; i < nprocs; ++i)
607 tc->v_to_rank[i] = false;
608
609 uint64_t out_degree = out_degree(g, vert_index);
610 uint64_t* outs = out_vertices(g, vert_index);
611 for (uint64_t j = 0; j < out_degree; ++j)
612 {
613 uint64_t out_index = outs[j];
614 if (out_index >= g->n_local)
615 {
616 int32_t out_rank = g->ghost_tasks[out_index-g->n_local];
617 if (!tc->v_to_rank[out_rank])
618 {
619 tc->v_to_rank[out_rank] = true;
620 ++tc->sendcounts_thread[out_rank];
621 }
622 }
623 }
624}
625
626
627
629 thread_comm_t* tc, mpi_data_t* comm,
630 uint64_t vert_index, uint64_t data)
631{
632 for (int32_t i = 0; i < nprocs; ++i)
633 tc->v_to_rank[i] = false;
634
635 uint64_t out_degree = out_degree(g, vert_index);
636 uint64_t* outs = out_vertices(g, vert_index);
637 for (uint64_t j = 0; j < out_degree; ++j)
638 {
639 uint64_t out_index = outs[j];
640 if (out_index >= g->n_local)
641 {
642 int32_t out_rank = g->ghost_tasks[out_index - g->n_local];
643 if (!tc->v_to_rank[out_rank])
644 {
645 tc->v_to_rank[out_rank] = true;
646 add_vid_data_to_send(tc, comm,
647 g->local_unmap[vert_index], data, out_rank);
648 }
649 }
650 }
651
652 uint64_t in_degree = in_degree(g, vert_index);
653 uint64_t* ins = in_vertices(g, vert_index);
654 for (uint64_t j = 0; j < in_degree; ++j)
655 {
656 uint64_t in_index = ins[j];
657 if (in_index >= g->n_local)
658 {
659 int32_t in_rank = g->ghost_tasks[in_index - g->n_local];
660 if (!tc->v_to_rank[in_rank])
661 {
662 tc->v_to_rank[in_rank] = true;
663 add_vid_data_to_send(tc, comm,
664 g->local_unmap[vert_index], data, in_rank);
665 }
666 }
667 }
668}
669
671 thread_comm_t* tc, mpi_data_t* comm,
672 uint64_t vert_index, double data)
673{
674 for (int32_t i = 0; i < nprocs; ++i)
675 tc->v_to_rank[i] = false;
676
677 uint64_t out_degree = out_degree(g, vert_index);
678 uint64_t* outs = out_vertices(g, vert_index);
679 for (uint64_t j = 0; j < out_degree; ++j)
680 {
681 uint64_t out_index = outs[j];
682 if (out_index >= g->n_local)
683 {
684 int32_t out_rank = g->ghost_tasks[out_index - g->n_local];
685 if (!tc->v_to_rank[out_rank])
686 {
687 tc->v_to_rank[out_rank] = true;
689 g->local_unmap[vert_index], data, out_rank);
690 }
691 }
692 }
693}
694
696 thread_comm_t* tc, mpi_data_t* comm,
697 uint64_t vert_index, uint64_t data)
698{
699 for (int32_t i = 0; i < nprocs; ++i)
700 tc->v_to_rank[i] = false;
701
702 uint64_t out_degree = out_degree(g, vert_index);
703 uint64_t* outs = out_vertices(g, vert_index);
704 for (uint64_t j = 0; j < out_degree; ++j)
705 {
706 uint64_t out_index = outs[j];
707 if (out_index >= g->n_local)
708 {
709 int32_t out_rank = g->ghost_tasks[out_index - g->n_local];
710 if (!tc->v_to_rank[out_rank])
711 {
712 tc->v_to_rank[out_rank] = true;
713 add_vid_data_to_send(tc, comm,
714 g->local_unmap[vert_index], data, out_rank);
715 }
716 }
717 }
718}
719
720
722 uint64_t vertex_id)
723{
724 tq->thread_queue[tq->thread_queue_size++] = vertex_id;
725
727 empty_queue(tq, q);
728}
729
731{
732 uint64_t start_offset;
733
734#pragma omp atomic capture
735 start_offset = q->next_size += tq->thread_queue_size;
736
737 start_offset -= tq->thread_queue_size;
738 for (uint64_t i = 0; i < tq->thread_queue_size; ++i)
739 q->queue_next[start_offset + i] = tq->thread_queue[i];
740 tq->thread_queue_size = 0;
741}
742
744 uint64_t vertex_id)
745{
746 tq->thread_send[tq->thread_send_size++] = vertex_id;
747
749 empty_send(tq, q);
750}
751
753{
754 uint64_t start_offset;
755
756#pragma omp atomic capture
757 start_offset = q->send_size += tq->thread_send_size;
758
759 start_offset -= tq->thread_send_size;
760 for (uint64_t i = 0; i < tq->thread_send_size; ++i)
761 q->queue_send[start_offset + i] = tq->thread_send[i];
762 tq->thread_send_size = 0;
763}
764
765
767 uint64_t vertex_id, uint64_t data_val, int32_t send_rank)
768{
769 tc->sendbuf_vert_thread[tc->thread_queue_size] = vertex_id;
770 tc->sendbuf_data_thread[tc->thread_queue_size] = data_val;
771 tc->sendbuf_rank_thread[tc->thread_queue_size] = send_rank;
772 ++tc->thread_queue_size;
773 ++tc->sendcounts_thread[send_rank];
774
776 empty_vid_data(tc, comm);
777}
778
780 uint64_t vertex_id, double data_val, int32_t send_rank)
781{
782 tc->sendbuf_vert_thread[tc->thread_queue_size] = vertex_id;
783 tc->sendbuf_data_thread_flt[tc->thread_queue_size] = data_val;
784 tc->sendbuf_rank_thread[tc->thread_queue_size] = send_rank;
785 ++tc->thread_queue_size;
786 ++tc->sendcounts_thread[send_rank];
787
789 empty_vid_data_flt(tc, comm);
790}
791
793{
794 for (int32_t i = 0; i < nprocs; ++i)
795 {
796#pragma omp atomic capture
797 tc->thread_starts[i] = comm->sdispls_cpy_temp[i] += tc->sendcounts_thread[i];
798
799 tc->thread_starts[i] -= tc->sendcounts_thread[i];
800 }
801
802 for (uint64_t i = 0; i < tc->thread_queue_size; ++i)
803 {
804 int32_t cur_rank = tc->sendbuf_rank_thread[i];
805 comm->sendbuf_vert[tc->thread_starts[cur_rank]] =
806 tc->sendbuf_vert_thread[i];
807 comm->sendbuf_data[tc->thread_starts[cur_rank]] =
808 tc->sendbuf_data_thread[i];
809 ++tc->thread_starts[cur_rank];
810 }
811
812 for (int32_t i = 0; i < nprocs; ++i)
813 {
814 tc->thread_starts[i] = 0;
815 tc->sendcounts_thread[i] = 0;
816 }
817 tc->thread_queue_size = 0;
818}
819
821{
822 for (int32_t i = 0; i < nprocs; ++i)
823 {
824#pragma omp atomic capture
825 tc->thread_starts[i] = comm->sdispls_cpy_temp[i] += tc->sendcounts_thread[i];
826
827 tc->thread_starts[i] -= tc->sendcounts_thread[i];
828 }
829
830 for (uint64_t i = 0; i < tc->thread_queue_size; ++i)
831 {
832 int32_t cur_rank = tc->sendbuf_rank_thread[i];
833 comm->sendbuf_vert[tc->thread_starts[cur_rank]] =
834 tc->sendbuf_vert_thread[i];
835 comm->sendbuf_data_flt[tc->thread_starts[cur_rank]] =
837 ++tc->thread_starts[cur_rank];
838 }
839
840 for (int32_t i = 0; i < nprocs; ++i)
841 {
842 tc->thread_starts[i] = 0;
843 tc->sendcounts_thread[i] = 0;
844 }
845 tc->thread_queue_size = 0;
846}
847
848
849
850#endif
Mac OS X ATTR com apple quarantine q
Definition ._remapper.cpp:1
void clear_comm_data(mpi_data_t *comm)
Definition comms.cpp:111
void empty_vid_data(thread_comm_t *tc, mpi_data_t *comm)
Definition comms.h:792
void update_vid_data_queues_out(dist_graph_t *g, thread_comm_t *tc, mpi_data_t *comm, uint64_t vert_index, uint64_t data)
Definition comms.h:695
void clear_recvbuf_vid_data(mpi_data_t *comm)
Definition comms.cpp:359
void empty_queue(thread_queue_t *tq, queue_data_t *q)
Definition comms.h:730
int procid
Definition main.cpp:55
bool debug
Definition comms.h:60
void init_thread_comm(thread_comm_t *tc)
Definition comms.cpp:169
void clear_queue_data(queue_data_t *q)
Definition comms.cpp:75
void init_thread_queue(thread_queue_t *tq)
Definition comms.cpp:136
void init_recvbuf_vid_data(mpi_data_t *comm)
Definition comms.cpp:278
bool verify
Definition comms.h:60
void add_vid_data_to_send_flt(thread_comm_t *tc, mpi_data_t *comm, uint64_t vertex_id, double data_val, int32_t send_rank)
Definition comms.h:779
void add_vid_to_queue(thread_queue_t *tq, queue_data_t *q, uint64_t vertex_id)
Definition comms.h:721
void init_sendbuf_vid_data_flt(mpi_data_t *comm)
Definition comms.cpp:305
void init_thread_comm_flt(thread_comm_t *tc)
Definition comms.cpp:209
#define THREAD_QUEUE_SIZE
Definition comms.h:63
void exchange_data_flt(mpi_data_t *comm)
Definition comms.h:495
void init_recvbuf_vid_data_flt(mpi_data_t *comm)
Definition comms.cpp:332
void empty_vid_data_flt(thread_comm_t *tc, mpi_data_t *comm)
Definition comms.h:820
void clear_thread_queue(thread_queue_t *tq)
Definition comms.cpp:157
void init_queue_data(dist_graph_t *g, queue_data_t *q)
Definition comms.cpp:58
bool verbose
Definition main.cpp:56
void exchange_verts(dist_graph_t *g, mpi_data_t *comm, queue_data_t *q)
Definition comms.h:184
#define MAX_SEND_SIZE
Definition comms.h:62
void update_sendcounts_thread_out(dist_graph_t *g, thread_comm_t *tc, uint64_t vert_index)
Definition comms.h:602
bool output
Definition comms.h:60
void clear_thread_commflt(thread_comm_t *tc)
void exchange_data(mpi_data_t *comm)
Definition comms.h:428
int nprocs
Definition comms.h:59
void init_sendbuf_vid_data(mpi_data_t *comm)
Definition comms.cpp:251
void update_vid_data_queues(dist_graph_t *g, thread_comm_t *tc, mpi_data_t *comm, uint64_t vert_index, uint64_t data)
Definition comms.h:628
void clear_allbuf_vid_data(mpi_data_t *comm)
Definition comms.cpp:370
void add_vid_to_send(thread_queue_t *tq, queue_data_t *q, uint64_t vertex_id)
Definition comms.h:743
void add_vid_data_to_send(thread_comm_t *tc, mpi_data_t *comm, uint64_t vertex_id, uint64_t data_val, int32_t send_rank)
Definition comms.h:766
void exchange_vert_data(dist_graph_t *g, mpi_data_t *comm, queue_data_t *q)
Definition comms.h:258
void empty_send(thread_queue_t *tq, queue_data_t *q)
Definition comms.h:752
void init_comm_data(mpi_data_t *comm)
Definition comms.cpp:84
void update_sendcounts_thread(dist_graph_t *g, thread_comm_t *tc, uint64_t vert_index)
Definition comms.h:563
void clear_thread_comm(thread_comm_t *tc)
Definition comms.cpp:199
#define in_degree(g, n)
Definition dist_graph.h:55
#define out_vertices(g, n)
Definition dist_graph.h:56
#define out_degree(g, n)
Definition dist_graph.h:54
#define in_vertices(g, n)
Definition dist_graph.h:57
signed int int32_t
Definition stdint.h:77
unsigned __int64 uint64_t
Definition stdint.h:90
uint64_t * local_unmap
Definition dist_graph.h:80
uint64_t * ghost_tasks
Definition dist_graph.h:82
uint64_t * ghost_unmap
Definition dist_graph.h:81
uint64_t n_local
Definition dist_graph.h:66
uint64_t * sdispls_temp
Definition comms.h:74
uint64_t * sendbuf_data
Definition comms.h:79
uint64_t * sendcounts_temp
Definition comms.h:73
int32_t * sendcounts
Definition comms.h:66
uint64_t total_send
Definition comms.h:86
double * sendbuf_data_flt
Definition comms.h:80
uint64_t * recvbuf_data
Definition comms.h:82
uint64_t * sendbuf_vert
Definition comms.h:78
uint64_t total_recv
Definition comms.h:85
int32_t * sdispls
Definition comms.h:68
uint64_t * recvcounts_temp
Definition comms.h:72
int32_t * rdispls
Definition comms.h:69
int32_t * recvcounts
Definition comms.h:67
uint64_t * sdispls_cpy_temp
Definition comms.h:76
uint64_t global_queue_size
Definition comms.h:87
double * recvbuf_data_flt
Definition comms.h:83
uint64_t * recvbuf_vert
Definition comms.h:81
int32_t * sdispls_cpy
Definition comms.h:70
uint64_t * rdispls_temp
Definition comms.h:75
uint64_t send_size
Definition comms.h:97
uint64_t next_size
Definition comms.h:96
uint64_t * queue_next
Definition comms.h:92
uint64_t * queue_send
Definition comms.h:93
uint64_t queue_size
Definition comms.h:95
uint64_t * queue
Definition comms.h:91
bool * v_to_rank
Definition comms.h:110
uint64_t * thread_starts
Definition comms.h:116
double * sendbuf_data_thread_flt
Definition comms.h:114
uint64_t * sendbuf_vert_thread
Definition comms.h:112
uint64_t thread_queue_size
Definition comms.h:117
uint64_t * sendbuf_data_thread
Definition comms.h:113
int32_t * sendbuf_rank_thread
Definition comms.h:115
int32_t tid
Definition comms.h:109
uint64_t * sendcounts_thread
Definition comms.h:111
uint64_t thread_queue_size
Definition comms.h:104
uint64_t * thread_send
Definition comms.h:103
uint64_t thread_send_size
Definition comms.h:105
uint64_t * thread_queue
Definition comms.h:102
int32_t tid
Definition comms.h:101
void throw_err(char const *err_message)
Definition util.cpp:58