Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.temporal.internal.client.external.GenericWorkflowClient;
import io.temporal.internal.common.HeaderUtils;
import io.temporal.internal.nexus.CurrentNexusOperationContext;
import io.temporal.internal.worker.WorkerVersioningProtoUtils;
import io.temporal.payload.context.WorkflowSerializationContext;
import io.temporal.serviceclient.StatusUtils;
import io.temporal.worker.WorkflowTaskDispatchHandle;
Expand Down Expand Up @@ -71,6 +72,11 @@ public WorkflowStartOutput start(WorkflowStartInput input) {
try (@Nullable WorkflowTaskDispatchHandle eagerDispatchHandle = obtainDispatchHandle(input)) {
boolean requestEagerExecution = eagerDispatchHandle != null;
startRequest.setRequestEagerExecution(requestEagerExecution);
if (requestEagerExecution && eagerDispatchHandle.getDeploymentOptions() != null) {
startRequest.setEagerWorkerDeploymentOptions(
WorkerVersioningProtoUtils.deploymentOptionsToProto(
eagerDispatchHandle.getDeploymentOptions()));
}
StartWorkflowExecutionResponse response = genericClient.start(startRequest.build());
WorkflowExecution execution =
WorkflowExecution.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ public WorkflowTaskDispatchHandle reserveWorkflowExecutor() {
}
},
slotSupplier,
slotPermit))
slotPermit,
options.getDeploymentOptions()))
.orElse(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,31 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class WorkflowTaskDispatchHandle implements Closeable {
private final AtomicBoolean completed = new AtomicBoolean();
private final Function<WorkflowTask, Boolean> dispatchCallback;
private final TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier;
private final SlotPermit permit;
private final WorkerDeploymentOptions deploymentOptions;

/**
* @param dispatchCallback callback into a {@code WorkflowWorker} to dispatch a workflow task.
* @param slotSupplier slot supplier that was used to reserve a slot for this workflow task
* @param permit the slot permit reserved for this workflow task
* @param deploymentOptions deployment options of the worker that reserved the slot, or null if
* not configured
*/
public WorkflowTaskDispatchHandle(
DispatchCallback dispatchCallback,
TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier,
SlotPermit permit) {
SlotPermit permit,
@Nullable WorkerDeploymentOptions deploymentOptions) {
this.dispatchCallback = dispatchCallback;
this.slotSupplier = slotSupplier;
this.permit = permit;
this.deploymentOptions = deploymentOptions;
}

/**
Expand All @@ -47,6 +54,14 @@ public boolean dispatch(@Nonnull PollWorkflowTaskQueueResponse workflowTask) {
}
}

/**
* @return deployment options of the worker that reserved the slot, or null if not configured
*/
@Nullable
public WorkerDeploymentOptions getDeploymentOptions() {
return deploymentOptions;
}

@Override
public void close() {
if (completed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@
import static org.junit.Assert.*;

import io.grpc.*;
import io.temporal.api.deployment.v1.WorkerDeploymentOptions;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.enums.v1.WorkerVersioningMode;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.workflowservice.v1.StartWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.VersioningBehavior;
import io.temporal.common.WorkerDeploymentVersion;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import io.temporal.testUtils.CountingSlotSupplier;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkerOptions;
import io.temporal.worker.*;
import io.temporal.worker.tuning.*;
import io.temporal.workflow.shared.TestWorkflows;
import java.util.ArrayList;
Expand Down Expand Up @@ -67,6 +69,14 @@ public void tearDown() throws Exception {

private WorkerFactory setupWorkerFactory(
String workerIdentity, boolean registerWorkflows, boolean start) {
return setupWorkerFactory(workerIdentity, registerWorkflows, start, null);
}

private WorkerFactory setupWorkerFactory(
String workerIdentity,
boolean registerWorkflows,
boolean start,
io.temporal.worker.WorkerDeploymentOptions deploymentOptions) {
WorkflowClient workflowClient =
WorkflowClient.newInstance(
testWorkflowRule.getWorkflowServiceStubs(),
Expand All @@ -86,6 +96,7 @@ private WorkerFactory setupWorkerFactory(
activityTaskSlotSupplier,
localActivitySlotSupplier,
nexusSlotSupplier))
.setDeploymentOptions(deploymentOptions)
.build());
if (registerWorkflows) {
worker.registerWorkflowImplementationTypes(EagerWorkflowTaskWorkflowImpl.class);
Expand Down Expand Up @@ -265,6 +276,96 @@ public void testNoEagerWFTIfDisabledOnWorkflowOptions() {
START_CALL_INTERCEPTOR.wasLastStartEager());
}

@Test
public void testDeploymentOptionsArePropagatedForVersionedWorker() {
io.temporal.worker.WorkerDeploymentOptions deploymentOptions =
io.temporal.worker.WorkerDeploymentOptions.newBuilder()
.setUseVersioning(true)
.setVersion(new WorkerDeploymentVersion("my-deployment", "build-id-123"))
.setDefaultVersioningBehavior(VersioningBehavior.PINNED)
.build();

WorkerFactory workerFactory = setupWorkerFactory("worker1", true, true, deploymentOptions);

TestWorkflows.NoArgsWorkflow workflowStub =
workerFactory
.getWorkflowClient()
.newWorkflowStub(
TestWorkflows.NoArgsWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setDisableEagerExecution(false)
.build());
workflowStub.execute();

assertTrue(START_CALL_INTERCEPTOR.wasLastStartEager());

WorkerDeploymentOptions capturedOptions = START_CALL_INTERCEPTOR.getLastDeploymentOptions();
assertNotNull(
"Deployment options should be present in StartWorkflowExecutionRequest", capturedOptions);
assertEquals("my-deployment", capturedOptions.getDeploymentName());
assertEquals("build-id-123", capturedOptions.getBuildId());
assertEquals(
WorkerVersioningMode.WORKER_VERSIONING_MODE_VERSIONED,
capturedOptions.getWorkerVersioningMode());
}

@Test
public void testDeploymentOptionsArePropagatedForUnversionedWorker() {
io.temporal.worker.WorkerDeploymentOptions deploymentOptions =
io.temporal.worker.WorkerDeploymentOptions.newBuilder()
.setUseVersioning(false)
.setVersion(new WorkerDeploymentVersion("my-deployment", "build-id-456"))
.build();

WorkerFactory workerFactory = setupWorkerFactory("worker1", true, true, deploymentOptions);

TestWorkflows.NoArgsWorkflow workflowStub =
workerFactory
.getWorkflowClient()
.newWorkflowStub(
TestWorkflows.NoArgsWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setDisableEagerExecution(false)
.build());
workflowStub.execute();

assertTrue(START_CALL_INTERCEPTOR.wasLastStartEager());

WorkerDeploymentOptions capturedOptions = START_CALL_INTERCEPTOR.getLastDeploymentOptions();
assertNotNull(
"Deployment options should be present in StartWorkflowExecutionRequest", capturedOptions);
assertEquals("my-deployment", capturedOptions.getDeploymentName());
assertEquals("build-id-456", capturedOptions.getBuildId());
assertEquals(
WorkerVersioningMode.WORKER_VERSIONING_MODE_UNVERSIONED,
capturedOptions.getWorkerVersioningMode());
}

@Test
public void testNoDeploymentOptionsWhenWorkerHasNone() {
WorkerFactory workerFactory = setupWorkerFactory("worker1", true, true, null);

TestWorkflows.NoArgsWorkflow workflowStub =
workerFactory
.getWorkflowClient()
.newWorkflowStub(
TestWorkflows.NoArgsWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setDisableEagerExecution(false)
.build());
workflowStub.execute();

assertTrue(START_CALL_INTERCEPTOR.wasLastStartEager());

WorkerDeploymentOptions capturedOptions = START_CALL_INTERCEPTOR.getLastDeploymentOptions();
assertNull(
"Deployment options should not be present when worker has no deployment options configured",
capturedOptions);
}

public static class EagerWorkflowTaskWorkflowImpl implements TestWorkflows.NoArgsWorkflow {
@Override
public void execute() {}
Expand All @@ -273,6 +374,7 @@ public void execute() {}
private static class StartCallInterceptor implements ClientInterceptor {

private Boolean wasLastStartEager;
private WorkerDeploymentOptions lastDeploymentOptions;

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
Expand All @@ -287,8 +389,13 @@ public Boolean wasLastStartEager() {
return wasLastStartEager;
}

public WorkerDeploymentOptions getLastDeploymentOptions() {
return lastDeploymentOptions;
}

public void clear() {
wasLastStartEager = null;
lastDeploymentOptions = null;
}

private final class EagerStartSniffingCall<ReqT, RespT>
Expand All @@ -302,6 +409,11 @@ private final class EagerStartSniffingCall<ReqT, RespT>
public void sendMessage(ReqT message) {
StartWorkflowExecutionRequest request = (StartWorkflowExecutionRequest) message;
wasLastStartEager = request.getRequestEagerExecution();
if (request.hasEagerWorkerDeploymentOptions()) {
lastDeploymentOptions = request.getEagerWorkerDeploymentOptions();
} else {
lastDeploymentOptions = null;
}
super.sendMessage(message);
}
}
Expand Down
Loading