[MINOR] Misc fixes in deltastreamer #10067

Merged: 4 commits, merged on Nov 21, 2023
@@ -747,6 +747,8 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
       while (!isShutdownRequested()) {
         try {
           long start = System.currentTimeMillis();
+          // Send a heartbeat metric event to track the active ingestion job for this table.
+          streamSync.getMetrics().updateStreamerHeartbeatTimestamp(start);
           // check if deltastreamer needs to update the configuration before the sync
           if (configurationHotUpdateStrategyOpt.isPresent()) {
             Option<TypedProperties> newProps = configurationHotUpdateStrategyOpt.get().updateProperties(props);
@@ -229,8 +229,11 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
       // configured via this option. The column is then used to trigger error events.
       StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema)
           .add(new StructField(ERROR_TABLE_CURRUPT_RECORD_COL_NAME, DataTypes.StringType, true, Metadata.empty()));
+      StructType nullableStruct = dataType.asNullable();
       Option<Dataset<Row>> dataset = r.getBatch().map(rdd -> source.getSparkSession().read()
-          .option("columnNameOfCorruptRecord", ERROR_TABLE_CURRUPT_RECORD_COL_NAME).schema(dataType.asNullable())
+          .option("columnNameOfCorruptRecord", ERROR_TABLE_CURRUPT_RECORD_COL_NAME)
+          .schema(nullableStruct)
           .option("mode", "PERMISSIVE")
           .json(rdd));
       Option<Dataset<Row>> eventsDataset = processErrorEvents(dataset,
           ErrorEvent.ErrorReason.JSON_ROW_DESERIALIZATION_FAILURE);
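Note: hoisting dataType.asNullable() into the nullableStruct local also makes the contract explicit: Spark's PERMISSIVE mode needs every field to be nullable, because a malformed row yields null columns plus the raw text in the corrupt-record column, and that column must be a nullable string present in the schema. A self-contained sketch of the same Spark pattern, with illustrative names in place of Hudi's ERROR_TABLE_CURRUPT_RECORD_COL_NAME:

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class PermissiveJsonReadSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate();
    // All fields nullable, including the corrupt-record column Spark fills on parse failure.
    StructType schema = new StructType()
        .add(new StructField("id", DataTypes.LongType, true, Metadata.empty()))
        .add(new StructField("_corrupt_record", DataTypes.StringType, true, Metadata.empty()));
    Dataset<Row> rows = spark.read()
        .option("mode", "PERMISSIVE")
        .option("columnNameOfCorruptRecord", "_corrupt_record")
        .schema(schema)
        .json(spark.createDataset(Arrays.asList("{\"id\": 1}", "not json"), Encoders.STRING()));
    // The second row parses to id=null with _corrupt_record holding the raw text;
    // rows with a non-null corrupt-record column can then be routed to an error table.
    rows.show(false);
    spark.stop();
  }
}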
@@ -160,6 +160,7 @@ public class StreamSync implements Serializable, Closeable {

   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(StreamSync.class);
+  private static final String NULL_PLACEHOLDER = "[null]";

   /**
    * Delta Sync Config.
@@ -421,14 +422,19 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
     } else {
       Schema newSourceSchema = inputBatchIsEmptyPair.getKey().getSchemaProvider().getSourceSchema();
       Schema newTargetSchema = inputBatchIsEmptyPair.getKey().getSchemaProvider().getTargetSchema();
-      if (!(processedSchema.isSchemaPresent(newSourceSchema))
-          || !(processedSchema.isSchemaPresent(newTargetSchema))) {
-        LOG.info("Seeing new schema. Source :" + newSourceSchema.toString(true)
-            + ", Target :" + newTargetSchema.toString(true));
+      if ((newSourceSchema != null && !processedSchema.isSchemaPresent(newSourceSchema))
+          || (newTargetSchema != null && !processedSchema.isSchemaPresent(newTargetSchema))) {
+        String sourceStr = newSourceSchema == null ? NULL_PLACEHOLDER : newSourceSchema.toString(true);
+        String targetStr = newTargetSchema == null ? NULL_PLACEHOLDER : newTargetSchema.toString(true);
+        LOG.info("Seeing new schema. Source: {}, Target: {}", sourceStr, targetStr);
         // We need to recreate write client with new schema and register them.
         reInitWriteClient(newSourceSchema, newTargetSchema, recordsFromSource);
-        processedSchema.addSchema(newSourceSchema);
-        processedSchema.addSchema(newTargetSchema);
+        if (newSourceSchema != null) {
+          processedSchema.addSchema(newSourceSchema);
+        }
+        if (newTargetSchema != null) {
+          processedSchema.addSchema(newTargetSchema);
+        }
       }
     }
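Note: the guards matter because a SchemaProvider may legitimately return null for one of the two schemas, and the old code called toString(true) on both unconditionally. A tiny illustration of the failure mode being fixed (names are illustrative):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class NullSchemaLoggingSketch {
  private static final String NULL_PLACEHOLDER = "[null]";

  // Avoids the NullPointerException from invoking toString(true) on a null reference.
  static String describe(Schema schema) {
    return schema == null ? NULL_PLACEHOLDER : schema.toString(true);
  }

  public static void main(String[] args) {
    Schema source = SchemaBuilder.record("Source").fields().requiredLong("id").endRecord();
    System.out.println("Source: " + describe(source));
    System.out.println("Target: " + describe(null)); // prints "[null]" instead of throwing
  }
}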

@@ -577,7 +583,8 @@ private InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr,
           ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);

       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
-      if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
+      if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null
+          && this.userProvidedSchemaProvider.getTargetSchema() != InputBatch.NULL_SCHEMA) {
         if (useRowWriter) {
           inputBatchForWriter = new InputBatch(transformed, checkpointStr, this.userProvidedSchemaProvider);
         } else {
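Note: InputBatch.NULL_SCHEMA is a sentinel Schema instance, so the added clause uses reference inequality (!=) to distinguish "provider explicitly has no target schema" from a real user-provided one. A generic sketch of the sentinel pattern, assuming the sentinel is declared roughly like this:

import org.apache.avro.Schema;

public class SentinelSchemaSketch {
  // A single well-known instance meaning "no schema". Callers compare by reference,
  // not equals(), so no genuine schema can accidentally match it.
  public static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);

  static boolean hasUsableTargetSchema(Schema targetSchema) {
    return targetSchema != null && targetSchema != NULL_SCHEMA;
  }

  public static void main(String[] args) {
    System.out.println(hasUsableTargetSchema(null));                              // false
    System.out.println(hasUsableTargetSchema(NULL_SCHEMA));                       // false
    System.out.println(hasUsableTargetSchema(Schema.create(Schema.Type.STRING))); // true
  }
}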
@@ -983,6 +990,7 @@ public void runMetaSync() {
       LOG.info("When set --enable-hive-sync will use HiveSyncTool for backward compatibility");
     }
     if (cfg.enableMetaSync) {
+      LOG.debug("[MetaSync] Starting sync");
       FileSystem fs = FSUtils.getFs(cfg.targetBasePath, hoodieSparkContext.hadoopConfiguration());

       TypedProperties metaProps = new TypedProperties();
@@ -996,14 +1004,19 @@
       Map<String, HoodieException> failedMetaSyncs = new HashMap<>();
       for (String impl : syncClientToolClasses) {
         Timer.Context syncContext = metrics.getMetaSyncTimerContext();
+        boolean success = false;
         try {
           SyncUtilHelpers.runHoodieMetaSync(impl.trim(), metaProps, conf, fs, cfg.targetBasePath, cfg.baseFileFormat);
+          success = true;
         } catch (HoodieMetaSyncException e) {
-          LOG.warn("SyncTool class " + impl.trim() + " failed with exception", e);
+          LOG.error("SyncTool class {} failed with exception", impl.trim(), e);
           failedMetaSyncs.put(impl, e);
         }
         long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0;
         metrics.updateStreamerMetaSyncMetrics(getSyncClassShortName(impl), metaSyncTimeMs);
+        if (success) {
+          LOG.info("[MetaSync] SyncTool class {} completed successfully and took {} ms", impl.trim(), metaSyncTimeMs);
+        }
       }
       if (!failedMetaSyncs.isEmpty()) {
         throw getHoodieMetaSyncException(failedMetaSyncs);
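Note: the success flag ensures the timing metric is recorded on both paths, while the success log line fires only when the tool returned normally. The same pattern, distilled into a runnable sketch where runTool and recordMetric stand in for SyncUtilHelpers.runHoodieMetaSync and the Hudi metrics call:

import java.util.HashMap;
import java.util.Map;

public class MetaSyncLoopSketch {
  public static void main(String[] args) {
    Map<String, Exception> failedSyncs = new HashMap<>();
    for (String impl : new String[] {"ToolA", "ToolB"}) {
      long startMs = System.currentTimeMillis();
      boolean success = false;
      try {
        runTool(impl);
        success = true;
      } catch (Exception e) {
        System.err.printf("SyncTool class %s failed with exception: %s%n", impl, e);
        failedSyncs.put(impl, e);
      }
      long elapsedMs = System.currentTimeMillis() - startMs;
      recordMetric(impl, elapsedMs); // recorded even on failure, mirroring the patch
      if (success) {
        System.out.printf("[MetaSync] SyncTool class %s completed successfully and took %d ms%n", impl, elapsedMs);
      }
    }
    if (!failedSyncs.isEmpty()) {
      throw new RuntimeException("Meta sync failed for: " + failedSyncs.keySet());
    }
  }

  static void runTool(String impl) {
    if ("ToolB".equals(impl)) {
      throw new IllegalStateException("simulated failure");
    }
  }

  static void recordMetric(String impl, long elapsedMs) {
    System.out.printf("metric %s.metaSyncTimeMs=%d%n", impl, elapsedMs);
  }
}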
@@ -1175,13 +1188,14 @@ private void registerAvroSchemas(SchemaProvider schemaProvider) {
    */
   private void registerAvroSchemas(Schema sourceSchema, Schema targetSchema) {
     // register the schemas, so that shuffle does not serialize the full schemas
-    if (null != sourceSchema) {
-      List<Schema> schemas = new ArrayList<>();
+    List<Schema> schemas = new ArrayList<>();
+    if (sourceSchema != null) {
       schemas.add(sourceSchema);
-      if (targetSchema != null) {
-        schemas.add(targetSchema);
-      }
-
+    }
+    if (targetSchema != null) {
+      schemas.add(targetSchema);
+    }
+    if (!schemas.isEmpty()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Registering Schema: " + schemas);
       }
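Note: the restructure is a behavior fix as much as a cleanup. Previously a null source schema skipped registration entirely, even when a target schema was available; now each schema is collected independently. A behavior-equivalent sketch highlighting the case that changed:

import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;

public class SchemaRegistrationSketch {
  // Collect whichever schemas are non-null; register only if the list is non-empty.
  static List<Schema> collectSchemas(Schema sourceSchema, Schema targetSchema) {
    List<Schema> schemas = new ArrayList<>();
    if (sourceSchema != null) {
      schemas.add(sourceSchema);
    }
    if (targetSchema != null) {
      schemas.add(targetSchema);
    }
    return schemas;
  }

  public static void main(String[] args) {
    Schema target = Schema.create(Schema.Type.STRING);
    // The old nesting registered nothing here; the target schema is now kept.
    System.out.println(collectSchemas(null, target)); // ["string"]
  }
}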