41 #include "Tpetra_Distributor.hpp"
42 #include "Tpetra_Details_gathervPrint.hpp"
43 #include "Tpetra_Details_makeValidVerboseStream.hpp"
44 #include "Teuchos_StandardParameterEntryValidators.hpp"
45 #include "Teuchos_VerboseObjectParameterListHelpers.hpp"
// Fragment of Details::DistributorSendTypeEnumToString: converts an
// EDistributorSendType enum value to its human-readable name.  The
// per-branch return statements were dropped by the extraction; each branch
// presumably returns the matching name ("Isend", "Rsend", "Send", "Ssend")
// -- TODO confirm against the full source.
53 if (sendType == DISTRIBUTOR_ISEND) {
56 else if (sendType == DISTRIBUTOR_RSEND) {
59 else if (sendType == DISTRIBUTOR_SEND) {
62 else if (sendType == DISTRIBUTOR_SSEND) {
// Any other value is not a member of the enum: throw rather than silently
// return a bogus name.
66 TEUCHOS_TEST_FOR_EXCEPTION(
true, std::invalid_argument,
"Invalid "
67 "EDistributorSendType enum value " << sendType <<
".");
// Fragment of Details::DistributorHowInitializedEnumToString: switch cases
// mapping each EDistributorHowInitialized enum value to a descriptive
// string.  The enclosing function signature and switch statement are not
// visible in this extract.
75 case Details::DISTRIBUTOR_NOT_INITIALIZED:
76 return "Not initialized yet";
77 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
78 return "By createFromSends";
79 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
80 return "By createFromRecvs";
81 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
82 return "By createFromSendsAndRecvs";
83 case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
84 return "By createReverseDistributor";
85 case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
86 return "By copy constructor";
93 Teuchos::Array<std::string>
96 Teuchos::Array<std::string> sendTypes;
97 sendTypes.push_back (
"Isend");
98 sendTypes.push_back (
"Rsend");
99 sendTypes.push_back (
"Send");
100 sendTypes.push_back (
"Ssend");
// Default values for Distributor's parameters (used as fallbacks by the
// constructors and getValidParameters()).
// Default for the "Debug" parameter (verbose debugging output off).
110 const bool tpetraDistributorDebugDefault =
false;
// Default for "Barrier between receives and sends" in do[Reverse]Posts().
112 const bool barrierBetween_default =
false;
// Default for "Use distinct tags": distinct MPI tags per code path, which
// the parameter docs recommend to avoid message collisions.
114 const bool useDistinctTags_default =
true;
117 int Distributor::getTag (
const int pathTag)
const {
118 return useDistinctTags_ ? pathTag : comm_->getTag ();
// Timer setup, compiled only when TPETRA_DISTRIBUTOR_TIMERS is defined.
122 #ifdef TPETRA_DISTRIBUTOR_TIMERS
// Create one named Teuchos timer per communication phase, so TimeMonitor
// output attributes time separately to doPosts(3), doPosts(4), doWaits,
// and the recvs/barrier/sends sub-phases of each doPosts variant.
// (The function's closing brace and matching #endif are not visible in
// this extract.)
123 void Distributor::makeTimers () {
124 const std::string name_doPosts3 =
"Tpetra::Distributor: doPosts(3)";
125 const std::string name_doPosts4 =
"Tpetra::Distributor: doPosts(4)";
126 const std::string name_doWaits =
"Tpetra::Distributor: doWaits";
127 const std::string name_doPosts3_recvs =
"Tpetra::Distributor: doPosts(3): recvs";
128 const std::string name_doPosts4_recvs =
"Tpetra::Distributor: doPosts(4): recvs";
129 const std::string name_doPosts3_barrier =
"Tpetra::Distributor: doPosts(3): barrier";
130 const std::string name_doPosts4_barrier =
"Tpetra::Distributor: doPosts(4): barrier";
131 const std::string name_doPosts3_sends =
"Tpetra::Distributor: doPosts(3): sends";
132 const std::string name_doPosts4_sends =
"Tpetra::Distributor: doPosts(4): sends";
134 timer_doPosts3_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3);
135 timer_doPosts4_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4);
136 timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (name_doWaits);
137 timer_doPosts3_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_recvs);
138 timer_doPosts4_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_recvs);
139 timer_doPosts3_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_barrier);
140 timer_doPosts4_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_barrier);
141 timer_doPosts3_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_sends);
142 timer_doPosts4_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_sends);
// Primary constructor: communicator, optional debug output stream, and
// optional parameter list.  Member initializers set the documented
// defaults; parts of the initializer list and constructor bodies are not
// visible in this extract.
147 Distributor (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
148 const Teuchos::RCP<Teuchos::FancyOStream>& out,
149 const Teuchos::RCP<Teuchos::ParameterList>& plist)
152 , howInitialized_ (
Details::DISTRIBUTOR_NOT_INITIALIZED)
153 , sendType_ (
Details::DISTRIBUTOR_SEND)
154 , barrierBetween_ (barrierBetween_default)
155 , verbose_ (tpetraDistributorDebugDefault)
156 , selfMessage_ (false)
160 , totalReceiveLength_ (0)
161 , lastRoundBytesSend_ (0)
162 , lastRoundBytesRecv_ (0)
163 , useDistinctTags_ (useDistinctTags_default)
// The wrapped output stream must exist even when verbosity is off.
165 TEUCHOS_ASSERT( ! out_.is_null () );
168 #ifdef TPETRA_DISTRIBUTOR_TIMERS
// Convenience constructor: communicator only.
174 Distributor (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm)
// Convenience constructor: communicator plus output stream.
179 Distributor (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
180 const Teuchos::RCP<Teuchos::FancyOStream>& out)
// Convenience constructor: communicator plus parameter list.
185 Distributor (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
186 const Teuchos::RCP<Teuchos::ParameterList>& plist)
// Copy constructor: copies all communication-plan state from the source
// Distributor and records that this object was initialized by copy.
192 : comm_ (distributor.comm_)
193 , out_ (distributor.out_)
194 , howInitialized_ (
Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
195 , sendType_ (distributor.sendType_)
196 , barrierBetween_ (distributor.barrierBetween_)
197 , verbose_ (distributor.verbose_)
198 , selfMessage_ (distributor.selfMessage_)
199 , numSends_ (distributor.numSends_)
200 , procsTo_ (distributor.procsTo_)
201 , startsTo_ (distributor.startsTo_)
202 , lengthsTo_ (distributor.lengthsTo_)
203 , maxSendLength_ (distributor.maxSendLength_)
204 , indicesTo_ (distributor.indicesTo_)
205 , numReceives_ (distributor.numReceives_)
206 , totalReceiveLength_ (distributor.totalReceiveLength_)
207 , lengthsFrom_ (distributor.lengthsFrom_)
208 , procsFrom_ (distributor.procsFrom_)
209 , startsFrom_ (distributor.startsFrom_)
210 , indicesFrom_ (distributor.indicesFrom_)
211 , reverseDistributor_ (distributor.reverseDistributor_)
212 , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
213 , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
214 , useDistinctTags_ (distributor.useDistinctTags_)
216 using Teuchos::ParameterList;
220 TEUCHOS_ASSERT( ! out_.is_null () );
// Deep-copy the source's ParameterList (if any) so the two Distributors
// do not share a mutable parameter list.
222 RCP<const ParameterList> rhsList = distributor.getParameterList ();
223 RCP<ParameterList> newList = rhsList.is_null () ? Teuchos::null :
224 Teuchos::parameterList (*rhsList);
227 #ifdef TPETRA_DISTRIBUTOR_TIMERS
// Distributor::swap: exchange all plan state with rhs, member by member.
233 using Teuchos::ParameterList;
234 using Teuchos::parameterList;
237 std::swap (comm_, rhs.comm_);
238 std::swap (out_, rhs.out_);
239 std::swap (howInitialized_, rhs.howInitialized_);
240 std::swap (sendType_, rhs.sendType_);
241 std::swap (barrierBetween_, rhs.barrierBetween_);
242 std::swap (verbose_, rhs.verbose_);
243 std::swap (selfMessage_, rhs.selfMessage_);
244 std::swap (numSends_, rhs.numSends_);
245 std::swap (procsTo_, rhs.procsTo_);
246 std::swap (startsTo_, rhs.startsTo_);
247 std::swap (lengthsTo_, rhs.lengthsTo_);
248 std::swap (maxSendLength_, rhs.maxSendLength_);
249 std::swap (indicesTo_, rhs.indicesTo_);
250 std::swap (numReceives_, rhs.numReceives_);
251 std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
252 std::swap (lengthsFrom_, rhs.lengthsFrom_);
253 std::swap (procsFrom_, rhs.procsFrom_);
254 std::swap (startsFrom_, rhs.startsFrom_);
255 std::swap (indicesFrom_, rhs.indicesFrom_);
256 std::swap (reverseDistributor_, rhs.reverseDistributor_);
257 std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
258 std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
259 std::swap (useDistinctTags_, rhs.useDistinctTags_);
// Exchange the parameter lists by hand.  If both objects currently point
// at the very same list, deep-copy it first so they end up distinct.
263 RCP<ParameterList> lhsList = this->getNonconstParameterList ();
264 RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
265 if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
266 rhsList = parameterList (*rhsList);
268 if (! rhsList.is_null ()) {
269 this->setMyParamList (rhsList);
271 if (! lhsList.is_null ()) {
272 rhs.setMyParamList (lhsList);
// Distributor::setParameterList: validate the given list against
// getValidParameters(), read out each parameter, and commit the values to
// member state.  Parts of the function are not visible in this extract.
283 using ::Tpetra::Details::Behavior;
284 using Teuchos::FancyOStream;
285 using Teuchos::getIntegralValue;
286 using Teuchos::includesVerbLevel;
287 using Teuchos::OSTab;
288 using Teuchos::ParameterList;
289 using Teuchos::parameterList;
// Environment-driven verbosity default (either name enables it).
293 const bool verboseDefault = Behavior::verbose (
"Distributor") ||
294 Behavior::verbose (
"Tpetra::Distributor");
// A null list means: keep defaults; only verbosity gets set.
296 if (plist.is_null ()) {
297 verbose_ = verboseDefault;
301 plist->validateParametersAndSetDefaults (*validParams);
303 const bool barrierBetween =
304 plist->get<
bool> (
"Barrier between receives and sends");
306 getIntegralValue<Details::EDistributorSendType> (*plist,
"Send type");
307 const bool useDistinctTags = plist->get<
bool> (
"Use distinct tags");
308 const bool debug = plist->get<
bool> (
"Debug");
// "Enable MPI CUDA RDMA support" is deprecated and must not be false;
// Tpetra now always assumes it is true.
313 const bool enable_cuda_rdma =
314 plist->get<
bool> (
"Enable MPI CUDA RDMA support");
315 TEUCHOS_TEST_FOR_EXCEPTION
316 (! enable_cuda_rdma, std::invalid_argument,
"Tpetra::Distributor::"
317 "setParameterList: " <<
"You specified \"Enable MPI CUDA RDMA "
318 "support\" = false. This is no longer valid. You don't need to "
319 "specify this option any more; Tpetra assumes it is always true. "
320 "This is a very light assumption on the MPI implementation, and in "
321 "fact does not actually involve hardware or system RDMA support. "
322 "Tpetra just assumes that the MPI implementation can tell whether a "
323 "pointer points to host memory or CUDA device memory.");
// Ready sends require the barrier: receives must be posted before the
// matching ready send starts.
330 TEUCHOS_TEST_FOR_EXCEPTION
331 (! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
332 std::invalid_argument,
"Tpetra::Distributor::setParameterList: " << endl
333 <<
"You specified \"Send type\"=\"Rsend\", but turned off the barrier "
334 "between receives and sends." << endl <<
"This is invalid; you must "
335 "include the barrier if you use ready sends." << endl <<
"Ready sends "
336 "require that their corresponding receives have already been posted, "
337 "and the only way to guarantee that in general is with a barrier.");
// Commit validated values to member state.
340 sendType_ = sendType;
341 barrierBetween_ = barrierBetween;
342 useDistinctTags_ = useDistinctTags;
343 verbose_ = debug || verboseDefault;
// Remember the list (ParameterListAcceptor bookkeeping).
347 this->setMyParamList (plist);
// Distributor::getValidParameters: build the list of valid parameters,
// with defaults and documentation strings, used by setParameterList for
// validation.
351 Teuchos::RCP<const Teuchos::ParameterList>
354 using Teuchos::Array;
355 using Teuchos::ParameterList;
356 using Teuchos::parameterList;
358 using Teuchos::setStringToIntegralParameter;
360 const bool barrierBetween = barrierBetween_default;
361 const bool useDistinctTags = useDistinctTags_default;
362 const bool debug = tpetraDistributorDebugDefault;
// Enum values listed in the same order as the names in
// distributorSendTypes(), so the string<->enum mapping lines up.
365 const std::string defaultSendType (
"Send");
366 Array<Details::EDistributorSendType> sendTypeEnums;
367 sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
368 sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
369 sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
370 sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
372 RCP<ParameterList> plist = parameterList (
"Tpetra::Distributor");
373 plist->set (
"Barrier between receives and sends", barrierBetween,
374 "Whether to execute a barrier between receives and sends in do"
375 "[Reverse]Posts(). Required for correctness when \"Send type\""
376 "=\"Rsend\", otherwise correct but not recommended.");
377 setStringToIntegralParameter<Details::EDistributorSendType> (
"Send type",
378 defaultSendType,
"When using MPI, the variant of send to use in "
379 "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
380 plist->set (
"Use distinct tags", useDistinctTags,
"Whether to use distinct "
381 "MPI message tags for different code paths. Highly recommended"
382 " to avoid message collisions.");
383 plist->set (
"Debug", debug,
"Whether to print copious debugging output on "
385 plist->set (
"Timer Label",
"",
"Label for Time Monitor output");
// Deprecated, kept for backward compatibility; must remain true (see the
// check in setParameterList).
386 plist->set (
"Enable MPI CUDA RDMA support",
true,
"Assume that MPI can "
387 "tell whether a pointer points to host memory or CUDA device "
388 "memory. You don't need to specify this option any more; "
389 "Tpetra assumes it is always true. This is a very light "
390 "assumption on the MPI implementation, and in fact does not "
391 "actually involve hardware or system RDMA support.");
399 Teuchos::setupVerboseObjectSublist (&*plist);
400 return Teuchos::rcp_const_cast<const ParameterList> (plist);
// One-line accessors.  The signature lines were dropped by the extraction;
// names below are inferred from the member each body returns -- TODO
// confirm against the full source.
// getTotalReceiveLength():
405 {
return totalReceiveLength_; }
// getNumReceives():
408 {
return numReceives_; }
// hasSelfMessage():
411 {
return selfMessage_; }
// getNumSends():
414 {
return numSends_; }
// getMaxSendLength():
417 {
return maxSendLength_; }
// getProcsFrom():
420 {
return procsFrom_; }
// getLengthsFrom():
423 {
return lengthsFrom_; }
// getLengthsTo():
429 {
return lengthsTo_; }
// getReverse(): lazily create and cache the reverse communication plan.
431 Teuchos::RCP<Distributor>
433 if (reverseDistributor_.is_null ()) {
434 createReverseDistributor ();
436 TEUCHOS_TEST_FOR_EXCEPTION
437 (reverseDistributor_.is_null (), std::logic_error,
"The reverse "
438 "Distributor is null after createReverseDistributor returned. "
439 "Please report this bug to the Tpetra developers.");
440 return reverseDistributor_;
// Build the reverse plan: this plan's sends become the reverse plan's
// receives and vice versa.
445 Distributor::createReverseDistributor()
const
447 reverseDistributor_ = Teuchos::rcp (
new Distributor (comm_, out_));
448 reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
449 reverseDistributor_->sendType_ = sendType_;
450 reverseDistributor_->barrierBetween_ = barrierBetween_;
451 reverseDistributor_->verbose_ = verbose_;
// The reverse plan's total receive length equals this plan's total send
// length.  NOTE(review): std::accumulate with literal 0 sums in int even
// though lengthsTo_ holds size_t values -- possible narrowing for very
// large plans; confirm intended.
456 size_t totalSendLength =
457 std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), 0);
// The reverse plan's max send length is this plan's longest receive from
// a process other than myself.
462 size_t maxReceiveLength = 0;
463 const int myProcID = comm_->getRank();
464 for (
size_t i=0; i < numReceives_; ++i) {
465 if (procsFrom_[i] != myProcID) {
467 if (lengthsFrom_[i] > maxReceiveLength) {
468 maxReceiveLength = lengthsFrom_[i];
// Swap the send/receive roles of all plan arrays.
477 reverseDistributor_->selfMessage_ = selfMessage_;
478 reverseDistributor_->numSends_ = numReceives_;
479 reverseDistributor_->procsTo_ = procsFrom_;
480 reverseDistributor_->startsTo_ = startsFrom_;
481 reverseDistributor_->lengthsTo_ = lengthsFrom_;
482 reverseDistributor_->maxSendLength_ = maxReceiveLength;
483 reverseDistributor_->indicesTo_ = indicesFrom_;
484 reverseDistributor_->numReceives_ = numSends_;
485 reverseDistributor_->totalReceiveLength_ = totalSendLength;
486 reverseDistributor_->lengthsFrom_ = lengthsTo_;
487 reverseDistributor_->procsFrom_ = procsTo_;
488 reverseDistributor_->startsFrom_ = startsTo_;
489 reverseDistributor_->indicesFrom_ = indicesTo_;
498 reverseDistributor_->lastRoundBytesSend_ = 0;
499 reverseDistributor_->lastRoundBytesRecv_ = 0;
501 reverseDistributor_->useDistinctTags_ = useDistinctTags_;
// The reverse of the reverse is left null (this forward plan serves).
514 reverseDistributor_->reverseDistributor_ = Teuchos::null;
// Distributor::doWaits: wait on all outstanding communication requests,
// then clear the request list.
519 using Teuchos::Array;
520 using Teuchos::CommRequest;
521 using Teuchos::FancyOStream;
522 using Teuchos::includesVerbLevel;
523 using Teuchos::is_null;
524 using Teuchos::OSTab;
526 using Teuchos::waitAll;
529 Teuchos::OSTab tab (out_);
531 #ifdef TPETRA_DISTRIBUTOR_TIMERS
532 Teuchos::TimeMonitor timeMon (*timer_doWaits_);
535 const int myRank = comm_->getRank ();
// Verbose-mode trace of how many requests are pending.
538 std::ostringstream os;
539 os << myRank <<
": doWaits: # reqs = "
540 << requests_.size () << endl;
544 if (requests_.size() > 0) {
545 waitAll (*comm_, requests_());
// Debug builds only: verify that waitAll nulled out every request handle.
547 #ifdef HAVE_TEUCHOS_DEBUG
549 for (Array<RCP<CommRequest<int> > >::const_iterator it = requests_.begin();
550 it != requests_.end(); ++it)
552 TEUCHOS_TEST_FOR_EXCEPTION( ! is_null (*it), std::runtime_error,
553 Teuchos::typeName(*
this) <<
"::doWaits(): Communication requests "
554 "should all be null aftr calling Teuchos::waitAll() on them, but "
555 "at least one request is not null.");
// All requests complete; drop the handles.
560 requests_.resize (0);
// Debug builds only: collectively verify that no process still has any
// outstanding posts after waitAll.
563 #ifdef HAVE_TEUCHOS_DEBUG
565 const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
566 int globalSizeNonzero = 0;
567 Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
569 Teuchos::outArg (globalSizeNonzero));
570 TEUCHOS_TEST_FOR_EXCEPTION(
571 globalSizeNonzero != 0, std::runtime_error,
572 "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
573 "a nonzero number of outstanding posts. There should be none at this "
574 "point. Please report this bug to the Tpetra developers.");
579 std::ostringstream os;
580 os << myRank <<
": doWaits done" << endl;
// Fragment of the reverse-waits path (presumably doReverseWaits): forward
// to the reverse Distributor if one exists -- TODO confirm the enclosing
// function name against the full source.
587 if (! reverseDistributor_.is_null()) {
588 reverseDistributor_->doWaits();
// Distributor::description: build a one-line summary of the object's
// label and settings (YAML-flavored key: value pairs in braces).
593 std::ostringstream out;
595 out <<
"\"Tpetra::Distributor\": {";
596 const std::string label = this->getObjectLabel ();
598 out <<
"Label: " << label <<
", ";
600 out <<
"How initialized: "
604 << DistributorSendTypeEnumToString (sendType_)
605 <<
", Barrier between receives and sends: "
606 << (barrierBetween_ ?
"true" :
"false")
607 <<
", Use distinct tags: "
608 << (useDistinctTags_ ?
"true" :
"false")
609 <<
", Debug: " << (verbose_ ?
"true" :
"false")
// Return this process's verbose description as a string (empty at low
// verbosity), so describe() can gather and print per-process details.
616 localDescribeToString (
const Teuchos::EVerbosityLevel vl)
const
618 using Teuchos::toString;
619 using Teuchos::VERB_HIGH;
620 using Teuchos::VERB_EXTREME;
// Nothing to report at VERB_LOW or below, or without a communicator.
624 if (vl <= Teuchos::VERB_LOW || comm_.is_null ()) {
625 return std::string ();
// Buffer output in a string stream wrapped as a FancyOStream.
628 auto outStringP = Teuchos::rcp (
new std::ostringstream ());
629 auto outp = Teuchos::getFancyOStream (outStringP);
630 Teuchos::FancyOStream& out = *outp;
632 const int myRank = comm_->getRank ();
633 const int numProcs = comm_->getSize ();
634 out <<
"Process " << myRank <<
" of " << numProcs <<
":" << endl;
635 Teuchos::OSTab tab1 (out);
// Send-side plan arrays at high verbosity ...
639 if (vl == VERB_HIGH || vl == VERB_EXTREME) {
640 out <<
"procsTo: " << toString (procsTo_) << endl;
641 out <<
"lengthsTo: " << toString (lengthsTo_) << endl;
// ... plus the start/index arrays only at extreme verbosity.
644 if (vl == VERB_EXTREME) {
645 out <<
"startsTo: " << toString (startsTo_) << endl;
646 out <<
"indicesTo: " << toString (indicesTo_) << endl;
// Receive-side plan arrays.
648 if (vl == VERB_HIGH || vl == VERB_EXTREME) {
651 out <<
"lengthsFrom: " << toString (lengthsFrom_) << endl;
652 out <<
"startsFrom: " << toString (startsFrom_) << endl;
653 out <<
"procsFrom: " << toString (procsFrom_) << endl;
657 return outStringP->str ();
// Distributor::describe: print a human-readable description to the given
// stream, with per-process detail at higher verbosity levels.
662 describe (Teuchos::FancyOStream &out,
663 const Teuchos::EVerbosityLevel verbLevel)
const
666 using Teuchos::VERB_DEFAULT;
667 using Teuchos::VERB_NONE;
668 using Teuchos::VERB_LOW;
669 using Teuchos::VERB_MEDIUM;
670 using Teuchos::VERB_HIGH;
671 using Teuchos::VERB_EXTREME;
// Treat VERB_DEFAULT as VERB_LOW.
672 const Teuchos::EVerbosityLevel vl =
673 (verbLevel == VERB_DEFAULT) ? VERB_LOW : verbLevel;
675 if (vl == VERB_NONE) {
683 if (comm_.is_null ()) {
686 const int myRank = comm_->getRank ();
687 const int numProcs = comm_->getSize ();
// Heap-allocated tabs so the indentation can outlive the conditional
// blocks that create it.
696 Teuchos::RCP<Teuchos::OSTab> tab0, tab1;
702 tab0 = Teuchos::rcp (
new Teuchos::OSTab (out));
705 out <<
"\"Tpetra::Distributor\":" << endl;
706 tab1 = Teuchos::rcp (
new Teuchos::OSTab (out));
708 const std::string label = this->getObjectLabel ();
710 out <<
"Label: " << label << endl;
712 out <<
"Number of processes: " << numProcs << endl
713 <<
"How initialized: "
717 out <<
"Parameters: " << endl;
718 Teuchos::OSTab tab2 (out);
719 out <<
"\"Send type\": "
720 << DistributorSendTypeEnumToString (sendType_) << endl
721 <<
"\"Barrier between receives and sends\": "
722 << (barrierBetween_ ?
"true" :
"false") << endl
723 <<
"\"Use distinct tags\": "
724 << (useDistinctTags_ ?
"true" :
"false") << endl
725 <<
"\"Debug\": " << (verbose_ ?
"true" :
"false") << endl;
// Per-process details (gathered from localDescribeToString).
731 const std::string lclStr = this->localDescribeToString (vl);
735 out <<
"Reverse Distributor:";
736 if (reverseDistributor_.is_null ()) {
737 out <<
" null" << endl;
741 reverseDistributor_->describe (out, vl);
// Fragment of Distributor::computeReceives (presumably): given the
// send-side plan, determine who sends to this process and how much, via a
// reduce+scatter of per-target message counts followed by point-to-point
// exchanges of message lengths.
749 using Teuchos::Array;
750 using Teuchos::ArrayRCP;
752 using Teuchos::CommStatus;
753 using Teuchos::CommRequest;
754 using Teuchos::ireceive;
757 using Teuchos::REDUCE_SUM;
758 using Teuchos::receive;
759 using Teuchos::reduce;
760 using Teuchos::scatter;
762 using Teuchos::waitAll;
765 Teuchos::OSTab tab (out_);
766 const int myRank = comm_->getRank();
767 const int numProcs = comm_->getSize();
// Use the tag reserved for this code path (pathTag 2) when distinct tags
// are enabled; see getTag().
770 const int pathTag = 2;
771 const int tag = this->getTag (pathTag);
773 std::unique_ptr<std::string> prefix;
775 std::ostringstream os;
776 os <<
"Proc " << myRank <<
": computeReceives: ";
777 prefix = std::unique_ptr<std::string> (
new std::string (os.str ()));
778 os <<
"{selfMessage_: " << (selfMessage_ ?
"true" :
"false")
779 <<
", tag: " << tag <<
"}" << endl;
// toProcsFromMe[p] == 1 iff this process sends a message to process p.
789 Array<int> toProcsFromMe (numProcs, 0);
790 #ifdef HAVE_TEUCHOS_DEBUG
791 bool counting_error =
false;
793 for (
size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
794 #ifdef HAVE_TEUCHOS_DEBUG
// Each target process should appear at most once in procsTo_.
795 if (toProcsFromMe[procsTo_[i]] != 0) {
796 counting_error =
true;
799 toProcsFromMe[procsTo_[i]] = 1;
801 #ifdef HAVE_TEUCHOS_DEBUG
803 "Tpetra::Distributor::computeReceives: There was an error on at least "
804 "one process in counting the number of messages send by that process to "
805 "the other processs. Please report this bug to the Tpetra developers.",
810 std::ostringstream os;
811 os << *prefix <<
"Reduce & scatter" << endl;
// Root-based counting: sum each column of toProcsFromMe at the root, then
// scatter one receive count back to each process.  numRecvsOnEachProc is
// only needed (and only sized) on the root.
868 Array<int> numRecvsOnEachProc;
869 if (myRank == root) {
870 numRecvsOnEachProc.resize (numProcs);
872 int numReceivesAsInt = 0;
873 reduce<int, int> (toProcsFromMe.getRawPtr (),
874 numRecvsOnEachProc.getRawPtr (),
875 numProcs, REDUCE_SUM, root, *comm_);
876 scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
877 &numReceivesAsInt, 1, root, *comm_);
878 numReceives_ =
static_cast<size_t> (numReceivesAsInt);
// Allocate the receive-side arrays now that the count is known.
884 lengthsFrom_.assign (numReceives_, 0);
885 procsFrom_.assign (numReceives_, 0);
// The self-message is handled locally below, without MPI.
901 const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);
907 Array<RCP<CommRequest<int> > > requests (actualNumReceives);
908 Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
909 Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
// MPI_ANY_SOURCE in MPI builds; placeholder otherwise.
914 const int anySourceProc = MPI_ANY_SOURCE;
916 const int anySourceProc = -1;
920 std::ostringstream os;
921 os << *prefix <<
"Post " << actualNumReceives <<
" irecv"
922 << (actualNumReceives != size_t (1) ?
"s" :
"") << endl;
// Post one any-source nonblocking receive per expected message length.
927 for (
size_t i = 0; i < actualNumReceives; ++i) {
932 lengthsFromBuffers[i].resize (1);
933 lengthsFromBuffers[i][0] = as<size_t> (0);
934 requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
937 std::ostringstream os;
938 os << *prefix <<
"Posted any-proc irecv w/ tag " << tag << endl;
944 std::ostringstream os;
945 os << *prefix <<
"Post " << numSends_ <<
" send"
946 << (numSends_ != size_t (1) ?
"s" :
"") << endl;
// Blocking-send each outgoing message length; the self-message is written
// into the local arrays directly instead of using MPI.
957 for (
size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
958 if (procsTo_[i] != myRank) {
962 const size_t*
const lengthsTo_i = &lengthsTo_[i];
963 send<int, size_t> (lengthsTo_i, 1, as<int> (procsTo_[i]), tag, *comm_);
965 std::ostringstream os;
966 os << *prefix <<
"Posted send to Proc " << procsTo_[i] <<
" w/ tag "
978 lengthsFrom_[numReceives_-1] = lengthsTo_[i];
979 procsFrom_[numReceives_-1] = myRank;
984 std::ostringstream os;
985 os << myRank <<
": computeReceives: waitAll on "
986 << requests.size () <<
" requests" << endl;
// Complete all receives; each status supplies the actual source rank.
995 waitAll (*comm_, requests (), statuses ());
996 for (
size_t i = 0; i < actualNumReceives; ++i) {
997 lengthsFrom_[i] = *lengthsFromBuffers[i];
998 procsFrom_[i] = statuses[i]->getSourceRank ();
// Sort by source rank, keeping the lengths array aligned.
1004 sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
// Total incoming value count.  NOTE(review): std::accumulate with literal
// 0 sums in int even though lengthsFrom_ holds size_t values -- possible
// narrowing for very large plans; confirm intended.
1007 totalReceiveLength_ =
1008 std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), 0);
// indicesFrom_ is the identity permutation over all received values.
1009 indicesFrom_.clear ();
1015 indicesFrom_.reserve (totalReceiveLength_);
1016 for (
size_t i = 0; i < totalReceiveLength_; ++i) {
1017 indicesFrom_.push_back(i);
// startsFrom_[i] = offset of the i-th receive within the receive buffer.
1021 startsFrom_.clear ();
1022 startsFrom_.reserve (numReceives_);
1023 for (
size_t i = 0, j = 0; i < numReceives_; ++i) {
1024 startsFrom_.push_back(j);
1025 j += lengthsFrom_[i];
1033 std::ostringstream os;
1034 os << *prefix <<
"Done!" << endl;
// Distributor::createFromSends: build the send-side plan from a list of
// target process ranks (one per export value), then derive the receive
// side.  Returns totalReceiveLength_.  Parts of the function body are not
// visible in this extract.
1043 using Teuchos::outArg;
1044 using Teuchos::REDUCE_MAX;
1045 using Teuchos::reduceAll;
1047 const char rawPrefix[] =
"Tpetra::Distributor::createFromSends: ";
1049 Teuchos::OSTab tab (out_);
1050 const size_t numExports = exportProcIDs.size();
1051 const int myProcID = comm_->getRank();
1052 const int numProcs = comm_->getSize();
1054 std::unique_ptr<std::string> prefix;
1056 std::ostringstream os;
1057 os <<
"Proc " << myProcID <<
": " << rawPrefix <<
": ";
1058 prefix = std::unique_ptr<std::string> (
new std::string (os.str ()));
1059 os <<
"exportPIDs: " << exportProcIDs << endl;
// starts[p] counts exports destined for process p (converted to offsets
// in the buffered path below).
1111 Teuchos::Array<size_t> starts (numProcs + 1, 0);
1114 size_t numActive = 0;
// needSendBuff becomes nonzero when export IDs are not grouped by target
// process, which forces the indicesTo_ (buffered) code path.
1115 int needSendBuff = 0;
1117 #ifdef HAVE_TPETRA_DEBUG
// Count exports per target, and detect ungrouped IDs / bad ranks.
1120 for (
size_t i = 0; i < numExports; ++i) {
1121 const int exportID = exportProcIDs[i];
1122 if (exportID >= numProcs) {
1123 #ifdef HAVE_TPETRA_DEBUG
1128 else if (exportID >= 0) {
1142 if (needSendBuff == 0 && starts[exportID] > 1 &&
1143 exportID != exportProcIDs[i-1]) {
1150 #ifdef HAVE_TPETRA_DEBUG
// Debug builds: all-reduce the worst offending rank and fail collectively.
1157 reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
1158 TEUCHOS_TEST_FOR_EXCEPTION(gbl_badID >= 0, std::runtime_error,
1159 Teuchos::typeName(*
this) <<
"::createFromSends: Proc " << gbl_badID
1160 <<
", perhaps among other processes, got a bad send process ID.");
// Optional efficiency warning when any process needs a send buffer.
1177 #if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
1179 int global_needSendBuff;
1180 reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
1181 outArg (global_needSendBuff));
1183 global_needSendBuff != 0, std::runtime_error,
1184 "::createFromSends: Grouping export IDs together by process rank often "
1185 "improves performance.");
// A nonzero count for my own rank means this process messages itself.
1191 if (starts[myProcID] != 0) {
1192 selfMessage_ =
true;
1195 selfMessage_ =
false;
1198 #ifdef HAVE_TEUCHOS_DEBUG
1199 bool index_neq_numActive =
false;
1200 bool send_neq_numSends =
false;
// Fast path: exports already grouped by process; no index buffer needed.
1202 if (! needSendBuff) {
1207 for (
int i = 0; i < numProcs; ++i) {
1215 indicesTo_.resize(0);
1218 procsTo_.assign(numSends_,0);
1219 startsTo_.assign(numSends_,0);
1220 lengthsTo_.assign(numSends_,0);
// Walk the export list once, recording where each group begins.
1227 size_t index = 0, procIndex = 0;
1228 for (
size_t i = 0; i < numSends_; ++i) {
1229 while (exportProcIDs[procIndex] < 0) {
1232 startsTo_[i] = procIndex;
1233 int procID = exportProcIDs[procIndex];
1234 procsTo_[i] = procID;
1235 index += starts[procID];
1236 procIndex += starts[procID];
1238 #ifdef HAVE_TEUCHOS_DEBUG
1239 if (index != numActive) {
1240 index_neq_numActive =
true;
// Sort sends by target rank, keeping the start offsets aligned.
1246 if (numSends_ > 0) {
1247 sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
// Fill in lengths and track the longest non-self send.
1251 for (
size_t i = 0; i < numSends_; ++i) {
1252 int procID = procsTo_[i];
1253 lengthsTo_[i] = starts[procID];
1254 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1255 maxSendLength_ = lengthsTo_[i];
// Slow path: ungrouped exports; build the indicesTo_ permutation and
// offset arrays (some loop bodies are not visible in this extract).
1266 if (starts[0] == 0 ) {
1272 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1274 i != starts.end(); ++i)
1276 if (*i != 0) ++numSends_;
1282 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1283 i=starts.rbegin()+1;
1284 i != starts.rend(); ++i)
1293 indicesTo_.resize(numActive);
// Scatter export indices into their per-process slots, advancing the
// per-process cursor in starts as we go.
1295 for (
size_t i = 0; i < numExports; ++i) {
1296 if (exportProcIDs[i] >= 0) {
1298 indicesTo_[starts[exportProcIDs[i]]] = i;
1300 ++starts[exportProcIDs[i]];
// Undo the cursor advance: shift starts back down by one slot.
1312 for (
int proc = numProcs-1; proc != 0; --proc) {
1313 starts[proc] = starts[proc-1];
1316 starts[numProcs] = numActive;
1323 procsTo_.resize(numSends_);
1324 startsTo_.resize(numSends_);
1325 lengthsTo_.resize(numSends_);
// Compact the nonempty per-process ranges into the plan arrays.
1332 for (
int proc = 0; proc < numProcs; ++proc ) {
1333 if (starts[proc+1] != starts[proc]) {
1334 lengthsTo_[snd] = starts[proc+1] - starts[proc];
1335 startsTo_[snd] = starts[proc];
1337 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1338 maxSendLength_ = lengthsTo_[snd];
1340 procsTo_[snd] = proc;
1344 #ifdef HAVE_TEUCHOS_DEBUG
1345 if (snd != numSends_) {
1346 send_neq_numSends_ is never read here; flag set for the collective check below.
1346 send_neq_numSends =
true;
1350 #ifdef HAVE_TEUCHOS_DEBUG
// Collective logic-error checks (debug builds only).
1352 "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1354 "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
// numSends_ counts only messages to other processes.
1357 if (selfMessage_) --numSends_;
1363 std::ostringstream os;
1364 os << *prefix <<
"Done!" << endl;
1370 howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
// The receive side was computed from the send plan (presumably via
// computeReceives; the call itself is not visible in this extract).
1372 return totalReceiveLength_;
// Distributor::createFromSendsAndRecvs: build both sides of the plan
// directly from the export targets and the (sorted) remote source ranks,
// mirroring the two code paths of createFromSends for the send side.
1378 const Teuchos::ArrayView<const int>& remoteProcIDs)
1387 howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
1390 int myProcID = comm_->getRank ();
1391 int numProcs = comm_->getSize();
1393 const size_t numExportIDs = exportProcIDs.size();
1394 Teuchos::Array<size_t> starts (numProcs + 1, 0);
1396 size_t numActive = 0;
// needSendBuff becomes nonzero when export IDs are not grouped by target.
1397 int needSendBuff = 0;
1399 for(
size_t i = 0; i < numExportIDs; i++ )
1401 if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
1403 if( exportProcIDs[i] >= 0 )
1405 ++starts[ exportProcIDs[i] ];
// Nonzero count for my own rank means a self-message.
1410 selfMessage_ = ( starts[myProcID] != 0 ) ? 1 : 0;
// Ungrouped path: build the indicesTo_ permutation (mirrors the slow path
// of createFromSends; some loop bodies are not visible in this extract).
1416 if (starts[0] == 0 ) {
1422 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1424 i != starts.end(); ++i)
1426 if (*i != 0) ++numSends_;
1432 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1433 i=starts.rbegin()+1;
1434 i != starts.rend(); ++i)
1443 indicesTo_.resize(numActive);
1445 for (
size_t i = 0; i < numExportIDs; ++i) {
1446 if (exportProcIDs[i] >= 0) {
1448 indicesTo_[starts[exportProcIDs[i]]] = i;
1450 ++starts[exportProcIDs[i]];
1453 for (
int proc = numProcs-1; proc != 0; --proc) {
1454 starts[proc] = starts[proc-1];
1457 starts[numProcs] = numActive;
1458 procsTo_.resize(numSends_);
1459 startsTo_.resize(numSends_);
1460 lengthsTo_.resize(numSends_);
1463 for (
int proc = 0; proc < numProcs; ++proc ) {
1464 if (starts[proc+1] != starts[proc]) {
1465 lengthsTo_[snd] = starts[proc+1] - starts[proc];
1466 startsTo_[snd] = starts[proc];
1468 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1469 maxSendLength_ = lengthsTo_[snd];
1471 procsTo_[snd] = proc;
// Grouped path: no index buffer needed (mirrors the fast path of
// createFromSends).
1481 for (
int i = 0; i < numProcs; ++i) {
1489 indicesTo_.resize(0);
1492 procsTo_.assign(numSends_,0);
1493 startsTo_.assign(numSends_,0);
1494 lengthsTo_.assign(numSends_,0);
1501 size_t index = 0, procIndex = 0;
1502 for (
size_t i = 0; i < numSends_; ++i) {
1503 while (exportProcIDs[procIndex] < 0) {
1506 startsTo_[i] = procIndex;
1507 int procID = exportProcIDs[procIndex];
1508 procsTo_[i] = procID;
1509 index += starts[procID];
1510 procIndex += starts[procID];
1515 if (numSends_ > 0) {
1516 sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1520 for (
size_t i = 0; i < numSends_; ++i) {
1521 int procID = procsTo_[i];
1522 lengthsTo_[i] = starts[procID];
1523 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1524 maxSendLength_ = lengthsTo_[i];
// numSends_ excludes the self-message.
1530 numSends_ -= selfMessage_;
// Receive side: build recv_list with one entry per distinct source rank
// from the sorted remoteProcIDs list.
1531 std::vector<int> recv_list;
1532 recv_list.reserve(numSends_);
1535 for(
int i=0; i<remoteProcIDs.size(); i++) {
1536 if(remoteProcIDs[i]>last_pid) {
1537 recv_list.push_back(remoteProcIDs[i]);
1538 last_pid = remoteProcIDs[i];
1540 else if (remoteProcIDs[i]<last_pid)
1541 throw std::runtime_error(
"Tpetra::Distributor:::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
// Fill the receive-side arrays from the distinct-source list.
1543 numReceives_ = recv_list.size();
1545 procsFrom_.assign(numReceives_,0);
1546 lengthsFrom_.assign(numReceives_,0);
1547 indicesFrom_.assign(numReceives_,0);
1548 startsFrom_.assign(numReceives_,0);
// For each source, count how many consecutive remote IDs it contributed.
1550 for(
size_t i=0,j=0; i<numReceives_; ++i) {
1552 procsFrom_[i] = recv_list[i];
1554 for( ; j<(size_t)remoteProcIDs.size() &&
1555 remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
1556 lengthsFrom_[i] = j-jlast;
1558 totalReceiveLength_ = remoteProcIDs.size();
// indicesFrom_ is the identity permutation over all received values.
1559 indicesFrom_.clear ();
1565 indicesFrom_.reserve (totalReceiveLength_);
1566 for (
size_t i = 0; i < totalReceiveLength_; ++i) {
1567 indicesFrom_.push_back(i);
// numReceives_ excludes the self-message.
1570 numReceives_-=selfMessage_;
#define TPETRA_EFFICIENCY_WARNING(throw_exception_test, Exception, msg)
Print or throw an efficiency warning.
#define SHARED_TEST_FOR_EXCEPTION(throw_exception_test, Exception, msg, comm)
Test for exception, with reduction over the given communicator.
Sets up and executes a communication plan for a Tpetra DistObject.
size_t getMaxSendLength() const
Maximum number of values this process will send to another single process.
Teuchos::ArrayView< const int > getProcsTo() const
Ranks of the processes to which this process will send values.
size_t getNumReceives() const
The number of processes from which we will receive data.
void setParameterList(const Teuchos::RCP< Teuchos::ParameterList > &plist)
Set Distributor parameters.
size_t getTotalReceiveLength() const
Total number of values this process will receive from other processes.
bool hasSelfMessage() const
Whether the calling process will send or receive messages to itself.
void swap(Distributor &rhs)
Swap the contents of rhs with those of *this.
Teuchos::ArrayView< const size_t > getLengthsTo() const
Number of values this process will send to each process.
Teuchos::ArrayView< const int > getProcsFrom() const
Ranks of the processes sending values to this process.
Teuchos::RCP< Distributor > getReverse() const
A reverse communication plan Distributor.
Distributor(const Teuchos::RCP< const Teuchos::Comm< int > > &comm)
Construct using the specified communicator and default parameters.
std::string description() const
Return a one-line description of this object.
size_t createFromSends(const Teuchos::ArrayView< const int > &exportProcIDs)
Set up Distributor using list of process ranks to which this process will send.
void createFromSendsAndRecvs(const Teuchos::ArrayView< const int > &exportProcIDs, const Teuchos::ArrayView< const int > &remoteProcIDs)
Set up Distributor using list of process ranks to which to send, and list of process ranks from which...
Teuchos::RCP< const Teuchos::ParameterList > getValidParameters() const
List of valid Distributor parameters.
Teuchos::ArrayView< const size_t > getLengthsFrom() const
Number of values this process will receive from each process.
size_t getNumSends() const
The number of processes to which we will send data.
void describe(Teuchos::FancyOStream &out, const Teuchos::EVerbosityLevel verbLevel=Teuchos::Describable::verbLevel_default) const
Describe this object in a human-readable way to the given output stream.
Implementation details of Tpetra.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType)
Convert an EDistributorSendType enum value to a string.
EDistributorSendType
The type of MPI send that Distributor should use.
EDistributorHowInitialized
Enum indicating how and whether a Distributor was initialized.
std::string DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
Convert an EDistributorHowInitialized enum value to a string.
void gathervPrint(std::ostream &out, const std::string &s, const Teuchos::Comm< int > &comm)
On Process 0 in the given communicator, print strings from each process in that communicator,...
Namespace Tpetra contains the class and methods constituting the Tpetra library.
Teuchos::Array< std::string > distributorSendTypes()
Valid values for Distributor's "Send type" parameter.
void sort2(const IT1 &first1, const IT1 &last1, const IT2 &first2)
Sort the first array, and apply the resulting permutation to the second array.