[SPARK-6027][SPARK-5546] Fixed --jar and --packages not working for KafkaUtils and improved error message

The problem with SPARK-6027, in short, is that JARs like the kafka-assembly JAR do not work in Python because the added JAR is not visible to the classloader used by Py4J. Py4J uses Class.forName(), which does not consult the thread's context classloader, yet the JARs are only visible in that context classloader. So this patch uses the context classloader to create the KafkaUtils DStream object. This works for both cases where the Kafka libraries are added with --jars spark-streaming-kafka-assembly.jar or with --packages spark-streaming-kafka.

Also improves the error message.
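For context, here is a minimal end-to-end sketch of the user-facing flow this commit fixes. The Zookeeper address, group id, topic map, application name, and JAR path are illustrative placeholders, not part of the commit:

# Hypothetical PySpark app; submit with either of the two options this fix enables:
#   $ bin/spark-submit --jars spark-streaming-kafka-assembly.jar app.py
#   $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:<spark-version> app.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="KafkaStreamCheck")
ssc = StreamingContext(sc, 2)  # 2-second batch interval

# createStream(ssc, zkQuorum, groupId, topics), as documented in kafka.py below.
stream = KafkaUtils.createStream(ssc, "localhost:2181", "test-group", {"test-topic": 1})
stream.pprint()

ssc.start()
ssc.awaitTermination()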

davies

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#4779 from tdas/kafka-python-fix and squashes the following commits:

fb16b04 [Tathagata Das] Removed import
c1fdf35 [Tathagata Das] Fixed long line and improved documentation
7b88be8 [Tathagata Das] Fixed --jar not working for KafkaUtils and improved error message
tdas authored and Andrew Or committed Feb 26, 2015
1 parent 8942b52 commit aa63f63
Showing 2 changed files with 55 additions and 16 deletions.
29 changes: 28 additions & 1 deletion external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -27,7 +27,7 @@ import scala.collection.JavaConversions._

 import kafka.common.TopicAndPartition
 import kafka.message.MessageAndMetadata
-import kafka.serializer.{Decoder, StringDecoder}
+import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}

 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.{SparkContext, SparkException}
@@ -532,3 +532,30 @@ object KafkaUtils {
     )
   }
 }
+
+/**
+ * This is a helper class that wraps KafkaUtils.createStream() in a more
+ * Python-friendly class and function so that it can be easily
+ * instantiated and called from Python's KafkaUtils (see SPARK-6027).
+ *
+ * The zero-arg constructor allows this class to be instantiated from its Class
+ * object via classOf[KafkaUtilsPythonHelper].newInstance(), and createStream()
+ * supplies the known parameters instead of passing them from Python.
+ */
+private class KafkaUtilsPythonHelper {
+  def createStream(
+      jssc: JavaStreamingContext,
+      kafkaParams: JMap[String, String],
+      topics: JMap[String, JInt],
+      storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], Array[Byte]] = {
+    KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
+      jssc,
+      classOf[Array[Byte]],
+      classOf[Array[Byte]],
+      classOf[DefaultDecoder],
+      classOf[DefaultDecoder],
+      kafkaParams,
+      topics,
+      storageLevel)
+  }
+}
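The doc comment above is the heart of the fix: the helper has to be looked up through the thread's context classloader rather than Py4J's usual Class.forName() path, so it needs a zero-arg constructor that newInstance() can call. A minimal Python-side sketch of that reflective lookup, assuming an active StreamingContext ssc (it mirrors the kafka.py change below):

# Load the helper via the context classloader, which (unlike Class.forName())
# can see JARs added with --jars or --packages.
loader = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()
helper_class = loader.loadClass(
    "org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
helper = helper_class.newInstance()  # requires the zero-arg constructor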
42 changes: 27 additions & 15 deletions python/pyspark/streaming/kafka.py
@@ -16,7 +16,7 @@
 #

 from py4j.java_collections import MapConverter
-from py4j.java_gateway import java_import, Py4JError
+from py4j.java_gateway import java_import, Py4JError, Py4JJavaError

 from pyspark.storagelevel import StorageLevel
 from pyspark.serializers import PairDeserializer, NoOpSerializer
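The import change matters because Py4JJavaError, unlike the broader Py4JError caught before, wraps the underlying JVM exception and exposes it as e.java_exception, which is what lets the hunk below detect a missing Kafka assembly precisely. A hedged sketch of the pattern, reusing names from the surrounding code (Python 2 syntax, as in the file):

# Catching Py4JJavaError gives access to the Java-side exception, so a
# missing class can be distinguished from other failures.
try:
    jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
except Py4JJavaError, e:
    if 'ClassNotFoundException' in str(e.java_exception):
        print "Spark Streaming's Kafka libraries not found in class path"
    raise e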
@@ -50,8 +50,6 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
         :param valueDecoder: A function used to decode value (default is utf8_decoder)
         :return: A DStream object
         """
-        java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")
-
         kafkaParams.update({
             "zookeeper.connect": zkQuorum,
             "group.id": groupId,
@@ -63,20 +61,34 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
         jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
         jlevel = ssc._sc._getJavaStorageLevel(storageLevel)

         def getClassByName(name):
             return ssc._jvm.org.apache.spark.util.Utils.classForName(name)

         try:
-            array = getClassByName("[B")
-            decoder = getClassByName("kafka.serializer.DefaultDecoder")
-            jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
-                                                       jparam, jtopics, jlevel)
-        except Py4JError, e:
+            # Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
+            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
+                .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+            helper = helperClass.newInstance()
+            jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
+        except Py4JJavaError, e:
             # TODO: use --jars once it also works on the driver
-            if not e.message or 'call a package' in e.message:
-                print "No kafka package, please put the assembly jar into classpath:"
-                print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
-                      "scala-*/spark-streaming-kafka-assembly-*.jar"
+            if 'ClassNotFoundException' in str(e.java_exception):
+                print """
+________________________________________________________________________________________________
+  Spark Streaming's Kafka libraries not found in class path. Try one of the following.
+
+  1. Include the Kafka library and its dependencies within the
+     spark-submit command as
+
+     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:%s ...
+
+  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s.
+     Then, include the jar in the spark-submit command as
+
+     $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...
+________________________________________________________________________________________________
+""" % (ssc.sparkContext.version, ssc.sparkContext.version)
             raise e
         ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
         stream = DStream(jstream, ssc, ser)
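The closing lines wire the raw bytes back into Python: the helper returns (key, value) pairs of plain byte arrays, and PairDeserializer(NoOpSerializer(), NoOpSerializer()) passes them through untouched so that the keyDecoder and valueDecoder arguments can do the decoding. A sketch of that final step; utf8_decoder is the default decoder named in the docstring, and the map call is an assumption about the tail of createStream hidden below the fold:

# Assumed final step: map the user-supplied decoders over the raw byte pairs.
def utf8_decoder(s):
    # Decode UTF-8 bytes to unicode; pass None through unchanged.
    return s and s.decode('utf-8')

decoded = stream.map(lambda (k, v): (utf8_decoder(k), utf8_decoder(v)))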