source: rrlib_thread/tThread.cpp @ 30:fa354798480f

Last change on this file since 30:fa354798480f was 30:fa354798480f, checked in by Max Reichardt <mreichardt@…>, 5 years ago

Dependency to pthread library is now optional. Added RRLIB_SINGLE_THREADED #ifndefs to all places where they were still missing.

File size: 16.0 KB
Line 
1//
2// You received this file as part of RRLib
3// Robotics Research Library
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    rrlib/thread/tThread.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2012-07-05
27 *
28 */
29//----------------------------------------------------------------------
30#include "rrlib/thread/tThread.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
#include <atomic>
#include <cerrno>
#include <cstring>
#include <sstream>
37
38#ifndef RRLIB_SINGLE_THREADED
39#include <sys/resource.h>
40#include <sys/syscall.h>
41#include <unistd.h>
42#endif
43
44#include "rrlib/design_patterns/singleton.h"
45//----------------------------------------------------------------------
46// Internal includes with ""
47//----------------------------------------------------------------------
48
49//----------------------------------------------------------------------
50// Debugging
51//----------------------------------------------------------------------
52#include <cassert>
53
54//----------------------------------------------------------------------
55// Namespace usage
56//----------------------------------------------------------------------
57using namespace rrlib::logging;
58
59//----------------------------------------------------------------------
60// Namespace declaration
61//----------------------------------------------------------------------
62namespace rrlib
63{
64namespace thread
65{
66
67//----------------------------------------------------------------------
68// Forward declarations / typedefs / enums
69//----------------------------------------------------------------------
70
71//----------------------------------------------------------------------
72// Const values
73//----------------------------------------------------------------------
// Enables the pthread-specific code paths below (thread naming, niceness, real-time scheduling).
// NOTE(review): _PTHREAD_H looks like the include guard of glibc's <pthread.h> - confirm for other C libraries.
#ifdef _PTHREAD_H
#define RRLIB_THREAD_USING_PTHREADS
#endif
77
78//----------------------------------------------------------------------
79// Implementation
80//----------------------------------------------------------------------
81
#ifndef RRLIB_SINGLE_THREADED
// Thread-local slot referring to the tThread object of the executing thread (Launcher() stores 'this' in it).
thread_local tThread::tPointer tThread::current_thread;
#endif
85
86namespace internal
87{
88
89/*!
90 * Class for thread (self) deletion
91 */
92class tThreadDeleter
93{
94public:
95  //! for (self) deletion
96  void operator()(tThread* t)
97  {
98    if (t->GetDeleteOnCompletion())
99    {
100      delete t;
101    }
102  }
103};
104
/*!
 * Builds the fallback name used for threads that were not given a name.
 *
 * \param id Id of the thread
 * \return "Thread-" followed by the id
 */
static std::string GetDefaultThreadName(int64_t id)
{
  std::ostringstream name_stream;
  name_stream << "Thread-";
  name_stream << id;
  return name_stream.str();
}
111
112/*! List of threads currently known and running (== all thread objects created) */
113static std::shared_ptr<internal::tVectorWithMutex<std::weak_ptr<tThread>>>& GetThreadList()
114{
115  static std::shared_ptr<internal::tVectorWithMutex<std::weak_ptr<tThread>>> thread_list(new internal::tVectorWithMutex<std::weak_ptr<tThread>>(0x7FFFFFFF));
116  return thread_list;
117}
118
119static uint32_t GetUniqueThreadId()
120{
121  static tOrderedMutex mutex("GetUniqueThreadId()", 0x7FFFFFFE);
122  static bool overflow = false;
123  static uint32_t counter = 1;
124  tLock lock(mutex);
125  if (!overflow)
126  {
127    int result = counter;
128    counter++;
129    if (counter == 0)
130    {
131      overflow = true;
132    }
133    return result;
134  }
135
136  tLock lock2(GetThreadList()->obj_mutex);
137  // hmm... we start at id 1024 - as the former threads may be more long-lived
138  counter = std::max<uint32_t>(1023u, counter);
139  std::vector<std::weak_ptr<tThread>>& current_threads = GetThreadList()->vec;
140  while (true)
141  {
142    counter++;
143    bool used = false;
144    for (auto it = current_threads.begin(); it != current_threads.end(); ++it)
145    {
146      std::shared_ptr<tThread> thread = it->lock();
147      if (thread && thread->GetId() == counter)
148      {
149        used = true;
150        break;
151      }
152    }
153    if (!used)
154    {
155      return counter;
156    }
157  }
158}
159
160
161} // namespace internal
162
/*!
 * Creates a thread object for the calling thread, i.e. a thread that was not created by this class.
 * The object is marked as already running, of unknown origin and to be deleted on completion.
 *
 * \param anonymous Not used in this constructor body
 * \param legion Not used in this constructor body
 */
tThread::tThread(bool anonymous, bool legion) :
  stop_signal(false),
  lock_stack(),
  id(internal::GetUniqueThreadId()),
  name(internal::GetDefaultThreadName(id)),
  priority(cDEFAULT_PRIORITY),
  state(tState::RUNNING),
  self(this, internal::tThreadDeleter()),
  delete_on_completion(true),
  start_signal(false),
  monitor(*this),
  thread_list_ref(internal::GetThreadList()),
  locked_objects(),
  longevity(0),
  unknown_thread(true),
#ifndef RRLIB_SINGLE_THREADED
  wrapped_thread(),
  handle(pthread_self()), // handle of the calling thread - no new thread is started here
#endif
  joining_threads(0)
{
  AddToThreadList();

#ifdef RRLIB_THREAD_USING_PTHREADS
  // see if we can obtain a thread name
  char name_buffer[1024];
  if (!pthread_getname_np(handle, name_buffer, 1023))
  {
    name_buffer[1023] = 0; // make sure the buffer is terminated before measuring it
    if (strlen(name_buffer) > 0)
    {
      // a non-empty OS-level name replaces the default "Thread-<id>" name
      name = name_buffer;
    }
  }
#endif
}
199
/*!
 * Creates a thread object together with a new wrapped thread that executes Launch().
 * The object starts in state NEW and is not deleted on completion.
 *
 * \param name Name of the thread; if empty, the default name "Thread-<id>" is used
 */
tThread::tThread(const std::string& name) :
  stop_signal(false),
  lock_stack(),
  id(internal::GetUniqueThreadId()),
  name(name.length() > 0 ? name : internal::GetDefaultThreadName(id)),
  priority(cDEFAULT_PRIORITY),
  state(tState::NEW),
  self(this, internal::tThreadDeleter()),
  delete_on_completion(false),
  start_signal(false),
  monitor(*this),
  thread_list_ref(internal::GetThreadList()),
  locked_objects(),
  longevity(0),
  unknown_thread(false),
#ifndef RRLIB_SINGLE_THREADED
  // NOTE(review): the new thread is launched from within the initializer list; it may enter Launcher()
  // before the remaining members are initialized and before the constructor body has run.
  // Whether that is safe depends on the member declaration order in the header - confirm there.
  wrapped_thread(&Launch, this),
  handle(wrapped_thread.native_handle()),
#endif
  joining_threads(0)
{
  AddToThreadList();
  SetName(this->name); // also hands the name to the OS-level thread when pthreads are used
}
224
225tThread::~tThread()
226{
227  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Deleting thread ", this);
228
229  // remove from thread list
230  tLock lock(thread_list_ref->obj_mutex);
231  assert(this != NULL);
232
233  // remove thread from list
234  for (size_t i = 0; i < thread_list_ref->vec.size(); i++)
235  {
236    std::shared_ptr<tThread> t = thread_list_ref->vec[i].lock();
237    if (t.get() == this)
238    {
239      thread_list_ref->vec.erase(thread_list_ref->vec.begin() + i);
240      break;
241    }
242    if (t.get() == NULL)   // remove empty entries
243    {
244      thread_list_ref->vec.erase(thread_list_ref->vec.begin() + i);
245      i--;
246    }
247  }
248
249  lock.Unlock();
250  if (!unknown_thread)
251  {
252#ifndef RRLIB_SINGLE_THREADED
253    if (&tThread::CurrentThread() != this)
254    {
255      Join(); // we shouldn't delete anything while thread is still running
256    }
257    else if (wrapped_thread.joinable())
258    {
259      wrapped_thread.detach();
260    }
261#endif
262  }
263
264  for (auto rit = locked_objects.rbegin(); rit < locked_objects.rend(); ++rit)
265  {
266    (*rit).reset();
267  }
268}
269
270void tThread::AddToThreadList()
271{
272  tLock lock(thread_list_ref->obj_mutex);
273  thread_list_ref->vec.push_back(self);
274
275  //printf("Creating thread %p %s\n", this, getName().getCString());
276  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Creating thread ", this);
277}
278
279std::string tThread::GetLogDescription() const
280{
281  std::ostringstream oss;
282  oss << "Thread " << id << " '" << GetName() << "'";
283  return oss.str();
284}
285
286void tThread::Join()
287{
288  if (unknown_thread)
289  {
290    RRLIB_LOG_PRINT(WARNING, "Operation not supported for threads of unknown origin.");
291    return;
292  }
293
294#ifndef RRLIB_SINGLE_THREADED
295  if (!wrapped_thread.joinable())
296  {
297    return;
298  }
299  if (&CurrentThread() == this)
300  {
301    RRLIB_LOG_PRINT(DEBUG_WARNING, "Thread cannot join itself");
302    return;
303  }
304  PreJoin();
305  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Joining Thread");
306
307  int joining = joining_threads.fetch_add(1);
308  if (joining >= 1)
309  {
310    RRLIB_LOG_PRINT(DEBUG_WARNING, "Multiple threads are trying to join. Returning this thread without joining.");
311    return;
312  }
313  if (wrapped_thread.joinable())
314  {
315    wrapped_thread.join();
316  }
317  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Joined Thread");
318#endif
319}
320
/*!
 * Entry function passed to the wrapped thread by the constructor; forwards to Launcher().
 *
 * \param thread_ptr Thread object whose Launcher() is to be executed
 */
void tThread::Launch(tThread* thread_ptr)
{
  thread_ptr->Launcher();
}
325
326void tThread::Launcher()
327{
328#ifndef RRLIB_SINGLE_THREADED
329  //unsafe _FINROC_LOG_MESSAGE(DEBUG_VERBOSE_2, logDomain) << "Entering";
330  current_thread.pointer = this;
331  tLock l(*this);
332  state = tState::PREPARE_RUNNING;
333  //unsafe _FINROC_LOG_MESSAGE(DEBUG_VERBOSE_2, logDomain) << "Locked";
334
335  // wait for start signal
336  while ((!(start_signal)) && (!(stop_signal)))
337  {
338    monitor.Wait(l);
339  }
340
341  // run thread?
342  state = tState::RUNNING;
343  if (start_signal/* && (!(stop_signal))*/)
344  {
345    l.Unlock();
346    RRLIB_LOG_PRINT(DEBUG, "Thread started");
347    Run();
348    RRLIB_LOG_PRINT(DEBUG, "Thread exited normally");
349  }
350
351  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Exiting");
352
353  l.Unlock();
354  // Do this BEFORE thread is removed from list - to ensure this is done StopThreads
355  try
356  {
357    for (auto rit = locked_objects.rbegin(); rit < locked_objects.rend(); ++rit)
358    {
359      (*rit).reset();
360    }
361  }
362  catch (const std::exception& e)
363  {
364    RRLIB_LOG_PRINT(ERROR, "Thread encountered exception during cleanup: ", e.what());
365  }
366#endif
367}
368
369void tThread::LockObject(std::shared_ptr<void> obj)
370{
371  tLock(*this);
372  locked_objects.push_back(obj);
373}
374
375void tThread::PreJoin()
376{
377  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Entering");
378  tLock l(*this);
379  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Locked");
380  if (state == tState::PREPARE_RUNNING || state == tState::NEW)
381  {
382    RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Notifying");
383    stop_signal = true;
384    monitor.Notify(l);
385    RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Notified");
386  }
387  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Leaving");
388}
389
390void tThread::SetName(const std::string& name)
391{
392  this->name = name;
393#ifdef RRLIB_THREAD_USING_PTHREADS
394  pthread_setname_np(handle, name.substr(0, 15).c_str());
395#endif
396}
397
398void tThread::SetPriority(int new_priority)
399{
400#ifdef RRLIB_THREAD_USING_PTHREADS
401  //if (new_priority < sched_get_priority_min(SCHED_OTHER) || new_priority > sched_get_priority_max(SCHED_OTHER))
402  if (new_priority < -20 || new_priority > 19)
403  {
404    //pthread_getschedparam(handle, &policy, &param);
405    RRLIB_LOG_PRINT(ERROR, "Invalid thread priority: ", new_priority, ". Ignoring.");// Valid range is ", sched_get_priority_min(SCHED_OTHER), " to ", sched_get_priority_max(SCHED_OTHER),
406    //    ". Current priority is ", param.sched_priority, ".");
407    return;
408  }
409  if (CurrentThreadId() != GetId())
410  {
411    RRLIB_LOG_PRINT(ERROR, "SetPriority can only be called from the current thread");
412    return;
413  }
414
415  //int error_code = pthread_setschedparam(handle, SCHED_OTHER, &param);
416  // works according to "man pthreads" and discussion on: http://stackoverflow.com/questions/7684404/is-nice-used-to-change-the-thread-priority-or-the-process-priority
417  pid_t thread_id = syscall(SYS_gettid);
418  int error_code = setpriority(PRIO_PROCESS, thread_id, new_priority);
419  if (error_code)
420  {
421    RRLIB_LOG_PRINT(ERROR, "Failed to change thread priority: ", strerror(error_code));
422    return;
423  }
424  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Set niceness to ", new_priority);
425  priority = new_priority;
426#endif
427}
428
429void tThread::SetRealtime()
430{
431#ifdef RRLIB_THREAD_USING_PTHREADS
432  struct sched_param param;
433  param.sched_priority = 49;
434  int error_code = pthread_setschedparam(handle, SCHED_FIFO, &param);
435  if (error_code)
436  {
437    //printf("Failed making thread a real-time thread. Possibly current user has insufficient rights.\n");
438    RRLIB_LOG_PRINT(ERROR, "Failed making thread a real-time thread.", (error_code == EPERM ? " Caller does not have appropriate privileges." : ""));
439  }
440#endif
441}
442
443
444void tThread::Sleep(const rrlib::time::tDuration& sleep_for, bool use_application_time, rrlib::time::tTimestamp wait_until)
445{
446#ifndef RRLIB_SINGLE_THREADED
447  rrlib::time::tTimeMode time_mode = rrlib::time::GetTimeMode();
448  tThread& t = CurrentThread();
449  if (time_mode == rrlib::time::tTimeMode::SYSTEM_TIME || (!use_application_time))
450  {
451    if (sleep_for < std::chrono::milliseconds(400))
452    {
453      std::this_thread::sleep_for(sleep_for);
454    }
455    else
456    {
457      tLock l(t);
458      t.monitor.Wait(l, sleep_for, use_application_time, wait_until);
459    }
460  }
461  else if (time_mode == rrlib::time::tTimeMode::CUSTOM_CLOCK)
462  {
463    tLock l(t);
464    t.monitor.Wait(l, sleep_for, use_application_time, wait_until);
465  }
466  else
467  {
468    assert(time_mode == rrlib::time::tTimeMode::STRETCHED_SYSTEM_TIME);
469    tLock l(t);
470    rrlib::time::tDuration system_duration = rrlib::time::ToSystemDuration(sleep_for);
471    if (system_duration > std::chrono::milliseconds(20))
472    {
473      t.monitor.Wait(l, system_duration, use_application_time, wait_until);
474    }
475    else
476    {
477      l.Unlock();
478      std::this_thread::sleep_for(system_duration);
479    }
480  }
481#else
482  std::this_thread::sleep_for(sleep_for);
483#endif
484}
485
486void tThread::Start()
487{
488  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Entering");
489  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "PreMutex");
490  tLock l(*this);
491  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Locked");
492  start_signal = true;
493  monitor.Notify(l);
494  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Notified thread");
495}
496
497void tThread::StopThread()
498{
499  if (unknown_thread)
500  {
501    RRLIB_LOG_PRINT(WARNING, "Operation not supported for threads of unknown origin.");
502    return;
503  }
504
505  // default implementation - possibly sufficient for some thread classes
506  tLock l(*this);
507  stop_signal = true;
508  monitor.Notify(l);
509}
510
511bool tThread::StopThreads(bool query_only)
512{
513#ifndef RRLIB_SINGLE_THREADED
514  volatile static bool stopping_threadz = false;
515  if (stopping_threadz || query_only)   // We don't do this twice
516  {
517    return stopping_threadz;
518  }
519  stopping_threadz = true;
520  const char*(*GetLogDescription)() = GetLogDescriptionStatic;
521  RRLIB_LOG_PRINT(USER, "Stopping all threads");
522
523  tLock lock(internal::GetThreadList()->obj_mutex);
524  std::vector<std::weak_ptr<tThread>> current_threads_unordered;
525  std::vector<std::weak_ptr<tThread>> current_threads;
526  current_threads_unordered = internal::GetThreadList()->vec;
527  lock.Unlock();
528
529  // Sort threads according to longevity
530  int64_t last_longevity = -1;
531  while (true)
532  {
533    int64_t min_longevity = 0xFFFFFFFFFFLL;
534    for (size_t i = 0; i < current_threads_unordered.size(); i++)
535    {
536      std::shared_ptr<tThread> t = current_threads_unordered[i].lock();
537      if (t && t->longevity < min_longevity && t->longevity > last_longevity)
538      {
539        min_longevity = t->longevity;
540      }
541    }
542    if (min_longevity == 0xFFFFFFFFFFLL)
543    {
544      break;
545    }
546
547    // Copy to new list
548    for (size_t i = 0; i < current_threads_unordered.size(); i++)
549    {
550      std::shared_ptr<tThread> t = current_threads_unordered[i].lock();
551      if (t && t->longevity == min_longevity)
552      {
553        current_threads.push_back(current_threads_unordered[i]);
554      }
555    }
556
557    last_longevity = min_longevity;
558  }
559
560  // Delete threads in now correct order
561  for (size_t i = 0; i < current_threads.size(); i++)
562  {
563    std::weak_ptr<tThread> thread = current_threads[i];
564    std::shared_ptr<tThread> t = thread.lock();
565    if (t && t.get() != &CurrentThread())
566    {
567      if (t->unknown_thread)
568      {
569        RRLIB_LOG_PRINT(WARNING, "Do not know how to stop thread '", t->GetLogDescription(), "' of unknown origin.");
570        continue;
571      }
572
573      RRLIB_LOG_PRINT(DEBUG, "Stopping thread '", t->GetLogDescription(), "'");
574      tLock l(*t);
575      t->stop_signal = true;
576      if (!t->start_signal)
577      {
578        t->monitor.Notify(l);
579      }
580      else
581      {
582        t->monitor.Notify(l);
583        l.Unlock();
584        t->StopThread();
585        t->Join();
586      }
587    }
588  }
589#endif
590
591  return true;
592}
593
/*!
 * Offers the remainder of the calling thread's time slice to other threads
 * (forwards to std::this_thread::yield()).
 */
void tThread::Yield()
{
  std::this_thread::yield();
}
598
599tThread::tPointer::~tPointer()
600{
601  tLock l(*pointer);
602  pointer->state = tThread::tState::TERMINATED;
603  //t->curThread.reset();
604  l.Unlock();
605  pointer->self.reset(); // possibly delete thread - important that it's last statement
606}
607
608//----------------------------------------------------------------------
609// End of namespace declaration
610//----------------------------------------------------------------------
611}
612}
Note: See TracBrowser for help on using the repository browser.