diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000000..aaabe28b98 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1 @@ +src/test/java/io/redis/examples/* @dmaier-redislabs diff --git a/.github/spellcheck-settings.yml b/.github/spellcheck-settings.yml new file mode 100644 index 0000000000..07b400f063 --- /dev/null +++ b/.github/spellcheck-settings.yml @@ -0,0 +1,28 @@ +matrix: +- name: Markdown + expect_match: false + apsell: + lang: en + d: en_US + ignore-case: true + dictionary: + wordlists: + - .github/wordlist.txt + output: wordlist.dic + pipeline: + - pyspelling.filters.markdown: + markdown_extensions: + - markdown.extensions.extra: + - pyspelling.filters.html: + comments: false + attributes: + - alt + ignores: + - ':matches(code, pre)' + - code + - pre + - blockquote + - img + sources: + - '*.md' + - 'docs/**' diff --git a/.github/wordlist.txt b/.github/wordlist.txt new file mode 100644 index 0000000000..7d0d85f49d --- /dev/null +++ b/.github/wordlist.txt @@ -0,0 +1,311 @@ +!!!Spelling check failed!!! +APM +ARGV +BFCommands +BitOP +BitPosParams +BuilderFactory +CFCommands +CMSCommands +CallNotPermittedException +CircuitBreaker +ClientKillParams +ClusterNode +ClusterNodes +ClusterPipeline +ClusterPubSub +ConnectionPool +CoreCommands +EVAL +EVALSHA +Failback +Failover +GSON +GenericObjectPool +GenericObjectPoolConfig +GeoAddParams +GeoRadiusParam +GeoRadiusStoreParam +GeoUnit +GraphCommands +Grokzen's +HostAndPort +HostnameVerifier +INCR +IOError +Instrumentations +JDK +JSONArray +JSONCommands +Jaeger +Javadocs +Jedis +JedisCluster +JedisConnectionException +JedisPool +JedisPooled +JedisShardInfo +ListPosition +Ludovico +Magnocavallo +McCurdy +NOSCRIPT +NUMPAT +NUMPT +NUMSUB +OSS +OpenCensus +OpenTelemetry +OpenTracing +Otel +POJO +POJOs +PubSub +Queable +READONLY +RediSearch +RediSearchCommands +RedisBloom +RedisCluster +RedisClusterCommands +RedisClusterException +RedisClusters +RedisGraph +RedisInstrumentor +RedisJSON +RedisTimeSeries +SHA +SSLParameters +SSLSocketFactory +SearchCommands +SentinelCommands +SentinelConnectionPool +ShardInfo +Sharded +Solovyov +SortingParams +SpanKind +Specfiying +StatusCode +StreamEntryID +TCP +TOPKCommands +Throwable +TimeSeriesCommands +URI +UnblockType +UnifiedJedis +Uptrace +ValueError +WATCHed +WatchError +XTrimParams +ZAddParams +ZParams +aclDelUser +api +approximateLength +arg +args +async +asyncio +autoclass +automodule +backoff +bdb +behaviour +bitcount +bitop +bitpos +bool +boolean +booleans +bysource +charset +clientId +clientKill +clientUnblock +clusterCountKeysInSlot +clusterKeySlot +configs +consumerName +consumername +cumbersome +dbIndex +dbSize +decr +decrBy +del +destKey +dev +dstKey +dstkey +eg +exc +expireAt +failback +failover +faoliver +firstName +firsttimersonly +fo +genindex +geoadd +georadiusByMemberStore +georadiusStore +getbit +gmail +groupname +hdel +hexists +hincrBy +hincrByFloat +hiredis +hlen +hset +hsetnx +hstrlen +http +idx +iff +incr +incrBy +incrByFloat +ini +json +keyslot +keyspace +keysvalues +kwarg +lastName +lastsave +linsert +linters +llen +localhost +lpush +lpushx +lrem +lua +makeapullrequest +maxLen +maxdepth +maya +memberCoordinateMap +mget +microservice +microservices +millisecondsTimestamp +mset +msetnx +multikey +mykey +newkey +nonatomic +observability +oldkey +opentelemetry +oss +param +params +performant +pexpire +pexpireAt +pfadd +pfcount +pmessage +png +pre +psubscribe +pttl +pubsub +punsubscribe +py +pypi +quickstart +readonly +readwrite +redis +redismodules +reimplemented +reinitialization +renamenx +replicaof +repo +rpush +rpushx +runtime +sadd +scard +scoreMembers +sdiffstore +sedrik +setbit +setnx +setrange +sinterstore +sismember +slowlogLen +smove +sortingParameters +srcKey +srcKeys +srckey +ssl +storeParam +str +strlen +stunnel +subcommands +sunionstore +thevalueofmykey +timeseries +toctree +topk +tox +triaging +ttl +txt +un +unblockType +unicode +unixTime +unlink +untyped +url +virtualenv +waitReplicas +whenver +www +xack +xdel +xgroupDelConsumer +xgroupDestroy +xlen +xtrim +zadd +zcard +zcount +zdiffStore +zincrby +zinterstore +zlexcount +zpopmax +zpopmin +zrandmember +zrandmemberWithScores +zrange +zrangeByLex +zrangeByScore +zrangeByScoreWithScores +zrangeWithScores +zrem +zremrangeByLex +zremrangeByRank +zremrangeByScore +zrevrange +zrevrangeByLex +zrevrangeByScore +zrevrangeByScoreWithScores +zrevrangeWithScores +zunionstore diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 84da5938f6..fd651fa6a4 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -10,13 +10,11 @@ on: - '**/*.rst' branches: - master - - '[0-9].x' - - '[0-9].[0-9].x' + - '[0-9].*' pull_request: branches: - master - - '[0-9].x' - - '[0-9].[0-9].x' + - '[0-9].*' schedule: - cron: '0 1 * * *' # nightly build workflow_dispatch: diff --git a/.github/workflows/spellcheck.yml b/.github/workflows/spellcheck.yml new file mode 100644 index 0000000000..e152841553 --- /dev/null +++ b/.github/workflows/spellcheck.yml @@ -0,0 +1,14 @@ +name: spellcheck +on: + pull_request: +jobs: + check-spelling: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Check Spelling + uses: rojopolis/spellcheck-github-actions@0.33.1 + with: + config_path: .github/spellcheck-settings.yml + task_name: Markdown diff --git a/.github/workflows/stale-issues.yml b/.github/workflows/stale-issues.yml new file mode 100644 index 0000000000..54bf059fba --- /dev/null +++ b/.github/workflows/stale-issues.yml @@ -0,0 +1,25 @@ +name: "Close stale issues" +on: + schedule: + - cron: "0 0 * * *" + +permissions: {} +jobs: + stale: + permissions: + issues: write # to close stale issues (actions/stale) + pull-requests: write # to close stale PRs (actions/stale) + + runs-on: ubuntu-latest + steps: + - uses: actions/stale@v3 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + stale-issue-message: 'This issue is marked stale. It will be closed in 30 days if it is not updated.' + stale-pr-message: 'This pull request is marked stale. It will be closed in 30 days if it is not updated.' + days-before-stale: 365 + days-before-close: 30 + stale-issue-label: "stale" + stale-pr-label: "stale" + operations-per-run: 10 + remove-stale-when-updated: false diff --git a/docs/failover.md b/docs/failover.md index 8414e41376..38fdc8c97b 100644 --- a/docs/failover.md +++ b/docs/failover.md @@ -99,8 +99,8 @@ Jedis uses the following retry settings: | Max retry attempts | 3 | Maximum number of retry attempts (including the initial call) | | Retry wait duration | 500 ms | Number of milliseconds to wait between retry attempts | | Wait duration backoff multiplier | 2 | Exponential backoff factor multiplied against wait duration between retries. For example, with a wait duration of 1 second and a multiplier of 2, the retries would occur after 1s, 2s, 4s, 8s, 16s, and so on. | -| Retry included exception list | `JedisConnectionException` | A list of `Throwable` classes that count as failures and should be retried. | -| Retry ignored exception list | Empty list | A list of `Throwable` classes to explicitly ignore for the purposes of retry. | +| Retry included exception list | [JedisConnectionException] | A list of Throwable classes that count as failures and should be retried. | +| Retry ignored exception list | null | A list of Throwable classes to explicitly ignore for the purposes of retry. | To disable retry, set `maxRetryAttempts` to 1. @@ -116,8 +116,16 @@ Jedis uses the following circuit breaker settings: | Failure rate threshold | `50.0f` | Percentage of calls within the sliding window that must fail before the circuit breaker transitions to the `OPEN` state. | | Slow call duration threshold | 60000 ms | Duration threshold above which calls are classified as slow and added to the sliding window. | | Slow call rate threshold | `100.0f` | Percentage of calls within the sliding window that exceed the slow call duration threshold before circuit breaker transitions to the `OPEN` state. | -| Circuit breaker included exception list | `JedisConnectionException` | A list of `Throwable` classes that count as failures and add to the failure rate. | -| Circuit breaker ignored exception list | Empty list | A list of `Throwable` classes to explicitly ignore for failure rate calculations. | | +| Circuit breaker included exception list | [JedisConnectionException] | A list of Throwable classes that count as failures and add to the failure rate. | +| Circuit breaker ignored exception list | null | A list of Throwable classes to explicitly ignore for failure rate calculations. | | + +### Fallback configuration + +Jedis uses the following fallback settings: + +| Setting | Default value | Description | +|-------------------------|-------------------------------------------------------|----------------------------------------------------| +| Fallback exception list | [CallNotPermittedException, JedisConnectionException] | A list of Throwable classes that trigger fallback. | ### Failover callbacks diff --git a/pom.xml b/pom.xml index e2d0183a05..4080af65e2 100644 --- a/pom.xml +++ b/pom.xml @@ -49,7 +49,7 @@ redis.clients.jedis 1.7.36 1.7.1 - 2.16.0 + 2.16.1 3.2.3 @@ -189,7 +189,7 @@ maven-compiler-plugin - 3.11.0 + 3.12.1 1.8 1.8 diff --git a/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java b/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java index c39efae7d4..980bbb91f9 100644 --- a/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java +++ b/src/main/java/redis/clients/jedis/MultiClusterClientConfig.java @@ -1,13 +1,15 @@ package redis.clients.jedis; +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig.SlidingWindowType; -import redis.clients.jedis.exceptions.JedisConnectionException; -import redis.clients.jedis.exceptions.JedisValidationException; import java.time.Duration; -import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisValidationException; + /** * @author Allen Terleto (aterleto) @@ -22,12 +24,13 @@ * not passed through to Jedis users. *

