add AliyunOSS support in FileWriter/FileReader #2356

Open · wants to merge 3 commits into master
Changes from 1 commit
add AliyunOSS support in FileWriter/FileReader
sperlingxx committed Mar 6, 2018
commit bc67699b85e574ae4af628eb37310a9f9a8dda0d
7 changes: 7 additions & 0 deletions pom.xml
@@ -157,6 +157,7 @@
<maven-enforcer-plugin.version>1.3.1</maven-enforcer-plugin.version>

<hadoop.version>2.7.3</hadoop.version>
<hadoop.aliyun.version>3.0.0-alpha2</hadoop.aliyun.version>
Contributor:
I find the latest version is 3.0.0.

Contributor Author:
For now, ODPS depends on 3.0.0-alpha2.

<guava.version>11.0.2</guava.version>
<thrift.path>thrift</thrift.path>
<thrift.version>0.9.2</thrift.version>
@@ -235,6 +236,12 @@
<version>${hadoop.version}</version>
<scope>${spark-scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
Contributor:
Compatible with Hadoop-2.7.2?

Contributor Author:
Yes.

<version>${hadoop.aliyun.version}</version>
<scope>${spark-scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
23 changes: 23 additions & 0 deletions spark/dl/pom.xml
@@ -70,6 +70,29 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<scope>${spark-scope}</scope>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
spark/dl/src/main/scala/com/intel/analytics/bigdl/utils/Engine.scala
@@ -105,6 +105,8 @@ object Engine {
logger.info(s"Executor number is $nExecutor and executor cores number is $executorCores")
setNodeAndCore(nExecutor, executorCores)
checkSparkContext
// set the AliyunOSS configurations if present in sparkConf
setAliyunOSSConfig()
}
}

@@ -525,4 +527,23 @@
throw new IllegalArgumentException(s"Engine.init: Unsupported master format $master")
}
}

  /**
   * Fetch the AliyunOSS settings from sparkConf and, when present, copy them
   * into System.properties.
   */
  private def setAliyunOSSConfig(): Unit = {
    val sparkConf = SparkContext.getOrCreate().getConf
    // Option.foreach is a no-op for None, so a missing key is simply skipped;
    // a bare match with only a Some case would throw a MatchError here.
    sparkConf.getOption("spark.hadoop.fs.oss.accessKeyId")
      .foreach(value => System.setProperty("fs.oss.accessKeyId", value))
    sparkConf.getOption("spark.hadoop.fs.oss.accessKeySecret")
      .foreach(value => System.setProperty("fs.oss.accessKeySecret", value))
    sparkConf.getOption("spark.hadoop.fs.oss.endpoint")
      .foreach(value => System.setProperty("fs.oss.endpoint", value))
  }
}
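For context, a minimal usage sketch (not part of this diff) of feeding the credentials in through SparkConf so that Engine.init, via setAliyunOSSConfig(), copies them into system properties. The app name, endpoint value, and environment-variable names below are placeholders:

import com.intel.analytics.bigdl.utils.Engine
import org.apache.spark.SparkContext

// Sketch: pass OSS credentials through the spark.hadoop.fs.oss.* keys
// that setAliyunOSSConfig() looks up.
val conf = Engine.createSparkConf()
  .setAppName("bigdl-oss-demo")
  .set("spark.hadoop.fs.oss.accessKeyId", sys.env("OSS_ACCESS_KEY_ID"))
  .set("spark.hadoop.fs.oss.accessKeySecret", sys.env("OSS_ACCESS_KEY_SECRET"))
  .set("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com")
val sc = new SparkContext(conf)
Engine.init // copies the three fs.oss.* values into System.properties, per the patch above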
64 changes: 56 additions & 8 deletions spark/dl/src/main/scala/com/intel/analytics/bigdl/utils/File.scala
@@ -23,9 +23,12 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
import org.apache.hadoop.io.IOUtils

object File {
private[bigdl] val hdfsPrefix: String = "hdfs:"
private[bigdl] val s3aPrefix: String = "s3a:"
private[bigdl] val ossPrefix: String = "oss:"

/**
* Load torch object from a torch binary file
@@ -54,12 +57,27 @@
}

  /**
   * Save a Scala object to a local/HDFS/S3/AliyunOSS path.
   *
   * Notice: {{{
   * An S3 path should look like s3a://bucket/xxx.
   *
   * See [hadoop-aws](http://hadoop.apache.org/docs/r2.7.3/hadoop-aws/tools/hadoop-aws/index.html)
   * for details if you want to save a model to S3.
   *
   * An AliyunOSS path should look like oss://bucket/xxx.
   *
   * If you want to save a model to AliyunOSS, either set
   * fs.oss.accessKeyId,
   * fs.oss.accessKeySecret,
   * fs.oss.endpoint
   * in System.properties, or append the following settings to sparkConf:
   * spark.hadoop.fs.oss.accessKeyId,
   * spark.hadoop.fs.oss.accessKeySecret,
   * spark.hadoop.fs.oss.endpoint.
   * Also add the Maven dependency hadoop-aliyun:3.0.0-alpha2 (which brings in
   * aliyun-sdk-oss:2.2.1).
   * }}}
*
* @param obj object to be saved.
* @param fileName local/hdfs output path.
@@ -106,6 +124,21 @@
  private[bigdl] def getConfiguration(fileName: String): Configuration = {
    if (fileName.startsWith(File.hdfsPrefix) || fileName.startsWith(s3aPrefix)) {
      new Configuration()
    } else if (fileName.startsWith(File.ossPrefix)) {
      // System.getProperty returns null for a missing key, so no Try wrapper is needed.
      val accessKeyId = System.getProperty("fs.oss.accessKeyId")
      require(accessKeyId != null, "Could not find fs.oss.accessKeyId in System.properties!")
      val accessKeySecret = System.getProperty("fs.oss.accessKeySecret")
      require(accessKeySecret != null, "Could not find fs.oss.accessKeySecret in System.properties!")
      val endpoint = System.getProperty("fs.oss.endpoint")
      require(endpoint != null, "Could not find fs.oss.endpoint in System.properties!")

      val hadoopConf = new Configuration()
      hadoopConf.set("fs.oss.connection.secure.enabled", "false")
      hadoopConf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")
      hadoopConf.set("fs.oss.accessKeyId", accessKeyId)
      hadoopConf.set("fs.oss.accessKeySecret", accessKeySecret)
      hadoopConf.set("fs.oss.endpoint", endpoint)
      hadoopConf
    } else {
      new Configuration(false)
    }
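A sketch of the non-Spark path the require() checks above expect: set the three fs.oss.* system properties up front, after which getConfiguration can assemble an OSS-capable Hadoop Configuration. The endpoint, bucket, and environment-variable names are placeholders, and getConfiguration is private[bigdl], so this would only run from within the bigdl package:

// Sketch: satisfy the require() checks by setting the properties directly;
// all three must be present or getConfiguration fails fast.
System.setProperty("fs.oss.accessKeyId", sys.env("OSS_ACCESS_KEY_ID"))
System.setProperty("fs.oss.accessKeySecret", sys.env("OSS_ACCESS_KEY_SECRET"))
System.setProperty("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com")

val hadoopConf = File.getConfiguration("oss://my-bucket/models/obj.bin")
assert(hadoopConf.get("fs.oss.impl") ==
  "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")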
@@ -164,12 +197,27 @@
}

  /**
   * Load a Scala object from a local/HDFS/S3/AliyunOSS path.
   *
   * Notice: {{{
   * An S3 path should look like s3a://bucket/xxx.
   *
   * See [hadoop-aws](http://hadoop.apache.org/docs/r2.7.3/hadoop-aws/tools/hadoop-aws/index.html)
   * for details if you want to load a model from S3.
   *
   * An AliyunOSS path should look like oss://bucket/xxx.
   *
   * If you want to load a model from AliyunOSS, either set
   * fs.oss.accessKeyId,
   * fs.oss.accessKeySecret,
   * fs.oss.endpoint
   * in System.properties, or append the following settings to sparkConf:
   * spark.hadoop.fs.oss.accessKeyId,
   * spark.hadoop.fs.oss.accessKeySecret,
   * spark.hadoop.fs.oss.endpoint.
   * Also add the Maven dependency hadoop-aliyun:3.0.0-alpha2 (which brings in
   * aliyun-sdk-oss:2.2.1).
   * }}}
*
* @param fileName file name.
*/
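Finally, a hedged round-trip sketch of the user-facing save/load API documented above, assuming the fs.oss.* credentials are already configured as described; the bucket and object path are hypothetical:

import com.intel.analytics.bigdl.utils.File

// Sketch: save an object to OSS and load it back through the oss:// scheme.
val message = "hello oss"
File.save(message, "oss://my-bucket/tmp/message.obj", isOverwrite = true)
val restored = File.load[String]("oss://my-bucket/tmp/message.obj")
assert(restored == message)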