1#ifndef DUNE_FEM_MPIMANAGER_HH 
    2#define DUNE_FEM_MPIMANAGER_HH 
    4#if defined _OPENMP || defined(USE_PTHREADS) 
    5#ifndef USE_SMP_PARALLEL 
    6#define USE_SMP_PARALLEL 
   11#include <condition_variable> 
   15#include <shared_mutex> 
   22#include <dune/fem/misc/petsc/petsccommon.hh> 
   25#include <dune/fem/storage/singleton.hh> 
   48      void message(
const std::string &msg) { msg_ = msg; }
 
   49      const char* what() 
const noexcept override { 
return msg_.c_str(); }
 
   51      void message(
const std::string &msg) {}
 
   52      const char* what() 
const noexcept override 
   54        return "SingleThreadModeError: remove -DNDEBUG to obtain a more detailed message!";
 
   63      static inline unsigned int getEnvNumberThreads (
unsigned int defaultValue)
 
   65#ifdef USE_SMP_PARALLEL 
   66        unsigned int maxThreads = defaultValue;
 
   68        const char* mThreads = std::getenv(
"DUNE_NUM_THREADS");
 
   70          maxThreads = std::max( 
int(1), atoi( mThreads ) );
 
   73          const char* mThreads = std::getenv(
"OMP_NUM_THREADS");
 
   75            maxThreads = std::max( 
int(1), atoi( mThreads ) );
 
   78        unsigned int maxThreads = 1;
 
   86        static const bool useStdThreads = true ;
 
   87        static_assert( useStdThreads, 
"useStdThreads is disabled but OpenMP has not been found!");
 
   90        static const bool useStdThreads = false ;
 
   99        std::vector<std::thread> threads_;
 
  100        mutable std::unordered_map<std::thread::id,int> numbers_; 
 
  101        std::condition_variable_any waitA_;
 
  102        std::shared_mutex lockA_;
 
  103        std::condition_variable_any waitB_;
 
  104        std::shared_mutex lockB_;
 
  107        std::function<void(
void)> run_;
 
  113        static int& threadNumber_()
 
  115          static thread_local int number = -1;
 
  123          ThreadPool::threadNumber_() = t;
 
  125          std::shared_lock<std::shared_mutex> lkA(lockA_);
 
  126          std::shared_lock<std::shared_mutex> lkB(lockB_);
 
  133            while (!run_ && !finalized_)
 
  136            if (finalized_) 
break;
 
  137            ThreadPool::threadNumber_() = t;
 
  138            numbers_[std::this_thread::get_id()] = t;
 
  153        template<
typename F, 
typename... Args>
 
  154        void runOpenMP(F&& f, Args&&... args)
 
  157          const int nThreads = numThreads();
 
  164          std::atomic< bool > singleThreadModeError( 
false );
 
  166          initMultiThreadMode();
 
  167#pragma omp parallel num_threads(nThreads) 
  170            threadNumber_() = omp_get_thread_num();
 
  181              singleThreadModeError = true ;
 
  187          initSingleThreadMode();
 
  190          if( singleThreadModeError )
 
  192            DUNE_THROW(SingleThreadModeError, 
"ThreadPool::run: single thread mode violation occurred!");
 
  199        ThreadPool( 
const ThreadPool& ) = 
delete;
 
  205        : maxThreads_( 
std::
max(1u, detail::getEnvNumberThreads( 
std::thread::hardware_concurrency() )) )
 
  206        , numThreads_( detail::getEnvNumberThreads(1) )
 
  213          ThreadPool::threadNumber_() = 0;
 
  214#ifdef USE_SMP_PARALLEL 
  215          if constexpr( useStdThreads )
 
  217            numbers_[std::this_thread::get_id()] = 0;
 
  218            for (
int t=1;t<maxThreads_;++t)
 
  220              threads_.push_back( std::thread( [
this,t]() { wait(t); } ) );
 
  221              numbers_[threads_[t-1].get_id()] = t;
 
  228#ifdef USE_SMP_PARALLEL 
  229          if constexpr( useStdThreads )
 
  233              std::unique_lock<std::shared_mutex> lk(lockA_);
 
  238            std::for_each(threads_.begin(),threads_.end(), std::mem_fn(&std::thread::join));
 
  244        template<
