Skip to content

Commit

Permalink
Fix armeria latest dep tests (#3407)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored Jun 25, 2021
1 parent 220ea41 commit ce36486
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.concurrent.CompletableFuture;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand All @@ -30,15 +31,32 @@ public void transform(TypeTransformer transformer) {
takesArgument(0, named("com.linecorp.armeria.common.stream.AbstractStreamMessage")))
.and(takesArgument(1, named("org.reactivestreams.Subscriber"))),
AbstractStreamMessageSubscriptionInstrumentation.class.getName() + "$WrapSubscriberAdvice");
// since 1.9.0
transformer.applyAdviceToMethod(
isConstructor()
.and(
takesArgument(0, named("com.linecorp.armeria.common.stream.AbstractStreamMessage")))
.and(takesArgument(4, named("java.util.concurrent.CompletableFuture"))),
AbstractStreamMessageSubscriptionInstrumentation.class.getName()
+ "$WrapCompletableFutureAdvice");
}

@SuppressWarnings("unused")
public static class WrapSubscriberAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void attachContext(
@Advice.Argument(value = 1, readOnly = false) Subscriber subscriber) {
public static void wrapSubscriber(
@Advice.Argument(value = 1, readOnly = false) Subscriber<?> subscriber) {
subscriber = SubscriberWrapper.wrap(subscriber);
}
}

public static class WrapCompletableFutureAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapCompletableFuture(
@Advice.Argument(value = 4, readOnly = false) CompletableFuture<?> future) {
future = CompletableFutureWrapper.wrap(future);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.armeria.v1_3;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.CompletableFuture;

public final class CompletableFutureWrapper {

private CompletableFutureWrapper() {}

public static <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
if (future == null) {
return null;
}
Context context = Context.current();
if (context != Context.root()) {
return wrap(future, context);
}
return future;
}

private static <T> CompletableFuture<T> wrap(CompletableFuture<T> future, Context context) {
CompletableFuture<T> result = new CompletableFuture<>();
result.whenComplete(
(T value, Throwable throwable) -> {
try (Scope ignored = context.makeCurrent()) {
if (throwable != null) {
future.completeExceptionally(throwable);
} else {
future.complete(value);
}
}
});

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,49 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class SubscriberWrapper implements Subscriber<Object> {
private final Subscriber<Object> delegate;
public class SubscriberWrapper<T> implements Subscriber<T> {
private static final Class<?> abortingSubscriberClass = getAbortingSubscriberClass();
private static final Class<?> noopSubscriberClass = getNoopSubscriberClass();

private final Subscriber<T> delegate;
private final Context context;

private SubscriberWrapper(Subscriber<Object> delegate, Context context) {
private static Class<?> getAbortingSubscriberClass() {
// AbortingSubscriber is package private
try {
return Class.forName("com.linecorp.armeria.common.stream.AbortingSubscriber");
} catch (ClassNotFoundException exception) {
return null;
}
}

private static Class<?> getNoopSubscriberClass() {
// NoopSubscriber is package private
try {
return Class.forName("com.linecorp.armeria.common.stream.NoopSubscriber");
} catch (ClassNotFoundException exception) {
return null;
}
}

private SubscriberWrapper(Subscriber<T> delegate, Context context) {
this.delegate = delegate;
this.context = context;
}

public static Subscriber<Object> wrap(Subscriber<Object> delegate) {
private static <T> boolean isIgnored(Subscriber<T> delegate) {
return (abortingSubscriberClass != null && abortingSubscriberClass.isInstance(delegate))
|| (noopSubscriberClass != null && noopSubscriberClass.isInstance(delegate));
}

public static <T> Subscriber<T> wrap(Subscriber<T> delegate) {
if (isIgnored(delegate)) {
return delegate;
}

Context context = Context.current();
if (context != Context.root()) {
return new SubscriberWrapper(delegate, context);
return new SubscriberWrapper<>(delegate, context);
}
return delegate;
}
Expand All @@ -35,7 +65,7 @@ public void onSubscribe(Subscription subscription) {
}

@Override
public void onNext(Object o) {
public void onNext(T o) {
try (Scope ignored = context.makeCurrent()) {
delegate.onNext(o);
}
Expand Down

0 comments on commit ce36486

Please sign in to comment.