Skip to content

Commit

Permalink
[1.13] Pubsub: Execute in-flight subscribed messages in background co…
Browse files Browse the repository at this point in the history
…ntext. (dapr#7660)

* Pubsub: Execute in-flight subscribed messages in background context.

When block shutdown is enabled, Dapr will block the shutdown sequence
for a configurable amount of time or until the app becomes unhealthy.
During this time the out-bound Dapr APIs remain available but all
in-bound requests are blocked. Currently, in-flight PubSub messages are
canceled when Dapr receives a SIGTERM, which can result in lost
messages.

Patch updates the pubsub subscribe message handler policy runner to
execute sending the message to the app channel in a background context.
This ensures that when Dapr cancels the Subscription due to a blocking
shutdown, the in-flight message is not effected.

Enables the `dapr_component_pubsub_egress_bulk_count` and
`dapr_component_pubsub_egress_bulk_latency` metrics for bulk pubsub
egress which were previously not enabled, as they are used to test that
an in-flight message is correctly sent after Dapr has begun to shutdown.

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds Metrics request to daprd process

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds v1.13.2.md patch notes

Signed-off-by: joshvanl <me@joshvanl.dev>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL authored Apr 3, 2024
1 parent 5beb6df commit 121f5e4
Show file tree
Hide file tree
Showing 7 changed files with 433 additions and 6 deletions.
19 changes: 19 additions & 0 deletions docs/release_notes/v1.13.2.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
This update includes bug fixes:

- [Fix incorrect content-length being sent to HTTP published message](#fix-incorrect-content-length-being-sent-to-http-published-message)
- [Fix PubSub in-flight messages from being cancelled during blocked shutdown](#fix-pubsub-in-flight-messages-from-being-cancelled-during-blocked-shutdown)

## Fix incorrect content-length being sent to HTTP published message

Expand All @@ -22,3 +23,21 @@ This content-length may not match the final message length sent to the applicati
### Solution

Filter out the content-length header from the PubSub broker message before sending it to the application's HTTP server.

## Fix PubSub in-flight messages from being cancelled during blocked shutdown.

### Problem

During a blocked shutdown, all in-flight PubSub messages are cancelled and cannot be processed by the application or the applications processes status discarded.

### Impact

During shutdown, in-flight messages which are currently being processed by the application cannot be completed.

### Root cause

During shutdown, all publish calls to the application where being cancelled.

### Solution

PubSub mesaages are now published to the application in an isolated routine, which is not cancelled during blocked shutdown.
2 changes: 2 additions & 0 deletions pkg/diagnostics/component_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ func (c *componentMetrics) Init(appID, namespace string) error {
diagUtils.NewMeasureView(c.bulkPubsubEventIngressCount, []tag.Key{appIDKey, componentKey, namespaceKey, processStatusKey, topicKey}, view.Count()),
diagUtils.NewMeasureView(c.pubsubEgressLatency, []tag.Key{appIDKey, componentKey, namespaceKey, successKey, topicKey}, defaultLatencyDistribution),
diagUtils.NewMeasureView(c.pubsubEgressCount, []tag.Key{appIDKey, componentKey, namespaceKey, successKey, topicKey}, view.Count()),
diagUtils.NewMeasureView(c.bulkPubsubEgressLatency, []tag.Key{appIDKey, componentKey, namespaceKey, successKey, topicKey}, defaultLatencyDistribution),
diagUtils.NewMeasureView(c.bulkPubsubEgressCount, []tag.Key{appIDKey, componentKey, namespaceKey, successKey, topicKey}, view.Count()),
diagUtils.NewMeasureView(c.inputBindingLatency, []tag.Key{appIDKey, componentKey, namespaceKey, successKey}, defaultLatencyDistribution),
diagUtils.NewMeasureView(c.inputBindingCount, []tag.Key{appIDKey, componentKey, namespaceKey, successKey}, view.Count()),
diagUtils.NewMeasureView(c.outputBindingLatency, []tag.Key{appIDKey, componentKey, namespaceKey, operationKey, successKey}, defaultLatencyDistribution),
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/processor/pubsub/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (p *pubsub) subscribeTopic(name, topic string, route compstore.TopicRouteEl
path: routePath,
pubsub: name,
}
policyRunner := resiliency.NewRunner[any](ctx, policyDef)
policyRunner := resiliency.NewRunner[any](context.Background(), policyDef)
_, err = policyRunner(func(ctx context.Context) (any, error) {
var pErr error
if p.isHTTP {
Expand Down
50 changes: 45 additions & 5 deletions tests/integration/framework/process/daprd/daprd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
Expand Down Expand Up @@ -260,7 +261,7 @@ func (d *Daprd) GRPCClient(t *testing.T, ctx context.Context) rtv1.DaprClient {
}

//nolint:testifylint
func (d *Daprd) RegistedComponents(t *assert.CollectT, ctx context.Context) []*rtv1.RegisteredComponents {
func (d *Daprd) RegistedComponents(t assert.TestingT, ctx context.Context) []*rtv1.RegisteredComponents {
url := fmt.Sprintf("http://%s/v1.0/metadata", d.HTTPAddress())
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if !assert.NoError(t, err) {
Expand Down Expand Up @@ -291,32 +292,36 @@ func (d *Daprd) AppPort() int {
return d.appPort
}

func (d *Daprd) ipPort(port int) string {
return "127.0.0.1:" + strconv.Itoa(port)
}

func (d *Daprd) AppAddress() string {
return "127.0.0.1:" + strconv.Itoa(d.AppPort())
return d.ipPort(d.AppPort())
}

func (d *Daprd) GRPCPort() int {
return d.grpcPort
}

func (d *Daprd) GRPCAddress() string {
return "127.0.0.1:" + strconv.Itoa(d.GRPCPort())
return d.ipPort(d.GRPCPort())
}

func (d *Daprd) HTTPPort() int {
return d.httpPort
}

func (d *Daprd) HTTPAddress() string {
return "127.0.0.1:" + strconv.Itoa(d.HTTPPort())
return d.ipPort(d.HTTPPort())
}

func (d *Daprd) InternalGRPCPort() int {
return d.internalGRPCPort
}

func (d *Daprd) InternalGRPCAddress() string {
return "127.0.0.1:" + strconv.Itoa(d.InternalGRPCPort())
return d.ipPort(d.InternalGRPCPort())
}

func (d *Daprd) PublicPort() int {
Expand All @@ -327,6 +332,41 @@ func (d *Daprd) MetricsPort() int {
return d.metricsPort
}

func (d *Daprd) MetricsAddress() string {
return d.ipPort(d.MetricsPort())
}

func (d *Daprd) ProfilePort() int {
return d.profilePort
}

// Returns a subset of metrics scraped from the metrics endpoint
func (d *Daprd) Metrics(t *testing.T, ctx context.Context) map[string]float64 {
t.Helper()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://%s/metrics", d.MetricsAddress()), nil)
require.NoError(t, err)

resp, err := d.httpClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)

// Extract the metrics
parser := expfmt.TextParser{}
metricFamilies, err := parser.TextToMetricFamilies(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())

metrics := make(map[string]float64)
for _, mf := range metricFamilies {
for _, m := range mf.GetMetric() {
key := mf.GetName()
for _, l := range m.GetLabel() {
key += "|" + l.GetName() + ":" + l.GetValue()
}
metrics[key] = m.GetCounter().GetValue()
}
}

return metrics
}
18 changes: 18 additions & 0 deletions tests/integration/suite/daprd/shutdown/block/app/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package app

import (
_ "github.com/dapr/dapr/tests/integration/suite/daprd/shutdown/block/app/pubsub"
)
178 changes: 178 additions & 0 deletions tests/integration/suite/daprd/shutdown/block/app/pubsub/bulk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pubsub

import (
"context"
"fmt"
"io"
"net/http"
"runtime"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
prochttp "github.com/dapr/dapr/tests/integration/framework/process/http"
"github.com/dapr/dapr/tests/integration/suite"
)

func init() {
suite.Register(new(bulk))
}

// bulk ensures that in-flight bulk messages will continue to be processed when
// a SIGTERM is received by daprd.
type bulk struct {
daprd *daprd.Daprd
appHealth atomic.Bool
inPublish chan struct{}
returnPublish chan struct{}
recvRoute2 chan struct{}
}

func (b *bulk) Setup(t *testing.T) []framework.Option {
if runtime.GOOS == "windows" {
t.Skip("Skipping test on windows which relies on unix process signals")
}

b.appHealth.Store(true)
b.inPublish = make(chan struct{})
b.returnPublish = make(chan struct{})
b.recvRoute2 = make(chan struct{})

bulkResp := []byte(`{"statuses":[{"entryId":"1","status":"SUCCESS"}]}`)

handler := http.NewServeMux()
handler.HandleFunc("/dapr/subscribe", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
io.WriteString(w, `[
{"pubsubname":"foo","topic":"abc","route":"route1"},
{"pubsubname":"foo","topic":"def","route":"route2"}
]`)
})
handler.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
if b.appHealth.Load() {
w.WriteHeader(http.StatusOK)
return
}
w.WriteHeader(http.StatusServiceUnavailable)
})
handler.HandleFunc("/route1", func(w http.ResponseWriter, r *http.Request) {
close(b.inPublish)
<-b.returnPublish
w.Write(bulkResp)
})
handler.HandleFunc("/route2", func(w http.ResponseWriter, r *http.Request) {
select {
case b.recvRoute2 <- struct{}{}:
case <-r.Context().Done():
}
w.Write(bulkResp)
})
app := prochttp.New(t, prochttp.WithHandler(handler))

b.daprd = daprd.New(t,
daprd.WithDaprBlockShutdownDuration("180s"),
daprd.WithAppPort(app.Port()),
daprd.WithAppHealthCheck(true),
daprd.WithAppHealthCheckPath("/healthz"),
daprd.WithAppHealthProbeInterval(1),
daprd.WithAppHealthProbeThreshold(1),
daprd.WithResourceFiles(`
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: foo
spec:
type: pubsub.in-memory
version: v1
`))

return []framework.Option{
framework.WithProcesses(app),
}
}

func (b *bulk) Run(t *testing.T, ctx context.Context) {
b.daprd.Run(t, ctx)
b.daprd.WaitUntilRunning(t, ctx)

client := b.daprd.GRPCClient(t, ctx)

assert.Len(t, b.daprd.RegistedComponents(t, ctx), 1)

_, err := client.BulkPublishEventAlpha1(ctx, &rtv1.BulkPublishRequest{
PubsubName: "foo",
Topic: "abc",
Entries: []*rtv1.BulkPublishRequestEntry{
{EntryId: "1", Event: []byte(`{"status":"completed"}`), ContentType: "application/json"},
},
})
require.NoError(t, err)

select {
case <-b.inPublish:
case <-time.After(time.Second * 5):
assert.Fail(t, "did not receive publish event")
}

daprdStopped := make(chan struct{})
go func() {
b.daprd.Cleanup(t)
close(daprdStopped)
}()
t.Cleanup(func() {
select {
case <-daprdStopped:
case <-time.After(time.Second * 5):
assert.Fail(t, "daprd did not exit in time")
}
})

LOOP:
for {
_, err := client.BulkPublishEventAlpha1(ctx, &rtv1.BulkPublishRequest{
PubsubName: "foo",
Topic: "def",
Entries: []*rtv1.BulkPublishRequestEntry{
{EntryId: "1", Event: []byte(`{"status":"completed"}`), ContentType: "application/json"},
},
})
require.NoError(t, err)
select {
case <-b.recvRoute2:
case <-time.After(time.Second / 2):
break LOOP
}
}

close(b.returnPublish)

egressMetric := fmt.Sprintf("dapr_component_pubsub_egress_bulk_count|app_id:%s|component:foo|namespace:|success:true|topic:abc", b.daprd.AppID())
ingressMetric := fmt.Sprintf("dapr_component_pubsub_ingress_count|app_id:%s|component:foo|namespace:|process_status:success|topic:abc", b.daprd.AppID())

assert.EventuallyWithT(t, func(c *assert.CollectT) {
metrics := b.daprd.Metrics(t, ctx)
assert.Equal(c, 1, int(metrics[egressMetric]))
assert.Equal(c, 1, int(metrics[ingressMetric]))
}, time.Second*5, time.Millisecond*10)

b.appHealth.Store(false)
}
Loading

0 comments on commit 121f5e4

Please sign in to comment.