Skip to content

Commit

Permalink
Revert back - Transactional state store API without retry (dapr#1892)
Browse files Browse the repository at this point in the history
* Revert "Revert transactional state API (dapr#1777)"

This reverts commit f733cf3.

* remove retry

* add stateoption and regen proto client

* remove concurrency and consistency per state

* update testapp modules

* fix invocation test

* Revert "remove concurrency and consistency per state"

This reverts commit 141d9ff.

* add transactional request

* update pb client

* update modules

* update modules

* updated

* fix test.

Co-authored-by: Yaron Schneider <yaronsc@microsoft.com>
  • Loading branch information
youngbupark and yaron2 authored Aug 14, 2020
1 parent fd7eae4 commit 9d6fad2
Show file tree
Hide file tree
Showing 36 changed files with 857 additions and 8,014 deletions.
26 changes: 25 additions & 1 deletion dapr/proto/runtime/v1/dapr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ service Dapr {

// Deletes the state for a specific key.
rpc DeleteState(DeleteStateRequest) returns (google.protobuf.Empty) {}

// Executes transactions for a specified store
rpc ExecuteStateTransaction(ExecuteStateTransactionRequest) returns (google.protobuf.Empty) {}

// Publishes events to the specific topic.
rpc PublishEvent(PublishEventRequest) returns (google.protobuf.Empty) {}
Expand Down Expand Up @@ -182,9 +185,30 @@ message GetSecretRequest {
map<string,string> metadata = 3;
}

// GetSecretResponse is the response mesage to convey the requested secret.
// GetSecretResponse is the response message to convey the requested secret.
message GetSecretResponse {
// data is the secret value. Some secret store, such as kubernetes secret
// store, can save multiple secrets for single secret key.
map<string, string> data = 1;
}

// TransactionalStateOperation is the message to execute a specified operation with a key-value pair.
message TransactionalStateOperation {
// The type of operation to be executed
string operationType = 1;

// State values to be operated on
common.v1.StateItem request = 2;
}

// ExecuteStateTransactionRequest is the message to execute multiple operations on a specified store.
message ExecuteStateTransactionRequest {
// Required. name of state store.
string storeName = 1;

// Required. transactional operation list.
repeated TransactionalStateOperation operations = 2;

// The metadata used for transactional operations.
map<string,string> metadata = 3;
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a
github.com/coreos/etcd v3.3.18+incompatible // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/dapr/components-contrib v0.2.5-0.20200724001326-5892feca63f7
github.com/dapr/components-contrib v0.2.5-0.20200812172248-3efcb4043035
github.com/fasthttp/router v1.0.4
github.com/fsnotify/fsnotify v1.4.7
github.com/ghodss/yaml v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/dapr/components-contrib v0.0.0-20200219164914-5b75f4d0fbc6/go.mod h1:AZi8IGs8LFdywJg/YGwDs7MAxJkvGa8RgHN4NoJSKt0=
github.com/dapr/components-contrib v0.2.5-0.20200724001326-5892feca63f7 h1:5wwh9jI5gIgmxhYfV/syl7/TRQah8z6hwRXeZpGf904=
github.com/dapr/components-contrib v0.2.5-0.20200724001326-5892feca63f7/go.mod h1:JSMSHTHQEcLgv7MZKIbKbHXgudogZfIjEkZMmn7nJwQ=
github.com/dapr/components-contrib v0.2.5-0.20200812172248-3efcb4043035 h1:BIu+ZvP/g4phWJ49f3VG4PVe4m80rY2ylgIBrIWCRKQ=
github.com/dapr/components-contrib v0.2.5-0.20200812172248-3efcb4043035/go.mod h1:JSMSHTHQEcLgv7MZKIbKbHXgudogZfIjEkZMmn7nJwQ=
github.com/dapr/dapr v0.4.1-0.20200228055659-71892bc0111e/go.mod h1:c60DJ9TdSdpbLjgqP55A5u4ZCYChFwa9UGYIXd9pmm4=
github.com/dapr/go-sdk v0.0.0-20200121181907-48249cda2fad h1:RKWoYovBc+B9ltvjtZLMnbu49stSucH8rZze3MeqyvQ=
github.com/dapr/go-sdk v0.0.0-20200121181907-48249cda2fad/go.mod h1:yeOIFBz6+BigHpk4ASJbgQDVjQ8+00oCWrFyOAFdob8=
Expand Down
11 changes: 7 additions & 4 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (a *actorsRuntime) TransactionalStateOperation(ctx context.Context, req *Tr
if a.store == nil {
return errors.New("actors: state store does not exist or incorrectly configured")
}
requests := []state.TransactionalRequest{}
operations := []state.TransactionalStateOperation{}
partitionKey := a.constructCompositeKey(a.config.AppID, req.ActorType, req.ActorID)
metadata := map[string]string{metadataPartitionKey: partitionKey}

Expand All @@ -385,7 +385,7 @@ func (a *actorsRuntime) TransactionalStateOperation(ctx context.Context, req *Tr
return err
}
key := a.constructActorStateKey(req.ActorType, req.ActorID, upsert.Key)
requests = append(requests, state.TransactionalRequest{
operations = append(operations, state.TransactionalStateOperation{
Request: state.SetRequest{
Key: key,
Value: upsert.Value,
Expand All @@ -401,7 +401,7 @@ func (a *actorsRuntime) TransactionalStateOperation(ctx context.Context, req *Tr
}

key := a.constructActorStateKey(req.ActorType, req.ActorID, delete.Key)
requests = append(requests, state.TransactionalRequest{
operations = append(operations, state.TransactionalStateOperation{
Request: state.DeleteRequest{
Key: key,
Metadata: metadata,
Expand All @@ -418,7 +418,10 @@ func (a *actorsRuntime) TransactionalStateOperation(ctx context.Context, req *Tr
return errors.New(incompatibleStateStore)
}

err := transactionalStore.Multi(requests)
err := transactionalStore.Multi(&state.TransactionalStateRequest{
Operations: operations,
Metadata: metadata,
})
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/actors/actors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (f *fakeStateStore) BulkSet(req []state.SetRequest) error {
return nil
}

func (f *fakeStateStore) Multi(reqs []state.TransactionalRequest) error {
func (f *fakeStateStore) Multi(request *state.TransactionalStateRequest) error {
return nil
}

Expand Down
69 changes: 67 additions & 2 deletions pkg/grpc/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ import (
)

const (
daprSeparator = "||"

daprSeparator = "||"
daprHTTPStatusHeader = "dapr-http-status"
)

Expand All @@ -56,6 +55,7 @@ type API interface {
GetSecret(ctx context.Context, in *runtimev1pb.GetSecretRequest) (*runtimev1pb.GetSecretResponse, error)
SaveState(ctx context.Context, in *runtimev1pb.SaveStateRequest) (*empty.Empty, error)
DeleteState(ctx context.Context, in *runtimev1pb.DeleteStateRequest) (*empty.Empty, error)
ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.ExecuteStateTransactionRequest) (*empty.Empty, error)
}

type api struct {
Expand Down Expand Up @@ -372,3 +372,68 @@ func (a *api) GetSecret(ctx context.Context, in *runtimev1pb.GetSecretRequest) (
}
return response, nil
}

func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.ExecuteStateTransactionRequest) (*empty.Empty, error) {
if a.stateStores == nil || len(a.stateStores) == 0 {
return &empty.Empty{}, errors.New("ERR_STATE_STORE_NOT_CONFIGURED")
}

storeName := in.StoreName

if a.stateStores[storeName] == nil {
return &empty.Empty{}, errors.New("ERR_STATE_STORE_NOT_FOUND")
}

transactionalStore, ok := a.stateStores[storeName].(state.TransactionalStore)
if !ok {
return &empty.Empty{}, errors.New("ERR_STATE_STORE_NOT_SUPPORTED")
}

operations := []state.TransactionalStateOperation{}
for _, inputReq := range in.Operations {
var req state.TransactionalStateOperation
switch state.OperationType(inputReq.OperationType) {
case state.Upsert:
setReq := state.SetRequest{
Key: a.getModifiedStateKey(inputReq.Request.Key),
Value: string(inputReq.Request.Value),
Options: state.SetStateOption{
Concurrency: stateConcurrencyToString(inputReq.Request.Options.Concurrency),
Consistency: stateConsistencyToString(inputReq.Request.Options.Consistency),
},
}
req = state.TransactionalStateOperation{
Operation: state.Upsert,
Request: setReq,
}

case state.Delete:
delReq := state.DeleteRequest{
Key: a.getModifiedStateKey(inputReq.Request.Key),
Options: state.DeleteStateOption{
Concurrency: stateConcurrencyToString(inputReq.Request.Options.Concurrency),
Consistency: stateConsistencyToString(inputReq.Request.Options.Consistency),
},
}
req = state.TransactionalStateOperation{
Operation: state.Delete,
Request: delReq,
}

default:
return &empty.Empty{}, fmt.Errorf("ERR_OPERATION_NOT_SUPPORTED: operation type %s not supported", inputReq.OperationType)
}

operations = append(operations, req)
}

err := transactionalStore.Multi(&state.TransactionalStateRequest{
Operations: operations,
Metadata: in.Metadata,
})

if err != nil {
return &empty.Empty{}, fmt.Errorf("ERR_STATE_TRANSACTION: %s", err)
}
return &empty.Empty{}, nil
}
60 changes: 60 additions & 0 deletions pkg/grpc/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/dapr/components-contrib/exporters"
"github.com/dapr/components-contrib/exporters/stringexporter"
"github.com/dapr/components-contrib/state"
channelt "github.com/dapr/dapr/pkg/channel/testing"
"github.com/dapr/dapr/pkg/config"
diag "github.com/dapr/dapr/pkg/diagnostics"
Expand Down Expand Up @@ -91,6 +92,10 @@ func (m *mockGRPCAPI) GetSecret(ctx context.Context, in *runtimev1pb.GetSecretRe
return &runtimev1pb.GetSecretResponse{}, nil
}

func (m *mockGRPCAPI) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.ExecuteStateTransactionRequest) (*empty.Empty, error) {
return &empty.Empty{}, nil
}

func ExtractSpanContext(ctx context.Context) []byte {
span := diag_utils.SpanFromContext(ctx)
return []byte(SerializeSpanContext(span.SpanContext()))
Expand Down Expand Up @@ -696,3 +701,58 @@ func TestInvokeBinding(t *testing.T) {
_, err := client.InvokeBinding(context.Background(), &runtimev1pb.InvokeBindingRequest{})
assert.Nil(t, err)
}

func TestExecuteStateTransaction(t *testing.T) {
stateOptions, _ := GenerateStateOptionsTestCase()
port, _ := freeport.GetFreePort()

server := startTestServer(port)
defer server.Stop()

clientConn := createTestClient(port)
defer clientConn.Close()

client := runtimev1pb.NewDaprClient(clientConn)
_, err := client.ExecuteStateTransaction(context.Background(), &runtimev1pb.ExecuteStateTransactionRequest{
Operations: []*runtimev1pb.TransactionalStateOperation{
{
OperationType: "upsert",
Request: &commonv1pb.StateItem{
Key: "key1",
Value: []byte("1"),
Options: stateOptions,
},
},
{
OperationType: "upsert",
Request: &commonv1pb.StateItem{
Key: "key2",
Value: []byte("1"),
},
},
{
OperationType: "delete",
Request: &commonv1pb.StateItem{
Key: "key1",
},
},
},
})
server.Stop()
assert.Nil(t, err)
}

func GenerateStateOptionsTestCase() (*commonv1pb.StateOptions, state.SetStateOption) {
concurrencyOption := commonv1pb.StateOptions_CONCURRENCY_FIRST_WRITE
consistencyOption := commonv1pb.StateOptions_CONSISTENCY_STRONG

testOptions := commonv1pb.StateOptions{
Concurrency: concurrencyOption,
Consistency: consistencyOption,
}
expected := state.SetStateOption{
Concurrency: "first-write",
Consistency: "strong",
}
return &testOptions, expected
}
46 changes: 46 additions & 0 deletions pkg/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ func (a *api) constructStateEndpoints() []Endpoint {
Version: apiVersionV1,
Handler: a.onBulkGetState,
},
{
Methods: []string{fasthttp.MethodPost},
Route: "state/{storeName}/transaction",
Version: apiVersionV1,
Handler: a.onPostStateTransaction,
},
}
}

Expand Down Expand Up @@ -1001,3 +1007,43 @@ func getMetadataFromRequest(reqCtx *fasthttp.RequestCtx) map[string]string {

return metadata
}

func (a *api) onPostStateTransaction(reqCtx *fasthttp.RequestCtx) {
var err error
if a.stateStores == nil || len(a.stateStores) == 0 {
msg := NewErrorResponse("ERR_STATE_STORES_NOT_CONFIGURED", "")
respondWithError(reqCtx, 400, msg)
return
}

storeName := reqCtx.UserValue(storeNameParam).(string)
stateStore, ok := a.stateStores[storeName]
if !ok {
msg := NewErrorResponse("ERR_STATE_STORE_NOT_FOUND:", fmt.Sprintf("state store name: %s", storeName))
respondWithError(reqCtx, 401, msg)
return
}

transactionalStore, ok := stateStore.(state.TransactionalStore)
if !ok {
msg := NewErrorResponse("ERR_STATE_STORE_NOT_SUPPORTED", fmt.Sprintf("state store name: %s", storeName))
respondWithError(reqCtx, 500, msg)
return
}

body := reqCtx.PostBody()
var request state.TransactionalStateRequest
if err = a.json.Unmarshal(body, &request); err != nil {
msg := NewErrorResponse("ERR_DESERIALIZE_HTTP_BODY", err.Error())
respondWithError(reqCtx, 400, msg)
return
}

err = transactionalStore.Multi(&request)
if err != nil {
msg := NewErrorResponse("ERR_STATE_TRANSACTION_SAVE", err.Error())
respondWithError(reqCtx, 500, msg)
} else {
respondEmpty(reqCtx, 201)
}
}
Loading

0 comments on commit 9d6fad2

Please sign in to comment.