Skip to content

Latest commit

 

History

History
498 lines (455 loc) · 18.5 KB

File metadata and controls

498 lines (455 loc) · 18.5 KB

Assert

Assert sink connector

Description

A flink sink plugin which can assert illegal data by user defined rules

Key Features

Options

Name Type Required Default
rules ConfigMap yes -
rules.field_rules string yes -
rules.field_rules.field_name string|ConfigMap yes -
rules.field_rules.field_type string no -
rules.field_rules.field_value ConfigList no -
rules.field_rules.field_value.rule_type string no -
rules.field_rules.field_value.rule_value numeric no -
rules.field_rules.field_value.equals_to boolean|numeric|string|ConfigList|ConfigMap no -
rules.row_rules string yes -
rules.row_rules.rule_type string no -
rules.row_rules.rule_value string no -
rules.catalog_table_rule ConfigMap no -
rules.catalog_table_rule.primary_key_rule ConfigMap no -
rules.catalog_table_rule.primary_key_rule.primary_key_name string no -
rules.catalog_table_rule.primary_key_rule.primary_key_columns ConfigList no -
rules.catalog_table_rule.constraint_key_rule ConfigList no -
rules.catalog_table_rule.constraint_key_rule.constraint_key_name string no -
rules.catalog_table_rule.constraint_key_rule.constraint_key_type string no -
rules.catalog_table_rule.constraint_key_rule.constraint_key_columns ConfigList no -
rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_column_name string no -
rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_sort_type string no -
rules.catalog_table_rule.column_rule ConfigList no -
rules.catalog_table_rule.column_rule.name string no -
rules.catalog_table_rule.column_rule.type string no -
rules.catalog_table_rule.column_rule.column_length int no -
rules.catalog_table_rule.column_rule.nullable boolean no -
rules.catalog_table_rule.column_rule.default_value string no -
rules.catalog_table_rule.column_rule.comment comment no -
rules.table-names ConfigList no -
common-options no -

rules [ConfigMap]

Rule definition of user's available data. Each rule represents one field validation or row num validation.

field_rules [ConfigList]

field rules for field validation

field_name [string]

field name(string)

field_type [string | ConfigMap]

Field type declarations should adhere to this guide.

field_value [ConfigList]

A list value rule define the data value validation

rule_type [string]

The following rules are supported for now

  • NOT_NULL value can't be null
  • NULL value can be null
  • MIN define the minimum value of data
  • MAX define the maximum value of data
  • MIN_LENGTH define the minimum string length of a string data
  • MAX_LENGTH define the maximum string length of a string data
  • MIN_ROW define the minimun number of rows
  • MAX_ROW define the maximum number of rows

rule_value [numeric]

The value related to rule type. When the rule_type is MIN, MAX, MIN_LENGTH, MAX_LENGTH, MIN_ROW or MAX_ROW, users need to assign a value to the rule_value.

equals_to [boolean | numeric | string | ConfigList | ConfigMap]

equals_to is used to compare whether the field value is equal to the configured expected value. You can assign values of all types to equals_to. These types are detailed here. For instance, if one field is a row with three fields, and the declaration of row type is {a = array<string>, b = map<string, decimal(30, 2)>, c={c_0 = int, b = string}}, users can assign the value [["a", "b"], { k0 = 9999.99, k1 = 111.11 }, [123, "abcd"]] to equals_to.

The way of defining field values is consistent with FakeSource.

equals_to cannot be applied to null type fields. However, users can use the rule type NULL for verification, such as {rule_type = NULL}.

catalog_table_rule [ConfigMap]

Used to assert the catalog table is same with the user defined table.

table-names [ConfigList]

Used to assert the table should be in the data.

common options

Sink plugin common parameters, please refer to Sink Common Options for details

Example

the whole config obey with hocon style

Assert {
    rules =
      {
        row_rules = [
          {
            rule_type = MAX_ROW
            rule_value = 10
          },
          {
            rule_type = MIN_ROW
            rule_value = 5
          }
        ],
        field_rules = [{
          field_name = name
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
            },
            {
              rule_type = MIN_LENGTH
              rule_value = 5
            },
            {
              rule_type = MAX_LENGTH
              rule_value = 10
            }
          ]
        }, {
          field_name = age
          field_type = int
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = 23
            },
            {
              rule_type = MIN
              rule_value = 32767
            },
            {
              rule_type = MAX
              rule_value = 2147483647
            }
          ]
        }
        ]
        catalog_table_rule {
            primary_key_rule = {
                primary_key_name = "primary key"
                primary_key_columns = ["id"]
            }
            constraint_key_rule = [
                        {
                        constraint_key_name = "unique_name"
                        constraint_key_type = UNIQUE_KEY
                        constraint_key_columns = [
                            {
                                constraint_key_column_name = "id"
                                constraint_key_sort_type = ASC
                            }
                        ]
                        }
            ]
            column_rule = [
               {
                name = "id"
                type = bigint
               },
              {
                name = "name"
                type = string
              },
              {
                name = "age"
                type = int
              }
            ]
        }
      }

  }

