source: rrlib_thread/tThread.cpp @ 3:5a7d8d527ebd

Last change on this file since 3:5a7d8d527ebd was 3:5a7d8d527ebd, checked in by Tobias Föhst <foehst@…>, 7 years ago

Adapted to rrlib_util -> rrlib_design_patterns split

File size: 14.4 KB
Line 
1//
2// You received this file as part of RRLib
3// Robotics Research Library
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or
8// modify it under the terms of the GNU General Public License
9// as published by the Free Software Foundation; either version 2
10// of the License, or (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License
18// along with this program; if not, write to the Free Software
19// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
20//
21//----------------------------------------------------------------------
22/*!\file    rrlib/thread/tThread.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2012-07-05
27 *
28 */
29//----------------------------------------------------------------------
30#include "rrlib/thread/tThread.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
35#include <sstream>
36#include <cstring>
37#include "rrlib/design_patterns/singleton.h"
38
39//----------------------------------------------------------------------
40// Internal includes with ""
41//----------------------------------------------------------------------
42
43//----------------------------------------------------------------------
44// Debugging
45//----------------------------------------------------------------------
46#include <cassert>
47
48//----------------------------------------------------------------------
49// Namespace usage
50//----------------------------------------------------------------------
51using namespace rrlib::logging;
52
53//----------------------------------------------------------------------
54// Namespace declaration
55//----------------------------------------------------------------------
56namespace rrlib
57{
58namespace thread
59{
60
61//----------------------------------------------------------------------
62// Forward declarations / typedefs / enums
63//----------------------------------------------------------------------
64
65//----------------------------------------------------------------------
66// Const values
67//----------------------------------------------------------------------
68
69//----------------------------------------------------------------------
70// Implementation
71//----------------------------------------------------------------------
72
73__thread tThread* tThread::cur_thread = NULL;
74
75
76namespace internal
77{
78class tThreadCleanup
79{
80public:
81  static void Cleanup(tThread* t)
82  {
83    tLock l(*t);
84    t->state = tThread::tState::TERMINATED;
85    //t->curThread.reset();
86    l.Unlock();
87    t->self.reset(); // possibly delete thread - important that it's last statement
88  }
89};
90
91/*!
92 * Class for thread (self) deletion
93 */
94class tThreadDeleter
95{
96public:
97  //! for (self) deletion
98  void operator()(tThread* t)
99  {
100    if (t->GetDeleteOnCompletion())
101    {
102      delete t;
103    }
104  }
105};
106
/*!
 * \param id Thread id
 * \return Default name for a thread with this id: "Thread-<id>"
 */
static std::string GetDefaultThreadName(int64_t id)
{
  std::ostringstream name_stream;
  name_stream << "Thread-";
  name_stream << id;
  return name_stream.str();
}
113
114template <typename T>
115struct CreateCurThreadLocal
116{
117  static T* Create()
118  {
119    return new T(tThreadCleanup::Cleanup);
120  }
121  static void Destroy(T* object)
122  {
123    delete object;
124  }
125};
126
127/*! Thread counter - used for generating names */
/*!
 * \return Reference to the process-wide thread counter (starts at zero).
 *         The tThread constructors post-increment it to obtain thread ids.
 */
static std::atomic<int>& GetThreadCounter()
{
  // function-local static: created on first use
  static std::atomic<int> thread_counter(0);
  return thread_counter;
}
133
134/*! List of threads currently known and running (== all thread objects created) */
135static std::shared_ptr<internal::tVectorWithMutex<std::weak_ptr<tThread>>>& GetThreadList()
136{
137  static std::shared_ptr<internal::tVectorWithMutex<std::weak_ptr<tThread>>> thread_list(new internal::tVectorWithMutex<std::weak_ptr<tThread>>(0x7FFFFFFF));
138  return thread_list;
139}
140
141} // namespace internal
142
143tThread::tThread(bool anonymous, bool legion) :
144  stop_signal(false),
145  lock_stack(),
146  id(internal::GetThreadCounter()++),
147  name(internal::GetDefaultThreadName(id)),
148  priority(cDEFAULT_PRIORITY),
149  state(tState::RUNNING),
150  self(this, internal::tThreadDeleter()),
151  delete_on_completion(true),
152  start_signal(false),
153  monitor(*this),
154  thread_list_ref(internal::GetThreadList()),
155  locked_objects(),
156  longevity(0),
157  unknown_thread(true),
158  wrapped_thread(),
159  handle(pthread_self()),
160  joining_threads(0)
161{
162  AddToThreadList();
163
164  // see if we can obtain a thread name
165  char name_buffer[1024];
166  if (!pthread_getname_np(handle, name_buffer, 1023))
167  {
168    name_buffer[1023] = 0;
169    if (strlen(name_buffer) > 0)
170    {
171      name = name_buffer;
172    }
173  }
174}
175
176tThread::tThread(const std::string& name) :
177  stop_signal(false),
178  lock_stack(),
179  id(internal::GetThreadCounter()++),
180  name(name.length() > 0 ? name : internal::GetDefaultThreadName(id)),
181  priority(cDEFAULT_PRIORITY),
182  state(tState::NEW),
183  self(this, internal::tThreadDeleter()),
184  delete_on_completion(false),
185  start_signal(false),
186  monitor(*this),
187  thread_list_ref(internal::GetThreadList()),
188  locked_objects(),
189  longevity(0),
190  unknown_thread(false),
191  wrapped_thread(&Launch, this),
192  handle(wrapped_thread.native_handle()),
193  joining_threads(0)
194{
195  AddToThreadList();
196  SetName(this->name);
197}
198
199tThread::~tThread()
200{
201  RRLIB_LOG_PRINT(eLL_DEBUG_VERBOSE_1, "Deleting thread ", this);
202
203  // remove from thread list
204  tLock lock(thread_list_ref->obj_mutex);
205  assert(this != NULL);
206
207  // remove thread from list
208  for (size_t i = 0; i < thread_list_ref->vec.size(); i++)
209  {
210    std::shared_ptr<tThread> t = thread_list_ref->vec[i].lock();
211    if (t.get() == this)
212    {
213      thread_list_ref->vec.erase(thread_list_ref->vec.begin() + i);
214      break;
215    }
216    if (t.get() == NULL)   // remove empty entries
217    {
218      thread_list_ref->vec.erase(thread_list_ref->vec.begin() + i);
219      i--;
220    }
221  }
222
223  lock.Unlock();
224  if (!unknown_thread)
225  {
226    if (tThread::CurrentThreadRaw() != this)
227    {
228      Join(); // we shouldn't delete anything while thread is still running
229    }
230    else if (wrapped_thread.joinable())
231    {
232      wrapped_thread.detach();
233    }
234  }
235
236  for (auto rit = locked_objects.rbegin(); rit < locked_objects.rend(); ++rit)
237  {
238    (*rit).reset();
239  }
240}
241
242void tThread::AddToThreadList()
243{
244  tLock lock(thread_list_ref->obj_mutex);
245  thread_list_ref->vec.push_back(self);
246
247  //printf("Creating thread %p %s\n", this, getName().getCString());
248  RRLIB_LOG_PRINT(eLL_DEBUG_VERBOSE_1, "Creating thread ", this);
249}
250
251typedef rrlib::design_patterns::tSingletonHolder<boost::thread_specific_ptr<tThread>, rrlib::design_patterns::singleton::Longevity, internal::CreateCurThreadLocal> tCurThreadLocal;
252static inline unsigned int GetLongevity(boost::thread_specific_ptr<tThread>*)
253{
254  return 0xFCCCCCCC; // should exit before allocation register
255}
256
257boost::thread_specific_ptr<tThread>& tThread::GetCurThreadLocal()
258{
259  return tCurThreadLocal::Instance();
260}
261
262std::string tThread::GetLogDescription() const
263{
264  std::ostringstream oss;
265  oss << "Thread " << id << " '" << GetName() << "'";
266  return oss.str();
267}
268
269void tThread::Join()
270{
271  if (unknown_thread)
272  {
273    RRLIB_LOG_PRINT(rrlib::logging::eLL_WARNING, "Operation not supported for threads of unknown origin.");
274    return;
275  }
276  if (!wrapped_thread.joinable())
277  {
278    return;
279  }
280  if (CurrentThreadRaw() == this)
281  {
282    RRLIB_LOG_PRINT(rrlib::logging::eLL_DEBUG_WARNING, "Thread cannot join itself");
283    return;
284  }
285  PreJoin();
286  RRLIB_LOG_PRINT(rrlib::logging::eLL_DEBUG_VERBOSE_1, "Joining Thread");
287
288  int joining = joining_threads.fetch_add(1);
289  if (joining >= 1)
290  {
291    RRLIB_LOG_PRINT(rrlib::logging::eLL_DEBUG_WARNING, "Multiple threads are trying to join. Returning this thread without joining.");
292    return;
293  }
294  if (wrapped_thread.joinable())
295  {
296    wrapped_thread.join();
297  }
298  RRLIB_LOG_PRINT(rrlib::logging::eLL_DEBUG_VERBOSE_1, "Joined Thread");
299}
300
301void tThread::Launch(tThread* thread_ptr)
302{
303  thread_ptr->Launcher();
304}
305
306void tThread::Launcher()
307{
308  //unsafe _FINROC_LOG_MESSAGE(eLL_DEBUG_VERBOSE_2, logDomain) << "Entering";
309  cur_thread = this;
310  tLock l(*this);
311  state = tState::PREPARE_RUNNING;
312  //unsafe _FINROC_LOG_MESSAGE(eLL_DEBUG_VERBOSE_2, logDomain) << "Locked";
313  //curThread = threadPtr;
314  GetCurThreadLocal().reset(this);
315  //unsafe _FINROC_LOG_MESSAGE(eLL_DEBUG_VERBOSE_2, logDomain) << "ThreadLocal set";
316
317  // wait for start signal
318  while ((!(start_signal)) && (!(stop_signal)))
319  {
320    monitor.Wait(l);
321  }
322
323  // run thread?
324  state = tState::RUNNING;
325  if (start_signal && (!(stop_signal)))
326  {
327    try
328    {
329      l.Unlock();
330      RRLIB_LOG_PRINT(eLL_DEBUG, "Thread started");
331      Run();
332      RRLIB_LOG_PRINT(eLL_DEBUG, "Thread exited normally");
333    }
334    catch (const std::exception& e)
335    {
336      RRLIB_LOG_PRINT(eLL_ERROR, "Thread exited because of exception: ", e.what());
337    }
338  }
339
340  RRLIB_LOG_PRINT(eLL_DEBUG_VERBOSE_2, "Exiting");
341
342  l.Unlock();
343  // Do this BEFORE thread is removed from list - to ensure this is done StopThreads
344  try
345  {
346    for (auto rit = locked_objects.rbegin(); rit < locked_objects.rend(); ++rit)
347    {
348      (*rit).reset();
349    }
350  }
351  catch (const std::exception& e)
352  {
353    RRLIB_LOG_PRINT(eLL_ERROR, "Thread encountered exception during cleanup: ", e.what());
354  }
355}
356
357void tThread::LockObject(std::shared_ptr<void> obj)
358{
359  tLock(*this);
360  locked_objects.push_back(obj);
361}
362
363void tThread::PreJoin()
364{
365  RRLIB_LOG_PRINT(eLL_DEBUG_VERBOSE_2, "Entering");
366  tLock l(*this);
367  RRLIB_LOG_PRINT(eLL_DEBUG_VERBOSE_2, "Locked");
368  if (state == tState::PREPARE_RUNNING || state == tState::NEW)
369  {
370    RRLIB_LOG_PRINT(eLL_DEBUG_VERBOSE_2, "Notifying");
371    stop_signal = true;
372    monitor.Notify(l);
373    RRLIB_LOG_PRINT(eLL_DEBUG_VERBOSE_2, "Notified");
374  }
375  RRLIB_LOG_PRINT(eLL_DEBUG_VERBOSE_2, "Leaving");
376}
377
378void tThread::SetName(const std::string& name)
379{
380  this->name = name;
381  pthread_setname_np(handle, name.substr(0, 15).c_str());
382}
383
384void tThread::SetRealtime()
385{
386  struct sched_param param;
387  param.sched_priority = 49;
388  if (pthread_setschedparam(handle, SCHED_FIFO, &param))
389  {
390    //printf("Failed making thread a real-time thread. Possibly current user has insufficient rights.\n");
391    RRLIB_LOG_PRINT(eLL_ERROR, "Failed making thread a real-time thread. Possibly current user has insufficient rights.");
392  }
393}
394
395
396void tThread::Sleep(const rrlib::time::tDuration& sleep_for, bool use_application_time, rrlib::time::tTimestamp wait_until)
397{
398  rrlib::time::tTimeMode time_mode = rrlib::time::GetTimeMode();
399  tThread& t = *CurrentThreadRaw();
400  if (time_mode == rrlib::time::tTimeMode::SYSTEM_TIME || (!use_application_time))
401  {
402    if (sleep_for <= std::chrono::milliseconds(500))
403    {
404      std::this_thread::sleep_for(sleep_for);
405    }
406    else
407    {
408      tLock l(t);
409      t.monitor.Wait(l, sleep_for, use_application_time, wait_until);
410    }
411  }
412  else if (time_mode == rrlib::time::tTimeMode::CUSTOM_CLOCK)
413  {
414    tLock l(t);
415    t.monitor.Wait(l, sleep_for, use_application_time, wait_until);
416  }
417  else
418  {
419    assert(time_mode == rrlib::time::tTimeMode::STRETCHED_SYSTEM_TIME);
420    tLock l(t);
421    rrlib::time::tDuration system_duration = rrlib::time::ToSystemDuration(sleep_for);
422    if (system_duration > std::chrono::milliseconds(20))
423    {
424      t.monitor.Wait(l, system_duration, use_application_time, wait_until);
425    }
426    else
427    {
428      l.Unlock();
429      std::this_thread::sleep_for(system_duration);
430    }
431  }
432}
433
434void tThread::Start()
435{
436  RRLIB_LOG_PRINT(eLL_DEBUG_VERBOSE_2, "Entering");
437  RRLIB_LOG_PRINT(eLL_DEBUG_VERBOSE_2, "PreMutex");
438  tLock l(*this);
439  RRLIB_LOG_PRINT(eLL_DEBUG_VERBOSE_2, "Locked");
440  start_signal = true;
441  monitor.Notify(l);
442  RRLIB_LOG_PRINT(eLL_DEBUG_VERBOSE_2, "Notified thread");
443}
444
445void tThread::StopThread()
446{
447  if (unknown_thread)
448  {
449    RRLIB_LOG_PRINT(rrlib::logging::eLL_WARNING, "Operation not supported for threads of unknown origin.");
450    return;
451  }
452
453  // default implementation - possibly sufficient for some thread classes
454  tLock l(*this);
455  stop_signal = true;
456  monitor.Notify(l);
457}
458
459bool tThread::StopThreads(bool query_only)
460{
461  volatile static bool stopping_threadz = false;
462  if (stopping_threadz || query_only)   // We don't do this twice
463  {
464    return stopping_threadz;
465  }
466  stopping_threadz = true;
467  const char*(*GetLogDescription)() = GetLogDescriptionStatic;
468  RRLIB_LOG_PRINT(eLL_USER, "Stopping all threads");
469
470  tLock lock(internal::GetThreadList()->obj_mutex);
471  std::vector<std::weak_ptr<tThread>> current_threads_unordered;
472  std::vector<std::weak_ptr<tThread>> current_threads;
473  current_threads_unordered = internal::GetThreadList()->vec;
474  lock.Unlock();
475
476  // Sort threads according to longevity
477  int64_t last_longevity = -1;
478  while (true)
479  {
480    int64_t min_longevity = 0xFFFFFFFFFFLL;
481    for (size_t i = 0; i < current_threads_unordered.size(); i++)
482    {
483      std::shared_ptr<tThread> t = current_threads_unordered[i].lock();
484      if (t && t->longevity < min_longevity && t->longevity > last_longevity)
485      {
486        min_longevity = t->longevity;
487      }
488    }
489    if (min_longevity == 0xFFFFFFFFFFLL)
490    {
491      break;
492    }
493
494    // Copy to new list
495    for (size_t i = 0; i < current_threads_unordered.size(); i++)
496    {
497      std::shared_ptr<tThread> t = current_threads_unordered[i].lock();
498      if (t && t->longevity == min_longevity)
499      {
500        current_threads.push_back(current_threads_unordered[i]);
501      }
502    }
503
504    last_longevity = min_longevity;
505  }
506
507  // Delete threads in now correct order
508  for (size_t i = 0; i < current_threads.size(); i++)
509  {
510    std::weak_ptr<tThread> thread = current_threads[i];
511    std::shared_ptr<tThread> t = thread.lock();
512    if (t && t.get() != CurrentThreadRaw())
513    {
514      if (t->unknown_thread)
515      {
516        RRLIB_LOG_PRINT(rrlib::logging::eLL_WARNING, "Do not know how to stop thread '", t->GetLogDescription(), "' of unknown origin.");
517        continue;
518      }
519
520      RRLIB_LOG_PRINT(eLL_DEBUG, "Stopping thread '", t->GetLogDescription(), "'");
521      tLock l(*t);
522      t->stop_signal = true;
523      if (!t->start_signal)
524      {
525        t->monitor.Notify(l);
526      }
527      else
528      {
529        t->monitor.Notify(l);
530        l.Unlock();
531        t->StopThread();
532        t->Join();
533      }
534    }
535  }
536
537  return true;
538}
539
540void tThread::Yield()
541{
542  std::this_thread::yield();
543}
544
545//----------------------------------------------------------------------
546// End of namespace declaration
547//----------------------------------------------------------------------
548}
549}
Note: See TracBrowser for help on using the repository browser.