Skip to content

Remove #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ object TypeCoercion {
BooleanEquality ::
FunctionArgumentConversion ::
ConcatCoercion(conf) ::
MapZipWithCoercion ::
EltCoercion(conf) ::
CaseWhenCoercion ::
IfCoercion ::
Expand Down Expand Up @@ -763,30 +762,6 @@ object TypeCoercion {
}
}

/**
* Coerces key types of two different [[MapType]] arguments of the [[MapZipWith]] expression
* to a common type.
*/
object MapZipWithCoercion extends TypeCoercionRule {
override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// Lambda function isn't resolved when the rule is executed.
case m @ MapZipWith(left, right, function) if m.arguments.forall(a => a.resolved &&
MapType.acceptsType(a.dataType)) && !m.leftKeyType.sameType(m.rightKeyType) =>
findWiderTypeForTwo(m.leftKeyType, m.rightKeyType) match {
case Some(finalKeyType) if !Cast.forceNullable(m.leftKeyType, finalKeyType) &&
!Cast.forceNullable(m.rightKeyType, finalKeyType) =>
val newLeft = castIfNotSameType(
left,
MapType(finalKeyType, m.leftValueType, m.leftValueContainsNull))
val newRight = castIfNotSameType(
right,
MapType(finalKeyType, m.rightValueType, m.rightValueContainsNull))
MapZipWith(newLeft, newRight, function)
case _ => m
}
}
}

/**
* Coerces the types of [[Elt]] children to expected ones.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,56 +260,6 @@ case class ArrayTransform(
override def prettyName: String = "transform"
}

/**
* Filters entries in a map using the provided function.
*/
@ExpressionDescription(
usage = "_FUNC_(expr, func) - Filters entries in a map using the function.",
examples = """
Examples:
> SELECT _FUNC_(map(1, 0, 2, 2, 3, -1), (k, v) -> k > v);
{1:0,3:-1}
""",
since = "2.4.0")
case class MapFilter(
argument: Expression,
function: Expression)
extends MapBasedSimpleHigherOrderFunction with CodegenFallback {

@transient lazy val (keyVar, valueVar) = {
val args = function.asInstanceOf[LambdaFunction].arguments
(args.head.asInstanceOf[NamedLambdaVariable], args.tail.head.asInstanceOf[NamedLambdaVariable])
}

@transient lazy val MapType(keyType, valueType, valueContainsNull) = argument.dataType

override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapFilter = {
copy(function = f(function, (keyType, false) :: (valueType, valueContainsNull) :: Nil))
}

override def nullSafeEval(inputRow: InternalRow, argumentValue: Any): Any = {
val m = argumentValue.asInstanceOf[MapData]
val f = functionForEval
val retKeys = new mutable.ListBuffer[Any]
val retValues = new mutable.ListBuffer[Any]
m.foreach(keyType, valueType, (k, v) => {
keyVar.value.set(k)
valueVar.value.set(v)
if (f.eval(inputRow).asInstanceOf[Boolean]) {
retKeys += k
retValues += v
}
})
ArrayBasedMapData(retKeys.toArray, retValues.toArray)
}

override def dataType: DataType = argument.dataType

override def functionType: AbstractDataType = BooleanType

override def prettyName: String = "map_filter"
}

/**
* Filters the input array using the given lambda function.
*/
Expand Down Expand Up @@ -491,286 +441,6 @@ case class ArrayAggregate(
override def prettyName: String = "aggregate"
}

/**
* Transform Keys for every entry of the map by applying the transform_keys function.
* Returns map with transformed key entries
*/
@ExpressionDescription(
usage = "_FUNC_(expr, func) - Transforms elements in a map using the function.",
examples = """
Examples:
> SELECT _FUNC_(map_from_arrays(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + 1);
{2:1,3:2,4:3}
> SELECT _FUNC_(map_from_arrays(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + v);
{2:1,4:2,6:3}
""",
since = "2.4.0")
case class TransformKeys(
argument: Expression,
function: Expression)
extends MapBasedSimpleHigherOrderFunction with CodegenFallback {

@transient lazy val MapType(keyType, valueType, valueContainsNull) = argument.dataType

override def dataType: DataType = MapType(function.dataType, valueType, valueContainsNull)

override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): TransformKeys = {
copy(function = f(function, (keyType, false) :: (valueType, valueContainsNull) :: Nil))
}

