20
20
#include " envoy/registry/registry.h"
21
21
#include " envoy/server/factory_context.h"
22
22
#include " envoy/singleton/manager.h"
23
+ #include " envoy/thread_local/thread_local.h"
23
24
#include " extensions/common/metadata_object.h"
24
25
#include " parser/parser.h"
25
26
#include " source/common/grpc/common.h"
@@ -313,7 +314,7 @@ struct Context : public Singleton::Instance {
313
314
314
315
using ContextSharedPtr = std::shared_ptr<Context>;
315
316
316
- SINGLETON_MANAGER_REGISTRATION (Context )
317
+ SINGLETON_MANAGER_REGISTRATION (istio_stats_filter_context )
317
318
318
319
using google::api::expr::runtime::CelValue;
319
320
@@ -418,21 +419,21 @@ struct MetricOverrides : public Logger::Loggable<Logger::Id::filter> {
418
419
// periodically to replace the current scope.
419
420
//
420
421
// The replaced stats scope is deleted gracefully after a minimum of 1s delay
421
- // for two reasons:
422
- //
423
- // 1. Stats flushing is asynchronous and the data may be lost if not flushed
424
- // before the deletion (see stats_flush_interval).
425
- //
426
- // 2. The implementation avoids locking by releasing a raw pointer to workers.
427
- // When the rotation happens on the main, the raw pointer may still be in-use
428
- // by workers for a short duration.
422
+ // because of stats flushing is asynchronous and the data may be lost if not
423
+ // flushed before the deletion (see stats_flush_interval).
429
424
class RotatingScope : public Logger ::Loggable<Logger::Id::filter> {
430
425
public:
431
426
RotatingScope (Server::Configuration::FactoryContext& factory_context, uint64_t rotate_interval_ms,
432
427
uint64_t delete_interval_ms)
433
428
: parent_scope_(factory_context.scope()), active_scope_(parent_scope_.createScope(" " )),
434
- raw_scope_ (active_scope_.get()), rotate_interval_ms_(rotate_interval_ms),
429
+ tls_scope_ (factory_context.serverFactoryContext().threadLocal()),
430
+ rotate_interval_ms_(rotate_interval_ms),
435
431
delete_interval_ms_(delete_interval_ms) {
432
+
433
+ tls_scope_.set ([&scope = *active_scope_](Event::Dispatcher&){
434
+ return std::make_shared<TlsCachedScope>(scope);
435
+ });
436
+
436
437
if (rotate_interval_ms_ > 0 ) {
437
438
ASSERT (delete_interval_ms_ < rotate_interval_ms_);
438
439
ASSERT (delete_interval_ms_ >= 1000 );
@@ -452,36 +453,58 @@ class RotatingScope : public Logger::Loggable<Logger::Id::filter> {
452
453
delete_timer_.reset ();
453
454
}
454
455
}
455
- Stats::Scope* scope () { return raw_scope_. load () ; }
456
+ Stats::Scope& scope () { return tls_scope_-> _scope ; }
456
457
457
458
private:
459
+ struct TlsCachedScope :ThreadLocal::ThreadLocalObject{
460
+ TlsCachedScope (Stats::Scope& scope):_scope(scope) {};
461
+ std::reference_wrapper<Stats::Scope> _scope;
462
+ };
463
+
458
464
void onRotate () {
459
465
ENVOY_LOG (info, " Rotating active Istio stats scope after {}ms." , rotate_interval_ms_);
460
466
draining_scope_ = active_scope_;
461
- delete_timer_->enableTimer (std::chrono::milliseconds (delete_interval_ms_));
462
467
active_scope_ = parent_scope_.createScope (" " );
463
- raw_scope_.store (active_scope_.get ());
464
- rotate_timer_->enableTimer (std::chrono::milliseconds (rotate_interval_ms_));
468
+ tls_scope_.runOnAllThreads (
469
+ [&scope = *active_scope_](OptRef<TlsCachedScope> tls_cache) {
470
+ tls_cache->_scope = scope;
471
+ },
472
+ // Start the delete and rotate timer after the new scope has been propagated to all worker threads.
473
+ // The RotatingScope instance can go away before the dispatcher has a chance to execute the callback
474
+ // and the still_alive shared_ptr will be deallocated when the current instance is deallocated.
475
+ // We rely on a weak_ptr to still_alive flag to determine if the instance is still valid.
476
+ [this , maybe_still_alive = std::weak_ptr<bool >(still_alive_)]() -> void {
477
+ if (!maybe_still_alive.expired ()){
478
+ delete_timer_->enableTimer (std::chrono::milliseconds (delete_interval_ms_));
479
+ rotate_timer_->enableTimer (std::chrono::milliseconds (rotate_interval_ms_));
480
+ }
481
+ });
465
482
}
466
483
void onDelete () {
467
484
ENVOY_LOG (info, " Deleting draining Istio stats scope after {}ms." , delete_interval_ms_);
468
485
draining_scope_.reset ();
469
486
}
487
+
470
488
Stats::Scope& parent_scope_;
471
489
Stats::ScopeSharedPtr active_scope_;
472
- std::atomic<Stats::Scope*> raw_scope_;
473
490
Stats::ScopeSharedPtr draining_scope_{nullptr };
491
+ ThreadLocal::TypedSlot<TlsCachedScope> tls_scope_;
474
492
const uint64_t rotate_interval_ms_;
475
493
const uint64_t delete_interval_ms_;
476
494
Event::TimerPtr rotate_timer_{nullptr };
477
495
Event::TimerPtr delete_timer_{nullptr };
496
+
497
+ // A sentinel shared_ptr used for keeping track of whether the RotatingContext is still alive.
498
+ // It is only held by a weak reference in the callback that will be invoked after the new active
499
+ // scope has been propagated to all worker threads.
500
+ std::shared_ptr<bool > still_alive_{std::make_shared<bool >(true )};
478
501
};
479
502
480
503
struct Config : public Logger ::Loggable<Logger::Id::filter> {
481
504
Config (const stats::PluginConfig& proto_config,
482
505
Server::Configuration::FactoryContext& factory_context)
483
506
: context_(factory_context.serverFactoryContext().singletonManager().getTyped<Context>(
484
- SINGLETON_MANAGER_REGISTERED_NAME (Context ),
507
+ SINGLETON_MANAGER_REGISTERED_NAME (istio_stats_filter_context ),
485
508
[&factory_context] {
486
509
return std::make_shared<Context>(
487
510
factory_context.serverFactoryContext ().scope ().symbolTable (),
@@ -514,7 +537,7 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
514
537
break ;
515
538
}
516
539
if (proto_config.metrics_size () > 0 || proto_config.definitions_size () > 0 ) {
517
- metric_overrides_ = std::make_unique<MetricOverrides>(context_, scope ()-> symbolTable ());
540
+ metric_overrides_ = std::make_unique<MetricOverrides>(context_, scope (). symbolTable ());
518
541
for (const auto & definition : proto_config.definitions ()) {
519
542
const auto & it = context_->all_metrics_ .find (definition.name ());
520
543
if (it != context_->all_metrics_ .end ()) {
@@ -698,12 +721,12 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
698
721
return ;
699
722
}
700
723
auto new_tags = parent_.metric_overrides_ ->overrideTags (metric, tags, expr_values_);
701
- Stats::Utility::counterFromStatNames (* parent_.scope (),
724
+ Stats::Utility::counterFromStatNames (parent_.scope (),
702
725
{parent_.context_ ->stat_namespace_ , metric}, new_tags)
703
726
.add (amount);
704
727
return ;
705
728
}
706
- Stats::Utility::counterFromStatNames (* parent_.scope (),
729
+ Stats::Utility::counterFromStatNames (parent_.scope (),
707
730
{parent_.context_ ->stat_namespace_ , metric}, tags)
708
731
.add (amount);
709
732
}
@@ -717,12 +740,12 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
717
740
}
718
741
auto new_tags = parent_.metric_overrides_ ->overrideTags (metric, tags, expr_values_);
719
742
Stats::Utility::histogramFromStatNames (
720
- * parent_.scope (), {parent_.context_ ->stat_namespace_ , metric}, unit, new_tags)
743
+ parent_.scope (), {parent_.context_ ->stat_namespace_ , metric}, unit, new_tags)
721
744
.recordValue (value);
722
745
return ;
723
746
}
724
747
Stats::Utility::histogramFromStatNames (
725
- * parent_.scope (), {parent_.context_ ->stat_namespace_ , metric}, unit, tags)
748
+ parent_.scope (), {parent_.context_ ->stat_namespace_ , metric}, unit, tags)
726
749
.recordValue (value);
727
750
}
728
751
@@ -735,17 +758,17 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
735
758
switch (metric.type_ ) {
736
759
case MetricOverrides::MetricType::Counter:
737
760
Stats::Utility::counterFromStatNames (
738
- * parent_.scope (), {parent_.context_ ->stat_namespace_ , metric.name_ }, tags)
761
+ parent_.scope (), {parent_.context_ ->stat_namespace_ , metric.name_ }, tags)
739
762
.add (amount);
740
763
break ;
741
764
case MetricOverrides::MetricType::Histogram:
742
765
Stats::Utility::histogramFromStatNames (
743
- * parent_.scope (), {parent_.context_ ->stat_namespace_ , metric.name_ },
766
+ parent_.scope (), {parent_.context_ ->stat_namespace_ , metric.name_ },
744
767
Stats::Histogram::Unit::Bytes, tags)
745
768
.recordValue (amount);
746
769
break ;
747
770
case MetricOverrides::MetricType::Gauge:
748
- Stats::Utility::gaugeFromStatNames (* parent_.scope (),
771
+ Stats::Utility::gaugeFromStatNames (parent_.scope (),
749
772
{parent_.context_ ->stat_namespace_ , metric.name_ },
750
773
Stats::Gauge::ImportMode::Accumulate, tags)
751
774
.set (amount);
@@ -769,14 +792,14 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
769
792
tags.push_back ({context_->tag_ , context_->istio_version_ .empty () ? context_->unknown_
770
793
: context_->istio_version_ });
771
794
772
- Stats::Utility::gaugeFromStatNames (* scope (),
795
+ Stats::Utility::gaugeFromStatNames (scope (),
773
796
{context_->stat_namespace_ , context_->istio_build_ },
774
797
Stats::Gauge::ImportMode::Accumulate, tags)
775
798
.set (1 );
776
799
}
777
800
778
801
Reporter reporter () const { return reporter_; }
779
- Stats::Scope* scope () { return scope_.scope (); }
802
+ Stats::Scope& scope () { return scope_.scope (); }
780
803
781
804
ContextSharedPtr context_;
782
805
RotatingScope scope_;
@@ -795,7 +818,7 @@ class IstioStatsFilter : public Http::PassThroughFilter,
795
818
public Network::ConnectionCallbacks {
796
819
public:
797
820
IstioStatsFilter (ConfigSharedPtr config)
798
- : config_(config), context_(*config->context_), pool_(config->scope ()-> symbolTable()),
821
+ : config_(config), context_(*config->context_), pool_(config->scope (). symbolTable()),
799
822
stream_(*config_, pool_) {
800
823
tags_.reserve (25 );
801
824
switch (config_->reporter ()) {
0 commit comments