Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/main/scala/za/co/absa/abris/avro/functions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package za.co.absa.abris.avro
import org.apache.spark.sql.Column
import za.co.absa.abris.avro.sql.{AvroDataToCatalyst, CatalystDataToAvro}
import za.co.absa.abris.config.{AbrisConfig, FromAvroConfig, ToAvroConfig}
import za.co.absa.abris.utils.SparkColumnCompat.{col2expr, expr2col}


// scalastyle:off: object.name
Expand All @@ -35,8 +36,8 @@ object functions {
def to_avro(column: Column, config: ToAvroConfig): Column = {
  // Fail fast on an inconsistent configuration before any expression is built.
  config.validate()

  // Build the Avro-serializing Catalyst expression. Column <-> Expression
  // conversions go through SparkColumnCompat so the same source supports
  // both Spark 3.x (direct Column.expr / new Column(expr)) and Spark 4.x
  // (ColumnNode-based Column). The stale pre-change lines that the diff
  // rendering left interleaved here (new Column(... column.expr ...)) are
  // removed: only the compat-layer version is valid code.
  expr2col(CatalystDataToAvro(
    col2expr(column),
    config.abrisConfig()
  ))
}
Expand Down Expand Up @@ -64,8 +65,8 @@ object functions {
def from_avro(column: Column, config: FromAvroConfig): Column = {
config.validate()

new Column(AvroDataToCatalyst(
column.expr,
expr2col(AvroDataToCatalyst(
col2expr(column),
config.abrisConfig(),
config.schemaRegistryConf()
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.functions.struct
import org.apache.spark.sql.types.StructType

import java.nio.charset.Charset
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -80,10 +80,8 @@ object AvroSchemaUtils {
recordName: String,
nameSpace: String
): Schema = {
val allColumns = struct(columnNames.map(dataFrame.col): _*)
val expression = allColumns.expr

SchemaConverters.toAvroType(expression.dataType, expression.nullable, recordName, nameSpace)
val structType = StructType(columnNames.map(dataFrame.schema(_)))
SchemaConverters.toAvroType(structType, nullable = false, recordName, nameSpace)
}

def toAvroSchema(
Expand Down
77 changes: 77 additions & 0 deletions src/main/scala/za/co/absa/abris/utils/SparkColumnCompat.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.abris.utils

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.Expression

/**
* Compatibility layer for Column <-> Expression conversions across Spark versions.
*
* In Spark 3.x, Column has a direct `.expr` property and a constructor taking an Expression.
* In Spark 4.x, Column wraps a ColumnNode AST and conversions go through
* `org.apache.spark.sql.classic.ColumnConversions` and `ExpressionUtils`.
*
* This object uses reflection to support both versions from a single codebase,
* following the same pattern used in AbrisAvroDeserializer.
*/
object SparkColumnCompat {

  /**
   * Reflectively resolves a Scala `object` instance (via its static `MODULE$` field)
   * together with a one-argument method declared on it.
   *
   * @param className  fully qualified name of the object's class (with trailing `$`)
   * @param methodName name of the method to look up
   * @param paramType  class of the method's single parameter
   * @return the module instance paired with the resolved method
   */
  private def objectMethod(
    className: String,
    methodName: String,
    paramType: Class[_]
  ): (AnyRef, java.lang.reflect.Method) = {
    val moduleClass = Class.forName(className)
    val module = moduleClass.getField("MODULE$").get(null)
    (module, moduleClass.getMethod(methodName, paramType))
  }

  /**
   * Converts a Column into its underlying Catalyst Expression; replacement for `column.expr`.
   *
   * Resolved once, lazily: when the Spark 3.x `Column.expr` accessor exists it is used
   * directly; otherwise the Spark 4.x `ColumnConversions.expression` helper is invoked
   * through reflection.
   */
  lazy val col2expr: Column => Expression = {
    // Probe for the Spark 3.x direct accessor; absence signals Spark 4.x.
    val directExpr =
      try Some(classOf[Column].getMethod("expr"))
      catch { case _: NoSuchMethodException => None }

    directExpr match {
      case Some(exprAccessor) =>
        column => exprAccessor.invoke(column).asInstanceOf[Expression]
      case None =>
        val (conversions, expression) = objectMethod(
          "org.apache.spark.sql.classic.ColumnConversions$", "expression", classOf[Column])
        column => expression.invoke(conversions, column).asInstanceOf[Expression]
    }
  }

  /**
   * Wraps a Catalyst Expression in a Column; replacement for `new Column(expr)`.
   *
   * Resolved once, lazily: when the Spark 3.x `Column(Expression)` constructor exists
   * it is used directly; otherwise the Spark 4.x `ExpressionUtils.column` helper is
   * invoked through reflection.
   */
  lazy val expr2col: Expression => Column = {
    // Probe for the Spark 3.x expression constructor; absence signals Spark 4.x.
    val directCtor =
      try Some(classOf[Column].getConstructor(classOf[Expression]))
      catch { case _: NoSuchMethodException => None }

    directCtor match {
      case Some(exprCtor) =>
        expression => exprCtor.newInstance(expression)
      case None =>
        val (utils, columnOf) = objectMethod(
          "org.apache.spark.sql.classic.ExpressionUtils$", "column", classOf[Expression])
        expression => columnOf.invoke(utils, expression).asInstanceOf[Column]
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import za.co.absa.abris.avro.functions._
import za.co.absa.abris.avro.utils.AvroSchemaEncoder
import za.co.absa.abris.config.{AbrisConfig, FromAvroConfig}
import za.co.absa.abris.examples.data.generation.TestSchemas
import za.co.absa.abris.utils.SparkColumnCompat.col2expr

import java.util.Collections
import java.nio.ByteBuffer
Expand Down Expand Up @@ -66,7 +67,7 @@ class AvroDataToCatalystSpec extends AnyFlatSpec with Matchers with BeforeAndAft
))

val column = from_avro(col("avroBytes"), fromAvroConfig)
column.expr.toString() should not include sensitiveData
col2expr(column).toString() should not include sensitiveData
}

it should "use the default schema converter by default" in {
Expand All @@ -84,7 +85,7 @@ class AvroDataToCatalystSpec extends AnyFlatSpec with Matchers with BeforeAndAft
))

val column = from_avro(col("avroBytes"), fromAvroConfig)
column.expr.dataType shouldBe expectedDataType
col2expr(column).dataType shouldBe expectedDataType
}

it should "use a custom schema converter identified by the short name" in {
Expand All @@ -99,7 +100,7 @@ class AvroDataToCatalystSpec extends AnyFlatSpec with Matchers with BeforeAndAft
.withSchemaConverter(DummySchemaConverter.name)

val column = from_avro(col("avroBytes"), fromAvroConfig)
column.expr.dataType shouldBe DummySchemaConverter.dataType
col2expr(column).dataType shouldBe DummySchemaConverter.dataType
}

it should "use a custom schema converter identified by the fully qualified name" in {
Expand All @@ -114,7 +115,7 @@ class AvroDataToCatalystSpec extends AnyFlatSpec with Matchers with BeforeAndAft
.withSchemaConverter("za.co.absa.abris.avro.sql.DummySchemaConverter")

val column = from_avro(col("avroBytes"), fromAvroConfig)
column.expr.dataType shouldBe DummySchemaConverter.dataType
col2expr(column).dataType shouldBe DummySchemaConverter.dataType
}

it should "throw an error if the specified custom schema converter does not exist" in {
Expand All @@ -128,14 +129,14 @@ class AvroDataToCatalystSpec extends AnyFlatSpec with Matchers with BeforeAndAft
))
.withSchemaConverter("nonexistent")

val ex = intercept[ClassNotFoundException](from_avro(col("avroBytes"), fromAvroConfig).expr.dataType)
val ex = intercept[ClassNotFoundException](col2expr(from_avro(col("avroBytes"), fromAvroConfig)).dataType)
ex.getMessage should include ("nonexistent")
}

it should "be serializable" in {
val schemaString = TestSchemas.NATIVE_SIMPLE_NESTED_SCHEMA
val config = FromAvroConfig().withReaderSchema(schemaString)
val avroDataToCatalyst = from_avro(col("col"), config).expr
val avroDataToCatalyst = col2expr(from_avro(col("col"), config))

val javaSerializer = new JavaSerializer(new SparkConf())
javaSerializer.newInstance().serialize(avroDataToCatalyst)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.abris.avro.functions._
import za.co.absa.abris.config.ToAvroConfig
import za.co.absa.abris.utils.SparkColumnCompat.col2expr

class CatalystDataToAvroSpec extends AnyFlatSpec with Matchers with BeforeAndAfterEach {
it should "be serializable" in {
Expand All @@ -36,7 +37,7 @@ class CatalystDataToAvroSpec extends AnyFlatSpec with Matchers with BeforeAndAft
.endRecord()
.toString
val config = ToAvroConfig().withSchema(schema)
val catalystDataToAvro = to_avro(col("col"), config).expr
val catalystDataToAvro = col2expr(to_avro(col("col"), config))

val javaSerializer = new JavaSerializer(new SparkConf())
javaSerializer.newInstance().serialize(catalystDataToAvro)
Expand Down