Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Migrate class and pom.xml to support Spark 3.0.0-preview2 #66

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ sudo: required
matrix:
include:
- os: linux
jdk: oraclejdk8
jdk: oraclejdk9
- os: osx
osx_image: xcode8
osx_image: xcode9

script:
- mvn clean package
23 changes: 12 additions & 11 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
<version>2.12.11</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand All @@ -32,34 +32,34 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.2</version>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0-preview2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.1</version>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0-preview2</version>
</dependency>
<dependency>
<groupId>org.scalactic</groupId>
<artifactId>scalactic_2.11</artifactId>
<artifactId>scalactic_2.12</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.11</artifactId>
<artifactId>scalatest_2.12</artifactId>
<version>3.0.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>adal4j</artifactId>
<version>1.2.0</version>
<version>1.6.5</version>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>6.4.0.jre8</version>
<version>8.3.0.jre14-preview</version>
</dependency>
</dependencies>
<developers>
Expand Down Expand Up @@ -135,6 +135,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.0.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down Expand Up @@ -194,10 +195,10 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<version>4.3.1</version>
<configuration>
<checkMultipleScalaVersions>false</checkMultipleScalaVersions>
<scalaVersion>2.11.8</scalaVersion>
<scalaVersion>2.12.11</scalaVersion>
<args>
<arg>-deprecation</arg>
<arg>-feature</arg>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ColumnMetadata implements Serializable {
private int columnType;
private int precision;
private int scale;
private DateTimeFormatter dateTimeFormatter;
public DateTimeFormatter dateTimeFormatter;

ColumnMetadata(String name, int type, int precision, int scale, DateTimeFormatter dateTimeFormatter) {
this.columnName = name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,38 @@ public class SQLServerBulkDataFrameFileRecord implements ISQLServerBulkRecord, j

private Map<Integer, ColumnMetadata> columnMetadata;

/*
* Logger
*/
protected String loggerPackageName = "com.microsoft.jdbc.SQLServerBulkRecord";
protected static java.util.logging.Logger loggerExternal = java.util.logging.Logger
.getLogger("com.microsoft.jdbc.SQLServerBulkRecord");

/*
* Contains the format that java.sql.Types.TIMESTAMP_WITH_TIMEZONE data should be read in as.
*/
protected DateTimeFormatter dateTimeFormatter = null;

/*
* Contains the format that java.sql.Types.TIME_WITH_TIMEZONE data should be read in as.
*/
protected DateTimeFormatter timeFormatter = null;

/*
 * Intentionally a no-op: this record's column metadata is supplied up front
 * through BulkCopyMetadata in the constructor, so the per-column registration
 * callback required by ISQLServerBulkRecord has nothing to store here.
 */
void addColumnMetadataInternal(int positionInSource, String name, int jdbcType, int precision, int scale,
DateTimeFormatter dateTimeFormatter) throws SQLServerException {}

/*
 * ISQLServerBulkRecord callback for registering a column together with an
 * explicit date/time format. Delegates to the internal hook, which is a
 * no-op here because metadata comes from BulkCopyMetadata (see constructor).
 */
@Override
public void addColumnMetadata(int positionInSource, String name, int jdbcType, int precision, int scale,
DateTimeFormatter dateTimeFormatter) throws SQLServerException {
addColumnMetadataInternal(positionInSource, name, jdbcType, precision, scale, dateTimeFormatter);
}

/*
 * ISQLServerBulkRecord callback for registering a column without a
 * date/time format. Delegates to the internal hook with a null formatter;
 * the hook is a no-op here because metadata comes from BulkCopyMetadata.
 */
@Override
public void addColumnMetadata(int positionInSource, String name, int jdbcType, int precision,
int scale) throws SQLServerException {
addColumnMetadataInternal(positionInSource, name, jdbcType, precision, scale, null);
}

public SQLServerBulkDataFrameFileRecord(Iterator<Row> iterator, BulkCopyMetadata metadata) {
this.iterator = iterator;
this.columnMetadata = metadata.getMetadata();
Expand All @@ -54,6 +86,11 @@ public DateTimeFormatter getDateTimeFormatter(int column) {
return columnMetadata.get(column).getDateTimeFormatter();
}

/*
 * Returns the DateTimeFormatter registered for the given source column.
 * Goes through the ColumnMetadata accessor getDateTimeFormatter() rather
 * than reading the dateTimeFormatter field directly, so the field does not
 * need to be widened to public for this class's benefit.
 */
@Override
public DateTimeFormatter getColumnDateTimeFormatter(int column) {
    return columnMetadata.get(column).getDateTimeFormatter();
}

@Override
public void close() throws SQLServerException {
// nothing to close
Expand Down Expand Up @@ -153,6 +190,40 @@ public boolean next() throws SQLServerException {
return iterator.hasNext();
}

/*
 * Sets the pattern used to read java.sql.Types.TIMESTAMP_WITH_TIMEZONE data.
 * The entering() call is now guarded with isLoggable(FINER), matching the
 * DateTimeFormatter overload of this method, so the trace arguments are not
 * prepared when FINER logging is disabled.
 */
@Override
public void setTimestampWithTimezoneFormat(String dateTimeFormat) {
    if (loggerExternal.isLoggable(java.util.logging.Level.FINER)) {
        loggerExternal.entering(loggerPackageName, "setTimestampWithTimezoneFormat", dateTimeFormat);
    }
    this.dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat);
    loggerExternal.exiting(loggerPackageName, "setTimestampWithTimezoneFormat");
}


/*
 * Stores the formatter used to read java.sql.Types.TIMESTAMP_WITH_TIMEZONE
 * data, tracing entry/exit through the external logger.
 */
@Override
public void setTimestampWithTimezoneFormat(DateTimeFormatter dateTimeFormatter) {
    final boolean finerEnabled = loggerExternal.isLoggable(java.util.logging.Level.FINER);
    if (finerEnabled) {
        // entering() takes the trace arguments as an Object[].
        final Object[] traceArgs = new Object[] {dateTimeFormatter};
        loggerExternal.entering(loggerPackageName, "setTimestampWithTimezoneFormat", traceArgs);
    }
    this.dateTimeFormatter = dateTimeFormatter;
    loggerExternal.exiting(loggerPackageName, "setTimestampWithTimezoneFormat");
}

/*
 * Sets the pattern used to read java.sql.Types.TIME_WITH_TIMEZONE data.
 * The entering() call is now guarded with isLoggable(FINER), matching the
 * DateTimeFormatter overload of this method, so the trace arguments are not
 * prepared when FINER logging is disabled.
 */
@Override
public void setTimeWithTimezoneFormat(String timeFormat) {
    if (loggerExternal.isLoggable(java.util.logging.Level.FINER)) {
        loggerExternal.entering(loggerPackageName, "setTimeWithTimezoneFormat", timeFormat);
    }
    this.timeFormatter = DateTimeFormatter.ofPattern(timeFormat);
    loggerExternal.exiting(loggerPackageName, "setTimeWithTimezoneFormat");
}

/*
 * Stores the formatter used to read java.sql.Types.TIME_WITH_TIMEZONE data.
 * Parameter renamed from "dateTimeFormatter" to "timeFormatter": the value
 * is assigned to the timeFormatter field, and the old name collided
 * confusingly with the unrelated dateTimeFormatter field. (Java parameter
 * names are not part of the binary interface, so callers are unaffected.)
 */
@Override
public void setTimeWithTimezoneFormat(DateTimeFormatter timeFormatter) {
    if (loggerExternal.isLoggable(java.util.logging.Level.FINER)) {
        loggerExternal.entering(loggerPackageName, "setTimeWithTimezoneFormat", new Object[] {timeFormatter});
    }
    this.timeFormatter = timeFormatter;
    loggerExternal.exiting(loggerPackageName, "setTimeWithTimezoneFormat");
}

/*
 * Looks up the localized SQL Server error message for the given resource
 * key in the driver's SQLServerResource bundle.
 */
private String getSQLServerExceptionErrorMsg(String type) {
return SQLServerResource.getBundle("com.microsoft.sqlserver.jdbc.SQLServerResource").getString(type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private[spark] case class DataFrameFunctions[T](@transient dataFrame: DataFrame)
} else {
metadata
}
dataFrame.foreachPartition(iterator => bulkCopy(config, iterator, actualMetadata))
dataFrame.rdd.foreachPartition(iterator => bulkCopy(config, iterator, actualMetadata))
}

private def getConnectionOrFail(config:Config):Try[Connection] = {
Expand Down