Tpetra_Distributor.cpp
// @HEADER
// ***********************************************************************
//
// Tpetra: Templated Linear Algebra Services Package
// Copyright (2008) Sandia Corporation
//
// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
// the U.S. Government retains certain rights in this software.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the Corporation nor the names of the
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// Questions? Contact Michael A. Heroux (maherou@sandia.gov)
//
// ************************************************************************
// @HEADER

#include "Tpetra_Distributor.hpp"
#include "Teuchos_StandardParameterEntryValidators.hpp"
#include "Teuchos_VerboseObjectParameterListHelpers.hpp"


namespace Tpetra {
  namespace Details {
    std::string
    DistributorSendTypeEnumToString (EDistributorSendType sendType)
    {
      if (sendType == DISTRIBUTOR_ISEND) {
        return "Isend";
      }
      else if (sendType == DISTRIBUTOR_RSEND) {
        return "Rsend";
      }
      else if (sendType == DISTRIBUTOR_SEND) {
        return "Send";
      }
      else if (sendType == DISTRIBUTOR_SSEND) {
        return "Ssend";
      }
      else {
        TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
          "EDistributorSendType enum value " << sendType << ".");
      }
    }

    std::string
    DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
    {
      switch (how) {
      case Details::DISTRIBUTOR_NOT_INITIALIZED:
        return "Not initialized yet";
      case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
        return "By createFromSends";
      case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
        return "By createFromRecvs";
      case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
        return "By createReverseDistributor";
      case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
        return "By copy constructor";
      default:
        return "INVALID";
      }
    }
  } // namespace Details

  Array<std::string>
  distributorSendTypes ()
  {
    Array<std::string> sendTypes;
    sendTypes.push_back ("Isend");
    sendTypes.push_back ("Rsend");
    sendTypes.push_back ("Send");
    sendTypes.push_back ("Ssend");
    return sendTypes;
  }

  // We set default values of Distributor's Boolean parameters here,
  // in this one place.  That way, if we want to change the default
  // value of a parameter, we don't have to search the whole file to
  // ensure a consistent setting.
  namespace {
    // Default value of the "Debug" parameter.
    const bool tpetraDistributorDebugDefault = false;
    // Default value of the "Barrier between receives and sends" parameter.
    const bool barrierBetween_default = false;
    // Default value of the "Use distinct tags" parameter.
    const bool useDistinctTags_default = true;
    // Default value of the "Enable MPI CUDA RDMA support" parameter.
#ifdef TPETRA_ENABLE_MPI_CUDA_RDMA
    const bool enable_cuda_rdma_default = true;
#else
    const bool enable_cuda_rdma_default = false;
#endif
  } // namespace (anonymous)

  int Distributor::getTag (const int pathTag) const {
    return useDistinctTags_ ? pathTag : comm_->getTag ();
  }


#ifdef TPETRA_DISTRIBUTOR_TIMERS
  void Distributor::makeTimers () {
    const std::string name_doPosts3 = "Tpetra::Distributor: doPosts(3)";
    const std::string name_doPosts4 = "Tpetra::Distributor: doPosts(4)";
    const std::string name_doWaits = "Tpetra::Distributor: doWaits";
    const std::string name_doPosts3_recvs = "Tpetra::Distributor: doPosts(3): recvs";
    const std::string name_doPosts4_recvs = "Tpetra::Distributor: doPosts(4): recvs";
    const std::string name_doPosts3_barrier = "Tpetra::Distributor: doPosts(3): barrier";
    const std::string name_doPosts4_barrier = "Tpetra::Distributor: doPosts(4): barrier";
    const std::string name_doPosts3_sends = "Tpetra::Distributor: doPosts(3): sends";
    const std::string name_doPosts4_sends = "Tpetra::Distributor: doPosts(4): sends";

    timer_doPosts3_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3);
    timer_doPosts4_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4);
    timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (name_doWaits);
    timer_doPosts3_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_recvs);
    timer_doPosts4_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_recvs);
    timer_doPosts3_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_barrier);
    timer_doPosts4_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_barrier);
    timer_doPosts3_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_sends);
    timer_doPosts4_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_sends);
  }
#endif // TPETRA_DISTRIBUTOR_TIMERS

  void
  Distributor::init (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
                     const Teuchos::RCP<Teuchos::ParameterList>& plist)
  {
    this->setVerbLevel (debug_ ? Teuchos::VERB_EXTREME : Teuchos::VERB_NONE);
    this->setOStream (out_);
    if (! plist.is_null ()) {
      // The input parameters may override the above verbosity level
      // setting, if there is a "VerboseObject" sublist.
      this->setParameterList (plist);
    }

#ifdef TPETRA_DISTRIBUTOR_TIMERS
    makeTimers ();
#endif // TPETRA_DISTRIBUTOR_TIMERS

    if (debug_) {
      Teuchos::OSTab tab (out_);
      std::ostringstream os;
      os << comm_->getRank ()
         << ": Distributor ctor done" << std::endl;
      *out_ << os.str ();
    }
  }

  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
    : comm_ (comm)
    , out_ (Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)))
    , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
    , sendType_ (Details::DISTRIBUTOR_SEND)
    , barrierBetween_ (barrierBetween_default)
    , debug_ (tpetraDistributorDebugDefault)
    , enable_cuda_rdma_ (enable_cuda_rdma_default)
    , numExports_ (0)
    , selfMessage_ (false)
    , numSends_ (0)
    , maxSendLength_ (0)
    , numReceives_ (0)
    , totalReceiveLength_ (0)
    , lastRoundBytesSend_ (0)
    , lastRoundBytesRecv_ (0)
    , useDistinctTags_ (useDistinctTags_default)
  {
    init (comm, Teuchos::null);
  }

  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
                            const Teuchos::RCP<Teuchos::FancyOStream>& out)
    : comm_ (comm)
    , out_ (out.is_null () ? Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)) : out)
    , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
    , sendType_ (Details::DISTRIBUTOR_SEND)
    , barrierBetween_ (barrierBetween_default)
    , debug_ (tpetraDistributorDebugDefault)
    , enable_cuda_rdma_ (enable_cuda_rdma_default)
    , numExports_ (0)
    , selfMessage_ (false)
    , numSends_ (0)
    , maxSendLength_ (0)
    , numReceives_ (0)
    , totalReceiveLength_ (0)
    , lastRoundBytesSend_ (0)
    , lastRoundBytesRecv_ (0)
    , useDistinctTags_ (useDistinctTags_default)
  {
    init (comm, Teuchos::null);
  }

  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
                            const Teuchos::RCP<Teuchos::ParameterList>& plist)
    : comm_ (comm)
    , out_ (Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)))
    , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
    , sendType_ (Details::DISTRIBUTOR_SEND)
    , barrierBetween_ (barrierBetween_default)
    , debug_ (tpetraDistributorDebugDefault)
    , enable_cuda_rdma_ (enable_cuda_rdma_default)
    , numExports_ (0)
    , selfMessage_ (false)
    , numSends_ (0)
    , maxSendLength_ (0)
    , numReceives_ (0)
    , totalReceiveLength_ (0)
    , lastRoundBytesSend_ (0)
    , lastRoundBytesRecv_ (0)
    , useDistinctTags_ (useDistinctTags_default)
  {
    init (comm, plist);
  }

  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
                            const Teuchos::RCP<Teuchos::FancyOStream>& out,
                            const Teuchos::RCP<Teuchos::ParameterList>& plist)
    : comm_ (comm)
    , out_ (out.is_null () ? Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)) : out)
    , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
    , sendType_ (Details::DISTRIBUTOR_SEND)
    , barrierBetween_ (barrierBetween_default)
    , debug_ (tpetraDistributorDebugDefault)
    , enable_cuda_rdma_ (enable_cuda_rdma_default)
    , numExports_ (0)
    , selfMessage_ (false)
    , numSends_ (0)
    , maxSendLength_ (0)
    , numReceives_ (0)
    , totalReceiveLength_ (0)
    , lastRoundBytesSend_ (0)
    , lastRoundBytesRecv_ (0)
    , useDistinctTags_ (useDistinctTags_default)
  {
    init (comm, plist);
  }

  Distributor::Distributor (const Distributor & distributor)
    : comm_ (distributor.comm_)
    , out_ (distributor.out_)
    , howInitialized_ (Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
    , sendType_ (distributor.sendType_)
    , barrierBetween_ (distributor.barrierBetween_)
    , debug_ (distributor.debug_)
    , enable_cuda_rdma_ (distributor.enable_cuda_rdma_)
    , numExports_ (distributor.numExports_)
    , selfMessage_ (distributor.selfMessage_)
    , numSends_ (distributor.numSends_)
    , imagesTo_ (distributor.imagesTo_)
    , startsTo_ (distributor.startsTo_)
    , lengthsTo_ (distributor.lengthsTo_)
    , maxSendLength_ (distributor.maxSendLength_)
    , indicesTo_ (distributor.indicesTo_)
    , numReceives_ (distributor.numReceives_)
    , totalReceiveLength_ (distributor.totalReceiveLength_)
    , lengthsFrom_ (distributor.lengthsFrom_)
    , imagesFrom_ (distributor.imagesFrom_)
    , startsFrom_ (distributor.startsFrom_)
    , indicesFrom_ (distributor.indicesFrom_)
    , reverseDistributor_ (distributor.reverseDistributor_)
    , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
    , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
    , useDistinctTags_ (distributor.useDistinctTags_)
  {
    using Teuchos::ParameterList;
    using Teuchos::parameterList;
    using Teuchos::RCP;
    using Teuchos::rcp;

    this->setVerbLevel (distributor.getVerbLevel ());
    this->setOStream (out_);
    // The input parameters may override the above verbosity level
    // setting, if there is a "VerboseObject" sublist.
    //
    // Clone the right-hand side's ParameterList, so that this object's
    // list is decoupled from the right-hand side's list.  We don't need
    // to do validation, since the right-hand side already has validated
    // its parameters, so just call setMyParamList().  Note that this
    // won't work if the right-hand side doesn't have a list set yet,
    // so we first check for null.
    RCP<const ParameterList> rhsList = distributor.getParameterList ();
    if (! rhsList.is_null ()) {
      this->setMyParamList (parameterList (* rhsList));
    }

#ifdef TPETRA_DISTRIBUTOR_TIMERS
    makeTimers ();
#endif // TPETRA_DISTRIBUTOR_TIMERS

    if (debug_) {
      Teuchos::OSTab tab (out_);
      std::ostringstream os;
      os << comm_->getRank ()
         << ": Distributor copy ctor done" << std::endl;
      *out_ << os.str ();
    }
  }

  void Distributor::swap (Distributor& rhs) {
    using Teuchos::ParameterList;
    using Teuchos::parameterList;
    using Teuchos::RCP;

    std::swap (comm_, rhs.comm_);
    std::swap (out_, rhs.out_);
    std::swap (howInitialized_, rhs.howInitialized_);
    std::swap (sendType_, rhs.sendType_);
    std::swap (barrierBetween_, rhs.barrierBetween_);
    std::swap (debug_, rhs.debug_);
    std::swap (enable_cuda_rdma_, rhs.enable_cuda_rdma_);
    std::swap (numExports_, rhs.numExports_);
    std::swap (selfMessage_, rhs.selfMessage_);
    std::swap (numSends_, rhs.numSends_);
    std::swap (imagesTo_, rhs.imagesTo_);
    std::swap (startsTo_, rhs.startsTo_);
    std::swap (lengthsTo_, rhs.lengthsTo_);
    std::swap (maxSendLength_, rhs.maxSendLength_);
    std::swap (indicesTo_, rhs.indicesTo_);
    std::swap (numReceives_, rhs.numReceives_);
    std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
    std::swap (lengthsFrom_, rhs.lengthsFrom_);
    std::swap (imagesFrom_, rhs.imagesFrom_);
    std::swap (startsFrom_, rhs.startsFrom_);
    std::swap (indicesFrom_, rhs.indicesFrom_);
    std::swap (reverseDistributor_, rhs.reverseDistributor_);
    std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
    std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
    std::swap (useDistinctTags_, rhs.useDistinctTags_);

    // Swap verbosity levels.
    const Teuchos::EVerbosityLevel lhsVerb = this->getVerbLevel ();
    const Teuchos::EVerbosityLevel rhsVerb = rhs.getVerbLevel ();
    this->setVerbLevel (rhsVerb);
    rhs.setVerbLevel (lhsVerb);

    // Swap output streams.  We've swapped out_ above, but we have to
    // tell the parent class VerboseObject about the swap.
    this->setOStream (out_);
    rhs.setOStream (rhs.out_);

    // Swap parameter lists.  If they are the same object, make a deep
    // copy first, so that modifying one won't modify the other one.
    RCP<ParameterList> lhsList = this->getNonconstParameterList ();
    RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
    if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
      rhsList = parameterList (*rhsList);
    }
    if (! rhsList.is_null ()) {
      this->setMyParamList (rhsList);
    }
    if (! lhsList.is_null ()) {
      rhs.setMyParamList (lhsList);
    }

    // We don't need to swap timers, because all instances of
    // Distributor use the same timers.
  }

  Distributor::~Distributor()
  {
    // We shouldn't have any outstanding communication requests at
    // this point.
    TEUCHOS_TEST_FOR_EXCEPTION(requests_.size() != 0, std::runtime_error,
      "Tpetra::Distributor: Destructor called with " << requests_.size()
      << " outstanding posts (unfulfilled communication requests).  There "
      "should be none at this point.  Please report this bug to the Tpetra "
      "developers.");
  }

  void
  Distributor::setParameterList (const Teuchos::RCP<Teuchos::ParameterList>& plist)
  {
    using Teuchos::FancyOStream;
    using Teuchos::getIntegralValue;
    using Teuchos::includesVerbLevel;
    using Teuchos::OSTab;
    using Teuchos::ParameterList;
    using Teuchos::parameterList;
    using Teuchos::RCP;
    using std::endl;

    RCP<const ParameterList> validParams = getValidParameters ();
    plist->validateParametersAndSetDefaults (*validParams);

    const bool barrierBetween =
      plist->get<bool> ("Barrier between receives and sends");
    const Details::EDistributorSendType sendType =
      getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
    const bool useDistinctTags = plist->get<bool> ("Use distinct tags");
    const bool debug = plist->get<bool> ("Debug");
    const bool enable_cuda_rdma = plist->get<bool> ("Enable MPI CUDA RDMA support");

    // We check this property explicitly, since we haven't yet learned
    // how to make a validator that can cross-check properties.
    // Later, turn this into a validator so that it can be embedded in
    // the valid ParameterList and used in Optika.
    TEUCHOS_TEST_FOR_EXCEPTION(
      ! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
      std::invalid_argument, "Tpetra::Distributor::setParameterList: " << endl
      << "You specified \"Send type\"=\"Rsend\", but turned off the barrier "
      "between receives and sends." << endl << "This is invalid; you must "
      "include the barrier if you use ready sends." << endl << "Ready sends "
      "require that their corresponding receives have already been posted, "
      "and the only way to guarantee that in general is with a barrier.");

    if (plist->isSublist ("VerboseObject")) {
      // Read the "VerboseObject" sublist for (optional) verbosity
      // settings.  We've set defaults already in Distributor's
      // constructor, so we don't need this sublist to exist.
      Teuchos::readVerboseObjectSublist (&*plist, this);
    }

    // Now that we've validated the input list, save the results.
    sendType_ = sendType;
    barrierBetween_ = barrierBetween;
    useDistinctTags_ = useDistinctTags;
    debug_ = debug;
    enable_cuda_rdma_ = enable_cuda_rdma;

    // ParameterListAcceptor semantics require pointer identity of the
    // sublist passed to setParameterList(), so we save the pointer.
    this->setMyParamList (plist);
  }

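  // Usage sketch for setting parameters (illustration only; "distributor"
  // is a hypothetical instance, and this just follows the standard
  // Teuchos::ParameterList idiom.  See getValidParameters() below for the
  // full set of valid names and values):
  //
  //   Teuchos::RCP<Teuchos::ParameterList> plist =
  //     Teuchos::parameterList ("Tpetra::Distributor");
  //   plist->set ("Send type", std::string ("Isend"));
  //   plist->set ("Use distinct tags", true);
  //   distributor.setParameterList (plist);
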
  Teuchos::RCP<const Teuchos::ParameterList>
  Distributor::getValidParameters () const
  {
    using Teuchos::Array;
    using Teuchos::ParameterList;
    using Teuchos::parameterList;
    using Teuchos::RCP;
    using Teuchos::setStringToIntegralParameter;

    const bool barrierBetween = barrierBetween_default;
    const bool useDistinctTags = useDistinctTags_default;
    const bool debug = tpetraDistributorDebugDefault;
    const bool enable_cuda_rdma = enable_cuda_rdma_default;

    Array<std::string> sendTypes = distributorSendTypes ();
    const std::string defaultSendType ("Send");
    Array<Details::EDistributorSendType> sendTypeEnums;
    sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
    sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
    sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
    sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);

    RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
    plist->set ("Barrier between receives and sends", barrierBetween,
      "Whether to execute a barrier between receives and sends in do"
      "[Reverse]Posts().  Required for correctness when \"Send type\""
      "=\"Rsend\", otherwise correct but not recommended.");
    setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
      defaultSendType, "When using MPI, the variant of send to use in "
      "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
    plist->set ("Use distinct tags", useDistinctTags, "Whether to use distinct "
      "MPI message tags for different code paths.");
    plist->set ("Debug", debug, "Whether to print copious debugging output on "
      "all processes.");
    plist->set ("Enable MPI CUDA RDMA support", enable_cuda_rdma,
      "Whether to enable RDMA support for MPI communication between "
      "CUDA GPUs.  Only enable this if you know for sure your MPI "
      "library supports it.");

    Teuchos::setupVerboseObjectSublist (&*plist);
    return Teuchos::rcp_const_cast<const ParameterList> (plist);
  }


  size_t Distributor::getTotalReceiveLength() const
  { return totalReceiveLength_; }

  size_t Distributor::getNumReceives() const
  { return numReceives_; }

  bool Distributor::hasSelfMessage() const
  { return selfMessage_; }

  size_t Distributor::getNumSends() const
  { return numSends_; }

  size_t Distributor::getMaxSendLength() const
  { return maxSendLength_; }

  Teuchos::ArrayView<const int> Distributor::getImagesFrom() const
  { return imagesFrom_; }

  Teuchos::ArrayView<const size_t> Distributor::getLengthsFrom() const
  { return lengthsFrom_; }

  Teuchos::ArrayView<const int> Distributor::getImagesTo() const
  { return imagesTo_; }

  Teuchos::ArrayView<const size_t> Distributor::getLengthsTo() const
  { return lengthsTo_; }

  Teuchos::RCP<Distributor>
  Distributor::getReverse() const {
    if (reverseDistributor_.is_null ()) {
      createReverseDistributor ();
    }
    return reverseDistributor_;
  }

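  // Usage sketch (illustration only; "distributor" is a hypothetical
  // instance already initialized by createFromSends or createFromRecvs).
  // The reverse plan swaps the roles of senders and receivers, which is
  // what a reverse-mode Import or Export needs:
  //
  //   Teuchos::RCP<Distributor> reverse = distributor.getReverse ();
  //   // reverse->getNumSends () == distributor.getNumReceives (), and
  //   // reverse->getTotalReceiveLength () equals the forward plan's
  //   // total send length (see createReverseDistributor() below).
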
  void
  Distributor::createReverseDistributor() const
  {
    reverseDistributor_ = Teuchos::rcp (new Distributor (comm_));

    // The total length of all the sends of this Distributor.  We
    // calculate it because it's the total length of all the receives
    // of the reverse Distributor.
    size_t totalSendLength =
      std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), 0);

    // The maximum length of any of the receives of this Distributor.
    // We calculate it because it's the maximum length of any of the
    // sends of the reverse Distributor.
    size_t maxReceiveLength = 0;
    const int myImageID = comm_->getRank();
    for (size_t i=0; i < numReceives_; ++i) {
      if (imagesFrom_[i] != myImageID) {
        // Don't count receives for messages sent by myself to myself.
        if (lengthsFrom_[i] > maxReceiveLength) {
          maxReceiveLength = lengthsFrom_[i];
        }
      }
    }

    // Initialize all of reverseDistributor's data members.  This
    // mainly just involves flipping "send" and "receive," or the
    // equivalent "to" and "from."
    reverseDistributor_->lengthsTo_ = lengthsFrom_;
    reverseDistributor_->imagesTo_ = imagesFrom_;
    reverseDistributor_->indicesTo_ = indicesFrom_;
    reverseDistributor_->startsTo_ = startsFrom_;
    reverseDistributor_->lengthsFrom_ = lengthsTo_;
    reverseDistributor_->imagesFrom_ = imagesTo_;
    reverseDistributor_->indicesFrom_ = indicesTo_;
    reverseDistributor_->startsFrom_ = startsTo_;
    reverseDistributor_->numSends_ = numReceives_;
    reverseDistributor_->numReceives_ = numSends_;
    reverseDistributor_->selfMessage_ = selfMessage_;
    reverseDistributor_->maxSendLength_ = maxReceiveLength;
    reverseDistributor_->totalReceiveLength_ = totalSendLength;
    reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;

    // Note: Technically, this Distributor is its reverse Distributor's
    // reverse Distributor, but we do not set that pointer here.  Leaving
    // it unset gives us an opportunity to test that createReverseDistributor
    // is an inverse operation with respect to the value semantics of
    // Distributor.  Note also that numExports_ is not copied.
  }


  void Distributor::doWaits() {
    using Teuchos::Array;
    using Teuchos::CommRequest;
    using Teuchos::FancyOStream;
    using Teuchos::includesVerbLevel;
    using Teuchos::is_null;
    using Teuchos::OSTab;
    using Teuchos::RCP;
    using Teuchos::waitAll;
    using std::endl;

    Teuchos::OSTab tab (out_);

#ifdef TPETRA_DISTRIBUTOR_TIMERS
    Teuchos::TimeMonitor timeMon (*timer_doWaits_);
#endif // TPETRA_DISTRIBUTOR_TIMERS

    const int myRank = comm_->getRank ();

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": doWaits: # reqs = "
         << requests_.size () << endl;
      *out_ << os.str ();
    }

    if (requests_.size() > 0) {
      waitAll (*comm_, requests_());

#ifdef HAVE_TEUCHOS_DEBUG
      // Make sure that waitAll() nulled out all the requests.
      for (Array<RCP<CommRequest<int> > >::const_iterator it = requests_.begin();
           it != requests_.end(); ++it)
      {
        TEUCHOS_TEST_FOR_EXCEPTION( ! is_null (*it), std::runtime_error,
          Teuchos::typeName(*this) << "::doWaits(): Communication requests "
          "should all be null after calling Teuchos::waitAll() on them, but "
          "at least one request is not null.");
      }
#endif // HAVE_TEUCHOS_DEBUG
      // Restore the invariant that requests_.size() is the number of
      // outstanding nonblocking communication requests.
      requests_.resize (0);
    }

#ifdef HAVE_TEUCHOS_DEBUG
    {
      const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
      int globalSizeNonzero = 0;
      Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
                                    localSizeNonzero,
                                    Teuchos::outArg (globalSizeNonzero));
      TEUCHOS_TEST_FOR_EXCEPTION(
        globalSizeNonzero != 0, std::runtime_error,
        "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
        "a nonzero number of outstanding posts.  There should be none at this "
        "point.  Please report this bug to the Tpetra developers.");
    }
#endif // HAVE_TEUCHOS_DEBUG

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": doWaits done" << endl;
      *out_ << os.str ();
    }
  }

  void Distributor::doReverseWaits() {
    // call doWaits() on the reverse Distributor, if it exists
    if (! reverseDistributor_.is_null()) {
      reverseDistributor_->doWaits();
    }
  }

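  // Usage sketch of the posts/waits protocol (an assumption about the
  // typical call order; doPosts() and doReversePosts() are templated and
  // defined in Tpetra_Distributor.hpp, not in this file):
  //
  //   distributor.doPosts (exports, numPackets, imports); // nonblocking
  //   // ... overlap communication with local computation here ...
  //   distributor.doWaits (); // completes the posts; requests_ empties
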
  std::string Distributor::description () const {
    std::ostringstream out;

    out << "\"Tpetra::Distributor\": {";
    const std::string label = this->getObjectLabel ();
    if (label != "") {
      out << "Label: " << label << ", ";
    }
    out << "How initialized: "
        << Details::DistributorHowInitializedEnumToString (howInitialized_)
        << ", Parameters: {"
        << "Send type: "
        << DistributorSendTypeEnumToString (sendType_)
        << ", Barrier between receives and sends: "
        << (barrierBetween_ ? "true" : "false")
        << ", Use distinct tags: "
        << (useDistinctTags_ ? "true" : "false")
        << ", Debug: " << (debug_ ? "true" : "false")
        << ", Enable MPI CUDA RDMA support: "
        << (enable_cuda_rdma_ ? "true" : "false")
        << "}}";
    return out.str ();
  }

  void
  Distributor::describe (Teuchos::FancyOStream &out,
                         const Teuchos::EVerbosityLevel verbLevel) const
  {
    using std::endl;
    using std::setw;
    using Teuchos::VERB_DEFAULT;
    using Teuchos::VERB_NONE;
    using Teuchos::VERB_LOW;
    using Teuchos::VERB_MEDIUM;
    using Teuchos::VERB_HIGH;
    using Teuchos::VERB_EXTREME;
    Teuchos::EVerbosityLevel vl = verbLevel;
    if (vl == VERB_DEFAULT) vl = VERB_LOW;
    const int myImageID = comm_->getRank();
    const int numImages = comm_->getSize();
    Teuchos::OSTab tab (out);

    if (vl == VERB_NONE) {
      return;
    } else {
      if (myImageID == 0) {
        // VERB_LOW and higher prints description() (on Proc 0 only).
        // We quote the class name because it contains colons:
        // quoting makes the output valid YAML.
        out << "\"Tpetra::Distributor\":" << endl;
        Teuchos::OSTab tab2 (out);
        const std::string label = this->getObjectLabel ();
        if (label != "") {
          out << "Label: " << label << endl;
        }
        out << "How initialized: "
            << Details::DistributorHowInitializedEnumToString (howInitialized_)
            << endl << "Parameters: " << endl;
        {
          Teuchos::OSTab tab3 (out);
          out << "\"Send type\": "
              << DistributorSendTypeEnumToString (sendType_) << endl
              << "\"Barrier between receives and sends\": "
              << (barrierBetween_ ? "true" : "false") << endl;
          out << "\"Use distinct tags\": "
              << (useDistinctTags_ ? "true" : "false") << endl;
          out << "\"Debug\": " << (debug_ ? "true" : "false") << endl;
          out << "\"Enable MPI CUDA RDMA support\": " <<
            (enable_cuda_rdma_ ? "true" : "false") << endl;
        }
      }
      if (vl == VERB_LOW) {
        return;
      } else {
        Teuchos::OSTab tab2 (out);
        // vl > VERB_LOW lets each image print its data.  We assume
        // that all images can print to the given output stream, and
        // execute barriers to make it more likely that the output
        // will be in the right order.
        for (int imageCtr = 0; imageCtr < numImages; ++imageCtr) {
          if (myImageID == imageCtr) {
            if (myImageID == 0) {
              out << "Number of processes: " << numImages << endl;
            }
            out << "Process: " << myImageID << endl;
            Teuchos::OSTab tab3 (out);
            out << "selfMessage: " << hasSelfMessage () << endl;
            out << "numSends: " << getNumSends () << endl;
            if (vl == VERB_HIGH || vl == VERB_EXTREME) {
              out << "imagesTo: " << toString (imagesTo_) << endl;
              out << "lengthsTo: " << toString (lengthsTo_) << endl;
              out << "maxSendLength: " << getMaxSendLength () << endl;
            }
            if (vl == VERB_EXTREME) {
              out << "startsTo: " << toString (startsTo_) << endl;
              out << "indicesTo: " << toString (indicesTo_) << endl;
            }
            if (vl == VERB_HIGH || vl == VERB_EXTREME) {
              out << "numReceives: " << getNumReceives () << endl;
              out << "totalReceiveLength: " << getTotalReceiveLength () << endl;
              out << "lengthsFrom: " << toString (lengthsFrom_) << endl;
              out << "startsFrom: " << toString (startsFrom_) << endl;
              out << "imagesFrom: " << toString (imagesFrom_) << endl;
            }
            // Last output is a flush; it leaves a space and also
            // helps synchronize output.
            out << std::flush;
          } // if it's my image's turn to print
          // Execute barriers to give output time to synchronize.
          // One barrier generally isn't enough.
          comm_->barrier();
          comm_->barrier();
          comm_->barrier();
        } // for each image
      }
    }
  }

  void
  Distributor::computeReceives ()
  {
    using Teuchos::Array;
    using Teuchos::as;
    using Teuchos::CommStatus;
    using Teuchos::CommRequest;
    using Teuchos::ireceive;
    using Teuchos::RCP;
    using Teuchos::rcp;
    using Teuchos::REDUCE_SUM;
    using Teuchos::receive;
    using Teuchos::reduce;
    using Teuchos::scatter;
    using Teuchos::send;
    using Teuchos::waitAll;
    using std::endl;

    Teuchos::OSTab tab (out_);
    const int myRank = comm_->getRank();
    const int numProcs = comm_->getSize();

    // MPI tag for nonblocking receives and blocking sends in this method.
    const int pathTag = 2;
    const int tag = this->getTag (pathTag);

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": computeReceives: "
        "{selfMessage_: " << (selfMessage_ ? "true" : "false")
         << ", tag: " << tag << "}" << endl;
      *out_ << os.str ();
    }

    // toNodesFromMe[i] == the number of messages sent by this process
    // to process i.  The data in numSends_, imagesTo_, and lengthsTo_
    // concern the contiguous sends.  Therefore, each process will be
    // listed in imagesTo_ at most once, and so toNodesFromMe[i] will
    // either be 0 or 1.
    {
      Array<int> toNodesFromMe (numProcs, 0);
#ifdef HAVE_TEUCHOS_DEBUG
      bool counting_error = false;
#endif // HAVE_TEUCHOS_DEBUG
      for (size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
#ifdef HAVE_TEUCHOS_DEBUG
        if (toNodesFromMe[imagesTo_[i]] != 0) {
          counting_error = true;
        }
#endif // HAVE_TEUCHOS_DEBUG
        toNodesFromMe[imagesTo_[i]] = 1;
      }
#ifdef HAVE_TEUCHOS_DEBUG
      SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
        "Tpetra::Distributor::computeReceives: There was an error on at least "
        "one process in counting the number of messages sent by that process "
        "to the other processes.  Please report this bug to the Tpetra "
        "developers.", *comm_);
#endif // HAVE_TEUCHOS_DEBUG

      if (debug_) {
        std::ostringstream os;
        os << myRank << ": computeReceives: Calling reduce and scatter" << endl;
        *out_ << os.str ();
      }

      // Compute the number of receives that this process needs to
      // post.  The number of receives includes any self sends (i.e.,
      // messages sent by this process to itself).
      //
      // (We will use numReceives_ below to post exactly that number
      // of receives, with MPI_ANY_SOURCE as the sending rank.  This
      // will tell us from which processes this process expects to
      // receive, and how many packets of data we expect to receive
      // from each process.)
      //
      // toNodesFromMe[i] is the number of messages sent by this
      // process to process i.  Compute the sum (elementwise) of all
      // the toNodesFromMe arrays on all processes in the
      // communicator.  If the array x is that sum, then if this
      // process has rank j, x[j] is the number of messages sent
      // to process j, that is, the number of receives on process j
      // (including any messages sent by process j to itself).
      //
      // Yes, this requires storing and operating on an array of
      // length P, where P is the number of processes in the
      // communicator.  Epetra does this too.  Avoiding this O(P)
      // memory bottleneck would require some research.
      //
      // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
      // implement this O(P) memory algorithm.
      //
      //   1. Use MPI_Reduce and MPI_Scatter: reduce on the root
      //      process (0) from toNodesFromMe, to numRecvsOnEachProc.
      //      Then, scatter the latter, so that each process p gets
      //      numRecvsOnEachProc[p].
      //
      //   2. Like #1, but use MPI_Reduce_scatter instead of
      //      MPI_Reduce and MPI_Scatter.  MPI_Reduce_scatter might be
      //      optimized to reduce the number of messages, but
      //      MPI_Reduce_scatter is more general than we need (it
      //      allows the equivalent of MPI_Scatterv).  See Bug 6336.
      //
      //   3. Do an all-reduce on toNodesFromMe, and let my process
      //      (with rank myRank) get numReceives_ from
      //      toNodesFromMe[myRank].  The HPCCG miniapp uses the
      //      all-reduce method.
      //
      // Approaches 1 and 3 have the same critical path length.
      // However, #3 moves more data.  This is because the final
      // result is just one integer, but #3 moves a whole array of
      // results to all the processes.  This is why we use Approach 1
      // here.
      //
      // mfh 12 Apr 2013: See discussion in createFromSends() about
      // how we could use this communication to propagate an error
      // flag for "free" in a release build.

      const int root = 0; // rank of root process of the reduction
      Array<int> numRecvsOnEachProc; // temp; only needed on root
      if (myRank == root) {
        numRecvsOnEachProc.resize (numProcs);
      }
      int numReceivesAsInt = 0; // output
      reduce<int, int> (toNodesFromMe.getRawPtr (),
                        numRecvsOnEachProc.getRawPtr (),
                        numProcs, REDUCE_SUM, root, *comm_);
      scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
                         &numReceivesAsInt, 1, root, *comm_);
      numReceives_ = static_cast<size_t> (numReceivesAsInt);
    }
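
    // For comparison, a sketch of Approach 3 (the all-reduce method,
    // which this method deliberately does not use).  This is an
    // illustration only, assuming the array form of Teuchos::reduceAll:
    //
    //   Array<int> numRecvsOnAllProcs (numProcs, 0);
    //   Teuchos::reduceAll<int, int> (*comm_, REDUCE_SUM, numProcs,
    //                                 toNodesFromMe.getRawPtr (),
    //                                 numRecvsOnAllProcs.getRawPtr ());
    //   numReceives_ = static_cast<size_t> (numRecvsOnAllProcs[myRank]);
    //
    // Same critical path length as Approach 1, but it moves a length-P
    // array to every process instead of a single integer.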

    // Now we know numReceives_, which is this process' number of
    // receives.  Allocate the lengthsFrom_ and imagesFrom_ arrays
    // with this number of entries.
    lengthsFrom_.assign (numReceives_, 0);
    imagesFrom_.assign (numReceives_, 0);

    //
    // Ask (via nonblocking receive) each process from which we are
    // receiving how many packets we should expect from it in the
    // communication pattern.
    //

    // At this point, numReceives_ includes any self message that
    // there may be.  At the end of this routine, we'll subtract off
    // the self message (if there is one) from numReceives_.  In this
    // routine, we don't need to receive a message from ourselves in
    // order to figure out our lengthsFrom_ and source process ID; we
    // can just ask ourselves directly.  Thus, the actual number of
    // nonblocking receives we post here does not include the self
    // message.
    const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);

    // Teuchos' wrapper for nonblocking receives requires receive
    // buffers that it knows won't go away.  This is why we use RCPs,
    // one RCP per nonblocking receive request.  They get allocated in
    // the loop below.
    Array<RCP<CommRequest<int> > > requests (actualNumReceives);
    Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
    Array<RCP<CommStatus<int> > > statuses (actualNumReceives);

    // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
    // (receive data from any process).
#ifdef HAVE_MPI
    const int anySourceProc = MPI_ANY_SOURCE;
#else
    const int anySourceProc = -1;
#endif

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": computeReceives: Posting "
         << actualNumReceives << " irecvs" << endl;
      *out_ << os.str ();
    }

    // Post the (nonblocking) receives.
    for (size_t i = 0; i < actualNumReceives; ++i) {
      // Once the receive completes, we can ask the corresponding
      // CommStatus object (output by wait()) for the sending process'
      // ID (which we'll assign to imagesFrom_[i] -- don't forget to
      // do that!).
      lengthsFromBuffers[i].resize (1);
      lengthsFromBuffers[i][0] = as<size_t> (0);
      requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc, tag, *comm_);
      if (debug_) {
        std::ostringstream os;
        os << myRank << ": computeReceives: "
          "Posted any-proc irecv w/ specified tag " << tag << endl;
        *out_ << os.str ();
      }
    }

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": computeReceives: "
        "posting " << numSends_ << " sends" << endl;
      *out_ << os.str ();
    }
    // Post the sends: Tell each process to which we are sending how
    // many packets it should expect from us in the communication
    // pattern.  We could use nonblocking sends here, as long as we do
    // a waitAll() on all the sends and receives at once.
    //
    // We assume that numSends_ and selfMessage_ have already been
    // set.  The value of numSends_ (my process' number of sends) does
    // not include any message that it might send to itself.
    for (size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
      if (imagesTo_[i] != myRank) {
        // Send a message to imagesTo_[i], telling that process that
        // this communication pattern will send that process
        // lengthsTo_[i] blocks of packets.
        const size_t* const lengthsTo_i = &lengthsTo_[i];
        send<int, size_t> (lengthsTo_i, 1, as<int> (imagesTo_[i]), tag, *comm_);
        if (debug_) {
          std::ostringstream os;
          os << myRank << ": computeReceives: "
            "Posted send to Proc " << imagesTo_[i] << " w/ specified tag "
             << tag << endl;
          *out_ << os.str ();
        }
      }
      else {
        // We don't need a send in the self-message case.  If this
        // process will send a message to itself in the communication
        // pattern, then the last element of lengthsFrom_ and
        // imagesFrom_ corresponds to the self-message.  Of course
        // this process knows how long the message is, and the process
        // ID is its own process ID.
        lengthsFrom_[numReceives_-1] = lengthsTo_[i];
        imagesFrom_[numReceives_-1] = myRank;
      }
    }

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": computeReceives: waitAll on "
         << requests.size () << " requests" << endl;
      *out_ << os.str ();
    }
    //
    // Wait on all the receives.  When they arrive, check the status
    // output of wait() for the sending process' ID, unpack the
    // request buffers into lengthsFrom_, and set imagesFrom_ from the
    // status.
    //
    waitAll (*comm_, requests (), statuses ());
    for (size_t i = 0; i < actualNumReceives; ++i) {
      lengthsFrom_[i] = *lengthsFromBuffers[i];
      imagesFrom_[i] = statuses[i]->getSourceRank ();
    }

    // Sort the imagesFrom_ array, and apply the same permutation to
    // lengthsFrom_.  This ensures that imagesFrom_[i] and
    // lengthsFrom_[i] refer to the same thing.
    sort2 (imagesFrom_.begin(), imagesFrom_.end(), lengthsFrom_.begin());

    // Compute indicesFrom_
    totalReceiveLength_ = std::accumulate (lengthsFrom_.begin(), lengthsFrom_.end(), 0);
    indicesFrom_.clear ();
    indicesFrom_.reserve (totalReceiveLength_);
    for (size_t i = 0; i < totalReceiveLength_; ++i) {
      indicesFrom_.push_back(i);
    }

    startsFrom_.clear ();
    startsFrom_.reserve (numReceives_);
    for (size_t i = 0, j = 0; i < numReceives_; ++i) {
      startsFrom_.push_back(j);
      j += lengthsFrom_[i];
    }

    if (selfMessage_) {
      --numReceives_;
    }

    if (debug_) {
      std::ostringstream os;
      os << myRank << ": computeReceives: done" << endl;
      *out_ << os.str ();
    }
  }

  size_t
  Distributor::createFromSends (const Teuchos::ArrayView<const int> &exportNodeIDs)
  {
    using Teuchos::outArg;
    using Teuchos::REDUCE_MAX;
    using Teuchos::reduceAll;
    using std::endl;

    Teuchos::OSTab tab (out_);

    numExports_ = exportNodeIDs.size();

    const int myImageID = comm_->getRank();
    const int numImages = comm_->getSize();
    if (debug_) {
      std::ostringstream os;
      os << myImageID << ": createFromSends" << endl;
      *out_ << os.str ();
    }

    // exportNodeIDs tells us the communication pattern for this
    // distributor.  It dictates the way that the export data will be
    // interpreted in doPosts().  We want to perform at most one
    // send per process in doPosts; this is for two reasons:
    //   * minimize latency / overhead in the comm routines (nice)
    //   * match the number of receives and sends between processes
    //     (necessary)
    //
    // Teuchos::Comm requires that the data for a send are contiguous
    // in a send buffer.  Therefore, if the data in the send buffer
    // for doPosts() are not contiguous, they will need to be copied
    // into a contiguous buffer.  The user has specified this
    // noncontiguous pattern and we can't do anything about it.
    // However, if they do not provide an efficient pattern, we will
    // warn them if one of the following compile-time options has been
    // set:
    //   * HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS
    //   * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
    //
    // If the data are contiguous, then we can post the sends in situ
    // (i.e., without needing to copy them into a send buffer).
    //
    // Determine contiguity.  There are a number of ways to do this:
    //   * If the export IDs are sorted, then all exports to a
    //     particular node must be contiguous.  This is what Epetra does.
    //   * If the export ID of the current export already has been
    //     listed, then the previous listing should correspond to the
    //     same export.  This tests contiguity, but not sortedness.
    //
    // Both of these tests require O(n), where n is the number of
    // exports.  However, the latter will positively identify a greater
    // portion of contiguous patterns.  We use the latter method.
    //
    // Check to see if values are grouped by images without gaps.
    // If so, indices_to -> 0.

    // Set up data structures for quick traversal of arrays.
    // This contains the number of sends for each process ID.
    //
    // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
    // that create an array of length the number of processes in the
    // communicator (plus one).  Given how this code uses this array,
    // it should be straightforward to replace it with a hash table or
    // some other more space-efficient data structure.  In practice,
    // most of the entries of starts should be zero for a sufficiently
    // large process count, unless the communication pattern is dense.
    // Note that it's important to be able to iterate through keys (i
    // for which starts[i] is nonzero) in increasing order.
    Teuchos::Array<size_t> starts (numImages + 1, 0);

    // numActive is the number of sends that are not Null
    size_t numActive = 0;
    int needSendBuff = 0; // Boolean

#ifdef HAVE_TPETRA_DEBUG
    int badID = -1; // only used in a debug build
#endif // HAVE_TPETRA_DEBUG
    for (size_t i = 0; i < numExports_; ++i) {
      const int exportID = exportNodeIDs[i];
      if (exportID >= numImages) {
#ifdef HAVE_TPETRA_DEBUG
        badID = myImageID;
#endif // HAVE_TPETRA_DEBUG
        break;
      }
      else if (exportID >= 0) {
        // exportID is a valid process ID.  Increment the number of
        // messages this process will send to that process.
        ++starts[exportID];

        // If we're sending more than one message to process exportID,
        // then it is possible that the data are not contiguous.
        // Check by seeing if the previous process ID in the list
        // (exportNodeIDs[i-1]) is the same.  It's safe to use i-1,
        // because if starts[exportID] > 1, then i must be > 1 (since
        // the starts array was filled with zeros initially).

        // null entries break continuity.
        // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
        if (needSendBuff==0 && starts[exportID] > 1 && exportID != exportNodeIDs[i-1]) {
          needSendBuff = 1;
        }
        ++numActive;
      }
    }

#ifdef HAVE_TPETRA_DEBUG
    // Test whether any process in the communicator got an invalid
    // process ID.  If badID != -1 on this process, then it equals
    // this process' rank.  The max of all badID over all processes is
    // the max rank which has an invalid process ID.
    {
      int gbl_badID;
      reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
      TEUCHOS_TEST_FOR_EXCEPTION(gbl_badID >= 0, std::runtime_error,
        Teuchos::typeName(*this) << "::createFromSends(): Process " << gbl_badID
        << ", perhaps among other processes, got a bad send process ID.");
    }
#else
    // FIXME (mfh 12 Apr 2013, 15 Jul 2015) Rather than simply
    // ignoring this information, we should think about how to pass it
    // along so that all the processes find out about it.  In a
    // release build with efficiency warnings turned off, the next
    // collective communication happens in computeReceives().  We
    // could figure out how to encode the error flag in that
    // operation, for example by adding an extra entry to the
    // collective's output array that encodes the error condition (0
    // on all processes if no error, else 1 on any process with the
    // error, so that the sum will produce a nonzero value if any
    // process had an error).  I'll defer this change for now and
    // recommend instead that people with troubles try a debug build.
#endif // HAVE_TPETRA_DEBUG

#if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
    {
      int global_needSendBuff;
      reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
                           outArg (global_needSendBuff));
      TPETRA_EFFICIENCY_WARNING(
        global_needSendBuff != 0, std::runtime_error,
        "::createFromSends: Grouping export IDs together by process rank often "
        "improves performance.");
    }
#endif

    // Determine from the caller's data whether or not the current
    // process should send (a) message(s) to itself.
    if (starts[myImageID] != 0) {
      selfMessage_ = true;
    }
    else {
      selfMessage_ = false;
    }

#ifdef HAVE_TEUCHOS_DEBUG
    bool index_neq_numActive = false;
    bool send_neq_numSends = false;
#endif
    if (! needSendBuff) {
      // grouped by image, no send buffer or indicesTo_ needed
      numSends_ = 0;
      // Count total number of sends, i.e., total number of images to
      // which we are sending.  This includes myself, if applicable.
      for (int i = 0; i < numImages; ++i) {
        if (starts[i]) {
          ++numSends_;
        }
      }

      // Not only do we not need these, but we must clear them, as
      // empty status of indicesTo is a flag used later.
      indicesTo_.resize(0);
      // Size these to numSends_; note, at the moment, numSends_
      // includes self sends.  Set their values to zeros.
      imagesTo_.assign(numSends_,0);
      startsTo_.assign(numSends_,0);
      lengthsTo_.assign(numSends_,0);

      // set startsTo to the offset for each send (i.e., each image ID)
      // set imagesTo to the image ID for each send
      // in interpreting this code, remember that we are assuming contiguity
      // that is why index skips through the ranks
      {
        size_t index = 0, nodeIndex = 0;
        for (size_t i = 0; i < numSends_; ++i) {
          while (exportNodeIDs[nodeIndex] < 0) {
            ++nodeIndex; // skip all negative node IDs
          }
          startsTo_[i] = nodeIndex;
          int imageID = exportNodeIDs[nodeIndex];
          imagesTo_[i] = imageID;
          index += starts[imageID];
          nodeIndex += starts[imageID];
        }
#ifdef HAVE_TEUCHOS_DEBUG
        if (index != numActive) {
          index_neq_numActive = true;
        }
#endif
      }
      // sort the startsTo and image IDs together, in ascending order, according
      // to image IDs
      if (numSends_ > 0) {
        sort2(imagesTo_.begin(), imagesTo_.end(), startsTo_.begin());
      }
      // compute the maximum send length
      maxSendLength_ = 0;
      for (size_t i = 0; i < numSends_; ++i) {
        int imageID = imagesTo_[i];
        lengthsTo_[i] = starts[imageID];
        if ((imageID != myImageID) && (lengthsTo_[i] > maxSendLength_)) {
          maxSendLength_ = lengthsTo_[i];
        }
      }
    }
    else {
      // not grouped by image, need send buffer and indicesTo_

      // starts[i] is the number of sends to node i
      // numActive equals number of sends total, \sum_i starts[i]

      // this loop starts at starts[1], so explicitly check starts[0]
      if (starts[0] == 0 ) {
        numSends_ = 0;
      }
      else {
        numSends_ = 1;
      }
      for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
                                            im1=starts.begin();
           i != starts.end(); ++i)
      {
        if (*i != 0) ++numSends_;
        *i += *im1;
        im1 = i;
      }
      // starts[i] now contains the number of exports to nodes 0 through i

      for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
                                                    i=starts.rbegin()+1;
           i != starts.rend(); ++i)
      {
        *ip1 = *i;
        ip1 = i;
      }
      starts[0] = 0;
      // starts[i] now contains the number of exports to nodes 0 through
      // i-1, i.e., all nodes before node i

      indicesTo_.resize(numActive);

      for (size_t i = 0; i < numExports_; ++i) {
        if (exportNodeIDs[i] >= 0) {
          // record the offset to the sendBuffer for this export
          indicesTo_[starts[exportNodeIDs[i]]] = i;
          // now increment the offset for this node
          ++starts[exportNodeIDs[i]];
        }
      }
      // our send buffer will contain the export data for each of the nodes
      // we communicate with, in order by node id
      // sendBuffer = {node_0_data, node_1_data, ..., node_np-1_data}
      // indicesTo now maps each export to the location in our send buffer
      // associated with the export
      // data for export i located at sendBuffer[indicesTo[i]]
      //
      // starts[i] once again contains the number of exports to
      // nodes 0 through i
      for (int node = numImages-1; node != 0; --node) {
        starts[node] = starts[node-1];
      }
      starts.front() = 0;
      starts[numImages] = numActive;
      //
      // starts[node] once again contains the number of exports to
      // nodes 0 through node-1
      // i.e., the start of my data in the sendBuffer

      // this contains invalid data at nodes we don't care about, that is okay
      imagesTo_.resize(numSends_);
      startsTo_.resize(numSends_);
      lengthsTo_.resize(numSends_);

      // for each group of sends/exports, record the destination node,
      // the length, and the offset for this send into the
      // send buffer (startsTo_)
      maxSendLength_ = 0;
      size_t snd = 0;
      for (int node = 0; node < numImages; ++node ) {
        if (starts[node+1] != starts[node]) {
          lengthsTo_[snd] = starts[node+1] - starts[node];
          startsTo_[snd] = starts[node];
          // record max length for all off-node sends
          if ((node != myImageID) && (lengthsTo_[snd] > maxSendLength_)) {
            maxSendLength_ = lengthsTo_[snd];
          }
          imagesTo_[snd] = node;
          ++snd;
        }
      }
#ifdef HAVE_TEUCHOS_DEBUG
      if (snd != numSends_) {
        send_neq_numSends = true;
      }
#endif
    }
#ifdef HAVE_TEUCHOS_DEBUG
    SHARED_TEST_FOR_EXCEPTION(index_neq_numActive, std::logic_error,
      "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
    SHARED_TEST_FOR_EXCEPTION(send_neq_numSends, std::logic_error,
      "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
#endif

    if (selfMessage_) --numSends_;

    // Invert map to see what msgs are received and what length
    computeReceives();

    if (debug_) {
      std::ostringstream os;
      os << myImageID << ": createFromSends: done" << endl;
      *out_ << os.str ();
    }

    // createFromRecvs() calls createFromSends(), but will set
    // howInitialized_ again after calling createFromSends().
    howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;

    return totalReceiveLength_;
  }
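
  // Usage sketch for createFromSends (illustration only; the names here
  // are hypothetical).  Each process exports one entry to itself and one
  // to the next higher rank.  Keeping the export IDs grouped by rank
  // makes the pattern contiguous, so doPosts() needs no send buffer:
  //
  //   const int myRank = comm->getRank ();
  //   const int numProcs = comm->getSize ();
  //   Teuchos::Array<int> exportProcIDs (2);
  //   exportProcIDs[0] = myRank;                  // self-message
  //   exportProcIDs[1] = (myRank + 1) % numProcs; // neighbor
  //   Tpetra::Distributor distributor (comm);
  //   const size_t numImports = distributor.createFromSends (exportProcIDs);
  //   // numImports == 2 on every process for this pattern.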

} // namespace Tpetra