Closed
Search before asking
- I have searched the existing feature requests and found no similar feature requirement.
Description
Design of Dirty Data Collection Functionality for Apache SeaTunnel
Introduction
In a data integration framework, dirty data collection helps users discover data quality problems promptly, which keeps data synchronization accurate and stable.
Functional Requirements
The dirty data collection function needs to support the following features:
- Collect and record all dirty data content and exception information.
- Support plug-in extension.
Technical Solution
Interface Design
Add interface DirtyRecordCollector:
/** Base interface for dirty records collector */
public interface DirtyRecordCollector extends Serializable {
    /** Collect dirty record with exception and error message */
    void collect(
            final int subTaskIndex,
            final SeaTunnelRow dirtyRecord,
            final Throwable exception,
            final String errorMessage);

    /** Collect dirty record with exception */
    default void collect(
            final int subTaskIndex, final SeaTunnelRow dirtyRecord, final Throwable exception) {
        collect(subTaskIndex, dirtyRecord, exception, "");
    }
}
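To make the contract concrete, here is a minimal sketch of what a log-style collector might look like. It is not the proposed implementation: `Row` stands in for `SeaTunnelRow` and `RowCollector` mirrors `DirtyRecordCollector` so the example compiles on its own, and `LogRowCollector` and its line format are hypothetical.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in for SeaTunnelRow (assumption) so the sketch compiles without SeaTunnel.
final class Row implements Serializable {
    private final Object[] fields;

    Row(Object... fields) {
        this.fields = fields;
    }

    Object[] getFields() {
        return fields;
    }
}

// Same shape as the proposed DirtyRecordCollector, typed against the stand-in Row.
interface RowCollector extends Serializable {
    void collect(int subTaskIndex, Row dirtyRecord, Throwable exception, String errorMessage);

    default void collect(int subTaskIndex, Row dirtyRecord, Throwable exception) {
        collect(subTaskIndex, dirtyRecord, exception, "");
    }
}

// Hypothetical "log" collector: formats each dirty record as one line and keeps it.
final class LogRowCollector implements RowCollector {
    private final List<String> lines = new ArrayList<>();

    @Override
    public void collect(int subTaskIndex, Row dirtyRecord, Throwable exception, String errorMessage) {
        String line = "subtask=" + subTaskIndex
                + " row=" + Arrays.toString(dirtyRecord.getFields())
                + " exception=" + exception.getClass().getSimpleName()
                + " message=" + errorMessage;
        lines.add(line);
        // A real plug-in would presumably write through the project's logger instead.
        System.err.println(line);
    }

    List<String> getLines() {
        return lines;
    }
}
```

Calling the three-argument overload falls through to the four-argument method with an empty message, so an implementation only has to provide one method.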
Add method getDirtyRecordCollector to SinkWriter.Context:
interface Context extends Serializable {
    /** @return The index of this subtask. */
    int getIndexOfSubtask();

    /** @return metricsContext of this reader. */
    MetricsContext getMetricsContext();

    /** Get dirty record collector */
    DirtyRecordCollector getDirtyRecordCollector();
}
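A writer could then obtain the collector from its context when a row fails. The sketch below illustrates that flow with simplified, hypothetical stand-ins: `DirtySink` for `DirtyRecordCollector`, `WriterContext` for `SinkWriter.Context`, and a plain `Object[]` for `SeaTunnelRow`. Skipping the bad row and continuing is an assumption of this example; the proposal does not say whether a sink should continue or fail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for DirtyRecordCollector.
interface DirtySink {
    void collect(int subTaskIndex, Object[] dirtyRecord, Throwable exception, String errorMessage);
}

// Hypothetical stand-in for SinkWriter.Context, reduced to what the writer uses.
interface WriterContext {
    int getIndexOfSubtask();

    DirtySink getDirtyRecordCollector();
}

// Example writer: rows are {name, age}; age arrives as text and must parse as an int.
final class AgeSinkWriter {
    private final WriterContext context;
    private final List<Integer> written = new ArrayList<>();

    AgeSinkWriter(WriterContext context) {
        this.context = context;
    }

    void write(Object[] row) {
        try {
            written.add(Integer.parseInt(String.valueOf(row[1])));
        } catch (NumberFormatException e) {
            // Hand the failing row to the collector together with the subtask index.
            context.getDirtyRecordCollector()
                    .collect(context.getIndexOfSubtask(), row, e, "age is not an int");
        }
    }

    List<Integer> getWritten() {
        return written;
    }
}
```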
Add method getPluginConfig to SeaTunnelPluginLifeCycle:
/** Get the config of plugin */
Config getPluginConfig();
Configuration File Example
env {
  job.mode = BATCH
  dirty.collector = {
    type = log
  }
}
source {
  FakeSource {
    row.num = 100
    schema {
      fields {
        name = string
        age = int
      }
    }
  }
}
sink {
  Console {}
}
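One way the `type` value could select a collector implementation is a lookup from type name to factory. The sketch below is only an illustration of that idea under stated assumptions: `CollectorRegistry` and `NamedCollector` are hypothetical names, the configuration is passed as a plain map rather than a `Config`, and the actual plug-in discovery mechanism is not described in this proposal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical collector abstraction, reduced to what the lookup needs.
interface NamedCollector {
    String type();
}

// Hypothetical registry mapping a `dirty.collector.type` value to a factory.
final class CollectorRegistry {
    private final Map<String, Supplier<NamedCollector>> factories = new HashMap<>();

    void register(String type, Supplier<NamedCollector> factory) {
        factories.put(type, factory);
    }

    // collectorConfig mirrors the contents of the `dirty.collector { ... }` block.
    NamedCollector create(Map<String, String> collectorConfig) {
        String type = collectorConfig.get("type");
        Supplier<NamedCollector> factory = factories.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown dirty.collector type: " + type);
        }
        return factory.get();
    }
}
```

Failing fast on an unknown type surfaces a misspelled `type` when the job starts rather than when the first dirty record appears.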
Technical Implementation
- Modify seatunnel-config-shade. SeaTunnelSink is serialized during transmission, but the underlying typesafe-config interface does not support serialization, so Config must be made serializable to ensure it is not lost in transit. See "[Feature][Shade][typesafe-config] Make Config can be serialized #4586" for implementation details.
- Modify all SeaTunnelSink implementations to implement the getPluginConfig method and inject the pluginConfig during the prepare phase. Before instantiating the Sink, merge the plugin configuration for all Sinks, and merge the dirty data collector configuration from the env section into the plugin configuration.
- Modify DefaultSinkWriterContext and SinkWriterContext to instantiate the DirtyRecordCollector from the configuration and implement the getDirtyRecordCollector method.
- Modify all SeaTunnelSink implementations to report failed rows to the dirty data collector and update metrics when data writing or conversion fails.
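The merge step in the second item can be sketched as follows. This is an assumption-laden illustration, not the proposed code: configurations are modelled as plain maps instead of `Config`, `DirtyCollectorConfigMerger` is a hypothetical name, and letting a sink-level `dirty.collector` entry take precedence over the env-level one is a choice made for this example only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that copies the env-level collector settings into a sink's plugin config.
final class DirtyCollectorConfigMerger {
    static final String KEY = "dirty.collector";

    // Returns a new map; neither input is modified.
    static Map<String, Object> merge(Map<String, Object> envConfig, Map<String, Object> sinkConfig) {
        Map<String, Object> merged = new HashMap<>(sinkConfig);
        Object collector = envConfig.get(KEY);
        if (collector != null) {
            // Keep an existing sink-level entry (precedence is an assumption of this sketch).
            merged.putIfAbsent(KEY, collector);
        }
        return merged;
    }
}
```

Because the merged configuration travels with the sink, the writer context can build the collector from it after the sink has been serialized and shipped, which is why the first item requires Config to be serializable.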
Process Design
Usage Scenario
No response
Related issues
No response
Are you willing to submit a PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
sunxiaojian