@@ -60,13 +60,29 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
60
60
61
61
protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period" ;
62
62
63
+ /**
64
+ * Configure for the timeout of log rolling retry.
65
+ */
66
+ protected static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms" ;
67
+
68
+ /**
69
+ * Configure for the max count of log rolling retry.
70
+ * The real retry count is also limited by the timeout of log rolling
71
+ * via {@link #WAL_ROLL_WAIT_TIMEOUT}
72
+ */
73
+ protected static final String WAL_ROLL_RETRIES = "hbase.regionserver.logroll.retries" ;
74
+
63
75
protected final ConcurrentMap <WAL , RollController > wals = new ConcurrentHashMap <>();
64
76
protected final T abortable ;
65
77
// Period to roll log.
66
78
private final long rollPeriod ;
67
79
private final int threadWakeFrequency ;
68
80
// The interval to check low replication on hlog's pipeline
69
81
private final long checkLowReplicationInterval ;
82
+ // Wait period for roll log
83
+ private final long rollWaitTimeout ;
84
+ // Max retry for roll log
85
+ private final int maxRollRetry ;
70
86
71
87
private volatile boolean running = true ;
72
88
@@ -114,6 +130,9 @@ protected AbstractWALRoller(String name, Configuration conf, T abortable) {
114
130
this .threadWakeFrequency = conf .getInt (HConstants .THREAD_WAKE_FREQUENCY , 10 * 1000 );
115
131
this .checkLowReplicationInterval =
116
132
conf .getLong ("hbase.regionserver.hlog.check.lowreplication.interval" , 30 * 1000 );
133
+ this .rollWaitTimeout = conf .getLong (WAL_ROLL_WAIT_TIMEOUT , 30000 );
134
+ // retry rolling does not have to be the default behavior, so the default value is 0 here
135
+ this .maxRollRetry = conf .getInt (WAL_ROLL_RETRIES , 0 );
117
136
}
118
137
119
138
/**
@@ -184,18 +203,38 @@ public void run() {
184
203
} else {
185
204
continue ;
186
205
}
187
- try {
188
- // Force the roll if the logroll.period is elapsed or if a roll was requested.
189
- // The returned value is an collection of actual region and family names.
190
- Map <byte [], List <byte []>> regionsToFlush = controller .rollWal (now );
191
- if (regionsToFlush != null ) {
192
- for (Map .Entry <byte [], List <byte []>> r : regionsToFlush .entrySet ()) {
193
- scheduleFlush (Bytes .toString (r .getKey ()), r .getValue ());
206
+ Map <byte [], List <byte []>> regionsToFlush = null ;
207
+ int nAttempts = 0 ;
208
+ long startWaiting = EnvironmentEdgeManager .currentTime ();
209
+ do {
210
+ try {
211
+ // Force the roll if the logroll.period is elapsed or if a roll was requested.
212
+ // The returned value is an collection of actual region and family names.
213
+ regionsToFlush = controller .rollWal (EnvironmentEdgeManager .currentTime ());
214
+ break ;
215
+ } catch (IOException ioe ) {
216
+ if (ioe instanceof WALClosedException ) {
217
+ LOG .warn ("WAL has been closed. Skipping rolling of writer and just remove it" , ioe );
218
+ iter .remove ();
219
+ break ;
194
220
}
221
+ long waitingTime = EnvironmentEdgeManager .currentTime () - startWaiting ;
222
+ if (waitingTime < rollWaitTimeout && nAttempts < maxRollRetry ) {
223
+ nAttempts ++;
224
+ LOG .warn ("Retry to roll log, nAttempts={}, waiting time={}ms, sleeping 1s to retry,"
225
+ + " last excepiton= {}" , nAttempts , waitingTime ,
226
+ ioe .getCause ().getClass ().getSimpleName ());
227
+ sleep (1000 );
228
+ } else {
229
+ LOG .error ("Roll wal failed and waiting timeout, will not retry" , ioe );
230
+ throw ioe ;
231
+ }
232
+ }
233
+ } while (EnvironmentEdgeManager .currentTime () - startWaiting < rollWaitTimeout );
234
+ if (regionsToFlush != null ) {
235
+ for (Map .Entry <byte [], List <byte []>> r : regionsToFlush .entrySet ()) {
236
+ scheduleFlush (Bytes .toString (r .getKey ()), r .getValue ());
195
237
}
196
- } catch (WALClosedException e ) {
197
- LOG .warn ("WAL has been closed. Skipping rolling of writer and just remove it" , e );
198
- iter .remove ();
199
238
}
200
239
}
201
240
} catch (FailedLogCloseException | ConnectException e ) {
0 commit comments