Skip to content

Commit

Permalink
outlier detection framework (#142)
Browse files Browse the repository at this point in the history
This is the outline of an outlier detection framework. It doesn't actually
do anything right now. I want to get this deployed just to make sure that
the basic flows look good and then I will start work on actual detection
algorithms.
  • Loading branch information
mattklein123 authored Oct 12, 2016
1 parent 2a80f8f commit 673bcc9
Show file tree
Hide file tree
Showing 22 changed files with 410 additions and 52 deletions.
6 changes: 6 additions & 0 deletions include/envoy/upstream/host_description.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "envoy/stats/stats_macros.h"
#include "envoy/upstream/outlier_detection.h"

namespace Upstream {

Expand Down Expand Up @@ -42,6 +43,11 @@ class HostDescription {
*/
virtual const Cluster& cluster() const PURE;

/**
* @return the host's outlier detection sink.
*/
virtual OutlierDetectorHostSink& outlierDetector() const PURE;

/**
* @return the URL used to connect to the host.
*/
Expand Down
53 changes: 53 additions & 0 deletions include/envoy/upstream/outlier_detection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#pragma once

#include "envoy/common/pure.h"

namespace Upstream {

class Host;
typedef std::shared_ptr<Host> HostPtr;

/**
* Sink for per host data. Proxy filters should send pertinent data when available.
*/
class OutlierDetectorHostSink {
public:
virtual ~OutlierDetectorHostSink() {}

/**
* Add an HTTP response code for a host.
*/
virtual void putHttpResponseCode(uint64_t code) PURE;

/**
* Add a response time for a host (in this case response time is generic and might be used for
* different operations including HTTP, Mongo, Redis, etc.).
*/
virtual void putResponseTime(std::chrono::milliseconds time) PURE;
};

typedef std::unique_ptr<OutlierDetectorHostSink> OutlierDetectorHostSinkPtr;

/**
* Interface for an outlier detection engine. Uses per host data to determine which hosts in a
* cluster are outliers and should be ejected.
*/
class OutlierDetector {
public:
virtual ~OutlierDetector() {}

/**
* Outlier detection change state callback.
*/
typedef std::function<void(HostPtr host)> ChangeStateCb;

/**
* Add a changed state callback to the detector. The callback will be called whenever any host
* changes state (either ejected or brought back in) due to outlier status.
*/
virtual void addChangedStateCb(ChangeStateCb cb) PURE;
};

typedef std::unique_ptr<OutlierDetector> OutlierDetectorPtr;

} // Upstream
17 changes: 12 additions & 5 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@

namespace Upstream {

class Host;
typedef std::shared_ptr<Host> HostPtr;
typedef std::shared_ptr<const Host> ConstHostPtr;

/**
* An upstream host.
*/
Expand All @@ -24,7 +20,9 @@ class Host : virtual public HostDescription {

enum class HealthFlag {
// The host is currently failing active health checks.
FAILED_ACTIVE_HC = 0x1
FAILED_ACTIVE_HC = 0x1,
// The host is currently considered an outlier and has been ejected.
FAILED_OUTLIER_CHECK = 0x02
};

/**
Expand Down Expand Up @@ -69,6 +67,13 @@ class Host : virtual public HostDescription {
*/
virtual bool healthy() const PURE;

/**
* Set the host's outlier detector. Outlier detectors are assumed to be thread safe, however
* a new outlier detector must be installed before the host is used across threads. Thus,
* this routine should only be called on the main thread before the host is used across threads.
*/
virtual void setOutlierDetector(OutlierDetectorHostSinkPtr&& outlier_detector) PURE;

/**
* @return the current load balancing weight of the host, in the range 1-100.
*/
Expand All @@ -80,6 +85,8 @@ class Host : virtual public HostDescription {
virtual void weight(uint32_t new_weight) PURE;
};

typedef std::shared_ptr<const Host> ConstHostPtr;

/**
* Base host set interface. This is used both for clusters, as well as per thread/worker host sets
* used during routing/forwarding.
Expand Down
1 change: 1 addition & 0 deletions source/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ add_library(
upstream/host_utility.cc
upstream/load_balancer_impl.cc
upstream/logical_dns_cluster.cc
upstream/outlier_detection_impl.cc
upstream/sds.cc
upstream/upstream_impl.cc
${gen_git_sha_target})
Expand Down
51 changes: 24 additions & 27 deletions source/common/http/codes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,33 +59,30 @@ void CodeUtility::chargeResponseStat(const ResponseStatInfo& info) {
}

void CodeUtility::chargeResponseTiming(const ResponseTimingInfo& info) {
if (DateUtil::timePointValid(info.request_send_time_)) {
std::chrono::milliseconds ms = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - info.request_send_time_);

info.store_.deliverTimingToSinks(info.prefix_ + "upstream_rq_time", ms);
if (info.upstream_canary_) {
info.store_.deliverTimingToSinks(info.prefix_ + "canary.upstream_rq_time", ms);
}

if (info.internal_request_) {
info.store_.deliverTimingToSinks(info.prefix_ + "internal.upstream_rq_time", ms);
} else {
info.store_.deliverTimingToSinks(info.prefix_ + "external.upstream_rq_time", ms);
}

if (!info.request_vcluster_name_.empty()) {
info.store_.deliverTimingToSinks("vhost." + info.request_vhost_name_ + ".vcluster." +
info.request_vcluster_name_ + ".upstream_rq_time",
ms);
}

// Handle per zone stats.
if (!info.from_zone_.empty() && !info.to_zone_.empty()) {
info.store_.deliverTimingToSinks(fmt::format("{}zone.{}.{}.upstream_rq_time", info.prefix_,
info.from_zone_, info.to_zone_),
ms);
}
info.store_.deliverTimingToSinks(info.prefix_ + "upstream_rq_time", info.response_time_);
if (info.upstream_canary_) {
info.store_.deliverTimingToSinks(info.prefix_ + "canary.upstream_rq_time", info.response_time_);
}

if (info.internal_request_) {
info.store_.deliverTimingToSinks(info.prefix_ + "internal.upstream_rq_time",
info.response_time_);
} else {
info.store_.deliverTimingToSinks(info.prefix_ + "external.upstream_rq_time",
info.response_time_);
}

if (!info.request_vcluster_name_.empty()) {
info.store_.deliverTimingToSinks("vhost." + info.request_vhost_name_ + ".vcluster." +
info.request_vcluster_name_ + ".upstream_rq_time",
info.response_time_);
}

// Handle per zone stats.
if (!info.from_zone_.empty() && !info.to_zone_.empty()) {
info.store_.deliverTimingToSinks(
fmt::format("{}zone.{}.{}.upstream_rq_time", info.prefix_, info.from_zone_, info.to_zone_),
info.response_time_);
}
}

Expand Down
3 changes: 1 addition & 2 deletions source/common/http/codes.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#pragma once

#include "envoy/common/time.h"
#include "envoy/http/codes.h"
#include "envoy/http/header_map.h"
#include "envoy/stats/stats.h"
Expand Down Expand Up @@ -40,7 +39,7 @@ class CodeUtility {
struct ResponseTimingInfo {
Stats::Store& store_;
const std::string& prefix_;
SystemTime request_send_time_;
std::chrono::milliseconds response_time_;
bool upstream_canary_;
bool internal_request_;
const std::string& request_vhost_name_;
Expand Down
22 changes: 15 additions & 7 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ void Filter::chargeUpstreamCode(const Http::HeaderMap& response_headers) {
bool is_canary = (response_headers.get(Http::Headers::get().EnvoyUpstreamCanary) == "true") ||
(upstream_host_ ? upstream_host_->canary() : false);

if (upstream_host_) {
upstream_host_->outlierDetector().putHttpResponseCode(
Http::Utility::getResponseStatus(response_headers));
}

Http::CodeUtility::ResponseStatInfo info{
config_.stats_store_, stat_prefix_, response_headers,
downstream_headers_->get(Http::Headers::get().EnvoyInternalRequest) == "true",
Expand Down Expand Up @@ -430,11 +435,16 @@ void Filter::onUpstreamComplete() {
upstream_request_->upstream_encoder_->resetStream();
}

if (config_.emit_dynamic_stats_ && !callbacks_->requestInfo().healthCheck()) {
if (config_.emit_dynamic_stats_ && !callbacks_->requestInfo().healthCheck() &&
DateUtil::timePointValid(upstream_request_->upstream_encoder_->requestCompleteTime())) {
std::chrono::milliseconds response_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() -
upstream_request_->upstream_encoder_->requestCompleteTime());

upstream_host_->outlierDetector().putResponseTime(response_time);

Http::CodeUtility::ResponseTimingInfo info{
config_.stats_store_, stat_prefix_,
upstream_request_->upstream_encoder_->requestCompleteTime(),
upstream_request_->upstream_canary_,
config_.stats_store_, stat_prefix_, response_time, upstream_request_->upstream_canary_,
downstream_headers_->get(Http::Headers::get().EnvoyInternalRequest) == "true",
route_->virtualHostName(), request_vcluster_ ? request_vcluster_->name() : "",
config_.service_zone_, upstreamZone()};
Expand All @@ -443,9 +453,7 @@ void Filter::onUpstreamComplete() {

for (const std::string& alt_prefix : alt_stat_prefixes_) {
Http::CodeUtility::ResponseTimingInfo info{
config_.stats_store_, alt_prefix,
upstream_request_->upstream_encoder_->requestCompleteTime(),
upstream_request_->upstream_canary_,
config_.stats_store_, alt_prefix, response_time, upstream_request_->upstream_canary_,
downstream_headers_->get(Http::Headers::get().EnvoyInternalRequest) == "true", "", "",
config_.service_zone_, upstreamZone()};

Expand Down
2 changes: 2 additions & 0 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ void ClusterManagerImpl::loadCluster(const Json::Object& cluster, Stats::Store&
}
}

new_cluster->setOutlierDetector(OutlierDetectorImplFactory::createForCluster(
*new_cluster, cluster, dns_resolver.dispatcher()));
primary_clusters_.emplace(new_cluster->name(), new_cluster);
}

Expand Down
3 changes: 3 additions & 0 deletions source/common/upstream/logical_dns_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class LogicalDnsCluster : public ClusterImplBase {
// Upstream:HostDescription
bool canary() const override { return false; }
const Cluster& cluster() const override { return logical_host_->cluster(); }
OutlierDetectorHostSink& outlierDetector() const override {
return logical_host_->outlierDetector();
}
const HostStats& stats() const override { return logical_host_->stats(); }
const std::string& url() const override { return url_; }
const std::string& zone() const override { return EMPTY_STRING; }
Expand Down
44 changes: 44 additions & 0 deletions source/common/upstream/outlier_detection_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#include "outlier_detection_impl.h"

#include "common/common/assert.h"

namespace Upstream {

OutlierDetectorPtr OutlierDetectorImplFactory::createForCluster(Cluster& cluster,
const Json::Object& cluster_config,
Event::Dispatcher& dispatcher) {
// Right now we don't support any configuration but in order to make the config backwards
// compatible we just look for an empty object.
if (cluster_config.hasObject("outlier_detection")) {
return OutlierDetectorPtr{new OutlierDetectorImpl(cluster, dispatcher)};
} else {
return nullptr;
}
}

OutlierDetectorImpl::OutlierDetectorImpl(Cluster& cluster, Event::Dispatcher&) {
for (HostPtr host : cluster.hosts()) {
addHostSink(host);
}

cluster.addMemberUpdateCb([this](const std::vector<HostPtr>& hosts_added,
const std::vector<HostPtr>& hosts_removed) -> void {
for (HostPtr host : hosts_added) {
addHostSink(host);
}

for (HostPtr host : hosts_removed) {
ASSERT(host_sinks_.count(host) == 1);
host_sinks_.erase(host);
}
});
}

void OutlierDetectorImpl::addHostSink(HostPtr host) {
ASSERT(host_sinks_.count(host) == 0);
OutlierDetectorHostSinkImpl* sink = new OutlierDetectorHostSinkImpl();
host_sinks_[host] = sink;
host->setOutlierDetector(OutlierDetectorHostSinkPtr{sink});
}

} // Upstream
58 changes: 58 additions & 0 deletions source/common/upstream/outlier_detection_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#pragma once

#include "envoy/upstream/outlier_detection.h"
#include "envoy/upstream/upstream.h"

#include "common/json/json_loader.h"

namespace Upstream {

/**
* Null host sink implementation.
*/
class OutlierDetectorHostSinkNullImpl : public OutlierDetectorHostSink {
public:
// Upstream::OutlierDetectorHostSink
void putHttpResponseCode(uint64_t) override {}
void putResponseTime(std::chrono::milliseconds) override {}
};

/**
* Factory for creating a detector from a JSON configuration.
*/
class OutlierDetectorImplFactory {
public:
static OutlierDetectorPtr createForCluster(Cluster& cluster, const Json::Object& cluster_config,
Event::Dispatcher& dispatcher);
};

/**
* Implementation of OutlierDetectorHostSink for the generic detector.
*/
class OutlierDetectorHostSinkImpl : public OutlierDetectorHostSink {
public:
// Upstream::OutlierDetectorHostSink
void putHttpResponseCode(uint64_t) override {}
void putResponseTime(std::chrono::milliseconds) override {}
};

/**
* An implementation of an outlier detector. In the future we may support multiple outlier detection
* implementations with different configuration. For now, as we iterate everything is contained
* within this implementation.
*/
class OutlierDetectorImpl : public OutlierDetector {
public:
OutlierDetectorImpl(Cluster& cluster, Event::Dispatcher& dispatcher);

// Upstream::OutlierDetector
void addChangedStateCb(ChangeStateCb cb) override { callbacks_.push_back(cb); }

private:
void addHostSink(HostPtr host);

std::list<ChangeStateCb> callbacks_;
std::unordered_map<HostPtr, OutlierDetectorHostSinkImpl*> host_sinks_;
};

} // Upstream
2 changes: 1 addition & 1 deletion source/common/upstream/resource_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ResourceManagerImpl : public ResourceManager {
const uint64_t max_;
std::atomic<uint64_t> current_{};
Runtime::Loader& runtime_;
std::string runtime_key_;
const std::string runtime_key_;
};

ResourceImpl connections_;
Expand Down
Loading

0 comments on commit 673bcc9

Please sign in to comment.