Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker/native-image-musl/dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@ WORKDIR /opt
RUN ./install-musl.sh
ENV MUSL_HOME=/opt/musl-toolchain
ENV PATH="$MUSL_HOME/bin:$PATH"
# Verify installation
RUN x86_64-linux-musl-gcc --version
# Avoid errors like: "fatal: detected dubious ownership in repository"
RUN git config --global --add safe.directory '*'
4 changes: 2 additions & 2 deletions docker/native-image-musl/install-musl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ curl -O https://zlib.net/fossils/zlib-1.2.13.tar.gz

# Build musl from source
tar -xzvf musl-1.2.5.tar.gz
cd musl-1.2.5
cd musl-1.2.5 || exit
./configure --prefix=$MUSL_HOME --static
# The next operation may require privileged access to system resources, so use sudo
make && make install
Expand All @@ -22,7 +22,7 @@ x86_64-linux-musl-gcc --version

# Build zlib with musl from source and install into the MUSL_HOME directory
tar -xzvf zlib-1.2.13.tar.gz
cd zlib-1.2.13
cd zlib-1.2.13 || exit
CC=musl-gcc ./configure --prefix=$MUSL_HOME --static
make && make install
cd ..
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.uber.m3.tally.Scope;
import io.temporal.client.WorkflowClient;
import io.temporal.common.Experimental;
import io.temporal.nexus.NexusOperationInfo;

/**
* Can be used to intercept calls from a Nexus operation into the Temporal APIs.
Expand All @@ -20,6 +21,9 @@
*/
@Experimental
public interface NexusOperationOutboundCallsInterceptor {
/** Intercepts call to get the Nexus info in a Nexus operation. */
NexusOperationInfo getInfo();

/** Intercepts call to get the metric scope in a Nexus operation. */
Scope getMetricsScope();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.uber.m3.tally.Scope;
import io.temporal.client.WorkflowClient;
import io.temporal.common.Experimental;
import io.temporal.nexus.NexusOperationInfo;

/** Convenience base class for {@link NexusOperationOutboundCallsInterceptor} implementations. */
@Experimental
Expand All @@ -14,6 +15,11 @@ public NexusOperationOutboundCallsInterceptorBase(NexusOperationOutboundCallsInt
this.next = next;
}

@Override
public NexusOperationInfo getInfo() {
return next.getInfo();
}

@Override
public Scope getMetricsScope() {
return next.getMetricsScope();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.temporal.client.WorkflowClient;
import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor;
import io.temporal.nexus.NexusOperationContext;
import io.temporal.nexus.NexusOperationInfo;

public class InternalNexusOperationContext {
private final String namespace;
Expand Down Expand Up @@ -58,6 +59,11 @@ public Link getStartWorkflowResponseLink() {
}

private class NexusOperationContextImpl implements NexusOperationContext {
@Override
public NexusOperationInfo getInfo() {
return outboundCalls.getInfo();
}

@Override
public Scope getMetricsScope() {
return outboundCalls.getMetricsScope();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.temporal.internal.nexus;

import io.temporal.nexus.NexusOperationInfo;

class NexusInfoImpl implements NexusOperationInfo {
private final String namespace;
private final String taskQueue;

NexusInfoImpl(String namespace, String taskQueue) {
this.namespace = namespace;
this.taskQueue = taskQueue;
}

@Override
public String getNamespace() {
return namespace;
}

@Override
public String getTaskQueue() {
return taskQueue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,24 @@
import com.uber.m3.tally.Scope;
import io.temporal.client.WorkflowClient;
import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor;
import io.temporal.nexus.NexusOperationInfo;

public class RootNexusOperationOutboundCallsInterceptor
implements NexusOperationOutboundCallsInterceptor {
private final Scope scope;
private final WorkflowClient client;
private final NexusOperationInfo nexusInfo;

RootNexusOperationOutboundCallsInterceptor(Scope scope, WorkflowClient client) {
RootNexusOperationOutboundCallsInterceptor(
Scope scope, WorkflowClient client, NexusOperationInfo nexusInfo) {
this.scope = scope;
this.client = client;
this.nexusInfo = nexusInfo;
}

@Override
public NexusOperationInfo getInfo() {
return nexusInfo;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ public OperationHandler<Object, Object> intercept(
InternalNexusOperationContext temporalNexusContext = CurrentNexusOperationContext.get();
inboundCallsInterceptor.init(
new RootNexusOperationOutboundCallsInterceptor(
temporalNexusContext.getMetricsScope(), temporalNexusContext.getWorkflowClient()));
temporalNexusContext.getMetricsScope(),
temporalNexusContext.getWorkflowClient(),
new NexusInfoImpl(
temporalNexusContext.getNamespace(), temporalNexusContext.getTaskQueue())));
return new OperationInterceptorConverter(inboundCallsInterceptor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
*/
public interface NexusOperationContext {

/** Get Temporal information about the Nexus Operation. */
NexusOperationInfo getInfo();

/**
* Get scope for reporting business metrics in a nexus handler. This scope is tagged with the
* service and operation.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.temporal.nexus;

/**
* Temporal information about the Nexus Operation. Use {@link NexusOperationContext#getInfo()} from
* a Nexus Operation implementation to access.
*/
public interface NexusOperationInfo {
/**
* @return Namespace of the worker that is executing the Nexus Operation
*/
String getNamespace();

/**
* @return Nexus Task Queue of the worker that is executing the Nexus Operation
*/
String getTaskQueue();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.temporal.workflow.nexus;

import io.nexusrpc.handler.OperationHandler;
import io.nexusrpc.handler.OperationImpl;
import io.nexusrpc.handler.ServiceImpl;
import io.temporal.nexus.Nexus;
import io.temporal.nexus.NexusOperationInfo;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.*;
import io.temporal.workflow.shared.TestNexusServices;
import io.temporal.workflow.shared.TestWorkflows;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class NexusOperationInfoTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestNexus.class)
.setNexusServiceImplementation(new TestNexusServiceImpl())
.build();

@Test
public void testOperationHeaders() {
TestWorkflows.TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);
Assert.assertEquals(
"UnitTest:" + testWorkflowRule.getTaskQueue(),
workflowStub.execute(testWorkflowRule.getTaskQueue()));
}

public static class TestNexus implements TestWorkflows.TestWorkflow1 {
@Override
public String execute(String input) {
// Try to call with the typed stub
TestNexusServices.TestNexusService1 serviceStub =
Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class);
return serviceStub.operation(input);
}
}

@ServiceImpl(service = TestNexusServices.TestNexusService1.class)
public static class TestNexusServiceImpl {
@OperationImpl
public OperationHandler<String, String> operation() {
return OperationHandler.sync(
(context, details, input) -> {
NexusOperationInfo info = Nexus.getOperationContext().getInfo();
return info.getNamespace() + ":" + info.getTaskQueue();
});
}
}
}
Loading