Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,30 @@ df.write
.save()
```

### Setting a custom column type

If you need to manually set a column type, you can use the `redshift_type` column metadata. For example, to override
the default `Spark SQL Schema -> Redshift SQL` type mapping and assign a user-defined column type, you can do the following:

```scala
import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
```

### Configuring column encoding

When creating a table, this library can be configured to use a specific compression encoding on individual columns. You can use the `encoding` column metadata field to specify a compression encoding for each column (see [Amazon docs](http://docs.aws.amazon.com/redshift/latest/dg/c_Compression_encodings.html) for available encodings).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,26 +241,31 @@ private[redshift] class JDBCWrapper {
val sb = new StringBuilder()
schema.fields.foreach { field => {
val name = field.name
val typ: String = field.dataType match {
case IntegerType => "INTEGER"
case LongType => "BIGINT"
case DoubleType => "DOUBLE PRECISION"
case FloatType => "REAL"
case ShortType => "INTEGER"
case ByteType => "SMALLINT" // Redshift does not support the BYTE type.
case BooleanType => "BOOLEAN"
case StringType =>
if (field.metadata.contains("maxlength")) {
s"VARCHAR(${field.metadata.getLong("maxlength")})"
} else {
"TEXT"
}
case BinaryType => "BLOB"
case TimestampType => "TIMESTAMP"
case DateType => "DATE"
case t: DecimalType => s"DECIMAL(${t.precision},${t.scale})"
case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
val typ: String = if (field.metadata.contains("redshift_type")) {
field.metadata.getString("redshift_type")
} else {
field.dataType match {
case IntegerType => "INTEGER"
case LongType => "BIGINT"
case DoubleType => "DOUBLE PRECISION"
case FloatType => "REAL"
case ShortType => "INTEGER"
case ByteType => "SMALLINT" // Redshift does not support the BYTE type.
case BooleanType => "BOOLEAN"
case StringType =>
if (field.metadata.contains("maxlength")) {
s"VARCHAR(${field.metadata.getLong("maxlength")})"
} else {
"TEXT"
}
case BinaryType => "BLOB"
case TimestampType => "TIMESTAMP"
case DateType => "DATE"
case t: DecimalType => s"DECIMAL(${t.precision},${t.scale})"
case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
}
}

val nullable = if (field.nullable) "" else "NOT NULL"
val encoding = if (field.metadata.contains("encoding")) {
s"ENCODE ${field.metadata.getString("encoding")}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,25 @@ class RedshiftSourceSuite
assert(commentCommands === expectedCommentCommands)
}

test("configuring redshift_type on columns") {
val bpcharMetadata = new MetadataBuilder().putString("redshift_type", "BPCHAR(2)").build()
val nvarcharMetadata = new MetadataBuilder().putString("redshift_type", "NVARCHAR(123)").build()

val schema = StructType(
StructField("bpchar_str", StringType, metadata = bpcharMetadata) ::
StructField("bpchar_str", StringType, metadata = nvarcharMetadata) ::
StructField("default_str", StringType) ::
Nil)

val df = testSqlContext.createDataFrame(sc.emptyRDD[Row], schema)
val createTableCommand =
DefaultRedshiftWriter.createTableSql(df, MergedParameters.apply(defaultParams)).trim
val expectedCreateTableCommand =
"""CREATE TABLE IF NOT EXISTS "PUBLIC"."test_table" ("bpchar_str" BPCHAR(2),""" +
""" "bpchar_str" NVARCHAR(123), "default_str" TEXT)"""
assert(createTableCommand === expectedCreateTableCommand)
}

test("Respect SaveMode.ErrorIfExists when table exists") {
val mockRedshift = new MockRedshift(
defaultParams("url"),
Expand Down