*/ +// TODO: move public final class MultiClusterClientConfig { private static final int RETRY_MAX_ATTEMPTS_DEFAULT = 3; private static final int RETRY_WAIT_DURATION_DEFAULT = 500; // measured in milliseconds private static final int RETRY_WAIT_DURATION_EXPONENTIAL_BACKOFF_MULTIPLIER_DEFAULT = 2; - private static final Class RETRY_INCLUDED_EXCEPTIONS_DEFAULT = JedisConnectionException.class; + private static final List RETRY_INCLUDED_EXCEPTIONS_DEFAULT = Arrays.asList(JedisConnectionException.class); private static final float CIRCUIT_BREAKER_FAILURE_RATE_THRESHOLD_DEFAULT = 50.0f; // measured as percentage private static final int CIRCUIT_BREAKER_SLIDING_WINDOW_MIN_CALLS_DEFAULT = 100; @@ -35,7 +38,10 @@ public final class MultiClusterClientConfig { private static final int CIRCUIT_BREAKER_SLIDING_WINDOW_SIZE_DEFAULT = 100; private static final int CIRCUIT_BREAKER_SLOW_CALL_DURATION_THRESHOLD_DEFAULT = 60000; // measured in milliseconds private static final float CIRCUIT_BREAKER_SLOW_CALL_RATE_THRESHOLD_DEFAULT = 100.0f; // measured as percentage - private static final Class CIRCUIT_BREAKER_INCLUDED_EXCEPTIONS_DEFAULT = JedisConnectionException.class; + private static final List CIRCUIT_BREAKER_INCLUDED_EXCEPTIONS_DEFAULT = Arrays.asList(JedisConnectionException.class); + + private static final List> FALLBACK_EXCEPTIONS_DEFAULT = + Arrays.asList(CallNotPermittedException.class, JedisConnectionException.class); private final ClusterConfig[] clusterConfigs; @@ -99,6 +105,7 @@ public final class MultiClusterClientConfig { * failure nor success, even if the exceptions is part of recordExceptions */ private List circuitBreakerIgnoreExceptionList; + private List> fallbackExceptionList; public MultiClusterClientConfig(ClusterConfig[] clusterConfigs) { this.clusterConfigs = clusterConfigs; @@ -160,6 +167,10 @@ public SlidingWindowType getCircuitBreakerSlidingWindowType() { return circuitBreakerSlidingWindowType; } + public List> getFallbackExceptionList() { + return fallbackExceptionList; + } + public static class ClusterConfig { private int priority; @@ -195,8 +206,8 @@ public static class Builder { private int retryMaxAttempts = RETRY_MAX_ATTEMPTS_DEFAULT; private int retryWaitDuration = RETRY_WAIT_DURATION_DEFAULT; private int retryWaitDurationExponentialBackoffMultiplier = RETRY_WAIT_DURATION_EXPONENTIAL_BACKOFF_MULTIPLIER_DEFAULT; - private List retryIncludedExceptionList; - private List retryIgnoreExceptionList; + private List retryIncludedExceptionList = RETRY_INCLUDED_EXCEPTIONS_DEFAULT; + private List retryIgnoreExceptionList = null; private float circuitBreakerFailureRateThreshold = CIRCUIT_BREAKER_FAILURE_RATE_THRESHOLD_DEFAULT; private int circuitBreakerSlidingWindowMinCalls = CIRCUIT_BREAKER_SLIDING_WINDOW_MIN_CALLS_DEFAULT; @@ -204,9 +215,9 @@ public static class Builder { private int circuitBreakerSlidingWindowSize = CIRCUIT_BREAKER_SLIDING_WINDOW_SIZE_DEFAULT; private int circuitBreakerSlowCallDurationThreshold = CIRCUIT_BREAKER_SLOW_CALL_DURATION_THRESHOLD_DEFAULT; private float circuitBreakerSlowCallRateThreshold = CIRCUIT_BREAKER_SLOW_CALL_RATE_THRESHOLD_DEFAULT; - private List circuitBreakerIncludedExceptionList; - private List circuitBreakerIgnoreExceptionList; - private List> circuitBreakerFallbackExceptionList; + private List circuitBreakerIncludedExceptionList = CIRCUIT_BREAKER_INCLUDED_EXCEPTIONS_DEFAULT; + private List circuitBreakerIgnoreExceptionList = null; + private List> fallbackExceptionList = FALLBACK_EXCEPTIONS_DEFAULT; public Builder(ClusterConfig[] clusterConfigs) { @@ -219,6 +230,10 @@ public Builder(ClusterConfig[] clusterConfigs) { this.clusterConfigs = clusterConfigs; } + public Builder(List clusterConfigs) { + this(clusterConfigs.toArray(new ClusterConfig[0])); + } + public Builder retryMaxAttempts(int retryMaxAttempts) { this.retryMaxAttempts = retryMaxAttempts; return this; @@ -284,8 +299,16 @@ public Builder circuitBreakerIgnoreExceptionList(List circuitBreakerIgnor return this; } + /** + * @deprecated Use {@link #fallbackExceptionList(java.util.List)}. + */ + @Deprecated public Builder circuitBreakerFallbackExceptionList(List> circuitBreakerFallbackExceptionList) { - this.circuitBreakerFallbackExceptionList = circuitBreakerFallbackExceptionList; + return fallbackExceptionList(circuitBreakerFallbackExceptionList); + } + + public Builder fallbackExceptionList(List> fallbackExceptionList) { + this.fallbackExceptionList = fallbackExceptionList; return this; } @@ -296,16 +319,9 @@ public MultiClusterClientConfig build() { config.retryWaitDuration = Duration.ofMillis(this.retryWaitDuration); config.retryWaitDurationExponentialBackoffMultiplier = this.retryWaitDurationExponentialBackoffMultiplier; - if (this.retryIncludedExceptionList != null && !retryIncludedExceptionList.isEmpty()) - config.retryIncludedExceptionList = this.retryIncludedExceptionList; - - else { - config.retryIncludedExceptionList = new ArrayList<>(); - config.retryIncludedExceptionList.add(RETRY_INCLUDED_EXCEPTIONS_DEFAULT); - } + config.retryIncludedExceptionList = this.retryIncludedExceptionList; - if (this.retryIgnoreExceptionList != null && !retryIgnoreExceptionList.isEmpty()) - config.retryIgnoreExceptionList = this.retryIgnoreExceptionList; + config.retryIgnoreExceptionList = this.retryIgnoreExceptionList; config.circuitBreakerFailureRateThreshold = this.circuitBreakerFailureRateThreshold; config.circuitBreakerSlidingWindowMinCalls = this.circuitBreakerSlidingWindowMinCalls; @@ -314,16 +330,11 @@ public MultiClusterClientConfig build() { config.circuitBreakerSlowCallDurationThreshold = Duration.ofMillis(this.circuitBreakerSlowCallDurationThreshold); config.circuitBreakerSlowCallRateThreshold = this.circuitBreakerSlowCallRateThreshold; - if (this.circuitBreakerIncludedExceptionList != null && !circuitBreakerIncludedExceptionList.isEmpty()) - config.circuitBreakerIncludedExceptionList = this.circuitBreakerIncludedExceptionList; + config.circuitBreakerIncludedExceptionList = this.circuitBreakerIncludedExceptionList; - else { - config.circuitBreakerIncludedExceptionList = new ArrayList<>(); - config.circuitBreakerIncludedExceptionList.add(CIRCUIT_BREAKER_INCLUDED_EXCEPTIONS_DEFAULT); - } + config.circuitBreakerIgnoreExceptionList = this.circuitBreakerIgnoreExceptionList; - if (this.circuitBreakerIgnoreExceptionList != null && !circuitBreakerIgnoreExceptionList.isEmpty()) - config.circuitBreakerIgnoreExceptionList = this.circuitBreakerIgnoreExceptionList; + config.fallbackExceptionList = this.fallbackExceptionList; return config; } diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 489a331b3a..4af1261cd4 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -55,6 +55,7 @@ public final class Protocol { private static final String CLUSTERDOWN_PREFIX = "CLUSTERDOWN "; private static final String BUSY_PREFIX = "BUSY "; private static final String NOSCRIPT_PREFIX = "NOSCRIPT "; + private static final String NOAUTH_PREFIX = "NOAUTH"; private static final String WRONGPASS_PREFIX = "WRONGPASS"; private static final String NOPERM_PREFIX = "NOPERM"; @@ -100,9 +101,9 @@ private static void processError(final RedisInputStream is) { throw new JedisBusyException(message); } else if (message.startsWith(NOSCRIPT_PREFIX)) { throw new JedisNoScriptException(message); - } else if (message.startsWith(WRONGPASS_PREFIX)) { - throw new JedisAccessControlException(message); - } else if (message.startsWith(NOPERM_PREFIX)) { + } else if (message.startsWith(NOAUTH_PREFIX) + || message.startsWith(WRONGPASS_PREFIX) + || message.startsWith(NOPERM_PREFIX)) { throw new JedisAccessControlException(message); } throw new JedisDataException(message); diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java index 7a857174cb..38b32bbad0 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java @@ -32,8 +32,8 @@ public T executeCommand(CommandObject commandObject) { supplier.withRetry(cluster.getRetry()); supplier.withCircuitBreaker(cluster.getCircuitBreaker()); - supplier.withFallback(defaultCircuitBreakerFallbackException, - e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker())); + supplier.withFallback(provider.getFallbackExceptionList(), + e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker())); return supplier.decorate().get(); } diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java index 2228233849..b06d7b9604 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java @@ -1,11 +1,6 @@ package redis.clients.jedis.mcf; -import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; - -import java.util.Arrays; -import java.util.List; - import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; import redis.clients.jedis.util.IOUtils; @@ -21,9 +16,6 @@ */ public class CircuitBreakerFailoverBase implements AutoCloseable { - protected final static List> defaultCircuitBreakerFallbackException = - Arrays.asList(CallNotPermittedException.class); - protected final MultiClusterPooledConnectionProvider provider; public CircuitBreakerFailoverBase(MultiClusterPooledConnectionProvider provider) { diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java index dad2c751c8..10a0823973 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java @@ -26,8 +26,8 @@ public Connection getConnection() { supplier.withRetry(cluster.getRetry()); supplier.withCircuitBreaker(cluster.getCircuitBreaker()); - supplier.withFallback(defaultCircuitBreakerFallbackException, - e -> this.handleClusterFailover(cluster.getCircuitBreaker())); + supplier.withFallback(provider.getFallbackExceptionList(), + e -> this.handleClusterFailover(cluster.getCircuitBreaker())); return supplier.decorate().get(); } diff --git a/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java b/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java index 94f686c2d7..d4052dae7b 100644 --- a/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java +++ b/src/main/java/redis/clients/jedis/mcf/MultiClusterPipeline.java @@ -22,12 +22,13 @@ public class MultiClusterPipeline extends PipelineBase implements Closeable { public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider) { super(new CommandObjects()); - try (Connection connection = pooledProvider.getConnection()) { // we don't need a healthy connection now + + this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(pooledProvider); + + try (Connection connection = failoverProvider.getConnection()) { RedisProtocol proto = connection.getRedisProtocol(); if (proto != null) this.commandObjects.setProtocol(proto); } - - this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(pooledProvider); } @Override diff --git a/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java b/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java index 5ce9ecd9b5..540911f2d6 100644 --- a/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java +++ b/src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java @@ -26,7 +26,7 @@ public class MultiClusterTransaction extends TransactionBase { private static final Builder NO_OP_BUILDER = BuilderFactory.RAW_OBJECT; - private final CircuitBreakerFailoverConnectionProvider provider; + private final CircuitBreakerFailoverConnectionProvider failoverProvider; private final AtomicInteger extraCommandCount = new AtomicInteger(); private final Queue>> commands = new LinkedList<>(); @@ -50,13 +50,13 @@ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider) { * @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI */ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, boolean doMulti) { - try (Connection connection = provider.getConnection()) { // we don't need a healthy connection now + this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(provider); + + try (Connection connection = failoverProvider.getConnection()) { RedisProtocol proto = connection.getRedisProtocol(); if (proto != null) this.commandObjects.setProtocol(proto); } - this.provider = new CircuitBreakerFailoverConnectionProvider(provider); - if (doMulti) multi(); } @@ -129,7 +129,7 @@ public final List exec() { throw new IllegalStateException("EXEC without MULTI"); } - try (Connection connection = provider.getConnection()) { + try (Connection connection = failoverProvider.getConnection()) { commands.forEach((command) -> connection.sendCommand(command.getKey())); // following connection.getMany(int) flushes anyway, so no flush here. @@ -174,7 +174,7 @@ public final String discard() { throw new IllegalStateException("DISCARD without MULTI"); } - try (Connection connection = provider.getConnection()) { + try (Connection connection = failoverProvider.getConnection()) { commands.forEach((command) -> connection.sendCommand(command.getKey())); // following connection.getMany(int) flushes anyway, so no flush here. diff --git a/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java index abe5515b97..e6013a2c58 100644 --- a/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java @@ -8,18 +8,21 @@ import io.github.resilience4j.retry.Retry; import io.github.resilience4j.retry.RetryConfig; import io.github.resilience4j.retry.RetryRegistry; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import redis.clients.jedis.*; import redis.clients.jedis.MultiClusterClientConfig.ClusterConfig; import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisValidationException; import redis.clients.jedis.util.Pool; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; - /** * @author Allen Terleto (aterleto) @@ -31,6 +34,7 @@ * Support for manual failback is provided by way of {@link #setActiveMultiClusterIndex(int)} *

