[SPARK-17925][SQL] Break fileSourceInterfaces.scala into multiple pieces #15473

Closed · wants to merge 3 commits
@@ -20,152 +20,15 @@ package org.apache.spark.sql.execution.datasources
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, Filter}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

/**
* ::Experimental::
* A factory that produces [[OutputWriter]]s. A new [[OutputWriterFactory]] is created on driver
* side for each write job issued when writing to a [[HadoopFsRelation]], and then gets serialized
* to executor side to create actual [[OutputWriter]]s on the fly.
*
* @since 1.4.0
*/
@Experimental
abstract class OutputWriterFactory extends Serializable {
/**
* When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side
* to instantiate new [[OutputWriter]]s.
*
* @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that
* this may not point to the final output file. For example, `FileOutputFormat` writes to
temporary directories and then merges written files back to the final destination. In
* this case, `path` points to a temporary output file under the temporary directory.
* @param dataSchema Schema of the rows to be written. Partition columns are not included in the
* schema if the relation being written is partitioned.
* @param context The Hadoop MapReduce task context.
* @since 1.4.0
*/
def newInstance(
path: String,
bucketId: Option[Int], // TODO: This doesn't belong here...
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter

/**
* Returns a new instance of [[OutputWriter]] that will write data to the given path.
* This method gets called by each task on the executor side to write [[InternalRow]]s to
* format-specific files. Compared to the other `newInstance()`, this is a newer API that
* passes only the path that the writer must write to. The writer must write to the exact path
* and not modify it (do not add subdirectories, extensions, etc.). All other
* file-format-specific information needed to create the writer must be passed
* through the [[OutputWriterFactory]] implementation.
* @since 2.0.0
*/
def newWriter(path: String): OutputWriter = {
throw new UnsupportedOperationException("newInstance with just path not supported")
}
}

/**
* ::Experimental::
* [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting rows to the
* underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
* An [[OutputWriter]] instance is created and initialized when a new output file is opened on
* executor side. This instance is used to persist rows to this single output file.
*
* @since 1.4.0
*/
@Experimental
abstract class OutputWriter {
/**
* Persists a single row. Invoked on the executor side. When writing to dynamically partitioned
* tables, dynamic partition columns are not included in rows to be written.
*
* @since 1.4.0
*/
def write(row: Row): Unit

/**
* Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before
* the task output is committed.
*
* @since 1.4.0
*/
def close(): Unit

private var converter: InternalRow => Row = _

protected[sql] def initConverter(dataSchema: StructType) = {
converter =
CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
}

protected[sql] def writeInternal(row: InternalRow): Unit = {
write(converter(row))
}
}

/**
* Acts as a container for all of the metadata required to read from a datasource. All discovery,
* resolution and merging logic for schemas and partitions has been removed.
*
* @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise
* this relation.
* @param partitionSchema The schema of the columns (if any) that are used to partition the relation
* @param dataSchema The schema of any remaining columns. Note that if any partition columns are
* present in the actual data files as well, they are preserved.
* @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values).
* @param fileFormat A file format that can be used to read and write the data in files.
* @param options Configuration used when reading / writing data.
*/
case class HadoopFsRelation(
location: FileCatalog,
partitionSchema: StructType,
dataSchema: StructType,
bucketSpec: Option[BucketSpec],
fileFormat: FileFormat,
options: Map[String, String])(val sparkSession: SparkSession)
extends BaseRelation with FileRelation {

override def sqlContext: SQLContext = sparkSession.sqlContext

val schema: StructType = {
val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
StructType(dataSchema ++ partitionSchema.filterNot { column =>
dataSchemaColumnNames.contains(column.name.toLowerCase)
})
}

def partitionSchemaOption: Option[StructType] =
if (partitionSchema.isEmpty) None else Some(partitionSchema)
def partitionSpec: PartitionSpec = location.partitionSpec()

def refresh(): Unit = location.refresh()

override def toString: String = {
fileFormat match {
case source: DataSourceRegister => source.shortName()
case _ => "HadoopFiles"
}
}

/** Returns the list of files that will be read when scanning this relation. */
override def inputFiles: Array[String] =
location.allFiles().map(_.getPath.toUri.toString).toArray

override def sizeInBytes: Long = location.allFiles().map(_.getLen).sum
}

/**
* Used to read and write data stored in files to/from the [[InternalRow]] format.
*/
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
import org.apache.spark.sql.types.StructType


/**
* Acts as a container for all of the metadata required to read from a datasource. All discovery,
* resolution and merging logic for schemas and partitions has been removed.
*
* @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise
* this relation.
* @param partitionSchema The schema of the columns (if any) that are used to partition the relation
* @param dataSchema The schema of any remaining columns. Note that if any partition columns are
* present in the actual data files as well, they are preserved.
* @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values).
* @param fileFormat A file format that can be used to read and write the data in files.
* @param options Configuration used when reading / writing data.
*/
case class HadoopFsRelation(
location: FileCatalog,
partitionSchema: StructType,
dataSchema: StructType,
bucketSpec: Option[BucketSpec],
fileFormat: FileFormat,
options: Map[String, String])(val sparkSession: SparkSession)
extends BaseRelation with FileRelation {

override def sqlContext: SQLContext = sparkSession.sqlContext

val schema: StructType = {
val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
StructType(dataSchema ++ partitionSchema.filterNot { column =>
dataSchemaColumnNames.contains(column.name.toLowerCase)
})
}

def partitionSchemaOption: Option[StructType] =
if (partitionSchema.isEmpty) None else Some(partitionSchema)

def partitionSpec: PartitionSpec = location.partitionSpec()

def refresh(): Unit = location.refresh()

override def toString: String = {
fileFormat match {
case source: DataSourceRegister => source.shortName()
case _ => "HadoopFiles"
}
}

/** Returns the list of files that will be read when scanning this relation. */
override def inputFiles: Array[String] =
location.allFiles().map(_.getPath.toUri.toString).toArray

override def sizeInBytes: Long = location.allFiles().map(_.getLen).sum
}
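
The `schema` value above merges the two schemas case-insensitively: data columns come first, and a partition column is appended only if its lower-cased name is not already present in the data schema. A minimal, self-contained sketch of that merge rule, using hypothetical column names and nothing beyond the `StructType` API:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SchemaMergeSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical schemas: "Year" exists both in the data files and as a partition column.
    val dataSchema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("Year", IntegerType)))
    val partitionSchema = StructType(Seq(
      StructField("year", IntegerType),
      StructField("country", StringType)))

    // Same rule as HadoopFsRelation.schema: keep all data columns, then append only
    // those partition columns whose lower-cased names are not already taken.
    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
    val merged = StructType(dataSchema ++ partitionSchema.filterNot { column =>
      dataSchemaColumnNames.contains(column.name.toLowerCase)
    })

    println(merged.fieldNames.mkString(", "))  // prints: id, Year, country
  }
}

Running it prints `id, Year, country`: the partition column `year` is dropped because the data schema already provides `Year`, while `country` is appended.
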
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.StructType


/**
* A factory that produces [[OutputWriter]]s. A new [[OutputWriterFactory]] is created on driver
* side for each write job issued when writing to a [[HadoopFsRelation]], and then gets serialized
* to executor side to create actual [[OutputWriter]]s on the fly.
*/
abstract class OutputWriterFactory extends Serializable {
/**
* When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side
* to instantiate new [[OutputWriter]]s.
*
* @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that
* this may not point to the final output file. For example, `FileOutputFormat` writes to
temporary directories and then merges written files back to the final destination. In
* this case, `path` points to a temporary output file under the temporary directory.
* @param dataSchema Schema of the rows to be written. Partition columns are not included in the
* schema if the relation being written is partitioned.
* @param context The Hadoop MapReduce task context.
* @since 1.4.0
*/
def newInstance(
path: String,
bucketId: Option[Int], // TODO: This doesn't belong here...
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter

/**
* Returns a new instance of [[OutputWriter]] that will write data to the given path.
* This method gets called by each task on the executor side to write InternalRows to
* format-specific files. Compared to the other `newInstance()`, this is a newer API that
* passes only the path that the writer must write to. The writer must write to the exact path
* and not modify it (do not add subdirectories, extensions, etc.). All other
* file-format-specific information needed to create the writer must be passed
* through the [[OutputWriterFactory]] implementation.
* @since 2.0.0
*/
def newWriter(path: String): OutputWriter = {
throw new UnsupportedOperationException("newInstance with just path not supported")
}
}


/**
* [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting rows to the
* underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
* An [[OutputWriter]] instance is created and initialized when a new output file is opened on
* executor side. This instance is used to persist rows to this single output file.
*/
abstract class OutputWriter {
/**
* Persists a single row. Invoked on the executor side. When writing to dynamically partitioned
* tables, dynamic partition columns are not included in rows to be written.
*
* @since 1.4.0
*/
def write(row: Row): Unit

/**
* Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before
* the task output is committed.
*
* @since 1.4.0
*/
def close(): Unit

private var converter: InternalRow => Row = _

protected[sql] def initConverter(dataSchema: StructType) = {
converter =
CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
}

protected[sql] def writeInternal(row: InternalRow): Unit = {
write(converter(row))
}
}
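
To illustrate the contract that now lives in this file, here is a minimal sketch of an implementation pair. It is not part of the patch, and every class name in it is hypothetical: the factory is constructed on the driver and serialized to each executor, every `newInstance` call opens exactly the file at the `path` it is handed, `write` persists one `Row` per line, and `close` runs before the task output is committed.

import java.nio.charset.StandardCharsets

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

// Hypothetical factory: built once on the driver per write job, serialized to executors.
class SimpleTextOutputWriterFactory extends OutputWriterFactory {
  override def newInstance(
      path: String,
      bucketId: Option[Int],
      dataSchema: StructType,
      context: TaskAttemptContext): OutputWriter =
    new SimpleTextOutputWriter(path, context)
}

// Hypothetical writer: writes each Row as one comma-separated line to the exact path given.
class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
  private val fs = new Path(path).getFileSystem(context.getConfiguration)
  private val stream = fs.create(new Path(path), false)

  override def write(row: Row): Unit = {
    val line = row.toSeq.mkString("", ",", "\n")
    stream.write(line.getBytes(StandardCharsets.UTF_8))
  }

  // Called after all rows are persisted, before the task output is committed.
  override def close(): Unit = stream.close()
}

The factory carries no per-file state, so it stays cheap to serialize; anything format-specific (delimiters, compression) would be passed through the factory implementation itself, which is also what the path-only `newWriter` API above expects.
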