Skip to content

Commit

Permalink
advanced sched and notes on latency
Browse files Browse the repository at this point in the history
  • Loading branch information
filippobrizzi committed Jul 20, 2016
1 parent 4474eac commit ad03cd0
Show file tree
Hide file tree
Showing 14 changed files with 282 additions and 28 deletions.
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ set(UTIL_INCLUDE_FILE ${CMAKE_CURRENT_LIST_DIR}/include/coco/util/generics.hpp
${CMAKE_CURRENT_LIST_DIR}/include/coco/util/accesses.hpp
${CMAKE_CURRENT_LIST_DIR}/include/coco/util/logging.h
${CMAKE_CURRENT_LIST_DIR}/include/coco/util/timing.h
)
${CMAKE_CURRENT_LIST_DIR}/include/coco/util/linux_sched.h)
set(WEB_SOURCE_FILE ${CMAKE_CURRENT_LIST_DIR}/src/web_server.cpp
)
set(WEB_INCLUDE_FILE ${CMAKE_CURRENT_LIST_DIR}/include/coco/web_server/web_server.h
Expand Down
16 changes: 14 additions & 2 deletions core/include/coco/execution.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,29 @@ struct SchedulePolicy
enum Policy
{
PERIODIC, //!< The activity executes periodically with a given period
HARD, //!<
TRIGGERED //!< The activity execution is triggered by an event port receiving data
};
/*! \brief Specify the realtime type of the activity
*/
enum RealTime
{
NONE,
FIFO,
RR,
DEADLINE
};

/*! \brief Base constructor with default values.
*/
explicit SchedulePolicy(Policy policy = PERIODIC, int period = 1)
: scheduling_policy(policy), period_ms(period) {}


Policy scheduling_policy; //!< Scheduling policy
RealTime realtime = NONE;
int period_ms; //!< In case of a periodic activity specifies the period in millisecon
int affinity = -1; //!< Specifies the core id where to pin the activity. If -1 no affinity
int priority = 0;
int runtime = 0;
std::list<unsigned int> available_core_id; //!< Contains the list of the available cores where the activity can run
};

Expand Down Expand Up @@ -254,6 +264,8 @@ class ExecutionEngine: public RunnableInterface
bool stopped_;

util::Timer timer_;

// double latency_time_start_;
};

} // end of namespace coco
10 changes: 4 additions & 6 deletions core/include/coco/task_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,10 @@ class InputPort: public PortBase
if (task_ == other->task())
return false;

std::shared_ptr<ConnectionBase> connection(makeConnection(
std::static_pointer_cast<InputPort<T> >(this->sharedPtr()),
other, policy));
std::shared_ptr<ConnectionBase> connection(
makeConnection(
std::static_pointer_cast<InputPort<T> >(this->sharedPtr()),
other, policy));
addConnection(connection);
other->addConnection(connection);
return true;
Expand All @@ -335,9 +336,6 @@ class InputPort: public PortBase
case ConnectionManagerType::FARM:
this->manager_ = std::make_shared<ConnectionManagerInputFarm<T> >();
break;
default:
COCO_FATAL() << "Invalid ConnectionManagerType " << static_cast<int>(type);
break;
}
}
};
Expand Down
67 changes: 67 additions & 0 deletions core/include/coco/util/linux_sched.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//
// Created by pippo on 14/07/16.
//

#pragma once

#ifdef __linux__

#include <sched.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>

#if defined(__i386__)

#ifndef __NR_sched_setattr
#define __NR_sched_setattr 351
#endif
#ifndef __NR_sched_getattr
#define __NR_sched_getattr 352
#endif

#elif defined(__x86_64__)

#ifndef __NR_sched_setattr
#define __NR_sched_setattr 314
#endif
#ifndef __NR_sched_getattr
#define __NR_sched_getattr 315
#endif

#endif /* i386 or x86_64 */

#if !defined(__NR_sched_setattr)
# error "Your arch does not support sched_setattr()"
#endif

#if !defined(__NR_sched_getattr)
# error "Your arch does not support sched_getattr()"
#endif

