source: rrlib_thread/tThread.cpp @ 32:1270a96f17ba

Last change on this file since 32:1270a96f17ba was 32:1270a96f17ba, checked in by Max Reichardt <mreichardt@…>, 5 years ago

Fix for single threaded mode (tThread::StoppingThreads() always returned true)

File size: 16.0 KB
Line 
1//
2// You received this file as part of RRLib
3// Robotics Research Library
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    rrlib/thread/tThread.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2012-07-05
27 *
28 */
29//----------------------------------------------------------------------
30#include "rrlib/thread/tThread.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
35#include <sstream>
36#include <cstring>
37
38#ifndef RRLIB_SINGLE_THREADED
39#include <sys/resource.h>
40#include <sys/syscall.h>
41#include <unistd.h>
42#endif
43
44#include "rrlib/design_patterns/singleton.h"
45//----------------------------------------------------------------------
46// Internal includes with ""
47//----------------------------------------------------------------------
48
49//----------------------------------------------------------------------
50// Debugging
51//----------------------------------------------------------------------
52#include <cassert>
53
54//----------------------------------------------------------------------
55// Namespace usage
56//----------------------------------------------------------------------
57using namespace rrlib::logging;
58
59//----------------------------------------------------------------------
60// Namespace declaration
61//----------------------------------------------------------------------
62namespace rrlib
63{
64namespace thread
65{
66
67//----------------------------------------------------------------------
68// Forward declarations / typedefs / enums
69//----------------------------------------------------------------------
70
71//----------------------------------------------------------------------
72// Const values
73//----------------------------------------------------------------------
74#ifndef RRLIB_SINGLE_THREADED
75#ifdef _PTHREAD_H
76#define RRLIB_THREAD_USING_PTHREADS
77#endif
78#endif
79
80//----------------------------------------------------------------------
81// Implementation
82//----------------------------------------------------------------------
83
84#ifndef RRLIB_SINGLE_THREADED
85thread_local tThread::tPointer tThread::current_thread;
86#endif
87
88namespace internal
89{
90
91/*!
92 * Class for thread (self) deletion
93 */
94class tThreadDeleter
95{
96public:
97  //! for (self) deletion
98  void operator()(tThread* t)
99  {
100    if (t->GetDeleteOnCompletion())
101    {
102      delete t;
103    }
104  }
105};
106
107static std::string GetDefaultThreadName(int64_t id)
108{
109  std::ostringstream oss;
110  oss << "Thread-" << id;
111  return oss.str();
112}
113
114/*! List of threads currently known and running (== all thread objects created) */
115static std::shared_ptr<internal::tVectorWithMutex<std::weak_ptr<tThread>>>& GetThreadList()
116{
117  static std::shared_ptr<internal::tVectorWithMutex<std::weak_ptr<tThread>>> thread_list(new internal::tVectorWithMutex<std::weak_ptr<tThread>>(0x7FFFFFFF));
118  return thread_list;
119}
120
121static uint32_t GetUniqueThreadId()
122{
123  static tOrderedMutex mutex("GetUniqueThreadId()", 0x7FFFFFFE);
124  static bool overflow = false;
125  static uint32_t counter = 1;
126  tLock lock(mutex);
127  if (!overflow)
128  {
129    int result = counter;
130    counter++;
131    if (counter == 0)
132    {
133      overflow = true;
134    }
135    return result;
136  }
137
138  tLock lock2(GetThreadList()->obj_mutex);
139  // hmm... we start at id 1024 - as the former threads may be more long-lived
140  counter = std::max<uint32_t>(1023u, counter);
141  std::vector<std::weak_ptr<tThread>>& current_threads = GetThreadList()->vec;
142  while (true)
143  {
144    counter++;
145    bool used = false;
146    for (auto it = current_threads.begin(); it != current_threads.end(); ++it)
147    {
148      std::shared_ptr<tThread> thread = it->lock();
149      if (thread && thread->GetId() == counter)
150      {
151        used = true;
152        break;
153      }
154    }
155    if (!used)
156    {
157      return counter;
158    }
159  }
160}
161
162
163} // namespace internal
164
165tThread::tThread(bool anonymous, bool legion) :
166  stop_signal(false),
167  lock_stack(),
168  id(internal::GetUniqueThreadId()),
169  name(internal::GetDefaultThreadName(id)),
170  priority(cDEFAULT_PRIORITY),
171  state(tState::RUNNING),
172  self(this, internal::tThreadDeleter()),
173  delete_on_completion(true),
174  start_signal(false),
175  monitor(*this),
176  thread_list_ref(internal::GetThreadList()),
177  locked_objects(),
178  longevity(0),
179  unknown_thread(true),
180#ifndef RRLIB_SINGLE_THREADED
181  wrapped_thread(),
182  handle(pthread_self()),
183#endif
184  joining_threads(0)
185{
186  AddToThreadList();
187
188#ifdef RRLIB_THREAD_USING_PTHREADS
189  // see if we can obtain a thread name
190  char name_buffer[1024];
191  if (!pthread_getname_np(handle, name_buffer, 1023))
192  {
193    name_buffer[1023] = 0;
194    if (strlen(name_buffer) > 0)
195    {
196      name = name_buffer;
197    }
198  }
199#endif
200}
201
202tThread::tThread(const std::string& name) :
203  stop_signal(false),
204  lock_stack(),
205  id(internal::GetUniqueThreadId()),
206  name(name.length() > 0 ? name : internal::GetDefaultThreadName(id)),
207  priority(cDEFAULT_PRIORITY),
208  state(tState::NEW),
209  self(this, internal::tThreadDeleter()),
210  delete_on_completion(false),
211  start_signal(false),
212  monitor(*this),
213  thread_list_ref(internal::GetThreadList()),
214  locked_objects(),
215  longevity(0),
216  unknown_thread(false),
217#ifndef RRLIB_SINGLE_THREADED
218  wrapped_thread(&Launch, this),
219  handle(wrapped_thread.native_handle()),
220#endif
221  joining_threads(0)
222{
223  AddToThreadList();
224  SetName(this->name);
225}
226
227tThread::~tThread()
228{
229  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Deleting thread ", this);
230
231  // remove from thread list
232  tLock lock(thread_list_ref->obj_mutex);
233  assert(this != NULL);
234
235  // remove thread from list
236  for (size_t i = 0; i < thread_list_ref->vec.size(); i++)
237  {
238    std::shared_ptr<tThread> t = thread_list_ref->vec[i].lock();
239    if (t.get() == this)
240    {
241      thread_list_ref->vec.erase(thread_list_ref->vec.begin() + i);
242      break;
243    }
244    if (t.get() == NULL)   // remove empty entries
245    {
246      thread_list_ref->vec.erase(thread_list_ref->vec.begin() + i);
247      i--;
248    }
249  }
250
251  lock.Unlock();
252  if (!unknown_thread)
253  {
254#ifndef RRLIB_SINGLE_THREADED
255    if (&tThread::CurrentThread() != this)
256    {
257      Join(); // we shouldn't delete anything while thread is still running
258    }
259    else if (wrapped_thread.joinable())
260    {
261      wrapped_thread.detach();
262    }
263#endif
264  }
265
266  for (auto rit = locked_objects.rbegin(); rit < locked_objects.rend(); ++rit)
267  {
268    (*rit).reset();
269  }
270}
271
272void tThread::AddToThreadList()
273{
274  tLock lock(thread_list_ref->obj_mutex);
275  thread_list_ref->vec.push_back(self);
276
277  //printf("Creating thread %p %s\n", this, getName().getCString());
278  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Creating thread ", this);
279}
280
281std::string tThread::GetLogDescription() const
282{
283  std::ostringstream oss;
284  oss << "Thread " << id << " '" << GetName() << "'";
285  return oss.str();
286}
287
288void tThread::Join()
289{
290  if (unknown_thread)
291  {
292    RRLIB_LOG_PRINT(WARNING, "Operation not supported for threads of unknown origin.");
293    return;
294  }
295
296#ifndef RRLIB_SINGLE_THREADED
297  if (!wrapped_thread.joinable())
298  {
299    return;
300  }
301  if (&CurrentThread() == this)
302  {
303    RRLIB_LOG_PRINT(DEBUG_WARNING, "Thread cannot join itself");
304    return;
305  }
306  PreJoin();
307  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Joining Thread");
308
309  int joining = joining_threads.fetch_add(1);
310  if (joining >= 1)
311  {
312    RRLIB_LOG_PRINT(DEBUG_WARNING, "Multiple threads are trying to join. Returning this thread without joining.");
313    return;
314  }
315  if (wrapped_thread.joinable())
316  {
317    wrapped_thread.join();
318  }
319  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Joined Thread");
320#endif
321}
322
323void tThread::Launch(tThread* thread_ptr)
324{
325  thread_ptr->Launcher();
326}
327
328void tThread::Launcher()
329{
330#ifndef RRLIB_SINGLE_THREADED
331  //unsafe _FINROC_LOG_MESSAGE(DEBUG_VERBOSE_2, logDomain) << "Entering";
332  current_thread.pointer = this;
333  tLock l(*this);
334  state = tState::PREPARE_RUNNING;
335  //unsafe _FINROC_LOG_MESSAGE(DEBUG_VERBOSE_2, logDomain) << "Locked";
336
337  // wait for start signal
338  while ((!(start_signal)) && (!(stop_signal)))
339  {
340    monitor.Wait(l);
341  }
342
343  // run thread?
344  state = tState::RUNNING;
345  if (start_signal/* && (!(stop_signal))*/)
346  {
347    l.Unlock();
348    RRLIB_LOG_PRINT(DEBUG, "Thread started");
349    Run();
350    RRLIB_LOG_PRINT(DEBUG, "Thread exited normally");
351  }
352
353  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Exiting");
354
355  l.Unlock();
356  // Do this BEFORE thread is removed from list - to ensure this is done StopThreads
357  try
358  {
359    for (auto rit = locked_objects.rbegin(); rit < locked_objects.rend(); ++rit)
360    {
361      (*rit).reset();
362    }
363  }
364  catch (const std::exception& e)
365  {
366    RRLIB_LOG_PRINT(ERROR, "Thread encountered exception during cleanup: ", e.what());
367  }
368#endif
369}
370
371void tThread::LockObject(std::shared_ptr<void> obj)
372{
373  tLock(*this);
374  locked_objects.push_back(obj);
375}
376
377void tThread::PreJoin()
378{
379  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Entering");
380  tLock l(*this);
381  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Locked");
382  if (state == tState::PREPARE_RUNNING || state == tState::NEW)
383  {
384    RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Notifying");
385    stop_signal = true;
386    monitor.Notify(l);
387    RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Notified");
388  }
389  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Leaving");
390}
391
392void tThread::SetName(const std::string& name)
393{
394  this->name = name;
395#ifdef RRLIB_THREAD_USING_PTHREADS
396  pthread_setname_np(handle, name.substr(0, 15).c_str());
397#endif
398}
399
400void tThread::SetPriority(int new_priority)
401{
402#ifdef RRLIB_THREAD_USING_PTHREADS
403  //if (new_priority < sched_get_priority_min(SCHED_OTHER) || new_priority > sched_get_priority_max(SCHED_OTHER))
404  if (new_priority < -20 || new_priority > 19)
405  {
406    //pthread_getschedparam(handle, &policy, &param);
407    RRLIB_LOG_PRINT(ERROR, "Invalid thread priority: ", new_priority, ". Ignoring.");// Valid range is ", sched_get_priority_min(SCHED_OTHER), " to ", sched_get_priority_max(SCHED_OTHER),
408    //    ". Current priority is ", param.sched_priority, ".");
409    return;
410  }
411  if (CurrentThreadId() != GetId())
412  {
413    RRLIB_LOG_PRINT(ERROR, "SetPriority can only be called from the current thread");
414    return;
415  }
416
417  //int error_code = pthread_setschedparam(handle, SCHED_OTHER, &param);
418  // works according to "man pthreads" and discussion on: http://stackoverflow.com/questions/7684404/is-nice-used-to-change-the-thread-priority-or-the-process-priority
419  pid_t thread_id = syscall(SYS_gettid);
420  int error_code = setpriority(PRIO_PROCESS, thread_id, new_priority);
421  if (error_code)
422  {
423    RRLIB_LOG_PRINT(ERROR, "Failed to change thread priority: ", strerror(error_code));
424    return;
425  }
426  RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Set niceness to ", new_priority);
427  priority = new_priority;
428#endif
429}
430
431void tThread::SetRealtime()
432{
433#ifdef RRLIB_THREAD_USING_PTHREADS
434  struct sched_param param;
435  param.sched_priority = 49;
436  int error_code = pthread_setschedparam(handle, SCHED_FIFO, &param);
437  if (error_code)
438  {
439    //printf("Failed making thread a real-time thread. Possibly current user has insufficient rights.\n");
440    RRLIB_LOG_PRINT(ERROR, "Failed making thread a real-time thread.", (error_code == EPERM ? " Caller does not have appropriate privileges." : ""));
441  }
442#endif
443}
444
445
446void tThread::Sleep(const rrlib::time::tDuration& sleep_for, bool use_application_time, rrlib::time::tTimestamp wait_until)
447{
448#ifndef RRLIB_SINGLE_THREADED
449  rrlib::time::tTimeMode time_mode = rrlib::time::GetTimeMode();
450  tThread& t = CurrentThread();
451  if (time_mode == rrlib::time::tTimeMode::SYSTEM_TIME || (!use_application_time))
452  {
453    if (sleep_for < std::chrono::milliseconds(400))
454    {
455      std::this_thread::sleep_for(sleep_for);
456    }
457    else
458    {
459      tLock l(t);
460      t.monitor.Wait(l, sleep_for, use_application_time, wait_until);
461    }
462  }
463  else if (time_mode == rrlib::time::tTimeMode::CUSTOM_CLOCK)
464  {
465    tLock l(t);
466    t.monitor.Wait(l, sleep_for, use_application_time, wait_until);
467  }
468  else
469  {
470    assert(time_mode == rrlib::time::tTimeMode::STRETCHED_SYSTEM_TIME);
471    tLock l(t);
472    rrlib::time::tDuration system_duration = rrlib::time::ToSystemDuration(sleep_for);
473    if (system_duration > std::chrono::milliseconds(20))
474    {
475      t.monitor.Wait(l, system_duration, use_application_time, wait_until);
476    }
477    else
478    {
479      l.Unlock();
480      std::this_thread::sleep_for(system_duration);
481    }
482  }
483#else
484  std::this_thread::sleep_for(sleep_for);
485#endif
486}
487
488void tThread::Start()
489{
490  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Entering");
491  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "PreMutex");
492  tLock l(*this);
493  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Locked");
494  start_signal = true;
495  monitor.Notify(l);
496  RRLIB_LOG_PRINT(DEBUG_VERBOSE_2, "Notified thread");
497}
498
499void tThread::StopThread()
500{
501  if (unknown_thread)
502  {
503    RRLIB_LOG_PRINT(WARNING, "Operation not supported for threads of unknown origin.");
504    return;
505  }
506
507  // default implementation - possibly sufficient for some thread classes
508  tLock l(*this);
509  stop_signal = true;
510  monitor.Notify(l);
511}
512
513bool tThread::StopThreads(bool query_only)
514{
515  volatile static bool stopping_threadz = false;
516  if (stopping_threadz || query_only)   // We don't do this twice
517  {
518    return stopping_threadz;
519  }
520#ifndef RRLIB_SINGLE_THREADED
521  stopping_threadz = true;
522  const char*(*GetLogDescription)() = GetLogDescriptionStatic;
523  RRLIB_LOG_PRINT(USER, "Stopping all threads");
524
525  tLock lock(internal::GetThreadList()->obj_mutex);
526  std::vector<std::weak_ptr<tThread>> current_threads_unordered;
527  std::vector<std::weak_ptr<tThread>> current_threads;
528  current_threads_unordered = internal::GetThreadList()->vec;
529  lock.Unlock();
530
531  // Sort threads according to longevity
532  int64_t last_longevity = -1;
533  while (true)
534  {
535    int64_t min_longevity = 0xFFFFFFFFFFLL;
536    for (size_t i = 0; i < current_threads_unordered.size(); i++)
537    {
538      std::shared_ptr<tThread> t = current_threads_unordered[i].lock();
539      if (t && t->longevity < min_longevity && t->longevity > last_longevity)
540      {
541        min_longevity = t->longevity;
542      }
543    }
544    if (min_longevity == 0xFFFFFFFFFFLL)
545    {
546      break;
547    }
548
549    // Copy to new list
550    for (size_t i = 0; i < current_threads_unordered.size(); i++)
551    {
552      std::shared_ptr<tThread> t = current_threads_unordered[i].lock();
553      if (t && t->longevity == min_longevity)
554      {
555        current_threads.push_back(current_threads_unordered[i]);
556      }
557    }
558
559    last_longevity = min_longevity;
560  }
561
562  // Delete threads in now correct order
563  for (size_t i = 0; i < current_threads.size(); i++)
564  {
565    std::weak_ptr<tThread> thread = current_threads[i];
566    std::shared_ptr<tThread> t = thread.lock();
567    if (t && t.get() != &CurrentThread())
568    {
569      if (t->unknown_thread)
570      {
571        RRLIB_LOG_PRINT(WARNING, "Do not know how to stop thread '", t->GetLogDescription(), "' of unknown origin.");
572        continue;
573      }
574
575      RRLIB_LOG_PRINT(DEBUG, "Stopping thread '", t->GetLogDescription(), "'");
576      tLock l(*t);
577      t->stop_signal = true;
578      if (!t->start_signal)
579      {
580        t->monitor.Notify(l);
581      }
582      else
583      {
584        t->monitor.Notify(l);
585        l.Unlock();
586        t->StopThread();
587        t->Join();
588      }
589    }
590  }
591#endif
592
593  return true;
594}
595
596void tThread::Yield()
597{
598  std::this_thread::yield();
599}
600
601tThread::tPointer::~tPointer()
602{
603  tLock l(*pointer);
604  pointer->state = tThread::tState::TERMINATED;
605  //t->curThread.reset();
606  l.Unlock();
607  pointer->self.reset(); // possibly delete thread - important that it's last statement
608}
609
610//----------------------------------------------------------------------
611// End of namespace declaration
612//----------------------------------------------------------------------
613}
614}
Note: See TracBrowser for help on using the repository browser.