Skip to content

Commit 530efe3

Browse files
committed
[SPARK-7911] [MLLIB] A workaround for VectorUDT serialize (or deserialize) being called multiple times
~~A PythonUDT shouldn't be serialized into external Scala types in PythonRDD. I'm not sure whether this should fix one of the bugs related to SQL UDT/UDF in PySpark.~~ The fix above didn't work, so I added a workaround for this: if a Python UDF is applied to a Python UDT, this will pass the Python SQL types as inputs. Still incorrect, but at least it doesn't throw exceptions on the Scala side. davies harsha2010 Author: Xiangrui Meng <meng@databricks.com> Closes apache#6442 from mengxr/SPARK-7903 and squashes the following commits: c257d2a [Xiangrui Meng] add a workaround for VectorUDT
1 parent 000df2f commit 530efe3

File tree

1 file changed

+14
-5
lines changed

1 file changed

+14
-5
lines changed

mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -176,27 +176,31 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
176176
}
177177

178178
override def serialize(obj: Any): Row = {
179-
val row = new GenericMutableRow(4)
180179
obj match {
181180
case SparseVector(size, indices, values) =>
181+
val row = new GenericMutableRow(4)
182182
row.setByte(0, 0)
183183
row.setInt(1, size)
184184
row.update(2, indices.toSeq)
185185
row.update(3, values.toSeq)
186+
row
186187
case DenseVector(values) =>
188+
val row = new GenericMutableRow(4)
187189
row.setByte(0, 1)
188190
row.setNullAt(1)
189191
row.setNullAt(2)
190192
row.update(3, values.toSeq)
193+
row
194+
// TODO: There are bugs in UDT serialization because we don't have a clear separation between
195+
// TODO: internal SQL types and language specific types (including UDT). UDT serialize and
196+
// TODO: deserialize may get called twice. See SPARK-7186.
197+
case row: Row =>
198+
row
191199
}
192-
row
193200
}
194201

195202
override def deserialize(datum: Any): Vector = {
196203
datum match {
197-
// TODO: something wrong with UDT serialization
198-
case v: Vector =>
199-
v
200204
case row: Row =>
201205
require(row.length == 4,
202206
s"VectorUDT.deserialize given row with length ${row.length} but requires length == 4")
@@ -211,6 +215,11 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
211215
val values = row.getAs[Iterable[Double]](3).toArray
212216
new DenseVector(values)
213217
}
218+
// TODO: There are bugs in UDT serialization because we don't have a clear separation between
219+
// TODO: internal SQL types and language specific types (including UDT). UDT serialize and
220+
// TODO: deserialize may get called twice. See SPARK-7186.
221+
case v: Vector =>
222+
v
214223
}
215224
}
216225

0 commit comments

Comments (0)