source: rrlib_thread/tThread.cpp @ 20:6a5ce655e1ab

Last change on this file since 20:6a5ce655e1ab was 20:6a5ce655e1ab, checked in by Max Reichardt <mreichardt@…>, 5 years ago

Removed boost dependency (replaced boost::thread_specific_ptr with standard thread_local)

File size: 15.4 KB
Line 
1//
2// You received this file as part of RRLib
3// Robotics Research Library
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    rrlib/thread/tThread.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2012-07-05
27 *
28 */
29//----------------------------------------------------------------------
30#include "rrlib/thread/tThread.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
35#include <sstream>
36#include <cstring>
37#include <sys/resource.h>
38#include <sys/syscall.h>
39#include <unistd.h>
40
41#include "rrlib/design_patterns/singleton.h"
42//----------------------------------------------------------------------
43// Internal includes with ""
44//----------------------------------------------------------------------
45
46//----------------------------------------------------------------------
47// Debugging
48//----------------------------------------------------------------------
49#include <cassert>
50
51//----------------------------------------------------------------------
52// Namespace usage
53//----------------------------------------------------------------------
54using namespace rrlib::logging;
55
56//----------------------------------------------------------------------
57// Namespace declaration
58//----------------------------------------------------------------------
59namespace rrlib
60{
61namespace thread
62{
63
64//----------------------------------------------------------------------
65// Forward declarations / typedefs / enums
66//----------------------------------------------------------------------
67
68//----------------------------------------------------------------------
69// Const values
70//----------------------------------------------------------------------
71
72//----------------------------------------------------------------------
73// Implementation
74//----------------------------------------------------------------------
75
76thread_local tThread::tPointer tThread::current_thread;
77
78namespace internal
79{
80
81/*!
82 * Class for thread (self) deletion
83 */
84class tThreadDeleter
85{
86public:
87  //! for (self) deletion
88  void operator()(tThread* t)
89  {
90    if (t->GetDeleteOnCompletion())
91    {
92      delete t;
93    }
94  }
95};
96
97static std::string GetDefaultThreadName(int64_t id)
98{
99  std::ostringstream oss;
100  oss << "Thread-" << id;
101  return oss.str();
102}
103
104/*! List of threads currently known and running (== all thread objects created) */
105static std::shared_ptr<internal::tVectorWithMutex<std::weak_ptr<tThread>>>& GetThreadList()
106{
107  static std::shared_ptr<internal::tVectorWithMutex<std::weak_ptr<tThread>>> thread_list(new internal::tVectorWithMutex<std::weak_ptr<tThread>>(0x7FFFFFFF));
108  return thread_list;
109}
110
111static uint32_t GetUniqueThreadId()
112{
113  static tOrderedMutex mutex("GetUniqueThreadId()", 0x7FFFFFFE);
114  static bool overflow = false;
115  static uint32_t counter = 1;
116  tLock lock(mutex);
117  if (!overflow)
118  {
119    int result = counter;
120    counter++;
121    if (counter == 0)
122    {
123      overflow = true;
124    }
125    return result;
126  }
127
128  tLock lock2(GetThreadList()->obj_mutex);
129  // hmm... we start at id 1024 - as the former threads may be more long-lived
130  counter = std::max(1023u, counter);
131  std::vector<std::weak_ptr<tThread>>& current_threads = GetThreadList()->vec;
132  while (true)
133  {
134    counter++;
135    bool used = false;
136    for (auto it = current_threads.begin(); it != current_threads.end(); ++it)
137    {
138      std::shared_ptr<tThread> thread = it->lock();
139      if (thread && thread->GetId() == counter)
140      {
141        used = true;
142        break;
143      }
144    }
145    if (!used)
146    {
147      return counter;
148    }
149  }
150}
151
152
153} // namespace internal
154
155tThread::tThread(bool anonymous, bool legion) :
156  stop_signal(false),
157  lock_stack(),
158  id(internal::GetUniqueThreadId()),
159  name(internal::GetDefaultThreadName(id)),
160  priority(cDEFAULT_PRIORITY),
161  state(tState::RUNNING),
162  self(this, internal::tThreadDeleter()),
163  delete_on_completion(true),
164  start_signal(false),
165  monitor(*this),
166  thread_list_ref(internal::GetThreadList()),
167  locked_objects(),
168  longevity(0),
169  unknown_thread(true),
170  wrapped_thread(),
171  handle(pthread_self()),
172  joining_threads(0)
173{
174  AddToThreadList();
175
176  // see if we can obtain a thread name
177  char name_buffer[1024];
178  if (!pthread_getname_np(handle, name_buffer, 1023))
179  {
180    name_buffer[1023] = 0;
181    if (strlen(name_buffer) > 0)
182    {
183      name = name_buffer;
184    }
185  }
186}
187
188tThread::tThread(const std::string& name) :
189  stop_signal(false),
190  lock_stack(),
191  id(internal::GetUniqueThreadId()),
192  name(name.length() > 0 ? name : internal::GetDefaultThreadName(id)),
193  priority(cDEFAULT_PRIORITY),
194  state(tState::NEW),
195  self(this, internal::tThreadDeleter()),
196  delete_on_completion(false),
197  start_signal(false),
198  monitor(*this),
199  thread_list_ref(internal::GetThreadList()),
200  locked_objects(),
201  longevity(0),
202  unknown_thread(false),
203  wrapped_thread(&Launch, this),
204  handle(wrapped_thread.native_handle()),
205  joining_threads(0)
206{
207  AddToThreadList();
208  SetName(this->name);
209}
210
211tThread::~tThread()
212{
213  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Deleting thread ", this);
214
215  // remove from thread list
216  tLock lock(thread_list_ref->obj_mutex);
217  assert(this != NULL);
218
219  // remove thread from list
220  for (size_t i = 0; i < thread_list_ref->vec.size(); i++)
221  {
222    std::shared_ptr<tThread> t = thread_list_ref->vec[i].lock();
223    if (t.get() == this)
224    {
225      thread_list_ref->vec.erase(thread_list_ref->vec.begin() + i);
226      break;
227    }
228    if (t.get() == NULL)   // remove empty entries
229    {
230      thread_list_ref->vec.erase(thread_list_ref->vec.begin() + i);
231      i--;
232    }
233  }
234
235  lock.Unlock();
236  if (!unknown_thread)
237  {
238    if (&tThread::CurrentThread() != this)
239    {
240      Join(); // we shouldn't delete anything while thread is still running
241    }
242    else if (wrapped_thread.joinable())
243    {
244      wrapped_thread.detach();
245    }
246  }
247
248  for (auto rit = locked_objects.rbegin(); rit < locked_objects.rend(); ++rit)
249  {
250    (*rit).reset();
251  }
252}
253
254void tThread::AddToThreadList()
255{
256  tLock lock(thread_list_ref->obj_mutex);
257  thread_list_ref->vec.push_back(self);
258
259  //printf("Creating thread %p %s\n", this, getName().getCString());
260  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Creating thread ", this);
261}
262
263std::string tThread::GetLogDescription() const
264{
265  std::ostringstream oss;
266  oss << "Thread " << id << " '" << GetName() << "'";
267  return oss.str();
268}
269
270void tThread::Join()
271{
272  if (unknown_thread)
273  {
274    RRLIB_LOG_PRINT(WARNING, "Operation not supported for threads of unknown origin.");
275    return;
276  }
277  if (!wrapped_thread.joinable())
278  {
279    return;
280  }
281  if (&CurrentThread() == this)
282  {
283    RRLIB_LOG_PRINT(DEBUG_WARNING, "Thread cannot join itself");
284    return;
285  }
286  PreJoin();
287  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Joining Thread");
288
289  int joining = joining_threads.fetch_add(1);
290  if (joining >= 1)
291  {
292    RRLIB_LOG_PRINT(DEBUG_WARNING, "Multiple threads are trying to join. Returning this thread without joining.");
293    return;
294  }
295  if (wrapped_thread.joinable())
296  {
297    wrapped_thread.join();
298  }
299  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Joined Thread");
300}
301
302void tThread::Launch(tThread* thread_ptr)
303{
304  thread_ptr->Launcher();
305}
306
307void tThread::Launcher()
308{
309  //unsafe _FINROC_LOG_MESSAGE(DEBUG_VERBOSE_2, logDomain) << "Entering";
310  current_thread.pointer = this;
311  tLock l(*this);
312  state = tState::PREPARE_RUNNING;
313  //unsafe _FINROC_LOG_MESSAGE(DEBUG_VERBOSE_2, logDomain) << "Locked";
314
315  // wait for start signal
316  while ((!(start_signal)) && (!(stop_signal)))
317  {
318    monitor.Wait(l);
319  }
320
321  // run thread?
322  state = tState::RUNNING;
323  if (start_signal/* && (!(stop_signal))*/)
324  {
325    l.Unlock();
326    RRLIB_LOG_PRINT(DEBUG, "Thread started");
327    Run();
328    RRLIB_LOG_PRINT(DEBUG, "Thread exited normally");
329  }
330
331  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Exiting");
332
333  l.Unlock();
334  // Do this BEFORE thread is removed from list - to ensure this is done StopThreads
335  try
336  {
337    for (auto rit = locked_objects.rbegin(); rit < locked_objects.rend(); ++rit)
338    {
339      (*rit).reset();
340    }
341  }
342  catch (const std::exception& e)
343  {
344    RRLIB_LOG_PRINT(ERROR, "Thread encountered exception during cleanup: ", e.what());
345  }
346}
347
348void tThread::LockObject(std::shared_ptr<void> obj)
349{
350  tLock(*this);
351  locked_objects.push_back(obj);
352}
353
354void tThread::PreJoin()
355{
356  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Entering");
357  tLock l(*this);
358  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Locked");
359  if (state == tState::PREPARE_RUNNING || state == tState::NEW)
360  {
361    RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Notifying");
362    stop_signal = true;
363    monitor.Notify(l);
364    RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Notified");
365  }
366  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Leaving");
367}
368
369void tThread::SetName(const std::string& name)
370{
371  this->name = name;
372  pthread_setname_np(handle, name.substr(0, 15).c_str());
373}
374
375void tThread::SetPriority(int new_priority)
376{
377  //if (new_priority < sched_get_priority_min(SCHED_OTHER) || new_priority > sched_get_priority_max(SCHED_OTHER))
378  if (new_priority < -20 || new_priority > 19)
379  {
380    //pthread_getschedparam(handle, &policy, &param);
381    RRLIB_LOG_PRINT(ERROR, "Invalid thread priority: ", new_priority, ". Ignoring.");// Valid range is ", sched_get_priority_min(SCHED_OTHER), " to ", sched_get_priority_max(SCHED_OTHER),
382    //    ". Current priority is ", param.sched_priority, ".");
383    return;
384  }
385  if (CurrentThreadId() != GetId())
386  {
387    RRLIB_LOG_PRINT(ERROR, "SetPriority can only be called from the current thread");
388    return;
389  }
390
391  //int error_code = pthread_setschedparam(handle, SCHED_OTHER, &param);
392  // works according to "man pthreads" and discussion on: http://stackoverflow.com/questions/7684404/is-nice-used-to-change-the-thread-priority-or-the-process-priority
393  pid_t thread_id = syscall(SYS_gettid);
394  int error_code = setpriority(PRIO_PROCESS, thread_id, new_priority);
395  if (error_code)
396  {
397    RRLIB_LOG_PRINT(ERROR, "Failed to change thread priority: ", strerror(error_code));
398    return;
399  }
400  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Set niceness to ", new_priority);
401  priority = new_priority;
402}
403
404void tThread::SetRealtime()
405{
406  struct sched_param param;
407  param.sched_priority = 49;
408  int error_code = pthread_setschedparam(handle, SCHED_FIFO, &param);
409  if (error_code)
410  {
411    //printf("Failed making thread a real-time thread. Possibly current user has insufficient rights.\n");
412    RRLIB_LOG_PRINT(ERROR, "Failed making thread a real-time thread.", (error_code == EPERM ? " Caller does not have appropriate privileges." : ""));
413  }
414}
415
416
417void tThread::Sleep(const rrlib::time::tDuration& sleep_for, bool use_application_time, rrlib::time::tTimestamp wait_until)
418{
419  rrlib::time::tTimeMode time_mode = rrlib::time::GetTimeMode();
420  tThread& t = CurrentThread();
421  if (time_mode == rrlib::time::tTimeMode::SYSTEM_TIME || (!use_application_time))
422  {
423    if (sleep_for < std::chrono::milliseconds(400))
424    {
425      std::this_thread::sleep_for(sleep_for);
426    }
427    else
428    {
429      tLock l(t);
430      t.monitor.Wait(l, sleep_for, use_application_time, wait_until);
431    }
432  }
433  else if (time_mode == rrlib::time::tTimeMode::CUSTOM_CLOCK)
434  {
435    tLock l(t);
436    t.monitor.Wait(l, sleep_for, use_application_time, wait_until);
437  }
438  else
439  {
440    assert(time_mode == rrlib::time::tTimeMode::STRETCHED_SYSTEM_TIME);
441    tLock l(t);
442    rrlib::time::tDuration system_duration = rrlib::time::ToSystemDuration(sleep_for);
443    if (system_duration > std::chrono::milliseconds(20))
444    {
445      t.monitor.Wait(l, system_duration, use_application_time, wait_until);
446    }
447    else
448    {
449      l.Unlock();
450      std::this_thread::sleep_for(system_duration);
451    }
452  }
453}
454
455void tThread::Start()
456{
457  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Entering");
458  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "PreMutex");
459  tLock l(*this);
460  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Locked");
461  start_signal = true;
462  monitor.Notify(l);
463  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Notified thread");
464}
465
466void tThread::StopThread()
467{
468  if (unknown_thread)
469  {
470    RRLIB_LOG_PRINT(WARNING, "Operation not supported for threads of unknown origin.");
471    return;
472  }
473
474  // default implementation - possibly sufficient for some thread classes
475  tLock l(*this);
476  stop_signal = true;
477  monitor.Notify(l);
478}
479
480bool tThread::StopThreads(bool query_only)
481{
482  volatile static bool stopping_threadz = false;
483  if (stopping_threadz || query_only)   // We don't do this twice
484  {
485    return stopping_threadz;
486  }
487  stopping_threadz = true;
488  const char*(*GetLogDescription)() = GetLogDescriptionStatic;
489  RRLIB_LOG_PRINT(USER, "Stopping all threads");
490
491  tLock lock(internal::GetThreadList()->obj_mutex);
492  std::vector<std::weak_ptr<tThread>> current_threads_unordered;
493  std::vector<std::weak_ptr<tThread>> current_threads;
494  current_threads_unordered = internal::GetThreadList()->vec;
495  lock.Unlock();
496
497  // Sort threads according to longevity
498  int64_t last_longevity = -1;
499  while (true)
500  {
501    int64_t min_longevity = 0xFFFFFFFFFFLL;
502    for (size_t i = 0; i < current_threads_unordered.size(); i++)
503    {
504      std::shared_ptr<tThread> t = current_threads_unordered[i].lock();
505      if (t && t->longevity < min_longevity && t->longevity > last_longevity)
506      {
507        min_longevity = t->longevity;
508      }
509    }
510    if (min_longevity == 0xFFFFFFFFFFLL)
511    {
512      break;
513    }
514
515    // Copy to new list
516    for (size_t i = 0; i < current_threads_unordered.size(); i++)
517    {
518      std::shared_ptr<tThread> t = current_threads_unordered[i].lock();
519      if (t && t->longevity == min_longevity)
520      {
521        current_threads.push_back(current_threads_unordered[i]);
522      }
523    }
524
525    last_longevity = min_longevity;
526  }
527
528  // Delete threads in now correct order
529  for (size_t i = 0; i < current_threads.size(); i++)
530  {
531    std::weak_ptr<tThread> thread = current_threads[i];
532    std::shared_ptr<tThread> t = thread.lock();
533    if (t && t.get() != &CurrentThread())
534    {
535      if (t->unknown_thread)
536      {
537        RRLIB_LOG_PRINT(WARNING, "Do not know how to stop thread '", t->GetLogDescription(), "' of unknown origin.");
538        continue;
539      }
540
541      RRLIB_LOG_PRINT(DEBUG, "Stopping thread '", t->GetLogDescription(), "'");
542      tLock l(*t);
543      t->stop_signal = true;
544      if (!t->start_signal)
545      {
546        t->monitor.Notify(l);
547      }
548      else
549      {
550        t->monitor.Notify(l);
551        l.Unlock();
552        t->StopThread();
553        t->Join();
554      }
555    }
556  }
557
558  return true;
559}
560
561void tThread::Yield()
562{
563  std::this_thread::yield();
564}
565
566tThread::tPointer::~tPointer()
567{
568  tLock l(*pointer);
569  pointer->state = tThread::tState::TERMINATED;
570  //t->curThread.reset();
571  l.Unlock();
572  pointer->self.reset(); // possibly delete thread - important that it's last statement
573}
574
575//----------------------------------------------------------------------
576// End of namespace declaration
577//----------------------------------------------------------------------
578}
579}
Note: See TracBrowser for help on using the repository browser.