Skip to content

Commit

Permalink
improved latency
Browse files Browse the repository at this point in the history
  • Loading branch information
filippobrizzi committed Jul 21, 2016
1 parent 5d40a74 commit c395481
Show file tree
Hide file tree
Showing 10 changed files with 259 additions and 10 deletions.
4 changes: 2 additions & 2 deletions core/include/coco/connection_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ class ConnectionDataL : public ConnectionT<T>
if (latency_time > 0)
{
this->input_->task()->engine()->setLatencyTime(latency_time);
this->output_->task()->engine()->setLatencyTime(-1);
}

return NEW_DATA;
Expand Down Expand Up @@ -121,8 +120,9 @@ class ConnectionDataL : public ConnectionT<T>
}
// trigger if the input port is an event port
if (this->input()->isEvent() &&
old_status != NEW_DATA )
old_status != NEW_DATA )
{
//std::cout << this->output_->task()->instantiationName() << " triggering " << this->input_->task()->instantiationName() << std::endl;
this->trigger();
}

Expand Down
1 change: 1 addition & 0 deletions core/include/coco/execution.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ class ExecutionEngine: public RunnableInterface

std::atomic<double> latency_start_time_ = {-1};
std::vector<double> latency_time_;
public:
bool latency_source_ = false;
bool latency_target_ = false;
};
Expand Down
17 changes: 14 additions & 3 deletions core/include/coco/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,9 @@ class TaskContext : public Service
*/
void resetTimeStatistics();

void setTaskLatencySource();
void setTaskLatencyTarget();


protected:
friend class ExecutionEngine;
Expand Down Expand Up @@ -580,6 +583,7 @@ class TaskContext : public Service
friend class ConnectionBufferU;
template <class T>
friend class ConnectionBufferLF;
friend class PeerTask;
/*! \brief Pass to the task the pointer to the activity using it.
* This is usefull for propagating trigger from port to activity.
* \param activity The pointer to the activity.
Expand All @@ -598,7 +602,7 @@ class TaskContext : public Service
/*! \brief Return the \ref ExecutionEngine owing the task.
* \return A shared pointer to the engine object owing the task.
*/
std::shared_ptr<ExecutionEngine> engine() const { return engine_; }
virtual std::shared_ptr<ExecutionEngine> engine() const;
/*! \brief Set the current state of the task.
* \param state The state.
*/
Expand Down Expand Up @@ -634,15 +638,22 @@ class PeerTask : public TaskContext
void init() {}
void onUpdate() {}

virtual uint32_t actvityId() const override;
virtual uint32_t actvityId() const final;
/*!
* \brief Used to retreive the task containing the current peer
* @return Pointer to the task containing this peer
*/
std::shared_ptr<TaskContext> fatherTask() { return father_; }
private:
virtual std::shared_ptr<ExecutionEngine> engine() const final;
private:
friend class GraphLoader;

std::shared_ptr<TaskContext> father_;
};

/*!
* \return Wheter the task is a peer
* @return Wheter the task is a peer
*/
inline bool isPeer(std::shared_ptr<TaskContext> task)
{
Expand Down
11 changes: 10 additions & 1 deletion core/src/execution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ void ExecutionEngine::step()

if (ComponentRegistry::profilingEnabled())
{
if (latency_source_ && latency_start_time_ < 0)
//if (latency_source_ && latency_start_time_ < 0)
if (latency_source_)
latency_start_time_ = util::time();

timer_.start();
Expand All @@ -357,6 +358,14 @@ void ExecutionEngine::step()
{
latency_time_.push_back(util::time() - latency_start_time_);
latency_start_time_ = -1;


static int count = 0;
if (count % 10 == 0)
{
double sum = std::accumulate(latency_time_.begin(), latency_time_.end(), 0);
COCO_LOG(1) << "Latency is : " << sum / latency_time_.size();
}
}
}
else
Expand Down
13 changes: 13 additions & 0 deletions core/src/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,23 @@ void TaskContext::resetTimeStatistics()
return engine_->resetTimeStatistics();
}

std::shared_ptr<ExecutionEngine> TaskContext::engine() const
{
return engine_;
}

void TaskContext::setTaskLatencySource() { engine_->latency_source_ = true; }
void TaskContext::setTaskLatencyTarget() { engine_->latency_target_ = true; }

