[SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 APIs #20445
Conversation
 */
def fullOutput: Seq[AttributeReference]
@cloud-fan This fixes the bug I spoke to you about offline.
This PR targets only master, not 2.3.x. If you want this fix in 2.3.0, please make a separate PR accordingly.
If this PR has to be merged to the 2.3.0 branch, does it require additional changes?
Test build #86857 has finished for PR 20445 at commit
Test build #86855 has finished for PR 20445 at commit
Test build #86950 has finished for PR 20445 at commit
This PR is currently blocked by a DataSourceV2ScanExec bug, which is being fixed in #20387.
Optional.empty())

(s, Some(s.getEndOffset))
reportTimeTaken("setOffsetRange") {
I agree that the old metric names don't make much sense anymore, but I worry about changing external-facing behavior as part of an API migration.
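For context, durationMs is part of the user-visible StreamingQueryProgress API, so the rename shows up directly in monitoring code. Below is a minimal sketch of how the new keys surface, assuming a local SparkSession; the setup is illustrative, not taken from the PR:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[2]").appName("durationMs-demo").getOrCreate()
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

// Drive a MemoryStream through one batch and inspect the reported timings.
val input = MemoryStream[Int]
val query = input.toDS().writeStream.format("console").start()
input.addData(1, 2, 3)
query.processAllAvailable()

// After this PR the map contains "setOffsetRange" and "getEndOffset"
// where it previously contained "getOffset" and "getBatch".
query.lastProgress.durationMs.asScala.foreach { case (name, millis) =>
  println(s"$name: $millis ms")
}
query.stop()
```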
@@ -89,7 +96,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)

   def addData(data: TraversableOnce[A]): Offset = {
     val encoded = data.toVector.map(d => encoder.toRow(d).copy())
-    val plan = new LocalRelation(schema.toAttributes, encoded, isStreaming = true)
+    val plan = new LocalRelation(attributes, encoded, isStreaming = false)
     val ds = Dataset[A](sqlContext.sparkSession, plan)
     logDebug(s"Adding ds: $ds")
Do we still need to store the batches as datasets, now that we're just collect()ing them back out in createDataReaderFactories()?
Good point.
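A rough sketch of the simplification being agreed on here, with hypothetical names (`RowBackedBatches`, `batches`, `currentOffset`): keep the encoded UnsafeRows per batch instead of wrapping each batch in a Dataset that createDataReaderFactories() would only collect() back out.

```scala
import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.LongOffset

// Hypothetical sketch, not the PR's final code: batches stored as rows.
class RowBackedBatches[A](encoder: ExpressionEncoder[A]) {
  // One entry per addData() call, already encoded to UnsafeRows.
  private val batches = new ListBuffer[Array[UnsafeRow]]
  private var currentOffset = LongOffset(-1)

  def addData(data: TraversableOnce[A]): LongOffset = synchronized {
    // toRow reuses an internal buffer, so each row must be copied
    // before it is stored.
    val rows = data.toVector.map(d => encoder.toRow(d).copy().asInstanceOf[UnsafeRow])
    batches += rows.toArray
    currentOffset = currentOffset + 1
    currentOffset
  }
}
```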
Test build #86951 has finished for PR 20445 at commit
Test build #86960 has finished for PR 20445 at commit
Test build #87010 has finished for PR 20445 at commit
Test build #87018 has finished for PR 20445 at commit
jenkins retest this please
Test build #87122 has finished for PR 20445 at commit
jenkins retest this please. |
Test build #87133 has finished for PR 20445 at commit
Test build #87144 has finished for PR 20445 at commit
Jenkins retest this please |
  ForeachSinkSuite.Process(value = 4),
  ForeachSinkSuite.Close(None)
)
val events = ForeachSinkSuite.allEvents()
This test assumed that the output would arrive in a specific order after repartitioning, which isn't guaranteed. So I rewrote the test to verify the output in an order-independent way.
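Roughly, the order-independent check looks like the sketch below (expected values are illustrative, and allEvents() is assumed, as in the diff above, to return the recorded events grouped per partition):

```scala
// Sketch only: after a repartition, which partition processes which value is
// nondeterministic, so assert on the set of processed values rather than on
// one exact global event sequence.
val allEvents = ForeachSinkSuite.allEvents()
val processedValues = allEvents.flatten.collect {
  case ForeachSinkSuite.Process(value) => value
}.toSet
assert(processedValues === Set(1, 2, 3, 4))

// Each partition's own events still arrive in order, so per-partition
// bookkeeping can still be checked directly.
allEvents.foreach { events =>
  assert(events.last === ForeachSinkSuite.Close(None))
}
```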
Retest this please
@@ -149,18 +149,12 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   }

   private def generateDebugString(
-      blocks: Iterable[Array[UnsafeRow]],
+      blocks: Seq[UnsafeRow],
nit: it's probably more "rows" than "blocks" now
Right! I thought of changing it but forgot. My bad.
LGTM pending a passing run of that HiveDDLSuite test
Test build #87174 has finished for PR 20445 at commit
seems like an unrelated flaky test ^
Test build #87176 has finished for PR 20445 at commit
Merging to master.
What changes were proposed in this pull request?
This PR migrates the MemoryStream to DataSourceV2 APIs.
One additional change is in the reported keys in StreamingQueryProgress.durationMs: "getOffset" and "getBatch" have been replaced with "setOffsetRange" and "getEndOffset", since tracking these makes more sense. Unit tests were changed accordingly.
How was this patch tested?
Existing unit tests, plus a few updated unit tests.