3#ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
4#define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
42 template <
typename H>
auto require(H &&h) ->
decltype(h.fixedSize());
51constexpr bool callFixedSize(H &&handle) {
52 return handle.fixedSize();
57[[deprecated(
"Using handles with fixedsize() (lower case s) is deprecated and "
58 "will be removed after release 2.8. Implement fixedSize() "
59 "(camelCase) instead!")]]
60constexpr bool callFixedSize(H &&handle) {
61 return handle.fixedsize();
72template<
class T,
class Allocator=std::allocator<T> >
80 explicit MessageBuffer(
int size)
81 : buffer_(new T[
size]), size_(
size), position_(0)
87 explicit MessageBuffer(
const MessageBuffer& o)
88 : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
100 void write(
const T& data)
102 buffer_[position_++]=
data;
111 data=buffer_[position_++];
130 return position_==size_;
138 bool hasSpaceForItems(
int noItems)
140 return position_+noItems<=size_;
177class InterfaceTracker
185 InterfaceTracker(
int rank, InterfaceInformation info,
std::size_t fixedsize=0,
186 bool allocateSizes=
false)
187 :
fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_()
191 sizes_.resize(info.size());
198 void moveToNextIndex()
201 assert(index_<=interface_.size());
211 assert(index_<=interface_.size());
217 bool finished()
const
219 return index_==interface_.size();
222 void skipZeroIndices()
225 while(sizes_.size() && index_!=interface_.size() &&!
size())
235 return interface_[index_];
242 assert(sizes_.size());
243 return sizes_[index_];
258 return !interface_.size();
267 return interface_.size()-index_;
293 InterfaceInformation interface_;
337template<
class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
349#ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
357 : maxBufferSize_(32768), interface_(&inf)
359 MPI_Comm_dup(comm, &communicator_);
366 : maxBufferSize_(32768), interface_(&inf.interfaces())
378 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
381 MPI_Comm_dup(comm, &communicator_);
387 VariableSizeCommunicator(
const Interface& inf)
388 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
389 interface_(&inf.interfaces())
391 MPI_Comm_dup(inf.communicator(), &communicator_);
401 : maxBufferSize_(max_buffer_size), interface_(&inf)
403 MPI_Comm_dup(comm, &communicator_);
412 : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
419 MPI_Comm_free(&communicator_);
427 maxBufferSize_ = other.maxBufferSize_;
428 interface_ = other.interface_;
429 MPI_Comm_dup(other.communicator_, &communicator_);
440 maxBufferSize_ = other.maxBufferSize_;
441 interface_ = other.interface_;
442 MPI_Comm_free(&communicator_);
443 MPI_Comm_dup(other.communicator_, &communicator_);
467 template<
class DataHandle>
470 communicate<true>(handle);
492 template<
class DataHandle>
495 communicate<false>(handle);
499 template<
bool FORWARD,
class DataHandle>
500 void communicateSizes(DataHandle& handle,
509 template<
bool forward,
class DataHandle>
510 void communicate(DataHandle& handle);
520 template<
bool FORWARD,
class DataHandle>
521 void setupInterfaceTrackers(DataHandle& handle,
531 template<
bool FORWARD,
class DataHandle>
532 void communicateFixedSize(DataHandle& handle);
540 template<
bool FORWARD,
class DataHandle>
541 void communicateVariableSize(DataHandle& handle);
562 MPI_Comm communicator_;
571template<
class DataHandle>
577 SizeDataHandle(DataHandle& data,
579 : data_(data), trackers_(trackers), index_()
590 void gather(B& buf,
int i)
592 buf.write(data_.size(i));
600 return trackers_[index_].getSizesPointer();
610void setReceivingIndex(T&,
int)
614void setReceivingIndex(SizeDataHandle<T>& t,
int i)
616 t.setReceivingIndex(i);
625template<
bool FORWARD>
626struct InterfaceInformationChooser
631 static const InterfaceInformation&
640 static const InterfaceInformation&
648struct InterfaceInformationChooser<false>
650 static const InterfaceInformation&
656 static const InterfaceInformation&
668template<
class DataHandle>
672 int operator()(DataHandle& handle, InterfaceTracker& tracker,
673 MessageBuffer<typename DataHandle::DataType>& buffer,
674 [[maybe_unused]]
int i)
const
676 return operator()(handle,tracker,buffer);
686 int operator()(DataHandle& handle, InterfaceTracker& tracker,
687 MessageBuffer<typename DataHandle::DataType>& buffer)
const
689 if(tracker.fixedSize)
695 handle.gather(buffer, tracker.index());
696 tracker.moveToNextIndex();
698 return noIndices*tracker.fixedSize;
703 tracker.skipZeroIndices();
704 while(!tracker.finished())
705 if(buffer.hasSpaceForItems(handle.size(tracker.index())))
707 handle.gather(buffer, tracker.index());
708 packed+=handle.size(tracker.index());
709 tracker.moveToNextIndex();
723template<
class DataHandle>
733 bool operator()(DataHandle& handle, InterfaceTracker& tracker,
734 MessageBuffer<typename DataHandle::DataType>& buffer,
737 if(tracker.fixedSize)
743 handle.scatter(buffer, tracker.index(), tracker.fixedSize);
744 tracker.moveToNextIndex();
746 return tracker.finished();
751 for(
int unpacked=0;unpacked<
count;)
753 assert(!tracker.finished());
754 assert(buffer.hasSpaceForItems(tracker.size()));
755 handle.scatter(buffer, tracker.index(), tracker.size());
756 unpacked+=tracker.size();
757 tracker.moveToNextIndex();
759 return tracker.finished();
768template<
class DataHandle>
769struct UnpackSizeEntries{
778 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
779 MessageBuffer<
typename SizeDataHandle<DataHandle>::DataType>& buffer)
const
783 handle.getSizesPointer()+tracker.offset());
784 tracker.increment(noIndices);
787 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
788 MessageBuffer<
typename SizeDataHandle<DataHandle>::DataType>& buffer,
int)
const
790 return operator()(handle,tracker,buffer);
805 MPI_Comm communicator)
810 for(TIter iter=recv_trackers.
begin(), end=recv_trackers.
end(); iter!=
end;
814 iter->rank(), 933881, communicator, &(*mIter));
819 for(TIter iter=send_trackers.
begin(), end=send_trackers.
end();
824 iter->rank(), 933881, communicator, &(*mIter1));
833template<
class DataHandle>
834struct SetupSendRequest{
835 void operator()(DataHandle& handle,
836 InterfaceTracker& tracker,
837 MessageBuffer<typename DataHandle::DataType>& buffer,
838 MPI_Request& request,
842 int size=PackEntries<DataHandle>()(handle, tracker, buffer);
844 while(!tracker.finished() && !handle.size(tracker.index()))
845 tracker.moveToNextIndex();
848 tracker.rank(), 933399, comm, &request);
857template<
class DataHandle>
858struct SetupRecvRequest{
859 void operator()(DataHandle& ,
860 InterfaceTracker& tracker,
861 MessageBuffer<typename DataHandle::DataType>& buffer,
862 MPI_Request& request,
866 if(tracker.indicesLeft())
868 tracker.rank(), 933399, comm, &request);
875template<
class DataHandle>
876struct NullPackUnpackFunctor
878 int operator()(DataHandle&, InterfaceTracker&,
879 MessageBuffer<typename DataHandle::DataType>&,
int)
883 int operator()(DataHandle&, InterfaceTracker&,
884 MessageBuffer<typename DataHandle::DataType>&)
904template<
class DataHandle,
class BufferFunctor,
class CommunicationFunctor>
909 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
911 BufferFunctor buffer_func,
912 CommunicationFunctor comm_func,
921 MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
922 indices.resize(no_completed);
926 InterfaceTracker& tracker=trackers[*index];
927 setReceivingIndex(handle, *index);
932 MPI_Get_count(&(statuses[index-indices.begin()]),
936 buffer_func(handle, tracker, buffers[*index], count);
938 buffer_func(handle, tracker, buffers[*index]);
939 tracker.skipZeroIndices();
940 if(!tracker.finished()){
942 comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
943 tracker.skipZeroIndices();
961template<
class DataHandle>
962std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
966 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
969 return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
970 NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(),
false);
981template<
class DataHandle>
982std::size_t checkSendAndContinueSending(DataHandle& handle,
985 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
988 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
989 NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
1000template<
class DataHandle>
1001std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
1004 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
1007 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
1008 UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
1009 true, !Impl::callFixedSize(handle));
1017 if(*i!=MPI_REQUEST_NULL)
1032template<
class DataHandle,
class Functor>
1035 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
1037 const Functor& setupFunctor,
1038 MPI_Comm communicator)
1042 biter=buffers.
begin();
1045 for(TIter titer=trackers.
begin(), end=trackers.
end(); titer!=
end; ++titer, ++biter, ++riter)
1047 setupFunctor(handle, *titer, *biter, *riter, communicator);
1048 complete+=titer->finished();
1054template<
class Allocator>
1055template<
bool FORWARD,
class DataHandle>
1056void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
1060 if(interface_->
size()==0)
1066 if(Impl::callFixedSize(handle))
1070 typedef typename InterfaceMap::const_iterator IIter;
1071 for(IIter inf=interface_->begin(), end=interface_->end(); inf!=
end; ++inf)
1074 if(Impl::callFixedSize(handle) && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
1075 fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
1076 assert(!Impl::callFixedSize(handle)||fixedsize>0);
1077 send_trackers.
push_back(InterfaceTracker(inf->first,
1078 InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
1079 recv_trackers.
push_back(InterfaceTracker(inf->first,
1080 InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1084template<
class Allocator>
1085template<
bool FORWARD,
class DataHandle>
1086void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1093 setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1094 sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1098 typedef typename DataHandle::DataType DataType;
1100 recv_buffers(interface_->
size(), MessageBuffer<DataType>(maxBufferSize_));
1103 setupRequests(handle, send_trackers, send_buffers, data_send_req,
1104 SetupSendRequest<DataHandle>(), communicator_);
1106 std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1107 no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->
size();
1111 for(Iter i=recv_trackers.
begin(), end=recv_trackers.
end(); i!=
end; ++i)
1114 for(Iter i=send_trackers.
begin(),
end=send_trackers.
end(); i!=
end; ++i)
1118 while(no_size_to_recv+no_to_send+no_to_recv)
1122 no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1123 data_recv_req, recv_buffers,
1128 no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1129 send_buffers, communicator_);
1130 if(validRecvRequests(data_recv_req))
1132 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1133 recv_buffers, communicator_);
1138 MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1142template<
class Allocator>
1143template<
bool FORWARD,
class DataHandle>
1144void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1153 send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1154 recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1155 SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1156 setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1157 setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1158 SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1159 setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1160 SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1163 auto valid_req_func =
1164 [](
const MPI_Request& req) {
return req != MPI_REQUEST_NULL; };
1171 while(size_to_send+size_to_recv)
1175 checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1176 send_buffers, communicator_);
1182 checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1183 recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1184 SetupRecvRequest<SizeDataHandle<DataHandle> >());
1188template<
class Allocator>
1189template<
bool FORWARD,
class DataHandle>
1190void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1195 setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1199 typedef typename DataHandle::DataType DataType;
1201 send_buffers(interface_->
size(), MessageBuffer<DataType>(maxBufferSize_)),
1202 recv_buffers(interface_->
size(), MessageBuffer<DataType>(maxBufferSize_));
1204 communicateSizes<FORWARD>(handle, recv_trackers);
1206 setupRequests(handle, send_trackers, send_buffers, send_requests,
1207 SetupSendRequest<DataHandle>(), communicator_);
1208 setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1209 SetupRecvRequest<DataHandle>(), communicator_);
1212 auto valid_req_func =
1213 [](
const MPI_Request& req) {
return req != MPI_REQUEST_NULL;};
1219 while(no_to_send+no_to_recv)
1223 no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1224 send_buffers, communicator_);
1227 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1228 recv_buffers, communicator_);
1232template<
class Allocator>
1233template<
bool FORWARD,
class DataHandle>
1234void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1236 if( interface_->
size() == 0)
1241 if(Impl::callFixedSize(handle))
1242 communicateFixedSize<FORWARD>(handle);
1244 communicateVariableSize<FORWARD>(handle);
Traits classes for mapping types onto MPI_Datatype.
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition variablesizecommunicator.hh:272
Infrastructure for concepts.
MPI_Comm communicator() const
Get the MPI Communicator.
Definition parallel/interface.hh:415
Dune namespace.
Definition alignedallocator.hh:11
static MPI_Datatype getType()
Definition mpitraits.hh:46
size_t size() const
Get the number of entries in the interface.
Definition parallel/interface.hh:106
Communication interface between remote and local indices.
Definition parallel/interface.hh:207
Definition variablesizecommunicator.hh:41
auto require(H &&h) -> decltype(h.fixedSize())
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition variablesizecommunicator.hh:339
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition variablesizecommunicator.hh:411
void backward(DataHandle &handle)
Communicate backwards.
Definition variablesizecommunicator.hh:493
~VariableSizeCommunicator()
Definition variablesizecommunicator.hh:417
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition variablesizecommunicator.hh:400
VariableSizeCommunicator(const VariableSizeCommunicator &other)
Copy-constructs a communicator.
Definition variablesizecommunicator.hh:426
void forward(DataHandle &handle)
Communicate forward.
Definition variablesizecommunicator.hh:468
VariableSizeCommunicator & operator=(const VariableSizeCommunicator &other)
Copy-assignes a communicator.
Definition variablesizecommunicator.hh:436
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename std::allocator_traits< Allocator >::template rebind_alloc< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > > > InterfaceMap
The type of the map from process number to InterfaceInformation for sending and receiving to and from...
Definition variablesizecommunicator.hh:347
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition variablesizecommunicator.hh:356
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition variablesizecommunicator.hh:365
Provides classes for building the communication interface between remote indices.