Skip to content

Commit

Permalink
New API for context propagation (#7498)
Browse files Browse the repository at this point in the history
Initial version of the propagation API.

Main points:
- The propagated context is always immutable and consist of different propagated elements similar to Kotlin Coroutines context
- Propagated element can implement `ThreadPropagatedContextElement` to setup/restore thread locals
- The propagated state is never captured but the context needs to be extended and pushed down the downstream
- The capturing API is not needed anymore, this should eliminate a lot of overhead from the reactive code
- The context is automatically propagated to the Kotlin coroutines
- In the Reactor context should be propagated as whole with utility method to extract/extend it (The Reactor context needs to be modified manually)
- The implementation is using try-resources to propagate the contest without extra overhead, this might change in the future to support scoped local, but at this moment, I think this is the best solution.

I want to merge this first PR to have the orther work mergable.

Next PRs will remove the existing instrumentation propagation, add docs etc.

There are already examples how it should be used:
- Tracing micronaut-projects/micronaut-tracing#281
- Micronaut Data micronaut-projects/micronaut-data#2189 (This is a big PR because of the changes in the TX management and removal of forked Spring TX code)
  • Loading branch information
dstepanov authored May 17, 2023
1 parent 35c9dea commit 129209a
Show file tree
Hide file tree
Showing 63 changed files with 3,623 additions and 342 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.micronaut.aop.util.KotlinInterceptedMethodHelper;
import io.micronaut.core.annotation.Experimental;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.async.propagation.KotlinCoroutinePropagation;
import io.micronaut.core.propagation.PropagatedContext;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.KotlinUtils;
import kotlin.coroutines.Continuation;
Expand Down Expand Up @@ -63,7 +65,7 @@ private KotlinInterceptedMethodImpl(MethodInvocationContext<?, ?> context,
* Checks if the method invocation is a Kotlin coroutine.
*
* @param context {@link MethodInvocationContext}
* @return true if Kotlin coroutine
* @return new intercepted method if Kotlin coroutine or null if it's not
*/
public static KotlinInterceptedMethodImpl of(MethodInvocationContext<?, ?> context) {
if (!KotlinUtils.KOTLIN_COROUTINES_SUPPORTED || !context.getExecutableMethod().isSuspend()) {
Expand Down Expand Up @@ -99,6 +101,14 @@ public Argument<?> returnTypeValue() {

@Override
public CompletableFuture<Object> interceptResultAsCompletionStage() {
if (PropagatedContext.exists()) {
updateCoroutineContext(
KotlinCoroutinePropagation.Companion.updatePropagatedContext(
getCoroutineContext(),
PropagatedContext.get()
)
);
}
@SuppressWarnings("unchecked")
CompletableFutureContinuation completableFutureContinuation = new CompletableFutureContinuation((Continuation<Object>) continuation);
replaceContinuation.accept(completableFutureContinuation);
Expand All @@ -112,6 +122,14 @@ public CompletableFuture<Object> interceptResultAsCompletionStage() {

@Override
public CompletableFuture<Object> interceptResultAsCompletionStage(Interceptor<?, ?> from) {
if (PropagatedContext.exists()) {
updateCoroutineContext(
KotlinCoroutinePropagation.Companion.updatePropagatedContext(
getCoroutineContext(),
PropagatedContext.get()
)
);
}
@SuppressWarnings("unchecked")
CompletableFutureContinuation completableFutureContinuation = new CompletableFutureContinuation((Continuation<Object>) continuation);
replaceContinuation.accept(completableFutureContinuation);
Expand Down Expand Up @@ -141,7 +159,14 @@ public Object handleResult(Object result) {
} else {
throw new IllegalStateException("Cannot convert " + result + " to 'java.util.concurrent.CompletionStage'");
}
//noinspection unchecked
if (PropagatedContext.exists()) {
updateCoroutineContext(
KotlinCoroutinePropagation.Companion.updatePropagatedContext(
getCoroutineContext(),
PropagatedContext.get()
)
);
}
return KotlinInterceptedMethodHelper.handleResult(completionStageResult, isUnitValueType, (Continuation<? super Object>) continuation);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 original authors
* Copyright 2017-2022 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,10 +13,5 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Support classes for binding the {@link io.micronaut.http.context.ServerRequestContext}.
*
* @author graemerocher
* @since 1.0
*/
package io.micronaut.http.server.context;
package io.micronaut.aop.util

85 changes: 85 additions & 0 deletions context-propagation/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
plugins {
id "io.micronaut.build.internal.convention-library"
id "org.jetbrains.kotlin.jvm"
id "org.jetbrains.kotlin.kapt"
}

micronautBuild {
binaryCompatibility {
enabled.set(false)
}
}

dependencies {
annotationProcessor project(":inject-java")
annotationProcessor project(":graal")

api project(':context')
api project(':inject')
api project(':aop')

// implementation 'io.micrometer:context-propagation:1.0.2'

compileOnly project(':core-reactive')
compileOnly libs.managed.reactor

compileOnly platform(libs.test.boms.micronaut.rxjava2)
compileOnly platform(libs.test.boms.micronaut.rxjava3)
compileOnly platform(libs.test.boms.micronaut.reactor)

testImplementation platform(libs.test.boms.micronaut.rxjava2)
testImplementation platform(libs.test.boms.micronaut.rxjava3)
testImplementation platform(libs.test.boms.micronaut.reactor)

compileOnly ("io.micronaut.rxjava2:micronaut-rxjava2-http-client") {
exclude group: "io.micronaut"
}
compileOnly ("io.micronaut.rxjava3:micronaut-rxjava3-http-client") {
exclude group: "io.micronaut"
}

testImplementation project(":inject-groovy")
testAnnotationProcessor project(":inject-java")

testImplementation project(":runtime")
testImplementation project(":jackson-databind")
testImplementation project(":core-reactive")
testImplementation project(":inject-java-test")
testImplementation project(":http-client")
testImplementation project(':http-server-netty')

testImplementation ("io.micronaut.rxjava2:micronaut-rxjava2-http-client") {
exclude group: "io.micronaut"
}
testImplementation ("io.micronaut.rxjava3:micronaut-rxjava3-http-client") {
exclude group: "io.micronaut"
}
testImplementation ("io.micronaut.reactor:micronaut-reactor-http-client") {
exclude group: "io.micronaut"
}
}

// Kotlin
dependencies {
kapt project(':inject-java')
kaptTest project(':inject-java')

compileOnly libs.managed.kotlin.stdlib.jdk8
compileOnly libs.managed.kotlinx.coroutines.core

testImplementation libs.managed.kotlin.stdlib.jdk8
testImplementation libs.managed.kotlinx.coroutines.core
testImplementation libs.managed.kotlinx.coroutines.reactor
}

tasks.named("compileKotlin") {
kotlinOptions {
jvmTarget = '17'
}
}

tasks.named("compileTestKotlin") {
kotlinOptions {
jvmTarget = '17'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2017-2022 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.context.propagation.instrument.execution;

import io.micronaut.context.annotation.Prototype;
import io.micronaut.context.event.BeanCreatedEvent;
import io.micronaut.context.event.BeanCreatedEventListener;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.propagation.PropagatedContext;
import io.micronaut.scheduling.instrument.InstrumentedExecutorService;
import io.micronaut.scheduling.instrument.InstrumentedScheduledExecutorService;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/**
* Wraps {@link ExecutorService} to instrument {@link Callable} and {@link Runnable} to be aware of {@link PropagatedContext}.
*
* @author Denis Stepanov
* @since 4.0.0
*/
@Prototype
@Internal
final class ExecutorServiceInstrumenter implements BeanCreatedEventListener<ExecutorService> {

/**
* Wraps {@link ExecutorService}.
*
* @param event The bean created event
* @return wrapped instance
*/
@Override
public ExecutorService onCreated(BeanCreatedEvent<ExecutorService> event) {
Class<ExecutorService> beanType = event.getBeanDefinition().getBeanType();
if (beanType == ExecutorService.class) {
ExecutorService executorService = event.getBean();
if (executorService instanceof ScheduledExecutorService) {
return new InstrumentedScheduledExecutorService() {
@Override
public ScheduledExecutorService getTarget() {
return (ScheduledExecutorService) executorService;
}

@Override
public <T> Callable<T> instrument(Callable<T> task) {
return PropagatedContext.wrapCurrent(task);
}

@Override
public Runnable instrument(Runnable command) {
return PropagatedContext.wrapCurrent(command);
}
};
} else {
return new InstrumentedExecutorService() {
@Override
public ExecutorService getTarget() {
return executorService;
}

@Override
public <T> Callable<T> instrument(Callable<T> task) {
return PropagatedContext.wrapCurrent(task);
}

@Override
public Runnable instrument(Runnable command) {
return PropagatedContext.wrapCurrent(command);
}
};
}
} else {
return event.getBean();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2017-2022 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.context.propagation.instrument.reactor;

import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.propagation.PropagatedContext;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import reactor.core.scheduler.Schedulers;

/**
* On scheduler hook for the thread to be aware of {@link PropagatedContext}.
*
* @author Denis Stepanov
* @since 4.0.0
*/
@Requires(classes = Schedulers.class)
@Context
@Internal
final class ReactorInstrumentation {

private static final String KEY = "MICRONAUT_CONTEXT_PROPAGATION";

@PostConstruct
void init() {
Schedulers.onScheduleHook(KEY, PropagatedContext::wrapCurrent);
}

@PreDestroy
void removeInstrumentation() {
Schedulers.removeExecutorServiceDecorator(KEY);
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2017-2022 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.context.propagation.instrument.rxjava2;

import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.propagation.PropagatedContext;
import io.reactivex.functions.Function;
import io.reactivex.plugins.RxJavaPlugins;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;

/**
* On scheduler hook for the thread to be aware of {@link PropagatedContext}.
*
* @author Denis Stepanov
* @since 4.0.0
*/
@Requires(classes = RxJavaPlugins.class)
@Context
@Internal
final class RxJava2Instrumentation {

private Function<? super Runnable, ? extends Runnable> scheduleHandler;

@PostConstruct
void init() {
scheduleHandler = RxJavaPlugins.getScheduleHandler();
RxJavaPlugins.setScheduleHandler(runnable -> {
if (scheduleHandler != null) {
runnable = scheduleHandler.apply(runnable);
}
return PropagatedContext.wrapCurrent(runnable);
});
}

@PreDestroy
void removeInstrumentation() {
RxJavaPlugins.setScheduleHandler(scheduleHandler);
}

}

Loading

0 comments on commit 129209a

Please sign in to comment.