diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index af04bc6576148..62a659518943d 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -27,7 +27,7 @@ import scala.collection.JavaConversions._ import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata -import kafka.serializer.{Decoder, StringDecoder} +import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder} import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.{SparkContext, SparkException} @@ -532,3 +532,30 @@ object KafkaUtils { ) } } + +/** + * This is a helper class that wraps the KafkaUtils.createStream() into more + * Python-friendly class and function so that it can be easily + * instantiated and called from Python's KafkaUtils (see SPARK-6027). + * + * The zero-arg constructor helps instantiate this class from the Class object + * classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream() + * takes care of known parameters instead of passing them from Python + */ +private class KafkaUtilsPythonHelper { + def createStream( + jssc: JavaStreamingContext, + kafkaParams: JMap[String, String], + topics: JMap[String, JInt], + storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], Array[Byte]] = { + KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder]( + jssc, + classOf[Array[Byte]], + classOf[Array[Byte]], + classOf[DefaultDecoder], + classOf[DefaultDecoder], + kafkaParams, + topics, + storageLevel) + } +} diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py index 19ad71f99d4d5..0002dc10e8a17 100644 --- a/python/pyspark/streaming/kafka.py +++ b/python/pyspark/streaming/kafka.py @@ -16,7 +16,7 @@ # from py4j.java_collections import MapConverter -from py4j.java_gateway import java_import, Py4JError +from py4j.java_gateway import java_import, Py4JError, Py4JJavaError from pyspark.storagelevel import StorageLevel from pyspark.serializers import PairDeserializer, NoOpSerializer @@ -50,8 +50,6 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={}, :param valueDecoder: A function used to decode value (default is utf8_decoder) :return: A DStream object """ - java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils") - kafkaParams.update({ "zookeeper.connect": zkQuorum, "group.id": groupId, @@ -63,20 +61,34 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={}, jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client) jlevel = ssc._sc._getJavaStorageLevel(storageLevel) - def getClassByName(name): - return ssc._jvm.org.apache.spark.util.Utils.classForName(name) - try: - array = getClassByName("[B") - decoder = getClassByName("kafka.serializer.DefaultDecoder") - jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder, - jparam, jtopics, jlevel) - except Py4JError, e: + # Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027) + helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\ + .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper") + helper = helperClass.newInstance() + jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel) + except Py4JJavaError, e: # TODO: use --jar once it also work on driver - if not e.message or 'call a package' in e.message: - print "No kafka package, please put the assembly jar into classpath:" - print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \ - "scala-*/spark-streaming-kafka-assembly-*.jar" + if 'ClassNotFoundException' in str(e.java_exception): + print """ +________________________________________________________________________________________________ + + Spark Streaming's Kafka libraries not found in class path. Try one of the following. + + 1. Include the Kafka library and its dependencies with in the + spark-submit command as + + $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:%s ... + + 2. Download the JAR of the artifact from Maven Central http://search.maven.org/, + Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s. + Then, innclude the jar in the spark-submit command as + + $ bin/spark-submit --jars ... + +________________________________________________________________________________________________ + +""" % (ssc.sparkContext.version, ssc.sparkContext.version) raise e ser = PairDeserializer(NoOpSerializer(), NoOpSerializer()) stream = DStream(jstream, ssc, ser)