From 5ed63adb9cb74b0ae4ee5b7df7c8441bb0c23aec Mon Sep 17 00:00:00 2001 From: Marc-Andre Tremblay Date: Thu, 4 Aug 2016 14:08:11 -0700 Subject: [PATCH 01/11] Custom JDBC column types back-port Back-port JDBC column types to 1.x branch. Author: Marc-Andre Tremblay Closes #247 from nrstott/feature/custom-jdbc-column-types. --- README.md | 24 +++++++++++ .../spark/redshift/RedshiftJDBCWrapper.scala | 43 +++++++++++-------- .../spark/redshift/RedshiftSourceSuite.scala | 19 ++++++++ 3 files changed, 67 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 232579e5..f78033c1 100644 --- a/README.md +++ b/README.md @@ -448,6 +448,30 @@ df.write .save() ``` +### Setting a custom column type + +If you need to manually set a column type, you can use the `redshift_type` column metadata. For example, if you desire to override +the `Spark SQL Schema -> Redshift SQL` type matcher to assign a user-defined column type, you can do the following: + +```scala +import org.apache.spark.sql.types.MetadataBuilder + +// Specify the custom width of each column +val columnTypeMap = Map( + "language_code" -> "CHAR(2)", + "country_code" -> "CHAR(2)", + "url" -> "BPCHAR(111)" +) + +var df = ... // the dataframe you'll want to write to Redshift + +// Apply each column metadata customization +columnTypeMap.foreach { case (colName, colType) => + val metadata = new MetadataBuilder().putString("redshift_type", colType).build() + df = df.withColumn(colName, df(colName).as(colName, metadata)) +} +``` + ### Configuring column encoding When creating a table, this library can be configured to use a specific compression encoding on individual columns. You can use the `encoding` column metadata field to specify a compression encoding for each column (see [Amazon docs](http://docs.aws.amazon.com/redshift/latest/dg/c_Compression_encodings.html) for available encodings). diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala index 7d9bf6b9..55fdf1c5 100644 --- a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala +++ b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala @@ -241,26 +241,31 @@ private[redshift] class JDBCWrapper { val sb = new StringBuilder() schema.fields.foreach { field => { val name = field.name - val typ: String = field.dataType match { - case IntegerType => "INTEGER" - case LongType => "BIGINT" - case DoubleType => "DOUBLE PRECISION" - case FloatType => "REAL" - case ShortType => "INTEGER" - case ByteType => "SMALLINT" // Redshift does not support the BYTE type. - case BooleanType => "BOOLEAN" - case StringType => - if (field.metadata.contains("maxlength")) { - s"VARCHAR(${field.metadata.getLong("maxlength")})" - } else { - "TEXT" - } - case BinaryType => "BLOB" - case TimestampType => "TIMESTAMP" - case DateType => "DATE" - case t: DecimalType => s"DECIMAL(${t.precision},${t.scale})" - case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC") + val typ: String = if (field.metadata.contains("redshift_type")) { + field.metadata.getString("redshift_type") + } else { + field.dataType match { + case IntegerType => "INTEGER" + case LongType => "BIGINT" + case DoubleType => "DOUBLE PRECISION" + case FloatType => "REAL" + case ShortType => "INTEGER" + case ByteType => "SMALLINT" // Redshift does not support the BYTE type. 
+ case BooleanType => "BOOLEAN" + case StringType => + if (field.metadata.contains("maxlength")) { + s"VARCHAR(${field.metadata.getLong("maxlength")})" + } else { + "TEXT" + } + case BinaryType => "BLOB" + case TimestampType => "TIMESTAMP" + case DateType => "DATE" + case t: DecimalType => s"DECIMAL(${t.precision},${t.scale})" + case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC") + } } + val nullable = if (field.nullable) "" else "NOT NULL" val encoding = if (field.metadata.contains("encoding")) { s"ENCODE ${field.metadata.getString("encoding")}" diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala index 18e19cad..ef554911 100644 --- a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala +++ b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala @@ -465,6 +465,25 @@ class RedshiftSourceSuite assert(commentCommands === expectedCommentCommands) } + test("configuring redshift_type on columns") { + val bpcharMetadata = new MetadataBuilder().putString("redshift_type", "BPCHAR(2)").build() + val nvarcharMetadata = new MetadataBuilder().putString("redshift_type", "NVARCHAR(123)").build() + + val schema = StructType( + StructField("bpchar_str", StringType, metadata = bpcharMetadata) :: + StructField("bpchar_str", StringType, metadata = nvarcharMetadata) :: + StructField("default_str", StringType) :: + Nil) + + val df = testSqlContext.createDataFrame(sc.emptyRDD[Row], schema) + val createTableCommand = + DefaultRedshiftWriter.createTableSql(df, MergedParameters.apply(defaultParams)).trim + val expectedCreateTableCommand = + """CREATE TABLE IF NOT EXISTS "PUBLIC"."test_table" ("bpchar_str" BPCHAR(2),""" + + """ "bpchar_str" NVARCHAR(123), "default_str" TEXT)""" + assert(createTableCommand === expectedCreateTableCommand) + } + test("Respect SaveMode.ErrorIfExists when table exists") { val mockRedshift = new MockRedshift( defaultParams("url"), From 237f947618249bced68ae4ab9f255953baaa411b Mon Sep 17 00:00:00 2001 From: Mathias Bogaert Date: Thu, 4 Aug 2016 13:06:11 -0700 Subject: [PATCH 02/11] Redshift doesn't support BLOB Author: Mathias Bogaert Closes #251 from analytically/patch-1. --- .../com/databricks/spark/redshift/RedshiftJDBCWrapper.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala index 55fdf1c5..fb6ab8f1 100644 --- a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala +++ b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala @@ -258,7 +258,6 @@ private[redshift] class JDBCWrapper { } else { "TEXT" } - case BinaryType => "BLOB" case TimestampType => "TIMESTAMP" case DateType => "DATE" case t: DecimalType => s"DECIMAL(${t.precision},${t.scale})" From ebe5e1a993f16f75fb9a018efd5e6e5a6b75ddcc Mon Sep 17 00:00:00 2001 From: Travis Crawford Date: Fri, 19 Aug 2016 19:13:53 -0700 Subject: [PATCH 03/11] Add support for JDBC 4.2 This patch updates `RedshiftJDBCWrapper.getDriverClass` to automatically recognize the Redshift JDBC 4.2 driver. Fixes #258. Author: Travis Crawford Closes #259 from traviscrawford/travis/jdbc42. 
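For reference, a minimal sketch of the lookup order this patch implements (the helper name and the plain `Class.forName` call are illustrative only; the actual change uses `Utils.classForName` and lives in the diff below):

```scala
import scala.util.Try

// Illustrative helper (not part of the patch): try the newest Amazon Redshift
// JDBC driver first, then fall back to older ones, mirroring getDriverClass.
def resolveRedshiftDriverClass(): String = {
  val candidates = Seq(
    "com.amazon.redshift.jdbc42.Driver", // JDBC 4.2 (recognized as of this patch)
    "com.amazon.redshift.jdbc41.Driver", // JDBC 4.1
    "com.amazon.redshift.jdbc4.Driver")  // JDBC 4.0
  candidates
    .find(name => Try(Class.forName(name)).isSuccess)
    .getOrElse(throw new ClassNotFoundException(
      "Could not load an Amazon Redshift JDBC driver; see the README for " +
        "instructions on downloading and configuring the official Amazon driver."))
}
```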
--- .../spark/redshift/RedshiftJDBCWrapper.scala | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala index fb6ab8f1..b3fe0c30 100644 --- a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala +++ b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala @@ -70,16 +70,23 @@ private[redshift] class JDBCWrapper { jdbcSubprotocol match { case "redshift" => try { - Utils.classForName("com.amazon.redshift.jdbc41.Driver").getName + Utils.classForName("com.amazon.redshift.jdbc42.Driver").getName } catch { case _: ClassNotFoundException => try { - Utils.classForName("com.amazon.redshift.jdbc4.Driver").getName + Utils.classForName("com.amazon.redshift.jdbc41.Driver").getName } catch { - case e: ClassNotFoundException => - throw new ClassNotFoundException( - "Could not load an Amazon Redshift JDBC driver; see the README for " + - "instructions on downloading and configuring the official Amazon driver.", e) + case _: ClassNotFoundException => + try { + Utils.classForName("com.amazon.redshift.jdbc4.Driver").getName + } catch { + case e: ClassNotFoundException => + throw new ClassNotFoundException( + "Could not load an Amazon Redshift JDBC driver; see the README for " + + "instructions on downloading and configuring the official Amazon driver.", + e + ) + } } } case "postgresql" => "org.postgresql.Driver" From 8782802279a76aa7a5fae17d7b02373373a47b4f Mon Sep 17 00:00:00 2001 From: James Hou Date: Mon, 8 Aug 2016 18:02:43 -0700 Subject: [PATCH 04/11] Handle invalid S3 hostname exceptions with older aws-java-sdk versions We've seen a lot of messages lately regarding the "Invalid S3 URI: hostname does not appear to be a valid S3 endpoint" exception and so thought we should contribute our two cents and the code changes that worked for us. We've tried many approaches listed in that thread including using `spark.executor.extraClassPath` and `spark.driver.extraClassPath` environment variables to prepend to the classpath, including it in the assembled jar or as a shaded jar, Unfortunately many of these approaches failed mainly because we have on the machines themselves the older aws-java-sdk jar and that usually takes precedence. We ended up going with what Josh mentioned earlier about changing the S3 url in the spark-redshift code to add the endpoint to the host (`*.s3.amazonaws.com`). This logic will try to instantiate a new AmazonS3URI and if it fails, it'll try to add the default S3 Amazon domain to the host. Author: James Hou Author: James Hou Author: Josh Rosen Closes #254 from jameshou/feature/add-s3-full-endpoint-v1. --- .travis.yml | 18 ++++++---- dev/run-tests-travis.sh | 2 ++ project/SparkRedshiftBuild.scala | 36 ++++++++++++------- project/plugins.sbt | 3 ++ .../spark/redshift/RedshiftRelation.scala | 4 +-- .../com/databricks/spark/redshift/Utils.scala | 34 +++++++++++++++++- .../spark/redshift/UtilsSuite.scala | 5 +++ 7 files changed, 79 insertions(+), 23 deletions(-) diff --git a/.travis.yml b/.travis.yml index 95581b3c..ac362e38 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,29 +13,33 @@ matrix: # so the published Spark Maven artifacts will not work with Hadoop 1.x. 
- jdk: openjdk6 scala: 2.10.5 - env: HADOOP_VERSION="2.2.0" SPARK_VERSION="1.4.1" SPARK_AVRO_VERSION="2.0.1" + env: HADOOP_VERSION="2.2.0" SPARK_VERSION="1.4.1" SPARK_AVRO_VERSION="2.0.1" AWS_JAVA_SDK_VERSION="1.10.22" - jdk: openjdk7 scala: 2.10.5 - env: HADOOP_VERSION="1.0.4" SPARK_VERSION="1.5.0" SPARK_AVRO_VERSION="2.0.1" + env: HADOOP_VERSION="1.0.4" SPARK_VERSION="1.5.0" SPARK_AVRO_VERSION="2.0.1" AWS_JAVA_SDK_VERSION="1.10.22" - jdk: openjdk7 scala: 2.10.5 - env: HADOOP_VERSION="1.2.1" SPARK_VERSION="1.5.0" SPARK_AVRO_VERSION="2.0.1" + env: HADOOP_VERSION="1.2.1" SPARK_VERSION="1.5.0" SPARK_AVRO_VERSION="2.0.1" AWS_JAVA_SDK_VERSION="1.10.22" - jdk: openjdk7 scala: 2.10.5 - env: HADOOP_VERSION="2.2.0" SPARK_VERSION="1.5.0" SPARK_AVRO_VERSION="2.0.1" + env: HADOOP_VERSION="2.2.0" SPARK_VERSION="1.5.0" SPARK_AVRO_VERSION="2.0.1" AWS_JAVA_SDK_VERSION="1.10.22" - jdk: openjdk7 scala: 2.10.5 - env: HADOOP_VERSION="2.2.0" SPARK_VERSION="1.6.0" SPARK_AVRO_VERSION="2.0.1" + env: HADOOP_VERSION="2.2.0" SPARK_VERSION="1.6.0" SPARK_AVRO_VERSION="2.0.1" AWS_JAVA_SDK_VERSION="1.10.22" # Configuration corresponding to DBC 1.4.x driver package as of DBC 2.4, # which uses spark-avro 1.0.0. We use Hadoop 2.2.0 here, while DBC uses # 1.2.1, because the 1.4.1 published to Maven Central is a Hadoop 2.x build. - jdk: openjdk7 scala: 2.10.5 - env: HADOOP_VERSION="2.2.0" SPARK_VERSION="1.4.1" SPARK_AVRO_VERSION="1.0.0" + env: HADOOP_VERSION="2.2.0" SPARK_VERSION="1.4.1" SPARK_AVRO_VERSION="1.0.0" AWS_JAVA_SDK_VERSION="1.10.22" + # Test with an old version of the AWS Java SDK + - jdk: openjdk7 + scala: 2.10.5 + env: HADOOP_VERSION="2.2.0" SPARK_VERSION="1.6.0" SPARK_AVRO_VERSION="2.0.1" AWS_JAVA_SDK_VERSION="1.7.4" # Scala 2.11 tests: - jdk: openjdk7 scala: 2.11.7 - env: HADOOP_VERSION="2.2.0" SPARK_VERSION="1.5.0" SPARK_AVRO_VERSION="2.0.1" + env: HADOOP_VERSION="2.2.0" SPARK_VERSION="1.5.0" SPARK_AVRO_VERSION="2.0.1" AWS_JAVA_SDK_VERSION="1.10.22" env: global: # AWS_REDSHIFT_JDBC_URL diff --git a/dev/run-tests-travis.sh b/dev/run-tests-travis.sh index 161f9e38..b0b398b7 100755 --- a/dev/run-tests-travis.sh +++ b/dev/run-tests-travis.sh @@ -7,6 +7,7 @@ sbt ++$TRAVIS_SCALA_VERSION "test:scalastyle" sbt ++$TRAVIS_SCALA_VERSION "it:scalastyle" sbt \ + -Daws.testVersion=$AWS_JAVA_SDK_VERSION \ -Dhadoop.testVersion=$HADOOP_VERSION \ -Dspark.testVersion=$SPARK_VERSION \ -DsparkAvro.testVersion=$SPARK_AVRO_VERSION \ @@ -15,6 +16,7 @@ sbt \ if [ "$TRAVIS_SECURE_ENV_VARS" == "true" ]; then sbt \ + -Daws.testVersion=$AWS_JAVA_SDK_VERSION \ -Dhadoop.testVersion=$HADOOP_VERSION \ -Dspark.testVersion=$SPARK_VERSION \ -DsparkAvro.testVersion=$SPARK_AVRO_VERSION \ diff --git a/project/SparkRedshiftBuild.scala b/project/SparkRedshiftBuild.scala index a5423f88..424650b6 100644 --- a/project/SparkRedshiftBuild.scala +++ b/project/SparkRedshiftBuild.scala @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +import scala.math.Ordering.Implicits._ +import org.apache.maven.artifact.versioning.ComparableVersion import org.scalastyle.sbt.ScalastylePlugin.rawScalastyleSettings import sbt._ import sbt.Keys._ @@ -28,6 +30,7 @@ object SparkRedshiftBuild extends Build { val testSparkVersion = settingKey[String]("Spark version to test against") val testSparkAvroVersion = settingKey[String]("spark-avro version to test against") val testHadoopVersion = settingKey[String]("Hadoop version to test against") + val testAWSJavaSDKVersion = settingKey[String]("AWS Java SDK version to test against") // Define a custom test configuration so that unit test helper classes can be re-used under // the integration tests configuration; see http://stackoverflow.com/a/20635808. @@ -48,6 +51,7 @@ object SparkRedshiftBuild extends Build { testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.value), testSparkAvroVersion := sys.props.get("sparkAvro.testVersion").getOrElse("2.0.1"), testHadoopVersion := sys.props.get("hadoop.testVersion").getOrElse("2.2.0"), + testAWSJavaSDKVersion := sys.props.get("aws.testVersion").getOrElse("1.10.22"), spName := "databricks/spark-redshift", sparkComponents ++= Seq("sql", "hive"), spIgnoreProvided := true, @@ -60,19 +64,6 @@ object SparkRedshiftBuild extends Build { libraryDependencies ++= Seq( "org.slf4j" % "slf4j-api" % "1.7.5", "com.eclipsesource.minimal-json" % "minimal-json" % "0.9.4", - // These Amazon SDK depdencies are marked as 'provided' in order to reduce the risk of - // dependency conflicts with other user libraries. In many environments, such as EMR and - // Databricks, the Amazon SDK will already be on the classpath. In other cases, the SDK is - // likely to be provided via a dependency on the S3NativeFileSystem. If this was not marked - // as provided, then we would have to worry about the SDK's own dependencies evicting - // earlier versions of those dependencies that are required by the end user's own code. - // There's a trade-off here and we've chosen to err on the side of minimizing dependency - // conflicts for a majority of users while adding a minor inconvienece (adding one extra - // depenendecy by hand) for a smaller set of users. - // We exclude jackson-databind to avoid a conflict with Spark's version (see #104). - "com.amazonaws" % "aws-java-sdk-core" % "1.10.22" % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"), - "com.amazonaws" % "aws-java-sdk-s3" % "1.10.22" % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"), - "com.amazonaws" % "aws-java-sdk-sts" % "1.10.22" % "test" exclude("com.fasterxml.jackson.core", "jackson-databind"), // We require spark-avro, but avro-mapred must be provided to match Hadoop version. // In most cases, avro-mapred will be provided as part of the Spark assembly JAR. "com.databricks" %% "spark-avro" % "2.0.1", @@ -92,6 +83,25 @@ object SparkRedshiftBuild extends Build { "org.scalatest" %% "scalatest" % "2.2.1" % "test", "org.mockito" % "mockito-core" % "1.10.19" % "test" ), + libraryDependencies ++= (if (new ComparableVersion(testAWSJavaSDKVersion.value) < new ComparableVersion("1.8.10")) { + // These Amazon SDK depdencies are marked as 'provided' in order to reduce the risk of + // dependency conflicts with other user libraries. In many environments, such as EMR and + // Databricks, the Amazon SDK will already be on the classpath. In other cases, the SDK is + // likely to be provided via a dependency on the S3NativeFileSystem. 
If this was not marked + // as provided, then we would have to worry about the SDK's own dependencies evicting + // earlier versions of those dependencies that are required by the end user's own code. + // There's a trade-off here and we've chosen to err on the side of minimizing dependency + // conflicts for a majority of users while adding a minor inconvienece (adding one extra + // depenendecy by hand) for a smaller set of users. + // We exclude jackson-databind to avoid a conflict with Spark's version (see #104). + Seq("com.amazonaws" % "aws-java-sdk" % testAWSJavaSDKVersion.value % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind")) + } else { + Seq( + "com.amazonaws" % "aws-java-sdk-core" % testAWSJavaSDKVersion.value % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"), + "com.amazonaws" % "aws-java-sdk-s3" % testAWSJavaSDKVersion.value % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"), + "com.amazonaws" % "aws-java-sdk-sts" % testAWSJavaSDKVersion.value % "test" exclude("com.fasterxml.jackson.core", "jackson-databind") + ) + }), libraryDependencies ++= (if (testHadoopVersion.value.startsWith("1")) { Seq( "org.apache.hadoop" % "hadoop-client" % testHadoopVersion.value % "test" force(), diff --git a/project/plugins.sbt b/project/plugins.sbt index 8f74621a..fc849f20 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -18,3 +18,6 @@ addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0") addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.0") addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") + +// Cannot use 3.3.9 because we need to be Java-6 compatible for the Spark 1.4.x tests +libraryDependencies += "org.apache.maven" % "maven-artifact" % "3.2.5" diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala index cf7ede77..06d22c05 100644 --- a/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala +++ b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala @@ -23,7 +23,7 @@ import java.net.URI import scala.collection.JavaConverters._ import com.amazonaws.auth.AWSCredentials -import com.amazonaws.services.s3.{AmazonS3Client, AmazonS3URI} +import com.amazonaws.services.s3.AmazonS3Client import com.eclipsesource.json.Json import org.apache.spark.rdd.RDD import org.apache.spark.sql.sources._ @@ -127,7 +127,7 @@ private[redshift] case class RedshiftRelation( val filesToRead: Seq[String] = { val cleanedTempDirUri = Utils.fixS3Url(Utils.removeCredentialsFromURI(URI.create(tempDir)).toString) - val s3URI = new AmazonS3URI(cleanedTempDirUri) + val s3URI = Utils.createS3URI(cleanedTempDirUri) val s3Client = s3ClientFactory(creds) val is = s3Client.getObject(s3URI.getBucket, s3URI.getKey + "manifest").getObjectContent val s3Files = try { diff --git a/src/main/scala/com/databricks/spark/redshift/Utils.scala b/src/main/scala/com/databricks/spark/redshift/Utils.scala index 0414d918..52770351 100644 --- a/src/main/scala/com/databricks/spark/redshift/Utils.scala +++ b/src/main/scala/com/databricks/spark/redshift/Utils.scala @@ -59,6 +59,38 @@ private[redshift] object Utils { url.replaceAll("s3[an]://", "s3://") } + /** + * Factory method to create new S3URI in order to handle various library incompatibilities with + * older AWS Java Libraries + */ + def createS3URI(url: String): AmazonS3URI = { + try { + // try to instantiate AmazonS3URI with url + new AmazonS3URI(url) + } catch { + case e: IllegalArgumentException 
if e.getMessage. + startsWith("Invalid S3 URI: hostname does not appear to be a valid S3 endpoint") => { + new AmazonS3URI(addEndpointToUrl(url)) + } + } + } + + /** + * Since older AWS Java Libraries do not handle S3 urls that have just the bucket name + * as the host, add the endpoint to the host + */ + def addEndpointToUrl(url: String, domain: String = "s3.amazonaws.com"): String = { + val uri = new URI(url) + val hostWithEndpoint = uri.getHost + "." + domain + new URI(uri.getScheme, + uri.getUserInfo, + hostWithEndpoint, + uri.getPort, + uri.getPath, + uri.getQuery, + uri.getFragment).toString + } + /** * Returns a copy of the given URI with the user credentials removed. */ @@ -87,7 +119,7 @@ private[redshift] object Utils { tempDir: String, s3Client: AmazonS3Client): Unit = { try { - val s3URI = new AmazonS3URI(Utils.fixS3Url(tempDir)) + val s3URI = createS3URI(Utils.fixS3Url(tempDir)) val bucket = s3URI.getBucket assert(bucket != null, "Could not get bucket from S3 URI") val key = Option(s3URI.getKey).getOrElse("") diff --git a/src/test/scala/com/databricks/spark/redshift/UtilsSuite.scala b/src/test/scala/com/databricks/spark/redshift/UtilsSuite.scala index d83e8b2f..f9043a52 100644 --- a/src/test/scala/com/databricks/spark/redshift/UtilsSuite.scala +++ b/src/test/scala/com/databricks/spark/redshift/UtilsSuite.scala @@ -44,6 +44,11 @@ class UtilsSuite extends FunSuite with Matchers { Utils.fixS3Url("s3n://foo/bar/baz") shouldBe "s3://foo/bar/baz" } + test("addEndpointToUrl produces urls with endpoints added to host") { + Utils.addEndpointToUrl("s3a://foo/bar/12345") shouldBe "s3a://foo.s3.amazonaws.com/bar/12345" + Utils.addEndpointToUrl("s3n://foo/bar/baz") shouldBe "s3n://foo.s3.amazonaws.com/bar/baz" + } + test("temp paths are random subdirectories of root") { val root = "s3n://temp/" val firstTempPath = Utils.makeTempPath(root) From 34d03e99352fe9fcab83890251187d125987858d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 21 Aug 2016 14:59:45 -0700 Subject: [PATCH 05/11] README and version update for 1.1.0 release. --- README.md | 4 ++-- version.sbt | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index f78033c1..3b47295a 100644 --- a/README.md +++ b/README.md @@ -30,14 +30,14 @@ You may use this library in your applications with the following dependency info ``` groupId: com.databricks artifactId: spark-redshift_2.10 -version: 1.0.0 +version: 1.1.0 ``` **Scala 2.11** ``` groupId: com.databricks artifactId: spark-redshift_2.11 -version: 1.0.0 +version: 1.1.0 ``` You will also need to provide a JDBC driver that is compatible with Redshift. Amazon recommend that you use [their driver](http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html), which is distributed as a JAR that is hosted on Amazon's website. This library has also been successfully tested using the Postgres JDBC driver. 
diff --git a/version.sbt b/version.sbt index e777cd30..bd727575 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "1.0.1-SNAPSHOT" \ No newline at end of file +version in ThisBuild := "1.1.0-SNAPSHOT" From 33fa626a739ee81963b86a8334e45aa4ced453f4 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 21 Aug 2016 15:07:04 -0700 Subject: [PATCH 06/11] Setting version to 1.1.0 --- version.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.sbt b/version.sbt index bd727575..a9d8ecaf 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "1.1.0-SNAPSHOT" +version in ThisBuild := "1.1.0" \ No newline at end of file From 6e4ecd41305e9df2e088d522c03a9ae55b128fd8 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 21 Aug 2016 15:08:28 -0700 Subject: [PATCH 07/11] Setting version to 1.1.1-SNAPSHOT --- version.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.sbt b/version.sbt index a9d8ecaf..d58ae8af 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "1.1.0" \ No newline at end of file +version in ThisBuild := "1.1.1-SNAPSHOT" \ No newline at end of file From ad2498da7f599b8f6035a9c490e4b9a610014dee Mon Sep 17 00:00:00 2001 From: "Francis T. O'Donovan" Date: Thu, 8 Sep 2016 15:13:29 -0400 Subject: [PATCH 08/11] Add missing backslash to README example (#264) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3b47295a..9ce6cd78 100644 --- a/README.md +++ b/README.md @@ -138,7 +138,7 @@ df.write \ .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \ .option("dbtable", "my_table_copy") \ .option("tempdir", "s3n://path/for/temp/data") \ - .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role") + .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role") \ .mode("error") \ .save() ``` From 0abd27dc9519d42fcdffb279982860b783800ba2 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 21 Sep 2016 14:54:52 -0700 Subject: [PATCH 09/11] Fix reading of NaN and Infinity This patch fixes a bug which caused `spark-redshift` to throw `NumberFormatException` when reading NaN or Infinity from Redshift. This patch fixes the bug by adding special-case handling of the string constants `nan`, `inf`, and `-inf`, which are the values sent back by Redshift during unloads. Note that we still do not support loads of `NaN` to Redshift since Redshift itself does not seem to support this yet (https://forums.aws.amazon.com/thread.jspa?threadID=236367). Fixes #261. Author: Josh Rosen Closes #269 from JoshRosen/fix-nan. 
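For reference, a minimal sketch of the special-case handling described above (hypothetical helper; the real change is in the `Conversions.scala` hunk below and also covers `FloatType`):

```scala
// Illustrative helper (not part of the patch): map the lowercase string
// constants Redshift emits during UNLOAD onto IEEE special values, and fall
// back to normal parsing for everything else.
def parseRedshiftDouble(data: String): Double = data match {
  case "nan"  => Double.NaN
  case "inf"  => Double.PositiveInfinity
  case "-inf" => Double.NegativeInfinity
  case _      => java.lang.Double.parseDouble(data)
}

parseRedshiftDouble("nan").isNaN   // true
parseRedshiftDouble("-inf")        // Double.NegativeInfinity
```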
--- .../redshift/RedshiftIntegrationSuite.scala | 49 +++++++++++++++++++ .../spark/redshift/Conversions.scala | 22 ++++++--- .../spark/redshift/ConversionsSuite.scala | 16 +++++- 3 files changed, 80 insertions(+), 7 deletions(-) diff --git a/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala b/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala index b48c8408..72b5b877 100644 --- a/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala +++ b/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala @@ -666,4 +666,53 @@ class RedshiftIntegrationSuite extends IntegrationSuiteBase { StructType(StructField("ts", TimestampType) :: Nil)) ) } + + test("read special float values (regression test for #261)") { + val tableName = s"roundtrip_special_float_values_$randomSuffix" + try { + conn.createStatement().executeUpdate( + s"CREATE TABLE $tableName (x real)") + conn.createStatement().executeUpdate( + s"INSERT INTO $tableName VALUES ('NaN'), ('Infinity'), ('-Infinity')") + conn.commit() + assert(DefaultJDBCWrapper.tableExists(conn, tableName)) + val loadedDf = sqlContext.read + .format("com.databricks.spark.redshift") + .option("url", jdbcUrl) + .option("dbtable", tableName) + .option("tempdir", tempDir) + .load() + // Due to #98, we use Double here instead of float: + checkAnswer( + loadedDf, + Seq(Double.NaN, Double.PositiveInfinity, Double.NegativeInfinity).map(x => Row.apply(x))) + } finally { + conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() + conn.commit() + } + } + + test("read special double values (regression test for #261)") { + val tableName = s"roundtrip_special_double_values_$randomSuffix" + try { + conn.createStatement().executeUpdate( + s"CREATE TABLE $tableName (x double precision)") + conn.createStatement().executeUpdate( + s"INSERT INTO $tableName VALUES ('NaN'), ('Infinity'), ('-Infinity')") + conn.commit() + assert(DefaultJDBCWrapper.tableExists(conn, tableName)) + val loadedDf = sqlContext.read + .format("com.databricks.spark.redshift") + .option("url", jdbcUrl) + .option("dbtable", tableName) + .option("tempdir", tempDir) + .load() + checkAnswer( + loadedDf, + Seq(Double.NaN, Double.PositiveInfinity, Double.NegativeInfinity).map(x => Row.apply(x))) + } finally { + conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() + conn.commit() + } + } } diff --git a/src/main/scala/com/databricks/spark/redshift/Conversions.scala b/src/main/scala/com/databricks/spark/redshift/Conversions.scala index b6b7d5a9..d162ca1a 100644 --- a/src/main/scala/com/databricks/spark/redshift/Conversions.scala +++ b/src/main/scala/com/databricks/spark/redshift/Conversions.scala @@ -81,16 +81,26 @@ private[redshift] object Conversions { val decimalFormat = createRedshiftDecimalFormat() val conversionFunctions: Array[String => Any] = schema.fields.map { field => field.dataType match { - case ByteType => (data: String) => data.toByte + case ByteType => (data: String) => java.lang.Byte.parseByte(data) case BooleanType => (data: String) => parseBoolean(data) case DateType => (data: String) => new java.sql.Date(dateFormat.parse(data).getTime) - case DoubleType => (data: String) => data.toDouble - case FloatType => (data: String) => data.toFloat + case DoubleType => (data: String) => data match { + case "nan" => Double.NaN + case "inf" => Double.PositiveInfinity + case "-inf" => Double.NegativeInfinity + case _ => java.lang.Double.parseDouble(data) + } + case FloatType => (data: String) => 
data match { + case "nan" => Float.NaN + case "inf" => Float.PositiveInfinity + case "-inf" => Float.NegativeInfinity + case _ => java.lang.Float.parseFloat(data) + } case dt: DecimalType => (data: String) => decimalFormat.parse(data).asInstanceOf[java.math.BigDecimal] - case IntegerType => (data: String) => data.toInt - case LongType => (data: String) => data.toLong - case ShortType => (data: String) => data.toShort + case IntegerType => (data: String) => java.lang.Integer.parseInt(data) + case LongType => (data: String) => java.lang.Long.parseLong(data) + case ShortType => (data: String) => java.lang.Short.parseShort(data) case StringType => (data: String) => data case TimestampType => (data: String) => Timestamp.valueOf(data) case _ => (data: String) => data diff --git a/src/test/scala/com/databricks/spark/redshift/ConversionsSuite.scala b/src/test/scala/com/databricks/spark/redshift/ConversionsSuite.scala index 8f0b508f..722c9c8e 100644 --- a/src/test/scala/com/databricks/spark/redshift/ConversionsSuite.scala +++ b/src/test/scala/com/databricks/spark/redshift/ConversionsSuite.scala @@ -21,7 +21,7 @@ import java.sql.Timestamp import org.scalatest.FunSuite import org.apache.spark.sql.Row -import org.apache.spark.sql.types.{TimestampType, StructField, BooleanType, StructType} +import org.apache.spark.sql.types._ /** * Unit test for data type conversions @@ -87,4 +87,18 @@ class ConversionsSuite extends FunSuite { } } } + + test("Row conversion properly handles NaN and Inf float values (regression test for #261)") { + val convertRow = Conversions.createRowConverter(StructType(Seq(StructField("a", FloatType)))) + assert(java.lang.Float.isNaN(convertRow(Array("nan")).getFloat(0))) + assert(convertRow(Array("inf")) === Row(Float.PositiveInfinity)) + assert(convertRow(Array("-inf")) === Row(Float.NegativeInfinity)) + } + + test("Row conversion properly handles NaN and Inf double values (regression test for #261)") { + val convertRow = Conversions.createRowConverter(StructType(Seq(StructField("a", DoubleType)))) + assert(java.lang.Double.isNaN(convertRow(Array("nan")).getDouble(0))) + assert(convertRow(Array("inf")) === Row(Double.PositiveInfinity)) + assert(convertRow(Array("-inf")) === Row(Double.NegativeInfinity)) + } } From 9f12f3f4ed8f8e209c8ea218a5b27c92ffe624a9 Mon Sep 17 00:00:00 2001 From: Ganesh Chand Date: Mon, 17 Oct 2016 17:51:34 -0700 Subject: [PATCH 10/11] Updated README.md with sample code to read a Redshift table in SparkR Added sample code for reading a Redshift table with SparkR. Author: Ganesh Chand Author: Josh Rosen Closes #282 from ganeshchand/patch-1. --- README.md | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 9ce6cd78..263770dc 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ This library is more suited to ETL than interactive queries, since large amounts - [Installation](#installation) - Usage: - - Data sources API: [Scala](#scala), [Python](#python), [SQL](#sql) + - Data sources API: [Scala](#scala), [Python](#python), [SQL](#sql), [R](#r) - [Hadoop InputFormat](#hadoop-inputformat) - [Configuration](#configuration) - [AWS Credentials](#aws-credentials) @@ -173,6 +173,20 @@ AS SELECT * FROM tabletosave; Note that the SQL API only supports the creation of new tables and not overwriting or appending; this corresponds to the default save mode of the other language APIs. 
+#### R + +Reading data using R: + +```R +df <- read.df( + sqlContext, + NULL, + "com.databricks.spark.redshift", + tempdir = "s3n://path/for/temp/data", + dbtable = "my_table", + url = "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") +``` + ### Hadoop InputFormat The library contains a Hadoop input format for Redshift tables unloaded with the ESCAPE option, From 647f678e337688ce84a371de56e9ec63f62ca033 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 16 Nov 2016 14:26:45 -0800 Subject: [PATCH 11/11] Wrap and re-throw Await.result exceptions in order to capture full stacktrace Exceptions thrown from Scala's `Await.result` don't include the waiting thread's stacktrace, making it hard to figure out where errors occur. Similar to the fix implemented in Spark in https://github.com/apache/spark/pull/12433, this patch modifies our `Await.result` usages to wrap and rethrow exceptions to capture the calling thread's stack. Author: Josh Rosen Closes #299 from JoshRosen/better-error-reporting. (cherry picked from commit b4c6053145b4c80447b7bff7b5a07fcda41ec7f0) Signed-off-by: Josh Rosen --- .../spark/redshift/IAMIntegrationSuite.scala | 2 +- .../spark/redshift/RedshiftJDBCWrapper.scala | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/it/scala/com/databricks/spark/redshift/IAMIntegrationSuite.scala b/src/it/scala/com/databricks/spark/redshift/IAMIntegrationSuite.scala index f48f8b75..98b531ba 100644 --- a/src/it/scala/com/databricks/spark/redshift/IAMIntegrationSuite.scala +++ b/src/it/scala/com/databricks/spark/redshift/IAMIntegrationSuite.scala @@ -74,7 +74,7 @@ class IAMIntegrationSuite extends IntegrationSuiteBase { .mode(SaveMode.ErrorIfExists) .save() } - assert(err.getMessage.contains("is not authorized to assume IAM Role")) + assert(err.getCause.getMessage.contains("is not authorized to assume IAM Role")) } finally { conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() conn.commit() diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala index b3fe0c30..b9178219 100644 --- a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala +++ b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala @@ -26,6 +26,7 @@ import scala.collection.JavaConverters._ import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration.Duration import scala.util.Try +import scala.util.control.NonFatal import org.apache.spark.SPARK_VERSION import org.apache.spark.sql.types._ @@ -145,7 +146,16 @@ private[redshift] class JDBCWrapper { op: PreparedStatement => T): T = { try { val future = Future[T](op(statement))(ec) - Await.result(future, Duration.Inf) + try { + Await.result(future, Duration.Inf) + } catch { + case e: SQLException => + // Wrap and re-throw so that this thread's stacktrace appears to the user. + throw new SQLException("Exception thrown in awaitResult: ", e) + case NonFatal(t) => + // Wrap and re-throw so that this thread's stacktrace appears to the user. + throw new Exception("Exception thrown in awaitResult: ", t) + } } catch { case e: InterruptedException => try {