@@ -37,7 +37,7 @@ object CatalystTypeConverters {
37
37
// Since the map values can be mutable, we explicitly import scala.collection.Map at here.
38
38
import scala .collection .Map
39
39
40
- private def getConverterForType (dataType : DataType ): CatalystTypeConverter [Any , Any ] = {
40
+ private def getConverterForType (dataType : DataType ): CatalystTypeConverter [Any , Any , Any ] = {
41
41
val converter = dataType match {
42
42
case udt : UserDefinedType [_] => UDTConverter (udt)
43
43
case arrayType : ArrayType => ArrayConverter (arrayType.elementType)
@@ -55,81 +55,78 @@ object CatalystTypeConverters {
55
55
case DoubleType => DoubleConverter
56
56
case _ => IdentityConverter
57
57
}
58
- converter.asInstanceOf [CatalystTypeConverter [Any , Any ]]
58
+ converter.asInstanceOf [CatalystTypeConverter [Any , Any , Any ]]
59
59
}
60
60
61
61
/**
62
62
* Converts a Scala type to its Catalyst equivalent (and vice versa).
63
+ *
64
+ * @tparam ScalaInputType The type of Scala values that can be converted to Catalyst.
65
+ * @tparam ScalaOutputType The type of Scala values returned when converting Catalyst to Scala.
66
+ * @tparam CatalystType The internal Catalyst type used to represent values of this Scala type.
63
67
*/
64
- private abstract class CatalystTypeConverter [ScalaType , CatalystType ] extends Serializable {
68
+ private abstract class CatalystTypeConverter [ScalaInputType , ScalaOutputType , CatalystType ]
69
+ extends Serializable {
65
70
66
71
/**
67
72
* Converts a Scala type to its Catalyst equivalent while automatically handling nulls
68
73
* and Options.
69
74
*/
70
75
final def toCatalyst (@ Nullable maybeScalaValue : Any ): CatalystType = {
71
76
maybeScalaValue match {
72
- case opt : Option [ScalaType ] =>
77
+ case opt : Option [ScalaInputType ] =>
73
78
if (opt.isDefined) {
74
79
toCatalystImpl(opt.get)
75
80
} else {
76
81
null .asInstanceOf [CatalystType ]
77
82
}
78
83
case null => null .asInstanceOf [CatalystType ]
79
- case scalaValue : ScalaType => toCatalystImpl(scalaValue)
84
+ case scalaValue : ScalaInputType => toCatalystImpl(scalaValue)
80
85
}
81
86
}
82
87
83
88
/**
84
89
* Given a Catalyst row, convert the value at column `column` to its Scala equivalent.
85
90
*/
86
- final def toScala (row : Row , column : Int ): Any = {
87
- if (row.isNullAt(column)) null else toScalaImpl(row, column)
91
+ final def toScala (row : Row , column : Int ): ScalaOutputType = {
92
+ if (row.isNullAt(column)) null . asInstanceOf [ ScalaOutputType ] else toScalaImpl(row, column)
88
93
}
89
94
90
95
/**
91
96
* Convert a Catalyst value to its Scala equivalent.
92
97
*/
93
- def toScala (@ Nullable catalystValue : CatalystType ): ScalaType
98
+ def toScala (@ Nullable catalystValue : CatalystType ): ScalaOutputType
94
99
95
100
/**
96
101
* Converts a Scala value to its Catalyst equivalent.
97
102
* @param scalaValue the Scala value, guaranteed not to be null.
98
103
* @return the Catalyst value.
99
104
*/
100
- protected def toCatalystImpl (scalaValue : ScalaType ): CatalystType
105
+ protected def toCatalystImpl (scalaValue : ScalaInputType ): CatalystType
101
106
102
107
/**
103
108
* Given a Catalyst row, convert the value at column `column` to its Scala equivalent.
104
109
* This method will only be called on non-null columns.
105
110
*/
106
- protected def toScalaImpl (row : Row , column : Int ): ScalaType
107
- }
108
-
109
- /**
110
- * Convenience wrapper to write type converters for primitives. We use a converter for primitives
111
- * so that we can use type-specific field accessors when converting Catalyst rows to Scala rows.
112
- */
113
- private abstract class PrimitiveCatalystTypeConverter [T ] extends CatalystTypeConverter [T , T ] {
114
- override final def toScala (catalystValue : T ): T = catalystValue
115
- override final def toCatalystImpl (scalaValue : T ): T = scalaValue
111
+ protected def toScalaImpl (row : Row , column : Int ): ScalaOutputType
116
112
}
117
113
118
- private object IdentityConverter extends CatalystTypeConverter [Any , Any ] {
114
+ private object IdentityConverter extends CatalystTypeConverter [Any , Any , Any ] {
119
115
override def toCatalystImpl (scalaValue : Any ): Any = scalaValue
120
116
override def toScala (catalystValue : Any ): Any = catalystValue
121
117
override def toScalaImpl (row : Row , column : Int ): Any = row(column)
122
118
}
123
119
124
- private case class UDTConverter (udt : UserDefinedType [_]) extends CatalystTypeConverter [Any , Any ] {
120
+ private case class UDTConverter (
121
+ udt : UserDefinedType [_]) extends CatalystTypeConverter [Any , Any , Any ] {
125
122
override def toCatalystImpl (scalaValue : Any ): Any = udt.serialize(scalaValue)
126
123
override def toScala (catalystValue : Any ): Any = udt.deserialize(catalystValue)
127
124
override def toScalaImpl (row : Row , column : Int ): Any = toScala(row(column))
128
125
}
129
126
130
- // Converter for array, seq, iterables.
127
+ /** Converter for arrays, sequences, and Java iterables. */
131
128
private case class ArrayConverter (
132
- elementType : DataType ) extends CatalystTypeConverter [Any , Seq [Any ]] {
129
+ elementType : DataType ) extends CatalystTypeConverter [Any , Seq [Any ], Seq [ Any ] ] {
133
130
134
131
private [this ] val elementConverter = getConverterForType(elementType)
135
132
@@ -162,8 +159,8 @@ object CatalystTypeConverters {
162
159
163
160
private case class MapConverter (
164
161
keyType : DataType ,
165
- valueType : DataType
166
- ) extends CatalystTypeConverter [Any , Map [Any , Any ]] {
162
+ valueType : DataType )
163
+ extends CatalystTypeConverter [Any , Map [ Any , Any ] , Map [Any , Any ]] {
167
164
168
165
private [this ] val keyConverter = getConverterForType(keyType)
169
166
private [this ] val valueConverter = getConverterForType(valueType)
@@ -200,7 +197,7 @@ object CatalystTypeConverters {
200
197
}
201
198
202
199
private case class StructConverter (
203
- structType : StructType ) extends CatalystTypeConverter [Any , Row ] {
200
+ structType : StructType ) extends CatalystTypeConverter [Any , Row , Row ] {
204
201
205
202
private [this ] val converters = structType.fields.map { f => getConverterForType(f.dataType) }
206
203
@@ -242,7 +239,7 @@ object CatalystTypeConverters {
242
239
override def toScalaImpl (row : Row , column : Int ): Row = toScala(row(column).asInstanceOf [Row ])
243
240
}
244
241
245
- private object StringConverter extends CatalystTypeConverter [Any , Any ] {
242
+ private object StringConverter extends CatalystTypeConverter [Any , String , Any ] {
246
243
override def toCatalystImpl (scalaValue : Any ): UTF8String = scalaValue match {
247
244
case str : String => UTF8String (str)
248
245
case utf8 : UTF8String => utf8
@@ -255,14 +252,14 @@ object CatalystTypeConverters {
255
252
override def toScalaImpl (row : Row , column : Int ): String = row(column).toString
256
253
}
257
254
258
- private object DateConverter extends CatalystTypeConverter [Date , Any ] {
255
+ private object DateConverter extends CatalystTypeConverter [Date , Date , Any ] {
259
256
override def toCatalystImpl (scalaValue : Date ): Int = DateUtils .fromJavaDate(scalaValue)
260
257
override def toScala (catalystValue : Any ): Date =
261
258
if (catalystValue == null ) null else DateUtils .toJavaDate(catalystValue.asInstanceOf [Int ])
262
259
override def toScalaImpl (row : Row , column : Int ): Date = toScala(row.getInt(column))
263
260
}
264
261
265
- private object BigDecimalConverter extends CatalystTypeConverter [Any , Decimal ] {
262
+ private object BigDecimalConverter extends CatalystTypeConverter [Any , JavaBigDecimal , Decimal ] {
266
263
override def toCatalystImpl (scalaValue : Any ): Decimal = scalaValue match {
267
264
case d : BigDecimal => Decimal (d)
268
265
case d : JavaBigDecimal => Decimal (d)
@@ -275,32 +272,46 @@ object CatalystTypeConverters {
275
272
}
276
273
}
277
274
278
- private object BooleanConverter extends PrimitiveCatalystTypeConverter [ Boolean ] {
275
+ private object BooleanConverter extends CatalystTypeConverter [ Boolean , Boolean , Boolean ] {
279
276
override def toScalaImpl (row : Row , column : Int ): Boolean = row.getBoolean(column)
277
+ override def toScala (catalystValue : Boolean ): Boolean = catalystValue
278
+ override protected def toCatalystImpl (scalaValue : Boolean ): Boolean = scalaValue
280
279
}
281
280
282
- private object ByteConverter extends PrimitiveCatalystTypeConverter [ Byte ] {
281
+ private object ByteConverter extends CatalystTypeConverter [ Byte , Byte , Byte ] {
283
282
override def toScalaImpl (row : Row , column : Int ): Byte = row.getByte(column)
283
+ override def toScala (catalystValue : Byte ): Byte = catalystValue
284
+ override protected def toCatalystImpl (scalaValue : Byte ): Byte = scalaValue
284
285
}
285
286
286
- private object ShortConverter extends PrimitiveCatalystTypeConverter [ Short ] {
287
+ private object ShortConverter extends CatalystTypeConverter [ Short , Short , Short ] {
287
288
override def toScalaImpl (row : Row , column : Int ): Short = row.getShort(column)
289
+ override def toScala (catalystValue : Short ): Short = catalystValue
290
+ override protected def toCatalystImpl (scalaValue : Short ): Short = scalaValue
288
291
}
289
292
290
- private object IntConverter extends PrimitiveCatalystTypeConverter [ Int ] {
293
+ private object IntConverter extends CatalystTypeConverter [ Int , Int , Int ] {
291
294
override def toScalaImpl (row : Row , column : Int ): Int = row.getInt(column)
295
+ override def toScala (catalystValue : Int ): Int = catalystValue
296
+ override protected def toCatalystImpl (scalaValue : Int ): Int = scalaValue
292
297
}
293
298
294
- private object LongConverter extends PrimitiveCatalystTypeConverter [ Long ] {
299
+ private object LongConverter extends CatalystTypeConverter [ Long , Long , Long ] {
295
300
override def toScalaImpl (row : Row , column : Int ): Long = row.getLong(column)
301
+ override def toScala (catalystValue : Long ): Long = catalystValue
302
+ override protected def toCatalystImpl (scalaValue : Long ): Long = scalaValue
296
303
}
297
304
298
- private object FloatConverter extends PrimitiveCatalystTypeConverter [ Float ] {
305
+ private object FloatConverter extends CatalystTypeConverter [ Float , Float , Float ] {
299
306
override def toScalaImpl (row : Row , column : Int ): Float = row.getFloat(column)
307
+ override def toScala (catalystValue : Float ): Float = catalystValue
308
+ override protected def toCatalystImpl (scalaValue : Float ): Float = scalaValue
300
309
}
301
310
302
- private object DoubleConverter extends PrimitiveCatalystTypeConverter [ Double ] {
311
+ private object DoubleConverter extends CatalystTypeConverter [ Double , Double , Double ] {
303
312
override def toScalaImpl (row : Row , column : Int ): Double = row.getDouble(column)
313
+ override def toScala (catalystValue : Double ): Double = catalystValue
314
+ override protected def toCatalystImpl (scalaValue : Double ): Double = scalaValue
304
315
}
305
316
306
317
/**
0 commit comments