[SPARK-4969][STREAMING][PYTHON] Add binaryRecords to streaming #3803

Closed
wants to merge 30 commits into apache:master from freeman-lab:streaming-binary-records

Conversation

freeman-lab
Contributor

In Spark 1.2 we added a binaryRecords input method for loading flat binary data. This format is useful for numerical array data, e.g. in scientific computing applications. This PR adds support for the same format in Streaming applications, where it is similarly useful, especially for streaming time series or sensor data.

Summary of additions

  • adding binaryRecordsStream to Spark Streaming
  • exposing binaryRecordsStream in the new PySpark Streaming
  • new unit tests in Scala and Python

This required adding an optional Hadoop configuration param to fileStream and FileInputStream, but was otherwise straightforward.

@tdas @davies
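Concretely, the flat-binary format is just a byte stream carved into records of a fixed, caller-supplied length. A minimal stdlib-Python sketch of that splitting (`split_binary_records` is a hypothetical helper for illustration, not part of the Spark API):

```python
import struct

def split_binary_records(data: bytes, record_length: int) -> list[bytes]:
    """Split a flat binary payload into fixed-length records."""
    if len(data) % record_length != 0:
        raise ValueError("payload size is not a multiple of the record length")
    return [data[i:i + record_length]
            for i in range(0, len(data), record_length)]

# Example: six little-endian float32 values packed as two 12-byte records,
# the kind of numerical array data the description mentions.
payload = struct.pack("<6f", 1.0, 2.0, 3.0, 4.0, 5.0, 6.0)
records = split_binary_records(payload, record_length=12)
assert len(records) == 2
assert struct.unpack("<3f", records[0]) == (1.0, 2.0, 3.0)
```

In the patch itself this splitting is done by Hadoop's FixedLengthInputFormat; the sketch only shows the data layout the stream methods assume.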

@SparkQA

SparkQA commented Dec 25, 2014

Test build #24822 has started for PR 3803 at commit becb344.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 25, 2014

Test build #24822 has finished for PR 3803 at commit becb344.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FileInputDStream[K, V, F <: NewInputFormat[K,V]](

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24822/
Test FAILed.

@SparkQA

SparkQA commented Dec 25, 2014

Test build #24823 has started for PR 3803 at commit fcb915c.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 25, 2014

Test build #24823 has finished for PR 3803 at commit fcb915c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FileInputDStream[K, V, F <: NewInputFormat[K,V]](

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24823/
Test PASSed.

@freeman-lab freeman-lab changed the title [SPARK-4969] [STREAMING] [PYTHON] Add binaryRecords to streaming [SPARK-4969][STREAMING][PYTHON] Add binaryRecords to streaming Dec 25, 2014
Thread.sleep(1000)
// Set up the streaming context and input streams
val newConf = conf.clone.set(
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
Contributor

It looks like this is based on the FileInputStream test, which is known to be flaky. I have a PR open which rewrites that test to not depend on SystemClock / Thread.sleep(): #3801. Therefore, if we want to have this style of test, then this PR should block until my PR is merged so that it can use the new test utilities that I added.

Here's the relevant change from my PR: https://github.com/apache/spark/pull/3801/files#diff-4

Contributor Author

Ok great, I'll wait for your PR to be merged and then refactor this test accordingly.

Contributor

My PR has been merged, so I think this should be unblocked now.

@davies
Contributor

davies commented Jan 5, 2015

The Python part looks good to me, thanks.

@SparkQA

SparkQA commented Jan 5, 2015

Test build #25066 has started for PR 3803 at commit 317b6d1.

  • This patch merges cleanly.

@freeman-lab
Contributor Author

Thanks for the review! I'll wait for @JoshRosen 's PR to merge and then update the test here. And will wait for your thoughts on the getBytes issue. Otherwise, I think everything's addressed.

@freeman-lab
Contributor Author

Jenkins, retest this please

@SparkQA

SparkQA commented Feb 1, 2015

Test build #26476 has started for PR 3803 at commit 14bca9a.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 2, 2015

Test build #26476 has finished for PR 3803 at commit 14bca9a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FileInputDStream[K, V, F <: NewInputFormat[K,V]](

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26476/
Test PASSed.

* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[
Contributor

Can you make these parameters a superset of the parameters in the other fileStream? Otherwise it seems like people can either use a conf or use a filter

Contributor Author

Fine with me, though it would require wiring the extra arguments (filter and newFilesOnly) through to all the binary record methods (because they'll also need the conf). Should we expose those arguments in binaryRecords? Or just use their defaults?

Contributor

Why the wiring to binaryRecords? Those two parameters are only relevant to DStream, so there's nothing to pass on down to the RDDs.

Contributor Author

Sorry, I meant exposing them to binaryRecordsStream. That's calling fileStream, and to include conf as an arg we'll also need to either specify filter and newFilesOnly or pass defaults, right?
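The shape of the resolution can be sketched in plain Python (every name below is a hypothetical stand-in for the Scala signatures in StreamingContext.scala, not the actual Spark API): the conf-taking fileStream keeps filter and newFilesOnly so its parameters are a superset of the existing overload, and binaryRecordsStream just passes their defaults.

```python
def default_path_filter(path):
    # mirrors the documented behavior: file names starting with "." are ignored
    return not path.rsplit("/", 1)[-1].startswith(".")

def file_stream(directory, path_filter=default_path_filter,
                new_files_only=True, conf=None):
    # the new overload takes conf *in addition to* the existing parameters
    return {"directory": directory, "path_filter": path_filter,
            "new_files_only": new_files_only, "conf": conf}

def binary_records_stream(directory, record_length):
    conf = {"recordLength": record_length}   # stands in for a Hadoop Configuration
    return file_stream(directory, conf=conf) # filter/newFilesOnly left at defaults

stream = binary_records_stream("/data/incoming", record_length=12)
assert stream["new_files_only"] is True
assert stream["conf"] == {"recordLength": 12}
assert stream["path_filter"](".hidden") is False
```

This way callers of the existing fileStream are unaffected, and binaryRecordsStream does not need to expose filter or newFilesOnly at all.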

@davies
Contributor

davies commented Feb 3, 2015

The Python parts look good to me, thanks!

- Ensures the arguments, including conf, are a superset of the existing
arguments
- Use default versions for binaryRecordsStream
@SparkQA

SparkQA commented Feb 3, 2015

Test build #26680 has started for PR 3803 at commit eba925c.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 3, 2015

Test build #26680 has finished for PR 3803 at commit eba925c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FileInputDStream[K, V, F <: NewInputFormat[K,V]](

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26680/
Test PASSed.

val data = br.map { case (k, v) =>
val bytes = v.getBytes
assert(bytes.length == recordLength, "Byte array does not have correct length")
bytes
Contributor

nit: Is this something that the user should be made aware of in the docs?

Contributor Author

Do you mean something more than these notes we're adding? I just clarified the notes a bit to make it obvious the check is on the byte array.

Contributor

I meant: should the user be told that the system can throw an error when the records are not of the expected size? I don't have any strong feelings on this, just wondering.

Contributor Author

Gotcha, I think it's ok as is then. Given what FixedLengthInputFormat is doing, this is more of a defensive assertion; it's not something the user should hit due to inappropriate input.
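For illustration, the check under discussion can be written outside Spark as a small stdlib-Python sketch (`check_records` is a hypothetical analogue of the Scala assertion in the diff above, not code from the patch):

```python
def check_records(records, record_length):
    # Defensive assertion: every record emitted by the fixed-length reader
    # must be exactly record_length bytes, or something upstream went wrong.
    for rec in records:
        assert len(rec) == record_length, "Byte array does not have correct length"
    return records

good = [b"\x00" * 8, b"\x01" * 8]
assert check_records(good, 8) == good

short_record_caught = False
try:
    check_records([b"\x00" * 7], 8)   # a short record trips the check
except AssertionError:
    short_record_caught = True
assert short_record_caught
```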

@tdas
Contributor

tdas commented Feb 4, 2015

LGTM. Just two minor nits. Can merge as soon as you take a look. This is great, @freeman-lab!

@freeman-lab
Contributor Author

Thanks for the detailed look @tdas! I think I addressed both nits.

@SparkQA

SparkQA commented Feb 4, 2015

Test build #26725 has started for PR 3803 at commit b676534.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 4, 2015

Test build #26725 has finished for PR 3803 at commit b676534.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FileInputDStream[K, V, F <: NewInputFormat[K,V]](

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26725/
Test PASSed.

@tdas
Contributor

tdas commented Feb 4, 2015

Thanks Jeremy! Good clean patch!

@asfgit asfgit closed this in 242b4f0 Feb 4, 2015
asfgit pushed a commit that referenced this pull request Feb 4, 2015
In Spark 1.2 we added a `binaryRecords` input method for loading flat binary data. This format is useful for numerical array data, e.g. in scientific computing applications. This PR adds support for the same format in Streaming applications, where it is similarly useful, especially for streaming time series or sensor data.

Summary of additions
- adding `binaryRecordsStream` to Spark Streaming
- exposing `binaryRecordsStream` in the new PySpark Streaming
- new unit tests in Scala and Python

This required adding an optional Hadoop configuration param to `fileStream` and `FileInputStream`, but was otherwise straightforward.

tdas davies

Author: freeman <the.freeman.lab@gmail.com>

Closes #3803 from freeman-lab/streaming-binary-records and squashes the following commits:

b676534 [freeman] Clarify note
5ff1b75 [freeman] Add note to java streaming context
eba925c [freeman] Simplify notes
c4237b8 [freeman] Add experimental tag
30eba67 [freeman] Add filter and newFilesOnly alongside conf
c2cfa6d [freeman] Expose new version of fileStream with conf in java
34d20ef [freeman] Add experimental tag
14bca9a [freeman] Add experimental tag
b85bffc [freeman] Formatting
47560f4 [freeman] Space formatting
9a3715a [freeman] Refactor to reflect changes to FileInputSuite
7373f73 [freeman] Add note and defensive assertion for byte length
3ceb684 [freeman] Merge remote-tracking branch 'upstream/master' into streaming-binary-records
317b6d1 [freeman] Make test inline
fcb915c [freeman] Formatting
becb344 [freeman] Formatting
d3e75b2 [freeman] Add tests in python
a4324a3 [freeman] Line length
029d49c [freeman] Formatting
1c739aa [freeman] Simpler default arg handling
94d90d0 [freeman] Spelling
2843e9d [freeman] Add params to docstring
8b70fbc [freeman] Reorganization
28bff9b [freeman] Fix missing arg
9398bcb [freeman] Expose optional hadoop configuration
23dd69f [freeman] Tests for binaryRecordsStream
36cb0fd [freeman] Add binaryRecordsStream to scala
fe4e803 [freeman] Add binaryRecordStream to Java API
ecef0eb [freeman] Add binaryRecordsStream to python
8550c26 [freeman] Expose additional argument combination

(cherry picked from commit 242b4f0)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>