Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a ConcurrentCompositeHealthIndicator #15836

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ public HealthIndicatorRegistry getRegistry() {
return this.registry;
}

/**
* Return the {@link HealthAggregator} of this instance.
* @return the aggregator for {@link HealthIndicator health indicators}
*/
public HealthAggregator getAggregator() {
return this.aggregator;
}

@Override
public Health health() {
Map<String, Health> healths = new LinkedHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.actuate.health;

import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
* Version of {@link CompositeHealthIndicator} that gathers health indications from all
* registered delegates concurrently.
*
* @author Rik vd Ende
*/
public class ConcurrentCompositeHealthIndicator extends CompositeHealthIndicator {

private final ThreadPoolTaskExecutor executor;

private final Function<Future<Health>, Health> futureFunction;

/**
* Create a new {@link ConcurrentCompositeHealthIndicator} from the specified
* indicators.
* @param healthAggregator the health aggregator
* @param indicators a map of {@link HealthIndicator HealthIndicators} with the key
* being used as an indicator name.
* @param executor the {@link ThreadPoolTaskExecutor} to submit HealthIndicators on
*/
public ConcurrentCompositeHealthIndicator(HealthAggregator healthAggregator,
Map<String, HealthIndicator> indicators, ThreadPoolTaskExecutor executor) {
this(healthAggregator, new DefaultHealthIndicatorRegistry(indicators), executor);
}

/**
* Create a new {@link ConcurrentCompositeHealthIndicator} from the specified
* indicators.
* @param healthAggregator the health aggregator
* @param indicators a map of {@link HealthIndicator HealthIndicators} with the key
* being used as an indicator name.
* @param executor the {@link ThreadPoolTaskExecutor} to submit HealthIndicators on
* @param timeout the maximum time to wait for a HealthIndicator to complete
*/
public ConcurrentCompositeHealthIndicator(HealthAggregator healthAggregator,
Map<String, HealthIndicator> indicators, ThreadPoolTaskExecutor executor,
Duration timeout) {
this(healthAggregator, new DefaultHealthIndicatorRegistry(indicators), executor,
timeout);
}

/**
* Create a new
* {@link org.springframework.boot.actuate.health.ConcurrentCompositeHealthIndicator}
* from the indicators in the given {@code registry} and provide a
* ThreadPoolTaskExecutor to submit the HealthIndicators to, without a timeout.
* @param healthAggregator the health aggregator
* @param registry the registry of {@link HealthIndicator HealthIndicators}.
* @param executor the {@link ThreadPoolTaskExecutor} to submit HealthIndicators on
*/
public ConcurrentCompositeHealthIndicator(HealthAggregator healthAggregator,
HealthIndicatorRegistry registry, ThreadPoolTaskExecutor executor) {
this(healthAggregator, registry, executor, (future) -> {
try {
return future.get();
}
catch (InterruptedException | ExecutionException ex) {
return Health.down()
.withDetail("error",
ex.getClass().getName()
+ ": Health check did not compete successfully")
.build();
}
});
}

/**
* Create a new
* {@link org.springframework.boot.actuate.health.ConcurrentCompositeHealthIndicator}
* from the indicators in the given {@code registry} and provide a
* ThreadPoolTaskExecutor for submitting the health checks.
* @param healthAggregator the health aggregator
* @param registry the registry of {@link HealthIndicator HealthIndicators}.
* @param executor the {@link ThreadPoolTaskExecutor} to submit HealthIndicators on
* @param timeout the maximum time to wait for a HealthIndicator to complete
*/
public ConcurrentCompositeHealthIndicator(HealthAggregator healthAggregator,
HealthIndicatorRegistry registry, ThreadPoolTaskExecutor executor,
Duration timeout) {
this(healthAggregator, registry, executor, (future) -> {
try {
return future.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
}
catch (InterruptedException | ExecutionException ex) {
return Health.down()
.withDetail("error",
ex.getClass().getName()
+ ": health check did not compete successfully")
.build();
}
catch (TimeoutException ex) {
return Health.down()
.withDetail("error",
ex.getClass().getName()
+ ": health check timed out after " + timeout)
.build();
}
});
}

/**
* Create a new
* {@link org.springframework.boot.actuate.health.ConcurrentCompositeHealthIndicator}
* from the indicators in the given {@code registry} and provide a
* ThreadPoolTaskExecutor for submitting the health checks.
* @param healthAggregator the health aggregator
* @param registry the registry of {@link HealthIndicator HealthIndicators}.
* @param executor the {@link ThreadPoolTaskExecutor} to submit HealthIndicators on
* @param futureFunction function to select Future::get with or without a timeout
*/
private ConcurrentCompositeHealthIndicator(HealthAggregator healthAggregator,
HealthIndicatorRegistry registry, ThreadPoolTaskExecutor executor,
Function<Future<Health>, Health> futureFunction) {
super(healthAggregator, registry);
this.executor = executor;
this.futureFunction = futureFunction;
}

@Override
public Health health() {
Map<String, Future<Health>> futureHealths = getRegistry().getAll().entrySet()
.stream()
.map((entry) -> new SimpleEntry<>(entry.getKey(),
this.executor.submit(() -> entry.getValue().health())))
.collect(LinkedHashMap::new,
(map, entry) -> map.put(entry.getKey(), entry.getValue()),
Map::putAll);

Map<String, Health> healths = futureHealths.entrySet().stream()
.map((entry) -> new SimpleEntry<>(entry.getKey(),
this.futureFunction.apply(entry.getValue())))
.collect(LinkedHashMap::new,
(map, entry) -> map.put(entry.getKey(), entry.getValue()),
Map::putAll);

return getAggregator().aggregate(healths);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.actuate.health;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.BDDMockito.given;

/**
* Tests for {@link ConcurrentCompositeHealthIndicator}
*
* @author Rik vd Ende
*/
public class ConcurrentCompositeHealthIndicatorTest {

private HealthAggregator healthAggregator;

@Mock
private HealthIndicator one;

@Mock
private HealthIndicator two;

@Mock
private ThreadPoolTaskExecutor executor;

private ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors
.newFixedThreadPool(2);

@Before
@SuppressWarnings("unchecked")
public void setup() {
MockitoAnnotations.initMocks(this);
given(this.one.health())
.willReturn(new Health.Builder().unknown().withDetail("1", "1").build());
given(this.two.health())
.willReturn(new Health.Builder().unknown().withDetail("2", "2").build());
given(this.executor.getThreadPoolExecutor()).willReturn(this.threadPoolExecutor);
given(this.executor.submit(isA(Callable.class)))
.will((InvocationOnMock invocation) -> this.threadPoolExecutor
.submit((Callable<Health>) invocation.getArgument(0)));

this.healthAggregator = new OrderedHealthAggregator();
}

@Test
public void createWithIndicators() {
Map<String, HealthIndicator> indicators = new HashMap<>();
indicators.put("one", this.one);
indicators.put("two", this.two);
ConcurrentCompositeHealthIndicator composite = new ConcurrentCompositeHealthIndicator(
this.healthAggregator, indicators, this.executor);
Health result = composite.health();
assertThat(result.getDetails()).hasSize(2);
assertThat(result.getDetails()).containsEntry("one",
new Health.Builder().unknown().withDetail("1", "1").build());
assertThat(result.getDetails()).containsEntry("two",
new Health.Builder().unknown().withDetail("2", "2").build());
}

@Test
public void testSerialization() throws Exception {
Map<String, HealthIndicator> indicators = new HashMap<>();
indicators.put("db1", this.one);
indicators.put("db2", this.two);
ConcurrentCompositeHealthIndicator innerComposite = new ConcurrentCompositeHealthIndicator(
this.healthAggregator, indicators, this.executor);
ConcurrentCompositeHealthIndicator composite = new ConcurrentCompositeHealthIndicator(
this.healthAggregator, Collections.singletonMap("db", innerComposite),
this.executor);
Health result = composite.health();
ObjectMapper mapper = new ObjectMapper();
assertThat(mapper.writeValueAsString(result)).isEqualTo(
"{\"status\":\"UNKNOWN\",\"details\":{\"db\":{\"status\":\"UNKNOWN\""
+ ",\"details\":{\"db1\":{\"status\":\"UNKNOWN\",\"details\""
+ ":{\"1\":\"1\"}},\"db2\":{\"status\":\"UNKNOWN\",\"details\""
+ ":{\"2\":\"2\"}}}}}}");
}

@Test
public void testWithTimeout() throws Exception {
Map<String, HealthIndicator> indicators = new HashMap<>();
indicators.put("one", this.one);
indicators.put("two", this.two);
ConcurrentCompositeHealthIndicator innerComposite = new ConcurrentCompositeHealthIndicator(
this.healthAggregator, indicators, this.executor);
ConcurrentCompositeHealthIndicator composite = new ConcurrentCompositeHealthIndicator(
this.healthAggregator, Collections.singletonMap("db", innerComposite),
this.executor, Duration.ZERO);

Health result = composite.health();
assertThat(result.getDetails()).hasSize(1);
assertThat(result.getDetails()).containsEntry("db",
Health.down()
.withDetail("error", TimeoutException.class.getName()
+ ": health check timed out after " + Duration.ZERO)
.build());

}

}