From 3bf0f96a3bf58ca5284fd80310aca721461b6177 Mon Sep 17 00:00:00 2001 From: Tommy Ludwig <8924140+shakuzen@users.noreply.github.com> Date: Wed, 8 Nov 2023 17:37:31 +0900 Subject: [PATCH] Wait for in-progress publish before closing step rollover (#4318) If a scheduled publish is in progress when `close` is called, `StepMeterRegistry` and `OtlpMeterRegistry` were not performing the closing rollover and was not doing a subsequent publish of the final partial step. Wait for the in-progress publish to finish before the closing rollover is called, which is followed by a final publish of the partial step (via `PushMeterRegistry`'s close). Fixes gh-3846 --- .../registry/otlp/OtlpMeterRegistry.java | 7 +- .../otlp/OtlpDeltaMeterRegistryTest.java | 143 +++++++++++++++++- .../instrument/push/PushMeterRegistry.java | 7 +- .../instrument/step/StepMeterRegistry.java | 7 +- .../step/StepMeterRegistryTest.java | 131 +++++++++++++++- 5 files changed, 286 insertions(+), 9 deletions(-) diff --git a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java index f7a5fcc49d..811c24b6df 100644 --- a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java +++ b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java @@ -243,8 +243,8 @@ protected DistributionStatisticConfig defaultHistogramConfig() { @Override public void close() { stop(); - if (!isPublishing() && isDelta()) { - if (!isDataPublishedForCurrentStep()) { + if (config.enabled() && isDelta() && !isClosed()) { + if (!isDataPublishedForCurrentStep() && !isPublishing()) { // Data was not published for the current step. So, we should flush that // first. try { @@ -256,6 +256,9 @@ public void close() { e); } } + else if (isPublishing()) { + waitForInProgressScheduledPublish(); + } getMeters().forEach(this::closingRollover); } super.close(); diff --git a/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpDeltaMeterRegistryTest.java b/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpDeltaMeterRegistryTest.java index 0513ffe50a..396f5422e7 100644 --- a/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpDeltaMeterRegistryTest.java +++ b/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpDeltaMeterRegistryTest.java @@ -32,7 +32,11 @@ import java.util.ArrayDeque; import java.util.Arrays; import java.util.Deque; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -41,6 +45,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; class OtlpDeltaMeterRegistryTest extends OtlpMeterRegistryTest { @@ -551,6 +556,7 @@ void finalPushHasPartialStep() { stepOverNStep(1); registry.scheduledPublish(); + registry.waitForInProgressScheduledPublish(); assertThat(registry.publishedCounterCounts).hasSize(1); assertThat(registry.publishedCounterCounts.pop()).isOne(); @@ -621,6 +627,105 @@ void finalPushHasPartialStep() { assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(24); } + @Test + @Issue("gh-3846") + void whenCloseDuringScheduledPublish_thenPreviousStepAndCurrentPartialStepArePublished() + throws InterruptedException { + TestOtlpMeterRegistry registry = new TestOtlpMeterRegistry(); + + AtomicDouble counterCount = new AtomicDouble(15); + AtomicLong timerCount = new AtomicLong(3); + AtomicDouble timerTotalTime = new AtomicDouble(53); + + Counter counter = Counter.builder("counter").register(registry); + counter.increment(); + Timer timer = Timer.builder("timer").register(registry); + timer.record(5, MILLISECONDS); + DistributionSummary summary = DistributionSummary.builder("summary").register(registry); + summary.record(7); + FunctionCounter functionCounter = FunctionCounter.builder("counter.function", this, obj -> counterCount.get()) + .register(registry); + FunctionTimer functionTimer = FunctionTimer + .builder("timer.function", this, obj -> timerCount.get(), obj -> timerTotalTime.get(), MILLISECONDS) + .register(registry); + + // before step rollover + assertThat(counter.count()).isZero(); + assertThat(timer.count()).isZero(); + assertThat(timer.totalTime(MILLISECONDS)).isZero(); + assertThat(timer.max(MILLISECONDS)).isZero(); + assertEmptyHistogramSnapshot(timer.takeSnapshot()); + assertThat(summary.count()).isZero(); + assertThat(summary.totalAmount()).isZero(); + assertThat(summary.max()).isZero(); + assertEmptyHistogramSnapshot(summary.takeSnapshot()); + assertThat(functionCounter.count()).isZero(); + assertThat(functionTimer.count()).isZero(); + assertThat(functionTimer.totalTime(MILLISECONDS)).isZero(); + + stepOverNStep(1); + registry.pollMetersToRollover(); + + // set clock to middle of second step + clock.add(otlpConfig().step().dividedBy(2)); + // record some more values in new step interval + counter.increment(2); + timer.record(6, MILLISECONDS); + summary.record(8); + counterCount.set(18); + timerCount.set(5); + timerTotalTime.set(77); + + // close registry during scheduled publish + CountDownLatch latch = new CountDownLatch(1); + registry.scheduledPublish(() -> { + try { + latch.await(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + await().pollDelay(1, MILLISECONDS) + .atMost(100, MILLISECONDS) + .untilAsserted(() -> assertThat(registry.isPublishing()).isTrue()); + Thread closeThread = new Thread(registry::close, "simulatedShutdownHookThread"); + closeThread.start(); + latch.countDown(); + closeThread.join(); + + // publish happened twice - scheduled publish of first step and closing publish of + // partial second step + assertThat(registry.publishedCounterCounts).hasSize(2); + assertThat(registry.publishedTimerCounts).hasSize(2); + assertThat(registry.publishedTimerSumMilliseconds).hasSize(2); + assertThat(registry.publishedSummaryCounts).hasSize(2); + assertThat(registry.publishedSummaryTotals).hasSize(2); + assertThat(registry.publishedFunctionCounterCounts).hasSize(2); + assertThat(registry.publishedFunctionTimerCounts).hasSize(2); + assertThat(registry.publishedFunctionTimerTotals).hasSize(2); + + // first (full) step + assertThat(registry.publishedCounterCounts.pop()).isOne(); + assertThat(registry.publishedTimerCounts.pop()).isOne(); + assertThat(registry.publishedTimerSumMilliseconds.pop()).isEqualTo(5.0); + assertThat(registry.publishedSummaryCounts.pop()).isOne(); + assertThat(registry.publishedSummaryTotals.pop()).isEqualTo(7); + assertThat(registry.publishedFunctionCounterCounts.pop()).isEqualTo(15); + assertThat(registry.publishedFunctionTimerCounts.pop()).isEqualTo(3); + assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(53); + + // second step (partial) + assertThat(registry.publishedCounterCounts.pop()).isEqualTo(2); + assertThat(registry.publishedTimerCounts.pop()).isEqualTo(1); + assertThat(registry.publishedTimerSumMilliseconds.pop()).isEqualTo(6.0); + assertThat(registry.publishedSummaryCounts.pop()).isOne(); + assertThat(registry.publishedSummaryTotals.pop()).isEqualTo(8); + assertThat(registry.publishedFunctionCounterCounts.pop()).isEqualTo(3); + assertThat(registry.publishedFunctionTimerCounts.pop()).isEqualTo(2); + assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(24); + } + private void assertEmptyHistogramSnapshot(HistogramSnapshot snapshot) { assertThat(snapshot.count()).isZero(); assertThat(snapshot.total()).isZero(); @@ -692,6 +797,10 @@ private class TestOtlpMeterRegistry extends OtlpMeterRegistry { private long lastScheduledPublishStartTime = 0L; + AtomicBoolean isPublishing = new AtomicBoolean(false); + + CompletableFuture scheduledPublishingFuture = CompletableFuture.completedFuture(null); + TestOtlpMeterRegistry() { super(otlpConfig(), OtlpDeltaMeterRegistryTest.this.clock); } @@ -703,8 +812,38 @@ protected void publish() { } private void scheduledPublish() { - this.lastScheduledPublishStartTime = clock.wallTime(); - publish(); + scheduledPublish(() -> { + }); + } + + private void scheduledPublish(Runnable prePublishRunnable) { + scheduledPublishingFuture = CompletableFuture.runAsync(() -> { + if (isPublishing.compareAndSet(false, true)) { + this.lastScheduledPublishStartTime = clock.wallTime(); + try { + prePublishRunnable.run(); + publish(); + } + finally { + isPublishing.set(false); + } + } + }); + } + + @Override + protected boolean isPublishing() { + return isPublishing.get(); + } + + @Override + protected void waitForInProgressScheduledPublish() { + try { + scheduledPublishingFuture.get(); + } + catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } } @Override diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/push/PushMeterRegistry.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/push/PushMeterRegistry.java index d6888cd01b..3367689931 100644 --- a/micrometer-core/src/main/java/io/micrometer/core/instrument/push/PushMeterRegistry.java +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/push/PushMeterRegistry.java @@ -136,7 +136,12 @@ public void close() { super.close(); } - private void waitForInProgressScheduledPublish() { + /** + * Wait until scheduled publishing by {@link PushMeterRegistry} completes, if in + * progress. + * @since 1.11.6 + */ + protected void waitForInProgressScheduledPublish() { try { // block until in progress publish finishes publishingSemaphore.acquire(); diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java index 9c183ba525..2435591425 100644 --- a/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java @@ -138,8 +138,8 @@ public void stop() { public void close() { stop(); - if (!isPublishing() && config.enabled() && !isClosed()) { - if (!isDataPublishedForCurrentStep()) { + if (config.enabled() && !isClosed()) { + if (!isDataPublishedForCurrentStep() && !isPublishing()) { // Data was not published for the current step. So, we should flush that // first. try { @@ -151,6 +151,9 @@ public void close() { e); } } + else if (isPublishing()) { + waitForInProgressScheduledPublish(); + } closingRolloverStepMeters(); } super.close(); diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java index f852ea469c..cd8927022e 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java @@ -26,7 +26,11 @@ import java.util.Arrays; import java.util.Deque; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -35,6 +39,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.SoftAssertions.assertSoftly; +import static org.awaitility.Awaitility.await; /** * Tests for {@link StepMeterRegistry}. @@ -221,6 +226,7 @@ void finalPushHasPartialStep() { addTimeWithRolloverOnStepStart(clock, registry, config, config.step()); registry.scheduledPublish(); + registry.waitForInProgressScheduledPublish(); assertThat(registry.publishedCounterCounts).hasSize(1); assertThat(registry.publishedCounterCounts.pop()).isOne(); @@ -398,6 +404,8 @@ void scheduledRollOver() { // recordings that happened in the previous step should be published registry.scheduledPublish(); + registry.waitForInProgressScheduledPublish(); + assertThat(registry.publishedCounterCounts).hasSize(1); assertThat(registry.publishedCounterCounts.pop()).isOne(); assertThat(registry.publishedTimerCounts).hasSize(1); @@ -464,6 +472,91 @@ void publishShouldNotHappenWhenRegistryIsClosed() { assertThat(publishes.get()).isEqualTo(2); } + @Test + @Issue("gh-3846") + void whenCloseDuringScheduledPublish_thenPreviousStepAndCurrentPartialStepArePublished() + throws InterruptedException { + AtomicDouble counterCount = new AtomicDouble(15); + AtomicLong timerCount = new AtomicLong(3); + AtomicDouble timerTotalTime = new AtomicDouble(53); + + Counter counter = Counter.builder("counter").register(registry); + counter.increment(); + Timer timer = Timer.builder("timer").register(registry); + timer.record(5, MILLISECONDS); + DistributionSummary summary = DistributionSummary.builder("summary").register(registry); + summary.record(7); + FunctionCounter functionCounter = FunctionCounter.builder("counter.function", this, obj -> counterCount.get()) + .register(registry); + FunctionTimer functionTimer = FunctionTimer + .builder("timer.function", this, obj -> timerCount.get(), obj -> timerTotalTime.get(), MILLISECONDS) + .register(registry); + + // before step rollover + assertBeforeRollover(counter, timer, summary, functionCounter, functionTimer); + + addTimeWithRolloverOnStepStart(clock, registry, config, config.step()); + + // set clock to middle of second step + addTimeWithRolloverOnStepStart(clock, registry, config, config.step().dividedBy(2)); + // record some more values in new step interval + counter.increment(2); + timer.record(6, MILLISECONDS); + summary.record(8); + counterCount.set(18); + timerCount.set(5); + timerTotalTime.set(77); + + // close registry during scheduled publish + CountDownLatch latch = new CountDownLatch(1); + registry.scheduledPublish(() -> { + try { + latch.await(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + await().pollDelay(1, MILLISECONDS) + .atMost(100, MILLISECONDS) + .untilAsserted(() -> assertThat(registry.isPublishing()).isTrue()); + Thread closeThread = new Thread(registry::close, "simulatedShutdownHookThread"); + closeThread.start(); + latch.countDown(); + closeThread.join(); + + // publish happened twice - scheduled publish of first step and closing publish of + // partial second step + assertThat(registry.publishedCounterCounts).hasSize(2); + assertThat(registry.publishedTimerCounts).hasSize(2); + assertThat(registry.publishedTimerSumMilliseconds).hasSize(2); + assertThat(registry.publishedSummaryCounts).hasSize(2); + assertThat(registry.publishedSummaryTotals).hasSize(2); + assertThat(registry.publishedFunctionCounterCounts).hasSize(2); + assertThat(registry.publishedFunctionTimerCounts).hasSize(2); + assertThat(registry.publishedFunctionTimerTotals).hasSize(2); + + // first (full) step + assertThat(registry.publishedCounterCounts.pop()).isOne(); + assertThat(registry.publishedTimerCounts.pop()).isOne(); + assertThat(registry.publishedTimerSumMilliseconds.pop()).isEqualTo(5.0); + assertThat(registry.publishedSummaryCounts.pop()).isOne(); + assertThat(registry.publishedSummaryTotals.pop()).isEqualTo(7); + assertThat(registry.publishedFunctionCounterCounts.pop()).isEqualTo(15); + assertThat(registry.publishedFunctionTimerCounts.pop()).isEqualTo(3); + assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(53); + + // second step (partial) + assertThat(registry.publishedCounterCounts.pop()).isEqualTo(2); + assertThat(registry.publishedTimerCounts.pop()).isEqualTo(1); + assertThat(registry.publishedTimerSumMilliseconds.pop()).isEqualTo(6.0); + assertThat(registry.publishedSummaryCounts.pop()).isOne(); + assertThat(registry.publishedSummaryTotals.pop()).isEqualTo(8); + assertThat(registry.publishedFunctionCounterCounts.pop()).isEqualTo(3); + assertThat(registry.publishedFunctionTimerCounts.pop()).isEqualTo(2); + assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(24); + } + private class MyStepMeterRegistry extends StepMeterRegistry { Deque publishedCounterCounts = new ArrayDeque<>(); @@ -487,6 +580,10 @@ private class MyStepMeterRegistry extends StepMeterRegistry { @Nullable Runnable prePublishAction; + AtomicBoolean isPublishing = new AtomicBoolean(false); + + CompletableFuture scheduledPublishingFuture = CompletableFuture.completedFuture(null); + MyStepMeterRegistry() { this(StepMeterRegistryTest.this.config, StepMeterRegistryTest.this.clock); } @@ -512,8 +609,38 @@ protected void publish() { } private void scheduledPublish() { - this.lastScheduledPublishStartTime = clock.wallTime(); - publish(); + scheduledPublish(() -> { + }); + } + + private void scheduledPublish(Runnable prePublishRunnable) { + scheduledPublishingFuture = CompletableFuture.runAsync(() -> { + if (isPublishing.compareAndSet(false, true)) { + this.lastScheduledPublishStartTime = clock.wallTime(); + try { + prePublishRunnable.run(); + publish(); + } + finally { + isPublishing.set(false); + } + } + }); + } + + @Override + protected boolean isPublishing() { + return isPublishing.get(); + } + + @Override + protected void waitForInProgressScheduledPublish() { + try { + scheduledPublishingFuture.get(); + } + catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } } @Override