@@ -60,13 +60,21 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
6060
6161 protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period" ;
6262
63+ protected static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms" ;
64+
65+ protected static final String WAL_ROLL_RETRIES = "hbase.regionserver.logroll.retries" ;
66+
6367 protected final ConcurrentMap <WAL , RollController > wals = new ConcurrentHashMap <>();
6468 protected final T abortable ;
6569 // Period to roll log.
6670 private final long rollPeriod ;
6771 private final int threadWakeFrequency ;
6872 // The interval to check low replication on hlog's pipeline
6973 private final long checkLowReplicationInterval ;
74+ // Max time in ms to keep retrying a failed log roll before giving up
75+ private final long rollWaitTimeout ;
76+ // Max number of retries for a failed log roll
77+ private final int maxRollRetry ;
7078
7179 private volatile boolean running = true ;
7280
@@ -114,6 +122,8 @@ protected AbstractWALRoller(String name, Configuration conf, T abortable) {
114122 this .threadWakeFrequency = conf .getInt (HConstants .THREAD_WAKE_FREQUENCY , 10 * 1000 );
115123 this .checkLowReplicationInterval =
116124 conf .getLong ("hbase.regionserver.hlog.check.lowreplication.interval" , 30 * 1000 );
125+ this .rollWaitTimeout = conf .getLong (WAL_ROLL_WAIT_TIMEOUT , 30000 );
126+ this .maxRollRetry = conf .getInt (WAL_ROLL_RETRIES , 2 );
117127 }
118128
119129 /**
@@ -184,18 +194,38 @@ public void run() {
184194 } else {
185195 continue ;
186196 }
187- try {
188- // Force the roll if the logroll.period is elapsed or if a roll was requested.
189- // The returned value is an collection of actual region and family names.
190- Map <byte [], List <byte []>> regionsToFlush = controller .rollWal (now );
191- if (regionsToFlush != null ) {
192- for (Map .Entry <byte [], List <byte []>> r : regionsToFlush .entrySet ()) {
193- scheduleFlush (Bytes .toString (r .getKey ()), r .getValue ());
197+ Map <byte [], List <byte []>> regionsToFlush = null ;
198+ int nAttempts = 0 ;
199+ long startWaiting = EnvironmentEdgeManager .currentTime ();
200+ do {
201+ try {
202+ // Force the roll if the logroll.period is elapsed or if a roll was requested.
203+ // The returned value is a collection of actual region and family names.
204+ regionsToFlush = controller .rollWal (EnvironmentEdgeManager .currentTime ());
205+ break ;
206+ } catch (IOException ioe ) {
207+ if (ioe instanceof WALClosedException ) {
208+ LOG .warn ("WAL has been closed. Skipping rolling of writer and just remove it" , ioe );
209+ iter .remove ();
210+ break ;
194211 }
212+ long waitingTime = EnvironmentEdgeManager .currentTime () - startWaiting ;
213+ if (waitingTime < rollWaitTimeout && nAttempts < maxRollRetry ) {
214+ nAttempts ++;
215+ LOG .warn ("Retry to roll log, nAttempts={}, waiting time={}ms, sleeping 1s to retry,"
216+ + " last excepiton= {}" , nAttempts , waitingTime ,
217+ ioe .getCause ().getClass ().getSimpleName ());
218+ sleep (1000 );
219+ } else {
220+ LOG .error ("Roll wal failed and waiting timeout, will not retry" , ioe );
221+ throw ioe ;
222+ }
223+ }
224+ } while (EnvironmentEdgeManager .currentTime () - startWaiting < rollWaitTimeout );
225+ if (regionsToFlush != null ) {
226+ for (Map .Entry <byte [], List <byte []>> r : regionsToFlush .entrySet ()) {
227+ scheduleFlush (Bytes .toString (r .getKey ()), r .getValue ());
195228 }
196- } catch (WALClosedException e ) {
197- LOG .warn ("WAL has been closed. Skipping rolling of writer and just remove it" , e );
198- iter .remove ();
199229 }
200230 }
201231 } catch (FailedLogCloseException | ConnectException e ) {
0 commit comments