Skip to content

Commit

Permalink
[BugFix] fix the issue of reading paimon table on OSS (#34996)
Browse files Browse the repository at this point in the history
Signed-off-by: miomiocat <284487410@qq.com>
  • Loading branch information
miomiocat authored Nov 15, 2023
1 parent 101a0ef commit e95749d
Show file tree
Hide file tree
Showing 14 changed files with 162 additions and 117 deletions.
25 changes: 1 addition & 24 deletions be/src/connector/hive_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,36 +568,13 @@ HdfsScanner* HiveDataSource::_create_paimon_jni_scanner(const FSOptions& options
nested_fields = nested_fields.substr(0, nested_fields.size() - 1);
}
std::map<std::string, std::string> jni_scanner_params;
jni_scanner_params["catalog_type"] = paimon_table->get_catalog_type();
jni_scanner_params["metastore_uri"] = paimon_table->get_metastore_uri();
jni_scanner_params["warehouse_path"] = paimon_table->get_warehouse_path();
jni_scanner_params["database_name"] = paimon_table->get_database_name();
jni_scanner_params["table_name"] = paimon_table->get_table_name();
jni_scanner_params["paimon_options"] = paimon_table->get_paimon_options();
jni_scanner_params["required_fields"] = required_fields;
jni_scanner_params["split_info"] = _scan_range.paimon_split_info;
jni_scanner_params["predicate_info"] = _scan_range.paimon_predicate_info;
jni_scanner_params["nested_fields"] = nested_fields;

string option_info = "";
if (options.cloud_configuration != nullptr && options.cloud_configuration->cloud_type == TCloudType::AWS) {
const AWSCloudConfiguration aws_cloud_configuration =
CloudConfigurationFactory::create_aws(*options.cloud_configuration);
AWSCloudCredential aws_cloud_credential = aws_cloud_configuration.aws_cloud_credential;
if (!aws_cloud_credential.endpoint.empty()) {
option_info += "s3.endpoint=" + aws_cloud_credential.endpoint + ",";
}
if (!aws_cloud_credential.access_key.empty()) {
option_info += "s3.access-key=" + aws_cloud_credential.access_key + ",";
}
if (!aws_cloud_credential.secret_key.empty()) {
option_info += "s3.secret-key=" + aws_cloud_credential.secret_key + ",";
}
string enable_ssl = aws_cloud_configuration.enable_ssl ? "true" : "false";
option_info += "s3.connection.ssl.enabled=" + enable_ssl + ",";
string enable_path_style_access = aws_cloud_configuration.enable_path_style_access ? "true" : "false";
option_info += "s3.path.style.access=" + enable_path_style_access;
}
jni_scanner_params["option_info"] = option_info;
jni_scanner_params["fs_options_props"] = build_fs_options_properties(options);

std::string scanner_factory_class = "com/starrocks/paimon/reader/PaimonSplitScannerFactory";
Expand Down
20 changes: 5 additions & 15 deletions be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,23 +320,9 @@ const std::string& HudiTableDescriptor::get_serde_lib() const {

PaimonTableDescriptor::PaimonTableDescriptor(const TTableDescriptor& tdesc, ObjectPool* pool)
: HiveTableDescriptor(tdesc, pool) {
_catalog_type = tdesc.paimonTable.catalog_type;
_metastore_uri = tdesc.paimonTable.metastore_uri;
_warehouse_path = tdesc.paimonTable.warehouse_path;
_database_name = tdesc.dbName;
_table_name = tdesc.tableName;
}

const std::string& PaimonTableDescriptor::get_catalog_type() const {
return _catalog_type;
}

const std::string& PaimonTableDescriptor::get_metastore_uri() const {
return _metastore_uri;
}

const std::string& PaimonTableDescriptor::get_warehouse_path() const {
return _warehouse_path;
_paimon_options = tdesc.paimonTable.paimon_options;
}

const std::string& PaimonTableDescriptor::get_database_name() const {
Expand All @@ -347,6 +333,10 @@ const std::string& PaimonTableDescriptor::get_table_name() const {
return _table_name;
}

const std::string& PaimonTableDescriptor::get_paimon_options() const {
return _paimon_options;
}

// Builds a HiveTableDescriptor by forwarding `tdesc` to the TableDescriptor base.
// `pool` is accepted but not used by this constructor.
HiveTableDescriptor::HiveTableDescriptor(const TTableDescriptor& tdesc, ObjectPool* pool)
        : TableDescriptor(tdesc) {
}

bool HiveTableDescriptor::is_partition_col(const SlotDescriptor* slot) const {
Expand Down
8 changes: 2 additions & 6 deletions be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,18 +299,14 @@ class PaimonTableDescriptor : public HiveTableDescriptor {
PaimonTableDescriptor(const TTableDescriptor& tdesc, ObjectPool* pool);
~PaimonTableDescriptor() override = default;
bool has_partition() const override { return false; }
const std::string& get_catalog_type() const;
const std::string& get_metastore_uri() const;
const std::string& get_warehouse_path() const;
const std::string& get_database_name() const;
const std::string& get_table_name() const;
const std::string& get_paimon_options() const;

private:
std::string _catalog_type;
std::string _metastore_uri;
std::string _warehouse_path;
std::string _database_name;
std::string _table_name;
std::string _paimon_options;
};

// ===========================================
Expand Down
24 changes: 11 additions & 13 deletions fe/fe-core/src/main/java/com/starrocks/catalog/PaimonTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@

package com.starrocks.catalog;

import com.google.common.base.Preconditions;
import com.starrocks.analysis.DescriptorTable;
import com.starrocks.thrift.TPaimonTable;
import com.starrocks.thrift.TTableDescriptor;
import com.starrocks.thrift.TTableType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.types.DataField;

Expand All @@ -34,26 +34,21 @@

public class PaimonTable extends Table {
private static final Logger LOG = LogManager.getLogger(PaimonTable.class);
private final String catalogType;
private final String metastoreUris;
private final String warehousePath;
private final String catalogName;
private final String databaseName;
private final String tableName;
private final Options paimonOptions;
private final AbstractFileStoreTable paimonNativeTable;
private final List<String> partColumnNames;
private final List<String> paimonFieldNames;

public PaimonTable(String catalogName, String dbName, String tblName, List<Column> schema,
String catalogType, String metastoreUris, String warehousePath,
org.apache.paimon.table.Table paimonNativeTable, long createTime) {
Options paimonOptions, org.apache.paimon.table.Table paimonNativeTable, long createTime) {
super(CONNECTOR_ID_GENERATOR.getNextId().asInt(), tblName, TableType.PAIMON, schema);
this.catalogName = catalogName;
this.databaseName = dbName;
this.tableName = tblName;
this.catalogType = catalogType;
this.metastoreUris = metastoreUris;
this.warehousePath = warehousePath;
this.paimonOptions = paimonOptions;
this.paimonNativeTable = (AbstractFileStoreTable) paimonNativeTable;
this.partColumnNames = paimonNativeTable.partitionKeys();
this.paimonFieldNames = paimonNativeTable.rowType().getFields().stream()
Expand Down Expand Up @@ -120,11 +115,14 @@ public boolean isSupported() {

@Override
public TTableDescriptor toThrift(List<DescriptorTable.ReferencedPartitionInfo> partitions) {
Preconditions.checkNotNull(partitions);
TPaimonTable tPaimonTable = new TPaimonTable();
tPaimonTable.setCatalog_type(catalogType);
tPaimonTable.setMetastore_uri(metastoreUris);
tPaimonTable.setWarehouse_path(warehousePath);
StringBuilder sb = new StringBuilder();
for (String key : this.paimonOptions.keySet()) {
sb.append(key).append("=").append(this.paimonOptions.get(key)).append(",");
}
String option = sb.substring(0, sb.length() - 1);

tPaimonTable.setPaimon_options(option);
TTableDescriptor tTableDescriptor = new TTableDescriptor(id, TTableType.PAIMON_TABLE,
fullSchema.size(), 0, tableName, databaseName);
tTableDescriptor.setPaimonTable(tPaimonTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.starrocks.credential.CloudConfiguration;
import com.starrocks.credential.CloudConfigurationFactory;
import com.starrocks.credential.CloudType;
import com.starrocks.credential.aliyun.AliyunCloudConfiguration;
import com.starrocks.credential.aliyun.AliyunCloudCredential;
import com.starrocks.credential.aws.AWSCloudConfiguration;
import com.starrocks.credential.aws.AWSCloudCredential;
import org.apache.paimon.catalog.Catalog;
Expand All @@ -31,6 +33,8 @@
import org.apache.paimon.options.Options;

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.CatalogOptions.URI;
Expand Down Expand Up @@ -74,7 +78,16 @@ public PaimonConnector(ConnectorContext context) {
throw new StarRocksConnectorException("The property %s must be set.", PAIMON_CATALOG_WAREHOUSE);
}
paimonOptions.setString(WAREHOUSE.key(), warehousePath);
initFsOption(cloudConfiguration);
String keyPrefix = "paimon.option.";
Set<String> optionKeys = properties.keySet().stream().filter(k -> k.startsWith(keyPrefix)).collect(Collectors.toSet());
for (String k : optionKeys) {
String key = k.substring(keyPrefix.length());
paimonOptions.setString(key, properties.get(k));
}
}

public void initFsOption(CloudConfiguration cloudConfiguration) {
if (cloudConfiguration.getCloudType() == CloudType.AWS) {
AWSCloudConfiguration awsCloudConfiguration = (AWSCloudConfiguration) cloudConfiguration;
paimonOptions.set("s3.connection.ssl.enabled", String.valueOf(awsCloudConfiguration.getEnableSSL()));
Expand All @@ -90,6 +103,19 @@ public PaimonConnector(ConnectorContext context) {
paimonOptions.set("s3.secret-key", awsCloudCredential.getSecretKey());
}
}
if (cloudConfiguration.getCloudType() == CloudType.ALIYUN) {
AliyunCloudConfiguration aliyunCloudConfiguration = (AliyunCloudConfiguration) cloudConfiguration;
AliyunCloudCredential aliyunCloudCredential = aliyunCloudConfiguration.getAliyunCloudCredential();
if (!aliyunCloudCredential.getEndpoint().isEmpty()) {
paimonOptions.set("fs.oss.endpoint", aliyunCloudCredential.getEndpoint());
}
if (!aliyunCloudCredential.getAccessKey().isEmpty()) {
paimonOptions.set("fs.oss.accessKeyId", aliyunCloudCredential.getAccessKey());
}
if (!aliyunCloudCredential.getSecretKey().isEmpty()) {
paimonOptions.set("fs.oss.accessKeySecret", aliyunCloudCredential.getSecretKey());
}
}
}

public Options getPaimonOptions() {
Expand All @@ -105,7 +131,6 @@ public Catalog getPaimonNativeCatalog() {

@Override
public ConnectorMetadata getMetadata() {
return new PaimonMetadata(catalogName, hdfsEnvironment, getPaimonNativeCatalog(), catalogType, metastoreUris,
warehousePath);
return new PaimonMetadata(catalogName, hdfsEnvironment, getPaimonNativeCatalog(), getPaimonOptions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
Expand All @@ -68,21 +69,17 @@ public class PaimonMetadata implements ConnectorMetadata {
private static final Logger LOG = LogManager.getLogger(PaimonMetadata.class);
private final Catalog paimonNativeCatalog;
private final HdfsEnvironment hdfsEnvironment;
private final String catalogType;
private final String metastoreUris;
private final String warehousePath;
private final String catalogName;
private final Options paimonOptions;
private final Map<Identifier, Table> tables = new ConcurrentHashMap<>();
private final Map<String, Database> databases = new ConcurrentHashMap<>();
private final Map<PaimonFilter, PaimonSplitsInfo> paimonSplits = new ConcurrentHashMap<>();

public PaimonMetadata(String catalogName, HdfsEnvironment hdfsEnvironment, Catalog paimonNativeCatalog,
String catalogType, String metastoreUris, String warehousePath) {
Options paimonOptions) {
this.paimonNativeCatalog = paimonNativeCatalog;
this.hdfsEnvironment = hdfsEnvironment;
this.catalogType = catalogType;
this.metastoreUris = metastoreUris;
this.warehousePath = warehousePath;
this.paimonOptions = paimonOptions;
this.catalogName = catalogName;
}

Expand Down Expand Up @@ -176,8 +173,8 @@ public Table getTable(String dbName, String tblName) {
} catch (Exception e) {
LOG.error("Get paimon table {}.{} createtime failed, error: {}", dbName, tblName, e);
}
PaimonTable table = new PaimonTable(catalogName, dbName, tblName, fullSchema,
catalogType, metastoreUris, warehousePath, paimonNativeTable, createTime);
PaimonTable table = new PaimonTable(this.catalogName, dbName, tblName, fullSchema,
this.paimonOptions, paimonNativeTable, createTime);
tables.put(identifier, table);
return table;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public AliyunCloudConfiguration(AliyunCloudCredential aliyunCloudCredential) {
this.aliyunCloudCredential = aliyunCloudCredential;
}

/**
 * @return the credential object this configuration was constructed with
 */
public AliyunCloudCredential getAliyunCloudCredential() {
    return this.aliyunCloudCredential;
}

// reuse aws client logic of BE
@Override
public void toThrift(TCloudConfiguration tCloudConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ public AliyunCloudCredential(String accessKey, String secretKey, String endpoint
this.endpoint = endpoint;
}

/**
 * @return the access key held by this credential
 */
public String getAccessKey() {
    return this.accessKey;
}

/**
 * @return the secret key held by this credential
 */
public String getSecretKey() {
    return this.secretKey;
}

/**
 * @return the endpoint held by this credential
 */
public String getEndpoint() {
    return this.endpoint;
}

@Override
public void applyToConfiguration(Configuration configuration) {
configuration.set("fs.oss.impl", "com.aliyun.jindodata.oss.JindoOssFileSystem");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,24 @@

import com.google.common.collect.Lists;
import com.starrocks.connector.ColumnTypeConverter;
import com.starrocks.thrift.TTableDescriptor;
import mockit.Expectations;
import mockit.Mocked;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PaimonTableTest {

Expand All @@ -40,15 +46,15 @@ public void testPartitionKeys(@Mocked AbstractFileStoreTable paimonNativeTable)
List<Column> fullSchema = new ArrayList<>(fields.size());
ArrayList<String> partitions = Lists.newArrayList("b", "c");

ArrayList<Column> expections = new ArrayList<>();
ArrayList<Column> partitionSchema = new ArrayList<>();
for (DataField field : fields) {
String fieldName = field.name();
DataType type = field.type();
Type fieldType = ColumnTypeConverter.fromPaimonType(type);
Column column = new Column(fieldName, fieldType, true);
fullSchema.add(column);
if (partitions.contains(fieldName)) {
expections.add(column);
partitionSchema.add(column);
}
}
new Expectations() {
Expand All @@ -59,11 +65,52 @@ public void testPartitionKeys(@Mocked AbstractFileStoreTable paimonNativeTable)
result = partitions;
}
};
PaimonTable paimonTable = new PaimonTable("testCatalog", "testDB", "testTable", fullSchema, "filesystem", null,
"file:///home/wgcn", paimonNativeTable, 100L);
List<String> keys = new ArrayList<>();
Options options = new Options();
options.set(CatalogOptions.METASTORE, "filesystem");
options.set(CatalogOptions.WAREHOUSE, "file:///home/wgcn");
PaimonTable paimonTable = new PaimonTable("testCatalog", "testDB", "testTable", fullSchema,
options, paimonNativeTable, 100L);
List<Column> partitionColumns = paimonTable.getPartitionColumns();
Assertions.assertThat(partitionColumns).hasSameElementsAs(expections);
Assertions.assertThat(partitionColumns).hasSameElementsAs(partitionSchema);
}

// Checks that PaimonTable.toThrift() carries the database name, the table name and
// every configured Paimon option into the resulting TTableDescriptor.
@Test
public void testToThrift(@Mocked AbstractFileStoreTable paimonNativeTable) {
    RowType rowType = RowType.builder()
            .field("a", DataTypes.INT())
            .field("b", DataTypes.INT())
            .field("c", DataTypes.INT())
            .build();
    List<DataField> fields = rowType.getFields();
    // The schema list is only sized here, never populated; the test does not inspect columns.
    List<Column> fullSchema = new ArrayList<>(fields.size());
    ArrayList<String> partitions = Lists.newArrayList("b", "c");
    new Expectations() {
        {
            paimonNativeTable.rowType();
            result = rowType;
            paimonNativeTable.partitionKeys();
            result = partitions;
        }
    };

    Options options = new Options();
    options.set(CatalogOptions.METASTORE, "filesystem");
    options.set(CatalogOptions.WAREHOUSE, "hdfs://host/warehouse");
    String dbName = "testDB";
    String tableName = "testTable";
    PaimonTable paimonTable = new PaimonTable("testCatalog", dbName, tableName, fullSchema,
            options, paimonNativeTable, 100L);

    TTableDescriptor tTableDescriptor = paimonTable.toThrift(null);

    // JUnit's contract is assertEquals(expected, actual); keeping that order makes
    // failure messages report the right side as "expected".
    Assert.assertEquals(dbName, tTableDescriptor.getDbName());
    Assert.assertEquals(tableName, tTableDescriptor.getTableName());

    // The option string is parsed as comma-separated "key=value" entries.
    String optionString = tTableDescriptor.getPaimonTable().getPaimon_options();
    Map<String, String> optionMap = new HashMap<>();
    for (String entry : optionString.split(",")) {
        // Limit 2: split only on the first '=' so values containing '=' (or empty
        // values) are kept intact instead of being truncated or dropped.
        String[] kv = entry.split("=", 2);
        Assert.assertEquals("malformed option entry: " + entry, 2, kv.length);
        optionMap.put(kv[0], kv[1]);
    }

    // No option may be lost or invented during serialization.
    Assert.assertEquals(options.keySet().size(), optionMap.size());
    for (String key : options.keySet()) {
        Assert.assertEquals(options.get(key), optionMap.get(key));
    }
}

}
Loading

0 comments on commit e95749d

Please sign in to comment.