*/ +// TODO: move? public class MultiClusterPooledConnectionProvider implements ConnectionProvider { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -62,6 +66,7 @@ public class MultiClusterPooledConnectionProvider implements ConnectionProvider */ private Consumer clusterFailoverPostProcessor; + private List> fallbackExceptionList; public MultiClusterPooledConnectionProvider(MultiClusterClientConfig multiClusterClientConfig) { @@ -78,7 +83,7 @@ public MultiClusterPooledConnectionProvider(MultiClusterClientConfig multiCluste retryConfigBuilder.retryExceptions(multiClusterClientConfig.getRetryIncludedExceptionList().stream().toArray(Class[]::new)); List retryIgnoreExceptionList = multiClusterClientConfig.getRetryIgnoreExceptionList(); - if (retryIgnoreExceptionList != null && !retryIgnoreExceptionList.isEmpty()) + if (retryIgnoreExceptionList != null) retryConfigBuilder.ignoreExceptions(retryIgnoreExceptionList.stream().toArray(Class[]::new)); RetryConfig retryConfig = retryConfigBuilder.build(); @@ -96,7 +101,7 @@ public MultiClusterPooledConnectionProvider(MultiClusterClientConfig multiCluste circuitBreakerConfigBuilder.automaticTransitionFromOpenToHalfOpenEnabled(false); // State transitions are forced. No half open states are used List circuitBreakerIgnoreExceptionList = multiClusterClientConfig.getCircuitBreakerIgnoreExceptionList(); - if (circuitBreakerIgnoreExceptionList != null && !circuitBreakerIgnoreExceptionList.isEmpty()) + if (circuitBreakerIgnoreExceptionList != null) circuitBreakerConfigBuilder.ignoreExceptions(circuitBreakerIgnoreExceptionList.stream().toArray(Class[]::new)); CircuitBreakerConfig circuitBreakerConfig = circuitBreakerConfigBuilder.build(); @@ -123,10 +128,14 @@ public MultiClusterPooledConnectionProvider(MultiClusterClientConfig multiCluste circuitBreakerEventPublisher.onSlowCallRateExceeded(event -> log.error(String.valueOf(event))); circuitBreakerEventPublisher.onStateTransition(event -> log.warn(String.valueOf(event))); - multiClusterMap.put(config.getPriority(), new Cluster(new ConnectionPool(config.getHostAndPort(), - config.getJedisClientConfig()), - retry, circuitBreaker)); + multiClusterMap.put(config.getPriority(), + new Cluster(new ConnectionPool(config.getHostAndPort(), + config.getJedisClientConfig()), retry, circuitBreaker)); } + + /// --- /// + + this.fallbackExceptionList = multiClusterClientConfig.getFallbackExceptionList(); } /** @@ -289,6 +298,10 @@ public void setClusterFailoverPostProcessor(Consumer clusterFailoverPost this.clusterFailoverPostProcessor = clusterFailoverPostProcessor; } + public List> getFallbackExceptionList() { + return fallbackExceptionList; + } + public static class Cluster { private final ConnectionPool connectionPool; diff --git a/src/test/java/redis/clients/jedis/misc/AutomaticFailoverTest.java b/src/test/java/redis/clients/jedis/misc/AutomaticFailoverTest.java index 4f8f896afc..dc03c52f39 100644 --- a/src/test/java/redis/clients/jedis/misc/AutomaticFailoverTest.java +++ b/src/test/java/redis/clients/jedis/misc/AutomaticFailoverTest.java @@ -1,11 +1,20 @@ package redis.clients.jedis.misc; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import redis.clients.jedis.AbstractPipeline; import redis.clients.jedis.AbstractTransaction; @@ -16,51 +25,50 @@ import redis.clients.jedis.JedisClientConfig; import redis.clients.jedis.MultiClusterClientConfig; import redis.clients.jedis.UnifiedJedis; +import redis.clients.jedis.exceptions.JedisAccessControlException; import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; +import redis.clients.jedis.util.IOUtils; public class AutomaticFailoverTest { - private final HostAndPort hostAndPort1 = HostAndPorts.getRedisServers().get(0); - private final HostAndPort hostAndPort2 = HostAndPorts.getRedisServers().get(1); + private static final Logger log = LoggerFactory.getLogger(AutomaticFailoverTest.class); + + private final HostAndPort hostPort_1 = new HostAndPort(HostAndPorts.getRedisServers().get(0).getHost(), 6378); + private final HostAndPort hostPort_1_2 = HostAndPorts.getRedisServers().get(0); + private final HostAndPort hostPort_2 = HostAndPorts.getRedisServers().get(7); - private final JedisClientConfig clientConfig = DefaultJedisClientConfig.builder().password("foobared").build(); + private final JedisClientConfig clientConfig = DefaultJedisClientConfig.builder().build(); - private Jedis jedis1; private Jedis jedis2; - private MultiClusterPooledConnectionProvider provider; + private List getClusterConfigs( + JedisClientConfig clientConfig, HostAndPort... hostPorts) { + return Arrays.stream(hostPorts) + .map(hp -> new MultiClusterClientConfig.ClusterConfig(hp, clientConfig)) + .collect(Collectors.toList()); + } @Before public void setUp() { - - MultiClusterClientConfig.ClusterConfig[] clusterConfigs = new MultiClusterClientConfig.ClusterConfig[2]; - clusterConfigs[0] = new MultiClusterClientConfig.ClusterConfig(hostAndPort1, clientConfig); - clusterConfigs[1] = new MultiClusterClientConfig.ClusterConfig(hostAndPort2, clientConfig); - - provider = new MultiClusterPooledConnectionProvider(new MultiClusterClientConfig.Builder(clusterConfigs).build()); - - jedis1 = new Jedis(hostAndPort1, clientConfig); - jedis1.flushAll(); - jedis2 = new Jedis(hostAndPort2, clientConfig); + jedis2 = new Jedis(hostPort_2, clientConfig); jedis2.flushAll(); } @After public void cleanUp() { - - provider.close(); - - jedis1.close(); - jedis2.close(); + IOUtils.closeQuietly(jedis2); } @Test public void pipelineWithSwitch() { + MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider( + new MultiClusterClientConfig.Builder(getClusterConfigs(clientConfig, hostPort_1, hostPort_2)).build()); + try (UnifiedJedis client = new UnifiedJedis(provider)) { AbstractPipeline pipe = client.pipelined(); pipe.set("pstr", "foobar"); pipe.hset("phash", "foo", "bar"); - provider.incrementActiveMultiClusterIndex(); + //provider.incrementActiveMultiClusterIndex(); pipe.sync(); } @@ -70,15 +78,116 @@ public void pipelineWithSwitch() { @Test public void transactionWithSwitch() { + MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider( + new MultiClusterClientConfig.Builder(getClusterConfigs(clientConfig, hostPort_1, hostPort_2)).build()); + try (UnifiedJedis client = new UnifiedJedis(provider)) { AbstractTransaction tx = client.multi(); tx.set("tstr", "foobar"); tx.hset("thash", "foo", "bar"); - provider.incrementActiveMultiClusterIndex(); + //provider.incrementActiveMultiClusterIndex(); assertEquals(Arrays.asList("OK", Long.valueOf(1L)), tx.exec()); } assertEquals("foobar", jedis2.get("tstr")); assertEquals("bar", jedis2.hget("thash", "foo")); } + + @Test + public void commandFailover() { + int slidingWindowMinCalls = 10; + int slidingWindowSize = 10; + + MultiClusterClientConfig.Builder builder = new MultiClusterClientConfig.Builder( + getClusterConfigs(clientConfig, hostPort_1, hostPort_2)) + .circuitBreakerSlidingWindowMinCalls(slidingWindowMinCalls) + .circuitBreakerSlidingWindowSize(slidingWindowSize); + + RedisFailoverReporter failoverReporter = new RedisFailoverReporter(); + MultiClusterPooledConnectionProvider cacheProvider = new MultiClusterPooledConnectionProvider(builder.build()); + cacheProvider.setClusterFailoverPostProcessor(failoverReporter); + + UnifiedJedis jedis = new UnifiedJedis(cacheProvider); + + assertFalse(failoverReporter.failedOver); + log.info("Starting calls to Redis"); + String key = "hash-" + System.nanoTime(); + jedis.hset(key, "f1", "v1"); + assertTrue(failoverReporter.failedOver); + + assertEquals(Collections.singletonMap("f1", "v1"), jedis.hgetAll(key)); + jedis.flushAll(); + + jedis.close(); + } + + @Test + public void pipelineFailover() { + int slidingWindowMinCalls = 10; + int slidingWindowSize = 10; + + MultiClusterClientConfig.Builder builder = new MultiClusterClientConfig.Builder( + getClusterConfigs(clientConfig, hostPort_1, hostPort_2)) + .circuitBreakerSlidingWindowMinCalls(slidingWindowMinCalls) + .circuitBreakerSlidingWindowSize(slidingWindowSize); + + RedisFailoverReporter failoverReporter = new RedisFailoverReporter(); + MultiClusterPooledConnectionProvider cacheProvider = new MultiClusterPooledConnectionProvider(builder.build()); + cacheProvider.setClusterFailoverPostProcessor(failoverReporter); + + UnifiedJedis jedis = new UnifiedJedis(cacheProvider); + + assertFalse(failoverReporter.failedOver); + log.info("Starting calls to Redis"); + AbstractPipeline pipe = jedis.pipelined(); + String key = "hash-" + System.nanoTime(); + pipe.hset(key, "f1", "v1"); + pipe.sync(); + assertTrue(failoverReporter.failedOver); + + assertEquals(Collections.singletonMap("f1", "v1"), jedis.hgetAll(key)); + jedis.flushAll(); + + jedis.close(); + } + + @Test + public void failoverFromAuthError() { + int slidingWindowMinCalls = 10; + int slidingWindowSize = 10; + + MultiClusterClientConfig.Builder builder = new MultiClusterClientConfig.Builder( + getClusterConfigs(clientConfig, hostPort_1_2, hostPort_2)) + .circuitBreakerSlidingWindowMinCalls(slidingWindowMinCalls) + .circuitBreakerSlidingWindowSize(slidingWindowSize) + .fallbackExceptionList(Arrays.asList(JedisAccessControlException.class)); + + RedisFailoverReporter failoverReporter = new RedisFailoverReporter(); + MultiClusterPooledConnectionProvider cacheProvider = new MultiClusterPooledConnectionProvider(builder.build()); + cacheProvider.setClusterFailoverPostProcessor(failoverReporter); + + UnifiedJedis jedis = new UnifiedJedis(cacheProvider); + + assertFalse(failoverReporter.failedOver); + log.info("Starting calls to Redis"); + String key = "hash-" + System.nanoTime(); + jedis.hset(key, "f1", "v1"); + assertTrue(failoverReporter.failedOver); + + assertEquals(Collections.singletonMap("f1", "v1"), jedis.hgetAll(key)); + jedis.flushAll(); + + jedis.close(); + } + + class RedisFailoverReporter implements Consumer { + + boolean failedOver = false; + + @Override + public void accept(String clusterName) { + log.info("Jedis fail over to cluster: " + clusterName); + failedOver = true; + } + } }