source: finroc_plugins_scheduling/tThreadContainerThread.cpp @ 46:ea5236122831

17.03
Last change on this file since 46:ea5236122831 was 46:ea5236122831, checked in by Max Reichardt <max.reichardt@…>, 3 months ago

Adds 'pause thread' port to thread containers - and tidies code around implementation

File size: 27.8 KB
Line 
1//
2// You received this file as part of Finroc
3// A framework for intelligent robot control
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    plugins/scheduling/tThreadContainerThread.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2012-12-02
27 *
28 */
29//----------------------------------------------------------------------
30#include "plugins/scheduling/tThreadContainerThread.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
35#include "core/tRuntimeEnvironment.h"
36#include "core/port/tAggregatedEdge.h"
37#include <set>
38
39//----------------------------------------------------------------------
40// Internal includes with ""
41//----------------------------------------------------------------------
42#include "plugins/scheduling/tExecutionControl.h"
43#include "plugins/scheduling/tPeriodicFrameworkElementTask.h"
44#include "plugins/scheduling/tThreadContainerBase.h"
45
46//----------------------------------------------------------------------
47// Debugging
48//----------------------------------------------------------------------
49#include <cassert>
50
51//----------------------------------------------------------------------
52// Namespace usage
53//----------------------------------------------------------------------
54
55//----------------------------------------------------------------------
56// Namespace declaration
57//----------------------------------------------------------------------
58namespace finroc
59{
60namespace scheduling
61{
62
63//----------------------------------------------------------------------
64// Forward declarations / typedefs / enums
65//----------------------------------------------------------------------
66
/*! Flags used for storing information in tPeriodicFrameworkElementTask::task_classification */
enum tTaskClassificationFlag
{
  eSENSE_TASK = 1,          // task was classified as a sense task (IsSenseTask())
  eSENSE_DEPENDENCY = 2,    // set via reverse trace from a sense interface: sense side depends on this task
  eSENSE_DEPENDENT = 4,     // set via forward trace from a sense interface: this task depends on sense data
  eCONTROL_TASK = 8,        // task was classified as a control task (IsControlTask())
  eCONTROL_DEPENDENCY = 16, // set via reverse trace from a controller interface
  eCONTROL_DEPENDENT = 32   // set via forward trace from a controller interface
};
77
78typedef core::tFrameworkElement::tFlag tFlag;
79
80//----------------------------------------------------------------------
81// Const values
82//----------------------------------------------------------------------
83#define FINROC_PORT_BASED_SCHEDULING
84
85//----------------------------------------------------------------------
86// Implementation
87//----------------------------------------------------------------------
88
// The single thread container instance; only assigned in RRLIB_SINGLE_THREADED builds (see constructor)
tThreadContainerThread* tThreadContainerThread::single_thread_container = nullptr;
90
91// Abort predicates for tThreadContainerThread::ForEachConnectedTask()
92static bool IsSensorInterface(core::tEdgeAggregator& ea)
93{
94  return ea.GetFlag(core::tFrameworkElement::tFlag::SENSOR_DATA);
95}
96static bool IsControllerInterface(core::tEdgeAggregator& ea)
97{
98  return ea.GetFlag(core::tFrameworkElement::tFlag::CONTROLLER_DATA);
99}
100static bool IsSensorOrControllerInterface(core::tEdgeAggregator& ea)
101{
102  return IsSensorInterface(ea) || IsControllerInterface(ea);
103}
/*! Abort predicate for ForEachConnectedTask() that never aborts the traversal */
static bool AlwaysFalse(core::tEdgeAggregator& ea)
{
  return false;
}
108
109/*!
110 * \param fe Framework element
111 * \return Is framework element an interface?
112 */
113static inline bool IsInterface(core::tFrameworkElement& fe)
114{
115  return fe.GetFlag(tFlag::EDGE_AGGREGATOR) || fe.GetFlag(tFlag::INTERFACE);
116}
117
118
119tThreadContainerThread::tThreadContainerThread(tThreadContainerBase& thread_container_base, core::tFrameworkElement& thread_container_element) :
120  tLoopThread(thread_container_base.GetCycleTime(), true, thread_container_base.warn_on_cycle_time_exceed.Get()),
121  tWatchDogTask(true),
122  thread_container_base(thread_container_base),
123  thread_container_element(thread_container_element),
124  reschedule(true),
125  schedule(),
126  task_set_first_index { 0, 0, 0, 0 },
127                     total_execution_duration(0),
128                     max_execution_duration(0),
129                     execution_count(0),
130                     current_task(NULL),
131                     current_cycle_start_application_time(rrlib::time::cNO_TIME)
132{
133  this->SetName("ThreadContainer " + thread_container_element.GetName());
134  this->thread_container_element.GetRuntime().AddListener(*this);
135#ifdef RRLIB_SINGLE_THREADED
136  assert(single_thread_container == nullptr);
137  single_thread_container = this;
138#endif
139}
140
tThreadContainerThread::~tThreadContainerThread()
{
  // Counterpart to AddListener() in the constructor
  this->thread_container_element.GetRuntime().RemoveListener(*this);
  // Only ever assigned in RRLIB_SINGLE_THREADED builds; resetting unconditionally is harmless otherwise (already nullptr)
  single_thread_container = nullptr;
}
146
147std::string tThreadContainerThread::CreateLoopDebugOutput(const std::vector<tPeriodicFrameworkElementTask*>& task_list)
148{
149  std::ostringstream stream;
150  for (auto it = task_list.rbegin(); it != task_list.rend(); ++it)
151  {
152    stream << (it != task_list.rbegin() ? "-> " : "   ");
153    stream << (*it)->GetLogDescription() << std::endl;
154    for (auto next = (*it)->next_tasks.begin(); next != (*it)->next_tasks.end(); ++next)
155    {
156      if (*next == *task_list.rbegin())
157      {
158        stream << "-> " << (*next)->GetLogDescription();
159        return stream.str();
160      }
161    }
162  }
163  return "ERROR";
164}
165
/*!
 * Advances exactly one of the two iterators, selected by traversal direction.
 * Helper for the combined forward/reverse loop in ForEachConnectedTask().
 *
 * \param trace_reverse True: advance it_incoming; false: advance it_outgoing
 * \param it_incoming   Iterator over incoming connections
 * \param it_outgoing   Iterator over outgoing connections
 */
template <typename T1, typename T2>
void Increment(bool trace_reverse, T1& it_incoming, T2& it_outgoing)
{
  if (!trace_reverse)
  {
    ++it_outgoing;
    return;
  }
  ++it_incoming;
}
178
/*!
 * Calls 'function' for every periodic task reachable from 'origin' via port
 * connections, restricted to elements controlled by this thread container.
 * Traversal is aborted at interfaces for which ABORT_PREDICATE returns true.
 *
 * NOTE on the preprocessor layout: with FINROC_PORT_BASED_SCHEDULING defined
 * (it is, see the #define above), the aggregator overload below only recurses
 * into all child ports, and the rest of the body belongs to the tAbstractPort
 * overload. Without the define, both fragments form one aggregator-based
 * function. Keep the #ifdef/#endif pairing intact when editing.
 */
template <bool (ABORT_PREDICATE)(core::tEdgeAggregator&), class TFunction>
void tThreadContainerThread::ForEachConnectedTask(core::tEdgeAggregator& origin, std::vector<core::tEdgeAggregator*>& trace, TFunction& function, bool trace_reverse)
{
  // Add to trace stack (guards against revisiting interfaces / following cycles)
  trace.push_back(&origin);

#ifdef FINROC_PORT_BASED_SCHEDULING
  // Port-based scheduling: recurse into each child port of this interface
  for (auto it = origin.ChildPortsBegin(); it != origin.ChildPortsEnd(); ++it)
  {
    ForEachConnectedTask<ABORT_PREDICATE, TFunction>(*it, trace, function, trace_reverse);
  }

  // remove from trace stack
  assert(trace[trace.size() - 1] == &origin);
  trace.pop_back();
}

template <bool (ABORT_PREDICATE)(core::tEdgeAggregator&), class TFunction>
void tThreadContainerThread::ForEachConnectedTask(core::tAbstractPort& origin, std::vector<core::tEdgeAggregator*>& trace, TFunction& function, bool trace_reverse)
{
#endif

  // One loop handles both directions: trace_reverse selects which iterator
  // pair drives the loop (Increment() advances only the active one)
  auto it_incoming = origin.IncomingConnectionsBegin();
  auto end_incoming = origin.IncomingConnectionsEnd();
  auto it_outgoing = origin.OutgoingConnectionsBegin();
  auto end_outgoing = origin.OutgoingConnectionsEnd();
  for (; trace_reverse ? it_incoming != end_incoming : it_outgoing != end_outgoing; Increment(trace_reverse, it_incoming, it_outgoing))
  {
#ifdef FINROC_PORT_BASED_SCHEDULING
    core::tAbstractPort& dest_port = trace_reverse ? it_incoming->Source() : it_outgoing->Destination();
    core::tEdgeAggregator* dest_aggregator = core::tEdgeAggregator::GetAggregator(dest_port);
#else
    core::tAggregatedEdge& aggregated_edge = trace_reverse ? **it_incoming : **it_outgoing;
    core::tEdgeAggregator* dest_aggregator = trace_reverse ? &aggregated_edge.source : &aggregated_edge.destination;
#endif
    // Skip anything not controlled by this thread container (e.g. nested containers)
    tExecutionControl* execution_control = dest_aggregator ? tExecutionControl::Find(*dest_aggregator) : nullptr;
    if ((!execution_control) || execution_control->GetAnnotated<core::tFrameworkElement>() != &thread_container_element)
    {
      continue;
    }
    core::tEdgeAggregator& dest = *dest_aggregator;
    if (ABORT_PREDICATE(dest))
    {
      continue;
    }

    // Only continue if this aggregator has not been visited on the current trace
    if (std::find(trace.begin(), trace.end(), &dest) == trace.end())
    {
      // Have we reached another task?
      tPeriodicFrameworkElementTask* connected_task = dest.GetAnnotation<tPeriodicFrameworkElementTask>();
      if (connected_task == NULL && IsInterface(dest))
      {
        // Task annotation may sit on the interface's parent (the module)
        connected_task = dest.GetParent()->GetAnnotation<tPeriodicFrameworkElementTask>();
      }
      if (connected_task == NULL && trace_reverse && IsInterface(dest))
      {
        // Reverse trace: look for a sibling interface whose task lists 'dest' as output
        for (auto child = dest.GetParent()->ChildrenBegin(); child != dest.GetParent()->ChildrenEnd(); ++child)
        {
          if (IsInterface(*child))
          {
            tPeriodicFrameworkElementTask* task_to_test = child->template GetAnnotation<tPeriodicFrameworkElementTask>();
            if (task_to_test && std::find(task_to_test->outgoing.begin(), task_to_test->outgoing.end(), &dest) != task_to_test->outgoing.end())
            {
              connected_task = task_to_test;
              break;
            }
          }
        }
      }
      if (connected_task)
      {
        // Reached a task: report it and stop following this branch
        function(*connected_task);
        continue;
      }

      // continue from this edge aggregator
      auto it_next = trace_reverse ? dest.IncomingConnectionsBegin() : dest.OutgoingConnectionsBegin();
      auto end_next = trace_reverse ? dest.IncomingConnectionsEnd() : dest.OutgoingConnectionsEnd();
      if (it_next != end_next) // not empty?
      {
#ifdef FINROC_PORT_BASED_SCHEDULING
        trace.push_back(&dest);
        ForEachConnectedTask<ABORT_PREDICATE, TFunction>(dest_port, trace, function, trace_reverse);
        trace.pop_back();
#else
        ForEachConnectedTask<ABORT_PREDICATE, TFunction>(dest, trace, function, trace_reverse);
#endif
      }
      else if (IsModuleInputInterface(dest)) // in case we have a module with event-triggered execution (and, hence, no periodic task)
      {
        core::tFrameworkElement* parent = dest.GetParent();
        if (parent->GetFlag(tFlag::EDGE_AGGREGATOR))
        {
          core::tEdgeAggregator* ea = static_cast<core::tEdgeAggregator*>(parent);
          if (std::find(trace.begin(), trace.end(), ea) == trace.end())
          {
            ForEachConnectedTask<ABORT_PREDICATE, TFunction>(*ea, trace, function, trace_reverse);
          }
        }
        // if we have e.g. an sensor input interface, only continue with sensor output
        uint required_flags = dest.GetAllFlags().Raw() & (tFlag::SENSOR_DATA | tFlag::CONTROLLER_DATA).Raw();
        required_flags |= (tFlag::READY | tFlag::EDGE_AGGREGATOR | tFlag::INTERFACE).Raw();
        for (auto it = parent->ChildrenBegin(); it != parent->ChildrenEnd(); ++it)
        {
          if ((it->GetAllFlags().Raw() & required_flags) == required_flags)
          {
            core::tEdgeAggregator& ea = static_cast<core::tEdgeAggregator&>(*it);
            if (std::find(trace.begin(), trace.end(), &ea) == trace.end())
            {
              ForEachConnectedTask<ABORT_PREDICATE, TFunction>(ea, trace, function, trace_reverse);
            }
          }
        }
      }
    }
  }

#ifndef FINROC_PORT_BASED_SCHEDULING
  // remove from trace stack
  assert(trace[trace.size() - 1] == &origin);
  trace.pop_back();
#endif
}
302
303std::string tThreadContainerThread::GetCurrentTaskName() const
304{
305  tPeriodicFrameworkElementTask* task = current_task;
306  if (!task)
307  {
308    return "";
309  }
310  std::stringstream stream;
311  stream << *(task->incoming.size() > 0 ? task->incoming[0] : task->GetAnnotated<core::tFrameworkElement>());
312  return stream.str();
313}
314
315void tThreadContainerThread::HandleWatchdogAlert()
316{
317  std::string stuck_name = GetCurrentTaskName();
318  if (stuck_name.empty())
319  {
320    FINROC_LOG_PRINT(ERROR, "Got stuck without executing any task!? This should not happen.");
321  }
322  else
323  {
324    FINROC_LOG_PRINT(ERROR, "Got stuck executing task associated with '", stuck_name, "'. Please check your code for infinite loops etc.!");
325  }
326  tWatchDogTask::Deactivate();
327}
328
329bool tThreadContainerThread::IsModuleInputInterface(core::tFrameworkElement& fe)
330{
331  if (IsInterface(fe))
332  {
333    uint port_count = 0;
334    uint pure_input_port_count = 0;
335    for (auto it = fe.ChildPortsBegin(); it != fe.ChildPortsEnd(); ++it)
336    {
337      if (data_ports::IsDataFlowType(it->GetDataType()))
338      {
339        port_count++;
340        if (it->GetFlag(tFlag::ACCEPTS_DATA) && (!it->GetFlag(tFlag::EMITS_DATA)))
341        {
342          pure_input_port_count++;
343        }
344      }
345    }
346    return (2 * pure_input_port_count) >= port_count; // heuristic: min. 50% of ports are pure input ports
347  }
348  return false;
349}
350
/*!
 * Executed once per cycle. Three phases:
 *  1. If flagged, rebuild the task schedule (classify tasks as initial/sense/
 *     control/other via graph flooding, then topologically sort each set,
 *     breaking loops with a warning).
 *  2. Honor the 'pause thread' port.
 *  3. Execute the scheduled tasks - with per-task profiling if the
 *     execution_details port is connected.
 */
void tThreadContainerThread::MainLoopCallback()
{
  if (reschedule)
  {
    // TODO: this rescheduling implementation leads to unpredictable delays (scheduling could be performed by another thread)
    reschedule = false;
    {
      // Structure must not change while the schedule is being built
      tLock lock(this->thread_container_element.GetStructureMutex());
      schedule.clear();
      rrlib::time::tTimestamp start_time = rrlib::time::Now();

      /*! Sets of tasks that need to be scheduled */
      std::set<tPeriodicFrameworkElementTask*> sense_tasks, control_tasks, initial_tasks, other_tasks;

      /*! Sense and control interfaces */
      std::set<core::tEdgeAggregator*> sense_interfaces, control_interfaces;

      // find tasks and classified interfaces
      for (auto it = thread_container_element.SubElementsBegin(true); it != thread_container_element.SubElementsEnd(); ++it)
      {
        if ((!it->IsReady()) || tExecutionControl::Find(*it)->GetAnnotated<core::tFrameworkElement>() != &thread_container_element)    // don't handle elements in nested thread containers
        {
          continue;
        }
        tPeriodicFrameworkElementTask* task = it->GetAnnotation<tPeriodicFrameworkElementTask>();
        if (task)
        {
          // Reset graph/classification state left over from previous scheduling runs
          task->previous_tasks.clear();
          task->next_tasks.clear();
          task->task_classification = 0;
          if (task->IsSenseTask())
          {
            task->task_classification = eSENSE_TASK;
            sense_tasks.insert(task);
            sense_interfaces.insert(task->incoming.begin(), task->incoming.end());
            sense_interfaces.insert(task->outgoing.begin(), task->outgoing.end());
          }
          else if (task->IsControlTask())
          {
            task->task_classification = eCONTROL_TASK;
            control_tasks.insert(task);
            control_interfaces.insert(task->incoming.begin(), task->incoming.end());
            control_interfaces.insert(task->outgoing.begin(), task->outgoing.end());
          }
          else
          {
            other_tasks.insert(task);
          }
        }

        // Interfaces flagged sensor/controller data seed the flooding below
        if (it->GetFlag(tFlag::INTERFACE))
        {
          if (it->GetFlag(tFlag::SENSOR_DATA))
          {
            sense_interfaces.insert(static_cast<core::tEdgeAggregator*>(&(*it)));
          }
          if (it->GetFlag(tFlag::CONTROLLER_DATA))
          {
            control_interfaces.insert(static_cast<core::tEdgeAggregator*>(&(*it)));
          }
        }
      }

      // classify tasks by flooding
      std::vector<core::tEdgeAggregator*> trace; // trace we're currently following
      {
        // flag_to_check is captured by reference and changed between calls;
        // the lambda floods transitively until a sense/control task or an
        // already-marked task stops the recursion
        int flag_to_check = 0;
        std::function<void (tPeriodicFrameworkElementTask&)> function = [&](tPeriodicFrameworkElementTask & connected_task)
        {
          if ((connected_task.task_classification & (flag_to_check | eSENSE_TASK | eCONTROL_TASK)) == 0)
          {
            connected_task.task_classification |= flag_to_check;
            bool reverse = (flag_to_check == eSENSE_DEPENDENCY || flag_to_check == eCONTROL_DEPENDENCY);
            for (core::tEdgeAggregator * next : (reverse ? connected_task.incoming : connected_task.outgoing))
            {
              ForEachConnectedTask<IsSensorOrControllerInterface>(*next, trace, function, reverse);
            }
          }
        };

        // Forward trace marks dependents, reverse trace marks dependencies
        for (core::tEdgeAggregator * interface : sense_interfaces)
        {
          flag_to_check = eSENSE_DEPENDENT;
          ForEachConnectedTask<IsSensorOrControllerInterface>(*interface, trace, function, false);
          flag_to_check = eSENSE_DEPENDENCY;
          ForEachConnectedTask<IsSensorOrControllerInterface>(*interface, trace, function, true);
        }
        for (core::tEdgeAggregator * interface : control_interfaces)
        {
          flag_to_check = eCONTROL_DEPENDENT;
          ForEachConnectedTask<IsSensorOrControllerInterface>(*interface, trace, function, false);
          flag_to_check = eCONTROL_DEPENDENCY;
          ForEachConnectedTask<IsSensorOrControllerInterface>(*interface, trace, function, true);
        }
      }

      // Sort unclassified tasks into the initial/sense/control sets based on
      // the dependency flags gathered above (copy: set is modified in the loop)
      std::set<tPeriodicFrameworkElementTask*> other_tasks_copy = other_tasks;
      for (tPeriodicFrameworkElementTask * other_task : other_tasks_copy)
      {
        bool sense_task = (other_task->task_classification & (eSENSE_DEPENDENCY | eSENSE_DEPENDENT)) == (eSENSE_DEPENDENCY | eSENSE_DEPENDENT);
        bool control_task = (other_task->task_classification & (eCONTROL_DEPENDENCY | eCONTROL_DEPENDENT)) == (eCONTROL_DEPENDENCY | eCONTROL_DEPENDENT);
        if (!(sense_task || control_task))
        {
          // max. two flags are possible - check all combinations
          if ((other_task->task_classification & (eSENSE_DEPENDENCY | eCONTROL_DEPENDENCY)) == (eSENSE_DEPENDENCY | eCONTROL_DEPENDENCY))
          {
            initial_tasks.insert(other_task);
            other_tasks.erase(other_task);
            continue;
          }
          if ((other_task->task_classification & (eSENSE_DEPENDENT | eCONTROL_DEPENDENT)) == (eSENSE_DEPENDENT | eCONTROL_DEPENDENT))
          {
            continue;
          }
          if ((other_task->task_classification & (eSENSE_DEPENDENCY | eCONTROL_DEPENDENT)) == (eSENSE_DEPENDENCY | eCONTROL_DEPENDENT))
          {
            sense_task = true;
          }
          if ((other_task->task_classification & (eSENSE_DEPENDENT | eCONTROL_DEPENDENCY)) == (eSENSE_DEPENDENT | eCONTROL_DEPENDENCY))
          {
            control_task = true;
          }
        }
        if (!(sense_task || control_task))
        {
          // max. one flag is possible
          sense_task = other_task->task_classification & (eSENSE_DEPENDENCY | eSENSE_DEPENDENT);
          control_task = other_task->task_classification & (eCONTROL_DEPENDENCY | eCONTROL_DEPENDENT);
        }

        if (sense_task || control_task)
        {
          other_tasks.erase(other_task);
          if (sense_task)
          {
            sense_tasks.insert(other_task);
          }
          if (control_task)
          {
            control_tasks.insert(other_task);
          }
        }
      }

      /*! temporary variable for trace backs */
      std::vector<tPeriodicFrameworkElementTask*> trace_back;

      // create task graphs for the four relevant sets of tasks and schedule them
      // (execution order: initial, sense, control, other)
      std::set<tPeriodicFrameworkElementTask*>* task_sets[4] = { &initial_tasks, &sense_tasks, &control_tasks, &other_tasks };
      for (size_t i = 0; i < 4; i++)
      {
        trace.clear();
        std::set<tPeriodicFrameworkElementTask*>& task_set = *task_sets[i];
        auto task = task_set.begin();
        // Records an edge task -> connected_task if both are in the current set
        std::function<void (tPeriodicFrameworkElementTask&)> function = [&](tPeriodicFrameworkElementTask & connected_task)
        {
          if (task_set.find(&connected_task) != task_set.end() &&
              std::find((*task)->next_tasks.begin(), (*task)->next_tasks.end(), &connected_task) == (*task)->next_tasks.end())
          {
            (*task)->next_tasks.push_back(&connected_task);
            connected_task.previous_tasks.push_back(*task);
          }
        };

        // create task graph
        for (; task != task_set.end(); ++task)
        {
          // trace outgoing connections to other elements in task set
          // (sense tasks stop at controller interfaces and vice versa)
          for (auto it = (*task)->outgoing.begin(); it < (*task)->outgoing.end(); ++it)
          {
            if (i == 1)
            {
              ForEachConnectedTask<IsControllerInterface>(**it, trace, function, false);
            }
            else if (i == 2)
            {
              ForEachConnectedTask<IsSensorInterface>(**it, trace, function, false);
            }
            else
            {
              ForEachConnectedTask<AlwaysFalse>(**it, trace, function, false);
            }
          }
        }

        // Remember where this set starts in the schedule (used for profiling classification)
        task_set_first_index[i] = schedule.size();

        // now create schedule (topological sort; tasks without predecessors first)
        while (task_set.size() > 0)
        {
          // do we have a task without previous tasks?
          bool found = false;
          for (auto it = task_set.begin(); it != task_set.end(); ++it)
          {
            tPeriodicFrameworkElementTask* task = *it;
            if (task->previous_tasks.size() == 0)
            {
              schedule.push_back(task);
              task_set.erase(task);
              found = true;

              // delete from next tasks' previous task list
              for (auto next = task->next_tasks.begin(); next != task->next_tasks.end(); ++next)
              {
                (*next)->previous_tasks.erase(std::remove((*next)->previous_tasks.begin(), (*next)->previous_tasks.end(), task), (*next)->previous_tasks.end());
              }
              break;
            }
          }
          if (found)
          {
            continue;
          }

          // ok, we didn't find task to continue with... (loop)
          // Walk predecessors until we revisit a task, then break the cycle there
          trace_back.clear();
          tPeriodicFrameworkElementTask* current = *task_set.begin();
          trace_back.push_back(current);
          while (true)
          {
            bool end = true;
            for (size_t i = 0u; i < current->previous_tasks.size(); i++)
            {
              tPeriodicFrameworkElementTask* prev = current->previous_tasks[i];
              if (std::find(trace_back.begin(), trace_back.end(), prev) == trace_back.end())
              {
                end = false;
                current = prev;
                trace_back.push_back(current);
                break;
              }
            }
            if (end)
            {
              FINROC_LOG_PRINT(WARNING, "Detected loop:\n", CreateLoopDebugOutput(trace_back), "\nBreaking it up at '", current->previous_tasks[0]->GetLogDescription(), "' -> '", current->GetLogDescription(), "' (The latter will be executed before the former)");
              schedule.push_back(current);
              task_set.erase(current);

              // delete from next tasks' previous task list
              for (auto next = current->next_tasks.begin(); next != current->next_tasks.end(); ++next)
              {
                (*next)->previous_tasks.erase(std::remove((*next)->previous_tasks.begin(), (*next)->previous_tasks.end(), current), (*next)->previous_tasks.end());
              }
              break;
            }
          }
        }
      }

      FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Created schedule in ", rrlib::time::ToIsoString(rrlib::time::Now() - start_time));
      for (size_t i = 0; i < schedule.size(); ++i)
      {
        FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "  ", i, ": ", schedule[i]->GetLogDescription());
      }
    }
  }

  // Pause handling: skip task execution while the 'pause thread' port is set
  if (thread_container_base.pause_thread.Get())
  {
    rrlib::thread::tLock lock(thread_container_base.Mutex(), false);
    if (lock.TryLock())  // this avoids dead-lock if ThreadContainerBase already wants to join; if it fails, thread can simply go to sleep on the next cycle
    {
      if (thread_container_base.pause_thread.Get())
      {
        this->PauseThread();
      }
    }
    return;
  }

  // execute tasks
  tWatchDogTask::SetDeadLine(rrlib::time::Now() + GetCycleTime() * 4 + std::chrono::seconds(4));

  if (thread_container_base.execution_details.GetWrapped() == nullptr || execution_count == 0) // we skip profiling the first/initial execution
  {
    // Non-profiled path: just execute all tasks in schedule order
    current_cycle_start_application_time = IsUsingApplicationTime() && this->IsAlive() ? tLoopThread::GetCurrentCycleStartTime() : rrlib::time::Now();

    thread_container_base.execution_duration.Publish(GetLastCycleTime());
    for (size_t i = 0u; i < schedule.size(); i++)
    {
      current_task = schedule[i];
      //FINROC_LOG_PRINT(DEBUG_WARNING, "Executing ", current_task->GetLogDescription());
      current_task->task.ExecuteTask();
    }
    execution_count++;
  }
  else
  {
    // Profiled path: measure each task; details[0] is the whole thread,
    // details[i + 1] corresponds to schedule[i]
    data_ports::tPortDataPointer<std::vector<tTaskProfile>> details = thread_container_base.execution_details.GetUnusedBuffer();
    details->resize(schedule.size() + 1);
    rrlib::time::tTimestamp start = rrlib::time::Now(true);
    current_cycle_start_application_time = start;

    for (size_t i = 0u; i < schedule.size(); i++)
    {
      current_task = schedule[i];
      rrlib::time::tTimestamp task_start = rrlib::time::Now(true);
      current_task->task.ExecuteTask();
      rrlib::time::tDuration task_duration = rrlib::time::Now(true) - task_start;

      // Update internal task statistics
      current_task->total_execution_duration += task_duration;
      current_task->execution_count++;
      current_task->max_execution_duration = std::max(task_duration, current_task->max_execution_duration);

      // Fill task profile to publish
      tTaskProfile& task_profile = (*details)[i + 1];
      task_profile.handle = current_task->GetAnnotated<core::tFrameworkElement>()->GetHandle();
      task_profile.last_execution_duration = task_duration;
      task_profile.max_execution_duration = current_task->max_execution_duration;
      task_profile.average_execution_duration = rrlib::time::tDuration(current_task->total_execution_duration.count() / current_task->execution_count);
      task_profile.total_execution_duration = current_task->total_execution_duration;
      task_profile.task_classification = tTaskClassification::OTHER;
    }

    // Set classification (task_set_first_index delimits the sense and control ranges)
    for (size_t i = task_set_first_index[1]; i < task_set_first_index[2]; i++)
    {
      (*details)[i + 1].task_classification = tTaskClassification::SENSE;  // +1, because first task is at index 1
    }
    for (size_t i = task_set_first_index[2]; i < task_set_first_index[3]; i++)
    {
      (*details)[i + 1].task_classification = tTaskClassification::CONTROL;
    }


    // Update thread statistics
    rrlib::time::tDuration duration = rrlib::time::Now(true) - start;
    this->total_execution_duration += duration;
    this->execution_count++;
    this->max_execution_duration = std::max(duration, this->max_execution_duration);

    // Fill thread profile to publish
    tTaskProfile& profile = (*details)[0];
    profile.handle = thread_container_element.GetHandle();
    profile.last_execution_duration = duration;
    profile.max_execution_duration = this->max_execution_duration;
    assert(execution_count > 1);
    profile.average_execution_duration = rrlib::time::tDuration(this->total_execution_duration.count() / (this->execution_count - 1)); // we did not include initial execution for profile statistics
    profile.total_execution_duration = this->total_execution_duration;

    // Publish profiling information
    for (size_t i = 0u; i < schedule.size(); i++)
    {
      if (schedule[i]->execution_duration.GetWrapped())
      {
        schedule[i]->execution_duration.Publish((*details)[i + 1].last_execution_duration);
      }
    }
    thread_container_base.execution_duration.Publish(duration);
    thread_container_base.execution_details.Publish(details);
  }

  tWatchDogTask::Deactivate();
}
706
707void tThreadContainerThread::OnConnectorChange(core::tRuntimeListener::tEvent change_type, core::tConnector& connector)
708{
709  if (connector.Source().IsChildOf(this->thread_container_element) && connector.Destination().IsChildOf(this->thread_container_element))
710  {
711    reschedule = true;
712  }
713}
714
715void tThreadContainerThread::OnFrameworkElementChange(core::tRuntimeListener::tEvent change_type, core::tFrameworkElement& element)
716{
717  if (element.GetAnnotation<tPeriodicFrameworkElementTask>() && element.IsChildOf(this->thread_container_element, true))
718  {
719    reschedule = true;
720  }
721}
722
void tThreadContainerThread::OnUriConnectorChange(core::tRuntimeListener::tEvent change_type, core::tUriConnector& connector)
{
  // Intentionally empty: URI connector changes do not affect the schedule
}
726
void tThreadContainerThread::Run()
{
  // Delegates to the loop thread implementation (MainLoopCallback is invoked per cycle)
  tLoopThread::Run();
}
731
732//----------------------------------------------------------------------
733// End of namespace declaration
734//----------------------------------------------------------------------
735}
736}
Note: See TracBrowser for help on using the repository browser.