Skip to content

Commit

Permalink
Merge pull request #16756 from ayudovin
Browse files Browse the repository at this point in the history
* pr/16756:
  Polish 'Drop blocking RedisReactiveHealthIndicator calls'
  Drop blocking RedisReactiveHealthIndicator calls
  • Loading branch information
philwebb committed May 15, 2019
2 parents ed998ef + f790556 commit e0e9c4a
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2017 the original author or authors.
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.util.Properties;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
import org.springframework.boot.actuate.health.Health;
Expand All @@ -31,6 +32,7 @@
*
* @author Stephane Nicoll
* @author Mark Paluch
* @author Artsiom Yudovin
* @since 2.0.0
*/
public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicator {
Expand All @@ -44,15 +46,29 @@ public RedisReactiveHealthIndicator(

@Override
protected Mono<Health> doHealthCheck(Health.Builder builder) {
ReactiveRedisConnection connection = this.connectionFactory
.getReactiveConnection();
return getConnection()
.flatMap((connection) -> doHealthCheck(builder, connection));
}

private Mono<Health> doHealthCheck(Health.Builder builder,
ReactiveRedisConnection connection) {
return connection.serverCommands().info().map((info) -> up(builder, info))
.doFinally((signal) -> connection.close());
.onErrorResume((ex) -> Mono.just(down(builder, ex)))
.flatMap((health) -> connection.closeLater().thenReturn(health));
}

private Mono<ReactiveRedisConnection> getConnection() {
return Mono.fromSupplier(this.connectionFactory::getReactiveConnection)
.subscribeOn(Schedulers.parallel());
}

private Health up(Health.Builder builder, Properties info) {
return builder.up().withDetail(RedisHealthIndicator.VERSION,
info.getProperty(RedisHealthIndicator.REDIS_VERSION)).build();
}

private Health down(Health.Builder builder, Throwable cause) {
return builder.down(cause).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* @author Stephane Nicoll
* @author Mark Paluch
* @author Nikolay Rybak
* @author Artsiom Yudovin
*/
public class RedisReactiveHealthIndicatorTests {

Expand All @@ -49,6 +50,7 @@ public void redisIsUp() {
Properties info = new Properties();
info.put("redis_version", "2.8.9");
ReactiveRedisConnection redisConnection = mock(ReactiveRedisConnection.class);
given(redisConnection.closeLater()).willReturn(Mono.empty());
ReactiveServerCommands commands = mock(ReactiveServerCommands.class);
given(commands.info()).willReturn(Mono.just(info));
RedisReactiveHealthIndicator healthIndicator = createHealthIndicator(
Expand All @@ -59,7 +61,7 @@ public void redisIsUp() {
assertThat(h.getDetails()).containsOnlyKeys("version");
assertThat(h.getDetails().get("version")).isEqualTo("2.8.9");
}).verifyComplete();
verify(redisConnection).close();
verify(redisConnection).closeLater();
}

@Test
Expand All @@ -68,13 +70,14 @@ public void redisCommandIsDown() {
given(commands.info()).willReturn(
Mono.error(new RedisConnectionFailureException("Connection failed")));
ReactiveRedisConnection redisConnection = mock(ReactiveRedisConnection.class);
given(redisConnection.closeLater()).willReturn(Mono.empty());
RedisReactiveHealthIndicator healthIndicator = createHealthIndicator(
redisConnection, commands);
Mono<Health> health = healthIndicator.health();
StepVerifier.create(health)
.consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.DOWN))
.verifyComplete();
verify(redisConnection).close();
verify(redisConnection).closeLater();
}

@Test
Expand Down

0 comments on commit e0e9c4a

Please sign in to comment.