Skip to content

[SPARK-9065][Streaming][PySpark] Add MessageHandler for Kafka Python API #7410

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from

Conversation

jerryshao
Copy link
Contributor

Propose a way to add messageHandler for Kafka Python API, offer user a similar pattern to use message like Scala/Java:

def getOffset(m):
    return m and m.offset

KafkaUtils.createDirectStream(ssc, topic, kafkaParams, messageHandler=getOffset)

Internally serialize the Kafka MessageAndMetadata to Python using Pickler.

Please help to review, thanks a lot.

@jerryshao jerryshao changed the title [SPARK-8337][Streaming][Pyspark] Add MessageHandler for Kafka Python API [SPARK-8337][Streaming][PySpark] Add MessageHandler for Kafka Python API Jul 15, 2015
@SparkQA
Copy link

SparkQA commented Jul 15, 2015

Test build #37305 has finished for PR 7410 at commit 798b047.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PythonMessageAndMetadata(
    • class PythonMessageAndMetadataPickler extends IObjectPickler
    • class KafkaMessageAndMetadata(object):
    • case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode

@SparkQA
Copy link

SparkQA commented Jul 15, 2015

Test build #37314 has finished for PR 7410 at commit 0eabce4.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PythonMessageAndMetadata(
    • class PythonMessageAndMetadataPickler extends IObjectPickler
    • class KafkaMessageAndMetadata(object):

@jerryshao jerryshao changed the title [SPARK-8337][Streaming][PySpark] Add MessageHandler for Kafka Python API [SPARK-9065][Streaming][PySpark] Add MessageHandler for Kafka Python API Jul 15, 2015
@SparkQA
Copy link

SparkQA commented Jul 15, 2015

Test build #37348 has finished for PR 7410 at commit f23d1cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PythonMessageAndMetadata(
    • class PythonMessageAndMetadataPickler extends IObjectPickler
    • class KafkaMessageAndMetadata(object):
    • case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode

@SparkQA
Copy link

SparkQA commented Jul 15, 2015

Test build #1072 has finished for PR 7410 at commit f23d1cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member

zsxwing commented Jul 22, 2015

@jerryshao could you resolve the conflicts? Thanks.

@jerryshao
Copy link
Contributor Author

Yeah, will do.

@SparkQA
Copy link

SparkQA commented Jul 24, 2015

Test build #38295 has finished for PR 7410 at commit a09423e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PythonMessageAndMetadata(
    • class PythonMessageAndMetadataPickler extends IObjectPickler
    • class KafkaMessageAndMetadata(object):
    • case class ChangePrecision(child: Expression) extends UnaryExpression
    • case class DecimalType(precision: Int, scale: Int) extends FractionalType
    • case class DecimalConversion(precision: Int, scale: Int) extends JDBCConversion

@jerryshao
Copy link
Contributor Author

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Jul 24, 2015

Test build #38309 has finished for PR 7410 at commit a09423e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PythonMessageAndMetadata(
    • class PythonMessageAndMetadataPickler extends IObjectPickler
    • class KafkaMessageAndMetadata(object):
    • case class ChangePrecision(child: Expression) extends UnaryExpression
    • abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable with Unevaluable
    • abstract class AggregateFunction1 extends LeafExpression with Serializable
    • case class DecimalType(precision: Int, scale: Int) extends FractionalType
    • case class DecimalConversion(precision: Int, scale: Int) extends JDBCConversion

@SparkQA
Copy link

SparkQA commented Jul 24, 2015

Test build #85 has finished for PR 7410 at commit a09423e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PythonMessageAndMetadata(
    • class PythonMessageAndMetadataPickler extends IObjectPickler
    • class KafkaMessageAndMetadata(object):
    • case class ChangePrecision(child: Expression) extends UnaryExpression
    • abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable with Unevaluable
    • abstract class AggregateFunction1 extends LeafExpression with Serializable
    • case class DecimalType(precision: Int, scale: Int) extends FractionalType
    • case class DecimalConversion(precision: Int, scale: Int) extends JDBCConversion

