source: finroc_plugins_scheduling/tThreadContainerThread.cpp @ 44:05abba900db9

17.03
Last change on this file since 44:05abba900db9 was 44:05abba900db9, checked in by Max Reichardt <max.reichardt@…>, 4 months ago

Merge with 14.08

File size: 27.3 KB
Line 
1//
2// You received this file as part of Finroc
3// A framework for intelligent robot control
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    plugins/scheduling/tThreadContainerThread.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2012-12-02
27 *
28 */
29//----------------------------------------------------------------------
30#include "plugins/scheduling/tThreadContainerThread.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
35#include "core/tRuntimeEnvironment.h"
36#include "core/port/tAggregatedEdge.h"
37#include <set>
38
39//----------------------------------------------------------------------
40// Internal includes with ""
41//----------------------------------------------------------------------
42#include "plugins/scheduling/tExecutionControl.h"
43#include "plugins/scheduling/tPeriodicFrameworkElementTask.h"
44
45//----------------------------------------------------------------------
46// Debugging
47//----------------------------------------------------------------------
48#include <cassert>
49
50//----------------------------------------------------------------------
51// Namespace usage
52//----------------------------------------------------------------------
53
54//----------------------------------------------------------------------
55// Namespace declaration
56//----------------------------------------------------------------------
57namespace finroc
58{
59namespace scheduling
60{
61
62//----------------------------------------------------------------------
63// Forward declarations / typedefs / enums
64//----------------------------------------------------------------------
65
66/*! Flags used for storing information in tPeriodicFrameworkElementTask::task_classification */
67enum tTaskClassificationFlag
68{
69  eSENSE_TASK = 1,
70  eSENSE_DEPENDENCY = 2,
71  eSENSE_DEPENDENT = 4,
72  eCONTROL_TASK = 8,
73  eCONTROL_DEPENDENCY = 16,
74  eCONTROL_DEPENDENT = 32
75};
76
77typedef core::tFrameworkElement::tFlag tFlag;
78
79//----------------------------------------------------------------------
80// Const values
81//----------------------------------------------------------------------
82#define FINROC_PORT_BASED_SCHEDULING
83
84//----------------------------------------------------------------------
85// Implementation
86//----------------------------------------------------------------------
87
88tThreadContainerThread* tThreadContainerThread::single_thread_container = nullptr;
89
90// Abort predicates for tThreadContainerThread::ForEachConnectedTask()
91static bool IsSensorInterface(core::tEdgeAggregator& ea)
92{
93  return ea.GetFlag(core::tFrameworkElement::tFlag::SENSOR_DATA);
94}
95static bool IsControllerInterface(core::tEdgeAggregator& ea)
96{
97  return ea.GetFlag(core::tFrameworkElement::tFlag::CONTROLLER_DATA);
98}
99static bool IsSensorOrControllerInterface(core::tEdgeAggregator& ea)
100{
101  return IsSensorInterface(ea) || IsControllerInterface(ea);
102}
103static bool AlwaysFalse(core::tEdgeAggregator& ea)
104{
105  return false;
106}
107
108/*!
109 * \param fe Framework element
110 * \return Is framework element an interface?
111 */
112static inline bool IsInterface(core::tFrameworkElement& fe)
113{
114  return fe.GetFlag(tFlag::EDGE_AGGREGATOR) || fe.GetFlag(tFlag::INTERFACE);
115}
116
117
118tThreadContainerThread::tThreadContainerThread(core::tFrameworkElement& thread_container, rrlib::time::tDuration default_cycle_time,
119    bool warn_on_cycle_time_exceed, data_ports::tOutputPort<rrlib::time::tDuration> execution_duration,
120    data_ports::tOutputPort<std::vector<tTaskProfile>> execution_details) :
121  tLoopThread(default_cycle_time, true, warn_on_cycle_time_exceed),
122  tWatchDogTask(true),
123  thread_container(thread_container),
124  reschedule(true),
125  schedule(),
126  task_set_first_index { 0, 0, 0, 0 },
127                     execution_duration(execution_duration),
128                     execution_details(execution_details),
129                     total_execution_duration(0),
130                     max_execution_duration(0),
131                     execution_count(0),
132                     current_task(NULL),
133                     current_cycle_start_application_time(rrlib::time::cNO_TIME)
134{
135  this->SetName("ThreadContainer " + thread_container.GetName());
136  this->thread_container.GetRuntime().AddListener(*this);
137#ifdef RRLIB_SINGLE_THREADED
138  assert(single_thread_container == nullptr);
139  single_thread_container = this;
140#endif
141}
142
143tThreadContainerThread::~tThreadContainerThread()
144{
145  this->thread_container.GetRuntime().RemoveListener(*this);
146  single_thread_container = nullptr;
147}
148
149std::string tThreadContainerThread::CreateLoopDebugOutput(const std::vector<tPeriodicFrameworkElementTask*>& task_list)
150{
151  std::ostringstream stream;
152  for (auto it = task_list.rbegin(); it != task_list.rend(); ++it)
153  {
154    stream << (it != task_list.rbegin() ? "-> " : "   ");
155    stream << (*it)->GetLogDescription() << std::endl;
156    for (auto next = (*it)->next_tasks.begin(); next != (*it)->next_tasks.end(); ++next)
157    {
158      if (*next == *task_list.rbegin())
159      {
160        stream << "-> " << (*next)->GetLogDescription();
161        return stream.str();
162      }
163    }
164  }
165  return "ERROR";
166}
167
168template <typename T1, typename T2>
169void Increment(bool trace_reverse, T1& it_incoming, T2& it_outgoing)
170{
171  if (trace_reverse)
172  {
173    it_incoming++;
174  }
175  else
176  {
177    it_outgoing++;
178  }
179}
180
181template <bool (ABORT_PREDICATE)(core::tEdgeAggregator&), class TFunction>
182void tThreadContainerThread::ForEachConnectedTask(core::tEdgeAggregator& origin, std::vector<core::tEdgeAggregator*>& trace, TFunction& function, bool trace_reverse)
183{
184  // Add to trace stack
185  trace.push_back(&origin);
186
187#ifdef FINROC_PORT_BASED_SCHEDULING
188  for (auto it = origin.ChildPortsBegin(); it != origin.ChildPortsEnd(); ++it)
189  {
190    ForEachConnectedTask<ABORT_PREDICATE, TFunction>(*it, trace, function, trace_reverse);
191  }
192
193  // remove from trace stack
194  assert(trace[trace.size() - 1] == &origin);
195  trace.pop_back();
196}
197
198template <bool (ABORT_PREDICATE)(core::tEdgeAggregator&), class TFunction>
199void tThreadContainerThread::ForEachConnectedTask(core::tAbstractPort& origin, std::vector<core::tEdgeAggregator*>& trace, TFunction& function, bool trace_reverse)
200{
201#endif
202
203  auto it_incoming = origin.IncomingConnectionsBegin();
204  auto end_incoming = origin.IncomingConnectionsEnd();
205  auto it_outgoing = origin.OutgoingConnectionsBegin();
206  auto end_outgoing = origin.OutgoingConnectionsEnd();
207  for (; trace_reverse ? it_incoming != end_incoming : it_outgoing != end_outgoing; Increment(trace_reverse, it_incoming, it_outgoing))
208  {
209#ifdef FINROC_PORT_BASED_SCHEDULING
210    core::tAbstractPort& dest_port = trace_reverse ? it_incoming->Source() : it_outgoing->Destination();
211    core::tEdgeAggregator* dest_aggregator = core::tEdgeAggregator::GetAggregator(dest_port);
212#else
213    core::tAggregatedEdge& aggregated_edge = trace_reverse ? **it_incoming : **it_outgoing;
214    core::tEdgeAggregator* dest_aggregator = trace_reverse ? &aggregated_edge.source : &aggregated_edge.destination;
215#endif
216    tExecutionControl* execution_control = dest_aggregator ? tExecutionControl::Find(*dest_aggregator) : nullptr;
217    if ((!execution_control) || execution_control->GetAnnotated<core::tFrameworkElement>() != &thread_container)
218    {
219      continue;
220    }
221    core::tEdgeAggregator& dest = *dest_aggregator;
222    if (ABORT_PREDICATE(dest))
223    {
224      continue;
225    }
226
227    if (std::find(trace.begin(), trace.end(), &dest) == trace.end())
228    {
229      // Have we reached another task?
230      tPeriodicFrameworkElementTask* connected_task = dest.GetAnnotation<tPeriodicFrameworkElementTask>();
231      if (connected_task == NULL && IsInterface(dest))
232      {
233        connected_task = dest.GetParent()->GetAnnotation<tPeriodicFrameworkElementTask>();
234      }
235      if (connected_task == NULL && trace_reverse && IsInterface(dest))
236      {
237        for (auto child = dest.GetParent()->ChildrenBegin(); child != dest.GetParent()->ChildrenEnd(); ++child)
238        {
239          if (IsInterface(*child))
240          {
241            tPeriodicFrameworkElementTask* task_to_test = child->template GetAnnotation<tPeriodicFrameworkElementTask>();
242            if (task_to_test && std::find(task_to_test->outgoing.begin(), task_to_test->outgoing.end(), &dest) != task_to_test->outgoing.end())
243            {
244              connected_task = task_to_test;
245              break;
246            }
247          }
248        }
249      }
250      if (connected_task)
251      {
252        function(*connected_task);
253        continue;
254      }
255
256      // continue from this edge aggregator
257      auto it_next = trace_reverse ? dest.IncomingConnectionsBegin() : dest.OutgoingConnectionsBegin();
258      auto end_next = trace_reverse ? dest.IncomingConnectionsEnd() : dest.OutgoingConnectionsEnd();
259      if (it_next != end_next) // not empty?
260      {
261#ifdef FINROC_PORT_BASED_SCHEDULING
262        trace.push_back(&dest);
263        ForEachConnectedTask<ABORT_PREDICATE, TFunction>(dest_port, trace, function, trace_reverse);
264        trace.pop_back();
265#else
266        ForEachConnectedTask<ABORT_PREDICATE, TFunction>(dest, trace, function, trace_reverse);
267#endif
268      }
269      else if (IsModuleInputInterface(dest)) // in case we have a module with event-triggered execution (and, hence, no periodic task)
270      {
271        core::tFrameworkElement* parent = dest.GetParent();
272        if (parent->GetFlag(tFlag::EDGE_AGGREGATOR))
273        {
274          core::tEdgeAggregator* ea = static_cast<core::tEdgeAggregator*>(parent);
275          if (std::find(trace.begin(), trace.end(), ea) == trace.end())
276          {
277            ForEachConnectedTask<ABORT_PREDICATE, TFunction>(*ea, trace, function, trace_reverse);
278          }
279        }
280        // if we have e.g. an sensor input interface, only continue with sensor output
281        uint required_flags = dest.GetAllFlags().Raw() & (tFlag::SENSOR_DATA | tFlag::CONTROLLER_DATA).Raw();
282        required_flags |= (tFlag::READY | tFlag::EDGE_AGGREGATOR | tFlag::INTERFACE).Raw();
283        for (auto it = parent->ChildrenBegin(); it != parent->ChildrenEnd(); ++it)
284        {
285          if ((it->GetAllFlags().Raw() & required_flags) == required_flags)
286          {
287            core::tEdgeAggregator& ea = static_cast<core::tEdgeAggregator&>(*it);
288            if (std::find(trace.begin(), trace.end(), &ea) == trace.end())
289            {
290              ForEachConnectedTask<ABORT_PREDICATE, TFunction>(ea, trace, function, trace_reverse);
291            }
292          }
293        }
294      }
295    }
296  }
297
298#ifndef FINROC_PORT_BASED_SCHEDULING
299  // remove from trace stack
300  assert(trace[trace.size() - 1] == &origin);
301  trace.pop_back();
302#endif
303}
304
305std::string tThreadContainerThread::GetCurrentTaskName() const
306{
307  tPeriodicFrameworkElementTask* task = current_task;
308  if (!task)
309  {
310    return "";
311  }
312  std::stringstream stream;
313  stream << *(task->incoming.size() > 0 ? task->incoming[0] : task->GetAnnotated<core::tFrameworkElement>());
314  return stream.str();
315}
316
317void tThreadContainerThread::HandleWatchdogAlert()
318{
319  std::string stuck_name = GetCurrentTaskName();
320  if (stuck_name.empty())
321  {
322    FINROC_LOG_PRINT(ERROR, "Got stuck without executing any task!? This should not happen.");
323  }
324  else
325  {
326    FINROC_LOG_PRINT(ERROR, "Got stuck executing task associated with '", stuck_name, "'. Please check your code for infinite loops etc.!");
327  }
328  tWatchDogTask::Deactivate();
329}
330
331bool tThreadContainerThread::IsModuleInputInterface(core::tFrameworkElement& fe)
332{
333  if (IsInterface(fe))
334  {
335    uint port_count = 0;
336    uint pure_input_port_count = 0;
337    for (auto it = fe.ChildPortsBegin(); it != fe.ChildPortsEnd(); ++it)
338    {
339      if (data_ports::IsDataFlowType(it->GetDataType()))
340      {
341        port_count++;
342        if (it->GetFlag(tFlag::ACCEPTS_DATA) && (!it->GetFlag(tFlag::EMITS_DATA)))
343        {
344          pure_input_port_count++;
345        }
346      }
347    }
348    return (2 * pure_input_port_count) >= port_count; // heuristic: min. 50% of ports are pure input ports
349  }
350  return false;
351}
352
353void tThreadContainerThread::MainLoopCallback()
354{
355  if (reschedule)
356  {
357    // TODO: this rescheduling implementation leads to unpredictable delays (scheduling could be performed by another thread)
358    reschedule = false;
359    {
360      tLock lock(this->thread_container.GetStructureMutex());
361      schedule.clear();
362      rrlib::time::tTimestamp start_time = rrlib::time::Now();
363
364      /*! Sets of tasks that need to be scheduled */
365      std::set<tPeriodicFrameworkElementTask*> sense_tasks, control_tasks, initial_tasks, other_tasks;
366
367      /*! Sense and control interfaces */
368      std::set<core::tEdgeAggregator*> sense_interfaces, control_interfaces;
369
370      // find tasks and classified interfaces
371      for (auto it = thread_container.SubElementsBegin(true); it != thread_container.SubElementsEnd(); ++it)
372      {
373        if ((!it->IsReady()) || tExecutionControl::Find(*it)->GetAnnotated<core::tFrameworkElement>() != &thread_container)    // don't handle elements in nested thread containers
374        {
375          continue;
376        }
377        tPeriodicFrameworkElementTask* task = it->GetAnnotation<tPeriodicFrameworkElementTask>();
378        if (task)
379        {
380          task->previous_tasks.clear();
381          task->next_tasks.clear();
382          task->task_classification = 0;
383          if (task->IsSenseTask())
384          {
385            task->task_classification = eSENSE_TASK;
386            sense_tasks.insert(task);
387            sense_interfaces.insert(task->incoming.begin(), task->incoming.end());
388            sense_interfaces.insert(task->outgoing.begin(), task->outgoing.end());
389          }
390          else if (task->IsControlTask())
391          {
392            task->task_classification = eCONTROL_TASK;
393            control_tasks.insert(task);
394            control_interfaces.insert(task->incoming.begin(), task->incoming.end());
395            control_interfaces.insert(task->outgoing.begin(), task->outgoing.end());
396          }
397          else
398          {
399            other_tasks.insert(task);
400          }
401        }
402
403        if (it->GetFlag(tFlag::INTERFACE))
404        {
405          if (it->GetFlag(tFlag::SENSOR_DATA))
406          {
407            sense_interfaces.insert(static_cast<core::tEdgeAggregator*>(&(*it)));
408          }
409          if (it->GetFlag(tFlag::CONTROLLER_DATA))
410          {
411            control_interfaces.insert(static_cast<core::tEdgeAggregator*>(&(*it)));
412          }
413        }
414      }
415
416      // classify tasks by flooding
417      std::vector<core::tEdgeAggregator*> trace; // trace we're currently following
418      {
419        int flag_to_check = 0;
420        std::function<void (tPeriodicFrameworkElementTask&)> function = [&](tPeriodicFrameworkElementTask & connected_task)
421        {
422          if ((connected_task.task_classification & (flag_to_check | eSENSE_TASK | eCONTROL_TASK)) == 0)
423          {
424            connected_task.task_classification |= flag_to_check;
425            bool reverse = (flag_to_check == eSENSE_DEPENDENCY || flag_to_check == eCONTROL_DEPENDENCY);
426            for (core::tEdgeAggregator * next : (reverse ? connected_task.incoming : connected_task.outgoing))
427            {
428              ForEachConnectedTask<IsSensorOrControllerInterface>(*next, trace, function, reverse);
429            }
430          }
431        };
432
433        for (core::tEdgeAggregator * interface : sense_interfaces)
434        {
435          flag_to_check = eSENSE_DEPENDENT;
436          ForEachConnectedTask<IsSensorOrControllerInterface>(*interface, trace, function, false);
437          flag_to_check = eSENSE_DEPENDENCY;
438          ForEachConnectedTask<IsSensorOrControllerInterface>(*interface, trace, function, true);
439        }
440        for (core::tEdgeAggregator * interface : control_interfaces)
441        {
442          flag_to_check = eCONTROL_DEPENDENT;
443          ForEachConnectedTask<IsSensorOrControllerInterface>(*interface, trace, function, false);
444          flag_to_check = eCONTROL_DEPENDENCY;
445          ForEachConnectedTask<IsSensorOrControllerInterface>(*interface, trace, function, true);
446        }
447      }
448
449      std::set<tPeriodicFrameworkElementTask*> other_tasks_copy = other_tasks;
450      for (tPeriodicFrameworkElementTask * other_task : other_tasks_copy)
451      {
452        bool sense_task = (other_task->task_classification & (eSENSE_DEPENDENCY | eSENSE_DEPENDENT)) == (eSENSE_DEPENDENCY | eSENSE_DEPENDENT);
453        bool control_task = (other_task->task_classification & (eCONTROL_DEPENDENCY | eCONTROL_DEPENDENT)) == (eCONTROL_DEPENDENCY | eCONTROL_DEPENDENT);
454        if (!(sense_task || control_task))
455        {
456          // max. two flags are possible - check all combinations
457          if ((other_task->task_classification & (eSENSE_DEPENDENCY | eCONTROL_DEPENDENCY)) == (eSENSE_DEPENDENCY | eCONTROL_DEPENDENCY))
458          {
459            initial_tasks.insert(other_task);
460            other_tasks.erase(other_task);
461            continue;
462          }
463          if ((other_task->task_classification & (eSENSE_DEPENDENT | eCONTROL_DEPENDENT)) == (eSENSE_DEPENDENT | eCONTROL_DEPENDENT))
464          {
465            continue;
466          }
467          if ((other_task->task_classification & (eSENSE_DEPENDENCY | eCONTROL_DEPENDENT)) == (eSENSE_DEPENDENCY | eCONTROL_DEPENDENT))
468          {
469            sense_task = true;
470          }
471          if ((other_task->task_classification & (eSENSE_DEPENDENT | eCONTROL_DEPENDENCY)) == (eSENSE_DEPENDENT | eCONTROL_DEPENDENCY))
472          {
473            control_task = true;
474          }
475        }
476        if (!(sense_task || control_task))
477        {
478          // max. one flag is possible
479          sense_task = other_task->task_classification & (eSENSE_DEPENDENCY | eSENSE_DEPENDENT);
480          control_task = other_task->task_classification & (eCONTROL_DEPENDENCY | eCONTROL_DEPENDENT);
481        }
482
483        if (sense_task || control_task)
484        {
485          other_tasks.erase(other_task);
486          if (sense_task)
487          {
488            sense_tasks.insert(other_task);
489          }
490          if (control_task)
491          {
492            control_tasks.insert(other_task);
493          }
494        }
495      }
496
497      /*! temporary variable for trace backs */
498      std::vector<tPeriodicFrameworkElementTask*> trace_back;
499
500      // create task graphs for the four relevant sets of tasks and schedule them
501      std::set<tPeriodicFrameworkElementTask*>* task_sets[4] = { &initial_tasks, &sense_tasks, &control_tasks, &other_tasks };
502      for (size_t i = 0; i < 4; i++)
503      {
504        trace.clear();
505        std::set<tPeriodicFrameworkElementTask*>& task_set = *task_sets[i];
506        auto task = task_set.begin();
507        std::function<void (tPeriodicFrameworkElementTask&)> function = [&](tPeriodicFrameworkElementTask & connected_task)
508        {
509          if (task_set.find(&connected_task) != task_set.end() &&
510              std::find((*task)->next_tasks.begin(), (*task)->next_tasks.end(), &connected_task) == (*task)->next_tasks.end())
511          {
512            (*task)->next_tasks.push_back(&connected_task);
513            connected_task.previous_tasks.push_back(*task);
514          }
515        };
516
517        // create task graph
518        for (; task != task_set.end(); ++task)
519        {
520          // trace outgoing connections to other elements in task set
521          for (auto it = (*task)->outgoing.begin(); it < (*task)->outgoing.end(); ++it)
522          {
523            if (i == 1)
524            {
525              ForEachConnectedTask<IsControllerInterface>(**it, trace, function, false);
526            }
527            else if (i == 2)
528            {
529              ForEachConnectedTask<IsSensorInterface>(**it, trace, function, false);
530            }
531            else
532            {
533              ForEachConnectedTask<AlwaysFalse>(**it, trace, function, false);
534            }
535          }
536        }
537
538        task_set_first_index[i] = schedule.size();
539
540        // now create schedule
541        while (task_set.size() > 0)
542        {
543          // do we have a task without previous tasks?
544          bool found = false;
545          for (auto it = task_set.begin(); it != task_set.end(); ++it)
546          {
547            tPeriodicFrameworkElementTask* task = *it;
548            if (task->previous_tasks.size() == 0)
549            {
550              schedule.push_back(task);
551              task_set.erase(task);
552              found = true;
553
554              // delete from next tasks' previous task list
555              for (auto next = task->next_tasks.begin(); next != task->next_tasks.end(); ++next)
556              {
557                (*next)->previous_tasks.erase(std::remove((*next)->previous_tasks.begin(), (*next)->previous_tasks.end(), task), (*next)->previous_tasks.end());
558              }
559              break;
560            }
561          }
562          if (found)
563          {
564            continue;
565          }
566
567          // ok, we didn't find task to continue with... (loop)
568          trace_back.clear();
569          tPeriodicFrameworkElementTask* current = *task_set.begin();
570          trace_back.push_back(current);
571          while (true)
572          {
573            bool end = true;
574            for (size_t i = 0u; i < current->previous_tasks.size(); i++)
575            {
576              tPeriodicFrameworkElementTask* prev = current->previous_tasks[i];
577              if (std::find(trace_back.begin(), trace_back.end(), prev) == trace_back.end())
578              {
579                end = false;
580                current = prev;
581                trace_back.push_back(current);
582                break;
583              }
584            }
585            if (end)
586            {
587              FINROC_LOG_PRINT(WARNING, "Detected loop:\n", CreateLoopDebugOutput(trace_back), "\nBreaking it up at '", current->previous_tasks[0]->GetLogDescription(), "' -> '", current->GetLogDescription(), "' (The latter will be executed before the former)");
588              schedule.push_back(current);
589              task_set.erase(current);
590
591              // delete from next tasks' previous task list
592              for (auto next = current->next_tasks.begin(); next != current->next_tasks.end(); ++next)
593              {
594                (*next)->previous_tasks.erase(std::remove((*next)->previous_tasks.begin(), (*next)->previous_tasks.end(), current), (*next)->previous_tasks.end());
595              }
596              break;
597            }
598          }
599        }
600      }
601
602      FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Created schedule in ", rrlib::time::ToIsoString(rrlib::time::Now() - start_time));
603      for (size_t i = 0; i < schedule.size(); ++i)
604      {
605        FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "  ", i, ": ", schedule[i]->GetLogDescription());
606      }
607    }
608  }
609
610  // execute tasks
611  SetDeadLine(rrlib::time::Now() + GetCycleTime() * 4 + std::chrono::seconds(4));
612
613  if (execution_details.GetWrapped() == nullptr || execution_count == 0) // we skip profiling the first/initial execution
614  {
615    current_cycle_start_application_time = IsUsingApplicationTime() && this->IsAlive() ? tLoopThread::GetCurrentCycleStartTime() : rrlib::time::Now();
616
617    execution_duration.Publish(GetLastCycleTime());
618    for (size_t i = 0u; i < schedule.size(); i++)
619    {
620      current_task = schedule[i];
621      //FINROC_LOG_PRINT(DEBUG_WARNING, "Executing ", current_task->GetLogDescription());
622      current_task->task.ExecuteTask();
623    }
624    execution_count++;
625  }
626  else
627  {
628    data_ports::tPortDataPointer<std::vector<tTaskProfile>> details = execution_details.GetUnusedBuffer();
629    details->resize(schedule.size() + 1);
630    rrlib::time::tTimestamp start = rrlib::time::Now(true);
631    current_cycle_start_application_time = start;
632
633    for (size_t i = 0u; i < schedule.size(); i++)
634    {
635      current_task = schedule[i];
636      rrlib::time::tTimestamp task_start = rrlib::time::Now(true);
637      current_task->task.ExecuteTask();
638      rrlib::time::tDuration task_duration = rrlib::time::Now(true) - task_start;
639
640      // Update internal task statistics
641      current_task->total_execution_duration += task_duration;
642      current_task->execution_count++;
643      current_task->max_execution_duration = std::max(task_duration, current_task->max_execution_duration);
644
645      // Fill task profile to publish
646      tTaskProfile& task_profile = (*details)[i + 1];
647      task_profile.handle = current_task->GetAnnotated<core::tFrameworkElement>()->GetHandle();
648      task_profile.last_execution_duration = task_duration;
649      task_profile.max_execution_duration = current_task->max_execution_duration;
650      task_profile.average_execution_duration = rrlib::time::tDuration(current_task->total_execution_duration.count() / current_task->execution_count);
651      task_profile.total_execution_duration = current_task->total_execution_duration;
652      task_profile.task_classification = tTaskClassification::OTHER;
653    }
654
655    // Set classification
656    for (size_t i = task_set_first_index[1]; i < task_set_first_index[2]; i++)
657    {
658      (*details)[i + 1].task_classification = tTaskClassification::SENSE;  // +1, because first task is at index 1
659    }
660    for (size_t i = task_set_first_index[2]; i < task_set_first_index[3]; i++)
661    {
662      (*details)[i + 1].task_classification = tTaskClassification::CONTROL;
663    }
664
665
666    // Update thread statistics
667    rrlib::time::tDuration duration = rrlib::time::Now(true) - start;
668    this->total_execution_duration += duration;
669    this->execution_count++;
670    this->max_execution_duration = std::max(duration, this->max_execution_duration);
671
672    // Fill thread profile to publish
673    tTaskProfile& profile = (*details)[0];
674    profile.handle = thread_container.GetHandle();
675    profile.last_execution_duration = duration;
676    profile.max_execution_duration = this->max_execution_duration;
677    assert(execution_count > 1);
678    profile.average_execution_duration = rrlib::time::tDuration(this->total_execution_duration.count() / (this->execution_count - 1)); // we did not include initial execution for profile statistics
679    profile.total_execution_duration = this->total_execution_duration;
680
681    // Publish profiling information
682    for (size_t i = 0u; i < schedule.size(); i++)
683    {
684      if (schedule[i]->execution_duration.GetWrapped())
685      {
686        schedule[i]->execution_duration.Publish((*details)[i + 1].last_execution_duration);
687      }
688    }
689    execution_duration.Publish(duration);
690    execution_details.Publish(details);
691  }
692
693  tWatchDogTask::Deactivate();
694}
695
696void tThreadContainerThread::OnConnectorChange(core::tRuntimeListener::tEvent change_type, core::tConnector& connector)
697{
698  if (connector.Source().IsChildOf(this->thread_container) && connector.Destination().IsChildOf(this->thread_container))
699  {
700    reschedule = true;
701  }
702}
703
704void tThreadContainerThread::OnFrameworkElementChange(core::tRuntimeListener::tEvent change_type, core::tFrameworkElement& element)
705{
706  if (element.GetAnnotation<tPeriodicFrameworkElementTask>() && element.IsChildOf(this->thread_container, true))
707  {
708    reschedule = true;
709  }
710}
711
712void tThreadContainerThread::OnUriConnectorChange(core::tRuntimeListener::tEvent change_type, core::tUriConnector& connector)
713{
714}
715
716void tThreadContainerThread::Run()
717{
718  tLoopThread::Run();
719}
720
721//----------------------------------------------------------------------
722// End of namespace declaration
723//----------------------------------------------------------------------
724}
725}
Note: See TracBrowser for help on using the repository browser.