Skip to content

Commit

Permalink
Consistent query support (#2678)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Oct 22, 2019
1 parent b5b20e3 commit 519b92b
Show file tree
Hide file tree
Showing 34 changed files with 1,980 additions and 1,281 deletions.
308 changes: 207 additions & 101 deletions .gen/go/history/history.go

Large diffs are not rendered by default.

176 changes: 156 additions & 20 deletions .gen/go/matching/matching.go

Large diffs are not rendered by default.

477 changes: 375 additions & 102 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ func (c *clientImpl) QueryWorkflow(
request *h.QueryWorkflowRequest,
opts ...yarpc.CallOption,
) (*h.QueryWorkflowResponse, error) {
client, err := c.getClientForWorkflowID(request.Execution.GetWorkflowId())
client, err := c.getClientForWorkflowID(request.GetRequest().GetExecution().GetWorkflowId())
if err != nil {
return nil, err
}
Expand Down
12 changes: 3 additions & 9 deletions common/client/clientFeature.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ const (
var defaultVersion = version{0, 0, 0}

type (
// Feature provides information about client's capibility
// Feature provides information about client's capabilities
Feature interface {
SupportStickyQuery() bool
}

// FeatureImpl is used for determining the client's capibility.
// FeatureImpl is used for determining the client's capabilities.
// This can be useful when service support a feature, while
// client does not, so we can use be backward comparible
// client does not, so we can be backward compatible
FeatureImpl struct {
libVersion version
featureVersion version
Expand Down Expand Up @@ -74,12 +74,6 @@ func (feature *FeatureImpl) SupportStickyQuery() bool {
return feature.featureVersion.major > 0
}

// SupportConsistentQuery whether a client supports consistent query
func (feature *FeatureImpl) SupportConsistentQuery() bool {
// TODO: andrewjdawson2016 once client side changes for consistent query are done then update this
return false
}

func parseVersion(versionStr string) version {
var major int64
var minor int64
Expand Down
8 changes: 0 additions & 8 deletions common/client/clientFeature_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,3 @@ func (s *FeatureSuite) TestSupportStickyQuery() {
feature = NewFeatureImpl(libVersion, featureVersion, lang)
s.False(feature.SupportStickyQuery(), "Should not support sticky query")
}

func (s *FeatureSuite) TestSupportConsistentQuery() {
libVersion := "0.5.0"
featureVersion := "1.0.0"
lang := "go"
feature := NewFeatureImpl(libVersion, featureVersion, lang)
s.False(feature.SupportConsistentQuery())
}
15 changes: 15 additions & 0 deletions common/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,21 @@ func ClientArchivalStatusPtr(t shared.ArchivalStatus) *shared.ArchivalStatus {
return &t
}

// QueryResultTypePtr makes a copy and returns the pointer to a QueryResultType
func QueryResultTypePtr(t s.QueryResultType) *s.QueryResultType {
return &t
}

// QueryRejectConditionPtr makes a copy and returns the pointer to a QueryRejectCondition
func QueryRejectConditionPtr(t s.QueryRejectCondition) *s.QueryRejectCondition {
return &t
}

// QueryConsistencyLevelPtr makes a copy and returns the pointer to a QueryConsistencyLevel
func QueryConsistencyLevelPtr(t s.QueryConsistencyLevel) *s.QueryConsistencyLevel {
return &t
}

// StringDefault returns value if string pointer is set otherwise default value of string
func StringDefault(v *string) string {
var defaultString string
Expand Down
10 changes: 10 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1570,6 +1570,11 @@ const (
ReplicationTasksReturned
GetReplicationMessagesForShardLatency
EventReapplySkippedCount
DirectQueryDispatchLatency
DecisionTaskQueryLatency
CompleteQueryFailedCount
ConsistentQueryTimeoutCount
DecisionTaskCreatedForBufferedQueriesCount

NumHistoryMetrics
)
Expand Down Expand Up @@ -1875,6 +1880,11 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ReplicationTasksReturned: {metricName: "replication_tasks_returned", metricType: Timer},
GetReplicationMessagesForShardLatency: {metricName: "get_replication_messages_for_shard", metricType: Timer},
EventReapplySkippedCount: {metricName: "event_reapply_skipped_count", metricType: Counter},
DirectQueryDispatchLatency: {metricName: "direct_query_dispatch_latency", metricType: Timer},
DecisionTaskQueryLatency: {metricName: "decision_task_query_latency", metricType: Timer},
CompleteQueryFailedCount: {metricName: "complete_query_failed", metricType: Counter},
ConsistentQueryTimeoutCount: {metricName: "consistent_query_timeout", metricType: Counter},
DecisionTaskCreatedForBufferedQueriesCount: {metricName: "decision_task_created_for_buffered_queries", metricType: Counter},
},
Matching: {
PollSuccessCounter: {metricName: "poll_success"},
Expand Down
1 change: 1 addition & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func CreateMatchingPollForDecisionTaskResponse(historyResponse *h.RecordDecision
BranchToken: historyResponse.BranchToken,
ScheduledTimestamp: historyResponse.ScheduledTimestamp,
StartedTimestamp: historyResponse.StartedTimestamp,
Queries: historyResponse.Queries,
}
if historyResponse.GetPreviousStartedEventId() != EmptyEventID {
matchingResp.PreviousStartedEventId = historyResponse.PreviousStartedEventId
Expand Down
30 changes: 27 additions & 3 deletions host/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,15 @@ func (s *elasticsearchIntegrationSuite) TestListWorkflow_SearchAttribute() {
Logger: s.Logger,
T: s.T(),
}
_, newTask, err := poller.PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(false, false, true, true, int64(0), 1, true)
_, newTask, err := poller.PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(
false,
false,
true,
true,
int64(0),
1,
true,
nil)
s.Nil(err)
s.NotNil(newTask)
s.NotNil(newTask.DecisionTask)
Expand Down Expand Up @@ -880,7 +888,15 @@ func (s *elasticsearchIntegrationSuite) TestUpsertWorkflowExecution() {
}

// process 1st decision and assert decision is handled correctly.
_, newTask, err := poller.PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(false, false, true, true, int64(0), 1, true)
_, newTask, err := poller.PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(
false,
false,
true,
true,
int64(0),
1,
true,
nil)
s.Nil(err)
s.NotNil(newTask)
s.NotNil(newTask.DecisionTask)
Expand Down Expand Up @@ -921,7 +937,15 @@ func (s *elasticsearchIntegrationSuite) TestUpsertWorkflowExecution() {
s.True(verified)

// process 2nd decision and assert decision is handled correctly.
_, newTask, err = poller.PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(false, false, true, true, int64(0), 1, true)
_, newTask, err = poller.PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(
false,
false,
true,
true,
int64(0),
1,
true,
nil)
s.Nil(err)
s.NotNil(newTask)
s.NotNil(newTask.DecisionTask)
Expand Down
30 changes: 27 additions & 3 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,15 @@ func (s *integrationSuite) TestCompleteDecisionTaskAndCreateNewOne() {
T: s.T(),
}

_, newTask, err := poller.PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(false, false, true, true, int64(0), 1, true)
_, newTask, err := poller.PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(
false,
false,
true,
true,
int64(0),
1,
true,
nil)
s.Nil(err)
s.NotNil(newTask)
s.NotNil(newTask.DecisionTask)
Expand Down Expand Up @@ -2656,7 +2664,15 @@ func (s *integrationSuite) TestRelayDecisionTimeout() {
}

// First decision task complete with a marker decision, and request to relay decision (immediately return a new decision task)
_, newTask, err := poller.PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(false, false, false, false, 0, 3, true)
_, newTask, err := poller.PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(
false,
false,
false,
false,
0,
3,
true,
nil)
s.Logger.Info("PollAndProcessDecisionTask", tag.Error(err))
s.Nil(err)
s.NotNil(newTask)
Expand Down Expand Up @@ -3245,7 +3261,15 @@ func (s *integrationSuite) TestBufferedEventsOutOfOrder() {
}

// first decision, which will schedule an activity and add marker
_, task, err := poller.PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(true, false, false, false, int64(0), 1, true)
_, task, err := poller.PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(
true,
false,
false,
false,
int64(0),
1,
true,
nil)
s.Logger.Info("pollAndProcessDecisionTask", tag.Error(err))
s.Nil(err)

Expand Down
Loading

0 comments on commit 519b92b

Please sign in to comment.