@@ -60,13 +60,17 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
6060
6161 protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period" ;
6262
63+ protected static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms" ;
64+
6365 protected final ConcurrentMap <WAL , RollController > wals = new ConcurrentHashMap <>();
6466 protected final T abortable ;
6567 // Period to roll log.
6668 private final long rollPeriod ;
6769 private final int threadWakeFrequency ;
6870 // The interval to check low replication on hlog's pipeline
6971 private final long checkLowReplicationInterval ;
72+ // Maximum time in milliseconds to keep retrying a failed log roll before giving up.
73+ private final long rollWaitTimeout ;
7074
7175 private volatile boolean running = true ;
7276
@@ -114,6 +118,7 @@ protected AbstractWALRoller(String name, Configuration conf, T abortable) {
114118 this .threadWakeFrequency = conf .getInt (HConstants .THREAD_WAKE_FREQUENCY , 10 * 1000 );
115119 this .checkLowReplicationInterval =
116120 conf .getLong ("hbase.regionserver.hlog.check.lowreplication.interval" , 30 * 1000 );
121+ this .rollWaitTimeout = conf .getLong (WAL_ROLL_WAIT_TIMEOUT , 30000 );
117122 }
118123
119124 /**
@@ -184,18 +189,38 @@ public void run() {
184189 } else {
185190 continue ;
186191 }
187- try {
188- // Force the roll if the logroll.period is elapsed or if a roll was requested.
189- // The returned value is an collection of actual region and family names.
190- Map <byte [], List <byte []>> regionsToFlush = controller .rollWal (now );
191- if (regionsToFlush != null ) {
192- for (Map .Entry <byte [], List <byte []>> r : regionsToFlush .entrySet ()) {
193- scheduleFlush (Bytes .toString (r .getKey ()), r .getValue ());
192+ Map <byte [], List <byte []>> regionsToFlush = null ;
193+ long startWaiting = EnvironmentEdgeManager .currentTime ();
194+ int nAttempts = 0 ;
195+ do {
196+ try {
197+ // Force the roll if the logroll.period is elapsed or if a roll was requested.
198+ // The returned value is a collection of actual region and family names.
199+ regionsToFlush = controller .rollWal (EnvironmentEdgeManager .currentTime ());
200+ break ;
201+ } catch (IOException ioe ) {
202+ if (ioe instanceof WALClosedException ) {
203+ LOG .warn ("WAL has been closed. Skipping rolling of writer and just remove it" , ioe );
204+ iter .remove ();
205+ break ;
194206 }
207+ long waitingTime = EnvironmentEdgeManager .currentTime () - startWaiting ;
208+ if (waitingTime < rollWaitTimeout ) {
209+ nAttempts ++;
210+ LOG .warn ("Retry to roll log, nAttempts={}, waiting time={}ms, sleeping 1s to retry,"
211+ + " last excepiton= {}" , nAttempts , waitingTime ,
212+ ioe .getCause ().getClass ().getSimpleName ());
213+ sleep (1000 );
214+ } else {
215+ LOG .error ("Roll wal failed and waiting timeout, will not retry" , ioe );
216+ throw ioe ;
217+ }
218+ }
219+ } while (EnvironmentEdgeManager .currentTime () - startWaiting < rollWaitTimeout );
220+ if (regionsToFlush != null ) {
221+ for (Map .Entry <byte [], List <byte []>> r : regionsToFlush .entrySet ()) {
222+ scheduleFlush (Bytes .toString (r .getKey ()), r .getValue ());
195223 }
196- } catch (WALClosedException e ) {
197- LOG .warn ("WAL has been closed. Skipping rolling of writer and just remove it" , e );
198- iter .remove ();
199224 }
200225 }
201226 } catch (FailedLogCloseException | ConnectException e ) {
0 commit comments