[Improve] [Connector-V2] File Connector add lzo compression way. #3782

Merged · 12 commits · Dec 30, 2022
11 changes: 9 additions & 2 deletions docs/en/connector-v2/sink/HdfsFile.md
@@ -19,6 +19,9 @@ By default, we use 2PC commit to ensure `exactly-once`
- [x] parquet
- [x] orc
- [x] json
- [x] compress codec
- [x] lzo


## Options

@@ -40,6 +43,7 @@ In order to use this connector, you must ensure your Spark/Flink cluster already
| is_enable_transaction | boolean | no | true |
| batch_size | int | no | 1000000 |
| common-options | | no | - |
| compressCodec | string | no | none |

### fs.defaultFS [string]

@@ -120,8 +124,10 @@ Only support `true` now.

The maximum number of rows in a file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer will keep writing rows to a file until the rows in the file exceed `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint triggers.
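The roll-over rule described above can be sketched as follows. This is a simplified model for illustration, not SeaTunnel's actual writer; the class and method names are invented here:

```java
// Simplified sketch of when the sink writer starts a new file
// (illustrative only, not SeaTunnel's actual implementation).
public class BatchSizeSketch {
    static final int BATCH_SIZE = 1_000_000; // default value of batch_size

    // A new file is started when the current file has reached batch_size rows
    // or when a checkpoint has been triggered since the last write.
    static boolean shouldRollFile(long rowsInCurrentFile, boolean checkpointTriggered) {
        return rowsInCurrentFile >= BATCH_SIZE || checkpointTriggered;
    }

    public static void main(String[] args) {
        System.out.println(shouldRollFile(999_999, false));   // false: keep writing
        System.out.println(shouldRollFile(1_000_000, false)); // true: rows reached batch_size
        System.out.println(shouldRollFile(10, true));         // true: checkpoint triggered
    }
}
```

With a large `checkpoint.interval` the first condition dominates and files fill up to `batch_size`; with a small interval the second condition fires first and files stay smaller.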

### compressCodec [string]

The compress codec of files. Currently only `lzo` compression is supported, and only for the `text` file format; generated file names end with `.lzo.txt`.

### common options
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details

## Example
@@ -202,4 +208,5 @@ HdfsFile {
- Sink columns mapping failed
- When restore writer from states getting transaction directly failed

- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
- [Improve] Support lzo compression for text in file format ([3782](https://github.com/apache/incubator-seatunnel/pull/3782))
4 changes: 3 additions & 1 deletion docs/en/connector-v2/sink/Hive.md
@@ -21,15 +21,17 @@ By default, we use 2PC commit to ensure `exactly-once`
- [x] text
- [x] parquet
- [x] orc
- [x] compress codec
- [x] lzo

## Options

| name | type | required | default value |
|----------------|--------|----------|---------------|
| table_name     | string | yes      | -             |
| metastore_uri  | string | yes      | -             |
| compressCodec  | string | no       | none          |
| common-options |        | no       | -             |

### table_name [string]

Target Hive table name, e.g. `db1.table1`
@@ -50,8 +50,15 @@ public class BaseFileSinkConfig implements DelimiterConfig, CompressConfig, Seri

public BaseFileSinkConfig(@NonNull Config config) {
if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
CompressFormat compressFormat = CompressFormat.valueOf(config.getString(BaseSinkConfig.COMPRESS_CODEC.key()).toUpperCase(Locale.ROOT));
switch (compressFormat) {
case LZO:
this.compressCodec = compressFormat.getCompressCodec();
break;
default:
throw new FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
"This compress codec is not supported by SeaTunnel file connector now");
}
}
if (config.hasPath(BaseSinkConfig.BATCH_SIZE.key())) {
this.batchSize = config.getInt(BaseSinkConfig.BATCH_SIZE.key());
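Note that `CompressFormat.valueOf` on an unrecognized codec string throws `IllegalArgumentException` before the switch's `default` branch is ever reached; only strings that parse to a recognized-but-unsupported format (here, `NONE`) fall through to the `FileConnectorException`. A standalone sketch using a local copy of the enum constants:

```java
import java.util.Locale;

public class ValueOfSketch {
    // Local stand-in for the connector's CompressFormat enum.
    enum CompressFormat { LZO, NONE }

    public static void main(String[] args) {
        // Recognized codec: valueOf succeeds after upper-casing.
        System.out.println(CompressFormat.valueOf("lzo".toUpperCase(Locale.ROOT))); // LZO

        // Unrecognized codec: valueOf itself throws IllegalArgumentException,
        // so the switch's default branch never sees it.
        try {
            CompressFormat.valueOf("snappy".toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            System.out.println("IllegalArgumentException");
        }
    }
}
```

So in practice the `default` branch only fires for `compressCodec = "none"`; any other unknown value surfaces as an `IllegalArgumentException` from `valueOf`.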
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.config;

import java.io.Serializable;

public enum CompressFormat implements Serializable {

LZO("lzo"),
NONE("none");

private final String compressCodec;

CompressFormat(String compressCodec) {
this.compressCodec = compressCodec;
}

public String getCompressCodec() {
return compressCodec;
}

public static CompressFormat getCompressFormat(String value) {
switch (value) {
case "lzo":
return CompressFormat.LZO;
default:
return CompressFormat.NONE;
}
}
}
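The enum's two lookup paths behave differently: `valueOf` (used in the config classes) is case-sensitive after upper-casing and throws on unknown input, while `getCompressFormat` silustrated below silently falls back to `NONE` for anything other than the exact lowercase string `"lzo"`. A self-contained copy illustrating this:

```java
public class CompressFormatDemo {
    // Local copy of the CompressFormat enum above, for illustration.
    enum CompressFormat {
        LZO("lzo"), NONE("none");

        private final String compressCodec;

        CompressFormat(String compressCodec) { this.compressCodec = compressCodec; }

        String getCompressCodec() { return compressCodec; }

        static CompressFormat getCompressFormat(String value) {
            switch (value) {
                case "lzo": return LZO;
                default: return NONE;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(CompressFormat.getCompressFormat("lzo"));  // LZO
        System.out.println(CompressFormat.getCompressFormat("LZO"));  // NONE: lookup is case-sensitive
        System.out.println(CompressFormat.getCompressFormat("gzip")); // NONE: silent fallback, no error
    }
}
```

Callers that want strict validation should go through `valueOf(value.toUpperCase(Locale.ROOT))` as the sink config does, rather than the lenient `getCompressFormat`.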
@@ -28,6 +28,7 @@
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.VariablesSubstitute;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
@@ -197,8 +198,12 @@ public Map<String, List<String>> generatorPartitionDir(SeaTunnelRow seaTunnelRow
public String generateFileName(String transactionId) {
String fileNameExpression = fileSinkConfig.getFileNameExpression();
FileFormat fileFormat = fileSinkConfig.getFileFormat();
String suffix = fileFormat.getSuffix();
if (CompressFormat.LZO.getCompressCodec().equals(fileSinkConfig.getCompressCodec())) {
suffix = "." + CompressFormat.LZO.getCompressCodec() + "." + suffix;
}
if (StringUtils.isBlank(fileNameExpression)) {
return transactionId + suffix;
}
}
String timeFormat = fileSinkConfig.getFileNameTimeFormat();
DateTimeFormatter df = DateTimeFormatter.ofPattern(timeFormat);
@@ -209,7 +214,7 @@ public String generateFileName(String transactionId) {
valuesMap.put(timeFormat, formattedDate);
valuesMap.put(BaseSinkConfig.TRANSACTION_EXPRESSION, transactionId);
String substitute = VariablesSubstitute.substitute(fileNameExpression, valuesMap) + "_" + partId;
return substitute + suffix;
}
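For the `text` format, the suffix handling above turns a transaction id into a name ending in `.lzo.txt` when LZO is enabled. A minimal sketch of just that logic for the no-expression case (the method signature and the bare `"txt"` suffix are illustrative, not the connector's exact API):

```java
public class FileNameSketch {
    // Mirrors generateFileName's suffix handling when no file name
    // expression is configured (illustrative simplification).
    static String generateFileName(String transactionId, String formatSuffix, String compressCodec) {
        String suffix = "." + formatSuffix;
        if ("lzo".equals(compressCodec)) {
            // Prepend the codec name, as the connector does for LZO.
            suffix = ".lzo." + formatSuffix;
        }
        return transactionId + suffix;
    }

    public static void main(String[] args) {
        System.out.println(generateFileName("T_1234", "txt", "lzo")); // T_1234.lzo.txt
        System.out.println(generateFileName("T_1234", "txt", null));  // T_1234.txt
    }
}
```

This matches the behavior documented in `HdfsFile.md`: with LZO enabled, text files are named with a `.lzo.txt` ending.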

/**
@@ -24,15 +24,19 @@
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.format.text.TextSerializationSchema;

import io.airlift.compress.lzo.LzopCodec;
import lombok.NonNull;
import org.apache.hadoop.fs.FSDataOutputStream;

import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class TextWriteStrategy extends AbstractWriteStrategy {
@@ -44,6 +48,7 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
private final DateTimeUtils.Formatter dateTimeFormat;
private final TimeUtils.Formatter timeFormat;
private SerializationSchema serializationSchema;
private String compressCodec;

public TextWriteStrategy(FileSinkConfig textFileSinkConfig) {
super(textFileSinkConfig);
@@ -54,6 +59,7 @@ public TextWriteStrategy(FileSinkConfig textFileSinkConfig) {
this.dateFormat = textFileSinkConfig.getDateFormat();
this.dateTimeFormat = textFileSinkConfig.getDatetimeFormat();
this.timeFormat = textFileSinkConfig.getTimeFormat();
this.compressCodec = textFileSinkConfig.getCompressCodec();
}

@Override
@@ -111,7 +117,21 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath);
if (fsDataOutputStream == null) {
try {
if (compressCodec != null) {
CompressFormat compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase(Locale.ROOT));
switch (compressFormat) {
case LZO:
LzopCodec lzo = new LzopCodec();
OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
fsDataOutputStream = new FSDataOutputStream(out, null);
break;
default:
fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
}
} else {
fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
}

beingWrittenOutputStream.put(filePath, fsDataOutputStream);
isFirstWrite.put(filePath, true);
} catch (IOException e) {
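The wrapping pattern in `getOrCreateOutputStream` is generic: pick a codec, wrap the raw stream, and hand the wrapped stream to the writer. A self-contained analogue using the JDK's `GZIPOutputStream` in place of airlift's `LzopCodec` (which would require the `io.airlift:aircompressor` dependency; GZIP here is only a stand-in to show the shape):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CompressedStreamSketch {
    // Same shape as TextWriteStrategy: wrap the raw stream when a codec is
    // configured, otherwise return it unchanged. GZIP stands in for LZO.
    static OutputStream wrap(OutputStream raw, String compressCodec) throws IOException {
        if ("gzip".equals(compressCodec)) {
            return new GZIPOutputStream(raw);
        }
        return raw;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (OutputStream out = wrap(buf, "gzip")) {
            out.write("hello\n".getBytes(StandardCharsets.UTF_8));
        }
        // Round-trip to show the bytes were compressed and are recoverable.
        GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8).trim()); // hello
    }
}
```

Closing the wrapping stream also flushes the codec's trailer, which is why the writer keeps the wrapped `FSDataOutputStream` (not the raw one) in `beingWrittenOutputStream` and closes it at commit time.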