diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/exception/psync/KeeperTolerantClosePsyncException.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/exception/psync/KeeperTolerantClosePsyncException.java new file mode 100644 index 0000000000..25b66facf4 --- /dev/null +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/exception/psync/KeeperTolerantClosePsyncException.java @@ -0,0 +1,18 @@ +package com.ctrip.xpipe.redis.keeper.exception.psync; + +import com.ctrip.xpipe.redis.keeper.monitor.PsyncFailReason; + +public class KeeperTolerantClosePsyncException extends PsyncRuntimeException { + + public KeeperTolerantClosePsyncException(PsyncRuntimeException e) { + super("keeper tolerant:" + e.getMessage(), e); + } + + @Override + public PsyncFailReason toReason() { + Throwable cause = getCause(); + if (cause instanceof PsyncRuntimeException) return ((PsyncRuntimeException) cause).toReason(); + else return super.toReason(); + } + +} diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/RdbonlyRedisMasterReplication.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/RdbonlyRedisMasterReplication.java index afee3874f3..87cba0f26e 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/RdbonlyRedisMasterReplication.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/RdbonlyRedisMasterReplication.java @@ -11,8 +11,10 @@ import com.ctrip.xpipe.redis.keeper.RedisKeeperServer; import com.ctrip.xpipe.redis.keeper.RedisMaster; import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager; +import com.ctrip.xpipe.redis.keeper.exception.psync.KeeperTolerantClosePsyncException; import com.ctrip.xpipe.redis.keeper.exception.psync.PsyncConnectMasterFailException; import com.ctrip.xpipe.redis.keeper.exception.psync.PsyncMasterRdbOffsetNotContinuousRuntimeException; +import com.ctrip.xpipe.redis.keeper.exception.psync.PsyncRuntimeException; import com.ctrip.xpipe.redis.keeper.exception.replication.UnexpectedReplIdException; import com.ctrip.xpipe.redis.keeper.store.RdbOnlyReplicationStore; import com.ctrip.xpipe.utils.VisibleForTesting; @@ -24,6 +26,7 @@ import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; /** * @author wenchao.meng @@ -47,6 +50,8 @@ enum REPL_STATE { private REPL_STATE state; + private AtomicReference currentPsync = new AtomicReference<>(); + public RdbonlyRedisMasterReplication(RedisKeeperServer redisKeeperServer, RedisMaster redisMaster, boolean tryRordb, boolean freshRdbNeeded, NioEventLoopGroup nioEventLoopGroup, ScheduledExecutorService scheduled, @@ -120,14 +125,24 @@ protected void psyncFail(Throwable cause) { protected Psync createPsync() { Psync psync; + RdbOnlyReplicationStore replicationStore; + try { + replicationStore = tryGetReplicationStore(); + } catch (Throwable th) { + logger.info("[createPsync][prepareReplicationStore][fail] close", th); + disconnectWithMaster(); + throw new PsyncRuntimeException("prepare ReplicationStore fail", th); + } + if (state.equals(REPL_STATE.FRESH_SYNC)) { - psync = new FreshRdbOnlyPsync(clientPool, rdbOnlyReplicationStore, scheduled); + psync = new FreshRdbOnlyPsync(clientPool, replicationStore, scheduled); } else { - psync = new RdbOnlyPsync(clientPool, rdbOnlyReplicationStore, scheduled); + psync = new RdbOnlyPsync(clientPool, replicationStore, scheduled); } psync.addPsyncObserver(this); psync.addPsyncObserver(redisKeeperServer.createPsyncObserverForRdbOnlyRepl()); + currentPsync.set(psync); return psync; } @@ -175,11 +190,13 @@ protected void doOnFullSync(long masterRdbOffset) { if (state.equals(REPL_STATE.NORMAL_SYNC)) { state = REPL_STATE.FAIL_FOR_NOT_CONTINUE; try { - logger.info("[retryOnceForRdbNotContinue][resetRdbStore]{}", dumpedRdbStore); - dumpedRdbStore = getRdbDumper().prepareRdbStore(); - rdbOnlyReplicationStore = new RdbOnlyReplicationStore(dumpedRdbStore); + logger.info("[retryOnceForRdbNotContinue][resetRdbStore][{}:{}]{}", masterRdbOffset, firstAvailable, dumpedRdbStore); + currentPsync.get().future().setFailure(new KeeperTolerantClosePsyncException( + new PsyncMasterRdbOffsetNotContinuousRuntimeException(masterRdbOffset, firstAvailable))); + resetReplicationStore(); disconnectWithMaster(); } catch (Exception e) { + logger.info("[doOnFullSync][retryForNotContinue] fail", e); dumpFail(new PsyncMasterRdbOffsetNotContinuousRuntimeException(masterRdbOffset, firstAvailable)); } } else { @@ -188,6 +205,34 @@ protected void doOnFullSync(long masterRdbOffset) { } } + private synchronized void resetReplicationStore() { + dumpedRdbStore = null; + rdbOnlyReplicationStore = null; + } + + private RdbOnlyReplicationStore tryGetReplicationStore() throws IOException { + if (null != rdbOnlyReplicationStore) return rdbOnlyReplicationStore; + + synchronized (this) { + if (null != rdbOnlyReplicationStore) return rdbOnlyReplicationStore; + + dumpedRdbStore = getRdbDumper().prepareRdbStore(); + rdbOnlyReplicationStore = new RdbOnlyReplicationStore(dumpedRdbStore); + return rdbOnlyReplicationStore; + } + } + + @Override + protected void dumpFail(Throwable th) { + if (th instanceof KeeperTolerantClosePsyncException || + (null != th.getCause() && th.getCause() instanceof KeeperTolerantClosePsyncException)) { + logger.info("[dumpFail][tolerant] {}", th.getMessage()); + return; + } + + super.dumpFail(th); + } + @Override protected String getSimpleName() { return "RdbRep"; diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/RdbonlyRedisMasterReplicationTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/RdbonlyRedisMasterReplicationTest.java index b7f2627f74..388f273558 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/RdbonlyRedisMasterReplicationTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/RdbonlyRedisMasterReplicationTest.java @@ -13,6 +13,7 @@ import com.ctrip.xpipe.redis.keeper.AbstractRedisKeeperTest; import com.ctrip.xpipe.redis.keeper.RedisMaster; import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager; +import com.ctrip.xpipe.redis.keeper.exception.psync.PsyncCommandFailException; import com.ctrip.xpipe.redis.keeper.exception.psync.PsyncMasterRdbOffsetNotContinuousRuntimeException; import com.ctrip.xpipe.redis.keeper.monitor.KeeperMonitor; import com.ctrip.xpipe.redis.keeper.monitor.KeeperStats; @@ -159,6 +160,14 @@ protected void scheduleReconnect(int timeMilli) { } }; + Psync psync = rdbonlyRedisMasterReplication.createPsync(); + Assert.assertTrue(psync instanceof RdbOnlyPsync); + + psync.future().addListener(commandFuture -> { + if (!commandFuture.isSuccess()) { + rdbonlyRedisMasterReplication.dumpFail(new PsyncCommandFailException(commandFuture.cause())); + } + }); when(replicationStore.firstAvailableOffset()).thenReturn(120L); rdbonlyRedisMasterReplication.onFullSync(100); Assert.assertFalse(dumper.future().isDone()); @@ -167,6 +176,9 @@ protected void scheduleReconnect(int timeMilli) { Assert.assertEquals(1, reconnectCnt.get()); Assert.assertFalse(dumper.future().isDone()); + psync = rdbonlyRedisMasterReplication.createPsync(); + Assert.assertTrue(psync instanceof FreshRdbOnlyPsync); + rdbonlyRedisMasterReplication.masterDisconnected(Mockito.mock(Channel.class)); Assert.assertEquals(1, reconnectCnt.get()); Assert.assertTrue(dumper.future().isDone());