Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC: Add compression properties #4273

Merged
merged 5 commits into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ private TableProperties() {
public static final String DELETE_ORC_WRITE_BATCH_SIZE = "write.delete.orc.vectorized.batch-size";
public static final int ORC_WRITE_BATCH_SIZE_DEFAULT = 1024;

public static final String ORC_COMPRESSION = "write.orc.compression-codec";
public static final String DELETE_ORC_COMPRESSION = "write.delete.orc.compression-codec";
public static final String ORC_COMPRESSION_DEFAULT = "zlib";

public static final String ORC_COMPRESSION_STRATEGY = "write.orc.compression-strategy";
public static final String DELETE_ORC_COMPRESSION_STRATEGY = "write.delete.orc.compression-strategy";
public static final String ORC_COMPRESSION_STRATEGY_DEFAULT = "speed";

public static final String SPLIT_SIZE = "read.split.target-size";
public static final long SPLIT_SIZE_DEFAULT = 128 * 1024 * 1024; // 128 MB

Expand Down
2 changes: 2 additions & 0 deletions docs/tables/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.avro.compression-level | null | Avro compression level |
| write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes |
| write.orc.block-size-bytes | 268435456 (256 MB) | Define the default file system block size for ORC files |
| write.orc.compression-codec | zlib | ORC compression codec: zstd, lz4, lzo, zlib, snappy, none |
| write.orc.compression-strategy | speed | ORC compression strategy: speed, compression |
| write.location-provider.impl | null | Optional custom implementation for LocationProvider |
| write.metadata.compression-codec | none | Metadata compression codec; none or gzip |
| write.metadata.metrics.default | truncate(16) | Default metrics mode for all columns in the table; none, counts, truncate(length), or full |
Expand Down
102 changes: 84 additions & 18 deletions orc/src/main/java/org/apache/iceberg/orc/ORC.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -58,13 +59,31 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcFile.CompressionStrategy;
import org.apache.orc.OrcFile.ReaderOptions;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import static org.apache.iceberg.TableProperties.DELETE_ORC_BLOCK_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.DELETE_ORC_STRIPE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.DELETE_ORC_WRITE_BATCH_SIZE;
import static org.apache.iceberg.TableProperties.ORC_BLOCK_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_STRIPE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.ORC_STRIPE_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_WRITE_BATCH_SIZE;
import static org.apache.iceberg.TableProperties.ORC_WRITE_BATCH_SIZE_DEFAULT;

@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class ORC {

Expand Down Expand Up @@ -173,16 +192,18 @@ public <D> FileAppender<D> build() {
}

// for compatibility
if (conf.get(VECTOR_ROW_BATCH_SIZE) != null && config.get(TableProperties.ORC_WRITE_BATCH_SIZE) == null) {
config.put(TableProperties.ORC_WRITE_BATCH_SIZE, conf.get(VECTOR_ROW_BATCH_SIZE));
if (conf.get(VECTOR_ROW_BATCH_SIZE) != null && config.get(ORC_WRITE_BATCH_SIZE) == null) {
config.put(ORC_WRITE_BATCH_SIZE, conf.get(VECTOR_ROW_BATCH_SIZE));
}

// Map Iceberg properties to pass down to the ORC writer
Context context = createContextFunc.apply(config);
conf.setLong(OrcConf.STRIPE_SIZE.getAttribute(), context.stripeSize());
conf.setLong(OrcConf.BLOCK_SIZE.getAttribute(), context.blockSize());

conf.setBoolean(OrcConf.OVERWRITE_OUTPUT_FILE.getAttribute(), overwrite);
OrcConf.STRIPE_SIZE.setLong(conf, context.stripeSize());
OrcConf.BLOCK_SIZE.setLong(conf, context.blockSize());
OrcConf.COMPRESS.setString(conf, context.compressionKind().name());
OrcConf.COMPRESSION_STRATEGY.setString(conf, context.compressionStrategy().name());
OrcConf.OVERWRITE_OUTPUT_FILE.setBoolean(conf, overwrite);

return new OrcFileAppender<>(schema,
this.file, createWriterFunc, conf, metadata,
Expand All @@ -193,6 +214,8 @@ private static class Context {
private final long stripeSize;
private final long blockSize;
private final int vectorizedRowBatchSize;
private final CompressionKind compressionKind;
private final CompressionStrategy compressionStrategy;

public long stripeSize() {
return stripeSize;
Expand All @@ -206,43 +229,86 @@ public int vectorizedRowBatchSize() {
return vectorizedRowBatchSize;
}

private Context(long stripeSize, long blockSize, int vectorizedRowBatchSize) {
/** Returns the ORC compression codec configured for this writer context. */
public CompressionKind compressionKind() {
  return this.compressionKind;
}

/** Returns the ORC compression strategy (speed vs. compression) configured for this writer context. */
public CompressionStrategy compressionStrategy() {
  return this.compressionStrategy;
}

/**
 * Creates an immutable bundle of ORC writer settings.
 *
 * <p>Instances are built via {@code dataContext} / {@code deleteContext}; the constructor
 * only stores the already-validated values.
 */
private Context(long stripeSizeBytes, long blockSizeBytes, int batchSize,
    CompressionKind codec, CompressionStrategy strategy) {
  this.stripeSize = stripeSizeBytes;
  this.blockSize = blockSizeBytes;
  this.vectorizedRowBatchSize = batchSize;
  this.compressionKind = codec;
  this.compressionStrategy = strategy;
}

static Context dataContext(Map<String, String> config) {
long stripeSize = PropertyUtil.propertyAsLong(config, OrcConf.STRIPE_SIZE.getAttribute(),
TableProperties.ORC_STRIPE_SIZE_BYTES_DEFAULT);
stripeSize = PropertyUtil.propertyAsLong(config, TableProperties.ORC_STRIPE_SIZE_BYTES, stripeSize);
ORC_STRIPE_SIZE_BYTES_DEFAULT);
stripeSize = PropertyUtil.propertyAsLong(config, ORC_STRIPE_SIZE_BYTES, stripeSize);
Preconditions.checkArgument(stripeSize > 0, "Stripe size must be > 0");

long blockSize = PropertyUtil.propertyAsLong(config, OrcConf.BLOCK_SIZE.getAttribute(),
TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT);
blockSize = PropertyUtil.propertyAsLong(config, TableProperties.ORC_BLOCK_SIZE_BYTES, blockSize);
ORC_BLOCK_SIZE_BYTES_DEFAULT);
blockSize = PropertyUtil.propertyAsLong(config, ORC_BLOCK_SIZE_BYTES, blockSize);
Preconditions.checkArgument(blockSize > 0, "Block size must be > 0");

int vectorizedRowBatchSize = PropertyUtil.propertyAsInt(config,
TableProperties.ORC_WRITE_BATCH_SIZE, TableProperties.ORC_WRITE_BATCH_SIZE_DEFAULT);
ORC_WRITE_BATCH_SIZE, ORC_WRITE_BATCH_SIZE_DEFAULT);
Preconditions.checkArgument(vectorizedRowBatchSize > 0, "VectorizedRow batch size must be > 0");

return new Context(stripeSize, blockSize, vectorizedRowBatchSize);
String codecAsString = PropertyUtil.propertyAsString(config, OrcConf.COMPRESS.getAttribute(),
ORC_COMPRESSION_DEFAULT);
codecAsString = PropertyUtil.propertyAsString(config, ORC_COMPRESSION, codecAsString);
CompressionKind compressionKind = toCompressionKind(codecAsString);

String strategyAsString = PropertyUtil.propertyAsString(config, OrcConf.COMPRESSION_STRATEGY.getAttribute(),
ORC_COMPRESSION_STRATEGY_DEFAULT);
strategyAsString = PropertyUtil.propertyAsString(config, ORC_COMPRESSION_STRATEGY, strategyAsString);
CompressionStrategy compressionStrategy = toCompressionStrategy(strategyAsString);

return new Context(stripeSize, blockSize, vectorizedRowBatchSize, compressionKind, compressionStrategy);
}

static Context deleteContext(Map<String, String> config) {
Context dataContext = dataContext(config);

long stripeSize = PropertyUtil.propertyAsLong(config,
TableProperties.DELETE_ORC_STRIPE_SIZE_BYTES, dataContext.stripeSize());
long stripeSize = PropertyUtil.propertyAsLong(config, DELETE_ORC_STRIPE_SIZE_BYTES, dataContext.stripeSize());

long blockSize = PropertyUtil.propertyAsLong(config,
TableProperties.DELETE_ORC_BLOCK_SIZE_BYTES, dataContext.blockSize());
long blockSize = PropertyUtil.propertyAsLong(config, DELETE_ORC_BLOCK_SIZE_BYTES, dataContext.blockSize());

int vectorizedRowBatchSize = PropertyUtil.propertyAsInt(config,
TableProperties.DELETE_ORC_WRITE_BATCH_SIZE, dataContext.vectorizedRowBatchSize());
DELETE_ORC_WRITE_BATCH_SIZE, dataContext.vectorizedRowBatchSize());

String codecAsString = config.get(DELETE_ORC_COMPRESSION);
CompressionKind compressionKind = codecAsString != null ? toCompressionKind(codecAsString) :
dataContext.compressionKind();

String strategyAsString = config.get(DELETE_ORC_COMPRESSION_STRATEGY);
CompressionStrategy compressionStrategy =
strategyAsString != null ? toCompressionStrategy(strategyAsString) : dataContext.compressionStrategy();

return new Context(stripeSize, blockSize, vectorizedRowBatchSize, compressionKind, compressionStrategy);
}

/**
 * Parses a compression codec name (case-insensitive) into a {@link CompressionKind}.
 *
 * @param codecAsString codec name, e.g. "zlib", "zstd", "snappy", "none"
 * @throws IllegalArgumentException if the name matches no {@link CompressionKind} constant
 */
private static CompressionKind toCompressionKind(String codecAsString) {
  try {
    return CompressionKind.valueOf(codecAsString.toUpperCase(Locale.ENGLISH));
  } catch (IllegalArgumentException e) {
    // Preserve the original exception as the cause instead of dropping it.
    throw new IllegalArgumentException("Unsupported compression codec: " + codecAsString, e);
  }
}

return new Context(stripeSize, blockSize, vectorizedRowBatchSize);
/**
 * Parses a compression strategy name (case-insensitive) into a {@link CompressionStrategy}.
 *
 * @param strategyAsString strategy name, e.g. "speed" or "compression"
 * @throws IllegalArgumentException if the name matches no {@link CompressionStrategy} constant
 */
private static CompressionStrategy toCompressionStrategy(String strategyAsString) {
  try {
    return CompressionStrategy.valueOf(strategyAsString.toUpperCase(Locale.ENGLISH));
  } catch (IllegalArgumentException e) {
    // Preserve the original exception as the cause instead of dropping it.
    throw new IllegalArgumentException("Unsupported compression strategy: " + strategyAsString, e);
  }
}
}
}
Expand Down
39 changes: 31 additions & 8 deletions orc/src/test/java/org/apache/iceberg/orc/TestTableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg.orc;

import java.io.File;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
Expand All @@ -35,7 +36,9 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile.CompressionStrategy;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
Expand All @@ -53,12 +56,20 @@ public class TestTableProperties {

@Test
public void testOrcTableProperties() throws Exception {
Long stripeSizeBytes = 32L * 1024 * 1024;
Long blockSizeBytes = 128L * 1024 * 1024;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed Long to long because what OrcConf.XXX#getLong returns is a long, and I got

Ambiguous method call. Both assertEquals(Object, Object) in Assert and assertEquals(long, long) in Assert match

when using both Long and long as parameters for Assert#assertEquals.

Random random = new Random();
int numOfCodecs = CompressionKind.values().length;
int numOfStrategies = CompressionStrategy.values().length;

long stripeSizeBytes = 32L * 1024 * 1024;
long blockSizeBytes = 128L * 1024 * 1024;
String codecAsString = CompressionKind.values()[random.nextInt(numOfCodecs)].name();
String strategyAsString = CompressionStrategy.values()[random.nextInt(numOfStrategies)].name();

ImmutableMap<String, String> properties = ImmutableMap.of(
TableProperties.ORC_STRIPE_SIZE_BYTES, String.valueOf(stripeSizeBytes),
TableProperties.ORC_BLOCK_SIZE_BYTES, String.valueOf(blockSizeBytes),
TableProperties.ORC_COMPRESSION, codecAsString,
TableProperties.ORC_COMPRESSION_STRATEGY, strategyAsString,
TableProperties.DEFAULT_FILE_FORMAT, FileFormat.ORC.name());

File folder = TEMPORARY_FOLDER.newFolder();
Expand All @@ -82,19 +93,29 @@ public void testOrcTableProperties() throws Exception {
DynFields.builder().hiddenImpl(writer.getClass(), "conf").build(writer);

Configuration configuration = confField.get();
Assert.assertEquals(String.valueOf(blockSizeBytes), configuration.get(OrcConf.BLOCK_SIZE.getAttribute()));
Assert.assertEquals(String.valueOf(stripeSizeBytes), configuration.get(OrcConf.STRIPE_SIZE.getAttribute()));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed this to using OrcConf to get value directly so no need to know property key detail.

Assert.assertEquals(blockSizeBytes, OrcConf.BLOCK_SIZE.getLong(configuration));
Assert.assertEquals(stripeSizeBytes, OrcConf.STRIPE_SIZE.getLong(configuration));
Assert.assertEquals(codecAsString, OrcConf.COMPRESS.getString(configuration));
Assert.assertEquals(strategyAsString, OrcConf.COMPRESSION_STRATEGY.getString(configuration));
Assert.assertEquals(FileFormat.ORC.name(), configuration.get(TableProperties.DEFAULT_FILE_FORMAT));
}

@Test
public void testOrcTableDeleteProperties() throws Exception {
Long stripeSizeBytes = 32L * 1024 * 1024;
Long blockSizeBytes = 128L * 1024 * 1024;
Random random = new Random();
int numOfCodecs = CompressionKind.values().length;
int numOfStrategies = CompressionStrategy.values().length;

long stripeSizeBytes = 32L * 1024 * 1024;
long blockSizeBytes = 128L * 1024 * 1024;
String codecAsString = CompressionKind.values()[random.nextInt(numOfCodecs)].name();
String strategyAsString = CompressionStrategy.values()[random.nextInt(numOfStrategies)].name();

ImmutableMap<String, String> properties = ImmutableMap.of(
TableProperties.DELETE_ORC_STRIPE_SIZE_BYTES, String.valueOf(stripeSizeBytes),
TableProperties.DELETE_ORC_BLOCK_SIZE_BYTES, String.valueOf(blockSizeBytes),
TableProperties.DELETE_ORC_COMPRESSION, codecAsString,
TableProperties.DELETE_ORC_COMPRESSION_STRATEGY, strategyAsString,
TableProperties.DEFAULT_FILE_FORMAT, FileFormat.ORC.name());

File folder = TEMPORARY_FOLDER.newFolder();
Expand Down Expand Up @@ -123,8 +144,10 @@ public void testOrcTableDeleteProperties() throws Exception {
DynFields.builder().hiddenImpl(orcFileAppender.getClass(), "conf").build(orcFileAppender);

Configuration configuration = confField.get();
Assert.assertEquals(String.valueOf(blockSizeBytes), configuration.get(OrcConf.BLOCK_SIZE.getAttribute()));
Assert.assertEquals(String.valueOf(stripeSizeBytes), configuration.get(OrcConf.STRIPE_SIZE.getAttribute()));
Assert.assertEquals(blockSizeBytes, OrcConf.BLOCK_SIZE.getLong(configuration));
Assert.assertEquals(stripeSizeBytes, OrcConf.STRIPE_SIZE.getLong(configuration));
Assert.assertEquals(codecAsString, OrcConf.COMPRESS.getString(configuration));
Assert.assertEquals(strategyAsString, OrcConf.COMPRESSION_STRATEGY.getString(configuration));
Assert.assertEquals(FileFormat.ORC.name(), configuration.get(TableProperties.DEFAULT_FILE_FORMAT));
}
}