Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ jobs:
--port 7233 \
--http-port 7243 \
--namespace UnitTest \
--db-filename temporal.sqlite \
--sqlite-pragma journal_mode=WAL \
--sqlite-pragma synchronous=OFF \
--search-attribute CustomKeywordField=Keyword \
--search-attribute CustomStringField=Text \
--search-attribute CustomTextField=Text \
Expand All @@ -96,6 +99,7 @@ jobs:
--dynamic-config-value frontend.workerVersioningRuleAPIs=true \
--dynamic-config-value worker.removableBuildIdDurationSinceDefault=true \
--dynamic-config-value matching.useNewMatcher=true \
--dynamic-config-value system.refreshNexusEndpointsMinWait=1000 \
--dynamic-config-value component.callbacks.allowedAddresses='[{"Pattern":"*","AllowInsecure":true}]' \
--dynamic-config-value frontend.workerVersioningWorkflowAPIs=true &
sleep 10s
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.client.functional;

import static org.junit.Assert.assertEquals;

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.WorkflowExecutionHistory;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions.*;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.Rule;
import org.junit.Test;

public class StartDelayTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestNoArgsWorkflowsFuncImpl.class)
.setUseTimeskipping(false)
.build();

@Test
public void startWithDelay() {
WorkflowOptions workflowOptions =
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
.setStartDelay(Duration.ofSeconds(1))
.build();
TestNoArgsWorkflowFunc stubF =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(TestNoArgsWorkflowFunc.class, workflowOptions);
long start = System.currentTimeMillis();
stubF.func();
long end = System.currentTimeMillis();
// Assert that the workflow took at least 5 seconds to start
assertEquals(1000, end - start, 500);
WorkflowExecution workflowExecution = WorkflowStub.fromTyped(stubF).getExecution();
WorkflowExecutionHistory workflowExecutionHistory =
testWorkflowRule.getWorkflowClient().fetchHistory(workflowExecution.getWorkflowId());
List<WorkflowExecutionStartedEventAttributes> workflowExecutionStartedEvents =
workflowExecutionHistory.getEvents().stream()
.filter(HistoryEvent::hasWorkflowExecutionStartedEventAttributes)
.map(x -> x.getWorkflowExecutionStartedEventAttributes())
.collect(Collectors.toList());
assertEquals(1, workflowExecutionStartedEvents.size());
assertEquals(
Duration.ofSeconds(1),
ProtobufTimeUtils.toJavaDuration(
workflowExecutionStartedEvents.get(0).getFirstWorkflowTaskBackoff()));
}

