-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-16114] [SQL] structured streaming event time window example #13957
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
use the same JIRA number as the previous one. |
* dev0,7.0,2015-03-18T12:00:00 | ||
* dev1,8.0,2015-03-18T12:00:10 | ||
* dev0,5.0,2015-03-18T12:00:20 | ||
* dev1,3.0,2015-03-18T12:00:30 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i dont like it that the user has to enter data in a particular format. that make this clumsy to use.
how about this. lets add an option in the socket source, that attaches the receiving time as a column in the data. so the returned schema would be (time, line) if that option is specified. then we can very easily write a simple sliding window word count example using that.
what do you think?
@@ -67,7 +71,9 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext) | |||
return | |||
} | |||
TextSocketSource.this.synchronized { | |||
lines += line | |||
lines += ((line, | |||
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to create a format object every time. better to reuse.
ok to test |
@@ -16,7 +16,7 @@ | |||
# | |||
|
|||
""" | |||
Counts words in UTF8 encoded, '\n' delimited text received from the network every second. | |||
Counts words in UTF8 encoded, '\n' delimited text received from the network. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch!
Test build #61597 has finished for PR 13957 at commit
|
|
||
// Split the lines into words, retaining timestamps | ||
val words = lines.flatMap(line => | ||
line._1.split(" ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
put the following map in the same line.
this looks good for scala, please add the java and python. |
Test build #61635 has finished for PR 13957 at commit
|
words = lines.select( | ||
# explode turns each item in an array into a separate row | ||
explode( | ||
split(lines.value, ' ') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks too nested, put explode and split in the same line.
this is looking good. a few cosmetic changes. can you update the windows operation section in the programming guide to use this code snippet? And link to the full source code |
* `$ nc -lk 9999` | ||
* and then run the example | ||
* `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCountWindowed | ||
* localhost 9999 <window duration> <optional slide duration>` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
window duration in seconds...
public static void main(String[] args) throws Exception { | ||
if (args.length < 3) { | ||
System.err.println("Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port>" + | ||
" <window duration in seconds> <optional slide duration in seconds>"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional stuff is usually written as [ ... ]
. Add an example here with the 10 seconds and 5 seconds. 10 and 5 so that the user can start seeing the window immediately.
Test build #61638 has finished for PR 13957 at commit
|
|
||
// Group the data by window and word and compute the count of each group | ||
Dataset<Row> windowedCounts = words.groupBy( | ||
functions.window(words.col("timestamp"), windowArg, slideArg), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
windowArg --> windowDuration
slideArg --> slideInterval
So that this code snippet in the guide is self explanatory.
And make sure its consistent in other languages.
|
||
# Group the data by window and word and compute the count of each group | ||
windowedCounts = words.groupBy( | ||
window(words.timestamp, windowArg, slideArg), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename here
Test build #61639 has finished for PR 13957 at commit
|
source = null | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add tests below to make sure includeTimestamp errors are checked.
Test build #61789 has finished for PR 13957 at commit
|
Test build #61790 has finished for PR 13957 at commit
|
LGTM. Merging this to master and 2.0. Thanks! |
## What changes were proposed in this pull request? A structured streaming example with event time windowing. ## How was this patch tested? Run locally Author: James Thomas <jamesjoethomas@gmail.com> Closes #13957 from jjthomas/current. (cherry picked from commit 9e2c763) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
What changes were proposed in this pull request?
A structured streaming example with event time windowing.
How was this patch tested?
Run locally