Commit
Merge branch 'master' of github.com:apache/spark into improve_ts
Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
	sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
Davies Liu committed Jun 11, 2015
2 parents 5233974 + 7d669a5 commit a3171b8
Showing 56 changed files with 659 additions and 491 deletions.
31 changes: 30 additions & 1 deletion core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -23,6 +23,7 @@ import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import scala.collection.JavaConversions._
import scala.xml.Node

import com.gargoylesoftware.htmlunit.DefaultCssErrorHandler
import org.json4s._
import org.json4s.jackson.JsonMethods
import org.openqa.selenium.htmlunit.HtmlUnitDriver
@@ -31,6 +32,7 @@ import org.scalatest._
import org.scalatest.concurrent.Eventually._
import org.scalatest.selenium.WebBrowser
import org.scalatest.time.SpanSugar._
import org.w3c.css.sac.CSSParseException

import org.apache.spark.LocalSparkContext._
import org.apache.spark._
@@ -39,6 +41,31 @@ import org.apache.spark.deploy.history.HistoryServerSuite
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.status.api.v1.{JacksonMessageWriter, StageStatus}

private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler {

private val cssWhiteList = List("bootstrap.min.css", "vis.min.css")

private def isInWhileList(uri: String): Boolean = cssWhiteList.exists(uri.endsWith)

override def warning(e: CSSParseException): Unit = {
if (!isInWhileList(e.getURI)) {
super.warning(e)
}
}

override def fatalError(e: CSSParseException): Unit = {
if (!isInWhileList(e.getURI)) {
super.fatalError(e)
}
}

override def error(e: CSSParseException): Unit = {
if (!isInWhileList(e.getURI)) {
super.error(e)
}
}
}

/**
* Selenium tests for the Spark Web UI.
*/
@@ -49,7 +76,9 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B


override def beforeAll(): Unit = {
webDriver = new HtmlUnitDriver
webDriver = new HtmlUnitDriver {
getWebClient.setCssErrorHandler(new SparkUICssErrorHandler)
}
}

override def afterAll(): Unit = {
2 changes: 1 addition & 1 deletion docs/hadoop-provided.md
@@ -21,6 +21,6 @@ export SPARK_DIST_CLASSPATH=$(hadoop classpath)
export SPARK_DIST_CLASSPATH=$(/path/to/hadoop/bin/hadoop classpath)

# Passing a Hadoop configuration directory
export SPARK_DIST_CLASSPATH=$(hadoop classpath --config /path/to/configs)
export SPARK_DIST_CLASSPATH=$(hadoop --config /path/to/configs classpath)

{% endhighlight %}
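The corrected command above reflects that `hadoop`'s generic `--config` option goes before the subcommand it configures, which is why the argument order was swapped. As a small illustrative sketch (not part of Spark; it only assumes a `hadoop` binary on the PATH), the same value could be built from Python like this:

```python
# Illustrative sketch: build SPARK_DIST_CLASSPATH as the docs describe,
# keeping --config ahead of the "classpath" subcommand.
import os
import subprocess

def hadoop_classpath(conf_dir=None):
    cmd = ["hadoop"]
    if conf_dir:
        cmd += ["--config", conf_dir]   # generic option precedes the subcommand
    cmd.append("classpath")
    return subprocess.check_output(cmd).decode().strip()

# os.environ["SPARK_DIST_CLASSPATH"] = hadoop_classpath("/path/to/configs")
```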
4 changes: 2 additions & 2 deletions ec2/spark_ec2.py
@@ -51,7 +51,7 @@
raw_input = input
xrange = range

SPARK_EC2_VERSION = "1.3.1"
SPARK_EC2_VERSION = "1.4.0"
SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))

VALID_SPARK_VERSIONS = set([
@@ -89,7 +89,7 @@

# Default location to get the spark-ec2 scripts (and ami-list) from
DEFAULT_SPARK_EC2_GITHUB_REPO = "https://github.com/mesos/spark-ec2"
DEFAULT_SPARK_EC2_BRANCH = "branch-1.3"
DEFAULT_SPARK_EC2_BRANCH = "branch-1.4"


def setup_external_libs(libs):
2 changes: 1 addition & 1 deletion pom.xml
@@ -1244,7 +1244,7 @@
<include>**/*Suite.java</include>
</includes>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<argLine>-Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
<argLine>-Xmx3g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
<environmentVariables>
<!--
Setting SPARK_DIST_CLASSPATH is a simple way to make sure any child processes
6 changes: 4 additions & 2 deletions project/SparkBuild.scala
@@ -149,7 +149,9 @@ object SparkBuild extends PomBuild {
javacOptions in (Compile, doc) ++= {
val Array(major, minor, _) = System.getProperty("java.version").split("\\.", 3)
if (major.toInt >= 1 && minor.toInt >= 8) Seq("-Xdoclint:all", "-Xdoclint:-missing") else Seq.empty
}
},

javacOptions in Compile ++= Seq("-encoding", "UTF-8")
)

def enable(settings: Seq[Setting[_]])(projectRef: ProjectRef) = {
@@ -514,7 +516,7 @@ object TestSettings {
javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
.map { case (k,v) => s"-D$k=$v" }.toSeq,
javaOptions in Test += "-ea",
javaOptions in Test ++= "-Xmx3g -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g"
javaOptions in Test ++= "-Xmx3g -Xss4096k -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g"
.split(" ").toSeq,
javaOptions += "-Xmx3g",
// Show full stack trace and duration in test cases.
32 changes: 32 additions & 0 deletions python/pyspark/sql/tests.py
@@ -26,6 +26,7 @@
import tempfile
import pickle
import functools
import time
import datetime

import py4j
@@ -47,6 +48,20 @@
from pyspark.sql.window import Window


class UTC(datetime.tzinfo):
"""UTC"""
ZERO = datetime.timedelta(0)

def utcoffset(self, dt):
return self.ZERO

def tzname(self, dt):
return "UTC"

def dst(self, dt):
return self.ZERO


class ExamplePointUDT(UserDefinedType):
"""
User-defined type (UDT) for ExamplePoint.
@@ -588,6 +603,23 @@ def test_filter_with_datetime(self):
self.assertEqual(0, df.filter(df.date > date).count())
self.assertEqual(0, df.filter(df.time > time).count())

def test_time_with_timezone(self):
day = datetime.date.today()
now = datetime.datetime.now()
ts = time.mktime(now.timetuple()) + now.microsecond / 1e6
# class in __main__ is not serializable
from pyspark.sql.tests import UTC
utc = UTC()
utcnow = datetime.datetime.fromtimestamp(ts, utc)
df = self.sqlCtx.createDataFrame([(day, now, utcnow)])
day1, now1, utcnow1 = df.first()
# Pyrolite serialize java.sql.Date as datetime, will be fixed in new version
self.assertEqual(day1.date(), day)
# Pyrolite does not support microsecond, the error should be
# less than 1 millisecond
self.assertTrue(now - now1 < datetime.timedelta(0.001))
self.assertTrue(now - utcnow1 < datetime.timedelta(0.001))

def test_dropna(self):
schema = StructType([
StructField("name", StringType(), True),
27 changes: 18 additions & 9 deletions python/pyspark/sql/types.py
@@ -655,12 +655,15 @@ def _need_python_to_sql_conversion(dataType):
_need_python_to_sql_conversion(dataType.valueType)
elif isinstance(dataType, UserDefinedType):
return True
elif isinstance(dataType, TimestampType):
elif isinstance(dataType, (DateType, TimestampType)):
return True
else:
return False


EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()


def _python_to_sql_converter(dataType):
"""
Returns a converter that converts a Python object into a SQL datum for the given type.
@@ -698,26 +701,32 @@ def converter(obj):
return tuple(c(d.get(n)) for n, c in zip(names, converters))
else:
return tuple(c(v) for c, v in zip(converters, obj))
else:
elif obj is not None:
raise ValueError("Unexpected tuple %r with type %r" % (obj, dataType))
return converter
elif isinstance(dataType, ArrayType):
element_converter = _python_to_sql_converter(dataType.elementType)
return lambda a: [element_converter(v) for v in a]
return lambda a: a and [element_converter(v) for v in a]
elif isinstance(dataType, MapType):
key_converter = _python_to_sql_converter(dataType.keyType)
value_converter = _python_to_sql_converter(dataType.valueType)
return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
return lambda m: m and dict([(key_converter(k), value_converter(v)) for k, v in m.items()])

elif isinstance(dataType, UserDefinedType):
return lambda obj: dataType.serialize(obj)
return lambda obj: obj and dataType.serialize(obj)

elif isinstance(dataType, DateType):
return lambda d: d and d.toordinal() - EPOCH_ORDINAL

elif isinstance(dataType, TimestampType):

def to_posix_timstamp(dt):
if dt.tzinfo is None:
return int(time.mktime(dt.timetuple()) * 1e7 + dt.microsecond * 10)
else:
return int(calendar.timegm(dt.utctimetuple()) * 1e7 + dt.microsecond * 10)
if dt:
seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
else time.mktime(dt.timetuple()))
return int(seconds * 1e7 + dt.microsecond * 10)
return to_posix_timstamp

else:
raise ValueError("Unexpected type %r" % dataType)

@@ -30,7 +30,7 @@
import org.apache.spark.sql.BaseMutableRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.UTF8String;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.bitset.BitSetMethods;

21 changes: 21 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java
@@ -154,6 +154,27 @@ public int fieldIndex(String name) {
throw new UnsupportedOperationException();
}

/**
* A generic version of Row.equals(Row), which is used for tests.
*/
@Override
public boolean equals(Object other) {
if (other instanceof Row) {
Row row = (Row) other;
int n = size();
if (n != row.size()) {
return false;
}
for (int i = 0; i < n; i ++) {
if (isNullAt(i) != row.isNullAt(i) || (!isNullAt(i) && !get(i).equals(row.get(i)))) {
return false;
}
}
return true;
}
return false;
}

@Override
public Row copy() {
final int n = size();
@@ -28,6 +28,7 @@ import scala.collection.mutable.HashMap
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
* Functions to convert Scala types to Catalyst types and vice versa.
@@ -257,7 +258,7 @@

private object StringConverter extends CatalystTypeConverter[Any, String, Any] {
override def toCatalystImpl(scalaValue: Any): UTF8String = scalaValue match {
case str: String => UTF8String(str)
case str: String => UTF8String.fromString(str)
case utf8: UTF8String => utf8
}
override def toScala(catalystValue: Any): String = catalystValue match {
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst

import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -24,6 +24,7 @@ import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/** Cast the child expression to the target data type. */
case class Cast(child: Expression, dataType: DataType) extends UnaryExpression with Logging {
@@ -111,11 +112,11 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w

// UDFToString
private[this] def castToString(from: DataType): Any => Any = from match {
case BinaryType => buildCast[Array[Byte]](_, UTF8String(_))
case DateType => buildCast[Int](_, d => UTF8String(DateTimeUtils.toString(d)))
case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes)
case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.toString(d)))
case TimestampType => buildCast[Long](_,
t => UTF8String(timestampToString(DateTimeUtils.toJavaTimestamp(t))))
case _ => buildCast[Any](_, o => UTF8String(o.toString))
t => UTF8String.fromString(timestampToString(DateTimeUtils.toJavaTimestamp(t))))
case _ => buildCast[Any](_, o => UTF8String.fromString(o.toString))
}

// BinaryConverter
@@ -141,7 +142,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case ByteType =>
buildCast[Byte](_, _ != 0)
case DecimalType() =>
buildCast[Decimal](_, _ != 0)
buildCast[Decimal](_, _ != Decimal(0))
case DoubleType =>
buildCast[Double](_, _ != 0)
case FloatType =>
@@ -454,7 +455,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case (BooleanType, dt: NumericType) =>
defineCodeGen(ctx, ev, c => s"(${ctx.javaType(dt)})($c ? 1 : 0)")
case (dt: DecimalType, BooleanType) =>
defineCodeGen(ctx, ev, c => s"$c.isZero()")
defineCodeGen(ctx, ev, c => s"!$c.isZero()")
case (dt: NumericType, BooleanType) =>
defineCodeGen(ctx, ev, c => s"$c != 0")

@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
* A parent class for mutable container objects that are reused when the values are changed,
@@ -240,7 +241,8 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR
}
}

override def setString(ordinal: Int, value: String): Unit = update(ordinal, UTF8String(value))
override def setString(ordinal: Int, value: String): Unit =
update(ordinal, UTF8String.fromString(value))

override def getString(ordinal: Int): String = apply(ordinal).toString

@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.PlatformDependent
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.types.UTF8String

/**
* Converts Rows into UnsafeRow format. This class is NOT thread-safe.