Skip to content

Commit

Permalink
Merge branch '1.11.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
shakuzen committed Nov 8, 2023
2 parents bcc7fd1 + 3bf0f96 commit 214f038
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ protected DistributionStatisticConfig defaultHistogramConfig() {
@Override
public void close() {
stop();
if (!isPublishing() && isDelta()) {
if (!isDataPublishedForCurrentStep()) {
if (config.enabled() && isDelta() && !isClosed()) {
if (!isDataPublishedForCurrentStep() && !isPublishing()) {
// Data was not published for the current step. So, we should flush that
// first.
try {
Expand All @@ -259,6 +259,9 @@ public void close() {
e);
}
}
else if (isPublishing()) {
waitForInProgressScheduledPublish();
}
getMeters().forEach(this::closingRollover);
}
super.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
Expand All @@ -41,6 +45,7 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class OtlpDeltaMeterRegistryTest extends OtlpMeterRegistryTest {

Expand Down Expand Up @@ -551,6 +556,7 @@ void finalPushHasPartialStep() {

stepOverNStep(1);
registry.scheduledPublish();
registry.waitForInProgressScheduledPublish();

assertThat(registry.publishedCounterCounts).hasSize(1);
assertThat(registry.publishedCounterCounts.pop()).isOne();
Expand Down Expand Up @@ -621,6 +627,105 @@ void finalPushHasPartialStep() {
assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(24);
}

@Test
@Issue("gh-3846")
void whenCloseDuringScheduledPublish_thenPreviousStepAndCurrentPartialStepArePublished()
throws InterruptedException {
TestOtlpMeterRegistry registry = new TestOtlpMeterRegistry();

AtomicDouble counterCount = new AtomicDouble(15);
AtomicLong timerCount = new AtomicLong(3);
AtomicDouble timerTotalTime = new AtomicDouble(53);

Counter counter = Counter.builder("counter").register(registry);
counter.increment();
Timer timer = Timer.builder("timer").register(registry);
timer.record(5, MILLISECONDS);
DistributionSummary summary = DistributionSummary.builder("summary").register(registry);
summary.record(7);
FunctionCounter functionCounter = FunctionCounter.builder("counter.function", this, obj -> counterCount.get())
.register(registry);
FunctionTimer functionTimer = FunctionTimer
.builder("timer.function", this, obj -> timerCount.get(), obj -> timerTotalTime.get(), MILLISECONDS)
.register(registry);

// before step rollover
assertThat(counter.count()).isZero();
assertThat(timer.count()).isZero();
assertThat(timer.totalTime(MILLISECONDS)).isZero();
assertThat(timer.max(MILLISECONDS)).isZero();
assertEmptyHistogramSnapshot(timer.takeSnapshot());
assertThat(summary.count()).isZero();
assertThat(summary.totalAmount()).isZero();
assertThat(summary.max()).isZero();
assertEmptyHistogramSnapshot(summary.takeSnapshot());
assertThat(functionCounter.count()).isZero();
assertThat(functionTimer.count()).isZero();
assertThat(functionTimer.totalTime(MILLISECONDS)).isZero();

stepOverNStep(1);
registry.pollMetersToRollover();

// set clock to middle of second step
clock.add(otlpConfig().step().dividedBy(2));
// record some more values in new step interval
counter.increment(2);
timer.record(6, MILLISECONDS);
summary.record(8);
counterCount.set(18);
timerCount.set(5);
timerTotalTime.set(77);

// close registry during scheduled publish
CountDownLatch latch = new CountDownLatch(1);
registry.scheduledPublish(() -> {
try {
latch.await();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
await().pollDelay(1, MILLISECONDS)
.atMost(100, MILLISECONDS)
.untilAsserted(() -> assertThat(registry.isPublishing()).isTrue());
Thread closeThread = new Thread(registry::close, "simulatedShutdownHookThread");
closeThread.start();
latch.countDown();
closeThread.join();

// publish happened twice - scheduled publish of first step and closing publish of
// partial second step
assertThat(registry.publishedCounterCounts).hasSize(2);
assertThat(registry.publishedTimerCounts).hasSize(2);
assertThat(registry.publishedTimerSumMilliseconds).hasSize(2);
assertThat(registry.publishedSummaryCounts).hasSize(2);
assertThat(registry.publishedSummaryTotals).hasSize(2);
assertThat(registry.publishedFunctionCounterCounts).hasSize(2);
assertThat(registry.publishedFunctionTimerCounts).hasSize(2);
assertThat(registry.publishedFunctionTimerTotals).hasSize(2);

// first (full) step
assertThat(registry.publishedCounterCounts.pop()).isOne();
assertThat(registry.publishedTimerCounts.pop()).isOne();
assertThat(registry.publishedTimerSumMilliseconds.pop()).isEqualTo(5.0);
assertThat(registry.publishedSummaryCounts.pop()).isOne();
assertThat(registry.publishedSummaryTotals.pop()).isEqualTo(7);
assertThat(registry.publishedFunctionCounterCounts.pop()).isEqualTo(15);
assertThat(registry.publishedFunctionTimerCounts.pop()).isEqualTo(3);
assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(53);

// second step (partial)
assertThat(registry.publishedCounterCounts.pop()).isEqualTo(2);
assertThat(registry.publishedTimerCounts.pop()).isEqualTo(1);
assertThat(registry.publishedTimerSumMilliseconds.pop()).isEqualTo(6.0);
assertThat(registry.publishedSummaryCounts.pop()).isOne();
assertThat(registry.publishedSummaryTotals.pop()).isEqualTo(8);
assertThat(registry.publishedFunctionCounterCounts.pop()).isEqualTo(3);
assertThat(registry.publishedFunctionTimerCounts.pop()).isEqualTo(2);
assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(24);
}

private void assertEmptyHistogramSnapshot(HistogramSnapshot snapshot) {
assertThat(snapshot.count()).isZero();
assertThat(snapshot.total()).isZero();
Expand Down Expand Up @@ -692,6 +797,10 @@ private class TestOtlpMeterRegistry extends OtlpMeterRegistry {

private long lastScheduledPublishStartTime = 0L;

AtomicBoolean isPublishing = new AtomicBoolean(false);

CompletableFuture<Void> scheduledPublishingFuture = CompletableFuture.completedFuture(null);

TestOtlpMeterRegistry() {
super(otlpConfig(), OtlpDeltaMeterRegistryTest.this.clock);
}
Expand All @@ -703,8 +812,38 @@ protected void publish() {
}

private void scheduledPublish() {
this.lastScheduledPublishStartTime = clock.wallTime();
publish();
scheduledPublish(() -> {
});
}

private void scheduledPublish(Runnable prePublishRunnable) {
scheduledPublishingFuture = CompletableFuture.runAsync(() -> {
if (isPublishing.compareAndSet(false, true)) {
this.lastScheduledPublishStartTime = clock.wallTime();
try {
prePublishRunnable.run();
publish();
}
finally {
isPublishing.set(false);
}
}
});
}

@Override
protected boolean isPublishing() {
return isPublishing.get();
}

@Override
protected void waitForInProgressScheduledPublish() {
try {
scheduledPublishingFuture.get();
}
catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,12 @@ public void close() {
super.close();
}

private void waitForInProgressScheduledPublish() {
/**
* Wait until scheduled publishing by {@link PushMeterRegistry} completes, if in
* progress.
* @since 1.11.6
*/
protected void waitForInProgressScheduledPublish() {
try {
// block until in progress publish finishes
publishingSemaphore.acquire();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ public void stop() {
public void close() {
stop();

if (!isPublishing() && config.enabled() && !isClosed()) {
if (!isDataPublishedForCurrentStep()) {
if (config.enabled() && !isClosed()) {
if (!isDataPublishedForCurrentStep() && !isPublishing()) {
// Data was not published for the current step. So, we should flush that
// first.
try {
Expand All @@ -151,6 +151,9 @@ public void close() {
e);
}
}
else if (isPublishing()) {
waitForInProgressScheduledPublish();
}
closingRolloverStepMeters();
}
super.close();
Expand Down
Loading

0 comments on commit 214f038

Please sign in to comment.