6 #ifndef DMLC_THREAD_GROUP_H_ 7 #define DMLC_THREAD_GROUP_H_ 11 #include <dmlc/logging.h> 18 #include <unordered_set> 19 #include <unordered_map> 20 #if defined(DMLC_USE_CXX14) || __cplusplus > 201103L 21 #include <shared_mutex> 23 #include <condition_variable> 26 #include <sys/syscall.h> 43 std::unique_lock<std::mutex> lock(mutex_);
45 condition_variable_.wait(lock);
54 std::unique_lock<std::mutex> lk(mutex_);
55 condition_variable_.notify_all();
62 std::unique_lock<std::mutex> lk(mutex_);
70 std::condition_variable condition_variable_;
72 std::atomic<bool> signaled_;
75 #if defined(DMLC_USE_CXX14) || __cplusplus > 201103L 79 using WriteLock = std::unique_lock<SharedMutex>;
81 using ReadLock = std::shared_lock<SharedMutex>;
88 using ReadLock = std::unique_lock<SharedMutex>;
113 : name_(
std::move(threadName))
118 , shutdown_requested_(false)
119 , auto_remove_(false) {
120 CHECK_NOTNULL(owner);
127 const bool self_delete = is_current_thread();
133 if (thread_.load()) {
134 std::thread *thrd = thread_.load();
150 return name_.c_str();
167 template<
typename StartFunction,
typename ...Args>
168 static bool launch(std::shared_ptr<Thread> pThis,
170 StartFunction start_function,
179 return thread_.load() ? (thread_.load()->get_id() == std::this_thread::get_id()) :
false;
188 shutdown_requested_ =
true;
198 return shutdown_requested_.load();
217 auto_remove_ =
false;
225 if (thread_.load()) {
226 CHECK_EQ(auto_remove_,
false);
228 return thread_.load()->joinable();
238 internal_join(
false);
246 return thread_.load()->get_id();
254 void internal_join(
bool auto_remove_ok) {
258 if (thread_.load() && thread_.load()->get_id() != std::thread::id()) {
259 std::thread::id someId;
260 if (!auto_remove_ok) {
261 CHECK_EQ(auto_remove_,
false);
263 CHECK_NOTNULL(thread_.load());
264 if (thread_.load()->joinable()) {
265 thread_.load()->join();
267 LOG(WARNING) <<
"Thread " << name_ <<
" ( " 268 << thread_.load()->get_id() <<
" ) not joinable";
282 template <
typename StartFunction,
typename ...Args>
283 static int entry_and_exit_f(std::shared_ptr<Thread> pThis,
284 StartFunction start_function,
291 std::atomic<std::thread *> thread_;
293 std::shared_ptr<ManualEvent> ready_event_;
295 std::shared_ptr<ManualEvent> start_event_;
299 std::atomic<bool> shutdown_requested_;
304 std::atomic<bool> auto_remove_;
320 request_shutdown_all();
331 std::thread::id
id = std::this_thread::get_id();
333 for (
auto it = threads_.begin(), end = threads_.end(); it != end; ++it) {
334 std::shared_ptr<Thread> thrd = *it;
335 if (thrd->get_id() == id)
348 std::thread::id
id = thrd->get_id();
350 for (
auto it = threads_.begin(), end = threads_.end(); it != end; ++it) {
351 std::shared_ptr<Thread> thrd = *it;
352 if (thrd->get_id() == id)
369 auto iter = name_to_thread_.find(thrd->name());
370 if (iter == name_to_thread_.end()) {
371 name_to_thread_.emplace(std::make_pair(thrd->name(), thrd));
372 CHECK_EQ(threads_.insert(thrd).second,
true);
388 auto iter = threads_.find(thrd);
389 if (iter != threads_.end()) {
390 name_to_thread_.erase(thrd->name());
391 threads_.erase(iter);
392 if (threads_.empty()) {
407 CHECK_EQ(!is_this_thread_in(),
true);
409 std::unique_lock<std::mutex> lk(join_all_mtx_);
410 std::unordered_set<std::shared_ptr<Thread>> working_set;
413 for (
auto iter = threads_.begin(), e_iter = threads_.end(); iter != e_iter; ++iter) {
414 if (!(*iter)->is_auto_remove()) {
415 working_set.emplace(*iter);
421 while (!working_set.empty()) {
422 std::shared_ptr<Thread> thrd;
423 thrd = *working_set.begin();
424 if (thrd->joinable()) {
428 working_set.erase(working_set.begin());
434 CHECK_EQ(threads_.size(), 0);
442 std::unique_lock<std::mutex> lk(join_all_mtx_);
444 for (
auto &thread : threads_) {
445 if (make_all_joinable) {
446 thread->make_joinable();
448 thread->request_shutdown();
458 return threads_.size();
467 return threads_.size() == 0;
485 template<
typename StartFunction,
typename ThreadType =
Thread,
typename ...Args>
486 inline bool create(
const std::string &threadName,
488 StartFunction start_function,
490 typename ThreadType::SharedPtr newThread(
new ThreadType(threadName,
this));
491 return Thread::launch(newThread, auto_remove, start_function, args...);
501 auto iter = name_to_thread_.find(name);
502 if (iter != name_to_thread_.end()) {
512 mutable std::mutex join_all_mtx_;
514 std::unordered_set<std::shared_ptr<Thread>> threads_;
516 std::shared_ptr<ManualEvent> evEmpty_;
518 std::unordered_map<std::string, std::shared_ptr<Thread>> name_to_thread_;
527 template<
typename ObjectType, ObjectType quit_item>
540 std::thread *thrd =
nullptr)
542 , shutdown_in_progress_(false) {
563 shutdown_in_progress_ =
true;
565 std::this_thread::sleep_for(std::chrono::milliseconds(1));
568 queue_->enqueue(quit_item);
576 if (!shutdown_in_progress_) {
577 queue_->enqueue(item);
597 template<
typename SecondaryFunction>
599 SecondaryFunction secondary_function) {
601 SecondaryFunction secondary_function) {
602 return pThis->run(secondary_function);
604 pThis, secondary_function);
613 template<
typename OnItemFunction>
614 inline int run(OnItemFunction on_item_function) {
618 queue_->wait_dequeue(item);
619 if (item == quit_item) {
622 rc = on_item_function(item);
632 std::shared_ptr<dmlc::moodycamel::BlockingConcurrentQueue<ObjectType>> queue_ =
633 std::make_shared<dmlc::moodycamel::BlockingConcurrentQueue<ObjectType>>();
635 std::atomic<bool> shutdown_in_progress_;
642 template<
typename Duration>
653 : Thread(name, owner) {
673 template<
typename SecondaryFunction>
675 SecondaryFunction secondary_function) {
677 SecondaryFunction secondary_function) {
678 return pThis->run(secondary_function);
680 pThis, secondary_function);
692 template<
typename Function>
693 static void start(std::shared_ptr<TimerThread> timer_thread,
696 timer_thread->duration_ = duration;
697 launch_run(timer_thread,
function);
706 template<
typename OnTimerFunction>
707 inline int run(OnTimerFunction on_timer_function) {
709 while (!is_shutdown_requested()) {
710 std::this_thread::sleep_for(duration_);
711 if (!is_shutdown_requested()) {
712 rc = on_timer_function();
725 template <
typename StartFunction,
typename ...Args>
726 inline int ThreadGroup::Thread::entry_and_exit_f(std::shared_ptr<Thread> pThis,
727 StartFunction start_function,
732 pThis->ready_event_->signal();
734 pThis->start_event_->wait();
736 pThis->start_event_->reset();
738 if (!pThis->is_shutdown_requested()) {
739 rc = start_function(args...);
744 if (pThis->is_auto_remove()) {
745 pThis->owner_->remove_thread(pThis);
750 LOG(ERROR) <<
"Null pThis thread pointer";
756 template<
typename StartFunction,
typename ...Args>
759 StartFunction start_function,
762 CHECK_EQ(!pThis->thread_.load(),
true);
763 CHECK_NOTNULL(pThis->owner_);
765 pThis->auto_remove_ = autoRemove;
767 pThis->thread_ =
new std::thread(Thread::template entry_and_exit_f<
768 StartFunction, Args...>,
774 if (!pThis->owner_->add_thread(pThis)) {
775 pThis->request_shutdown();
776 LOG(ERROR) <<
"Duplicate thread name within the same thread group is not allowed";
779 pThis->ready_event_->wait();
781 pThis->start_event_->signal();
783 return pThis->thread_.load() !=
nullptr;
796 template<
typename Duration,
typename TimerFunction>
798 const Duration& duration,
800 TimerFunction timer_function) {
801 std::shared_ptr<dmlc::TimerThread<Duration>> timer_thread =
802 std::make_shared<dmlc::TimerThread<Duration>>(timer_name, owner);
804 return timer_thread !=
nullptr;
808 #endif // DMLC_THREAD_GROUP_H_ void enqueue(const ObjectType &item)
Enqueue an item.
Definition: thread_group.h:575
static bool launch_run(std::shared_ptr< BQT > pThis, SecondaryFunction secondary_function)
Launch to the 'run' function which will, in turn, call the class' 'run' function, passing it the give...
Definition: thread_group.h:598
std::shared_ptr< Thread > thread_by_name(const std::string &name)
Lookup Thread object by name.
Definition: thread_group.h:499
static bool launch(std::shared_ptr< Thread > pThis, bool autoRemove, StartFunction start_function, Args...args)
Launch the given Thread object.
Definition: thread_group.h:757
BlockingQueueThread(const std::string &name, dmlc::ThreadGroup *owner, std::thread *thrd=nullptr)
Constructor.
Definition: thread_group.h:538
std::thread::id get_id() const
Get this thread's id.
Definition: thread_group.h:245
void join()
Thread join.
Definition: thread_group.h:237
void request_shutdown_all(const bool make_all_joinable=true)
Call request_shutdown() on all threads in this ThreadGroup.
Definition: thread_group.h:441
bool empty() const
Check if the ThreadGroup is empty.
Definition: thread_group.h:465
virtual bool is_shutdown_requested() const
Check whether shutdown has been requested (request_shutdown() was called)
Definition: thread_group.h:197
Thread lifecycle management group.
Definition: thread_group.h:95
ThreadGroup()
Constructor.
Definition: thread_group.h:310
Definition: optional.h:241
std::recursive_mutex SharedMutex
Standard mutex for C++ < 14.
Definition: thread_group.h:84
bool is_auto_remove() const
Check whether the thread is set to auto-remove itself from the ThreadGroup owner when exiting...
Definition: thread_group.h:207
Thread(std::string threadName, ThreadGroup *owner, std::thread *thrd=nullptr)
Constructor.
Definition: thread_group.h:112
void signal()
Set this object's state to signaled (wait() will release or pass through)
Definition: thread_group.h:52
void join_all()
Join all threads in this ThreadGroup.
Definition: thread_group.h:406
bool add_thread(std::shared_ptr< Thread > thrd)
Add a Thread object to this thread group.
Definition: thread_group.h:366
Lifecycle-managed thread (used by ThreadGroup)
Definition: thread_group.h:101
size_t size() const
Return the number of threads in this thread group.
Definition: thread_group.h:456
Blocking queue thread class.
Definition: thread_group.h:528
namespace for dmlc
Definition: array_view.h:12
bool create(const std::string &threadName, bool auto_remove, StartFunction start_function, Args...args)
Create and launch a new Thread object which will be owned by this ThreadGroup.
Definition: thread_group.h:486
void wait()
Wait for the object to become signaled. If the object is already in the signaled state and reset() ha...
Definition: thread_group.h:42
bool CreateTimer(const std::string &timer_name, const Duration &duration, ThreadGroup *owner, TimerFunction timer_function)
Utility function to easily create a timer.
Definition: thread_group.h:797
const char * name() const
Name of the thread.
Definition: thread_group.h:149
void make_joinable()
Make the thread joinable (by removing the auto_remove flag)
Definition: thread_group.h:216
bool remove_thread(std::shared_ptr< Thread > thrd)
Remove a Thread object from this thread group.
Definition: thread_group.h:385
Simple manual-reset event gate which remains open after signalled.
Definition: thread_group.h:34
virtual ~ThreadGroup()
Destructor, perform cleanup. All child threads will be exited when this destructor completes...
Definition: thread_group.h:319
size_t size_approx() const
Get the approximate size of the queue.
Definition: thread_group.h:585
std::unique_lock< SharedMutex > WriteLock
Standard unique lock for C++ < 14.
Definition: thread_group.h:86
std::shared_ptr< Thread > SharedPtr
Shared pointer type for readability.
Definition: thread_group.h:104
Managed timer thread.
Definition: thread_group.h:643
void request_shutdown() override
Signal the thread that a shutdown is desired.
Definition: thread_group.h:562
static bool launch_run(std::shared_ptr< TimerThread< Duration >> pThis, SecondaryFunction secondary_function)
Launch to the 'run' function which will, in turn, call the class' 'run' function, passing it the give...
Definition: thread_group.h:674
std::unique_lock< SharedMutex > ReadLock
Standard unique lock for C++ < 14.
Definition: thread_group.h:88
virtual ~Thread()
Destructor with cleanup.
Definition: thread_group.h:126
bool is_thread_in(std::shared_ptr< Thread > thrd) const
Check if the given thread is a member of this ThreadGroup.
Definition: thread_group.h:346
int run(OnTimerFunction on_timer_function)
Internal timer execution function.
Definition: thread_group.h:707
TimerThread(const std::string &name, ThreadGroup *owner)
Constructor.
Definition: thread_group.h:652
bool is_current_thread() const
Check if this class represents the currently running thread (self)
Definition: thread_group.h:177
int run(OnItemFunction on_item_function)
Thread's main queue processing function.
Definition: thread_group.h:614
bool joinable() const
Check whether the thread is joinable.
Definition: thread_group.h:224
virtual void request_shutdown()
Signal to this thread that a thread shutdown/exit is requested.
Definition: thread_group.h:187
ManualEvent()
Definition: thread_group.h:36
bool is_this_thread_in() const
Check if the current thread is a member of this ThreadGroup.
Definition: thread_group.h:330
void reset()
Manually reset this object's state to unsignaled (wait() will block)
Definition: thread_group.h:61
~TimerThread() override
Destructor.
Definition: thread_group.h:659
~BlockingQueueThread() override
Destructor.
Definition: thread_group.h:549
static void start(std::shared_ptr< TimerThread > timer_thread, Duration duration, Function function)
Start a given timer thread.
Definition: thread_group.h:693