Skip to content

Commit

Permalink
[feature][connector-jdbc]Add Save Mode function and Connector-JDBC (M…
Browse files Browse the repository at this point in the history
…ySQL) connector has been realized (apache#5663)
  • Loading branch information
chl-wxp authored Oct 30, 2023
1 parent 245705d commit eff17cc
Show file tree
Hide file tree
Showing 34 changed files with 1,474 additions and 224 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,30 @@ jobs:
env:
MAVEN_OPTS: -Xmx4096m

# Runs the JDBC connector end-to-end tests contained in the connector-jdbc-e2e-part-7 module.
jdbc-connectors-it-part-7:
  needs: [ changes, sanity-check ]
  if: needs.changes.outputs.api == 'true'
  runs-on: ${{ matrix.os }}
  strategy:
    matrix:
      java: [ '8', '11' ]
      os: [ 'ubuntu-latest' ]
  timeout-minutes: 90
  steps:
    - uses: actions/checkout@v2
    - name: Set up JDK ${{ matrix.java }}
      uses: actions/setup-java@v3
      with:
        java-version: ${{ matrix.java }}
        distribution: 'temurin'
        cache: 'maven'
    # Step label kept in sync with the job name and the Maven module it builds (part-7).
    - name: run jdbc connectors integration test (part-7)
      if: needs.changes.outputs.api == 'true'
      run: |
        ./mvnw -B -T 1C verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-7 -am -Pci
      env:
        MAVEN_OPTS: -Xmx4096m


kafka-connector-it:
needs: [ changes, sanity-check ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public enum SeaTunnelAPIErrorCode implements SeaTunnelErrorCode {
DATABASE_ALREADY_EXISTED("API-07", "Database already existed"),
TABLE_ALREADY_EXISTED("API-08", "Table already existed"),
HANDLE_SAVE_MODE_FAILED("API-09", "Handle save mode failed"),
;
SOURCE_ALREADY_HAS_DATA("API-10", "The target data source already has data"),
SINK_TABLE_NOT_EXIST("API-11", "The sink table not exist");

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,16 @@
* The SaveMode for the Sink connectors that use table or other table structures to organize data
*/
public enum DataSaveMode {
// Will drop table in MySQL, Will drop path for File Connector.
DROP_SCHEMA,

// Only drop the data in MySQL, Only drop the files in the path for File Connector.
KEEP_SCHEMA_DROP_DATA,
// Preserve database structure and delete data
DROP_DATA,

// Keep the table and data and continue to write data to the existing table for MySQL. Keep the
// path and files in the path, create new files in the path.
KEEP_SCHEMA_AND_DATA,
// Preserve database structure, preserve data
APPEND_DATA,

// The connector provides custom processing methods, such as running user provided SQL or shell
// scripts, etc
// User defined processing
CUSTOM_PROCESSING,

// Throw error when table is exists for MySQL. Throw error when path is exists.
ERROR_WHEN_EXISTS
// Report an error when the target already contains data
ERROR_WHEN_DATA_EXISTS
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.sink;

import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

import lombok.AllArgsConstructor;

import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.SINK_TABLE_NOT_EXIST;
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.SOURCE_ALREADY_HAS_DATA;

/**
 * Default {@link SaveModeHandler} that applies the configured schema and data save modes to one
 * table by delegating every operation to a {@link Catalog}.
 *
 * <p>Each individual step is a {@code protected} method, so subclasses can override a single
 * step without re-implementing the dispatch logic.
 */
@AllArgsConstructor
public class DefaultSaveModeHandler implements SaveModeHandler {

    // NOTE: the declaration order of these fields is the parameter order of the
    // Lombok-generated all-args constructor used by the constructor below.
    public SchemaSaveMode schemaSaveMode;
    public DataSaveMode dataSaveMode;
    public Catalog catalog;
    public TablePath tablePath;
    public CatalogTable catalogTable;
    public String customSql;

    /**
     * Creates a handler whose table path is derived from the table id of {@code catalogTable}.
     *
     * @param schemaSaveMode how the table structure is handled
     * @param dataSaveMode how existing data is handled
     * @param catalog catalog used to perform all table operations
     * @param catalogTable table definition; also supplies the table path
     * @param customSql SQL passed to the catalog for {@link DataSaveMode#CUSTOM_PROCESSING}
     */
    public DefaultSaveModeHandler(
            SchemaSaveMode schemaSaveMode,
            DataSaveMode dataSaveMode,
            Catalog catalog,
            CatalogTable catalogTable,
            String customSql) {
        this(
                schemaSaveMode,
                dataSaveMode,
                catalog,
                catalogTable.getTableId().toTablePath(),
                catalogTable,
                customSql);
    }

    /**
     * Dispatches to the step matching the configured {@link SchemaSaveMode}.
     *
     * @throws UnsupportedOperationException if the mode has no matching step
     */
    @Override
    public void handleSchemaSaveMode() {
        switch (schemaSaveMode) {
            case RECREATE_SCHEMA:
                recreateSchema();
                return;
            case CREATE_SCHEMA_WHEN_NOT_EXIST:
                createSchemaWhenNotExist();
                return;
            case ERROR_WHEN_SCHEMA_NOT_EXIST:
                errorWhenSchemaNotExist();
                return;
            default:
                throw new UnsupportedOperationException("Unsupported save mode: " + schemaSaveMode);
        }
    }

    /**
     * Dispatches to the step matching the configured {@link DataSaveMode}.
     *
     * @throws UnsupportedOperationException if the mode has no matching step
     */
    @Override
    public void handleDataSaveMode() {
        switch (dataSaveMode) {
            case DROP_DATA:
                keepSchemaDropData();
                return;
            case APPEND_DATA:
                keepSchemaAndData();
                return;
            case CUSTOM_PROCESSING:
                customProcessing();
                return;
            case ERROR_WHEN_DATA_EXISTS:
                errorWhenDataExists();
                return;
            default:
                throw new UnsupportedOperationException("Unsupported save mode: " + dataSaveMode);
        }
    }

    /** Drops the table when it is present, then creates it. */
    protected void recreateSchema() {
        boolean alreadyThere = tableExists();
        if (alreadyThere) {
            dropTable();
        }
        createTable();
    }

    /** Creates the table only when it is not present yet. */
    protected void createSchemaWhenNotExist() {
        if (tableExists()) {
            return;
        }
        createTable();
    }

    /** Fails with {@code SINK_TABLE_NOT_EXIST} when the table is not present. */
    protected void errorWhenSchemaNotExist() {
        if (tableExists()) {
            return;
        }
        throw new SeaTunnelRuntimeException(SINK_TABLE_NOT_EXIST, "The sink table not exist");
    }

    /** Truncates the table when it is present; does nothing otherwise. */
    protected void keepSchemaDropData() {
        if (!tableExists()) {
            return;
        }
        truncateTable();
    }

    /** Intentionally empty: both the table and its data are left untouched. */
    protected void keepSchemaAndData() {
        // nothing to do
    }

    /** Runs the user-supplied SQL. */
    protected void customProcessing() {
        executeCustomSql();
    }

    /** Fails with {@code SOURCE_ALREADY_HAS_DATA} when the catalog reports existing data. */
    protected void errorWhenDataExists() {
        boolean hasData = dataExists();
        if (!hasData) {
            return;
        }
        throw new SeaTunnelRuntimeException(
                SOURCE_ALREADY_HAS_DATA, "The target data source already has data");
    }

    /** @return the catalog's answer to whether the table at {@code tablePath} exists */
    protected boolean tableExists() {
        return catalog.tableExists(tablePath);
    }

    /** Asks the catalog to drop the table, ignoring a missing table. */
    protected void dropTable() {
        catalog.dropTable(tablePath, true);
    }

    /** Asks the catalog to create the table, ignoring an already existing table. */
    protected void createTable() {
        catalog.createTable(tablePath, catalogTable, true);
    }

    /** Asks the catalog to truncate the table, ignoring a missing table. */
    protected void truncateTable() {
        catalog.truncateTable(tablePath, true);
    }

    /** @return the catalog's answer to whether the table at {@code tablePath} holds data */
    protected boolean dataExists() {
        return catalog.isExistsData(tablePath);
    }

    /** Hands {@code customSql} to the catalog for execution against {@code tablePath}. */
    protected void executeCustomSql() {
        catalog.executeSql(tablePath, customSql);
    }

    /** Closes the underlying catalog. */
    @Override
    public void close() throws Exception {
        catalog.close();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@

package org.apache.seatunnel.api.sink;

/** The Sink Connectors which support data SaveMode should implement this interface */
public interface SupportDataSaveMode {
String SAVE_MODE_KEY = "savemode";
/**
* Return the value of DataSaveMode configured by user in the job config file.
*
* @return
*/
DataSaveMode getUserConfigSaveMode();
public interface SaveModeHandler extends AutoCloseable {

/** The implementation of specific logic according to different {@link DataSaveMode} */
void handleSaveMode(DataSaveMode userConfigSaveMode);
void handleSchemaSaveMode();

void handleDataSaveMode();

default void handleSaveMode() {
handleSchemaSaveMode();
handleDataSaveMode();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.sink;

/**
 * Strategies for handling the structure (schema) of the sink table before data is written.
 *
 * <p>The descriptions below reflect how {@code DefaultSaveModeHandler} acts on each value.
 */
public enum SchemaSaveMode {

    /** Drop the table if it already exists, then create it. */
    RECREATE_SCHEMA,

    /** Create the table only if it does not exist; an existing table is left as is. */
    CREATE_SCHEMA_WHEN_NOT_EXIST,

    /** Report an error if the table does not exist. */
    ERROR_WHEN_SCHEMA_NOT_EXIST
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.sink;

/**
 * Holder for the literal placeholder tokens {@code ${table_name}}, {@code ${schema_name}} and
 * {@code ${database_name}}.
 *
 * <p>NOTE(review): presumably sink connectors substitute these tokens with the actual names when
 * resolving user configuration - confirm against the callers.
 *
 * <p>This class only carries constants and cannot be instantiated.
 */
public final class SinkReplaceNameConstant {

    /** Placeholder token for the table name. */
    public static final String REPLACE_TABLE_NAME_KEY = "${table_name}";

    /** Placeholder token for the schema name. */
    public static final String REPLACE_SCHEMA_NAME_KEY = "${schema_name}";

    /** Placeholder token for the database name. */
    public static final String REPLACE_DATABASE_NAME_KEY = "${database_name}";

    // Constants-only utility class: suppress the implicit public constructor.
    private SinkReplaceNameConstant() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.sink;

import java.util.Optional;

/**
 * Contract for sink connectors that support both schema and data save modes.
 *
 * <p>Implementations expose a {@link SaveModeHandler} that performs the save-mode work.
 */
public interface SupportSaveMode {

    /** Key string associated with the schema save mode. */
    String SCHEMA_SAVE_MODE_KEY = "schema_save_mode";

    /** Key string associated with the data save mode. */
    String DATA_SAVE_MODE_KEY = "data_save_mode";

    /**
     * Supplies the handler that carries out the save mode for this sink.
     *
     * @return the save-mode handler wrapped in an {@link Optional}
     */
    Optional<SaveModeHandler> getSaveModeHandler();
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,25 @@ void createDatabase(TablePath tablePath, boolean ignoreIfExists)
void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException;

/**
* Truncate the data of an existing table in this catalog.
*
* <p>The default implementation is a no-op: it removes nothing and never throws. Catalogs that
* support truncation are expected to override it.
*
* @param tablePath Path of the table
* @param ignoreIfNotExists Flag to specify behavior when a table with the given name doesn't
* exist
* @throws TableNotExistException thrown if the table doesn't exist in the catalog and
* ignoreIfNotExists is false
* @throws CatalogException in case of any runtime exception
*/
default void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {}

/**
* Report whether the table at the given path already holds data.
*
* <p>The default implementation always returns {@code false}; catalogs that can inspect table
* contents are expected to override it.
*
* @param tablePath Path of the table
* @return {@code true} if the table contains data; always {@code false} by default
*/
default boolean isExistsData(TablePath tablePath) {
return false;
}

/**
* Execute the given SQL statement for the table at the given path.
*
* <p>The default implementation is a no-op: the statement is ignored. Catalogs that can run SQL
* are expected to override it.
*
* @param tablePath Path of the table the statement relates to
* @param sql SQL statement to execute
*/
default void executeSql(TablePath tablePath, String sql) {}

// todo: Support for update table metadata

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ public void test() {
Options.key("save_mode")
.singleChoice(
DataSaveMode.class,
Arrays.asList(
DataSaveMode.DROP_SCHEMA,
DataSaveMode.KEEP_SCHEMA_DROP_DATA))
.defaultValue(DataSaveMode.DROP_SCHEMA)
Arrays.asList(DataSaveMode.APPEND_DATA, DataSaveMode.DROP_DATA))
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription("save mode test");

OptionRule build = OptionRule.builder().optional(stringOption, saveModeOption).build();
Expand All @@ -58,6 +56,6 @@ public void test() {
option = optionalOptions.get(1);
singleChoiceOption = (SingleChoiceOption) option;
Assertions.assertEquals(2, singleChoiceOption.getOptionValues().size());
Assertions.assertEquals(DataSaveMode.DROP_SCHEMA, singleChoiceOption.defaultValue());
Assertions.assertEquals(DataSaveMode.APPEND_DATA, singleChoiceOption.defaultValue());
}
}
Loading

0 comments on commit eff17cc

Please sign in to comment.