uint32_t PeerTask::actvityId() const
{
return father_->actvityId();
}

std::shared_ptr<ExecutionEngine> PeerTask::engine() const
{
return father_->engine();
}

} // end of namespace coco

4 changes: 3 additions & 1 deletion launcher/include/input_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class InputParser
"Print the xml template for all the components contained in the library.")
("web_server,w", boost::program_options::value<int>()->implicit_value(7707),
"Instantiate a web server that allows to view statics about the executions.")
("web_root,r", boost::program_options::value<std::string>(), "set document root for web server");
("web_root,r", boost::program_options::value<std::string>(), "set document root for web server")
("latency,L", boost::program_options::value<std::vector<std::string> >()->multitoken(),
"Set the two task between which calculate the latency. Peer are not valid.");

boost::program_options::store(boost::program_options::command_line_parser(argc_, argv_).
options(description_).run(), vm_);
Expand Down
23 changes: 21 additions & 2 deletions ros_launcher/src/coco_ros_launcher/src/coco_ros_launcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ void printStatistics(int interval)
void launchApp(const std::string & config_file_path, bool profiling,
const std::string &graph, int web_server_port,
const std::string& web_server_root,
std::unordered_set<std::string> disabled_component)
std::unordered_set<std::string> disabled_component,
std::vector<std::string> latency)
{
std::shared_ptr<coco::TaskGraphSpec> graph_spec(new coco::TaskGraphSpec());
coco::XmlParser parser;
Expand All @@ -106,6 +107,18 @@ void launchApp(const std::string & config_file_path, bool profiling,

loader->enableProfiling(profiling);

if (latency.size() != 0)
{
auto src_task = COCO_TASK(latency[0]);
auto dst_task = COCO_TASK(latency[1]);

if ((!src_task || !dst_task) || (coco::isPeer(src_task) || coco::isPeer(dst_task)))
COCO_FATAL() << "To use latency specify the name of two valid task.";

src_task->setTaskLatencySource();
dst_task->setTaskLatencyTarget();
}

if (!graph.empty())
loader->printGraph(graph);

Expand Down Expand Up @@ -159,8 +172,14 @@ int main(int argc, char **argv)
std::unordered_set<std::string> disabled_component;
for (auto & d : disabled)
disabled_component.insert(d);

std::vector<std::string> latency = options.getStringVector("latency");
if (latency.size() > 0 && latency.size() != 2)
{
COCO_FATAL() << "To calculate latency specify the name of two task. [-L task1 task2]";
}

launchApp(config_file, profiling, graph, port, root, disabled_component);
launchApp(config_file, profiling, graph, port, root, disabled_component, latency);

ros::Rate rate(100);
while (ros::ok())
Expand Down
4 changes: 3 additions & 1 deletion samples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ include_directories(${CMAKE_SOURCE_DIR}/core/include)
add_library(component_1 SHARED ${CMAKE_CURRENT_LIST_DIR}/src/component_1.cpp)
add_library(component_2 SHARED ${CMAKE_CURRENT_LIST_DIR}/src/component_2.cpp)
add_library(pipeline_comps SHARED ${CMAKE_CURRENT_LIST_DIR}/src/pipeline_comps.cpp)
add_library(component_latency SHARED ${CMAKE_CURRENT_LIST_DIR}/src/component_latency.cpp)

add_dependencies(component_1 coco)
target_link_libraries(component_1 coco)
add_dependencies(component_2 coco)
target_link_libraries(component_2 coco)
add_dependencies(pipeline_comps coco)
target_link_libraries(pipeline_comps coco)

add_dependencies(component_latency coco)
target_link_libraries(component_latency coco)
102 changes: 102 additions & 0 deletions samples/launch_files/config_latency.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
<package>
<log>
<levels>0 1 2 3 4</levels>
<types>debug err log</types>
</log>
<paths>
<path>/home/pippo/Libraries/coco/build/lib/</path>
<path>/home/pippo/Libraries/coco/samples</path>
</paths>
<components>
<component>
<task>TaskLatStart</task>
<library>component_latency</library>
<attributes>
<attribute name="sleep_time" value="10" />
</attributes>
</component>
<component>
<task>TaskLatMiddle</task>
<name>middle_1</name>
<library>component_latency</library>
<attributes>
<attribute name="sleep_time" value="100" />
</attributes>
</component>
<component>
<task>TaskLatMiddle</task>
<name>middle_2</name>
<library>component_latency</library>
<attributes>
<attribute name="sleep_time" value="100" />
</attributes>
</component>
<component>
<task>TaskLatMiddle</task>
<name>middle_3</name>
<library>component_latency</library>
<attributes>
<attribute name="sleep_time" value="100" />
</attributes>
</component>
<component>
<task>TaskLatSink</task>
<library>component_latency</library>
<attributes>
<attribute name="sleep_time" value="100" />
</attributes>
</component>
</components>

<activities>
<activity>
<schedule activity="parallel" type="periodic" period="50"/>
<components>
<component name="TaskLatStart" />
</components>
</activity>
<activity>
<schedule activity="parallel" type="triggered"/>
<components>
<component name="middle_1" />
</components>
</activity>
<activity>
<schedule activity="parallel" type="triggered"/>
<components>
<component name="middle_2" />
</components>
</activity>
<activity>
<schedule activity="parallel" type="triggered"/>
<components>
<component name="middle_3" />
</components>
</activity>
<activity>
<schedule activity="parallel" type="triggered"/>
<components>
<component name="TaskLatSink" />
</components>
</activity>
</activities>

<connections>
<connection data="DATA" policy="LOCKED" transport="LOCAL" buffersize="10">
<src task="TaskLatStart" port="time_OUT"/>
<dest task="middle_1" port="time_IN"/>
</connection>
<connection data="DATA" policy="LOCKED" transport="LOCAL" buffersize="1">
<src task="middle_1" port="time_OUT"/>
<dest task="middle_2" port="time_IN"/>
</connection>
<connection data="DATA" policy="LOCKED" transport="LOCAL" buffersize="1">
<src task="middle_2" port="time_OUT"/>
<dest task="middle_3" port="time_IN"/>
</connection>
<connection data="DATA" policy="LOCKED" transport="LOCAL" buffersize="1">
<src task="middle_3" port="time_OUT"/>
<dest task="TaskLatSink" port="time_IN"/>
</connection>
</connections>
</package>
90 changes: 90 additions & 0 deletions samples/src/component_latency.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#include <chrono>
#include <thread>
#include <coco/coco.h>

class TaskLatStart : public coco::TaskContext
{
public:
coco::OutputPort<double> out_time_ = {this, "time_OUT"};
coco::Attribute<int long> asleep_time_ = {this, "sleep_time", sleep_time_};

void init() {}
void onConfig() {}

void onUpdate()
{
auto time = coco::util::time();
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time_));

out_time_.write(time);
}
private:
int long sleep_time_ = 10;
};

