Skip to content

Commit

Permalink
[1.13] Fix incorrect content-length being sent to HTTP published mess…
Browse files Browse the repository at this point in the history
…age (dapr#7661)

* Fix incorrect content-length being sent to HTTP published message

PR dapr#7537 reverse revered the change which removed the content-length
from being set on HTTP headers on sent messages. We still need to remove
the content-length from messages from pubsub subscriptions as the pubsub
may report a message with a content-length which does not actually
match the size of the delivered message to the app.

content-length is only removed on HTTP published messages.

Change should be backported to 1.13

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds grpc app subscriber to content-length tests

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds tests for content-length gRPC subscribed app

Signed-off-by: joshvanl <me@joshvanl.dev>

* framework/socket: skip if test is windows

Signed-off-by: joshvanl <me@joshvanl.dev>

* Fix socket runtime GOOS import

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds v1.13.2.md release notes

Signed-off-by: joshvanl <me@joshvanl.dev>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL authored Mar 28, 2024
1 parent 1b3c480 commit 5beb6df
Show file tree
Hide file tree
Showing 25 changed files with 757 additions and 162 deletions.
24 changes: 24 additions & 0 deletions docs/release_notes/v1.13.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Dapr 1.13.2

This update includes bug fixes:

- [Fix incorrect content-length being sent to HTTP published message](#fix-incorrect-content-length-being-sent-to-http-published-message)

## Fix incorrect content-length being sent to HTTP published message

### Problem

Published messages to HTTP application server report a content-length error and are not processed.

### Impact

PubSub messages from some PubSubs cannot be processed by the application.

### Root cause

The content-length reported by the PubSub broker message was copied to the message sent to the application's HTTP server.
This content-length may not match the final message length sent to the application's HTTP server, resulting in the mesage being rejected.

### Solution

Filter out the content-length header from the PubSub broker message before sending it to the application's HTTP server.
6 changes: 6 additions & 0 deletions pkg/messaging/v1/invoke_method_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ func (imr *InvokeMethodRequest) WithHTTPExtension(verb string, querystring strin
// WithCustomHTTPMetadata applies a metadata map to a InvokeMethodRequest.
func (imr *InvokeMethodRequest) WithCustomHTTPMetadata(md map[string]string) *InvokeMethodRequest {
for k, v := range md {
if strings.EqualFold(k, ContentLengthHeader) {
// There is no use of the original payload's content-length because
// the entire data is already in the cloud event.
continue
}

if imr.r.GetMetadata() == nil {
imr.r.Metadata = make(map[string]*internalv1pb.ListStringValue)
}
Expand Down
3 changes: 3 additions & 0 deletions tests/integration/framework/process/daprd/daprd.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func New(t *testing.T, fopts ...Option) *Daprd {
if opts.blockShutdownDuration != nil {
args = append(args, "--dapr-block-shutdown-duration="+*opts.blockShutdownDuration)
}
if opts.controlPlaneTrustDomain != nil {
args = append(args, "--control-plane-trust-domain="+*opts.controlPlaneTrustDomain)
}

ns := "default"
if opts.namespace != nil {
Expand Down
14 changes: 14 additions & 0 deletions tests/integration/framework/process/daprd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/dapr/dapr/tests/integration/framework/process/exec"
"github.com/dapr/dapr/tests/integration/framework/process/logline"
"github.com/dapr/dapr/tests/integration/framework/socket"
)

// Option is a function that configures the dapr process.
Expand Down Expand Up @@ -55,6 +56,7 @@ type options struct {
disableK8sSecretStore *bool
gracefulShutdownSeconds *int
blockShutdownDuration *string
controlPlaneTrustDomain *string
}

func WithExecOptions(execOptions ...exec.Option) Option {
Expand Down Expand Up @@ -246,3 +248,15 @@ func WithDaprBlockShutdownDuration(duration string) Option {
o.blockShutdownDuration = &duration
}
}

func WithControlPlaneTrustDomain(trustDomain string) Option {
return func(o *options) {
o.controlPlaneTrustDomain = &trustDomain
}
}

func WithSocket(t *testing.T, socket *socket.Socket) Option {
return WithExecOptions(exec.WithEnvVars(t,
"DAPR_COMPONENTS_SOCKETS_FOLDER", socket.Directory(),
))
}
14 changes: 8 additions & 6 deletions tests/integration/framework/process/grpc/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ func New(t *testing.T, fopts ...Option) *App {
return &App{
GRPC: procgrpc.New(t, append(opts.grpcopts, procgrpc.WithRegister(func(s *grpc.Server) {
srv := &server{
onInvokeFn: opts.onInvokeFn,
onTopicEventFn: opts.onTopicEventFn,
listTopicSubFn: opts.listTopicSubFn,
listInputBindFn: opts.listInputBindFn,
onBindingEventFn: opts.onBindingEventFn,
healthCheckFn: opts.healthCheckFn,
onInvokeFn: opts.onInvokeFn,
onTopicEventFn: opts.onTopicEventFn,
onBulkTopicEventFn: opts.onBulkTopicEventFn,
listTopicSubFn: opts.listTopicSubFn,
listInputBindFn: opts.listInputBindFn,
onBindingEventFn: opts.onBindingEventFn,
healthCheckFn: opts.healthCheckFn,
}
rtv1.RegisterAppCallbackServer(s, srv)
rtv1.RegisterAppCallbackAlphaServer(s, srv)
rtv1.RegisterAppCallbackHealthCheckServer(s, srv)
if opts.withRegister != nil {
opts.withRegister(s)
Expand Down
23 changes: 15 additions & 8 deletions tests/integration/framework/process/grpc/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (

// options contains the options for running a GRPC server in integration tests.
type options struct {
grpcopts []procgrpc.Option
withRegister func(s *grpc.Server)
onTopicEventFn func(context.Context, *rtv1.TopicEventRequest) (*rtv1.TopicEventResponse, error)
onInvokeFn func(context.Context, *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)
listTopicSubFn func(ctx context.Context, in *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)
listInputBindFn func(context.Context, *emptypb.Empty) (*rtv1.ListInputBindingsResponse, error)
onBindingEventFn func(context.Context, *rtv1.BindingEventRequest) (*rtv1.BindingEventResponse, error)
healthCheckFn func(context.Context, *emptypb.Empty) (*rtv1.HealthCheckResponse, error)
grpcopts []procgrpc.Option
withRegister func(s *grpc.Server)
onTopicEventFn func(context.Context, *rtv1.TopicEventRequest) (*rtv1.TopicEventResponse, error)
onBulkTopicEventFn func(context.Context, *rtv1.TopicEventBulkRequest) (*rtv1.TopicEventBulkResponse, error)
onInvokeFn func(context.Context, *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)
listTopicSubFn func(ctx context.Context, in *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)
listInputBindFn func(context.Context, *emptypb.Empty) (*rtv1.ListInputBindingsResponse, error)
onBindingEventFn func(context.Context, *rtv1.BindingEventRequest) (*rtv1.BindingEventResponse, error)
healthCheckFn func(context.Context, *emptypb.Empty) (*rtv1.HealthCheckResponse, error)
}

func WithGRPCOptions(opts ...procgrpc.Option) func(*options) {
Expand All @@ -48,6 +49,12 @@ func WithOnTopicEventFn(fn func(context.Context, *rtv1.TopicEventRequest) (*rtv1
}
}

func WithOnBulkTopicEventFn(fn func(context.Context, *rtv1.TopicEventBulkRequest) (*rtv1.TopicEventBulkResponse, error)) func(*options) {
return func(opts *options) {
opts.onBulkTopicEventFn = fn
}
}

func WithOnInvokeFn(fn func(ctx context.Context, in *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)) func(*options) {
return func(opts *options) {
opts.onInvokeFn = fn
Expand Down
20 changes: 14 additions & 6 deletions tests/integration/framework/process/grpc/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (
)

type server struct {
onInvokeFn func(context.Context, *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)
onTopicEventFn func(context.Context, *rtv1.TopicEventRequest) (*rtv1.TopicEventResponse, error)
listTopicSubFn func(context.Context, *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)
listInputBindFn func(context.Context, *emptypb.Empty) (*rtv1.ListInputBindingsResponse, error)
onBindingEventFn func(context.Context, *rtv1.BindingEventRequest) (*rtv1.BindingEventResponse, error)
healthCheckFn func(context.Context, *emptypb.Empty) (*rtv1.HealthCheckResponse, error)
onInvokeFn func(context.Context, *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)
onTopicEventFn func(context.Context, *rtv1.TopicEventRequest) (*rtv1.TopicEventResponse, error)
onBulkTopicEventFn func(context.Context, *rtv1.TopicEventBulkRequest) (*rtv1.TopicEventBulkResponse, error)
listTopicSubFn func(context.Context, *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)
listInputBindFn func(context.Context, *emptypb.Empty) (*rtv1.ListInputBindingsResponse, error)
onBindingEventFn func(context.Context, *rtv1.BindingEventRequest) (*rtv1.BindingEventResponse, error)
healthCheckFn func(context.Context, *emptypb.Empty) (*rtv1.HealthCheckResponse, error)
}

func (s *server) OnInvoke(ctx context.Context, in *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error) {
Expand Down Expand Up @@ -66,6 +67,13 @@ func (s *server) OnTopicEvent(ctx context.Context, in *rtv1.TopicEventRequest) (
return s.onTopicEventFn(ctx, in)
}

func (s *server) OnBulkTopicEventAlpha1(ctx context.Context, in *rtv1.TopicEventBulkRequest) (*rtv1.TopicEventBulkResponse, error) {
if s.onBulkTopicEventFn == nil {
return new(rtv1.TopicEventBulkResponse), nil
}
return s.onBulkTopicEventFn(ctx, in)
}

func (s *server) HealthCheck(ctx context.Context, e *emptypb.Empty) (*rtv1.HealthCheckResponse, error) {
if s.healthCheckFn == nil {
return new(rtv1.HealthCheckResponse), nil
Expand Down
18 changes: 17 additions & 1 deletion tests/integration/framework/process/grpc/subscriber/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,21 @@ limitations under the License.

package subscriber

import (
"context"

"google.golang.org/protobuf/types/known/emptypb"

rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
)

// options contains the options for running a pubsub subscriber gRPC server app.
type options struct{}
type options struct {
listTopicSubFn func(ctx context.Context, in *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)
}

func WithListTopicSubscriptions(fn func(ctx context.Context, in *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)) func(*options) {
return func(opts *options) {
opts.listTopicSubFn = fn
}
}
49 changes: 44 additions & 5 deletions tests/integration/framework/process/grpc/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ import (
type Option func(*options)

type Subscriber struct {
app *app.App
inCh chan *rtv1.TopicEventRequest
closeCh chan struct{}
app *app.App
inCh chan *rtv1.TopicEventRequest
inBulkCh chan *rtv1.TopicEventBulkRequest
closeCh chan struct{}
}

func New(t *testing.T, fopts ...Option) *Subscriber {
Expand All @@ -43,12 +44,15 @@ func New(t *testing.T, fopts ...Option) *Subscriber {
}

inCh := make(chan *rtv1.TopicEventRequest, 100)
inBulkCh := make(chan *rtv1.TopicEventBulkRequest, 100)
closeCh := make(chan struct{})

return &Subscriber{
inCh: inCh,
closeCh: closeCh,
inCh: inCh,
inBulkCh: inBulkCh,
closeCh: closeCh,
app: app.New(t,
app.WithListTopicSubscriptions(opts.listTopicSubFn),
app.WithOnTopicEventFn(func(ctx context.Context, in *rtv1.TopicEventRequest) (*rtv1.TopicEventResponse, error) {
select {
case inCh <- in:
Expand All @@ -57,6 +61,21 @@ func New(t *testing.T, fopts ...Option) *Subscriber {
}
return new(rtv1.TopicEventResponse), nil
}),
app.WithOnBulkTopicEventFn(func(ctx context.Context, in *rtv1.TopicEventBulkRequest) (*rtv1.TopicEventBulkResponse, error) {
select {
case inBulkCh <- in:
case <-ctx.Done():
case <-closeCh:
}
stats := make([]*rtv1.TopicEventBulkResponseEntry, len(in.GetEntries()))
for i, e := range in.GetEntries() {
stats[i] = &rtv1.TopicEventBulkResponseEntry{
EntryId: e.GetEntryId(),
Status: rtv1.TopicEventResponse_SUCCESS,
}
}
return &rtv1.TopicEventBulkResponse{Statuses: stats}, nil
}),
),
}
}
Expand Down Expand Up @@ -92,11 +111,31 @@ func (s *Subscriber) Receive(t *testing.T, ctx context.Context) *rtv1.TopicEvent
}
}

func (s *Subscriber) ReceiveBulk(t *testing.T, ctx context.Context) *rtv1.TopicEventBulkRequest {
t.Helper()

ctx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()

select {
case <-ctx.Done():
require.Fail(t, "timed out waiting for event response")
return nil
case in := <-s.inBulkCh:
return in
}
}

func (s *Subscriber) AssertEventChanLen(t *testing.T, l int) {
t.Helper()
assert.Len(t, s.inCh, l)
}

func (s *Subscriber) AssertBulkEventChanLen(t *testing.T, l int) {
t.Helper()
assert.Len(t, s.inBulkCh, l)
}

func (s *Subscriber) ExpectPublishReceive(t *testing.T, ctx context.Context, daprd *daprd.Daprd, req *rtv1.PublishEventRequest) {
t.Helper()
_, err := daprd.GRPCClient(t, ctx).PublishEvent(ctx, req)
Expand Down
Loading

0 comments on commit 5beb6df

Please sign in to comment.