Merge branch 'dev' into dev_wenjun_fixORC
EricJoy2048 authored Apr 7, 2024
2 parents 31ead24 + 01159ec commit ae5d66f
Showing 411 changed files with 14,916 additions and 1,316 deletions.
24 changes: 22 additions & 2 deletions .github/workflows/backend.yml
@@ -100,6 +100,7 @@ jobs:
current_branch='${{ steps.git_init.outputs.branch }}'
pip install GitPython
workspace="${GITHUB_WORKSPACE}"
repository_owner="${GITHUB_REPOSITORY_OWNER}"
cv2_files=`python tools/update_modules_check/check_file_updates.py ua $workspace apache/dev origin/$current_branch "seatunnel-connectors-v2/**"`
true_or_false=${cv2_files%%$'\n'*}
file_list=${cv2_files#*$'\n'}
@@ -133,6 +134,9 @@ jobs:
api_files=`python tools/update_modules_check/check_file_updates.py ua $workspace apache/dev origin/$current_branch "seatunnel-api/**" "seatunnel-common/**" "seatunnel-config/**" "seatunnel-connectors/**" "seatunnel-core/**" "seatunnel-e2e/seatunnel-e2e-common/**" "seatunnel-formats/**" "seatunnel-plugin-discovery/**" "seatunnel-transforms-v2/**" "seatunnel-translation/**" "seatunnel-e2e/seatunnel-transforms-v2-e2e/**" "seatunnel-connectors/**" "pom.xml" "**/workflows/**" "tools/**" "seatunnel-dist/**"`
true_or_false=${api_files%%$'\n'*}
file_list=${api_files#*$'\n'}
if [[ $repository_owner == 'apache' ]];then
true_or_false='true'
fi
echo "api=$true_or_false" >> $GITHUB_OUTPUT
echo "api_files=$file_list" >> $GITHUB_OUTPUT
@@ -304,6 +308,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run updated modules integration test (part-1)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
@@ -333,6 +339,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run updated modules integration test (part-2)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
@@ -393,6 +401,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run updated modules integration test (part-4)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
@@ -421,6 +431,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run updated modules integration test (part-5)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
@@ -449,6 +461,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run updated modules integration test (part-6)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
@@ -477,6 +491,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run updated modules integration test (part-7)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
@@ -506,6 +522,8 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run updated modules integration test (part-8)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
@@ -538,7 +556,7 @@ jobs:
- name: run seatunnel zeta integration test
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-seatunnel-e2e-base -am -Pci
./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-seatunnel-e2e-base,:connector-console-seatunnel-e2e -am -Pci
env:
MAVEN_OPTS: -Xmx4096m
engine-k8s-it:
@@ -560,6 +578,8 @@
env:
KUBECONFIG: /etc/rancher/k3s/k3s.yaml
- uses: actions/checkout@v2
- name: free disk space
run: tools/github/free_disk_space.sh
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v3
with:
@@ -977,7 +997,7 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: run jdbc connectors integration test (part-6)
- name: run jdbc connectors integration test (part-7)
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-7 -am -Pci
Binary file added .idea/icon.png
2 changes: 1 addition & 1 deletion config/jvm_options
@@ -27,4 +27,4 @@
-XX:MaxMetaspaceSize=2g

# G1GC
-XX:+UseG1GC
-XX:+UseG1GC
116 changes: 116 additions & 0 deletions docs/en/concept/event-listener.md
@@ -0,0 +1,116 @@
# Event Listener

## Introduction

SeaTunnel provides a rich event-listening feature that allows you to monitor the status of data synchronization.
This functionality is crucial when you need to track the job's running status (`org.apache.seatunnel.api.event`).
This document will guide you through the usage of these events and how to leverage them effectively.

## Supported Engines

> SeaTunnel Zeta<br/>
> Flink<br/>
> Spark<br/>

## API

The event API is defined in the `org.apache.seatunnel.api.event` package.

### Event Data API

- `org.apache.seatunnel.api.event.Event` - The interface for event data.
- `org.apache.seatunnel.api.event.EventType` - The enum for event type.

### Event Listener API

You can customize an event handler, for example to send events to external systems:

- `org.apache.seatunnel.api.event.EventHandler` - The interface for event handlers; implementations are automatically loaded from the classpath through SPI.
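
Below is a minimal sketch of a custom handler. It assumes `EventHandler` exposes a single `handle(Event)` method (check the interface in the `org.apache.seatunnel.api.event` package for the exact signature); the class and package names are illustrative, not part of SeaTunnel:

```java
package com.example.event;

import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.event.EventHandler;

// A hypothetical handler that forwards every event to an external system.
public class MyEventHandler implements EventHandler {

    @Override
    public void handle(Event event) {
        // Forward the event to a log, message queue, or monitoring endpoint.
        System.out.println("Received event: " + event);
    }
}
```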

### Event Collect API

- `org.apache.seatunnel.api.source.SourceSplitEnumerator` - Attaches the event listener API to report events from a `SourceSplitEnumerator`.

```java
package org.apache.seatunnel.api.source;

public interface SourceSplitEnumerator {

    interface Context {

        /**
         * Get the {@link org.apache.seatunnel.api.event.EventListener} of this enumerator.
         *
         * @return the event listener of this enumerator
         */
        EventListener getEventListener();
    }
}
```

- `org.apache.seatunnel.api.source.SourceReader` - Attaches the event listener API to report events from a `SourceReader`.

```java
package org.apache.seatunnel.api.source;

public interface SourceReader {

    interface Context {

        /**
         * Get the {@link org.apache.seatunnel.api.event.EventListener} of this reader.
         *
         * @return the event listener of this reader
         */
        EventListener getEventListener();
    }
}
```

- `org.apache.seatunnel.api.sink.SinkWriter` - Attaches the event listener API to report events from a `SinkWriter`.

```java
package org.apache.seatunnel.api.sink;

public interface SinkWriter {

    interface Context {

        /**
         * Get the {@link org.apache.seatunnel.api.event.EventListener} of this writer.
         *
         * @return the event listener of this writer
         */
        EventListener getEventListener();
    }
}
```
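
As a usage sketch, a connector obtains the listener from its context and reports an event. This assumes `EventListener` exposes an `onEvent(Event)` method; `MyCustomEvent` stands in for any `Event` implementation and is not a SeaTunnel class:

```java
// Inside a hypothetical SourceReader implementation; `context` is the
// SourceReader.Context supplied by the engine.
EventListener listener = context.getEventListener();

// Report an event so that any configured EventHandler can observe it.
listener.onEvent(new MyCustomEvent("reader started"));
```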

## Configuring the Listener

To use the event-listening feature, you need to configure the engine.

### Zeta Engine

Example config in your config file (seatunnel.yaml):

```yaml
seatunnel:
  engine:
    event-report-http:
      url: "http://example.com:1024/event/report"
      headers:
        Content-Type: application/json
```

### Flink Engine

You can define an implementation class of the `org.apache.seatunnel.api.event.EventHandler` interface and add it to the classpath; it will be loaded automatically through SPI.

Supported Flink versions: 1.14.0+

Example: `org.apache.seatunnel.api.event.LoggingEventHandler`
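
Handler discovery is assumed to follow the standard Java `ServiceLoader` convention: ship a provider-configuration file on the classpath, named after the interface and containing the fully qualified name of your implementation (here, the hypothetical handler from the API section above):

```
# File: META-INF/services/org.apache.seatunnel.api.event.EventHandler
com.example.event.MyEventHandler
```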

### Spark Engine

You can define an implementation class of the `org.apache.seatunnel.api.event.EventHandler` interface and add it to the classpath; it will be loaded automatically through SPI.
1 change: 1 addition & 0 deletions docs/en/concept/schema-feature.md
@@ -64,6 +64,7 @@ columns = [
| type | Yes | - | The data type of the column |
| nullable | No | true | If the column can be nullable |
| columnLength | No | 0 | The length of the column which will be useful when you need to define the length |
| columnScale | No | - | The scale of the column which will be useful when you need to define the scale |
| defaultValue | No | null | The default value of the column |
| comment | No | null | The comment of the column |

7 changes: 7 additions & 0 deletions docs/en/connector-v2/formats/debezium-json.md
@@ -68,6 +68,12 @@ The MySQL products table has 4 columns (id, name, description and weight).
The above JSON message is an update change event on the products table where the weight value of the row with id = 111 is changed from 5.18 to 5.15.
Assuming the messages have been synchronized to the Kafka topic products_binlog, we can use the following SeaTunnel config to consume this topic and interpret the change events in the Debezium format.

**In this config, you must specify the `schema` and `debezium_record_include_schema` options.**
- `schema` should be the same as your table format
- If your JSON data contains a `schema` field, `debezium_record_include_schema` should be `true`; if it does not, it should be `false`
- `{"schema" : {}, "payload": { "before" : {}, "after": {} ... } }` --> `true`
- `{"before" : {}, "after": {} ... }` --> `false`

```bash
env {
parallelism = 1
@@ -88,6 +94,7 @@ source {
weight = "string"
}
}
debezium_record_include_schema = false
format = debezium_json
}

6 changes: 6 additions & 0 deletions docs/en/connector-v2/sink/CosFile.md
@@ -61,6 +61,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. |
| xml_row_tag | string | no | RECORD | Only used when file_format is xml. |
| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. |
| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. |

### path [string]

@@ -205,6 +206,11 @@ Specifies the tag name of the data rows within the XML file.

Specifies whether to process data using the tag attribute format.

### encoding [string]

Only used when `file_format_type` is json, text, csv or xml.
The encoding of the file to write. This param will be parsed by `Charset.forName(encoding)`.
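
For example, a sink block writing GBK-encoded JSON files might look like the following sketch (the path is a placeholder, and required connection options such as the bucket and credentials are omitted for brevity):

```bash
CosFile {
    path = "/seatunnel/json"
    file_format_type = "json"
    encoding = "gbk"
    # bucket, secret_id, secret_key, region, ... omitted
}
```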

## Example

For text file format with `have_partition` and `custom_filename` and `sink_columns`
22 changes: 13 additions & 9 deletions docs/en/connector-v2/sink/Doris.md
@@ -73,16 +73,20 @@ We use templates to automatically create Doris tables,
which will generate the corresponding table-creation statements based on the upstream data types and schema,
and the default template can be modified as needed.

Default template:

```sql
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
(
${rowtype_fields}
) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key})
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES
(
"replication_num" = "1"
);
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
${rowtype_fields}
) ENGINE=OLAP
UNIQUE KEY (${rowtype_primary_key})
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
)
```
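
As an illustration, for a hypothetical upstream table `test_db.users` whose primary key is `id`, the placeholders above would be filled in roughly as follows:

```sql
CREATE TABLE IF NOT EXISTS `test_db`.`users` (
    `id` BIGINT NOT NULL,
    `name` STRING NULL
) ENGINE=OLAP
UNIQUE KEY (`id`)
DISTRIBUTED BY HASH (`id`)
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "in_memory" = "false",
    "storage_format" = "V2",
    "disable_auto_compaction" = "false"
)
```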

If a custom field is filled in the template, such as adding an `id` field
14 changes: 10 additions & 4 deletions docs/en/connector-v2/sink/FtpFile.md
@@ -35,7 +35,7 @@ By default, we use 2PC commit to ensure `exactly-once`
|----------------------------------|---------|----------|--------------------------------------------|-------------------------------------------------------------------------------------------------------------------|
| host | string | yes | - | |
| port | int | yes | - | |
| username | string | yes | - | |
| user | string | yes | - | |
| password | string | yes | - | |
| path | string | yes | - | |
| tmp_path | string | yes | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a FTP dir. |
@@ -60,6 +60,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. |
| xml_row_tag | string | no | RECORD | Only used when file_format is xml. |
| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. |
| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. |

### host [string]

@@ -69,7 +70,7 @@ The target ftp host is required

The target ftp port is required

### username [string]
### user [string]

The target ftp username is required

@@ -210,6 +211,11 @@ Specifies the tag name of the data rows within the XML file.

Specifies whether to process data using the tag attribute format.

### encoding [string]

Only used when `file_format_type` is json, text, csv or xml.
The encoding of the file to write. This param will be parsed by `Charset.forName(encoding)`.

## Example

For text file format simple config
@@ -219,7 +225,7 @@
FtpFile {
host = "xxx.xxx.xxx.xxx"
port = 21
username = "username"
user = "username"
password = "password"
path = "/data/ftp"
file_format_type = "text"
@@ -237,7 +243,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_columns`
FtpFile {
host = "xxx.xxx.xxx.xxx"
port = 21
username = "username"
user = "username"
password = "password"
path = "/data/ftp/seatunnel/job1"
tmp_path = "/data/ftp/seatunnel/tmp"
1 change: 1 addition & 0 deletions docs/en/connector-v2/sink/HdfsFile.md
@@ -67,6 +67,7 @@ Output data to hdfs file
| xml_root_tag | string | no | RECORDS | Only used when file_format is xml, specifies the tag name of the root element within the XML file. |
| xml_row_tag | string | no | RECORD | Only used when file_format is xml, specifies the tag name of the data rows within the XML file |
| xml_use_attr_format | boolean | no | - | Only used when file_format is xml, specifies whether to process data using the tag attribute format. |
| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. |

### Tips
