Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix redisson ClassCastException #6054

Merged
merged 1 commit into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@ public final class CompletableFutureWrapper<T> extends CompletableFuture<T>
private volatile EndOperationListener<T> endOperationListener;

private CompletableFutureWrapper(CompletableFuture<T> delegate) {
Context context = Context.current();
this.whenComplete(
(result, error) -> {
EndOperationListener<T> endOperationListener = this.endOperationListener;
if (endOperationListener != null) {
endOperationListener.accept(result, error);
}
if (error != null) {
delegate.completeExceptionally(error);
} else {
delegate.complete(result);
try (Scope ignored = context.makeCurrent()) {
if (error != null) {
delegate.completeExceptionally(error);
} else {
delegate.complete(result);
}
}
});
}
Expand All @@ -40,33 +43,6 @@ public static <T> CompletableFuture<T> wrap(CompletableFuture<T> delegate) {
return new CompletableFutureWrapper<>(delegate);
}

/**
* Wrap {@link CompletableFuture} to run callbacks with the context that was current at the time
* this method was called.
*
* <p>This method should be called on, or as close as possible to, the {@link CompletableFuture}
* that is returned to the user to ensure that the callbacks added by user are run in appropriate
* context.
*/
public static <T> CompletableFuture<T> wrapContext(CompletableFuture<T> future) {
Context context = Context.current();
// when input future is completed, complete result future with context that was current
// at the time when the future was wrapped
CompletableFuture<T> result = new CompletableFuture<>();
future.whenComplete(
(T value, Throwable throwable) -> {
try (Scope ignored = context.makeCurrent()) {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
result.complete(value);
}
}
});

return result;
}

@Override
public void setEndOperationListener(EndOperationListener<T> endOperationListener) {
this.endOperationListener = endOperationListener;
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ public RedissonInstrumentationModule() {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new RedisConnectionInstrumentation(),
new RedisCommandDataInstrumentation(),
new RedisCommandAsyncServiceInstrumentation(),
new RedissonCompletableFutureWrapperInstrumentation());
return asList(new RedisConnectionInstrumentation(), new RedisCommandDataInstrumentation());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@ public class RedissonPromiseWrapper<T> extends RedissonPromise<T> implements Pro
private volatile EndOperationListener<T> endOperationListener;

private RedissonPromiseWrapper(RPromise<T> delegate) {
Context context = Context.current();
this.whenComplete(
(result, error) -> {
EndOperationListener<T> endOperationListener = this.endOperationListener;
if (endOperationListener != null) {
endOperationListener.accept(result, error);
}
if (error != null) {
delegate.tryFailure(error);
} else {
delegate.trySuccess(result);
try (Scope ignored = context.makeCurrent()) {
if (error != null) {
delegate.tryFailure(error);
} else {
delegate.trySuccess(result);
}
}
});
}
Expand All @@ -40,32 +43,6 @@ public static <T> RPromise<T> wrap(RPromise<T> delegate) {
return new RedissonPromiseWrapper<>(delegate);
}

/**
* Wrap {@link RPromise} to run callbacks with the context that was current at the time this
* method was called.
*
* <p>This method should be called on, or as close as possible to, the {@link RPromise} that is
* returned to the user to ensure that the callbacks added by user are run in appropriate context.
*/
public static <T> RPromise<T> wrapContext(RPromise<T> promise) {
Context context = Context.current();
// when returned promise is completed, complete input promise with context that was current
// at the time when the promise was wrapped
RPromise<T> result = new RedissonPromise<T>();
result.whenComplete(
(value, error) -> {
try (Scope ignored = context.makeCurrent()) {
if (error != null) {
promise.tryFailure(error);
} else {
promise.trySuccess(value);
}
}
});

return result;
}

@Override
public void setEndOperationListener(EndOperationListener<T> endOperationListener) {
this.endOperationListener = endOperationListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import io.opentelemetry.api.trace.Span
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.util.concurrent.Callable
import java.util.concurrent.CompletionStage
import org.redisson.Redisson
import org.redisson.api.RBucket
import org.redisson.api.RFuture
import org.redisson.api.RScheduledExecutorService
import org.redisson.api.RSet
import org.redisson.api.RedissonClient
import org.redisson.config.Config
Expand Down Expand Up @@ -129,5 +131,21 @@ class RedissonAsyncClientTest extends AgentInstrumentationSpecification {
}
}

// regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/6033
def "test schedule"() {
RScheduledExecutorService executorService = redisson.getExecutorService("EXECUTOR")
def taskId = executorService.schedule(new MyCallable(), 0, TimeUnit.SECONDS)
.getTaskId()
expect:
taskId != null
}

private static class MyCallable implements Callable, Serializable {

@Override
Object call() throws Exception {
return null
}
}
}