COCO_REGISTER(TaskLatStart)

class TaskLatMiddle : public coco::TaskContext
{
public:
coco::InputPort<double> in_time_ = {this, "time_IN", true};
coco::OutputPort<double> out_time_ = {this, "time_OUT"};

coco::Attribute<int long> asleep_time_ = {this, "sleep_time", sleep_time_};

void init() {}
void onConfig() {}

void onUpdate()
{
//if (this->instantiationName() == "middle_2")
std::cout << "Executing start" << std::endl;
double time;
in_time_.read(time);
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time_));

out_time_.write(time);

//if (this->instantiationName() == "middle_2")
std::cout << "Executing end" << std::endl;
}
private:
int long sleep_time_ = 10;
};

COCO_REGISTER(TaskLatMiddle)


class TaskLatSink : public coco::TaskContext
{
public:
coco::InputPort<double> in_time_ = {this, "time_IN", true};
coco::OutputPort<double> out_time_ = {this, "time_OUT"};
coco::Attribute<int long> asleep_time_ = {this, "sleep_time", sleep_time_};

void init() {}
void onConfig() {}

void onUpdate()
{
double time;
in_time_.read(time);

std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time_));

times_.push_back(coco::util::time() - time);
out_time_.write(time);

static int count = 1;
if (count++ % 10 == 0)
{
double sum = std::accumulate(times_.begin(), times_.end(), 0);
COCO_LOG(1) << "Latency: " << sum / times_.size();
}
}
private:
std::vector<double> times_;
int long sleep_time_ = 10;
};

COCO_REGISTER(TaskLatSink)

0 comments on commit c395481

Please sign in to comment.