
[SQL][minor] use stricter type parameter to make it clear that parquet reader returns UnsafeRow #14458
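
The change narrows the element type of the non-vectorized (parquet-mr) read path from InternalRow to UnsafeRow. The rows produced by ParquetRecordMaterializer are already UnsafeRows, so the wider InternalRow type parameter only hid that fact from callers. Below is a minimal, self-contained sketch of why the narrower type parameter helps; the types here are hypothetical stand-ins, not the real Spark or parquet-mr classes:

// Hypothetical stand-in types, defined only to make the sketch compile on its own.
class InternalRow
class UnsafeRow extends InternalRow

trait RecordReader[T] {
  def getCurrentValue: T
}

object StricterTypeParameterSketch {
  def main(args: Array[String]): Unit = {
    // With the wide type parameter the compiler only knows InternalRow,
    // so a cast is needed wherever an UnsafeRow is required downstream.
    val wide: RecordReader[InternalRow] = new RecordReader[InternalRow] {
      def getCurrentValue: InternalRow = new UnsafeRow
    }
    val casted: UnsafeRow = wide.getCurrentValue.asInstanceOf[UnsafeRow]

    // With the stricter type parameter the cast disappears.
    val strict: RecordReader[UnsafeRow] = new RecordReader[UnsafeRow] {
      def getCurrentValue: UnsafeRow = new UnsafeRow
    }
    val direct: UnsafeRow = strict.getCurrentValue

    println(s"${casted.getClass.getSimpleName}, ${direct.getClass.getSimpleName}")
  }
}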

Closed · wants to merge 1 commit

@@ -370,11 +370,11 @@ private[sql] class ParquetFileFormat
       logDebug(s"Falling back to parquet-mr")
       val reader = pushed match {
         case Some(filter) =>
-          new ParquetRecordReader[InternalRow](
+          new ParquetRecordReader[UnsafeRow](
             new ParquetReadSupport,
             FilterCompat.get(filter, null))
         case _ =>
-          new ParquetRecordReader[InternalRow](new ParquetReadSupport)
+          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport)
       }
       reader.initialize(split, hadoopAttemptContext)
       reader
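
The reader constructed here is driven through the standard Hadoop RecordReader API that ParquetRecordReader implements (nextKeyValue, getCurrentValue, close); with the stricter type parameter, getCurrentValue is statically an UnsafeRow. A rough sketch of a consuming loop, assuming an already initialized reader is passed in; the helper name and the byte-counting logic are illustrative, not code from this PR:

import org.apache.parquet.hadoop.ParquetRecordReader
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

object ReaderDrainSketch {
  // Hypothetical helper, not part of this PR: drains an already initialized
  // reader and sums the byte size of every row it produces.
  def totalRowBytes(reader: ParquetRecordReader[UnsafeRow]): Long = {
    var total = 0L
    try {
      while (reader.nextKeyValue()) {
        // Statically an UnsafeRow now, no asInstanceOf needed.
        val row: UnsafeRow = reader.getCurrentValue
        total += row.getSizeInBytes
      }
    } finally {
      reader.close()
    }
    total
  }
}
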
@@ -29,12 +29,12 @@ import org.apache.parquet.schema._
 import org.apache.parquet.schema.Type.Repetition

 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.types._

 /**
  * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
- * [[InternalRow]]s.
+ * [[UnsafeRow]]s.
  *
  * The API interface of [[ReadSupport]] is a little bit over complicated because of historical
  * reasons. In older versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] need to be
@@ -48,7 +48,7 @@ import org.apache.spark.sql.types._
  * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]]
  * to [[prepareForRead()]], but use a private `var` for simplicity.
  */
-private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with Logging {
+private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Logging {
   private var catalystRequestedSchema: StructType = _

   /**
@@ -72,13 +72,13 @@ private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with
   /**
    * Called on executor side after [[init()]], before instantiating actual Parquet record readers.
    * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
-   * records to Catalyst [[InternalRow]]s.
+   * records to Catalyst [[UnsafeRow]]s.
    */
   override def prepareForRead(
       conf: Configuration,
       keyValueMetaData: JMap[String, String],
       fileSchema: MessageType,
-      readContext: ReadContext): RecordMaterializer[InternalRow] = {
+      readContext: ReadContext): RecordMaterializer[UnsafeRow] = {
     log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
     val parquetRequestedSchema = readContext.getRequestedSchema

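
The Scaladoc above describes the shape of the ReadSupport contract: init() picks the requested schema, prepareForRead() builds the RecordMaterializer, and the requested Catalyst schema is carried between the two in a private var rather than being round-tripped through the ReadContext. A simplified, self-contained sketch of that pattern against the parquet-mr API follows; the class name, the String record type, and the single-UTF8-column materializer are hypothetical stand-ins, not the real ParquetReadSupport:

import java.util.{Map => JMap}

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter, RecordMaterializer}
import org.apache.parquet.schema.MessageType

class SketchReadSupport extends ReadSupport[String] {
  // Remembered between init() and prepareForRead() instead of being passed
  // through the ReadContext, mirroring the "private var" note in the Scaladoc.
  private var requestedSchema: MessageType = _

  override def init(context: InitContext): ReadContext = {
    requestedSchema = context.getFileSchema
    new ReadContext(requestedSchema)
  }

  // Called after init(), before the actual record readers are instantiated.
  override def prepareForRead(
      conf: Configuration,
      keyValueMetaData: JMap[String, String],
      fileSchema: MessageType,
      readContext: ReadContext): RecordMaterializer[String] = {
    new RecordMaterializer[String] {
      private var current: String = ""
      private val root = new GroupConverter {
        // Assumes a single UTF8 column, purely to keep the sketch compileable.
        override def getConverter(fieldIndex: Int): Converter = new PrimitiveConverter {
          override def addBinary(value: Binary): Unit = { current = value.toStringUsingUTF8 }
        }
        override def start(): Unit = { current = "" }
        override def end(): Unit = ()
      }
      override def getCurrentRecord: String = current
      override def getRootConverter: GroupConverter = root
    }
  }
}
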
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
 import org.apache.parquet.schema.MessageType

-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.types.StructType

 /**
@@ -32,12 +32,12 @@ import org.apache.spark.sql.types.StructType
  */
 private[parquet] class ParquetRecordMaterializer(
     parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter)
-  extends RecordMaterializer[InternalRow] {
+  extends RecordMaterializer[UnsafeRow] {

   private val rootConverter =
     new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater)

-  override def getCurrentRecord: InternalRow = rootConverter.currentRecord
+  override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord

   override def getRootConverter: GroupConverter = rootConverter
 }