From e94260815e38bc6093b4121cbe80a0afe5c592f4 Mon Sep 17 00:00:00 2001 From: harveyyue Date: Thu, 21 Mar 2024 20:19:51 +0800 Subject: [PATCH 1/3] Support tencent cloud COS storage --- datafusion-cli/src/catalog.rs | 2 +- datafusion-cli/src/exec.rs | 16 +++++++++ datafusion-cli/src/object_storage.rs | 50 +++++++++++++++++++--------- docs/source/user-guide/cli.md | 37 ++++++++++++++++---- 4 files changed, 81 insertions(+), 24 deletions(-) diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs index 46dd8bb00f06..0fbb7a5908b5 100644 --- a/datafusion-cli/src/catalog.rs +++ b/datafusion-cli/src/catalog.rs @@ -177,7 +177,7 @@ impl SchemaProvider for DynamicFileSchemaProvider { // Register the store for this URL. Here we don't have access // to any command options so the only choice is to use an empty collection match scheme { - "s3" | "oss" => { + "s3" | "oss" | "cos" => { state = state.add_table_options_extension(AwsOptions::default()); } "gs" | "gcs" => { diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 4e374a4c0032..114e3cefa3bf 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -415,6 +415,7 @@ mod tests { let locations = vec![ "s3://bucket/path/file.parquet", "oss://bucket/path/file.parquet", + "cos://bucket/path/file.parquet", "gcs://bucket/path/file.parquet", ]; let mut ctx = SessionContext::new(); @@ -497,6 +498,21 @@ mod tests { Ok(()) } + #[tokio::test] + async fn create_object_store_table_cos() -> Result<()> { + let access_key_id = "fake_access_key_id"; + let secret_access_key = "fake_secret_access_key"; + let endpoint = "fake_endpoint"; + let location = "cos://bucket/path/file.parquet"; + + // Should be OK + let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET + OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.cos.endpoint' '{endpoint}') LOCATION '{location}'"); + create_external_table_test(location, &sql).await?; + + Ok(()) + } + #[tokio::test] async fn create_object_store_table_gcs() -> Result<()> { let service_account_path = "fake_service_account_path"; diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index 033c8f839ab2..b974a67b8015 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -19,7 +19,7 @@ use std::any::Any; use std::fmt::{Debug, Display}; use std::sync::Arc; -use datafusion::common::{config_namespace, exec_datafusion_err, exec_err, internal_err}; +use datafusion::common::{exec_datafusion_err, exec_err, internal_err}; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::SessionState; use datafusion::prelude::SessionContext; @@ -106,12 +106,27 @@ impl CredentialProvider for S3CredentialProvider { pub fn get_oss_object_store_builder( url: &Url, aws_options: &AwsOptions, +) -> Result { + return get_object_store_builder(url, aws_options, true); +} + +pub fn get_cos_object_store_builder( + url: &Url, + aws_options: &AwsOptions, +) -> Result { + return get_object_store_builder(url, aws_options, false); +} + +fn get_object_store_builder( + url: &Url, + aws_options: &AwsOptions, + virtual_hosted_style_request: bool, ) -> Result { let bucket_name = get_bucket_name(url)?; let mut builder = AmazonS3Builder::from_env() - .with_virtual_hosted_style_request(true) + .with_virtual_hosted_style_request(virtual_hosted_style_request) .with_bucket_name(bucket_name) - // oss don't care about the "region" field + // oss/cos don't care about the "region" field .with_region("do_not_care"); if let (Some(access_key_id), Some(secret_access_key)) = @@ -122,7 +137,7 @@ pub fn get_oss_object_store_builder( .with_secret_access_key(secret_access_key); } - if let Some(endpoint) = &aws_options.oss.endpoint { + if let Some(endpoint) = &aws_options.endpoint { builder = builder.with_endpoint(endpoint); } @@ -171,14 +186,8 @@ pub struct AwsOptions { pub session_token: Option, /// AWS Region pub region: Option, - /// Object Storage Service options - pub oss: OssOptions, -} - -config_namespace! { - pub struct OssOptions { - pub endpoint: Option, default = None - } + /// OSS or COS Endpoint + pub endpoint: Option, } impl ExtensionOptions for AwsOptions { @@ -210,8 +219,8 @@ impl ExtensionOptions for AwsOptions { "region" => { self.region.set(rem, value)?; } - "oss" => { - self.oss.set(rem, value)?; + "oss" | "cos" => { + self.endpoint.set(rem, value)?; } _ => { return internal_err!("Config value \"{}\" not found on AwsOptions", rem); @@ -252,7 +261,7 @@ impl ExtensionOptions for AwsOptions { .visit(&mut v, "secret_access_key", ""); self.session_token.visit(&mut v, "session_token", ""); self.region.visit(&mut v, "region", ""); - self.oss.visit(&mut v, "oss", ""); + self.endpoint.visit(&mut v, "endpoint", ""); v.0 } } @@ -376,7 +385,7 @@ pub(crate) fn register_options(ctx: &SessionContext, scheme: &str) { // Match the provided scheme against supported cloud storage schemes: match scheme { // For Amazon S3 or Alibaba Cloud OSS - "s3" | "oss" => { + "s3" | "oss" | "cos" => { // Register AWS specific table options in the session context: ctx.register_table_options_extension(AwsOptions::default()) } @@ -415,6 +424,15 @@ pub(crate) async fn get_object_store( let builder = get_oss_object_store_builder(url, options)?; Arc::new(builder.build()?) } + "cos" => { + let Some(options) = table_options.extensions.get::() else { + return exec_err!( + "Given table options incompatible with the 'cos' scheme" + ); + }; + let builder = get_cos_object_store_builder(url, options)?; + Arc::new(builder.build()?) + } "gs" | "gcs" => { let Some(options) = table_options.extensions.get::() else { return exec_err!( diff --git a/docs/source/user-guide/cli.md b/docs/source/user-guide/cli.md index a94e2427eaa2..184ecc293244 100644 --- a/docs/source/user-guide/cli.md +++ b/docs/source/user-guide/cli.md @@ -312,9 +312,9 @@ select count(*) from hits; CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS( - 'access_key_id' '******', - 'secret_access_key' '******', - 'region' 'us-east-2' + 'aws.access_key_id' '******', + 'aws.secret_access_key' '******', + 'aws.region' 'us-east-2' ) LOCATION 's3://bucket/path/file.parquet'; ``` @@ -365,9 +365,9 @@ Details of the environment variables that can be used are: CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS( - 'access_key_id' '******', - 'secret_access_key' '******', - 'endpoint' 'https://bucket.oss-cn-hangzhou.aliyuncs.com' + 'aws.access_key_id' '******', + 'aws.secret_access_key' '******', + 'aws.oss.endpoint' 'https://bucket.oss-cn-hangzhou.aliyuncs.com' ) LOCATION 'oss://bucket/path/file.parquet'; ``` @@ -380,6 +380,29 @@ The supported OPTIONS are: Note that the `endpoint` format of oss needs to be: `https://{bucket}.{oss-region-endpoint}` +## Registering COS Data Sources + +[Tencent cloud COS](https://cloud.tencent.com/product/cos) data sources can be registered by executing a `CREATE EXTERNAL TABLE` SQL statement. + +```sql +CREATE EXTERNAL TABLE test +STORED AS PARQUET +OPTIONS( + 'aws.access_key_id' '******', + 'aws.secret_access_key' '******', + 'aws.cos.endpoint' 'https://cos.ap-singapore.myqcloud.com' +) +LOCATION 'cos://bucket/path/file.parquet'; +``` + +The supported OPTIONS are: + +- access_key_id +- secret_access_key +- endpoint + +Note that the `endpoint` format of oss needs to be: `https://cos.{cos-region-endpoint}` + ## Registering GCS Data Sources [Google Cloud Storage](https://cloud.google.com/storage) data sources can be registered by executing a `CREATE EXTERNAL TABLE` SQL statement. @@ -388,7 +411,7 @@ Note that the `endpoint` format of oss needs to be: `https://{bucket}.{oss-regio CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS( - 'service_account_path' '/tmp/gcs.json', + 'gcp.service_account_path' '/tmp/gcs.json', ) LOCATION 'gs://bucket/path/file.parquet'; ``` From d0367c88f8429dc67f24ddfb411e7c4a247bec16 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 24 Mar 2024 14:38:09 -0400 Subject: [PATCH 2/3] Fix clippy --- datafusion-cli/src/object_storage.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index b974a67b8015..94560cb9d8da 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -107,14 +107,14 @@ pub fn get_oss_object_store_builder( url: &Url, aws_options: &AwsOptions, ) -> Result { - return get_object_store_builder(url, aws_options, true); + get_object_store_builder(url, aws_options, true) } pub fn get_cos_object_store_builder( url: &Url, aws_options: &AwsOptions, ) -> Result { - return get_object_store_builder(url, aws_options, false); + get_object_store_builder(url, aws_options, false) } fn get_object_store_builder( From 5f12d4b1b9c177f69ee00f29a8306bb57aee5c9c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 24 Mar 2024 14:42:14 -0400 Subject: [PATCH 3/3] Update docs/source/user-guide/cli.md --- docs/source/user-guide/cli.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/cli.md b/docs/source/user-guide/cli.md index 184ecc293244..da4c9870545a 100644 --- a/docs/source/user-guide/cli.md +++ b/docs/source/user-guide/cli.md @@ -401,7 +401,7 @@ The supported OPTIONS are: - secret_access_key - endpoint -Note that the `endpoint` format of oss needs to be: `https://cos.{cos-region-endpoint}` +Note that the `endpoint` format of urls must be: `https://cos.{cos-region-endpoint}` ## Registering GCS Data Sources