Skip to content

Commit dd4532a

Browse files
tianchen92emkornfield
authored andcommitted
ARROW-6199: [Java] Avro adapter avoid potential resource leak.
Related to [ARROW-6199](https://issues.apache.org/jira/browse/ARROW-6199). Currently, avro consumer interface has no close API, which may cause resource leak like AvroBytesConsumer#cacheBuffer. To resolve this, make consumer extends AutoCloseable and create CompositeAvroConsumer to encompasses consume and close logic. Closes #5059 from tianchen92/ARROW-6199 and squashes the following commits: d60d94c <tianchen> fix 42f22da <tianchen> clear vectors in close 5b91da7 <tianchen> fix comments 3ffc076 <tianchen> ARROW-6199: Avro adapter avoid potential resource leak. Authored-by: tianchen <niki.lj@alibaba-inc.com> Signed-off-by: Micah Kornfield <emkornfield@gmail.com>
1 parent 5479d30 commit dd4532a

13 files changed

+141
-18
lines changed

java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE;
2121
import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE;
2222

23-
import java.io.EOFException;
2423
import java.io.IOException;
2524
import java.util.ArrayList;
2625
import java.util.HashMap;
@@ -37,6 +36,7 @@
3736
import org.apache.arrow.consumers.AvroNullConsumer;
3837
import org.apache.arrow.consumers.AvroStringConsumer;
3938
import org.apache.arrow.consumers.AvroUnionsConsumer;
39+
import org.apache.arrow.consumers.CompositeAvroConsumer;
4040
import org.apache.arrow.consumers.Consumer;
4141
import org.apache.arrow.consumers.NullableTypeConsumer;
4242
import org.apache.arrow.memory.BufferAllocator;
@@ -246,19 +246,15 @@ public static VectorSchemaRoot avroToArrowVectors(Schema schema, Decoder decoder
246246

247247
VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0);
248248

249-
int valueCount = 0;
250-
while (true) {
251-
try {
252-
for (Consumer consumer : consumers) {
253-
consumer.consume(decoder);
254-
}
255-
valueCount++;
256-
//reach end will throw EOFException.
257-
} catch (EOFException eofException) {
258-
root.setRowCount(valueCount);
259-
break;
260-
}
249+
CompositeAvroConsumer compositeConsumer = null;
250+
try {
251+
compositeConsumer = new CompositeAvroConsumer(consumers);
252+
compositeConsumer.consume(decoder, root);
253+
} catch (Exception e) {
254+
compositeConsumer.close();
255+
throw new RuntimeException("Error occurs while consume process.", e);
261256
}
257+
262258
return root;
263259
}
264260
}

java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,9 @@ public FieldVector getVector() {
6363
return this.vector;
6464
}
6565

66+
@Override
67+
public void close() throws Exception {
68+
writer.close();
69+
}
70+
6671
}

java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,9 @@ public void setPosition(int index) {
7979
public FieldVector getVector() {
8080
return vector;
8181
}
82+
83+
@Override
84+
public void close() throws Exception {
85+
writer.close();
86+
}
8287
}

java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,9 @@ public void setPosition(int index) {
6262
public FieldVector getVector() {
6363
return vector;
6464
}
65+
66+
@Override
67+
public void close() throws Exception {
68+
writer.close();
69+
}
6570
}

java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,9 @@ public void setPosition(int index) {
6262
public FieldVector getVector() {
6363
return this.vector;
6464
}
65+
66+
@Override
67+
public void close() throws Exception {
68+
writer.close();
69+
}
6570
}

java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,9 @@ public void setPosition(int index) {
6262
public FieldVector getVector() {
6363
return this.vector;
6464
}
65+
66+
@Override
67+
public void close() throws Exception {
68+
writer.close();
69+
}
6570
}

java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,9 @@ public void setPosition(int index) {
6262
public FieldVector getVector() {
6363
return this.vector;
6464
}
65+
66+
@Override
67+
public void close() throws Exception {
68+
writer.close();
69+
}
6570
}

java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,9 @@ public void setPosition(int index) {}
4848
public FieldVector getVector() {
4949
return this.vector;
5050
}
51+
52+
@Override
53+
public void close() {
54+
vector.close();
55+
}
5156
}

java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,9 @@ public void setPosition(int index) {
8080
public FieldVector getVector() {
8181
return this.vector;
8282
}
83+
84+
@Override
85+
public void close() throws Exception {
86+
writer.close();
87+
}
8388
}

java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
*/
3232
public class AvroUnionsConsumer implements Consumer {
3333

34-
private Consumer[] indexDelegates;
34+
private Consumer[] delegates;
3535
private Types.MinorType[] types;
3636

3737
private UnionWriter writer;
@@ -40,11 +40,11 @@ public class AvroUnionsConsumer implements Consumer {
4040
/**
4141
* Instantiate a AvroUnionConsumer.
4242
*/
43-
public AvroUnionsConsumer(UnionVector vector, Consumer[] indexDelegates, Types.MinorType[] types) {
43+
public AvroUnionsConsumer(UnionVector vector, Consumer[] delegates, Types.MinorType[] types) {
4444

4545
this.writer = new UnionWriter(vector);
4646
this.vector = vector;
47-
this.indexDelegates = indexDelegates;
47+
this.delegates = delegates;
4848
this.types = types;
4949
}
5050

@@ -53,7 +53,7 @@ public void consume(Decoder decoder) throws IOException {
5353
int fieldIndex = decoder.readInt();
5454
int position = writer.getPosition();
5555

56-
Consumer delegate = indexDelegates[fieldIndex];
56+
Consumer delegate = delegates[fieldIndex];
5757

5858
vector.setType(position, types[fieldIndex]);
5959
// In UnionVector we need to set sub vector writer position before consume a value
@@ -80,4 +80,12 @@ public FieldVector getVector() {
8080
vector.setValueCount(writer.getPosition());
8181
return this.vector;
8282
}
83+
84+
@Override
85+
public void close() throws Exception {
86+
writer.close();
87+
for (Consumer delegate: delegates) {
88+
delegate.close();
89+
}
90+
}
8391
}

0 commit comments

Comments
 (0)