[SPARK-26546][SQL] Caching of java.time.format.DateTimeFormatter #23462

Closed
wants to merge 11 commits into from

MaxGekk
Member

@MaxGekk MaxGekk commented Jan 5, 2019

What changes were proposed in this pull request?

Added a cache for java.time.format.DateTimeFormatter instances, keyed by pattern and locale. This avoids re-parsing timestamp/date patterns each time a new instance of TimestampFormatter/DateFormatter is created.

How was this patch tested?

By existing test suites TimestampFormatterSuite/DateFormatterSuite and JsonFunctionsSuite/JsonSuite.

@SparkQA

SparkQA commented Jan 5, 2019

Test build #100782 has finished for PR 23462 at commit aa3c146.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk
Member Author

MaxGekk commented Jan 5, 2019

@cloud-fan Please, take a look at the PR.

}

object DateTimeFormatterHelper {
  private val cache = new ConcurrentHashMap[(String, Locale), DateTimeFormatter]()

Member

Do we need to consider cleaning up old entries in this map?

Member

Yeah, I was wondering about that too. The number of possible combinations looks huge.

Member Author

In practice, the locale is constant (Locale.US) and the number of date/timestamp patterns in use is small.

Member

Also answering my own question: the formatter is thread-safe, so this is fine.
I agree that this cache won't grow large as I can't imagine an app using more than a handful of distinct patterns and locales.

Member

Yes, but as a corner case this can run as a row-based operation. For instance, an expression that allows a different format for each value can make this cache grow a lot. It's unlikely, but quite possible. Also, and most importantly, it currently doesn't look very useful, for the same reason as in #23462 (comment).

Member

True. This could wrap some weak-ref based cache if we needed to.
Good point about whether this is really created a lot.

Contributor

Shall we use com.google.common.cache.Cache? It's used in several places inside Spark.

Member Author

If unbounded cache growth is a significant concern, I will switch to a fixed-size LRU cache here.

Member Author

I think it makes sense to bound the cache's growth. I replaced it with Guava's cache.
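To illustrate the fixed-size LRU idea discussed above, here is a minimal sketch that uses only the JDK. It is not the PR's Guava-based code: the class name `BoundedFormatterCache`, the `maxEntries` parameter and the `entryCount` helper are hypothetical, and an access-ordered `java.util.LinkedHashMap` stands in for Guava's cache.

```scala
import java.time.format.DateTimeFormatter
import java.util.{Collections, LinkedHashMap, Locale, Map => JMap}

// Hypothetical stand-in for a size-bounded formatter cache (assumption: not the PR's code).
final class BoundedFormatterCache(maxEntries: Int) {

  // Access-ordered LinkedHashMap: once the size limit is exceeded, the least
  // recently used entry is evicted. Wrapped in synchronizedMap because even
  // get() reorders entries in access-order mode.
  private val cache: JMap[(String, Locale), DateTimeFormatter] =
    Collections.synchronizedMap(
      new LinkedHashMap[(String, Locale), DateTimeFormatter](16, 0.75f, true) {
        override def removeEldestEntry(
            eldest: JMap.Entry[(String, Locale), DateTimeFormatter]): Boolean =
          this.size() > maxEntries
      })

  def getOrCreate(pattern: String, locale: Locale): DateTimeFormatter = {
    val key = (pattern, locale)
    var formatter = cache.get(key)
    if (formatter == null) {
      // Two threads may both miss and both build a formatter; the later put wins.
      formatter = DateTimeFormatter.ofPattern(pattern, locale)
      cache.put(key, formatter)
    }
    formatter
  }

  // Number of entries currently held; exposed only to make the bound observable.
  def entryCount: Int = cache.size()
}
```

With `maxEntries = 2`, requesting three distinct patterns leaves two entries in the cache, so a per-row format column could no longer grow it without limit.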

@cloud-fan
Contributor

How helpful is it? I was convinced by you that the formatter is created per RDD partition and the creation time doesn't matter too much.

@HyukjinKwon
Member

It looks like this can also run per record, for instance:

override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
  val df = TimestampFormatter(format.toString, timeZone, Locale.US)
  UTF8String.fromString(df.format(timestamp.asInstanceOf[Long]))
}
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
  val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
  val tz = ctx.addReferenceObj("timeZone", timeZone)
  val locale = ctx.addReferenceObj("locale", Locale.US)
  defineCodeGen(ctx, ev, (timestamp, format) => {
    s"""UTF8String.fromString($tf$$.MODULE$$.apply($format.toString(), $tz, $locale)
       .format($timestamp))"""
  })
}

@HyukjinKwon
Member

BTW, we never cached in that code path, even before switching the library.

@MaxGekk
Member Author

MaxGekk commented Jan 6, 2019

> BTW, we never cached in that code path, even before switching the library.

Look at Spark 2.4: it uses FastDateFormat in the JSON and CSV datasources (https://github.com/apache/spark/blob/branch-2.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala#L83-L88), which takes formatters from an internal cache implemented the same way as in this PR, via ConcurrentHashMap.

@MaxGekk
Member Author

MaxGekk commented Jan 6, 2019

> I was convinced by you that the formatter is created per RDD partition and the creation time doesn't matter too much.

@cloud-fan This is true for the current usage of the formatter and for PR #23391. The cache is for future use, to create a formatter faster when it is hard to create it in advance.

@HyukjinKwon
Member

HyukjinKwon commented Jan 6, 2019

@MaxGekk, I mean here:

override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
  val df = TimestampFormatter(format.toString, timeZone, Locale.US)
  UTF8String.fromString(df.format(timestamp.asInstanceOf[Long]))
}
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
  val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
  val tz = ctx.addReferenceObj("timeZone", timeZone)
  val locale = ctx.addReferenceObj("locale", Locale.US)
  defineCodeGen(ctx, ev, (timestamp, format) => {
    s"""UTF8String.fromString($tf$$.MODULE$$.apply($format.toString(), $tz, $locale)
       .format($timestamp))"""
  })
}

For other code paths, I basically agree with Wenchen's #23462 (comment)

def getFormatter(pattern: String, locale: Locale): DateTimeFormatter = {
  val key = (pattern, locale)
  var formatter = cache.get(key)

Member

computeIfAbsent ought to be simpler and more efficient here: it computes the value only if it isn't already present.

Member Author

@MaxGekk MaxGekk Jan 6, 2019

But it can block other threads while the lambda runs: "...Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple..." The current implementation does not block.

Member

Sure, but that's necessary to avoid computing it more than once, right? And it is only an issue if multiple threads need the value at the same time, the first time. If it blocks for milliseconds, that seems OK. It would be an issue if it meant every subsequent access slowed down or was unnecessarily contended.

Member

Commons Lang 3 supports Java 7, so it couldn't use computeIfAbsent. I presume it would if it could. I don't feel super strongly about it, but I think we can take advantage of Java 8 here. It saves a second lookup and, in so doing, avoids the (fairly harmless) race condition here: multiple threads can find that the instance isn't cached, then each compute it and try to put the result. That is still correct but not optimal.

Member

I agree with @srowen. If blocking occurs frequently (i.e. the key is often missing), the cache is not working effectively anyway.
If the key is usually present, blocking will rarely occur.
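The computeIfAbsent variant suggested in this thread could look roughly like the sketch below. It is only an illustration under stated assumptions: the object name `ComputeIfAbsentSketch` and the `creations` counter are hypothetical additions to make the "computed at most once per key" behaviour observable, and the mapping function is written as an explicit `java.util.function.Function` so the snippet does not depend on SAM conversion.

```scala
import java.time.format.DateTimeFormatter
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

// Hypothetical sketch (assumption: not the code that was merged).
object ComputeIfAbsentSketch {
  private val cache = new ConcurrentHashMap[(String, Locale), DateTimeFormatter]()

  // Counts how many formatters were actually built; for illustration only.
  val creations = new AtomicInteger(0)

  // Runs at most once per key. ConcurrentHashMap may block other updates
  // to the same bin while this runs, so it should stay short.
  private val create = new JFunction[(String, Locale), DateTimeFormatter] {
    override def apply(key: (String, Locale)): DateTimeFormatter = {
      creations.incrementAndGet()
      DateTimeFormatter.ofPattern(key._1, key._2)
    }
  }

  // Single lookup: returns the cached formatter or atomically creates and stores it.
  def getFormatter(pattern: String, locale: Locale): DateTimeFormatter =
    cache.computeIfAbsent((pattern, locale), create)
}
```

Calling `getFormatter` twice with the same pattern and locale returns the same instance and increments `creations` only once, which is the trade-off being weighed here against the brief blocking on a first-time miss.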

@SparkQA

SparkQA commented Jan 6, 2019

Test build #100838 has finished for PR 23462 at commit f08c71e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk
Member Author

MaxGekk commented Jan 7, 2019

@hvanhovell Could you look at this, please.

@SparkQA

SparkQA commented Jan 7, 2019

Test build #100904 has finished for PR 23462 at commit 61038ea.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 8, 2019

Test build #100908 has finished for PR 23462 at commit 68ec759.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk
Member Author

MaxGekk commented Jan 8, 2019

jenkins, retest this, please

@SparkQA

SparkQA commented Jan 8, 2019

Test build #100919 has finished for PR 23462 at commit 68ec759.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk
Member Author

MaxGekk commented Jan 8, 2019

jenkins, retest this, please

@SparkQA

SparkQA commented Jan 8, 2019

Test build #100920 has finished for PR 23462 at commit 68ec759.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

private val cache = CacheBuilder.newBuilder()
  .initialCapacity(8)
  .maximumSize(128)
  .expireAfterAccess(1, TimeUnit.HOURS)

Contributor

Do we really need the expiration policy?

Member

Agreed. I think it would have to create a thread to deal with expiry, and that's not worth it. Min size doesn't really matter either.

@cloud-fan
Contributor

LGTM

@SparkQA

SparkQA commented Jan 8, 2019

Test build #100946 has finished for PR 23462 at commit 0737991.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 8, 2019

Test build #100943 has finished for PR 23462 at commit 0ea07db.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 9, 2019

Test build #100944 has finished for PR 23462 at commit b7413a4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

def getOrCreateFormatter(pattern: String, locale: Locale): DateTimeFormatter = {
  val key = (pattern, locale)
  var formatter = cache.getIfPresent(key)
  if (formatter == null) {

Contributor

Let's add a comment saying that we intentionally drop the synchronization here, since the worst case is that we create the same formatter more than once, which doesn't matter.

Without the comment, I'm afraid people may open PRs to add synchronization later because they don't know the context.
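A sketch of what such a commented, deliberately unsynchronized lookup might look like is shown below. It is an assumption-laden illustration rather than the merged code: the object name `RacyLookupSketch` is hypothetical, and a plain `ConcurrentHashMap` stands in for the Guava cache so the snippet needs only the JDK.

```scala
import java.time.format.DateTimeFormatter
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch; ConcurrentHashMap is a stand-in for the PR's Guava cache.
object RacyLookupSketch {
  private val cache = new ConcurrentHashMap[(String, Locale), DateTimeFormatter]()

  def getOrCreateFormatter(pattern: String, locale: Locale): DateTimeFormatter = {
    val key = (pattern, locale)
    var formatter = cache.get(key)
    if (formatter == null) {
      // Intentionally NOT synchronized: if several threads miss at the same
      // time, each builds an equivalent formatter and the last put wins.
      // DateTimeFormatter is immutable and thread-safe, so the only cost is
      // a little duplicated work on the first access.
      formatter = DateTimeFormatter.ofPattern(pattern, locale)
      cache.put(key, formatter)
    }
    formatter
  }
}
```

On a single thread the second call for the same key returns the cached instance; under contention, callers may briefly hold different but interchangeable formatter instances.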

Instant.from(zonedDateTime)
}

def getOrCreateFormatter(pattern: String, locale: Locale): DateTimeFormatter = {
Contributor

protected

@SparkQA

SparkQA commented Jan 9, 2019

Test build #100967 has finished for PR 23462 at commit c68778c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master

@asfgit asfgit closed this in 73c7b12 Jan 10, 2019
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
## What changes were proposed in this pull request?

Added a cache for java.time.format.DateTimeFormatter instances, keyed by pattern and locale. This avoids re-parsing timestamp/date patterns each time a new instance of `TimestampFormatter`/`DateFormatter` is created.

## How was this patch tested?

By existing test suites `TimestampFormatterSuite`/`DateFormatterSuite` and `JsonFunctionsSuite`/`JsonSuite`.

Closes apache#23462 from MaxGekk/time-formatter-caching.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@MaxGekk MaxGekk deleted the time-formatter-caching branch August 17, 2019 13:36