62#define MAX_SEND_SIZE 2147483648
63#define THREAD_QUEUE_SIZE 1024
187 uint64_t task_queue_size =
q->next_size +
q->send_size;
189 MPI_UINT64_T, MPI_SUM, MPI_COMM_WORLD);
193 for (
uint64_t c = 0; c < num_comms; ++c)
195 uint64_t send_begin = (
q->send_size * c) / num_comms;
196 uint64_t send_end = (
q->send_size * (c + 1)) / num_comms;
197 if (c == (num_comms-1))
198 send_end =
q->send_size;
205 for (
uint64_t i = send_begin; i < send_end; ++i)
212 MPI_Alltoall(comm->
sendcounts, 1, MPI_INT32_T,
213 comm->
recvcounts, 1, MPI_INT32_T, MPI_COMM_WORLD);
229 throw_err(
"exchange_verts(), unable to allocate comm buffers",
procid);
231 for (
uint64_t i = send_begin; i < send_end; ++i)
241 q->queue_next+
q->next_size+sum_recv,
245 sum_recv += cur_recv;
248 q->queue_size =
q->next_size + sum_recv;
252 q->queue =
q->queue_next;
253 q->queue_next = temp;
268 for (
int i = 0; i <
nprocs; ++i)
275 throw_err(
"exchange_vert_data() unable to allocate comm buffers",
procid);
281 MPI_UINT64_T, MPI_SUM, MPI_COMM_WORLD);
286 for (
uint64_t c = 0; c < num_comms; ++c)
292 if (c == (num_comms-1))
298 MPI_Alltoall(comm->
sendcounts, 1, MPI_INT32_T,
299 comm->
recvcounts, 1, MPI_INT32_T, MPI_COMM_WORLD);
315 if (buf_v == NULL || buf_d == NULL)
316 throw_err(
"exchange_verts(), unable to allocate comm buffers",
procid);
322 if (c == (num_comms-1))
325 for (
uint64_t j = send_begin; j < send_end; ++j)
337 comm->
rdispls, MPI_UINT64_T, MPI_COMM_WORLD);
341 comm->
rdispls, MPI_UINT64_T, MPI_COMM_WORLD);
344 sum_recv += cur_recv;
345 sum_send += cur_send;
354 MPI_UINT64_T, MPI_SUM, MPI_COMM_WORLD);
362 if (
debug) { printf(
"Task %d exchange_verts() start\n",
procid); }
367 for (
uint64_t c = 0; c < num_comms; ++c)
373 if (c == (num_comms-1))
379 MPI_Alltoall(comm->
sendcounts, 1, MPI_INT32_T,
380 comm->
recvcounts, 1, MPI_INT32_T, MPI_COMM_WORLD);
396 throw_err(
"exchange_verts(), unable to allocate comm buffers",
procid);
402 if (c == (num_comms-1))
405 for (
uint64_t j = send_begin; j < send_end; ++j)
415 comm->
rdispls, MPI_UINT64_T, MPI_COMM_WORLD);
417 sum_recv += cur_recv;
418 sum_send += cur_send;
424 if (
debug) { printf(
"Task %d exchange_verts() success\n",
procid); }
430 if (
debug) { printf(
"Task %d exchange_data() start\n",
procid); }
435 for (
uint64_t c = 0; c < num_comms; ++c)
441 if (c == (num_comms-1))
447 MPI_Alltoall(comm->
sendcounts, 1, MPI_INT32_T,
448 comm->
recvcounts, 1, MPI_INT32_T, MPI_COMM_WORLD);
470 if (c == (num_comms-1))
473 for (
uint64_t j = send_begin; j < send_end; ++j)
483 comm->
rdispls, MPI_UINT64_T, MPI_COMM_WORLD);
485 sum_recv += cur_recv;
486 sum_send += cur_send;
492 if (
debug) { printf(
"Task %d exchange_data() success\n",
procid); }
497 if (
debug) { printf(
"Task %d exchange_data_flt() start\n",
procid); }
502 for (
uint64_t c = 0; c < num_comms; ++c)
508 if (c == (num_comms-1))
514 MPI_Alltoall(comm->
sendcounts, 1, MPI_INT32_T,
515 comm->
recvcounts, 1, MPI_INT32_T, MPI_COMM_WORLD);
529 double* buf_d = (
double*)malloc((
double)(cur_send)*
sizeof(
double));
531 throw_err(
"exchange_data_flt(), unable to allocate comm buffers",
procid);
537 if (c == (num_comms-1))
540 for (
uint64_t j = send_begin; j < send_end; ++j)
550 comm->
rdispls, MPI_DOUBLE, MPI_COMM_WORLD);
552 sum_recv += cur_recv;
553 sum_send += cur_send;
559 if (
debug) { printf(
"Task %d exchange_data_flt() success\n",
procid); }
734#pragma omp atomic capture
756#pragma omp atomic capture
796#pragma omp atomic capture
824#pragma omp atomic capture
Mac OS X ATTR com apple quarantine q
void clear_comm_data(mpi_data_t *comm)
void empty_vid_data(thread_comm_t *tc, mpi_data_t *comm)
void update_vid_data_queues_out(dist_graph_t *g, thread_comm_t *tc, mpi_data_t *comm, uint64_t vert_index, uint64_t data)
void clear_recvbuf_vid_data(mpi_data_t *comm)
void empty_queue(thread_queue_t *tq, queue_data_t *q)
void init_thread_comm(thread_comm_t *tc)
void clear_queue_data(queue_data_t *q)
void init_thread_queue(thread_queue_t *tq)
void init_recvbuf_vid_data(mpi_data_t *comm)
void add_vid_data_to_send_flt(thread_comm_t *tc, mpi_data_t *comm, uint64_t vertex_id, double data_val, int32_t send_rank)
void add_vid_to_queue(thread_queue_t *tq, queue_data_t *q, uint64_t vertex_id)
void init_sendbuf_vid_data_flt(mpi_data_t *comm)
void init_thread_comm_flt(thread_comm_t *tc)
#define THREAD_QUEUE_SIZE
void exchange_data_flt(mpi_data_t *comm)
void init_recvbuf_vid_data_flt(mpi_data_t *comm)
void empty_vid_data_flt(thread_comm_t *tc, mpi_data_t *comm)
void clear_thread_queue(thread_queue_t *tq)
void init_queue_data(dist_graph_t *g, queue_data_t *q)
void exchange_verts(dist_graph_t *g, mpi_data_t *comm, queue_data_t *q)
void update_sendcounts_thread_out(dist_graph_t *g, thread_comm_t *tc, uint64_t vert_index)
void clear_thread_commflt(thread_comm_t *tc)
void exchange_data(mpi_data_t *comm)
void init_sendbuf_vid_data(mpi_data_t *comm)
void update_vid_data_queues(dist_graph_t *g, thread_comm_t *tc, mpi_data_t *comm, uint64_t vert_index, uint64_t data)
void clear_allbuf_vid_data(mpi_data_t *comm)
void add_vid_to_send(thread_queue_t *tq, queue_data_t *q, uint64_t vertex_id)
void add_vid_data_to_send(thread_comm_t *tc, mpi_data_t *comm, uint64_t vertex_id, uint64_t data_val, int32_t send_rank)
void exchange_vert_data(dist_graph_t *g, mpi_data_t *comm, queue_data_t *q)
void empty_send(thread_queue_t *tq, queue_data_t *q)
void init_comm_data(mpi_data_t *comm)
void update_sendcounts_thread(dist_graph_t *g, thread_comm_t *tc, uint64_t vert_index)
void clear_thread_comm(thread_comm_t *tc)
#define out_vertices(g, n)
#define in_vertices(g, n)
unsigned __int64 uint64_t
uint64_t * sendcounts_temp
double * sendbuf_data_flt
uint64_t * recvcounts_temp
uint64_t * sdispls_cpy_temp
uint64_t global_queue_size
double * recvbuf_data_flt
double * sendbuf_data_thread_flt
uint64_t * sendbuf_vert_thread
uint64_t thread_queue_size
uint64_t * sendbuf_data_thread
int32_t * sendbuf_rank_thread
uint64_t * sendcounts_thread
uint64_t thread_queue_size
uint64_t thread_send_size
void throw_err(char const *err_message)