MueLu_RepartitionFactory_def.hpp
// @HEADER
//
// ***********************************************************************
//
//        MueLu: A package for multigrid based preconditioning
//                  Copyright 2012 Sandia Corporation
//
// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
// the U.S. Government retains certain rights in this software.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the Corporation nor the names of the
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// Questions? Contact
//                    Jonathan Hu       (jhu@sandia.gov)
//                    Andrey Prokopenko (aprokop@sandia.gov)
//                    Ray Tuminaro      (rstumin@sandia.gov)
//
// ***********************************************************************
//
// @HEADER
#ifndef MUELU_REPARTITIONFACTORY_DEF_HPP
#define MUELU_REPARTITIONFACTORY_DEF_HPP

#include <algorithm>
#include <iostream>
#include <sstream>

#include "MueLu_RepartitionFactory_decl.hpp" // TMP JG NOTE: before other includes, otherwise I cannot test the fwd declaration in _def

#ifdef HAVE_MPI
#include <Teuchos_DefaultMpiComm.hpp>
#include <Teuchos_CommHelpers.hpp>

#include <Xpetra_Map.hpp>
#include <Xpetra_MapFactory.hpp>
#include <Xpetra_VectorFactory.hpp>
#include <Xpetra_Import.hpp>
#include <Xpetra_ImportFactory.hpp>
#include <Xpetra_Export.hpp>
#include <Xpetra_ExportFactory.hpp>
#include <Xpetra_Matrix.hpp>
#include <Xpetra_MatrixFactory.hpp>

#include "MueLu_Utilities.hpp"

#include "MueLu_Level.hpp"
#include "MueLu_MasterList.hpp"
#include "MueLu_Monitor.hpp"

namespace MueLu {

  template <class Scalar, class LocalOrdinal, class GlobalOrdinal, class Node>
  RCP<const ParameterList> RepartitionFactory<Scalar, LocalOrdinal, GlobalOrdinal, Node>::GetValidParameterList() const {
    RCP<ParameterList> validParamList = rcp(new ParameterList());

#define SET_VALID_ENTRY(name) validParamList->setEntry(name, MasterList::getEntry(name))
    SET_VALID_ENTRY("repartition: start level");
    SET_VALID_ENTRY("repartition: min rows per proc");
    SET_VALID_ENTRY("repartition: max imbalance");
    SET_VALID_ENTRY("repartition: print partition distribution");
    SET_VALID_ENTRY("repartition: remap parts");
    SET_VALID_ENTRY("repartition: remap num values");
#undef SET_VALID_ENTRY

    validParamList->set< RCP<const FactoryBase> >("A",         Teuchos::null, "Factory of the matrix A");
    validParamList->set< RCP<const FactoryBase> >("Partition", Teuchos::null, "Factory of the partition");

    return validParamList;
  }

  template <class Scalar, class LocalOrdinal, class GlobalOrdinal, class Node>
  void RepartitionFactory<Scalar, LocalOrdinal, GlobalOrdinal, Node>::DeclareInput(Level& currentLevel) const {
    Input(currentLevel, "A");
    Input(currentLevel, "Partition");
  }

  template<class T> class MpiTypeTraits            { public: static MPI_Datatype getType(); };
  template<> class MpiTypeTraits<long>             { public: static MPI_Datatype getType() { return MPI_LONG;      } };
  template<> class MpiTypeTraits<int>              { public: static MPI_Datatype getType() { return MPI_INT;       } };
  template<> class MpiTypeTraits<short>            { public: static MPI_Datatype getType() { return MPI_SHORT;     } };
  template<> class MpiTypeTraits<unsigned>         { public: static MPI_Datatype getType() { return MPI_UNSIGNED;  } };
#ifdef HAVE_XPETRA_INT_LONG_LONG
  template<> class MpiTypeTraits<long long>        { public: static MPI_Datatype getType() { return MPI_LONG_LONG; } };
#endif
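
  // NOTE: MpiTypeTraits maps the ordinal type used for GIDs to the matching raw MPI datatype, so
  // that arrays of GIDs can be passed directly to MPI_Isend/MPI_Recv/MPI_Allgather below.
  // For example, with GO = long long, MpiTypeTraits<GO>::getType() returns MPI_LONG_LONG
  // (guarded by HAVE_XPETRA_INT_LONG_LONG).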

  template <class Scalar, class LocalOrdinal, class GlobalOrdinal, class Node>
  void RepartitionFactory<Scalar, LocalOrdinal, GlobalOrdinal, Node>::Build(Level& currentLevel) const {
    FactoryMonitor m(*this, "Build", currentLevel);

    const Teuchos::ParameterList& pL = GetParameterList();
    // Access parameters here to make sure that we set the parameter entry flag to "used" even in case of short-circuit evaluation.
    // TODO (JG): I don't really know if we want to do this.
    const int    startLevel          = pL.get<int>   ("repartition: start level");
    const LO     minRowsPerProcessor = pL.get<LO>    ("repartition: min rows per proc");
    const double nonzeroImbalance    = pL.get<double>("repartition: max imbalance");
    const bool   remapPartitions     = pL.get<bool>  ("repartition: remap parts");

    // TODO: We only need a CrsGraph. This class does not have to be templated on Scalar types.
    RCP<Matrix> A = Get< RCP<Matrix> >(currentLevel, "A");

    // ======================================================================================================
    // Determine whether partitioning is needed
    // ======================================================================================================
    // NOTE: most tests include some global communication, which is why we currently only do tests until we make
    // a decision on whether to repartition. However, there is value in knowing how "close" we are to having to
    // rebalance an operator. So, it would probably be beneficial to do and report *all* tests.

    // Test 1: skip repartitioning if current level is less than the specified minimum level for repartitioning
    if (currentLevel.GetLevelID() < startLevel) {
      GetOStream(Statistics0) << "Repartitioning? NO:" <<
          "\n  current level = " << Teuchos::toString(currentLevel.GetLevelID()) <<
          ", first level where repartitioning can happen is " + Teuchos::toString(startLevel) << std::endl;

      Set<RCP<const Import> >(currentLevel, "Importer", Teuchos::null);
      return;
    }

    RCP<const Map> rowMap = A->getRowMap();

    // NOTE: Teuchos::MpiComm::duplicate() calls MPI_Bcast inside, so this is
    // a synchronization point. However, as we do MueLu_sumAll afterwards anyway, it
    // does not matter.
    RCP<const Teuchos::Comm<int> > origComm = rowMap->getComm();
    RCP<const Teuchos::Comm<int> > comm     = origComm->duplicate();

    // Test 2: check whether A is actually distributed, i.e. more than one processor owns part of A
    // TODO: this global communication can be avoided if we store the information with the matrix (it is known when the matrix is created)
    // TODO: further improvements could be achieved when we use a subcommunicator for the active set. Then we only need to check its size
    {
      int numActiveProcesses = 0;
      MueLu_sumAll(comm, Teuchos::as<int>((A->getNodeNumRows() > 0) ? 1 : 0), numActiveProcesses);

      if (numActiveProcesses == 1) {
        GetOStream(Statistics0) << "Repartitioning? NO:" <<
            "\n  # processes with rows = " << Teuchos::toString(numActiveProcesses) << std::endl;

        Set<RCP<const Import> >(currentLevel, "Importer", Teuchos::null);
        return;
      }
    }

    bool test3 = false, test4 = false;
    std::string msg3, msg4;

    // Test 3: check whether the number of rows on any processor satisfies the minimum number of rows requirement
    // NOTE: Test 2 ensures that repartitioning is not done when there is only one processor (it may or may not satisfy Test 3)
    if (minRowsPerProcessor > 0) {
      LO numMyRows = Teuchos::as<LO>(A->getNodeNumRows()), minNumRows, LOMAX = Teuchos::OrdinalTraits<LO>::max();
      LO haveFewRows = (numMyRows < minRowsPerProcessor ? 1 : 0), numWithFewRows = 0;
      MueLu_sumAll(comm, haveFewRows, numWithFewRows);
      MueLu_minAll(comm, (numMyRows > 0 ? numMyRows : LOMAX), minNumRows);

      // TODO: we could change it to repartition only if the number of processors with numRows < minNumRows is larger than some
      // percentage of the total number. This way, we won't repartition if 2 out of 1000 processors don't have enough elements.
      // I'm thinking maybe a 20% threshold. To implement, simply add " && numWithFewRows < .2*numProcs" to the if statement.
      if (numWithFewRows > 0)
        test3 = true;

      msg3 = "\n  min # rows per proc = " + Teuchos::toString(minNumRows) + ", min allowable = " + Teuchos::toString(minRowsPerProcessor);
    }

    // Test 4: check whether the imbalance in the number of nonzeros per processor is greater than the threshold
    if (!test3) {
      GO minNnz, maxNnz, numMyNnz = Teuchos::as<GO>(A->getNodeNumEntries());
      MueLu_maxAll(comm, numMyNnz, maxNnz);
      MueLu_minAll(comm, (numMyNnz > 0 ? numMyNnz : maxNnz), minNnz); // min nnz over all active processors
      double imbalance = Teuchos::as<double>(maxNnz)/minNnz;

      if (imbalance > nonzeroImbalance)
        test4 = true;

      msg4 = "\n  nonzero imbalance = " + Teuchos::toString(imbalance) + ", max allowable = " + Teuchos::toString(nonzeroImbalance);
    }
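
    // For illustration (hypothetical numbers): if the most loaded processor owns maxNnz = 12000
    // nonzeros and the least loaded active processor owns minNnz = 4000, then imbalance = 3.0,
    // and with "repartition: max imbalance" set to, say, 1.2, Test 4 triggers repartitioning.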

    if (!test3 && !test4) {
      GetOStream(Statistics0) << "Repartitioning? NO:" << msg3 + msg4 << std::endl;

      Set<RCP<const Import> >(currentLevel, "Importer", Teuchos::null);
      return;
    }

    GetOStream(Statistics0) << "Repartitioning? YES:" << msg3 + msg4 << std::endl;

    GO                    indexBase = rowMap->getIndexBase();
    Xpetra::UnderlyingLib lib       = rowMap->lib();
    int myRank   = comm->getRank();
    int numProcs = comm->getSize();

    RCP<const Teuchos::MpiComm<int> > tmpic = rcp_dynamic_cast<const Teuchos::MpiComm<int> >(comm);
    TEUCHOS_TEST_FOR_EXCEPTION(tmpic == Teuchos::null, Exceptions::RuntimeError, "Cannot cast base Teuchos::Comm to Teuchos::MpiComm object.");
    RCP<const Teuchos::OpaqueWrapper<MPI_Comm> > rawMpiComm = tmpic->getRawMpiComm();

    // ======================================================================================================
    // Calculate number of partitions
    // ======================================================================================================
    // FIXME Quick way to figure out how many partitions there should be (same algorithm as ML)
    // FIXME Should take into account nnz? Perhaps only when user is using min #nnz per row threshold.
    GO numPartitions;
    if (currentLevel.IsAvailable("number of partitions")) {
      numPartitions = currentLevel.Get<GO>("number of partitions");
      GetOStream(Warnings0) << "Using user-provided \"number of partitions\", the performance is unknown" << std::endl;

    } else {
      if (Teuchos::as<GO>(A->getGlobalNumRows()) < minRowsPerProcessor) {
        // System is too small, migrate it to a single processor
        numPartitions = 1;

      } else {
        // Make sure that each processor has approximately minRowsPerProcessor
        numPartitions = A->getGlobalNumRows() / minRowsPerProcessor;
      }
      numPartitions = std::min(numPartitions, Teuchos::as<GO>(numProcs));

      currentLevel.Set("number of partitions", numPartitions, NoFactory::get());
    }
    GetOStream(Statistics0) << "Number of partitions to use = " << numPartitions << std::endl;
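
    // Worked example of the heuristic above (hypothetical numbers): with 10,000 global rows,
    // "repartition: min rows per proc" = 800, and 16 processes, numPartitions = 10000/800 = 12 < 16,
    // so 12 partitions are used; with only 8 processes the value would be capped at numProcs = 8.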

    // ======================================================================================================
    // Construct decomposition vector
    // ======================================================================================================
    RCP<GOVector> decomposition;
    if (numPartitions == 1) {
      // Trivial case: decomposition is the trivial one, all zeros. We skip the call to Zoltan_Interface
      // (this is mostly done to avoid extra output messages, as even if we didn't skip there is a shortcut
      // in Zoltan[12]Interface).
      // TODO: We can probably skip more work in this case (like building all extra data structures)
      GetOStream(Warnings0) << "Only one partition: Skip call to the repartitioner." << std::endl;
      decomposition = Xpetra::VectorFactory<GO, LO, GO, NO>::Build(A->getRowMap(), true);

    } else {
      decomposition = Get<RCP<GOVector> >(currentLevel, "Partition");

      if (decomposition.is_null()) {
        GetOStream(Warnings0) << "No repartitioning necessary: partitions were left unchanged by the repartitioner" << std::endl;
        Set<RCP<const Import> >(currentLevel, "Importer", Teuchos::null);
        return;
      }
    }

    // ======================================================================================================
    // Remap if necessary
    // ======================================================================================================
    // From a user perspective, we want the user to not care about remapping, thinking of it as only a performance feature.
    // There are two problems, however.
    // (1) Next level aggregation depends on the order of GIDs in the vector, if one uses "natural" or "random" orderings.
    //     This also means that remapping affects next level aggregation, despite the fact that the _set_ of GIDs for
    //     each partition is the same.
    // (2) Even with the fixed order of GIDs, the remapping may influence the aggregation for the next-next level.
    //     Let us consider the following example. Let's assume that when we don't do remapping, processor 0 would have
    //     GIDs {0,1,2}, and processor 1 GIDs {3,4,5}, and if we do remapping processor 0 would contain {3,4,5} and
    //     processor 1 {0,1,2}. Now, when we run the repartitioning algorithm on the next level (say Zoltan1 RCB), it may
    //     depend on whether it is [{0,1,2}, {3,4,5}] or [{3,4,5}, {0,1,2}]. Specifically, the tie-breaking
    //     algorithm can resolve these differently. For instance, running
    //         mpirun -np 5 ./MueLu_ScalingTestParamList.exe --xml=easy_sa.xml --nx=12 --ny=12 --nz=12
    //     with
    //         <ParameterList name="MueLu">
    //           <Parameter name="coarse: max size" type="int" value="1"/>
    //           <Parameter name="repartition: enable" type="bool" value="true"/>
    //           <Parameter name="repartition: min rows per proc" type="int" value="2"/>
    //           <ParameterList name="level 1">
    //             <Parameter name="repartition: remap parts" type="bool" value="false/true"/>
    //           </ParameterList>
    //         </ParameterList>
    //     produces different repartitioning for level 2.
    //     This different repartitioning may then escalate into different aggregation for the next level.
    //
    // We fix (1) by fixing the order of GIDs in a vector by sorting the resulting vector.
    // Fixing (2) is more complicated.
    // FIXME: Fixing (2) in Zoltan may not be enough, as we may use some arbitration in MueLu,
    // for instance with CoupledAggregation. What we really need to do is to use the same order of processors containing
    // the same order of GIDs. To achieve that, the newly created subcommunicator must be conforming with the order. For
    // instance, if we have [{0,1,2}, {3,4,5}], we create a subcommunicator where processor 0 gets rank 0, and processor 1
    // gets rank 1. If, on the other hand, we have [{3,4,5}, {0,1,2}], we assign rank 1 to processor 0, and rank 0 to processor 1.
    // This rank permutation requires help from Epetra/Tpetra, both of which have no such API in place.
    // One should also be concerned that if we had such API in place, rank 0 in subcommunicator may no longer be rank 0 in
    // MPI_COMM_WORLD, which may lead to issues for logging.
    if (remapPartitions) {
      SubFactoryMonitor m1(*this, "DeterminePartitionPlacement", currentLevel);

      DeterminePartitionPlacement(*A, *decomposition, numPartitions);
    }

    // ======================================================================================================
    // Construct importer
    // ======================================================================================================
    // At this point, the following is true:
    //  * Each processor owns 0 or 1 partitions
    //  * If a processor owns a partition, that partition number is equal to the processor rank
    //  * The decomposition vector contains the partition ids that the corresponding GIDs belong to

    ArrayRCP<const GO> decompEntries;
    if (decomposition->getLocalLength() > 0)
      decompEntries = decomposition->getData(0);

#ifdef HAVE_MUELU_DEBUG
    // Test range of partition ids
    int incorrectRank = -1;
    for (int i = 0; i < decompEntries.size(); i++)
      if (decompEntries[i] >= numProcs || decompEntries[i] < 0) {
        incorrectRank = myRank;
        break;
      }

    int incorrectGlobalRank = -1;
    MueLu_maxAll(comm, incorrectRank, incorrectGlobalRank);
    TEUCHOS_TEST_FOR_EXCEPTION(incorrectGlobalRank > -1, Exceptions::RuntimeError, "pid " + Teuchos::toString(incorrectGlobalRank) + " encountered a partition number that is out of range");
#endif

    Array<GO> myGIDs;
    myGIDs.reserve(decomposition->getLocalLength());

    // Step 0: Construct mapping
    //    part number -> GIDs I own which belong to this part
    // NOTE: my own part GIDs are not part of the map
    typedef std::map<GO, Array<GO> > map_type;
    map_type sendMap;
    for (LO i = 0; i < decompEntries.size(); i++) {
      GO id  = decompEntries[i];
      GO GID = rowMap->getGlobalElement(i);

      if (id == myRank)
        myGIDs     .push_back(GID);
      else
        sendMap[id].push_back(GID);
    }
    decompEntries = Teuchos::null;
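
    // For illustration (hypothetical numbers): on rank 0, decompEntries = [0, 2, 0, 1] with local
    // GIDs [10, 11, 12, 13] yields myGIDs = [10, 12] (rows that stay here) and
    // sendMap = { 1 -> [13], 2 -> [11] } (rows to be shipped to parts 1 and 2).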

    if (IsPrint(Statistics2)) {
      GO numLocalKept = myGIDs.size(), numGlobalKept, numGlobalRows = A->getGlobalNumRows();
      MueLu_sumAll(comm, numLocalKept, numGlobalKept);
      GetOStream(Statistics2) << "Unmoved rows: " << numGlobalKept << " / " << numGlobalRows << " (" << 100*Teuchos::as<double>(numGlobalKept)/numGlobalRows << "%)" << std::endl;
    }

    int numSend = sendMap.size(), numRecv;

    // Arrayify map keys
    Array<GO> myParts(numSend), myPart(1);
    int cnt = 0;
    myPart[0] = myRank;
    for (typename map_type::const_iterator it = sendMap.begin(); it != sendMap.end(); it++)
      myParts[cnt++] = it->first;

    // Step 1: Find out how many processors send me data
    // partsIndexBase starts from zero, as the processor ids start from zero
    GO partsIndexBase = 0;
    RCP<Map>    partsIHave  = MapFactory   ::Build(lib, Teuchos::OrdinalTraits<Xpetra::global_size_t>::invalid(), myParts(), partsIndexBase, comm);
    RCP<Map>    partsIOwn   = MapFactory   ::Build(lib,                                                 numProcs,  myPart(), partsIndexBase, comm);
    RCP<Export> partsExport = ExportFactory::Build(partsIHave, partsIOwn);

    RCP<GOVector> partsISend    = Xpetra::VectorFactory<GO, LO, GO, NO>::Build(partsIHave);
    RCP<GOVector> numPartsIRecv = Xpetra::VectorFactory<GO, LO, GO, NO>::Build(partsIOwn);
    if (numSend) {
      ArrayRCP<GO> partsISendData = partsISend->getDataNonConst(0);
      for (int i = 0; i < numSend; i++)
        partsISendData[i] = 1;
    }
    (numPartsIRecv->getDataNonConst(0))[0] = 0;

    numPartsIRecv->doExport(*partsISend, *partsExport, Xpetra::ADD);
    numRecv = (numPartsIRecv->getData(0))[0];
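
    // How the counting above works: each rank owns exactly one entry of partsIOwn (the GID equal
    // to its own rank) and puts a 1 into partsISend for every part it sends to; the ADD export
    // then sums those ones on the owning rank. E.g. (hypothetical), if ranks 2 and 5 both have
    // rows belonging to part 3, rank 3 ends up with numRecv = 2.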

    // Step 2: Get my GIDs from everybody else
    MPI_Datatype MpiType = MpiTypeTraits<GO>::getType();
    int msgTag = 12345; // TODO: use Comm::dup for all internal messaging

    // Post sends
    Array<MPI_Request> sendReqs(numSend);
    cnt = 0;
    for (typename map_type::iterator it = sendMap.begin(); it != sendMap.end(); it++)
      MPI_Isend(static_cast<void*>(it->second.getRawPtr()), it->second.size(), MpiType, Teuchos::as<GO>(it->first), msgTag, *rawMpiComm, &sendReqs[cnt++]);

    map_type recvMap;
    size_t totalGIDs = myGIDs.size();
    for (int i = 0; i < numRecv; i++) {
      MPI_Status status;
      MPI_Probe(MPI_ANY_SOURCE, msgTag, *rawMpiComm, &status);

      // Get rank and number of elements from status
      int fromRank = status.MPI_SOURCE, count;
      MPI_Get_count(&status, MpiType, &count);

      recvMap[fromRank].resize(count);
      MPI_Recv(static_cast<void*>(recvMap[fromRank].getRawPtr()), count, MpiType, fromRank, msgTag, *rawMpiComm, &status);

      totalGIDs += count;
    }

    // Do waits on send requests
    if (numSend) {
      Array<MPI_Status> sendStatuses(numSend);
      MPI_Waitall(numSend, sendReqs.getRawPtr(), sendStatuses.getRawPtr());
    }

    // Merge GIDs
    myGIDs.reserve(totalGIDs);
    for (typename map_type::const_iterator it = recvMap.begin(); it != recvMap.end(); it++) {
      int offset = myGIDs.size(), len = it->second.size();
      if (len) {
        myGIDs.resize(offset + len);
        memcpy(myGIDs.getRawPtr() + offset, it->second.getRawPtr(), len*sizeof(GO));
      }
    }
    // NOTE: The general sorting algorithm could be sped up by using the knowledge that the original myGIDs and all received chunks
    // (i.e. it->second) are sorted. Therefore, a merge sort would work well in this situation.
    std::sort(myGIDs.begin(), myGIDs.end());

    // Step 3: Construct importer
    RCP<Map>          newRowMap = MapFactory::Build(lib, rowMap->getGlobalNumElements(), myGIDs(), indexBase, origComm);
    RCP<const Import> rowMapImporter;
    {
      SubFactoryMonitor m1(*this, "Import construction", currentLevel);
      rowMapImporter = ImportFactory::Build(rowMap, newRowMap);
    }

    Set(currentLevel, "Importer", rowMapImporter);

    // ======================================================================================================
    // Print some data
    // ======================================================================================================
    if (pL.get<bool>("repartition: print partition distribution") && IsPrint(Statistics2)) {
      // Print the grid of processors
      GetOStream(Statistics2) << "Partition distribution over cores (ownership is indicated by '+')" << std::endl;
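
      // For example (hypothetical), on 8 cores where only ranks 0-2 own rows, the printed grid
      // would look like:
      //   +++ 0:2
      //   ... 3:5
      //   ..  6:7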

      char amActive = (myGIDs.size() ? 1 : 0);
      std::vector<char> areActive(numProcs, 0);
      MPI_Gather(&amActive, 1, MPI_CHAR, &areActive[0], 1, MPI_CHAR, 0, *rawMpiComm);

      int rowWidth = std::min(Teuchos::as<int>(ceil(sqrt(numProcs))), 100);
      for (int proc = 0; proc < numProcs; proc += rowWidth) {
        for (int j = 0; j < rowWidth; j++)
          if (proc + j < numProcs)
            GetOStream(Statistics2) << (areActive[proc + j] ? "+" : ".");
          else
            GetOStream(Statistics2) << " ";

        GetOStream(Statistics2) << " " << proc << ":" << std::min(proc + rowWidth, numProcs) - 1 << std::endl;
      }
    }

  } // Build

  //----------------------------------------------------------------------
  template<typename T, typename W>
  struct Triplet {
    T i, j;
    W v;
  };
  template<typename T, typename W>
  static bool compareTriplets(const Triplet<T,W>& a, const Triplet<T,W>& b) {
    return (a.v > b.v); // descending order
  }

  template <class Scalar, class LocalOrdinal, class GlobalOrdinal, class Node>
  void RepartitionFactory<Scalar, LocalOrdinal, GlobalOrdinal, Node>::
  DeterminePartitionPlacement(const Matrix& A, GOVector& decomposition, GO numPartitions) const {
    RCP<const Map> rowMap = A.getRowMap();

    RCP<const Teuchos::Comm<int> > comm = rowMap->getComm()->duplicate();
    int numProcs = comm->getSize();

    RCP<const Teuchos::MpiComm<int> > tmpic = rcp_dynamic_cast<const Teuchos::MpiComm<int> >(comm);
    TEUCHOS_TEST_FOR_EXCEPTION(tmpic == Teuchos::null, Exceptions::RuntimeError, "Cannot cast base Teuchos::Comm to Teuchos::MpiComm object.");
    RCP<const Teuchos::OpaqueWrapper<MPI_Comm> > rawMpiComm = tmpic->getRawMpiComm();

    const Teuchos::ParameterList& pL = GetParameterList();

    // maxLocal is a constant which determines the number of largest edges which are being exchanged
    // The idea is that we do not want to construct the full bipartite graph, but simply a subset of
    // it, which requires less communication. By selecting the largest local edges we hope to achieve
    // similar results but at a lower cost.
    const int maxLocal = pL.get<int>("repartition: remap num values");
    const int dataSize = 2*maxLocal;

    ArrayRCP<GO> decompEntries;
    if (decomposition.getLocalLength() > 0)
      decompEntries = decomposition.getDataNonConst(0);

    // Step 1: Sort local edges by weight
    // Each edge of a bipartite graph corresponds to a triplet (i, j, v) where
    //   i: processor id that has some piece of part with part_id = j
    //   j: part id
    //   v: weight of the edge
    // We set edge weights to be the total number of nonzeros in rows on this processor which
    // correspond to this part_id. The idea is that when we redistribute the matrix, this weight
    // is a good approximation of the amount of data to move.
    // We use two maps, an original one which maps a partition id of an edge to the corresponding weight,
    // and a reverse one, which is necessary to sort by edges.
    std::map<GO,GO> lEdges;
    for (LO i = 0; i < decompEntries.size(); i++)
      lEdges[decompEntries[i]] += A.getNumEntriesInLocalRow(i);

    // Reverse map, so that edges are sorted by weight.
    // This results in a multimap, as we may have edges with the same weight
    std::multimap<GO,GO> revlEdges;
    for (typename std::map<GO,GO>::const_iterator it = lEdges.begin(); it != lEdges.end(); it++)
      revlEdges.insert(std::make_pair(it->second, it->first));

    // Both lData and gData are arrays of data which we communicate. The data is stored
    // in pairs, so that data[2*i+0] is the part index, and data[2*i+1] is the corresponding edge weight.
    // We do not store the processor id in the data, as we can compute it from the offset in gData.
    Array<GO> lData(dataSize, -1), gData(numProcs * dataSize);
    int numEdges = 0;
    for (typename std::multimap<GO,GO>::reverse_iterator rit = revlEdges.rbegin(); rit != revlEdges.rend() && numEdges < maxLocal; rit++) {
      lData[2*numEdges+0] = rit->second; // part id
      lData[2*numEdges+1] = rit->first;  // edge weight
      numEdges++;
    }
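
    // For illustration (hypothetical numbers): with maxLocal = 2, a rank whose rows map to parts
    // {3: 500 nnz, 1: 200 nnz, 4: 50 nnz} keeps only the two heaviest edges and contributes
    // lData = [3, 500, 1, 200]; a rank with a single edge pads with -1: lData = [2, 80, -1, -1].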

    // Step 2: Gather most edges
    // Each processor contributes maxLocal edges by providing maxLocal pairs <part id, weight>, which is of size dataSize
    MPI_Datatype MpiType = MpiTypeTraits<GO>::getType();
    MPI_Allgather(static_cast<void*>(lData.getRawPtr()), dataSize, MpiType, static_cast<void*>(gData.getRawPtr()), dataSize, MpiType, *rawMpiComm);

    // Step 3: Construct mapping

    // Construct the set of triplets
    std::vector<Triplet<int,int> > gEdges(numProcs * maxLocal);
    size_t k = 0;
    for (LO i = 0; i < gData.size(); i += 2) {
      GO part   = gData[i+0];
      GO weight = gData[i+1];
      if (part != -1) {           // skip nonexistent edges
        gEdges[k].i = i/dataSize; // determine the processor by its offset (since every processor sends the same amount)
        gEdges[k].j = part;
        gEdges[k].v = weight;
        k++;
      }
    }
    gEdges.resize(k);

    // Sort edges by weight
    // NOTE: compareTriplets is actually a reverse sort, so the edge weights are in decreasing order
    std::sort(gEdges.begin(), gEdges.end(), compareTriplets<int,int>);

    // Do matching
    std::map<int,int> match;
    std::vector<char> matchedRanks(numProcs, 0), matchedParts(numProcs, 0);
    int numMatched = 0;
    for (typename std::vector<Triplet<int,int> >::const_iterator it = gEdges.begin(); it != gEdges.end(); it++) {
      GO rank = it->i;
      GO part = it->j;
      if (matchedRanks[rank] == 0 && matchedParts[part] == 0) {
        matchedRanks[rank] = 1;
        matchedParts[part] = 1;
        match[part]        = rank;
        numMatched++;
      }
    }
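
    // Worked example of the greedy matching (hypothetical numbers): with gEdges sorted as
    // (rank 1, part 0, 900), (rank 0, part 0, 700), (rank 0, part 1, 400), the first edge assigns
    // part 0 to rank 1; the second is skipped since part 0 is already matched; the third assigns
    // part 1 to rank 0. Each rank gets at most one part, and heavy data tends to stay in place.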
    GetOStream(Statistics0) << "Number of unassigned partitions before cleanup stage: " << (numPartitions - numMatched) << " / " << numPartitions << std::endl;

    // Step 4: Assign unassigned partitions
    // We do that through random matching for remaining partitions. Not all part numbers are valid, but valid parts are a subset of [0, numProcs).
    // The reason it is done this way is that we don't need any extra communication, as we don't need to know which parts are valid.
    for (int part = 0, matcher = 0; part < numProcs; part++)
      if (match.count(part) == 0) {
        // Find first non-matched rank
        while (matchedRanks[matcher])
          matcher++;

        match[part] = matcher++;
      }

    // Step 5: Permute entries in the decomposition vector
    for (LO i = 0; i < decompEntries.size(); i++)
      decompEntries[i] = match[decompEntries[i]];
  }

} // namespace MueLu

#endif //ifdef HAVE_MPI

#endif // MUELU_REPARTITIONFACTORY_DEF_HPP