Skip to content

[SPARK-6195] [SQL] Adds in-memory column type for fixed-precision decimals #4938

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from

Conversation

liancheng
Copy link
Contributor

This PR adds a specialized in-memory column type for fixed-precision decimals.

For all other column types, a single integer column type ID is enough to determine which column type to use. However, this doesn't apply to fixed-precision decimal types with different precision and scale parameters. Moreover, according to the previous design, there seems no trivial way to encode precision and scale information into the columnar byte buffer. On the other hand, considering we always know the data type of the column to be built / scanned ahead of time. This PR no longer use column type ID to construct ColumnBuilders and ColumnAccessors, but resorts to the actual column data type. In this way, we can pass precision / scale information along the way.

The column type ID is now not used anymore and can be removed in a future PR.

Micro benchmark result

The following micro benchmark builds a simple table with 2 million decimals (precision = 10, scale = 0), cache it in memory, then count all the rows. Code (simply paste it into Spark shell):

import sc._
import sqlContext._
import sqlContext.implicits._
import org.apache.spark.sql.types._
import com.google.common.base.Stopwatch

def benchmark(n: Int)(f: => Long) {
  val stopwatch = new Stopwatch()

  def run() = {
    stopwatch.reset()
    stopwatch.start()
    f
    stopwatch.stop()
    stopwatch.elapsedMillis()
  }

  val records = (0 until n).map(_ => run())

  (0 until n).foreach(i => println(s"Round $i: ${records(i)} ms"))
  println(s"Average: ${records.sum / n.toDouble} ms")
}

// Explicit casting is required because ScalaReflection can't inspect decimal precision
parallelize(1 to 2000000)
  .map(i => Tuple1(Decimal(i, 10, 0)))
  .toDF("dec")
  .select($"dec" cast DecimalType(10, 0))
  .registerTempTable("dec")

sql("CACHE TABLE dec")
val df = table("dec")

// Warm up
df.count()
df.count()

benchmark(5) {
  df.count()
}

With FIXED_DECIMAL column type:

  • Round 0: 75 ms
  • Round 1: 97 ms
  • Round 2: 75 ms
  • Round 3: 70 ms
  • Round 4: 72 ms
  • Average: 77.8 ms

Without FIXED_DECIMAL column type:

  • Round 0: 1233 ms
  • Round 1: 1170 ms
  • Round 2: 1171 ms
  • Round 3: 1141 ms
  • Round 4: 1141 ms
  • Average: 1171.2 ms

Review on Reviewable

@SparkQA
Copy link

SparkQA commented Mar 7, 2015

Test build #28367 has started for PR 4938 at commit 4db713d.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Mar 7, 2015

Test build #28367 has finished for PR 4938 at commit 4db713d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28367/
Test PASSed.


// The first 4 bytes in the buffer indicate the column type. This field is not used now,
// because we always know the data type of the column ahead of time.
dup.getInt()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this line is not necessary any more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call has side effect, still need to call it to read 4 bytes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, we can remove this line after removing the whole column type ID stuff.

@SparkQA
Copy link

SparkQA commented Mar 9, 2015

Test build #28391 has started for PR 4938 at commit e08ab5b.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Mar 9, 2015

Test build #28391 has finished for PR 4938 at commit e08ab5b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28391/
Test PASSed.

We only enables the specialized column type when the precision is less
then 19.
@liancheng
Copy link
Contributor Author

Added micro benchmark result. Notice that explicit casting is required because ScalaReflection can't capture precision information. This doesn't affect use cases where schema are specified explicitly (Hive tables, parquet tables, etc.).

@SparkQA
Copy link

SparkQA commented Mar 14, 2015

Test build #28608 has started for PR 4938 at commit fef5338.

  • This patch merges cleanly.

@liancheng
Copy link
Contributor Author

/cc @yhuai, this should be helpful for the TPC-DS benchmark. Gonna merge this once Jenkins nods.

@SparkQA
Copy link

SparkQA commented Mar 14, 2015

Test build #28608 has finished for PR 4938 at commit fef5338.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28608/
Test PASSed.

@asfgit asfgit closed this in 5be6b0e Mar 14, 2015
@liancheng liancheng deleted the decimal-column-type branch April 4, 2015 13:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants