source: rrlib_thread/tConditionVariable.cpp @ 39:d45d6f5f5d3a

17.03
Last change on this file since 39:d45d6f5f5d3a was 39:d45d6f5f5d3a, checked in by Max Reichardt <mreichardt@…>, 21 months ago

Makes condition variable wrapper return status from Wait() method with timeout

File size: 12.8 KB
Line 
1//
2// You received this file as part of RRLib
3// Robotics Research Library
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    rrlib/thread/tConditionVariable.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2012-06-12
27 *
28 */
29//----------------------------------------------------------------------
30#include "rrlib/thread/tConditionVariable.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
35#include "rrlib/design_patterns/singleton.h"
36#include "rrlib/time/tTimeStretchingListener.h"
37#include "rrlib/logging/messages.h"
38
39//----------------------------------------------------------------------
40// Internal includes with ""
41//----------------------------------------------------------------------
42
43//----------------------------------------------------------------------
44// Debugging
45//----------------------------------------------------------------------
46#include <cassert>
47
48//----------------------------------------------------------------------
49// Namespace usage
50//----------------------------------------------------------------------
51
52//----------------------------------------------------------------------
53// Namespace declaration
54//----------------------------------------------------------------------
55namespace rrlib
56{
57namespace thread
58{
59
60//----------------------------------------------------------------------
61// Forward declarations / typedefs / enums
62//----------------------------------------------------------------------
63
64//----------------------------------------------------------------------
65// Const values
66//----------------------------------------------------------------------
67
68//----------------------------------------------------------------------
69// Implementation
70//----------------------------------------------------------------------
71
72#ifndef RRLIB_SINGLE_THREADED
73
74namespace
75{
76
77inline tConditionVariableStatus ToConditionVariableStatus(std::cv_status status)
78{
79  return status == std::cv_status::timeout ? tConditionVariableStatus::TIMEOUT : tConditionVariableStatus::NO_TIMEOUT;
80}
81
82typedef rrlib::design_patterns::tSingletonHolder<std::vector<tConditionVariable*>> tConditionVariableListSingleton;
83static std::vector<tConditionVariable*>* GetConditionVariableList()
84{
85  try
86  {
87    return &tConditionVariableListSingleton::Instance();
88  }
89  catch (const std::logic_error& e)
90  {}
91  return NULL;
92}
93
94/*!
95 * This object increments value while it exists
96 */
97struct tTemporalIncrement : private util::tNoncopyable
98{
99  int& value;
100  tTemporalIncrement(int& value) : value(value)
101  {
102    value++;
103  }
104
105  ~tTemporalIncrement()
106  {
107    value--;
108  }
109};
110
111} // namespace
112
113class tTimeStretchingListenerImpl : public rrlib::time::tTimeStretchingListener
114{
115  rrlib::time::tTimeMode old_mode;
116
117  virtual void TimeChanged(const rrlib::time::tTimestamp& current_time) override
118  {
119    if (!GetConditionVariableList())
120    {
121      return;
122    }
123
124    // we don't need any locks, because TimeChanged is called with rrlib::time::mutex acquired
125    std::vector<tConditionVariable*>& l = *GetConditionVariableList();
126    for (auto it = l.begin(); it < l.end(); it++)
127    {
128      tLock l((*it)->mutex);
129      if ((*it)->waiting_on_monitor && (*it)->waiting_until_application_time != rrlib::time::cNO_TIME)
130      {
131        (*it)->time_scaling_factor_changed = true;
132        if (current_time >= (*it)->waiting_until_application_time)
133        {
134          (*it)->wrapped.notify_all();
135        }
136        else
137        {
138          rrlib::time::tDuration wait_for = (*it)->waiting_until_application_time - current_time;
139          if (wait_for > (*it)->waiting_for_application_duration)
140          {
141            // clock inconsistency
142            RRLIB_LOG_PRINT(DEBUG_WARNING, "Detected clock inconsistency. New timestamp is more than ",
143                            rrlib::time::ToIsoString(wait_for - (*it)->waiting_for_application_duration), " in the past.");
144            (*it)->waiting_for_application_duration = (*it)->waiting_for_application_duration / 2;
145            RRLIB_LOG_PRINT(DEBUG_WARNING, "Recovering by waiting another ", rrlib::time::ToString((*it)->waiting_for_application_duration), " (half the original timeout).");
146
147            (*it)->waiting_until_application_time = current_time - (*it)->waiting_for_application_duration;
148          }
149          else
150          {
151            (*it)->waiting_for_application_duration = wait_for;
152          }
153        }
154      }
155    }
156  }
157
158  virtual void TimeModeChanged(rrlib::time::tTimeMode new_mode) override
159  {
160    if (!GetConditionVariableList())
161    {
162      return;
163    }
164
165    if (old_mode == rrlib::time::tTimeMode::CUSTOM_CLOCK && new_mode != old_mode)
166    {
167      // we don't need any locks, because TimeChanged is called with rrlib::time::mutex acquired
168      std::vector<tConditionVariable*>& l = *GetConditionVariableList();
169      for (auto it = l.begin(); it < l.end(); it++)
170      {
171        tLock l((*it)->mutex);
172        if ((*it)->waiting_on_monitor && (*it)->waiting_until_application_time != rrlib::time::cNO_TIME)
173        {
174          (*it)->wrapped.notify_all();
175        }
176      }
177    }
178    old_mode = new_mode;
179  }
180
181  virtual void TimeStretchingFactorChanged(bool app_time_faster) override
182  {
183    if (!GetConditionVariableList())
184    {
185      return;
186    }
187
188    // we don't need any locks, because TimeChanged is called with rrlib::time::mutex acquired
189    std::vector<tConditionVariable*>& l = *GetConditionVariableList();
190    for (auto it = l.begin(); it < l.end(); it++)
191    {
192      tLock l((*it)->mutex);
193      if ((*it)->waiting_on_monitor && (*it)->waiting_until_application_time != rrlib::time::cNO_TIME)
194      {
195        (*it)->time_scaling_factor_changed = true;
196        if (app_time_faster)
197        {
198          (*it)->wrapped.notify_one();
199        }
200      }
201    }
202  }
203
204public:
205
206  tTimeStretchingListenerImpl() : old_mode(rrlib::time::GetTimeMode()) {}
207};
208
209static tTimeStretchingListenerImpl time_stretching_listener;
210
211
212tConditionVariable::tConditionVariable(tMutex& mutex) :
213  mutex(mutex),
214  wrapped(),
215  registered_in_list(false),
216  waiting_on_monitor(0),
217  waiting_until_application_time(rrlib::time::cNO_TIME),
218  waiting_for_application_duration(-1),
219  time_scaling_factor_changed(false),
220  notified(false)
221{}
222
223tConditionVariable::~tConditionVariable()
224{
225  assert(waiting_on_monitor == 0);
226  if (registered_in_list && GetConditionVariableList())
227  {
228    try
229    {
230      std::lock_guard<std::mutex> lock(rrlib::time::internal::tTimeMutex::Instance()); // we should not hold any critical locks - otherwise this will dead-lock
231      std::vector<tConditionVariable*>& l = *GetConditionVariableList();
232      l.erase(std::remove(l.begin(), l.end(), this), l.end());
233    }
234    catch (const std::logic_error& le) // can happen if tTimeMutex has already been destructed. In this case, we do not need to worry about unregistering anymore.
235    {}
236  }
237}
238
239bool tConditionVariable::ConditionVariableLockCorrectlyAcquired(const tLock& l) const
240{
241#ifdef RRLIB_THREAD_ENFORCE_LOCK_ORDER
242  return internal::tLockStack::ConditionVariableLockCorrectlyAcquired(l);
243#else
244  return l.IsLocked(mutex);
245#endif
246}
247
248void tConditionVariable::Notify(tLock& l)
249{
250  assert(ConditionVariableLockCorrectlyAcquired(l));
251  if (waiting_on_monitor)
252  {
253    notified = true;
254    wrapped.notify_one();
255  }
256}
257
258void tConditionVariable::NotifyAll(tLock& l)
259{
260  assert(ConditionVariableLockCorrectlyAcquired(l));
261  if (waiting_on_monitor)
262  {
263    notified = true;
264    wrapped.notify_all();
265  }
266}
267
268void tConditionVariable::Wait(tLock& l)
269{
270  assert(ConditionVariableLockCorrectlyAcquired(l));
271  tTemporalIncrement count_waiting_thread(waiting_on_monitor);
272  wrapped.wait(l.GetSimpleLock());
273}
274
275tConditionVariableStatus tConditionVariable::Wait(tLock& l, const rrlib::time::tDuration& wait_for, bool use_application_time, rrlib::time::tTimestamp wait_until)
276{
277  assert(ConditionVariableLockCorrectlyAcquired(l));
278  tTemporalIncrement count_waiting_thread(waiting_on_monitor);
279
280  if (!use_application_time)
281  {
282    return ToConditionVariableStatus(wrapped.wait_for(l.GetSimpleLock(), wait_for));
283  }
284  else
285  {
286    // Put condition variable in list
287    if (!registered_in_list)
288    {
289      try
290      {
291        std::lock_guard<std::mutex> l(rrlib::time::internal::tTimeMutex::Instance()); // this won't dead-lock, because this condition variable is not in listener list yet
292        GetConditionVariableList()->push_back(this);
293        registered_in_list = true;
294      }
295      catch (const std::logic_error&) // tTimeMutex no longer exists
296      {
297        RRLIB_LOG_PRINT(DEBUG_WARNING, "Won't wait after rrlibs have been (partly) destructed.");
298        return tConditionVariableStatus::TIMEOUT;
299      }
300    }
301
302    // Init variables
303    time_scaling_factor_changed = false;
304    notified = false;
305    assert(waiting_until_application_time == rrlib::time::cNO_TIME && "Only one thread may wait on condition variable using 'application time' timeout.");
306    if (wait_until == rrlib::time::cNO_TIME)
307    {
308      wait_until = rrlib::time::Now() + wait_for;
309    }
310    waiting_until_application_time = wait_until;
311    waiting_for_application_duration = wait_for;
312
313    // Wait...
314    rrlib::time::tDuration system_duration = rrlib::time::ToSystemDuration(wait_for);
315    rrlib::time::tTimeMode time_mode = rrlib::time::GetTimeMode();
316    while (true)
317    {
318      if (time_mode == rrlib::time::tTimeMode::CUSTOM_CLOCK)
319      {
320        wrapped.wait(l.GetSimpleLock());
321        time_mode = rrlib::time::GetTimeMode();
322        if (notified || time_mode == rrlib::time::tTimeMode::CUSTOM_CLOCK)
323        {
324          break;
325        }
326      }
327      else
328      {
329        wrapped.wait_for(l.GetSimpleLock(), system_duration);
330        rrlib::time::tTimeMode new_time_mode = rrlib::time::GetTimeMode();
331        if (notified || (!time_scaling_factor_changed && new_time_mode == time_mode))
332        {
333          break;
334        }
335        time_mode = new_time_mode;
336      }
337      rrlib::time::tTimestamp current_app_time = rrlib::time::Now();
338      if (current_app_time >= wait_until)
339      {
340        break;
341      }
342
343      // ok... so we need to wait more
344      assert(!notified);
345      rrlib::time::tDuration waiting_for_application_duration_new = wait_until - current_app_time;
346      if (waiting_for_application_duration_new > waiting_for_application_duration)
347      {
348        // clock inconsistency
349        RRLIB_LOG_PRINT(DEBUG_WARNING, "Detected clock inconsistency. New timestamp is more than ",
350                        rrlib::time::ToIsoString(waiting_for_application_duration_new - waiting_for_application_duration), " in the past.");
351        waiting_for_application_duration = waiting_for_application_duration / 2;
352        RRLIB_LOG_PRINT(DEBUG_WARNING, "Recovering by waiting another ", rrlib::time::ToString(waiting_for_application_duration), " (half the original timeout).");
353
354        waiting_until_application_time = current_app_time - waiting_for_application_duration;
355      }
356      else
357      {
358        waiting_for_application_duration = waiting_until_application_time - current_app_time;
359      }
360      system_duration = rrlib::time::ToSystemDuration(waiting_for_application_duration);
361      time_scaling_factor_changed = false;
362    }
363
364    waiting_until_application_time = rrlib::time::cNO_TIME;
365  }
366  return notified ? tConditionVariableStatus::TIMEOUT : tConditionVariableStatus::NO_TIMEOUT;
367}
368#else // RRLIB_SINGLE_THREADED
369
370tConditionVariable::tConditionVariable(tMutex& mutex)
371{}
372
373tConditionVariable::~tConditionVariable()
374{}
375
376void tConditionVariable::Notify(tLock& l)
377{}
378
379void tConditionVariable::NotifyAll(tLock& l)
380{}
381
382void tConditionVariable::Wait(tLock& l)
383{}
384
385tConditionVariableStatus tConditionVariable::Wait(tLock& l, const rrlib::time::tDuration& wait_for, bool use_application_time, rrlib::time::tTimestamp wait_until)
386{
387  return tConditionVariableStatus::TIMEOUT;  // TODO support pending tasks (to do something during wait) in single-threaded mode
388}
389
390#endif // RRLIB_SINGLE_THREADED
391
392//----------------------------------------------------------------------
393// End of namespace declaration
394//----------------------------------------------------------------------
395}
396}
Note: See TracBrowser for help on using the repository browser.