typename F, 
typename... Args>
 
  245        void run(F&& f, Args&&... args)
 
  251          Dune::Fem::detail::SingletonStorage::getStorage();
 
  252          if (!singleThreadMode())
 
  253            DUNE_THROW(InvalidStateException, 
"ThreadPool: run is running run");
 
  254          if constexpr(! useStdThreads )
 
  256            runOpenMP(f, args...);
 
  259          if ( numThreads_==1 )
 
  264            numbers_[std::this_thread::get_id()] = 0;
 
  266            initMultiThreadMode();
 
  267            std::atomic<bool> caughtException(
false);
 
  271              std::lock_guard<std::shared_mutex> lkA(lockA_);
 
  274                    catch (
const SingleThreadModeError& e )
 
  275                    { caughtException = 
true; }
 
  281            ThreadPool::threadNumber_() = 0;
 
  286              std::lock_guard<std::shared_mutex> lkB(lockB_);
 
  293            initSingleThreadMode();
 
  294            if( caughtException )
 
  295              DUNE_THROW(SingleThreadModeError, 
"ThreadPool::run: single thread mode violation occurred!");
 
  299        int numThreads()
 const { 
return numThreads_; }
 
  300        int maxThreads()
 const { 
return maxThreads_; }
 
  306          int t = ThreadPool::threadNumber_();
 
  315          if constexpr(! useStdThreads )
 
  316            return omp_get_thread_num();
 
  319          return numbers_[std::this_thread::get_id()];
 
  326        [[deprecated(
"Use method thread instead!")]]
 
  327        int threadNumber()
 const { 
return thread(); }
 
  329        void initSingleThreadMode() { activeThreads_ = 1; }
 
  330        void initMultiThreadMode() { activeThreads_ = numThreads_; }
 
  331        bool singleThreadMode()
 const { 
return activeThreads_ == 1; }
 
  332        void setNumThreads( 
int use )
 
  334          if ( !singleThreadMode() )
 
  335            DUNE_THROW(SingleThreadModeError, 
"ThreadPool: number of threads can only be changed in single thread mode!");
 
  336          if ( use > maxThreads_ )
 
  338            std::cout << 
"Warning: requesting more threads than available." 
  339                      << 
" Maximum number of threads was restricted to " << maxThreads_
 
  340                      << 
" at startup. Setting to maximum number instead.\n";
 
  346        bool isMainThread()
 const { 
return thread() == 0; }
 
  357      typedef detail::ThreadPool ThreadPoolType;
 
  359      static MPIManager &instance ()
 
  364      static bool mpiFinalized ()
 
  366        bool finalized = false ;
 
  370          int wasFinalized = -1;
 
  371          MPI_Finalized( &wasFinalized );
 
  372          finalized = bool( wasFinalized );
 
  387        if( ! mpiFinalized() )
 
  390          if( petscWasInitializedHere_ )
 
  391            ::Dune::Petsc::finalize();
 
  395          if( wasInitializedHere_ )
 
  404      static void finalize()
 
  406        instance()._finalize();
 
  409      static void initialize ( 
int &argc, 
char **&argv );
 
  411      static const Communication &comm ()
 
  413        const std::unique_ptr< Communication > &comm = instance().comm_;
 
  415          DUNE_THROW( InvalidStateException, 
"MPIManager has not been initialized." );
 
  421        return comm().rank();
 
  426        return comm().size();
 
  430      static const ThreadPoolType& threadPool() { 
return instance().pool_; }
 
  433      static inline void initSingleThreadMode() { instance().pool_.initSingleThreadMode(); }
 
  436      static inline void initMultiThreadMode() { instance().pool_.initMultiThreadMode(); }
 
  439      static int maxThreads() { 
return instance().pool_.maxThreads(); }
 
  442      static int numThreads() { 
return instance().pool_.numThreads(); }
 
  445      static int thread() { 
return instance().pool_.thread(); }
 
  448      static bool isMainThread() { 
return instance().pool_.isMainThread(); }
 
  450      [[deprecated(
"use isMainThread() instead!")]]
 
  451      static bool isMaster() { 
return isMainThread(); }
 
  454      static void setNumThreads( 
int use ) { instance().pool_.setNumThreads(use); }
 
  457      static bool singleThreadMode() { 
return instance().pool_.singleThreadMode(); }
 
  460      template<
