
[Feature][Core] Design of Dirty Data Collection #4587

@TyrantLucifer


Search before asking

  • I have searched the existing feature requests and found no similar requirement.

Description

Design of Dirty Data Collection Functionality for Apache SeaTunnel

Introduction

In a data integration framework, dirty data collection helps users discover data quality problems promptly, which keeps data synchronization accurate and stable. Here, dirty data means records that a sink fails to convert or write.

Functional Requirements

The dirty data collection function needs to support the following features:

  1. Collect and record all dirty data content and exception information.
  2. Support extension through plug-ins, so that new collector types can be added.

Technical Solution

Interface Design

Add a DirtyRecordCollector interface:

import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import java.io.Serializable;

/** Base interface for dirty record collectors. */
public interface DirtyRecordCollector extends Serializable {

    /** Collects a dirty record together with the exception raised while processing it and an error message. */
    void collect(
            final int subTaskIndex,
            final SeaTunnelRow dirtyRecord,
            final Throwable exception,
            final String errorMessage);

    /** Collects a dirty record together with the exception raised while processing it, using an empty error message. */
    default void collect(
            final int subTaskIndex, final SeaTunnelRow dirtyRecord, final Throwable exception) {
        collect(subTaskIndex, dirtyRecord, exception, "");
    }
}
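To illustrate what an implementation of this interface could look like for the type = log collector used in the configuration example, here is a minimal sketch that writes one warning line per dirty record. It is not SeaTunnel's implementation: Row is a simplified, hypothetical stand-in for SeaTunnelRow, and LogDirtyRecordCollector and its message format are assumptions.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.logging.Logger;

/** Hypothetical, simplified stand-in for SeaTunnelRow: just an array of field values. */
final class Row implements Serializable {
    private final Object[] fields;

    Row(Object... fields) {
        this.fields = fields;
    }

    @Override
    public String toString() {
        return Arrays.toString(fields);
    }
}

/** Mirrors the proposed DirtyRecordCollector interface, using the stand-in Row type. */
interface DirtyRecordCollector extends Serializable {
    void collect(int subTaskIndex, Row dirtyRecord, Throwable exception, String errorMessage);

    default void collect(int subTaskIndex, Row dirtyRecord, Throwable exception) {
        collect(subTaskIndex, dirtyRecord, exception, "");
    }
}

/** Hypothetical collector for "type = log": writes one WARNING line per dirty record. */
final class LogDirtyRecordCollector implements DirtyRecordCollector {
    // Static, so the logger is not part of the collector's serialized state.
    private static final Logger LOG = Logger.getLogger(LogDirtyRecordCollector.class.getName());

    /** Builds the log line; kept separate so the format can be checked without a log handler. */
    static String format(int subTaskIndex, Row dirtyRecord, Throwable exception, String errorMessage) {
        return String.format(
                "Dirty record in subtask %d: %s, exception: %s, message: %s",
                subTaskIndex, dirtyRecord, exception, errorMessage);
    }

    @Override
    public void collect(int subTaskIndex, Row dirtyRecord, Throwable exception, String errorMessage) {
        LOG.warning(format(subTaskIndex, dirtyRecord, exception, errorMessage));
    }
}
```

Keeping the logger static matters because DirtyRecordCollector extends Serializable, and java.util.logging.Logger is not serializable.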

Add a getDirtyRecordCollector method to SinkWriter.Context:

interface Context extends Serializable {

    /** @return The index of this subtask. */
    int getIndexOfSubtask();

    /** @return metricsContext of this writer. */
    MetricsContext getMetricsContext();

    /** @return the dirty record collector of this writer. */
    DirtyRecordCollector getDirtyRecordCollector();
}
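To show how a sink could use the new context method, here is a minimal sketch of a writer that converts each row before writing it and hands rows that fail conversion to the collector. The proposal does not say whether a sink should skip, retry, or fail on dirty data; skipping the row is an assumption here. WriterContext, IntParsingWriter, and InMemoryDirtyRecordCollector are hypothetical simplifications, not SeaTunnel's SinkWriter API, and rows are reduced to Object[].

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Mirrors the proposed DirtyRecordCollector; rows are simplified to Object[] (assumption). */
interface DirtyRecordCollector extends Serializable {
    void collect(int subTaskIndex, Object[] dirtyRecord, Throwable exception, String errorMessage);
}

/** Mirrors SinkWriter.Context, reduced to the members this sketch needs (hypothetical). */
interface WriterContext {
    int getIndexOfSubtask();

    DirtyRecordCollector getDirtyRecordCollector();
}

/** Hypothetical collector that keeps dirty records in memory, handy for tests. */
final class InMemoryDirtyRecordCollector implements DirtyRecordCollector {
    final List<String> entries = new ArrayList<>();

    @Override
    public void collect(int subTaskIndex, Object[] dirtyRecord, Throwable exception, String errorMessage) {
        entries.add(subTaskIndex + " " + Arrays.toString(dirtyRecord) + " " + errorMessage);
    }
}

/** Hypothetical writer: expects rows of (name, age as text) and parses the age before writing. */
final class IntParsingWriter {
    private final WriterContext context;
    final List<String> written = new ArrayList<>();

    IntParsingWriter(WriterContext context) {
        this.context = context;
    }

    void write(Object[] row) {
        try {
            int age = Integer.parseInt((String) row[1]);
            written.add(row[0] + ":" + age); // stands in for the real write to the external system
        } catch (RuntimeException e) {
            // Conversion failed: report the row as dirty and carry on with the next row.
            context.getDirtyRecordCollector()
                    .collect(context.getIndexOfSubtask(), row, e, "cannot convert age");
        }
    }
}
```

A writer built this way keeps the job running while still recording every rejected row with its subtask index and cause.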

Add a getPluginConfig method to SeaTunnelPluginLifeCycle:

/** @return the configuration of this plugin. */
Config getPluginConfig();
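Because the configuration returned by getPluginConfig travels with the sink, it has to survive serialization, which is why item 1 of the technical implementation makes Config serializable. The sketch below illustrates this with plain Java serialization; it is a simplification, not SeaTunnel code. PluginLifeCycle, ConsoleLikeSink, and SerializationRoundTrip are hypothetical, and a HashMap of strings stands in for the shaded typesafe Config. If the field's type were not serializable, writeObject would throw NotSerializableException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Reduced lifecycle contract; Map stands in for the shaded Config (assumption). */
interface PluginLifeCycle {
    void prepare(Map<String, String> pluginConfig);

    Map<String, String> getPluginConfig();
}

/** Hypothetical sink that keeps the configuration injected during prepare. */
final class ConsoleLikeSink implements PluginLifeCycle, Serializable {
    private HashMap<String, String> pluginConfig = new HashMap<>();

    @Override
    public void prepare(Map<String, String> pluginConfig) {
        // Copy into a HashMap, which is Serializable, so the config travels with the sink.
        this.pluginConfig = new HashMap<>(pluginConfig);
    }

    @Override
    public Map<String, String> getPluginConfig() {
        return pluginConfig;
    }
}

final class SerializationRoundTrip {
    /** Serializes and deserializes an object, as happens to a sink when it is transmitted. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```

The key dirty.collector.type used in the usage below is a hypothetical flattened form of the env setting.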

Configuration File Example

env {
  job.mode = BATCH
  dirty.collector = {
    type = log
  }
}

source {
  FakeSource {
    row.num = 100
    schema {
      fields {
        name = string
        age = int
      }
    }
  }
}

sink {
  Console {}
}
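One way the type key in dirty.collector could select an implementation, while letting new collector types be plugged in (functional requirement 2), is a registry keyed by type name. This sketch is hypothetical: the proposal does not specify the lookup mechanism, what happens when dirty.collector is absent, or any type other than log. Here a missing type yields a no-op collector and an unknown type is rejected. Configuration is modelled as a flattened Map (for example dirty.collector.type mapped to log) instead of the shaded typesafe Config, and DirtyRecordCollectors is an invented name.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Mirrors the proposed interface; rows are simplified to Object[] (assumption). */
interface DirtyRecordCollector extends Serializable {
    void collect(int subTaskIndex, Object[] dirtyRecord, Throwable exception, String errorMessage);
}

/** Hypothetical registry mapping the configured type to a collector factory. */
final class DirtyRecordCollectors {
    static final String TYPE_KEY = "dirty.collector.type";

    private static final Map<String, Function<Map<String, String>, DirtyRecordCollector>> FACTORIES =
            new HashMap<>();

    static {
        // Built-in "log" type; prints to stderr to keep the sketch dependency-free.
        register("log", config -> (subTask, row, e, msg) ->
                System.err.println("Dirty record in subtask " + subTask + ": "
                        + Arrays.toString(row) + ", " + e + ", " + msg));
    }

    /** Extension point: additional collectors register under their own type name. */
    static void register(String type, Function<Map<String, String>, DirtyRecordCollector> factory) {
        FACTORIES.put(type, factory);
    }

    /** Creates the collector described by the config; a missing type means "collect nothing". */
    static DirtyRecordCollector create(Map<String, String> config) {
        String type = config.get(TYPE_KEY);
        if (type == null) {
            return (subTask, row, e, msg) -> { }; // no collector configured
        }
        Function<Map<String, String>, DirtyRecordCollector> factory = FACTORIES.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown dirty collector type: " + type);
        }
        return factory.apply(config);
    }
}
```

In a real plug-in system, registration could instead be driven by java.util.ServiceLoader, so that collectors shipped in separate jars are discovered without editing the registry.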

Technical Implementation

  1. Modify seatunnel-config-shade. SeaTunnelSink instances are serialized when they are transmitted, but the underlying typesafe-config Config interface does not support serialization. Config must therefore be made serializable so that the plugin configuration is not lost in transit. See [Feature][Shade][typesafe-config] Make Config can be serialized #4586 for implementation details.
  2. Modify every SeaTunnelSink to implement getPluginConfig, and inject the pluginConfig during the prepare phase.
    Before the sinks are instantiated, merge the dirty data collector configuration from the env section into the plugin configuration of every sink.
  3. Modify DefaultSinkWriterContext and SinkWriterContext to instantiate the DirtyRecordCollector from this configuration and implement the getDirtyRecordCollector method.
  4. Modify every SeaTunnelSink to report failed records to the dirty data collector, and update the corresponding metrics, whenever writing or converting data fails.
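The merge in item 2 can be sketched as follows. The proposal does not say which value wins when a sink sets its own dirty collector option; this sketch assumes the sink's own setting is kept and the env section only acts as a fallback. Only keys under a hypothetical flattened prefix dirty.collector. are copied, and DirtyCollectorConfigMerger is an invented name, with Maps standing in for the shaded typesafe Config.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: copies env-level dirty.collector settings into every sink config. */
final class DirtyCollectorConfigMerger {
    static final String PREFIX = "dirty.collector.";

    /** Returns new sink configs; a key the sink already sets is kept (env is only a fallback). */
    static List<Map<String, String>> merge(Map<String, String> env, List<Map<String, String>> sinkConfigs) {
        List<Map<String, String>> merged = new ArrayList<>();
        for (Map<String, String> sinkConfig : sinkConfigs) {
            // Copy so the caller's sink configuration is left untouched.
            Map<String, String> result = new LinkedHashMap<>(sinkConfig);
            for (Map.Entry<String, String> entry : env.entrySet()) {
                // Only dirty collector settings are propagated; other env keys stay in env.
                if (entry.getKey().startsWith(PREFIX)) {
                    result.putIfAbsent(entry.getKey(), entry.getValue());
                }
            }
            merged.add(result);
        }
        return merged;
    }
}
```

With the shaded typesafe Config, the same "sink wins, env as fallback" behaviour would correspond to calling withFallback on the sink's config with the env's dirty collector block.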

Process Design

[Figure: process design diagram (draw.io)]

Usage Scenario

No response

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct
