42 #ifndef TPETRA_DISTRIBUTOR_HPP 43 #define TPETRA_DISTRIBUTOR_HPP 46 #include <Teuchos_as.hpp> 47 #include <Teuchos_Describable.hpp> 48 #include <Teuchos_ParameterListAcceptorDefaultBase.hpp> 49 #include <Teuchos_VerboseObject.hpp> 60 #ifdef TPETRA_DISTRIBUTOR_TIMERS 61 # undef TPETRA_DISTRIBUTOR_TIMERS 62 #endif // TPETRA_DISTRIBUTOR_TIMERS 64 #include "KokkosCompat_View.hpp" 65 #include "Kokkos_Core.hpp" 66 #include "Kokkos_TeuchosCommAdapters.hpp" 95 DISTRIBUTOR_NOT_INITIALIZED,
96 DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS,
97 DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS,
98 DISTRIBUTOR_INITIALIZED_BY_REVERSE,
99 DISTRIBUTOR_INITIALIZED_BY_COPY
187 public Teuchos::Describable,
188 public Teuchos::ParameterListAcceptorDefaultBase,
189 public Teuchos::VerboseObject<Distributor> {
/// Construct from a communicator only.
/// Declared explicit, so an RCP to a communicator is never implicitly
/// converted into a Distributor.
/// \param comm [in] Communicator (process ranks are int).
201 explicit Distributor (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm);
/// Construct from a communicator and an output stream.
/// \param comm [in] Communicator.
/// \param out [in] NOTE(review): presumably stored as out_, the stream the
///   doPosts methods indent with Teuchos::OSTab -- confirm against the .cpp.
212 Distributor (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
213 const Teuchos::RCP<Teuchos::FancyOStream>& out);
/// Construct from a communicator and a parameter list.
/// \param comm [in] Communicator.
/// \param plist [in/out] Parameter list (non-const RCP target).
///   NOTE(review): accepted parameters are not visible here; see
///   getValidParameters().
227 Distributor (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
228 const Teuchos::RCP<Teuchos::ParameterList>& plist);
/// Construct from a communicator, an output stream, and a parameter list.
/// Takes the arguments of the two preceding overloads together.
244 Distributor (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
245 const Teuchos::RCP<Teuchos::FancyOStream>& out,
246 const Teuchos::RCP<Teuchos::ParameterList>& plist);
/// Set this object's parameters from \c plist.
/// NOTE(review): presumably the Teuchos::ParameterListAcceptor hook, since the
/// class derives from ParameterListAcceptorDefaultBase -- confirm.
269 void setParameterList (
const Teuchos::RCP<Teuchos::ParameterList>& plist);
/// Return a read-only parameter list.
/// NOTE(review): presumably the list of valid parameters with their defaults;
/// the body is not visible here -- confirm.
275 Teuchos::RCP<const Teuchos::ParameterList> getValidParameters ()
const;
/// Set up the communication plan from the send side.
/// \param exportNodeIDs [in] One int per export entry.
///   NOTE(review): presumably the destination process rank of each entry -- confirm.
/// \return A size_t. NOTE(review): presumably the number of entries this
///   process will receive -- confirm against the .cpp.
300 size_t createFromSends (
const ArrayView<const int>& exportNodeIDs);
335 template <
class Ordinal>
337 createFromRecvs (
const ArrayView<const Ordinal>& remoteIDs,
338 const ArrayView<const int>& remoteNodeIDs,
339 Array<Ordinal>& exportIDs,
340 Array<int>& exportNodeIDs);
// NOTE(review): the bodies of these accessors are not visible in this header
// excerpt; the notes below are inferred from how the correspondingly named
// data members are used in doPosts and should be confirmed.

/// Presumably numReceives_. doPosts adds the self-message separately
/// (numReceives_ + (selfMessage_ ? 1 : 0)), so this count likely excludes it.
349 size_t getNumReceives()
const;
/// Presumably numSends_. doPosts computes its block count as
/// numSends_ + selfMessage_, so this count likely excludes the self-message.
354 size_t getNumSends()
const;
/// Presumably whether this process sends a message to itself (selfMessage_).
357 bool hasSelfMessage()
const;
/// Presumably maxSendLength_, which the 3-argument "slow" path multiplies by
/// numPackets to size its send buffer.
360 size_t getMaxSendLength()
const;
/// Presumably totalReceiveLength_; doPosts requires 'imports' to hold at least
/// totalReceiveLength_ * numPackets entries.
363 size_t getTotalReceiveLength()
const;
/// Presumably a view of imagesFrom_ (source rank of each receive block).
369 ArrayView<const int> getImagesFrom()
const;
/// Presumably a view of imagesTo_ (destination rank of each send block).
375 ArrayView<const int> getImagesTo()
const;
/// Presumably a view of lengthsFrom_ (entry count of each receive block).
384 ArrayView<const size_t> getLengthsFrom()
const;
/// Presumably a view of lengthsTo_ (entry count of each send block).
393 ArrayView<const size_t> getLengthsTo()
const;
400 return howInitialized_;
/// Return an RCP to a Distributor.
/// NOTE(review): presumably the cached reverseDistributor_, created on demand
/// the same way doReversePosts does -- confirm against the .cpp.
417 RCP<Distributor> getReverse()
const;
443 template <
class Packet>
445 doPostsAndWaits (
const ArrayView<const Packet> &exports,
447 const ArrayView<Packet> &imports);
470 template <
class Packet>
472 doPostsAndWaits (
const ArrayView<const Packet> &exports,
473 const ArrayView<size_t> &numExportPacketsPerLID,
474 const ArrayView<Packet> &imports,
475 const ArrayView<size_t> &numImportPacketsPerLID);
501 template <
class Packet>
503 doPosts (
const ArrayRCP<const Packet> &exports,
505 const ArrayRCP<Packet> &imports);
525 template <
class Packet>
527 doPosts (
const ArrayRCP<const Packet> &exports,
528 const ArrayView<size_t> &numExportPacketsPerLID,
529 const ArrayRCP<Packet> &imports,
530 const ArrayView<size_t> &numImportPacketsPerLID);
544 template <
class Packet>
546 doReversePostsAndWaits (
const ArrayView<const Packet> &exports,
548 const ArrayView<Packet> &imports);
554 template <
class Packet>
556 doReversePostsAndWaits (
const ArrayView<const Packet> &exports,
557 const ArrayView<size_t> &numExportPacketsPerLID,
558 const ArrayView<Packet> &imports,
559 const ArrayView<size_t> &numImportPacketsPerLID);
565 template <
class Packet>
567 doReversePosts (
const ArrayRCP<const Packet> &exports,
569 const ArrayRCP<Packet> &imports);
575 template <
class Packet>
577 doReversePosts (
const ArrayRCP<const Packet> &exports,
578 const ArrayView<size_t> &numExportPacketsPerLID,
579 const ArrayRCP<Packet> &imports,
580 const ArrayView<size_t> &numImportPacketsPerLID);
/// NOTE(review): presumably waits on the requests posted by doReversePosts,
/// which forwards to reverseDistributor_->doPosts -- confirm against the .cpp.
588 void doReverseWaits ();
610 template <
class Packet,
class Layout,
class Device,
class Mem>
613 const Kokkos::View<const Packet*, Layout, Device, Mem> &exports,
615 const Kokkos::View<Packet*, Layout, Device, Mem> &imports);
638 template <
class Packet,
class Layout,
class Device,
class Mem>
640 doPostsAndWaits (
const Kokkos::View<const Packet*, Layout, Device, Mem> &exports,
641 const ArrayView<size_t> &numExportPacketsPerLID,
642 const Kokkos::View<Packet*, Layout, Device, Mem> &imports,
643 const ArrayView<size_t> &numImportPacketsPerLID);
669 template <
class Packet,
class Layout,
class Device,
class Mem>
671 doPosts (
const Kokkos::View<const Packet*, Layout, Device, Mem> &exports,
673 const Kokkos::View<Packet*, Layout, Device, Mem> &imports);
693 template <
class Packet,
class Layout,
class Device,
class Mem>
695 doPosts (
const Kokkos::View<const Packet*, Layout, Device, Mem> &exports,
696 const ArrayView<size_t> &numExportPacketsPerLID,
697 const Kokkos::View<Packet*, Layout, Device, Mem> &imports,
698 const ArrayView<size_t> &numImportPacketsPerLID);
704 template <
class Packet,
class Layout,
class Device,
class Mem>
706 doReversePostsAndWaits (
const Kokkos::View<const Packet*, Layout, Device, Mem> &exports,
708 const Kokkos::View<Packet*, Layout, Device, Mem> &imports);
714 template <
class Packet,
class Layout,
class Device,
class Mem>
716 doReversePostsAndWaits (
const Kokkos::View<const Packet*, Layout, Device, Mem> &exports,
717 const ArrayView<size_t> &numExportPacketsPerLID,
718 const Kokkos::View<Packet*, Layout, Device, Mem> &imports,
719 const ArrayView<size_t> &numImportPacketsPerLID);
725 template <
class Packet,
class Layout,
class Device,
class Mem>
727 doReversePosts (
const Kokkos::View<const Packet*, Layout, Device, Mem> &exports,
729 const Kokkos::View<Packet*, Layout, Device, Mem> &imports);
735 template <
class Packet,
class Layout,
class Device,
class Mem>
737 doReversePosts (
const Kokkos::View<const Packet*, Layout, Device, Mem> &exports,
738 const ArrayView<size_t> &numExportPacketsPerLID,
739 const Kokkos::View<Packet*, Layout, Device, Mem> &imports,
740 const ArrayView<size_t> &numImportPacketsPerLID);
746 bytes_sent = lastRoundBytesSend_;
747 bytes_recvd = lastRoundBytesRecv_;
/// Return a std::string describing this object.
/// NOTE(review): presumably the Teuchos::Describable override -- confirm.
756 std::string description()
const;
/// Write a description of this object to \c out.
/// \param out [out] Stream to write to.
/// \param verbLevel [in] Verbosity; defaults to
///   Teuchos::Describable::verbLevel_default.
759 void describe (Teuchos::FancyOStream &out,
const Teuchos::EVerbosityLevel verbLevel=Teuchos::Describable::verbLevel_default)
const;
/// Communicator. doPosts reads this process's rank from it
/// (comm_->getRank()) and passes it to the send calls.
765 RCP<const Comm<int> > comm_;
/// Output stream; doPosts indents it with a Teuchos::OSTab.
768 Teuchos::RCP<Teuchos::FancyOStream> out_;
/// doPosts copies this into its local doBarrier flag. A ready send
/// (DISTRIBUTOR_RSEND) with this flag false is reported as a logic error.
780 bool barrierBetween_;
/// When false, the Kokkos::View doPostsAndWaits overloads create host mirror
/// views of exports/imports if the exports view is not host-accessible.
795 bool enable_cuda_rdma_;
/// Destination process rank of each send block; indexed together with
/// startsTo_ and lengthsTo_.
829 Array<int> imagesTo_;
/// Per send block: in the "fast" path, startsTo_[p] * numPackets is the
/// block's offset into exports; in the "slow" path, startsTo_[p] is the first
/// position in indicesTo_ belonging to block p.
839 Array<size_t> startsTo_;
/// Number of entries in each send block (multiplied by numPackets in the
/// 3-argument doPosts to get the message length).
846 Array<size_t> lengthsTo_;
/// The 3-argument "slow" path allocates a send buffer of
/// maxSendLength_ * numPackets packets.
851 size_t maxSendLength_;
/// Empty selects the "fast" path, which sends contiguous ranges of exports.
/// Otherwise each entry (times numPackets in the 3-argument doPosts) is an
/// offset into exports that is gathered into the send buffer.
/// doReversePosts throws std::runtime_error when this is non-empty.
868 Array<size_t> indicesTo_;
/// The 3-argument doPosts requires imports to hold at least
/// totalReceiveLength_ * numPackets entries.
887 size_t totalReceiveLength_;
/// Number of entries in each receive block.
894 Array<size_t> lengthsFrom_;
/// Source process rank of each receive block; a block whose rank equals this
/// process's rank is handled as a local copy rather than a posted receive.
901 Array<int> imagesFrom_;
/// Not referenced in the visible code.
/// NOTE(review): presumably the receive-side counterpart of startsTo_ -- confirm.
908 Array<size_t> startsFrom_;
/// Not referenced in the visible code.
/// NOTE(review): presumably the receive-side counterpart of indicesTo_ -- confirm.
916 Array<size_t> indicesFrom_;
/// Handles returned by ireceive/isend. The post methods throw if this is
/// non-empty on entry.
924 Array<RCP<Teuchos::CommRequest<int> > > requests_;
/// Reverse-direction Distributor. doReversePosts calls
/// createReverseDistributor() when this is null, then forwards to its
/// doPosts. Declared mutable; createReverseDistributor() is a const method.
930 mutable RCP<Distributor> reverseDistributor_;
/// Set by the ArrayView (reverse) doPostsAndWaits overloads to
/// exports.size() * sizeof(Packet).
934 size_t lastRoundBytesSend_;
/// Set by the ArrayView (reverse) doPostsAndWaits overloads to
/// imports.size() * sizeof(Packet).
937 size_t lastRoundBytesRecv_;
939 #ifdef TPETRA_DISTRIBUTOR_TIMERS 940 Teuchos::RCP<Teuchos::Time> timer_doPosts3_;
941 Teuchos::RCP<Teuchos::Time> timer_doPosts4_;
942 Teuchos::RCP<Teuchos::Time> timer_doWaits_;
943 Teuchos::RCP<Teuchos::Time> timer_doPosts3_recvs_;
944 Teuchos::RCP<Teuchos::Time> timer_doPosts4_recvs_;
945 Teuchos::RCP<Teuchos::Time> timer_doPosts3_barrier_;
946 Teuchos::RCP<Teuchos::Time> timer_doPosts4_barrier_;
947 Teuchos::RCP<Teuchos::Time> timer_doPosts3_sends_;
948 Teuchos::RCP<Teuchos::Time> timer_doPosts4_sends_;
952 #endif // TPETRA_DISTRIBUTOR_TIMERS 965 bool useDistinctTags_;
/// Map a code-path identifier to the message tag used for that path.
/// The 3-argument doPosts passes pathTag = 0 and the 4-argument doPosts passes
/// pathTag = 1; the result is used as the tag of the sends they post.
/// NOTE(review): the mapping presumably depends on useDistinctTags_ -- confirm.
971 int getTag (
const int pathTag)
const;
987 init (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
988 const Teuchos::RCP<Teuchos::ParameterList>& plist);
/// NOTE(review): presumably fills the receive-side plan (imagesFrom_,
/// lengthsFrom_, ...) from the send-side plan -- body not visible, confirm.
1000 void computeReceives ();
/// Compute export information from import information.
/// \param importIDs [in] IDs to import.
/// \param importNodeIDs [in] One int per import ID.
///   NOTE(review): presumably the rank that owns each ID -- confirm.
/// \param exportIDs [out] Non-const Array.
///   NOTE(review): presumably filled with the IDs to export -- confirm.
/// \param exportNodeIDs [out] Non-const Array.
///   NOTE(review): presumably filled with the destination ranks -- confirm.
1014 template <
class Ordinal>
1015 void computeSends (
const ArrayView<const Ordinal> &importIDs,
1016 const ArrayView<const int> &importNodeIDs,
1017 Array<Ordinal> &exportIDs,
1018 Array<int> &exportNodeIDs);
/// Called by doReversePosts when reverseDistributor_ is null. This method is
/// const and reverseDistributor_ is mutable.
/// NOTE(review): presumably assigns reverseDistributor_ -- confirm.
1021 void createReverseDistributor()
const;
1026 template <
class Packet>
1030 const ArrayView<Packet>& imports)
1032 using Teuchos::arcp;
1033 using Teuchos::ArrayRCP;
1034 typedef typename ArrayRCP<const Packet>::size_type size_type;
1036 TEUCHOS_TEST_FOR_EXCEPTION(
1037 requests_.size () != 0, std::runtime_error,
"Tpetra::Distributor::" 1038 "doPostsAndWaits(3 args): There are " << requests_.size () <<
1039 " outstanding nonblocking messages pending. It is incorrect to call " 1040 "this method with posts outstanding.");
1052 ArrayRCP<const Packet> exportsArcp (exports.getRawPtr (),
1053 static_cast<size_type
> (0),
1054 exports.size(),
false);
1067 doPosts (exportsArcp,
1069 arcp<Packet> (imports.getRawPtr (), 0, imports.size (),
false));
1072 lastRoundBytesSend_ = exports.size () *
sizeof (Packet);
1073 lastRoundBytesRecv_ = imports.size () *
sizeof (Packet);
1076 template <
class Packet>
1079 const ArrayView<size_t> &numExportPacketsPerLID,
1080 const ArrayView<Packet> &imports,
1081 const ArrayView<size_t> &numImportPacketsPerLID)
1083 using Teuchos::arcp;
1084 using Teuchos::ArrayRCP;
1086 TEUCHOS_TEST_FOR_EXCEPTION(
1087 requests_.size () != 0, std::runtime_error,
"Tpetra::Distributor::" 1088 "doPostsAndWaits: There are " << requests_.size () <<
" outstanding " 1089 "nonblocking messages pending. It is incorrect to call doPostsAndWaits " 1090 "with posts outstanding.");
1103 typedef typename ArrayRCP<const Packet>::size_type size_type;
1104 ArrayRCP<const Packet> exportsArcp (exports.getRawPtr (),
1105 static_cast<size_type
> (0),
1106 exports.size (),
false);
1111 doPosts (exportsArcp,
1112 numExportPacketsPerLID,
1113 arcp<Packet> (imports.getRawPtr (), 0, imports.size (),
false),
1114 numImportPacketsPerLID);
1117 lastRoundBytesSend_ = exports.size () *
sizeof (Packet);
1118 lastRoundBytesRecv_ = imports.size () *
sizeof (Packet);
1122 template <
class Packet>
1126 const ArrayRCP<Packet>& imports)
1128 using Teuchos::Array;
1130 using Teuchos::FancyOStream;
1131 using Teuchos::includesVerbLevel;
1132 using Teuchos::ireceive;
1133 using Teuchos::isend;
1134 using Teuchos::OSTab;
1135 using Teuchos::readySend;
1136 using Teuchos::send;
1137 using Teuchos::ssend;
1138 using Teuchos::TypeNameTraits;
1139 using Teuchos::typeName;
1141 typedef Array<size_t>::size_type size_type;
1143 Teuchos::OSTab tab (out_);
1145 #ifdef TPETRA_DISTRIBUTOR_TIMERS 1146 Teuchos::TimeMonitor timeMon (*timer_doPosts3_);
1147 #endif // TPETRA_DISTRIBUTOR_TIMERS 1152 const bool doBarrier = barrierBetween_;
1180 TEUCHOS_TEST_FOR_EXCEPTION(
1181 sendType == Details::DISTRIBUTOR_RSEND && ! doBarrier, std::logic_error,
1182 "Tpetra::Distributor::doPosts(3 args): Ready-send version requires a " 1183 "barrier between posting receives and posting ready sends. This should " 1184 "have been checked before. " 1185 "Please report this bug to the Tpetra developers.");
1187 const int myImageID = comm_->getRank ();
1188 size_t selfReceiveOffset = 0;
1196 const size_t totalNumImportPackets = totalReceiveLength_ * numPackets;
1197 TEUCHOS_TEST_FOR_EXCEPTION(
1198 static_cast<size_t> (imports.size ()) < totalNumImportPackets,
1199 std::invalid_argument,
"Tpetra::Distributor::doPosts(3 args): The " 1200 "'imports' array must have enough entries to hold the expected number " 1201 "of import packets. imports.size() = " << imports.size () <<
" < " 1202 "totalNumImportPackets = " << totalNumImportPackets <<
".");
1209 const int pathTag = 0;
1210 const int tag = this->getTag (pathTag);
1213 TEUCHOS_TEST_FOR_EXCEPTION(
1214 requests_.size () != 0, std::logic_error,
"Tpetra::Distributor::" 1215 "doPosts(3 args): Process " << myImageID <<
": requests_.size() = " 1216 << requests_.size () <<
" != 0.");
1217 std::ostringstream os;
1218 os << myImageID <<
": doPosts(3," 1219 << (indicesTo_.empty () ?
"fast" :
"slow") <<
")" << endl;
1236 const size_type actualNumReceives = as<size_type> (numReceives_) +
1237 as<size_type> (selfMessage_ ? 1 : 0);
1238 requests_.resize (0);
1246 #ifdef TPETRA_DISTRIBUTOR_TIMERS 1247 Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts3_recvs_);
1248 #endif // TPETRA_DISTRIBUTOR_TIMERS 1250 size_t curBufOffset = 0;
1251 for (size_type i = 0; i < actualNumReceives; ++i) {
1252 const size_t curBufLen = lengthsFrom_[i] * numPackets;
1253 if (imagesFrom_[i] != myImageID) {
1261 TEUCHOS_TEST_FOR_EXCEPTION(
1262 curBufOffset + curBufLen > static_cast<size_t> (imports.size ()),
1263 std::logic_error,
"Tpetra::Distributor::doPosts(3 args): Exceeded " 1264 "size of 'imports' array in packing loop on Process " << myImageID
1265 <<
". imports.size() = " << imports.size () <<
" < offset + length" 1266 " = " << (curBufOffset + curBufLen) <<
".");
1268 ArrayRCP<Packet> recvBuf =
1269 imports.persistingView (curBufOffset, curBufLen);
1270 requests_.push_back (ireceive<int, Packet> (recvBuf, imagesFrom_[i],
1273 std::ostringstream os;
1274 os << myImageID <<
": doPosts(3," 1275 << (indicesTo_.empty () ?
"fast" :
"slow") <<
"): " 1276 <<
"Posted irecv from Proc " << imagesFrom_[i] <<
" with " 1277 "specified tag " << tag << endl;
1282 selfReceiveOffset = curBufOffset;
1284 curBufOffset += curBufLen;
1289 #ifdef TPETRA_DISTRIBUTOR_TIMERS 1290 Teuchos::TimeMonitor timeMonBarrier (*timer_doPosts3_barrier_);
1291 #endif // TPETRA_DISTRIBUTOR_TIMERS 1300 #ifdef TPETRA_DISTRIBUTOR_TIMERS 1301 Teuchos::TimeMonitor timeMonSends (*timer_doPosts3_sends_);
1302 #endif // TPETRA_DISTRIBUTOR_TIMERS 1309 size_t numBlocks = numSends_ + selfMessage_;
1310 size_t imageIndex = 0;
1311 while ((imageIndex < numBlocks) && (imagesTo_[imageIndex] < myImageID)) {
1314 if (imageIndex == numBlocks) {
1319 size_t selfIndex = 0;
1321 if (indicesTo_.empty()) {
1323 std::ostringstream os;
1324 os << myImageID <<
": doPosts(3,fast): posting sends" << endl;
1330 for (
size_t i = 0; i < numBlocks; ++i) {
1331 size_t p = i + imageIndex;
1332 if (p > (numBlocks - 1)) {
1336 if (imagesTo_[p] != myImageID) {
1337 ArrayView<const Packet> tmpSend =
1338 exports.view (startsTo_[p]*numPackets, lengthsTo_[p]*numPackets);
1340 if (sendType == Details::DISTRIBUTOR_SEND) {
1341 send<int, Packet> (tmpSend.getRawPtr (),
1342 as<int> (tmpSend.size ()),
1343 imagesTo_[p], tag, *comm_);
1345 else if (sendType == Details::DISTRIBUTOR_ISEND) {
1346 ArrayRCP<const Packet> tmpSendBuf =
1347 exports.persistingView (startsTo_[p] * numPackets,
1348 lengthsTo_[p] * numPackets);
1349 requests_.push_back (isend<int, Packet> (tmpSendBuf, imagesTo_[p],
1352 else if (sendType == Details::DISTRIBUTOR_RSEND) {
1353 readySend<int, Packet> (tmpSend.getRawPtr (),
1354 as<int> (tmpSend.size ()),
1355 imagesTo_[p], tag, *comm_);
1357 else if (sendType == Details::DISTRIBUTOR_SSEND) {
1358 ssend<int, Packet> (tmpSend.getRawPtr (),
1359 as<int> (tmpSend.size ()),
1360 imagesTo_[p], tag, *comm_);
1362 TEUCHOS_TEST_FOR_EXCEPTION(
1363 true, std::logic_error,
"Tpetra::Distributor::doPosts(3 args): " 1364 "Invalid send type. We should never get here. " 1365 "Please report this bug to the Tpetra developers.");
// Debug trace for the send just posted. The send above targets
// imagesTo_[p], where p = i + imageIndex is the block index, so report
// imagesTo_[p] rather than indexing by the loop counter i.
1369 std::ostringstream os;
1370 os << myImageID <<
": doPosts(3,fast): " 1371 <<
"Posted send to Proc " << imagesTo_[p]
1372 <<
" w/ specified tag " << tag << endl;
1389 std::copy (exports.begin()+startsTo_[selfNum]*numPackets,
1390 exports.begin()+startsTo_[selfNum]*numPackets+lengthsTo_[selfNum]*numPackets,
1391 imports.begin()+selfReceiveOffset);
1394 std::ostringstream os;
1395 os << myImageID <<
": doPosts(3,fast) done" << endl;
1401 std::ostringstream os;
1402 os << myImageID <<
": doPosts(3,slow): posting sends" << endl;
1408 ArrayRCP<Packet> sendArray (maxSendLength_ * numPackets);
1410 TEUCHOS_TEST_FOR_EXCEPTION(
1411 sendType == Details::DISTRIBUTOR_ISEND, std::logic_error,
1412 "Tpetra::Distributor::doPosts(3 args): The \"send buffer\" code path " 1413 <<
"doesn't currently work with nonblocking sends.");
1415 for (
size_t i = 0; i < numBlocks; ++i) {
1416 size_t p = i + imageIndex;
1417 if (p > (numBlocks - 1)) {
1421 if (imagesTo_[p] != myImageID) {
1422 typename ArrayView<const Packet>::iterator srcBegin, srcEnd;
1423 size_t sendArrayOffset = 0;
1424 size_t j = startsTo_[p];
1425 for (
size_t k = 0; k < lengthsTo_[p]; ++k, ++j) {
1426 srcBegin = exports.begin() + indicesTo_[j]*numPackets;
1427 srcEnd = srcBegin + numPackets;
1428 std::copy (srcBegin, srcEnd, sendArray.begin()+sendArrayOffset);
1429 sendArrayOffset += numPackets;
1431 ArrayView<const Packet> tmpSend =
1432 sendArray.view (0, lengthsTo_[p]*numPackets);
1434 if (sendType == Details::DISTRIBUTOR_SEND) {
1435 send<int, Packet> (tmpSend.getRawPtr (),
1436 as<int> (tmpSend.size ()),
1437 imagesTo_[p], tag, *comm_);
1439 else if (sendType == Details::DISTRIBUTOR_ISEND) {
1440 ArrayRCP<const Packet> tmpSendBuf =
1441 sendArray.persistingView (0, lengthsTo_[p] * numPackets);
1442 requests_.push_back (isend<int, Packet> (tmpSendBuf, imagesTo_[p],
1445 else if (sendType == Details::DISTRIBUTOR_RSEND) {
1446 readySend<int, Packet> (tmpSend.getRawPtr (),
1447 as<int> (tmpSend.size ()),
1448 imagesTo_[p], tag, *comm_);
1450 else if (sendType == Details::DISTRIBUTOR_SSEND) {
1451 ssend<int, Packet> (tmpSend.getRawPtr (),
1452 as<int> (tmpSend.size ()),
1453 imagesTo_[p], tag, *comm_);
1456 TEUCHOS_TEST_FOR_EXCEPTION(
1457 true, std::logic_error,
"Tpetra::Distributor::doPosts(3 args): " 1458 "Invalid send type. We should never get here. " 1459 "Please report this bug to the Tpetra developers.");
// Debug trace for the send just posted. The send above targets
// imagesTo_[p], where p = i + imageIndex is the block index, so report
// imagesTo_[p] rather than indexing by the loop counter i.
1463 std::ostringstream os;
1464 os << myImageID <<
": doPosts(3,slow): " 1465 <<
"Posted send to Proc " << imagesTo_[p]
1466 <<
" w/ specified tag " << tag << endl;
1472 selfIndex = startsTo_[p];
1477 for (
size_t k = 0; k < lengthsTo_[selfNum]; ++k) {
1478 std::copy (exports.begin()+indicesTo_[selfIndex]*numPackets,
1479 exports.begin()+indicesTo_[selfIndex]*numPackets + numPackets,
1480 imports.begin() + selfReceiveOffset);
1482 selfReceiveOffset += numPackets;
1486 std::ostringstream os;
1487 os << myImageID <<
": doPosts(3,slow) done" << endl;
1493 template <
class Packet>
1496 const ArrayView<size_t>& numExportPacketsPerLID,
1497 const ArrayRCP<Packet>& imports,
1498 const ArrayView<size_t>& numImportPacketsPerLID)
1500 using Teuchos::Array;
1502 using Teuchos::ireceive;
1503 using Teuchos::isend;
1504 using Teuchos::readySend;
1505 using Teuchos::send;
1506 using Teuchos::ssend;
1507 using Teuchos::TypeNameTraits;
1508 #ifdef HAVE_TEUCHOS_DEBUG 1509 using Teuchos::OSTab;
1510 #endif // HAVE_TEUCHOS_DEBUG 1512 typedef Array<size_t>::size_type size_type;
1514 Teuchos::OSTab tab (out_);
1516 #ifdef TPETRA_DISTRIBUTOR_TIMERS 1517 Teuchos::TimeMonitor timeMon (*timer_doPosts4_);
1518 #endif // TPETRA_DISTRIBUTOR_TIMERS 1523 const bool doBarrier = barrierBetween_;
1549 TEUCHOS_TEST_FOR_EXCEPTION(
1550 sendType == Details::DISTRIBUTOR_RSEND && ! doBarrier, std::logic_error,
1551 "Tpetra::Distributor::doPosts(4 args): Ready-send version requires a " 1552 "barrier between posting receives and posting ready sends. This should " 1553 "have been checked before. " 1554 "Please report this bug to the Tpetra developers.");
1556 const int myImageID = comm_->getRank ();
1557 size_t selfReceiveOffset = 0;
1559 #ifdef HAVE_TEUCHOS_DEBUG 1561 size_t totalNumImportPackets = 0;
1562 for (
int ii = 0; ii < numImportPacketsPerLID.size(); ++ii) {
1563 totalNumImportPackets += numImportPacketsPerLID[ii];
1565 TEUCHOS_TEST_FOR_EXCEPTION(
1566 static_cast<size_t> (imports.size ()) < totalNumImportPackets,
1567 std::runtime_error,
"Tpetra::Distributor::doPosts(4 args): The 'imports' " 1568 "array must have enough entries to hold the expected number of import " 1569 "packets. imports.size() = " << imports.size() <<
" < " 1570 "totalNumImportPackets = " << totalNumImportPackets <<
".");
1571 #endif // HAVE_TEUCHOS_DEBUG 1578 const int pathTag = 1;
1579 const int tag = this->getTag (pathTag);
1582 TEUCHOS_TEST_FOR_EXCEPTION(
1583 requests_.size () != 0, std::logic_error,
"Tpetra::Distributor::" 1584 "doPosts(4 args): Process " << myImageID <<
": requests_.size() = " 1585 << requests_.size () <<
" != 0.");
1586 std::ostringstream os;
1587 os << myImageID <<
": doPosts(4," 1588 << (indicesTo_.empty () ?
"fast" :
"slow") <<
")" << endl;
1605 const size_type actualNumReceives = as<size_type> (numReceives_) +
1606 as<size_type> (selfMessage_ ? 1 : 0);
1607 requests_.resize (0);
1615 #ifdef TPETRA_DISTRIBUTOR_TIMERS 1616 Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4_recvs_);
1617 #endif // TPETRA_DISTRIBUTOR_TIMERS 1619 size_t curBufferOffset = 0;
1620 size_t curLIDoffset = 0;
1621 for (size_type i = 0; i < actualNumReceives; ++i) {
1622 size_t totalPacketsFrom_i = 0;
1623 for (
size_t j = 0; j < lengthsFrom_[i]; ++j) {
1624 totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset+j];
1626 curLIDoffset += lengthsFrom_[i];
1627 if (imagesFrom_[i] != myImageID && totalPacketsFrom_i) {
1636 ArrayRCP<Packet> recvBuf =
1637 imports.persistingView (curBufferOffset, totalPacketsFrom_i);
1638 requests_.push_back (ireceive<int, Packet> (recvBuf, imagesFrom_[i],
1642 selfReceiveOffset = curBufferOffset;
1644 curBufferOffset += totalPacketsFrom_i;
1649 #ifdef TPETRA_DISTRIBUTOR_TIMERS 1650 Teuchos::TimeMonitor timeMonBarrier (*timer_doPosts4_barrier_);
1651 #endif // TPETRA_DISTRIBUTOR_TIMERS 1660 #ifdef TPETRA_DISTRIBUTOR_TIMERS 1661 Teuchos::TimeMonitor timeMonSends (*timer_doPosts4_sends_);
1662 #endif // TPETRA_DISTRIBUTOR_TIMERS 1666 Array<size_t> sendPacketOffsets(numSends_,0), packetsPerSend(numSends_,0);
1667 size_t maxNumPackets = 0;
1668 size_t curPKToffset = 0;
1669 for (
size_t pp=0; pp<numSends_; ++pp) {
1670 sendPacketOffsets[pp] = curPKToffset;
1671 size_t numPackets = 0;
1672 for (
size_t j=startsTo_[pp]; j<startsTo_[pp]+lengthsTo_[pp]; ++j) {
1673 numPackets += numExportPacketsPerLID[j];
1675 if (numPackets > maxNumPackets) maxNumPackets = numPackets;
1676 packetsPerSend[pp] = numPackets;
1677 curPKToffset += numPackets;
1682 size_t numBlocks = numSends_+ selfMessage_;
1683 size_t imageIndex = 0;
1684 while ((imageIndex < numBlocks) && (imagesTo_[imageIndex] < myImageID)) {
1687 if (imageIndex == numBlocks) {
1692 size_t selfIndex = 0;
1694 if (indicesTo_.empty()) {
1696 std::ostringstream os;
1697 os << myImageID <<
": doPosts(4,fast): posting sends" << endl;
1703 for (
size_t i = 0; i < numBlocks; ++i) {
1704 size_t p = i + imageIndex;
1705 if (p > (numBlocks - 1)) {
1709 if (imagesTo_[p] != myImageID && packetsPerSend[p] > 0) {
1710 ArrayView<const Packet> tmpSend =
1711 exports.view (sendPacketOffsets[p], packetsPerSend[p]);
1713 if (sendType == Details::DISTRIBUTOR_SEND) {
1714 send<int, Packet> (tmpSend.getRawPtr (),
1715 as<int> (tmpSend.size ()),
1716 imagesTo_[p], tag, *comm_);
1718 else if (sendType == Details::DISTRIBUTOR_RSEND) {
1719 readySend<int, Packet> (tmpSend.getRawPtr (),
1720 as<int> (tmpSend.size ()),
1721 imagesTo_[p], tag, *comm_);
1723 else if (sendType == Details::DISTRIBUTOR_ISEND) {
1724 ArrayRCP<const Packet> tmpSendBuf =
1725 exports.persistingView (sendPacketOffsets[p], packetsPerSend[p]);
1726 requests_.push_back (isend<int, Packet> (tmpSendBuf, imagesTo_[p],
1729 else if (sendType == Details::DISTRIBUTOR_SSEND) {
1730 ssend<int, Packet> (tmpSend.getRawPtr (),
1731 as<int> (tmpSend.size ()),
1732 imagesTo_[p], tag, *comm_);
1735 TEUCHOS_TEST_FOR_EXCEPTION(
1736 true, std::logic_error,
"Tpetra::Distributor::doPosts(4 args): " 1737 "Invalid send type. We should never get here. Please report " 1738 "this bug to the Tpetra developers.");
1747 std::copy (exports.begin()+sendPacketOffsets[selfNum],
1748 exports.begin()+sendPacketOffsets[selfNum]+packetsPerSend[selfNum],
1749 imports.begin()+selfReceiveOffset);
1752 std::ostringstream os;
1753 os << myImageID <<
": doPosts(4,fast) done" << endl;
1759 std::ostringstream os;
1760 os << myImageID <<
": doPosts(4,slow): posting sends" << endl;
1765 ArrayRCP<Packet> sendArray (maxNumPackets);
// The send-buffer ("slow") path rejects nonblocking sends. The message names
// the 4-argument overload, matching the other diagnostics in this method.
1767 TEUCHOS_TEST_FOR_EXCEPTION(
1768 sendType == Details::DISTRIBUTOR_ISEND, std::logic_error,
1769 "Tpetra::Distributor::doPosts(4 args): The \"send buffer\" " 1770 "code path may not necessarily work with nonblocking sends.");
1772 Array<size_t> indicesOffsets (numExportPacketsPerLID.size(), 0);
1774 for (
int j=0; j<numExportPacketsPerLID.size(); ++j) {
1775 indicesOffsets[j] = ioffset;
1776 ioffset += numExportPacketsPerLID[j];
1779 for (
size_t i = 0; i < numBlocks; ++i) {
1780 size_t p = i + imageIndex;
1781 if (p > (numBlocks - 1)) {
1785 if (imagesTo_[p] != myImageID) {
1786 typename ArrayView<const Packet>::iterator srcBegin, srcEnd;
1787 size_t sendArrayOffset = 0;
1788 size_t j = startsTo_[p];
1789 size_t numPacketsTo_p = 0;
1790 for (
size_t k = 0; k < lengthsTo_[p]; ++k, ++j) {
1791 srcBegin = exports.begin() + indicesOffsets[j];
1792 srcEnd = srcBegin + numExportPacketsPerLID[j];
1793 numPacketsTo_p += numExportPacketsPerLID[j];
1794 std::copy (srcBegin, srcEnd, sendArray.begin()+sendArrayOffset);
1795 sendArrayOffset += numExportPacketsPerLID[j];
1797 if (numPacketsTo_p > 0) {
1798 ArrayView<const Packet> tmpSend =
1799 sendArray.view (0, numPacketsTo_p);
1801 if (sendType == Details::DISTRIBUTOR_RSEND) {
1802 readySend<int, Packet> (tmpSend.getRawPtr (),
1803 as<int> (tmpSend.size ()),
1804 imagesTo_[p], tag, *comm_);
1806 else if (sendType == Details::DISTRIBUTOR_ISEND) {
1807 ArrayRCP<const Packet> tmpSendBuf =
1808 sendArray.persistingView (0, numPacketsTo_p);
1809 requests_.push_back (isend<int, Packet> (tmpSendBuf, imagesTo_[p],
1812 else if (sendType == Details::DISTRIBUTOR_SSEND) {
1813 ssend<int, Packet> (tmpSend.getRawPtr (),
1814 as<int> (tmpSend.size ()),
1815 imagesTo_[p], tag, *comm_);
1818 send<int, Packet> (tmpSend.getRawPtr (),
1819 as<int> (tmpSend.size ()),
1820 imagesTo_[p], tag, *comm_);
1826 selfIndex = startsTo_[p];
1831 for (
size_t k = 0; k < lengthsTo_[selfNum]; ++k) {
1832 std::copy (exports.begin()+indicesOffsets[selfIndex],
1833 exports.begin()+indicesOffsets[selfIndex]+numExportPacketsPerLID[selfIndex],
1834 imports.begin() + selfReceiveOffset);
1835 selfReceiveOffset += numExportPacketsPerLID[selfIndex];
1840 std::ostringstream os;
1841 os << myImageID <<
": doPosts(4,slow) done" << endl;
1847 template <
class Packet>
1851 const ArrayView<Packet>& imports)
1865 typedef typename ArrayRCP<const Packet>::size_type size_type;
1866 ArrayRCP<const Packet> exportsArcp (exports.getRawPtr(), as<size_type> (0),
1867 exports.size(),
false);
1872 doReversePosts (exportsArcp,
1874 arcp<Packet> (imports.getRawPtr (), 0, imports.size (),
false));
1877 lastRoundBytesSend_ = exports.size() *
sizeof(Packet);
1878 lastRoundBytesRecv_ = imports.size() *
sizeof(Packet);
1881 template <
class Packet>
1884 const ArrayView<size_t> &numExportPacketsPerLID,
1885 const ArrayView<Packet> &imports,
1886 const ArrayView<size_t> &numImportPacketsPerLID)
1889 using Teuchos::arcp;
1890 using Teuchos::ArrayRCP;
1892 TEUCHOS_TEST_FOR_EXCEPTION(
1893 requests_.size () != 0, std::runtime_error,
"Tpetra::Distributor::" 1894 "doReversePostsAndWaits(4 args): There are " << requests_.size ()
1895 <<
" outstanding nonblocking messages pending. It is incorrect to call " 1896 "this method with posts outstanding.");
1909 typedef typename ArrayRCP<const Packet>::size_type size_type;
1910 ArrayRCP<const Packet> exportsArcp (exports.getRawPtr (), as<size_type> (0),
1911 exports.size (),
false);
1912 doReversePosts (exportsArcp,
1913 numExportPacketsPerLID,
1914 arcp<Packet> (imports.getRawPtr (), 0, imports.size (),
false),
1915 numImportPacketsPerLID);
1918 lastRoundBytesSend_ = exports.size() *
sizeof(Packet);
1919 lastRoundBytesRecv_ = imports.size() *
sizeof(Packet);
1922 template <
class Packet>
1926 const ArrayRCP<Packet>& imports)
1929 TEUCHOS_TEST_FOR_EXCEPTION(
1930 ! indicesTo_.empty (), std::runtime_error,
1931 "Tpetra::Distributor::doReversePosts(3 args): Can only do reverse " 1932 "communication when original data are blocked by process.");
1933 if (reverseDistributor_.is_null ()) {
1934 createReverseDistributor ();
1936 reverseDistributor_->doPosts (exports, numPackets, imports);
1939 template <
class Packet>
1942 const ArrayView<size_t>& numExportPacketsPerLID,
1943 const ArrayRCP<Packet>& imports,
1944 const ArrayView<size_t>& numImportPacketsPerLID)
// Reverse communication is refused when indicesTo_ is non-empty. This is the
// overload taking per-LID packet counts, so the message says "4 args".
1947 TEUCHOS_TEST_FOR_EXCEPTION(
1948 ! indicesTo_.empty (), std::runtime_error,
1949 "Tpetra::Distributor::doReversePosts(4 args): Can only do reverse " 1950 "communication when original data are blocked by process.");
1951 if (reverseDistributor_.is_null ()) {
1952 createReverseDistributor ();
1954 reverseDistributor_->doPosts (exports, numExportPacketsPerLID,
1955 imports, numImportPacketsPerLID);
1958 template <
class Packet,
class Layout,
class Device,
class Mem>
1962 const Kokkos::View<Packet*, Layout, Device, Mem> &imports)
1971 typedef Kokkos::View<const Packet*, Layout, Device, Mem> exports_view;
1972 typedef Kokkos::View<Packet*, Layout, Device, Mem> imports_view;
1973 const bool can_access_from_host =
1974 Kokkos::Impl::VerifyExecutionCanAccessMemorySpace< Kokkos::HostSpace,
1975 typename exports_view::memory_space >::value;
1976 if (! enable_cuda_rdma_ && ! can_access_from_host) {
1977 typename exports_view::HostMirror host_exports =
1978 Kokkos::create_mirror_view (exports);
1979 typename imports_view::HostMirror host_imports =
1980 Kokkos::create_mirror_view (imports);
1982 doPostsAndWaits (Kokkos::Compat::create_const_view (host_exports),
1989 TEUCHOS_TEST_FOR_EXCEPTION(
1990 requests_.size () != 0, std::runtime_error,
"Tpetra::Distributor::" 1991 "doPostsAndWaits(3 args): There are " << requests_.size () <<
1992 " outstanding nonblocking messages pending. It is incorrect to call " 1993 "this method with posts outstanding.");
1995 doPosts (exports, numPackets, imports);
1999 template <
class Packet,
class Layout,
class Device,
class Mem>
2002 const ArrayView<size_t> &numExportPacketsPerLID,
2003 const Kokkos::View<Packet*, Layout, Device, Mem> &imports,
2004 const ArrayView<size_t> &numImportPacketsPerLID)
2012 typedef Kokkos::View<const Packet*, Layout, Device, Mem> exports_view;
2013 typedef Kokkos::View<Packet*, Layout, Device, Mem> imports_view;
2014 if (!enable_cuda_rdma_ && !exports_view::is_hostspace) {
2015 typename exports_view::HostMirror host_exports =
2016 Kokkos::create_mirror_view(exports);
2017 typename imports_view::HostMirror host_imports =
2018 Kokkos::create_mirror_view(imports);
2020 doPostsAndWaits(Kokkos::Compat::create_const_view(host_exports),
2021 numExportPacketsPerLID,
2023 numImportPacketsPerLID);
2028 TEUCHOS_TEST_FOR_EXCEPTION(
2029 requests_.size () != 0, std::runtime_error,
2030 "Tpetra::Distributor::doPostsAndWaits(4 args): There are " 2031 << requests_.size () <<
" outstanding nonblocking messages pending. " 2032 "It is incorrect to call this method with posts outstanding.");
2034 doPosts (exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
2039 template <
class Packet,
class Layout,
class Device,
class Mem>
2041 doPosts (
const Kokkos::View<const Packet*, Layout, Device, Mem> &exports,
2043 const Kokkos::View<Packet*, Layout, Device, Mem> &imports)
2045 using Teuchos::Array;
2047 using Teuchos::FancyOStream;
2048 using Teuchos::includesVerbLevel;
2049 using Teuchos::ireceive;
2050 using Teuchos::isend;
2051 using Teuchos::OSTab;
2052 using Teuchos::readySend;
2053 using Teuchos::send;
2054 using Teuchos::ssend;
2055 using Teuchos::TypeNameTraits;
2056 using Teuchos::typeName;
2058 using Kokkos::Compat::create_const_view;
2059 using Kokkos::Compat::create_view;
2060 using Kokkos::Compat::subview_offset;
2061 using Kokkos::Compat::deep_copy_offset;
2062 typedef Array<size_t>::size_type size_type;
2063 typedef Kokkos::View<const Packet*, Layout, Device, Mem> exports_view_type;
2064 typedef Kokkos::View<Packet*, Layout, Device, Mem> imports_view_type;
2066 Teuchos::OSTab tab (out_);
2068 #ifdef TPETRA_DISTRIBUTOR_TIMERS 2069 Teuchos::TimeMonitor timeMon (*timer_doPosts3_);
2070 #endif // TPETRA_DISTRIBUTOR_TIMERS 2075 const bool doBarrier = barrierBetween_;
2103 TEUCHOS_TEST_FOR_EXCEPTION(
2104 sendType == Details::DISTRIBUTOR_RSEND && ! doBarrier, std::logic_error,
2105 "Tpetra::Distributor::doPosts(3 args): Ready-send version requires a " 2106 "barrier between posting receives and posting ready sends. This should " 2107 "have been checked before. " 2108 "Please report this bug to the Tpetra developers.");
2110 const int myImageID = comm_->getRank ();
2111 size_t selfReceiveOffset = 0;
2114 const size_t totalNumImportPackets = totalReceiveLength_ * numPackets;
2115 TEUCHOS_TEST_FOR_EXCEPTION(
2116 static_cast<size_t> (imports.dimension_0 ()) < totalNumImportPackets,
2117 std::runtime_error,
"Tpetra::Distributor::doPosts(3 args): The 'imports' " 2118 "array must have enough entries to hold the expected number of import " 2119 "packets. imports.dimension_0() = " << imports.dimension_0 () <<
" < " 2120 "totalNumImportPackets = " << totalNumImportPackets <<
".");
2127 const int pathTag = 0;
2128 const int tag = this->getTag (pathTag);
2131 TEUCHOS_TEST_FOR_EXCEPTION(
2132 requests_.size () != 0, std::logic_error,
"Tpetra::Distributor::" 2133 "doPosts(3 args): Process " << myImageID <<
": requests_.size() = " 2134 << requests_.size () <<
" != 0.");
2135 std::ostringstream os;
2136 os << myImageID <<
": doPosts(3," 2137 << (indicesTo_.empty () ?
"fast" :
"slow") <<
")" << endl;
2154 const size_type actualNumReceives = as<size_type> (numReceives_) +
2155 as<size_type> (selfMessage_ ? 1 : 0);
2156 requests_.resize (0);
2164 #ifdef TPETRA_DISTRIBUTOR_TIMERS 2165 Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts3_recvs_);
2166 #endif // TPETRA_DISTRIBUTOR_TIMERS 2168 size_t curBufferOffset = 0;
2169 for (size_type i = 0; i < actualNumReceives; ++i) {
2170 if (imagesFrom_[i] != myImageID) {
2178 imports_view_type recvBuf =
2179 subview_offset (imports, curBufferOffset, lengthsFrom_[i]*numPackets);
2180 requests_.push_back (ireceive<int> (recvBuf, imagesFrom_[i],
2183 std::ostringstream os;
2184 os << myImageID <<
": doPosts(3," 2185 << (indicesTo_.empty () ?
"fast" :
"slow") <<
"): " 2186 <<
"Posted irecv from Proc " << imagesFrom_[i] <<
" with " 2187 "specified tag " << tag << endl;
2192 selfReceiveOffset = curBufferOffset;
2194 curBufferOffset += lengthsFrom_[i]*numPackets;
2199 #ifdef TPETRA_DISTRIBUTOR_TIMERS 2200 Teuchos::TimeMonitor timeMonBarrier (*timer_doPosts3_barrier_);
2201 #endif // TPETRA_DISTRIBUTOR_TIMERS 2210 #ifdef TPETRA_DISTRIBUTOR_TIMERS 2211 Teuchos::TimeMonitor timeMonSends (*timer_doPosts3_sends_);
2212 #endif // TPETRA_DISTRIBUTOR_TIMERS 2219 size_t numBlocks = numSends_ + selfMessage_;
2220 size_t imageIndex = 0;
2221 while ((imageIndex < numBlocks) && (imagesTo_[imageIndex] < myImageID)) {
2224 if (imageIndex == numBlocks) {
2229 size_t selfIndex = 0;
2231 if (indicesTo_.empty()) {
2233 std::ostringstream os;
2234 os << myImageID <<
": doPosts(3,fast): posting sends" << endl;
2240 for (
size_t i = 0; i < numBlocks; ++i) {
2241 size_t p = i + imageIndex;
2242 if (p > (numBlocks - 1)) {
2246 if (imagesTo_[p] != myImageID) {
2247 exports_view_type tmpSend = subview_offset(
2248 exports, startsTo_[p]*numPackets, lengthsTo_[p]*numPackets);
2250 if (sendType == Details::DISTRIBUTOR_SEND) {
2252 as<int> (tmpSend.size ()),
2253 imagesTo_[p], tag, *comm_);
2255 else if (sendType == Details::DISTRIBUTOR_ISEND) {
2256 exports_view_type tmpSendBuf =
2257 subview_offset (exports, startsTo_[p] * numPackets,
2258 lengthsTo_[p] * numPackets);
2259 requests_.push_back (isend<int> (tmpSendBuf, imagesTo_[p],
2262 else if (sendType == Details::DISTRIBUTOR_RSEND) {
2263 readySend<int> (tmpSend,
2264 as<int> (tmpSend.size ()),
2265 imagesTo_[p], tag, *comm_);
2267 else if (sendType == Details::DISTRIBUTOR_SSEND) {
2268 ssend<int> (tmpSend,
2269 as<int> (tmpSend.size ()),
2270 imagesTo_[p], tag, *comm_);
2272 TEUCHOS_TEST_FOR_EXCEPTION(
2273 true, std::logic_error,
"Tpetra::Distributor::doPosts(3 args): " 2274 "Invalid send type. We should never get here. " 2275 "Please report this bug to the Tpetra developers.");
2279 std::ostringstream os;
2280 os << myImageID <<
": doPosts(3,fast): " 2281 <<
"Posted send to Proc " << imagesTo_[i]
2282 <<
" w/ specified tag " << tag << endl;
2299 deep_copy_offset(imports, exports, selfReceiveOffset,
2300 startsTo_[selfNum]*numPackets,
2301 lengthsTo_[selfNum]*numPackets);
2304 std::ostringstream os;
2305 os << myImageID <<
": doPosts(3,fast) done" << endl;
2311 std::ostringstream os;
2312 os << myImageID <<
": doPosts(3,slow): posting sends" << endl;
2318 Kokkos::View<Packet*,Layout,Device,Mem> sendArray =
2319 create_view<Packet,Layout,Device,Mem> (
"sendArray",
2320 maxSendLength_ * numPackets);
2322 TEUCHOS_TEST_FOR_EXCEPTION(
2323 sendType == Details::DISTRIBUTOR_ISEND, std::logic_error,
2324 "Tpetra::Distributor::doPosts(3 args): The \"send buffer\" code path " 2325 "doesn't currently work with nonblocking sends.");
2327 for (
size_t i = 0; i < numBlocks; ++i) {
2328 size_t p = i + imageIndex;
2329 if (p > (numBlocks - 1)) {
2333 if (imagesTo_[p] != myImageID) {
2334 size_t sendArrayOffset = 0;
2335 size_t j = startsTo_[p];
2336 for (
size_t k = 0; k < lengthsTo_[p]; ++k, ++j) {
2337 deep_copy_offset(sendArray, exports, sendArrayOffset,
2338 indicesTo_[j]*numPackets, numPackets);
2339 sendArrayOffset += numPackets;
2341 Kokkos::View<Packet*, Layout, Device, Mem> tmpSend =
2342 subview_offset(sendArray,
size_t(0), lengthsTo_[p]*numPackets);
2344 if (sendType == Details::DISTRIBUTOR_SEND) {
2346 as<int> (tmpSend.size ()),
2347 imagesTo_[p], tag, *comm_);
2349 else if (sendType == Details::DISTRIBUTOR_ISEND) {
2350 exports_view_type tmpSendBuf =
2351 subview_offset (sendArray,
size_t(0), lengthsTo_[p] * numPackets);
2352 requests_.push_back (isend<int> (tmpSendBuf, imagesTo_[p],
2355 else if (sendType == Details::DISTRIBUTOR_RSEND) {
2356 readySend<int> (tmpSend,
2357 as<int> (tmpSend.size ()),
2358 imagesTo_[p], tag, *comm_);
2360 else if (sendType == Details::DISTRIBUTOR_SSEND) {
2361 ssend<int> (tmpSend,
2362 as<int> (tmpSend.size ()),
2363 imagesTo_[p], tag, *comm_);
2366 TEUCHOS_TEST_FOR_EXCEPTION(
2367 true, std::logic_error,
"Tpetra::Distributor::doPosts(3 args): " 2368 "Invalid send type. We should never get here. " 2369 "Please report this bug to the Tpetra developers.");
2373 std::ostringstream os;
2374 os << myImageID <<
": doPosts(3,slow): " 2375 <<
"Posted send to Proc " << imagesTo_[i]
2376 <<
" w/ specified tag " << tag << endl;
2382 selfIndex = startsTo_[p];
2387 for (
size_t k = 0; k < lengthsTo_[selfNum]; ++k) {
2388 deep_copy_offset(imports, exports, selfReceiveOffset,
2389 indicesTo_[selfIndex]*numPackets, numPackets);
2391 selfReceiveOffset += numPackets;
2395 std::ostringstream os;
2396 os << myImageID <<
": doPosts(3,slow) done" << endl;
2402 template <
class Packet,
class Layout,
class Device,
class Mem>
2404 doPosts (
const Kokkos::View<const Packet*, Layout, Device, Mem> &exports,
2405 const ArrayView<size_t> &numExportPacketsPerLID,
2406 const Kokkos::View<Packet*, Layout, Device, Mem> &imports,
2407 const ArrayView<size_t> &numImportPacketsPerLID)
2409 using Teuchos::Array;
2411 using Teuchos::ireceive;
2412 using Teuchos::isend;
2413 using Teuchos::readySend;
2414 using Teuchos::send;
2415 using Teuchos::ssend;
2416 using Teuchos::TypeNameTraits;
2417 #ifdef HAVE_TEUCHOS_DEBUG 2418 using Teuchos::OSTab;
2419 #endif // HAVE_TEUCHOS_DEBUG 2421 using Kokkos::Compat::create_const_view;
2422 using Kokkos::Compat::create_view;
2423 using Kokkos::Compat::subview_offset;
2424 using Kokkos::Compat::deep_copy_offset;
2425 typedef Array<size_t>::size_type size_type;
2426 typedef Kokkos::View<const Packet*, Layout, Device, Mem> exports_view_type;
2427 typedef Kokkos::View<Packet*, Layout, Device, Mem> imports_view_type;
2429 Teuchos::OSTab tab (out_);
2431 #ifdef TPETRA_DISTRIBUTOR_TIMERS 2432 Teuchos::TimeMonitor timeMon (*timer_doPosts4_);
2433 #endif // TPETRA_DISTRIBUTOR_TIMERS 2438 const bool doBarrier = barrierBetween_;
2464 TEUCHOS_TEST_FOR_EXCEPTION(
2465 sendType == Details::DISTRIBUTOR_RSEND && ! doBarrier,
2466 std::logic_error,
"Tpetra::Distributor::doPosts(4 args): Ready-send " 2467 "version requires a barrier between posting receives and posting ready " 2468 "sends. This should have been checked before. " 2469 "Please report this bug to the Tpetra developers.");
2471 const int myImageID = comm_->getRank ();
2472 size_t selfReceiveOffset = 0;
2474 #ifdef HAVE_TEUCHOS_DEBUG 2476 size_t totalNumImportPackets = 0;
2477 for (size_type ii = 0; ii < numImportPacketsPerLID.size (); ++ii) {
2478 totalNumImportPackets += numImportPacketsPerLID[ii];
2480 TEUCHOS_TEST_FOR_EXCEPTION(
2481 imports.dimension_0 () < totalNumImportPackets, std::runtime_error,
2482 "Tpetra::Distributor::doPosts(4 args): The 'imports' array must have " 2483 "enough entries to hold the expected number of import packets. " 2484 "imports.dimension_0() = " << imports.dimension_0 () <<
" < " 2485 "totalNumImportPackets = " << totalNumImportPackets <<
".");
2486 #endif // HAVE_TEUCHOS_DEBUG 2493 const int pathTag = 1;
2494 const int tag = this->getTag (pathTag);
2497 TEUCHOS_TEST_FOR_EXCEPTION(
2498 requests_.size () != 0, std::logic_error,
"Tpetra::Distributor::" 2499 "doPosts(4 args): Process " << myImageID <<
": requests_.size () = " 2500 << requests_.size () <<
" != 0.");
2501 std::ostringstream os;
2502 os << myImageID <<
": doPosts(4," 2503 << (indicesTo_.empty () ?
"fast" :
"slow") <<
")" << endl;
2520 const size_type actualNumReceives = as<size_type> (numReceives_) +
2521 as<size_type> (selfMessage_ ? 1 : 0);
2522 requests_.resize (0);
2530 #ifdef TPETRA_DISTRIBUTOR_TIMERS 2531 Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4_recvs_);
2532 #endif // TPETRA_DISTRIBUTOR_TIMERS 2534 size_t curBufferOffset = 0;
2535 size_t curLIDoffset = 0;
2536 for (size_type i = 0; i < actualNumReceives; ++i) {
2537 size_t totalPacketsFrom_i = 0;
2538 for (
size_t j = 0; j < lengthsFrom_[i]; ++j) {
2539 totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset+j];
2541 curLIDoffset += lengthsFrom_[i];
2542 if (imagesFrom_[i] != myImageID && totalPacketsFrom_i) {
2551 imports_view_type recvBuf =
2552 subview_offset (imports, curBufferOffset, totalPacketsFrom_i);
2553 requests_.push_back (ireceive<int> (recvBuf, imagesFrom_[i],
2557 selfReceiveOffset = curBufferOffset;
2559 curBufferOffset += totalPacketsFrom_i;
2564 #ifdef TPETRA_DISTRIBUTOR_TIMERS 2565 Teuchos::TimeMonitor timeMonBarrier (*timer_doPosts4_barrier_);
2566 #endif // TPETRA_DISTRIBUTOR_TIMERS 2575 #ifdef TPETRA_DISTRIBUTOR_TIMERS 2576 Teuchos::TimeMonitor timeMonSends (*timer_doPosts4_sends_);
2577 #endif // TPETRA_DISTRIBUTOR_TIMERS 2581 Array<size_t> sendPacketOffsets(numSends_,0), packetsPerSend(numSends_,0);
2582 size_t maxNumPackets = 0;
2583 size_t curPKToffset = 0;
2584 for (
size_t pp=0; pp<numSends_; ++pp) {
2585 sendPacketOffsets[pp] = curPKToffset;
2586 size_t numPackets = 0;
2587 for (
size_t j=startsTo_[pp]; j<startsTo_[pp]+lengthsTo_[pp]; ++j) {
2588 numPackets += numExportPacketsPerLID[j];
2590 if (numPackets > maxNumPackets) maxNumPackets = numPackets;
2591 packetsPerSend[pp] = numPackets;
2592 curPKToffset += numPackets;
2597 size_t numBlocks = numSends_+ selfMessage_;
2598 size_t imageIndex = 0;
2599 while ((imageIndex < numBlocks) && (imagesTo_[imageIndex] < myImageID)) {
2602 if (imageIndex == numBlocks) {
2607 size_t selfIndex = 0;
2609 if (indicesTo_.empty()) {
2611 std::ostringstream os;
2612 os << myImageID <<
": doPosts(4,fast): posting sends" << endl;
2618 for (
size_t i = 0; i < numBlocks; ++i) {
2619 size_t p = i + imageIndex;
2620 if (p > (numBlocks - 1)) {
2624 if (imagesTo_[p] != myImageID && packetsPerSend[p] > 0) {
2625 exports_view_type tmpSend =
2626 subview_offset(exports, sendPacketOffsets[p], packetsPerSend[p]);
2628 if (sendType == Details::DISTRIBUTOR_SEND) {
2630 as<int> (tmpSend.size ()),
2631 imagesTo_[p], tag, *comm_);
2633 else if (sendType == Details::DISTRIBUTOR_RSEND) {
2634 readySend<int> (tmpSend,
2635 as<int> (tmpSend.size ()),
2636 imagesTo_[p], tag, *comm_);
2638 else if (sendType == Details::DISTRIBUTOR_ISEND) {
2639 exports_view_type tmpSendBuf =
2640 subview_offset (exports, sendPacketOffsets[p], packetsPerSend[p]);
2641 requests_.push_back (isend<int> (tmpSendBuf, imagesTo_[p],
2644 else if (sendType == Details::DISTRIBUTOR_SSEND) {
2645 ssend<int> (tmpSend,
2646 as<int> (tmpSend.size ()),
2647 imagesTo_[p], tag, *comm_);
2650 TEUCHOS_TEST_FOR_EXCEPTION(
2651 true, std::logic_error,
"Tpetra::Distributor::doPosts(4 args): " 2652 "Invalid send type. We should never get here. " 2653 "Please report this bug to the Tpetra developers.");
2662 deep_copy_offset(imports, exports, selfReceiveOffset,
2663 sendPacketOffsets[selfNum], packetsPerSend[selfNum]);
2666 std::ostringstream os;
2667 os << myImageID <<
": doPosts(4,fast) done" << endl;
2673 std::ostringstream os;
2674 os << myImageID <<
": doPosts(4,slow): posting sends" << endl;
2679 Kokkos::View<Packet*,Layout,Device,Mem> sendArray =
2680 create_view<Packet,Layout,Device,Mem>(
"sendArray", maxNumPackets);
2682 TEUCHOS_TEST_FOR_EXCEPTION(
2683 sendType == Details::DISTRIBUTOR_ISEND, std::logic_error,
2684 "Tpetra::Distributor::doPosts(3 args): The \"send buffer\" code path " 2685 "may not necessarily work with nonblocking sends.");
2687 Array<size_t> indicesOffsets (numExportPacketsPerLID.size(), 0);
2689 for (
int j=0; j<numExportPacketsPerLID.size(); ++j) {
2690 indicesOffsets[j] = ioffset;
2691 ioffset += numExportPacketsPerLID[j];
2694 for (
size_t i = 0; i < numBlocks; ++i) {
2695 size_t p = i + imageIndex;
2696 if (p > (numBlocks - 1)) {
2700 if (imagesTo_[p] != myImageID) {
2701 size_t sendArrayOffset = 0;
2702 size_t j = startsTo_[p];
2703 size_t numPacketsTo_p = 0;
2704 for (
size_t k = 0; k < lengthsTo_[p]; ++k, ++j) {
2705 deep_copy_offset(sendArray, exports, sendArrayOffset,
2706 indicesOffsets[j], numExportPacketsPerLID[j]);
2707 sendArrayOffset += numExportPacketsPerLID[j];
2709 if (numPacketsTo_p > 0) {
2710 Kokkos::View<Packet*, Layout, Device, Mem> tmpSend =
2711 subview_offset(sendArray,
size_t(0), numPacketsTo_p);
2713 if (sendType == Details::DISTRIBUTOR_RSEND) {
2714 readySend<int> (tmpSend,
2715 as<int> (tmpSend.size ()),
2716 imagesTo_[p], tag, *comm_);
2718 else if (sendType == Details::DISTRIBUTOR_ISEND) {
2719 exports_view_type tmpSendBuf =
2720 subview_offset (sendArray,
size_t(0), numPacketsTo_p);
2721 requests_.push_back (isend<int> (tmpSendBuf, imagesTo_[p],
2724 else if (sendType == Details::DISTRIBUTOR_SSEND) {
2725 ssend<int> (tmpSend,
2726 as<int> (tmpSend.size ()),
2727 imagesTo_[p], tag, *comm_);
2731 as<int> (tmpSend.size ()),
2732 imagesTo_[p], tag, *comm_);
2738 selfIndex = startsTo_[p];
2743 for (
size_t k = 0; k < lengthsTo_[selfNum]; ++k) {
2744 deep_copy_offset(imports, exports, selfReceiveOffset,
2745 indicesOffsets[selfIndex],
2746 numExportPacketsPerLID[selfIndex]);
2747 selfReceiveOffset += numExportPacketsPerLID[selfIndex];
2752 std::ostringstream os;
2753 os << myImageID <<
": doPosts(4,slow) done" << endl;
2759 template <
class Packet,
class Layout,
class Device,
class Mem>
2763 const Kokkos::View<Packet*, Layout, Device, Mem> &imports)
2771 typedef Kokkos::View<const Packet*, Layout, Device, Mem> exports_view;
2772 typedef Kokkos::View<Packet*, Layout, Device, Mem> imports_view;
2773 if (!enable_cuda_rdma_ && !exports_view::is_hostspace) {
2774 typename exports_view::HostMirror host_exports =
2775 Kokkos::create_mirror_view(exports);
2776 typename imports_view::HostMirror host_imports =
2777 Kokkos::create_mirror_view(imports);
2779 doPostsAndWaits(Kokkos::Compat::create_const_view(host_exports),
2786 doReversePosts (exports, numPackets, imports);
2790 template <
class Packet,
class Layout,
class Device,
class Mem>
2793 const ArrayView<size_t> &numExportPacketsPerLID,
2794 const Kokkos::View<Packet*, Layout, Device, Mem> &imports,
2795 const ArrayView<size_t> &numImportPacketsPerLID)
2803 typedef Kokkos::View<const Packet*, Layout, Device, Mem> exports_view;
2804 typedef Kokkos::View<Packet*, Layout, Device, Mem> imports_view;
2805 if (!enable_cuda_rdma_ && !exports_view::is_hostspace) {
2806 typename exports_view::HostMirror host_exports =
2807 Kokkos::create_mirror_view(exports);
2808 typename imports_view::HostMirror host_imports =
2809 Kokkos::create_mirror_view(imports);
2811 doPostsAndWaits(Kokkos::Compat::create_const_view(host_exports),
2812 numExportPacketsPerLID,
2814 numImportPacketsPerLID);
2819 TEUCHOS_TEST_FOR_EXCEPTION(requests_.size() != 0, std::runtime_error,
2820 "Tpetra::Distributor::doReversePostsAndWaits(4 args): There are " 2821 << requests_.size() <<
" outstanding nonblocking messages pending. It " 2822 "is incorrect to call this method with posts outstanding.");
2824 doReversePosts (exports, numExportPacketsPerLID, imports,
2825 numImportPacketsPerLID);
2829 template <
class Packet,
class Layout,
class Device,
class Mem>
2833 const Kokkos::View<Packet*, Layout, Device, Mem> &imports)
2836 TEUCHOS_TEST_FOR_EXCEPTION(
2837 ! indicesTo_.empty (), std::runtime_error,
2838 "Tpetra::Distributor::doReversePosts(3 args): Can only do " 2839 "reverse communication when original data are blocked by process.");
2840 if (reverseDistributor_.is_null ()) {
2841 createReverseDistributor ();
2843 reverseDistributor_->doPosts (exports, numPackets, imports);
2846 template <
class Packet,
class Layout,
class Device,
class Mem>
2849 const ArrayView<size_t> &numExportPacketsPerLID,
2850 const Kokkos::View<Packet*, Layout, Device, Mem> &imports,
2851 const ArrayView<size_t> &numImportPacketsPerLID)
2854 TEUCHOS_TEST_FOR_EXCEPTION(
2855 ! indicesTo_.empty (), std::runtime_error,
2856 "Tpetra::Distributor::doReversePosts(3 args): Can only do " 2857 "reverse communication when original data are blocked by process.");
2858 if (reverseDistributor_.is_null ()) {
2859 createReverseDistributor ();
2861 reverseDistributor_->doPosts (exports, numExportPacketsPerLID,
2862 imports, numImportPacketsPerLID);
2865 template <
class OrdinalType>
2867 computeSends (
const ArrayView<const OrdinalType> & importIDs,
2868 const ArrayView<const int> & importNodeIDs,
2869 Array<OrdinalType> & exportIDs,
2870 Array<int> & exportNodeIDs)
2880 typedef typename ArrayView<const OrdinalType>::size_type size_type;
2882 Teuchos::OSTab tab (out_);
2883 const int myRank = comm_->getRank ();
2885 std::ostringstream os;
2886 os << myRank <<
": computeSends" << endl;
2890 TEUCHOS_TEST_FOR_EXCEPTION(
2891 importIDs.size () != importNodeIDs.size (), std::invalid_argument,
2892 "Tpetra::Distributor::computeSends: On Process " << myRank <<
": " 2893 "importNodeIDs.size() = " << importNodeIDs.size ()
2894 <<
" != importIDs.size() = " << importIDs.size () <<
".");
2896 const size_type numImports = importNodeIDs.size ();
2897 Array<size_t> importObjs (2*numImports);
2899 for (size_type i = 0; i < numImports; ++i) {
2900 importObjs[2*i] =
static_cast<size_t> (importIDs[i]);
2901 importObjs[2*i+1] =
static_cast<size_t> (myRank);
2909 std::ostringstream os;
2910 os << myRank <<
": computeSends: tempPlan.createFromSends" << endl;
2916 const size_t numExportsAsSizeT = tempPlan.
createFromSends (importNodeIDs);
2917 const size_type numExports =
static_cast<size_type
> (numExportsAsSizeT);
2918 TEUCHOS_TEST_FOR_EXCEPTION(
2919 numExports < 0, std::logic_error,
"Tpetra::Distributor::computeSends: " 2920 "tempPlan.createFromSends() returned numExports = " << numExportsAsSizeT
2921 <<
" as a size_t, which overflows to " << numExports <<
" when cast to " 2922 << Teuchos::TypeNameTraits<size_type>::name () <<
". " 2923 "Please report this bug to the Tpetra developers.");
2924 TEUCHOS_TEST_FOR_EXCEPTION(
2926 std::logic_error,
"Tpetra::Distributor::computeSends: tempPlan.getTotal" 2928 "Exports = " << numExports <<
". Please report this bug to the " 2929 "Tpetra developers.");
2931 if (numExports > 0) {
2932 exportIDs.resize (numExports);
2933 exportNodeIDs.resize (numExports);
2944 TEUCHOS_TEST_FOR_EXCEPTION(
2945 sizeof (
size_t) <
sizeof (OrdinalType), std::logic_error,
2946 "Tpetra::Distributor::computeSends: sizeof(size_t) = " <<
sizeof(
size_t)
2947 <<
" < sizeof(" << Teuchos::TypeNameTraits<OrdinalType>::name () <<
") = " 2948 <<
sizeof (OrdinalType) <<
". This violates an assumption of the " 2949 "method. It's not hard to work around (just use Array<OrdinalType> as " 2950 "the export buffer, not Array<size_t>), but we haven't done that yet. " 2951 "Please report this bug to the Tpetra developers.");
2953 TEUCHOS_TEST_FOR_EXCEPTION(
2956 "Tpetra::Distributor::computeSends: tempPlan.getTotalReceiveLength() = " 2958 <<
". Please report this bug to the Tpetra developers.");
2962 std::ostringstream os;
2963 os << myRank <<
": computeSends: tempPlan.doPostsAndWaits" << endl;
2969 for (size_type i = 0; i < numExports; ++i) {
2970 exportIDs[i] =
static_cast<OrdinalType
> (exportObjs[2*i]);
2971 exportNodeIDs[i] =
static_cast<int> (exportObjs[2*i+1]);
2975 std::ostringstream os;
2976 os << myRank <<
": computeSends done" << endl;
2981 template <
class OrdinalType>
2984 const ArrayView<const int> &remoteImageIDs,
2985 Array<OrdinalType> &exportGIDs,
2986 Array<int> &exportNodeIDs)
2990 Teuchos::OSTab tab (out_);
2991 const int myRank = comm_->getRank();
2994 *out_ << myRank <<
": createFromRecvs" << endl;
2997 #ifdef HAVE_TPETRA_DEBUG 2998 using Teuchos::outArg;
2999 using Teuchos::reduceAll;
3004 (remoteIDs.size () != remoteImageIDs.size ()) ? myRank : -1;
3005 int maxErrProc = -1;
3006 reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX, errProc, outArg (maxErrProc));
3007 TEUCHOS_TEST_FOR_EXCEPTION(maxErrProc != -1, std::runtime_error,
3008 Teuchos::typeName (*
this) <<
"::createFromRecvs(): lists of remote IDs " 3009 "and remote process IDs must have the same size on all participating " 3010 "processes. Maximum process ID with error: " << maxErrProc <<
".");
3011 #else // NOT HAVE_TPETRA_DEBUG 3014 TEUCHOS_TEST_FOR_EXCEPTION(
3015 remoteIDs.size () != remoteImageIDs.size (), std::invalid_argument,
3016 Teuchos::typeName (*
this) <<
"::createFromRecvs<" <<
3017 Teuchos::TypeNameTraits<OrdinalType>::name () <<
">(): On Process " <<
3018 myRank <<
": remoteIDs.size() = " << remoteIDs.size () <<
" != " 3019 "remoteImageIDs.size() = " << remoteImageIDs.size () <<
".");
3020 #endif // HAVE_TPETRA_DEBUG 3022 computeSends (remoteIDs, remoteImageIDs, exportGIDs, exportNodeIDs);
3024 const size_t numProcsSendingToMe = createFromSends (exportNodeIDs ());
3031 std::ostringstream os;
3032 os <<
"Proc " << myRank <<
": {numProcsSendingToMe: " 3033 << numProcsSendingToMe <<
", remoteImageIDs.size(): " 3034 << remoteImageIDs.size () <<
", selfMessage_: " 3035 << (selfMessage_ ?
"true" :
"false") <<
"}" << std::endl;
3036 std::cerr << os.str ();
3040 *out_ << myRank <<
": createFromRecvs done" << endl;
3043 howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS;
3048 #endif // TPETRA_DISTRIBUTOR_HPP Namespace Tpetra contains the class and methods constituting the Tpetra library.
EDistributorHowInitialized
Enum indicating how and whether a Distributor was initialized.
void doPosts(const ArrayRCP< const Packet > &exports, size_t numPackets, const ArrayRCP< Packet > &imports)
Post the data for a forward plan, but do not execute the waits yet.
void deep_copy(MultiVector< DS, DL, DG, DN, dstClassic > &dst, const MultiVector< SS, SL, SG, SN, srcClassic > &src)
Copy the contents of the MultiVector src into dst.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType)
Convert an EDistributorSendType enum value to a string.
Implementation details of Tpetra.
Details::EDistributorHowInitialized howInitialized() const
Return an enum indicating whether and how a Distributor was initialized.
void doPostsAndWaits(const ArrayView< const Packet > &exports, size_t numPackets, const ArrayView< Packet > &imports)
Execute the (forward) communication plan.
Sets up and executes a communication plan for a Tpetra DistObject.
size_t getTotalReceiveLength() const
Total number of values this process will receive from other processes.
size_t createFromSends(const ArrayView< const int > &exportNodeIDs)
Set up Distributor using list of process ranks to which this process will send.
void doReversePosts(const ArrayRCP< const Packet > &exports, size_t numPackets, const ArrayRCP< Packet > &imports)
Post the data for a reverse plan, but do not execute the waits yet.
void doReversePostsAndWaits(const ArrayView< const Packet > &exports, size_t numPackets, const ArrayView< Packet > &imports)
Execute the reverse communication plan.
std::string DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
Convert an EDistributorHowInitialized enum value to a string.
Stand-alone utility functions and macros.
void getLastDoStatistics(size_t &bytes_sent, size_t &bytes_recvd) const
Information on the last call to do/doReverse.
Array< std::string > distributorSendTypes()
Valid values for Distributor's "Send type" parameter.
void createFromRecvs(const ArrayView< const Ordinal > &remoteIDs, const ArrayView< const int > &remoteNodeIDs, Array< Ordinal > &exportIDs, Array< int > &exportNodeIDs)
Set up Distributor using list of process ranks from which to receive.
EDistributorSendType
The type of MPI send that Distributor should use.