public static class TestNoArgsWorkflowsFuncImpl implements TestNoArgsWorkflowFunc {

@Override
public String func() {

return "done";
}

@Override
public String update() {
throw new UnsupportedOperationException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,24 @@

import static org.junit.Assert.assertEquals;

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
import io.temporal.client.WorkflowException;
import io.temporal.client.WorkflowStub;
import io.temporal.common.RetryOptions;
import io.temporal.common.WorkflowExecutionHistory;
import io.temporal.failure.ApplicationFailure;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -77,6 +85,20 @@ public void testWorkflowRetry() {
long elapsed = testWorkflowRule.getTestEnvironment().currentTimeMillis() - start;
Assert.assertTrue(
String.valueOf(elapsed), elapsed >= 2000); // Ensure that retry delays the restart
// Verify that the first workflow task backoff is set to 1 second
WorkflowExecution workflowExecution = WorkflowStub.fromTyped(workflowStub).getExecution();
WorkflowExecutionHistory workflowExecutionHistory =
testWorkflowRule.getWorkflowClient().fetchHistory(workflowExecution.getWorkflowId());
List<WorkflowExecutionStartedEventAttributes> workflowExecutionStartedEvents =
workflowExecutionHistory.getEvents().stream()
.filter(HistoryEvent::hasWorkflowExecutionStartedEventAttributes)
.map(x -> x.getWorkflowExecutionStartedEventAttributes())
.collect(Collectors.toList());
assertEquals(1, workflowExecutionStartedEvents.size());
assertEquals(
Duration.ofSeconds(1),
ProtobufTimeUtils.toJavaDuration(
workflowExecutionStartedEvents.get(0).getFirstWorkflowTaskBackoff()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1313,14 +1313,16 @@ private static void startWorkflow(
}
data.retryState.ifPresent(
testServiceRetryState -> a.setAttempt(testServiceRetryState.getAttempt()));

a.setFirstExecutionRunId(data.firstExecutionRunId);
a.setOriginalExecutionRunId(data.originalExecutionRunId);
data.continuedExecutionRunId.ifPresent(a::setContinuedExecutionRunId);
if (data.lastCompletionResult != null) {
a.setLastCompletionResult(data.lastCompletionResult);
}
if (request.hasWorkflowStartDelay()) {
a.setFirstWorkflowTaskBackoff(request.getWorkflowStartDelay());

if (data.backoffStartInterval != null) {
a.setFirstWorkflowTaskBackoff(data.backoffStartInterval);
}
data.lastFailure.ifPresent(a::setContinuedFailure);
if (request.hasMemo()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1525,9 +1525,9 @@ private void processFailWorkflowExecution(
continueAsNewAttr.setMemo(startRequest.getMemo());
}
// TODO
ContinueAsNewWorkflowExecutionCommandAttributes coninueAsNewCommand =
ContinueAsNewWorkflowExecutionCommandAttributes continueAsNewCommand =
continueAsNewAttr.build();
workflow.action(Action.CONTINUE_AS_NEW, ctx, coninueAsNewCommand, workflowTaskCompletedId);
workflow.action(Action.CONTINUE_AS_NEW, ctx, continueAsNewCommand, workflowTaskCompletedId);
workflowTaskStateMachine.getData().workflowCompleted = true;
HistoryEvent event = ctx.getEvents().get(ctx.getEvents().size() - 1);
WorkflowExecutionContinuedAsNewEventAttributes continuedAsNewEventAttributes =
Expand All @@ -1537,7 +1537,7 @@ private void processFailWorkflowExecution(
Optional.of(rs.getNextAttempt(Optional.of(failure)));
service.continueAsNew(
startRequest,
coninueAsNewCommand,
continueAsNewCommand,
continuedAsNewEventAttributes,
continuedRetryState,
identity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,16 @@ public void startWorkflowExecution(
StartWorkflowExecutionRequest request,
StreamObserver<StartWorkflowExecutionResponse> responseObserver) {
try {
if (!request.getCronSchedule().isEmpty() && request.hasWorkflowStartDelay()) {
throw Status.INVALID_ARGUMENT
.withDescription(
"INVALID_ARGUMENT: CronSchedule and WorkflowStartDelay may not be used together.")
.asRuntimeException();
}
Duration backoffInterval = getBackoffInterval(request.getCronSchedule(), store.currentTime());
if (request.hasWorkflowStartDelay()) {
backoffInterval = ProtobufTimeUtils.toJavaDuration(request.getWorkflowStartDelay());
}
StartWorkflowExecutionResponse response =
startWorkflowExecutionImpl(
request, backoffInterval, Optional.empty(), OptionalLong.empty(), null);
Expand Down Expand Up @@ -1453,8 +1462,10 @@ public void signalWithStartWorkflowExecution(
if (r.hasSearchAttributes()) {
startRequest.setSearchAttributes(r.getSearchAttributes());
}
Duration backoffInterval = Duration.ZERO;
if (r.hasWorkflowStartDelay()) {
startRequest.setWorkflowStartDelay(r.getWorkflowStartDelay());
backoffInterval = ProtobufTimeUtils.toJavaDuration(r.getWorkflowStartDelay());
}
if (!r.getLinksList().isEmpty()) {
startRequest.addAllLinks(r.getLinksList());
Expand All @@ -1463,7 +1474,7 @@ public void signalWithStartWorkflowExecution(
StartWorkflowExecutionResponse startResult =
startWorkflowExecutionImpl(
startRequest.build(),
Duration.ZERO,
backoffInterval,
Optional.empty(),
OptionalLong.empty(),
ms -> {
Expand Down
Loading