diff --git a/docs/release_notes/v1.13.2.md b/docs/release_notes/v1.13.2.md
index 6071c1ccfd8..f486483fe51 100644
--- a/docs/release_notes/v1.13.2.md
+++ b/docs/release_notes/v1.13.2.md
@@ -3,6 +3,7 @@
 This update includes bug fixes:
 
 - [Fix incorrect content-length being sent to HTTP published message](#fix-incorrect-content-length-being-sent-to-http-published-message)
+- [Fix PubSub in-flight messages from being cancelled during blocked shutdown](#fix-pubsub-in-flight-messages-from-being-cancelled-during-blocked-shutdown)
 
 ## Fix incorrect content-length being sent to HTTP published message
 
@@ -22,3 +23,21 @@ This content-length may not match the final message length sent to the applicati
 ### Solution
 
 Filter out the content-length header from the PubSub broker message before sending it to the application's HTTP server.
+
+## Fix PubSub in-flight messages from being cancelled during blocked shutdown
+
+### Problem
+
+During a blocked shutdown, all in-flight PubSub messages are cancelled: they can no longer be processed by the application, or the application's processing status for them is discarded.
+
+### Impact
+
+During shutdown, in-flight messages that are currently being processed by the application cannot be completed.
+
+### Root cause
+
+During shutdown, all publish calls to the application were being cancelled.
+
+### Solution
+
+PubSub messages are now published to the application in an isolated routine, which is not cancelled during blocked shutdown.
diff --git a/pkg/diagnostics/component_monitoring.go b/pkg/diagnostics/component_monitoring.go
index a8e516dacae..bca537f49c0 100644
--- a/pkg/diagnostics/component_monitoring.go
+++ b/pkg/diagnostics/component_monitoring.go
@@ -174,6 +174,8 @@ func (c *componentMetrics) Init(appID, namespace string) error {
 		diagUtils.NewMeasureView(c.bulkPubsubEventIngressCount, []tag.Key{appIDKey, componentKey, namespaceKey, processStatusKey, topicKey}, view.Count()),
 		diagUtils.NewMeasureView(c.pubsubEgressLatency, []tag.Key{appIDKey, componentKey, namespaceKey, successKey, topicKey}, defaultLatencyDistribution),
 		diagUtils.NewMeasureView(c.pubsubEgressCount, []tag.Key{appIDKey, componentKey, namespaceKey, successKey, topicKey}, view.Count()),
+		diagUtils.NewMeasureView(c.bulkPubsubEgressLatency, []tag.Key{appIDKey, componentKey, namespaceKey, successKey, topicKey}, defaultLatencyDistribution),
+		diagUtils.NewMeasureView(c.bulkPubsubEgressCount, []tag.Key{appIDKey, componentKey, namespaceKey, successKey, topicKey}, view.Count()),
 		diagUtils.NewMeasureView(c.inputBindingLatency, []tag.Key{appIDKey, componentKey, namespaceKey, successKey}, defaultLatencyDistribution),
 		diagUtils.NewMeasureView(c.inputBindingCount, []tag.Key{appIDKey, componentKey, namespaceKey, successKey}, view.Count()),
 		diagUtils.NewMeasureView(c.outputBindingLatency, []tag.Key{appIDKey, componentKey, namespaceKey, operationKey, successKey}, defaultLatencyDistribution),
diff --git a/pkg/runtime/processor/pubsub/topics.go b/pkg/runtime/processor/pubsub/topics.go
index bce86cb24ef..320dea9cbc5 100644
--- a/pkg/runtime/processor/pubsub/topics.go
+++ b/pkg/runtime/processor/pubsub/topics.go
@@ -176,7 +176,7 @@ func (p *pubsub) subscribeTopic(name, topic string, route compstore.TopicRouteEl
 			path:    routePath,
 			pubsub:  name,
 		}
-		policyRunner := resiliency.NewRunner[any](ctx, policyDef)
+		policyRunner := resiliency.NewRunner[any](context.Background(), policyDef)
 		_, err = policyRunner(func(ctx context.Context) (any, error) {
 			var pErr error
 			if p.isHTTP {
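The topics.go hunk above carries the fix described in the release note: the delivery's resiliency runner now starts from context.Background() rather than the subscription context, so cancelling the subscription during a blocked shutdown no longer aborts a delivery that is already in flight. A minimal standalone sketch of that pattern follows; the names here are illustrative only and are not the actual daprd internals.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// deliver runs a single in-flight publish on a context that is detached from
// the subscription context, so cancelling subCtx (as happens when daprd tears
// subscriptions down during a blocked shutdown) does not abort the delivery.
func deliver(subCtx context.Context, publish func(context.Context) error) error {
	// Detached lifetime: derived from context.Background(), not from subCtx.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	return publish(ctx)
}

func main() {
	subCtx, stop := context.WithCancel(context.Background())
	stop() // simulate the subscription context being cancelled mid-delivery

	err := deliver(subCtx, func(ctx context.Context) error {
		// Still runs to completion even though subCtx is already cancelled.
		fmt.Println("handling in-flight message, delivery ctx alive:", ctx.Err() == nil)
		return nil
	})
	fmt.Println("delivery error:", err)
}
```

The sketch bounds the detached delivery with its own timeout; in daprd the delivery remains governed by the resiliency policy passed to the runner.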
diff --git a/tests/integration/framework/process/daprd/daprd.go b/tests/integration/framework/process/daprd/daprd.go
index 06c9c9185f2..610cdbac3b3 100644
--- a/tests/integration/framework/process/daprd/daprd.go
+++ b/tests/integration/framework/process/daprd/daprd.go
@@ -27,6 +27,7 @@ import (
 	"time"
 
 	"github.com/google/uuid"
+	"github.com/prometheus/common/expfmt"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc"
@@ -260,7 +261,7 @@ func (d *Daprd) GRPCClient(t *testing.T, ctx context.Context) rtv1.DaprClient {
 }
 
 //nolint:testifylint
-func (d *Daprd) RegistedComponents(t *assert.CollectT, ctx context.Context) []*rtv1.RegisteredComponents {
+func (d *Daprd) RegistedComponents(t assert.TestingT, ctx context.Context) []*rtv1.RegisteredComponents {
 	url := fmt.Sprintf("http://%s/v1.0/metadata", d.HTTPAddress())
 	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
 	if !assert.NoError(t, err) {
@@ -291,8 +292,12 @@ func (d *Daprd) AppPort() int {
 	return d.appPort
 }
 
+func (d *Daprd) ipPort(port int) string {
+	return "127.0.0.1:" + strconv.Itoa(port)
+}
+
 func (d *Daprd) AppAddress() string {
-	return "127.0.0.1:" + strconv.Itoa(d.AppPort())
+	return d.ipPort(d.AppPort())
 }
 
 func (d *Daprd) GRPCPort() int {
@@ -300,7 +305,7 @@ func (d *Daprd) GRPCPort() int {
 }
 
 func (d *Daprd) GRPCAddress() string {
-	return "127.0.0.1:" + strconv.Itoa(d.GRPCPort())
+	return d.ipPort(d.GRPCPort())
 }
 
 func (d *Daprd) HTTPPort() int {
@@ -308,7 +313,7 @@ func (d *Daprd) HTTPPort() int {
 }
 
 func (d *Daprd) HTTPAddress() string {
-	return "127.0.0.1:" + strconv.Itoa(d.HTTPPort())
+	return d.ipPort(d.HTTPPort())
 }
 
 func (d *Daprd) InternalGRPCPort() int {
@@ -316,7 +321,7 @@ func (d *Daprd) InternalGRPCPort() int {
 }
 
 func (d *Daprd) InternalGRPCAddress() string {
-	return "127.0.0.1:" + strconv.Itoa(d.InternalGRPCPort())
+	return d.ipPort(d.InternalGRPCPort())
 }
 
 func (d *Daprd) PublicPort() int {
@@ -327,6 +332,41 @@ func (d *Daprd) MetricsPort() int {
 	return d.metricsPort
 }
 
+func (d *Daprd) MetricsAddress() string {
+	return d.ipPort(d.MetricsPort())
+}
+
 func (d *Daprd) ProfilePort() int {
 	return d.profilePort
 }
+
+// Returns a subset of metrics scraped from the metrics endpoint
+func (d *Daprd) Metrics(t *testing.T, ctx context.Context) map[string]float64 {
+	t.Helper()
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://%s/metrics", d.MetricsAddress()), nil)
+	require.NoError(t, err)
+
+	resp, err := d.httpClient.Do(req)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusOK, resp.StatusCode)
+
+	// Extract the metrics
+	parser := expfmt.TextParser{}
+	metricFamilies, err := parser.TextToMetricFamilies(resp.Body)
+	require.NoError(t, err)
+	require.NoError(t, resp.Body.Close())
+
+	metrics := make(map[string]float64)
+	for _, mf := range metricFamilies {
+		for _, m := range mf.GetMetric() {
+			key := mf.GetName()
+			for _, l := range m.GetLabel() {
+				key += "|" + l.GetName() + ":" + l.GetValue()
+			}
+			metrics[key] = m.GetCounter().GetValue()
+		}
+	}
+
+	return metrics
+}
diff --git a/tests/integration/suite/daprd/shutdown/block/app/app.go b/tests/integration/suite/daprd/shutdown/block/app/app.go
new file mode 100644
index 00000000000..53962a4c326
--- /dev/null
+++ b/tests/integration/suite/daprd/shutdown/block/app/app.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2024 The Dapr Authors
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package app
+
+import (
+	_ "github.com/dapr/dapr/tests/integration/suite/daprd/shutdown/block/app/pubsub"
+)
diff --git a/tests/integration/suite/daprd/shutdown/block/app/pubsub/bulk.go b/tests/integration/suite/daprd/shutdown/block/app/pubsub/bulk.go
new file mode 100644
index 00000000000..53ee19ca658
--- /dev/null
+++ b/tests/integration/suite/daprd/shutdown/block/app/pubsub/bulk.go
@@ -0,0 +1,178 @@
+/*
+Copyright 2024 The Dapr Authors
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"runtime"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
+	"github.com/dapr/dapr/tests/integration/framework"
+	"github.com/dapr/dapr/tests/integration/framework/process/daprd"
+	prochttp "github.com/dapr/dapr/tests/integration/framework/process/http"
+	"github.com/dapr/dapr/tests/integration/suite"
+)
+
+func init() {
+	suite.Register(new(bulk))
+}
+
+// bulk ensures that in-flight bulk messages will continue to be processed when
+// a SIGTERM is received by daprd.
+type bulk struct {
+	daprd         *daprd.Daprd
+	appHealth     atomic.Bool
+	inPublish     chan struct{}
+	returnPublish chan struct{}
+	recvRoute2    chan struct{}
+}
+
+func (b *bulk) Setup(t *testing.T) []framework.Option {
+	if runtime.GOOS == "windows" {
+		t.Skip("Skipping test on windows which relies on unix process signals")
+	}
+
+	b.appHealth.Store(true)
+	b.inPublish = make(chan struct{})
+	b.returnPublish = make(chan struct{})
+	b.recvRoute2 = make(chan struct{})
+
+	bulkResp := []byte(`{"statuses":[{"entryId":"1","status":"SUCCESS"}]}`)
+
+	handler := http.NewServeMux()
+	handler.HandleFunc("/dapr/subscribe", func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusOK)
+		io.WriteString(w, `[
+	{"pubsubname":"foo","topic":"abc","route":"route1"},
+	{"pubsubname":"foo","topic":"def","route":"route2"}
+]`)
+	})
+	handler.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
+		if b.appHealth.Load() {
+			w.WriteHeader(http.StatusOK)
+			return
+		}
+		w.WriteHeader(http.StatusServiceUnavailable)
+	})
+	handler.HandleFunc("/route1", func(w http.ResponseWriter, r *http.Request) {
+		close(b.inPublish)
+		<-b.returnPublish
+		w.Write(bulkResp)
+	})
+	handler.HandleFunc("/route2", func(w http.ResponseWriter, r *http.Request) {
+		select {
+		case b.recvRoute2 <- struct{}{}:
+		case <-r.Context().Done():
+		}
+		w.Write(bulkResp)
+	})
+	app := prochttp.New(t, prochttp.WithHandler(handler))
+
+	b.daprd = daprd.New(t,
+		daprd.WithDaprBlockShutdownDuration("180s"),
+		daprd.WithAppPort(app.Port()),
+		daprd.WithAppHealthCheck(true),
+		daprd.WithAppHealthCheckPath("/healthz"),
+		daprd.WithAppHealthProbeInterval(1),
+		daprd.WithAppHealthProbeThreshold(1),
+		daprd.WithResourceFiles(`
+apiVersion: dapr.io/v1alpha1
+kind: Component
+metadata:
+  name: foo
+spec:
+  type: pubsub.in-memory
+  version: v1
+`))
+
+	return []framework.Option{
+		framework.WithProcesses(app),
+	}
+}
+
+func (b *bulk) Run(t *testing.T, ctx context.Context) {
+	b.daprd.Run(t, ctx)
+	b.daprd.WaitUntilRunning(t, ctx)
+
+	client := b.daprd.GRPCClient(t, ctx)
+
+	assert.Len(t, b.daprd.RegistedComponents(t, ctx), 1)
+
+	_, err := client.BulkPublishEventAlpha1(ctx, &rtv1.BulkPublishRequest{
+		PubsubName: "foo",
+		Topic:      "abc",
+		Entries: []*rtv1.BulkPublishRequestEntry{
+			{EntryId: "1", Event: []byte(`{"status":"completed"}`), ContentType: "application/json"},
+		},
+	})
+	require.NoError(t, err)
+
+	select {
+	case <-b.inPublish:
+	case <-time.After(time.Second * 5):
+		assert.Fail(t, "did not receive publish event")
+	}
+
+	daprdStopped := make(chan struct{})
+	go func() {
+		b.daprd.Cleanup(t)
+		close(daprdStopped)
+	}()
+	t.Cleanup(func() {
+		select {
+		case <-daprdStopped:
+		case <-time.After(time.Second * 5):
+			assert.Fail(t, "daprd did not exit in time")
+		}
+	})
+
+LOOP:
+	for {
+		_, err := client.BulkPublishEventAlpha1(ctx, &rtv1.BulkPublishRequest{
+			PubsubName: "foo",
+			Topic:      "def",
+			Entries: []*rtv1.BulkPublishRequestEntry{
+				{EntryId: "1", Event: []byte(`{"status":"completed"}`), ContentType: "application/json"},
+			},
+		})
+		require.NoError(t, err)
+		select {
+		case <-b.recvRoute2:
+		case <-time.After(time.Second / 2):
+			break LOOP
+		}
+	}
+
+	close(b.returnPublish)
+
+	egressMetric := fmt.Sprintf("dapr_component_pubsub_egress_bulk_count|app_id:%s|component:foo|namespace:|success:true|topic:abc", b.daprd.AppID())
+	ingressMetric := fmt.Sprintf("dapr_component_pubsub_ingress_count|app_id:%s|component:foo|namespace:|process_status:success|topic:abc", b.daprd.AppID())
+
+	assert.EventuallyWithT(t, func(c *assert.CollectT) {
+		metrics := b.daprd.Metrics(t, ctx)
+		assert.Equal(c, 1, int(metrics[egressMetric]))
+		assert.Equal(c, 1, int(metrics[ingressMetric]))
+	}, time.Second*5, time.Millisecond*10)
+
+	b.appHealth.Store(false)
+}
diff --git a/tests/integration/suite/daprd/shutdown/block/app/pubsub/single.go b/tests/integration/suite/daprd/shutdown/block/app/pubsub/single.go
new file mode 100644
index 00000000000..42a4d92d3fc
--- /dev/null
+++ b/tests/integration/suite/daprd/shutdown/block/app/pubsub/single.go
@@ -0,0 +1,170 @@
+/*
+Copyright 2024 The Dapr Authors
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"runtime"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
+	"github.com/dapr/dapr/tests/integration/framework"
+	"github.com/dapr/dapr/tests/integration/framework/process/daprd"
+	prochttp "github.com/dapr/dapr/tests/integration/framework/process/http"
+	"github.com/dapr/dapr/tests/integration/suite"
+)
+
+func init() {
+	suite.Register(new(single))
+}
+
+// single ensures that in-flight messages will continue to be processed when a
+// SIGTERM is received by daprd.
+type single struct {
+	daprd         *daprd.Daprd
+	appHealth     atomic.Bool
+	inPublish     chan struct{}
+	returnPublish chan struct{}
+	recvRoute2    chan struct{}
+}
+
+func (s *single) Setup(t *testing.T) []framework.Option {
+	if runtime.GOOS == "windows" {
+		t.Skip("Skipping test on windows which relies on unix process signals")
+	}
+
+	s.appHealth.Store(true)
+	s.inPublish = make(chan struct{})
+	s.returnPublish = make(chan struct{})
+	s.recvRoute2 = make(chan struct{})
+
+	handler := http.NewServeMux()
+	handler.HandleFunc("/dapr/subscribe", func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusOK)
+		io.WriteString(w, `[
+	{"pubsubname":"foo","topic":"abc","route":"route1"},
+	{"pubsubname":"foo","topic":"def","route":"route2"}
+]`)
+	})
+	handler.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
+		if s.appHealth.Load() {
+			w.WriteHeader(http.StatusOK)
+			return
+		}
+		w.WriteHeader(http.StatusServiceUnavailable)
+	})
+	handler.HandleFunc("/route1", func(w http.ResponseWriter, r *http.Request) {
+		close(s.inPublish)
+		<-s.returnPublish
+	})
+	handler.HandleFunc("/route2", func(w http.ResponseWriter, r *http.Request) {
+		select {
+		case s.recvRoute2 <- struct{}{}:
+		case <-r.Context().Done():
+		}
+	})
+	app := prochttp.New(t, prochttp.WithHandler(handler))
+
+	s.daprd = daprd.New(t,
+		daprd.WithDaprBlockShutdownDuration("180s"),
+		daprd.WithAppPort(app.Port()),
+		daprd.WithAppHealthCheck(true),
+		daprd.WithAppHealthCheckPath("/healthz"),
+		daprd.WithAppHealthProbeInterval(1),
+		daprd.WithAppHealthProbeThreshold(1),
+		daprd.WithResourceFiles(`
+apiVersion: dapr.io/v1alpha1
+kind: Component
+metadata:
+  name: foo
+spec:
+  type: pubsub.in-memory
+  version: v1
+`))
+
+	return []framework.Option{
+		framework.WithProcesses(app),
+	}
+}
+
+func (s *single) Run(t *testing.T, ctx context.Context) {
+	s.daprd.Run(t, ctx)
+	s.daprd.WaitUntilRunning(t, ctx)
+
+	client := s.daprd.GRPCClient(t, ctx)
+
+	assert.Len(t, s.daprd.RegistedComponents(t, ctx), 1)
+
+	_, err := client.PublishEvent(ctx, &rtv1.PublishEventRequest{
+		PubsubName: "foo",
+		Topic:      "abc",
+		Data:       []byte(`{"status":"completed"}`),
+	})
+	require.NoError(t, err)
+
+	select {
+	case <-s.inPublish:
+	case <-time.After(time.Second * 5):
+		assert.Fail(t, "did not receive publish event")
+	}
+
+	daprdStopped := make(chan struct{})
+	go func() {
+		s.daprd.Cleanup(t)
+		close(daprdStopped)
+	}()
+	t.Cleanup(func() {
+		select {
+		case <-daprdStopped:
+		case <-time.After(time.Second * 5):
+			assert.Fail(t, "daprd did not exit in time")
+		}
+	})
+
+LOOP:
+	for {
+		_, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{
+			PubsubName: "foo",
+			Topic:      "def",
+			Data:       []byte(`{"status":"completed"}`),
+		})
+		require.NoError(t, err)
+		select {
+		case <-s.recvRoute2:
+		case <-time.After(time.Second / 2):
+			break LOOP
+		}
+	}
+
+	close(s.returnPublish)
+
+	egressMetric := fmt.Sprintf("dapr_component_pubsub_egress_count|app_id:%s|component:foo|namespace:|success:true|topic:abc", s.daprd.AppID())
+	ingressMetric := fmt.Sprintf("dapr_component_pubsub_ingress_count|app_id:%s|component:foo|namespace:|process_status:success|topic:abc", s.daprd.AppID())
+
+	assert.EventuallyWithT(t, func(c *assert.CollectT) {
+		metrics := s.daprd.Metrics(t, ctx)
+		assert.Equal(c, 1, int(metrics[egressMetric]))
+		assert.Equal(c, 1, int(metrics[ingressMetric]))
+	}, time.Second*5, time.Millisecond*10)
+
+	s.appHealth.Store(false)
+}
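For reference, the Metrics helper added to the test framework flattens every Prometheus sample into a "name|label:value|…" key, which is what the egressMetric and ingressMetric strings in the tests above index into. A self-contained sketch of that flattening against a hard-coded exposition body; the sample metric value is made up, while expfmt and the dto getters are the same APIs the helper uses.

```go
// Standalone sketch of the key flattening performed by the Metrics helper:
// parse Prometheus text format and key each counter as "name|label:value|...".
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/common/expfmt"
)

func main() {
	// Hard-coded sample instead of a live daprd /metrics endpoint.
	body := `# TYPE dapr_component_pubsub_egress_count counter
dapr_component_pubsub_egress_count{app_id="myapp",component="foo",success="true",topic="abc"} 1
`

	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(strings.NewReader(body))
	if err != nil {
		panic(err)
	}

	// Flatten each sample into "name|label:value|..." => counter value,
	// mirroring the map the tests look up with their metric key strings.
	metrics := make(map[string]float64)
	for _, mf := range families {
		for _, m := range mf.GetMetric() {
			key := mf.GetName()
			for _, l := range m.GetLabel() {
				key += "|" + l.GetName() + ":" + l.GetValue()
			}
			metrics[key] = m.GetCounter().GetValue()
		}
	}

	for key, value := range metrics {
		fmt.Printf("%s = %v\n", key, value)
	}
}
```

Note that the helper reads only GetCounter().GetValue(), so non-counter families would come back as zero, which is fine for the count metrics these tests assert on.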