Close opened Scope in the child thread #765

Open
sondemar opened this issue Jul 5, 2024 · 0 comments

Hi, I would like to wrap a method that returns a CompletableFuture step/task with the Observation API so that it is fully observable (the observation scope should enclose the method's scope).

The CompletableFuture in the method will contain asynchronous operations. This means the Observation needs to be propagated to child threads and, at the same time, stopped (with its scope closed) in the child thread, without being propagated to the next steps in the chain (outside this method).
I need control over closing the opened scope, because a scope that is left open may lead to thread pollution and memory leaks.

The only solution that comes to mind is to open the observation scope manually, make a snapshot of it, and then set thread locals in every CompletableFuture sub-step inside the method.

I can illustrate the scenario with the following example:

    @Test
    void testObservableAwareCompletableFuture() {
        CompletableFuture<Void> cf = CompletableFuture.completedFuture("hello ").thenApplyAsync(s -> {
                    log("dummy logging in a separate thread");
                    return s;
                }).thenCombine(observableExternalApi(), (s, s2) -> s + s2)
                .thenAccept(s -> log("result: " + s));
        cf.join();
        log("after completion the whole chain of steps");
    }

    private void log(String message) {
        if (registry.getCurrentObservation() == null) {
            message += " not";
        }
        message += " observable";
        log.info(message);
    }

    private CompletableFuture<String> observableExternalApi() {
        Observation observation = Observation.createNotStarted("test", registry).start();
        log("before opening scope");
        ContextSnapshot snapshot;
        try (Observation.Scope __ = observation.openScope()) {
            snapshot = ContextSnapshot.captureAll();
            log("after creating a snapshot");
        }

        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        // threadPoolTaskExecutor.setTaskDecorator(snapshot::wrap);
        threadPoolTaskExecutor.afterPropertiesSet();

        CompletableFuture<String> cf = CompletableFuture.completedFuture("world").thenApply(s -> {
                    try (ContextSnapshot.Scope __ = snapshot.setThreadLocals()) {
                        log("before async send");
                        return s;
                    }
                }).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
                            try (ContextSnapshot.Scope __ = snapshot.setThreadLocals()) {
                                try {
                                    TimeUnit.SECONDS.sleep(1);
                                } catch (InterruptedException e) {
                                    throw new RuntimeException(e);
                                }
                                log("after async send");
                                return s;
                            }
                        }, threadPoolTaskExecutor))
                .whenComplete((s, throwable) -> {
                    try (ContextSnapshot.Scope __ = snapshot.setThreadLocals()) {
                        log("before stopping observation - ");
                    }
                    observation.stop();
                    log("after stopping observation - ");

                });
        return cf;
    }

This produces the following output, which is what I expect:

[ForkJoinPool.commonPool-worker-1] INFO  [] [] w.api.functional.ApplicationFlowTest - dummy logging in a separate thread not observable
[main] INFO  [] [] w.api.functional.ApplicationFlowTest - before opening scope not observable
[main] INFO  [e7e4306f9f0d9810] [e7e4306f9f0d9810] w.api.functional.ApplicationFlowTest - after creating a snapshot observable
[main] INFO  [e7e4306f9f0d9810] [e7e4306f9f0d9810] w.api.functional.ApplicationFlowTest - before async send observable
[ThreadPoolTaskExecutor-1] INFO  [e7e4306f9f0d9810] [e7e4306f9f0d9810] w.api.functional.ApplicationFlowTest - after async send observable
[ThreadPoolTaskExecutor-1] INFO  [e7e4306f9f0d9810] [e7e4306f9f0d9810] w.api.functional.ApplicationFlowTest - before stopping observation -  observable
[ThreadPoolTaskExecutor-1] INFO  [] [] w.api.functional.ApplicationFlowTest - after stopping observation -  not observable
[ThreadPoolTaskExecutor-1] INFO  [] [] w.api.functional.ApplicationFlowTest - result: hello world not observable
[main] INFO  [] [] w.api.functional.ApplicationFlowTest - after completion the whole chain of steps not observable
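
The per-step pattern used in observableExternalApi() above can be reduced to a small wrapper. The sketch below is illustrative only and uses plain JDK types so that it runs on its own: the `CURRENT` thread-local stands in for the observation held by the registry, and `setThreadLocal` and `scoped` are hypothetical helpers playing the role of `snapshot.setThreadLocals()`. None of them are Micrometer APIs.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class ScopedStepSketch {

    // Hypothetical stand-in for the thread-local that holds the current observation.
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Hypothetical stand-in for ContextSnapshot.Scope: close() restores the previous value.
    interface Scope extends AutoCloseable {
        @Override
        void close();
    }

    static Scope setThreadLocal(String value) {
        String previous = CURRENT.get();
        CURRENT.set(value);
        return () -> {
            if (previous == null) {
                CURRENT.remove();
            } else {
                CURRENT.set(previous);
            }
        };
    }

    // Wraps a single step so the value is visible only while that step runs.
    static <T, R> Function<T, R> scoped(String value, Function<T, R> step) {
        return input -> {
            try (Scope ignored = setThreadLocal(value)) {
                return step.apply(input);
            }
        };
    }

    // Runs one scoped async step followed by an unscoped step and records what each sees.
    static List<String> run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> seen = new CopyOnWriteArrayList<>();
        Function<String, String> inside = s -> {
            seen.add("inside: " + CURRENT.get());
            return s;
        };
        Function<String, String> outside = s -> {
            seen.add("outside: " + CURRENT.get());
            return s;
        };
        try {
            CompletableFuture.completedFuture("world")
                    .thenApplyAsync(scoped("obs", inside), pool)
                    .thenApply(outside)
                    .get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints [inside: obs, outside: null]
    }
}
```

Because the value is restored when each wrapped step returns, the step that follows sees nothing, regardless of which thread runs it.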

Please note that I have not decorated the executor's tasks with the snapshot, as reflected by the commented-out line:

   //    threadPoolTaskExecutor.setTaskDecorator(snapshot::wrap);

If I uncomment this line, I get undesired behavior (the scope stays open outside the invoked method and is propagated to the following steps):

[ForkJoinPool.commonPool-worker-1] INFO  [] [] w.api.functional.ApplicationFlowTest - dummy logging in a separate thread not observable
[main] INFO  [] [] w.api.functional.ApplicationFlowTest - before opening scope not observable
[main] INFO  [e89c41beb2a78006] [e89c41beb2a78006] w.api.functional.ApplicationFlowTest - after creating a snapshot observable
[main] INFO  [e89c41beb2a78006] [e89c41beb2a78006] w.api.functional.ApplicationFlowTest - before async send observable
[ThreadPoolTaskExecutor-1] INFO  [e89c41beb2a78006] [e89c41beb2a78006] w.api.functional.ApplicationFlowTest - after async send observable
[ThreadPoolTaskExecutor-1] INFO  [e89c41beb2a78006] [e89c41beb2a78006] w.api.functional.ApplicationFlowTest - before stopping observation -  observable
[ThreadPoolTaskExecutor-1] INFO  [e89c41beb2a78006] [e89c41beb2a78006] w.api.functional.ApplicationFlowTest - after stopping observation -  observable
[ThreadPoolTaskExecutor-1] INFO  [e89c41beb2a78006] [e89c41beb2a78006] w.api.functional.ApplicationFlowTest - result: hello world observable
[main] INFO  [] [] w.api.functional.ApplicationFlowTest - after completion the whole chain of steps not observable

These two invocations should not be observable:

[ThreadPoolTaskExecutor-1] INFO  [e89c41beb2a78006] [e89c41beb2a78006] w.api.functional.ApplicationFlowTest - after stopping observation -  observable
[ThreadPoolTaskExecutor-1] INFO  [e89c41beb2a78006] [e89c41beb2a78006] w.api.functional.ApplicationFlowTest - result: hello world observable

but they are, because the executor's tasks are automatically instrumented with the snapshot.
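
A likely explanation is that CompletableFuture runs dependent callbacks on the thread that completes the stage, from inside the task the executor ran, so a decorator that wraps the whole task also covers those callbacks. The sketch below reproduces this with plain JDK types only; `CURRENT` and `wrap` are hypothetical stand-ins for the observation thread-local and for `snapshot::wrap`, not Micrometer APIs. A latch keeps the async task from finishing until the downstream callback has been registered, which makes the outcome deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DecoratorLeakSketch {

    // Hypothetical stand-in for the thread-local that holds the current observation.
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Mimics a task decorator: the value stays set for the whole submitted task.
    static Runnable wrap(String value, Runnable task) {
        return () -> {
            String previous = CURRENT.get();
            CURRENT.set(value);
            try {
                task.run();
            } finally {
                if (previous == null) {
                    CURRENT.remove();
                } else {
                    CURRENT.set(previous);
                }
            }
        };
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns what a downstream (non-async) callback sees in the thread-local.
    static String seenByDownstream(boolean decorateExecutor) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Executor executor = pool;
        if (decorateExecutor) {
            executor = task -> pool.execute(wrap("obs", task));
        }
        CountDownLatch chainBuilt = new CountDownLatch(1);
        try {
            CompletableFuture<String> downstream = CompletableFuture
                    .supplyAsync(() -> {
                        // Block until the dependent stage below has been attached.
                        awaitQuietly(chainBuilt);
                        return "world";
                    }, executor)
                    // Downstream step that belongs to the caller, not to the async task.
                    .thenApply(s -> String.valueOf(CURRENT.get()));
            chainBuilt.countDown();
            return downstream.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("decorated executor: " + seenByDownstream(true));  // obs
        System.out.println("plain executor: " + seenByDownstream(false));     // null
    }
}
```

With the decorated executor the downstream callback still sees the value, which mirrors the two unwanted "observable" log lines above; with the plain executor it does not.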

Could you please provide guidance on how to achieve this?
