Skip to content

Commit 2d2e78a

Browse files
committed
middleware support logics
1 parent c24a2a4 commit 2d2e78a

10 files changed

+211
-34
lines changed

src/main/java/com/microsoft/azure/functions/worker/binding/ExecutionContextDataSource.java

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,20 @@
33
import com.microsoft.azure.functions.ExecutionContext;
44
import com.microsoft.azure.functions.RetryContext;
55
import com.microsoft.azure.functions.TraceContext;
6+
import com.microsoft.azure.functions.internal.MiddlewareContext;
7+
import com.microsoft.azure.functions.rpc.messages.ParameterBinding;
8+
import com.microsoft.azure.functions.rpc.messages.TypedData;
69
import com.microsoft.azure.functions.worker.WorkerLogManager;
710
import com.microsoft.azure.functions.worker.broker.MethodBindInfo;
11+
import com.microsoft.azure.functions.worker.broker.ParamBindInfo;
12+
import java.lang.reflect.Parameter;
13+
import java.nio.charset.StandardCharsets;
14+
import java.util.HashMap;
15+
import java.util.List;
16+
import java.util.Map;
817
import java.util.logging.Logger;
918

10-
public final class ExecutionContextDataSource extends DataSource<ExecutionContext> implements ExecutionContext {
11-
19+
public final class ExecutionContextDataSource extends DataSource<ExecutionContext> implements MiddlewareContext {
1220
private final String invocationId;
1321
private final TraceContext traceContext;
1422
private final RetryContext retryContext;
@@ -17,6 +25,11 @@ public final class ExecutionContextDataSource extends DataSource<ExecutionContex
1725
private final BindingDataStore dataStore;
1826
private final MethodBindInfo methodBindInfo;
1927
private final Class<?> containingClass;
28+
private final Map<String, Parameter> parameterMap = new HashMap<>();
29+
private final Map<String, String> parameterPayloadMap = new HashMap<>();
30+
private final Map<String, Object> middlewareInputMap = new HashMap<>();
31+
private Object returnValue;
32+
private Object middlewareOutput;
2033

2134
private static final DataOperations<ExecutionContext, Object> EXECONTEXT_DATA_OPERATIONS = new DataOperations<>();
2235
static {
@@ -35,6 +48,7 @@ public ExecutionContextDataSource(String invocationId, TraceContext traceContext
3548
this.dataStore = dataStore;
3649
this.methodBindInfo = methodBindInfo;
3750
this.containingClass = containingClass;
51+
addParameters(methodBindInfo, this.parameterMap);
3852
this.setValue(this);
3953
}
4054

@@ -64,4 +78,72 @@ public MethodBindInfo getMethodBindInfo() {
6478
public Class<?> getContainingClass() {
6579
return containingClass;
6680
}
81+
82+
private static void addParameters(MethodBindInfo methodBindInfo, Map<String, Parameter> parameterMap){
83+
for (ParamBindInfo paramBindInfo : methodBindInfo.getParams()) {
84+
parameterMap.put(paramBindInfo.getName(), paramBindInfo.getParameter());
85+
}
86+
}
87+
88+
@Override
89+
public Map<String, Parameter> getParameterMap() {
90+
return this.parameterMap;
91+
}
92+
93+
public void buildParameterPayloadMap(List<ParameterBinding> inputDataList){
94+
for (ParameterBinding parameterBinding : inputDataList) {
95+
String serializedPayload = convertToString(parameterBinding.getData());
96+
this.parameterPayloadMap.put(parameterBinding.getName(), serializedPayload);
97+
}
98+
}
99+
100+
// TODO: Refactor the code in V5 to make resolve arguments logs before middleware invocation
101+
private String convertToString(TypedData data) {
102+
switch (data.getDataCase()) {
103+
case INT: return String.valueOf(data.getInt());
104+
case DOUBLE: return String.valueOf(data.getDouble());
105+
case STRING: return data.getString();
106+
case BYTES: return data.getBytes().toString(StandardCharsets.UTF_8);
107+
case JSON: return data.getJson();
108+
case HTTP: return data.getHttp().toString();
109+
case COLLECTION_STRING: data.getCollectionString().toString();
110+
case COLLECTION_DOUBLE: data.getCollectionDouble().toString();
111+
case COLLECTION_BYTES: data.getCollectionBytes().toString();
112+
case COLLECTION_SINT64: data.getCollectionSint64().toString();
113+
default: return null;
114+
}
115+
}
116+
117+
@Override
118+
public Object getParameterPayloadByName(String name) {
119+
return this.parameterPayloadMap.get(name);
120+
}
121+
122+
@Override
123+
public void updateParameterPayloadByName(String key, Object value) {
124+
this.middlewareInputMap.put(key, value);
125+
}
126+
127+
public Object getMiddlewareInputByName(String name){
128+
return this.middlewareInputMap.get(name);
129+
}
130+
131+
public void setReturnValue(Object retValue) {
132+
this.returnValue = retValue;
133+
}
134+
135+
@Override
136+
public Object getReturnValue() {
137+
return this.returnValue;
138+
}
139+
140+
@Override
141+
public void setMiddlewareOutput(Object value) {
142+
this.middlewareOutput = value;
143+
}
144+
145+
public void updateOutputValue(){
146+
if (this.middlewareOutput == null) return;
147+
this.dataStore.setDataTargetValue(BindingDataStore.RETURN_NAME, this.middlewareOutput);
148+
}
67149
}

src/main/java/com/microsoft/azure/functions/worker/broker/EnhancedJavaMethodExecutorImpl.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,6 @@
11
package com.microsoft.azure.functions.worker.broker;
22

3-
import java.lang.invoke.WrongMethodTypeException;
4-
import java.lang.reflect.*;
5-
import java.util.*;
6-
7-
import com.microsoft.azure.functions.OutputBinding;
83
import com.microsoft.azure.functions.worker.binding.*;
9-
import org.apache.commons.lang3.exception.ExceptionUtils;
10-
import org.apache.commons.lang3.reflect.TypeUtils;
114

125
/**
136
* Used to executor of arbitrary Java method in any JAR using reflection.
@@ -28,6 +21,7 @@ public void execute(ExecutionContextDataSource executionContextDataSource) throw
2821
.orElseThrow(() -> new NoSuchMethodException("Cannot locate the method signature with the given input"))
2922
.invoke(() -> executionContextDataSource.getContainingClass().newInstance());
3023
executionContextDataSource.getDataStore().setDataTargetValue(BindingDataStore.RETURN_NAME, retValue);
24+
executionContextDataSource.setReturnValue(retValue);
3125
} finally {
3226
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
3327
}

src/main/java/com/microsoft/azure/functions/worker/broker/JavaFunctionBroker.java

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,18 @@
66
import java.net.URL;
77
import java.util.*;
88
import java.util.concurrent.ConcurrentHashMap;
9-
9+
import com.microsoft.azure.functions.middleware.FunctionWorkerMiddleware;
1010
import com.microsoft.azure.functions.rpc.messages.*;
1111
import com.microsoft.azure.functions.worker.Constants;
12+
import com.microsoft.azure.functions.worker.WorkerLogManager;
1213
import com.microsoft.azure.functions.worker.binding.BindingDataStore;
1314
import com.microsoft.azure.functions.worker.binding.ExecutionContextDataSource;
1415
import com.microsoft.azure.functions.worker.binding.ExecutionRetryContext;
1516
import com.microsoft.azure.functions.worker.binding.ExecutionTraceContext;
17+
import com.microsoft.azure.functions.worker.chain.FunctionExecutionMiddleware;
18+
import com.microsoft.azure.functions.worker.chain.InvocationChainFactory;
1619
import com.microsoft.azure.functions.worker.description.FunctionMethodDescriptor;
1720
import com.microsoft.azure.functions.worker.reflect.ClassLoaderProvider;
18-
1921
import org.apache.commons.lang3.exception.ExceptionUtils;
2022
import org.apache.commons.lang3.tuple.ImmutablePair;
2123

@@ -24,6 +26,13 @@
2426
* reflection, and invoke them at runtime. Thread-Safety: Multiple thread.
2527
*/
2628
public class JavaFunctionBroker {
29+
30+
//TODO: build dedicate ImmutablePair class with meaningful fields.
31+
private final Map<String, ImmutablePair<String, FunctionDefinition>> methods;
32+
private final ClassLoaderProvider classLoaderProvider;
33+
private String workerDirectory;
34+
private volatile boolean loadMiddleware = true;
35+
private volatile InvocationChainFactory invocationChainFactory;
2736
public JavaFunctionBroker(ClassLoaderProvider classLoaderProvider) {
2837
this.methods = new ConcurrentHashMap<>();
2938
this.classLoaderProvider = classLoaderProvider;
@@ -33,15 +42,47 @@ public void loadMethod(FunctionMethodDescriptor descriptor, Map<String, BindingI
3342
throws ClassNotFoundException, NoSuchMethodException, IOException {
3443
descriptor.validate();
3544
addSearchPathsToClassLoader(descriptor);
45+
loadMiddleware();
3646
FunctionDefinition functionDefinition = new FunctionDefinition(descriptor, bindings, classLoaderProvider);
3747
this.methods.put(descriptor.getId(), new ImmutablePair<>(descriptor.getName(), functionDefinition));
3848
}
3949

50+
private void loadMiddleware() {
51+
if (loadMiddleware) {
52+
synchronized (JavaFunctionBroker.class){
53+
if (loadMiddleware) {
54+
ArrayList<FunctionWorkerMiddleware> middlewares = new ArrayList<>();
55+
try {
56+
//ServiceLoader will use thread context classloader to verify loaded class
57+
Thread.currentThread().setContextClassLoader(classLoaderProvider.createClassLoader());
58+
ServiceLoader<FunctionWorkerMiddleware> middlewareServiceLoader = ServiceLoader.load(FunctionWorkerMiddleware.class);
59+
for (FunctionWorkerMiddleware middleware : middlewareServiceLoader) {
60+
middlewares.add(middleware);
61+
WorkerLogManager.getSystemLogger().info("Load middleware " + middleware.getClass().getSimpleName());
62+
}
63+
} finally {
64+
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
65+
}
66+
loadFunctionExecutionMiddleWare(middlewares);
67+
loadMiddleware = false;
68+
}
69+
}
70+
}
71+
}
72+
73+
private void loadFunctionExecutionMiddleWare(ArrayList<FunctionWorkerMiddleware> middlewares) {
74+
FunctionExecutionMiddleware functionExecutionMiddleware = new FunctionExecutionMiddleware(
75+
JavaMethodExecutors.createJavaMethodExecutor(this.classLoaderProvider.createClassLoader()));
76+
middlewares.add(functionExecutionMiddleware);
77+
WorkerLogManager.getSystemLogger().info("Load last middleware: FunctionExecutionMiddleware");
78+
this.invocationChainFactory = new InvocationChainFactory(middlewares);
79+
}
80+
4081
public Optional<TypedData> invokeMethod(String id, InvocationRequest request, List<ParameterBinding> outputs)
4182
throws Exception {
4283
ExecutionContextDataSource executionContextDataSource = buildExecutionContext(id, request);
43-
JavaMethodExecutor executor = JavaMethodExecutors.createJavaMethodExecutor(this.classLoaderProvider.createClassLoader());
44-
executor.execute(executionContextDataSource);
84+
this.invocationChainFactory.create().doNext(executionContextDataSource);
85+
executionContextDataSource.updateOutputValue();
4586
outputs.addAll(executionContextDataSource.getDataStore().getOutputParameterBindings(true));
4687
return executionContextDataSource.getDataStore().getDataTargetTypedValue(BindingDataStore.RETURN_NAME);
4788
}
@@ -64,6 +105,7 @@ private ExecutionContextDataSource buildExecutionContext(String id, InvocationR
64105
ExecutionContextDataSource executionContextDataSource = new ExecutionContextDataSource(request.getInvocationId(),
65106
traceContext, retryContext, methodEntry.left, dataStore, functionDefinition.getCandidate(),
66107
functionDefinition.getContainingClass());
108+
executionContextDataSource.buildParameterPayloadMap(request.getInputDataList());
67109
dataStore.addExecutionContextSource(executionContextDataSource);
68110
return executionContextDataSource;
69111
}
@@ -156,9 +198,4 @@ private boolean isTesting(){
156198
public void setWorkerDirectory(String workerDirectory) {
157199
this.workerDirectory = workerDirectory;
158200
}
159-
160-
//TODO: build dedicate ImmutablePair class with meaningful fields.
161-
private final Map<String, ImmutablePair<String, FunctionDefinition>> methods;
162-
private final ClassLoaderProvider classLoaderProvider;
163-
private String workerDirectory;
164201
}

src/main/java/com/microsoft/azure/functions/worker/broker/JavaMethodExecutorImpl.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,6 @@
11
package com.microsoft.azure.functions.worker.broker;
22

3-
import java.lang.invoke.WrongMethodTypeException;
4-
import java.lang.reflect.*;
5-
import java.net.*;
6-
import java.util.*;
7-
8-
import com.microsoft.azure.functions.OutputBinding;
93
import com.microsoft.azure.functions.worker.binding.*;
10-
import com.microsoft.azure.functions.worker.description.*;
11-
import com.microsoft.azure.functions.worker.reflect.*;
12-
import com.microsoft.azure.functions.rpc.messages.*;
13-
import org.apache.commons.lang3.exception.ExceptionUtils;
14-
import org.apache.commons.lang3.reflect.TypeUtils;
154

165
/**
176
* Used to executor of arbitrary Java method in any JAR using reflection.
@@ -32,5 +21,6 @@ public void execute(ExecutionContextDataSource executionContextDataSource) throw
3221
.orElseThrow(() -> new NoSuchMethodException("Cannot locate the method signature with the given input"))
3322
.invoke(() -> executionContextDataSource.getContainingClass().newInstance());
3423
executionContextDataSource.getDataStore().setDataTargetValue(BindingDataStore.RETURN_NAME, retValue);
24+
executionContextDataSource.setReturnValue(retValue);
3525
}
3626
}

src/main/java/com/microsoft/azure/functions/worker/broker/JavaMethodExecutors.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,8 @@
33
import com.microsoft.azure.functions.worker.WorkerLogManager;
44
import org.apache.commons.lang3.SystemUtils;
55

6-
import java.net.MalformedURLException;
7-
86
public class JavaMethodExecutors {
9-
public static JavaMethodExecutor createJavaMethodExecutor(ClassLoader classLoader)
10-
throws MalformedURLException, ClassNotFoundException, NoSuchMethodException {
7+
public static JavaMethodExecutor createJavaMethodExecutor(ClassLoader classLoader) {
118
if(SystemUtils.IS_JAVA_1_8) {
129
WorkerLogManager.getSystemLogger().info("Loading JavaMethodExecutorImpl");
1310
return JavaMethodExecutorImpl.getExecutorInstance();

src/main/java/com/microsoft/azure/functions/worker/broker/ParamBindInfo.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@ public final class ParamBindInfo {
99
private final Type type;
1010
private final String bindingNameAnnotation;
1111
private final boolean isImplicitOutput;
12+
private final Parameter parameter;
1213
ParamBindInfo(Parameter param) {
1314
this.name = CoreTypeResolver.getAnnotationName(param);
1415
this.type = param.getParameterizedType();
1516
this.bindingNameAnnotation = CoreTypeResolver.getBindingNameAnnotation(param);
1617
this.isImplicitOutput = CoreTypeResolver.checkImplicitOutput(param);
18+
this.parameter = param;
1719
}
1820

1921
public boolean isImplicitOutput() {
@@ -31,4 +33,8 @@ public Type getType() {
3133
public String getBindingNameAnnotation() {
3234
return bindingNameAnnotation;
3335
}
36+
37+
public Parameter getParameter() {
38+
return parameter;
39+
}
3440
}

src/main/java/com/microsoft/azure/functions/worker/broker/ParameterResolver.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ private static InvokeInfoBuilder resolve(ExecutionContextDataSource executionCon
4141
argument = dataStore.getOrAddDataTarget(invokeInfo.getOutputsId(), paramName, paramType, false);
4242
}
4343
else if (paramName != null && !paramName.isEmpty()) {
44-
argument = dataStore.getDataByName(paramName, paramType);
44+
argument = buildMiddlewareInput(executionContextDataSource.getMiddlewareInputByName(paramName));
45+
if (!argument.isPresent()) {
46+
argument = dataStore.getDataByName(paramName, paramType);
47+
}
4548
}
4649
else if (paramName == null && !paramBindingNameAnnotation.isEmpty()) {
4750
argument = dataStore.getTriggerMetatDataByName(paramBindingNameAnnotation, paramType);
@@ -62,6 +65,11 @@ else if (paramName == null && !paramBindingNameAnnotation.isEmpty()) {
6265
}
6366
}
6467

68+
private static Optional<BindingData> buildMiddlewareInput(Object input) {
69+
if (input == null) return Optional.empty();
70+
return Optional.of(new BindingData(input));
71+
}
72+
6573
public static final class InvokeInfoBuilder extends JavaMethodInvokeInfo.Builder {
6674
public InvokeInfoBuilder(MethodBindInfo method) { super.setMethod(method.getMethod()); }
6775
private final UUID outputsId = UUID.randomUUID();
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.microsoft.azure.functions.worker.chain;
2+
3+
import com.microsoft.azure.functions.internal.MiddlewareContext;
4+
import com.microsoft.azure.functions.middleware.FunctionWorkerChain;
5+
import com.microsoft.azure.functions.middleware.FunctionWorkerMiddleware;
6+
import com.microsoft.azure.functions.worker.binding.ExecutionContextDataSource;
7+
import com.microsoft.azure.functions.worker.broker.JavaMethodExecutor;
8+
9+
public class FunctionExecutionMiddleware implements FunctionWorkerMiddleware {
10+
11+
private final JavaMethodExecutor functionMethodExecutor;
12+
13+
public FunctionExecutionMiddleware(JavaMethodExecutor functionExecutionMiddleware) {
14+
this.functionMethodExecutor = functionExecutionMiddleware;
15+
}
16+
17+
@Override
18+
public void invoke(MiddlewareContext context, FunctionWorkerChain next) throws Exception{
19+
this.functionMethodExecutor.execute((ExecutionContextDataSource) context);
20+
}
21+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.microsoft.azure.functions.worker.chain;
2+
3+
import com.microsoft.azure.functions.internal.MiddlewareContext;
4+
import com.microsoft.azure.functions.middleware.FunctionWorkerChain;
5+
import com.microsoft.azure.functions.middleware.FunctionWorkerMiddleware;
6+
7+
import java.util.Iterator;
8+
import java.util.List;
9+
10+
public class InvocationChain implements FunctionWorkerChain {
11+
private final Iterator<FunctionWorkerMiddleware> middlewareIterator;
12+
13+
public InvocationChain(List<FunctionWorkerMiddleware> middlewares){
14+
this.middlewareIterator = middlewares.iterator();
15+
}
16+
17+
@Override
18+
public void doNext(MiddlewareContext context) throws Exception {
19+
while (middlewareIterator.hasNext()) {
20+
middlewareIterator.next().invoke(context, this);
21+
}
22+
}
23+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.microsoft.azure.functions.worker.chain;
2+
3+
import com.microsoft.azure.functions.middleware.FunctionWorkerChain;
4+
import com.microsoft.azure.functions.middleware.FunctionWorkerMiddleware;
5+
6+
import java.util.List;
7+
8+
public class InvocationChainFactory {
9+
10+
private final List<FunctionWorkerMiddleware> middlewares;
11+
12+
public InvocationChainFactory(List<FunctionWorkerMiddleware> middlewares) {
13+
this.middlewares = middlewares;
14+
}
15+
16+
public FunctionWorkerChain create(){
17+
return new InvocationChain(middlewares);
18+
}
19+
}

0 commit comments

Comments
 (0)