diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index d4665282be12..8809213582ac 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -87,7 +87,7 @@ public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() t
         .setBasePath(service.getCfg().targetBasePath)
         .build();
     String instantTime = InProcessTimeGenerator.createNewInstantTime();
-    InputBatch inputBatch = service.readFromSource(instantTime, metaClient).getLeft();
+    InputBatch inputBatch = service.readFromSource(instantTime, metaClient);
     return Pair.of(inputBatch.getSchemaProvider(), Pair.of(inputBatch.getCheckpointForNextBatch(), (JavaRDD<HoodieRecord>) inputBatch.getBatch().get()));
   }
 
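Illustrative note (not part of the patch): with readFromSource now returning the InputBatch directly, an empty batch is conveyed by the batch contents rather than a separate boolean flag. A minimal sketch of consuming it, assuming the Hudi InputBatch/Option types used above; the helper class and method below are hypothetical:

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.utilities.sources.InputBatch;

import org.apache.spark.api.java.JavaRDD;

final class InputBatchSketch {
  // Hypothetical helper: the Option returned by getBatch() carries the "no new data" case,
  // so a caller branches on the Option instead of unwrapping a Pair and checking its boolean.
  static long countRecords(InputBatch<JavaRDD<HoodieRecord>> inputBatch) {
    return inputBatch.getBatch().map(JavaRDD::count).orElse(0L);
  }
}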
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
index 6c87f53a5652..0fd7a41ab556 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
@@ -64,7 +64,7 @@ public class SparkSampleWritesUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
 
-  public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig) {
+  public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(JavaSparkContext jsc, Option<JavaRDD<HoodieRecord>> recordsOpt, HoodieWriteConfig writeConfig) {
     if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
       LOG.debug("Skip overwriting record size estimate as it's disabled.");
       return Option.empty();
@@ -76,7 +76,7 @@ public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(Jav
     }
     try {
       String instantTime = getInstantFromTemporalAccessor(Instant.now().atZone(ZoneId.systemDefault()));
-      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      Pair<Boolean, String> result = doSampleWrites(jsc, recordsOpt, writeConfig, instantTime);
       if (result.getLeft()) {
         long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
         LOG.info("Overwriting record size estimate to " + avgSize);
@@ -90,7 +90,7 @@ public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(Jav
     return Option.empty();
   }
 
-  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime)
+  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, Option<JavaRDD<HoodieRecord>> recordsOpt, HoodieWriteConfig writeConfig, String instantTime)
       throws IOException {
     final String sampleWritesBasePath = getSampleWritesBasePath(jsc, writeConfig, instantTime);
     HoodieTableMetaClient.withPropertyBuilder()
@@ -109,25 +109,31 @@ private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRD
         .withAutoCommit(true)
         .withPath(sampleWritesBasePath)
         .build();
+    Pair<Boolean, String> emptyRes = Pair.of(false, null);
     try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {
       int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE);
-      List<HoodieRecord> samples = records.coalesce(1).take(size);
-      sampleWriteClient.startCommitWithTime(instantTime);
-      JavaRDD<WriteStatus> writeStatusRDD = sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
-      if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0) {
-        LOG.error(String.format("sample writes for table %s failed with errors.", writeConfig.getTableName()));
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Printing out the top 100 errors");
-          writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
-            LOG.trace("Global error :", ws.getGlobalError());
-            ws.getErrors().forEach((key, throwable) ->
-                LOG.trace(String.format("Error for key: %s", key), throwable));
-          });
+      return recordsOpt.map(records -> {
+        List<HoodieRecord> samples = records.coalesce(1).take(size);
+        if (samples.isEmpty()) {
+          return emptyRes;
         }
-        return Pair.of(false, null);
-      } else {
-        return Pair.of(true, sampleWritesBasePath);
-      }
+        sampleWriteClient.startCommitWithTime(instantTime);
+        JavaRDD<WriteStatus> writeStatusRDD = sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
+        if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0) {
+          LOG.error(String.format("sample writes for table %s failed with errors.", writeConfig.getTableName()));
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Printing out the top 100 errors");
+            writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
+              LOG.trace("Global error :", ws.getGlobalError());
+              ws.getErrors().forEach((key, throwable) ->
+                  LOG.trace(String.format("Error for key: %s", key), throwable));
+            });
+          }
+          return emptyRes;
+        } else {
+          return Pair.of(true, sampleWritesBasePath);
+        }
+      }).orElse(emptyRes);
     }
   }
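A usage sketch for the new Option-based signature (illustrative only; the wrapper class and variable names are hypothetical, while the Hudi types and the getWriteConfigWithRecordSizeEstimate/orElse calls are the ones shown in the diff above):

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.streamer.SparkSampleWritesUtils;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

final class SampleWritesCallerSketch {
  // Hypothetical caller: an absent or empty batch now simply skips the sample write
  // (doSampleWrites returns Pair.of(false, null)), so the initial config is kept as-is.
  static HoodieWriteConfig resolveWriteConfig(JavaSparkContext jsc,
                                              Option<JavaRDD<HoodieRecord>> recordsOpt,
                                              HoodieWriteConfig initialConfig) {
    return SparkSampleWritesUtils
        .getWriteConfigWithRecordSizeEstimate(jsc, recordsOpt, initialConfig)
        .orElse(initialConfig);
  }
}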
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index fc0160f57e23..21df7a74b0e7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -402,32 +402,26 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
         .build();
     String instantTime = metaClient.createNewInstantTime();
 
-    Pair<InputBatch, Boolean> inputBatchIsEmptyPair = readFromSource(instantTime, metaClient);
+    InputBatch inputBatch = readFromSource(instantTime, metaClient);
 
-    if (inputBatchIsEmptyPair != null) {
-      final JavaRDD<HoodieRecord> recordsFromSource;
-      if (useRowWriter) {
-        recordsFromSource = hoodieSparkContext.emptyRDD();
-      } else {
-        recordsFromSource = (JavaRDD<HoodieRecord>) inputBatchIsEmptyPair.getKey().getBatch().get();
-      }
+    if (inputBatch != null) {
       // this is the first input batch. If schemaProvider not set, use it and register Avro Schema and start
       // compactor
       if (writeClient == null) {
-        this.schemaProvider = inputBatchIsEmptyPair.getKey().getSchemaProvider();
+        this.schemaProvider = inputBatch.getSchemaProvider();
         // Setup HoodieWriteClient and compaction now that we decided on schema
-        setupWriteClient(recordsFromSource);
+        setupWriteClient(inputBatch.getBatch());
       } else {
-        Schema newSourceSchema = inputBatchIsEmptyPair.getKey().getSchemaProvider().getSourceSchema();
-        Schema newTargetSchema = inputBatchIsEmptyPair.getKey().getSchemaProvider().getTargetSchema();
+        Schema newSourceSchema = inputBatch.getSchemaProvider().getSourceSchema();
+        Schema newTargetSchema = inputBatch.getSchemaProvider().getTargetSchema();
         if ((newSourceSchema != null && !processedSchema.isSchemaPresent(newSourceSchema))
             || (newTargetSchema != null && !processedSchema.isSchemaPresent(newTargetSchema))) {
           String sourceStr = newSourceSchema == null ? NULL_PLACEHOLDER : newSourceSchema.toString(true);
           String targetStr = newTargetSchema == null ? NULL_PLACEHOLDER : newTargetSchema.toString(true);
           LOG.info("Seeing new schema. Source: {0}, Target: {1}", sourceStr, targetStr);
           // We need to recreate write client with new schema and register them.
-          reInitWriteClient(newSourceSchema, newTargetSchema, recordsFromSource);
+          reInitWriteClient(newSourceSchema, newTargetSchema, inputBatch.getBatch());
           if (newSourceSchema != null) {
             processedSchema.addSchema(newSourceSchema);
           }
@@ -454,7 +448,7 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
         }
       }
 
-      result = writeToSinkAndDoMetaSync(instantTime, inputBatchIsEmptyPair.getKey(), inputBatchIsEmptyPair.getValue(), metrics, overallTimerContext);
+      result = writeToSinkAndDoMetaSync(instantTime, inputBatch, metrics, overallTimerContext);
     }
 
     metrics.updateStreamerSyncMetrics(System.currentTimeMillis());
@@ -484,7 +478,7 @@ private Option<String> getLastPendingCompactionInstant(Option<HoodieTimeline> co
    * @throws Exception in case of any Exception
    */
-  public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException {
+  public InputBatch readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException {
     // Retrieve the previous round checkpoints, if any
     Option<String> resumeCheckpointStr = Option.empty();
     if (commitsTimelineOpt.isPresent()) {
@@ -499,7 +493,7 @@ public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableM
 
     int maxRetryCount = cfg.retryOnSourceFailures ? cfg.maxRetryCount : 1;
     int curRetryCount = 0;
-    Pair<InputBatch, Boolean> sourceDataToSync = null;
+    InputBatch sourceDataToSync = null;
     while (curRetryCount++ < maxRetryCount && sourceDataToSync == null) {
       try {
         sourceDataToSync = fetchFromSourceAndPrepareRecords(resumeCheckpointStr, instantTime, metaClient);
@@ -519,7 +513,7 @@ public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableM
     return sourceDataToSync;
   }
 
-  private Pair<InputBatch, Boolean> fetchFromSourceAndPrepareRecords(Option<String> resumeCheckpointStr, String instantTime,
+  private InputBatch fetchFromSourceAndPrepareRecords(Option<String> resumeCheckpointStr, String instantTime,
                                                       HoodieTableMetaClient metaClient) {
     HoodieRecordType recordType = createRecordMerger(props).getRecordType();
     if (recordType == HoodieRecordType.SPARK && HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
@@ -544,17 +538,14 @@ private Pair<InputBatch, Boolean> fetchFromSourceAndPrepareRecords(Option<String
-    Pair<InputBatch, Boolean> preparedInputBatchIsEmptyPair = handleEmptyBatch(useRowWriter, inputBatch, checkpointStr, schemaProvider);
-    if (preparedInputBatchIsEmptyPair.getValue()) { // return if empty batch
-      return preparedInputBatchIsEmptyPair;
-    }
+
     if (useRowWriter) { // no additional processing required for row writer.
-      return Pair.of(inputBatch, false);
+      return inputBatch;
     } else {
       JavaRDD<HoodieRecord> records = HoodieStreamerUtils.createHoodieRecords(cfg, props, inputBatch.getBatch(), schemaProvider, recordType, autoGenerateRecordKeys, instantTime);
-      return Pair.of(new InputBatch(Option.of(records), checkpointStr, schemaProvider), false);
+      return new InputBatch(Option.of(records), checkpointStr, schemaProvider);
     }
   }
@@ -652,33 +643,6 @@ private InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr,
     }
   }
 
-  /**
-   * Handles empty batch from input.
-   * @param useRowWriter true if row write code path.
-   * @param inputBatch {@link InputBatch} instance to use.
-   * @param checkpointForNextBatch checkpiont to use for next batch.
-   * @param schemaProvider {@link SchemaProvider} instance of interest.
-   * @return a Pair of InputBatch and boolean. boolean value is set to true on empty batch.
-   */
-  private Pair<InputBatch, Boolean> handleEmptyBatch(boolean useRowWriter, InputBatch inputBatch,
-                                                     String checkpointForNextBatch, SchemaProvider schemaProvider) {
-    hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty");
-    if (useRowWriter) {
-      Option<Dataset<Row>> rowDatasetOptional = inputBatch.getBatch();
-      if ((!rowDatasetOptional.isPresent()) || (rowDatasetOptional.get().isEmpty())) {
-        LOG.info("No new data, perform empty commit.");
-        return Pair.of(new InputBatch<>(Option.of(sparkSession.emptyDataFrame()), checkpointForNextBatch, schemaProvider), true);
-      }
-    } else {
-      Option<JavaRDD<GenericRecord>> avroRDDOptional = inputBatch.getBatch();
-      if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
-        LOG.info("No new data, perform empty commit.");
-        return Pair.of(new InputBatch(Option.of(hoodieSparkContext.emptyRDD()), checkpointForNextBatch, schemaProvider), true);
-      }
-    }
-    return Pair.of(inputBatch, false);
-  }
-
   /**
    * Apply schema reconcile and schema evolution rules(schema on read) and generate new target schema provider.
    *
@@ -801,24 +765,28 @@ private HoodieWriteConfig prepareHoodieConfigForRowWriter(Schema writerSchema) {
    *
    * @param instantTime instant time to use for ingest.
    * @param inputBatch input batch that contains the records, checkpoint, and schema provider
-   * @param inputIsEmpty true if input batch is empty.
    * @param metrics Metrics
    * @param overallTimerContext Timer Context
    * @return Option Compaction instant if one is scheduled
    */
-  private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch, boolean inputIsEmpty,
+  private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch,
                                                                               HoodieIngestionMetrics metrics, Timer.Context overallTimerContext) {
     Option<String> scheduledCompactionInstant = Option.empty();
     // write to hudi and fetch result
-    Pair<WriteClientWriteResult, Boolean> writeClientWriteResultIsEmptyPair = writeToSink(inputBatch, instantTime, inputIsEmpty);
-    JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResultIsEmptyPair.getKey().getWriteStatusRDD();
-    Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResultIsEmptyPair.getKey().getPartitionToReplacedFileIds();
-    boolean isEmpty = writeClientWriteResultIsEmptyPair.getRight();
+    WriteClientWriteResult writeClientWriteResult = writeToSink(inputBatch, instantTime);
+    JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResult.getWriteStatusRDD();
+    Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResult.getPartitionToReplacedFileIds();
     // process write status
     long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
     long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
+    long totalSuccessfulRecords = totalRecords - totalErrorRecords;
+    LOG.info(String.format("instantTime=%s, totalRecords=%d, totalErrorRecords=%d, totalSuccessfulRecords=%d",
+        instantTime, totalRecords, totalErrorRecords, totalSuccessfulRecords));
+    if (totalRecords == 0) {
+      LOG.info("No new data, perform empty commit.");
+    }
     boolean hasErrors = totalErrorRecords > 0;
     if (!hasErrors || cfg.commitOnErrors) {
       HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
@@ -863,8 +831,10 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Stri
         scheduledCompactionInstant = writeClient.scheduleCompaction(Option.empty());
       }
 
-      if (!isEmpty || cfg.forceEmptyMetaSync) {
+      if ((totalSuccessfulRecords > 0) || cfg.forceEmptyMetaSync) {
         runMetaSync();
+      } else {
+        LOG.info(String.format("Not running metaSync totalSuccessfulRecords=%d", totalSuccessfulRecords));
       }
     } else {
       LOG.info("Commit " + instantTime + " failed!");
@@ -924,22 +894,20 @@ private String startCommit(String instantTime, boolean retryEnabled) {
     throw lastException;
   }
 
-  private Pair<WriteClientWriteResult, Boolean> writeToSink(InputBatch inputBatch, String instantTime, boolean inputIsEmpty) {
+  private WriteClientWriteResult writeToSink(InputBatch inputBatch, String instantTime) {
     WriteClientWriteResult writeClientWriteResult = null;
     instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
-    boolean isEmpty = inputIsEmpty;
 
     if (useRowWriter) {
-      Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().get();
+      Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().orElse(hoodieSparkContext.emptyRDD());
      HoodieWriteConfig hoodieWriteConfig = prepareHoodieConfigForRowWriter(inputBatch.getSchemaProvider().getTargetSchema());
       BaseDatasetBulkInsertCommitActionExecutor executor = new HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, writeClient, instantTime);
       writeClientWriteResult = new WriteClientWriteResult(executor.execute(df, !HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses());
     } else {
-      JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) inputBatch.getBatch().get();
+      JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) inputBatch.getBatch().orElse(hoodieSparkContext.emptyRDD());
       // filter dupes if needed
       if (cfg.filterDupes) {
         records = DataSourceUtils.dropDuplicates(hoodieSparkContext.jsc(), records, writeClient.getConfig());
-        isEmpty = records.isEmpty();
       }
 
       HoodieWriteResult writeResult = null;
@@ -973,7 +941,7 @@ private Pair<WriteClientWriteResult, Boolean> writeToSink(InputBatch inputBatch,
           throw new HoodieStreamerException("Unknown operation : " + cfg.operation);
       }
     }
-    return Pair.of(writeClientWriteResult, isEmpty);
+    return writeClientWriteResult;
   }
 
   private String getSyncClassShortName(String syncClassName) {
@@ -1028,15 +996,15 @@ public void runMetaSync() {
    * SchemaProvider creation is a precursor to HoodieWriteClient and AsyncCompactor creation. This method takes care of
    * this constraint.
    */
-  private void setupWriteClient(JavaRDD<HoodieRecord> records) throws IOException {
+  private void setupWriteClient(Option<JavaRDD<HoodieRecord>> recordsOpt) throws IOException {
     if ((null != schemaProvider)) {
       Schema sourceSchema = schemaProvider.getSourceSchema();
       Schema targetSchema = schemaProvider.getTargetSchema();
-      reInitWriteClient(sourceSchema, targetSchema, records);
+      reInitWriteClient(sourceSchema, targetSchema, recordsOpt);
     }
   }
 
-  private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, JavaRDD<HoodieRecord> records) throws IOException {
+  private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, Option<JavaRDD<HoodieRecord>> recordsOpt) throws IOException {
     LOG.info("Setting up new Hoodie Write Client");
     if (HoodieStreamerUtils.isDropPartitionColumns(props)) {
       targetSchema = HoodieAvroUtils.removeFields(targetSchema, HoodieStreamerUtils.getPartitionColumns(props));
@@ -1044,7 +1012,7 @@ private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, JavaRDD
     registerAvroSchemas(sourceSchema, targetSchema);
     final HoodieWriteConfig initialWriteConfig = getHoodieClientConfig(targetSchema);
     final HoodieWriteConfig writeConfig = SparkSampleWritesUtils
-        .getWriteConfigWithRecordSizeEstimate(hoodieSparkContext.jsc(), records, initialWriteConfig)
+        .getWriteConfigWithRecordSizeEstimate(hoodieSparkContext.jsc(), recordsOpt, initialWriteConfig)
         .orElse(initialWriteConfig);
 
     if (writeConfig.isEmbeddedTimelineServerEnabled()) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java
index e1676219ca0a..2706a97e5d5c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java
@@ -80,7 +80,7 @@ public void skipOverwriteRecordSizeEstimateWhenTimelineNonEmpty() throws Excepti
         .withPath(basePath())
         .build();
     JavaRDD<HoodieRecord> records = jsc().parallelize(dataGen.generateInserts(commitTime, 1), 1);
-    Option<HoodieWriteConfig> writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), records, originalWriteConfig);
+    Option<HoodieWriteConfig> writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), Option.of(records), originalWriteConfig);
     assertFalse(writeConfigOpt.isPresent());
     assertEquals(originalRecordSize, originalWriteConfig.getCopyOnWriteRecordSizeEstimate(), "Original record size estimate should not be changed.");
   }
@@ -100,7 +100,7 @@ public void overwriteRecordSizeEstimateForEmptyTable() {
 
     String commitTime = HoodieTestDataGenerator.getCommitTimeAtUTC(1);
     JavaRDD<HoodieRecord> records = jsc().parallelize(dataGen.generateInserts(commitTime, 2000), 2);
-    Option<HoodieWriteConfig> writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), records, originalWriteConfig);
+    Option<HoodieWriteConfig> writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), Option.of(records), originalWriteConfig);
     assertTrue(writeConfigOpt.isPresent());
     assertEquals(779.0, writeConfigOpt.get().getCopyOnWriteRecordSizeEstimate(), 10.0);
   }
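To summarize the meta-sync gating change in StreamSync above as a standalone rule (illustrative sketch; the class and method are hypothetical, the condition mirrors the diff):

final class MetaSyncGateSketch {
  // Meta sync now runs only when the commit wrote at least one record successfully,
  // or when forceEmptyMetaSync is set; an empty commit otherwise skips it.
  static boolean shouldRunMetaSync(long totalRecords, long totalErrorRecords, boolean forceEmptyMetaSync) {
    long totalSuccessfulRecords = totalRecords - totalErrorRecords;
    return totalSuccessfulRecords > 0 || forceEmptyMetaSync;
  }
}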