dune-common 2.8.0
Loading...
Searching...
No Matches
communicator.hh
Go to the documentation of this file.
1// -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2// vi: set et ts=4 sw=2 sts=2:
3#ifndef DUNE_COMMUNICATOR
4#define DUNE_COMMUNICATOR
5
6#if HAVE_MPI
7
8#include <cassert>
9#include <cstddef>
10#include <iostream>
11#include <map>
12#include <type_traits>
13#include <utility>
14
15#include <mpi.h>
16
21
22namespace Dune
23{
107 struct SizeOne
108 {};
109
116 {};
117
118
124 template<class V>
126 {
138 typedef V Type;
139
145 typedef typename V::value_type IndexedType;
146
152
161 static const void* getAddress(const V& v, int index);
162
168 static int getSize(const V&, int index);
169 };
170
171 template<class K, int n> class FieldVector;
172
173 template<class B, class A> class VariableBlockVector;
174
175 template<class K, class A, int n>
177 {
179
180 typedef typename Type::B IndexedType;
181
183
184 static const void* getAddress(const Type& v, int i);
185
186 static int getSize(const Type& v, int i);
187 };
188
193 {};
194
198 template<class T>
200 {
202
203 static const IndexedType& gather(const T& vec, std::size_t i);
204
205 static void scatter(T& vec, const IndexedType& v, std::size_t i);
206
207 };
208
220 template<typename T>
221 class DatatypeCommunicator : public InterfaceBuilder
222 {
223 public:
224
228 typedef T ParallelIndexSet;
229
234
238 typedef typename RemoteIndices::GlobalIndex GlobalIndex;
239
243 typedef typename RemoteIndices::Attribute Attribute;
244
248 typedef typename RemoteIndices::LocalIndex LocalIndex;
249
253 DatatypeCommunicator();
254
258 ~DatatypeCommunicator();
259
286 template<class T1, class T2, class V>
287 void build(const RemoteIndices& remoteIndices, const T1& sourceFlags, V& sendData, const T2& destFlags, V& receiveData);
288
292 void forward();
293
297 void backward();
298
302 void free();
303 private:
304 enum {
308 commTag_ = 234
309 };
310
314 const RemoteIndices* remoteIndices_;
315
317 MessageTypeMap;
318
322 MessageTypeMap messageTypes;
323
327 void* data_;
328
329 MPI_Request* requests_[2];
330
334 bool created_;
335
339 template<class V, bool FORWARD>
340 void createRequests(V& sendData, V& receiveData);
341
345 template<class T1, class T2, class V, bool send>
346 void createDataTypes(const T1& source, const T2& destination, V& data);
347
351 void sendRecv(MPI_Request* req);
352
356 struct IndexedTypeInformation
357 {
363 void build(int i)
364 {
365 length = new int[i];
366 displ = new MPI_Aint[i];
367 size = i;
368 }
369
373 void free()
374 {
375 delete[] length;
376 delete[] displ;
377 }
379 int* length;
381 MPI_Aint* displ;
387 int elements;
391 int size;
392 };
393
399 template<class V>
400 struct MPIDatatypeInformation
401 {
406 MPIDatatypeInformation(const V& data) : data_(data)
407 {}
408
414 void reserve(int proc, int size)
415 {
416 information_[proc].build(size);
417 }
424 void add(int proc, int local)
425 {
426 IndexedTypeInformation& info=information_[proc];
427 assert((info.elements)<info.size);
428 MPI_Get_address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
429 info.displ+info.elements);
430 info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
431 info.elements++;
432 }
433
442 const V& data_;
443
444 };
445
446 };
447
458 {
459
460 public:
465
472 template<class Data, class Interface>
474 build(const Interface& interface);
475
483 template<class Data, class Interface>
484 void build(const Data& source, const Data& target, const Interface& interface);
485
514 template<class GatherScatter, class Data>
515 void forward(const Data& source, Data& dest);
516
545 template<class GatherScatter, class Data>
546 void backward(Data& source, const Data& dest);
547
573 template<class GatherScatter, class Data>
574 void forward(Data& data);
575
601 template<class GatherScatter, class Data>
602 void backward(Data& data);
603
607 void free();
608
613
614 private:
615
621
622
626 template<class Data, typename IndexedTypeFlag>
627 struct MessageSizeCalculator
628 {};
629
634 template<class Data>
635 struct MessageSizeCalculator<Data,SizeOne>
636 {
643 inline int operator()(const InterfaceInformation& info) const;
652 inline int operator()(const Data& data, const InterfaceInformation& info) const;
653 };
654
659 template<class Data>
660 struct MessageSizeCalculator<Data,VariableSize>
661 {
670 inline int operator()(const Data& data, const InterfaceInformation& info) const;
671 };
672
676 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
677 struct MessageGatherer
678 {};
679
684 template<class Data, class GatherScatter, bool send>
685 struct MessageGatherer<Data,GatherScatter,send,SizeOne>
686 {
688 typedef typename CommPolicy<Data>::IndexedType Type;
689
694 typedef GatherScatter Gatherer;
695
696 enum {
702 forward=send
703 };
704
712 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
713 };
714
719 template<class Data, class GatherScatter, bool send>
720 struct MessageGatherer<Data,GatherScatter,send,VariableSize>
721 {
723 typedef typename CommPolicy<Data>::IndexedType Type;
724
729 typedef GatherScatter Gatherer;
730
731 enum {
737 forward=send
738 };
739
747 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
748 };
749
753 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
754 struct MessageScatterer
755 {};
756
761 template<class Data, class GatherScatter, bool send>
762 struct MessageScatterer<Data,GatherScatter,send,SizeOne>
763 {
765 typedef typename CommPolicy<Data>::IndexedType Type;
766
771 typedef GatherScatter Scatterer;
772
773 enum {
779 forward=send
780 };
781
789 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
790 };
795 template<class Data, class GatherScatter, bool send>
796 struct MessageScatterer<Data,GatherScatter,send,VariableSize>
797 {
799 typedef typename CommPolicy<Data>::IndexedType Type;
800
805 typedef GatherScatter Scatterer;
806
807 enum {
813 forward=send
814 };
815
823 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
824 };
825
829 struct MessageInformation
830 {
832 MessageInformation()
833 : start_(0), size_(0)
834 {}
835
843 MessageInformation(size_t start, size_t size)
844 : start_(start), size_(size)
845 {}
849 size_t start_;
853 size_t size_;
854 };
855
863 InformationMap;
867 InformationMap messageInformation_;
871 char* buffers_[2];
875 size_t bufferSize_[2];
876
877 enum {
881 commTag_
882 };
883
888
889 MPI_Comm communicator_;
890
894 template<class GatherScatter, bool FORWARD, class Data>
895 void sendRecv(const Data& source, Data& target);
896
897 };
898
899#ifndef DOXYGEN
900
901 template<class V>
902 inline const void* CommPolicy<V>::getAddress(const V& v, int index)
903 {
904 return &(v[index]);
905 }
906
907 template<class V>
908 inline int CommPolicy<V>::getSize([[maybe_unused]] const V& v, [[maybe_unused]] int index)
909 {
910 return 1;
911 }
912
913 template<class K, class A, int n>
914 inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(const Type& v, int index)
915 {
916 return &(v[index][0]);
917 }
918
919 template<class K, class A, int n>
920 inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(const Type& v, int index)
921 {
922 return v[index].getsize();
923 }
924
925 template<class T>
926 inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(const T & vec, std::size_t i)
927 {
928 return vec[i];
929 }
930
931 template<class T>
932 inline void CopyGatherScatter<T>::scatter(T& vec, const IndexedType& v, std::size_t i)
933 {
934 vec[i]=v;
935 }
936
937 template<typename T>
938 DatatypeCommunicator<T>::DatatypeCommunicator()
939 : remoteIndices_(0), created_(false)
940 {
941 requests_[0]=0;
942 requests_[1]=0;
943 }
944
945
946
947 template<typename T>
948 DatatypeCommunicator<T>::~DatatypeCommunicator()
949 {
950 free();
951 }
952
953 template<typename T>
954 template<class T1, class T2, class V>
955 inline void DatatypeCommunicator<T>::build(const RemoteIndices& remoteIndices,
956 const T1& source, V& sendData,
957 const T2& destination, V& receiveData)
958 {
959 remoteIndices_ = &remoteIndices;
960 free();
961 createDataTypes<T1,T2,V,false>(source,destination, receiveData);
962 createDataTypes<T1,T2,V,true>(source,destination, sendData);
963 createRequests<V,true>(sendData, receiveData);
964 createRequests<V,false>(receiveData, sendData);
965 created_=true;
966 }
967
968 template<typename T>
969 void DatatypeCommunicator<T>::free()
970 {
971 if(created_) {
972 delete[] requests_[0];
973 delete[] requests_[1];
974 typedef MessageTypeMap::iterator iterator;
975 typedef MessageTypeMap::const_iterator const_iterator;
976
977 const const_iterator end=messageTypes.end();
978
979 for(iterator process = messageTypes.begin(); process != end; ++process) {
980 MPI_Datatype *type = &(process->second.first);
981 int finalized=0;
982 MPI_Finalized(&finalized);
983 if(*type!=MPI_DATATYPE_NULL && !finalized)
984 MPI_Type_free(type);
985 type = &(process->second.second);
986 if(*type!=MPI_DATATYPE_NULL && !finalized)
987 MPI_Type_free(type);
988 }
989 messageTypes.clear();
990 created_=false;
991 }
992
993 }
994
995 template<typename T>
996 template<class T1, class T2, class V, bool send>
997 void DatatypeCommunicator<T>::createDataTypes(const T1& sourceFlags, const T2& destFlags, V& data)
998 {
999
1000 MPIDatatypeInformation<V> dataInfo(data);
1001 this->template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
1002
1003 typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
1004 const const_iterator end=this->remoteIndices_->end();
1005
1006 // Allocate MPI_Datatypes and deallocate memory for the type construction.
1007 for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process) {
1008 IndexedTypeInformation& info=dataInfo.information_[process->first];
1009 // Shift the displacement
1010 MPI_Aint base;
1011 MPI_Get_address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
1012
1013 for(int i=0; i< info.elements; i++) {
1014 info.displ[i]-=base;
1015 }
1016
1017 // Create data type
1018 MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1019 MPI_Type_create_hindexed(info.elements, info.length, info.displ,
1020 MPITraits<typename CommPolicy<V>::IndexedType>::getType(), type);
1021 MPI_Type_commit(type);
1022 // Deallocate memory
1023 info.free();
1024 }
1025 }
1026
1027 template<typename T>
1028 template<class V, bool createForward>
1029 void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1030 {
1031 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
1032 int rank;
1033 static int index = createForward ? 1 : 0;
1034 int noMessages = messageTypes.size();
1035 // allocate request handles
1036 requests_[index] = new MPI_Request[2*noMessages];
1037 const MapIterator end = messageTypes.end();
1038 int request=0;
1039 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1040
1041 // Set up the requests for receiving first
1042 for(MapIterator process = messageTypes.begin(); process != end;
1043 ++process, ++request) {
1044 MPI_Datatype type = createForward ? process->second.second : process->second.first;
1045 void* address = const_cast<void*>(CommPolicy<V>::getAddress(receiveData,0));
1046 MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1047 }
1048
1049 // And now the send requests
1050
1051 for(MapIterator process = messageTypes.begin(); process != end;
1052 ++process, ++request) {
1053 MPI_Datatype type = createForward ? process->second.first : process->second.second;
1054 void* address = const_cast<void*>(CommPolicy<V>::getAddress(sendData, 0));
1055 MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1056 }
1057 }
1058
1059 template<typename T>
1060 void DatatypeCommunicator<T>::forward()
1061 {
1062 sendRecv(requests_[1]);
1063 }
1064
1065 template<typename T>
1066 void DatatypeCommunicator<T>::backward()
1067 {
1068 sendRecv(requests_[0]);
1069 }
1070
1071 template<typename T>
1072 void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1073 {
1074 int noMessages = messageTypes.size();
1075 // Start the receive calls first
1076 MPI_Startall(noMessages, requests);
1077 // Now the send calls
1078 MPI_Startall(noMessages, requests+noMessages);
1079
1080 // Wait for completion of the communication send first then receive
1081 MPI_Status* status=new MPI_Status[2*noMessages];
1082 for(int i=0; i<2*noMessages; i++)
1083 status[i].MPI_ERROR=MPI_SUCCESS;
1084
1085 int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1086 int receive = MPI_Waitall(noMessages, requests, status);
1087
1088 // Error checks
1089 int success=1, globalSuccess=0;
1090 if(send==MPI_ERR_IN_STATUS) {
1091 int rank;
1092 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1093 std::cerr<<rank<<": Error in sending :"<<std::endl;
1094 // Search for the error
1095 for(int i=noMessages; i< 2*noMessages; i++)
1096 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1097 char message[300];
1098 int messageLength;
1099 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1100 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1101 for(int j = 0; j < messageLength; j++)
1102 std::cout << message[j];
1103 }
1105 success=0;
1106 }
1107
1108 if(receive==MPI_ERR_IN_STATUS) {
1109 int rank;
1110 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1111 std::cerr<<rank<<": Error in receiving!"<<std::endl;
1112 // Search for the error
1113 for(int i=0; i< noMessages; i++)
1114 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1115 char message[300];
1116 int messageLength;
1117 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1118 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1119 for(int j = 0; j < messageLength; j++)
1120 std::cerr << message[j];
1121 }
1123 success=0;
1124 }
1125
1126 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1127
1128 delete[] status;
1129
1130 if(!globalSuccess)
1131 DUNE_THROW(CommunicationError, "A communication error occurred!");
1132
1133 }
1134
1136 {
1137 buffers_[0]=0;
1138 buffers_[1]=0;
1139 bufferSize_[0]=0;
1140 bufferSize_[1]=0;
1141 }
1142
1143 template<class Data, class Interface>
1145 BufferedCommunicator::build(const Interface& interface)
1146 {
1147 interfaces_=interface.interfaces();
1148 communicator_=interface.communicator();
1150 ::const_iterator const_iterator;
1151 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1152 const const_iterator end = interfaces_.end();
1153 int lrank;
1154 MPI_Comm_rank(communicator_, &lrank);
1155
1156 bufferSize_[0]=0;
1157 bufferSize_[1]=0;
1158
1159 for(const_iterator interfacePair = interfaces_.begin();
1160 interfacePair != end; ++interfacePair) {
1161 int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
1162 int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
1163 if (noSend + noRecv > 0)
1164 messageInformation_.insert(std::make_pair(interfacePair->first,
1165 std::make_pair(MessageInformation(bufferSize_[0],
1166 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1167 MessageInformation(bufferSize_[1],
1168 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1169 bufferSize_[0] += noSend;
1170 bufferSize_[1] += noRecv;
1171 }
1172
1173 // allocate the buffers
1174 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1175 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1176
1177 buffers_[0] = new char[bufferSize_[0]];
1178 buffers_[1] = new char[bufferSize_[1]];
1179 }
1180
1181 template<class Data, class Interface>
1182 void BufferedCommunicator::build(const Data& source, const Data& dest, const Interface& interface)
1183 {
1184
1185 interfaces_=interface.interfaces();
1186 communicator_=interface.communicator();
1188 ::const_iterator const_iterator;
1189 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1190 const const_iterator end = interfaces_.end();
1191
1192 bufferSize_[0]=0;
1193 bufferSize_[1]=0;
1194
1195 for(const_iterator interfacePair = interfaces_.begin();
1196 interfacePair != end; ++interfacePair) {
1197 int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
1198 int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
1199 if (noSend + noRecv > 0)
1200 messageInformation_.insert(std::make_pair(interfacePair->first,
1201 std::make_pair(MessageInformation(bufferSize_[0],
1202 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1203 MessageInformation(bufferSize_[1],
1204 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1205 bufferSize_[0] += noSend;
1206 bufferSize_[1] += noRecv;
1207 }
1208
1209 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1210 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1211 // allocate the buffers
1212 buffers_[0] = new char[bufferSize_[0]];
1213 buffers_[1] = new char[bufferSize_[1]];
1214 }
1215
1216 inline void BufferedCommunicator::free()
1217 {
1218 messageInformation_.clear();
1219 if(buffers_[0])
1220 delete[] buffers_[0];
1221
1222 if(buffers_[1])
1223 delete[] buffers_[1];
1224 buffers_[0]=buffers_[1]=0;
1225 }
1226
1228 {
1229 free();
1230 }
1231
1232 template<class Data>
1233 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1234 (const InterfaceInformation& info) const
1235 {
1236 return info.size();
1237 }
1238
1239
1240 template<class Data>
1241 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1242 (const Data&, const InterfaceInformation& info) const
1243 {
1244 return operator()(info);
1245 }
1246
1247
1248 template<class Data>
1249 inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1250 (const Data& data, const InterfaceInformation& info) const
1251 {
1252 int entries=0;
1253
1254 for(size_t i=0; i < info.size(); i++)
1255 entries += CommPolicy<Data>::getSize(data,info[i]);
1256
1257 return entries;
1258 }
1259
1260
1261 template<class Data, class GatherScatter, bool FORWARD>
1262 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces,const Data& data, Type* buffer, [[maybe_unused]] size_t bufferSize) const
1263 {
1264 typedef typename InterfaceMap::const_iterator
1265 const_iterator;
1266
1267 int rank;
1268 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1269 const const_iterator end = interfaces.end();
1270 size_t index=0;
1271
1272 for(const_iterator interfacePair = interfaces.begin();
1273 interfacePair != end; ++interfacePair) {
1274 int size = forward ? interfacePair->second.first.size() :
1275 interfacePair->second.second.size();
1276
1277 for(int i=0; i < size; i++) {
1278 int local = forward ? interfacePair->second.first[i] :
1279 interfacePair->second.second[i];
1280 for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++) {
1281
1282#ifdef DUNE_ISTL_WITH_CHECKING
1283 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1284#endif
1285 buffer[index]=GatherScatter::gather(data, local, j);
1286 }
1287
1288 }
1289 }
1290
1291 }
1292
1293
1294 template<class Data, class GatherScatter, bool FORWARD>
1295 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(
1296 const InterfaceMap& interfaces, const Data& data, Type* buffer, [[maybe_unused]] size_t bufferSize) const
1297 {
1298 typedef typename InterfaceMap::const_iterator
1299 const_iterator;
1300 const const_iterator end = interfaces.end();
1301 size_t index = 0;
1302
1303 int rank;
1304 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1305
1306 for(const_iterator interfacePair = interfaces.begin();
1307 interfacePair != end; ++interfacePair) {
1308 size_t size = FORWARD ? interfacePair->second.first.size() :
1309 interfacePair->second.second.size();
1310
1311 for(size_t i=0; i < size; i++) {
1312
1313#ifdef DUNE_ISTL_WITH_CHECKING
1314 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1315#endif
1316
1317 buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1318 interfacePair->second.second[i]);
1319 }
1320 }
1321
1322 }
1323
1324
1325 template<class Data, class GatherScatter, bool FORWARD>
1326 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1327 {
1328 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1329 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1330
1331 assert(infoPair!=interfaces.end());
1332
1333 const Information& info = FORWARD ? infoPair->second.second :
1334 infoPair->second.first;
1335
1336 for(size_t i=0, index=0; i < info.size(); i++) {
1337 for(size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
1338 GatherScatter::scatter(data, buffer[index++], info[i], j);
1339 }
1340 }
1341
1342
1343 template<class Data, class GatherScatter, bool FORWARD>
1344 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1345 {
1346 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1347 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1348
1349 assert(infoPair!=interfaces.end());
1350
1351 const Information& info = FORWARD ? infoPair->second.second :
1352 infoPair->second.first;
1353
1354 for(size_t i=0; i < info.size(); i++) {
1355 GatherScatter::scatter(data, buffer[i], info[i]);
1356 }
1357 }
1358
1359
1360 template<class GatherScatter,class Data>
1361 void BufferedCommunicator::forward(Data& data)
1362 {
1363 this->template sendRecv<GatherScatter,true>(data, data);
1364 }
1365
1366
1367 template<class GatherScatter, class Data>
1368 void BufferedCommunicator::backward(Data& data)
1369 {
1370 this->template sendRecv<GatherScatter,false>(data, data);
1371 }
1372
1373
1374 template<class GatherScatter, class Data>
1375 void BufferedCommunicator::forward(const Data& source, Data& dest)
1376 {
1377 this->template sendRecv<GatherScatter,true>(source, dest);
1378 }
1379
1380
1381 template<class GatherScatter, class Data>
1382 void BufferedCommunicator::backward(Data& source, const Data& dest)
1383 {
1384 this->template sendRecv<GatherScatter,false>(dest, source);
1385 }
1386
1387
1388 template<class GatherScatter, bool FORWARD, class Data>
1389 void BufferedCommunicator::sendRecv(const Data& source, Data& dest)
1390 {
1391 int rank, lrank;
1392
1393 MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1394 MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1395
1396 typedef typename CommPolicy<Data>::IndexedType Type;
1397 Type *sendBuffer, *recvBuffer;
1398 size_t sendBufferSize;
1399#ifndef NDEBUG
1400 size_t recvBufferSize;
1401#endif
1402
1403 if(FORWARD) {
1404 sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
1405 sendBufferSize = bufferSize_[0];
1406 recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
1407#ifndef NDEBUG
1408 recvBufferSize = bufferSize_[1];
1409#endif
1410 }else{
1411 sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
1412 sendBufferSize = bufferSize_[1];
1413 recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
1414#ifndef NDEBUG
1415 recvBufferSize = bufferSize_[0];
1416#endif
1417 }
1418 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1419
1420 MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, source, sendBuffer, sendBufferSize);
1421
1422 MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
1423 MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
1424 /* Number of recvRequests that are not MPI_REQUEST_NULL */
1425 size_t numberOfRealRecvRequests = 0;
1426
1427 // Setup receive first
1428 typedef typename InformationMap::const_iterator const_iterator;
1429
1430 const const_iterator end = messageInformation_.end();
1431 size_t i=0;
1432 int* processMap = new int[messageInformation_.size()];
1433
1434 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
1435 processMap[i]=info->first;
1436 if(FORWARD) {
1437 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1438 Dune::dvverb<<rank<<": receiving "<<info->second.second.size_<<" from "<<info->first<<std::endl;
1439 if(info->second.second.size_) {
1440 MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1441 MPI_BYTE, info->first, commTag_, communicator_,
1442 recvRequests+i);
1443 numberOfRealRecvRequests += 1;
1444 } else {
1445 // Nothing to receive -> set request to inactive
1446 recvRequests[i]=MPI_REQUEST_NULL;
1447 }
1448 }else{
1449 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
1450 Dune::dvverb<<rank<<": receiving "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1451 if(info->second.first.size_) {
1452 MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1453 MPI_BYTE, info->first, commTag_, communicator_,
1454 recvRequests+i);
1455 numberOfRealRecvRequests += 1;
1456 } else {
1457 // Nothing to receive -> set request to inactive
1458 recvRequests[i]=MPI_REQUEST_NULL;
1459 }
1460 }
1461 }
1462
1463 // now the send requests
1464 i=0;
1465 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
1466 if(FORWARD) {
1467 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1468 Dune::dvverb<<rank<<": sending "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1469 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
1470 if(info->second.first.size_)
1471 MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1472 MPI_BYTE, info->first, commTag_, communicator_,
1473 sendRequests+i);
1474 else
1475 // Nothing to send -> set request to inactive
1476 sendRequests[i]=MPI_REQUEST_NULL;
1477 }else{
1478 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
1479 Dune::dvverb<<rank<<": sending "<<info->second.second.size_<<" to "<<info->first<<std::endl;
1480 if(info->second.second.size_)
1481 MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1482 MPI_BYTE, info->first, commTag_, communicator_,
1483 sendRequests+i);
1484 else
1485 // Nothing to send -> set request to inactive
1486 sendRequests[i]=MPI_REQUEST_NULL;
1487 }
1488
1489 // Wait for completion of receive and immediately start scatter
1490 i=0;
1491 //int success = 1;
1492 int finished = MPI_UNDEFINED;
1493 MPI_Status status; //[messageInformation_.size()];
1494 //MPI_Waitall(messageInformation_.size(), recvRequests, status);
1495
1496 for(i=0; i< numberOfRealRecvRequests; i++) {
1497 status.MPI_ERROR=MPI_SUCCESS;
1498 MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
1499 assert(finished != MPI_UNDEFINED);
1500
1501 if(status.MPI_ERROR==MPI_SUCCESS) {
1502 int& proc = processMap[finished];
1503 typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
1504 assert(infoIter != messageInformation_.end());
1505
1506 MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
1507 assert(info.start_+info.size_ <= recvBufferSize);
1508
1509 MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, dest, recvBuffer+info.start_, proc);
1510 }else{
1511 std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
1512 //success=0;
1513 }
1514 }
1515
1516 MPI_Status recvStatus;
1517
1518 // Wait for completion of sends
1519 for(i=0; i< messageInformation_.size(); i++)
1520 if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
1521 std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
1522 //success=0;
1523 }
1524 /*
1525 int globalSuccess;
1526 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, interface_->communicator());
1527
1528 if(!globalSuccess)
1529 DUNE_THROW(CommunicationError, "A communication error occurred!");
1530 */
1531 delete[] processMap;
1532 delete[] sendRequests;
1533 delete[] recvRequests;
1534
1535 }
1536
1537#endif // DOXYGEN
1538
1540}
1541
1542#endif // HAVE_MPI
1543
1544#endif
Classes describing a distributed indexset.
Standard Dune debug streams.
A few common exception classes.
#define DUNE_THROW(E, m)
Definition exceptions.hh:216
DVVerbType dvverb
stream for very verbose output.
Dune namespace.
Definition alignedallocator.hh:11
Default exception class for I/O errors.
Definition exceptions.hh:229
Flag for marking indexed data structures where data at each index is of the same size.
Definition communicator.hh:108
Flag for marking indexed data structures where the data at each index may be a variable multiple of a...
Definition communicator.hh:116
Default policy used for communicating an indexed type.
Definition communicator.hh:126
V::value_type IndexedType
The type we get at each index with operator[].
Definition communicator.hh:145
static int getSize(const V &, int index)
Get the number of primitve elements at that index.
SizeOne IndexedTypeFlag
Whether the indexed type has variable size or there is always one value at each index.
Definition communicator.hh:151
static const void * getAddress(const V &v, int index)
Get the address of entry at an index.
V Type
The type the policy is for.
Definition communicator.hh:138
Definition communicator.hh:171
Definition communicator.hh:173
VariableBlockVector< FieldVector< K, n >, A > Type
Definition communicator.hh:178
Error thrown if there was a problem with the communication.
Definition communicator.hh:193
GatherScatter default implementation that just copies data.
Definition communicator.hh:200
static void scatter(T &vec, const IndexedType &v, std::size_t i)
CommPolicy< T >::IndexedType IndexedType
Definition communicator.hh:201
static const IndexedType & gather(const T &vec, std::size_t i)
A communicator that uses buffers to gather and scatter the data to be send or received.
Definition communicator.hh:458
void backward(Data &data)
Backward send where target and source are the same.
BufferedCommunicator()
Constructor.
~BufferedCommunicator()
Destructor.
void forward(const Data &source, Data &dest)
Send from source to target.
void free()
Free the allocated memory (i.e. buffers and message information.
std::enable_if< std::is_same< SizeOne, typenameCommPolicy< Data >::IndexedTypeFlag >::value, void >::type build(const Interface &interface)
Build the buffers and information for the communication process.
void backward(Data &source, const Data &dest)
Communicate in the reverse direction, i.e. send from target to source.
void build(const Data &source, const Data &target, const Interface &interface)
Build the buffers and information for the communication process.
void forward(Data &data)
Forward send where target and source are the same.
Manager class for the mapping between local indices and globally unique indices.
Definition indexset.hh:216
Base class of all classes representing a communication interface.
Definition parallel/interface.hh:33
Information describing an interface.
Definition parallel/interface.hh:99
Communication interface between remote and local indices.
Definition parallel/interface.hh:207
An index present on the local process.
Definition localindex.hh:33
The indices present on remote processes.
Definition remoteindices.hh:187
ParallelIndexSet::GlobalIndex GlobalIndex
The type of the global index.
Definition remoteindices.hh:213
LocalIndex::Attribute Attribute
The type of the attribute.
Definition remoteindices.hh:224
ParallelIndexSet::LocalIndex LocalIndex
The type of the local index.
Definition remoteindices.hh:219
Provides classes for building the communication interface between remote indices.
T begin(T... args)
T clear(T... args)
T data(T... args)
T end(T... args)
T endl(T... args)
T find(T... args)
T forward(T... args)
T free(T... args)
T insert(T... args)
T make_pair(T... args)
T size(T... args)