Skip to content

Commit 9683f15

Browse files
committed
Orc: Support row group bloom filters
1 parent 2634417 commit 9683f15

File tree

4 files changed

+133
-3
lines changed

4 files changed

+133
-3
lines changed

core/src/main/java/org/apache/iceberg/TableProperties.java

+4
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,10 @@ private TableProperties() {}
177177
public static final String AVRO_COMPRESSION_LEVEL_DEFAULT = null;
178178

179179
public static final String ORC_STRIPE_SIZE_BYTES = "write.orc.stripe-size-bytes";
180+
181+
public static final String ORC_BLOOM_FILTER_COLUMNS = "write.orc.bloom.filter.columns";
182+
public static final String ORC_BLOOM_FILTER_FPP = "write.orc.bloom.filter.fpp";
183+
180184
public static final String DELETE_ORC_STRIPE_SIZE_BYTES = "write.delete.orc.stripe-size-bytes";
181185
public static final long ORC_STRIPE_SIZE_BYTES_DEFAULT = 64L * 1024 * 1024; // 64 MB
182186

docs/configuration.md

+2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ Iceberg tables support table properties to configure table behavior, like the de
6464
| write.orc.block-size-bytes | 268435456 (256 MB) | Define the default file system block size for ORC files |
6565
| write.orc.compression-codec | zlib | ORC compression codec: zstd, lz4, lzo, zlib, snappy, none |
6666
| write.orc.compression-strategy | speed | ORC compression strategy: speed, compression |
67+
| write.orc.bloom.filter.columns | (not set) | Comma-separated list of column names for which a Bloom filter must be created |
68+
| write.orc.bloom.filter.fpp | 0.05 | False positive probability for Bloom filter (must be > 0.0 and < 1.0) |
6769
| write.location-provider.impl | null | Optional custom implementation for LocationProvider |
6870
| write.metadata.compression-codec | none | Metadata compression codec; none or gzip |
6971
| write.metadata.metrics.default | truncate(16) | Default metrics mode for all columns in the table; none, counts, truncate(length), or full |

orc/src/main/java/org/apache/iceberg/orc/ORC.java

