source: rrlib_thread/tThread.cpp @ 15:d0e7b92ad22f

Last change on this file since 15:d0e7b92ad22f was 15:d0e7b92ad22f, checked in by Tobias Föhst <foehst@…>, 6 years ago

Added and updated license information

File size: 16.1 KB
Line 
1//
2// You received this file as part of RRLib
3// Robotics Research Library
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    rrlib/thread/tThread.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2012-07-05
27 *
28 */
29//----------------------------------------------------------------------
30#include "rrlib/thread/tThread.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
35#include <sstream>
36#include <cstring>
37#include <sys/resource.h>
38#include <sys/syscall.h>
39#include "rrlib/design_patterns/singleton.h"
40//----------------------------------------------------------------------
41// Internal includes with ""
42//----------------------------------------------------------------------
43
44//----------------------------------------------------------------------
45// Debugging
46//----------------------------------------------------------------------
47#include <cassert>
48
49//----------------------------------------------------------------------
50// Namespace usage
51//----------------------------------------------------------------------
52using namespace rrlib::logging;
53
54//----------------------------------------------------------------------
55// Namespace declaration
56//----------------------------------------------------------------------
57namespace rrlib
58{
59namespace thread
60{
61
62//----------------------------------------------------------------------
63// Forward declarations / typedefs / enums
64//----------------------------------------------------------------------
65
66//----------------------------------------------------------------------
67// Const values
68//----------------------------------------------------------------------
69
70//----------------------------------------------------------------------
71// Implementation
72//----------------------------------------------------------------------
73
74__thread tThread* tThread::cur_thread = NULL;
75
76
77namespace internal
78{
79class tThreadCleanup
80{
81public:
82  static void Cleanup(tThread* t)
83  {
84    tLock l(*t);
85    t->state = tThread::tState::TERMINATED;
86    //t->curThread.reset();
87    l.Unlock();
88    t->self.reset(); // possibly delete thread - important that it's last statement
89  }
90};
91
92/*!
93 * Class for thread (self) deletion
94 */
95class tThreadDeleter
96{
97public:
98  //! for (self) deletion
99  void operator()(tThread* t)
100  {
101    if (t->GetDeleteOnCompletion())
102    {
103      delete t;
104    }
105  }
106};
107
108static std::string GetDefaultThreadName(int64_t id)
109{
110  std::ostringstream oss;
111  oss << "Thread-" << id;
112  return oss.str();
113}
114
115template <typename T>
116struct CreateCurThreadLocal
117{
118  static T* Create()
119  {
120    return new T(tThreadCleanup::Cleanup);
121  }
122  static void Destroy(T* object)
123  {
124    delete object;
125  }
126};
127
128/*! List of threads currently known and running (== all thread objects created) */
129static std::shared_ptr<internal::tVectorWithMutex<std::weak_ptr<tThread>>>& GetThreadList()
130{
131  static std::shared_ptr<internal::tVectorWithMutex<std::weak_ptr<tThread>>> thread_list(new internal::tVectorWithMutex<std::weak_ptr<tThread>>(0x7FFFFFFF));
132  return thread_list;
133}
134
135static uint32_t GetUniqueThreadId()
136{
137  static tOrderedMutex mutex("GetUniqueThreadId()", 0x7FFFFFFE);
138  static bool overflow = false;
139  static uint32_t counter = 1;
140  tLock lock(mutex);
141  if (!overflow)
142  {
143    int result = counter;
144    counter++;
145    if (counter == 0)
146    {
147      overflow = true;
148    }
149    return result;
150  }
151
152  tLock lock2(GetThreadList()->obj_mutex);
153  // hmm... we start at id 1024 - as the former threads may be more long-lived
154  counter = std::max(1023u, counter);
155  std::vector<std::weak_ptr<tThread>>& current_threads = GetThreadList()->vec;
156  while (true)
157  {
158    counter++;
159    bool used = false;
160    for (auto it = current_threads.begin(); it != current_threads.end(); ++it)
161    {
162      std::shared_ptr<tThread> thread = it->lock();
163      if (thread && thread->GetId() == counter)
164      {
165        used = true;
166        break;
167      }
168    }
169    if (!used)
170    {
171      return counter;
172    }
173  }
174}
175
176
177} // namespace internal
178
179tThread::tThread(bool anonymous, bool legion) :
180  stop_signal(false),
181  lock_stack(),
182  id(internal::GetUniqueThreadId()),
183  name(internal::GetDefaultThreadName(id)),
184  priority(cDEFAULT_PRIORITY),
185  state(tState::RUNNING),
186  self(this, internal::tThreadDeleter()),
187  delete_on_completion(true),
188  start_signal(false),
189  monitor(*this),
190  thread_list_ref(internal::GetThreadList()),
191  locked_objects(),
192  longevity(0),
193  unknown_thread(true),
194  wrapped_thread(),
195  handle(pthread_self()),
196  joining_threads(0)
197{
198  AddToThreadList();
199
200  // see if we can obtain a thread name
201  char name_buffer[1024];
202  if (!pthread_getname_np(handle, name_buffer, 1023))
203  {
204    name_buffer[1023] = 0;
205    if (strlen(name_buffer) > 0)
206    {
207      name = name_buffer;
208    }
209  }
210}
211
212tThread::tThread(const std::string& name) :
213  stop_signal(false),
214  lock_stack(),
215  id(internal::GetUniqueThreadId()),
216  name(name.length() > 0 ? name : internal::GetDefaultThreadName(id)),
217  priority(cDEFAULT_PRIORITY),
218  state(tState::NEW),
219  self(this, internal::tThreadDeleter()),
220  delete_on_completion(false),
221  start_signal(false),
222  monitor(*this),
223  thread_list_ref(internal::GetThreadList()),
224  locked_objects(),
225  longevity(0),
226  unknown_thread(false),
227  wrapped_thread(&Launch, this),
228  handle(wrapped_thread.native_handle()),
229  joining_threads(0)
230{
231  AddToThreadList();
232  SetName(this->name);
233}
234
235tThread::~tThread()
236{
237  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Deleting thread ", this);
238
239  // remove from thread list
240  tLock lock(thread_list_ref->obj_mutex);
241  assert(this != NULL);
242
243  // remove thread from list
244  for (size_t i = 0; i < thread_list_ref->vec.size(); i++)
245  {
246    std::shared_ptr<tThread> t = thread_list_ref->vec[i].lock();
247    if (t.get() == this)
248    {
249      thread_list_ref->vec.erase(thread_list_ref->vec.begin() + i);
250      break;
251    }
252    if (t.get() == NULL)   // remove empty entries
253    {
254      thread_list_ref->vec.erase(thread_list_ref->vec.begin() + i);
255      i--;
256    }
257  }
258
259  lock.Unlock();
260  if (!unknown_thread)
261  {
262    if (&tThread::CurrentThread() != this)
263    {
264      Join(); // we shouldn't delete anything while thread is still running
265    }
266    else if (wrapped_thread.joinable())
267    {
268      wrapped_thread.detach();
269    }
270  }
271
272  for (auto rit = locked_objects.rbegin(); rit < locked_objects.rend(); ++rit)
273  {
274    (*rit).reset();
275  }
276}
277
278void tThread::AddToThreadList()
279{
280  tLock lock(thread_list_ref->obj_mutex);
281  thread_list_ref->vec.push_back(self);
282
283  //printf("Creating thread %p %s\n", this, getName().getCString());
284  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Creating thread ", this);
285}
286
287typedef rrlib::design_patterns::tSingletonHolder<boost::thread_specific_ptr<tThread>, rrlib::design_patterns::singleton::Longevity, internal::CreateCurThreadLocal> tCurThreadLocal;
288static inline unsigned int GetLongevity(boost::thread_specific_ptr<tThread>*)
289{
290  return 0xFFFFFFFC; // should exist longer than anything using rrlib locks
291}
292
293boost::thread_specific_ptr<tThread>& tThread::GetCurThreadLocal()
294{
295  return tCurThreadLocal::Instance();
296}
297
298std::string tThread::GetLogDescription() const
299{
300  std::ostringstream oss;
301  oss << "Thread " << id << " '" << GetName() << "'";
302  return oss.str();
303}
304
305void tThread::Join()
306{
307  if (unknown_thread)
308  {
309    RRLIB_LOG_PRINT(WARNING, "Operation not supported for threads of unknown origin.");
310    return;
311  }
312  if (!wrapped_thread.joinable())
313  {
314    return;
315  }
316  if (&CurrentThread() == this)
317  {
318    RRLIB_LOG_PRINT(DEBUG_WARNING, "Thread cannot join itself");
319    return;
320  }
321  PreJoin();
322  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Joining Thread");
323
324  int joining = joining_threads.fetch_add(1);
325  if (joining >= 1)
326  {
327    RRLIB_LOG_PRINT(DEBUG_WARNING, "Multiple threads are trying to join. Returning this thread without joining.");
328    return;
329  }
330  if (wrapped_thread.joinable())
331  {
332    wrapped_thread.join();
333  }
334  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Joined Thread");
335}
336
337void tThread::Launch(tThread* thread_ptr)
338{
339  thread_ptr->Launcher();
340}
341
342void tThread::Launcher()
343{
344  //unsafe _FINROC_LOG_MESSAGE(DEBUG_VERBOSE_2, logDomain) << "Entering";
345  cur_thread = this;
346  tLock l(*this);
347  state = tState::PREPARE_RUNNING;
348  //unsafe _FINROC_LOG_MESSAGE(DEBUG_VERBOSE_2, logDomain) << "Locked";
349  //curThread = threadPtr;
350  GetCurThreadLocal().reset(this);
351  //unsafe _FINROC_LOG_MESSAGE(DEBUG_VERBOSE_2, logDomain) << "ThreadLocal set";
352
353  // wait for start signal
354  while ((!(start_signal)) && (!(stop_signal)))
355  {
356    monitor.Wait(l);
357  }
358
359  // run thread?
360  state = tState::RUNNING;
361  if (start_signal/* && (!(stop_signal))*/)
362  {
363    l.Unlock();
364    RRLIB_LOG_PRINT(DEBUG, "Thread started");
365    Run();
366    RRLIB_LOG_PRINT(DEBUG, "Thread exited normally");
367  }
368
369  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Exiting");
370
371  l.Unlock();
372  // Do this BEFORE thread is removed from list - to ensure this is done StopThreads
373  try
374  {
375    for (auto rit = locked_objects.rbegin(); rit < locked_objects.rend(); ++rit)
376    {
377      (*rit).reset();
378    }
379  }
380  catch (const std::exception& e)
381  {
382    RRLIB_LOG_PRINT(ERROR, "Thread encountered exception during cleanup: ", e.what());
383  }
384}
385
386void tThread::LockObject(std::shared_ptr<void> obj)
387{
388  tLock(*this);
389  locked_objects.push_back(obj);
390}
391
392void tThread::PreJoin()
393{
394  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Entering");
395  tLock l(*this);
396  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Locked");
397  if (state == tState::PREPARE_RUNNING || state == tState::NEW)
398  {
399    RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Notifying");
400    stop_signal = true;
401    monitor.Notify(l);
402    RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Notified");
403  }
404  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Leaving");
405}
406
407void tThread::SetName(const std::string& name)
408{
409  this->name = name;
410  pthread_setname_np(handle, name.substr(0, 15).c_str());
411}
412
413void tThread::SetPriority(int new_priority)
414{
415  //if (new_priority < sched_get_priority_min(SCHED_OTHER) || new_priority > sched_get_priority_max(SCHED_OTHER))
416  if (new_priority < -20 || new_priority > 19)
417  {
418    //pthread_getschedparam(handle, &policy, &param);
419    RRLIB_LOG_PRINT(ERROR, "Invalid thread priority: ", new_priority, ". Ignoring.");// Valid range is ", sched_get_priority_min(SCHED_OTHER), " to ", sched_get_priority_max(SCHED_OTHER),
420    //    ". Current priority is ", param.sched_priority, ".");
421    return;
422  }
423  if (CurrentThreadId() != GetId())
424  {
425    RRLIB_LOG_PRINT(ERROR, "SetPriority can only be called from the current thread");
426    return;
427  }
428
429  //int error_code = pthread_setschedparam(handle, SCHED_OTHER, &param);
430  // works according to "man pthreads" and discussion on: http://stackoverflow.com/questions/7684404/is-nice-used-to-change-the-thread-priority-or-the-process-priority
431  pid_t thread_id = syscall(SYS_gettid);
432  int error_code = setpriority(PRIO_PROCESS, thread_id, new_priority);
433  if (error_code)
434  {
435    RRLIB_LOG_PRINT(ERROR, "Failed to change thread priority: ", strerror(error_code));
436    return;
437  }
438  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Set niceness to ", new_priority);
439  priority = new_priority;
440}
441
442void tThread::SetRealtime()
443{
444  struct sched_param param;
445  param.sched_priority = 49;
446  int error_code = pthread_setschedparam(handle, SCHED_FIFO, &param);
447  if (error_code)
448  {
449    //printf("Failed making thread a real-time thread. Possibly current user has insufficient rights.\n");
450    RRLIB_LOG_PRINT(ERROR, "Failed making thread a real-time thread.", (error_code == EPERM ? " Caller does not have appropriate privileges." : ""));
451  }
452}
453
454
455void tThread::Sleep(const rrlib::time::tDuration& sleep_for, bool use_application_time, rrlib::time::tTimestamp wait_until)
456{
457  rrlib::time::tTimeMode time_mode = rrlib::time::GetTimeMode();
458  tThread& t = CurrentThread();
459  if (time_mode == rrlib::time::tTimeMode::SYSTEM_TIME || (!use_application_time))
460  {
461    if (sleep_for < std::chrono::milliseconds(400))
462    {
463      std::this_thread::sleep_for(sleep_for);
464    }
465    else
466    {
467      tLock l(t);
468      t.monitor.Wait(l, sleep_for, use_application_time, wait_until);
469    }
470  }
471  else if (time_mode == rrlib::time::tTimeMode::CUSTOM_CLOCK)
472  {
473    tLock l(t);
474    t.monitor.Wait(l, sleep_for, use_application_time, wait_until);
475  }
476  else
477  {
478    assert(time_mode == rrlib::time::tTimeMode::STRETCHED_SYSTEM_TIME);
479    tLock l(t);
480    rrlib::time::tDuration system_duration = rrlib::time::ToSystemDuration(sleep_for);
481    if (system_duration > std::chrono::milliseconds(20))
482    {
483      t.monitor.Wait(l, system_duration, use_application_time, wait_until);
484    }
485    else
486    {
487      l.Unlock();
488      std::this_thread::sleep_for(system_duration);
489    }
490  }
491}
492
493void tThread::Start()
494{
495  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Entering");
496  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "PreMutex");
497  tLock l(*this);
498  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Locked");
499  start_signal = true;
500  monitor.Notify(l);
501  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Notified thread");
502}
503
504void tThread::StopThread()
505{
506  if (unknown_thread)
507  {
508    RRLIB_LOG_PRINT(WARNING, "Operation not supported for threads of unknown origin.");
509    return;
510  }
511
512  // default implementation - possibly sufficient for some thread classes
513  tLock l(*this);
514  stop_signal = true;
515  monitor.Notify(l);
516}
517
518bool tThread::StopThreads(bool query_only)
519{
520  volatile static bool stopping_threadz = false;
521  if (stopping_threadz || query_only)   // We don't do this twice
522  {
523    return stopping_threadz;
524  }
525  stopping_threadz = true;
526  const char*(*GetLogDescription)() = GetLogDescriptionStatic;
527  RRLIB_LOG_PRINT(USER, "Stopping all threads");
528
529  tLock lock(internal::GetThreadList()->obj_mutex);
530  std::vector<std::weak_ptr<tThread>> current_threads_unordered;
531  std::vector<std::weak_ptr<tThread>> current_threads;
532  current_threads_unordered = internal::GetThreadList()->vec;
533  lock.Unlock();
534
535  // Sort threads according to longevity
536  int64_t last_longevity = -1;
537  while (true)
538  {
539    int64_t min_longevity = 0xFFFFFFFFFFLL;
540    for (size_t i = 0; i < current_threads_unordered.size(); i++)
541    {
542      std::shared_ptr<tThread> t = current_threads_unordered[i].lock();
543      if (t && t->longevity < min_longevity && t->longevity > last_longevity)
544      {
545        min_longevity = t->longevity;
546      }
547    }
548    if (min_longevity == 0xFFFFFFFFFFLL)
549    {
550      break;
551    }
552
553    // Copy to new list
554    for (size_t i = 0; i < current_threads_unordered.size(); i++)
555    {
556      std::shared_ptr<tThread> t = current_threads_unordered[i].lock();
557      if (t && t->longevity == min_longevity)
558      {
559        current_threads.push_back(current_threads_unordered[i]);
560      }
561    }
562
563    last_longevity = min_longevity;
564  }
565
566  // Delete threads in now correct order
567  for (size_t i = 0; i < current_threads.size(); i++)
568  {
569    std::weak_ptr<tThread> thread = current_threads[i];
570    std::shared_ptr<tThread> t = thread.lock();
571    if (t && t.get() != &CurrentThread())
572    {
573      if (t->unknown_thread)
574      {
575        RRLIB_LOG_PRINT(WARNING, "Do not know how to stop thread '", t->GetLogDescription(), "' of unknown origin.");
576        continue;
577      }
578
579      RRLIB_LOG_PRINT(DEBUG, "Stopping thread '", t->GetLogDescription(), "'");
580      tLock l(*t);
581      t->stop_signal = true;
582      if (!t->start_signal)
583      {
584        t->monitor.Notify(l);
585      }
586      else
587      {
588        t->monitor.Notify(l);
589        l.Unlock();
590        t->StopThread();
591        t->Join();
592      }
593    }
594  }
595
596  return true;
597}
598
599void tThread::Yield()
600{
601  std::this_thread::yield();
602}
603
604//----------------------------------------------------------------------
605// End of namespace declaration
606//----------------------------------------------------------------------
607}
608}
Note: See TracBrowser for help on using the repository browser.