Skip to content

[SPARK-16114] [SQL] structured streaming event time window example #13957

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from

Conversation

jjthomas
Copy link
Contributor

What changes were proposed in this pull request?

A structured streaming example with event time windowing.

How was this patch tested?

Run locally

@tdas
Copy link
Contributor

tdas commented Jun 29, 2016

use the same JIRA number as the previous one.

* dev0,7.0,2015-03-18T12:00:00
* dev1,8.0,2015-03-18T12:00:10
* dev0,5.0,2015-03-18T12:00:20
* dev1,3.0,2015-03-18T12:00:30
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dont like it that the user has to enter data in a particular format. that make this clumsy to use.

how about this. lets add an option in the socket source, that attaches the receiving time as a column in the data. so the returned schema would be (time, line) if that option is specified. then we can very easily write a simple sliding window word count example using that.
what do you think?

@jjthomas jjthomas changed the title structured streaming event time window example [SPARK-16114] [SQL] structured streaming event time window example Jun 30, 2016
@@ -67,7 +71,9 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext)
return
}
TextSocketSource.this.synchronized {
lines += line
lines += ((line,
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
Copy link
Contributor

@tdas tdas Jun 30, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to create a format object every time. better to reuse.

@tdas
Copy link
Contributor

tdas commented Jul 1, 2016

ok to test

@@ -16,7 +16,7 @@
#

"""
Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
Counts words in UTF8 encoded, '\n' delimited text received from the network.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

@SparkQA
Copy link

SparkQA commented Jul 1, 2016

Test build #61597 has finished for PR 13957 at commit 3e15cc9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


// Split the lines into words, retaining timestamps
val words = lines.flatMap(line =>
line._1.split(" ")
Copy link
Contributor

@tdas tdas Jul 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put the following map in the same line.

@tdas
Copy link
Contributor

tdas commented Jul 1, 2016

this looks good for scala, please add the java and python.

@SparkQA
Copy link

SparkQA commented Jul 1, 2016

Test build #61635 has finished for PR 13957 at commit 57a8b11.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public final class JavaStructuredNetworkWordCountWindowed

words = lines.select(
# explode turns each item in an array into a separate row
explode(
split(lines.value, ' ')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks too nested, put explode and split in the same line.

@tdas
Copy link
Contributor

tdas commented Jul 1, 2016

this is looking good. a few cosmetic changes. can you update the windows operation section in the programming guide to use this code snippet? And link to the full source code

* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCountWindowed
* localhost 9999 <window duration> <optional slide duration>`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

window duration in seconds...

public static void main(String[] args) throws Exception {
if (args.length < 3) {
System.err.println("Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port>" +
" <window duration in seconds> <optional slide duration in seconds>");
Copy link
Contributor

@tdas tdas Jul 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional stuff is usually written as [ ... ]. Add an example here with the 10 seconds and 5 seconds. 10 and 5 so that the user can start seeing the window immediately.

@SparkQA
Copy link

SparkQA commented Jul 1, 2016

Test build #61638 has finished for PR 13957 at commit 8bb543d.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
functions.window(words.col("timestamp"), windowArg, slideArg),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

windowArg --> windowDuration
slideArg --> slideInterval

So that this code snippet in the guide is self explanatory.
And make sure its consistent in other languages.


# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, windowArg, slideArg),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename here

@SparkQA
Copy link

SparkQA commented Jul 1, 2016

Test build #61639 has finished for PR 13957 at commit e7a81e1.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

source = null
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add tests below to make sure includeTimestamp errors are checked.

@SparkQA
Copy link

SparkQA commented Jul 5, 2016

Test build #61789 has finished for PR 13957 at commit 893f70e.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 5, 2016

Test build #61790 has finished for PR 13957 at commit 8f97b66.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor

tdas commented Jul 12, 2016

LGTM. Merging this to master and 2.0. Thanks!

@asfgit asfgit closed this in 9e2c763 Jul 12, 2016
asfgit pushed a commit that referenced this pull request Jul 12, 2016
## What changes were proposed in this pull request?

A structured streaming example with event time windowing.

## How was this patch tested?

Run locally

Author: James Thomas <jamesjoethomas@gmail.com>

Closes #13957 from jjthomas/current.

(cherry picked from commit 9e2c763)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants