Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent parallel invocation instrumentation #3432

Merged
merged 1 commit into from
Jun 8, 2020

Conversation

dstepanov
Copy link
Contributor

@graemerocher @lgathy Using the old approach without active check needed, assuming only cross invocation can be between onSubscribe and onNext/Complete/Success/Error

@graemerocher
Copy link
Contributor

Seems ok to me. We probably need a test, not sure how to recreate #3427 in a test however 🤔

Copy link
Contributor

@lgathy lgathy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach seems more correct than using the active flag.

@@ -59,8 +59,7 @@ protected void subscribeActual(Observer<? super T> o) {
instrumenter.beforeInvocation();
source.subscribe(o);
} finally {
instrumenter.afterInvocation(false);
active = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dstepanov This does not seem right here. Did you want to remove the active field completely or did you just accidentally deleted line 63?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, that was a mistake

@dstepanov
Copy link
Contributor Author

@lgathy would you be able to correct master if it's merged?

@lgathy
Copy link
Contributor

lgathy commented Jun 8, 2020

@dstepanov Sure, I will try to post a PR by the end of the day.

@graemerocher
Copy link
Contributor

@dstepanov @lgathy btw I still see the NoSuchElementException issue manifest when trying this fix out with the example application posted at #3427

@dstepanov
Copy link
Contributor Author

@graemerocher Do you have a test case?

@graemerocher
Copy link
Contributor

Let me try add one to this PR

@graemerocher
Copy link
Contributor

Hmm I can't reproduce in the test, maybe have I have an out of date build. Here is the test:

 test-suite-kotlin/build.gradle                |  1 +
 .../instrument/CoroutineController.kt         | 54 +++++++++++++++++++
 .../MultipleInvocationInstrumenterSpec.kt     | 28 ++++++++++
 3 files changed, 83 insertions(+)
 create mode 100644 test-suite-kotlin/src/test/kotlin/io/micronaut/scheduling/instrument/CoroutineController.kt
 create mode 100644 test-suite-kotlin/src/test/kotlin/io/micronaut/scheduling/instrument/MultipleInvocationInstrumenterSpec.kt

diff --git a/test-suite-kotlin/build.gradle b/test-suite-kotlin/build.gradle
index 28b226070..7580e61c8 100644
--- a/test-suite-kotlin/build.gradle
+++ b/test-suite-kotlin/build.gradle
@@ -6,6 +6,7 @@ plugins {
 dependencies {
     api "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlinVersion"
     api "org.jetbrains.kotlin:kotlin-reflect:$kotlinVersion"
+    api "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.3.7"
     api "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7"
     api "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.7"
     api project(':http-server-netty')
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/scheduling/instrument/CoroutineController.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/scheduling/instrument/CoroutineController.kt
new file mode 100644
index 000000000..312c933d1
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/scheduling/instrument/CoroutineController.kt
@@ -0,0 +1,54 @@
+package io.micronaut.scheduling.instrument
+
+import io.micronaut.http.annotation.Controller
+import io.micronaut.http.annotation.Get
+import io.micronaut.http.annotation.QueryValue
+import io.reactivex.Observable
+import io.reactivex.schedulers.Schedulers
+import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Runnable
+import kotlinx.coroutines.async
+import kotlinx.coroutines.future.asCompletableFuture
+import kotlinx.coroutines.rx2.await
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.ExecutorService
+import kotlin.coroutines.CoroutineContext
+
+typealias TokenDetail = String
+@Controller
+class Controller(private val executorService: ExecutorService) : CoroutineScope {
+
+    override val coroutineContext: CoroutineContext = object : CoroutineDispatcher() {
+        override fun dispatch(context: CoroutineContext, block: Runnable) {
+            executorService.execute(block)
+        }
+
+    }
+
+    val stream: Observable<TokenDetail> by lazy {
+        requestNextToken(0).replay(1).autoConnect()
+    }
+
+    fun current() = stream.take(1).singleOrError()!!
+
+    private fun requestNextToken(idx: Long): Observable<TokenDetail> {
+        return Observable.just(idx).map {
+            Thread.sleep(5000)
+            "idx + $it"
+        }.subscribeOn(Schedulers.io())
+    }
+
+    @Get("/tryout/{times}")
+    fun tryout(@QueryValue("times") times: Int) = asyncResult {
+        (1..times).map {
+            async { current().await() }
+        }.map {
+            it.await()
+        }
+    }
+
+    private fun <T> asyncResult(block: suspend CoroutineScope.() -> T): CompletableFuture<T> {
+        return async { block() }.asCompletableFuture()
+    }
+}
\ No newline at end of file
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/scheduling/instrument/MultipleInvocationInstrumenterSpec.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/scheduling/instrument/MultipleInvocationInstrumenterSpec.kt
new file mode 100644
index 000000000..ccc128a62
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/scheduling/instrument/MultipleInvocationInstrumenterSpec.kt
@@ -0,0 +1,28 @@
+package io.micronaut.scheduling.instrument
+
+import io.micronaut.http.HttpRequest
+import io.micronaut.http.client.RxHttpClient
+import io.micronaut.http.client.annotation.Client
+import io.micronaut.test.annotation.MicronautTest
+import org.junit.jupiter.api.Test
+import javax.inject.Inject
+import kotlin.test.assertTrue
+
+@MicronautTest
+class MultipleInvocationInstrumenterSpec {
+
+    @Inject
+    @field:Client("/")
+    lateinit var client : RxHttpClient;
+
+    @Test
+    fun testMultipleInvocationInstrumenter() {
+        val map: List<*> = client
+                .retrieve(
+                        HttpRequest.GET<Any>("/tryout/100"),
+                        MutableList::class.java
+                ).blockingFirst()
+
+        assertTrue(map.isNotEmpty())
+    }
+}
\ No newline at end of file
-- 

@graemerocher graemerocher merged commit 7b545a4 into micronaut-projects:1.3.x Jun 8, 2020
@graemerocher
Copy link
Contributor

@dstepanov @lgathy I did the merge 1ed48df

If you spot anything awry let me know

@lgathy
Copy link
Contributor

lgathy commented Jun 8, 2020

@graemerocher Unfortunately the fix is not resolving this issue. The test case was simply not triggering MultipleIncovationInstrumenter as tracing was not enabled. See here + diff

I did not want to submit a PR with this comment yet, as it fails crashing the Gradle daemon. I tried with @Timeout but it did not help.

@lgathy
Copy link
Contributor

lgathy commented Jun 8, 2020

Making the methods synchronized is also not a good solution. It removes the exception but instrumentation will still not be done correctly.

I don't really see how we could do correct instrumentation if an instrumenter instance can be called concurrently. I am starting to believe that this reactive construct in the test controller should be changed instead, i.e. access to the replay observable should be serialized. @dstepanov @graemerocher What do you think about that?

@lgathy
Copy link
Contributor

lgathy commented Jun 8, 2020

Serializing subsciption to the shared observable resolves the problem indeed, see: here. Furthermore, instrumentation gets done correctly in this case.

@lgathy
Copy link
Contributor

lgathy commented Jun 8, 2020

The only alternative solution where I can see this concurrent subscription properly supported by instrumentation is to make all instrumenters thread-safe. Internal state could be moved into the new Instrumentation objects, however that could only be supported in 2.0 + before/afterInvocation methods would no longer work correctly in that case. They could be removed however as 2.0 would allow braking changes. Still, it's a big change, so I am not sure if this would be the right move. I will try to put together a PR tomorrow to see if it would even work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants