@@ -59,13 +59,30 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
59
59
60
60
protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period" ;
61
61
62
+ /**
63
+ * Configuration key for the timeout, in milliseconds, of log rolling retries.
64
+ */
65
+ protected static final String WAL_ROLL_WAIT_TIMEOUT =
66
+ "hbase.regionserver.logroll.wait.timeout.ms" ;
67
+
68
+ /**
69
+ * Configuration key for the maximum number of log rolling retries.
70
+ * The real retry count is also limited by the timeout of log rolling
71
+ * via {@link #WAL_ROLL_WAIT_TIMEOUT}
72
+ */
73
+ protected static final String WAL_ROLL_RETRIES = "hbase.regionserver.logroll.retries" ;
74
+
62
75
protected final ConcurrentMap <WAL , RollController > wals = new ConcurrentHashMap <>();
63
76
protected final T abortable ;
64
77
// Period to roll log.
65
78
private final long rollPeriod ;
66
79
private final int threadWakeFrequency ;
67
80
// The interval to check low replication on hlog's pipeline
68
81
private final long checkLowReplicationInterval ;
82
+ // Maximum total time, in milliseconds, to keep retrying a failed log roll
83
+ private final long rollWaitTimeout ;
84
+ // Maximum number of retries for a failed log roll
85
+ private final int maxRollRetry ;
69
86
70
87
private volatile boolean running = true ;
71
88
@@ -113,6 +130,9 @@ protected AbstractWALRoller(String name, Configuration conf, T abortable) {
113
130
this .threadWakeFrequency = conf .getInt (HConstants .THREAD_WAKE_FREQUENCY , 10 * 1000 );
114
131
this .checkLowReplicationInterval =
115
132
conf .getLong ("hbase.regionserver.hlog.check.lowreplication.interval" , 30 * 1000 );
133
+ this .rollWaitTimeout = conf .getLong (WAL_ROLL_WAIT_TIMEOUT , 30000 );
134
+ // retry rolling does not have to be the default behavior, so the default value is 0 here
135
+ this .maxRollRetry = conf .getInt (WAL_ROLL_RETRIES , 0 );
116
136
}
117
137
118
138
/**
@@ -183,9 +203,29 @@ public void run() {
183
203
} else {
184
204
continue ;
185
205
}
186
- // Force the roll if the logroll.period is elapsed or if a roll was requested.
187
- // The returned value is an collection of actual region and family names.
188
- Map <byte [], List <byte []>> regionsToFlush = controller .rollWal (now );
206
+ // Roll the WAL, retrying on IOException while both limits hold: fewer than maxRollRetry
+ // retries so far and less than rollWaitTimeout ms elapsed since the first attempt.
+ // Every timestamp is taken from EnvironmentEdgeManager so the elapsed-time arithmetic
+ // never mixes two different clocks.
+ Map<byte[], List<byte[]>> regionsToFlush = null;
+ int nAttempts = 0;
+ long startWaiting = EnvironmentEdgeManager.currentTime();
+ do {
+   try {
+     // Force the roll if the logroll.period is elapsed or if a roll was requested.
+     // The returned value is a collection of actual region and family names.
+     regionsToFlush = controller.rollWal(EnvironmentEdgeManager.currentTime());
+     break;
+   } catch (IOException ioe) {
+     long waitingTime = EnvironmentEdgeManager.currentTime() - startWaiting;
+     if (waitingTime < rollWaitTimeout && nAttempts < maxRollRetry) {
+       nAttempts++;
+       // An IOException need not carry a cause; fall back to the exception itself so this
+       // log call cannot throw a NullPointerException and hide the real failure.
+       Throwable lastFailure = ioe.getCause() != null ? ioe.getCause() : ioe;
+       LOG.warn("Retry to roll log, nAttempts={}, waiting time={}ms, sleeping 1s to retry,"
+         + " last exception= {}", nAttempts, waitingTime,
+         lastFailure.getClass().getSimpleName());
+       sleep(1000);
+     } else {
+       // Out of retries or out of time: surface the failure to the caller.
+       LOG.error("Roll wal failed and waiting timeout, will not retry", ioe);
+       throw ioe;
+     }
+   }
+   // NOTE(review): if the timeout elapses during the sleep above, this condition ends the
+   // loop without rethrowing and regionsToFlush stays null -- confirm that is intended.
+ } while (EnvironmentEdgeManager.currentTime() - startWaiting < rollWaitTimeout);
189
229
if (regionsToFlush != null ) {
190
230
for (Map .Entry <byte [], List <byte []>> r : regionsToFlush .entrySet ()) {
191
231
scheduleFlush (Bytes .toString (r .getKey ()), r .getValue ());
0 commit comments