Here is a more complex example about equals_to. The example involves FakeSource. You may want to learn it, please read this document.

source {
  FakeSource {
    row.num = 1
    schema = {
      fields {
        c_null = "null"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_date = date
        c_timestamp = timestamp
        c_time = time
        c_bytes = bytes
        c_array = "array<int>"
        c_map = "map<time, string>"
        c_map_nest = "map<string, {c_int = int, c_string = string}>"
        c_row = {
          c_null = "null"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_decimal = "decimal(30, 8)"
          c_date = date
          c_timestamp = timestamp
          c_time = time
          c_bytes = bytes
          c_array = "array<int>"
          c_map = "map<string, string>"
        }
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [
          null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
          "bWlJWmo=",
          [0, 1, 2],
          "{ 12:01:26 = v0 }",
          { k1 = [123, "BBB-BB"]},
          [
            null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
            "bWlJWmo=",
            [0, 1, 2],
            { k0 = v0 }
          ]
        ]
      }
    ]
    result_table_name = "fake"
  }
}

sink{
  Assert {
    source_table_name = "fake"
    rules =
      {
        row_rules = [
          {
            rule_type = MAX_ROW
            rule_value = 1
          },
          {
            rule_type = MIN_ROW
            rule_value = 1
          }
        ],
        field_rules = [
            {
                field_name = c_null
                field_type = "null"
                field_value = [
                    {
                        rule_type = NULL
                    }
                ]
            },
            {
                field_name = c_string
                field_type = string
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = "AAA"
                    }
                ]
            },
            {
                field_name = c_boolean
                field_type = boolean
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = false
                    }
                ]
            },
            {
                field_name = c_tinyint
                field_type = tinyint
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 1
                    }
                ]
            },
            {
                field_name = c_smallint
                field_type = smallint
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 1
                    }
                ]
            },
            {
                field_name = c_int
                field_type = int
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 333
                    }
                ]
            },
            {
                field_name = c_bigint
                field_type = bigint
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 323232
                    }
                ]
            },
            {
                field_name = c_float
                field_type = float
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 3.1
                    }
                ]
            },
            {
                field_name = c_double
                field_type = double
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 9.33333
                    }
                ]
            },
            {
                field_name = c_decimal
                field_type = "decimal(30, 8)"
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 99999.99999999
                    }
                ]
            },
            {
                field_name = c_date
                field_type = date
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = "2012-12-21"
                    }
                ]
            },
            {
                field_name = c_timestamp
                field_type = timestamp
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = "2012-12-21T12:34:56"
                    }
                ]
            },
            {
                field_name = c_time
                field_type = time
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = "12:34:56"
                    }
                ]
            },
            {
                field_name = c_bytes
                field_type = bytes
                field_value = [
                      {
                          rule_type = NOT_NULL
                          equals_to = "bWlJWmo="
                      }
                ]
            },
            {
                field_name = c_array
                field_type = "array<int>"
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = [0, 1, 2]
                    }
                ]
            },
            {
                field_name = c_map
                field_type = "map<time, string>"
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = "{ 12:01:26 = v0 }"
                    }
                ]
            },
            {
                field_name = c_map_nest
                field_type = "map<string, {c_int = int, c_string = string}>"
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = { k1 = [123, "BBB-BB"] }
                    }
                ]
            },
            {
                field_name = c_row
                field_type = {
                    c_null = "null"
                    c_string = string
                    c_boolean = boolean
                    c_tinyint = tinyint
                    c_smallint = smallint
                    c_int = int
                    c_bigint = bigint
                    c_float = float
                    c_double = double
                    c_decimal = "decimal(30, 8)"
                    c_date = date
                    c_timestamp = timestamp
                    c_time = time
                    c_bytes = bytes
                    c_array = "array<int>"
                    c_map = "map<string, string>"
                }
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = [
                           null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
                           "bWlJWmo=",
                           [0, 1, 2],
                           { k0 = v0 }
                        ]
                    }
                ]
            }
        ]
    }
  }
}

Changelog

2.2.0-beta 2022-09-26

  • Add Assert Sink Connector

2.3.0-beta 2022-10-20

  • [Improve] 1.Support check the number of rows (2844) (3031):
    • check rows not empty
    • check minimum number of rows
    • check maximum number of rows
  • [Improve] 2.Support direct define of data values(row) (2844) (3031)
  • [Improve] 3.Support setting parallelism as 1 (2844) (3031)