[SPARK-17608][SPARKR]: Long type has incorrect serialization/deserialization #17640

Closed
wants to merge 3 commits into from
1 change: 1 addition & 0 deletions R/pkg/R/deserialize.R
@@ -55,6 +55,7 @@ readTypedObject <- function(con, type) {
"l" = readList(con),
"e" = readEnv(con),
"s" = readStruct(con),
"B" = readDouble(con),
"n" = NULL,
"j" = getJobj(readString(con)),
stop(paste("Unsupported type for deserialization", type)))
2 changes: 2 additions & 0 deletions R/pkg/R/serialize.R
@@ -83,6 +83,7 @@ writeObject <- function(con, object, writeType = TRUE) {
Date = writeDate(con, object),
POSIXlt = writeTime(con, object),
POSIXct = writeTime(con, object),
bigint = writeDouble(con, object),
Member:

And another thing: there is no bigint in R, so I'm not sure how we would hit this path.

Contributor Author:

When specifying a schema with bigint, we will hit the bigint path. Without this change, it throws a type-mismatch error. But as you said, we can't specify a bigint type in the R console.

Contributor Author:

If R doesn't have a bigint type, we should remove all bigint-related logic. I don't know the history of the bigint mapping in the types.R file. Why should we have it, since every big number is numeric (a Double in the backend)?

Contributor Author:

@felixcheung Any thoughts? Thanks!

Member (@felixcheung, Apr 28, 2017):

If you are referring to https://github.com/apache/spark/blob/master/R/pkg/R/types.R#L25, like it says:

names(PRIMITIVE_TYPES) are Scala types whereas values are equivalent R types.

So bigint there is a Scala type, not an R type.
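
For reference, a minimal sketch of the kind of mapping being described (an illustrative subset only; see types.R for the real, complete list, and note the exact entries there may differ):

```r
# Illustrative subset of a PRIMITIVE_TYPES-style mapping: names are Scala/SQL
# type names used in schemas, values are the R classes they map to.
PRIMITIVE_TYPES <- c(
  "smallint" = "integer",
  "bigint"   = "numeric",   # base R has no 64-bit integer, so bigint maps to double
  "double"   = "numeric",
  "string"   = "character",
  "boolean"  = "logical"
)

PRIMITIVE_TYPES[["bigint"]]  # "numeric"
```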

Contributor Author:

I see. But as you mentioned, we don't know how to trigger the write path on the R side, because both bigint and double are numeric. I think we can just remove the test on the R side.

Member:

I think this is different, though. PRIMITIVE_TYPES is used when you create a schema with structField in R. In that case you can definitely define a column as bigint and then pass an R numeric value to it.
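
A minimal sketch of the scenario described above, assuming a running SparkR session (the column names and values are only illustrative):

```r
# Declare a column as bigint in the schema, then pass plain R numerics to it.
library(SparkR)
sparkR.session()

schema <- structType(structField("id", "bigint"),
                     structField("name", "string"))
df <- createDataFrame(list(list(1380742793415240, "a"),
                           list(1380742793415241, "b")),
                      schema = schema)

# The bigint column round-trips through the backend and comes back to R as
# numeric (double), since base R has no 64-bit integer type.
collect(df)$id
```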

Contributor Author:

When using createDataFrame, R uses serialize to send data to the backend. When taking an action, say collect, the Scala-side logic refers to the schema field and calls readTypedObjects, where the newly added read logic kicks in. When the data comes back to the R side, the newly added write logic kicks in, and the R side can interpret it thanks to the R-side read logic. It seems that the write logic on the R side is not called, because we don't have a specific bigint type in R. Right?

Contributor Author:

For completeness, I think we can keep the write logic on the R side.

stop(paste("Unsupported type for serialization", type)))
}

@@ -157,6 +158,7 @@ writeType <- function(con, class) {
Date = "D",
POSIXlt = "t",
POSIXct = "t",
bigint = "B",
stop(paste("Unsupported type for serialization", class)))
writeBin(charToRaw(type), con)
}
12 changes: 11 additions & 1 deletion R/pkg/inst/tests/testthat/test_Serde.R
@@ -28,6 +28,10 @@ test_that("SerDe of primitive types", {
expect_equal(x, 1)
expect_equal(class(x), "numeric")

x <- callJStatic("SparkRHandler", "echo", 1380742793415240)
Contributor Author:

I don't know how to specify a value in the R console that enforces the bigint type.

Contributor Author:

I did some Google searching. R can't specify a bigint type, so we can't directly test it.

We can remove the tests above, as we added schema tests and Scala API tests.
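
A quick base-R illustration of that limitation (plain R, no Spark involved):

```r
# Base R integers are 32-bit, so anything beyond .Machine$integer.max cannot be
# represented as an integer and is stored as a double (class "numeric") instead.
.Machine$integer.max          # 2147483647
class(1380742793415240)       # "numeric" -- stored as a double
as.integer(1380742793415240)  # NA, with a coercion warning
```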

expect_equal(x, 1380742793415240)
expect_equal(class(x), "numeric")

x <- callJStatic("SparkRHandler", "echo", TRUE)
expect_true(x)
expect_equal(class(x), "logical")
@@ -43,6 +47,11 @@ test_that("SerDe of list of primitive types", {
expect_equal(x, y)
expect_equal(class(y[[1]]), "integer")

x <- list(1380742793415240, 13807427934152401, 13807427934152402)
y <- callJStatic("SparkRHandler", "echo", x)
expect_equal(x, y)
expect_equal(class(y[[1]]), "numeric")

x <- list(1, 2, 3)
y <- callJStatic("SparkRHandler", "echo", x)
expect_equal(x, y)
@@ -66,7 +75,8 @@ test_that("SerDe of list of primitive types", {

test_that("SerDe of list of lists", {
x <- list(list(1L, 2L, 3L), list(1, 2, 3),
list(TRUE, FALSE), list("a", "b", "c"))
list(TRUE, FALSE), list("a", "b", "c"),
list(1380742793415240, 1380742793415240))
y <- callJStatic("SparkRHandler", "echo", x)
expect_equal(x, y)

17 changes: 17 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -3188,6 +3188,23 @@ test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", {
expect_equal(dbs[[1]], "default")
})

test_that("dapply with bigint type", {
df <- createDataFrame(
list(list(1380742793415240, 1, "1"), list(1380742793415240, 2, "2"),
list(1380742793415240, 3, "3")), c("a", "b", "c"))
schema <- structType(structField("a", "bigint"), structField("b", "bigint"),
Contributor Author:

This one tests bigint

Member (@felixcheung, Apr 26, 2017):

Actually, I'm not sure. Walking through the code, createDataFrame calls parallelize, which eventually calls R's serialize. Since these big values are actually numeric, not integer, serialize writes them out that way.

structField("c", "string"), structField("d", "bigint"))
df1 <- dapply(
df,
function(x) {
y <- x[x[1] > 1, ]
y <- cbind(y, y[1] + 1L)
},
schema)
result <- collect(df1)
expect_equal(result$a[1], 1380742793415240)
})

test_that("catalog APIs, listTables, listColumns, listFunctions", {
tb <- listTables()
count <- count(tables())
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -84,6 +84,7 @@ private[spark] object SerDe {
case 'l' => readList(dis, jvmObjectTracker)
case 'D' => readDate(dis)
case 't' => readTime(dis)
case 'B' => new java.lang.Double(readDouble(dis))
case 'j' => jvmObjectTracker(JVMObjectId(readString(dis)))
case _ =>
if (sqlReadObject == null) {
@@ -198,6 +199,7 @@ private[spark] object SerDe {
case 'b' => readBooleanArr(dis)
case 'j' => readStringArr(dis).map(x => jvmObjectTracker(JVMObjectId(x)))
case 'r' => readBytesArr(dis)
case 'B' => readDoubleArr(dis)
case 'a' =>
val len = readInt(dis)
(0 until len).map(_ => readArray(dis, jvmObjectTracker)).toArray
@@ -278,6 +280,7 @@ private[spark] object SerDe {
case "list" => dos.writeByte('l')
case "map" => dos.writeByte('e')
case "jobj" => dos.writeByte('j')
case "bigint" => dos.writeByte('B')
case _ => throw new IllegalArgumentException(s"Invalid type $typeStr")
}
}
19 changes: 19 additions & 0 deletions core/src/test/scala/org/apache/spark/api/r/RBackendSuite.scala
@@ -17,6 +17,8 @@

package org.apache.spark.api.r

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.apache.spark.SparkFunSuite

class RBackendSuite extends SparkFunSuite {
@@ -28,4 +30,21 @@ class RBackendSuite extends SparkFunSuite {
assert(tracker.get(id) === None)
assert(tracker.size === 0)
}

test("read and write bigint in the buffer") {
val bos = new ByteArrayOutputStream()
val dos = new DataOutputStream(bos)
val tracker = new JVMObjectTracker
SerDe.writeObject(dos, 1380742793415240L.asInstanceOf[Object],
tracker)
val buf = bos.toByteArray
val bis = new ByteArrayInputStream(buf)
val dis = new DataInputStream(bis)
val data = SerDe.readObject(dis, tracker)
assert(data.asInstanceOf[Double] === 1380742793415240L)
bos.close()
bis.close()
dos.close()
dis.close()
}
}