24 changes: 12 additions & 12 deletions docs/content.zh/docs/connectors/pipeline-connectors/maxcompute.md
@@ -94,15 +94,15 @@ pipeline:
<td>The name of the sink.</td>
</tr>
<tr>
<td>accessId</td>
<td>access-id</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The AccessKey ID of an Alibaba Cloud account or RAM user. You can obtain the AccessKey ID on the <a href="https://ram.console.aliyun.com/manage/ak">
AccessKey management page</a>.</td>
</tr>
<tr>
<td>accessKey</td>
<td>access-key</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
@@ -126,65 +126,65 @@ pipeline:
MaxCompute console</a> and obtain the MaxCompute project name on the Workspace > Project Management page.</td>
</tr>
<tr>
<td>tunnelEndpoint</td>
<td>tunnel.endpoint</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The connection address for the MaxCompute Tunnel service. Typically, this configuration can be auto-routed based on the region where the specified project is located. It is used only in special network environments, such as when using a proxy.</td>
</tr>
<tr>
<td>quotaName</td>
<td>quota.name</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The name of the exclusive resource group for MaxCompute data transfer. If not specified, the shared resource group is used. For details, refer to <a href="https://help.aliyun.com/zh/maxcompute/user-guide/purchase-and-use-exclusive-resource-groups-for-dts">
Using exclusive resource groups for MaxCompute</a>.</td>
</tr>
<tr>
<td>stsToken</td>
<td>sts-token</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>When using a temporary access token (STS Token) issued by a RAM role for authentication, this parameter must be specified.</td>
</tr>
<tr>
<td>bucketsNum</td>
<td>buckets-num</td>
<td>optional</td>
<td style="word-wrap: break-word;">16</td>
<td>Integer</td>
<td>The number of buckets used when auto-creating MaxCompute Delta tables. For usage, refer to <a href="https://help.aliyun.com/zh/maxcompute/user-guide/transaction-table2-0-overview">
Delta Table Overview</a>.</td>
</tr>
<tr>
<td>compressAlgorithm</td>
<td>compress.algorithm</td>
<td>optional</td>
<td style="word-wrap: break-word;">zlib</td>
<td>String</td>
<td>The data compression algorithm used when writing to MaxCompute. Currently supports <code>raw</code> (no compression), <code>zlib</code>, and <code>snappy</code>.</td>
<td>The data compression algorithm used when writing to MaxCompute. Currently supports <code>raw</code> (no compression), <code>zlib</code>, <code>lz4</code>, and <code>snappy</code>.</td>
</tr>
<tr>
<td>totalBatchSize</td>
<td>total.buffer-size</td>
<td>optional</td>
<td style="word-wrap: break-word;">64MB</td>
<td>String</td>
<td>The size of the data buffer in memory, at partition level (at table level for non-partitioned tables). Buffers for different partitions (tables) are independent, and data is written to MaxCompute when the threshold is reached.</td>
</tr>
<tr>
<td>bucketBatchSize</td>
<td>bucket.buffer-size</td>
<td>optional</td>
<td style="word-wrap: break-word;">4MB</td>
<td>String</td>
<td>The size of the data buffer in memory, at bucket level; effective only when writing to Delta tables. Buffers for different data buckets are independent, and the bucket's data is written to MaxCompute when the threshold is reached.</td>
</tr>
<tr>
<td>numCommitThreads</td>
<td>commit.thread-num</td>
<td>optional</td>
<td style="word-wrap: break-word;">16</td>
<td>Integer</td>
<td>The number of partitions (tables) that can be processed simultaneously during the checkpoint stage.</td>
</tr>
<tr>
<td>numFlushConcurrent</td>
<td>flush.concurrent-num</td>
<td>optional</td>
<td style="word-wrap: break-word;">4</td>
<td>Integer</td>
24 changes: 12 additions & 12 deletions docs/content/docs/connectors/pipeline-connectors/maxcompute.md
@@ -94,15 +94,15 @@ pipeline:
<td>The name of the sink.</td>
</tr>
<tr>
<td>accessId</td>
<td>access-id</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The AccessKey ID of an Alibaba Cloud account or RAM user. You can obtain the AccessKey ID on the <a href="https://ram.console.aliyun.com/manage/ak">
AccessKey management page</a>.</td>
</tr>
<tr>
<td>accessKey</td>
<td>access-key</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
@@ -124,63 +124,63 @@ pipeline:
<td>The name of the MaxCompute project. You can log in to the <a href="https://maxcompute.console.aliyun.com/">MaxCompute console</a> and obtain the MaxCompute project name on the Workspace > Project Management page.</td>
</tr>
<tr>
<td>tunnelEndpoint</td>
<td>tunnel.endpoint</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The connection address for the MaxCompute Tunnel service. Typically, this configuration can be auto-routed based on the region where the specified project is located. It is used only in special network environments such as when using a proxy.</td>
</tr>
<tr>
<td>quotaName</td>
<td>quota.name</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The name of the exclusive resource group for MaxCompute data transfer. If not specified, the shared resource group is used. For details, refer to <a href="https://help.aliyun.com/zh/maxcompute/user-guide/purchase-and-use-exclusive-resource-groups-for-dts">Using exclusive resource groups for MaxCompute</a>.</td>
</tr>
<tr>
<td>stsToken</td>
<td>sts-token</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>When using a temporary access token (STS Token) issued by a RAM role for authentication, this parameter must be specified.</td>
</tr>
<tr>
<td>bucketsNum</td>
<td>buckets-num</td>
<td>optional</td>
<td style="word-wrap: break-word;">16</td>
<td>Integer</td>
<td>The number of buckets used when auto-creating MaxCompute Delta tables. For usage, refer to <a href="https://help.aliyun.com/zh/maxcompute/user-guide/transaction-table2-0-overview">Delta Table Overview</a>.</td>
</tr>
<tr>
<td>compressAlgorithm</td>
<td>compress.algorithm</td>
<td>optional</td>
<td style="word-wrap: break-word;">zlib</td>
<td>String</td>
<td>The data compression algorithm used when writing to MaxCompute. Currently supports <code>raw</code> (no compression), <code>zlib</code>, and <code>snappy</code>.</td>
<td>The data compression algorithm used when writing to MaxCompute. Currently supports <code>raw</code> (no compression), <code>zlib</code>, <code>lz4</code>, and <code>snappy</code>.</td>
</tr>
<tr>
<td>totalBatchSize</td>
<td>total.buffer-size</td>
<td>optional</td>
<td style="word-wrap: break-word;">64MB</td>
<td>String</td>
<td>The size of the data buffer in memory, by partition level (for non-partitioned tables, by table level). Buffers for different partitions (tables) are independent, and data is written to MaxCompute when the threshold is reached.</td>
</tr>
<tr>
<td>bucketBatchSize</td>
<td>bucket.buffer-size</td>
<td>optional</td>
<td style="word-wrap: break-word;">4MB</td>
<td>String</td>
<td>The size of the data buffer in memory, by bucket level. This is effective only when writing to Delta tables. Buffers for different data buckets are independent, and the bucket data is written to MaxCompute when the threshold is reached.</td>
</tr>
<tr>
<td>numCommitThreads</td>
<td>commit.thread-num</td>
<td>optional</td>
<td style="word-wrap: break-word;">16</td>
<td>Integer</td>
<td>The number of partitions (tables) that can be processed simultaneously during the checkpoint stage.</td>
</tr>
<tr>
<td>numFlushConcurrent</td>
<td>flush.concurrent-num</td>
<td>optional</td>
<td style="word-wrap: break-word;">4</td>
<td>Integer</td>
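For reference, a minimal `sink` block of a pipeline definition using the renamed option keys might look as follows. This is a sketch only; the connector `type` value and the endpoint/project placeholders are assumptions, not taken from this diff.

```yaml
# Hedged sketch: option keys follow the tables above; the `type`, endpoint,
# and project values are placeholders, not confirmed by this PR.
sink:
  type: maxcompute
  name: MaxCompute Sink
  access-id: ${ALIYUN_ACCESS_ID}
  access-key: ${ALIYUN_ACCESS_KEY}
  endpoint: <maxcompute-endpoint>
  project: <maxcompute-project>
  compress.algorithm: lz4      # raw / zlib / lz4 / snappy
  total.buffer-size: 64MB
  bucket.buffer-size: 4MB
  commit.thread-num: 16
  flush.concurrent-num: 4
```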
@@ -23,6 +23,7 @@
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.connectors.maxcompute.options.CompressAlgorithm;
import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeWriteOptions;
import org.apache.flink.configuration.MemorySize;
@@ -69,21 +70,20 @@ private MaxComputeOptions extractMaxComputeOptions(

private MaxComputeWriteOptions extractMaxComputeWriteOptions(
Configuration factoryConfiguration) {
int numCommitThread =
factoryConfiguration.get(MaxComputeDataSinkOptions.NUM_COMMIT_THREADS);
String compressAlgorithm =
int numCommitThread = factoryConfiguration.get(MaxComputeDataSinkOptions.COMMIT_THREAD_NUM);
CompressAlgorithm compressAlgorithm =
factoryConfiguration.get(MaxComputeDataSinkOptions.COMPRESS_ALGORITHM);
int flushConcurrent =
factoryConfiguration.get(MaxComputeDataSinkOptions.NUM_FLUSH_CONCURRENT);
factoryConfiguration.get(MaxComputeDataSinkOptions.FLUSH_CONCURRENT_NUM);
long maxBufferSize =
MemorySize.parse(
factoryConfiguration.get(
MaxComputeDataSinkOptions.TOTAL_BATCH_SIZE))
MaxComputeDataSinkOptions.TOTAL_BUFFER_SIZE))
.getBytes();
long maxSlotSize =
MemorySize.parse(
factoryConfiguration.get(
MaxComputeDataSinkOptions.BUCKET_BATCH_SIZE))
MaxComputeDataSinkOptions.BUCKET_BUFFER_SIZE))
.getBytes();

return MaxComputeWriteOptions.builder()
@@ -119,11 +119,11 @@ public Set<ConfigOption<?>> optionalOptions() {
optionalOptions.add(MaxComputeDataSinkOptions.STS_TOKEN);
optionalOptions.add(MaxComputeDataSinkOptions.BUCKETS_NUM);
// write options
optionalOptions.add(MaxComputeDataSinkOptions.NUM_COMMIT_THREADS);
optionalOptions.add(MaxComputeDataSinkOptions.COMMIT_THREAD_NUM);
optionalOptions.add(MaxComputeDataSinkOptions.COMPRESS_ALGORITHM);
optionalOptions.add(MaxComputeDataSinkOptions.NUM_FLUSH_CONCURRENT);
optionalOptions.add(MaxComputeDataSinkOptions.TOTAL_BATCH_SIZE);
optionalOptions.add(MaxComputeDataSinkOptions.BUCKET_BATCH_SIZE);
optionalOptions.add(MaxComputeDataSinkOptions.FLUSH_CONCURRENT_NUM);
optionalOptions.add(MaxComputeDataSinkOptions.TOTAL_BUFFER_SIZE);
optionalOptions.add(MaxComputeDataSinkOptions.BUCKET_BUFFER_SIZE);

return optionalOptions;
}
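Both buffer-size options are plain strings that the factory feeds through Flink's `MemorySize.parse`, as shown in `extractMaxComputeWriteOptions` above. A self-contained sketch of that parsing step, assuming only `flink-core` on the classpath:

```java
import org.apache.flink.configuration.MemorySize;

public class BufferSizeParsing {
    public static void main(String[] args) {
        // Defaults declared in MaxComputeDataSinkOptions: "64MB" and "4MB".
        long totalBufferBytes = MemorySize.parse("64MB").getBytes();  // 67108864
        long bucketBufferBytes = MemorySize.parse("4MB").getBytes();  //  4194304

        System.out.println("total.buffer-size  -> " + totalBufferBytes + " bytes");
        System.out.println("bucket.buffer-size -> " + bucketBufferBytes + " bytes");
    }
}
```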
@@ -20,18 +20,19 @@

import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.ConfigOptions;
import org.apache.flink.cdc.connectors.maxcompute.options.CompressAlgorithm;

/** Options for MaxCompute Data Sink. */
public class MaxComputeDataSinkOptions {
// basic options.
public static final ConfigOption<String> ACCESS_ID =
ConfigOptions.key("accessId")
ConfigOptions.key("access-id")
.stringType()
.noDefaultValue()
.withDescription("MaxCompute user access id.");

public static final ConfigOption<String> ACCESS_KEY =
ConfigOptions.key("accessKey")
ConfigOptions.key("access-key")
.stringType()
.noDefaultValue()
.withDescription("MaxCompute user access key.");
@@ -49,59 +50,60 @@ public class MaxComputeDataSinkOptions {
.withDescription("MaxCompute project.");

public static final ConfigOption<String> TUNNEL_ENDPOINT =
ConfigOptions.key("tunnelEndpoint")
ConfigOptions.key("tunnel.endpoint")
.stringType()
.noDefaultValue()
.withDescription("MaxCompute tunnel end point.");

public static final ConfigOption<String> QUOTA_NAME =
ConfigOptions.key("quotaName")
ConfigOptions.key("quota.name")
.stringType()
.noDefaultValue()
.withDescription(
"MaxCompute tunnel quota name, note that not quota nick-name.");

public static final ConfigOption<String> STS_TOKEN =
ConfigOptions.key("stsToken")
ConfigOptions.key("sts-token")
.stringType()
.noDefaultValue()
.withDescription("MaxCompute sts token.");

public static final ConfigOption<Integer> BUCKETS_NUM =
ConfigOptions.key("bucketsNum")
ConfigOptions.key("buckets-num")
.intType()
.defaultValue(16)
.withDescription(
"The batch size of MaxCompute table when automatically create table.");

// write options.
public static final ConfigOption<String> COMPRESS_ALGORITHM =
ConfigOptions.key("compressAlgorithm")
.stringType()
.defaultValue("zlib")
public static final ConfigOption<CompressAlgorithm> COMPRESS_ALGORITHM =
ConfigOptions.key("compress.algorithm")
.enumType(CompressAlgorithm.class)
.defaultValue(CompressAlgorithm.ZLIB)
.withDescription(
"The compress algorithm of data upload to MaxCompute, support 'zlib', 'snappy', 'raw'.");

public static final ConfigOption<String> TOTAL_BATCH_SIZE =
ConfigOptions.key("totalBatchSize")
.stringType()
.defaultValue("64MB")
.withDescription("The max batch size of data upload to MaxCompute.");
"The compress algorithm of data upload to MaxCompute, support 'zlib', 'snappy', 'lz4', 'raw'.");

public static final ConfigOption<String> BUCKET_BATCH_SIZE =
ConfigOptions.key("bucketBatchSize")
public static final ConfigOption<String> BUCKET_BUFFER_SIZE =
ConfigOptions.key("bucket.buffer-size")
.stringType()
.defaultValue("4MB")
.withDescription(
"The max batch size of data per bucket when upload to MaxCompute");

public static final ConfigOption<Integer> NUM_COMMIT_THREADS =
ConfigOptions.key("numCommitThreads")
public static final ConfigOption<String> TOTAL_BUFFER_SIZE =
ConfigOptions.key("total.buffer-size")
.stringType()
.defaultValue("64MB")
.withDescription("The max batch size of data upload to MaxCompute.");

public static final ConfigOption<Integer> COMMIT_THREAD_NUM =
ConfigOptions.key("commit.thread-num")
.intType()
.defaultValue(16)
.withDescription("The number of threads used to commit data to MaxCompute.");

public static final ConfigOption<Integer> NUM_FLUSH_CONCURRENT =
ConfigOptions.key("numFlushConcurrent")
public static final ConfigOption<Integer> FLUSH_CONCURRENT_NUM =
ConfigOptions.key("flush.concurrent-num")
.intType()
.defaultValue(4)
.withDescription("The number of concurrent with flush bucket data.");
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.maxcompute.options;

/** Compress algorithm for MaxCompute table. */
public enum CompressAlgorithm {
/** No compress. */
RAW("raw"),

/** Zlib compress. */
ZLIB("zlib"),

/** LZ4 compress. */
LZ4("lz4"),

/** Snappy compress. */
@Deprecated
SNAPPY("snappy");

private final String value;

CompressAlgorithm(String value) {
this.value = value;
}

public String getValue() {
return value;
}

@Override
public String toString() {
return value;
}
}
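The enum carries its wire value ("raw", "zlib", "lz4", "snappy") but defines no reverse lookup. A hypothetical helper, not part of this PR, that resolves a configured string back to a constant could look like this:

```java
import java.util.Arrays;

/**
 * Hypothetical companion to CompressAlgorithm (not part of this PR):
 * resolves a configuration string such as "lz4" to the matching constant.
 */
public class CompressAlgorithms {
    public static CompressAlgorithm fromValue(String value) {
        return Arrays.stream(CompressAlgorithm.values())
                .filter(a -> a.getValue().equalsIgnoreCase(value))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException(
                        "Unsupported compress algorithm: " + value));
    }

    public static void main(String[] args) {
        // Prints "lz4"; an unknown value fails fast with a clear message.
        System.out.println(CompressAlgorithms.fromValue("lz4"));
    }
}
```

Since the enum marks SNAPPY as @Deprecated, such a helper would also be a natural place to log a warning when that value is selected.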