[SPARK-26546][SQL] Caching of java.time.format.DateTimeFormatter #23462
Conversation
Test build #100782 has finished for PR 23462 at commit
@cloud-fan Please take a look at the PR.
object DateTimeFormatterHelper {
  private val cache = new ConcurrentHashMap[(String, Locale), DateTimeFormatter]()
Do we need to consider cleaning up old entries in this map?
Yeah, I was wondering about that too. The number of combinations looks huge.
In real life, the locale is constant (Locale.US) and the number of used date/timestamp patterns is small.
Also answering my own question: the formatter is thread-safe, so this is fine.
I agree that this cache won't grow large as I can't imagine an app using more than a handful of distinct patterns and locales.
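As a sketch of the approach under discussion (class and key names here are illustrative, not the actual Spark code), an unsynchronized get-then-put over a shared `ConcurrentHashMap` might look like this in Java:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class FormatterCache {
    // (pattern, locale) -> formatter; DateTimeFormatter is immutable and
    // thread-safe, so sharing instances across threads is safe.
    private static final Map<Map.Entry<String, Locale>, DateTimeFormatter> CACHE =
        new ConcurrentHashMap<>();

    static DateTimeFormatter getFormatter(String pattern, Locale locale) {
        Map.Entry<String, Locale> key = new SimpleImmutableEntry<>(pattern, locale);
        DateTimeFormatter formatter = CACHE.get(key);
        if (formatter == null) {
            // Benign race: two threads may each build the formatter once;
            // whichever put() lands last wins, and both results are equivalent.
            formatter = DateTimeFormatter.ofPattern(pattern, locale);
            CACHE.put(key, formatter);
        }
        return formatter;
    }
}
```

Repeated lookups with the same pattern and locale then reuse the cached instance instead of re-parsing the pattern.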
Yeah, but this can be performed as a row-based operation in a corner case. For instance, an expression that allows a different format for each value could make this cache grow a lot. It's unlikely but quite possible. Also, most importantly, it currently doesn't look super useful, for the same reason as #23462 (comment).
True. This could wrap some weak-ref-based cache if we needed to.
Good point about whether this is really created a lot.
Shall we use com.google.common.cache.Cache? It's used in several places inside Spark.
If we have a significant concern about the cache growing infinitely, I will switch to a fixed-size LRU cache here.
I think it makes sense to restrict the growth of the cache. I replaced it with Guava's cache.
How helpful is it? You convinced me that the formatter is created per RDD partition, so the creation time doesn't matter too much.
This also looks able to execute per record, for instance, Lines 564 to 577 in e0054b8.
BTW, we never cached it in that code path, even before switching the library.
Look at Spark 2.4: it uses FastDateFormat in the JSON and CSV datasources (https://github.com/apache/spark/blob/branch-2.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala#L83-L88), which take formatters from a cache internally implemented in the same way as in the PR.
@cloud-fan This is true for the current usage of the formatter and for PR #23391. This cache is for future usage, to create a formatter faster when it is hard to create it in advance.
@MaxGekk, I mean here: Lines 564 to 577 in e0054b8.
def getFormatter(pattern: String, locale: Locale): DateTimeFormatter = {
  val key = (pattern, locale)
  var formatter = cache.get(key)
computeIfAbsent ought to be simpler and more efficient here, computing the value only if it isn't already present.
But it can block other threads during the lambda computation: "...Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple..." This implementation does not block.
Sure, but that's necessary to avoid computing it more than once, right? And it's only an issue if multiple threads need the value at once, the first time. If it blocks for milliseconds, that seems OK. It would be an issue if every subsequent access slowed down or was unnecessarily contended.
I just followed the implementation of FastDateFormat from Apache Commons Lang3: https://github.com/apache/commons-lang/blob/8e8b8e05e4eb9aa009444c2fea3552d28b57aa98/src/main/java/org/apache/commons/lang3/time/FormatCache.java#L71-L91
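Paraphrasing the FormatCache pattern linked above (class name and single-string key encoding are illustrative): look up without locking, build on a miss, then publish with `putIfAbsent` so that racing threads all converge on the instance that won the race:

```java
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class PutIfAbsentCache {
    private static final ConcurrentMap<String, DateTimeFormatter> CACHE =
        new ConcurrentHashMap<>();

    static DateTimeFormatter getFormatter(String pattern, Locale locale) {
        // A single string key stands in for the (pattern, locale) pair.
        String key = pattern + '\0' + locale.toLanguageTag();
        DateTimeFormatter formatter = CACHE.get(key);
        if (formatter == null) {
            formatter = DateTimeFormatter.ofPattern(pattern, locale);
            DateTimeFormatter previous = CACHE.putIfAbsent(key, formatter);
            if (previous != null) {
                formatter = previous; // another thread won the race; share its instance
            }
        }
        return formatter;
    }
}
```

Unlike computeIfAbsent, this never blocks other writers, at the cost of occasionally building a formatter that is immediately discarded.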
Commons Lang3 supports Java 7, so it couldn't use computeIfAbsent; I presume it would if it could. I don't feel super strongly about it, but I think we can take advantage of Java 8 here. It saves a second lookup and, in doing so, avoids the (fairly harmless) race condition here: multiple threads can find the instance isn't cached, then each compute and try to put the result. That is still correct, but not optimal.
I agree with @srowen. If the blocking occurs frequently (i.e. the key does not exist yet), this cache does not work effectively.
If a key usually exists, the blocking will not occur frequently.
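A sketch of the computeIfAbsent alternative discussed above (illustrative names; the counter exists only to demonstrate the at-most-once computation):

```java
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class ComputeIfAbsentCache {
    // Counts formatter constructions, to show the lambda runs at most once per key.
    static final AtomicInteger creations = new AtomicInteger();
    private static final ConcurrentMap<String, DateTimeFormatter> CACHE =
        new ConcurrentHashMap<>();

    static DateTimeFormatter getFormatter(String pattern, Locale locale) {
        String key = pattern + '\0' + locale.toLanguageTag();
        // The mapping function runs under the bin lock: the value is computed
        // exactly once per key, but other updates to the same bin may block briefly.
        return CACHE.computeIfAbsent(key, k -> {
            creations.incrementAndGet();
            return DateTimeFormatter.ofPattern(pattern, locale);
        });
    }
}
```

This trades a short critical section on the first miss for a single lookup and no duplicated construction afterwards.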
Test build #100838 has finished for PR 23462 at commit
@hvanhovell Could you look at this, please?
Test build #100904 has finished for PR 23462 at commit
Test build #100908 has finished for PR 23462 at commit
jenkins, retest this, please
Test build #100919 has finished for PR 23462 at commit
jenkins, retest this, please
Test build #100920 has finished for PR 23462 at commit
private val cache = CacheBuilder.newBuilder()
  .initialCapacity(8)
  .maximumSize(128)
  .expireAfterAccess(1, TimeUnit.HOURS)
do we really need the expire policy?
Agreed. I think it has to spawn a thread to deal with expiration, and it's not worth it. The minimum size doesn't really matter either.
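The size bound is the part that matters. For illustration only (this is not the Guava code the PR uses), the effect of maximumSize(128) can be reproduced with an access-ordered LinkedHashMap that evicts its eldest entry; all names here are hypothetical:

```java
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

class LruFormatterCache {
    private static final int MAX_SIZE = 128;

    // Access-ordered LinkedHashMap evicting the least recently used entry once
    // MAX_SIZE is exceeded. Unlike ConcurrentHashMap, it needs external
    // synchronization, so the accessor below is synchronized.
    private static final Map<String, DateTimeFormatter> CACHE =
        new LinkedHashMap<String, DateTimeFormatter>(8, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, DateTimeFormatter> eldest) {
                return size() > MAX_SIZE;
            }
        };

    static synchronized DateTimeFormatter getFormatter(String pattern, Locale locale) {
        String key = pattern + '\0' + locale.toLanguageTag();
        DateTimeFormatter formatter = CACHE.get(key);
        if (formatter == null) {
            formatter = DateTimeFormatter.ofPattern(pattern, locale);
            CACHE.put(key, formatter); // may evict the least recently used entry
        }
        return formatter;
    }
}
```

No background thread is involved: eviction happens inline on insertion, which is the same reason a pure size bound is cheaper than a time-based expiry policy.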
LGTM
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala (outdated review threads, resolved)
Test build #100946 has finished for PR 23462 at commit
Test build #100943 has finished for PR 23462 at commit
Test build #100944 has finished for PR 23462 at commit
def getOrCreateFormatter(pattern: String, locale: Locale): DateTimeFormatter = {
  val key = (pattern, locale)
  var formatter = cache.getIfPresent(key)
  if (formatter == null) {
Let's add a comment to say that we intentionally drop the synchronized here, as the worst case is that we create the same formatter more than once, which doesn't matter. Without the comment, I'm afraid people may open PRs to add the synchronized later, as they don't know the context.
  Instant.from(zonedDateTime)
}

def getOrCreateFormatter(pattern: String, locale: Locale): DateTimeFormatter = {
protected
Test build #100967 has finished for PR 23462 at commit
Merged to master
## What changes were proposed in this pull request?

Added a cache for java.time.format.DateTimeFormatter instances, with keys consisting of pattern and locale. This should avoid re-parsing timestamp/date patterns each time a new instance of `TimestampFormatter`/`DateFormatter` is created.

## How was this patch tested?

By the existing test suites `TimestampFormatterSuite`/`DateFormatterSuite` and `JsonFunctionsSuite`/`JsonSuite`.

Closes apache#23462 from MaxGekk/time-formatter-caching.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>