typename F, 
typename... Args>
 
  461      static void run(F&& f, Args&&... args) { instance().pool_.run(f,args...); }
 
  464      MPIHelper *helper_ = 
nullptr;
 
  465      std::unique_ptr< Communication > comm_;
 
  466      bool wasInitializedHere_ = false ;
 
  468      bool petscWasInitializedHere_ = false ;
 
  470      ThreadPoolType pool_;
 
  473    using ThreadManager = MPIManager;
 
  477    using ThreadPool = MPIManager;
 
  483#include <dune/fem/quadrature/caching/registry.hh> 
  489    class QuadratureStorageRegistry;
 
  491    inline void MPIManager::initialize ( 
int &argc, 
char **&argv )
 
  493      MPIHelper *&helper = instance().helper_;
 
  494      std::unique_ptr< Communication > &comm = instance().comm_;
 
  499      int wasInitialized = -1;
 
  500      MPI_Initialized( &wasInitialized );
 
  503#ifndef USE_SMP_PARALLEL 
  508          int is_initialized = MPI_Init(&argc, &argv);
 
  509          if( is_initialized != MPI_SUCCESS )
 
  510            DUNE_THROW(InvalidStateException,
"MPI_Init failed!");
 
  516          int is_initialized = MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided );
 
  518          if( is_initialized != MPI_SUCCESS )
 
  519            DUNE_THROW(InvalidStateException,
"MPI_Init_thread failed!");
 
  521#if not defined NDEBUG && defined DUNE_DEVEL_MODE 
  525          if( provided != MPI_THREAD_FUNNELED )
 
  527            if( provided == MPI_THREAD_SINGLE )
 
  528              dwarn << 
"MPI thread support = single (instead of funneled)!" << std::endl;
 
  530              dwarn << 
"WARNING: MPI thread support = " << provided << 
" != MPI_THREAD_FUNNELED " << MPI_THREAD_FUNNELED << std::endl;
 
  535        instance().wasInitializedHere_ = 
true;
 
  547      comm.reset( 
new Communication( helper->getCommunicator() ) );
 
  552      instance().petscWasInitializedHere_ =
 
  553        ::Dune::Petsc::initialize( rank() == 0, argc, argv );
 
  557      QuadratureStorageRegistry::initialize();
 
Collective communication interface and sequential default implementation.
Definition: communication.hh:100
 
Exception thrown when a code segment that is supposed to be only accessed in single thread mode is ac...
Definition: mpimanager.hh:43
 
static DUNE_EXPORT Object & instance(Args &&... args)
return singleton instance of given Object type.
Definition: singleton.hh:123
 
static DUNE_EXPORT MPIHelper & instance(int &argc, char **&argv)
Get the singleton instance of the helper.
Definition: mpihelper.hh:228
 
#define DUNE_THROW(E,...)
Definition: exceptions.hh:314
 
constexpr auto max
Function object that returns the greater of the given values.
Definition: hybridutilities.hh:485
 
DWarnType dwarn(std::cerr)
Stream for warnings indicating problems.
Definition: stdstreams.hh:162
 
Implements an utility class that provides MPI's collective communication methods.
 
Helpers for dealing with MPI.
 
Dune namespace.
Definition: alignedallocator.hh:13
 
constexpr std::integral_constant< std::size_t, sizeof...(II)> size(std::integer_sequence< T, II... >)
Return the size of the sequence.
Definition: integersequence.hh:75