@@ -1112,7 +1112,7 @@ object AuronConverters extends Logging {
extends LeafExecNode
with NativeSupports {

- private def nativeSchema = Util.getNativeSchema(output)
+ private lazy val nativeSchema = Util.getNativeSchema(output)

// check whether native converting is supported
nativeSchema
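The same change repeats across every file in this diff: helpers that build native protobuf structures move from `private def` to `private lazy val`. The distinction matters because each operator references these members once as an eager support check, then reads them again in `doExecuteNative()`: a `def` re-runs the conversion on every access, while a `lazy val` runs it once and caches the result. A minimal sketch of the semantics, with illustrative names only (not Auron code):

// Minimal sketch (illustrative names, not Auron code): counting how often
// a conversion runs under `def` vs `lazy val`.
object LazyValVsDef extends App {
  var conversions = 0

  class WithDef {
    private def nativeSchema: String = { conversions += 1; "schema" }
    nativeSchema                      // eager support check: converts once
    def use(): String = nativeSchema  // converts again on every call
  }

  class WithLazyVal {
    private lazy val nativeSchema: String = { conversions += 1; "schema" }
    nativeSchema                      // support check forces evaluation here
    def use(): String = nativeSchema  // reuses the cached result
  }

  new WithDef().use()
  println(s"def: $conversions conversion(s)")      // prints: def: 2 conversion(s)

  conversions = 0
  new WithLazyVal().use()
  println(s"lazy val: $conversions conversion(s)") // prints: lazy val: 1 conversion(s)
}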
@@ -119,15 +119,15 @@ abstract class NativeAggBase(
case SortAgg => pb.AggExecMode.SORT_AGG
}

- private def nativeAggrs = nativeAggrInfos.flatMap(_.nativeAggrs)
+ private lazy val nativeAggrs = nativeAggrInfos.flatMap(_.nativeAggrs)

- private def nativeGroupingExprs = groupingExpressions.map(NativeConverters.convertExpr(_))
+ private lazy val nativeGroupingExprs = groupingExpressions.map(NativeConverters.convertExpr(_))

- private def nativeGroupingNames = groupingExpressions.map(Util.getFieldNameByExprId)
+ private lazy val nativeGroupingNames = groupingExpressions.map(Util.getFieldNameByExprId)

- private def nativeAggrNames = nativeAggrInfos.map(_.outputAttr).map(_.name)
+ private lazy val nativeAggrNames = nativeAggrInfos.map(_.outputAttr).map(_.name)

- private def nativeAggrModes = nativeAggrInfos.map(_.mode match {
+ private lazy val nativeAggrModes = nativeAggrInfos.map(_.mode match {
case Partial => pb.AggMode.PARTIAL
case PartialMerge => pb.AggMode.PARTIAL_MERGE
case Final => pb.AggMode.FINAL
@@ -138,8 +138,6 @@ abstract class NativeAggBase(
// check whether native converting is supported
nativeAggrs
nativeGroupingExprs
- nativeGroupingNames
- nativeAggrs
nativeAggrModes

override def output: Seq[Attribute] =
@@ -165,12 +163,6 @@ abstract class NativeAggBase(
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
- val nativeExecMode = this.nativeExecMode
- val nativeAggrNames = this.nativeAggrNames
- val nativeGroupingNames = this.nativeGroupingNames
- val nativeAggrModes = this.nativeAggrModes
- val nativeAggrs = this.nativeAggrs
- val nativeGroupingExprs = this.nativeGroupingExprs

new NativeRDD(
sparkContext,
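Alongside the `lazy val` change, every `doExecuteNative()` drops its block of `val x = this.x` copies. Those copies follow a common Spark idiom: rebinding a field to a local before building a closure, so the closure captures only the value instead of `this` (the whole operator). A hedged sketch of the idiom, using a hypothetical operator rather than Auron's actual classes:

import org.apache.spark.rdd.RDD

// Hypothetical operator (not from Auron) showing the idiom the deleted lines used.
class MyOperator(rdd: RDD[Int]) {  // assume MyOperator is not Serializable
  private lazy val factor: Int = 3

  def withLocalCopy(): RDD[Int] = {
    val factor = this.factor       // local copy: the closure captures an Int
    rdd.map(_ * factor)
  }

  def withoutLocalCopy(): RDD[Int] =
    rdd.map(_ * factor)            // `factor` means `this.factor`: the closure
                                   // references the operator itself
}

Spark's closure cleaner can often null out an unused outer reference, but when it cannot, the second variant requires the enclosing object to be serializable; whether dropping the copies is safe here depends on how NativeRDD handles its compute closures, which this diff alone does not show.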
@@ -90,9 +90,9 @@ abstract class NativeBroadcastJoinBase(
}
}

- private def nativeSchema = Util.getNativeSchema(output)
+ private lazy val nativeSchema = Util.getNativeSchema(output)

- private def nativeJoinOn = {
+ private lazy val nativeJoinOn = {
if (leftKeys.nonEmpty && rightKeys.nonEmpty) {
val rewrittenLeftKeys = rewriteKeyExprToLong(leftKeys)
val rewrittenRightKeys = rewriteKeyExprToLong(rightKeys)
@@ -108,9 +108,9 @@ abstract class NativeBroadcastJoinBase(
}
}

- private def nativeJoinType = NativeConverters.convertJoinType(joinType)
+ private lazy val nativeJoinType = NativeConverters.convertJoinType(joinType)

- private def nativeBroadcastSide = broadcastSide match {
+ private lazy val nativeBroadcastSide = broadcastSide match {
case BroadcastLeft => pb.JoinSide.LEFT_SIDE
case BroadcastRight => pb.JoinSide.RIGHT_SIDE
}
@@ -127,9 +127,6 @@ abstract class NativeBroadcastJoinBase(
val leftRDD = NativeHelper.executeNative(left)
val rightRDD = NativeHelper.executeNative(right)
val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
- val nativeSchema = this.nativeSchema
- val nativeJoinType = this.nativeJoinType
- val nativeJoinOn = this.nativeJoinOn

val (probedRDD, builtRDD) = broadcastSide match {
case BroadcastLeft => (rightRDD, leftRDD)
@@ -59,8 +59,8 @@ abstract class NativeExpandBase(
override def outputPartitioning: Partitioning = UnknownPartitioning(0)
override def outputOrdering: Seq[SortOrder] = Nil

- private def nativeSchema = Util.getNativeSchema(output)
- private def nativeProjections = projections.map { projection =>
+ private lazy val nativeSchema = Util.getNativeSchema(output)
+ private lazy val nativeProjections = projections.map { projection =>
projection
.zip(Util.getSchema(output).fields.map(_.dataType))
.map(e => NativeConverters.convertExpr(Cast(e._1, e._2)))
@@ -73,8 +73,6 @@ abstract class NativeExpandBase(
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
- val nativeSchema = this.nativeSchema
- val nativeProjections = this.nativeProjections

new NativeRDD(
sparkContext,
@@ -74,12 +74,12 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec)
// predicate pruning is buggy for decimal type, so we need to
// temporarily disable predicate pruning for decimal type
// see https://github.com/apache/auron/issues/1032
- protected def nativePruningPredicateFilters: Seq[pb.PhysicalExprNode] =
+ protected lazy val nativePruningPredicateFilters: Seq[pb.PhysicalExprNode] =
basedFileScan.dataFilters
.filter(expr => expr.find(_.dataType.isInstanceOf[DecimalType]).isEmpty)
.map(expr => NativeConverters.convertScanPruningExpr(expr))

- protected def nativeFileSchema: pb.Schema =
+ protected lazy val nativeFileSchema: pb.Schema =
NativeConverters.convertSchema(StructType(basedFileScan.relation.dataSchema.map {
case field if basedFileScan.requiredSchema.exists(_.name == field.name) =>
field.copy(nullable = true)
Expand All @@ -88,7 +88,7 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec)
StructField(field.name, NullType, nullable = true)
}))

- protected def nativePartitionSchema: pb.Schema =
+ protected lazy val nativePartitionSchema: pb.Schema =
NativeConverters.convertSchema(partitionSchema)

protected def nativeFileGroups: FilePartition => pb.FileGroup = (partition: FilePartition) => {
@@ -123,7 +123,6 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec)
nativePruningPredicateFilters
nativeFileSchema
nativePartitionSchema
- nativeFileGroups

protected def putJniBridgeResource(
resourceId: String,
@@ -61,7 +61,7 @@ abstract class NativeFilterBase(condition: Expression, override val child: Spark
override def outputPartitioning: Partitioning = child.outputPartitioning
override def outputOrdering: Seq[SortOrder] = child.outputOrdering

- private def nativeFilterExprs = {
+ private lazy val nativeFilterExprs = {
val splittedExprs = ArrayBuffer[PhysicalExprNode]()

// do not split simple IsNotNull(col) exprs
@@ -90,7 +90,6 @@ abstract class NativeFilterBase(condition: Expression, override val child: Spark
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
- val nativeFilterExprs = this.nativeFilterExprs
new NativeRDD(
sparkContext,
nativeMetrics,
@@ -72,7 +72,7 @@ abstract class NativeGenerateBase(
override def outputPartitioning: Partitioning = child.outputPartitioning
override def outputOrdering: Seq[SortOrder] = Nil

- private def nativeGenerator = generator match {
+ private lazy val nativeGenerator = generator match {
case Explode(child) =>
pb.Generator
.newBuilder()
@@ -117,10 +117,10 @@ abstract class NativeGenerateBase(
.build()
}

- private def nativeGeneratorOutput =
+ private lazy val nativeGeneratorOutput =
Util.getSchema(generatorOutput).map(NativeConverters.convertField)

- private def nativeRequiredChildOutput =
+ private lazy val nativeRequiredChildOutput =
Util.getSchema(requiredChildOutput).map(_.name)

// check whether native converting is supported
@@ -131,9 +131,6 @@ abstract class NativeGenerateBase(
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
- val nativeGenerator = this.nativeGenerator
- val nativeGeneratorOutput = this.nativeGeneratorOutput
- val nativeRequiredChildOutput = this.nativeRequiredChildOutput

new NativeRDD(
sparkContext,
@@ -47,10 +47,6 @@ abstract class NativeOrcScanBase(basedFileScan: FileSourceScanExec)
inputMetric.incRecordsRead(v)
case _ =>
}))
- val nativePruningPredicateFilters = this.nativePruningPredicateFilters
- val nativeFileSchema = this.nativeFileSchema
- val nativeFileGroups = this.nativeFileGroups
- val nativePartitionSchema = this.nativePartitionSchema
val projection = schema.map(field => basedFileScan.relation.schema.fieldIndex(field.name))
val broadcastedHadoopConf = this.broadcastedHadoopConf
val numPartitions = partitions.length
@@ -47,10 +47,6 @@ abstract class NativeParquetScanBase(basedFileScan: FileSourceScanExec)
inputMetric.incRecordsRead(v)
case _ =>
}))
- val nativePruningPredicateFilters = this.nativePruningPredicateFilters
- val nativeFileSchema = this.nativeFileSchema
- val nativeFileGroups = this.nativeFileGroups
- val nativePartitionSchema = this.nativePartitionSchema

val projection = schema.map(field => basedFileScan.relation.schema.fieldIndex(field.name))
val broadcastedHadoopConf = this.broadcastedHadoopConf
@@ -62,15 +62,14 @@ abstract class NativeProjectBase(projectList: Seq[NamedExpression], override val
override def outputPartitioning: Partitioning = child.outputPartitioning
override def outputOrdering: Seq[SortOrder] = child.outputOrdering

- private def nativeProject = getNativeProjectBuilder(projectList).buildPartial()
+ private lazy val nativeProject = getNativeProjectBuilder(projectList).buildPartial()

// check whether native converting is supported
nativeProject

override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
- val nativeProject = this.nativeProject

new NativeRDD(
sparkContext,
@@ -85,15 +85,15 @@ abstract class NativeShuffleExchangeBase(
metrics)
}

- def nativeSchema: Schema = Util.getNativeSchema(child.output)
+ lazy val nativeSchema: Schema = Util.getNativeSchema(child.output)

- private def nativeHashExprs = outputPartitioning match {
+ private lazy val nativeHashExprs = outputPartitioning match {
case HashPartitioning(expressions, _) =>
expressions.map(expr => NativeConverters.convertExpr(expr)).toList
case _ => null
}

- private def nativeSortExecNode = outputPartitioning match {
+ private lazy val nativeSortExecNode = outputPartitioning match {
case RangePartitioning(expressions, _) =>
val nativeSortExprs = expressions.map { sortOrder =>
PhysicalExprNode
@@ -147,7 +147,6 @@ abstract class NativeShuffleExchangeBase(
(partition, taskContext) => {
val shuffleReadMetrics = taskContext.taskMetrics().createTempShuffleReadMetrics()
val metricReporter = new SQLShuffleReadMetricsReporter(shuffleReadMetrics, metrics)
- val nativeSchema = this.nativeSchema

// store fetch iterator in jni resource before native compute
val jniResourceId = s"NativeShuffleReadExec:${UUID.randomUUID().toString}"
@@ -62,9 +62,9 @@ abstract class NativeShuffledHashJoinBase(
"input_row_count"))
.toSeq: _*)

- private def nativeSchema = Util.getNativeSchema(output)
+ private lazy val nativeSchema = Util.getNativeSchema(output)

- private def nativeJoinOn = {
+ private lazy val nativeJoinOn = {
val rewrittenLeftKeys = rewriteKeyExprToLong(leftKeys)
val rewrittenRightKeys = rewriteKeyExprToLong(rightKeys)
rewrittenLeftKeys.zip(rewrittenRightKeys).map { case (leftKey, rightKey) =>
@@ -76,9 +76,9 @@ abstract class NativeShuffledHashJoinBase(
}
}

- private def nativeJoinType = NativeConverters.convertJoinType(joinType)
+ private lazy val nativeJoinType = NativeConverters.convertJoinType(joinType)

- private def nativeBuildSide = buildSide match {
+ private lazy val nativeBuildSide = buildSide match {
case BuildLeft => pb.JoinSide.LEFT_SIDE
case BuildRight => pb.JoinSide.RIGHT_SIDE
}
@@ -95,9 +95,6 @@ abstract class NativeShuffledHashJoinBase(
val leftRDD = NativeHelper.executeNative(left)
val rightRDD = NativeHelper.executeNative(right)
val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
- val nativeJoinOn = this.nativeJoinOn
- val nativeJoinType = this.nativeJoinType
- val nativeBuildSide = this.nativeBuildSide

val (partitions, partitioner) = if (joinType != RightOuter) {
(leftRDD.partitions, leftRDD.partitioner)
@@ -77,7 +77,7 @@ abstract class NativeSortBase(
UnspecifiedDistribution :: Nil
}

- private def nativeSortExprs = sortOrder.map { sortOrder =>
+ private lazy val nativeSortExprs = sortOrder.map { sortOrder =>
PhysicalExprNode
.newBuilder()
.setSort(
@@ -96,7 +96,6 @@ abstract class NativeSortBase(
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
- val nativeSortExprs = this.nativeSortExprs

new NativeRDD(
sparkContext,
@@ -73,9 +73,9 @@ abstract class NativeSortMergeJoinBase(
keys.map(SortOrder(_, Ascending))
}

- private def nativeSchema = Util.getNativeSchema(output)
+ private lazy val nativeSchema = Util.getNativeSchema(output)

- private def nativeJoinOn = leftKeys.zip(rightKeys).map { case (leftKey, rightKey) =>
+ private lazy val nativeJoinOn = leftKeys.zip(rightKeys).map { case (leftKey, rightKey) =>
val leftKeyExpr = NativeConverters.convertExpr(leftKey)
val rightKeyExpr = NativeConverters.convertExpr(rightKey)
JoinOn
@@ -85,15 +85,15 @@ abstract class NativeSortMergeJoinBase(
.build()
}

- private def nativeSortOptions = nativeJoinOn.map(_ => {
+ private lazy val nativeSortOptions = nativeJoinOn.map(_ => {
SortOptions
.newBuilder()
.setAsc(true)
.setNullsFirst(true)
.build()
})

- private def nativeJoinType = NativeConverters.convertJoinType(joinType)
+ private lazy val nativeJoinType = NativeConverters.convertJoinType(joinType)

// check whether native converting is supported
nativeSchema
@@ -105,9 +105,6 @@ abstract class NativeSortMergeJoinBase(
val leftRDD = NativeHelper.executeNative(left)
val rightRDD = NativeHelper.executeNative(right)
val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
- val nativeSortOptions = this.nativeSortOptions
- val nativeJoinOn = this.nativeJoinOn
- val nativeJoinType = this.nativeJoinType

val (partitions, partitioner) = if (joinType != RightOuter) {
(leftRDD.partitions, leftRDD.partitioner)
@@ -63,7 +63,7 @@ abstract class NativeTakeOrderedBase(
override def outputPartitioning: Partitioning = SinglePartition
override def outputOrdering: Seq[SortOrder] = sortOrder

- private def nativeSortExprs = sortOrder.map { sortOrder =>
+ private lazy val nativeSortExprs = sortOrder.map { sortOrder =>
PhysicalExprNode
.newBuilder()
.setSort(
@@ -125,7 +125,6 @@ abstract class NativeTakeOrderedBase(
// merge top-K from every children partitions into a single partition
val shuffled = Shims.get.createNativeShuffleExchangeExec(SinglePartition, partial)
val shuffledRDD = NativeHelper.executeNative(shuffled)
- val nativeSortExprs = this.nativeSortExprs

// take top-K from the final partition
new NativeRDD(
@@ -89,7 +89,7 @@ abstract class NativeWindowBase(
override def requiredChildOrdering: Seq[Seq[SortOrder]] =
Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)

- private def nativeWindowExprs = windowExpression.map { named =>
+ private lazy val nativeWindowExprs = windowExpression.map { named =>
val field = NativeConverters.convertField(Util.getSchema(named :: Nil).fields(0))
val windowExprBuilder = pb.WindowExprNode.newBuilder().setField(field)
windowExprBuilder.setReturnType(NativeConverters.convertDataType(named.dataType))
@@ -167,11 +167,11 @@ abstract class NativeWindowBase(
windowExprBuilder.build()
}

- private def nativePartitionSpecExprs = partitionSpec.map { partition =>
+ private lazy val nativePartitionSpecExprs = partitionSpec.map { partition =>
NativeConverters.convertExpr(partition)
}

- private def nativeOrderSpecExprs = orderSpec.map { sortOrder =>
+ private lazy val nativeOrderSpecExprs = orderSpec.map { sortOrder =>
pb.PhysicalExprNode
.newBuilder()
.setSort(
@@ -192,9 +192,6 @@ abstract class NativeWindowBase(
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
- val nativeWindowExprs = this.nativeWindowExprs
- val nativeOrderSpecExprs = this.nativeOrderSpecExprs
- val nativePartitionSpecExprs = this.nativePartitionSpecExprs

new NativeRDD(
sparkContext,