private object KafkaUtilsPythonHelper {
private var initialized = false

def initialize(): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not directly call SerDeUtil.initialize() and new PythonMessageAndMetadataPickler().register() in the static codes of KafkaUtilsPythonHelper? It would be much simpler since it's only two lines.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. You want to call initialize to load the object KafkaUtilsPythonHelper in the closure. If so, how about adding a method in KafkaUtilsPythonHelper, such as

object KafkaUtilsPythonHelper {
...
def createPicklerIterator: Iterator[Array[Byte]] = {
  new SerDeUtil.AutoBatchedPickler(iter)
}
...
}

and call it in the closure?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you please share more details on it, I cannot exactly get what you mean. What I did here is to follow the pattern here in MLlib(https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala#L1370)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since initialize() is already called in object KafkaUtilsPythonHelper, the purpose of adding the initialize method is only for loading the class of object KafkaUtilsPythonHelper, right?

If changing it to

object KafkaUtilsPythonHelper {

  SerDeUtil.initialize()
  new PythonMessageAndMetadataPickler().register()

...
  def createPicklerIterator: Iterator[Array[Byte]] = {
    new SerDeUtil.AutoBatchedPickler(iter)
  }
...
}

When calling KafkaUtilsPythonHelper.createPicklerIterator in the closure, SerDeUtil.initialize() and new PythonMessageAndMetadataPickler().register() will be called when loading class of object KafkaUtilsPythonHelper.

@SparkQA
Copy link

SparkQA commented Jul 28, 2015

Test build #38661 has finished for PR 7410 at commit 348e3a2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PythonMessageAndMetadata(
    • class PythonMessageAndMetadataPickler extends IObjectPickler
    • class KafkaMessageAndMetadata(object):

@zsxwing
Copy link
Member

zsxwing commented Aug 5, 2015

Sorry for the delay. LGTM except the conflicts. Could you resolve the conflicts again?

@jerryshao
Copy link
Contributor Author

OK, I will rebase the code.

@SparkQA
Copy link

SparkQA commented Aug 5, 2015

Test build #39815 has finished for PR 7410 at commit f375e16.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PythonMessageAndMetadata(
    • class PythonMessageAndMetadataPickler extends IObjectPickler
    • class KafkaMessageAndMetadata(object):

@jerryshao
Copy link
Contributor Author

Hi @tdas, would you please help to review this patch, thanks a lot.

@zsxwing
Copy link
Member

zsxwing commented Aug 5, 2015

LGTM

@tdas
Copy link
Contributor

tdas commented Aug 6, 2015

Hey all, this is a pretty good PR. Unfortunately, I havent been able to spend cycles on this as I was busy with higher priority PRs. The Java data being serialized in python does add some overheads, and introduces some data structures, that we need to think a bit. So I think I am going to bump this to Spark 1.6. But this is important feature for python parity so we are definitely going to have this in 1.6.

@tdas
Copy link
Contributor

tdas commented Oct 7, 2015

@jerryshao Can you update this PR with latest master?

@jerryshao
Copy link
Contributor Author

Yeah, I will.

@SparkQA
Copy link

SparkQA commented Oct 8, 2015

Test build #43380 has finished for PR 7410 at commit 2324517.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PythonMessageAndMetadata(
    • class PythonMessageAndMetadataPickler extends IObjectPickler
    • class KafkaMessageAndMetadata(object):

@jerryshao
Copy link
Contributor Author

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Oct 8, 2015

Test build #43384 has finished for PR 7410 at commit 2324517.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PythonMessageAndMetadata(
    • class PythonMessageAndMetadataPickler extends IObjectPickler
    • class KafkaMessageAndMetadata(object):

@SparkQA
Copy link

SparkQA commented Oct 10, 2015

Test build #43512 has finished for PR 7410 at commit a794127.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PythonMessageAndMetadata(
    • class PythonMessageAndMetadataPickler extends IObjectPickler
    • class ChildProcAppHandle implements SparkAppHandle
    • abstract class LauncherConnection implements Closeable, Runnable
    • final class LauncherProtocol
    • static class Message implements Serializable
    • static class Hello extends Message
    • static class SetAppId extends Message
    • static class SetState extends Message
    • static class Stop extends Message
    • class LauncherServer implements Closeable
    • class NamedThreadFactory implements ThreadFactory
    • class OutputRedirector
    • class KafkaMessageAndMetadata(object):

@jerryshao
Copy link
Contributor Author

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Oct 10, 2015

Test build #43521 has finished for PR 7410 at commit a794127.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PythonMessageAndMetadata(
    • class PythonMessageAndMetadataPickler extends IObjectPickler
    • class KafkaMessageAndMetadata(object):

@jerryshao
Copy link
Contributor Author

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Oct 10, 2015

Test build #43524 has finished for PR 7410 at commit a794127.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PythonMessageAndMetadata(
    • class PythonMessageAndMetadataPickler extends IObjectPickler
    • class KafkaMessageAndMetadata(object):

@jerryshao
Copy link
Contributor Author

@tdas , I've updated the code to the latest master, mind taking a time to review? Thanks a lot.

@ontarionick
Copy link
Contributor

Sorry to bump this, but I was wondering if was still the plan to merge this? I'd find this feature quite useful for something I'm building! 😄

@zsxwing
Copy link
Member

zsxwing commented Nov 5, 2015

@jerryshao could you update this one again... Thanks!

@jerryshao
Copy link
Contributor Author

Yeah, will do.

@SparkQA
Copy link

SparkQA commented Nov 6, 2015

Test build #45216 has finished for PR 7410 at commit ed4d179.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, LongArray>\n * final class UnsafeSortDataFormat extends SortDataFormat<RecordPointerAndKeyPrefix, LongArray>\n * case class PythonMessageAndMetadata(\n * class PythonMessageAndMetadataPickler extends IObjectPickler\n * class KafkaMessageAndMetadata(object):\n

@zsxwing
Copy link
Member

zsxwing commented Nov 6, 2015

LGTM

@jerryshao
Copy link
Contributor Author

Hi @tdas, I understand you're quite busy, still I would like to know your opinion on this PR, I've rebased for several rounds, still pending to review. Thanks a lot and appreciated.

@zsxwing
Copy link
Member

zsxwing commented Nov 13, 2015

@jerryshao I just fixed the Streaming Python tests to report test failures correctly #9669 . Could you rebase this one again? Sorry about that.

@tdas
Copy link
Contributor

tdas commented Nov 16, 2015

@jerryshao sorry for so much delays on this PR. We needed to fix issues like #9669 before merging python streaming features. Hence the delay. Would you mind if we take over the feature to fix it up in the interest of time. You will get all the credit for this PR on github and JIRA.

@jerryshao
Copy link
Contributor Author

Hi @tdas , it's all right. sorry for the delay, I planned to address this conflict today.

asfgit pushed a commit that referenced this pull request Nov 18, 2015
Fixed the merge conflicts in #7410

Closes #7410

Author: Shixiong Zhu <shixiong@databricks.com>
Author: jerryshao <saisai.shao@intel.com>
Author: jerryshao <sshao@hortonworks.com>

Closes #9742 from zsxwing/pr7410.

(cherry picked from commit 75a2922)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit asfgit closed this in 75a2922 Nov 18, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants