Skip to content

Provide SerializationContext for PayloadConverter and PayloadCodec #1695

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ plugins {
id 'org.cadixdev.licenser' version '0.6.1'
id 'com.palantir.git-version' version "${palantirGitVersionVersion}" apply false
id 'io.github.gradle-nexus.publish-plugin' version '1.2.0'
id 'com.diffplug.spotless' version '6.15.0' apply false
id 'com.diffplug.spotless' version '6.17.0' apply false
id 'com.github.nbaztec.coveralls-jacoco' version "1.2.15" apply false

// id 'org.jetbrains.kotlin.jvm' version '1.4.32'
Expand Down
7 changes: 3 additions & 4 deletions docs/AOT-native-image.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,19 @@ Temporal JavaSDK dependencies like gRPC and Protobuf use reflection and some of

## [native-image](https://www.graalvm.org/reference-manual/native-image/)

Temporal JavaSDK and Test Server support and can be used with GraalVM `native-image`.
Temporal JavaSDK and Test Server support GraalVM `native-image`.

Temporal JavaSDK team does its best effort to maintain as complete `native-image` descriptors as technically possible for the following modules:
`temporal-sdk`, `temporal-testing`, `temporal-serviceclient`, `temporal-test-server`.
These modules are distributed with the embedded `native-image` descriptors.
While these descriptors provide enough information for `native-image` to build and run a fully functional Temporal Test Server,
any user building an application with Temporal JavaSDK will require an additional configuration for the reasons mentioned above: user interfaces are used by the JavaSDK for proxy creation, and they need to know in advance for `native-image`.

`native-image` build can be configured with respect to JNI, java proxying, reflection, etc
by providing [JSON configuration descriptors](https://www.graalvm.org/22.3/reference-manual/native-image/metadata/#specifying-metadata-with-json)
which help `native-image` during building of the native execution file.
which helps `native-image` during building of the native execution file.
This can be done manually, but it's labor-intensive and requires good understanding of `native-image` build [process](https://www.graalvm.org/22.3/reference-manual/native-image/basics/) and [configuration](https://www.graalvm.org/22.3/reference-manual/native-image/overview/Build-Overview/).

Instead we recommend users to run their JVM application along with
Instead, we recommend users run their JVM application along with
[the native-image Tracing agent](https://www.graalvm.org/22.3/reference-manual/native-image/metadata/AutomaticMetadataCollection/).
For example, the agent can be run with the full set of integration tests of the app to cover the largest variety of code paths.
This agent will automatically generate additional descriptor files that users should [place and retain with their project's source code](https://www.graalvm.org/22.3/reference-manual/native-image/overview/BuildConfiguration/#embed-a-configuration-file) under `META-INF/native-image`.
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
3 changes: 2 additions & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.1-bin.zip
networkTimeout=10000
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
12 changes: 8 additions & 4 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
# Darwin, MinGW, and NonStop.
#
# (3) This script is generated from the Groovy template
# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# within the Gradle project.
#
# You can find Gradle at https://github.com/gradle/gradle/.
Expand All @@ -80,10 +80,10 @@ do
esac
done

APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit

APP_NAME="Gradle"
# This is normally unused
# shellcheck disable=SC2034
APP_BASE_NAME=${0##*/}
APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit

# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
Expand Down Expand Up @@ -143,12 +143,16 @@ fi
if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
case $MAX_FD in #(
max*)
# In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
# shellcheck disable=SC3045
MAX_FD=$( ulimit -H -n ) ||
warn "Could not query maximum file descriptor limit"
esac
case $MAX_FD in #(
'' | soft) :;; #(
*)
# In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
# shellcheck disable=SC3045
ulimit -n "$MAX_FD" ||
warn "Could not set maximum file descriptor limit to $MAX_FD"
esac
Expand Down
1 change: 1 addition & 0 deletions gradlew.bat
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ if "%OS%"=="Windows_NT" setlocal

set DIRNAME=%~dp0
if "%DIRNAME%"=="" set DIRNAME=.
@rem This is normally unused
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%

Expand Down
2 changes: 1 addition & 1 deletion temporal-kotlin/build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
id 'org.jlleitschuh.gradle.ktlint' version '11.2.0'
id 'org.jlleitschuh.gradle.ktlint' version '11.3.1'
}

apply plugin: 'org.jetbrains.kotlin.jvm'
Expand Down
2 changes: 1 addition & 1 deletion temporal-remote-data-encoder/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ dependencies {

// Jetty 10+ brings a non-production ready slf4j that doesn't work with released logback.
// It also require Java 11+. That's why we stay on Jetty 9. It's for tests only anyway.
testImplementation(platform("org.eclipse.jetty:jetty-bom:9.4.50.v20221201"))
testImplementation(platform("org.eclipse.jetty:jetty-bom:9.4.51.v20230217"))
testImplementation ("org.eclipse.jetty:jetty-server")
testImplementation ("org.eclipse.jetty:jetty-servlet")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,33 @@ public interface ActivityInfo {
String getWorkflowType();

/**
* @return the Namespace of Workflow Execution that executed the Activity.
* Note: At some moment Temporal had built-in support for scheduling activities on a different
* namespace than the original workflow. Currently, Workflows can schedule activities only on the
* same namespace, hence no need for different {@code getWorkflowNamespace()} and {@link
* #getActivityNamespace()} methods.
*
* @return the Namespace of Workflow Execution that scheduled the Activity.
* @deprecated use {@link #getNamespace()}
*/
@Deprecated
String getWorkflowNamespace();

/**
* @return the Namespace of the Activity Execution.
* Note: At some moment Temporal had built-in support for scheduling activities on a different
* namespace than the original workflow. Currently, Workflows can schedule activities only on the
* same namespace, hence no need for different {@link #getWorkflowNamespace()} and {@code
* getActivityNamespace()} methods.
*
* @return the Namespace of this Activity Execution.
* @deprecated use {@link #getNamespace()}
*/
@Deprecated
String getActivityNamespace();

String getNamespace();

String getActivityTaskQueue();

/**
* Gets the current Activity Execution attempt count. Attempt counts start at 1 and increment on
* each Activity Task Execution retry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public Builder setHeartbeatTimeout(Duration heartbeatTimeoutSeconds) {

/**
* Task queue to use when dispatching activity task to a worker. By default, it is the same task
* list name the workflow was started with.
* list name the workflow was started with. Default is used if set to {@code null}.
*/
public Builder setTaskQueue(String taskQueue) {
this.taskQueue = taskQueue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@
package io.temporal.client;

import io.temporal.activity.ActivityExecutionContext;
import io.temporal.common.Experimental;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.PayloadConverter;
import io.temporal.payload.codec.PayloadCodec;
import io.temporal.payload.context.ActivitySerializationContext;
import io.temporal.payload.context.SerializationContext;
import java.util.Optional;
import javax.annotation.Nonnull;

/**
* Used to complete asynchronously activities that called {@link
Expand Down Expand Up @@ -55,4 +62,19 @@ <V> void reportCancellation(
*/
<V> void heartbeat(String workflowId, Optional<String> runId, String activityId, V details)
throws ActivityCompletionException;

/**
* Supply this context if correct serialization of activity heartbeats, results or other payloads
* requires {@link DataConverter}, {@link PayloadConverter} or {@link PayloadCodec} to be aware of
* {@link ActivitySerializationContext}.
*
* @param context provides information to the data converter about the abstraction the data
* belongs to
* @return an instance of DataConverter that may use the provided {@code context} for
* serialization
* @see SerializationContext
*/
@Experimental
@Nonnull
ActivityCompletionClient withContext(@Nonnull ActivitySerializationContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,34 @@
import com.uber.m3.tally.Scope;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;
import io.temporal.payload.context.ActivitySerializationContext;
import io.temporal.workflow.Functions;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

class ActivityCompletionClientImpl implements ActivityCompletionClient {

private final ManualActivityCompletionClientFactory factory;
private final Functions.Proc completionHandle;

private final Scope metricsScope;
private final @Nullable ActivitySerializationContext serializationContext;

public ActivityCompletionClientImpl(
ActivityCompletionClientImpl(
ManualActivityCompletionClientFactory manualActivityCompletionClientFactory,
Functions.Proc completionHandle,
Scope metricsScope) {
Scope metricsScope,
@Nullable ActivitySerializationContext serializationContext) {
this.factory = manualActivityCompletionClientFactory;
this.completionHandle = completionHandle;
this.metricsScope = metricsScope;
this.serializationContext = serializationContext;
}

@Override
public <R> void complete(byte[] taskToken, R result) {
try {
factory.getClient(taskToken, metricsScope).complete(result);
factory.getClient(taskToken, metricsScope, serializationContext).complete(result);
} finally {
completionHandle.apply();
}
Expand All @@ -54,7 +59,9 @@ public <R> void complete(byte[] taskToken, R result) {
@Override
public <R> void complete(String workflowId, Optional<String> runId, String activityId, R result) {
try {
factory.getClient(toExecution(workflowId, runId), activityId, metricsScope).complete(result);
factory
.getClient(toExecution(workflowId, runId), activityId, metricsScope, serializationContext)
.complete(result);
} finally {
completionHandle.apply();
}
Expand All @@ -63,7 +70,7 @@ public <R> void complete(String workflowId, Optional<String> runId, String activ
@Override
public void completeExceptionally(byte[] taskToken, Exception result) {
try {
factory.getClient(taskToken, metricsScope).fail(result);
factory.getClient(taskToken, metricsScope, serializationContext).fail(result);
} finally {
completionHandle.apply();
}
Expand All @@ -73,7 +80,9 @@ public void completeExceptionally(byte[] taskToken, Exception result) {
public void completeExceptionally(
String workflowId, Optional<String> runId, String activityId, Exception result) {
try {
factory.getClient(toExecution(workflowId, runId), activityId, metricsScope).fail(result);
factory
.getClient(toExecution(workflowId, runId), activityId, metricsScope, serializationContext)
.fail(result);
} finally {
completionHandle.apply();
}
Expand All @@ -82,7 +91,7 @@ public void completeExceptionally(
@Override
public <V> void reportCancellation(byte[] taskToken, V details) {
try {
factory.getClient(taskToken, metricsScope).reportCancellation(details);
factory.getClient(taskToken, metricsScope, serializationContext).reportCancellation(details);
} finally {
completionHandle.apply();
}
Expand All @@ -93,7 +102,7 @@ public <V> void reportCancellation(
String workflowId, Optional<String> runId, String activityId, V details) {
try {
factory
.getClient(toExecution(workflowId, runId), activityId, metricsScope)
.getClient(toExecution(workflowId, runId), activityId, metricsScope, serializationContext)
.reportCancellation(details);
} finally {
completionHandle.apply();
Expand All @@ -109,10 +118,16 @@ public <V> void heartbeat(byte[] taskToken, V details) throws ActivityCompletion
public <V> void heartbeat(String workflowId, Optional<String> runId, String activityId, V details)
throws ActivityCompletionException {
factory
.getClient(toExecution(workflowId, runId), activityId, metricsScope)
.getClient(toExecution(workflowId, runId), activityId, metricsScope, serializationContext)
.recordHeartbeat(details);
}

@Nonnull
@Override
public ActivityCompletionClient withContext(@Nonnull ActivitySerializationContext context) {
return new ActivityCompletionClientImpl(factory, completionHandle, metricsScope, context);
}

private static WorkflowExecution toExecution(String workflowId, Optional<String> runId) {
return WorkflowExecution.newBuilder()
.setWorkflowId(workflowId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public ActivityCompletionClient newActivityCompletionClient() {
ActivityCompletionClient result =
WorkflowThreadMarker.protectFromWorkflowThread(
new ActivityCompletionClientImpl(
manualActivityCompletionClientFactory, () -> {}, metricsScope),
manualActivityCompletionClientFactory, () -> {}, metricsScope, null),
ActivityCompletionClient.class);
for (WorkflowClientInterceptor i : interceptors) {
result = i.newActivityCompletionClient(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@
*/
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE})
@Target({ElementType.FIELD, ElementType.TYPE, ElementType.METHOD})
public @interface Experimental {}
Loading