Skip to content

Commit

Permalink
[Feature][Connector-V2][Assert] Support check the precision and scale…
Browse files Browse the repository at this point in the history
… of Decimal type. (apache#6110)
  • Loading branch information
CheneyYin authored Jan 2, 2024
1 parent 4def0f9 commit dd64ed5
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 10 deletions.
1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@
- [Transform-V2] Add Catalog support for FilterRowKindTransform (#4420)
- [Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)
- [Transform-V2] Add catalog support for SQL Transform plugin (#4819)
- [Connector-V2] [Assert] Support check the precision and scale of Decimal type (#6110)

### Zeta(ST-Engine)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.seatunnel.connectors.seatunnel.assertion.excecutor;

import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException;
import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
Expand Down Expand Up @@ -163,6 +165,21 @@ private boolean compareValue(Object value, AssertFieldRule.AssertRule valueRule)
}

private Boolean checkType(Object value, SeaTunnelDataType<?> fieldType) {
if (fieldType.getSqlType() == SqlType.DECIMAL) {
return checkDecimalType(value, fieldType);
}
return value.getClass().equals(fieldType.getTypeClass());
}

private static Boolean checkDecimalType(Object value, SeaTunnelDataType<?> fieldType) {
if (!value.getClass().equals(fieldType.getTypeClass())) {
return false;
}
DecimalType fieldDecimalType = (DecimalType) fieldType;
BigDecimal valueObj = (BigDecimal) value;
if (valueObj.scale() != fieldDecimalType.getScale()) {
return false;
}
return valueObj.precision() <= fieldDecimalType.getPrecision();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.EQUALS_TO;
Expand All @@ -39,6 +41,9 @@

public class AssertRuleParser {

private static final Pattern DECIMAL_TYPE_PATTERN =
Pattern.compile("^decimal\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)$");

public List<AssertFieldRule.AssertRule> parseRowRules(List<? extends Config> rowRuleList) {

return assembleFieldValueRules(rowRuleList);
Expand Down Expand Up @@ -91,13 +96,14 @@ private List<AssertFieldRule.AssertRule> assembleFieldValueRules(
}

private SeaTunnelDataType<?> getFieldType(String fieldTypeStr) {
if (fieldTypeStr.toLowerCase().startsWith("decimal(")) {
String lengthAndScale =
fieldTypeStr.toLowerCase().replace("decimal(", "").replace(")", "");
String[] split = lengthAndScale.split(",");
return new DecimalType(Integer.valueOf(split[0]), Integer.valueOf(split[1]));
final String normalTypeStr = fieldTypeStr.trim().toLowerCase();
Matcher matcher = DECIMAL_TYPE_PATTERN.matcher(normalTypeStr);
if (matcher.find()) {
int precision = Integer.parseInt(matcher.group(1));
int scale = Integer.parseInt(matcher.group(2));
return new DecimalType(precision, scale);
}
return TYPES.get(fieldTypeStr.toLowerCase());
return TYPES.get(normalTypeStr);
}

private static final Map<String, SeaTunnelDataType<?>> TYPES = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.flink.assertion;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
Expand All @@ -28,8 +29,11 @@

import com.google.common.collect.Lists;

import java.math.BigDecimal;
import java.util.Collections;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

Expand Down Expand Up @@ -111,4 +115,51 @@ private AssertFieldRule getFieldRule4Name() {
rule.setFieldRules(valueRules);
return rule;
}

@Test
public void testDecimalTypeCheck() {
List<AssertFieldRule> rules = Lists.newArrayList();
AssertFieldRule rule = new AssertFieldRule();
rule.setFieldName("c_mock");
DecimalType assertFieldType = new DecimalType(10, 2);
rule.setFieldType(assertFieldType);

AssertFieldRule.AssertRule valueRule = new AssertFieldRule.AssertRule();
valueRule.setEqualTo("99999999.90");

rule.setFieldRules(Collections.singletonList(valueRule));
rules.add(rule);

SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] {new BigDecimal("99999999.90")});
SeaTunnelRowType mockType =
new SeaTunnelRowType(
new String[] {"c_mock"}, new SeaTunnelDataType[] {new DecimalType(10, 2)});

AssertFieldRule failRule = assertExecutor.fail(mockRow, mockType, rules).orElse(null);
assertNull(failRule);
}

@Test
public void testDecimalTypeCheckError() {
List<AssertFieldRule> rules = Lists.newArrayList();
AssertFieldRule rule = new AssertFieldRule();
rule.setFieldName("c_mock");
DecimalType assertFieldType = new DecimalType(1, 0);
rule.setFieldType(assertFieldType);

AssertFieldRule.AssertRule valueRule = new AssertFieldRule.AssertRule();
valueRule.setRuleType(AssertFieldRule.AssertRuleType.NOT_NULL);
rule.setFieldRules(Collections.singletonList(valueRule));
rules.add(rule);

SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] {BigDecimal.valueOf(99999999.99)});
SeaTunnelRowType mockType =
new SeaTunnelRowType(
new String[] {"c_mock"}, new SeaTunnelDataType[] {new DecimalType(10, 2)});

AssertFieldRule failRule = assertExecutor.fail(mockRow, mockType, rules).orElse(null);
assertNotNull(failRule);
assertEquals(assertFieldType, failRule.getFieldType());
assertEquals("c_mock", failRule.getFieldName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertRuleParser;

Expand All @@ -37,8 +38,39 @@ public class AssertRuleParserTest {
public void testParseRules() {
List<? extends Config> ruleConfigList = assembleConfig();
List<AssertFieldRule> assertFieldRules = parser.parseRules(ruleConfigList);
assertEquals(assertFieldRules.size(), 2);
assertEquals(assertFieldRules.get(0).getFieldType(), BasicType.STRING_TYPE);
assertEquals(3, assertFieldRules.size());

AssertFieldRule nameRule = assertFieldRules.get(0);
List<AssertFieldRule.AssertRule> nameValueRules = nameRule.getFieldRules();
assertEquals(BasicType.STRING_TYPE, nameRule.getFieldType());
assertEquals("name", nameRule.getFieldName());
assertEquals(3, nameValueRules.size());
assertEquals(AssertFieldRule.AssertRuleType.NOT_NULL, nameValueRules.get(0).getRuleType());
assertEquals(
AssertFieldRule.AssertRuleType.MIN_LENGTH, nameValueRules.get(1).getRuleType());
assertEquals(3.0, nameValueRules.get(1).getRuleValue());
assertEquals(
AssertFieldRule.AssertRuleType.MAX_LENGTH, nameValueRules.get(2).getRuleType());
assertEquals(5.0, nameValueRules.get(2).getRuleValue());

AssertFieldRule ageRule = assertFieldRules.get(1);
List<AssertFieldRule.AssertRule> ageValueRules = ageRule.getFieldRules();
assertEquals("age", ageRule.getFieldName());
assertEquals(3, ageValueRules.size());
assertEquals(AssertFieldRule.AssertRuleType.NOT_NULL, ageValueRules.get(0).getRuleType());
assertEquals(AssertFieldRule.AssertRuleType.MIN, ageValueRules.get(1).getRuleType());
assertEquals(10.0, ageValueRules.get(1).getRuleValue());
assertEquals(AssertFieldRule.AssertRuleType.MAX, ageValueRules.get(2).getRuleType());
assertEquals(20.0, ageValueRules.get(2).getRuleValue());

AssertFieldRule decimalRule = assertFieldRules.get(2);
List<AssertFieldRule.AssertRule> decimalValueRules = decimalRule.getFieldRules();
assertEquals("c_decimal", decimalRule.getFieldName());
assertEquals(new DecimalType(10, 2), decimalRule.getFieldType());
assertEquals(2, decimalValueRules.size());
assertEquals(
AssertFieldRule.AssertRuleType.NOT_NULL, decimalValueRules.get(0).getRuleType());
assertEquals("12.12", decimalValueRules.get(1).getEqualTo());
}

private List<? extends Config> assembleConfig() {
Expand Down Expand Up @@ -76,6 +108,17 @@ private List<? extends Config> assembleConfig() {
+ " rule_value = 20\n"
+ " }\n"
+ " ]\n"
+ " },{\n"
+ " field_name = c_decimal\n"
+ " field_type= \" decimal( 10 , 2 ) \"\n"
+ " field_value = [\n"
+ " {\n"
+ " rule_type = NOT_NULL\n"
+ " },\n"
+ " {\n"
+ " equals_to = \"12.12\"\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " ]\n"
+ " \n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ protected void insertTestData() {
+ " TRUE,"
+ " '2023-09-04',"
+ " '2023-09-04 10:30:00',"
+ " 42.12,"
+ " 42.10,"
+ " 42.12)");
}
} catch (Exception exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ sink{
{
field_name = hive_e2e_source_table.decimal_column
field_type = "decimal(10,2)"
field_value = [{equals_to = 42.12}]
field_value = [{equals_to = "42.10"}]
},
{
field_name = hive_e2e_source_table.numeric_column
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ sink {
rule_type = NOT_NULL
}
]
},
{
field_name = c_decimal
field_type = "decimal(33, 18)"
field_value = [
{
rule_type = NOT_NULL
}
]
}
]
}
Expand Down

0 comments on commit dd64ed5

Please sign in to comment.