Skip to content

Commit

Permalink
Support ks3 in broker load (StarRocks#866)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzifu666 authored Oct 25, 2021
1 parent 876fa17 commit 9cfa940
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 1 deletion.
2 changes: 1 addition & 1 deletion fs_brokers/apache_hdfs_broker/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ install -d ${BROKER_OUTPUT}/bin ${BROKER_OUTPUT}/conf \
${BROKER_OUTPUT}lib/

# download aliyun hadoop oss and tencent hadoop cos jar, this can not be found in maven repository, so we download it manually
wget http://dorisdb-thirdparty.oss-cn-zhangjiakou.aliyuncs.com/broker_thirdparty_jars.tar.gz
wget http://cdn-thirdparty.starrocks.com/broker_thirdparty_jars.tar.gz
tar xzf broker_thirdparty_jars.tar.gz
mv broker_thirdparty_jars/*.jar ${BROKER_OUTPUT}/lib/
rm -r broker_thirdparty_jars broker_thirdparty_jars.tar.gz
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class FileSystemManager {
private static final String S3A_SCHEME = "s3a";
private static final String OSS_SCHEME = "oss";
private static final String COS_SCHEME = "cosn";
private static final String KS3_SCHEME = "ks3";

private static final String USER_NAME_KEY = "username";
private static final String PASSWORD_KEY = "password";
Expand Down Expand Up @@ -104,6 +105,14 @@ public class FileSystemManager {
// This property is used like 'fs.hdfs.impl.disable.cache'
private static final String FS_S3A_IMPL_DISABLE_CACHE = "fs.s3a.impl.disable.cache";

// arguments for ks3
private static final String FS_KS3_ACCESS_KEY = "fs.ks3.AccessKey";
private static final String FS_KS3_SECRET_KEY = "fs.ks3.AccessSecret";
private static final String FS_KS3_ENDPOINT = "fs.ks3.endpoint";
private static final String FS_KS3_IMPL = "fs.ks3.impl";
// This property is used like 'fs.ks3.impl.disable.cache'
private static final String FS_KS3_IMPL_DISABLE_CACHE = "fs.ks3.impl.disable.cache";

//arguments for oss
private static final String FS_OSS_ACCESS_KEY = "fs.oss.accessKeyId";
private static final String FS_OSS_SECRET_KEY = "fs.oss.accessKeySecret";
Expand Down Expand Up @@ -179,6 +188,8 @@ public BrokerFileSystem getFileSystem(String path, Map<String, String> propertie
brokerFileSystem = getOSSFileSystem(path, properties);
} else if (scheme.equals(COS_SCHEME)) {
brokerFileSystem = getCOSFileSystem(path, properties);
} else if (scheme.equals(KS3_SCHEME)) {
brokerFileSystem = getKS3FileSystem(path, properties);
} else {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
"invalid path. scheme is not supported");
Expand Down Expand Up @@ -443,6 +454,62 @@ public BrokerFileSystem getS3AFileSystem(String path, Map<String, String> proper
fileSystem.getLock().unlock();
}
}
/**
* visible for test
* <p>
* file system handle is cached, the identity is endpoint + bucket + accessKey_secretKey
*
* @param path
* @param properties
* @return
* @throws URISyntaxException
* @throws Exception
*/
public BrokerFileSystem getKS3FileSystem(String path, Map<String, String> properties) {
WildcardURI pathUri = new WildcardURI(path);
String accessKey = properties.getOrDefault(FS_KS3_ACCESS_KEY, "");
String secretKey = properties.getOrDefault(FS_KS3_SECRET_KEY, "");
String endpoint = properties.getOrDefault(FS_KS3_ENDPOINT, "");
String disableCache = properties.getOrDefault(FS_KS3_IMPL_DISABLE_CACHE, "true");
// endpoint is the server host, pathUri.getUri().getHost() is the bucket
// we should use these two params as the host identity, because FileSystem will cache both.
String host = KS3_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost();
String ks3aUgi = accessKey + "," + secretKey;
FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, ks3aUgi);
BrokerFileSystem fileSystem = null;
cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
fileSystem = cachedFileSystem.get(fileSystemIdentity);
if (fileSystem == null) {
// it means it is removed concurrently by checker thread
return null;
}
fileSystem.getLock().lock();
try {
if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
// this means the file system is closed by file system checker thread
// it is a corner case
return null;
}
if (fileSystem.getDFSFileSystem() == null) {
logger.info("could not find file system for path " + path + " create a new one");
// create a new filesystem
Configuration conf = new Configuration();
conf.set(FS_KS3_ACCESS_KEY, accessKey);
conf.set(FS_KS3_SECRET_KEY, secretKey);
conf.set(FS_KS3_ENDPOINT, endpoint);
conf.set(FS_KS3_IMPL, "com.ksyun.kmr.hadoop.fs.ks3.Ks3FileSystem");
conf.set(FS_KS3_IMPL_DISABLE_CACHE, disableCache);
FileSystem ks3FileSystem = FileSystem.get(pathUri.getUri(), conf);
fileSystem.setFileSystem(ks3FileSystem);
}
return fileSystem;
} catch (Exception e) {
logger.error("errors while connect to " + path, e);
throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
} finally {
fileSystem.getLock().unlock();
}
}

/**
* visible for test
Expand Down

0 comments on commit 9cfa940

Please sign in to comment.