[feature-wip](Cloud) Support Azure Cloud Blob in FE (#35990)
As a follow-up to PR #35307, this PR adds Azure Cloud support in FE. After this PR, Azure Blob can be used for cold-hot data separation and storage-compute separation, as well as for backup and restore to Azure Cloud.

For data lake support, the corresponding SDKs can be added to FE and the corresponding features implemented.
ByteYue authored Jun 24, 2024
1 parent 0db0b18 commit e11f516
Showing 111 changed files with 955 additions and 122 deletions.
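Before the per-file hunks, here is a hedged sketch of how the classes touched below fit together for an Azure-backed repository. It reuses only signatures visible in this diff (FileSystemFactory.get, the new StorageBackend.StorageType.AZURE, the Repository constructor, and ping()); the property key names, placeholder values, and the azure:// location format are illustrative assumptions, not documented configuration.

import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.Repository;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.RemoteFileSystem;

import java.util.HashMap;
import java.util.Map;

// Hedged sketch: wiring an Azure-backed repository through the FE classes changed in this commit.
public class AzureRepositorySketch {
    public static Repository buildAzureRepository() throws Exception {
        Map<String, String> props = new HashMap<>();
        props.put("s3.endpoint", "<account>.blob.core.windows.net");   // placeholder; key name assumed
        props.put("s3.access_key", "<storage-account-name>");          // placeholder; key name assumed
        props.put("s3.secret_key", "<storage-account-key>");           // placeholder; key name assumed
        props.put("provider", "azure");                                 // provider value accepted by this PR

        // FileSystemFactory.get(name, storageType, properties) and StorageType.AZURE
        // are used the same way in BackupHandler.alterRepository below.
        RemoteFileSystem fs = FileSystemFactory.get("azure_repo_fs",
                StorageBackend.StorageType.AZURE, props);

        // Repository(id, name, isReadOnly, location, fileSystem) matches the constructor call below.
        Repository repo = new Repository(10001L, "azure_repo", false,
                "azure://<container>/<root-path>", fs);                 // location format is an assumption
        if (!repo.ping()) {                                              // ping() mirrors BackupHandler's usage
            throw new IllegalStateException("cannot reach Azure Blob with the given properties");
        }
        return repo;
    }
}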
@@ -28,6 +28,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.constants.AzureProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.loadv2.LoadTask;
@@ -104,7 +105,7 @@ public class LoadStmt extends DdlStmt {

// for S3 load check
public static final List<String> PROVIDERS =
new ArrayList<>(Arrays.asList("cos", "oss", "s3", "obs", "bos"));
new ArrayList<>(Arrays.asList("cos", "oss", "s3", "obs", "bos", "azure"));

// mini load params
public static final String KEY_IN_PARAM_COLUMNS = "columns";
@@ -612,6 +613,9 @@ public void checkWhiteList() throws UserException {
+ " is not in s3 load endpoint white list: " + String.join(",", whiteList));
}
brokerDescProperties.put(S3Properties.Env.ENDPOINT, endpoint);
if (AzureProperties.checkAzureProviderPropertyExist(properties)) {
return;
}
checkEndpoint(endpoint);
}
}
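The early return above skips the endpoint whitelist check when the load properties identify Azure as the provider, and it hinges on AzureProperties.checkAzureProviderPropertyExist, whose body is not part of this hunk. A hypothetical sketch of what such a check could look like, assuming it only inspects the S3Properties.PROVIDER entry (the real implementation in org.apache.doris.datasource.property.constants.AzureProperties may differ):

// Hypothetical sketch only; not the actual AzureProperties implementation.
public static boolean checkAzureProviderPropertyExist(Map<String, String> properties) {
    // "azure" is the provider value added to the PROVIDERS whitelist above.
    String provider = properties.get(S3Properties.PROVIDER);
    return provider != null && provider.equalsIgnoreCase("azure");
}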
@@ -585,6 +585,12 @@ private void analyzeProperties() throws UserException {
processedPropKeys.add(PROP_SUCCESS_FILE_NAME);
}

// For Azure compatibility, this is temporarily added to the map without further processing.
// The validity of each provider's value will be checked later in S3Properties' check.
if (properties.containsKey(S3Properties.PROVIDER)) {
processedPropKeys.add(S3Properties.PROVIDER);
}

if (this.fileFormatType == TFileFormatType.FORMAT_PARQUET) {
getParquetProperties(processedPropKeys);
}
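The comment above defers validation of the provider value to a later S3Properties check that is not shown in this diff. A minimal sketch of what that validation could amount to, reusing the PROVIDERS whitelist from LoadStmt above (the method name and error wording are assumptions):

// Hypothetical sketch; the real check lives elsewhere in S3Properties and may differ.
private static void checkProvider(Map<String, String> properties) throws AnalysisException {
    String provider = properties.get(S3Properties.PROVIDER);
    if (provider != null && !LoadStmt.PROVIDERS.contains(provider.toLowerCase())) {
        throw new AnalysisException("Invalid provider: " + provider
                + ", supported providers: " + String.join(",", LoadStmt.PROVIDERS));
    }
}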
@@ -61,13 +61,17 @@ public static void checkUri(URI uri, StorageBackend.StorageType type) throws Ana
&& !schema.equalsIgnoreCase("cosn")
&& !schema.equalsIgnoreCase("gfs")
&& !schema.equalsIgnoreCase("jfs")
&& !schema.equalsIgnoreCase("azure")
&& !schema.equalsIgnoreCase("gs")) {
throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'viewfs://', 'afs://',"
+ " 'bos://', 'ofs://', 'obs://', 'oss://', 's3a://', 'cosn://', 'gfs://', 'gs://'"
+ " or 'jfs://' path.");
throw new AnalysisException(
"Invalid broker path " + uri.toString() + ". please use valid 'hdfs://', 'viewfs://', 'afs://',"
+ " 'bos://', 'ofs://', 'obs://', 'oss://', 's3a://', 'cosn://', 'gfs://', 'gs://'"
+ " or 'jfs://' path.");
}
} else if (type == StorageBackend.StorageType.S3 && !schema.equalsIgnoreCase("s3")) {
throw new AnalysisException("Invalid export path. please use valid 's3://' path.");
throw new AnalysisException("Invalid export path " + uri.toString() + ". please use valid 's3://' path.");
} else if (type == StorageBackend.StorageType.AZURE && !schema.equalsIgnoreCase("azure")) {
throw new AnalysisException("Invalid export path. please use valid 'azure://' path.");
} else if (type == StorageBackend.StorageType.HDFS && !schema.equalsIgnoreCase("hdfs")
&& !schema.equalsIgnoreCase("viewfs")) {
throw new AnalysisException("Invalid export path. please use valid 'HDFS://' or 'viewfs://' path.");
@@ -146,7 +150,8 @@ public enum StorageType {
OFS("Tencent CHDFS"),
GFS("Tencent Goose File System"),
JFS("Juicefs"),
STREAM("Stream load pipe");
STREAM("Stream load pipe"),
AZURE("MicroSoft Azure Blob");

private final String description;

@@ -171,6 +176,8 @@ public TStorageBackendType toThrift() {
return TStorageBackendType.JFS;
case LOCAL:
return TStorageBackendType.LOCAL;
case AZURE:
return TStorageBackendType.AZURE;
default:
return TStorageBackendType.BROKER;
}
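Taken together, the hunks above make azure:// a legal scheme for checkUri, add an AZURE storage type, and map it to TStorageBackendType.AZURE for BE. A hedged usage sketch, assuming checkUri is the static method on StorageBackend shown in the hunk header and using a placeholder URI:

// Hedged sketch of the behavior added above (placeholder container/path).
URI ok = URI.create("azure://mycontainer/backup/dir");
StorageBackend.checkUri(ok, StorageBackend.StorageType.AZURE);       // accepted: scheme is "azure"

// A mismatched scheme is rejected by the new AZURE branch:
// StorageBackend.checkUri(URI.create("s3://bucket/dir"), StorageBackend.StorageType.AZURE)
// throws AnalysisException("Invalid export path. please use valid 'azure://' path.")

// The new enum constant also maps to the thrift type consumed by BE:
assert StorageBackend.StorageType.AZURE.toThrift() == TStorageBackendType.AZURE;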
@@ -49,6 +49,7 @@
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.AzureFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.remote.S3FileSystem;
import org.apache.doris.task.DirMoveTask;
@@ -232,14 +233,22 @@ public void alterRepository(AlterRepositoryStmt stmt) throws DdlException {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository does not exist");
}

if (repo.getRemoteFileSystem() instanceof S3FileSystem) {
if (repo.getRemoteFileSystem() instanceof S3FileSystem
|| repo.getRemoteFileSystem() instanceof AzureFileSystem) {
Map<String, String> oldProperties = new HashMap<>(stmt.getProperties());
Status status = repo.alterRepositoryS3Properties(oldProperties);
if (!status.ok()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, status.getErrMsg());
}
RemoteFileSystem fileSystem = FileSystemFactory.get(repo.getRemoteFileSystem().getName(),
StorageBackend.StorageType.S3, oldProperties);
RemoteFileSystem fileSystem = null;
if (repo.getRemoteFileSystem() instanceof S3FileSystem) {
fileSystem = FileSystemFactory.get(repo.getRemoteFileSystem().getName(),
StorageBackend.StorageType.S3, oldProperties);
} else if (repo.getRemoteFileSystem() instanceof AzureFileSystem) {
fileSystem = FileSystemFactory.get(repo.getRemoteFileSystem().getName(),
StorageBackend.StorageType.AZURE, oldProperties);
}

Repository newRepo = new Repository(repo.getId(), repo.getName(), repo.isReadOnly(),
repo.getLocation(), fileSystem);
if (!newRepo.ping()) {
@@ -260,7 +269,7 @@ public void alterRepository(AlterRepositoryStmt stmt) throws DdlException {
}
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Only support alter s3 repository");
"Only support alter s3 or azure repository");
}
} finally {
seqlock.unlock();
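The factory call above now branches on the concrete filesystem class to choose the matching storage type. Purely as an illustration of that design choice, the mapping could be folded into a small helper like the hypothetical one below (not part of this commit):

// Hypothetical helper, shown only to illustrate the instanceof-based mapping above.
private static StorageBackend.StorageType storageTypeOf(RemoteFileSystem fs) {
    if (fs instanceof S3FileSystem) {
        return StorageBackend.StorageType.S3;
    } else if (fs instanceof AzureFileSystem) {
        return StorageBackend.StorageType.AZURE;
    }
    throw new IllegalArgumentException("unsupported remote file system: " + fs.getClass().getName());
}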
@@ -31,6 +31,7 @@
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.PersistentFileSystem;
import org.apache.doris.fs.remote.AzureFileSystem;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
@@ -226,7 +227,7 @@ public Status initRepository() {
// so that we can add regression tests about backup/restore.
//
// TODO: support hdfs/brokers
if (fileSystem instanceof S3FileSystem) {
if (fileSystem instanceof S3FileSystem || fileSystem instanceof AzureFileSystem) {
String deleteStaledSnapshots = fileSystem.getProperties()
.getOrDefault(CreateRepositoryStmt.PROP_DELETE_IF_EXISTS, "false");
if (deleteStaledSnapshots.equalsIgnoreCase("true")) {
@@ -235,7 +236,7 @@ public Status initRepository() {
String snapshotPrefix = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name));
LOG.info("property {} is set, delete snapshots with prefix: {}",
CreateRepositoryStmt.PROP_DELETE_IF_EXISTS, snapshotPrefix);
Status st = ((S3FileSystem) fileSystem).deleteDirectory(snapshotPrefix);
Status st = fileSystem.deleteDirectory(snapshotPrefix);
if (!st.ok()) {
return st;
}
@@ -557,7 +558,7 @@ public Status upload(String localFilePath, String remoteFilePath) {
if (!st.ok()) {
return st;
}
} else if (fileSystem instanceof S3FileSystem) {
} else if (fileSystem instanceof S3FileSystem || fileSystem instanceof AzureFileSystem) {
if (LOG.isDebugEnabled()) {
LOG.debug("get md5sum of file: {}. final remote path: {}", localFilePath, finalRemotePath);
}
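Note the cast that disappears in initRepository above: deleteDirectory is now called directly on the fileSystem reference, which implies the operation is exposed on the shared base type rather than only on S3FileSystem. An inferred sketch of that shape, with the signature reconstructed from the call sites in this diff and not verified against the source:

// Inferred from the call sites above; the exact declaration in RemoteFileSystem is not shown in this diff.
public abstract class RemoteFileSystem /* superclasses and other members omitted */ {
    // Declared on the shared type, so S3FileSystem and AzureFileSystem callers need no cast.
    public abstract Status deleteDirectory(String remoteDirPath);
}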
@@ -21,6 +21,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.fs.remote.AzureFileSystem;
import org.apache.doris.fs.remote.S3FileSystem;

import com.google.common.collect.Lists;
@@ -102,7 +103,8 @@ public Status alterRepo(Repository newRepo, boolean isReplay) {
try {
Repository repo = repoNameMap.get(newRepo.getName());
if (repo != null) {
if (repo.getRemoteFileSystem() instanceof S3FileSystem) {
if (repo.getRemoteFileSystem() instanceof S3FileSystem
|| repo.getRemoteFileSystem() instanceof AzureFileSystem) {
repoNameMap.put(repo.getName(), newRepo);
repoIdMap.put(repo.getId(), newRepo);

198 changes: 198 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/AzureResource.java
@@ -0,0 +1,198 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.remote.AzureFileSystem;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class AzureResource extends Resource {

private static final Logger LOG = LogManager.getLogger(AzureResource.class);
private Map<String, String> properties;

public AzureResource() {
super();
}

public AzureResource(String name) {
super(name, ResourceType.AZURE);
}

@Override
protected void setProperties(Map<String, String> properties) throws DdlException {
Preconditions.checkState(properties != null);
// check properties
S3Properties.requiredS3PingProperties(properties);
// by default the resource conf is checked for validity, so ut and regression cases need to be adjusted accordingly
boolean needCheck = isNeedCheck(properties);
if (LOG.isDebugEnabled()) {
LOG.debug("azure info need check validity : {}", needCheck);
}

// the endpoint used for ping needs a uri scheme.
String pingEndpoint = properties.get(S3Properties.ENDPOINT);
if (!pingEndpoint.startsWith("http://")) {
pingEndpoint = "http://" + properties.get(S3Properties.ENDPOINT);
properties.put(S3Properties.ENDPOINT, pingEndpoint);
properties.put(S3Properties.Env.ENDPOINT, pingEndpoint);
}
String region = S3Properties.getRegionOfEndpoint(pingEndpoint);
properties.putIfAbsent(S3Properties.REGION, region);
String ak = properties.get(S3Properties.ACCESS_KEY);
String sk = properties.get(S3Properties.SECRET_KEY);
String token = properties.get(S3Properties.SESSION_TOKEN);
CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk, token);

if (needCheck) {
String bucketName = properties.get(S3Properties.BUCKET);
String rootPath = properties.get(S3Properties.ROOT_PATH);
pingAzure(credential, bucketName, rootPath, properties);
}
// optional
S3Properties.optionalS3Property(properties);
this.properties = properties;
}

private static void pingAzure(CloudCredentialWithEndpoint credential, String bucketName, String rootPath,
Map<String, String> properties) throws DdlException {
AzureFileSystem fileSystem = new AzureFileSystem(properties);
String testFile = rootPath + "/test-object-valid.txt";
if (FeConstants.runningUnitTest) {
return;
}
Status status = fileSystem.exists(testFile);
if (status != Status.OK || status.getErrCode() != Status.ErrCode.NOT_FOUND) {
throw new DdlException(
"ping azure failed(head), status: " + status + ", properties: " + new PrintableMap<>(
properties, "=", true, false, true, false));
}

LOG.info("success to ping azure");
}

@Override
public void modifyProperties(Map<String, String> properties) throws DdlException {
if (references.containsValue(ReferenceType.POLICY)) {
// can't change, because the remote fs uses this info to find data.
List<String> cantChangeProperties = Arrays.asList(S3Properties.ENDPOINT, S3Properties.REGION,
S3Properties.ROOT_PATH, S3Properties.BUCKET, S3Properties.Env.ENDPOINT, S3Properties.Env.REGION,
S3Properties.Env.ROOT_PATH, S3Properties.Env.BUCKET);
Optional<String> any = cantChangeProperties.stream().filter(properties::containsKey).findAny();
if (any.isPresent()) {
throw new DdlException("current not support modify property : " + any.get());
}
}
// Compatible with old versions: convert if the modified properties map uses old property names.
S3Properties.convertToStdProperties(properties);
boolean needCheck = isNeedCheck(properties);
if (LOG.isDebugEnabled()) {
LOG.debug("s3 info need check validity : {}", needCheck);
}
if (needCheck) {
S3Properties.requiredS3PingProperties(this.properties);
Map<String, String> changedProperties = new HashMap<>(this.properties);
changedProperties.putAll(properties);
String bucketName = properties.getOrDefault(S3Properties.BUCKET, this.properties.get(S3Properties.BUCKET));
String rootPath = properties.getOrDefault(S3Properties.ROOT_PATH,
this.properties.get(S3Properties.ROOT_PATH));

pingAzure(getS3PingCredentials(changedProperties), bucketName, rootPath, changedProperties);
}

// modify properties
writeLock();
for (Map.Entry<String, String> kv : properties.entrySet()) {
replaceIfEffectiveValue(this.properties, kv.getKey(), kv.getValue());
if (kv.getKey().equals(S3Properties.Env.TOKEN)
|| kv.getKey().equals(S3Properties.SESSION_TOKEN)) {
this.properties.put(kv.getKey(), kv.getValue());
}
}
++version;
writeUnlock();
super.modifyProperties(properties);
}

private CloudCredentialWithEndpoint getS3PingCredentials(Map<String, String> properties) {
String ak = properties.getOrDefault(S3Properties.ACCESS_KEY, this.properties.get(S3Properties.ACCESS_KEY));
String sk = properties.getOrDefault(S3Properties.SECRET_KEY, this.properties.get(S3Properties.SECRET_KEY));
String token = properties.getOrDefault(S3Properties.SESSION_TOKEN,
this.properties.get(S3Properties.SESSION_TOKEN));
String endpoint = properties.getOrDefault(S3Properties.ENDPOINT, this.properties.get(S3Properties.ENDPOINT));
String pingEndpoint = "http://" + endpoint;
String region = S3Properties.getRegionOfEndpoint(pingEndpoint);
properties.putIfAbsent(S3Properties.REGION, region);
return new CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk, token);
}

private boolean isNeedCheck(Map<String, String> newProperties) {
boolean needCheck = !this.properties.containsKey(S3Properties.VALIDITY_CHECK)
|| Boolean.parseBoolean(this.properties.get(S3Properties.VALIDITY_CHECK));
if (newProperties != null && newProperties.containsKey(S3Properties.VALIDITY_CHECK)) {
needCheck = Boolean.parseBoolean(newProperties.get(S3Properties.VALIDITY_CHECK));
}
return needCheck;
}

@Override
public Map<String, String> getCopiedProperties() {
return Maps.newHashMap(properties);
}

@Override
protected void getProcNodeData(BaseProcResult result) {
String lowerCaseType = type.name().toLowerCase();
result.addRow(Lists.newArrayList(name, lowerCaseType, "id", String.valueOf(id)));
readLock();
result.addRow(Lists.newArrayList(name, lowerCaseType, "version", String.valueOf(version)));
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (PrintableMap.HIDDEN_KEY.contains(entry.getKey())) {
continue;
}
// it's dangerous to show secrets in the SHOW RESOURCES output,
// so we mask the real values with "******"
if (entry.getKey().equals(S3Properties.Env.SECRET_KEY)
|| entry.getKey().equals(S3Properties.SECRET_KEY)
|| entry.getKey().equals(S3Properties.Env.TOKEN)
|| entry.getKey().equals(S3Properties.SESSION_TOKEN)) {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), "******"));
} else {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
}
}
readUnlock();
}
}
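As a small self-contained illustration of the masking rule in getProcNodeData() above (secret keys and session tokens are displayed as "******"), here is a standalone sketch; the literal key strings are assumptions about what the S3Properties constants resolve to:

import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Standalone sketch, not Doris code: mirrors the SHOW-resource masking of sensitive values.
public class MaskSecretsSketch {
    private static final Set<String> SENSITIVE =
            new HashSet<>(Arrays.asList("s3.secret_key", "s3.session_token")); // assumed key names

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("s3.endpoint", "http://<account>.blob.core.windows.net");
        props.put("s3.secret_key", "<storage-account-key>");
        props.forEach((k, v) ->
                System.out.println(k + " = " + (SENSITIVE.contains(k) ? "******" : v)));
    }
}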
@@ -59,7 +59,8 @@ public enum ResourceType {
JDBC,
HDFS,
HMS,
ES;
ES,
AZURE;

public static ResourceType fromString(String resourceType) {
for (ResourceType type : ResourceType.values()) {
@@ -177,6 +178,9 @@ private static Resource getResourceInstance(ResourceType type, String name) thro
case S3:
resource = new S3Resource(name);
break;
case AZURE:
resource = new AzureResource(name);
break;
case JDBC:
resource = new JdbcResource(name);
break;