Skip to content

Commit

Permalink
Add outbox support (dapr#6755)
Browse files Browse the repository at this point in the history
* add outbox

Signed-off-by: yaron2 <schneider.yaron@live.com>

* linter part 1

Signed-off-by: yaron2 <schneider.yaron@live.com>

* slice linter

Signed-off-by: yaron2 <schneider.yaron@live.com>

* linter

Signed-off-by: yaron2 <schneider.yaron@live.com>

* use ordinance for encryption slice

Signed-off-by: yaron2 <schneider.yaron@live.com>

* change policy return value and uuid

Signed-off-by: yaron2 <schneider.yaron@live.com>

* add resiliency for delete

Signed-off-by: yaron2 <schneider.yaron@live.com>

* add lint ignore

Signed-off-by: yaron2 <schneider.yaron@live.com>

* export extractCloudEventProperty

---------

Signed-off-by: yaron2 <schneider.yaron@live.com>
Co-authored-by: Artur Souza <asouza.pro@gmail.com>
  • Loading branch information
yaron2 and artursouza authored Aug 4, 2023
1 parent e2819b6 commit 6cc2710
Show file tree
Hide file tree
Showing 21 changed files with 1,564 additions and 27 deletions.
20 changes: 16 additions & 4 deletions pkg/grpc/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,8 +852,8 @@ func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.Execu
return &emptypb.Empty{}, err
}

operations := make([]state.TransactionalStateOperation, len(in.Operations))
for i, inputReq := range in.Operations {
operations := make([]state.TransactionalStateOperation, 0, len(in.Operations))
for _, inputReq := range in.Operations {
req := inputReq.Request

hasEtag, etag := extractEtag(req)
Expand Down Expand Up @@ -882,7 +882,7 @@ func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.Execu
}
}

operations[i] = setReq
operations = append(operations, setReq)

case state.OperationDelete:
delReq := state.DeleteRequest{
Expand All @@ -900,7 +900,7 @@ func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.Execu
}
}

operations[i] = delReq
operations = append(operations, delReq)

default:
err := status.Errorf(codes.Unimplemented, messages.ErrNotSupportedStateOperation, inputReq.OperationType)
Expand Down Expand Up @@ -936,6 +936,18 @@ func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.Execu
}
}

outboxEnabled := a.pubsubAdapter.Outbox().Enabled(in.StoreName)
if outboxEnabled {
trs, err := a.pubsubAdapter.Outbox().PublishInternal(ctx, in.StoreName, operations, a.UniversalAPI.AppID)
if err != nil {
err = status.Errorf(codes.Internal, messages.ErrPublishOutbox, err.Error())
apiServerLogger.Debug(err)
return &emptypb.Empty{}, err
}

operations = append(operations, trs...)
}

start := time.Now()
policyRunner := resiliency.NewRunner[struct{}](ctx,
a.resiliency.ComponentOutboundPolicy(in.StoreName, resiliency.Statestore),
Expand Down
6 changes: 4 additions & 2 deletions pkg/grpc/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2715,7 +2715,8 @@ func TestExecuteStateTransaction(t *testing.T) {
AppID: "fakeAPI",
CompStore: compStore,
},
resiliency: resiliency.New(nil),
resiliency: resiliency.New(nil),
pubsubAdapter: &daprt.MockPubSubAdapter{},
}
server, lis := startDaprAPIServer(fakeAPI, "")
defer server.Stop()
Expand Down Expand Up @@ -3268,7 +3269,8 @@ func TestStateAPIWithResiliency(t *testing.T) {
CompStore: compStore,
Resiliency: res,
},
resiliency: res,
resiliency: res,
pubsubAdapter: &daprt.MockPubSubAdapter{},
}
server, lis := startDaprAPIServer(fakeAPI, "")
defer server.Stop()
Expand Down
23 changes: 19 additions & 4 deletions pkg/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1846,8 +1846,8 @@ func (a *api) onPostStateTransaction(reqCtx *fasthttp.RequestCtx) {
}
}

