@@ -166,38 +166,43 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro
166166 public void init (FileSystem fs , Path path , Configuration conf , boolean overwritable ,
167167 long blocksize , StreamSlowMonitor monitor ) throws IOException ,
168168 StreamLacksCapabilityException {
169- this . conf = conf ;
170- boolean doCompress = initializeCompressionContext ( conf , path ) ;
171- this . trailerWarnSize = conf . getInt ( WAL_TRAILER_WARN_SIZE , DEFAULT_WAL_TRAILER_WARN_SIZE );
172- int bufferSize = CommonFSUtils . getDefaultBufferSize ( fs );
173- short replication = ( short ) conf . getInt ( "hbase.regionserver.hlog.replication" ,
174- CommonFSUtils . getDefaultReplication ( fs , path ));
175-
176- initOutput ( fs , path , overwritable , bufferSize , replication , blocksize , monitor );
177-
178- boolean doTagCompress = doCompress &&
179- conf . getBoolean ( CompressionContext . ENABLE_WAL_TAGS_COMPRESSION , true );
180- boolean doValueCompress = doCompress &&
181- conf . getBoolean ( CompressionContext . ENABLE_WAL_VALUE_COMPRESSION , false );
182- WALHeader . Builder headerBuilder = WALHeader . newBuilder ()
183- . setHasCompression ( doCompress )
184- .setHasTagCompression (doTagCompress )
185- .setHasValueCompression (doValueCompress );
186- if (doValueCompress ) {
187- headerBuilder .setValueCompressionAlgorithm (
188- CompressionContext .getValueCompressionAlgorithm (conf ).ordinal ());
189- }
190- length .set (writeMagicAndWALHeader (ProtobufLogReader .PB_WAL_MAGIC ,
191- buildWALHeader (conf , headerBuilder )));
169+ try {
170+ this . conf = conf ;
171+ boolean doCompress = initializeCompressionContext ( conf , path );
172+ this . trailerWarnSize = conf . getInt ( WAL_TRAILER_WARN_SIZE , DEFAULT_WAL_TRAILER_WARN_SIZE );
173+ int bufferSize = CommonFSUtils . getDefaultBufferSize ( fs );
174+ short replication = ( short ) conf . getInt ( "hbase.regionserver.hlog.replication" ,
175+ CommonFSUtils . getDefaultReplication ( fs , path ));
176+
177+ initOutput ( fs , path , overwritable , bufferSize , replication , blocksize , monitor );
178+
179+ boolean doTagCompress =
180+ doCompress && conf . getBoolean ( CompressionContext . ENABLE_WAL_TAGS_COMPRESSION , true );
181+ boolean doValueCompress =
182+ doCompress && conf . getBoolean ( CompressionContext . ENABLE_WAL_VALUE_COMPRESSION , false );
183+ WALHeader . Builder headerBuilder =
184+ WALHeader . newBuilder (). setHasCompression ( doCompress ) .setHasTagCompression (doTagCompress )
185+ .setHasValueCompression (doValueCompress );
186+ if (doValueCompress ) {
187+ headerBuilder .setValueCompressionAlgorithm (
188+ CompressionContext .getValueCompressionAlgorithm (conf ).ordinal ());
189+ }
190+ length .set (writeMagicAndWALHeader (ProtobufLogReader .PB_WAL_MAGIC ,
191+ buildWALHeader (conf , headerBuilder )));
192192
193- initAfterHeader (doCompress );
193+ initAfterHeader (doCompress );
194194
195- // instantiate trailer to default value.
196- trailer = WALTrailer .newBuilder ().build ();
195+ // instantiate trailer to default value.
196+ trailer = WALTrailer .newBuilder ().build ();
197197
198- if (LOG .isTraceEnabled ()) {
199- LOG .trace ("Initialized protobuf WAL={}, compression={}, tagCompression={}" +
200- ", valueCompression={}" , path , doCompress , doTagCompress , doValueCompress );
198+ if (LOG .isTraceEnabled ()) {
199+ LOG .trace ("Initialized protobuf WAL={}, compression={}, tagCompression={}"
200+ + ", valueCompression={}" , path , doCompress , doTagCompress , doValueCompress );
201+ }
202+ } catch (Exception e ) {
203+ LOG .warn ("Init output failed, path={}" , path , e );
204+ closeOutput ();
205+ throw e ;
201206 }
202207 }
203208
@@ -265,6 +270,11 @@ protected abstract void initOutput(FileSystem fs, Path path, boolean overwritabl
265270 short replication , long blockSize , StreamSlowMonitor monitor )
266271 throws IOException , StreamLacksCapabilityException ;
267272
273+ /**
274+ * simply close the output, do not need to write trailer like the Writer.close
275+ */
276+ protected abstract void closeOutput ();
277+
268278 /**
269279 * return the file length after written.
270280 */
0 commit comments