32 changes: 32 additions & 0 deletions .baseline/scala/.scala212fmt.conf
@@ -0,0 +1,32 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

version = 3.9.7

align = none
Contributor:

I haven't checked, but are these settings resulting in a code style that is very close to the Apache Spark codebase?

Contributor Author:

Yes, the Spark config is here: https://github.com/apache/spark/blob/master/dev/.scalafmt.conf.

The differences:

# spark
maxColumn = 98
version = 3.8.6

# iceberg
docstrings.wrap = false  # suggestion from comments
maxColumn = 100
rewrite.rules = [Imports]
version = 3.9.7

Member:

For learning, why do we deviate on [Imports]?

Contributor Author:

Removed [Imports] to keep it aligned with Spark. We could add the following if needed:

// https://scalameta.org/scalafmt/docs/configuration.html#imports
rewrite.rules = [Imports]
rewrite.imports.sort = original
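
For illustration only (not part of the PR): roughly what the Imports rewrite would do with selector sorting enabled. The exact grouping and ordering depend on the scalafmt version and the other rewrite.imports settings.

// Before the rewrite: selectors inside braces stay in whatever order they were written
import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan, Filter}

// After the rewrite (illustrative): selectors sorted within the braces
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}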

Contributor:

rewrite.rules has been removed from https://github.com/apache/spark/blob/f5b9ea8103dd1f37c6f0cea0692d7bc5b50b778c/dev/.scalafmt.conf

The only differences now are version (3.8.6 vs 3.9.7), maxColumn (98 vs 100), and docstrings.wrap.

align.openParenDefnSite = false
align.openParenCallSite = false
align.tokens = []
importSelectors = "singleLine"
optIn = {
configStyleArguments = false
}
danglingParentheses.preset = false
docstrings.style = Asterisk
docstrings.wrap = false
maxColumn = 100
Contributor:

Given that we can fully control this (and since we're deviating from Spark anyway), I think it's fine to set this to 120 or even 140. Let's see what others think.

Contributor:

I would have voted for 120.

Contributor:

I merged the PR early, sorry! I'm happy to help make the change.

Here's the dev list discussion thread Eduard has started: https://lists.apache.org/thread/y645tppy9q21xm91qmd582jw0c3jh6hm

runner.dialect = scala212
32 changes: 32 additions & 0 deletions .baseline/scala/.scala213fmt.conf
@@ -0,0 +1,32 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

version = 3.9.7

align = none
align.openParenDefnSite = false
align.openParenCallSite = false
align.tokens = []
importSelectors = "singleLine"
optIn = {
configStyleArguments = false
}
danglingParentheses.preset = false
docstrings.style = Asterisk
docstrings.wrap = false
maxColumn = 100
runner.dialect = scala213
15 changes: 15 additions & 0 deletions baseline.gradle
@@ -62,6 +62,21 @@ subprojects {
removeUnusedImports()
licenseHeaderFile "$rootDir/.baseline/copyright/copyright-header-java.txt"
}

// Configure different scalafmt rules for specific Scala version
if (project.name.startsWith("iceberg-spark") && project.name.endsWith("2.13")) {
Contributor:

Do we need to check iceberg-spark? It only affects the Scala code, right? Also, should we get the scalaVersion from the System.getProperty rather than from the project name?

Contributor:

> should we get the scalaVersion from the System.getProperty rather than from the project name?

I think that's a good idea; we do it here already.

cc @ConeyLiu

Contributor Author:

I'll follow up on this in the next few days.

Contributor Author:

Submitted #14798
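
As an aside, a rough sketch of the approach being discussed: reading the Scala version from a system property instead of parsing it out of the project name. This is illustrative only and not the actual content of #14798; the property names scalaVersion and defaultScalaVersion are assumptions.

// Illustrative sketch only (assumed property names, not the actual follow-up change):
// derive the Scala binary version from a system property rather than the project-name suffix.
def scalaVersion = System.getProperty("scalaVersion", System.getProperty("defaultScalaVersion"))

if (project.name.startsWith("iceberg-spark")) {
  scala {
    target 'src/**/*.scala'
    // "2.12" -> ".scala212fmt.conf", "2.13" -> ".scala213fmt.conf"
    scalafmt("3.9.7").configFile("$rootDir/.baseline/scala/.scala${scalaVersion.replace('.', '')}fmt.conf")
    licenseHeaderFile "$rootDir/.baseline/copyright/copyright-header-java.txt", "package"
  }
}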

scala {
target 'src/**/*.scala'
scalafmt("3.9.7").configFile("$rootDir/.baseline/scala/.scala213fmt.conf")
licenseHeaderFile "$rootDir/.baseline/copyright/copyright-header-java.txt", "package"
}
} else if (project.name.startsWith("iceberg-spark") && project.name.endsWith("2.12")) {
scala {
target 'src/**/*.scala'
scalafmt("3.9.7").configFile("$rootDir/.baseline/scala/.scala212fmt.conf")
licenseHeaderFile "$rootDir/.baseline/copyright/copyright-header-java.txt", "package"
}
}
Comment on lines 66 to 79
Contributor:

Suggested change, from:

// Configure different scalafmt rules for specific Scala version
if (project.name.startsWith("iceberg-spark") && project.name.endsWith("2.13")) {
scala {
target 'src/main/scala/**/*.scala', 'src/test/scala/**/*.scala', 'src/testFixtures/scala/**/*.scala'
scalafmt("3.9.7").configFile("$rootDir/.baseline/scala/.scala213fmt.conf")
licenseHeaderFile "$rootDir/.baseline/copyright/copyright-header-java.txt", "package"
}
} else if (project.name.startsWith("iceberg-spark") && project.name.endsWith("2.12")) {
scala {
target 'src/main/scala/**/*.scala', 'src/test/scala/**/*.scala', 'src/testFixtures/scala/**/*.scala'
scalafmt("3.9.7").configFile("$rootDir/.baseline/scala/.scala212fmt.conf")
licenseHeaderFile "$rootDir/.baseline/copyright/copyright-header-java.txt", "package"
}
}

to:

// Configure different scalafmt rules for specific Scala version
if (project.name.startsWith("iceberg-spark") && project.name.endsWith("2.13")) {
scala {
target 'src/**/*.scala'
scalafmt("3.9.7").configFile("$rootDir/.baseline/scala/.scala213fmt.conf")
licenseHeaderFile "$rootDir/.baseline/copyright/copyright-header-java.txt", "package"
}
} else if (project.name.startsWith("iceberg-spark") && project.name.endsWith("2.12")) {
scala {
target 'src/**/*.scala'
scalafmt("3.9.7").configFile("$rootDir/.baseline/scala/.scala212fmt.conf")
licenseHeaderFile "$rootDir/.baseline/copyright/copyright-header-java.txt", "package"
}
}

Nit: capture all .scala files in case more folders are added in the future.

Contributor Author:

Thanks @kevinjqliu, updated

}
}

2 changes: 2 additions & 0 deletions site/docs/contribute.md
@@ -125,6 +125,8 @@ This project Iceberg also has modules for adding Iceberg support to processing e

Follow the instructions for [Eclipse](https://github.com/google/google-java-format#eclipse) or
[IntelliJ](https://github.com/google/google-java-format#intellij-android-studio-and-other-jetbrains-ides) to install the **google-java-format** plugin (note the required manual actions for IntelliJ).
Follow the [instructions](https://scalameta.org/scalafmt/docs/installation.html) to install the **scalafmt** plugin
and configure it to point to the configuration files located under the directory `.baseline/scala/`.

## Semantic Versioning

@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.extensions

import org.apache.spark.sql.SparkSessionExtensions
@@ -57,8 +56,8 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
extensions.injectResolutionRule { _ => CheckMergeIntoTableConditions }
extensions.injectResolutionRule { _ => ProcedureArgumentCoercion }
extensions.injectResolutionRule { _ => AlignRowLevelCommandAssignments }
extensions.injectResolutionRule { _ => RewriteUpdateTableForRowLineage}
extensions.injectResolutionRule { _ => RewriteMergeIntoTableForRowLineage}
extensions.injectResolutionRule { _ => RewriteUpdateTableForRowLineage }
extensions.injectResolutionRule { _ => RewriteMergeIntoTableForRowLineage }
extensions.injectResolutionRule { _ => RewriteUpdateTable }
extensions.injectResolutionRule { _ => RewriteMergeIntoTable }
extensions.injectCheckRule { _ => CheckViews }
@@ -69,7 +68,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
extensions.injectOptimizerRule { _ => ExtendedSimplifyConditionalsInPredicate }
extensions.injectOptimizerRule { _ => ExtendedReplaceNullWithFalseInPredicate }
extensions.injectOptimizerRule { _ => ReplaceStaticInvoke }
extensions.injectOptimizerRule { _ => RemoveRowLineageOutputFromOriginalTable}
extensions.injectOptimizerRule { _ => RemoveRowLineageOutputFromOriginalTable }
// pre-CBO rules run only once and the order of the rules is important
// - dynamic filters should be added before replacing commands with rewrite plans
// - scans must be planned before building writes
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
@@ -35,8 +34,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
*
* Note that this rule must be run before rewriting row-level commands.
*/
object AlignRowLevelCommandAssignments
extends Rule[LogicalPlan] with AssignmentAlignmentSupport {
object AlignRowLevelCommandAssignments extends Rule[LogicalPlan] with AssignmentAlignmentSupport {

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case u: UpdateIcebergTable if u.resolved && !u.aligned =>
@@ -60,7 +58,7 @@ object AlignRowLevelCommandAssignments
if (ref.size > 1) {
throw new AnalysisException(
"Nested fields are not supported inside INSERT clauses of MERGE operations: " +
s"${ref.mkString("`", "`.`", "`")}")
s"${ref.mkString("`", "`.`", "`")}")
}
}

@@ -101,8 +99,8 @@ object AlignRowLevelCommandAssignments
if (assignment.isEmpty) {
throw new AnalysisException(
s"Cannot find column '${targetAttr.name}' of the target table among " +
s"the INSERT columns: ${assignmentMap.keys.mkString(", ")}. " +
"INSERT clauses must provide values for all columns of the target table.")
s"the INSERT columns: ${assignmentMap.keys.mkString(", ")}. " +
"INSERT clauses must provide values for all columns of the target table.")
}

val key = assignment.get.key
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
@@ -69,8 +68,8 @@ trait AssignmentAlignmentSupport extends CastSupport {

val columnUpdates = assignments.map(a => ColumnUpdate(toAssignmentRef(a.key), a.value))
val outputExprs = applyUpdates(table.output, columnUpdates)
outputExprs.zip(table.output).map {
case (expr, attr) => handleCharVarcharLimits(Assignment(attr, expr))
outputExprs.zip(table.output).map { case (expr, attr) =>
handleCharVarcharLimits(Assignment(attr, expr))
}
}

@@ -117,8 +116,7 @@
val colName = (namePrefix :+ col.name).mkString(".")
throw new AnalysisException(
"Updating nested fields is only supported for StructType " +
s"but $colName is of type $otherType"
)
s"but $colName is of type $otherType")
}

// if there are conflicting updates, throw an exception
@@ -129,7 +127,7 @@
val conflictingCols = updates.map(u => (namePrefix ++ u.ref).mkString("."))
throw new AnalysisException(
"Updates are in conflict for these columns: " +
conflictingCols.distinct.mkString(", "))
conflictingCols.distinct.mkString(", "))
}
}
}
@@ -180,8 +178,13 @@
// e.g. a struct with fields (a, b) is assigned as a struct with fields (a, c) or (b, a)
val errors = new mutable.ArrayBuffer[String]()
val canWrite = DataType.canWrite(
expr.dataType, tableAttr.dataType, byName = true, resolver, tableAttr.name,
storeAssignmentPolicy, err => errors += err)
expr.dataType,
tableAttr.dataType,
byName = true,
resolver,
tableAttr.name,
storeAssignmentPolicy,
err => errors += err)

if (!canWrite) {
throw new AnalysisException(
@@ -195,7 +198,8 @@
case _ if tableAttr.dataType.sameType(expr.dataType) =>
expr
case StoreAssignmentPolicy.ANSI =>
val cast = Cast(expr, tableAttr.dataType, Option(conf.sessionLocalTimeZone), ansiEnabled = true)
val cast =
Cast(expr, tableAttr.dataType, Option(conf.sessionLocalTimeZone), ansiEnabled = true)
cast.setTagValue(Cast.BY_TABLE_INSERTION, ())
TableOutputResolver.checkCastOverflowInTableInsert(cast, colPath.quoted)
case _ =>
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
@@ -57,18 +56,20 @@ object CheckMergeIntoTableConditions extends Rule[LogicalPlan] {
if (!cond.deterministic) {
throw new AnalysisException(
s"Non-deterministic functions are not supported in $condName conditions of " +
s"MERGE operations: ${cond.sql}")
s"MERGE operations: ${cond.sql}")
}

if (SubqueryExpression.hasSubquery(cond)) {
throw new AnalysisException(
s"Subqueries are not supported in conditions of MERGE operations. " +
s"Found a subquery in the $condName condition: ${cond.sql}")
s"Found a subquery in the $condName condition: ${cond.sql}")
}

if (cond.find(_.isInstanceOf[AggregateExpression]).isDefined) {
throw new AnalysisException(
s"Agg functions are not supported in $condName conditions of MERGE operations: " + {cond.sql})
s"Agg functions are not supported in $condName conditions of MERGE operations: " + {
cond.sql
})
}
}
}
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
@@ -38,44 +37,71 @@ object CheckViews extends (LogicalPlan => Unit) {

override def apply(plan: LogicalPlan): Unit = {
plan foreach {
case CreateIcebergView(resolvedIdent@ResolvedIdentifier(_: ViewCatalog, _), _, query, columnAliases, _,
_, _, _, _, replace, _) =>
case CreateIcebergView(
resolvedIdent @ ResolvedIdentifier(_: ViewCatalog, _),
_,
query,
columnAliases,
_,
_,
_,
_,
_,
replace,
_) =>
verifyColumnCount(resolvedIdent, columnAliases, query)
SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames.toIndexedSeq, SQLConf.get.resolver)
SchemaUtils.checkColumnNameDuplication(
query.schema.fieldNames.toIndexedSeq,
SQLConf.get.resolver)
if (replace) {
val viewIdent: Seq[String] = resolvedIdent.catalog.name() +: resolvedIdent.identifier.asMultipartIdentifier
val viewIdent: Seq[String] =
resolvedIdent.catalog.name() +: resolvedIdent.identifier.asMultipartIdentifier
checkCyclicViewReference(viewIdent, query, Seq(viewIdent))
}

case AlterViewAs(ResolvedV2View(_, _), _, _) =>
throw new AnalysisException("ALTER VIEW <viewName> AS is not supported. Use CREATE OR REPLACE VIEW instead")
throw new AnalysisException(
"ALTER VIEW <viewName> AS is not supported. Use CREATE OR REPLACE VIEW instead")

case _ => // OK
}
}

private def verifyColumnCount(ident: ResolvedIdentifier, columns: Seq[String], query: LogicalPlan): Unit = {
private def verifyColumnCount(
ident: ResolvedIdentifier,
columns: Seq[String],
query: LogicalPlan): Unit = {
if (columns.nonEmpty) {
if (columns.length > query.output.length) {
throw new AnalysisException(String.format("Cannot create view %s.%s, the reason is not enough data columns:\n" +
"View columns: %s\n" +
"Data columns: %s", ident.catalog.name(), ident.identifier, columns.mkString(", "),
query.output.map(c => c.name).mkString(", ")))
throw new AnalysisException(
String.format(
"Cannot create view %s.%s, the reason is not enough data columns:\n" +
"View columns: %s\n" +
"Data columns: %s",
ident.catalog.name(),
ident.identifier,
columns.mkString(", "),
query.output.map(c => c.name).mkString(", ")))
} else if (columns.length < query.output.length) {
throw new AnalysisException(String.format("Cannot create view %s.%s, the reason is too many data columns:\n" +
"View columns: %s\n" +
"Data columns: %s", ident.catalog.name(), ident.identifier, columns.mkString(", "),
query.output.map(c => c.name).mkString(", ")))
throw new AnalysisException(
String.format(
"Cannot create view %s.%s, the reason is too many data columns:\n" +
"View columns: %s\n" +
"Data columns: %s",
ident.catalog.name(),
ident.identifier,
columns.mkString(", "),
query.output.map(c => c.name).mkString(", ")))
}
}
}

private def checkCyclicViewReference(
viewIdent: Seq[String],
plan: LogicalPlan,
cyclePath: Seq[Seq[String]]): Unit = {
viewIdent: Seq[String],
plan: LogicalPlan,
cyclePath: Seq[Seq[String]]): Unit = {
plan match {
case sub@SubqueryAlias(_, Project(_, _)) =>
case sub @ SubqueryAlias(_, Project(_, _)) =>
val currentViewIdent: Seq[String] = sub.identifier.qualifier :+ sub.identifier.name
checkIfRecursiveView(viewIdent, currentViewIdent, cyclePath, sub.children)
case v1View: View =>
@@ -94,15 +120,17 @@
}

private def checkIfRecursiveView(
viewIdent: Seq[String],
currentViewIdent: Seq[String],
cyclePath: Seq[Seq[String]],
children: Seq[LogicalPlan]
): Unit = {
viewIdent: Seq[String],
currentViewIdent: Seq[String],
cyclePath: Seq[Seq[String]],
children: Seq[LogicalPlan]): Unit = {
val newCyclePath = cyclePath :+ currentViewIdent
if (currentViewIdent == viewIdent) {
throw new AnalysisException(String.format("Recursive cycle in view detected: %s (cycle: %s)",
viewIdent.asIdentifier, newCyclePath.map(p => p.mkString(".")).mkString(" -> ")))
throw new AnalysisException(
String.format(
"Recursive cycle in view detected: %s (cycle: %s)",
viewIdent.asIdentifier,
newCyclePath.map(p => p.mkString(".")).mkString(" -> ")))
} else {
children.foreach { c =>
checkCyclicViewReference(viewIdent, c, newCyclePath)
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException