COMBINATORIAL_BLAS 1.6
 
Loading...
Searching...
No Matches
FilteredBFS.cpp
Go to the documentation of this file.
1/****************************************************************/
2/* Parallel Combinatorial BLAS Library (for Graph Computations) */
3/* version 1.6 -------------------------------------------------*/
4/* date: 6/15/2017 ---------------------------------------------*/
5/* authors: Ariful Azad, Aydin Buluc --------------------------*/
6/****************************************************************/
7/*
8 Copyright (c) 2010-2017, The Regents of the University of California
9
10 Permission is hereby granted, free of charge, to any person obtaining a copy
11 of this software and associated documentation files (the "Software"), to deal
12 in the Software without restriction, including without limitation the rights
13 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 copies of the Software, and to permit persons to whom the Software is
15 furnished to do so, subject to the following conditions:
16
17 The above copyright notice and this permission notice shall be included in
18 all copies or substantial portions of the Software.
19
20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 THE SOFTWARE.
27 */
28
29#define DETERMINISTIC
30#include "CombBLAS/CombBLAS.h"
31#include <mpi.h>
32#include <sys/time.h>
33#include <iostream>
34#include <functional>
35#include <algorithm>
36#include <vector>
37#include <string>
38#include <sstream>
39#ifdef THREADED
40 #ifndef _OPENMP
41 #define _OPENMP
42 #endif
43 #include <omp.h>
44#endif
45
46#ifdef USE_PAPI
47 #include <papi.h>
48 #include "papi_combblas_global.h"
49#endif
50
51
52/* Global variables for timing */
53
57
58/* End global variables */
59
60
61#include "TwitterEdge.h"
62
63#define MAX_ITERS 20000
64#define EDGEFACTOR 16
65#define ITERS 16
66#define CC_LIMIT 100
67#define PERCENTS 4 // testing with 4 different percentiles
68#define MINRUNS 4
69//#define ONLYTIME // don't calculate TEPS
70
71using namespace std;
72using namespace combblas;
73
74
75template <typename PARMAT>
77{
78 // boolean addition is practically a "logical or"
79 // therefore this doesn't destruct any links
80 PARMAT AT = A;
81 AT.Transpose();
82 A += AT;
83}
84
85
86#ifdef DETERMINISTIC
88#else
89MTRand GlobalMT; // generate random numbers with Mersenne Twister
90#endif
91
92
93struct Twitter_obj_randomizer : public std::unary_function<TwitterEdge, TwitterEdge>
94{
95 const TwitterEdge operator()(const TwitterEdge & x) const
96 {
97 short mycount = 1;
98 bool myfollow = 0;
99 time_t mylatest = static_cast<int64_t>(GlobalMT.rand() * 10000); // random.randrange(0,10000)
100
102 }
103};
104
105struct Twitter_materialize: public std::binary_function<TwitterEdge, time_t, bool>
106{
107 bool operator()(const TwitterEdge & x, time_t sincedate) const
108 {
109 if(x.isRetwitter() && x.LastTweetBy(sincedate))
110 return false; // false if the edge is going to be kept
111 else
112 return true; // true if the edge is to be pruned
113 }
114};
115
116#ifdef USE_PAPI
117void CheckPAPI(int errorcode, char [] errorstring)
118{
119 if (errorcode != PAPI_OK)
120 {
122 fprintf(stderr, "PAPI error (%d): %s\n", errorcode, errorstring);
123 }
124}
125#endif
126
127
128
129int main(int argc, char* argv[])
130{
131 int nprocs, myrank;
132#ifdef _OPENMP
134 int provided, flag, claimed;
137 if (!flag)
138 SpParHelper::Print("This thread called init_thread but Is_thread_main gave false\n");
140 if (claimed != provided)
141 SpParHelper::Print("Query thread gave different thread level than requested\n");
142#else
143 MPI_Init(&argc, &argv);
144 int cblas_splits = 1;
145#endif
146
149 int MAXTRIALS;
150 int retval;
151
152#ifdef USE_PAPI
153 /* Initialize the PAPI library */
155 if (retval != PAPI_VER_CURRENT && retval > 0)
156 {
157 fprintf(stderr,"PAPI library version mismatch!\en");
158 exit(1);
159 }
162 cout << "Not initialized" << endl;
163#endif
164
165 if(argc < 3)
166 {
167 if(myrank == 0)
168 {
169 cout << "Usage: ./FilteredBFS <File, Gen> <Input Name | Scale> (Optional: Double)" << endl;
170 cout << "Example: ./FilteredBFS File twitter_small.txt Double" << endl;
171 }
172 MPI_Finalize();
173 return -1;
174 }
175 {
179 fullWorld.reset( new CommGrid(MPI_COMM_WORLD, 0, 0) );
180
181 // Declare objects
183 FullyDistVec<int64_t, int64_t> indegrees(fullWorld); // in-degrees of vertices (including multi-edges and self-loops)
184 FullyDistVec<int64_t, int64_t> oudegrees(fullWorld); // out-degrees of vertices (including multi-edges and self-loops)
185 FullyDistVec<int64_t, int64_t> degrees(fullWorld); // combined degrees of vertices (including multi-edges and self-loops)
187
188 double t01 = MPI_Wtime();
189 if(string(argv[1]) == string("File")) // text|binary input option
190 {
191 SpParHelper::Print("Using real data, which we NEVER permute for load balance, also leaving isolated vertices as-is, if any\n");
192 SpParHelper::Print("This is because the input is assumed to be load balanced\n");
193 SpParHelper::Print("BFS is run on DIRECTED graph, hence hitting SCCs, and TEPS is unidirectional\n");
194
195 // ReadDistribute (const string & filename, int master, bool nonum, HANDLER handler, bool transpose, bool pario)
196 // if nonum is true, then numerics are not supplied and they are assumed to be all 1's
197 A.ReadDistribute(string(argv[2]), 0, false, TwitterReadSaveHandler<int64_t>(), true, true); // read it from file (and transpose on the fly)
198
199 A.PrintInfo();
200 SpParHelper::Print("Read input\n");
201
202 ABool = new PSpMat_Bool(A);
203 if(argc == 4 && string(argv[3]) == string("Double"))
204 {
205 MAXTRIALS = 2;
206 }
207 else
208 {
209 MAXTRIALS = 1;
210 }
211 }
212 else if(string(argv[1]) == string("Gen"))
213 {
214 SpParHelper::Print("Using synthetic data, which we ALWAYS permute for load balance\n");
215 SpParHelper::Print("We only balance the original input, we don't repermute after each filter change\n");
216 SpParHelper::Print("BFS is run on UNDIRECTED graph, hence hitting CCs, and TEPS is bidirectional\n");
217
218 double initiator[4] = {.57, .19, .19, .05};
219 double t01 = MPI_Wtime();
220 double t02;
222
223 unsigned scale = static_cast<unsigned>(atoi(argv[2]));
225 outs << "Forcing scale to : " << scale << endl;
227
228 // parameters: (double initiator[4], int log_numverts, int edgefactor, bool scramble, bool packed)
229 DEL->GenGraph500Data(initiator, scale, EDGEFACTOR, true, true ); // generate packed edges
230 SpParHelper::Print("Generated renamed edge lists\n");
231
232 ABool = new PSpMat_Bool(*DEL, false);
233 int64_t removed = ABool->RemoveLoops();
235 loopinfo << "Converted to Boolean and removed " << removed << " loops" << endl;
237 ABool->PrintInfo();
238 delete DEL; // free memory
239 A = PSpMat_Twitter(*ABool); // any upcasting generates the default object
240
241 MAXTRIALS = PERCENTS; // benchmarking
242 }
243 else
244 {
245 SpParHelper::Print("Not supported yet\n");
246 return 0;
247 }
248 double t02 = MPI_Wtime();
250 tinfo << "I/O (or generation) took " << t02-t01 << " seconds" << endl;
252
253 // indegrees is sum along rows because A is loaded as "tranposed", similarly oudegrees is sum along columns
254 ABool->PrintInfo();
255 ABool->Reduce(oudegrees, Column, plus<int64_t>(), static_cast<int64_t>(0));
256 ABool->Reduce(indegrees, Row, plus<int64_t>(), static_cast<int64_t>(0));
257
258 // indegrees_filt and oudegrees_filt is used for the real data
261
262 typedef FullyDistVec<int64_t, int64_t> IntVec; // used for the synthetic data (symmetricized before randomization)
264 int64_t keep[PERCENTS] = {100, 1000, 2500, 10000}; // ratio of edges kept in range (0, 10000)
265
266 if(string(argv[1]) == string("File")) // if using synthetic data, no notion of out/in degrees after randomization exist
267 {
268 struct tm timeinfo;
269 memset(&timeinfo, 0, sizeof(struct tm));
270 int year, month, day, hour, min, sec;
271 year = 2009; month = 7; day = 1;
272 hour = 0; min = 0; sec = 0;
273
274 timeinfo.tm_year = year - 1900; // year is "years since 1900"
275 timeinfo.tm_mon = month - 1 ; // month is in range 0...11
276 timeinfo.tm_mday = day; // range 1...31
277 timeinfo.tm_hour = hour; // range 0...23
278 timeinfo.tm_min = min; // range 0...59
279 timeinfo.tm_sec = sec; // range 0.
281
285 BBool.PrintInfo();
286 BBool.Reduce(oudegrees_filt, Column, plus<int64_t>(), static_cast<int64_t>(0));
287 BBool.Reduce(indegrees_filt, Row, plus<int64_t>(), static_cast<int64_t>(0));
288 }
289
291 degrees.EWiseApply(oudegrees, plus<int64_t>());
292 SpParHelper::Print("All degrees calculated\n");
293 delete ABool;
294
295 float balance = A.LoadImbalance();
297 outs << "Load balance: " << balance << endl;
299
300 if(string(argv[1]) == string("Gen"))
301 {
302 // We symmetricize before we apply the random generator
303 // Otherwise += will naturally add the random numbers together
304 // hence will create artificially high-permeable filters
305 Symmetricize(A); // A += A';
306 SpParHelper::Print("Symmetricized\n");
307
308 A.Apply(Twitter_obj_randomizer());
309 A.PrintInfo();
310
312 SpParHelper::Print("Found (and permuted) non-isolated vertices\n");
313 nonisov->RandPerm(); // so that A(v,v) is load-balanced (both memory and time wise)
314 A(*nonisov, *nonisov, true); // in-place permute to save memory
315 SpParHelper::Print("Dropped isolated vertices from input\n");
316
317 indegrees = indegrees(*nonisov); // fix the degrees arrays too
320 delete nonisov;
321
322 for (int i=0; i < PERCENTS; i++)
323 {
325 B.Prune(bind2nd(Twitter_materialize(), keep[i]));
327 BBool.PrintInfo();
328 float balance = B.LoadImbalance();
330 outs << "Load balance of " << static_cast<float>(keep[i])/100 << "% filtered case: " << balance << endl;
332
333 // degrees_filt[i] is by-default generated as permuted
334 BBool.Reduce(degrees_filt[i], Column, plus<int64_t>(), static_cast<int64_t>(0)); // Column=Row since BBool is symmetric
335 }
336 }
337 // real data is pre-balanced, because otherwise even the load balancing routine crushes due to load imbalance
338
339 float balance_former = A.LoadImbalance();
341 outs_former << "Load balance: " << balance_former << endl;
343
345 double t1 = MPI_Wtime();
346
348 double nver = (double) degrees.TotalLength();
349 Cands.SelectCandidates(nver);
350
351 for(int trials =0; trials < MAXTRIALS; trials++)
352 {
353 if(string(argv[1]) == string("Gen"))
354 {
357 outs << "Initializing since date (only once) to " << LatestRetwitterBFS::sincedate << endl;
359 }
360
363
364 double MTEPS[ITERS]; double INVMTEPS[ITERS]; double TIMES[ITERS]; double EDGES[ITERS];
365 double MPEPS[ITERS]; double INVMPEPS[ITERS];
366 int sruns = 0; // successful runs
367 for(int i=0; i<MAX_ITERS && sruns < ITERS; ++i)
368 {
369
370 // FullyDistVec ( shared_ptr<CommGrid> grid, IT globallen, NT initval);
371 FullyDistVec<int64_t, ParentType> parents ( A.getcommgrid(), A.getncol(), ParentType());
372
373 // FullyDistSpVec ( shared_ptr<CommGrid> grid, IT glen);
374 FullyDistSpVec<int64_t, ParentType> fringe(A.getcommgrid(), A.getncol());
375
377 double t1 = MPI_Wtime();
378
379 #ifdef USE_PAPI
382 #endif
383
384 fringe.SetElement(Cands[i], Cands[i]);
385 parents.SetElement(Cands[i], ParentType(Cands[i])); // make root discovered
386 int iterations = 0;
387 while(fringe.getnnz() > 0)
388 {
389 #ifdef USE_PAPI
391 #endif
392 fringe.ApplyInd(NumSetter);
393
394 #ifdef USE_PAPI
398 #endif
399
400 // SpMV with sparse vector, optimizations disabled for generality
402
403 #ifdef USE_PAPI
407
409 papi_this_iterate[bfs_papi_enum.SpMV].resize(combblas_papi_num_events+1); // +1 for the time
410 for(int k=0; k<combblas_papi_num_events; ++k)
411 {
413 }
415
419 #endif
420
421 // EWiseApply (const FullyDistSpVec<IU,NU1> & V, const FullyDistVec<IU,NU2> & W,
422 // _BinaryOperation _binary_op, _BinaryPredicate _doOp, bool allowVNulls, NU1 Vzero)
424
425 #ifdef USE_PAPI
429
431 papi_this_iterate[bfs_papi_enum.fringe_updt].resize(combblas_papi_num_events+1); // +1 for the time
432 for(int k=0; k<combblas_papi_num_events; ++k)
433 {
434 papi_this_iterate[bfs_papi_enum.fringe_updt][k] = ptr2values[k];
435 }
437
441 #endif
442
443 parents += fringe; // ABAB: Convert this to EwiseApply for compliance in PAPI timings
444
445 #ifdef USE_PAPI
449
451 papi_this_iterate[bfs_papi_enum.parents_updt].resize(combblas_papi_num_events+1); // +1 for the time
452 for(int k=0; k<combblas_papi_num_events; ++k)
453 {
454 papi_this_iterate[bfs_papi_enum.parents_updt][k] = ptr2values[k];
455 }
457 bfs_counters.push_back(papi_this_iterate); // copy to bfs_counters[iterations]
458 #endif
459
460 iterations++;
461 }
463 double t2 = MPI_Wtime();
464
465
468
469#ifndef ONLYTIME
476 int64_t nedges, in_nedges, ou_nedges;
477 if(string(argv[1]) == string("Gen"))
478 {
480 nedges = traversed.Reduce(plus<int64_t>(), (int64_t) 0);
481 }
482 else
483 {
488 nedges = in_nedges + ou_nedges; // count birectional edges twice
489 }
490
493 int64_t nedges_processed = in_nedges_processed + ou_nedges_processed; // count birectional edges twice
494#else
496#endif
497
498 if(parentsp.getnnz() > CC_LIMIT)
499 {
500 // intraversed.PrintInfo("Incoming edges traversed per vertex");
501 // intraversed.DebugPrint();
502 // outraversed.PrintInfo("Outgoing edges traversed per vertex");
503 // outraversed.DebugPrint();
504
505 #ifdef DEBUG
506 parents.PrintInfo("Final parents array");
507 parents.DebugPrint();
508 #endif
509
511 outnew << i << "th starting vertex was " << Cands[i] << endl;
512 outnew << "Number iterations: " << iterations << endl;
513 outnew << "Number of vertices found: " << parentsp.getnnz() << endl;
514 outnew << "Number of edges traversed in both directions: " << nedges << endl;
515 if(string(argv[1]) == string("File"))
516 outnew << "Number of edges traversed in one direction: " << ou_nedges << endl;
517 outnew << "Number of edges processed in both directions: " << nedges_processed << endl;
518 outnew << "Number of edges processed in one direction: " << ou_nedges_processed << endl;
519 outnew << "BFS time: " << t2-t1 << " seconds" << endl;
520 outnew << "MTEPS (bidirectional): " << static_cast<double>(nedges) / (t2-t1) / 1000000.0 << endl;
521 if(string(argv[1]) == string("File"))
522 outnew << "MTEPS (unidirectional): " << static_cast<double>(ou_nedges) / (t2-t1) / 1000000.0 << endl;
523 outnew << "MPEPS (bidirectional): " << static_cast<double>(nedges_processed) / (t2-t1) / 1000000.0 << endl;
524 outnew << "MPEPS (unidirectional): " << static_cast<double>(ou_nedges_processed) / (t2-t1) / 1000000.0 << endl;
525 outnew << "Total communication (average so far): " << (cblas_allgathertime + cblas_alltoalltime) / (i+1) << endl;
526
527 /* Write to PAPI */
528 #ifdef USE_PAPI
530 papiout << i << "th starting vertex was " << Cands[i] << endl;
531 papiout << "Threshold is " << LatestRetwitterBFS::sincedate << endl;
532
533 for(int i=0; i < iterations; i++) // over all spmv iterations in this BFS
534 {
535 papiout << "Iteration : " << i << endl;
536 for(int j=0; j < bfs_papi_labels; ++j)
537 {
538 papiout << "Function : " << bfs_papi_labels[j] << endl;
539
540 for(int k=0; k < combblas_papi_num_events; ++k)
541 {
542 papiout << combblas_event_names[k] << ":\t" << bfs_counters[i][j][k] << endl;
543 }
544 papiout << "Time (usec)" << ":\t" << bfs_counters[i][j][combblas_papi_num_events] << endl;
545 }
546 }
547 SpParHelper::PrintFile(papiout.str(), "PAPIRES.txt");
548 #endif
549 /* (End) Write to PAPI */
550
551 TIMES[sruns] = t2-t1;
552 if(string(argv[1]) == string("Gen"))
553 EDGES[sruns] = static_cast<double>(nedges);
554 else
555 EDGES[sruns] = static_cast<double>(ou_nedges);
556
557 MTEPS[sruns] = EDGES[sruns] / (t2-t1) / 1000000.0;
558 MPEPS[sruns++] = static_cast<double>(nedges_processed) / (t2-t1) / 1000000.0;
560 }
561 }
562 if (sruns < MINRUNS)
563 {
564 SpParHelper::Print("Not enough valid runs done\n");
565 MPI_Finalize();
566 return 0;
567 }
569
570 os << sruns << " valid runs done" << endl;
571 os << "Connected component lower limite was " << CC_LIMIT << endl;
572 os << "Per iteration communication times: " << endl;
573 os << "AllGatherv: " << cblas_allgathertime / sruns << endl;
574 os << "AlltoAllv: " << cblas_alltoalltime / sruns << endl;
575
576 sort(EDGES, EDGES+sruns);
577 os << "--------------------------" << endl;
578 os << "Min nedges: " << EDGES[0] << endl;
579 os << "Median nedges: " << (EDGES[(sruns/2)-1] + EDGES[sruns/2])/2 << endl;
580 os << "Max nedges: " << EDGES[sruns-1] << endl;
581 double mean = accumulate( EDGES, EDGES+sruns, 0.0 )/ sruns;
582 vector<double> zero_mean(sruns); // find distances to the mean
584 // self inner-product is sum of sum of squares
585 double deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
586 deviation = sqrt( deviation / (sruns-1) );
587 os << "Mean nedges: " << mean << endl;
588 os << "STDDEV nedges: " << deviation << endl;
589 os << "--------------------------" << endl;
590
591 sort(TIMES,TIMES+sruns);
592 os << "Filter keeps " << static_cast<double>(keep[trials])/100.0 << " percentage of edges" << endl;
593 os << "Min time: " << TIMES[0] << " seconds" << endl;
594 os << "Median time: " << (TIMES[(sruns/2)-1] + TIMES[sruns/2])/2 << " seconds" << endl;
595 os << "Max time: " << TIMES[sruns-1] << " seconds" << endl;
596 mean = accumulate( TIMES, TIMES+sruns, 0.0 )/ sruns;
598 deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
599 deviation = sqrt( deviation / (sruns-1) );
600 os << "Mean time: " << mean << " seconds" << endl;
601 os << "STDDEV time: " << deviation << " seconds" << endl;
602 os << "--------------------------" << endl;
603
604 sort(MTEPS, MTEPS+sruns);
605 os << "Min MTEPS: " << MTEPS[0] << endl;
606 os << "Median MTEPS: " << (MTEPS[(sruns/2)-1] + MTEPS[sruns/2])/2 << endl;
607 os << "Max MTEPS: " << MTEPS[sruns-1] << endl;
608 transform(MTEPS, MTEPS+sruns, INVMTEPS, safemultinv<double>()); // returns inf for zero teps
609 double hteps = static_cast<double>(sruns) / accumulate(INVMTEPS, INVMTEPS+sruns, 0.0);
610 os << "Harmonic mean of MTEPS: " << hteps << endl;
612 deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
613 deviation = sqrt( deviation / (sruns-1) ) * (hteps*hteps); // harmonic_std_dev
614 os << "Harmonic standard deviation of MTEPS: " << deviation << endl;
615
616 sort(MPEPS, MPEPS+sruns);
617 os << "Bidirectional Processed Edges per second (to estimate sustained BW)"<< endl;
618 os << "Min MPEPS: " << MPEPS[0] << endl;
619 os << "Median MPEPS: " << (MPEPS[(sruns/2)-1] + MPEPS[sruns/2])/2 << endl;
620 os << "Max MPEPS: " << MPEPS[sruns-1] << endl;
621 transform(MPEPS, MPEPS+sruns, INVMPEPS, safemultinv<double>()); // returns inf for zero teps
622 double hpeps = static_cast<double>(sruns) / accumulate(INVMPEPS, INVMPEPS+sruns, 0.0);
623 os << "Harmonic mean of MPEPS: " << hpeps << endl;
625 deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
626 deviation = sqrt( deviation / (sruns-1) ) * (hpeps*hpeps); // harmonic_std_dev
627 os << "Harmonic standard deviation of MPEPS: " << deviation << endl;
628 SpParHelper::Print(os.str());
629 }
630 }
631 MPI_Finalize();
632 return 0;
633}
634
#define MAXTRIALS
int main()
Definition Driver.cpp:12
#define MAX_ITERS
#define ITERS
#define MINRUNS
int cblas_splits
double cblas_allgathertime
double cblas_alltoalltime
#define PERCENTS
#define CC_LIMIT
MTRand GlobalMT(1)
#define EDGEFACTOR
MTRand GlobalMT(1)
SpParMat< int64_t, TwitterEdge, SpDCCols< int64_t, TwitterEdge > > PSpMat_Twitter
ParentType NumSetter(ParentType &num, int64_t index)
SpParMat< int64_t, bool, SpDCCols< int64_t, bool > > PSpMat_Bool
Definition auction.cpp:112
Definition test.cpp:53
double rand()
void GenGraph500Data(double initiator[4], int log_numverts, int edgefactor, bool scramble=false, bool packed=false)
static void PrintFile(const std::string &s, const std::string &filename)
static void Print(const std::string &s)
int nprocs
Definition comms.cpp:55
int combblas_papi_events[]
std::vector< std::vector< std::vector< long long > > > bfs_counters
int num_bfs_papi_labels
int combblas_papi_num_events
std::string combblas_event_names[]
std::string bfs_papi_labels
@ Column
Definition SpDefs.h:115
void Symmetricize(PARMAT &A)
Definition ReadMatDist.h:29
double A
static time_t sincedate
bool operator()(const TwitterEdge &x, time_t sincedate) const
const TwitterEdge operator()(const TwitterEdge &x) const