
Collect external table statistics #1362

Closed
Tracked by #3
penghuo opened this issue Feb 21, 2023 · 1 comment
penghuo commented Feb 21, 2023

Tasks

  • collect external table statistics using a streaming query (a minimal sketch follows this list).
  • backport DataSkippingReader from Delta Lake 1.2.1.
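
The following is a minimal sketch of what the first task could look like: a Spark Structured Streaming query over the external Delta table that computes simple column statistics per micro-batch. It is an illustration of the idea rather than the actual implementation, and the checkpoint location is a hypothetical example.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{min, max}

val tablePath = "/Users/penghuo/release/deltalake/parquet/p001"

// Compute simple statistics (min/max of age) for each micro-batch of newly added files.
def collectStats(batch: DataFrame, batchId: Long): Unit = {
  batch.agg(min("age").as("min_age"), max("age").as("max_age")).show()
}

val statsQuery = spark.readStream
  .format("delta")
  .load(tablePath)
  .writeStream
  .foreachBatch(collectStats _)
  .option("checkpointLocation", "/tmp/t001_stats_checkpoint")  // hypothetical path
  .start()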
@penghuo penghuo closed this as completed Mar 3, 2023
@penghuo penghuo changed the title from "Directly read from S3." to "Collect external table statistics" Mar 3, 2023
@penghuo penghuo reopened this Mar 3, 2023
@penghuo penghuo removed the untriaged label Mar 6, 2023
penghuo commented Mar 7, 2023

Summary

Test Steps

  • create parquet files
import scala.util.Random
import spark.implicits._
val name = List("alice", "bob", "tom", "susan", "sean")
val df = spark.sparkContext.makeRDD(1 to 5).map(i => (name(Random.nextInt(name.size)), i)).toDF("name", "age")
df.coalesce(1).write.format("parquet").mode("append").save("/Users/penghuo/release/deltalake/parquet/p001")


  • create external table
CREATE EXTERNAL TABLE IF NOT EXISTS default.t001
(name STRING, age INT)
USING DELTA
LOCATION "/Users/penghuo/release/deltalake/parquet/p001"
TBLPROPERTIES ('auto_refresh'='true');
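
As a quick sanity check (not part of the original steps): once auto_refresh has picked up the initial parquet file, the table should return the 5 rows written above.

// spark-shell: the external table should see the 5 rows (age 1 to 5) from the first write.
spark.sql("SELECT COUNT(*) FROM default.t001").show()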

  • add more data
import scala.util.Random
import spark.implicits._
val name = List("alice", "bob", "tom", "susan", "sean")
val df = spark.sparkContext.makeRDD(11 to 20).map(i => (name(Random.nextInt(name.size)), i)).toDF("name", "age")
df.coalesce(1).write.format("parquet").mode("append").save("/Users/penghuo/release/deltalake/parquet/p001")

  • explain query plan
explain select * from t001 where age > 10;

== Physical Plan ==
*(1) Filter (isnotnull(age#1190) AND (age#1190 > 10))
+- *(1) ColumnarToRow
   +- FileScan parquet default.t001[name#1189,age#1190] Batched: true, DataFilters: [isnotnull(age#1190), (age#1190 > 10)], Format: Parquet, Location: **PreparedDeltaFileIndex[file:/Users/penghuo/release/deltalake/parquet/p001]**, PartitionFilters: [], PushedFilters: [IsNotNull(age), GreaterThan(age,10)], ReadSchema: struct<name:string,age:int>
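
The Location field showing PreparedDeltaFileIndex suggests the scan goes through Delta's pre-filtered file index (the data-skipping path) rather than a plain file listing. The same plan can also be printed from the spark-shell, for example:

// Print the physical plan programmatically instead of using SQL EXPLAIN.
spark.sql("SELECT * FROM default.t001 WHERE age > 10").explain()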
  • add more data
import scala.util.Random
import spark.implicits._
val name = List("alice", "bob", "tom", "susan", "sean")
val age = 15 to 30
val df = spark.sparkContext.makeRDD(age).map(i => (name(Random.nextInt(name.size)), i)).toDF("name", "age")
df.coalesce(1).write.format("parquet").mode("append").save("/Users/penghuo/release/deltalake/parquet/p001")

  • run query
select * from t001 where age > 20;
  • analyze the query execution result. There are four data files in the parquet directory and only one of them contains ages in the range [15, 30]; with data-skipping optimization, only that file is scanned (see the sketch after the screenshot below).
 ~/release/deltalake  tree parquet/p001
parquet/p001
├── _SUCCESS
├── _delta_log
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.checkpoint.parquet
│   ├── 00000000000000000001.json
│   ├── 00000000000000000002.checkpoint.parquet
│   ├── 00000000000000000002.json
│   └── _last_checkpoint
├── part-00000-19078ee0-4bb9-45b0-bb9f-e7dd63e77645-c000.snappy.parquet
├── part-00000-1e801e12-ee98-44a6-b251-a8c66fe6cd55-c000.snappy.parquet
├── part-00000-7242e389-9cda-47d8-855d-dd46ddced8f5-c000.snappy.parquet
└── part-00000-e4f4683b-0850-45b4-9e3a-c0b8a572a257-c000.snappy.parquet

[Screenshot 2023-03-07 at 8:33:07 AM]
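
One way to double-check the pruning from the spark-shell. This snippet is an illustrative sketch that was not part of the original test; it assumes the scan is a FileSourceScanExec node exposing the numFiles metric ("number of files read").

import org.apache.spark.sql.execution.FileSourceScanExec

// Execute the query so the scan metrics are populated, then read the file count
// from the FileScan node; with data skipping it should report a single file.
// Note: with adaptive query execution enabled, the scan node sits under
// AdaptiveSparkPlanExec and may not be found by this simple traversal.
val q = spark.sql("SELECT * FROM default.t001 WHERE age > 20")
q.collect()
q.queryExecution.executedPlan
  .collectFirst { case scan: FileSourceScanExec => scan }
  .foreach(scan => println(s"files scanned: ${scan.metrics("numFiles").value}"))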

@penghuo penghuo closed this as completed Mar 7, 2023
@penghuo penghuo self-assigned this Mar 7, 2023