Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support tencent cloud COS storage in datafusion-cli #9734

Merged
merged 4 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl SchemaProvider for DynamicFileSchemaProvider {
// Register the store for this URL. Here we don't have access
// to any command options so the only choice is to use an empty collection
match scheme {
"s3" | "oss" => {
"s3" | "oss" | "cos" => {
state = state.add_table_options_extension(AwsOptions::default());
}
"gs" | "gcs" => {
Expand Down
16 changes: 16 additions & 0 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ mod tests {
let locations = vec![
"s3://bucket/path/file.parquet",
"oss://bucket/path/file.parquet",
"cos://bucket/path/file.parquet",
"gcs://bucket/path/file.parquet",
];
let mut ctx = SessionContext::new();
Expand Down Expand Up @@ -497,6 +498,21 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn create_object_store_table_cos() -> Result<()> {
let access_key_id = "fake_access_key_id";
let secret_access_key = "fake_secret_access_key";
let endpoint = "fake_endpoint";
let location = "cos://bucket/path/file.parquet";

// Should be OK
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.cos.endpoint' '{endpoint}') LOCATION '{location}'");
create_external_table_test(location, &sql).await?;

Ok(())
}

#[tokio::test]
async fn create_object_store_table_gcs() -> Result<()> {
let service_account_path = "fake_service_account_path";
Expand Down
50 changes: 34 additions & 16 deletions datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::any::Any;
use std::fmt::{Debug, Display};
use std::sync::Arc;

use datafusion::common::{config_namespace, exec_datafusion_err, exec_err, internal_err};
use datafusion::common::{exec_datafusion_err, exec_err, internal_err};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionState;
use datafusion::prelude::SessionContext;
Expand Down Expand Up @@ -106,12 +106,27 @@ impl CredentialProvider for S3CredentialProvider {
pub fn get_oss_object_store_builder(
url: &Url,
aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
get_object_store_builder(url, aws_options, true)
}

pub fn get_cos_object_store_builder(
url: &Url,
aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
get_object_store_builder(url, aws_options, false)
}

fn get_object_store_builder(
url: &Url,
aws_options: &AwsOptions,
virtual_hosted_style_request: bool,
) -> Result<AmazonS3Builder> {
let bucket_name = get_bucket_name(url)?;
let mut builder = AmazonS3Builder::from_env()
.with_virtual_hosted_style_request(true)
.with_virtual_hosted_style_request(virtual_hosted_style_request)
.with_bucket_name(bucket_name)
// oss don't care about the "region" field
// oss/cos don't care about the "region" field
.with_region("do_not_care");

if let (Some(access_key_id), Some(secret_access_key)) =
Expand All @@ -122,7 +137,7 @@ pub fn get_oss_object_store_builder(
.with_secret_access_key(secret_access_key);
}

if let Some(endpoint) = &aws_options.oss.endpoint {
if let Some(endpoint) = &aws_options.endpoint {
builder = builder.with_endpoint(endpoint);
}

Expand Down Expand Up @@ -171,14 +186,8 @@ pub struct AwsOptions {
pub session_token: Option<String>,
/// AWS Region
pub region: Option<String>,
/// Object Storage Service options
pub oss: OssOptions,
}

config_namespace! {
pub struct OssOptions {
pub endpoint: Option<String>, default = None
}
/// OSS or COS Endpoint
pub endpoint: Option<String>,
}

impl ExtensionOptions for AwsOptions {
Expand Down Expand Up @@ -210,8 +219,8 @@ impl ExtensionOptions for AwsOptions {
"region" => {
self.region.set(rem, value)?;
}
"oss" => {
self.oss.set(rem, value)?;
"oss" | "cos" => {
self.endpoint.set(rem, value)?;
}
_ => {
return internal_err!("Config value \"{}\" not found on AwsOptions", rem);
Expand Down Expand Up @@ -252,7 +261,7 @@ impl ExtensionOptions for AwsOptions {
.visit(&mut v, "secret_access_key", "");
self.session_token.visit(&mut v, "session_token", "");
self.region.visit(&mut v, "region", "");
self.oss.visit(&mut v, "oss", "");
self.endpoint.visit(&mut v, "endpoint", "");
v.0
}
}
Expand Down Expand Up @@ -376,7 +385,7 @@ pub(crate) fn register_options(ctx: &SessionContext, scheme: &str) {
// Match the provided scheme against supported cloud storage schemes:
match scheme {
// For Amazon S3 or Alibaba Cloud OSS
"s3" | "oss" => {
"s3" | "oss" | "cos" => {
// Register AWS specific table options in the session context:
ctx.register_table_options_extension(AwsOptions::default())
}
Expand Down Expand Up @@ -415,6 +424,15 @@ pub(crate) async fn get_object_store(
let builder = get_oss_object_store_builder(url, options)?;
Arc::new(builder.build()?)
}
"cos" => {
let Some(options) = table_options.extensions.get::<AwsOptions>() else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In follow up we need to refactor this part as for different schemas its too much of the common code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, agree with you.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to file a ticket to do so 🙏

return exec_err!(
"Given table options incompatible with the 'cos' scheme"
);
};
let builder = get_cos_object_store_builder(url, options)?;
Arc::new(builder.build()?)
}
"gs" | "gcs" => {
let Some(options) = table_options.extensions.get::<GcpOptions>() else {
return exec_err!(
Expand Down
37 changes: 30 additions & 7 deletions docs/source/user-guide/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,9 @@ select count(*) from hits;
CREATE EXTERNAL TABLE test
STORED AS PARQUET
OPTIONS(
'access_key_id' '******',
'secret_access_key' '******',
'region' 'us-east-2'
'aws.access_key_id' '******',
'aws.secret_access_key' '******',
'aws.region' 'us-east-2'
)
LOCATION 's3://bucket/path/file.parquet';
```
Expand Down Expand Up @@ -365,9 +365,9 @@ Details of the environment variables that can be used are:
CREATE EXTERNAL TABLE test
STORED AS PARQUET
OPTIONS(
'access_key_id' '******',
'secret_access_key' '******',
'endpoint' 'https://bucket.oss-cn-hangzhou.aliyuncs.com'
'aws.access_key_id' '******',
'aws.secret_access_key' '******',
'aws.oss.endpoint' 'https://bucket.oss-cn-hangzhou.aliyuncs.com'
)
LOCATION 'oss://bucket/path/file.parquet';
```
Expand All @@ -380,6 +380,29 @@ The supported OPTIONS are:

Note that the `endpoint` format of oss needs to be: `https://{bucket}.{oss-region-endpoint}`

## Registering COS Data Sources

[Tencent cloud COS](https://cloud.tencent.com/product/cos) data sources can be registered by executing a `CREATE EXTERNAL TABLE` SQL statement.

```sql
CREATE EXTERNAL TABLE test
STORED AS PARQUET
OPTIONS(
'aws.access_key_id' '******',
'aws.secret_access_key' '******',
'aws.cos.endpoint' 'https://cos.ap-singapore.myqcloud.com'
)
LOCATION 'cos://bucket/path/file.parquet';
```

The supported OPTIONS are:

- access_key_id
- secret_access_key
- endpoint

Note that the `endpoint` format of urls must be: `https://cos.{cos-region-endpoint}`

## Registering GCS Data Sources

[Google Cloud Storage](https://cloud.google.com/storage) data sources can be registered by executing a `CREATE EXTERNAL TABLE` SQL statement.
Expand All @@ -388,7 +411,7 @@ Note that the `endpoint` format of oss needs to be: `https://{bucket}.{oss-regio
CREATE EXTERNAL TABLE test
STORED AS PARQUET
OPTIONS(
'service_account_path' '/tmp/gcs.json',
'gcp.service_account_path' '/tmp/gcs.json',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it related to cos onboard?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, add the missing prefix gcp and aws to related external table options.

)
LOCATION 'gs://bucket/path/file.parquet';
```
Expand Down
Loading