Implement GetWorkerTaskReachability API #4346
bergundy
commented
May 16, 2023
- Closes [Worker Versioning] Implement API for UI / CLI to know which workers can be retired #4162
- Closes Worker Versioning: Add visibility tags (and possibly API) to enable worker retirement discovery #3303
response, err := wh.getWorkerTaskReachabilityValidated(ctx, ns, request) | ||
if err != nil { | ||
wh.logger.Error("Failed getting worker task reachability", tag.Error(err)) | ||
return nil, serviceerror.NewInternal("Internal error") |
I think this is desirable and we should avoid leaking error details to the caller.
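To illustrate the pattern discussed in this thread, here is a minimal sketch (not the PR's code) that logs the detailed error on the server side and hands the caller only a generic one. `handleRequest`, `errInternal`, `errBackend`, and the use of the standard `log` package are assumptions made for the example; the PR itself uses `wh.logger` and `serviceerror.NewInternal`.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// errInternal is the only error the caller ever sees (hypothetical name).
var errInternal = errors.New("Internal error")

// errBackend stands in for a detailed failure coming from a lower layer.
var errBackend = errors.New("visibility store timeout on shard 7")

// handleRequest runs op, logs the detailed error if there is one,
// and returns a generic error so no internal details leak to the caller.
func handleRequest(op func() (string, error)) (string, error) {
	resp, err := op()
	if err != nil {
		log.Printf("Failed getting worker task reachability: %v", err)
		return "", errInternal
	}
	return resp, nil
}

func main() {
	_, err := handleRequest(func() (string, error) { return "", errBackend })
	fmt.Println(err) // Internal error
}
```

The trade-off is that callers cannot distinguish failure causes, so the server log becomes the only place to debug from.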
Query: query.str, | ||
} | ||
|
||
// TODO: is count more efficient than select with page size of 1? |
I think select with limit of one is more efficient, not sure... @rodrigozhou or @alexshtin could verify.
very possible the count is more efficient, esp. if it can be approximate. databases often keep summary stats to aid the query planner and the count might be able to be answered from those without touching any data at all
) | ||
|
||
// Little helper to concurrently map a function over input and fail fast on error. | ||
func raceMap[IN any, OUT any](input []IN, mapper func(IN) (OUT, error)) ([]OUT, error) { |
Not sure if this is idiomatic but I found myself repeating this pattern a few times in this file and ended up refactoring to use this little helper. Maybe we have something similar in the codebase or a shared place we can put this.
I put little generic stuff like this in common/util/util.go
_, err := mdb.conn.ExecContext(ctx, query, params...) | ||
if err == nil { | ||
// TODO(bergundy) | ||
panic("this should be properly tested once we support deletion") |
Intentional, I started implementing it but realized I can't test it properly and decided to leave this panic as a reminder that this has to be tested.
I'd say just replace "this" with something slightly more specific
Sure, I didn't think it'd matter much because it's a placeholder for when we do implement deletions.
just a few high-level comments until the new version
…ersioning-build-id-search-attribute
This LGTM but I'm no DB expert
// Do not set this to a value higher than 255 for clusters using SQL based persistence due to predefined VARCHAR | ||
// column width. | ||
WorkerBuildIdSizeLimit = "limit.workerBuildIdSize" |
It'd be good if we just enforced this limit at startup
Hmm.. but this is dynamic config that technically can change at any time.
I do get the point though, I don't know if we support dynamic config validation.
I can check.
No
@@ -64,6 +64,25 @@ type ( | |||
DataEncoding string | |||
} | |||
|
|||
AddBuildIdToTaskQueueMapping struct { |
Should be plural?
Hmmm... yeah...
My point was that you're adding to the mapping of build id to task queue.
But if it doesn't read right to you, I can change (in a followup PR because I have a couple stacked on top of this).
maybe AddToBuildIdToTaskQueueMapping? that reads okay to me, though a little verbose
like
BuildIds []string | ||
} | ||
|
||
RemoveBuildIdToTaskQueueMapping struct { |
Also should be plural?
RemoveBuildIdToTaskQueueMapping struct { | |
RemoveFromBuildIdToTaskQueueMapping struct { |
like
|
||
// MapConcurrent concurrently maps a function over input and fails fast on error. | ||
func MapConcurrent[IN any, OUT any](input []IN, mapper func(IN) (OUT, error)) ([]OUT, error) { | ||
errorsCh := make(chan error, len(input)) |
You could have this be only 1 size by using select both where you put in it and where you take out of it, to avoid writing a bunch of nils on non-error cases. Not exactly a huge deal tho.
I don't want to block the goroutines when the receiver exits after the first error.
That's what I mean by using a select on where you take out as well, you can make that nonblocking too.
I would need to use select where I send the results.
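For readers following this thread, here is a minimal sketch of the buffered-channel approach being defended: one buffer slot per input, so every goroutine can complete its send even after the receiver has returned on the first error. It is an illustration under assumptions, not the body of `MapConcurrent` from the PR; `mapConcurrentSketch`, `result`, and `errBoom` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a sample failure used only by this example.
var errBoom = errors.New("boom")

// result carries one mapper outcome together with its input position.
type result[OUT any] struct {
	index int
	value OUT
	err   error
}

// mapConcurrentSketch runs mapper on every element in its own goroutine
// and returns as soon as the first error is received.
func mapConcurrentSketch[IN any, OUT any](input []IN, mapper func(IN) (OUT, error)) ([]OUT, error) {
	// One buffer slot per input: sends never block, even if nobody is receiving anymore.
	results := make(chan result[OUT], len(input))
	for i, in := range input {
		go func(i int, in IN) {
			out, err := mapper(in)
			results <- result[OUT]{index: i, value: out, err: err}
		}(i, in)
	}
	outputs := make([]OUT, len(input))
	for range input {
		r := <-results
		if r.err != nil {
			return nil, r.err // fail fast; remaining goroutines finish into the buffer
		}
		outputs[r.index] = r.value
	}
	return outputs, nil
}

func main() {
	doubled, err := mapConcurrentSketch([]int{1, 2, 3}, func(i int) (int, error) { return i * 2, nil })
	fmt.Println(doubled, err) // [2 4 6] <nil>

	_, err = mapConcurrentSketch([]int{1, 2, 3}, func(i int) (int, error) {
		if i == 2 {
			return 0, errBoom
		}
		return i, nil
	})
	fmt.Println(err) // boom
}
```

The size-1 alternative suggested above would need a non-blocking `select` on the sending side as well, which is the point made in the last reply.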
build_id_if_row_is_an_index text, -- If this row is used as a mapping of build id to task queue, this will not be empty | ||
data blob, -- temporal.server.api.persistence.v1.TaskQueueUserData | ||
data_encoding text, -- Encoding type used for serialization, in practice this should always be proto3 | ||
build_ids set<text>, -- All active build ids in all version sets on this task queue (used in an index below) |
What does "active" mean? Not deleted? Would be good to clarify
Yes, active is "not deleted", I can clarify.
} | ||
} | ||
if gotUnversionedRequest && len(request.GetTaskQueues()) == 0 { | ||
return nil, serviceerror.NewInvalidArgument("Cannot get reachability of an unversioned worker without specifying at least one task queue") |
Probably worth mentioning here that we interpret empty string as unversioned
Sure, couldn't hurt
_, err = s.engine.UpdateWorkerBuildIdCompatibility(ctx, &workflowservice.UpdateWorkerBuildIdCompatibilityRequest{ | ||
Namespace: s.namespace, | ||
TaskQueue: tq1, | ||
Operation: &workflowservice.UpdateWorkerBuildIdCompatibilityRequest_AddNewCompatibleBuildId{ | ||
AddNewCompatibleBuildId: &workflowservice.UpdateWorkerBuildIdCompatibilityRequest_AddNewCompatibleVersion{ | ||
ExistingCompatibleBuildId: v0, | ||
NewBuildId: v01, | ||
}, | ||
}, | ||
}) | ||
s.Require().NoError(err) |
nit: This test reads slightly more obviously if this update happens after the first one so it's both tq1 ops, then tq2, then tq3
SGTM.
Thanks @Sushisource I'll address your comments after I merge my stacked PRs.
…ersioning-redirect-writes
Route task queue user data writes via a single matching node per namespace
* Merged David's recent changes
* Added reachability tests and search attribute tests
* Search attribute is now propagated from source version stamp
* Search attribute has versioned|unversioned prefix to give more accurate reachability results for unversioned workers
* Merged stacked PR #2
…ersioning-build-id-search-attribute
* Only put messages in the ns replication queue for global namespaces
* Add BuildIdBasedVersioning to list of supported capabilities
* Address Spencer's comments on #4346
@@ -184,6 +184,17 @@ func makeGetMatchingClient(reqType reflect.Type) string { | |||
|
|||
var tqPath string | |||
switch t.Name() { | |||
case "GetBuildIdTaskQueueMappingRequest": | |||
// Pick a random node for this request, it's not associated with a specific task queue. | |||
tqPath = "&taskqueuepb.TaskQueue{Name: fmt.Sprintf(\"not-applicable-%%s\", rand.Int())}" |
it's not being double-formatted, this should work
tqPath = "&taskqueuepb.TaskQueue{Name: fmt.Sprintf(\"not-applicable-%%s\", rand.Int())}" | |
tqPath = "&taskqueuepb.TaskQueue{Name: fmt.Sprintf(\"not-applicable-%s\", rand.Int())}" |
Right
} | ||
|
||
if err := iter.Close(); err != nil { | ||
return nil, serviceerror.NewUnavailable(fmt.Sprintf("GetTaskQueuesByBuildId operation failed. Error: %v", err)) |
we should add a wrapped error field to NewUnavailable since this comes up a lot. not now, though
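One possible shape for that follow-up, sketched with the standard library only: an error type that keeps its cause and exposes it through `Unwrap`, so callers can use `errors.Is` and `errors.As`. `unavailableError`, `newUnavailable`, `causedBy`, and `errIterClose` are hypothetical names; this is not the `serviceerror` API.

```go
package main

import (
	"errors"
	"fmt"
)

// errIterClose stands in for the error returned by iter.Close().
var errIterClose = errors.New("iterator close failed")

// unavailableError is a hypothetical service error that remembers its cause.
type unavailableError struct {
	msg   string
	cause error
}

func (e *unavailableError) Error() string { return e.msg }

// Unwrap lets errors.Is and errors.As see through to the cause.
func (e *unavailableError) Unwrap() error { return e.cause }

// newUnavailable builds the wrapped error from an operation name and a cause.
func newUnavailable(op string, cause error) error {
	return &unavailableError{
		msg:   fmt.Sprintf("%s operation failed. Error: %v", op, cause),
		cause: cause,
	}
}

// causedBy reports whether target appears anywhere in err's chain.
func causedBy(err, target error) bool { return errors.Is(err, target) }

func main() {
	err := newUnavailable("GetTaskQueuesByBuildId", errIterClose)
	fmt.Println(err)                         // GetTaskQueuesByBuildId operation failed. Error: iterator close failed
	fmt.Println(causedBy(err, errIterClose)) // true
}
```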
BuildID string | ||
} | ||
|
||
CountTaskQueuesByBuildIdRequest = GetTaskQueuesByBuildIdRequest |
make a separate type, not an alias
I will, just curious why
otherwise you could pass a value of one type as the other? I mean, yeah, it's not like it's a big problem, they are the same in some sense, it's just weird to be able to pass a *CountTaskQueuesByBuildIdRequest to GetTaskQueuesByBuildId
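The difference can be shown in a few lines. This is a sketch with hypothetical, simplified types (`getRequest`, `countAlias`, `countRequest`), not the PR's structs: an alias (`type A = B`) is the same type and is accepted anywhere the original is, while a defined type (`type A B`) needs an explicit conversion.

```go
package main

import "fmt"

// getRequest stands in for a request struct (hypothetical, simplified).
type getRequest struct {
	BuildID string
}

// countAlias is an alias: it is exactly the same type as getRequest.
type countAlias = getRequest

// countRequest is a defined type: same fields, distinct identity.
type countRequest getRequest

func get(r *getRequest) string { return "get:" + r.BuildID }

func main() {
	a := &countAlias{BuildID: "v1"}
	fmt.Println(get(a)) // compiles silently, which is the surprise described above

	c := &countRequest{BuildID: "v1"}
	// get(c) would be a compile error: *countRequest is not *getRequest.
	fmt.Println(get((*getRequest)(c))) // an explicit conversion is required
}
```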
statusFilter := "" | ||
switch reachabilityType { | ||
case enumspb.TASK_REACHABILITY_OPEN_WORKFLOWS: | ||
statusFilter = " AND ExecutionStatus = \"Running\"" |
statusFilter = " AND ExecutionStatus = \"Running\"" | |
statusFilter = ` AND ExecutionStatus = "Running"` |
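The suggestion is purely cosmetic: an interpreted string with escaped quotes and a raw string literal produce the same value. A quick check, using hypothetical constant names:

```go
package main

import "fmt"

const (
	// escaped uses an interpreted string literal with backslash-escaped quotes.
	escaped = " AND ExecutionStatus = \"Running\""
	// raw uses a raw string literal, so the quotes need no escaping.
	raw = ` AND ExecutionStatus = "Running"`
)

func main() {
	fmt.Println(escaped == raw) // true
}
```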
req := manager.CountWorkflowExecutionsRequest{ | ||
NamespaceID: ns.ID(), | ||
Namespace: ns.Name(), | ||
Query: fmt.Sprintf("TaskQueue = %q AND %s%s", taskQueue, buildIdsFilter, statusFilter), |
use constant for search attribute name
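A minimal sketch of what that could look like, assuming a package-level constant for the attribute name. `searchAttributeTaskQueue` and `buildQuery` are hypothetical; the codebase may already define a suitable constant elsewhere.

```go
package main

import "fmt"

// searchAttributeTaskQueue names the search attribute once (hypothetical constant).
const searchAttributeTaskQueue = "TaskQueue"

// buildQuery assembles a visibility query; %q quotes and escapes the task queue name.
func buildQuery(taskQueue, buildIdsFilter, statusFilter string) string {
	return fmt.Sprintf("%s = %q AND %s%s", searchAttributeTaskQueue, taskQueue, buildIdsFilter, statusFilter)
}

func main() {
	q := buildQuery("tq1", `BuildIds = "v1"`, ` AND ExecutionStatus = "Running"`)
	fmt.Println(q) // TaskQueue = "tq1" AND BuildIds = "v1" AND ExecutionStatus = "Running"
}
```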
@@ -115,6 +122,10 @@ type ( | |||
timeSource clock.TimeSource | |||
// Only set if global namespaces are enabled on the cluster. | |||
namespaceReplicationQueue persistence.NamespaceReplicationQueue | |||
// Disables concurrent task queue user data updates and replication requests (due to a cassandra limitation) | |||
namespaceUpdateLockMap map[string]*namespaceUpdateLocks |
consider using common/locks.IDMutex here. it takes some of the boilerplate out of this pattern. you can use separate keys for the update + replication locks
I think this is fine TBH
// Force unloading a task queue. Used for testing only. | ||
rpc ForceUnloadTaskQueue (ForceUnloadTaskQueueRequest) returns (ForceUnloadTaskQueueResponse) {} | ||
|
||
// Update task queue user data in owning node for all updates in namespace. |
this sounds too tempting to use when you want to update user data. can we make the comment very explicit that these two functions should only be called by the matching implementation itself and that external callers like frontend should use UpdateWorkerBuildIdCompatibility (or some other function for other fields)?
it might even be good if the name starts with Internal
Can change to Internal
@@ -488,29 +489,25 @@ func (c *taskQueueManagerImpl) GetUserData(ctx context.Context) (*persistencespb | |||
} | |||
|
|||
//nolint:revive // control coupling |
remove this now that lint won't complain?
Note: This commit came from a feature branch and is not expected to build.