Skip to content

Commit 3ffc076

Browse files
committed
ARROW-6199: [Java] Avro adapter avoid potential resource leak.
1 parent 7f6f010 commit 3ffc076

File tree

7 files changed

+105
-18
lines changed

7 files changed

+105
-18
lines changed

java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE;
2121
import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE;
2222

23-
import java.io.EOFException;
2423
import java.io.IOException;
2524
import java.util.ArrayList;
2625
import java.util.HashMap;
@@ -37,6 +36,7 @@
3736
import org.apache.arrow.consumers.AvroNullConsumer;
3837
import org.apache.arrow.consumers.AvroStringConsumer;
3938
import org.apache.arrow.consumers.AvroUnionsConsumer;
39+
import org.apache.arrow.consumers.CompositeAvroConsumer;
4040
import org.apache.arrow.consumers.Consumer;
4141
import org.apache.arrow.consumers.NullableTypeConsumer;
4242
import org.apache.arrow.memory.BufferAllocator;
@@ -246,19 +246,10 @@ public static VectorSchemaRoot avroToArrowVectors(Schema schema, Decoder decoder
246246

247247
VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0);
248248

249-
int valueCount = 0;
250-
while (true) {
251-
try {
252-
for (Consumer consumer : consumers) {
253-
consumer.consume(decoder);
254-
}
255-
valueCount++;
256-
//reach end will throw EOFException.
257-
} catch (EOFException eofException) {
258-
root.setRowCount(valueCount);
259-
break;
260-
}
249+
try (CompositeAvroConsumer compositeConsumer = new CompositeAvroConsumer(consumers)) {
250+
compositeConsumer.consume(decoder, root);
261251
}
252+
262253
return root;
263254
}
264255
}

java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,11 @@ public void setPosition(int index) {
7979
public FieldVector getVector() {
8080
return vector;
8181
}
82+
83+
@Override
84+
public void close() {
85+
if (cacheBuffer != null) {
86+
cacheBuffer.clear();
87+
}
88+
}
8289
}

java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,11 @@ public void setPosition(int index) {
8080
public FieldVector getVector() {
8181
return this.vector;
8282
}
83+
84+
@Override
85+
public void close() {
86+
if (cacheBuffer != null) {
87+
cacheBuffer.clear();
88+
}
89+
}
8390
}

java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
*/
3232
public class AvroUnionsConsumer implements Consumer {
3333

34-
private Consumer[] indexDelegates;
34+
private Consumer[] delegates;
3535
private Types.MinorType[] types;
3636

3737
private UnionWriter writer;
@@ -40,11 +40,11 @@ public class AvroUnionsConsumer implements Consumer {
4040
/**
4141
* Instantiate an AvroUnionsConsumer.
4242
*/
43-
public AvroUnionsConsumer(UnionVector vector, Consumer[] indexDelegates, Types.MinorType[] types) {
43+
public AvroUnionsConsumer(UnionVector vector, Consumer[] delegates, Types.MinorType[] types) {
4444

4545
this.writer = new UnionWriter(vector);
4646
this.vector = vector;
47-
this.indexDelegates = indexDelegates;
47+
this.delegates = delegates;
4848
this.types = types;
4949
}
5050

@@ -53,7 +53,7 @@ public void consume(Decoder decoder) throws IOException {
5353
int fieldIndex = decoder.readInt();
5454
int position = writer.getPosition();
5555

56-
Consumer delegate = indexDelegates[fieldIndex];
56+
Consumer delegate = delegates[fieldIndex];
5757

5858
vector.setType(position, types[fieldIndex]);
5959
// In UnionVector we need to set the sub-vector writer position before consuming a value
@@ -80,4 +80,11 @@ public FieldVector getVector() {
8080
vector.setValueCount(writer.getPosition());
8181
return this.vector;
8282
}
83+
84+
@Override
85+
public void close() {
86+
for (Consumer consumer : delegates) {
87+
consumer.close();
88+
}
89+
}
8390
}
java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.arrow.consumers;
19+
20+
import java.io.EOFException;
21+
import java.io.IOException;
22+
import java.util.List;
23+
24+
import org.apache.arrow.vector.VectorSchemaRoot;
25+
import org.apache.avro.io.Decoder;
26+
27+
/**
28+
* Composite consumer which holds all consumers.
29+
* It manages the consumption and cleanup process.
30+
*/
31+
public class CompositeAvroConsumer implements AutoCloseable {
32+
33+
private final List<Consumer> consumers;
34+
35+
public CompositeAvroConsumer(List<Consumer> consumers) {
36+
this.consumers = consumers;
37+
}
38+
39+
/**
40+
* Consume decoder data and write into {@link VectorSchemaRoot}.
41+
*/
42+
public void consume(Decoder decoder, VectorSchemaRoot root) throws IOException {
43+
int valueCount = 0;
44+
while (true) {
45+
try {
46+
for (Consumer consumer : consumers) {
47+
consumer.consume(decoder);
48+
}
49+
valueCount++;
50+
// Reaching the end of the input throws EOFException.
51+
} catch (EOFException eofException) {
52+
root.setRowCount(valueCount);
53+
break;
54+
}
55+
}
56+
}
57+
58+
@Override
59+
public void close() {
60+
// clean up
61+
for (Consumer consumer : consumers) {
62+
consumer.close();
63+
}
64+
}
65+
}

java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
/**
2626
* Interface that is used to consume values from avro decoder.
2727
*/
28-
public interface Consumer {
28+
public interface Consumer extends AutoCloseable {
2929

3030
/**
3131
* Consume a specific type value from avro decoder and write it to vector.
@@ -48,4 +48,9 @@ public interface Consumer {
4848
* Get the vector within the consumer.
4949
*/
5050
FieldVector getVector();
51+
52+
/**
53+
* Close this consumer and perform cleanup work, such as clearing the reused ArrowBuf.
54+
*/
55+
default void close() {}
5156
}

java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,9 @@ public FieldVector getVector() {
6565
return delegate.getVector();
6666
}
6767

68+
@Override
69+
public void close() {
70+
delegate.close();
71+
}
72+
6873
}

0 commit comments

Comments
 (0)