Skip to content

Commit 2e765cc

Browse files
authored
GH-3040: DictionaryFilter.canDrop may return false positive result when dict size exceeds 8k (#3041)
* GH-3040: DictionaryFilter.canDrop may return false positive result when dict size exceeds 8k * style * check bytesRead * import
1 parent bdc9346 commit 2e765cc

File tree

3 files changed

+60
-1
lines changed

3 files changed

+60
-1
lines changed

parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020

2121
import java.io.ByteArrayOutputStream;
2222
import java.io.DataInputStream;
23+
import java.io.EOFException;
2324
import java.io.IOException;
2425
import java.io.InputStream;
2526
import java.io.OutputStream;
2627
import java.nio.ByteBuffer;
2728
import java.nio.ByteOrder;
2829
import java.nio.channels.Channels;
30+
import java.nio.channels.ReadableByteChannel;
2931
import java.nio.channels.WritableByteChannel;
3032
import java.util.Arrays;
3133
import java.util.List;
@@ -376,7 +378,15 @@ void writeInto(ByteBuffer buffer) {
376378
ByteBuffer workBuf = buffer.duplicate();
377379
int pos = buffer.position();
378380
workBuf.limit(pos + byteCount);
379-
Channels.newChannel(in).read(workBuf);
381+
ReadableByteChannel channel = Channels.newChannel(in);
382+
int remaining = byteCount;
383+
while (remaining > 0) {
384+
int bytesRead = channel.read(workBuf);
385+
if (bytesRead < 0) {
386+
throw new EOFException("Reached the end of stream with " + remaining + " bytes left to read");
387+
}
388+
remaining -= bytesRead;
389+
}
380390
buffer.position(pos + byteCount);
381391
} catch (IOException e) {
382392
new RuntimeException("Exception occurred during reading input stream", e);
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.parquet.bytes;
21+
22+
import java.io.ByteArrayInputStream;
23+
24+
/**
 * A {@link ByteArrayInputStream} over the given array whose {@link #available()}
 * always reports 0, no matter how many bytes are still unread.
 */
public class AvailableAgnosticInputStream extends ByteArrayInputStream {
25+
26+
public AvailableAgnosticInputStream(byte[] buf) {
27+
super(buf);
28+
}
29+
30+
// In practice, some stream implementations always return 0 from available() even if they have more data.
31+
@Override
32+
public synchronized int available() {
33+
return 0;
34+
}
35+
}

parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,20 @@ public void testFromInputStream() throws IOException {
140140
validate(data, factory);
141141
}
142142

143+
// Builds a BytesInput of 9 * 1024 bytes from a stream whose available() always returns 0
// and passes it to validate(); presumably validate compares the produced bytes with
// `data` - confirm against the helper.
@Test
144+
public void testFromLargeAvailableAgnosticInputStream() throws IOException {
145+
// Allocate a byte array that is larger than
146+
// java.nio.channels.Channels.ReadableByteChannelImpl.TRANSFER_SIZE = 8192
147+
byte[] data = new byte[9 * 1024];
148+
RANDOM.nextBytes(data);
149+
// The stream's backing array holds 10 extra random bytes after `data`, so the stream
// contains more than the 9 * 1024 bytes requested from it below.
byte[] input = new byte[data.length + 10];
150+
RANDOM.nextBytes(input);
151+
System.arraycopy(data, 0, input, 0, data.length);
152+
Supplier<BytesInput> factory = () -> BytesInput.from(new AvailableAgnosticInputStream(input), 9 * 1024);
153+
154+
validate(data, factory);
155+
}
156+
143157
@Test
144158
public void testFromByteArrayOutputStream() throws IOException {
145159
byte[] data = new byte[1000];

0 commit comments

Comments
 (0)