// Tpetra_Distributor.cpp
// ***********************************************************************
//
//          Tpetra: Templated Linear Algebra Services Package
//                 Copyright (2008) Sandia Corporation
//
// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
// the U.S. Government retains certain rights in this software.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the Corporation nor the names of the
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// Questions? Contact Michael A. Heroux (maherou@sandia.gov)
//
// ************************************************************************
// @HEADER

#include "Tpetra_Distributor.hpp"
#include "Tpetra_Details_gathervPrint.hpp"
#include "Tpetra_Details_makeValidVerboseStream.hpp"
#include "Teuchos_StandardParameterEntryValidators.hpp"
#include "Teuchos_VerboseObjectParameterListHelpers.hpp"
#include <numeric>

namespace Tpetra {
  namespace Details {
    std::string
    DistributorSendTypeEnumToString (EDistributorSendType sendType)
    {
      if (sendType == DISTRIBUTOR_ISEND) {
        return "Isend";
      }
      else if (sendType == DISTRIBUTOR_RSEND) {
        return "Rsend";
      }
      else if (sendType == DISTRIBUTOR_SEND) {
        return "Send";
      }
      else if (sendType == DISTRIBUTOR_SSEND) {
        return "Ssend";
      }
      else {
        TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
          "EDistributorSendType enum value " << sendType << ".");
      }
    }

    std::string
    DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
    {
      switch (how) {
      case Details::DISTRIBUTOR_NOT_INITIALIZED:
        return "Not initialized yet";
      case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
        return "By createFromSends";
      case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
        return "By createFromRecvs";
      case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
        return "By createFromSendsAndRecvs";
      case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
        return "By createReverseDistributor";
      case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
        return "By copy constructor";
      default:
        return "INVALID";
      }
    }
  } // namespace Details

  Teuchos::Array<std::string>
  distributorSendTypes ()
  {
    Teuchos::Array<std::string> sendTypes;
    sendTypes.push_back ("Isend");
    sendTypes.push_back ("Rsend");
    sendTypes.push_back ("Send");
    sendTypes.push_back ("Ssend");
    return sendTypes;
  }

  // We set default values of Distributor's Boolean parameters here,
  // in this one place.  That way, if we want to change the default
  // value of a parameter, we don't have to search the whole file to
  // ensure a consistent setting.
  namespace {
    // Default value of the "Debug" parameter.
    const bool tpetraDistributorDebugDefault = false;
    // Default value of the "Barrier between receives and sends" parameter.
    const bool barrierBetween_default = false;
    // Default value of the "Use distinct tags" parameter.
    const bool useDistinctTags_default = true;
  } // namespace (anonymous)

  int Distributor::getTag (const int pathTag) const {
    return useDistinctTags_ ? pathTag : comm_->getTag ();
  }
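
  // Illustrative note: each communication code path in this file picks a
  // small fixed "path tag" (for example, computeReceives() below uses
  // pathTag = 2).  With "Use distinct tags" enabled (the default),
  // getTag() returns that path tag, so messages from different code
  // paths cannot be confused with one another; otherwise all code paths
  // share the communicator's current tag.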


#ifdef TPETRA_DISTRIBUTOR_TIMERS
  void Distributor::makeTimers () {
    const std::string name_doPosts3 = "Tpetra::Distributor: doPosts(3)";
    const std::string name_doPosts4 = "Tpetra::Distributor: doPosts(4)";
    const std::string name_doWaits = "Tpetra::Distributor: doWaits";
    const std::string name_doPosts3_recvs = "Tpetra::Distributor: doPosts(3): recvs";
    const std::string name_doPosts4_recvs = "Tpetra::Distributor: doPosts(4): recvs";
    const std::string name_doPosts3_barrier = "Tpetra::Distributor: doPosts(3): barrier";
    const std::string name_doPosts4_barrier = "Tpetra::Distributor: doPosts(4): barrier";
    const std::string name_doPosts3_sends = "Tpetra::Distributor: doPosts(3): sends";
    const std::string name_doPosts4_sends = "Tpetra::Distributor: doPosts(4): sends";

    timer_doPosts3_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3);
    timer_doPosts4_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4);
    timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (name_doWaits);
    timer_doPosts3_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_recvs);
    timer_doPosts4_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_recvs);
    timer_doPosts3_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_barrier);
    timer_doPosts4_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_barrier);
    timer_doPosts3_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_sends);
    timer_doPosts4_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_sends);
  }
#endif // TPETRA_DISTRIBUTOR_TIMERS

  Distributor::
  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
               const Teuchos::RCP<Teuchos::FancyOStream>& out,
               const Teuchos::RCP<Teuchos::ParameterList>& plist)
    : comm_ (comm)
    , out_ (::Tpetra::Details::makeValidVerboseStream (out))
    , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
    , sendType_ (Details::DISTRIBUTOR_SEND)
    , barrierBetween_ (barrierBetween_default)
    , verbose_ (tpetraDistributorDebugDefault)
    , selfMessage_ (false)
    , numSends_ (0)
    , maxSendLength_ (0)
    , numReceives_ (0)
    , totalReceiveLength_ (0)
    , lastRoundBytesSend_ (0)
    , lastRoundBytesRecv_ (0)
    , useDistinctTags_ (useDistinctTags_default)
  {
    TEUCHOS_ASSERT( ! out_.is_null () );

    this->setParameterList (plist); // sets verbose_ via Behavior
#ifdef TPETRA_DISTRIBUTOR_TIMERS
    makeTimers ();
#endif // TPETRA_DISTRIBUTOR_TIMERS
  }

  Distributor::
  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
    : Distributor (comm, Teuchos::null, Teuchos::null)
  {}

  Distributor::
  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
               const Teuchos::RCP<Teuchos::FancyOStream>& out)
    : Distributor (comm, out, Teuchos::null)
  {}

  Distributor::
  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
               const Teuchos::RCP<Teuchos::ParameterList>& plist)
    : Distributor (comm, Teuchos::null, plist)
  {}

  Distributor::
  Distributor (const Distributor& distributor)
    : comm_ (distributor.comm_)
    , out_ (distributor.out_)
    , howInitialized_ (Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
    , sendType_ (distributor.sendType_)
    , barrierBetween_ (distributor.barrierBetween_)
    , verbose_ (distributor.verbose_)
    , selfMessage_ (distributor.selfMessage_)
    , numSends_ (distributor.numSends_)
    , procsTo_ (distributor.procsTo_)
    , startsTo_ (distributor.startsTo_)
    , lengthsTo_ (distributor.lengthsTo_)
    , maxSendLength_ (distributor.maxSendLength_)
    , indicesTo_ (distributor.indicesTo_)
    , numReceives_ (distributor.numReceives_)
    , totalReceiveLength_ (distributor.totalReceiveLength_)
    , lengthsFrom_ (distributor.lengthsFrom_)
    , procsFrom_ (distributor.procsFrom_)
    , startsFrom_ (distributor.startsFrom_)
    , indicesFrom_ (distributor.indicesFrom_)
    , reverseDistributor_ (distributor.reverseDistributor_)
    , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
    , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
    , useDistinctTags_ (distributor.useDistinctTags_)
  {
    using Teuchos::ParameterList;
    using Teuchos::RCP;
    using Teuchos::rcp;

    TEUCHOS_ASSERT( ! out_.is_null () );

    RCP<const ParameterList> rhsList = distributor.getParameterList ();
    RCP<ParameterList> newList = rhsList.is_null () ? Teuchos::null :
      Teuchos::parameterList (*rhsList);
    this->setParameterList (newList);

#ifdef TPETRA_DISTRIBUTOR_TIMERS
    makeTimers ();
#endif // TPETRA_DISTRIBUTOR_TIMERS
  }

  void Distributor::swap (Distributor& rhs) {
    using Teuchos::ParameterList;
    using Teuchos::parameterList;
    using Teuchos::RCP;

    std::swap (comm_, rhs.comm_);
    std::swap (out_, rhs.out_);
    std::swap (howInitialized_, rhs.howInitialized_);
    std::swap (sendType_, rhs.sendType_);
    std::swap (barrierBetween_, rhs.barrierBetween_);
    std::swap (verbose_, rhs.verbose_);
    std::swap (selfMessage_, rhs.selfMessage_);
    std::swap (numSends_, rhs.numSends_);
    std::swap (procsTo_, rhs.procsTo_);
    std::swap (startsTo_, rhs.startsTo_);
    std::swap (lengthsTo_, rhs.lengthsTo_);
    std::swap (maxSendLength_, rhs.maxSendLength_);
    std::swap (indicesTo_, rhs.indicesTo_);
    std::swap (numReceives_, rhs.numReceives_);
    std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
    std::swap (lengthsFrom_, rhs.lengthsFrom_);
    std::swap (procsFrom_, rhs.procsFrom_);
    std::swap (startsFrom_, rhs.startsFrom_);
    std::swap (indicesFrom_, rhs.indicesFrom_);
    std::swap (reverseDistributor_, rhs.reverseDistributor_);
    std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
    std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
    std::swap (useDistinctTags_, rhs.useDistinctTags_);

    // Swap parameter lists.  If they are the same object, make a deep
    // copy first, so that modifying one won't modify the other one.
    RCP<ParameterList> lhsList = this->getNonconstParameterList ();
    RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
    if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
      rhsList = parameterList (*rhsList);
    }
    if (! rhsList.is_null ()) {
      this->setMyParamList (rhsList);
    }
    if (! lhsList.is_null ()) {
      rhs.setMyParamList (lhsList);
    }

    // We don't need to swap timers, because all instances of
    // Distributor use the same timers.
  }

  void
  Distributor::
  setParameterList (const Teuchos::RCP<Teuchos::ParameterList>& plist)
  {
    using ::Tpetra::Details::Behavior;
    using Teuchos::FancyOStream;
    using Teuchos::getIntegralValue;
    using Teuchos::includesVerbLevel;
    using Teuchos::OSTab;
    using Teuchos::ParameterList;
    using Teuchos::parameterList;
    using Teuchos::RCP;
    using std::endl;

    const bool verboseDefault = Behavior::verbose ("Distributor") ||
      Behavior::verbose ("Tpetra::Distributor");

    if (plist.is_null ()) {
      verbose_ = verboseDefault;
    }
    else {
      RCP<const ParameterList> validParams = getValidParameters ();
      plist->validateParametersAndSetDefaults (*validParams);

      const bool barrierBetween =
        plist->get<bool> ("Barrier between receives and sends");
      const Details::EDistributorSendType sendType =
        getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
      const bool useDistinctTags = plist->get<bool> ("Use distinct tags");
      const bool debug = plist->get<bool> ("Debug");
      {
        // mfh 03 May 2016: We keep this option only for backwards
        // compatibility, but it must always be true.  See discussion of
        // Github Issue #227.
        const bool enable_cuda_rdma =
          plist->get<bool> ("Enable MPI CUDA RDMA support");
        TEUCHOS_TEST_FOR_EXCEPTION
          (! enable_cuda_rdma, std::invalid_argument, "Tpetra::Distributor::"
           "setParameterList: " << "You specified \"Enable MPI CUDA RDMA "
           "support\" = false.  This is no longer valid.  You don't need to "
           "specify this option any more; Tpetra assumes it is always true.  "
           "This is a very light assumption on the MPI implementation, and in "
           "fact does not actually involve hardware or system RDMA support.  "
           "Tpetra just assumes that the MPI implementation can tell whether a "
           "pointer points to host memory or CUDA device memory.");
      }

      // We check this property explicitly, since we haven't yet learned
      // how to make a validator that can cross-check properties.
      // Later, turn this into a validator so that it can be embedded in
      // the valid ParameterList and used in Optika.
      TEUCHOS_TEST_FOR_EXCEPTION
        (! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
         std::invalid_argument, "Tpetra::Distributor::setParameterList: " << endl
         << "You specified \"Send type\"=\"Rsend\", but turned off the barrier "
         "between receives and sends." << endl << "This is invalid; you must "
         "include the barrier if you use ready sends." << endl << "Ready sends "
         "require that their corresponding receives have already been posted, "
         "and the only way to guarantee that in general is with a barrier.");

      // Now that we've validated the input list, save the results.
      sendType_ = sendType;
      barrierBetween_ = barrierBetween;
      useDistinctTags_ = useDistinctTags;
      verbose_ = debug || verboseDefault;

      // ParameterListAcceptor semantics require pointer identity of the
      // sublist passed to setParameterList(), so we save the pointer.
      this->setMyParamList (plist);
    }
  }

  Teuchos::RCP<const Teuchos::ParameterList>
  Distributor::getValidParameters () const
  {
    using Teuchos::Array;
    using Teuchos::ParameterList;
    using Teuchos::parameterList;
    using Teuchos::RCP;
    using Teuchos::setStringToIntegralParameter;

    const bool barrierBetween = barrierBetween_default;
    const bool useDistinctTags = useDistinctTags_default;
    const bool debug = tpetraDistributorDebugDefault;

    Array<std::string> sendTypes = distributorSendTypes ();
    const std::string defaultSendType ("Send");
    Array<Details::EDistributorSendType> sendTypeEnums;
    sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
    sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
    sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
    sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);

    RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
    plist->set ("Barrier between receives and sends", barrierBetween,
                "Whether to execute a barrier between receives and sends in do"
                "[Reverse]Posts().  Required for correctness when \"Send type\""
                "=\"Rsend\", otherwise correct but not recommended.");
    setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
      defaultSendType, "When using MPI, the variant of send to use in "
      "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
    plist->set ("Use distinct tags", useDistinctTags, "Whether to use distinct "
                "MPI message tags for different code paths.  Highly recommended"
                " to avoid message collisions.");
    plist->set ("Debug", debug, "Whether to print copious debugging output on "
                "all processes.");
    plist->set ("Timer Label", "", "Label for Time Monitor output");
    plist->set ("Enable MPI CUDA RDMA support", true, "Assume that MPI can "
                "tell whether a pointer points to host memory or CUDA device "
                "memory.  You don't need to specify this option any more; "
                "Tpetra assumes it is always true.  This is a very light "
                "assumption on the MPI implementation, and in fact does not "
                "actually involve hardware or system RDMA support.");

    // mfh 24 Dec 2015: Tpetra no longer inherits from
    // Teuchos::VerboseObject, so it doesn't need the "VerboseObject"
    // sublist.  However, we retain the "VerboseObject" sublist
    // anyway, for backwards compatibility (otherwise the above
    // validation would fail with an invalid parameter name, should
    // the user still want to provide this list).
    Teuchos::setupVerboseObjectSublist (&*plist);
    return Teuchos::rcp_const_cast<const ParameterList> (plist);
  }
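
  // Illustrative usage sketch (user code, not part of this file); the
  // parameter names below are the ones defined in getValidParameters():
  //
  //   Teuchos::RCP<Teuchos::ParameterList> plist = Teuchos::parameterList ();
  //   plist->set ("Send type", "Isend");       // use nonblocking sends
  //   plist->set ("Use distinct tags", true);  // the default
  //   distributor.setParameterList (plist);
  //
  // Note that "Send type"="Rsend" also requires "Barrier between
  // receives and sends"=true; setParameterList() enforces this invariant.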


  size_t Distributor::getTotalReceiveLength () const
  { return totalReceiveLength_; }

  size_t Distributor::getNumReceives () const
  { return numReceives_; }

  bool Distributor::hasSelfMessage () const
  { return selfMessage_; }

  size_t Distributor::getNumSends () const
  { return numSends_; }

  size_t Distributor::getMaxSendLength () const
  { return maxSendLength_; }

  Teuchos::ArrayView<const int> Distributor::getProcsFrom() const
  { return procsFrom_; }

  Teuchos::ArrayView<const size_t> Distributor::getLengthsFrom() const
  { return lengthsFrom_; }

  Teuchos::ArrayView<const int> Distributor::getProcsTo() const
  { return procsTo_; }

  Teuchos::ArrayView<const size_t> Distributor::getLengthsTo() const
  { return lengthsTo_; }

  Teuchos::RCP<Distributor>
  Distributor::getReverse () const {
    if (reverseDistributor_.is_null ()) {
      createReverseDistributor ();
    }
    TEUCHOS_TEST_FOR_EXCEPTION
      (reverseDistributor_.is_null (), std::logic_error, "The reverse "
       "Distributor is null after createReverseDistributor returned.  "
       "Please report this bug to the Tpetra developers.");
    return reverseDistributor_;
  }
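
  // Design note (illustrative): the reverse Distributor executes the
  // communication plan "backwards" -- every send in the forward plan
  // becomes a receive in the reverse plan and vice versa.  It is what
  // the do[Reverse]Posts() paths use, e.g. to run an Export plan in the
  // Import direction.  It is created lazily on first use, then cached.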


  void
  Distributor::createReverseDistributor() const
  {
    reverseDistributor_ = Teuchos::rcp (new Distributor (comm_, out_));
    reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
    reverseDistributor_->sendType_ = sendType_;
    reverseDistributor_->barrierBetween_ = barrierBetween_;
    reverseDistributor_->verbose_ = verbose_;

    // The total length of all the sends of this Distributor.  We
    // calculate it because it's the total length of all the receives
    // of the reverse Distributor.
    size_t totalSendLength =
      std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), size_t (0));

    // The maximum length of any of the receives of this Distributor.
    // We calculate it because it's the maximum length of any of the
    // sends of the reverse Distributor.
    size_t maxReceiveLength = 0;
    const int myProcID = comm_->getRank();
    for (size_t i=0; i < numReceives_; ++i) {
      if (procsFrom_[i] != myProcID) {
        // Don't count receives for messages sent by myself to myself.
        if (lengthsFrom_[i] > maxReceiveLength) {
          maxReceiveLength = lengthsFrom_[i];
        }
      }
    }

    // Initialize all of reverseDistributor's data members.  This
    // mainly just involves flipping "send" and "receive," or the
    // equivalent "to" and "from."

    reverseDistributor_->selfMessage_ = selfMessage_;
    reverseDistributor_->numSends_ = numReceives_;
    reverseDistributor_->procsTo_ = procsFrom_;
    reverseDistributor_->startsTo_ = startsFrom_;
    reverseDistributor_->lengthsTo_ = lengthsFrom_;
    reverseDistributor_->maxSendLength_ = maxReceiveLength;
    reverseDistributor_->indicesTo_ = indicesFrom_;
    reverseDistributor_->numReceives_ = numSends_;
    reverseDistributor_->totalReceiveLength_ = totalSendLength;
    reverseDistributor_->lengthsFrom_ = lengthsTo_;
    reverseDistributor_->procsFrom_ = procsTo_;
    reverseDistributor_->startsFrom_ = startsTo_;
    reverseDistributor_->indicesFrom_ = indicesTo_;

    // requests_: Allocated on demand.
    // reverseDistributor_: See note below

    // mfh 31 Mar 2016: These are statistics, kept on calls to
    // doPostsAndWaits or doReversePostsAndWaits.  They weren't here
    // when I started, and I didn't add them, so I don't know if they
    // are accurate.
    reverseDistributor_->lastRoundBytesSend_ = 0;
    reverseDistributor_->lastRoundBytesRecv_ = 0;

    reverseDistributor_->useDistinctTags_ = useDistinctTags_;

    // I am my reverse Distributor's reverse Distributor.
    // Thus, it would be legit to do the following:
    //
    // reverseDistributor_->reverseDistributor_ = Teuchos::rcp (this, false);
    //
    // (Note use of a "weak reference" to avoid a circular RCP
    // dependency.)  The only issue is that if users hold on to the
    // reverse Distributor but let go of the forward one, this
    // reference won't be valid anymore.  However, the reverse
    // Distributor is really an implementation detail of Distributor
    // and not meant to be used directly, so we don't need to do this.
    reverseDistributor_->reverseDistributor_ = Teuchos::null;
  }


  void Distributor::doWaits() {
    using Teuchos::Array;
    using Teuchos::CommRequest;
    using Teuchos::FancyOStream;
    using Teuchos::includesVerbLevel;
    using Teuchos::is_null;
    using Teuchos::OSTab;
    using Teuchos::RCP;
    using Teuchos::waitAll;
    using std::endl;

    Teuchos::OSTab tab (out_);

#ifdef TPETRA_DISTRIBUTOR_TIMERS
    Teuchos::TimeMonitor timeMon (*timer_doWaits_);
#endif // TPETRA_DISTRIBUTOR_TIMERS

    const int myRank = comm_->getRank ();

    if (verbose_) {
      std::ostringstream os;
      os << myRank << ": doWaits: # reqs = "
         << requests_.size () << endl;
      *out_ << os.str ();
    }

    if (requests_.size() > 0) {
      waitAll (*comm_, requests_());

#ifdef HAVE_TEUCHOS_DEBUG
      // Make sure that waitAll() nulled out all the requests.
      for (Array<RCP<CommRequest<int> > >::const_iterator it = requests_.begin();
           it != requests_.end(); ++it)
      {
        TEUCHOS_TEST_FOR_EXCEPTION( ! is_null (*it), std::runtime_error,
          Teuchos::typeName(*this) << "::doWaits(): Communication requests "
          "should all be null after calling Teuchos::waitAll() on them, but "
          "at least one request is not null.");
      }
#endif // HAVE_TEUCHOS_DEBUG
      // Restore the invariant that requests_.size() is the number of
      // outstanding nonblocking communication requests.
      requests_.resize (0);
    }

#ifdef HAVE_TEUCHOS_DEBUG
    {
      const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
      int globalSizeNonzero = 0;
      Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
                                    localSizeNonzero,
                                    Teuchos::outArg (globalSizeNonzero));
      TEUCHOS_TEST_FOR_EXCEPTION(
        globalSizeNonzero != 0, std::runtime_error,
        "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
        "a nonzero number of outstanding posts.  There should be none at this "
        "point.  Please report this bug to the Tpetra developers.");
    }
#endif // HAVE_TEUCHOS_DEBUG

    if (verbose_) {
      std::ostringstream os;
      os << myRank << ": doWaits done" << endl;
      *out_ << os.str ();
    }
  }
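
  // Illustrative call sequence (user code, assuming the three-argument
  // doPosts() overload timed above as "doPosts(3)"): doWaits() completes
  // the nonblocking operations begun by doPosts(), so a typical exchange
  // looks like
  //
  //   distributor.doPosts (exports, numPackets, imports);
  //   // ... overlap local computation with communication ...
  //   distributor.doWaits ();  // imports are now safe to read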

  void Distributor::doReverseWaits() {
    // call doWaits() on the reverse Distributor, if it exists
    if (! reverseDistributor_.is_null()) {
      reverseDistributor_->doWaits();
    }
  }

  std::string Distributor::description () const {
    std::ostringstream out;

    out << "\"Tpetra::Distributor\": {";
    const std::string label = this->getObjectLabel ();
    if (label != "") {
      out << "Label: " << label << ", ";
    }
    out << "How initialized: "
        << Details::DistributorHowInitializedEnumToString (howInitialized_)
        << ", Parameters: {"
        << "Send type: "
        << DistributorSendTypeEnumToString (sendType_)
        << ", Barrier between receives and sends: "
        << (barrierBetween_ ? "true" : "false")
        << ", Use distinct tags: "
        << (useDistinctTags_ ? "true" : "false")
        << ", Debug: " << (verbose_ ? "true" : "false")
        << "}}";
    return out.str ();
  }

  std::string
  Distributor::
  localDescribeToString (const Teuchos::EVerbosityLevel vl) const
  {
    using Teuchos::toString;
    using Teuchos::VERB_HIGH;
    using Teuchos::VERB_EXTREME;
    using std::endl;

    // This preserves current behavior of Distributor.
    if (vl <= Teuchos::VERB_LOW || comm_.is_null ()) {
      return std::string ();
    }

    auto outStringP = Teuchos::rcp (new std::ostringstream ());
    auto outp = Teuchos::getFancyOStream (outStringP); // returns RCP
    Teuchos::FancyOStream& out = *outp;

    const int myRank = comm_->getRank ();
    const int numProcs = comm_->getSize ();
    out << "Process " << myRank << " of " << numProcs << ":" << endl;
    Teuchos::OSTab tab1 (out);

    out << "selfMessage: " << hasSelfMessage () << endl;
    out << "numSends: " << getNumSends () << endl;
    if (vl == VERB_HIGH || vl == VERB_EXTREME) {
      out << "procsTo: " << toString (procsTo_) << endl;
      out << "lengthsTo: " << toString (lengthsTo_) << endl;
      out << "maxSendLength: " << getMaxSendLength () << endl;
    }
    if (vl == VERB_EXTREME) {
      out << "startsTo: " << toString (startsTo_) << endl;
      out << "indicesTo: " << toString (indicesTo_) << endl;
    }
    if (vl == VERB_HIGH || vl == VERB_EXTREME) {
      out << "numReceives: " << getNumReceives () << endl;
      out << "totalReceiveLength: " << getTotalReceiveLength () << endl;
      out << "lengthsFrom: " << toString (lengthsFrom_) << endl;
      out << "startsFrom: " << toString (startsFrom_) << endl;
      out << "procsFrom: " << toString (procsFrom_) << endl;
    }

    out.flush (); // make sure the ostringstream got everything
    return outStringP->str ();
  }

  void
  Distributor::
  describe (Teuchos::FancyOStream &out,
            const Teuchos::EVerbosityLevel verbLevel) const
  {
    using std::endl;
    using Teuchos::VERB_DEFAULT;
    using Teuchos::VERB_NONE;
    using Teuchos::VERB_LOW;
    using Teuchos::VERB_MEDIUM;
    using Teuchos::VERB_HIGH;
    using Teuchos::VERB_EXTREME;
    const Teuchos::EVerbosityLevel vl =
      (verbLevel == VERB_DEFAULT) ? VERB_LOW : verbLevel;

    if (vl == VERB_NONE) {
      return; // don't print anything
    }
    // If this Distributor's Comm is null, then the calling process
    // does not participate in Distributor-related collective
    // operations with the other processes.  In that case, it is not
    // even legal to call this method.  The reasonable thing to do in
    // that case is nothing.
    if (comm_.is_null ()) {
      return;
    }
    const int myRank = comm_->getRank ();
    const int numProcs = comm_->getSize ();

    // Only Process 0 should touch the output stream, but this method
    // in general may need to do communication.  Thus, we may need to
    // preserve the current tab level across multiple "if (myRank ==
    // 0) { ... }" inner scopes.  This is why we sometimes create
    // OSTab instances by pointer, instead of by value.  We only need
    // to create them by pointer if the tab level must persist through
    // multiple inner scopes.
    Teuchos::RCP<Teuchos::OSTab> tab0, tab1;

    if (myRank == 0) {
      // At every verbosity level but VERB_NONE, Process 0 prints.
      // By convention, describe() always begins with a tab before
      // printing.
      tab0 = Teuchos::rcp (new Teuchos::OSTab (out));
      // We quote the class name because it contains colons.
      // This makes the output valid YAML.
      out << "\"Tpetra::Distributor\":" << endl;
      tab1 = Teuchos::rcp (new Teuchos::OSTab (out));

      const std::string label = this->getObjectLabel ();
      if (label != "") {
        out << "Label: " << label << endl;
      }
      out << "Number of processes: " << numProcs << endl
          << "How initialized: "
          << Details::DistributorHowInitializedEnumToString (howInitialized_)
          << endl;
      {
        out << "Parameters: " << endl;
        Teuchos::OSTab tab2 (out);
        out << "\"Send type\": "
            << DistributorSendTypeEnumToString (sendType_) << endl
            << "\"Barrier between receives and sends\": "
            << (barrierBetween_ ? "true" : "false") << endl
            << "\"Use distinct tags\": "
            << (useDistinctTags_ ? "true" : "false") << endl
            << "\"Debug\": " << (verbose_ ? "true" : "false") << endl;
      }
    } // if myRank == 0

    // This is collective over the Map's communicator.
    if (vl > VERB_LOW) {
      const std::string lclStr = this->localDescribeToString (vl);
      Tpetra::Details::gathervPrint (out, lclStr, *comm_);
    }

    out << "Reverse Distributor:";
    if (reverseDistributor_.is_null ()) {
      out << " null" << endl;
    }
    else {
      out << endl;
      reverseDistributor_->describe (out, vl);
    }
  }

  void
  Distributor::
  computeReceives ()
  {
    using Teuchos::Array;
    using Teuchos::ArrayRCP;
    using Teuchos::as;
    using Teuchos::CommStatus;
    using Teuchos::CommRequest;
    using Teuchos::ireceive;
    using Teuchos::RCP;
    using Teuchos::rcp;
    using Teuchos::REDUCE_SUM;
    using Teuchos::receive;
    using Teuchos::reduce;
    using Teuchos::scatter;
    using Teuchos::send;
    using Teuchos::waitAll;
    using std::endl;

    Teuchos::OSTab tab (out_);
    const int myRank = comm_->getRank();
    const int numProcs = comm_->getSize();

    // MPI tag for nonblocking receives and blocking sends in this method.
    const int pathTag = 2;
    const int tag = this->getTag (pathTag);

    std::unique_ptr<std::string> prefix;
    if (verbose_) {
      std::ostringstream os;
      os << "Proc " << myRank << ": computeReceives: ";
      prefix = std::unique_ptr<std::string> (new std::string (os.str ()));
      os << "{selfMessage_: " << (selfMessage_ ? "true" : "false")
         << ", tag: " << tag << "}" << endl;
      *out_ << os.str ();
    }

    // toProcsFromMe[i] == the number of messages sent by this process
    // to process i.  The data in numSends_, procsTo_, and lengthsTo_
    // concern the contiguous sends.  Therefore, each process will be
    // listed in procsTo_ at most once, and so toProcsFromMe[i] will
    // either be 0 or 1.
    {
      Array<int> toProcsFromMe (numProcs, 0);
#ifdef HAVE_TEUCHOS_DEBUG
      bool counting_error = false;
#endif // HAVE_TEUCHOS_DEBUG
      for (size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
#ifdef HAVE_TEUCHOS_DEBUG
        if (toProcsFromMe[procsTo_[i]] != 0) {
          counting_error = true;
        }
#endif // HAVE_TEUCHOS_DEBUG
        toProcsFromMe[procsTo_[i]] = 1;
      }
#ifdef HAVE_TEUCHOS_DEBUG
      SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
        "Tpetra::Distributor::computeReceives: There was an error on at least "
        "one process in counting the number of messages sent by that process to "
        "the other processes.  Please report this bug to the Tpetra developers.",
        *comm_);
#endif // HAVE_TEUCHOS_DEBUG

      if (verbose_) {
        std::ostringstream os;
        os << *prefix << "Reduce & scatter" << endl;
        *out_ << os.str ();
      }

      // Compute the number of receives that this process needs to
      // post.  The number of receives includes any self sends (i.e.,
      // messages sent by this process to itself).
      //
      // (We will use numReceives_ below to post exactly that
      // number of receives, with MPI_ANY_SOURCE as the sending rank.
      // This will tell us from which processes this process expects
      // to receive, and how many packets of data we expect to receive
      // from each process.)
      //
      // toProcsFromMe[i] is the number of messages sent by this
      // process to process i.  Compute the sum (elementwise) of all
      // the toProcsFromMe arrays on all processes in the
      // communicator.  If the array x is that sum, then if this
      // process has rank j, x[j] is the number of messages sent
      // to process j, that is, the number of receives on process j
      // (including any messages sent by process j to itself).
      //
      // Yes, this requires storing and operating on an array of
      // length P, where P is the number of processes in the
      // communicator.  Epetra does this too.  Avoiding this O(P)
      // memory bottleneck would require some research.
      //
      // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
      // implement this O(P) memory algorithm.
      //
      // 1. Use MPI_Reduce and MPI_Scatter: reduce on the root
      //    process (0) from toProcsFromMe, to numRecvsOnEachProc.
      //    Then, scatter the latter, so that each process p gets
      //    numRecvsOnEachProc[p].
      //
      // 2. Like #1, but use MPI_Reduce_scatter instead of
      //    MPI_Reduce and MPI_Scatter.  MPI_Reduce_scatter might be
      //    optimized to reduce the number of messages, but
      //    MPI_Reduce_scatter is more general than we need (it
      //    allows the equivalent of MPI_Scatterv).  See Bug 6336.
      //
      // 3. Do an all-reduce on toProcsFromMe, and let my process
      //    (with rank myRank) get numReceives_ from
      //    toProcsFromMe[myRank].  The HPCCG miniapp uses the
      //    all-reduce method.
      //
      // Approaches 1 and 3 have the same critical path length.
      // However, #3 moves more data.  This is because the final
      // result is just one integer, but #3 moves a whole array of
      // results to all the processes.  This is why we use Approach 1
      // here.
      //
      // mfh 12 Apr 2013: See discussion in createFromSends() about
      // how we could use this communication to propagate an error
      // flag for "free" in a release build.

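      // For reference, a rough raw-MPI sketch of Approach 1 as used
      // below (the Teuchos::reduce/scatter wrappers hide these details):
      //
      //   MPI_Reduce (toProcsFromMe, numRecvsOnEachProc, numProcs,
      //               MPI_INT, MPI_SUM, root, comm);
      //   MPI_Scatter (numRecvsOnEachProc, 1, MPI_INT,
      //                &numReceivesAsInt, 1, MPI_INT, root, comm);
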
      const int root = 0; // rank of root process of the reduction
      Array<int> numRecvsOnEachProc; // temp; only needed on root
      if (myRank == root) {
        numRecvsOnEachProc.resize (numProcs);
      }
      int numReceivesAsInt = 0; // output
      reduce<int, int> (toProcsFromMe.getRawPtr (),
                        numRecvsOnEachProc.getRawPtr (),
                        numProcs, REDUCE_SUM, root, *comm_);
      scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
                         &numReceivesAsInt, 1, root, *comm_);
      numReceives_ = static_cast<size_t> (numReceivesAsInt);
    }

    // Now we know numReceives_, which is this process' number of
    // receives.  Allocate the lengthsFrom_ and procsFrom_ arrays
    // with this number of entries.
    lengthsFrom_.assign (numReceives_, 0);
    procsFrom_.assign (numReceives_, 0);

    //
    // Ask (via nonblocking receive) each process from which we are
    // receiving how many packets we should expect from it in the
    // communication pattern.
    //

    // At this point, numReceives_ includes any self message that
    // there may be.  At the end of this routine, we'll subtract off
    // the self message (if there is one) from numReceives_.  In this
    // routine, we don't need to receive a message from ourselves in
    // order to figure out our lengthsFrom_ and source process ID; we
    // can just ask ourselves directly.  Thus, the actual number of
    // nonblocking receives we post here does not include the self
    // message.
    const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);

    // Teuchos' wrapper for nonblocking receives requires receive
    // buffers that it knows won't go away.  This is why we use RCPs,
    // one RCP per nonblocking receive request.  They get allocated in
    // the loop below.
    Array<RCP<CommRequest<int> > > requests (actualNumReceives);
    Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
    Array<RCP<CommStatus<int> > > statuses (actualNumReceives);

    // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
    // (receive data from any process).
#ifdef HAVE_MPI
    const int anySourceProc = MPI_ANY_SOURCE;
#else
    const int anySourceProc = -1;
#endif

    if (verbose_) {
      std::ostringstream os;
      os << *prefix << "Post " << actualNumReceives << " irecv"
         << (actualNumReceives != size_t (1) ? "s" : "") << endl;
      *out_ << os.str ();
    }

    // Post the (nonblocking) receives.
    for (size_t i = 0; i < actualNumReceives; ++i) {
      // Once the receive completes, we can ask the corresponding
      // CommStatus object (output by wait()) for the sending process'
      // ID (which we'll assign to procsFrom_[i] -- don't forget to
      // do that!).
      lengthsFromBuffers[i].resize (1);
      lengthsFromBuffers[i][0] = as<size_t> (0);
      requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
                                           tag, *comm_);
      if (verbose_) {
        std::ostringstream os;
        os << *prefix << "Posted any-proc irecv w/ tag " << tag << endl;
        *out_ << os.str ();
      }
    }

    if (verbose_) {
      std::ostringstream os;
      os << *prefix << "Post " << numSends_ << " send"
         << (numSends_ != size_t (1) ? "s" : "") << endl;
      *out_ << os.str ();
    }
    // Post the sends: Tell each process to which we are sending how
    // many packets it should expect from us in the communication
    // pattern.  We could use nonblocking sends here, as long as we do
    // a waitAll() on all the sends and receives at once.
    //
    // We assume that numSends_ and selfMessage_ have already been
    // set.  The value of numSends_ (my process' number of sends) does
    // not include any message that it might send to itself.
    for (size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
      if (procsTo_[i] != myRank) {
        // Send a message to procsTo_[i], telling that process that
        // this communication pattern will send that process
        // lengthsTo_[i] blocks of packets.
        const size_t* const lengthsTo_i = &lengthsTo_[i];
        send<int, size_t> (lengthsTo_i, 1, as<int> (procsTo_[i]), tag, *comm_);
        if (verbose_) {
          std::ostringstream os;
          os << *prefix << "Posted send to Proc " << procsTo_[i] << " w/ tag "
             << tag << endl;
          *out_ << os.str ();
        }
      }
      else {
        // We don't need a send in the self-message case.  If this
        // process will send a message to itself in the communication
        // pattern, then the last element of lengthsFrom_ and
        // procsFrom_ corresponds to the self-message.  Of course
        // this process knows how long the message is, and the process
        // ID is its own process ID.
        lengthsFrom_[numReceives_-1] = lengthsTo_[i];
        procsFrom_[numReceives_-1] = myRank;
      }
    }

    if (verbose_) {
      std::ostringstream os;
      os << myRank << ": computeReceives: waitAll on "
         << requests.size () << " requests" << endl;
      *out_ << os.str ();
    }
    //
    // Wait on all the receives.  When they arrive, check the status
    // output of wait() for the sending process' ID, unpack the
    // request buffers into lengthsFrom_, and set procsFrom_ from the
    // status.
    //
    waitAll (*comm_, requests (), statuses ());
    for (size_t i = 0; i < actualNumReceives; ++i) {
      lengthsFrom_[i] = *lengthsFromBuffers[i];
      procsFrom_[i] = statuses[i]->getSourceRank ();
    }

    // Sort the procsFrom_ array, and apply the same permutation to
    // lengthsFrom_.  This ensures that procsFrom_[i] and
    // lengthsFrom_[i] refer to the same thing.
    sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());

    // Compute indicesFrom_
    totalReceiveLength_ =
      std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), size_t (0));
    indicesFrom_.clear ();
    // NOTE (mfh 13 Feb 2019): Epetra_MpiDistributor deliberately does
    // _not_ fill indicesFrom_ (what it calls "indices_from_") like
    // this; it leaves indicesFrom_ empty.  The comment there mentions
    // that not filling indicesFrom_ helps reverse mode correctness.
#if 0
    indicesFrom_.reserve (totalReceiveLength_);
    for (size_t i = 0; i < totalReceiveLength_; ++i) {
      indicesFrom_.push_back(i);
    }
#endif // 0

    startsFrom_.clear ();
    startsFrom_.reserve (numReceives_);
    for (size_t i = 0, j = 0; i < numReceives_; ++i) {
      startsFrom_.push_back(j);
      j += lengthsFrom_[i];
    }

    if (selfMessage_) {
      --numReceives_;
    }

    if (verbose_) {
      std::ostringstream os;
      os << *prefix << "Done!" << endl;
      *out_ << os.str ();
    }
  }

  size_t
  Distributor::
  createFromSends (const Teuchos::ArrayView<const int>& exportProcIDs)
  {
    using Teuchos::outArg;
    using Teuchos::REDUCE_MAX;
    using Teuchos::reduceAll;
    using std::endl;
    const char rawPrefix[] = "Tpetra::Distributor::createFromSends: ";

    Teuchos::OSTab tab (out_);
    const size_t numExports = exportProcIDs.size();
    const int myProcID = comm_->getRank();
    const int numProcs = comm_->getSize();

    std::unique_ptr<std::string> prefix;
    if (verbose_) {
      std::ostringstream os;
      os << "Proc " << myProcID << ": " << rawPrefix;
      prefix = std::unique_ptr<std::string> (new std::string (os.str ()));
      os << "exportPIDs: " << exportProcIDs << endl;
      *out_ << os.str ();
    }

    // exportProcIDs tells us the communication pattern for this
    // distributor.  It dictates the way that the export data will be
    // interpreted in doPosts().  We want to perform at most one
    // send per process in doPosts; this is for two reasons:
    //   * minimize latency / overhead in the comm routines (nice)
    //   * match the number of receives and sends between processes
    //     (necessary)
    //
    // Teuchos::Comm requires that the data for a send are contiguous
    // in a send buffer.  Therefore, if the data in the send buffer
    // for doPosts() are not contiguous, they will need to be copied
    // into a contiguous buffer.  The user has specified this
    // noncontiguous pattern and we can't do anything about it.
    // However, if they do not provide an efficient pattern, we will
    // warn them if one of the following compile-time options has been
    // set:
    //   * HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS
    //   * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
    //
    // If the data are contiguous, then we can post the sends in situ
    // (i.e., without needing to copy them into a send buffer).
    //
    // Determine contiguity.  There are a number of ways to do this:
    //   * If the export IDs are sorted, then all exports to a
    //     particular proc must be contiguous.  This is what Epetra does.
    //   * If the export ID of the current export already has been
    //     listed, then the previous listing should correspond to the
    //     same export.  This tests contiguity, but not sortedness.
    //
    // Both of these tests require O(n), where n is the number of
    // exports.  However, the latter will positively identify a greater
    // portion of contiguous patterns.  We use the latter method.
    //
    // Check to see if values are grouped by procs without gaps
    // If so, indices_to -> 0.

    // Set up data structures for quick traversal of arrays.
    // This contains the number of sends for each process ID.
    //
    // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
    // that create an array of length the number of processes in the
    // communicator (plus one).  Given how this code uses this array,
    // it should be straightforward to replace it with a hash table or
    // some other more space-efficient data structure.  In practice,
    // most of the entries of starts should be zero for a sufficiently
    // large process count, unless the communication pattern is dense.
    // Note that it's important to be able to iterate through keys (i
    // for which starts[i] is nonzero) in increasing order.
    Teuchos::Array<size_t> starts (numProcs + 1, 0);

    // numActive is the number of sends that are not Null
    size_t numActive = 0;
    int needSendBuff = 0; // Boolean

#ifdef HAVE_TPETRA_DEBUG
    int badID = -1; // only used in a debug build
#endif // HAVE_TPETRA_DEBUG
    for (size_t i = 0; i < numExports; ++i) {
      const int exportID = exportProcIDs[i];
      if (exportID >= numProcs) {
#ifdef HAVE_TPETRA_DEBUG
        badID = myProcID;
#endif // HAVE_TPETRA_DEBUG
        break;
      }
      else if (exportID >= 0) {
        // exportID is a valid process ID.  Increment the number of
        // messages this process will send to that process.
        ++starts[exportID];

        // If we're sending more than one message to process exportID,
        // then it is possible that the data are not contiguous.
        // Check by seeing if the previous process ID in the list
        // (exportProcIDs[i-1]) is the same.  It's safe to use i-1,
        // because if starts[exportID] > 1, then i must be > 1 (since
        // the starts array was filled with zeros initially).

        // null entries break continuity.
        // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
        if (needSendBuff == 0 && starts[exportID] > 1 &&
            exportID != exportProcIDs[i-1]) {
          needSendBuff = 1;
        }
        ++numActive;
      }
    }

#ifdef HAVE_TPETRA_DEBUG
    // Test whether any process in the communicator got an invalid
    // process ID.  If badID != -1 on this process, then it equals
    // this process' rank.  The max of all badID over all processes is
    // the max rank which has an invalid process ID.
    {
      int gbl_badID;
      reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
      TEUCHOS_TEST_FOR_EXCEPTION(gbl_badID >= 0, std::runtime_error,
        Teuchos::typeName(*this) << "::createFromSends: Proc " << gbl_badID
        << ", perhaps among other processes, got a bad send process ID.");
    }
#else
    // FIXME (mfh 12 Apr 2013, 15 Jul 2015) Rather than simply
    // ignoring this information, we should think about how to pass it
    // along so that all the processes find out about it.  In a
    // release build with efficiency warnings turned off, the next
    // collective communication happens in computeReceives().  We
    // could figure out how to encode the error flag in that
    // operation, for example by adding an extra entry to the
    // collective's output array that encodes the error condition (0
    // on all processes if no error, else 1 on any process with the
    // error, so that the sum will produce a nonzero value if any
    // process had an error).  I'll defer this change for now and
    // recommend instead that people with troubles try a debug build.
#endif // HAVE_TPETRA_DEBUG

#if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
    {
      int global_needSendBuff;
      reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
                           outArg (global_needSendBuff));

      TPETRA_EFFICIENCY_WARNING(
        global_needSendBuff != 0, std::runtime_error,
        "::createFromSends: Grouping export IDs together by process rank often "
        "improves performance.");
    }
#endif

    // Determine from the caller's data whether or not the current
    // process should send (a) message(s) to itself.
    if (starts[myProcID] != 0) {
      selfMessage_ = true;
    }
    else {
      selfMessage_ = false;
    }

#ifdef HAVE_TEUCHOS_DEBUG
    bool index_neq_numActive = false;
    bool send_neq_numSends = false;
#endif
    if (! needSendBuff) {
      // grouped by proc, no send buffer or indicesTo_ needed
      numSends_ = 0;
      // Count total number of sends, i.e., total number of procs to
      // which we are sending.  This includes myself, if applicable.
      for (int i = 0; i < numProcs; ++i) {
        if (starts[i]) {
          ++numSends_;
        }
      }

      // Not only do we not need these, but we must clear them, as
      // empty status of indicesTo is a flag used later.
      indicesTo_.resize(0);
      // Size these to numSends_; note, at the moment, numSends_
      // includes self sends.  Set their values to zeros.
      procsTo_.assign(numSends_,0);
      startsTo_.assign(numSends_,0);
      lengthsTo_.assign(numSends_,0);

      // set startsTo to the offset for each send (i.e., each proc ID)
      // set procsTo to the proc ID for each send
      // in interpreting this code, remember that we are assuming contiguity
      // that is why index skips through the ranks
      {
        size_t index = 0, procIndex = 0;
        for (size_t i = 0; i < numSends_; ++i) {
          while (exportProcIDs[procIndex] < 0) {
            ++procIndex; // skip all negative proc IDs
          }
          startsTo_[i] = procIndex;
          int procID = exportProcIDs[procIndex];
          procsTo_[i] = procID;
          index += starts[procID];
          procIndex += starts[procID];
        }
#ifdef HAVE_TEUCHOS_DEBUG
        if (index != numActive) {
          index_neq_numActive = true;
        }
#endif
      }
      // sort the startsTo and proc IDs together, in ascending order, according
      // to proc IDs
      if (numSends_ > 0) {
        sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
      }
      // compute the maximum send length
      maxSendLength_ = 0;
      for (size_t i = 0; i < numSends_; ++i) {
        int procID = procsTo_[i];
        lengthsTo_[i] = starts[procID];
        if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
          maxSendLength_ = lengthsTo_[i];
        }
      }
    }
    else {
      // not grouped by proc, need send buffer and indicesTo_

      // starts[i] is the number of sends to proc i
      // numActive equals number of sends total, \sum_i starts[i]

      // this loop starts at starts[1], so explicitly check starts[0]
      if (starts[0] == 0) {
        numSends_ = 0;
      }
      else {
        numSends_ = 1;
      }
      for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
                                            im1=starts.begin();
           i != starts.end(); ++i)
      {
        if (*i != 0) ++numSends_;
        *i += *im1;
        im1 = i;
      }
      // starts[i] now contains the number of exports to procs 0 through i

      for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
                                                    i=starts.rbegin()+1;
           i != starts.rend(); ++i)
      {
        *ip1 = *i;
        ip1 = i;
      }
      starts[0] = 0;
      // starts[i] now contains the number of exports to procs 0 through
      // i-1, i.e., all procs before proc i
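
      // Worked example (illustrative, not from the original source):
      // with numProcs = 3 and exportProcIDs = {2, 0, 1, 0}, counting
      // gives starts = {2, 1, 1, 0}.  The forward pass of inclusive
      // prefix sums yields {2, 3, 4, 4}; the backward shift plus
      // starts[0] = 0 yields {0, 2, 3, 4}.  So starts[p] is now the
      // offset of proc p's block in the send buffer, and
      // starts[numProcs] equals numActive (= 4 here).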

      indicesTo_.resize(numActive);

      for (size_t i = 0; i < numExports; ++i) {
        if (exportProcIDs[i] >= 0) {
          // record the offset to the sendBuffer for this export
          indicesTo_[starts[exportProcIDs[i]]] = i;
          // now increment the offset for this proc
          ++starts[exportProcIDs[i]];
        }
      }
      // our send buffer will contain the export data for each of the procs
      // we communicate with, in order by proc id
      // sendBuffer = {proc_0_data, proc_1_data, ..., proc_np-1_data}
      // indicesTo now maps each export to the location in our send buffer
      // associated with the export
      // data for export i located at sendBuffer[indicesTo[i]]
      //
      // starts[i] once again contains the number of exports to
      // procs 0 through i
      for (int proc = numProcs-1; proc != 0; --proc) {
        starts[proc] = starts[proc-1];
      }
      starts.front() = 0;
      starts[numProcs] = numActive;
      //
      // starts[proc] once again contains the number of exports to
      // procs 0 through proc-1
      // i.e., the start of my data in the sendBuffer

      // this contains invalid data at procs we don't care about, that is okay
      procsTo_.resize(numSends_);
      startsTo_.resize(numSends_);
      lengthsTo_.resize(numSends_);

      // for each group of sends/exports, record the destination proc,
      // the length, and the offset for this send into the
      // send buffer (startsTo_)
      maxSendLength_ = 0;
      size_t snd = 0;
      for (int proc = 0; proc < numProcs; ++proc) {
        if (starts[proc+1] != starts[proc]) {
          lengthsTo_[snd] = starts[proc+1] - starts[proc];
          startsTo_[snd] = starts[proc];
          // record max length for all off-proc sends
          if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
            maxSendLength_ = lengthsTo_[snd];
          }
          procsTo_[snd] = proc;
          ++snd;
        }
      }
#ifdef HAVE_TEUCHOS_DEBUG
      if (snd != numSends_) {
        send_neq_numSends = true;
      }
#endif
    }
#ifdef HAVE_TEUCHOS_DEBUG
    SHARED_TEST_FOR_EXCEPTION(index_neq_numActive, std::logic_error,
      "Tpetra::Distributor::createFromSends: logic error.  Please notify the Tpetra team.", *comm_);
    SHARED_TEST_FOR_EXCEPTION(send_neq_numSends, std::logic_error,
      "Tpetra::Distributor::createFromSends: logic error.  Please notify the Tpetra team.", *comm_);
#endif

    if (selfMessage_) --numSends_;

    // Invert map to see what msgs are received and what length
    computeReceives();

    if (verbose_) {
      std::ostringstream os;
      os << *prefix << "Done!" << endl;
      *out_ << os.str ();
    }

    // createFromRecvs() calls createFromSends(), but will set
    // howInitialized_ again after calling createFromSends().
    howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;

    return totalReceiveLength_;
  }
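
  // Usage note (illustrative): createFromSends() returns
  // totalReceiveLength_, the number of entries this process will
  // receive under the plan, so callers typically use it to size their
  // receive (import) buffers, e.g.
  //
  //   const size_t numImports = distributor.createFromSends (exportPIDs);
  //   Teuchos::Array<double> imports (numImports * numPacketsPerEntry);
  //
  // Here numPacketsPerEntry is a hypothetical caller-chosen packing
  // factor, not something this file defines.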

  void
  Distributor::
  createFromSendsAndRecvs (const Teuchos::ArrayView<const int>& exportProcIDs,
                           const Teuchos::ArrayView<const int>& remoteProcIDs)
  {
    // Note: exportProcIDs and remoteProcIDs _must_ have an entry for
    // each GID.  If the export / remote proc ID lists are taken from
    // the getProcs{From|To} lists extracted from a previous
    // distributor, this will generate a wrong answer, because those
    // lists have a unique entry for each process ID.  A version of
    // this method that takes lengthsTo and lengthsFrom should be made.

    howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;

    int myProcID = comm_->getRank ();
    int numProcs = comm_->getSize();

    const size_t numExportIDs = exportProcIDs.size();
    Teuchos::Array<size_t> starts (numProcs + 1, 0);

    size_t numActive = 0;
    int needSendBuff = 0; // Boolean

    for (size_t i = 0; i < numExportIDs; ++i) {
      if (needSendBuff == 0 && i && (exportProcIDs[i] < exportProcIDs[i-1])) {
        needSendBuff = 1;
      }
      if (exportProcIDs[i] >= 0) {
        ++starts[exportProcIDs[i]];
        ++numActive;
      }
    }

    selfMessage_ = (starts[myProcID] != 0);

    numSends_ = 0;

    if (needSendBuff) {
      // not grouped by proc: need the send buffer and indicesTo_
1416  if (starts[0] == 0 ) {
1417  numSends_ = 0;
1418  }
1419  else {
1420  numSends_ = 1;
1421  }
1422  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1423  im1=starts.begin();
1424  i != starts.end(); ++i)
1425  {
1426  if (*i != 0) ++numSends_;
1427  *i += *im1;
1428  im1 = i;
1429  }
1430  // starts[i] now contains the number of exports to procs 0 through i
1431 
1432  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1433  i=starts.rbegin()+1;
1434  i != starts.rend(); ++i)
1435  {
1436  *ip1 = *i;
1437  ip1 = i;
1438  }
1439  starts[0] = 0;
1440  // starts[i] now contains the number of exports to procs 0 through
1441  // i-1, i.e., all procs before proc i
1442 
1443  indicesTo_.resize(numActive);
1444 
1445  for (size_t i = 0; i < numExportIDs; ++i) {
1446  if (exportProcIDs[i] >= 0) {
1447  // record the offset to the sendBuffer for this export
1448  indicesTo_[starts[exportProcIDs[i]]] = i;
1449  // now increment the offset for this proc
1450  ++starts[exportProcIDs[i]];
1451  }
1452  }
1453  for (int proc = numProcs-1; proc != 0; --proc) {
1454  starts[proc] = starts[proc-1];
1455  }
1456  starts.front() = 0;
1457  starts[numProcs] = numActive;
1458  procsTo_.resize(numSends_);
1459  startsTo_.resize(numSends_);
1460  lengthsTo_.resize(numSends_);
1461  maxSendLength_ = 0;
1462  size_t snd = 0;
1463  for (int proc = 0; proc < numProcs; ++proc ) {
1464  if (starts[proc+1] != starts[proc]) {
1465  lengthsTo_[snd] = starts[proc+1] - starts[proc];
1466  startsTo_[snd] = starts[proc];
1467  // record max length for all off-proc sends
1468  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1469  maxSendLength_ = lengthsTo_[snd];
1470  }
1471  procsTo_[snd] = proc;
1472  ++snd;
1473  }
1474  }
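 // Continuing the worked example above (illustrative): with exclusive
 // offsets starts = {0, 2, 2, 5}, only procs 0 and 2 receive data, so
 // procsTo_ = {0, 2}, startsTo_ = {0, 2}, and lengthsTo_ = {2, 3}.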
1475  }
1476  else {
1477  // grouped by proc, no send buffer or indicesTo_ needed
1478  numSends_ = 0;
1479  // Count total number of sends, i.e., total number of procs to
1480  // which we are sending. This includes myself, if applicable.
1481  for (int i = 0; i < numProcs; ++i) {
1482  if (starts[i]) {
1483  ++numSends_;
1484  }
1485  }
1486 
1487  // Not only do we not need these, but we must clear them: the
1488  // empty state of indicesTo_ serves later as a flag that sends are contiguous.
1489  indicesTo_.resize(0);
1490  // Size these to numSends_; note, at the moment, numSends_
1491  // includes self sends. Set their values to zeros.
1492  procsTo_.assign(numSends_,0);
1493  startsTo_.assign(numSends_,0);
1494  lengthsTo_.assign(numSends_,0);
1495 
1496  // Set startsTo_ to the offset of each send (i.e., of each proc
1497  // ID), and procsTo_ to the proc ID of each send.  In interpreting
1498  // this code, remember that we are assuming contiguity: exports to
1499  // the same proc are adjacent, so procIndex can skip ahead by starts[procID].
1500  {
1501  size_t index = 0, procIndex = 0;
1502  for (size_t i = 0; i < numSends_; ++i) {
1503  while (exportProcIDs[procIndex] < 0) {
1504  ++procIndex; // skip all negative proc IDs
1505  }
1506  startsTo_[i] = procIndex;
1507  int procID = exportProcIDs[procIndex];
1508  procsTo_[i] = procID;
1509  index += starts[procID];
1510  procIndex += starts[procID];
1511  }
1512  }
1513  // sort the startsTo and proc IDs together, in ascending order, according
1514  // to proc IDs
1515  if (numSends_ > 0) {
1516  sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1517  }
1518  // compute the maximum send length
1519  maxSendLength_ = 0;
1520  for (size_t i = 0; i < numSends_; ++i) {
1521  int procID = procsTo_[i];
1522  lengthsTo_[i] = starts[procID];
1523  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1524  maxSendLength_ = lengthsTo_[i];
1525  }
1526  }
1527  }
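 // Example for the grouped path above (illustrative): if
 // exportProcIDs = {0, 0, 0, 2, 2}, then numSends_ = 2,
 // procsTo_ = {0, 2}, startsTo_ = {0, 3}, and lengthsTo_ = {3, 2}.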
1528 
1529 
1530  numSends_ -= selfMessage_;
1531  std::vector<int> recv_list;
1532  recv_list.reserve(numSends_); //reserve an initial guess for size needed
1533 
1534  int last_pid=-2;
1535  for(int i=0; i<remoteProcIDs.size(); i++) {
1536  if(remoteProcIDs[i]>last_pid) {
1537  recv_list.push_back(remoteProcIDs[i]);
1538  last_pid = remoteProcIDs[i];
1539  }
1540  else if (remoteProcIDs[i]<last_pid)
1541  throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs expected remoteProcIDs to be in sorted order");
1542  }
1543  numReceives_ = recv_list.size();
1544  if(numReceives_) {
1545  procsFrom_.assign(numReceives_,0);
1546  lengthsFrom_.assign(numReceives_,0);
1547  indicesFrom_.assign(numReceives_,0);
1548  startsFrom_.assign(numReceives_,0);
1549  }
1550  for(size_t i=0,j=0; i<numReceives_; ++i) {
1551  const size_t jlast = j;
1552  procsFrom_[i] = recv_list[i];
1553  startsFrom_[i] = j;
1554  // Advance j past this proc's contiguous run of remote proc IDs.
1555  for( ; j < (size_t) remoteProcIDs.size() && remoteProcIDs[jlast] == remoteProcIDs[j]; ++j) {}
1556  lengthsFrom_[i] = j - jlast;
1557  }
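 // Illustrative example: remoteProcIDs = {1, 1, 4, 4, 4, 7} yields
 // recv_list = {1, 4, 7}, so procsFrom_ = {1, 4, 7},
 // startsFrom_ = {0, 2, 5}, and lengthsFrom_ = {2, 3, 1}.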
1558  totalReceiveLength_ = remoteProcIDs.size();
1559  indicesFrom_.clear ();
1560  // NOTE (mfh 13 Feb 2019): Epetra_MpiDistributor deliberately does
1561  // _not_ fill indicesFrom_ (what it calls "indices_from_") like
1562  // this; it leaves indicesFrom_ empty. The comment there mentions
1563  // that not filling indicesFrom_ helps reverse mode correctness.
1564 #if 0
1565  indicesFrom_.reserve (totalReceiveLength_);
1566  for (size_t i = 0; i < totalReceiveLength_; ++i) {
1567  indicesFrom_.push_back(i);
1568  }
1569 #endif // 0
1570  numReceives_ -= selfMessage_;
1571  }
1572 
1573 } // namespace Tpetra
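// A hedged usage sketch (not part of this file; everything here other
// than Distributor's own API is an assumption for illustration): when
// a caller already knows both sides of the communication pattern,
// createFromSendsAndRecvs() lets it skip the discovery communication
// that createFromSends() performs.  On a two-process run, each process
// below sends two values to the other process:
//
//   #include "Tpetra_Core.hpp"
//   #include "Tpetra_Distributor.hpp"
//
//   int main (int argc, char* argv[]) {
//     Tpetra::ScopeGuard tpetraScope (&argc, &argv);
//     {
//       auto comm = Tpetra::getDefaultComm ();
//       const int other = 1 - comm->getRank (); // other rank (assumes 2 procs)
//       Tpetra::Distributor distributor (comm);
//       // One entry per export GID, grouped by destination proc:
//       Teuchos::Array<int> exportProcIDs (2, other);
//       // One entry per remote GID, sorted by source proc:
//       Teuchos::Array<int> remoteProcIDs (2, other);
//       distributor.createFromSendsAndRecvs (exportProcIDs (), remoteProcIDs ());
//     }
//     return 0;
//   }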