COMBINATORIAL_BLAS 1.6
 
Loading...
Searching...
No Matches
io_pp.cpp
Go to the documentation of this file.
1/*
2//@HEADER
3// *****************************************************************************
4//
5// HPCGraph: Graph Computation on High Performance Computing Systems
6// Copyright (2016) Sandia Corporation
7//
8// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
9// the U.S. Government retains certain rights in this software.
10//
11// Redistribution and use in source and binary forms, with or without
12// modification, are permitted provided that the following conditions are
13// met:
14//
15// 1. Redistributions of source code must retain the above copyright
16// notice, this list of conditions and the following disclaimer.
17//
18// 2. Redistributions in binary form must reproduce the above copyright
19// notice, this list of conditions and the following disclaimer in the
20// documentation and/or other materials provided with the distribution.
21//
22// 3. Neither the name of the Corporation nor the names of the
23// contributors may be used to endorse or promote products derived from
24// this software without specific prior written permission.
25//
26// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
27// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
28// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
29// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
30// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
31// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
32// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
33// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
34// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
35// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
36// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
37//
38// Questions? Contact George M. Slota (gmslota@sandia.gov)
39// Siva Rajamanickam (srajama@sandia.gov)
40// Kamesh Madduri (madduri@cse.psu.edu)
41//
42// *****************************************************************************
43//@HEADER
44*/
45
46#include <mpi.h>
47#include <omp.h>
48#include <stdio.h>
49#include <stdlib.h>
50#include <stdint.h>
51
52#include "io_pp.h"
53#include "comms.h"
54#include "util.h"
55
// Globals declared extern here, so their definitions live outside these
// declarations. In the code visible in this file, procid is printed as the
// id of the current task and, together with nprocs, determines how the input
// file and the vertex range are split between tasks.
56extern int procid, nprocs;
// Runtime flags. The functions visible in this file only read `verbose`
// (timing/summary output) and `debug` (trace output).
57extern bool verbose, debug, verify, output;
58
59
60int load_graph_edges_32(char *input_filename, graph_gen_data_t *ggi)
61{
62 if (debug) {
63 printf("Task %d load_graph_edges() %s start\n", procid, input_filename);
64 }
65
66 double elt = 0.0;
67 if (verbose) {
68 MPI_Barrier(MPI_COMM_WORLD);
69 elt = omp_get_wtime();
70 }
71
72 FILE *infp = fopen(input_filename, "rb");
73 if(infp == NULL)
74 throw_err("load_graph_edges() unable to open input file", procid);
75
76 fseek(infp, 0L, SEEK_END);
77 uint64_t file_size = ftell(infp);
78 fseek(infp, 0L, SEEK_SET);
79
80 uint64_t nedges_global = file_size/(2*sizeof(uint32_t));
81 ggi->m = nedges_global;
82
83 uint64_t read_offset_start = procid*2*sizeof(uint32_t)*(nedges_global/nprocs);
84 uint64_t read_offset_end = (procid+1)*2*sizeof(uint32_t)*(nedges_global/nprocs);
85
86 if (procid == nprocs - 1)
87 read_offset_end = 2*sizeof(uint32_t)*nedges_global;
88
89 uint64_t nedges = (read_offset_end - read_offset_start)/8;
90 ggi->m_local_read = nedges;
91
92 if (debug) {
93 printf("Task %d, read_offset_start %ld, read_offset_end %ld, nedges_global %ld, nedges: %ld\n", procid, read_offset_start, read_offset_end, nedges_global, nedges);
94 }
95
96 uint32_t* gen_edges_read = (uint32_t*)malloc(2*nedges*sizeof(uint32_t));
97 uint64_t* gen_edges = (uint64_t*)malloc(2*nedges*sizeof(uint64_t));
98 if (gen_edges_read == NULL || gen_edges == NULL)
99 throw_err("load_graph_edges(), unable to allocate buffer", procid);
100
101 fseek(infp, read_offset_start, SEEK_SET);
102 fread(gen_edges_read, nedges, 2*sizeof(uint32_t), infp);
103 fclose(infp);
104
105 for (uint64_t i = 0; i < nedges*2; ++i)
106 gen_edges[i] = (uint64_t)gen_edges_read[i];
107
108 free(gen_edges_read);
109 ggi->gen_edges = gen_edges;
110
111 if (verbose) {
112 elt = omp_get_wtime() - elt;
113 printf("Task %d read %lu edges, %9.6f (s)\n", procid, nedges, elt);
114 }
115
116 uint64_t max_n = 0;
117 for (uint64_t i = 0; i < ggi->m_local_read*2; ++i)
118 if (gen_edges[i] > max_n)
119 max_n = gen_edges[i];
120
121 uint64_t n_global;
122 MPI_Allreduce(&max_n, &n_global, 1, MPI_UINT64_T, MPI_MAX, MPI_COMM_WORLD);
123
124 ggi->n = n_global+1;
125 ggi->n_offset = procid*(ggi->n/nprocs + 1);
126 ggi->n_local = ggi->n/nprocs + 1;
127 if (procid == nprocs - 1)
128 ggi->n_local = n_global - ggi->n_offset + 1;
129
130 if (verbose) {
131 printf("Task %d, n %lu, n_offset %lu, n_local %lu\n",
132 procid, ggi->n, ggi->n_offset, ggi->n_local);
133 }
134
135 if (debug) { printf("Task %d load_graph_edges() success\n", procid); }
136 return 0;
137}
138
139
140int load_graph_edges_64(char *input_filename, graph_gen_data_t *ggi)
141{
142 if (debug) { printf("Task %d load_graph_edges() start\n", procid); }
143
144 double elt = 0.0;
145 if (verbose) {
146 MPI_Barrier(MPI_COMM_WORLD);
147 elt = omp_get_wtime();
148 }
149
150 FILE *infp = fopen(input_filename, "rb");
151 if(infp == NULL)
152 throw_err("load_graph_edges() unable to open input file", procid);
153
154 fseek(infp, 0L, SEEK_END);
155 uint64_t file_size = ftell(infp);
156 fseek(infp, 0L, SEEK_SET);
157
158 uint64_t nedges_global = file_size/(2*sizeof(uint64_t));
159 ggi->m = nedges_global;
160
161 uint64_t read_offset_start = procid*2*sizeof(uint64_t)*(nedges_global/nprocs);
162 uint64_t read_offset_end = (procid+1)*2*sizeof(uint64_t)*(nedges_global/nprocs);
163
164 if (procid == nprocs - 1)
165 read_offset_end = 2*sizeof(uint64_t)*nedges_global;
166
167 uint64_t nedges = (read_offset_end - read_offset_start)/8;
168 ggi->m_local_read = nedges;
169
170 if (debug) {
171 printf("Task %d, read_offset_start %ld, read_offset_end %ld, nedges_global %ld, nedges: %ld\n", procid, read_offset_start, read_offset_end, nedges_global, nedges);
172 }
173
174 uint64_t* gen_edges = (uint64_t*)malloc(2*nedges*sizeof(uint64_t));
175 if (gen_edges == NULL)
176 throw_err("load_graph_edges(), unable to allocate buffer", procid);
177
178 fseek(infp, read_offset_start, SEEK_SET);
179 fread(gen_edges, nedges, 2*sizeof(uint64_t), infp);
180 fclose(infp);
181
182 ggi->gen_edges = gen_edges;
183
184 if (verbose) {
185 elt = omp_get_wtime() - elt;
186 printf("Task %d read %ld edges, %9.6f (s)\n", procid, nedges, elt);
187 }
188
189 uint64_t max_n = 0;
190 for (uint64_t i = 0; i < ggi->m_local_read*2; ++i)
191 if (gen_edges[i] > max_n)
192 max_n = gen_edges[i];
193
194 uint64_t n_global;
195 MPI_Allreduce(&max_n, &n_global, 1, MPI_UINT64_T, MPI_MAX, MPI_COMM_WORLD);
196
197 ggi->n = n_global+1;
198 ggi->n_offset = procid*(ggi->n/nprocs + 1);
199 ggi->n_local = ggi->n/nprocs + 1;
200 if (procid == nprocs - 1)
201 ggi->n_local = n_global - ggi->n_offset + 1;
202
203 if (verbose) {
204 printf("Task %d, n %lu, n_offset %lu, n_local %lu\n",
205 procid, ggi->n, ggi->n_offset, ggi->n_local);
206 }
207
208 if (debug) { printf("Task %d load_graph_edges() success\n", procid); }
209 return 0;
210}
211
213{
214 if (debug) { printf("Task %d exchange_out_edges() start\n", procid); }
215 double elt = 0.0;
216 if (verbose) {
217 MPI_Barrier(MPI_COMM_WORLD);
218 elt = omp_get_wtime();
219 }
220
221 uint64_t* temp_sendcounts = (uint64_t*)malloc(nprocs*sizeof(uint64_t));
222 uint64_t* temp_recvcounts = (uint64_t*)malloc(nprocs*sizeof(uint64_t));
223 for (int i = 0; i < nprocs; ++i)
224 {
225 temp_sendcounts[i] = 0;
226 temp_recvcounts[i] = 0;
227 }
228
229 uint64_t n_per_rank = ggi->n / nprocs + 1;
230 for (uint64_t i = 0; i < ggi->m_local_read*2; i+=2)
231 {
232 uint64_t vert = ggi->gen_edges[i];
233 int32_t vert_task = (int32_t)(vert / n_per_rank);
234 temp_sendcounts[vert_task] += 2;
235 }
236
237 MPI_Alltoall(temp_sendcounts, 1, MPI_UINT64_T,
238 temp_recvcounts, 1, MPI_UINT64_T, MPI_COMM_WORLD);
239
240 uint64_t total_recv = 0;
241 uint64_t total_send = 0;
242 for (int32_t i = 0; i < nprocs; ++i)
243 {
244 total_recv += temp_recvcounts[i];
245 total_send += temp_sendcounts[i];
246 }
247 free(temp_sendcounts);
248 free(temp_recvcounts);
249
250 uint64_t* recvbuf = (uint64_t*)malloc(total_recv*sizeof(uint64_t));
251 if (recvbuf == NULL)
252 {
253 fprintf(stderr, "Task %d Error: exchange_out_edges(), unable to allocate buffer\n", procid);
254 MPI_Abort(MPI_COMM_WORLD, 1);
255 }
256
257 uint64_t max_transfer = total_send > total_recv ? total_send : total_recv;
258 uint64_t num_comms = max_transfer / (uint64_t)MAX_SEND_SIZE + 1;
259 MPI_Allreduce(MPI_IN_PLACE, &num_comms, 1,
260 MPI_UINT64_T, MPI_MAX, MPI_COMM_WORLD);
261
262 if (debug)
263 printf("Task %d exchange_out_edges() num_comms %lu total_send %lu total_recv %lu\n", procid, num_comms, total_send, total_recv);
264
265 uint64_t sum_recv = 0;
266 for (uint64_t c = 0; c < num_comms; ++c)
267 {
268 uint64_t send_begin = (ggi->m_local_read * c) / num_comms;
269 uint64_t send_end = (ggi->m_local_read * (c + 1)) / num_comms;
270 if (c == (num_comms-1))
271 send_end = ggi->m_local_read;
272
273 for (int32_t i = 0; i < nprocs; ++i)
274 {
275 comm->sendcounts[i] = 0;
276 comm->recvcounts[i] = 0;
277 }
278
279 for (int64_t i = send_begin; i < send_end; ++i)
280 {
281 uint64_t vert = ggi->gen_edges[i*2];
282 int32_t vert_task = (int32_t)(vert / n_per_rank);
283 comm->sendcounts[vert_task] += 2;
284 }
285
286 MPI_Alltoall(comm->sendcounts, 1, MPI_INT32_T,
287 comm->recvcounts, 1, MPI_INT32_T, MPI_COMM_WORLD);
288
289 comm->sdispls[0] = 0;
290 comm->sdispls_cpy[0] = 0;
291 comm->rdispls[0] = 0;
292 for (int32_t i = 1; i < nprocs; ++i)
293 {
294 comm->sdispls[i] = comm->sdispls[i-1] + comm->sendcounts[i-1];
295 comm->rdispls[i] = comm->rdispls[i-1] + comm->recvcounts[i-1];
296 comm->sdispls_cpy[i] = comm->sdispls[i];
297 }
298
299 int32_t cur_send = comm->sdispls[nprocs-1] + comm->sendcounts[nprocs-1];
300 int32_t cur_recv = comm->rdispls[nprocs-1] + comm->recvcounts[nprocs-1];
301 uint64_t* sendbuf = (uint64_t*) malloc((uint64_t)cur_send*sizeof(uint64_t));
302 if (sendbuf == NULL)
303 {
304 fprintf(stderr, "Task %d Error: exchange_out_edges(), unable to allocate comm buffers", procid);
305 MPI_Abort(MPI_COMM_WORLD, 1);
306 }
307
308 for (uint64_t i = send_begin; i < send_end; ++i)
309 {
310 uint64_t vert1 = ggi->gen_edges[2*i];
311 uint64_t vert2 = ggi->gen_edges[2*i+1];
312 int32_t vert_task = (int32_t)(vert1 / n_per_rank);
313
314 sendbuf[comm->sdispls_cpy[vert_task]++] = vert1;
315 sendbuf[comm->sdispls_cpy[vert_task]++] = vert2;
316 }
317
318 MPI_Alltoallv(sendbuf, comm->sendcounts, comm->sdispls, MPI_UINT64_T,
319 recvbuf+sum_recv, comm->recvcounts, comm->rdispls,
320 MPI_UINT64_T, MPI_COMM_WORLD);
321 sum_recv += cur_recv;
322 free(sendbuf);
323 }
324
325 free(ggi->gen_edges);
326 ggi->gen_edges = recvbuf;
327 ggi->m_local_out = total_recv / 2;
328
329 if (verbose) {
330 elt = omp_get_wtime() - elt;
331 printf("Task %d exchange_out_edges() sent %lu, recv %lu, m_local_out %lu, %9.6f (s)\n", procid, total_send, total_recv, ggi->m_local_out, elt);
332 }
333
334 if (debug) { printf("Task %d exchange_out_edges() success\n", procid); }
335 return 0;
336}
337
339{
340 if (debug) { printf("Task %d exchange_in_edges() start\n", procid); }
341 double elt = 0.0;
342 if (verbose) {
343 MPI_Barrier(MPI_COMM_WORLD);
344 elt = omp_get_wtime();
345 }
346
347 uint64_t* temp_sendcounts = (uint64_t*)malloc(nprocs*sizeof(uint64_t));
348 uint64_t* temp_recvcounts = (uint64_t*)malloc(nprocs*sizeof(uint64_t));
349 for (int i = 0; i < nprocs; ++i)
350 {
351 temp_sendcounts[i] = 0;
352 temp_recvcounts[i] = 0;
353 }
354
355 uint64_t n_per_rank = ggi->n / nprocs + 1;
356 for (uint64_t i = 0; i < ggi->m_local_out; ++i)
357 {
358 uint64_t vert = ggi->gen_edges[2*i+1];
359 int32_t vert_task = (int32_t)(vert / n_per_rank);
360 temp_sendcounts[vert_task] += 2;
361 }
362
363 MPI_Alltoall(temp_sendcounts, 1, MPI_UINT64_T,
364 temp_recvcounts, 1, MPI_UINT64_T, MPI_COMM_WORLD);
365
366 uint64_t total_recv = 0;
367 uint64_t total_send = 0;
368 for (int32_t i = 0; i < nprocs; ++i)
369 {
370 total_recv += temp_recvcounts[i];
371 total_send += temp_sendcounts[i];
372 }
373 free(temp_sendcounts);
374 free(temp_recvcounts);
375
376 uint64_t* recvbuf = (uint64_t*) malloc(total_recv*sizeof(uint64_t));
377 if (recvbuf == NULL)
378 {
379 fprintf(stderr, "Task %d Error: exchange_in_edges(), unable to allocate buffer\n", procid);
380 MPI_Abort(MPI_COMM_WORLD, 1);
381 }
382
383 uint64_t max_transfer = total_send > total_recv ? total_send : total_recv;
384 uint64_t num_comms = max_transfer / (uint64_t)MAX_SEND_SIZE + 1;
385 MPI_Allreduce(MPI_IN_PLACE, &num_comms, 1,
386 MPI_UINT64_T, MPI_MAX, MPI_COMM_WORLD);
387
388 if (debug)
389 printf("Task %d exchange_in_edges() num_comms %li total_send %li total_recv %li\n", procid, num_comms, total_send, total_recv);
390
391 uint64_t sum_recv = 0;
392 for (int64_t c = 0; c < num_comms; ++c)
393 {
394 uint64_t send_begin = (ggi->m_local_out * c) / num_comms;
395 uint64_t send_end = (ggi->m_local_out * (c + 1)) / num_comms;
396 if (c == (num_comms-1))
397 send_end = ggi->m_local_out;
398
399 for (int32_t i = 0; i < nprocs; ++i)
400 {
401 comm->sendcounts[i] = 0;
402 comm->recvcounts[i] = 0;
403 }
404
405 for (uint64_t i = send_begin; i < send_end; ++i)
406 {
407 uint64_t vert = ggi->gen_edges[i*2+1];
408 int32_t vert_task = (int32_t)(vert / n_per_rank);
409 comm->sendcounts[vert_task] += 2;
410 }
411
412 MPI_Alltoall(comm->sendcounts, 1, MPI_INT32_T,
413 comm->recvcounts, 1, MPI_INT32_T, MPI_COMM_WORLD);
414
415 comm->sdispls[0] = 0;
416 comm->sdispls_cpy[0] = 0;
417 comm->rdispls[0] = 0;
418 for (int32_t i = 1; i < nprocs; ++i)
419 {
420 comm->sdispls[i] = comm->sdispls[i-1] + comm->sendcounts[i-1];
421 comm->rdispls[i] = comm->rdispls[i-1] + comm->recvcounts[i-1];
422 comm->sdispls_cpy[i] = comm->sdispls[i];
423 }
424
425 int32_t cur_send = comm->sdispls[nprocs-1] + comm->sendcounts[nprocs-1];
426 int32_t cur_recv = comm->rdispls[nprocs-1] + comm->recvcounts[nprocs-1];
427 uint64_t* sendbuf = (uint64_t*) malloc((uint64_t)cur_send*sizeof(uint64_t));
428 if (sendbuf == NULL)
429 {
430 fprintf(stderr, "Task %d Error: exchange_in_edges(), unable to allocate comm buffers\n", procid);
431 MPI_Abort(MPI_COMM_WORLD, 1);
432 }
433
434 for (uint64_t i = send_begin; i < send_end; ++i)
435 {
436 uint64_t vert1 = ggi->gen_edges[2*i];
437 uint64_t vert2 = ggi->gen_edges[2*i+1];
438 int32_t vert_task = (int32_t)(vert2 / n_per_rank);
439
440 sendbuf[comm->sdispls_cpy[vert_task]++] = vert1;
441 sendbuf[comm->sdispls_cpy[vert_task]++] = vert2;
442 }
443
444 MPI_Alltoallv(sendbuf, comm->sendcounts, comm->sdispls, MPI_UINT64_T,
445 recvbuf+sum_recv, comm->recvcounts, comm->rdispls,
446 MPI_UINT64_T, MPI_COMM_WORLD);
447 sum_recv += cur_recv;
448 free(sendbuf);
449 }
450
451 ggi->gen_edges_rev = recvbuf;
452 ggi->m_local_in = total_recv / 2;
453
454 if (verbose) {
455 elt = omp_get_wtime() - elt;
456 printf("Task %d exchange_in_edges() sent %ld, recv %ld, m_local_out %ld, %9.6f (s)\n", procid, total_send, total_recv, ggi->m_local_out, elt);
457 }
458
459 if (debug) { printf("Task %d exchange_in_edges() success\n", procid); }
460 return 0;
461}
#define MAX_SEND_SIZE
Definition comms.h:62
long int64_t
Definition compat.h:21
int load_graph_edges_32(char *input_filename, graph_gen_data_t *ggi)
Definition io_pp.cpp:60
int procid
Definition main.cpp:55
bool debug
Definition io_pp.cpp:57
bool verify
Definition io_pp.cpp:57
int load_graph_edges_64(char *input_filename, graph_gen_data_t *ggi)
Definition io_pp.cpp:140
int exchange_in_edges(graph_gen_data_t *ggi, mpi_data_t *comm)
Definition io_pp.cpp:338
bool verbose
Definition main.cpp:56
bool output
Definition io_pp.cpp:57
int nprocs
Definition io_pp.cpp:56
int exchange_out_edges(graph_gen_data_t *ggi, mpi_data_t *comm)
Definition io_pp.cpp:212
unsigned int uint32_t
Definition stdint.h:80
signed int int32_t
Definition stdint.h:77
unsigned __int64 uint64_t
Definition stdint.h:90
uint64_t m_local_in
Definition dist_graph.h:96
uint64_t n_offset
Definition dist_graph.h:92
uint64_t n_local
Definition dist_graph.h:91
uint64_t * gen_edges
Definition dist_graph.h:98
uint64_t * gen_edges_rev
Definition dist_graph.h:99
uint64_t m_local_read
Definition dist_graph.h:94
uint64_t m_local_out
Definition dist_graph.h:95
int32_t * sendcounts
Definition comms.h:66
int32_t * sdispls
Definition comms.h:68
int32_t * rdispls
Definition comms.h:69
int32_t * recvcounts
Definition comms.h:67
int32_t * sdispls_cpy
Definition comms.h:70
void throw_err(char const *err_message)
Definition util.cpp:58