6 #ifndef DMLC_THREAD_GROUP_H_
7 #define DMLC_THREAD_GROUP_H_
11 #include <dmlc/logging.h>
18 #include <unordered_set>
19 #include <unordered_map>
20 #if defined(DMLC_USE_CXX14) || __cplusplus > 201103L
21 #include <shared_mutex>
23 #include <condition_variable>
26 #include <sys/syscall.h>
43 std::unique_lock<std::mutex>
lock(mutex_);
45 condition_variable_.wait(
lock);
54 std::unique_lock<std::mutex> lk(mutex_);
55 condition_variable_.notify_all();
62 std::unique_lock<std::mutex> lk(mutex_);
70 std::condition_variable condition_variable_;
72 std::atomic<bool> signaled_;
75 #if defined(DMLC_USE_CXX14) || __cplusplus > 201103L
79 using WriteLock = std::unique_lock<SharedMutex>;
81 using ReadLock = std::shared_lock<SharedMutex>;
88 using ReadLock = std::unique_lock<SharedMutex>;
113 : name_(
std::move(threadName))
118 , shutdown_requested_(false)
119 , auto_remove_(false) {
120 CHECK_NOTNULL(owner);
133 if (thread_.load()) {
134 std::thread *thrd = thread_.load();
150 return name_.c_str();
167 template<
typename StartFunction,
typename ...Args>
168 static bool launch(std::shared_ptr<Thread> pThis,
170 StartFunction start_function,
179 return thread_.load() ? (thread_.load()->get_id() == std::this_thread::get_id()) :
false;
188 shutdown_requested_ =
true;
198 return shutdown_requested_.load();
217 auto_remove_ =
false;
225 if (thread_.load()) {
226 CHECK_EQ(auto_remove_,
false);
228 return thread_.load()->joinable();
238 internal_join(
false);
246 return thread_.load()->get_id();
254 void internal_join(
bool auto_remove_ok) {
258 if (thread_.load() && thread_.load()->get_id() != std::thread::id()) {
259 std::thread::id someId;
260 if (!auto_remove_ok) {
261 CHECK_EQ(auto_remove_,
false);
263 CHECK_NOTNULL(thread_.load());
264 if (thread_.load()->joinable()) {
265 thread_.load()->join();
267 LOG(WARNING) <<
"Thread " << name_ <<
" ( "
268 << thread_.load()->get_id() <<
" ) not joinable";
282 template <
typename StartFunction,
typename ...Args>
283 static int entry_and_exit_f(std::shared_ptr<Thread> pThis,
284 StartFunction start_function,
291 std::atomic<std::thread *> thread_;
293 std::shared_ptr<ManualEvent> ready_event_;
295 std::shared_ptr<ManualEvent> start_event_;
299 std::atomic<bool> shutdown_requested_;
304 std::atomic<bool> auto_remove_;
331 std::thread::id
id = std::this_thread::get_id();
333 for (
auto it = threads_.begin(), end = threads_.end(); it != end; ++it) {
334 std::shared_ptr<Thread> thrd = *it;
335 if (thrd->get_id() ==
id)
348 std::thread::id
id = thrd->get_id();
350 for (
auto it = threads_.begin(), end = threads_.end(); it != end; ++it) {
351 std::shared_ptr<Thread> thrd = *it;
352 if (thrd->get_id() ==
id)
369 auto iter = name_to_thread_.find(thrd->name());
370 if (iter == name_to_thread_.end()) {
371 name_to_thread_.emplace(std::make_pair(thrd->name(), thrd));
372 CHECK_EQ(threads_.insert(thrd).second,
true);
388 auto iter = threads_.find(thrd);
389 if (iter != threads_.end()) {
390 name_to_thread_.erase(thrd->name());
391 threads_.erase(iter);
392 if (threads_.empty()) {
409 std::unique_lock<std::mutex> lk(join_all_mtx_);
410 std::unordered_set<std::shared_ptr<Thread>> working_set;
413 for (
auto iter = threads_.begin(), e_iter = threads_.end(); iter != e_iter; ++iter) {
414 if (!(*iter)->is_auto_remove()) {
415 working_set.emplace(*iter);
421 while (!working_set.empty()) {
422 std::shared_ptr<Thread> thrd;
423 thrd = *working_set.begin();
424 if (thrd->joinable()) {
428 working_set.erase(working_set.begin());
434 CHECK_EQ(threads_.size(), 0);
442 std::unique_lock<std::mutex> lk(join_all_mtx_);
444 for (
auto &thread : threads_) {
445 if (make_all_joinable) {
446 thread->make_joinable();
448 thread->request_shutdown();
458 return threads_.size();
467 return threads_.size() == 0;
485 template<
typename StartFunction,
typename ThreadType = Thread,
typename ...Args>
486 inline bool create(
const std::string &threadName,
488 StartFunction start_function,
490 typename ThreadType::SharedPtr newThread(
new ThreadType(threadName,
this));
491 return Thread::launch(newThread, auto_remove, start_function, args...);
501 auto iter = name_to_thread_.find(name);
502 if (iter != name_to_thread_.end()) {
512 mutable std::mutex join_all_mtx_;
514 std::unordered_set<std::shared_ptr<Thread>> threads_;
516 std::shared_ptr<ManualEvent> evEmpty_;
518 std::unordered_map<std::string, std::shared_ptr<Thread>> name_to_thread_;
527 template<
typename ObjectType, ObjectType quit_item>
540 std::thread *thrd =
nullptr)
542 , shutdown_in_progress_(false) {
563 shutdown_in_progress_ =
true;
565 std::this_thread::sleep_for(std::chrono::milliseconds(1));
568 queue_->enqueue(quit_item);
576 if (!shutdown_in_progress_) {
577 queue_->enqueue(item);
597 template<
typename SecondaryFunction>
599 SecondaryFunction secondary_function) {
601 SecondaryFunction secondary_function) {
602 return pThis->run(secondary_function);
604 pThis, secondary_function);
613 template<
typename OnItemFunction>
614 inline int run(OnItemFunction on_item_function) {
618 queue_->wait_dequeue(item);
619 if (item == quit_item) {
622 rc = on_item_function(item);
632 std::shared_ptr<dmlc::moodycamel::BlockingConcurrentQueue<ObjectType>> queue_ =
633 std::make_shared<dmlc::moodycamel::BlockingConcurrentQueue<ObjectType>>();
635 std::atomic<bool> shutdown_in_progress_;
642 template<
typename Duration>
673 template<
typename SecondaryFunction>
675 SecondaryFunction secondary_function) {
677 SecondaryFunction secondary_function) {
678 return pThis->run(secondary_function);
680 pThis, secondary_function);
692 template<
typename Function>
693 static void start(std::shared_ptr<TimerThread> timer_thread,
696 timer_thread->duration_ = duration;
706 template<
typename OnTimerFunction>
707 inline int run(OnTimerFunction on_timer_function) {
710 std::this_thread::sleep_for(duration_);
712 rc = on_timer_function();
725 template <
typename StartFunction,
typename ...Args>
726 inline int ThreadGroup::Thread::entry_and_exit_f(std::shared_ptr<Thread> pThis,
727 StartFunction start_function,
732 pThis->ready_event_->signal();
734 pThis->start_event_->wait();
736 pThis->start_event_->reset();
738 if (!pThis->is_shutdown_requested()) {
739 rc = start_function(args...);
744 if (pThis->is_auto_remove()) {
745 pThis->owner_->remove_thread(pThis);
750 LOG(ERROR) <<
"Null pThis thread pointer";
756 template<
typename StartFunction,
typename ...Args>
759 StartFunction start_function,
762 CHECK_EQ(!pThis->thread_.load(),
true);
763 CHECK_NOTNULL(pThis->owner_);
765 pThis->auto_remove_ = autoRemove;
767 pThis->thread_ =
new std::thread(Thread::template entry_and_exit_f<
768 StartFunction, Args...>,
774 if (!pThis->owner_->add_thread(pThis)) {
775 pThis->request_shutdown();
776 LOG(ERROR) <<
"Duplicate thread name within the same thread group is not allowed";
779 pThis->ready_event_->wait();
781 pThis->start_event_->signal();
783 return pThis->thread_.load() !=
nullptr;
796 template<
typename Duration,
typename TimerFunction>
798 const Duration& duration,
800 TimerFunction timer_function) {
801 std::shared_ptr<dmlc::TimerThread<Duration>> timer_thread =
802 std::make_shared<dmlc::TimerThread<Duration>>(timer_name, owner);
804 return timer_thread !=
nullptr;
808 #endif // DMLC_THREAD_GROUP_H_