3#ifndef DUNE_COMMUNICATOR
4#define DUNE_COMMUNICATOR
175 template<
class K,
class A,
int n>
253 DatatypeCommunicator();
258 ~DatatypeCommunicator();
286 template<
class T1,
class T2,
class V>
287 void build(
const RemoteIndices& remoteIndices,
const T1& sourceFlags, V& sendData,
const T2& destFlags, V& receiveData);
322 MessageTypeMap messageTypes;
329 MPI_Request* requests_[2];
339 template<
class V,
bool FORWARD>
340 void createRequests(V& sendData, V& receiveData);
345 template<
class T1,
class T2,
class V,
bool send>
346 void createDataTypes(
const T1& source,
const T2& destination, V& data);
351 void sendRecv(MPI_Request* req);
356 struct IndexedTypeInformation
366 displ =
new MPI_Aint[i];
400 struct MPIDatatypeInformation
406 MPIDatatypeInformation(
const V& data) : data_(
data)
414 void reserve(
int proc,
int size)
416 information_[proc].build(size);
424 void add(
int proc,
int local)
426 IndexedTypeInformation& info=information_[proc];
427 assert((info.elements)<info.size);
428 MPI_Get_address(
const_cast<void*
>(CommPolicy<V>::getAddress(data_, local)),
429 info.displ+info.elements);
430 info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
472 template<
class Data,
class Interface>
483 template<
class Data,
class Interface>
514 template<
class GatherScatter,
class Data>
545 template<
class GatherScatter,
class Data>
573 template<
class GatherScatter,
class Data>
601 template<
class GatherScatter,
class Data>
626 template<
class Data,
typename IndexedTypeFlag>
627 struct MessageSizeCalculator
635 struct MessageSizeCalculator<Data,
SizeOne>
660 struct MessageSizeCalculator<Data,VariableSize>
670 inline int operator()(
const Data& data,
const InterfaceInformation& info)
const;
676 template<
class Data,
class GatherScatter,
bool send,
typename IndexedTypeFlag>
677 struct MessageGatherer
684 template<
class Data,
class GatherScatter,
bool send>
685 struct MessageGatherer<Data,GatherScatter,send,SizeOne>
694 typedef GatherScatter Gatherer;
712 inline void operator()(
const InterfaceMap& interface,
const Data& data, Type* buffer,
size_t bufferSize)
const;
719 template<
class Data,
class GatherScatter,
bool send>
720 struct MessageGatherer<Data,GatherScatter,send,VariableSize>
729 typedef GatherScatter Gatherer;
747 inline void operator()(
const InterfaceMap& interface,
const Data& data, Type* buffer,
size_t bufferSize)
const;
753 template<
class Data,
class GatherScatter,
bool send,
typename IndexedTypeFlag>
754 struct MessageScatterer
761 template<
class Data,
class GatherScatter,
bool send>
762 struct MessageScatterer<Data,GatherScatter,send,SizeOne>
771 typedef GatherScatter Scatterer;
789 inline void operator()(
const InterfaceMap& interface, Data& data, Type* buffer,
const int& proc)
const;
795 template<
class Data,
class GatherScatter,
bool send>
796 struct MessageScatterer<Data,GatherScatter,send,VariableSize>
805 typedef GatherScatter Scatterer;
823 inline void operator()(
const InterfaceMap& interface, Data& data, Type* buffer,
const int& proc)
const;
829 struct MessageInformation
833 : start_(0), size_(0)
843 MessageInformation(
size_t start,
size_t size)
844 : start_(start), size_(
size)
867 InformationMap messageInformation_;
875 size_t bufferSize_[2];
889 MPI_Comm communicator_;
894 template<
class GatherScatter,
bool FORWARD,
class Data>
895 void sendRecv(
const Data& source, Data& target);
913 template<
class K,
class A,
int n>
914 inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(
const Type& v,
int index)
916 return &(v[index][0]);
919 template<
class K,
class A,
int n>
920 inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(
const Type& v,
int index)
922 return v[index].getsize();
926 inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(
const T & vec,
std::size_t i)
932 inline void CopyGatherScatter<T>::scatter(T& vec,
const IndexedType& v,
std::size_t i)
938 DatatypeCommunicator<T>::DatatypeCommunicator()
939 : remoteIndices_(0), created_(false)
948 DatatypeCommunicator<T>::~DatatypeCommunicator()
954 template<
class T1,
class T2,
class V>
955 inline void DatatypeCommunicator<T>::build(
const RemoteIndices& remoteIndices,
956 const T1& source, V& sendData,
957 const T2& destination, V& receiveData)
959 remoteIndices_ = &remoteIndices;
961 createDataTypes<T1,T2,V,false>(source,destination, receiveData);
962 createDataTypes<T1,T2,V,true>(source,destination, sendData);
963 createRequests<V,true>(sendData, receiveData);
964 createRequests<V,false>(receiveData, sendData);
969 void DatatypeCommunicator<T>::free()
972 delete[] requests_[0];
973 delete[] requests_[1];
974 typedef MessageTypeMap::iterator iterator;
975 typedef MessageTypeMap::const_iterator const_iterator;
977 const const_iterator
end=messageTypes.end();
979 for(iterator process = messageTypes.begin(); process !=
end; ++process) {
980 MPI_Datatype *type = &(process->second.first);
982 MPI_Finalized(&finalized);
983 if(*type!=MPI_DATATYPE_NULL && !finalized)
985 type = &(process->second.second);
986 if(*type!=MPI_DATATYPE_NULL && !finalized)
989 messageTypes.clear();
996 template<
class T1,
class T2,
class V,
bool send>
997 void DatatypeCommunicator<T>::createDataTypes(
const T1& sourceFlags,
const T2& destFlags, V& data)
1000 MPIDatatypeInformation<V> dataInfo(data);
1001 this->
template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
1003 typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
1004 const const_iterator
end=this->remoteIndices_->end();
1007 for(const_iterator process=this->remoteIndices_->begin(); process !=
end; ++process) {
1008 IndexedTypeInformation& info=dataInfo.information_[process->first];
1013 for(
int i=0; i< info.elements; i++) {
1014 info.displ[i]-=base;
1018 MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1019 MPI_Type_create_hindexed(info.elements, info.length, info.displ,
1021 MPI_Type_commit(type);
1027 template<
typename T>
1028 template<
class V,
bool createForward>
1029 void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1033 static int index = createForward ? 1 : 0;
1034 int noMessages = messageTypes.
size();
1036 requests_[index] =
new MPI_Request[2*noMessages];
1037 const MapIterator
end = messageTypes.end();
1039 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1042 for(MapIterator process = messageTypes.begin(); process !=
end;
1043 ++process, ++request) {
1044 MPI_Datatype type = createForward ? process->second.second : process->second.first;
1046 MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1051 for(MapIterator process = messageTypes.begin(); process !=
end;
1052 ++process, ++request) {
1053 MPI_Datatype type = createForward ? process->second.first : process->second.second;
1055 MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1059 template<
typename T>
1060 void DatatypeCommunicator<T>::forward()
1062 sendRecv(requests_[1]);
1065 template<
typename T>
1066 void DatatypeCommunicator<T>::backward()
1068 sendRecv(requests_[0]);
1071 template<
typename T>
1072 void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1074 int noMessages = messageTypes.size();
1076 MPI_Startall(noMessages, requests);
1078 MPI_Startall(noMessages, requests+noMessages);
1081 MPI_Status*
status=
new MPI_Status[2*noMessages];
1082 for(
int i=0; i<2*noMessages; i++)
1083 status[i].MPI_ERROR=MPI_SUCCESS;
1085 int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1086 int receive = MPI_Waitall(noMessages, requests, status);
1089 int success=1, globalSuccess=0;
1090 if(send==MPI_ERR_IN_STATUS) {
1092 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1095 for(
int i=noMessages; i< 2*noMessages; i++)
1096 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1099 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1101 for(
int j = 0; j < messageLength; j++)
1108 if(receive==MPI_ERR_IN_STATUS) {
1110 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1113 for(
int i=0; i< noMessages; i++)
1114 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1117 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1119 for(
int j = 0; j < messageLength; j++)
1126 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1131 DUNE_THROW(CommunicationError,
"A communication error occurred!");
1143 template<
class Data,
class Interface>
1147 interfaces_=interface.interfaces();
1148 communicator_=interface.communicator();
1150 ::const_iterator const_iterator;
1152 const const_iterator
end = interfaces_.
end();
1154 MPI_Comm_rank(communicator_, &lrank);
1159 for(const_iterator interfacePair = interfaces_.
begin();
1160 interfacePair !=
end; ++interfacePair) {
1161 int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
1162 int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
1163 if (noSend + noRecv > 0)
1167 MessageInformation(bufferSize_[1],
1169 bufferSize_[0] += noSend;
1170 bufferSize_[1] += noRecv;
1177 buffers_[0] =
new char[bufferSize_[0]];
1178 buffers_[1] =
new char[bufferSize_[1]];
1181 template<
class Data,
class Interface>
1185 interfaces_=interface.interfaces();
1186 communicator_=interface.communicator();
1188 ::const_iterator const_iterator;
1190 const const_iterator
end = interfaces_.
end();
1195 for(const_iterator interfacePair = interfaces_.
begin();
1196 interfacePair !=
end; ++interfacePair) {
1197 int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
1198 int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
1199 if (noSend + noRecv > 0)
1203 MessageInformation(bufferSize_[1],
1205 bufferSize_[0] += noSend;
1206 bufferSize_[1] += noRecv;
1212 buffers_[0] =
new char[bufferSize_[0]];
1213 buffers_[1] =
new char[bufferSize_[1]];
1218 messageInformation_.
clear();
1220 delete[] buffers_[0];
1223 delete[] buffers_[1];
1224 buffers_[0]=buffers_[1]=0;
1232 template<
class Data>
1233 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1234 (
const InterfaceInformation& info)
const
1240 template<
class Data>
1241 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1242 (
const Data&,
const InterfaceInformation& info)
const
1244 return operator()(info);
1248 template<
class Data>
1249 inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1250 (
const Data&
data,
const InterfaceInformation& info)
const
1254 for(
size_t i=0; i < info.size(); i++)
1261 template<
class Data,
class GatherScatter,
bool FORWARD>
1262 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(
const InterfaceMap& interfaces,
const Data& data, Type* buffer, [[maybe_unused]]
size_t bufferSize)
const
1264 typedef typename InterfaceMap::const_iterator
1268 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1269 const const_iterator
end = interfaces.end();
1272 for(const_iterator interfacePair = interfaces.begin();
1273 interfacePair !=
end; ++interfacePair) {
1274 int size =
forward ? interfacePair->second.first.size() :
1275 interfacePair->second.second.size();
1277 for(
int i=0; i <
size; i++) {
1278 int local =
forward ? interfacePair->second.first[i] :
1279 interfacePair->second.second[i];
1280 for(
std::size_t j=0; j < CommPolicy<Data>::getSize(
data, local); j++, index++) {
1282#ifdef DUNE_ISTL_WITH_CHECKING
1285 buffer[index]=GatherScatter::gather(data, local, j);
1294 template<
class Data,
class GatherScatter,
bool FORWARD>
1295 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(
1296 const InterfaceMap& interfaces,
const Data& data, Type* buffer, [[maybe_unused]]
size_t bufferSize)
const
1298 typedef typename InterfaceMap::const_iterator
1300 const const_iterator
end = interfaces.end();
1304 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1306 for(const_iterator interfacePair = interfaces.begin();
1307 interfacePair !=
end; ++interfacePair) {
1308 size_t size = FORWARD ? interfacePair->second.first.size() :
1309 interfacePair->second.second.size();
1311 for(
size_t i=0; i <
size; i++) {
1313#ifdef DUNE_ISTL_WITH_CHECKING
1317 buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1318 interfacePair->second.second[i]);
1325 template<
class Data,
class GatherScatter,
bool FORWARD>
1326 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(
const InterfaceMap& interfaces, Data& data, Type* buffer,
const int& proc)
const
1328 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1329 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1331 assert(infoPair!=interfaces.end());
1333 const Information& info = FORWARD ? infoPair->second.second :
1334 infoPair->second.first;
1336 for(
size_t i=0, index=0; i < info.size(); i++) {
1337 for(
size_t j=0; j < CommPolicy<Data>::getSize(
data, info[i]); j++)
1338 GatherScatter::scatter(data, buffer[index++], info[i], j);
1343 template<
class Data,
class GatherScatter,
bool FORWARD>
1344 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(
const InterfaceMap& interfaces, Data& data, Type* buffer,
const int& proc)
const
1346 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1347 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1349 assert(infoPair!=interfaces.end());
1351 const Information& info = FORWARD ? infoPair->second.second :
1352 infoPair->second.first;
1354 for(
size_t i=0; i < info.size(); i++) {
1355 GatherScatter::scatter(data, buffer[i], info[i]);
1360 template<
class GatherScatter,
class Data>
1363 this->
template sendRecv<GatherScatter,true>(data, data);
1367 template<
class GatherScatter,
class Data>
1370 this->
template sendRecv<GatherScatter,false>(data, data);
1374 template<
class GatherScatter,
class Data>
1377 this->
template sendRecv<GatherScatter,true>(source, dest);
1381 template<
class GatherScatter,
class Data>
1384 this->
template sendRecv<GatherScatter,false>(dest, source);
1388 template<
class GatherScatter,
bool FORWARD,
class Data>
1389 void BufferedCommunicator::sendRecv(
const Data& source, Data& dest)
1393 MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1394 MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1397 Type *sendBuffer, *recvBuffer;
1398 size_t sendBufferSize;
1400 size_t recvBufferSize;
1404 sendBuffer =
reinterpret_cast<Type*
>(buffers_[0]);
1405 sendBufferSize = bufferSize_[0];
1406 recvBuffer =
reinterpret_cast<Type*
>(buffers_[1]);
1408 recvBufferSize = bufferSize_[1];
1411 sendBuffer =
reinterpret_cast<Type*
>(buffers_[1]);
1412 sendBufferSize = bufferSize_[1];
1413 recvBuffer =
reinterpret_cast<Type*
>(buffers_[0]);
1415 recvBufferSize = bufferSize_[0];
1420 MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, source, sendBuffer, sendBufferSize);
1422 MPI_Request* sendRequests =
new MPI_Request[messageInformation_.
size()];
1423 MPI_Request* recvRequests =
new MPI_Request[messageInformation_.
size()];
1425 size_t numberOfRealRecvRequests = 0;
1428 typedef typename InformationMap::const_iterator const_iterator;
1430 const const_iterator
end = messageInformation_.
end();
1432 int* processMap =
new int[messageInformation_.
size()];
1434 for(const_iterator info = messageInformation_.
begin(); info !=
end; ++info, ++i) {
1435 processMap[i]=info->
first;
1439 if(info->second.second.size_) {
1440 MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1441 MPI_BYTE, info->first, commTag_, communicator_,
1443 numberOfRealRecvRequests += 1;
1446 recvRequests[i]=MPI_REQUEST_NULL;
1451 if(info->second.first.size_) {
1452 MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1453 MPI_BYTE, info->first, commTag_, communicator_,
1455 numberOfRealRecvRequests += 1;
1458 recvRequests[i]=MPI_REQUEST_NULL;
1465 for(const_iterator info = messageInformation_.
begin(); info !=
end; ++info, ++i)
1470 if(info->second.first.size_)
1471 MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1472 MPI_BYTE, info->first, commTag_, communicator_,
1476 sendRequests[i]=MPI_REQUEST_NULL;
1480 if(info->second.second.size_)
1481 MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1482 MPI_BYTE, info->first, commTag_, communicator_,
1486 sendRequests[i]=MPI_REQUEST_NULL;
1492 int finished = MPI_UNDEFINED;
1496 for(i=0; i< numberOfRealRecvRequests; i++) {
1497 status.MPI_ERROR=MPI_SUCCESS;
1498 MPI_Waitany(messageInformation_.
size(), recvRequests, &finished, &status);
1499 assert(finished != MPI_UNDEFINED);
1501 if(
status.MPI_ERROR==MPI_SUCCESS) {
1502 int& proc = processMap[finished];
1503 typename InformationMap::const_iterator infoIter = messageInformation_.
find(proc);
1504 assert(infoIter != messageInformation_.
end());
1506 MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
1507 assert(info.start_+info.size_ <= recvBufferSize);
1509 MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, dest, recvBuffer+info.start_, proc);
1511 std::cerr<<rank<<
": MPI_Error occurred while receiving message from "<<processMap[finished]<<
std::endl;
1516 MPI_Status recvStatus;
1519 for(i=0; i< messageInformation_.
size(); i++)
1520 if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
1521 std::cerr<<rank<<
": MPI_Error occurred while sending message to "<<processMap[finished]<<
std::endl;
1531 delete[] processMap;
1532 delete[] sendRequests;
1533 delete[] recvRequests;
Classes describing a distributed indexset.
Standard Dune debug streams.
A few common exception classes.
#define DUNE_THROW(E, m)
Definition exceptions.hh:216
DVVerbType dvverb
stream for very verbose output.
Dune namespace.
Definition alignedallocator.hh:11
Default exception class for I/O errors.
Definition exceptions.hh:229
Flag for marking indexed data structures where data at each index is of the same size.
Definition communicator.hh:108
Flag for marking indexed data structures where the data at each index may be a variable multiple of a...
Definition communicator.hh:116
Default policy used for communicating an indexed type.
Definition communicator.hh:126
V::value_type IndexedType
The type we get at each index with operator[].
Definition communicator.hh:145
static int getSize(const V &, int index)
Get the number of primitve elements at that index.
SizeOne IndexedTypeFlag
Whether the indexed type has variable size or there is always one value at each index.
Definition communicator.hh:151
static const void * getAddress(const V &v, int index)
Get the address of entry at an index.
V Type
The type the policy is for.
Definition communicator.hh:138
Definition communicator.hh:171
Definition communicator.hh:173
static int getSize(const Type &v, int i)
VariableSize IndexedTypeFlag
Definition communicator.hh:182
VariableBlockVector< FieldVector< K, n >, A > Type
Definition communicator.hh:178
static const void * getAddress(const Type &v, int i)
Type::B IndexedType
Definition communicator.hh:180
Error thrown if there was a problem with the communication.
Definition communicator.hh:193
GatherScatter default implementation that just copies data.
Definition communicator.hh:200
static void scatter(T &vec, const IndexedType &v, std::size_t i)
CommPolicy< T >::IndexedType IndexedType
Definition communicator.hh:201
static const IndexedType & gather(const T &vec, std::size_t i)
A communicator that uses buffers to gather and scatter the data to be send or received.
Definition communicator.hh:458
void backward(Data &data)
Backward send where target and source are the same.
BufferedCommunicator()
Constructor.
~BufferedCommunicator()
Destructor.
void forward(const Data &source, Data &dest)
Send from source to target.
void free()
Free the allocated memory (i.e. buffers and message information.
std::enable_if< std::is_same< SizeOne, typenameCommPolicy< Data >::IndexedTypeFlag >::value, void >::type build(const Interface &interface)
Build the buffers and information for the communication process.
void backward(Data &source, const Data &dest)
Communicate in the reverse direction, i.e. send from target to source.
void build(const Data &source, const Data &target, const Interface &interface)
Build the buffers and information for the communication process.
void forward(Data &data)
Forward send where target and source are the same.
Manager class for the mapping between local indices and globally unique indices.
Definition indexset.hh:216
Base class of all classes representing a communication interface.
Definition parallel/interface.hh:33
Information describing an interface.
Definition parallel/interface.hh:99
Communication interface between remote and local indices.
Definition parallel/interface.hh:207
An index present on the local process.
Definition localindex.hh:33
The indices present on remote processes.
Definition remoteindices.hh:187
ParallelIndexSet::GlobalIndex GlobalIndex
The type of the global index.
Definition remoteindices.hh:213
LocalIndex::Attribute Attribute
The type of the attribute.
Definition remoteindices.hh:224
ParallelIndexSet::LocalIndex LocalIndex
The type of the local index.
Definition remoteindices.hh:219
Provides classes for building the communication interface between remote indices.