
[SPARK-43646][CONNECT][TESTS] Make both SBT and Maven use spark-proto uber jar to test the connect module #42236

Closed · wants to merge 15 commits
@@ -3232,14 +3232,15 @@ class PlanGenerationTestSuite
"connect/common/src/test/resources/protobuf-tests/common.desc"

test("from_protobuf messageClassName") {
Contributor Author:
If we deem this plan too intricate, we could delete or ignore the test cases `from_protobuf messageClassName` and `from_protobuf messageClassName options` to temporarily abandon this testing scenario.
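(If we did ignore them, ScalaTest makes it a one-word change; a minimal sketch against this suite, shown only for illustration:)

    // Swapping `test` for `ignore` keeps the case in the suite but skips it,
    // so the scenario is easy to re-enable later.
    ignore("from_protobuf messageClassName") {
      binary.select(
        pbFn.from_protobuf(fn.col("bytes"), "org.apache.spark.sql.protobuf.protos.TestProtoObj"))
    }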

Contributor:

This is great! Thank you.

binary.select(pbFn.from_protobuf(fn.col("bytes"), classOf[StorageLevel].getName))
binary.select(
pbFn.from_protobuf(fn.col("bytes"), "org.apache.spark.sql.protobuf.protos.TestProtoObj"))
Contributor:

In order to use this SQL function, users have no choice but to relocate the content of the generated Java PB files used in their business code according to Spark's project rules (replace all `com.google.protobuf.` with `org.sparkproject.spark_protobuf.protobuf.`).

Moving the discussion about "why do users need to shade their java classes?" here in a comment thread. cc: @LuciferYang, @advancedxy, @HyukjinKwon.

There is no other option. Spark at runtime includes the unshaded Protobuf library (2.5.x) on its classpath. That was the case as of a few months ago; I don't know if it has changed. It has been this way for years. There is no way to support protobuf Java classes for current versions of the Protobuf library without shading.

I agree it is not convenient for the customers. That is why the descriptor file is documented as the primary API (both for Scala and Python).
We will soon support Python classes, which will help Python customers.
FWIW, I have a small GitHub project to help customers create shaded Java classes: https://github.com/rangadi/shaded-protobuf-classes
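To make the relocation concrete, here is a minimal sketch of what a user's build has to do today, using sbt-assembly shade rules (an equivalent maven-shade-plugin relocation works for Maven builds; the rule below is illustrative and not part of this PR):

    // build.sbt of a hypothetical user project; the sbt-assembly plugin is assumed.
    // Rewrites references in the user's generated protobuf Java classes to the
    // package prefix used by Spark's shaded spark-protobuf jar.
    assembly / assemblyShadeRules := Seq(
      ShadeRule.rename(
        "com.google.protobuf.**" -> "org.sparkproject.spark_protobuf.protobuf.@1"
      ).inAll
    )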

Contributor Author:

After #41153, protobuf 2.5 is no longer on the classpath :)

Contributor Author:

@rangadi I'm sorry that I neglected the historical context of adding these functions. Spark 3.5 no longer depends on Protobuf 2.5, and these functions are currently marked as @Experimental. In this situation, is it possible for us to make some changes?

Contributor:

That requires removing the shading of protobuf classes, and it needs more discussion. In general, including non-shaded jars is problematic (that is how the original protobuf 2.5 jar remained unchanged for so many years).

Contributor:

> There is no other option. Spark at runtime includes unshaded Protobuf library (2.5.x) in its class path.

Yeah, I noticed there's a protobuf-2.5 jar in the spark-3.4 client tar.

For Spark 3.5, do you think it's better to publish both shaded and unshaded versions of the spark-protobuf jar? If both were published, end customers could choose which jar to use, and we could document it clearly.

}

test("from_protobuf messageClassName options") {
binary.select(
pbFn.from_protobuf(
fn.col("bytes"),
classOf[StorageLevel].getName,
"org.apache.spark.sql.protobuf.protos.TestProtoObj",
Map("recursive.fields.max.depth" -> "2").asJava))
}

@@ -1,2 +1,2 @@
Project [from_protobuf(bytes#0, org.apache.spark.connect.proto.StorageLevel, None) AS from_protobuf(bytes)#0]
Project [from_protobuf(bytes#0, org.apache.spark.sql.protobuf.protos.TestProtoObj, None) AS from_protobuf(bytes)#0]
+- LocalRelation <empty>, [id#0L, bytes#0]
@@ -1,2 +1,2 @@
Project [from_protobuf(bytes#0, org.apache.spark.connect.proto.StorageLevel, None, (recursive.fields.max.depth,2)) AS from_protobuf(bytes)#0]
Project [from_protobuf(bytes#0, org.apache.spark.sql.protobuf.protos.TestProtoObj, None, (recursive.fields.max.depth,2)) AS from_protobuf(bytes)#0]
+- LocalRelation <empty>, [id#0L, bytes#0]
@@ -20,7 +20,7 @@
}
}, {
"literal": {
"string": "org.apache.spark.connect.proto.StorageLevel"
"string": "org.apache.spark.sql.protobuf.protos.TestProtoObj"
}
}]
}
Binary file not shown.
@@ -20,7 +20,7 @@
}
}, {
"literal": {
"string": "org.apache.spark.connect.proto.StorageLevel"
"string": "org.apache.spark.sql.protobuf.protos.TestProtoObj"
}
}, {
"unresolvedFunction": {
Binary file not shown.
88 changes: 88 additions & 0 deletions connector/connect/server/pom.xml
@@ -248,6 +248,13 @@
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
@@ -403,6 +410,87 @@
</transformers>
</configuration>
</plugin>
<!--
SPARK-43646: In order for `ProtoToParsedPlanTestSuite` to successfully test `from_protobuf_messageClassName`
and `from_protobuf_messageClassName_options`, `maven-antrun-plugin` is used to replace all
`com.google.protobuf.` with `org.sparkproject.spark_protobuf.protobuf.` in the Java files
generated by `protobuf-maven-plugin`.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<phase>process-test-sources</phase>
<configuration>
<target>
<replace dir="${project.basedir}/target/generated-test-sources"
includes="**/*.java"
token="com.google.protobuf."
value="org.sparkproject.spark_protobuf.protobuf."/>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>default-protoc</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
<!-- Add protobuf-maven-plugin and provide ScalaPB as a code generation plugin -->
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<protoTestSourceRoot>src/test/protobuf</protoTestSourceRoot>
</configuration>
<executions>
<execution>
<goals>
<goal>test-compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>user-defined-protoc</id>
<properties>
<spark.protoc.executable.path>${env.SPARK_PROTOC_EXEC_PATH}</spark.protoc.executable.path>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocExecutable>${spark.protoc.executable.path}</protocExecutable>
<protoTestSourceRoot>src/test/protobuf</protoTestSourceRoot>
</configuration>
<executions>
<execution>
<goals>
<goal>test-compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
27 changes: 27 additions & 0 deletions connector/connect/server/src/test/protobuf/test.proto
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

syntax = "proto3";
package org.apache.spark.sql.protobuf.protos;

option java_multiple_files = true;
option java_outer_classname = "TestProto";

message TestProtoObj {
int64 v1 = 1;
int32 v2 = 2;
}
55 changes: 53 additions & 2 deletions project/SparkBuild.scala
@@ -17,7 +17,7 @@

import java.io._
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
import java.nio.file.{Files, StandardOpenOption}
import java.util.Locale

import scala.io.Source
@@ -765,7 +765,13 @@ object SparkConnectCommon {
object SparkConnect {
import BuildCommons.protoVersion

val rewriteJavaFile = TaskKey[Unit]("rewriteJavaFile",
"Rewrite the generated Java PB files.")
val genPBAndRewriteJavaFile = TaskKey[Unit]("genPBAndRewriteJavaFile",
"Generate Java PB files and overwrite their contents.")

lazy val settings = Seq(
PB.protocVersion := BuildCommons.protoVersion,
// For some reason the resolution from the imported Maven build does not work for some
// of these dependendencies that we need to shade later on.
libraryDependencies ++= {
@@ -796,6 +802,42 @@ object SparkConnect {
)
},

// SPARK-43646: The following 3 statements make the connect module use the
// spark-protobuf assembly jar when compiling and testing with SBT, which keeps
// the behavior consistent with Maven testing.
(Test / unmanagedJars) += (LocalProject("protobuf") / assembly).value,
(Test / fullClasspath) :=
(Test / fullClasspath).value.filterNot { f => f.toString.contains("spark-protobuf") },
(Test / fullClasspath) += (LocalProject("protobuf") / assembly).value,

(Test / PB.protoSources) += (Test / sourceDirectory).value / "resources" / "protobuf",

(Test / PB.targets) := Seq(
PB.gens.java -> target.value / "generated-test-sources",
),

// SPARK-43646: Create a custom task to replace all `com.google.protobuf.` with
// `org.sparkproject.spark_protobuf.protobuf.` in the generated Java PB files.
// This is to generate Java files that can be used to test spark-protobuf functions
// in `ProtoToParsedPlanTestSuite`.
rewriteJavaFile := {
val protobufDir = target.value / "generated-test-sources" / "org" / "apache" / "spark" / "sql" / "protobuf" / "protos"
protobufDir.listFiles().foreach { f =>
if (f.getName.endsWith(".java")) {
val contents = Files.readAllLines(f.toPath, UTF_8)
val replaced = contents.asScala.map { line =>
line.replaceAll("com.google.protobuf.", "org.sparkproject.spark_protobuf.protobuf.")
}
Files.write(f.toPath, replaced.asJava, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE)
}
}
},
// SPARK-43646: `genPBAndRewriteJavaFile` is used to specify the execution order of `PB.generate`
// and `rewriteJavaFile`, and makes `Test / compile` dependent on `genPBAndRewriteJavaFile`
// being executed first.
genPBAndRewriteJavaFile := Def.sequential(Test / PB.generate, rewriteJavaFile).value,
(Test / compile) := (Test / compile).dependsOn(genPBAndRewriteJavaFile).value,

(assembly / test) := { },

(assembly / logLevel) := Level.Info,
@@ -841,7 +883,16 @@
case m if m.toLowerCase(Locale.ROOT).endsWith(".proto") => MergeStrategy.discard
case _ => MergeStrategy.first
}
)
) ++ {
val sparkProtocExecPath = sys.props.get("spark.protoc.executable.path")
if (sparkProtocExecPath.isDefined) {
Seq(
PB.protocExecutable := file(sparkProtocExecPath.get)
)
} else {
Seq.empty
}
}
}

object SparkConnectClient {