
Commit cde6af9

Parameterize converter trait
1 parent 5ebacfa commit cde6af9

File tree (4 files changed: +26 -14 lines)

  core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
  core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
  examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
  examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala

core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
Lines changed: 7 additions & 7 deletions

@@ -31,15 +31,15 @@ import org.apache.spark.annotation.Experimental
  * transformation code by overriding the convert method.
  */
 @Experimental
-trait Converter {
-  def convert(obj: Any): Any
+trait Converter[T, U] {
+  def convert(obj: T): U
 }

 /**
  * A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
  * Other objects are passed through without conversion.
  */
-private[python] object DefaultConverter extends Converter {
+private[python] object DefaultConverter extends Converter[Any, Any] {

   /**
    * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
@@ -80,8 +80,8 @@ private[python] object DefaultConverter extends Converter
  */
 private[python] class ConverterRegistry extends Logging {

-  var keyConverter: Converter = DefaultConverter
-  var valueConverter: Converter = DefaultConverter
+  var keyConverter: Converter[Any, Any] = DefaultConverter
+  var valueConverter: Converter[Any, Any] = DefaultConverter

   def convertKey(obj: Any): Any = keyConverter.convert(obj)

@@ -97,9 +97,9 @@ private[python] class ConverterRegistry extends Logging
     logInfo(s"Loaded and registered value converter ($converterClass)")
   }

-  private def register(converterClass: String): Converter = {
+  private def register(converterClass: String): Converter[Any, Any] = {
     Try {
-      val converter = Class.forName(converterClass).newInstance().asInstanceOf[Converter]
+      val converter = Class.forName(converterClass).newInstance().asInstanceOf[Converter[Any, Any]]
       converter
     } match {
       case Success(s) => s
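
Note (illustrative, not part of this commit): with the trait now parameterized as Converter[T, U], a user-defined converter can declare concrete input and output types instead of working with Any on both sides. The class name TextToStringConverter and the choice of Hadoop's Text as the input type are assumptions made for this sketch.

import org.apache.hadoop.io.Text
import org.apache.spark.api.python.Converter

// Hypothetical converter: narrows the input to Hadoop's Text and the output to String.
class TextToStringConverter extends Converter[Text, String] {
  override def convert(obj: Text): String = obj.toString
}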

core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
Lines changed: 1 addition & 1 deletion

@@ -54,7 +54,7 @@ case class TestWritable(var str: String, var int: Int, var double: Double) exten
   }
 }

-class TestConverter extends Converter {
+class TestConverter extends Converter[Any, Any] {
   import collection.JavaConversions._
   override def convert(obj: Any) = {
     val m = obj.asInstanceOf[MapWritable]

examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
Lines changed: 12 additions & 4 deletions

@@ -6,15 +6,23 @@ import org.apache.cassandra.utils.ByteBufferUtil
 import collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap}


-class CassandraCQLKeyConverter extends Converter {
-  override def convert(obj: Any) = {
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra
+ * output to a Map[String, Int]
+ */
+class CassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, Int]] {
+  override def convert(obj: Any): java.util.Map[String, Int] = {
     val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
     mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb)))
   }
 }

-class CassandraCQLValueConverter extends Converter {
-  override def convert(obj: Any) = {
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra
+ * output to a Map[String, String]
+ */
+class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, String]] {
+  override def convert(obj: Any): java.util.Map[String, String] = {
     val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
     mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb)))
   }
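
Note (illustrative, not part of this commit): a minimal sketch of exercising CassandraCQLKeyConverter directly, outside of a Hadoop job, to show the typed return value. The column name and value are made-up test data, and ByteBufferUtil.bytes(Int) is assumed to live in the same Cassandra utility class as the toInt and string calls used above.

import java.nio.ByteBuffer
import collection.JavaConversions.mapAsJavaMap
import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.spark.examples.pythonconverters.CassandraCQLKeyConverter

object CassandraKeyConverterSketch {
  def main(args: Array[String]): Unit = {
    // Build a raw Cassandra-style key map, as the converter would receive it.
    val rawKey: java.util.Map[String, ByteBuffer] =
      mapAsJavaMap(Map("user_id" -> ByteBufferUtil.bytes(42)))
    // The parameterized converter now returns a typed java.util.Map[String, Int].
    val typedKey: java.util.Map[String, Int] =
      new CassandraCQLKeyConverter().convert(rawKey)
    println(typedKey)  // expected: {user_id=42}
  }
}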

examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
Lines changed: 6 additions & 2 deletions

@@ -4,8 +4,12 @@ import org.apache.spark.api.python.Converter
 import org.apache.hadoop.hbase.client.Result
 import org.apache.hadoop.hbase.util.Bytes

-class HBaseConverter extends Converter {
-  override def convert(obj: Any) = {
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a HBase Result
+ * to a String
+ */
+class HBaseConverter extends Converter[Any, String] {
+  override def convert(obj: Any): String = {
     val result = obj.asInstanceOf[Result]
     Bytes.toStringBinary(result.value())
   }
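
Note (illustrative, not part of this commit): converters such as HBaseConverter are loaded by class name through the reflection path in ConverterRegistry.register shown above, so call sites that cannot know the concrete type parameters handle them at the erased Converter[Any, Any] type, while the subclass still documents a narrower output (String here). A sketch of that pattern, e.g. pasted into a Scala REPL with the examples jar on the classpath:

import org.apache.spark.api.python.Converter

// The class name string mirrors the example class in this commit.
val className = "org.apache.spark.examples.pythonconverters.HBaseConverter"
// Erasure makes this cast a plain Converter check at runtime, matching the
// registry's own asInstanceOf[Converter[Any, Any]].
val hbaseConverter = Class.forName(className).newInstance().asInstanceOf[Converter[Any, Any]]
// hbaseConverter.convert(result) would return the String produced by
// HBaseConverter, typed as Any at this call site.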
