Spark: Refactor SparkReadConf to use primitive type (apache#7429)
Co-authored-by: Prashant Singh <psinghvk@amazon.com>
singhpk234 and Prashant Singh authored Apr 27, 2023
1 parent 8ca6885 commit faddf30
Showing 8 changed files with 24 additions and 24 deletions.
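
The eight files are four near-identical copies of a two-file change, one per Spark version tree vendored in the repository: SparkReadConf accessors move from boxed Long/Integer return types to primitive long/int, and the SparkMicroBatchStream fields they populate follow. Each of these accessors supplies a default, so a value is always present; returning a wrapper only adds an auto-unboxing step at every call site and a signature that looks nullable. A minimal, self-contained sketch of the difference (illustration only, not the commit's code; the option name and the default of 0L are assumed):

import java.util.Map;

public class ConfExample {
  // Pretend option map; empty means "option not set", so defaults apply.
  private static final Map<String, String> OPTIONS = Map.of();

  // Boxed variant (before): the signature admits null even though the
  // default makes null impossible, and numeric callers auto-unbox.
  static Long boxedStreamFromTimestamp() {
    String v = OPTIONS.get("stream-from-timestamp");
    return v == null ? Long.valueOf(0L) : Long.valueOf(v);
  }

  // Primitive variant (after): the default guarantees a value, so the
  // signature can say so and no boxing happens at all.
  static long streamFromTimestamp() {
    String v = OPTIONS.get("stream-from-timestamp");
    return v == null ? 0L : Long.parseLong(v);
  }

  public static void main(String[] args) {
    long before = boxedStreamFromTimestamp(); // implicit unboxing here
    long after = streamFromTimestamp();       // plain primitive
    System.out.println(before == after);      // true
  }
}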

SparkReadConf.java

@@ -201,7 +201,7 @@ public boolean handleTimestampWithoutZone() {
         .parse();
   }
 
-  public Long streamFromTimestamp() {
+  public long streamFromTimestamp() {
     return confParser
         .longConf()
         .option(SparkReadOptions.STREAM_FROM_TIMESTAMP)

SparkMicroBatchStream.java

@@ -69,14 +69,14 @@ public class SparkMicroBatchStream implements MicroBatchStream {
   private final boolean caseSensitive;
   private final String expectedSchema;
   private final Broadcast<Table> tableBroadcast;
-  private final Long splitSize;
-  private final Integer splitLookback;
-  private final Long splitOpenFileCost;
+  private final long splitSize;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
   private final boolean localityPreferred;
   private final StreamingOffset initialOffset;
   private final boolean skipDelete;
   private final boolean skipOverwrite;
-  private final Long fromTimestamp;
+  private final long fromTimestamp;
 
   SparkMicroBatchStream(
       JavaSparkContext sparkContext,
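
Narrowing the fields removes a latent unboxing hazard too: a boxed Long field that ever holds null throws NullPointerException at its first numeric use, while a primitive long can never be null. A tiny illustration of the failure mode (not from the commit):

public class UnboxingHazard {
  private static Long splitSize; // boxed field; initialized to null

  public static void main(String[] args) {
    // Auto-unboxing a null Long throws NullPointerException here.
    long size = splitSize;
    System.out.println(size);
  }
}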

SparkReadConf.java

@@ -201,7 +201,7 @@ public boolean handleTimestampWithoutZone() {
         .parse();
   }
 
-  public Long streamFromTimestamp() {
+  public long streamFromTimestamp() {
     return confParser
         .longConf()
         .option(SparkReadOptions.STREAM_FROM_TIMESTAMP)

SparkMicroBatchStream.java

@@ -70,14 +70,14 @@ public class SparkMicroBatchStream implements MicroBatchStream {
   private final boolean caseSensitive;
   private final String expectedSchema;
   private final Broadcast<Table> tableBroadcast;
-  private final Long splitSize;
-  private final Integer splitLookback;
-  private final Long splitOpenFileCost;
+  private final long splitSize;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
   private final boolean localityPreferred;
   private final StreamingOffset initialOffset;
   private final boolean skipDelete;
   private final boolean skipOverwrite;
-  private final Long fromTimestamp;
+  private final long fromTimestamp;
 
   SparkMicroBatchStream(
       JavaSparkContext sparkContext,

SparkReadConf.java

@@ -239,7 +239,7 @@ public boolean handleTimestampWithoutZone() {
         .parse();
   }
 
-  public Long streamFromTimestamp() {
+  public long streamFromTimestamp() {
     return confParser
         .longConf()
         .option(SparkReadOptions.STREAM_FROM_TIMESTAMP)
@@ -255,15 +255,15 @@ public Long endTimestamp() {
     return confParser.longConf().option(SparkReadOptions.END_TIMESTAMP).parseOptional();
   }
 
-  public Integer maxFilesPerMicroBatch() {
+  public int maxFilesPerMicroBatch() {
     return confParser
         .intConf()
         .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH)
         .defaultValue(Integer.MAX_VALUE)
         .parse();
   }
 
-  public Integer maxRecordsPerMicroBatch() {
+  public int maxRecordsPerMicroBatch() {
     return confParser
         .intConf()
         .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH)
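
This file also shows where the refactor draws the line: chains that end in .defaultValue(...).parse() can never yield null, so those accessors become int/long, while endTimestamp() keeps its boxed Long because parseOptional() returns null when the option is unset. A hypothetical builder contract modeled on that chain (signatures assumed for illustration, not Iceberg's actual SparkConfParser API):

interface IntConfBuilder {
  IntConfBuilder option(String name);
  IntConfBuilder defaultValue(int value);

  int parse();             // a default guarantees a result: primitive fits
  Integer parseOptional(); // absence is modeled as null: boxed type needed
}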

SparkMicroBatchStream.java

@@ -77,16 +77,16 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl {
   private final boolean caseSensitive;
   private final String expectedSchema;
   private final Broadcast<Table> tableBroadcast;
-  private final Long splitSize;
-  private final Integer splitLookback;
-  private final Long splitOpenFileCost;
+  private final long splitSize;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
   private final boolean localityPreferred;
   private final StreamingOffset initialOffset;
   private final boolean skipDelete;
   private final boolean skipOverwrite;
-  private final Long fromTimestamp;
-  private final Integer maxFilesPerMicroBatch;
-  private final Integer maxRecordsPerMicroBatch;
+  private final long fromTimestamp;
+  private final int maxFilesPerMicroBatch;
+  private final int maxRecordsPerMicroBatch;
 
   SparkMicroBatchStream(
       JavaSparkContext sparkContext,

SparkReadConf.java

@@ -239,7 +239,7 @@ public boolean handleTimestampWithoutZone() {
         .parse();
   }
 
-  public Long streamFromTimestamp() {
+  public long streamFromTimestamp() {
     return confParser
         .longConf()
         .option(SparkReadOptions.STREAM_FROM_TIMESTAMP)

SparkMicroBatchStream.java

@@ -72,14 +72,14 @@ public class SparkMicroBatchStream implements MicroBatchStream {
   private final boolean caseSensitive;
   private final String expectedSchema;
   private final Broadcast<Table> tableBroadcast;
-  private final Long splitSize;
-  private final Integer splitLookback;
-  private final Long splitOpenFileCost;
+  private final long splitSize;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
   private final boolean localityPreferred;
   private final StreamingOffset initialOffset;
   private final boolean skipDelete;
  private final boolean skipOverwrite;
-  private final Long fromTimestamp;
+  private final long fromTimestamp;
 
   SparkMicroBatchStream(
       JavaSparkContext sparkContext,
