Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC: Add compression properties #4273

Merged
merged 5 commits into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ private TableProperties() {
public static final String DELETE_ORC_WRITE_BATCH_SIZE = "write.delete.orc.vectorized.batch-size";
public static final int ORC_WRITE_BATCH_SIZE_DEFAULT = 1024;

public static final String ORC_COMPRESSION = "write.orc.compression-codec";
public static final String DELETE_ORC_COMPRESSION = "write.delete.orc.compression-codec";
public static final String ORC_COMPRESSION_DEFAULT = "zlib";

public static final String ORC_COMPRESSION_STRATEGY = "write.orc.compression-strategy";
public static final String DELETE_ORC_COMPRESSION_STRATEGY = "write.delete.orc.compression-strategy";
public static final String ORC_COMPRESSION_STRATEGY_DEFAULT = "speed";

public static final String SPLIT_SIZE = "read.split.target-size";
public static final long SPLIT_SIZE_DEFAULT = 128 * 1024 * 1024; // 128 MB

Expand Down
2 changes: 2 additions & 0 deletions docs/tables/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.avro.compression-level | null | Avro compression level |
| write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes |
| write.orc.block-size-bytes | 268435456 (256 MB) | Define the default file system block size for ORC files |
| write.orc.compression-codec | zlib | ORC compression codec: zstd, lz4, lzo, zlib, snappy, none |
| write.orc.compression-strategy | speed | ORC compression strategy: speed, compression |
| write.location-provider.impl | null | Optional custom implementation for LocationProvider |
| write.metadata.compression-codec | none | Metadata compression codec; none or gzip |
| write.metadata.metrics.default | truncate(16) | Default metrics mode for all columns in the table; none, counts, truncate(length), or full |
Expand Down
78 changes: 60 additions & 18 deletions orc/src/main/java/org/apache/iceberg/orc/ORC.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -65,6 +66,22 @@
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import static org.apache.iceberg.TableProperties.DELETE_ORC_BLOCK_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.DELETE_ORC_STRIPE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.DELETE_ORC_WRITE_BATCH_SIZE;
import static org.apache.iceberg.TableProperties.ORC_BLOCK_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_STRIPE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.ORC_STRIPE_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_WRITE_BATCH_SIZE;
import static org.apache.iceberg.TableProperties.ORC_WRITE_BATCH_SIZE_DEFAULT;

@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class ORC {

Expand Down Expand Up @@ -173,16 +190,18 @@ public <D> FileAppender<D> build() {
}

// for compatibility
if (conf.get(VECTOR_ROW_BATCH_SIZE) != null && config.get(TableProperties.ORC_WRITE_BATCH_SIZE) == null) {
config.put(TableProperties.ORC_WRITE_BATCH_SIZE, conf.get(VECTOR_ROW_BATCH_SIZE));
if (conf.get(VECTOR_ROW_BATCH_SIZE) != null && config.get(ORC_WRITE_BATCH_SIZE) == null) {
config.put(ORC_WRITE_BATCH_SIZE, conf.get(VECTOR_ROW_BATCH_SIZE));
}

// Map Iceberg properties to pass down to the ORC writer
Context context = createContextFunc.apply(config);
conf.setLong(OrcConf.STRIPE_SIZE.getAttribute(), context.stripeSize());
conf.setLong(OrcConf.BLOCK_SIZE.getAttribute(), context.blockSize());

conf.setBoolean(OrcConf.OVERWRITE_OUTPUT_FILE.getAttribute(), overwrite);
OrcConf.STRIPE_SIZE.setLong(conf, context.stripeSize());
OrcConf.BLOCK_SIZE.setLong(conf, context.blockSize());
OrcConf.COMPRESS.setString(conf, context.codecAsString().toUpperCase(Locale.ENGLISH));
OrcConf.COMPRESSION_STRATEGY.setString(conf, context.compressionStrategy().toUpperCase(Locale.ENGLISH));
OrcConf.OVERWRITE_OUTPUT_FILE.setBoolean(conf, overwrite);

return new OrcFileAppender<>(schema,
this.file, createWriterFunc, conf, metadata,
Expand All @@ -193,6 +212,8 @@ private static class Context {
private final long stripeSize;
private final long blockSize;
private final int vectorizedRowBatchSize;
private final String codecAsString;
private final String compressionStrategy;

public long stripeSize() {
return stripeSize;
Expand All @@ -206,43 +227,64 @@ public int vectorizedRowBatchSize() {
return vectorizedRowBatchSize;
}

private Context(long stripeSize, long blockSize, int vectorizedRowBatchSize) {
public String codecAsString() {
return codecAsString;
}

public String compressionStrategy() {
return compressionStrategy;
}

private Context(long stripeSize, long blockSize, int vectorizedRowBatchSize,
String codecAsString, String compressionStrategy) {
this.stripeSize = stripeSize;
this.blockSize = blockSize;
this.vectorizedRowBatchSize = vectorizedRowBatchSize;
this.codecAsString = codecAsString;
this.compressionStrategy = compressionStrategy;
}

static Context dataContext(Map<String, String> config) {
long stripeSize = PropertyUtil.propertyAsLong(config, OrcConf.STRIPE_SIZE.getAttribute(),
TableProperties.ORC_STRIPE_SIZE_BYTES_DEFAULT);
stripeSize = PropertyUtil.propertyAsLong(config, TableProperties.ORC_STRIPE_SIZE_BYTES, stripeSize);
ORC_STRIPE_SIZE_BYTES_DEFAULT);
stripeSize = PropertyUtil.propertyAsLong(config, ORC_STRIPE_SIZE_BYTES, stripeSize);
Preconditions.checkArgument(stripeSize > 0, "Stripe size must be > 0");

long blockSize = PropertyUtil.propertyAsLong(config, OrcConf.BLOCK_SIZE.getAttribute(),
TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT);
blockSize = PropertyUtil.propertyAsLong(config, TableProperties.ORC_BLOCK_SIZE_BYTES, blockSize);
ORC_BLOCK_SIZE_BYTES_DEFAULT);
blockSize = PropertyUtil.propertyAsLong(config, ORC_BLOCK_SIZE_BYTES, blockSize);
Preconditions.checkArgument(blockSize > 0, "Block size must be > 0");

int vectorizedRowBatchSize = PropertyUtil.propertyAsInt(config,
TableProperties.ORC_WRITE_BATCH_SIZE, TableProperties.ORC_WRITE_BATCH_SIZE_DEFAULT);
ORC_WRITE_BATCH_SIZE, ORC_WRITE_BATCH_SIZE_DEFAULT);
Preconditions.checkArgument(vectorizedRowBatchSize > 0, "VectorizedRow batch size must be > 0");

return new Context(stripeSize, blockSize, vectorizedRowBatchSize);
String codecAsString = config.getOrDefault(OrcConf.COMPRESS.getAttribute(), ORC_COMPRESSION_DEFAULT);
codecAsString = config.getOrDefault(ORC_COMPRESSION, codecAsString);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will suggest to use the similar approach as parquet did: try to parse the string into a CompressionKind so that we can validate whether this value is a valid one in the correct place.

See: https://github.com/apache/iceberg/blob/master/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java#L392


String compressionStrategy = config.getOrDefault(OrcConf.COMPRESSION_STRATEGY.getAttribute(),
ORC_COMPRESSION_STRATEGY_DEFAULT);
compressionStrategy = config.getOrDefault(ORC_COMPRESSION_STRATEGY, compressionStrategy);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just follow the same config parse approach as the above config keys did ? I'm saying using the generic PropertyUtil.propertyAsString.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides, please also use the CompressionStrategy enum to validate whether this configured value is a valid one ? ( Just as this comment said: https://github.com/apache/iceberg/pull/4273/files#r836994947 )


return new Context(stripeSize, blockSize, vectorizedRowBatchSize, codecAsString, compressionStrategy);
}

static Context deleteContext(Map<String, String> config) {
Context dataContext = dataContext(config);

long stripeSize = PropertyUtil.propertyAsLong(config,
TableProperties.DELETE_ORC_STRIPE_SIZE_BYTES, dataContext.stripeSize());
long stripeSize = PropertyUtil.propertyAsLong(config, DELETE_ORC_STRIPE_SIZE_BYTES, dataContext.stripeSize());

long blockSize = PropertyUtil.propertyAsLong(config,
TableProperties.DELETE_ORC_BLOCK_SIZE_BYTES, dataContext.blockSize());
long blockSize = PropertyUtil.propertyAsLong(config, DELETE_ORC_BLOCK_SIZE_BYTES, dataContext.blockSize());

int vectorizedRowBatchSize = PropertyUtil.propertyAsInt(config,
TableProperties.DELETE_ORC_WRITE_BATCH_SIZE, dataContext.vectorizedRowBatchSize());
DELETE_ORC_WRITE_BATCH_SIZE, dataContext.vectorizedRowBatchSize());

String codecAsString = config.getOrDefault(DELETE_ORC_COMPRESSION, dataContext.codecAsString());

String compressionStrategy = config.getOrDefault(DELETE_ORC_COMPRESSION_STRATEGY,
dataContext.compressionStrategy());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Please use the same approach to parse the codec & compression-strategy .


return new Context(stripeSize, blockSize, vectorizedRowBatchSize);
return new Context(stripeSize, blockSize, vectorizedRowBatchSize, codecAsString, compressionStrategy);
}
}
}
Expand Down