@@ -171,38 +171,43 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro
171171 public void init (FileSystem fs , Path path , Configuration conf , boolean overwritable ,
172172 long blocksize , StreamSlowMonitor monitor ) throws IOException ,
173173 StreamLacksCapabilityException {
174- this . conf = conf ;
175- boolean doCompress = initializeCompressionContext ( conf , path ) ;
176- this . trailerWarnSize = conf . getInt ( WAL_TRAILER_WARN_SIZE , DEFAULT_WAL_TRAILER_WARN_SIZE );
177- int bufferSize = CommonFSUtils . getDefaultBufferSize ( fs );
178- short replication = ( short ) conf . getInt ( "hbase.regionserver.hlog.replication" ,
179- CommonFSUtils . getDefaultReplication ( fs , path ));
180-
181- initOutput ( fs , path , overwritable , bufferSize , replication , blocksize , monitor );
182-
183- boolean doTagCompress = doCompress &&
184- conf . getBoolean ( CompressionContext . ENABLE_WAL_TAGS_COMPRESSION , true );
185- boolean doValueCompress = doCompress &&
186- conf . getBoolean ( CompressionContext . ENABLE_WAL_VALUE_COMPRESSION , false );
187- WALHeader . Builder headerBuilder = WALHeader . newBuilder ()
188- . setHasCompression ( doCompress )
189- .setHasTagCompression (doTagCompress )
190- .setHasValueCompression (doValueCompress );
191- if (doValueCompress ) {
192- headerBuilder .setValueCompressionAlgorithm (
193- CompressionContext .getValueCompressionAlgorithm (conf ).ordinal ());
194- }
195- length .set (writeMagicAndWALHeader (ProtobufLogReader .PB_WAL_MAGIC ,
196- buildWALHeader (conf , headerBuilder )));
174+ try {
175+ this . conf = conf ;
176+ boolean doCompress = initializeCompressionContext ( conf , path );
177+ this . trailerWarnSize = conf . getInt ( WAL_TRAILER_WARN_SIZE , DEFAULT_WAL_TRAILER_WARN_SIZE );
178+ int bufferSize = CommonFSUtils . getDefaultBufferSize ( fs );
179+ short replication = ( short ) conf . getInt ( "hbase.regionserver.hlog.replication" ,
180+ CommonFSUtils . getDefaultReplication ( fs , path ));
181+
182+ initOutput ( fs , path , overwritable , bufferSize , replication , blocksize , monitor );
183+
184+ boolean doTagCompress =
185+ doCompress && conf . getBoolean ( CompressionContext . ENABLE_WAL_TAGS_COMPRESSION , true );
186+ boolean doValueCompress =
187+ doCompress && conf . getBoolean ( CompressionContext . ENABLE_WAL_VALUE_COMPRESSION , false );
188+ WALHeader . Builder headerBuilder =
189+ WALHeader . newBuilder (). setHasCompression ( doCompress ) .setHasTagCompression (doTagCompress )
190+ .setHasValueCompression (doValueCompress );
191+ if (doValueCompress ) {
192+ headerBuilder .setValueCompressionAlgorithm (
193+ CompressionContext .getValueCompressionAlgorithm (conf ).ordinal ());
194+ }
195+ length .set (writeMagicAndWALHeader (ProtobufLogReader .PB_WAL_MAGIC ,
196+ buildWALHeader (conf , headerBuilder )));
197197
198- initAfterHeader (doCompress );
198+ initAfterHeader (doCompress );
199199
200- // instantiate trailer to default value.
201- trailer = WALTrailer .newBuilder ().build ();
200+ // instantiate trailer to default value.
201+ trailer = WALTrailer .newBuilder ().build ();
202202
203- if (LOG .isTraceEnabled ()) {
204- LOG .trace ("Initialized protobuf WAL={}, compression={}, tagCompression={}" +
205- ", valueCompression={}" , path , doCompress , doTagCompress , doValueCompress );
203+ if (LOG .isTraceEnabled ()) {
204+ LOG .trace ("Initialized protobuf WAL={}, compression={}, tagCompression={}"
205+ + ", valueCompression={}" , path , doCompress , doTagCompress , doValueCompress );
206+ }
207+ } catch (Exception e ) {
208+ LOG .warn ("Init output failed, path={}" , path , e );
209+ closeOutput ();
210+ throw e ;
206211 }
207212 }
208213
@@ -270,6 +275,8 @@ protected abstract void initOutput(FileSystem fs, Path path, boolean overwritabl
270275 short replication , long blockSize , StreamSlowMonitor monitor )
271276 throws IOException , StreamLacksCapabilityException ;
272277
278+ protected abstract void closeOutput ();
279+
273280 /**
274281 * return the file length after written.
275282 */
0 commit comments