Merged
Changes from all commits (82 commits)
9657b75
feat: support array_append (#1072)
NoeB Nov 13, 2024
c32bf0c
chore: Simplify CometShuffleMemoryAllocator to use Spark unified memo…
viirya Nov 14, 2024
f3da844
docs: Update benchmarking.md (#1085)
rluvaton-flarion Nov 14, 2024
2c832b4
feat: Require offHeap memory to be enabled (always use unified memory…
andygrove Nov 14, 2024
7cec285
test: Restore one test in CometExecSuite by adding COMET_SHUFFLE_MODE…
viirya Nov 15, 2024
10ef62a
Add changelog for 0.4.0 (#1089)
andygrove Nov 15, 2024
0c9a403
chore: Prepare for 0.5.0 development (#1090)
andygrove Nov 15, 2024
406ffef
build: Skip installation of spark-integration and fuzz testing modul…
parthchandra Nov 15, 2024
bfd7054
Add hint for finding the GPG key to use when publishing to maven (#1093)
andygrove Nov 15, 2024
59da6ce
docs: Update documentation for 0.4.0 release (#1096)
andygrove Nov 18, 2024
ca3a529
fix: Unsigned type related bugs (#1095)
kazuyukitanimura Nov 19, 2024
b64c13d
chore: Include first ScanExec batch in metrics (#1105)
andygrove Nov 20, 2024
19dd58d
chore: Improve CometScan metrics (#1100)
andygrove Nov 20, 2024
e602305
chore: Add custom metric for native shuffle fetching batches from JVM…
andygrove Nov 21, 2024
9990b34
feat: support array_insert (#1073)
SemyonSinchenko Nov 22, 2024
500895d
feat: enable decimal to decimal cast of different precision and scale…
himadripal Nov 22, 2024
7b1a290
docs: fix readme FGPA/FPGA typo (#1117)
gstvg Nov 24, 2024
5400fd7
fix: Use RDD partition index (#1112)
viirya Nov 25, 2024
ebdde77
fix: Various metrics bug fixes and improvements (#1111)
andygrove Dec 2, 2024
9b250c4
fix: Don't create CometScanExec for subclasses of ParquetFileFormat (…
Kimahriman Dec 2, 2024
95727aa
fix: Fix metrics regressions (#1132)
andygrove Dec 3, 2024
36a2307
docs: Add more technical detail and new diagram to Comet plugin overv…
andygrove Dec 3, 2024
2671e0c
Stop passing Java config map into native createPlan (#1101)
andygrove Dec 4, 2024
8d7bcb8
feat: Improve ScanExec native metrics (#1133)
andygrove Dec 6, 2024
587c29b
chore: Remove unused StringView struct (#1143)
andygrove Dec 6, 2024
b95dc1d
docs: Add some documentation explaining how shuffle works (#1148)
andygrove Dec 6, 2024
1c6c7a9
test: enable more Spark 4.0 tests (#1145)
kazuyukitanimura Dec 6, 2024
8d83cc1
chore: Refactor cast to use SparkCastOptions param (#1146)
andygrove Dec 6, 2024
21503ca
Enable more scenarios in CometExecBenchmark. (#1151)
mbutrovich Dec 7, 2024
73f1405
chore: Move more expressions from core crate to spark-expr crate (#1152)
andygrove Dec 9, 2024
5c45fdc
remove dead code (#1155)
andygrove Dec 10, 2024
2c1a6b9
fix: Spark 4.0-preview1 SPARK-47120 (#1156)
kazuyukitanimura Dec 11, 2024
49cf0d7
chore: Move string kernels and expressions to spark-expr crate (#1164)
andygrove Dec 12, 2024
7db9aa6
chore: Move remaining expressions to spark-expr crate + some minor re…
andygrove Dec 12, 2024
f1d0879
chore: Add ignored tests for reading complex types from Parquet (#1167)
andygrove Dec 12, 2024
b9ac78b
feat: Add Spark-compatible implementation of SchemaAdapterFactory (#1…
andygrove Dec 17, 2024
46a28db
fix: Document enabling comet explain plan usage in Spark (4.0) (#1176)
parthchandra Dec 17, 2024
655081b
test: enabling Spark tests with offHeap requirement (#1177)
kazuyukitanimura Dec 18, 2024
e297d23
feat: Improve shuffle metrics (second attempt) (#1175)
andygrove Dec 18, 2024
8f4a8a5
fix: stddev_pop should not directly return 0.0 when count is 1.0 (#1184)
viirya Dec 19, 2024
ea6d205
feat: Make native shuffle compression configurable and respect `spark…
andygrove Dec 20, 2024
053b7cc
minor: move shuffle classes from common to spark (#1193)
andygrove Dec 22, 2024
639fa2f
minor: refactor decodeBatches to make private in broadcast exchange (…
andygrove Dec 22, 2024
58dee73
minor: refactor prepare_output so that it does not require an Executi…
andygrove Dec 22, 2024
5432e03
fix: fix missing explanation for then branch in case when (#1200)
rluvaton Dec 27, 2024
103f82f
minor: remove unused source files (#1202)
andygrove Dec 28, 2024
5d2c909
chore: Upgrade to DataFusion 44.0.0-rc2 (#1154)
andygrove Dec 28, 2024
4f8ce75
feat: add support for array_contains expression (#1163)
dharanad Jan 2, 2025
9320aed
feat: Add a `spark.comet.exec.memoryPool` configuration for experimen…
Kontinuation Jan 3, 2025
2e0f00a
feat: Reenable tests for filtered SMJ anti join (#1211)
comphead Jan 3, 2025
4333dce
chore: Add safety check to CometBuffer (#1050)
viirya Jan 3, 2025
4b56c52
remove unreachable code (#1213)
andygrove Jan 4, 2025
5f1e998
test: Enable Comet by default except some tests in SparkSessionExten…
kazuyukitanimura Jan 4, 2025
e39ffa6
extract struct expressions to folders based on spark grouping (#1216)
rluvaton Jan 6, 2025
5c389d1
chore: extract static invoke expressions to folders based on spark gr…
rluvaton Jan 6, 2025
e72beb1
chore: Follow-on PR to fully enable onheap memory usage (#1210)
andygrove Jan 6, 2025
74a6a8d
feat: Move shuffle block decompression and decoding to native code an…
andygrove Jan 7, 2025
3f0d442
chore: extract agg_funcs expressions to folders based on spark groupi…
rluvaton Jan 7, 2025
4cf840f
extract datetime_funcs expressions to folders based on spark grouping…
rluvaton Jan 7, 2025
508db06
chore: use datafusion from crates.io (#1232)
rluvaton Jan 7, 2025
c19202c
chore: extract strings file to `strings_func` like in spark grouping …
rluvaton Jan 8, 2025
fbcf025
chore: extract predicate_functions expressions to folders based on sp…
rluvaton Jan 8, 2025
ca7b4a8
build(deps): bump protobuf version to 3.21.12 (#1234)
wForget Jan 8, 2025
c6acc9d
extract json_funcs expressions to folders based on spark grouping (#1…
rluvaton Jan 8, 2025
0a68f1c
test: Enable shuffle by default in Spark tests (#1240)
kazuyukitanimura Jan 9, 2025
e731b6e
chore: extract hash_funcs expressions to folders based on spark group…
rluvaton Jan 9, 2025
be48839
fix: Fall back to Spark for unsupported partition or sort expressions…
andygrove Jan 9, 2025
d15d051
perf: Improve query planning to more reliably fall back to columnar s…
andygrove Jan 9, 2025
d52038e
fix regression (#1259)
andygrove Jan 10, 2025
c25060e
feat: add support for array_remove expression (#1179)
jatin510 Jan 12, 2025
e8261fb
fix: Fall back to Spark for distinct aggregates (#1262)
andygrove Jan 13, 2025
d7a7812
feat: Implement custom RecordBatch serde for shuffle for improved per…
andygrove Jan 13, 2025
1eb932a
docs: Update TPC-H benchmark results (#1257)
andygrove Jan 13, 2025
9fe5420
fix: disable initCap by default (#1276)
kazuyukitanimura Jan 14, 2025
cbe50e1
chore: Add changelog for 0.5.0 (#1278)
andygrove Jan 14, 2025
08d892a
update TPC-DS results for 0.5.0 (#1277)
andygrove Jan 14, 2025
9c1f0ee
fix: cast timestamp to decimal is unsupported (#1281)
wForget Jan 14, 2025
017963a
Merge branch 'main' into comet-parquet-exec
parthchandra Jan 14, 2025
285396c
Fix build after merge
parthchandra Jan 14, 2025
b3703f5
Fix tests after merge
parthchandra Jan 15, 2025
2c83bdd
Fix plans after merge
parthchandra Jan 15, 2025
79717b8
fix partition id in execute plan after merge (from Andy Grove)
parthchandra Jan 15, 2025
19 changes: 6 additions & 13 deletions README.md
@@ -46,30 +46,23 @@ The following chart shows the time it takes to run the 22 TPC-H queries against
using a single executor with 8 cores. See the [Comet Benchmarking Guide](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html)
for details of the environment used for these benchmarks.

When using Comet, the overall run time is reduced from 615 seconds to 364 seconds, a 1.7x speedup, with query 1
running 9x faster than Spark.
When using Comet, the overall run time is reduced from 640 seconds to 331 seconds, very close to a 2x speedup.

Running the same queries with DataFusion standalone (without Spark) using the same number of cores results in a 3.6x
speedup compared to Spark.
![](docs/source/_static/images/benchmark-results/0.5.0/tpch_allqueries.png)

Comet is not yet achieving full DataFusion speeds in all cases, but with future work we aim to provide a 2x-4x speedup
for a broader set of queries.
Here is a breakdown showing relative performance of Spark and Comet for each TPC-H query.

![](docs/source/_static/images/benchmark-results/0.4.0/tpch_allqueries.png)

Here is a breakdown showing relative performance of Spark, Comet, and DataFusion for each TPC-H query.

![](docs/source/_static/images/benchmark-results/0.4.0/tpch_queries_compare.png)
![](docs/source/_static/images/benchmark-results/0.5.0/tpch_queries_compare.png)

The following charts show how much Comet currently accelerates each query from the benchmark.

### Relative speedup

![](docs/source/_static/images/benchmark-results/0.4.0/tpch_queries_speedup_rel.png)
![](docs/source/_static/images/benchmark-results/0.5.0/tpch_queries_speedup_rel.png)

### Absolute speedup

![](docs/source/_static/images/benchmark-results/0.4.0/tpch_queries_speedup_abs.png)
![](docs/source/_static/images/benchmark-results/0.5.0/tpch_queries_speedup_abs.png)

These benchmarks can be reproduced in any environment using the documentation in the
[Comet Benchmarking Guide](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html). We encourage
22 changes: 0 additions & 22 deletions common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -172,28 +172,6 @@ public void close() {

/** Returns a decoded {@link CometDecodedVector Comet vector}. */
public CometDecodedVector loadVector() {
// Only re-use the Comet vector iff:
// 1. we're not using dictionary encoding, since with dictionary encoding the native
// side may fall back to plain encoding and the underlying memory address for the vector
// will change as a result.
// 2. the column type is of fixed width; in other words, string/binary are not supported,
// since the native side may resize the vector and therefore change the memory address.
// 3. the last loaded vector contains null values: if the values of the last vector are
// all non-null, the Arrow C data API will skip loading the native validity buffer, so we
// should not re-use the vector in that case.
// 4. or the last loaded vector doesn't contain any null value and the current values are
// also all non-null, which means we can likewise re-use the loaded vector.
// 5. the new number of values is the same or smaller.
if ((hadNull || currentNumNulls == 0)
&& currentVector != null
&& dictionary == null
&& currentVector.isFixedLength()
&& currentVector.numValues() >= currentNumValues) {
currentVector.setNumNulls(currentNumNulls);
currentVector.setNumValues(currentNumValues);
return currentVector;
}

LOG.debug("Reloading vector");

// Close the previous vector first to release struct memory allocated to import Arrow array &
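The comment removed above describes exactly when the previously imported Arrow vector can be handed back without a reload. A minimal Scala sketch of that predicate, using stand-in names that mirror the Java fields (illustrative only, not the actual Comet API):

```scala
// Stand-in for org.apache.comet.vector.CometDecodedVector so the sketch is
// self-contained; only the members the predicate needs are declared.
trait DecodedVector {
  def isFixedLength: Boolean
  def numValues: Int
}

object VectorReuse {
  // Condensed form of the removed re-use check; parameter names mirror the
  // Java fields in ColumnReader above.
  def canReuse(
      currentVector: DecodedVector, // last vector imported from native, may be null
      dictionary: AnyRef, // non-null when dictionary encoding is active
      hadNull: Boolean, // last vector contained nulls, so its validity buffer was loaded
      currentNumNulls: Int, // null count of the batch about to be loaded
      currentNumValues: Int // value count of the batch about to be loaded
  ): Boolean =
    (hadNull || currentNumNulls == 0) && // conditions 3/4: validity buffer present or not needed
    currentVector != null &&
    dictionary == null && // condition 1: plain encoding only
    currentVector.isFixedLength && // condition 2: fixed-width types only
    currentVector.numValues >= currentNumValues // condition 5: existing buffers are large enough
}
```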
common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
@@ -53,13 +53,13 @@ public ConstantColumnReader(

public ConstantColumnReader(
DataType type, ColumnDescriptor descriptor, Object value, boolean useDecimal128) {
super(type, descriptor, useDecimal128);
super(type, descriptor, useDecimal128, true);
this.value = value;
}

ConstantColumnReader(
DataType type, ColumnDescriptor descriptor, int batchSize, boolean useDecimal128) {
super(type, descriptor, useDecimal128);
super(type, descriptor, useDecimal128, true);
this.batchSize = batchSize;
initNative();
}
common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
@@ -40,9 +40,14 @@ public class MetadataColumnReader extends AbstractColumnReader {
private ArrowArray array = null;
private ArrowSchema schema = null;

public MetadataColumnReader(DataType type, ColumnDescriptor descriptor, boolean useDecimal128) {
private boolean isConstant;

public MetadataColumnReader(
DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) {
// TODO: should we handle legacy dates & timestamps for metadata columns?
super(type, descriptor, useDecimal128, false);

this.isConstant = isConstant;
}

@Override
@@ -62,7 +67,7 @@ public void readBatch(int total) {

Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
FieldVector fieldVector = Data.importVector(allocator, array, schema, null);
vector = new CometPlainVector(fieldVector, useDecimal128);
vector = new CometPlainVector(fieldVector, useDecimal128, false, isConstant);
}

vector.setNumValues(total);
common/src/main/java/org/apache/comet/parquet/RowIndexColumnReader.java
@@ -33,7 +33,7 @@ public class RowIndexColumnReader extends MetadataColumnReader {
private long offset;

public RowIndexColumnReader(StructField field, int batchSize, long[] indices) {
super(field.dataType(), TypeUtil.convertToParquet(field), false);
super(field.dataType(), TypeUtil.convertToParquet(field), false, false);
this.indices = indices;
setBatchSize(batchSize);
}
16 changes: 16 additions & 0 deletions common/src/main/java/org/apache/comet/vector/CometPlainVector.java
@@ -38,11 +38,18 @@ public class CometPlainVector extends CometDecodedVector {
private byte booleanByteCache;
private int booleanByteCacheIndex = -1;

private boolean isReused;

public CometPlainVector(ValueVector vector, boolean useDecimal128) {
this(vector, useDecimal128, false);
}

public CometPlainVector(ValueVector vector, boolean useDecimal128, boolean isUuid) {
this(vector, useDecimal128, isUuid, false);
}

public CometPlainVector(
ValueVector vector, boolean useDecimal128, boolean isUuid, boolean isReused) {
super(vector, vector.getField(), useDecimal128, isUuid);
// NullType doesn't have data buffer.
if (vector instanceof NullVector) {
@@ -52,6 +59,15 @@ public CometPlainVector(ValueVector vector, boolean useDecimal128, boolean isUuid) {
}

isBaseFixedWidthVector = valueVector instanceof BaseFixedWidthVector;
this.isReused = isReused;
}

public boolean isReused() {
return isReused;
}

public void setReused(boolean isReused) {
this.isReused = isReused;
}

@Override
56 changes: 41 additions & 15 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -210,6 +210,8 @@ object CometConf extends ShimCometConf {
createExecEnabledConfig("window", defaultValue = true)
val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)
val COMET_EXEC_INITCAP_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("initCap", defaultValue = false)

val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
@@ -275,6 +277,13 @@
.checkValues(Set("native", "jvm", "auto"))
.createWithDefault("auto")

val COMET_SHUFFLE_FALLBACK_TO_COLUMNAR: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.fallbackToColumnar")
.doc("Whether to try falling back to columnar shuffle when native shuffle is not supported")
.internal()
.booleanConf
.createWithDefault(false)

val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
.doc(
@@ -293,12 +302,29 @@
.booleanConf
.createWithDefault(false)

val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] = conf(
s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec")
.doc(
"The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported.")
.stringConf
.createWithDefault("zstd")
val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
.doc(
"The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and " +
"snappy are supported. Compression can be disabled by setting " +
"spark.shuffle.compress=false.")
.stringConf
.checkValues(Set("zstd", "lz4", "snappy"))
.createWithDefault("lz4")

val COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL: ConfigEntry[Int] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.zstd.level")
.doc("The compression level to use when compressing shuffle files with zstd.")
.intConf
.createWithDefault(1)

val COMET_SHUFFLE_ENABLE_FAST_ENCODING: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enableFastEncoding")
.doc("Whether to enable Comet's faster proprietary encoding for shuffle blocks " +
"rather than using Arrow IPC.")
.internal()
.booleanConf
.createWithDefault(true)

val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.columnar.shuffle.async.enabled")
@@ -465,21 +491,21 @@
.intConf
.createWithDefault(8192)

val COMET_EXEC_MEMORY_FRACTION: ConfigEntry[Double] = conf("spark.comet.exec.memoryFraction")
.doc(
"The fraction of memory from Comet memory overhead that the native memory " +
"manager can use for execution. The purpose of this config is to set aside memory for " +
"untracked data structures, as well as imprecise size estimation during memory " +
"acquisition.")
.doubleConf
.createWithDefault(0.7)

val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
conf("spark.comet.parquet.enable.directBuffer")
.doc("Whether to use Java direct byte buffer when reading Parquet.")
.booleanConf
.createWithDefault(false)

val COMET_EXEC_MEMORY_POOL_TYPE: ConfigEntry[String] = conf("spark.comet.exec.memoryPool")
.doc(
"The type of memory pool to be used for Comet native execution. " +
"Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', " +
"'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global', By default, " +
"this config is 'greedy_task_shared'.")
.stringConf
.createWithDefault("greedy_task_shared")

val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.preFetch.enabled")
.doc("Whether to enable pre-fetching feature of CometScan.")
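The new entries above are ordinary Spark configs. A minimal sketch of a session that exercises them — assuming `COMET_EXEC_CONFIG_PREFIX` resolves to `spark.comet.exec` (consistent with the explicit `spark.comet.exec.memoryPool` key above); the app name and sizes are placeholders:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative settings only; key names follow the config entries above.
val spark = SparkSession
  .builder()
  .appName("comet-shuffle-config-demo") // placeholder name
  // Comet now requires Spark's unified off-heap memory (see commit 2c832b4 above).
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "4g") // placeholder size
  // Native shuffle compression codec: lz4 (default), zstd, or snappy.
  .config("spark.comet.exec.shuffle.compression.codec", "zstd")
  .config("spark.comet.exec.shuffle.compression.zstd.level", "1")
  // Experimental native memory pool selection; greedy_task_shared is the default.
  .config("spark.comet.exec.memoryPool", "greedy_task_shared")
  .getOrCreate()
```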
common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
@@ -163,8 +163,6 @@ class NativeUtil {
case numRows =>
val cometVectors = importVector(arrays, schemas)
Some(new ColumnarBatch(cometVectors.toArray, numRows.toInt))
case flag =>
throw new IllegalStateException(s"Invalid native flag: $flag")
}
}


This file was deleted.