+50-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import static org.apache.iceberg.TableProperties.DELETE_ORC_WRITE_BATCH_SIZE;
2626
import static org.apache.iceberg.TableProperties.ORC_BLOCK_SIZE_BYTES;
2727
import static org.apache.iceberg.TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT;
28+
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS;
29+
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP;
2830
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
2931
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_DEFAULT;
3032
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
@@ -202,6 +204,8 @@ public <D> FileAppender<D> build() {
202204
OrcConf.COMPRESS.setString(conf, context.compressionKind().name());
203205
OrcConf.COMPRESSION_STRATEGY.setString(conf, context.compressionStrategy().name());
204206
OrcConf.OVERWRITE_OUTPUT_FILE.setBoolean(conf, overwrite);
207+
OrcConf.BLOOM_FILTER_COLUMNS.setString(conf, context.bloomFilterColumns());
208+
OrcConf.BLOOM_FILTER_FPP.setDouble(conf, context.bloomFilterFpp());
205209

206210
return new OrcFileAppender<>(
207211
schema,
@@ -220,6 +224,10 @@ private static class Context {
220224
private final CompressionKind compressionKind;
221225
private final CompressionStrategy compressionStrategy;
222226

227+
private final String bloomFilterColumns;
228+
229+
private final double bloomFilterFpp;
230+
223231
public long stripeSize() {
224232
return stripeSize;
225233
}
@@ -240,17 +248,29 @@ public CompressionStrategy compressionStrategy() {
240248
return compressionStrategy;
241249
}
242250

251+
public String bloomFilterColumns() {
252+
return bloomFilterColumns;
253+
}
254+
255+
public double bloomFilterFpp() {
256+
return bloomFilterFpp;
257+
}
258+
243259
private Context(
244260
long stripeSize,
245261
long blockSize,
246262
int vectorizedRowBatchSize,
247263
CompressionKind compressionKind,
248-
CompressionStrategy compressionStrategy) {
264+
CompressionStrategy compressionStrategy,
265+
String bloomFilterColumns,
266+
double bloomFilterFpp) {
249267
this.stripeSize = stripeSize;
250268
this.blockSize = blockSize;
251269
this.vectorizedRowBatchSize = vectorizedRowBatchSize;
252270
this.compressionKind = compressionKind;
253271
this.compressionStrategy = compressionStrategy;
272+
this.bloomFilterColumns = bloomFilterColumns;
273+
this.bloomFilterFpp = bloomFilterFpp;
254274
}
255275

256276
static Context dataContext(Map<String, String> config) {
@@ -285,9 +305,24 @@ static Context dataContext(Map<String, String> config) {
285305
strategyAsString =
286306
PropertyUtil.propertyAsString(config, ORC_COMPRESSION_STRATEGY, strategyAsString);
287307
CompressionStrategy compressionStrategy = toCompressionStrategy(strategyAsString);
308+
String bloomFilterColumns =
309+
PropertyUtil.propertyAsString(config, OrcConf.BLOOM_FILTER_COLUMNS.getAttribute(), "");
310+
bloomFilterColumns =
311+
PropertyUtil.propertyAsString(config, ORC_BLOOM_FILTER_COLUMNS, bloomFilterColumns);
312+
313+
double bloomFilterFpp =
314+
PropertyUtil.propertyAsDouble(config, OrcConf.BLOOM_FILTER_FPP.getAttribute(), 0.05);
315+
bloomFilterFpp =
316+
PropertyUtil.propertyAsDouble(config, ORC_BLOOM_FILTER_FPP, bloomFilterFpp);
288317

289318
return new Context(
290-
stripeSize, blockSize, vectorizedRowBatchSize, compressionKind, compressionStrategy);
319+
stripeSize,
320+
blockSize,
321+
vectorizedRowBatchSize,
322+
compressionKind,
323+
compressionStrategy,
324+
bloomFilterColumns,
325+
bloomFilterFpp);
291326
}
292327

293328
static Context deleteContext(Map<String, String> config) {
@@ -316,9 +351,21 @@ static Context deleteContext(Map<String, String> config) {
316351
strategyAsString != null
317352
? toCompressionStrategy(strategyAsString)
318353
: dataContext.compressionStrategy();
354+
String bloomFilterColumns =
355+
PropertyUtil.propertyAsString(config, OrcConf.BLOOM_FILTER_COLUMNS.getAttribute(), "");
356+
double bloomFilterFpp =
357+
PropertyUtil.propertyAsDouble(config, OrcConf.BLOOM_FILTER_FPP.getAttribute(), 0.05);
358+
bloomFilterFpp =
359+
PropertyUtil.propertyAsDouble(config, ORC_BLOOM_FILTER_FPP, bloomFilterFpp);
319360

320361
return new Context(
321-
stripeSize, blockSize, vectorizedRowBatchSize, compressionKind, compressionStrategy);
362+
stripeSize,
363+
blockSize,
364+
vectorizedRowBatchSize,
365+
compressionKind,
366+
compressionStrategy,
367+
bloomFilterColumns,
368+
bloomFilterFpp);
322369
}
323370

324371
private static CompressionKind toCompressionKind(String codecAsString) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.orc;
20+
21+
import static org.apache.iceberg.types.Types.NestedField.required;
22+
23+
import java.io.File;
24+
import java.lang.reflect.Field;
25+
import org.apache.iceberg.Files;
26+
import org.apache.iceberg.Schema;
27+
import org.apache.iceberg.data.Record;
28+
import org.apache.iceberg.data.orc.GenericOrcWriter;
29+
import org.apache.iceberg.io.FileAppender;
30+
import org.apache.iceberg.types.Types;
31+
import org.apache.orc.impl.WriterImpl;
32+
import org.junit.Assert;
33+
import org.junit.Rule;
34+
import org.junit.Test;
35+
import org.junit.rules.TemporaryFolder;
36+
37+
public class TestBloomFilter {
38+
private static final Schema DATA_SCHEMA =
39+
new Schema(
40+
required(100, "id", Types.LongType.get()),
41+
required(101, "name", Types.StringType.get()),
42+
required(102, "price", Types.DoubleType.get()));
43+
44+
@Rule public TemporaryFolder temp = new TemporaryFolder();
45+
46+
@Test
47+
public void testWriteOption() throws Exception {
48+
File testFile = temp.newFile();
49+
Assert.assertTrue("Delete should succeed", testFile.delete());
50+
51+
try (FileAppender<Record> writer =
52+
ORC.write(Files.localOutput(testFile))
53+
.createWriterFunc(GenericOrcWriter::buildWriter)
54+
.schema(DATA_SCHEMA)
55+
.set("write.orc.bloom.filter.columns", "id,name")
56+
.set("write.orc.bloom.filter.fpp", "0.04")
57+
.build()) {
58+
59+
Class clazzOrcFileAppender = Class.forName("org.apache.iceberg.orc.OrcFileAppender");
60+
Field writerField = clazzOrcFileAppender.getDeclaredField("writer");
61+
writerField.setAccessible(true);
62+
WriterImpl orcWriter = (WriterImpl) writerField.get(writer);
63+
64+
Class clazzWriterImpl = Class.forName("org.apache.orc.impl.WriterImpl");
65+
Field bloomFilterColumnsField = clazzWriterImpl.getDeclaredField("bloomFilterColumns");
66+
Field bloomFilterFppField = clazzWriterImpl.getDeclaredField("bloomFilterFpp");
67+
bloomFilterColumnsField.setAccessible(true);
68+
bloomFilterFppField.setAccessible(true);
69+
boolean[] bloomFilterColumns = (boolean[]) bloomFilterColumnsField.get(orcWriter);
70+
double bloomFilterFpp = (double) bloomFilterFppField.get(orcWriter);
71+
72+
Assert.assertTrue(bloomFilterColumns[1]);
73+
Assert.assertTrue(bloomFilterColumns[2]);
74+
Assert.assertEquals(0.04, bloomFilterFpp, 1e-15);
75+
}
76+
}
77+
}

0 commit comments

Comments
 (0)