operations := make([]state.TransactionalStateOperation, len(req.Operations))
for i, o := range req.Operations {
operations := make([]state.TransactionalStateOperation, 0, len(req.Operations))
for _, o := range req.Operations {
switch o.Operation {
case string(state.OperationUpsert):
var upsertReq state.SetRequest
Expand All @@ -1865,7 +1865,7 @@ func (a *api) onPostStateTransaction(reqCtx *fasthttp.RequestCtx) {
log.Debug(err)
return
}
operations[i] = upsertReq
operations = append(operations, upsertReq)
case string(state.OperationDelete):
var delReq state.DeleteRequest
err := mapstructure.Decode(o.Request, &delReq)
Expand All @@ -1882,7 +1882,7 @@ func (a *api) onPostStateTransaction(reqCtx *fasthttp.RequestCtx) {
log.Debug(msg)
return
}
operations[i] = delReq
operations = append(operations, delReq)
default:
msg := NewErrorResponse(
"ERR_NOT_SUPPORTED_STATE_OPERATION",
Expand Down Expand Up @@ -1924,6 +1924,21 @@ func (a *api) onPostStateTransaction(reqCtx *fasthttp.RequestCtx) {
}
}

outboxEnabled := a.pubsubAdapter.Outbox().Enabled(storeName)
if outboxEnabled {
trs, err := a.pubsubAdapter.Outbox().PublishInternal(reqCtx, storeName, operations, a.universal.AppID)
if err != nil {
msg := NewErrorResponse(
"ERR_PUBLISH_OUTBOX",
fmt.Sprintf(messages.ErrPublishOutbox, err.Error()))
fasthttpRespond(reqCtx, fasthttpResponseWithError(nethttp.StatusInternalServerError, msg))
log.Debug(msg)
return
}

operations = append(operations, trs...)
}

start := time.Now()
policyRunner := resiliency.NewRunner[any](reqCtx,
a.resiliency.ComponentOutboundPolicy(storeName, resiliency.Statestore),
Expand Down
4 changes: 3 additions & 1 deletion pkg/http/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3341,6 +3341,7 @@ func TestV1StateEndpoints(t *testing.T) {
CompStore: compStore,
Resiliency: rc,
},
pubsubAdapter: &daprt.MockPubSubAdapter{},
}
fakeServer.StartServer(testAPI.constructStateEndpoints(), nil)

Expand Down Expand Up @@ -4498,7 +4499,8 @@ func TestV1TransactionEndpoints(t *testing.T) {
universal: &universalapi.UniversalAPI{
CompStore: compStore,
},
resiliency: resiliency.New(nil),
resiliency: resiliency.New(nil),
pubsubAdapter: &daprt.MockPubSubAdapter{},
}
fakeServer.StartServer(testAPI.constructStateEndpoints(), nil)
fakeBodyObject := map[string]interface{}{"data": "fakeData"}
Expand Down
1 change: 1 addition & 0 deletions pkg/messages/predefined.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
ErrPubsubUnmarshal = "error when unmarshaling the request for topic %s pubsub %s: %s"
ErrPubsubMarshal = "error marshaling events to bytes for topic %s pubsub %s: %s"
ErrPubsubGetSubscriptions = "unable to get app subscriptions %s"
ErrPublishOutbox = "error while publishing outbox message: %s"

// AppChannel.
ErrChannelNotFound = "app channel is not initialized"
Expand Down
29 changes: 29 additions & 0 deletions pkg/outbox/outbox.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package outbox

import (
"context"

"github.com/dapr/components-contrib/state"
"github.com/dapr/dapr/pkg/apis/components/v1alpha1"
)

// Outbox defines the interface for all Outbox pattern operations combining state and pubsub.
type Outbox interface {
AddOrUpdateOutbox(stateStore v1alpha1.Component)
Enabled(stateStore string) bool
PublishInternal(ctx context.Context, stateStore string, states []state.TransactionalStateOperation, source string) ([]state.TransactionalStateOperation, error)
SubscribeToInternalTopics(ctx context.Context, appID string) error
}
11 changes: 11 additions & 0 deletions pkg/runtime/compstore/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ func (c *ComponentStore) GetPubSub(name string) (PubsubItem, bool) {
return pubsub, ok
}

func (c *ComponentStore) GetPubSubComponent(name string) (pubsub.PubSub, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
pubsub, ok := c.pubSubs[name]
if !ok {
return nil, false
}

return pubsub.Component, ok
}

func (c *ComponentStore) ListPubSubs() map[string]PubsubItem {
c.lock.RLock()
defer c.lock.RUnlock()
Expand Down
15 changes: 10 additions & 5 deletions pkg/runtime/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
configmodes "github.com/dapr/dapr/pkg/config/modes"
"github.com/dapr/dapr/pkg/grpc"
"github.com/dapr/dapr/pkg/modes"
"github.com/dapr/dapr/pkg/outbox"
operatorv1 "github.com/dapr/dapr/pkg/proto/operator/v1"
"github.com/dapr/dapr/pkg/resiliency"
"github.com/dapr/dapr/pkg/runtime/compstore"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/dapr/dapr/pkg/runtime/processor/secret"
"github.com/dapr/dapr/pkg/runtime/processor/state"
"github.com/dapr/dapr/pkg/runtime/processor/workflow"
runtimePubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
"github.com/dapr/dapr/pkg/runtime/registry"
)

Expand Down Expand Up @@ -102,16 +104,15 @@ type PubsubManager interface {
Publish(context.Context, *contribpubsub.PublishRequest) error
BulkPublish(context.Context, *contribpubsub.BulkPublishRequest) (contribpubsub.BulkPublishResponse, error)
SetAppChannel(channel.AppChannel)

StartSubscriptions(context.Context) error
StopSubscriptions()
Outbox() outbox.Outbox
manager
}

type BindingManager interface {
SendToOutputBinding(context.Context, string, *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
SetAppChannel(channel.AppChannel)

StartReadingFromBindings(context.Context) error
StopReadingFromBindings()
manager
Expand All @@ -129,7 +130,7 @@ type Processor struct {
}

func New(opts Options) *Processor {
pubsub := pubsub.New(pubsub.Options{
ps := pubsub.New(pubsub.Options{
ID: opts.ID,
Namespace: opts.Namespace,
Mode: opts.Mode,
Expand All @@ -145,11 +146,15 @@ func New(opts Options) *Processor {
ResourcesPath: opts.Standalone.ResourcesPath,
})

outbox := runtimePubsub.NewOutbox(ps.Publish, opts.ComponentStore.GetPubSubComponent, opts.ComponentStore.GetStateStore, pubsub.ExtractCloudEventProperty)
ps.SetOutbox(outbox)

state := state.New(state.Options{
PlacementEnabled: opts.PlacementEnabled,
Registry: opts.Registry.StateStores(),
ComponentStore: opts.ComponentStore,
Meta: opts.Meta,
Outbox: outbox,
})

binding := binding.New(binding.Options{
Expand All @@ -165,7 +170,7 @@ func New(opts Options) *Processor {
return &Processor{
compStore: opts.ComponentStore,
state: state,
pubsub: pubsub,
pubsub: ps,
binding: binding,
managers: map[components.Category]manager{
components.CategoryBindings: binding,
Expand All @@ -184,7 +189,7 @@ func New(opts Options) *Processor {
ComponentStore: opts.ComponentStore,
Meta: opts.Meta,
}),
components.CategoryPubSub: pubsub,
components.CategoryPubSub: ps,
components.CategorySecretStore: secret.New(secret.Options{
Registry: opts.Registry.SecretStores(),
ComponentStore: opts.ComponentStore,
Expand Down
10 changes: 5 additions & 5 deletions pkg/runtime/processor/pubsub/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,11 @@ func (p *pubsub) publishMessageGRPC(ctx context.Context, msg *subscribedMessage)
cloudEvent := msg.cloudEvent

envelope := &runtimev1.TopicEventRequest{
Id: extractCloudEventProperty(cloudEvent, contribpubsub.IDField),
Source: extractCloudEventProperty(cloudEvent, contribpubsub.SourceField),
DataContentType: extractCloudEventProperty(cloudEvent, contribpubsub.DataContentTypeField),
Type: extractCloudEventProperty(cloudEvent, contribpubsub.TypeField),
SpecVersion: extractCloudEventProperty(cloudEvent, contribpubsub.SpecVersionField),
Id: ExtractCloudEventProperty(cloudEvent, contribpubsub.IDField),
Source: ExtractCloudEventProperty(cloudEvent, contribpubsub.SourceField),
DataContentType: ExtractCloudEventProperty(cloudEvent, contribpubsub.DataContentTypeField),
Type: ExtractCloudEventProperty(cloudEvent, contribpubsub.TypeField),
SpecVersion: ExtractCloudEventProperty(cloudEvent, contribpubsub.SpecVersionField),
Topic: msg.topic,
PubsubName: msg.metadata[metadataKeyPubSub],
Path: msg.path,
Expand Down
22 changes: 16 additions & 6 deletions pkg/runtime/processor/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
diag "github.com/dapr/dapr/pkg/diagnostics"
"github.com/dapr/dapr/pkg/grpc"
"github.com/dapr/dapr/pkg/modes"
"github.com/dapr/dapr/pkg/outbox"
operatorv1 "github.com/dapr/dapr/pkg/proto/operator/v1"
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/dapr/pkg/resiliency"
Expand Down Expand Up @@ -98,6 +99,7 @@ type pubsub struct {
lock sync.RWMutex

topicCancels map[string]context.CancelFunc
outbox outbox.Outbox
}

type subscribedMessage struct {
Expand Down Expand Up @@ -168,6 +170,14 @@ func (p *pubsub) Init(ctx context.Context, comp compapi.Component) error {
return nil
}

func (p *pubsub) SetOutbox(outbox outbox.Outbox) {
p.outbox = outbox
}

func (p *pubsub) Outbox() outbox.Outbox {
return p.outbox
}

func (p *pubsub) SetAppChannel(appChannel channel.AppChannel) {
p.appChannel = appChannel
}
Expand Down Expand Up @@ -238,7 +248,7 @@ func matchRoutingRule(rules []*rtpubsub.Rule, data map[string]interface{}) (*rtp
return nil, nil
}

func extractCloudEventProperty(cloudEvent map[string]any, property string) string {
func ExtractCloudEventProperty(cloudEvent map[string]any, property string) string {
if cloudEvent == nil {
return ""
}
Expand All @@ -254,11 +264,11 @@ func extractCloudEventProperty(cloudEvent map[string]any, property string) strin

func extractCloudEvent(event map[string]interface{}) (runtimev1pb.TopicEventBulkRequestEntry_CloudEvent, error) { //nolint:nosnakecase
envelope := &runtimev1pb.TopicEventCERequest{
Id: extractCloudEventProperty(event, contribpubsub.IDField),
Source: extractCloudEventProperty(event, contribpubsub.SourceField),
DataContentType: extractCloudEventProperty(event, contribpubsub.DataContentTypeField),
Type: extractCloudEventProperty(event, contribpubsub.TypeField),
SpecVersion: extractCloudEventProperty(event, contribpubsub.SpecVersionField),
Id: ExtractCloudEventProperty(event, contribpubsub.IDField),
Source: ExtractCloudEventProperty(event, contribpubsub.SourceField),
DataContentType: ExtractCloudEventProperty(event, contribpubsub.DataContentTypeField),
Type: ExtractCloudEventProperty(event, contribpubsub.TypeField),
SpecVersion: ExtractCloudEventProperty(event, contribpubsub.SpecVersionField),
}

if data, ok := event[contribpubsub.DataField]; ok && data != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/runtime/processor/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
compstate "github.com/dapr/dapr/pkg/components/state"
diag "github.com/dapr/dapr/pkg/diagnostics"
"github.com/dapr/dapr/pkg/encryption"
"github.com/dapr/dapr/pkg/outbox"
"github.com/dapr/dapr/pkg/runtime/compstore"
rterrors "github.com/dapr/dapr/pkg/runtime/errors"
"github.com/dapr/dapr/pkg/runtime/meta"
Expand All @@ -43,6 +44,7 @@ type Options struct {
ComponentStore *compstore.ComponentStore
Meta *meta.Meta
PlacementEnabled bool
Outbox outbox.Outbox
}

type state struct {
Expand All @@ -53,6 +55,7 @@ type state struct {

actorStateStoreName *string
placementEnabled bool
outbox outbox.Outbox
}

func New(opts Options) *state {
Expand All @@ -61,6 +64,7 @@ func New(opts Options) *state {
compStore: opts.ComponentStore,
meta: opts.Meta,
placementEnabled: opts.PlacementEnabled,
outbox: opts.Outbox,
}
}

Expand Down Expand Up @@ -108,6 +112,8 @@ func (s *state) Init(ctx context.Context, comp compapi.Component) error {
return rterrors.NewInit(rterrors.InitComponentFailure, fName, wrapError)
}

s.outbox.AddOrUpdateOutbox(comp)

// when placement address list is not empty, set specified actor store.
if s.placementEnabled {
// set specified actor store if "actorStateStore" is true in the spec.
Expand Down
2 changes: 2 additions & 0 deletions pkg/runtime/pubsub/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
"context"

contribPubsub "github.com/dapr/components-contrib/pubsub"
"github.com/dapr/dapr/pkg/outbox"
)

// Adapter is the interface for message buses.
type Adapter interface {
Publish(context.Context, *contribPubsub.PublishRequest) error
BulkPublish(context.Context, *contribPubsub.BulkPublishRequest) (contribPubsub.BulkPublishResponse, error)
Outbox() outbox.Outbox
}
Loading

0 comments on commit 6cc2710

Please sign in to comment.