Skip to content

Commit

Permalink
[SPARK-8207] [SQL] Add math function bin
Browse files Browse the repository at this point in the history
JIRA: https://issues.apache.org/jira/browse/SPARK-8207

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes apache#6721 from viirya/expr_bin and squashes the following commits:

07e1c8f [Liang-Chi Hsieh] Remove AbstractUnaryMathExpression and let BIN inherit UnaryExpression.
0677f1a [Liang-Chi Hsieh] For comments.
cf62b95 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
0cf20f2 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
dea9c12 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
d4f4774 [Liang-Chi Hsieh] Add @ignore_unicode_prefix.
7a0196f [Liang-Chi Hsieh] Fix python style.
ac2bacd [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
a0a2d0f [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
4cb764d [Liang-Chi Hsieh] For comments.
0f78682 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
c0c3197 [Liang-Chi Hsieh] Add bin to FunctionRegistry.
824f761 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin
50e0c3b [Liang-Chi Hsieh] Add math function bin(a: long): string.
  • Loading branch information
viirya authored and Davies Liu committed Jun 19, 2015
1 parent 43c7ec6 commit 2c59d5c
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 8 deletions.
14 changes: 14 additions & 0 deletions python/pyspark/sql/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
__all__ = [
'array',
'approxCountDistinct',
'bin',
'coalesce',
'countDistinct',
'explode',
Expand Down Expand Up @@ -231,6 +232,19 @@ def approxCountDistinct(col, rsd=None):
return Column(jc)


@ignore_unicode_prefix
@since(1.5)
def bin(col):
    """Returns the binary value of the given column, rendered as a string.

    >>> df.select(bin(df.age).alias('c')).collect()
    [Row(c=u'10'), Row(c=u'101')]
    """
    # Resolve the JVM-side `functions` object through the active SparkContext,
    # then wrap the Java column it returns in a Python Column.
    jvm_functions = SparkContext._active_spark_context._jvm.functions
    return Column(jvm_functions.bin(_to_java_column(col)))


@since(1.4)
def coalesce(*cols):
"""Returns the first column that is not null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ object FunctionRegistry {
expression[Asin]("asin"),
expression[Atan]("atan"),
expression[Atan2]("atan2"),
expression[Bin]("bin"),
expression[Cbrt]("cbrt"),
expression[Ceil]("ceil"),
expression[Ceil]("ceiling"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package org.apache.spark.sql.catalyst.expressions

import java.lang.{Long => JLong}

import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.types.{DataType, DoubleType}
import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StringType}
import org.apache.spark.unsafe.types.UTF8String

/**
* A leaf expression specifically for math constants. Math constants expect no input.
Expand Down Expand Up @@ -207,6 +210,34 @@ case class ToRadians(child: Expression) extends UnaryMathExpression(math.toRadia
override def funcName: String = "toRadians"
}

/**
 * Converts a long value into the string form of its binary (base-2) representation.
 *
 * The child is expected to be of [[LongType]]; the result is a [[StringType]] value built from
 * `java.lang.Long.toBinaryString`. A null input evaluates to null.
 */
case class Bin(child: Expression)
  extends UnaryExpression with Serializable with ExpectsInputTypes {

  val name: String = "BIN"

  override def foldable: Boolean = child.foldable
  // Reported as nullable unconditionally, independent of the child's own nullability.
  override def nullable: Boolean = true
  override def toString: String = s"$name($child)"

  override def expectedChildTypes: Seq[DataType] = Seq(LongType)
  override def dataType: DataType = StringType

  /**
   * Lower-cased form of [[name]].
   *
   * Lower-casing is done with `Locale.ROOT` so the result is always "bin". The default-locale
   * `toLowerCase` maps an upper-case 'I' to a dotless i under Turkish-style locales.
   */
  def funcName: String = name.toLowerCase(java.util.Locale.ROOT)

  /**
   * Evaluates the child against `input` and renders the resulting long in binary.
   *
   * @param input the row the child expression is evaluated against
   * @return a UTF8String holding the binary digits, or null when the child evaluates to null
   */
  override def eval(input: catalyst.InternalRow): Any = {
    child.eval(input) match {
      case null => null
      case value => UTF8String.fromString(JLong.toBinaryString(value.asInstanceOf[Long]))
    }
  }

  /**
   * Emits the generated-code counterpart of [[eval]]: the child's value is passed to
   * `java.lang.Long.toBinaryString` and wrapped through the codegen string type's `fromString`.
   */
  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    // NOTE(review): null handling is presumably supplied by defineCodeGen -- confirm.
    defineCodeGen(ctx, ev, (c) =>
      s"${ctx.stringType}.fromString(java.lang.Long.toBinaryString($c))")
  }
}

////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.types.{DataType, DoubleType, LongType}

class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {

Expand All @@ -41,16 +42,18 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
* Used for testing unary math expressions.
*
* @param c expression
* @param f The functions in scala.math
* @param f The functions in scala.math or elsewhere used to generate expected results
* @param domain The set of values to run the function with
* @param expectNull Whether the given values should return null or not
* @tparam T Generic type for primitives
* @tparam U Generic type for the output of the given function `f`
*/
private def testUnary[T](
private def testUnary[T, U](
c: Expression => Expression,
f: T => T,
f: T => U,
domain: Iterable[T] = (-20 to 20).map(_ * 0.1),
expectNull: Boolean = false): Unit = {
expectNull: Boolean = false,
evalType: DataType = DoubleType): Unit = {
if (expectNull) {
domain.foreach { value =>
checkEvaluation(c(Literal(value)), null, EmptyRow)
Expand All @@ -60,7 +63,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(c(Literal(value)), f(value), EmptyRow)
}
}
checkEvaluation(c(Literal.create(null, DoubleType)), null, create_row(null))
checkEvaluation(c(Literal.create(null, evalType)), null, create_row(null))
}

/**
Expand Down Expand Up @@ -168,7 +171,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}

test("signum") {
  // testUnary takes two type parameters (input type and result type of `f`), so both are
  // spelled out here; a single-type-argument call does not match that signature.
  testUnary[Double, Double](Signum, math.signum)
}

test("log") {
Expand All @@ -186,6 +189,23 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
testUnary(Log1p, math.log1p, (-10 to -2).map(_ * 1.0), expectNull = true)
}

test("bin") {
  testUnary(Bin, java.lang.Long.toBinaryString, (-20 to 20).map(_.toLong), evalType = LongType)

  // Ordinal 0 holds a null; ordinals 1 to 4 hold the long values checked below.
  val row = create_row(null, 12L, 123L, 1234L, -123L)

  // A null input must produce a null result.
  checkEvaluation(Bin('a.long.at(0)), null, row)

  // Every non-null ordinal must match the JDK's own binary rendering of the same value.
  Seq(12L, 123L, 1234L, -123L).zipWithIndex.foreach { case (value, idx) =>
    checkEvaluation(Bin('a.long.at(idx + 1)), java.lang.Long.toBinaryString(value), row)
  }
}

test("log2") {
def f: (Double) => Double = (x: Double) => math.log(x) / math.log(2)
testUnary(Log2, f, (0 to 20).map(_ * 0.1))
Expand Down
18 changes: 18 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,24 @@ object functions {
*/
def atan2(l: Double, rightName: String): Column = atan2(l, Column(rightName))

/**
 * Returns the binary representation of the given long column as a string.
 * For example, a column value of 12 yields "1100".
 *
 * @group math_funcs
 * @since 1.5.0
 */
def bin(e: Column): Column = {
  val underlying = e.expr
  Bin(underlying)
}

/**
 * Returns the binary representation of the long column with the given name as a string.
 * For example, a column value of 12 yields "1100".
 *
 * @group math_funcs
 * @since 1.5.0
 */
def bin(columnName: String): Column = {
  val column = Column(columnName)
  bin(column)
}

/**
* Computes the cube-root of the given value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ class DataFrameFunctionsSuite extends QueryTest {
testData2.collect().toSeq.map(r => Row(~r.getInt(0))))
}

test("bin") {
  val df = Seq[(Integer, Integer)]((12, null)).toDF("a", "b")
  // Column "a" holds 12 and column "b" holds null, so both query forms share one expected row.
  val expected = Row("1100", null)

  // DataFrame function API.
  checkAnswer(df.select(bin("a"), bin("b")), expected)
  // SQL expression path.
  checkAnswer(df.selectExpr("bin(a)", "bin(b)"), expected)
}

test("if function") {
val df = Seq((1, 2)).toDF("a", "b")
checkAnswer(
Expand Down

0 comments on commit 2c59d5c

Please sign in to comment.