From d8bbc3cc72b4b48a561e33e6fd037074088ff139 Mon Sep 17 00:00:00 2001 From: anubhav100 Date: Tue, 21 Feb 2017 12:15:14 +0530 Subject: [PATCH] resolved the issue for bad records not written in csv file when bad_records_action=redirect refactor the code for closing the streams --- .../newflow/steps/DataConverterProcessorStepImpl.java | 2 +- .../steps/DataConverterProcessorWithBucketingStepImpl.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java index ebc659e310d..b900a149ce4 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java @@ -85,7 +85,6 @@ protected Iterator getIterator(final Iterator ch } return childIter.hasNext(); } - @Override public CarbonRowBatch next() { return processRowBatch(childIter.next(), localConverter); } @@ -163,6 +162,7 @@ private String getBadLogStoreLocation(String storeLocation) { @Override public void close() { if (!closed) { + createBadRecordLogger().closeStreams(); super.close(); if (converters != null) { for (RowConverter converter : converters) { diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java index af66ad7b284..db8cf420fb5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java @@ -190,6 +190,7 @@ private String getBadLogStoreLocation(String storeLocation) { public void close() { if (!closed) { super.close(); + createBadRecordLogger().closeStreams(); if (converters != null) { for (RowConverter converter : converters) { converter.finish();