Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ type config struct {
// TODO: run loadtests using these flags to determine optimal default values.
MaxIdleProxyConns int `split_words:"true" default:"1000"`
MaxIdleProxyConnsPerHost int `split_words:"true" default:"100"`

ProbeTimeout int `split_words:"true" default:"300"`
ProbeFrequency int `split_words:"true" default:"200"`
}

func main() {
Expand Down Expand Up @@ -158,7 +161,8 @@ func main() {
// transport so that throttler probe connections can be reused after probing
// (via keep-alive) to send real requests, avoiding needing an extra
// reconnect for the first request after the probe succeeds.
logger.Debugf("MaxIdleProxyConns: %d, MaxIdleProxyConnsPerHost: %d", env.MaxIdleProxyConns, env.MaxIdleProxyConnsPerHost)
logger.Debugf("MaxIdleProxyConns: %d, MaxIdleProxyConnsPerHost: %d, ProbeTimeout: %dms, ProbeFrequency: %dms",
env.MaxIdleProxyConns, env.MaxIdleProxyConnsPerHost, env.ProbeTimeout, env.ProbeFrequency)
transport := pkgnet.NewProxyAutoTransport(env.MaxIdleProxyConns, env.MaxIdleProxyConnsPerHost)

// Fetch networking configuration to determine whether EnableMeshPodAddressability
Expand Down Expand Up @@ -191,7 +195,9 @@ func main() {

// Start throttler.
throttler := activatornet.NewThrottler(ctx, env.PodIP)
go throttler.Run(ctx, transport, networkConfig.EnableMeshPodAddressability, networkConfig.MeshCompatibilityMode)
probeTimeout := time.Duration(env.ProbeTimeout) * time.Millisecond
probeFrequency := time.Duration(env.ProbeFrequency) * time.Millisecond
go throttler.Run(ctx, transport, networkConfig.EnableMeshPodAddressability, networkConfig.MeshCompatibilityMode, probeTimeout, probeFrequency)

// Set up our config store
configMapWatcher := configmapinformer.NewInformedWatcher(kubeClient, system.Namespace())
Expand Down
30 changes: 11 additions & 19 deletions pkg/activator/net/revision_backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ func (d dests) MarshalLogObject(enc zapcore.ObjectEncoder) error {
return nil
}

const (
probeTimeout time.Duration = 300 * time.Millisecond
defaultProbeFrequency time.Duration = 200 * time.Millisecond
)

// revisionWatcher watches the podIPs and ClusterIP of the service for a revision. It implements the logic
// to supply revisionDestsUpdate events on updateCh
type revisionWatcher struct {
Expand Down Expand Up @@ -131,13 +126,15 @@ type revisionWatcher struct {
// cover the revision's ready conditions, for example when an exec probe is
// being used.
enableProbeOptimisation bool

probeTimeout time.Duration
}

func newRevisionWatcher(ctx context.Context, rev types.NamespacedName, protocol pkgnet.ProtocolType,
updateCh chan<- revisionDestsUpdate, destsCh chan dests,
transport http.RoundTripper, serviceLister corev1listers.ServiceLister,
usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode,
enableProbeOptimisation bool,
enableProbeOptimisation bool, probeTimeout time.Duration,
logger *zap.SugaredLogger,
) *revisionWatcher {
ctx, cancel := context.WithCancel(ctx)
Expand All @@ -155,6 +152,7 @@ func newRevisionWatcher(ctx context.Context, rev types.NamespacedName, protocol
usePassthroughLb: usePassthroughLb,
meshMode: meshMode,
enableProbeOptimisation: enableProbeOptimisation,
probeTimeout: probeTimeout,
logger: logger.With(zap.String(logkey.Key, rev.String())),
}
}
Expand Down Expand Up @@ -219,7 +217,7 @@ func (rw *revisionWatcher) getDest() (string, error) {
}

func (rw *revisionWatcher) probeClusterIP(dest string) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), probeTimeout)
ctx, cancel := context.WithTimeout(context.Background(), rw.probeTimeout)
defer cancel()
match, _, err := rw.probe(ctx, dest)
return match, err
Expand Down Expand Up @@ -248,7 +246,7 @@ func (rw *revisionWatcher) probePodIPs(ready, notReady sets.Set[string]) (succee
}

// Context used for our probe requests.
ctx, cancel := context.WithTimeout(context.Background(), probeTimeout)
ctx, cancel := context.WithTimeout(context.Background(), rw.probeTimeout)
defer cancel()

// Empty errgroup is used as cancellation on first error is not desired, all probes should be
Expand Down Expand Up @@ -459,19 +457,12 @@ type revisionBackendsManager struct {
usePassthroughLb bool
meshMode netcfg.MeshCompatibilityMode
logger *zap.SugaredLogger
probeTimeout time.Duration
probeFrequency time.Duration
}

// NewRevisionBackendsManager returns a new RevisionBackendsManager with default
// probe time out.
func newRevisionBackendsManager(ctx context.Context, tr http.RoundTripper, usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode) *revisionBackendsManager {
return newRevisionBackendsManagerWithProbeFrequency(ctx, tr, usePassthroughLb, meshMode, defaultProbeFrequency)
}

// newRevisionBackendsManagerWithProbeFrequency creates a fully spec'd RevisionBackendsManager.
func newRevisionBackendsManagerWithProbeFrequency(ctx context.Context, tr http.RoundTripper,
usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode, probeFreq time.Duration,
) *revisionBackendsManager {
// newRevisionBackendsManager returns a new RevisionBackendsManager with configurable probe settings.
func newRevisionBackendsManager(ctx context.Context, tr http.RoundTripper, usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode, probeTimeout, probeFreq time.Duration) *revisionBackendsManager {
rbm := &revisionBackendsManager{
ctx: ctx,
revisionLister: revisioninformer.Get(ctx).Lister(),
Expand All @@ -482,6 +473,7 @@ func newRevisionBackendsManagerWithProbeFrequency(ctx context.Context, tr http.R
usePassthroughLb: usePassthroughLb,
meshMode: meshMode,
logger: logging.FromContext(ctx),
probeTimeout: probeTimeout,
probeFrequency: probeFreq,
}
endpointsInformer := endpointsinformer.Get(ctx)
Expand Down Expand Up @@ -565,7 +557,7 @@ func (rbm *revisionBackendsManager) getOrCreateRevisionWatcher(revID types.Names
}

destsCh := make(chan dests)
rw := newRevisionWatcher(rbm.ctx, revID, rev.GetProtocol(), rbm.updateCh, destsCh, rbm.transport, rbm.serviceLister, rbm.usePassthroughLb, rbm.meshMode, enableProbeOptimisation, rbm.logger)
rw := newRevisionWatcher(rbm.ctx, revID, rev.GetProtocol(), rbm.updateCh, destsCh, rbm.transport, rbm.serviceLister, rbm.usePassthroughLb, rbm.meshMode, enableProbeOptimisation, rbm.probeTimeout, rbm.logger)
rbm.revisionWatchers[revID] = rw
go rw.run(rbm.probeFrequency)
return rw, nil
Expand Down
15 changes: 9 additions & 6 deletions pkg/activator/net/revision_backends_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ const (
testNamespace = "test-namespace"
testRevision = "test-revision"

probeFreq = 50 * time.Millisecond
updateTimeout = 16 * probeFreq
probeFreq = 50 * time.Millisecond
updateTimeout = 16 * probeFreq
defaultProbeTimeout = 300 * time.Millisecond

meshErrorStatusCode = http.StatusServiceUnavailable
)
Expand Down Expand Up @@ -555,6 +556,7 @@ func TestRevisionWatcher(t *testing.T) {
tc.usePassthroughLb, // usePassthroughLb
tc.meshMode,
true,
defaultProbeTimeout,
logger)
rw.clusterIPHealthy = tc.initialClusterIPState

Expand Down Expand Up @@ -993,7 +995,7 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) {
t.Fatal("Failed to start informers:", err)
}

rbm := newRevisionBackendsManagerWithProbeFrequency(ctx, rt, false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, probeFreq)
rbm := newRevisionBackendsManager(ctx, rt, false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, defaultProbeTimeout, probeFreq)
defer func() {
cancel()
waitInformers()
Expand Down Expand Up @@ -1456,7 +1458,7 @@ func TestRevisionDeleted(t *testing.T) {
ri.Informer().GetIndexer().Add(rev)

fakeRT := activatortest.FakeRoundTripper{}
rbm := newRevisionBackendsManagerWithProbeFrequency(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, probeFreq)
rbm := newRevisionBackendsManager(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, defaultProbeTimeout, probeFreq)
defer func() {
cancel()
waitInformers()
Expand Down Expand Up @@ -1512,7 +1514,7 @@ func TestServiceDoesNotExist(t *testing.T) {
}},
},
}
rbm := newRevisionBackendsManagerWithProbeFrequency(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, probeFreq)
rbm := newRevisionBackendsManager(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, defaultProbeTimeout, probeFreq)
defer func() {
cancel()
waitInformers()
Expand Down Expand Up @@ -1576,7 +1578,7 @@ func TestServiceMoreThanOne(t *testing.T) {
}},
},
}
rbm := newRevisionBackendsManagerWithProbeFrequency(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, probeFreq)
rbm := newRevisionBackendsManager(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, defaultProbeTimeout, probeFreq)
defer func() {
cancel()
waitInformers()
Expand Down Expand Up @@ -1855,6 +1857,7 @@ func TestProbePodIPs(t *testing.T) {
enableProbeOptimisation: input.enableProbeOptimization,
meshMode: input.meshMode,
healthyPods: input.healthy,
probeTimeout: defaultProbeTimeout,
}

healthy, noop, notMesh, err := rw.probePodIPs(input.current.ready, input.current.notReady)
Expand Down
5 changes: 3 additions & 2 deletions pkg/activator/net/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sort"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -497,8 +498,8 @@ func NewThrottler(ctx context.Context, ipAddr string) *Throttler {
}

// Run starts the throttler and blocks until the context is done.
func (t *Throttler) Run(ctx context.Context, probeTransport http.RoundTripper, usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode) {
rbm := newRevisionBackendsManager(ctx, probeTransport, usePassthroughLb, meshMode)
func (t *Throttler) Run(ctx context.Context, probeTransport http.RoundTripper, usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode, probeTimeout, probeFrequency time.Duration) {
rbm := newRevisionBackendsManager(ctx, probeTransport, usePassthroughLb, meshMode, probeTimeout, probeFrequency)
// Update channel is closed when ctx is done.
t.run(rbm.updates())
}
Expand Down