Skip to content

Commit b737869

Browse files
authored
HADOOP-18383. Codecs with @DoNotPool annotation are not closed causing memory leak (#4585)
1 parent e0c8c6e commit b737869

File tree

5 files changed

+113
-1
lines changed

5 files changed

+113
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.io.compress;
20+
21+
import java.io.IOException;
22+
23+
/**
 * An exception class for when a closed compressor/decompressor is being used.
 * {@link org.apache.hadoop.io.compress.Compressor}
 * {@link org.apache.hadoop.io.compress.Decompressor}
 */
public class AlreadyClosedException extends IOException {

  /**
   * Constructs an {@code AlreadyClosedException} with the specified detail
   * message.
   *
   * @param message the detail message describing which closed codec was used
   */
  public AlreadyClosedException(String message) {
    super(message);
  }
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ public static void returnCompressor(Compressor compressor) {
205205
}
206206
// if the compressor can't be reused, don't pool it.
207207
if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
208+
compressor.end();
208209
return;
209210
}
210211
compressor.reset();
@@ -225,6 +226,7 @@ public static void returnDecompressor(Decompressor decompressor) {
225226
}
226227
// if the decompressor can't be reused, don't pool it.
227228
if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
229+
decompressor.end();
228230
return;
229231
}
230232
decompressor.reset();

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.zip.GZIPOutputStream;
2525

2626
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.io.compress.AlreadyClosedException;
2728
import org.apache.hadoop.io.compress.Compressor;
2829
import org.apache.hadoop.io.compress.DoNotPool;
2930
import org.apache.hadoop.util.DataChecksum;
@@ -83,6 +84,10 @@ public int compress(byte[] b, int off, int len) throws IOException {
8384
throw new IOException("compress called on finished compressor");
8485
}
8586

87+
if (state == BuiltInGzipDecompressor.GzipStateLabel.ENDED) {
88+
throw new AlreadyClosedException("compress called on closed compressor");
89+
}
90+
8691
int compressedBytesWritten = 0;
8792

8893
// If we are not within uncompressed data yet, output the header.
@@ -139,6 +144,8 @@ public long getBytesWritten() {
139144
@Override
140145
public void end() {
141146
deflater.end();
147+
148+
state = BuiltInGzipDecompressor.GzipStateLabel.ENDED;
142149
}
143150

144151
@Override

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.zip.DataFormatException;
2424
import java.util.zip.Inflater;
2525

26+
import org.apache.hadoop.io.compress.AlreadyClosedException;
2627
import org.apache.hadoop.io.compress.Decompressor;
2728
import org.apache.hadoop.io.compress.DoNotPool;
2829
import org.apache.hadoop.util.DataChecksum;
@@ -109,7 +110,11 @@ public enum GzipStateLabel {
109110
* Immediately after the trailer (and potentially prior to the next gzip
110111
* member/substream header), without reset() having been called.
111112
*/
112-
FINISHED;
113+
FINISHED,
114+
/**
115+
* Immediately after end() has been called.
116+
*/
117+
ENDED;
113118
}
114119

115120
/**
@@ -186,6 +191,10 @@ public synchronized int decompress(byte[] b, int off, int len)
186191
throws IOException {
187192
int numAvailBytes = 0;
188193

194+
if (state == GzipStateLabel.ENDED) {
195+
throw new AlreadyClosedException("decompress called on closed decompressor");
196+
}
197+
189198
if (state != GzipStateLabel.DEFLATE_STREAM) {
190199
executeHeaderState();
191200

@@ -476,6 +485,8 @@ public synchronized void reset() {
476485
@Override
477486
public synchronized void end() {
478487
inflater.end();
488+
489+
state = GzipStateLabel.ENDED;
479490
}
480491

481492
/**

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,20 @@
1919

2020
import static org.junit.Assert.assertEquals;
2121

22+
import java.io.ByteArrayInputStream;
23+
import java.io.ByteArrayOutputStream;
24+
import java.io.OutputStream;
25+
import java.util.Random;
2226
import java.util.concurrent.Callable;
2327
import java.util.concurrent.ExecutorService;
2428
import java.util.concurrent.Executors;
2529
import java.util.concurrent.LinkedBlockingDeque;
2630
import java.util.concurrent.TimeUnit;
2731

2832
import org.apache.hadoop.conf.Configuration;
33+
import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor;
34+
import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
35+
import org.apache.hadoop.test.LambdaTestUtils;
2936
import org.junit.Before;
3037
import org.junit.Test;
3138

@@ -189,4 +196,56 @@ public void testDecompressorNotReturnSameInstance() {
189196
CodecPool.returnDecompressor(decompressor);
190197
}
191198
}
199+
200+
@Test(timeout = 10000)
201+
public void testDoNotPoolCompressorNotUseableAfterReturn() throws Exception {
202+
203+
final GzipCodec gzipCodec = new GzipCodec();
204+
gzipCodec.setConf(new Configuration());
205+
206+
// BuiltInGzipCompressor is an explicit example of a Compressor with the @DoNotPool annotation
207+
final Compressor compressor = new BuiltInGzipCompressor(new Configuration());
208+
CodecPool.returnCompressor(compressor);
209+
210+
final CompressionOutputStream outputStream =
211+
gzipCodec.createOutputStream(new ByteArrayOutputStream(), compressor);
212+
LambdaTestUtils.intercept(
213+
AlreadyClosedException.class,
214+
"compress called on closed compressor",
215+
"Compressor from Codec with @DoNotPool should not be " +
216+
"useable after returning to CodecPool",
217+
() -> outputStream.write(1));
218+
}
219+
220+
@Test(timeout = 10000)
221+
public void testDoNotPoolDecompressorNotUseableAfterReturn() throws Exception {
222+
223+
final GzipCodec gzipCodec = new GzipCodec();
224+
gzipCodec.setConf(new Configuration());
225+
226+
final Random random = new Random();
227+
final byte[] bytes = new byte[1024];
228+
random.nextBytes(bytes);
229+
230+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
231+
try (OutputStream outputStream = gzipCodec.createOutputStream(baos)) {
232+
outputStream.write(bytes);
233+
}
234+
235+
final byte[] gzipBytes = baos.toByteArray();
236+
final ByteArrayInputStream bais = new ByteArrayInputStream(gzipBytes);
237+
238+
// BuiltInGzipDecompressor is an explicit example of a Decompressor
239+
// with the @DoNotPool annotation
240+
final Decompressor decompressor = new BuiltInGzipDecompressor();
241+
CodecPool.returnDecompressor(decompressor);
242+
243+
final CompressionInputStream inputStream = gzipCodec.createInputStream(bais, decompressor);
244+
LambdaTestUtils.intercept(
245+
AlreadyClosedException.class,
246+
"decompress called on closed decompressor",
247+
"Decompressor from Codec with @DoNotPool should not be " +
248+
"useable after returning to CodecPool",
249+
() -> inputStream.read());
250+
}
192251
}

0 commit comments

Comments
 (0)