From abd64f94f2d44ce6a1f36164d208e78a6f755b1e Mon Sep 17 00:00:00 2001 From: Nicolas Flacco <47160394+FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg@users.noreply.github.com> Date: Tue, 10 Sep 2019 15:01:13 -0700 Subject: [PATCH] redis: add a request time metric to redis upstream (#7890) Signed-off-by: Nicolas Flacco --- .../network/redis_proxy/v2/redis_proxy.proto | 4 + .../arch_overview/other_protocols/redis.rst | 16 +++ docs/root/intro/version_history.rst | 1 + .../clusters/redis/redis_cluster.cc | 8 +- .../extensions/clusters/redis/redis_cluster.h | 2 + .../filters/network/common/redis/BUILD | 17 +++ .../filters/network/common/redis/client.h | 10 +- .../network/common/redis/client_impl.cc | 58 ++++++--- .../network/common/redis/client_impl.h | 23 +++- .../common/redis/redis_command_stats.cc | 110 ++++++++++++++++++ .../common/redis/redis_command_stats.h | 66 +++++++++++ .../filters/network/redis_proxy/BUILD | 1 + .../filters/network/redis_proxy/config.cc | 6 +- .../network/redis_proxy/conn_pool_impl.cc | 11 +- .../network/redis_proxy/conn_pool_impl.h | 4 +- .../extensions/health_checkers/redis/redis.cc | 10 +- .../extensions/health_checkers/redis/redis.h | 2 + .../clusters/redis/redis_cluster_test.cc | 4 +- .../network/common/redis/client_impl_test.cc | 15 ++- .../redis_proxy/conn_pool_impl_test.cc | 8 +- .../health_checkers/redis/redis_test.cc | 5 +- 21 files changed, 346 insertions(+), 35 deletions(-) create mode 100644 source/extensions/filters/network/common/redis/redis_command_stats.cc create mode 100644 source/extensions/filters/network/common/redis/redis_command_stats.h diff --git a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto index 35d1110df12c..78c56bb2efe6 100644 --- a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto +++ b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto @@ -88,6 +88,10 @@ message RedisProxy { // downstream unchanged. 
This limit defaults to 100. google.protobuf.UInt32Value max_upstream_unknown_connections = 6; + // Enable per-command statistics per upstream cluster, in addition to the filter level aggregate + // count. + bool enable_command_stats = 8; + // ReadPolicy controls how Envoy routes read commands to Redis nodes. This is currently // supported for Redis Cluster. All ReadPolicy settings except MASTER may return stale data // because replication is asynchronous and requires some delay. You need to ensure that your diff --git a/docs/root/intro/arch_overview/other_protocols/redis.rst b/docs/root/intro/arch_overview/other_protocols/redis.rst index 47e55418c679..e96cbe6a3e58 100644 --- a/docs/root/intro/arch_overview/other_protocols/redis.rst +++ b/docs/root/intro/arch_overview/other_protocols/redis.rst @@ -60,6 +60,8 @@ If passive healthchecking is desired, also configure For the purposes of passive healthchecking, connect timeouts, command timeouts, and connection close map to 5xx. All other responses from Redis are counted as a success. +.. _arch_overview_redis_cluster_support: + Redis Cluster Support (Experimental) ---------------------------------------- @@ -90,6 +92,20 @@ Every Redis cluster has its own extra statistics tree rooted at *cluster.. max_upstream_unknown_connections_reached, Counter, Total number of times that an upstream connection to an unknown host is not created after redirection having reached the connection pool's max_upstream_unknown_connections limit upstream_cx_drained, Counter, Total number of upstream connections drained of active requests before being closed + upstream_commands.upstream_rq_time, Histogram, Histogram of upstream request times for all types of requests + +.. _arch_overview_redis_cluster_command_stats: + +Per-cluster command statistics can be enabled via the setting :ref:`enable_command_stats `: + +.. 
csv-table:: + :header: Name, Type, Description + :widths: 1, 1, 2 + + upstream_commands.[command].success, Counter, Total number of successful requests for a specific Redis command + upstream_commands.[command].error, Counter, Total number of failed or cancelled requests for a specific Redis command + upstream_commands.[command].total, Counter, Total number of requests for a specific Redis command (sum of success and error) + upstream_commands.[command].latency, Histogram, Latency of requests for a specific Redis command Supported commands ------------------ diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index d641e1d098db..679ea4f9e769 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -38,6 +38,7 @@ Version history * performance: new buffer implementation enabled by default (to disable add "--use-libevent-buffers 1" to the command-line arguments when starting Envoy). * performance: stats symbol table implementation (disabled by default; to test it, add "--use-fake-symbol-table 0" to the command-line arguments when starting Envoy). * rbac: added support for DNS SAN as :ref:`principal_name `. +* redis: added :ref:`enable_command_stats ` to enable :ref:`per command statistics ` for upstream clusters. * redis: added :ref:`read_policy ` to allow reading from redis replicas for Redis Cluster deployments. * regex: introduce new :ref:`RegexMatcher ` type that provides a safe regex implementation for untrusted user input. 
This type is now used in all diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index dc41651b0ca1..cf6f1d9f5bad 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -168,7 +168,10 @@ RedisCluster::RedisDiscoverySession::RedisDiscoverySession( NetworkFilters::Common::Redis::Client::ClientFactory& client_factory) : parent_(parent), dispatcher_(parent.dispatcher_), resolve_timer_(parent.dispatcher_.createTimer([this]() -> void { startResolveRedis(); })), - client_factory_(client_factory), buffer_timeout_(0) {} + client_factory_(client_factory), buffer_timeout_(0), + redis_command_stats_( + NetworkFilters::Common::Redis::RedisCommandStats::createRedisCommandStats( + parent_.info()->statsScope().symbolTable())) {} // Convert the cluster slot IP/Port response to and address, return null if the response does not // match the expected type. @@ -249,7 +252,8 @@ void RedisCluster::RedisDiscoverySession::startResolveRedis() { if (!client) { client = std::make_unique(*this); client->host_ = current_host_address_; - client->client_ = client_factory_.create(host, dispatcher_, *this); + client->client_ = client_factory_.create(host, dispatcher_, *this, redis_command_stats_, + parent_.info()->statsScope()); client->client_->addConnectionCallbacks(*client); std::string auth_password = Envoy::Config::DataSource::read(parent_.auth_password_datasource_, true, parent_.api_); diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h index f4038b7629f1..bec0ac965031 100644 --- a/source/extensions/clusters/redis/redis_cluster.h +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -214,6 +214,7 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { uint32_t maxBufferSizeBeforeFlush() const override { return 0; } std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return 
buffer_timeout_; } uint32_t maxUpstreamUnknownConnections() const override { return 0; } + bool enableCommandStats() const override { return false; } // This is effectively not in used for making the "Cluster Slots" calls. // since we call cluster slots on both the master and slaves, ANY is more appropriate here. Extensions::NetworkFilters::Common::Redis::Client::ReadPolicy readPolicy() const override { @@ -241,6 +242,7 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { Event::TimerPtr resolve_timer_; NetworkFilters::Common::Redis::Client::ClientFactory& client_factory_; const std::chrono::milliseconds buffer_timeout_; + NetworkFilters::Common::Redis::RedisCommandStatsSharedPtr redis_command_stats_; }; Upstream::ClusterManager& cluster_manager_; diff --git a/source/extensions/filters/network/common/redis/BUILD b/source/extensions/filters/network/common/redis/BUILD index 1757c2a7b9b4..336c9f2b303a 100644 --- a/source/extensions/filters/network/common/redis/BUILD +++ b/source/extensions/filters/network/common/redis/BUILD @@ -46,6 +46,7 @@ envoy_cc_library( hdrs = ["client.h"], deps = [ ":codec_lib", + ":redis_command_stats_lib", "//include/envoy/upstream:cluster_manager_interface", ], ) @@ -58,6 +59,7 @@ envoy_cc_library( ":client_interface", ":codec_lib", "//include/envoy/router:router_interface", + "//include/envoy/stats:timespan", "//include/envoy/thread_local:thread_local_interface", "//include/envoy/upstream:cluster_manager_interface", "//source/common/buffer:buffer_lib", @@ -78,3 +80,18 @@ envoy_cc_library( ":codec_lib", ], ) + +envoy_cc_library( + name = "redis_command_stats_lib", + srcs = ["redis_command_stats.cc"], + hdrs = ["redis_command_stats.h"], + deps = [ + ":codec_interface", + ":supported_commands_lib", + "//include/envoy/stats:stats_interface", + "//include/envoy/stats:timespan", + "//source/common/common:to_lower_table_lib", + "//source/common/common:utility_lib", + "//source/common/stats:symbol_table_lib", + ], +) diff --git 
a/source/extensions/filters/network/common/redis/client.h b/source/extensions/filters/network/common/redis/client.h index e20df148fb82..3e5d80ce9208 100644 --- a/source/extensions/filters/network/common/redis/client.h +++ b/source/extensions/filters/network/common/redis/client.h @@ -5,6 +5,7 @@ #include "envoy/upstream/cluster_manager.h" #include "extensions/filters/network/common/redis/codec_impl.h" +#include "extensions/filters/network/common/redis/redis_command_stats.h" namespace Envoy { namespace Extensions { @@ -163,6 +164,11 @@ class Config { */ virtual uint32_t maxUpstreamUnknownConnections() const PURE; + /** + * @return when enabled, upstream cluster per-command statistics will be recorded. + */ + virtual bool enableCommandStats() const PURE; + /** * @return the read policy the proxy should use. */ @@ -184,7 +190,9 @@ class ClientFactory { * @return ClientPtr a new connection pool client. */ virtual ClientPtr create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, - const Config& config) PURE; + const Config& config, + const RedisCommandStatsSharedPtr& redis_command_stats, + Stats::Scope& scope) PURE; }; } // namespace Client diff --git a/source/extensions/filters/network/common/redis/client_impl.cc b/source/extensions/filters/network/common/redis/client_impl.cc index 6f941d223ca0..acb7d64f771e 100644 --- a/source/extensions/filters/network/common/redis/client_impl.cc +++ b/source/extensions/filters/network/common/redis/client_impl.cc @@ -19,7 +19,8 @@ ConfigImpl::ConfigImpl( 3)), // Default timeout is 3ms. If max_buffer_size_before_flush is zero, this is not used // as the buffer is flushed on each request immediately. 
max_upstream_unknown_connections_( - PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_upstream_unknown_connections, 100)) { + PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_upstream_unknown_connections, 100)), + enable_command_stats_(config.enable_command_stats()) { switch (config.read_policy()) { case envoy::config::filter::network::redis_proxy::v2:: RedisProxy_ConnPoolSettings_ReadPolicy_MASTER: @@ -48,10 +49,11 @@ ConfigImpl::ConfigImpl( ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, EncoderPtr&& encoder, DecoderFactory& decoder_factory, - const Config& config) { - - std::unique_ptr client( - new ClientImpl(host, dispatcher, std::move(encoder), decoder_factory, config)); + const Config& config, + const RedisCommandStatsSharedPtr& redis_command_stats, + Stats::Scope& scope) { + auto client = std::make_unique(host, dispatcher, std::move(encoder), decoder_factory, + config, redis_command_stats, scope); client->connection_ = host->createConnection(dispatcher, nullptr, nullptr).connection_; client->connection_->addConnectionCallbacks(*client); client->connection_->addReadFilter(Network::ReadFilterSharedPtr{new UpstreamReadFilter(*client)}); @@ -61,11 +63,14 @@ ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatche } ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, - EncoderPtr&& encoder, DecoderFactory& decoder_factory, const Config& config) + EncoderPtr&& encoder, DecoderFactory& decoder_factory, const Config& config, + const RedisCommandStatsSharedPtr& redis_command_stats, Stats::Scope& scope) : host_(host), encoder_(std::move(encoder)), decoder_(decoder_factory.create(*this)), config_(config), - connect_or_op_timer_(dispatcher.createTimer([this]() -> void { onConnectOrOpTimeout(); })), - flush_timer_(dispatcher.createTimer([this]() -> void { flushBufferAndResetTimer(); })) { + connect_or_op_timer_(dispatcher.createTimer([this]() { onConnectOrOpTimeout(); })), 
+ flush_timer_(dispatcher.createTimer([this]() { flushBufferAndResetTimer(); })), + time_source_(dispatcher.timeSource()), redis_command_stats_(redis_command_stats), + scope_(scope) { host->cluster().stats().upstream_cx_total_.inc(); host->stats().cx_total_.inc(); host->cluster().stats().upstream_cx_active_.inc(); @@ -94,7 +99,17 @@ PoolRequest* ClientImpl::makeRequest(const RespValue& request, PoolCallbacks& ca const bool empty_buffer = encoder_buffer_.length() == 0; - pending_requests_.emplace_back(*this, callbacks); + Stats::StatName command; + if (config_.enableCommandStats()) { + // Only lowercase command and get StatName if we enable command stats + command = redis_command_stats_->getCommandFromRequest(request); + redis_command_stats_->updateStatsTotal(scope_, command); + } else { + // If disabled, we use a placeholder stat name "unused" that is not used + command = redis_command_stats_->getUnusedStatName(); + } + + pending_requests_.emplace_back(*this, callbacks, command); encoder_->encode(request, encoder_buffer_); // If buffer is full, flush. If the buffer was empty before the request, start the timer. 
@@ -186,6 +201,14 @@ void ClientImpl::onRespValue(RespValuePtr&& value) { ASSERT(!pending_requests_.empty()); PendingRequest& request = pending_requests_.front(); const bool canceled = request.canceled_; + + if (config_.enableCommandStats()) { + bool success = !canceled && (value->type() != Common::Redis::RespType::Error); + redis_command_stats_->updateStats(scope_, request.command_, success); + request.command_request_timer_->complete(); + } + request.aggregate_request_timer_->complete(); + PoolCallbacks& callbacks = request.callbacks_; // We need to ensure the request is popped before calling the callback, since the callback might @@ -225,8 +248,15 @@ void ClientImpl::onRespValue(RespValuePtr&& value) { putOutlierEvent(Upstream::Outlier::Result::EXT_ORIGIN_REQUEST_SUCCESS); } -ClientImpl::PendingRequest::PendingRequest(ClientImpl& parent, PoolCallbacks& callbacks) - : parent_(parent), callbacks_(callbacks) { +ClientImpl::PendingRequest::PendingRequest(ClientImpl& parent, PoolCallbacks& callbacks, + Stats::StatName command) + : parent_(parent), callbacks_(callbacks), command_{command}, + aggregate_request_timer_(parent_.redis_command_stats_->createAggregateTimer( + parent_.scope_, parent_.time_source_)) { + if (parent_.config_.enableCommandStats()) { + command_request_timer_ = parent_.redis_command_stats_->createCommandTimer( + parent_.scope_, command_, parent_.time_source_); + } parent.host_->cluster().stats().upstream_rq_total_.inc(); parent.host_->stats().rq_total_.inc(); parent.host_->cluster().stats().upstream_rq_active_.inc(); @@ -248,9 +278,11 @@ void ClientImpl::PendingRequest::cancel() { ClientFactoryImpl ClientFactoryImpl::instance_; ClientPtr ClientFactoryImpl::create(Upstream::HostConstSharedPtr host, - Event::Dispatcher& dispatcher, const Config& config) { + Event::Dispatcher& dispatcher, const Config& config, + const RedisCommandStatsSharedPtr& redis_command_stats, + Stats::Scope& scope) { return ClientImpl::create(host, dispatcher, EncoderPtr{new 
EncoderImpl()}, decoder_factory_, - config); + config, redis_command_stats, scope); } } // namespace Client diff --git a/source/extensions/filters/network/common/redis/client_impl.h b/source/extensions/filters/network/common/redis/client_impl.h index 9522482fb022..a8ab3806eb7e 100644 --- a/source/extensions/filters/network/common/redis/client_impl.h +++ b/source/extensions/filters/network/common/redis/client_impl.h @@ -3,6 +3,7 @@ #include #include "envoy/config/filter/network/redis_proxy/v2/redis_proxy.pb.h" +#include "envoy/stats/timespan.h" #include "envoy/thread_local/thread_local.h" #include "envoy/upstream/cluster_manager.h" @@ -49,6 +50,7 @@ class ConfigImpl : public Config { uint32_t maxUpstreamUnknownConnections() const override { return max_upstream_unknown_connections_; } + bool enableCommandStats() const override { return enable_command_stats_; } ReadPolicy readPolicy() const override { return read_policy_; } private: @@ -58,6 +60,7 @@ class ConfigImpl : public Config { const uint32_t max_buffer_size_before_flush_; const std::chrono::milliseconds buffer_flush_timeout_; const uint32_t max_upstream_unknown_connections_; + const bool enable_command_stats_; ReadPolicy read_policy_; }; @@ -65,8 +68,13 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne public: static ClientPtr create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, EncoderPtr&& encoder, DecoderFactory& decoder_factory, - const Config& config); + const Config& config, + const RedisCommandStatsSharedPtr& redis_command_stats, + Stats::Scope& scope); + ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, EncoderPtr&& encoder, + DecoderFactory& decoder_factory, const Config& config, + const RedisCommandStatsSharedPtr& redis_command_stats, Stats::Scope& scope); ~ClientImpl() override; // Client @@ -94,7 +102,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne }; struct PendingRequest : public 
PoolRequest { - PendingRequest(ClientImpl& parent, PoolCallbacks& callbacks); + PendingRequest(ClientImpl& parent, PoolCallbacks& callbacks, Stats::StatName stat_name); ~PendingRequest() override; // PoolRequest @@ -102,11 +110,12 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne ClientImpl& parent_; PoolCallbacks& callbacks_; + Stats::StatName command_; bool canceled_{}; + Stats::CompletableTimespanPtr aggregate_request_timer_; + Stats::CompletableTimespanPtr command_request_timer_; }; - ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, EncoderPtr&& encoder, - DecoderFactory& decoder_factory, const Config& config); void onConnectOrOpTimeout(); void onData(Buffer::Instance& data); void putOutlierEvent(Upstream::Outlier::Result result); @@ -129,13 +138,17 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne Event::TimerPtr connect_or_op_timer_; bool connected_{}; Event::TimerPtr flush_timer_; + Envoy::TimeSource& time_source_; + const RedisCommandStatsSharedPtr redis_command_stats_; + Stats::Scope& scope_; }; class ClientFactoryImpl : public ClientFactory { public: // RedisProxy::ConnPool::ClientFactoryImpl ClientPtr create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, - const Config& config) override; + const Config& config, const RedisCommandStatsSharedPtr& redis_command_stats, + Stats::Scope& scope) override; static ClientFactoryImpl instance_; diff --git a/source/extensions/filters/network/common/redis/redis_command_stats.cc b/source/extensions/filters/network/common/redis/redis_command_stats.cc new file mode 100644 index 000000000000..ce11df704bac --- /dev/null +++ b/source/extensions/filters/network/common/redis/redis_command_stats.cc @@ -0,0 +1,110 @@ +#include "extensions/filters/network/common/redis/redis_command_stats.h" + +#include "extensions/filters/network/common/redis/supported_commands.h" + +namespace Envoy { +namespace Extensions { 
+namespace NetworkFilters { +namespace Common { +namespace Redis { + +RedisCommandStats::RedisCommandStats(Stats::SymbolTable& symbol_table, const std::string& prefix) + : symbol_table_(symbol_table), stat_name_pool_(symbol_table_), + prefix_(stat_name_pool_.add(prefix)), + upstream_rq_time_(stat_name_pool_.add("upstream_rq_time")), + latency_(stat_name_pool_.add("latency")), total_(stat_name_pool_.add("total")), + success_(stat_name_pool_.add("success")), error_(stat_name_pool_.add("error")), + unused_metric_(stat_name_pool_.add("unused")), null_metric_(stat_name_pool_.add("null")), + unknown_metric_(stat_name_pool_.add("unknown")) { + // Note: Even if this is disabled, we track the upstream_rq_time. + // Create StatName for each Redis command. Note that we don't include Auth or Ping. + for (const std::string& command : + Extensions::NetworkFilters::Common::Redis::SupportedCommands::simpleCommands()) { + addCommandToPool(command); + } + for (const std::string& command : + Extensions::NetworkFilters::Common::Redis::SupportedCommands::evalCommands()) { + addCommandToPool(command); + } + for (const std::string& command : Extensions::NetworkFilters::Common::Redis::SupportedCommands:: + hashMultipleSumResultCommands()) { + addCommandToPool(command); + } + addCommandToPool(Extensions::NetworkFilters::Common::Redis::SupportedCommands::mget()); + addCommandToPool(Extensions::NetworkFilters::Common::Redis::SupportedCommands::mset()); +} + +void RedisCommandStats::addCommandToPool(const std::string& command_string) { + Stats::StatName command = stat_name_pool_.add(command_string); + stat_name_map_[command_string] = command; +} + +Stats::Counter& RedisCommandStats::counter(Stats::Scope& scope, + const Stats::StatNameVec& stat_names) { + const Stats::SymbolTable::StoragePtr storage_ptr = symbol_table_.join(stat_names); + Stats::StatName full_stat_name = Stats::StatName(storage_ptr.get()); + return scope.counterFromStatName(full_stat_name); +} + +Stats::Histogram& 
RedisCommandStats::histogram(Stats::Scope& scope, + const Stats::StatNameVec& stat_names) { + const Stats::SymbolTable::StoragePtr storage_ptr = symbol_table_.join(stat_names); + Stats::StatName full_stat_name = Stats::StatName(storage_ptr.get()); + return scope.histogramFromStatName(full_stat_name); +} + +Stats::CompletableTimespanPtr +RedisCommandStats::createCommandTimer(Stats::Scope& scope, Stats::StatName command, + Envoy::TimeSource& time_source) { + return std::make_unique>( + histogram(scope, {prefix_, command, latency_}), time_source); +} + +Stats::CompletableTimespanPtr +RedisCommandStats::createAggregateTimer(Stats::Scope& scope, Envoy::TimeSource& time_source) { + return std::make_unique>( + histogram(scope, {prefix_, upstream_rq_time_}), time_source); +} + +Stats::StatName RedisCommandStats::getCommandFromRequest(const RespValue& request) { + // Get command from RespValue + switch (request.type()) { + case RespType::Array: + return getCommandFromRequest(request.asArray().front()); + case RespType::Integer: + return unknown_metric_; + case RespType::Null: + return null_metric_; + default: + // Once we have a RespType::String we lowercase it and then look it up in our stat_name_map. + // If it does not exist, we return our unknown stat name. 
+ std::string to_lower_command(request.asString()); + to_lower_table_.toLowerCase(to_lower_command); + + auto iter = stat_name_map_.find(to_lower_command); + if (iter != stat_name_map_.end()) { + return iter->second; + } else { + return unknown_metric_; + } + } +} + +void RedisCommandStats::updateStatsTotal(Stats::Scope& scope, Stats::StatName command) { + counter(scope, {prefix_, command, total_}).inc(); +} + +void RedisCommandStats::updateStats(Stats::Scope& scope, Stats::StatName command, + const bool success) { + if (success) { + counter(scope, {prefix_, command, success_}).inc(); + } else { + counter(scope, {prefix_, command, error_}).inc(); // Failed or cancelled request. + } +} + +} // namespace Redis +} // namespace Common +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy \ No newline at end of file diff --git a/source/extensions/filters/network/common/redis/redis_command_stats.h b/source/extensions/filters/network/common/redis/redis_command_stats.h new file mode 100644 index 000000000000..0ee2ce824c48 --- /dev/null +++ b/source/extensions/filters/network/common/redis/redis_command_stats.h @@ -0,0 +1,66 @@ +#pragma once + +#include +#include + +#include "envoy/stats/scope.h" +#include "envoy/stats/timespan.h" + +#include "common/common/to_lower_table.h" +#include "common/stats/symbol_table_impl.h" + +#include "extensions/filters/network/common/redis/codec.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Common { +namespace Redis { + +class RedisCommandStats { +public: + RedisCommandStats(Stats::SymbolTable& symbol_table, const std::string& prefix); + + // TODO (@FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg): Use Singleton to manage a single + // RedisCommandStats on the client factory so that it can be used for proxy filter, discovery and + // health check. 
+ static std::shared_ptr + createRedisCommandStats(Stats::SymbolTable& symbol_table) { + return std::make_shared(symbol_table, "upstream_commands"); + } + + Stats::Counter& counter(Stats::Scope& scope, const Stats::StatNameVec& stat_names); + Stats::Histogram& histogram(Stats::Scope& scope, const Stats::StatNameVec& stat_names); + Stats::CompletableTimespanPtr createCommandTimer(Stats::Scope& scope, Stats::StatName command, + Envoy::TimeSource& time_source); + Stats::CompletableTimespanPtr createAggregateTimer(Stats::Scope& scope, + Envoy::TimeSource& time_source); + Stats::StatName getCommandFromRequest(const RespValue& request); + void updateStatsTotal(Stats::Scope& scope, Stats::StatName command); + void updateStats(Stats::Scope& scope, Stats::StatName command, const bool success); + Stats::StatName getUnusedStatName() { return unused_metric_; } + +private: + void addCommandToPool(const std::string& command_string); + + Stats::SymbolTable& symbol_table_; + Stats::StatNamePool stat_name_pool_; + StringMap stat_name_map_; + const Stats::StatName prefix_; + const Stats::StatName upstream_rq_time_; + const Stats::StatName latency_; + const Stats::StatName total_; + const Stats::StatName success_; + const Stats::StatName error_; + const Stats::StatName unused_metric_; + const Stats::StatName null_metric_; + const Stats::StatName unknown_metric_; + const ToLowerTable to_lower_table_; +}; +using RedisCommandStatsSharedPtr = std::shared_ptr; + +} // namespace Redis +} // namespace Common +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/redis_proxy/BUILD b/source/extensions/filters/network/redis_proxy/BUILD index bbc18dff95d2..2aee3f55e361 100644 --- a/source/extensions/filters/network/redis_proxy/BUILD +++ b/source/extensions/filters/network/redis_proxy/BUILD @@ -120,6 +120,7 @@ envoy_cc_library( "//source/extensions/filters/network:well_known_names", 
"//source/extensions/filters/network/common:factory_base_lib", "//source/extensions/filters/network/common/redis:codec_lib", + "//source/extensions/filters/network/common/redis:redis_command_stats_lib", "//source/extensions/filters/network/redis_proxy:command_splitter_lib", "//source/extensions/filters/network/redis_proxy:conn_pool_lib", "//source/extensions/filters/network/redis_proxy:proxy_filter_lib", diff --git a/source/extensions/filters/network/redis_proxy/config.cc b/source/extensions/filters/network/redis_proxy/config.cc index 5e3e4018260d..f14176ebe274 100644 --- a/source/extensions/filters/network/redis_proxy/config.cc +++ b/source/extensions/filters/network/redis_proxy/config.cc @@ -57,15 +57,19 @@ Network::FilterFactoryCb RedisProxyFilterConfigFactory::createFilterFactoryFromP } addUniqueClusters(unique_clusters, prefix_routes.catch_all_route()); + auto redis_command_stats = + Common::Redis::RedisCommandStats::createRedisCommandStats(context.scope().symbolTable()); + Upstreams upstreams; for (auto& cluster : unique_clusters) { Stats::ScopePtr stats_scope = context.scope().createScope(fmt::format("cluster.{}.redis_cluster", cluster)); + upstreams.emplace(cluster, std::make_shared( cluster, context.clusterManager(), Common::Redis::Client::ClientFactoryImpl::instance_, context.threadLocal(), proto_config.settings(), context.api(), - std::move(stats_scope))); + std::move(stats_scope), redis_command_stats)); } auto router = diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc index a097924ea734..c8449f61dc84 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc @@ -23,10 +23,12 @@ InstanceImpl::InstanceImpl( const std::string& cluster_name, Upstream::ClusterManager& cm, Common::Redis::Client::ClientFactory& client_factory, ThreadLocal::SlotAllocator& tls, const 
envoy::config::filter::network::redis_proxy::v2::RedisProxy::ConnPoolSettings& config, - Api::Api& api, Stats::ScopePtr&& stats_scope) + Api::Api& api, Stats::ScopePtr&& stats_scope, + const Common::Redis::RedisCommandStatsSharedPtr& redis_command_stats) : cm_(cm), client_factory_(client_factory), tls_(tls.allocateSlot()), config_(config), - api_(api), stats_scope_(std::move(stats_scope)), redis_cluster_stats_{REDIS_CLUSTER_STATS( - POOL_COUNTER(*stats_scope_))} { + api_(api), stats_scope_(std::move(stats_scope)), + redis_command_stats_(redis_command_stats), redis_cluster_stats_{REDIS_CLUSTER_STATS( + POOL_COUNTER(*stats_scope_))} { tls_->set([this, cluster_name]( Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr { return std::make_shared(*this, dispatcher, cluster_name); @@ -195,7 +197,8 @@ InstanceImpl::ThreadLocalPool::threadLocalActiveClient(Upstream::HostConstShared if (!client) { client = std::make_unique(*this); client->host_ = host; - client->redis_client_ = parent_.client_factory_.create(host, dispatcher_, parent_.config_); + client->redis_client_ = parent_.client_factory_.create( + host, dispatcher_, parent_.config_, parent_.redis_command_stats_, *parent_.stats_scope_); client->redis_client_->addConnectionCallbacks(*client); // TODO(hyang): should the auth command and readonly command be moved to the factory method? 
if (!auth_password_.empty()) { diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h index 91ea51f3e752..7731943b873f 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h @@ -51,7 +51,8 @@ class InstanceImpl : public Instance { const std::string& cluster_name, Upstream::ClusterManager& cm, Common::Redis::Client::ClientFactory& client_factory, ThreadLocal::SlotAllocator& tls, const envoy::config::filter::network::redis_proxy::v2::RedisProxy::ConnPoolSettings& config, - Api::Api& api, Stats::ScopePtr&& stats_scope); + Api::Api& api, Stats::ScopePtr&& stats_scope, + const Common::Redis::RedisCommandStatsSharedPtr& redis_command_stats); // RedisProxy::ConnPool::Instance Common::Redis::Client::PoolRequest* makeRequest(const std::string& key, const Common::Redis::RespValue& request, @@ -131,6 +132,7 @@ class InstanceImpl : public Instance { Common::Redis::Client::ConfigImpl config_; Api::Api& api_; Stats::ScopePtr stats_scope_; + Common::Redis::RedisCommandStatsSharedPtr redis_command_stats_; RedisClusterStats redis_cluster_stats_; }; diff --git a/source/extensions/health_checkers/redis/redis.cc b/source/extensions/health_checkers/redis/redis.cc index 9f130824639c..c1107e1a489f 100644 --- a/source/extensions/health_checkers/redis/redis.cc +++ b/source/extensions/health_checkers/redis/redis.cc @@ -22,7 +22,11 @@ RedisHealthChecker::RedisHealthChecker( RedisHealthChecker::RedisActiveHealthCheckSession::RedisActiveHealthCheckSession( RedisHealthChecker& parent, const Upstream::HostSharedPtr& host) - : ActiveHealthCheckSession(parent, host), parent_(parent) {} + : ActiveHealthCheckSession(parent, host), parent_(parent) { + redis_command_stats_ = + Extensions::NetworkFilters::Common::Redis::RedisCommandStats::createRedisCommandStats( + parent_.cluster_.info()->statsScope().symbolTable()); +} 
RedisHealthChecker::RedisActiveHealthCheckSession::~RedisActiveHealthCheckSession() { ASSERT(current_request_ == nullptr); @@ -51,7 +55,9 @@ void RedisHealthChecker::RedisActiveHealthCheckSession::onEvent(Network::Connect void RedisHealthChecker::RedisActiveHealthCheckSession::onInterval() { if (!client_) { - client_ = parent_.client_factory_.create(host_, parent_.dispatcher_, *this); + client_ = + parent_.client_factory_.create(host_, parent_.dispatcher_, *this, redis_command_stats_, + parent_.cluster_.info()->statsScope()); client_->addConnectionCallbacks(*this); } diff --git a/source/extensions/health_checkers/redis/redis.h b/source/extensions/health_checkers/redis/redis.h index 4c7cc320e01f..17c8e060e716 100644 --- a/source/extensions/health_checkers/redis/redis.h +++ b/source/extensions/health_checkers/redis/redis.h @@ -81,6 +81,7 @@ class RedisHealthChecker : public Upstream::HealthCheckerImplBase { } uint32_t maxUpstreamUnknownConnections() const override { return 0; } + bool enableCommandStats() const override { return false; } // Extensions::NetworkFilters::Common::Redis::Client::PoolCallbacks void onResponse(NetworkFilters::Common::Redis::RespValuePtr&& value) override; @@ -95,6 +96,7 @@ class RedisHealthChecker : public Upstream::HealthCheckerImplBase { RedisHealthChecker& parent_; Extensions::NetworkFilters::Common::Redis::Client::ClientPtr client_; Extensions::NetworkFilters::Common::Redis::Client::PoolRequest* current_request_{}; + Extensions::NetworkFilters::Common::Redis::RedisCommandStatsSharedPtr redis_command_stats_; }; enum class Type { Ping, Exists }; diff --git a/test/extensions/clusters/redis/redis_cluster_test.cc b/test/extensions/clusters/redis/redis_cluster_test.cc index c8b941760555..78463582049a 100644 --- a/test/extensions/clusters/redis/redis_cluster_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_test.cc @@ -58,7 +58,9 @@ class RedisClusterTest : public testing::Test, // ClientFactory 
Extensions::NetworkFilters::Common::Redis::Client::ClientPtr create(Upstream::HostConstSharedPtr host, Event::Dispatcher&, - const Extensions::NetworkFilters::Common::Redis::Client::Config&) override { + const Extensions::NetworkFilters::Common::Redis::Client::Config&, + const Extensions::NetworkFilters::Common::Redis::RedisCommandStatsSharedPtr&, + Stats::Scope&) override { EXPECT_EQ(22120, host->address()->ip()->port()); return Extensions::NetworkFilters::Common::Redis::Client::ClientPtr{ create_(host->address()->asString())}; diff --git a/test/extensions/filters/network/common/redis/client_impl_test.cc b/test/extensions/filters/network/common/redis/client_impl_test.cc index ca944879f516..8a283c67faf1 100644 --- a/test/extensions/filters/network/common/redis/client_impl_test.cc +++ b/test/extensions/filters/network/common/redis/client_impl_test.cc @@ -69,8 +69,11 @@ class RedisClientImplTest : public testing::Test, public Common::Redis::DecoderF EXPECT_CALL(*upstream_connection_, connect()); EXPECT_CALL(*upstream_connection_, noDelay(true)); + redis_command_stats_ = + Common::Redis::RedisCommandStats::createRedisCommandStats(stats_.symbolTable()); + client_ = ClientImpl::create(host_, dispatcher_, Common::Redis::EncoderPtr{encoder_}, *this, - *config_); + *config_, redis_command_stats_, stats_); EXPECT_EQ(1UL, host_->cluster_.stats_.upstream_cx_total_.value()); EXPECT_EQ(1UL, host_->stats_.cx_total_.value()); EXPECT_EQ(false, client_->active()); @@ -107,6 +110,9 @@ class RedisClientImplTest : public testing::Test, public Common::Redis::DecoderF Network::ReadFilterSharedPtr upstream_read_filter_; std::unique_ptr config_; ClientPtr client_; + Stats::IsolatedStoreImpl stats_; + Stats::ScopePtr stats_scope_; + Common::Redis::RedisCommandStatsSharedPtr redis_command_stats_; }; TEST_F(RedisClientImplTest, BatchWithZeroBufferAndTimeout) { @@ -151,6 +157,7 @@ class ConfigBufferSizeGTSingleRequest : public Config { return std::chrono::milliseconds(1); } uint32_t 
maxUpstreamUnknownConnections() const override { return 0; } + bool enableCommandStats() const override { return false; } ReadPolicy readPolicy() const override { return ReadPolicy::Master; } }; @@ -465,6 +472,7 @@ class ConfigOutlierDisabled : public Config { } ReadPolicy readPolicy() const override { return ReadPolicy::Master; } uint32_t maxUpstreamUnknownConnections() const override { return 0; } + bool enableCommandStats() const override { return false; } }; TEST_F(RedisClientImplTest, OutlierDisabled) { @@ -871,7 +879,10 @@ TEST(RedisClientFactoryImplTest, Basic) { EXPECT_CALL(*host, createConnection_(_, _)).WillOnce(Return(conn_info)); NiceMock dispatcher; ConfigImpl config(createConnPoolSettings()); - ClientPtr client = factory.create(host, dispatcher, config); + Stats::IsolatedStoreImpl stats_; + auto redis_command_stats = + Common::Redis::RedisCommandStats::createRedisCommandStats(stats_.symbolTable()); + ClientPtr client = factory.create(host, dispatcher, config, redis_command_stats, stats_); client->close(); } diff --git a/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc b/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc index 9dd249ec6002..f8f03045aa8e 100644 --- a/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc +++ b/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc @@ -66,11 +66,13 @@ class RedisConnPoolImplTest : public testing::Test, public Common::Redis::Client max_upstream_unknown_connections_reached_.value_++; })); + auto redis_command_stats = + Common::Redis::RedisCommandStats::createRedisCommandStats(store->symbolTable()); std::unique_ptr conn_pool_impl = std::make_unique(cluster_name_, cm_, *this, tls_, Common::Redis::Client::createConnPoolSettings( 20, hashtagging, true, max_unknown_conns, read_policy_), - api_, std::move(store)); + api_, std::move(store), redis_command_stats); // Set the authentication password for this connection pool. 
conn_pool_impl->tls_->getTyped().auth_password_ = auth_password_; conn_pool_ = std::move(conn_pool_impl); @@ -155,7 +157,9 @@ class RedisConnPoolImplTest : public testing::Test, public Common::Redis::Client // Common::Redis::Client::ClientFactory Common::Redis::Client::ClientPtr create(Upstream::HostConstSharedPtr host, Event::Dispatcher&, - const Common::Redis::Client::Config&) override { + const Common::Redis::Client::Config&, + const Common::Redis::RedisCommandStatsSharedPtr&, + Stats::Scope&) override { return Common::Redis::Client::ClientPtr{create_(host)}; } diff --git a/test/extensions/health_checkers/redis/redis_test.cc b/test/extensions/health_checkers/redis/redis_test.cc index aa18cd8d8e63..b9c215853885 100644 --- a/test/extensions/health_checkers/redis/redis_test.cc +++ b/test/extensions/health_checkers/redis/redis_test.cc @@ -124,7 +124,9 @@ class RedisHealthCheckerTest Extensions::NetworkFilters::Common::Redis::Client::ClientPtr create(Upstream::HostConstSharedPtr, Event::Dispatcher&, - const Extensions::NetworkFilters::Common::Redis::Client::Config&) override { + const Extensions::NetworkFilters::Common::Redis::Client::Config&, + const Extensions::NetworkFilters::Common::Redis::RedisCommandStatsSharedPtr&, + Stats::Scope&) override { return Extensions::NetworkFilters::Common::Redis::Client::ClientPtr{create_()}; } @@ -166,6 +168,7 @@ class RedisHealthCheckerTest EXPECT_EQ(session->maxBufferSizeBeforeFlush(), 0); EXPECT_EQ(session->bufferFlushTimeoutInMs(), std::chrono::milliseconds(1)); EXPECT_EQ(session->maxUpstreamUnknownConnections(), 0); + EXPECT_FALSE(session->enableCommandStats()); session->onDeferredDeleteBase(); // This must be called to pass assertions in the destructor. }