
Commit de02de4

Trivial style improvements
1 parent e458a60 commit de02de4


3 files changed: +8 -6 lines changed


external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToRowConverter.scala

Lines changed: 4 additions & 2 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.kafka010
 
+import java.sql.Timestamp
+
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -37,13 +39,13 @@ private[kafka010] class KafkaRecordToRowConverter {
   val toInternalRowWithoutHeaders: Record => InternalRow =
     (cr: Record) => InternalRow(
       cr.key, cr.value, UTF8String.fromString(cr.topic), cr.partition, cr.offset,
-      DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)), cr.timestampType.id
+      DateTimeUtils.fromJavaTimestamp(new Timestamp(cr.timestamp)), cr.timestampType.id
     )
 
   val toInternalRowWithHeaders: Record => InternalRow =
     (cr: Record) => InternalRow(
       cr.key, cr.value, UTF8String.fromString(cr.topic), cr.partition, cr.offset,
-      DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)), cr.timestampType.id,
+      DateTimeUtils.fromJavaTimestamp(new Timestamp(cr.timestamp)), cr.timestampType.id,
       if (cr.headers.iterator().hasNext) {
         new GenericArrayData(cr.headers.iterator().asScala
           .map(header =>
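
Note: the only change here is import hygiene. `java.sql.Timestamp` is imported once at the top of the file so both converters can use the short name instead of the fully qualified one. A minimal, self-contained sketch (not part of the patch; the epoch value is made up) showing that the two spellings construct the same value:

// Sketch: the imported short name and the fully qualified name are the same class.
import java.sql.Timestamp

object TimestampImportSketch {
  def main(args: Array[String]): Unit = {
    val millis = 1546300800000L                    // an arbitrary epoch-millis value
    val short = new Timestamp(millis)              // after `import java.sql.Timestamp`
    val qualified = new java.sql.Timestamp(millis) // the pre-patch spelling
    assert(short == qualified)                     // identical results either way
  }
}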

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala

Lines changed: 1 addition & 1 deletion
@@ -109,7 +109,7 @@ private[kafka010] class KafkaRelation(
     }
     val rdd = new KafkaSourceRDD(
       sqlContext.sparkContext, executorKafkaParams, offsetRanges,
-      pollTimeoutMs, failOnDataLoss).map(toInternalRow(_))
+      pollTimeoutMs, failOnDataLoss).map(toInternalRow)
     sqlContext.internalCreateDataFrame(rdd.setName("kafka"), schema).rdd
   }
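
Note: this cleanup drops a redundant placeholder. `toInternalRow` is already a function value (`Record => InternalRow`), so it can be passed to `map` directly; `toInternalRow(_)` only wraps it in an extra anonymous function. A minimal sketch (the `double` function is illustrative, not from Spark):

// Sketch: passing a function value directly vs. wrapping it with `_`.
object PlaceholderSketch {
  def main(args: Array[String]): Unit = {
    val double: Int => Int = _ * 2
    val wrapped = List(1, 2, 3).map(double(_)) // extra anonymous function around double
    val direct = List(1, 2, 3).map(double)     // the function value passed as-is
    assert(wrapped == direct)                  // List(2, 4, 6) both ways
  }
}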

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -92,7 +92,7 @@ class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester
   test("new KafkaDataConsumer instance in case of Task retry") {
     try {
       val kafkaParams = getKafkaParams()
-      val key = new CacheKey(groupId, topicPartition)
+      val key = CacheKey(groupId, topicPartition)
 
       val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
       TaskContext.setTaskContext(context1)
@@ -168,9 +168,9 @@ class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester
       // value
       assert(expected._1 === actual._1)
       // headers
-      expected._2 zip actual._2 foreach { case (l, r) =>
+      expected._2.zip(actual._2).foreach { case (l, r) =>
         // header key
-        assert(l._1 == r._1)
+        assert(l._1 === r._1)
         // header value
         assert(l._2 === r._2)
       }
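
Note: three style points in the test. `CacheKey` is a case class, so its companion `apply` makes `new` redundant; the infix `zip ... foreach` chain becomes explicit dotted calls; and `==` becomes ScalaTest's `===`, matching the surrounding assertions. A minimal sketch with a stand-in `CacheKey` (plain Scala, without ScalaTest's `===`):

// Sketch: case-class apply and dotted call chains, with a stand-in CacheKey.
object TestStyleSketch {
  case class CacheKey(groupId: String, partition: Int)

  def main(args: Array[String]): Unit = {
    val viaNew = new CacheKey("group-1", 0) // explicit constructor call
    val viaApply = CacheKey("group-1", 0)   // companion apply, no `new`
    assert(viaNew == viaApply)              // case-class equality: same value

    val expected = Seq("k1" -> 1, "k2" -> 2)
    val actual = Seq("k1" -> 1, "k2" -> 2)
    // dotted form of the test's zip/foreach, asserting pairwise equality
    expected.zip(actual).foreach { case (l, r) => assert(l == r) }
  }
}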
