Skip to content
This repository was archived by the owner on Jul 19, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified release/spark-cdm-assembly-0.3.jar
Binary file not shown.
6 changes: 4 additions & 2 deletions src/main/scala/com/microsoft/cdm/utils/Constants.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import java.math.MathContext
object Constants {

// TODO: ensure these match the data provided
val DATE_FORMATS = Array("MM/dd/yyyy", "MM/dd/yyyy hh:mm:ss a")
val OUTPUT_FORMAT = "MM/dd/yyyy hh:mm:ss a"
// val DATE_FORMATS = Array("MM/dd/yyyy", "MM/dd/yyyy hh:mm:ss a")
// val OUTPUT_FORMAT = "MM/dd/yyyy hh:mm:ss a"

val DECIMAL_PRECISION = 28
val MATH_CONTEXT = new MathContext(28)

val SINGLE_DATE_FORMAT = "MM/dd/yyyy"
val TIMESTAMP_FORMAT = "MM/dd/yyyy hh:mm:ss a"

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ object CsvParserFactory {
val format = settings.getFormat
format.setDelimiter(',')
settings.setMaxCharsPerColumn(500000)
settings.setMaxColumns(512 * 4)
new CsvParser(settings)
}

Expand Down
25 changes: 19 additions & 6 deletions src/main/scala/com/microsoft/cdm/utils/DataConverter.scala
Original file line number Diff line number Diff line change
@@ -1,28 +1,37 @@
package com.microsoft.cdm.utils

import java.text.SimpleDateFormat
import java.util.{Locale, TimeZone}

import org.apache.commons.lang.time.DateUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
* Converts between CSV/CDM data and Spark data types.
* @param dateFormats Expected string date formats.
* @param outputDateFormat Output date format.
*/
class DataConverter(val dateFormats: Array[String] = Constants.DATE_FORMATS,
val outputDateFormat: String = Constants.OUTPUT_FORMAT) extends Serializable {
class DataConverter(val dateFormats: String = Constants.TIMESTAMP_FORMAT,
val outputDateFormat: String = Constants.TIMESTAMP_FORMAT) extends Serializable {

val dateFormatter = new SimpleDateFormat(outputDateFormat)

private val timestampFormatter = TimestampFormatter(Constants.SINGLE_DATE_FORMAT, TimeZone.getTimeZone("UTC"))
private val inputDateFormatter = DateFormatter(Constants.SINGLE_DATE_FORMAT)

var timeformat = new SimpleDateFormat(Constants.TIMESTAMP_FORMAT, Locale.US)
var dateformat = new SimpleDateFormat(Constants.SINGLE_DATE_FORMAT)

val toSparkType: Map[CDMDataType.Value, DataType] = Map(
CDMDataType.int64 -> LongType,
CDMDataType.dateTime -> DateType,
CDMDataType.string -> StringType,
CDMDataType.double -> DoubleType,
CDMDataType.decimal -> DecimalType(Constants.DECIMAL_PRECISION,0),
CDMDataType.boolean -> BooleanType,
CDMDataType.dateTimeOffset -> DateType
CDMDataType.dateTimeOffset -> TimestampType
)

val toCdmType: Map[DataType, CDMDataType.Value] = Map(
Expand All @@ -31,16 +40,20 @@ class DataConverter(val dateFormats: Array[String] = Constants.DATE_FORMATS,
StringType -> CDMDataType.string,
DoubleType -> CDMDataType.double,
DecimalType(Constants.DECIMAL_PRECISION,0) -> CDMDataType.decimal,
BooleanType -> CDMDataType.boolean
BooleanType -> CDMDataType.boolean,
TimestampType -> CDMDataType.dateTimeOffset
)

val jsonToData: Map[DataType, String => Any] = Map(
LongType -> (x => x.toLong),
StringType -> (x => x),
StringType -> (x => UTF8String.fromString(x)),
DoubleType -> (x => x.toDouble),
DecimalType(Constants.DECIMAL_PRECISION,0) -> (x => BigDecimal(x, Constants.MATH_CONTEXT)),
BooleanType -> (x => x.toBoolean),
DateType -> (x => new java.sql.Date(DateUtils.parseDate(x, dateFormats).getTime))
// DateType -> (x => inputDateFormatter.parse(x)),
// TimestampType -> (x => timestampFormatter.parse(x))
DateType -> (x => inputDateFormatter.parse(dateformat.format(timeformat.parse(x)))),
TimestampType -> (x => timestampFormatter.parse(dateformat.format(timeformat.parse(x))))
)

def dataToString(data: Any, dataType: DataType): String = {
Expand Down