Skip to content

Commit e008cb8

Browse files
committed
HBASE-26680 Close and do not write trailer for the broken WAL writer (#4174)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
1 parent 231dca1 commit e008cb8

File tree

4 files changed

+61
-37
lines changed

4 files changed

+61
-37
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -166,38 +166,43 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro
166166
public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
167167
long blocksize, StreamSlowMonitor monitor) throws IOException,
168168
StreamLacksCapabilityException {
169-
this.conf = conf;
170-
boolean doCompress = initializeCompressionContext(conf, path);
171-
this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
172-
int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
173-
short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
174-
CommonFSUtils.getDefaultReplication(fs, path));
175-
176-
initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);
177-
178-
boolean doTagCompress = doCompress &&
179-
conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
180-
boolean doValueCompress = doCompress &&
181-
conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
182-
WALHeader.Builder headerBuilder = WALHeader.newBuilder()
183-
.setHasCompression(doCompress)
184-
.setHasTagCompression(doTagCompress)
185-
.setHasValueCompression(doValueCompress);
186-
if (doValueCompress) {
187-
headerBuilder.setValueCompressionAlgorithm(
188-
CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
189-
}
190-
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
191-
buildWALHeader(conf, headerBuilder)));
169+
try {
170+
this.conf = conf;
171+
boolean doCompress = initializeCompressionContext(conf, path);
172+
this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
173+
int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
174+
short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
175+
CommonFSUtils.getDefaultReplication(fs, path));
176+
177+
initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);
178+
179+
boolean doTagCompress =
180+
doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
181+
boolean doValueCompress =
182+
doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
183+
WALHeader.Builder headerBuilder =
184+
WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress)
185+
.setHasValueCompression(doValueCompress);
186+
if (doValueCompress) {
187+
headerBuilder.setValueCompressionAlgorithm(
188+
CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
189+
}
190+
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
191+
buildWALHeader(conf, headerBuilder)));
192192

193-
initAfterHeader(doCompress);
193+
initAfterHeader(doCompress);
194194

195-
// instantiate trailer to default value.
196-
trailer = WALTrailer.newBuilder().build();
195+
// instantiate trailer to default value.
196+
trailer = WALTrailer.newBuilder().build();
197197

198-
if (LOG.isTraceEnabled()) {
199-
LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}" +
200-
", valueCompression={}", path, doCompress, doTagCompress, doValueCompress);
198+
if (LOG.isTraceEnabled()) {
199+
LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}"
200+
+ ", valueCompression={}", path, doCompress, doTagCompress, doValueCompress);
201+
}
202+
} catch (Exception e) {
203+
LOG.warn("Init output failed, path={}", path, e);
204+
closeOutput();
205+
throw e;
201206
}
202207
}
203208

@@ -265,6 +270,11 @@ protected abstract void initOutput(FileSystem fs, Path path, boolean overwritabl
265270
short replication, long blockSize, StreamSlowMonitor monitor)
266271
throws IOException, StreamLacksCapabilityException;
267272

273+
/**
274+
* simply close the output, do not need to write trailer like the Writer.close
275+
*/
276+
protected abstract void closeOutput();
277+
268278
/**
269279
* return the file length after written.
270280
*/

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
4848
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
4949
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
50-
5150
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
5251
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
5352

@@ -197,6 +196,17 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu
197196
this.asyncOutputWrapper = new OutputStreamWrapper(output);
198197
}
199198

199+
@Override
200+
protected void closeOutput() {
201+
if (this.output != null) {
202+
try {
203+
this.output.close();
204+
} catch (IOException e) {
205+
LOG.warn("Close output failed", e);
206+
}
207+
}
208+
}
209+
200210
private long writeWALMetadata(Consumer<CompletableFuture<Long>> action) throws IOException {
201211
CompletableFuture<Long> future = new CompletableFuture<>();
202212
action.accept(future);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,17 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu
118118
}
119119
}
120120

121+
@Override
122+
protected void closeOutput() {
123+
if (this.output != null) {
124+
try {
125+
this.output.close();
126+
} catch (IOException e) {
127+
LOG.warn("Close output failed", e);
128+
}
129+
}
130+
}
131+
121132
@Override
122133
protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
123134
output.write(magic);

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,6 @@ public static Writer createWriter(final Configuration conf, final FileSystem fs,
9090
} else {
9191
LOG.debug("Error instantiating log writer.", e);
9292
}
93-
if (writer != null) {
94-
try{
95-
writer.close();
96-
} catch(IOException ee){
97-
LOG.error("cannot close log writer", ee);
98-
}
99-
}
10093
throw new IOException("cannot get log writer", e);
10194
}
10295
}

0 commit comments

Comments
 (0)