[SPARK-4969][STREAMING][PYTHON] Add binaryRecords to streaming #3803
Conversation
Test build #24822 has started for PR 3803 at commit
Test build #24822 has finished for PR 3803 at commit
Test FAILed.
Test build #24823 has started for PR 3803 at commit
Test build #24823 has finished for PR 3803 at commit
Test PASSed.
```scala
Thread.sleep(1000)
// Set up the streaming context and input streams
val newConf = conf.clone.set(
  "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
```
It looks like this is based on the FileInputStream test, which is known to be flaky. I have a PR open which rewrites that test to not depend on SystemClock / Thread.sleep(): #3801. Therefore, if we want to have this style of test, then this PR should block until my PR is merged so that it can use the new test utilities that I added.
Here's the relevant change from my PR: https://github.com/apache/spark/pull/3801/files#diff-4
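For context, a minimal sketch of the manual-clock pattern such test utilities build on. The class and method names here are assumptions based on the Spark 1.x streaming test code, not necessarily what #3801 merged:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.util.ManualClock

// Inject a manual clock instead of the system clock, so the test controls time.
val clockedConf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("manual-clock-sketch")
  .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")

val ssc = new StreamingContext(clockedConf, Seconds(1))
// ... set up input and output streams, then ssc.start() ...

// Advance time deterministically instead of calling Thread.sleep. Note: the
// scheduler field is private[streaming], so real test code sits inside the
// org.apache.spark.streaming package to reach it.
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
clock.addToTime(1000)
```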
Ok great, I'll wait for your PR to be merged and then refactor this test accordingly.
My PR has been merged, so I think this should be unblocked now.
The Python part looks good to me, thanks.
Test build #25066 has started for PR 3803 at commit
Thanks for the review! I'll wait for @JoshRosen's PR to merge and then update the test here. And will wait for your thoughts on the …
Jenkins, retest this please
Test build #26476 has started for PR 3803 at commit
Test build #26476 has finished for PR 3803 at commit
Test PASSed.
```scala
 * @tparam V Value type for reading HDFS file
 * @tparam F Input format for reading HDFS file
 */
def fileStream[
```
Can you make these parameters a superset of the parameters in the other fileStream? Otherwise it seems like people can either use a conf or use a filter
Fine with me, though would require wiring the extra arguments (`filter` and `newFilesOnly`) through to all the binary record methods (because they'll also need the `conf`). Should we expose those arguments in `binaryRecords`? Or just use their defaults?
Why the wiring to `binaryRecords`? Those two parameters are only relevant to the DStream, so nothing to pass on down to the RDDs.
Sorry, meant exposing them to `binaryRecordsStream`. That's calling `fileStream`, and to include `conf` as an arg we'll also need to either specify `filter` and `newFilesOnly` or pass defaults, right?
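To make the discussion concrete, here is a sketch of the superset signature the thread converged on (per the later commit "Add filter and newFilesOnly alongside conf"). The exact modifiers and the constructor call are assumptions about the merged code; the method lives inside `StreamingContext`:

```scala
import scala.reflect.ClassTag
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.streaming.dstream.InputDStream

// Inside StreamingContext: one fileStream overload that accepts the full
// superset of arguments, so conf and filter/newFilesOnly can be combined.
def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
    directory: String,
    filter: Path => Boolean,
    newFilesOnly: Boolean,
    conf: Configuration): InputDStream[(K, V)] = {
  new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf))
}
```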
The Python parts look good to me, thanks!
- Ensure the argument list including conf is a superset of the existing arguments
- Use default versions for binaryRecordsStream
Test build #26680 has started for PR 3803 at commit
Test build #26680 has finished for PR 3803 at commit
Test PASSed.
```scala
val data = br.map { case (k, v) =>
  val bytes = v.getBytes
  assert(bytes.length == recordLength, "Byte array does not have correct length")
  bytes
}
```
nit: Is this something that the user should be made aware of in the docs?
Do you mean something more than these notes we're adding? I just clarified the notes a bit to make it obvious the check is on the byte array.
I meant: should the user be told that the system can throw an error when the records are not of the expected size? I don't have any strong feeling on this, just wondering.
Gotcha, I think it's OK as is then. Given what `FixedLengthInputFormat` is doing, this is more of a defensive assertion; it's not something the user should hit with appropriate input.
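To illustrate the "appropriate input" case, here is a minimal usage sketch of the new API. The directory path and the 8-byte Double interpretation are made up for the example:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("binary-records-demo")
val ssc = new StreamingContext(conf, Seconds(1))

// Files moved into the directory must consist of whole 8-byte records;
// otherwise the fixed-length framing (and the defensive assertion) can trip.
val records = ssc.binaryRecordsStream("/data/incoming", recordLength = 8)

// e.g. interpret each record as one big-endian Double
val values = records.map(bytes => java.nio.ByteBuffer.wrap(bytes).getDouble)
values.print()

ssc.start()
ssc.awaitTermination()
```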
LGTM. Just two minor nits. Can merge as soon as you take a look. This is great @freeman-lab
Thanks for the detailed look @tdas! Think I addressed both nits.
Test build #26725 has started for PR 3803 at commit
Test build #26725 has finished for PR 3803 at commit
Test PASSed.
Thanks Jeremy! Good clean patch!
In Spark 1.2 we added a `binaryRecords` input method for loading flat binary data. This format is useful for numerical array data, e.g. in scientific computing applications. This PR adds support for the same format in Streaming applications, where it is similarly useful, especially for streaming time series or sensor data.

Summary of additions:

- adding `binaryRecordsStream` to Spark Streaming
- exposing `binaryRecordsStream` in the new PySpark Streaming
- new unit tests in Scala and Python

This required adding an optional Hadoop configuration param to `fileStream` and `FileInputStream`, but was otherwise straightforward. tdas davies

Author: freeman <the.freeman.lab@gmail.com>

Closes #3803 from freeman-lab/streaming-binary-records and squashes the following commits:

b676534 [freeman] Clarify note
5ff1b75 [freeman] Add note to java streaming context
eba925c [freeman] Simplify notes
c4237b8 [freeman] Add experimental tag
30eba67 [freeman] Add filter and newFilesOnly alongside conf
c2cfa6d [freeman] Expose new version of fileStream with conf in java
34d20ef [freeman] Add experimental tag
14bca9a [freeman] Add experimental tag
b85bffc [freeman] Formatting
47560f4 [freeman] Space formatting
9a3715a [freeman] Refactor to reflect changes to FileInputSuite
7373f73 [freeman] Add note and defensive assertion for byte length
3ceb684 [freeman] Merge remote-tracking branch 'upstream/master' into streaming-binary-records
317b6d1 [freeman] Make test inline
fcb915c [freeman] Formatting
becb344 [freeman] Formatting
d3e75b2 [freeman] Add tests in python
a4324a3 [freeman] Line length
029d49c [freeman] Formatting
1c739aa [freeman] Simpler default arg handling
94d90d0 [freeman] Spelling
2843e9d [freeman] Add params to docstring
8b70fbc [freeman] Reorganization
28bff9b [freeman] Fix missing arg
9398bcb [freeman] Expose optional hadoop configuration
23dd69f [freeman] Tests for binaryRecordsStream
36cb0fd [freeman] Add binaryRecordsStream to scala
fe4e803 [freeman] Add binaryRecordStream to Java API
ecef0eb [freeman] Add binaryRecordsStream to python
8550c26 [freeman] Expose additional argument combination

(cherry picked from commit 242b4f0)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
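For readers following along, a hedged sketch (not the PR's exact code) of how `binaryRecordsStream` can thread the record length through `fileStream` via a Hadoop configuration. The helper name `binaryRecordsLike` and the dot-file filter are illustrative:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Illustrative stand-in for binaryRecordsStream: set the record length on a
// copy of the Hadoop conf, pass it through the superset fileStream overload,
// and strip the keys so callers see plain byte arrays.
def binaryRecordsLike(ssc: StreamingContext, directory: String,
                      recordLength: Int): DStream[Array[Byte]] = {
  val conf = new Configuration(ssc.sparkContext.hadoopConfiguration)
  FixedLengthInputFormat.setRecordLength(conf, recordLength)
  val br = ssc.fileStream[LongWritable, BytesWritable, FixedLengthInputFormat](
    directory,
    (path: Path) => !path.getName.startsWith("."), // default dot-file filter
    newFilesOnly = true,
    conf)
  br.map { case (_, v) =>
    val bytes = v.getBytes
    assert(bytes.length == recordLength, "Byte array does not have correct length")
    bytes
  }
}
```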