Skip to content

Commit

Permalink
Merge branch 'optional-timesync'
Browse files Browse the repository at this point in the history
  • Loading branch information
tilsche committed May 24, 2019
2 parents 4e0fe58 + 0c67340 commit 39d7e06
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 91 deletions.
5 changes: 4 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ include(cmake/DefaultBuildType.cmake)
include(cmake/GitSubmoduleUpdate.cmake)
git_submodule_update()

# We're building a shared library. D'oh.
set(NITRO_POSITION_INDEPENDENT_CODE ON CACHE INTERNAL "")
set(METRICQ_POSITION_INDEPENDENT_CODE ON CACHE INTERNAL "")
add_subdirectory(lib)

find_package(FFTW3)

add_library(metricq_plugin
MODULE
src/main.cpp
src/log.cpp

src/timesync/footprint.cpp
src/timesync/shifter.cpp
Expand All @@ -24,6 +26,7 @@ target_link_libraries(metricq_plugin
PRIVATE
Scorep::scorep-plugin-cxx
metricq::sink
metricq::logger-nitro
FFTW3::fftw3
)

Expand Down
58 changes: 49 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,44 @@
# Usage
## Usage
### Environment Variables

### Required Environment Variables
#### General

SCOREP_METRIC_PLUGINS=metricq_plugin
SCOREP_METRIC_METRICQ_PLUGIN=metric.name
SCOREP_METRIC_METRICQ_PLUGIN_SERVER=amqps://user:pass@hostname
* `SCOREP_METRIC_PLUGINS=metricq_plugin`

* `SCOREP_METRIC_METRICQ_PLUGIN` (required)

Comma-separated list of metrics.
Metric names can also contain wildcards (`*`).

* `SCOREP_METRIC_METRICQ_PLUGIN_SERVER` (required)

URL to the main MetricQ AMQP server including user/password.

* `SCOREP_METRIC_METRICQ_PLUGIN_TIMEOUT` (optional, default: 1 hour)

Maximum duration of the experiment.
It is strongly recommended to specify a value.
If the value is too large and your measurement crashes, it may result in
resource exhaustion of the measurement infrastructure.
The value is a duration string which will be parsed, e.g. `1 hour`.

* `SCOREP_METRIC_METRICQ_PLUGIN_TOKEN` (optional, default: "sink-scorep")

Base token to use within MetricQ.
A UUID is added internally.

### Optional Environment Variables
* `SCOREP_METRIC_METRICQ_PLUGIN_VERBOSE` (optional, default: `info`)

SCOREP_METRIC_METRICQ_PLUGIN_VERBOSE=debug
SCOREP_METRIC_METRICQ_PLUGIN_TIMEOUT=600s
Control output verbosity. Use one of `trace,debug,info,warn,error,fatal`.

* `SCOREP_METRIC_METRICQ_PLUGIN_AVERAGE` (optional, default: `0`)

Combine multiple high-resolution (>= 1 kSa/s) values to one.
Set to e.g. `8` to reduce the number of values by a factor of `8`.
A setting of `0` disables averaging.


#### Time synchronization

Control the time synchronization.
Each sync phase will last `quantum * 2 ^ exponent + 2 * tolerance`.
Expand All @@ -18,7 +47,18 @@ Each sync phase will last `quantum * 2 ^ exponent + 2 * tolerance`.
SCOREP_METRIC_METRICQ_PLUGIN_SYNC_QUANTUM=1ms
SCOREP_METRIC_METRICQ_PLUGIN_SYNC_TOLERANCE=2s

Because Score-P default settings are insufficient:
* `SCOREP_METRIC_METRICQ_PLUGIN_CORRELATION_FILE` (optional)

Prefix for writing a file containing correlation values for all offsets.
This is only used for the most hardcore timesync debugging.

Time synchronization is only applied to metrics with a rate of >= 1 kSa/s.
The first such metric is used to determine the offset, so be wary of the order in which metrics are specified.
For this reason, using wildcards together with time synchronization is not recommended.

#### Recommended Score-P settings

Because the Score-P default settings are not appropriate for many use-cases:

SCOREP_ENABLE_PROFILING=false
SCOREP_ENABLE_TRACING=true
2 changes: 1 addition & 1 deletion lib/metricq
Submodule metricq updated 47 files
+3 −0 .gitmodules
+1 −2 README.md
+2 −1 examples/metricq-sink-dummy/src/dummy_sink.cpp
+1 −1 include/metricq/logger.hpp
+9 −0 include/metricq/metric.hpp
+2 −1 include/metricq/rpc_request.hpp
+11 −2 include/metricq/simple.hpp
+10 −0 lib/CMakeLists.txt
+1 −0 lib/date
+1 −1 lib/nitro
+25 −0 pyproject.toml
+2 −2 python/examples/metricq_client.py
+51 −24 python/examples/metricq_get_history.py
+9 −9 python/examples/metricq_rpc_test.py
+13 −11 python/examples/metricq_sink.py
+17 −15 python/examples/metricq_source.py
+7 −5 python/examples/metricq_sychronous_multi_source.py
+11 −14 python/examples/metricq_synchronous_source.py
+1 −1 python/metricq/__init__.py
+145 −70 python/metricq/agent.py
+45 −12 python/metricq/client.py
+19 −8 python/metricq/data_client.py
+238 −52 python/metricq/history_client.py
+8 −4 python/metricq/interval_source.py
+2 −0 python/metricq/logging.py
+12 −5 python/metricq/rpc.py
+20 −14 python/metricq/sink.py
+14 −13 python/metricq/source.py
+6 −0 python/metricq/source_metric.py
+17 −17 python/metricq/synchronous_source.py
+144 −2 python/metricq/types.py
+41 −30 setup.py
+6 −6 src/chrono.cpp
+1 −1 src/db.cpp
+27 −4 src/history.proto
+1 −1 src/history_client.cpp
+1 −1 src/log.cpp
+2 −2 src/log.hpp
+22 −2 src/simple.cpp
+8 −1 src/sink.cpp
+7 −2 src/source.cpp
+2 −1 src/transformer.cpp
+4 −4 tests/amqpcpp_close_test.cpp
+5 −1 tools/metricq-logger-nitro/CMakeLists.txt
+21 −14 tools/metricq-logger-nitro/include/metricq/logger/nitro.hpp
+0 −1 tools/metricq-summary/src/run_command.cpp
+0 −1 tools/metricq-summary/src/summary.cpp
47 changes: 0 additions & 47 deletions src/log.cpp

This file was deleted.

6 changes: 0 additions & 6 deletions src/log.hpp

This file was deleted.

80 changes: 63 additions & 17 deletions src/main.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#include "log.hpp"
#include "timesync/timesync.hpp"

#include <metricq/logger/nitro.hpp>
#include <metricq/metadata.hpp>
#include <metricq/ostream.hpp>
#include <metricq/simple.hpp>
#include <metricq/simple_drain.hpp>
#include <metricq/types.hpp>

#include <scorep/plugin/plugin.hpp>

#include <nitro/format.hpp>
#include <nitro/lang/enumerate.hpp>

#include <chrono>
Expand All @@ -19,6 +21,8 @@

using namespace scorep::plugin::policy;

using Log = metricq::logger::nitro::Log;

// Must be system clock for real epoch!
using local_clock = std::chrono::system_clock;

Expand All @@ -37,8 +41,20 @@ std::vector<T> keys(std::map<T, V>& map)
// Per-metric handle data stored for every metric registered with the plugin.
// Initialised positionally (Metric{ name, use_timesync, use_average }), so the
// member order must not change.
struct Metric
{
// Name of the metric as reported by the MetricQ metadata lookup.
std::string name;
// True when the metric's rate is known (not NaN) and at least 1000;
// such metrics have their timestamps passed through the time sync.
bool use_timesync;
// True when use_timesync is set and averaging is enabled (average_ != 0).
bool use_average;
};

/// Replace every occurrence of `from` in `str` with `to`, in place.
///
/// The search resumes directly behind each inserted replacement, so text that
/// was just inserted is never scanned again. This keeps replacements such as
/// "*" -> ".*" from looping on their own output.
///
/// @param str   string that is modified in place
/// @param from  substring to search for; an empty pattern leaves `str` untouched
/// @param to    replacement text
void replace_all(std::string& str, const std::string& from, const std::string& to)
{
    // An empty pattern matches at every position and would never terminate.
    if (from.empty())
    {
        return;
    }

    // Must start at 0: reading an uninitialised position is undefined behaviour.
    std::string::size_type start_pos = 0;
    while ((start_pos = str.find(from, start_pos)) != std::string::npos)
    {
        str.replace(start_pos, from.length(), to);
        start_pos += to.length();
    }
}

template <typename T, typename Policies>
using handle_oid_policy = object_id<Metric, T, Policies>;

Expand All @@ -48,32 +64,50 @@ class metricq_plugin : public scorep::plugin::base<metricq_plugin, async, once,
public:
metricq_plugin()
: url_(scorep::environment_variable::get("SERVER")),
token_(scorep::environment_variable::get("TOKEN", "scorepPlugin")),
token_(scorep::environment_variable::get("TOKEN", "sink-scorep")),
average_(std::stoi(scorep::environment_variable::get("AVERAGE", "0")))
{
initialize_logger();
metricq::logger::nitro::initialize();
auto log_verbose = scorep::environment_variable::get("VERBOSE", "WARN");
auto level =
nitro::log::severity_from_string(log_verbose, nitro::log::severity_level::info);
metricq::logger::nitro::set_severity(level);
}

std::vector<scorep::plugin::metric_property> get_metric_properties(const std::string& s)
private:
auto get_metadata(const std::string& s)
{
std::vector<scorep::plugin::metric_property> result;
std::string selector = s;
bool is_regex = (selector.find("*") != std::string::npos);

auto selector = s;
if (selector == "*")
if (!is_regex)
{
selector = "";
return metricq::get_metadata(url_, token_, std::vector<std::string>({ selector }));
}
auto metadata = metricq::get_metadata(url_, token_, selector);

replace_all(selector, ".", "\\.");
replace_all(selector, "*", ".*");
selector = nitro::format("^{}$") % selector;
return metricq::get_metadata(url_, token_, selector);
}

public:
std::vector<scorep::plugin::metric_property> get_metric_properties(const std::string& s)
{
auto metadata = get_metadata(s);
std::vector<scorep::plugin::metric_property> result;
for (const auto& elem : metadata)
{
const auto& name = elem.first;
const auto& meta = elem.second;
make_handle(name, Metric{ name });
auto use_timesync = !std::isnan(meta.rate()) and meta.rate() >= 1000;
auto use_average = use_timesync && average_;
make_handle(name, Metric{ name, use_timesync, use_average });

auto property = scorep::plugin::metric_property(name, meta.description(), meta.unit())
.value_double();

if (average_)
if (use_average)
{
property.absolute_last();
}
Expand Down Expand Up @@ -111,7 +145,7 @@ class metricq_plugin : public scorep::plugin::base<metricq_plugin, async, once,
{
convert_.synchronize_point();
auto timeout_str = scorep::environment_variable::get("TIMEOUT");
auto timeout = metricq::duration_parse(timeout_str);
metricq::Duration timeout;
if (timeout_str.empty())
{
Log::warn()
Expand All @@ -125,7 +159,7 @@ class metricq_plugin : public scorep::plugin::base<metricq_plugin, async, once,
{
try
{
timeout = std::chrono::seconds(std::stoll(timeout_str));
timeout = metricq::duration_parse(timeout_str);
if (timeout.count() <= 0)
{
throw std::out_of_range("");
Expand Down Expand Up @@ -158,7 +192,7 @@ class metricq_plugin : public scorep::plugin::base<metricq_plugin, async, once,
for (auto& metric : get_handles())
{
// XXX sync with first metric
if (!cc_synced_)
if (metric.use_timesync && !cc_synced_)
{
try
{
Expand All @@ -176,6 +210,18 @@ class metricq_plugin : public scorep::plugin::base<metricq_plugin, async, once,
}
}

// Translate a MetricQ timestamp into Score-P ticks for the given metric.
// Metrics without use_timesync are converted directly; all others are first
// mapped through cc_time_sync_.to_local().
scorep::chrono::ticks convert_time_(metricq::TimePoint time, Metric& metric)
{
    if (!metric.use_timesync)
    {
        return convert_.to_ticks(time);
    }

    return convert_.to_ticks(cc_time_sync_.to_local(time));
}

template <class Cursor>
void get_all_values(Metric& metric, Cursor& c)
{
Expand All @@ -186,7 +232,7 @@ class metricq_plugin : public scorep::plugin::base<metricq_plugin, async, once,
return;
}

if (average_)
if (metric.use_average)
{
int count = 0;
double sum = 0.;
Expand All @@ -196,7 +242,7 @@ class metricq_plugin : public scorep::plugin::base<metricq_plugin, async, once,
count++;
if (count == average_)
{
c.write(convert_.to_ticks(cc_time_sync_.to_local(tv.time)), sum / average_);
c.write(convert_time_(tv.time, metric), sum / average_);
count = 0;
sum = 0.;
}
Expand All @@ -206,7 +252,7 @@ class metricq_plugin : public scorep::plugin::base<metricq_plugin, async, once,
{
for (auto& tv : data)
{
c.write(convert_.to_ticks(cc_time_sync_.to_local(tv.time)), tv.value);
c.write(convert_time_(tv.time, metric), tv.value);
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/timesync/fft.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <metricq/logger/nitro.hpp>

#include <fftw3.h>

#include <algorithm>
Expand All @@ -11,6 +13,8 @@
#include <climits>
#include <cstddef>

using Log = metricq::logger::nitro::Log;

using complex_type = std::complex<double>;

// complex_type and fftw_complex must have identical size (presumably so that
// buffers of one type can be handed to code expecting the other -- confirm
// against the FFT implementation).
static_assert(sizeof(complex_type) == sizeof(fftw_complex),
              "std::complex<double> and fftw_complex must have the same size");
Expand Down
4 changes: 2 additions & 2 deletions src/timesync/footprint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

#include "msequence.hpp"

#include "../log.hpp"

#include <metricq/logger/nitro.hpp>
#include <metricq/types.hpp>

#include <sched.h>
Expand All @@ -19,6 +18,7 @@ using Duration = metricq::Duration;

namespace timesync
{
using Log = metricq::logger::nitro::Log;

uint64_t sqrtsd_loop_(double* buffer, uint64_t elems, uint64_t repeat);

Expand Down
Loading

0 comments on commit 39d7e06

Please sign in to comment.