@transient lazy val LambdaFunction(
_, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: Nil, _) = function


override def nullSafeEval(inputRow: InternalRow, argumentValue: Any): Any = {
val map = argumentValue.asInstanceOf[MapData]
val resultKeys = new GenericArrayData(new Array[Any](map.numElements))
var i = 0
while (i < map.numElements) {
keyVar.value.set(map.keyArray().get(i, keyVar.dataType))
valueVar.value.set(map.valueArray().get(i, valueVar.dataType))
val result = functionForEval.eval(inputRow)
if (result == null) {
throw new RuntimeException("Cannot use null as map key!")
}
resultKeys.update(i, result)
i += 1
}
new ArrayBasedMapData(resultKeys, map.valueArray())
}

override def prettyName: String = "transform_keys"
}

/**
* Returns a map that applies the function to each value of the map.
*/
@ExpressionDescription(
usage = "_FUNC_(expr, func) - Transforms values in the map using the function.",
examples = """
Examples:
> SELECT _FUNC_(map_from_arrays(array(1, 2, 3), array(1, 2, 3)), (k, v) -> v + 1);
{1:2,2:3,3:4}
> SELECT _FUNC_(map_from_arrays(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + v);
{1:2,2:4,3:6}
""",
since = "2.4.0")
case class TransformValues(
argument: Expression,
function: Expression)
extends MapBasedSimpleHigherOrderFunction with CodegenFallback {

@transient lazy val MapType(keyType, valueType, valueContainsNull) = argument.dataType

override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)

override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction)
: TransformValues = {
copy(function = f(function, (keyType, false) :: (valueType, valueContainsNull) :: Nil))
}

@transient lazy val LambdaFunction(
_, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: Nil, _) = function

override def nullSafeEval(inputRow: InternalRow, argumentValue: Any): Any = {
val map = argumentValue.asInstanceOf[MapData]
val resultValues = new GenericArrayData(new Array[Any](map.numElements))
var i = 0
while (i < map.numElements) {
keyVar.value.set(map.keyArray().get(i, keyVar.dataType))
valueVar.value.set(map.valueArray().get(i, valueVar.dataType))
resultValues.update(i, functionForEval.eval(inputRow))
i += 1
}
new ArrayBasedMapData(map.keyArray(), resultValues)
}

override def prettyName: String = "transform_values"
}

/**
* Merges two given maps into a single map by applying function to the pair of values with
* the same key.
*/
@ExpressionDescription(
usage =
"""
_FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
function to the pair of values with the same key. For keys only presented in one map,
NULL will be passed as the value for the missing key. If an input map contains duplicated
keys, only the first entry of the duplicated key is passed into the lambda function.
""",
examples = """
Examples:
> SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
{1:"ax",2:"by"}
""",
since = "2.4.0")
case class MapZipWith(left: Expression, right: Expression, function: Expression)
extends HigherOrderFunction with CodegenFallback {

def functionForEval: Expression = functionsForEval.head

@transient lazy val MapType(leftKeyType, leftValueType, leftValueContainsNull) = left.dataType

@transient lazy val MapType(rightKeyType, rightValueType, rightValueContainsNull) = right.dataType

@transient lazy val keyType =
TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(leftKeyType, rightKeyType).get

@transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)

override def arguments: Seq[Expression] = left :: right :: Nil

override def argumentTypes: Seq[AbstractDataType] = MapType :: MapType :: Nil

override def functions: Seq[Expression] = function :: Nil

override def functionTypes: Seq[AbstractDataType] = AnyDataType :: Nil

override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)

override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
copy(function = f(function, arguments))
}