/* If not included in the headers, define sched deadline policy numbe */
#ifndef SCHED_DEADLINE
#define SCHED_DEADLINE 6
#endif


#define sched_setattr(pid, attr, flags) syscall(__NR_sched_setattr, pid, attr, flags)
#define sched_getattr(pid, attr, size, flags) syscall(__NR_sched_getattr, pid, attr, size, flags)

struct sched_attr
{
uint32_t size;
uint32_t sched_policy;
uint64_t sched_flags;
/* SCHED_NORMAL, SCHED_BATCH */
int32_t sched_nice;
/* SCHED_FIFO, SCHED_RR */
uint32_t sched_priority;
/* SCHED_DEADLINE */
uint64_t sched_runtime;
uint64_t sched_deadline;
uint64_t sched_period;
};


#endif
2 changes: 1 addition & 1 deletion core/include/coco/util/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ via Luigi Alamanni 13D, San Giuliano Terme 56010 (PI), Italy

# define COCO_ERR() coco::util::LogMessage(coco::util::Type::ERR, -1).stream()
# define COCO_FATAL() coco::util::LogMessage(coco::util::Type::FATAL, -1).stream()
# define COCO_SAMPLE(x, y) coco::util::LogMessageSampled(x, y).stream()
# define COCO_LOG_SAMPLE(x, y) coco::util::LogMessageSampled(x, y).stream()
# ifndef NDEBUG
# define COCO_DEBUG(x) coco::util::LogMessage(coco::util::Type::DEBUG, 0, x).stream()
# else
Expand Down
6 changes: 6 additions & 0 deletions core/include/coco/util/timing.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ namespace coco
namespace util
{

inline double time()
{
return std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
}

struct TimeStatistics
{
unsigned long iterations;
Expand Down
60 changes: 57 additions & 3 deletions core/src/execution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@
#include <thread>
#include <mutex>

#include <sys/types.h>
#define gettid() syscall(SYS_gettid)

#include "coco/util/timing.h"
#include "coco/util/linux_sched.h"

#include "coco/task.h"
#include "coco/register.h"

#include "coco/execution.h"


namespace coco
{
uint32_t Activity::guid_gen = 0;
Expand Down Expand Up @@ -189,6 +193,8 @@ std::thread::id ParallelActivity::threadId() const
void ParallelActivity::setSchedule()
{
#ifdef __linux__

/* Setting core affinity */
cpu_set_t cpu_set;
CPU_ZERO(&cpu_set);
if (policy_.affinity >= 0 &&
Expand All @@ -202,7 +208,48 @@ void ParallelActivity::setSchedule()

if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) < 0)
COCO_FATAL() << "Failed to set affinity on core: " << policy_.affinity;


/* Setting linux real time scheduler */
sched_attr sched;
sched.size = sizeof(sched_attr);

switch (policy_.realtime)
{
case SchedulePolicy::FIFO:
{
sched.sched_policy = SCHED_FIFO;
sched.sched_priority = policy_.priority;
break;
}
case SchedulePolicy::RR:
{
sched.sched_policy = SCHED_RR;
sched.sched_priority = policy_.priority;
break;
}
case SchedulePolicy::DEADLINE:
{
// setup_hr_tick()

sched.sched_policy = SCHED_DEADLINE;
sched.sched_runtime = policy_.runtime;
sched.sched_deadline = policy_.period_ms * 10e6; // Convert ms to ns
sched.sched_period = policy_.period_ms * 10e6; // Convert ms to ns
break;
}
case SchedulePolicy::NONE:
{
return;
}
}
int ret = sched_setattr(0, &sched, 0);
if (ret < 0)
{
COCO_FATAL() << "Failed to setattr for thread: " << getpid() << " with guid: " << guid_;

return;
}

#elif __APPLE__

#elif WIN
Expand All @@ -221,7 +268,6 @@ void ParallelActivity::entry()
/* PERIODIC */
if (isPeriodic())
{
std::chrono::system_clock::time_point next_start_time;
while (!stopping_)
{
auto now = std::chrono::system_clock::now();
Expand Down Expand Up @@ -282,6 +328,7 @@ void ExecutionEngine::init()
task_->setState(TaskState::INIT);
task_->onConfig();
COCO_DEBUG("Execution") << "[" << task_->instantiationName() << "] onConfig completed.";
//COCO_DEBUG("Execution") << "Task " << task_->instantiationName() << " is on thread: " << pthread_self() << ", " << getpid();
coco::ComponentRegistry::increaseConfigCompleted();
task_->setState(TaskState::IDLE);
}
Expand All @@ -299,9 +346,16 @@ void ExecutionEngine::step()

if (ComponentRegistry::profilingEnabled())
{
// Check if task is source in a latency calculus and store time.
// if (task is latency source)
// latency_time_start_= util::time();

timer_.start();
task_->onUpdate();
timer_.stop();

// if (task is latency target)
// latency_time_tot_ = util::time() - latency_time_start_;
}
else
{
Expand Down
14 changes: 14 additions & 0 deletions include/web_server_task.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include <coco/coco.h>


class WebServerTask : public coco::TaskContext
{
public:


coco::InputPort<std::string> in_message();
private:


};
COCO_REGISTER(WebServerTask)
3 changes: 3 additions & 0 deletions launcher/include/graph_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ struct ConnectionSpec
struct SchedulePolicySpec
{
std::string type;
std::string realtime;
int period;
int affinity;
int priority;
int runtime;
bool exclusive;
};

Expand Down
25 changes: 16 additions & 9 deletions launcher/src/graph_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ void GraphLoader::loadSchedule(const SchedulePolicySpec &policy_spec, SchedulePo
policy.scheduling_policy = SchedulePolicy::PERIODIC;
else
COCO_FATAL()<< "Schduele policy type: " << policy_spec.type
<< " is not know\n Possibilities are: triggered, periodic";
<< " is not know\n Possibilities are: triggered, periodic";

policy.period_ms = policy_spec.period;

policy.priority = policy_spec.priority;
policy.affinity = -1;
if (policy_spec.affinity >= 0)
{
Expand All @@ -98,9 +98,22 @@ void GraphLoader::loadSchedule(const SchedulePolicySpec &policy_spec, SchedulePo
else
{
COCO_FATAL() << "Core " << policy_spec.affinity
<< " either doesn't exist or it has already been assigned exclusivly to another activity!";
<< " either doesn't exist or it has already"
<< " been assigned exclusively to another activity!";
}
}

if (policy_spec.realtime == "fifo")
policy.realtime = SchedulePolicy::FIFO;
else if (policy_spec.realtime == "rr")
policy.realtime = SchedulePolicy::RR;
else if (policy_spec.realtime == "deadline")
policy.realtime = SchedulePolicy::DEADLINE;
else
policy.realtime = SchedulePolicy::NONE;

policy.priority = policy_spec.priority;
policy.runtime = policy_spec.runtime;
}

void GraphLoader::startActivity(std::unique_ptr<ActivitySpec> &activity_spec)
Expand Down Expand Up @@ -368,8 +381,6 @@ bool GraphLoader::loadTask(std::shared_ptr<TaskSpec> & task_spec,
void GraphLoader::makeConnection(
std::unique_ptr<ConnectionSpec> &connection_spec)
{
std::cout << "Making connection" << std::endl;

ConnectionPolicy policy(connection_spec->policy.data,
connection_spec->policy.policy,
connection_spec->policy.transport,
Expand All @@ -385,9 +396,6 @@ void GraphLoader::makeConnection(
return;
}

std::cout << "Connection: " << connection_spec->src_task->instance_name << " " << connection_spec->dest_task->instance_name << std::endl;


if (src_task->second->isOnSameThread(dest_task->second))
policy.lock_policy = ConnectionPolicy::UNSYNC;

Expand All @@ -406,7 +414,6 @@ void GraphLoader::makeConnection(
<< " or Component in: " << connection_spec->dest_task->instance_name
<< " doesn't have port: " << connection_spec->dest_port;
}
std::cout << "Connection completed" << std::endl;
}

void GraphLoader::startApp()
Expand Down
Loading

0 comments on commit ad03cd0

Please sign in to comment.