[SPARK-9065][Streaming][PySpark] Add MessageHandler for Kafka Python API #7410
Test build #37305 has finished for PR 7410 at commit
Test build #37314 has finished for PR 7410 at commit
Test build #37348 has finished for PR 7410 at commit
Test build #1072 has finished for PR 7410 at commit
@jerryshao could you resolve the conflicts? Thanks.
Yeah, will do.
Test build #38295 has finished for PR 7410 at commit
Jenkins, retest this please.
Test build #38309 has finished for PR 7410 at commit
Test build #85 has finished for PR 7410 at commit
private object KafkaUtilsPythonHelper {
  private var initialized = false

  def initialize(): Unit = {
Why not directly call SerDeUtil.initialize() and new PythonMessageAndMetadataPickler().register() in the static code of KafkaUtilsPythonHelper? It would be much simpler since it's only two lines.
Oh, I see. You want to call initialize to load the object KafkaUtilsPythonHelper in the closure. If so, how about adding a method to KafkaUtilsPythonHelper, such as

object KafkaUtilsPythonHelper {
  ...
  def createPicklerIterator(iter: Iterator[Any]): Iterator[Array[Byte]] = {
    new SerDeUtil.AutoBatchedPickler(iter)
  }
  ...
}

and calling it in the closure?
Would you please share more details? I don't quite get what you mean. What I did here follows the pattern used in MLlib (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala#L1370).
Since initialize() is already called in object KafkaUtilsPythonHelper, the only purpose of adding the initialize method is to load the class of object KafkaUtilsPythonHelper, right?
If it is changed to

object KafkaUtilsPythonHelper {
  SerDeUtil.initialize()
  new PythonMessageAndMetadataPickler().register()
  ...
  def createPicklerIterator(iter: Iterator[Any]): Iterator[Array[Byte]] = {
    new SerDeUtil.AutoBatchedPickler(iter)
  }
  ...
}

then calling KafkaUtilsPythonHelper.createPicklerIterator in the closure will run SerDeUtil.initialize() and new PythonMessageAndMetadataPickler().register() when the class of object KafkaUtilsPythonHelper is loaded.
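The point above relies on how Scala initializes singleton objects: the body of an object runs once, the first time the object is referenced. Here is a minimal, self-contained sketch of that behavior. The names Registry, HelperSketch and InitDemo are hypothetical stand-ins and do not come from Spark; the assignment in the object body stands in for the SerDeUtil.initialize() and pickler registration calls.

```scala
// Hypothetical stand-ins for illustration only; none of these names exist in Spark.
object Registry {
  // Flag flipped by HelperSketch's initializer so the effect can be observed.
  @volatile var registered: Boolean = false
}

object HelperSketch {
  // Object body: runs once, the first time HelperSketch is referenced.
  // In the PR, this is where the two registration calls would sit.
  Registry.registered = true

  // Calling any method on the object forces the body above to run first.
  def picklerIterator(iter: Iterator[Any]): Iterator[String] =
    iter.map(_.toString)
}

object InitDemo {
  def main(args: Array[String]): Unit = {
    // HelperSketch has not been referenced yet, so its body has not run.
    assert(!Registry.registered)
    val out = HelperSketch.picklerIterator(Iterator(1, "a")).toList
    // The method call above triggered the object initializer as a side effect.
    assert(Registry.registered)
    assert(out == List("1", "a"))
  }
}
```

Under this assumption, a closure that calls a method on the helper object would trigger the registration on whichever JVM runs the closure, without a separate initialize method.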
Test build #38661 has finished for PR 7410 at commit
Sorry for the delay. LGTM except the conflicts. Could you resolve the conflicts again?
OK, I will rebase the code.
Test build #39815 has finished for PR 7410 at commit
Hi @tdas, would you please help to review this patch, thanks a lot.
LGTM
Hey all, this is a pretty good PR. Unfortunately, I haven't been able to spend cycles on this as I was busy with higher-priority PRs. The Java data being serialized in Python does add some overhead, and introduces some data structures that we need to think about a bit. So I think I am going to bump this to Spark 1.6. But this is an important feature for Python parity, so we are definitely going to have this in 1.6.
@jerryshao Can you update this PR with latest master?
Yeah, I will.
Test build #43380 has finished for PR 7410 at commit
Jenkins, retest this please.
Test build #43384 has finished for PR 7410 at commit
Test build #43512 has finished for PR 7410 at commit
Jenkins, retest this please.
Test build #43521 has finished for PR 7410 at commit
Jenkins, retest this please.
Test build #43524 has finished for PR 7410 at commit
@tdas, I've updated the code to the latest master. Would you mind taking some time to review? Thanks a lot.
Sorry to bump this, but I was wondering if it was still the plan to merge this? I'd find this feature quite useful for something I'm building! 😄
@jerryshao could you update this one again... Thanks!
Yeah, will do.
Test build #45216 has finished for PR 7410 at commit
LGTM
Hi @tdas, I understand you're quite busy, but I would still like to know your opinion on this PR. I've rebased it for several rounds and it is still pending review. Thanks a lot, much appreciated.
@jerryshao I just fixed the Streaming Python tests to report test failures correctly in #9669. Could you rebase this one again? Sorry about that.
@jerryshao sorry for so many delays on this PR. We needed to fix issues like #9669 before merging Python streaming features, hence the delay. Would you mind if we take over the feature to fix it up in the interest of time? You will get all the credit for this PR on GitHub and JIRA.
Hi @tdas, it's all right. Sorry for the delay, I planned to address this conflict today.
Fixed the merge conflicts in #7410

Closes #7410

Author: Shixiong Zhu <shixiong@databricks.com>
Author: jerryshao <saisai.shao@intel.com>
Author: jerryshao <sshao@hortonworks.com>

Closes #9742 from zsxwing/pr7410.

(cherry picked from commit 75a2922)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
This PR proposes a way to add a messageHandler to the Kafka Python API, offering users a pattern for working with messages similar to the one in Scala/Java.
Internally, the Kafka MessageAndMetadata is serialized to Python using Pickler. Please help to review, thanks a lot.
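For readers unfamiliar with the Scala/Java pattern the description refers to, here is a rough sketch of what a message handler looks like: a function that receives each record together with its metadata and returns whatever the job needs. MessageAndMetadataSketch is a hypothetical stand-in written for this example, not Kafka's real MessageAndMetadata class, and its field names are assumptions. No Spark or Kafka API is called here.

```scala
// Hypothetical stand-in for Kafka's MessageAndMetadata; field names are assumptions.
final case class MessageAndMetadataSketch[K, V](
    topic: String,
    partition: Int,
    offset: Long,
    key: K,
    message: V)

object MessageHandlerSketch {
  // A message handler maps a record plus its metadata to a user-chosen result.
  // This one keeps the topic, the offset and the payload, and drops the rest.
  val handler: MessageAndMetadataSketch[String, String] => (String, Long, String) =
    mmd => (mmd.topic, mmd.offset, mmd.message)

  def main(args: Array[String]): Unit = {
    // Made-up sample records, used only to exercise the handler.
    val records = Seq(
      MessageAndMetadataSketch("events", 0, 41L, "k1", "hello"),
      MessageAndMetadataSketch("events", 0, 42L, "k2", "world"))
    records.map(handler).foreach(println)
  }
}
```

The Python API proposed here would presumably accept an equivalent Python callable, with the metadata object arriving on the Python side via the pickling step mentioned above.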