Skip to content

thread-safe invocation chain implementation #640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.logging.*;
import javax.annotation.*;

import com.microsoft.azure.functions.worker.chain.InvocationChain;
import io.grpc.*;
import io.grpc.stub.*;

Expand Down Expand Up @@ -35,8 +36,8 @@ public JavaWorkerClient(IApplication app) {

@PostConstruct
private void addHandlers() {
JavaFunctionBroker broker = new JavaFunctionBroker(classPathProvider);

InvocationChain.InvocationChainBuilder invocationChainBuilder = new InvocationChain.InvocationChainBuilder();
JavaFunctionBroker broker = new JavaFunctionBroker(classPathProvider, invocationChainBuilder);
this.handlerSuppliers.put(StreamingMessage.ContentCase.WORKER_INIT_REQUEST, () -> new WorkerInitRequestHandler(broker));
this.handlerSuppliers.put(StreamingMessage.ContentCase.FUNCTION_ENVIRONMENT_RELOAD_REQUEST, () -> new FunctionEnvironmentReloadRequestHandler(broker));
this.handlerSuppliers.put(StreamingMessage.ContentCase.FUNCTION_LOAD_REQUEST, () -> new FunctionLoadRequestHandler(broker));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,8 @@ public void addTriggerMetadataSource(Map<String, TypedData> metadata) {
}
}

public void addExecutionContextSource(String invocationId, String funcname, ExecutionTraceContext traceContext, ExecutionRetryContext retryContext) {
otherSources.put(ExecutionContext.class,
new ExecutionContextDataSource(
invocationId,
funcname,
traceContext,
retryContext
)
);
public void addExecutionContextSource(ExecutionContextDataSource executionContextDataSource) {
otherSources.put(ExecutionContext.class, executionContextDataSource);
}

public Optional<BindingData> getDataByName(String name, Type target) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import com.microsoft.azure.functions.TraceContext;
import com.microsoft.azure.functions.RetryContext;

final class ExecutionContextDataSource extends DataSource<ExecutionContext> implements ExecutionContext {
ExecutionContextDataSource(String invocationId, String funcname, TraceContext traceContext, RetryContext retryContext) {
public final class ExecutionContextDataSource extends DataSource<ExecutionContext> implements ExecutionContext {
public ExecutionContextDataSource(String invocationId, String funcname, TraceContext traceContext, RetryContext retryContext) {
super(null, null, EXECONTEXT_DATA_OPERATIONS);
this.invocationId = invocationId;
this.traceContext = traceContext;
Expand Down Expand Up @@ -37,10 +37,20 @@ final class ExecutionContextDataSource extends DataSource<ExecutionContext> impl
private final TraceContext traceContext;
private final RetryContext retryContext;
private final Logger logger;
private final String funcname;
private final String funcname;

private BindingDataStore dataStore;

public BindingDataStore getDataStore() {
return dataStore;
}

public void setDataStore(BindingDataStore dataStore) {
this.dataStore = dataStore;
}

private static final DataOperations<ExecutionContext, Object> EXECONTEXT_DATA_OPERATIONS = new DataOperations<>();
static {
EXECONTEXT_DATA_OPERATIONS.addGenericOperation(ExecutionContext.class, DataOperations::generalAssignment);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.lang.reflect.*;
import java.util.*;

import com.microsoft.azure.functions.middleware.FunctionWorkerMiddleware;
import com.microsoft.azure.functions.worker.binding.*;
import com.microsoft.azure.functions.worker.description.*;
import com.microsoft.azure.functions.worker.reflect.*;
Expand Down Expand Up @@ -47,9 +48,10 @@ public EnhancedJavaMethodExecutorImpl(FunctionMethodDescriptor descriptor, Map<S

public ParameterResolver getOverloadResolver() { return this.overloadResolver; }

public void execute(BindingDataStore dataStore) throws Exception {
public void execute(ExecutionContextDataSource executionContextDataSource) throws Exception {
try {
Thread.currentThread().setContextClassLoader(this.classLoader);
BindingDataStore dataStore = executionContextDataSource.getDataStore();
Object retValue = this.overloadResolver.resolve(dataStore)
.orElseThrow(() -> new NoSuchMethodException("Cannot locate the method signature with the given input"))
.invoke(() -> this.containingClass.newInstance());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package com.microsoft.azure.functions.worker.broker;

import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import com.microsoft.azure.functions.middleware.FunctionWorkerMiddleware;
import com.microsoft.azure.functions.rpc.messages.*;
import com.microsoft.azure.functions.worker.Constants;
import com.microsoft.azure.functions.worker.WorkerLogManager;
import com.microsoft.azure.functions.worker.binding.BindingDataStore;
import com.microsoft.azure.functions.worker.binding.ExecutionContextDataSource;
import com.microsoft.azure.functions.worker.binding.ExecutionRetryContext;
import com.microsoft.azure.functions.worker.binding.ExecutionTraceContext;
import com.microsoft.azure.functions.worker.description.FunctionMethodDescriptor;
import com.microsoft.azure.functions.worker.chain.InvocationChain;
import com.microsoft.azure.functions.worker.reflect.ClassLoaderProvider;

import org.apache.commons.lang3.exception.ExceptionUtils;
Expand All @@ -25,40 +26,57 @@
* reflection, and invoke them at runtime. Thread-Safety: Multiple thread.
*/
public class JavaFunctionBroker {
public JavaFunctionBroker(ClassLoaderProvider classLoaderProvider) {
public JavaFunctionBroker(ClassLoaderProvider classLoaderProvider, InvocationChain.InvocationChainBuilder invocationChainBuilder) {
this.methods = new ConcurrentHashMap<>();
this.classLoaderProvider = classLoaderProvider;
this.invocationChainBuilder = invocationChainBuilder;
}

public void loadMethod(FunctionMethodDescriptor descriptor, Map<String, BindingInfo> bindings)
throws ClassNotFoundException, NoSuchMethodException, IOException {
descriptor.validate();

addSearchPathsToClassLoader(descriptor);
loadMiddleware();
JavaMethodExecutor executor = new FactoryJavaMethodExecutor().getJavaMethodExecutor(descriptor, bindings, classLoaderProvider);

this.methods.put(descriptor.getId(), new ImmutablePair<>(descriptor.getName(), executor));
}

private void loadMiddleware() {
if (loadMiddleware) {
synchronized (JavaFunctionBroker.class){
if (loadMiddleware) {
try {
Thread.currentThread().setContextClassLoader(classLoaderProvider.createClassLoader());
ServiceLoader<FunctionWorkerMiddleware> middlewareServiceLoader = ServiceLoader.load(FunctionWorkerMiddleware.class);
for (FunctionWorkerMiddleware middleware : middlewareServiceLoader) {
this.invocationChainBuilder.use(middleware);
}
} finally {
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
}
loadMiddleware = false;
}
}
}
}

public Optional<TypedData> invokeMethod(String id, InvocationRequest request, List<ParameterBinding> outputs)
throws Exception {
ImmutablePair<String, JavaMethodExecutor> methodEntry = this.methods.get(id);
JavaMethodExecutor executor = methodEntry.right;
if (executor == null) {
throw new NoSuchMethodException("Cannot find method with ID \"" + id + "\"");
}

BindingDataStore dataStore = new BindingDataStore();
final BindingDataStore dataStore = new BindingDataStore();
dataStore.setBindingDefinitions(executor.getBindingDefinitions());
dataStore.addTriggerMetadataSource(getTriggerMetadataMap(request));
dataStore.addParameterSources(request.getInputDataList());

ExecutionTraceContext traceContext = new ExecutionTraceContext(request.getTraceContext().getTraceParent(), request.getTraceContext().getTraceState(), request.getTraceContext().getAttributesMap());
ExecutionRetryContext retryContext = new ExecutionRetryContext(request.getRetryContext().getRetryCount(), request.getRetryContext().getMaxRetryCount(), request.getRetryContext().getException());

dataStore.addExecutionContextSource(request.getInvocationId(), methodEntry.left, traceContext, retryContext);

executor.execute(dataStore);
ExecutionContextDataSource executionContextDataSource = new ExecutionContextDataSource(request.getInvocationId(), methodEntry.left, traceContext, retryContext);
dataStore.addExecutionContextSource(executionContextDataSource);
executionContextDataSource.setDataStore(dataStore);
this.invocationChainBuilder.build(executor).doNext(executionContextDataSource);
outputs.addAll(dataStore.getOutputParameterBindings(true));
return dataStore.getDataTargetTypedValue(BindingDataStore.RETURN_NAME);
}
Expand Down Expand Up @@ -148,27 +166,14 @@ private boolean isTesting(){
}
}

void verifyLibrariesExist (File workerLib, String workerLibPath) throws FileNotFoundException{
if(!workerLib.exists()) {
throw new FileNotFoundException("Error loading worker jars, from path: " + workerLibPath);
} else {
File[] jarFiles = workerLib.listFiles(new FileFilter() {
@Override
public boolean accept(File file) {
return file.isFile() && file.getName().endsWith(".jar");
}
});
if(jarFiles.length == 0) {
throw new FileNotFoundException("Error loading worker jars, from path: " + workerLibPath + ". Jars size is zero");
}
}
}

public void setWorkerDirectory(String workerDirectory) {
this.workerDirectory = workerDirectory;
}

private final Map<String, ImmutablePair<String, JavaMethodExecutor>> methods;
private final ClassLoaderProvider classLoaderProvider;
private String workerDirectory;

private final InvocationChain.InvocationChainBuilder invocationChainBuilder;
private volatile boolean loadMiddleware = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ public interface JavaMethodExecutor {

ParameterResolver getOverloadResolver();

void execute(BindingDataStore dataStore) throws Exception;
void execute(ExecutionContextDataSource executionContextDataSource) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public JavaMethodExecutorImpl(FunctionMethodDescriptor descriptor, Map<String, B

public ParameterResolver getOverloadResolver() { return this.overloadResolver; }

public void execute(BindingDataStore dataStore) throws Exception {
public void execute(ExecutionContextDataSource executionContextDataSource) throws Exception {
BindingDataStore dataStore = executionContextDataSource.getDataStore();
Object retValue = this.overloadResolver.resolve(dataStore)
.orElseThrow(() -> new NoSuchMethodException("Cannot locate the method signature with the given input"))
.invoke(() -> this.containingClass.newInstance());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.microsoft.azure.functions.worker.chain;

import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.middleware.FunctionWorkerChain;
import com.microsoft.azure.functions.middleware.FunctionWorkerMiddleware;
import com.microsoft.azure.functions.worker.binding.ExecutionContextDataSource;
import com.microsoft.azure.functions.worker.broker.JavaMethodExecutor;
import org.apache.commons.lang3.exception.ExceptionUtils;

public class FunctionExecutionMiddleware implements FunctionWorkerMiddleware {

private final JavaMethodExecutor functionExecutor;

public FunctionExecutionMiddleware(JavaMethodExecutor functionExecutor) {
this.functionExecutor = functionExecutor;
}

@Override
public void invoke(ExecutionContext context, FunctionWorkerChain next) {
try {
this.functionExecutor.execute((ExecutionContextDataSource) context);
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.microsoft.azure.functions.worker.chain;

import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.middleware.FunctionWorkerChain;
import com.microsoft.azure.functions.middleware.FunctionWorkerMiddleware;
import com.microsoft.azure.functions.worker.broker.JavaMethodExecutor;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class InvocationChain implements FunctionWorkerChain {

private final Iterator<FunctionWorkerMiddleware> middlewareIterator;

public InvocationChain(Iterator<FunctionWorkerMiddleware> middlewareIterator) {
this.middlewareIterator = middlewareIterator;
}

@Override
public void doNext(ExecutionContext context) {
while (middlewareIterator.hasNext()) {
middlewareIterator.next().invoke(context, this);
}
}


public static class InvocationChainBuilder {

private final List<FunctionWorkerMiddleware> middlewareCollections;

public InvocationChainBuilder() {
middlewareCollections = new ArrayList<>();
}

public void use(FunctionWorkerMiddleware functionWorkerMiddleware){
this.middlewareCollections.add(functionWorkerMiddleware);
}

public FunctionWorkerChain build(JavaMethodExecutor executor){
List<FunctionWorkerMiddleware> list = new ArrayList<>(middlewareCollections);
list.add(new FunctionExecutionMiddleware(executor));
return new InvocationChain(list.iterator());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public ClassLoader createClassLoader() {
urlsList.addAll(workerUrls);
URL[] urlsForClassLoader = urlsList.toArray(new URL[0]);
URLClassLoader classLoader = new URLClassLoader(urlsForClassLoader);
Thread.currentThread().setContextClassLoader(classLoader);
// Thread.currentThread().setContextClassLoader(classLoader);
return classLoader;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
import com.microsoft.azure.functions.rpc.messages.InvocationRequest;
import com.microsoft.azure.functions.rpc.messages.ParameterBinding;
import com.microsoft.azure.functions.rpc.messages.TypedData;
import com.microsoft.azure.functions.worker.chain.InvocationChain;
import com.microsoft.azure.functions.worker.reflect.DefaultClassLoaderProvider;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.*;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -57,7 +56,7 @@ public void getTriggerMetadataMap_success() throws Exception {
triggerMetadata.put("sys", sys);
when(request.getTriggerMetadataMap()).thenReturn(Collections.unmodifiableMap(triggerMetadata));

JavaFunctionBroker broker = new JavaFunctionBroker(new DefaultClassLoaderProvider());
JavaFunctionBroker broker = new JavaFunctionBroker(new DefaultClassLoaderProvider(), new InvocationChain.InvocationChainBuilder());
Map<String, TypedData> actualTriggerMetadata = broker.getTriggerMetadataMap(request);
TypedData actual = actualTriggerMetadata.get("$request");
assertEquals(actual.getString(), expectedData);
Expand Down Expand Up @@ -97,23 +96,9 @@ public void getTriggerMetadataMap_ignored() throws Exception {
when(request.getTriggerMetadataMap()).thenReturn(Collections.unmodifiableMap(triggerMetadata));

int expectedCount = request.getTriggerMetadataMap().size();
JavaFunctionBroker broker = new JavaFunctionBroker(new DefaultClassLoaderProvider());
JavaFunctionBroker broker = new JavaFunctionBroker(new DefaultClassLoaderProvider(), new InvocationChain.InvocationChainBuilder());
Map<String, TypedData> actualTriggerMetadata = broker.getTriggerMetadataMap(request);
// In case of non-http request, it will not modify the triggerMetadata
assertEquals(expectedCount, actualTriggerMetadata.size());
}

@Test(expected = FileNotFoundException.class)
public void checkLibFolderNoWorkerLib() throws Exception {
JavaFunctionBroker broker = new JavaFunctionBroker(null);
broker.verifyLibrariesExist (new File(""), null);
}

@Test(expected = FileNotFoundException.class)
public void checkLibFolderNoJarsInLib() throws Exception {
JavaFunctionBroker broker = new JavaFunctionBroker(null);
String path = "../";
File file = new File(path);
broker.verifyLibrariesExist (file, path);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.HashMap;
import java.util.Map;

import com.microsoft.azure.functions.worker.chain.InvocationChain;
import org.junit.Test;

import com.microsoft.azure.functions.worker.broker.JavaFunctionBroker;
Expand All @@ -18,7 +19,7 @@ public class FunctionEnvironmentReloadRequestHandlerTests {
@Test
public void SetEnv_Succeeds() throws Exception {
DefaultClassLoaderProvider classLoader = new DefaultClassLoaderProvider();
JavaFunctionBroker broker = new JavaFunctionBroker(classLoader);
JavaFunctionBroker broker = new JavaFunctionBroker(classLoader, new InvocationChain.InvocationChainBuilder());
FunctionEnvironmentReloadRequestHandler envHandler = new FunctionEnvironmentReloadRequestHandler(broker);

String testSetting = System.getenv("testSetting");
Expand All @@ -36,7 +37,7 @@ public void SetEnv_Succeeds() throws Exception {
@Test
public void SetEnv_Null_Succeeds() throws Exception {
DefaultClassLoaderProvider classLoader = new DefaultClassLoaderProvider();
JavaFunctionBroker broker = new JavaFunctionBroker(classLoader);
JavaFunctionBroker broker = new JavaFunctionBroker(classLoader, new InvocationChain.InvocationChainBuilder());
FunctionEnvironmentReloadRequestHandler envHandler = new FunctionEnvironmentReloadRequestHandler(broker);

Map<String, String> newEnvVariables = null;
Expand Down