Description
Background
I received some data-loss reports after upgrading Spark's internal Parquet from 1.13.1 to 1.14.3. After some experiments, I believe this is a bug on the Parquet side, and it can be worked around by disabling spark.sql.parquet.filterPushdown.
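For reference, the workaround is a regular Spark SQL setting and can also be applied per session. A minimal sketch in Java (disabling pushdown trades away row-group filtering, so it is only a stopgap):

    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder().getOrCreate();
    // Disable Parquet predicate pushdown so the affected dictionary
    // filtering path described below is never exercised.
    spark.conf().set("spark.sql.parquet.filterPushdown", "false");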
Analysis
With some debugging, I believe the issue was introduced by PARQUET-2432 (#1278).
The issue is that during the evaluation of DictionaryFilter.canDrop (which happens when reading a column encoded with PLAIN_DICTIONARY under pushed-down predicates), a dictionary larger than 8 KiB gets truncated: only the first 8 KiB are copied.
parquet-java/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java, line 113 at commit 274dc51:

    return new DictionaryPage(dict.getBytes().copy(releaser), dict.getDictionarySize(), dict.getEncoding());
The correct data:
...
|00001fd0| 34 32 35 34 30 39 0a 00 00 00 37 32 39 33 39 38 |425409....729398|
|00001fe0| 39 35 33 30 0a 00 00 00 39 38 32 30 31 36 39 36 |9530....98201696|
|00001ff0| 34 34 0a 00 00 00 37 33 39 31 32 34 37 38 30 36 |44....7391247806|
|00002000| 0b 00 00 00 31 32 35 32 35 31 36 31 34 31 36 0a |....12525161416.|
|00002010| 00 00 00 38 37 38 35 35 34 34 36 34 35 0b 00 00 |...8785544645...|
|00002020| 00 31 32 32 38 30 38 37 35 39 30 32 0b 00 00 00 |.12280875902....|
...
The copied data; note that everything from offset 0x2000 (8192 bytes) onward is zeroed:
...
|00001fd0| 34 32 35 34 30 39 0a 00 00 00 37 32 39 33 39 38 |425409....729398|
|00001fe0| 39 35 33 30 0a 00 00 00 39 38 32 30 31 36 39 36 |9530....98201696|
|00001ff0| 34 34 0a 00 00 00 37 33 39 31 32 34 37 38 30 36 |44....7391247806|
|00002000| 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
|00002010| 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
|00002020| 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
...
The root cause is in parquet-java/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java, line 379 at commit 274dc51:

    Channels.newChannel(in).read(workBuf);

This single read() call may not fill workBuf: it can stop short when the underlying InputStream's available() method always returns 0.
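This is easy to reproduce with plain JDK classes. A minimal sketch (the ShortReadDemo class name and the 16 KiB payload are made up for illustration): wrap a stream whose available() always returns 0, issue a single channel read, and only 8192 bytes come back:

    import java.io.ByteArrayInputStream;
    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;
    import java.nio.channels.Channels;

    public class ShortReadDemo {
        public static void main(String[] args) throws IOException {
            byte[] data = new byte[16 * 1024]; // 16 KiB source, like a >8 KiB dictionary
            InputStream in = new FilterInputStream(new ByteArrayInputStream(data)) {
                @Override
                public int available() {
                    return 0; // mimic streams that cannot report buffered bytes
                }
            };
            ByteBuffer dst = ByteBuffer.allocate(data.length);
            int n = Channels.newChannel(in).read(dst); // single read, as in BytesInput
            System.out.println("read " + n + " of " + data.length + " bytes");
            // Prints "read 8192 of 16384 bytes": only the first transfer completes.
        }
    }

The reason is visible in the JDK's ReadableByteChannelImpl (the channel returned by Channels.newChannel): it transfers at most TRANSFER_SIZE (8192) bytes per blocking read and bails out of its loop as soon as available() reports 0: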
    private static class ReadableByteChannelImpl
        extends AbstractInterruptibleChannel // Not really interruptible
        implements ReadableByteChannel
    {
        ...
        private static final int TRANSFER_SIZE = 8192;

        public int read(ByteBuffer dst) throws IOException {
            if (!isOpen()) {
                throw new ClosedChannelException();
            }
            int len = dst.remaining();
            int totalRead = 0;
            int bytesRead = 0;
            synchronized (readLock) {
                while (totalRead < len) {
                    int bytesToRead = Math.min((len - totalRead),
                                               TRANSFER_SIZE);
                    if (buf.length < bytesToRead)
                        buf = new byte[bytesToRead];
                    if ((totalRead > 0) && !(in.available() > 0))
                        break; // block at most once
                    try {
                        begin();
                        bytesRead = in.read(buf, 0, bytesToRead);
                    } finally {
                        end(bytesRead > 0);
                    }
                    if (bytesRead < 0)
                        break;
                    else
                        totalRead += bytesRead;
                    dst.put(buf, 0, bytesRead);
                }
                if ((bytesRead < 0) && (totalRead == 0))
                    return -1;
                return totalRead;
            }
        }
        ...
    }
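One way to avoid the truncation regardless of what available() reports is to keep calling read() until the destination buffer is full or EOF is hit. A minimal sketch of such a loop (the readFully helper is hypothetical, not necessarily how the fix should be shaped upstream):

    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;
    import java.nio.channels.Channels;
    import java.nio.channels.ReadableByteChannel;

    final class ChannelUtil {
        // Hypothetical helper: each read() blocks at most once, so loop
        // until the buffer is completely filled or the stream ends.
        static void readFully(InputStream in, ByteBuffer dst) throws IOException {
            ReadableByteChannel channel = Channels.newChannel(in);
            while (dst.hasRemaining()) {
                if (channel.read(dst) < 0) {
                    throw new EOFException("stream ended before the buffer was filled");
                }
            }
        }
    }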
Component(s)
Core