override def checkArgumentDataTypes(): TypeCheckResult = {
super.checkArgumentDataTypes() match {
case TypeCheckResult.TypeCheckSuccess =>
if (leftKeyType.sameType(rightKeyType)) {
TypeUtils.checkForOrderingExpr(leftKeyType, s"function $prettyName")
} else {
TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
s"been two ${MapType.simpleString}s with compatible key types, but the key types are " +
s"[${leftKeyType.catalogString}, ${rightKeyType.catalogString}].")
}
case failure => failure
}
}

override def checkInputDataTypes(): TypeCheckResult = checkArgumentDataTypes()

override def eval(input: InternalRow): Any = {
val value1 = left.eval(input)
if (value1 == null) {
null
} else {
val value2 = right.eval(input)
if (value2 == null) {
null
} else {
nullSafeEval(input, value1, value2)
}
}
}

@transient lazy val LambdaFunction(_, Seq(
keyVar: NamedLambdaVariable,
value1Var: NamedLambdaVariable,
value2Var: NamedLambdaVariable),
_) = function

/**
* The function accepts two key arrays and returns a collection of keys with indexes
* to value arrays. Indexes are represented as an array of two items. This is a small
* optimization leveraging mutability of arrays.
*/
@transient private lazy val getKeysWithValueIndexes:
(ArrayData, ArrayData) => mutable.Iterable[(Any, Array[Option[Int]])] = {
if (TypeUtils.typeWithProperEquals(keyType)) {
getKeysWithIndexesFast
} else {
getKeysWithIndexesBruteForce
}
}

private def assertSizeOfArrayBuffer(size: Int): Unit = {
if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
s"unique keys due to exceeding the array size limit " +
s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
}
}

private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
val hashMap = new mutable.LinkedHashMap[Any, Array[Option[Int]]]
for((z, array) <- Array((0, keys1), (1, keys2))) {
var i = 0
while (i < array.numElements()) {
val key = array.get(i, keyType)
hashMap.get(key) match {
case Some(indexes) =>
if (indexes(z).isEmpty) {
indexes(z) = Some(i)
}
case None =>
val indexes = Array[Option[Int]](None, None)
indexes(z) = Some(i)
hashMap.put(key, indexes)
}
i += 1
}
}
hashMap
}

private def getKeysWithIndexesBruteForce(keys1: ArrayData, keys2: ArrayData) = {
val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
for((z, array) <- Array((0, keys1), (1, keys2))) {
var i = 0
while (i < array.numElements()) {
val key = array.get(i, keyType)
var found = false
var j = 0
while (!found && j < arrayBuffer.size) {
val (bufferKey, indexes) = arrayBuffer(j)
if (ordering.equiv(bufferKey, key)) {
found = true
if(indexes(z).isEmpty) {
indexes(z) = Some(i)
}
}
j += 1
}
if (!found) {
assertSizeOfArrayBuffer(arrayBuffer.size)
val indexes = Array[Option[Int]](None, None)
indexes(z) = Some(i)
arrayBuffer += Tuple2(key, indexes)
}
i += 1
}
}
arrayBuffer
}

private def nullSafeEval(inputRow: InternalRow, value1: Any, value2: Any): Any = {
val mapData1 = value1.asInstanceOf[MapData]
val mapData2 = value2.asInstanceOf[MapData]
val keysWithIndexes = getKeysWithValueIndexes(mapData1.keyArray(), mapData2.keyArray())
val size = keysWithIndexes.size
val keys = new GenericArrayData(new Array[Any](size))
val values = new GenericArrayData(new Array[Any](size))
val valueData1 = mapData1.valueArray()
val valueData2 = mapData2.valueArray()
var i = 0
for ((key, Array(index1, index2)) <- keysWithIndexes) {
val v1 = index1.map(valueData1.get(_, leftValueType)).getOrElse(null)
val v2 = index2.map(valueData2.get(_, rightValueType)).getOrElse(null)
keyVar.value.set(key)
value1Var.value.set(v1)
value2Var.value.set(v2)
keys.update(i, key)
values.update(i, functionForEval.eval(inputRow))
i += 1
}
new ArrayBasedMapData(keys, values)
}

override def prettyName: String = "map_zip_with"
}

// scalastyle:off line.size.limit
@ExpressionDescription(
usage = "_FUNC_(left, right, func) - Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function.",
Expand Down
Loading