Commit 70917a5

Merge remote-tracking branch 'apache/master' into fix-runid
2 parents: 6bc61b0 + 1667057

31 files changed: +2208 -736 lines

R/pkg/R/mllib_utils.R

Lines changed: 0 additions & 1 deletion

@@ -130,4 +130,3 @@ read.ml <- function(path) {
     stop("Unsupported model: ", jobj)
   }
 }
-

dev/merge_spark_pr.py

Lines changed: 32 additions & 18 deletions

@@ -30,6 +30,7 @@
 import re
 import subprocess
 import sys
+import traceback
 import urllib2
 
 try:
@@ -298,24 +299,37 @@ def choose_jira_assignee(issue, asf_jira):
     Prompt the user to choose who to assign the issue to in jira, given a list of candidates,
     including the original reporter and all commentors
     """
-    reporter = issue.fields.reporter
-    commentors = map(lambda x: x.author, issue.fields.comment.comments)
-    candidates = set(commentors)
-    candidates.add(reporter)
-    candidates = list(candidates)
-    print("JIRA is unassigned, choose assignee")
-    for idx, author in enumerate(candidates):
-        annotations = ["Reporter"] if author == reporter else []
-        if author in commentors:
-            annotations.append("Commentor")
-        print("[%d] %s (%s)" % (idx, author.displayName, ",".join(annotations)))
-    assignee = raw_input("Enter number of user to assign to (blank to leave unassigned):")
-    if assignee == "":
-        return None
-    else:
-        assignee = candidates[int(assignee)]
-    asf_jira.assign_issue(issue.key, assignee.key)
-    return assignee
+    while True:
+        try:
+            reporter = issue.fields.reporter
+            commentors = map(lambda x: x.author, issue.fields.comment.comments)
+            candidates = set(commentors)
+            candidates.add(reporter)
+            candidates = list(candidates)
+            print("JIRA is unassigned, choose assignee")
+            for idx, author in enumerate(candidates):
+                if author.key == "apachespark":
+                    continue
+                annotations = ["Reporter"] if author == reporter else []
+                if author in commentors:
+                    annotations.append("Commentor")
+                print("[%d] %s (%s)" % (idx, author.displayName, ",".join(annotations)))
+            raw_assignee = raw_input(
+                "Enter number of user, or userid, to assign to (blank to leave unassigned):")
+            if raw_assignee == "":
+                return None
+            else:
+                try:
+                    id = int(raw_assignee)
+                    assignee = candidates[id]
+                except:
+                    # assume it's a user id, and try to assign (might fail, we just prompt again)
+                    assignee = asf_jira.user(raw_assignee)
+                asf_jira.assign_issue(issue.key, assignee.key)
+                return assignee
+        except:
+            traceback.print_exc()
+            print("Error assigning JIRA, try again (or leave blank and fix manually)")
 
 
 def resolve_jira_issues(title, merge_branches, comment):
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala

Lines changed: 260 additions & 0 deletions

@@ -0,0 +1,260 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.kafka010

import java.{util => ju}
import java.util.concurrent.TimeoutException

import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException}
import org.apache.kafka.common.TopicPartition

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

/**
 * A [[ContinuousReader]] for data from kafka.
 *
 * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
 *                     read by per-task consumers generated later.
 * @param kafkaParams String params for per-task Kafka consumers.
 * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceV2Options]] params which
 *                      are not Kafka consumer params.
 * @param metadataPath Path to a directory this reader can use for writing metadata.
 * @param initialOffsets The Kafka offsets to start reading data at.
 * @param failOnDataLoss Flag indicating whether reading should fail in data loss
 *                       scenarios, where some offsets after the specified initial ones can't be
 *                       properly read.
 */
class KafkaContinuousReader(
    offsetReader: KafkaOffsetReader,
    kafkaParams: ju.Map[String, Object],
    sourceOptions: Map[String, String],
    metadataPath: String,
    initialOffsets: KafkaOffsetRangeLimit,
    failOnDataLoss: Boolean)
  extends ContinuousReader with SupportsScanUnsafeRow with Logging {

  private lazy val session = SparkSession.getActiveSession.get
  private lazy val sc = session.sparkContext

  private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong

  // Initialized when creating read tasks. If this diverges from the partitions at the latest
  // offsets, we need to reconfigure.
  // Exposed outside this object only for unit tests.
  private[sql] var knownPartitions: Set[TopicPartition] = _

  override def readSchema: StructType = KafkaOffsetReader.kafkaSchema

  private var offset: Offset = _
  override def setOffset(start: ju.Optional[Offset]): Unit = {
    offset = start.orElse {
      val offsets = initialOffsets match {
        case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
        case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
        case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
      }
      logInfo(s"Initial offsets: $offsets")
      offsets
    }
  }

  override def getStartOffset(): Offset = offset

  override def deserializeOffset(json: String): Offset = {
    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
  }

  override def createUnsafeRowReadTasks(): ju.List[ReadTask[UnsafeRow]] = {
    import scala.collection.JavaConverters._

    val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset)

    val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
    val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
    val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)

    val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
    if (deletedPartitions.nonEmpty) {
      reportDataLoss(s"Some partitions were deleted: $deletedPartitions")
    }

    val startOffsets = newPartitionOffsets ++
      oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
    knownPartitions = startOffsets.keySet

    startOffsets.toSeq.map {
      case (topicPartition, start) =>
        KafkaContinuousReadTask(
          topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
          .asInstanceOf[ReadTask[UnsafeRow]]
    }.asJava
  }

  /** Stop this source and free any resources it has allocated. */
  def stop(): Unit = synchronized {
    offsetReader.close()
  }

  override def commit(end: Offset): Unit = {}

  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    val mergedMap = offsets.map {
      case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
    }.reduce(_ ++ _)
    KafkaSourceOffset(mergedMap)
  }

  override def needsReconfiguration(): Boolean = {
    knownPartitions != null && offsetReader.fetchLatestOffsets().keySet != knownPartitions
  }

  override def toString(): String = s"KafkaSource[$offsetReader]"

  /**
   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
   * Otherwise, just log a warning.
   */
  private def reportDataLoss(message: String): Unit = {
    if (failOnDataLoss) {
      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
    } else {
      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
    }
  }
}

/**
 * A read task for continuous Kafka processing. This will be serialized and transformed into a
 * full reader on executors.
 *
 * @param topicPartition The (topic, partition) pair this task is responsible for.
 * @param startOffset The offset to start reading from within the partition.
 * @param kafkaParams Kafka consumer params to use.
 * @param pollTimeoutMs The timeout for Kafka consumer polling.
 * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
 *                       are skipped.
 */
case class KafkaContinuousReadTask(
    topicPartition: TopicPartition,
    startOffset: Long,
    kafkaParams: ju.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean) extends ReadTask[UnsafeRow] {
  override def createDataReader(): KafkaContinuousDataReader = {
    new KafkaContinuousDataReader(
      topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
  }
}

/**
 * A per-task data reader for continuous Kafka processing.
 *
 * @param topicPartition The (topic, partition) pair this data reader is responsible for.
 * @param startOffset The offset to start reading from within the partition.
 * @param kafkaParams Kafka consumer params to use.
 * @param pollTimeoutMs The timeout for Kafka consumer polling.
 * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
 *                       are skipped.
 */
class KafkaContinuousDataReader(
    topicPartition: TopicPartition,
    startOffset: Long,
    kafkaParams: ju.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean) extends ContinuousDataReader[UnsafeRow] {
  private val topic = topicPartition.topic
  private val kafkaPartition = topicPartition.partition
  private val consumer = CachedKafkaConsumer.createUncached(topic, kafkaPartition, kafkaParams)

  private val sharedRow = new UnsafeRow(7)
  private val bufferHolder = new BufferHolder(sharedRow)
  private val rowWriter = new UnsafeRowWriter(bufferHolder, 7)

  private var nextKafkaOffset = startOffset
  private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _

  override def next(): Boolean = {
    var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
    while (r == null) {
      if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) return false
      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
      // interrupt points to end the query rather than waiting for new data that might never come.
      try {
        r = consumer.get(
          nextKafkaOffset,
          untilOffset = Long.MaxValue,
          pollTimeoutMs,
          failOnDataLoss)
      } catch {
        // We didn't read within the timeout. We're supposed to block indefinitely for new data, so
        // swallow and ignore this.
        case _: TimeoutException =>

        // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
        // or if it's the endpoint of the data range (i.e. the "true" next offset).
        case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
          val range = consumer.getAvailableOffsetRange()
          if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) {
            // retry
          } else {
            throw e
          }
      }
    }
    nextKafkaOffset = r.offset + 1
    currentRecord = r
    true
  }

  override def get(): UnsafeRow = {
    bufferHolder.reset()

    if (currentRecord.key == null) {
      rowWriter.setNullAt(0)
    } else {
      rowWriter.write(0, currentRecord.key)
    }
    rowWriter.write(1, currentRecord.value)
    rowWriter.write(2, UTF8String.fromString(currentRecord.topic))
    rowWriter.write(3, currentRecord.partition)
    rowWriter.write(4, currentRecord.offset)
    rowWriter.write(5,
      DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(currentRecord.timestamp)))
    rowWriter.write(6, currentRecord.timestampType.id)
    sharedRow.setTotalSize(bufferHolder.totalSize)
    sharedRow
  }

  override def getOffset(): KafkaSourcePartitionOffset = {
    KafkaSourcePartitionOffset(topicPartition, nextKafkaOffset)
  }

  override def close(): Unit = {
    consumer.close()
  }
}
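
Not part of this commit: a minimal usage sketch showing how a continuous query served by a reader like KafkaContinuousReader might be started from the DataFrame API. It assumes the "kafka" source name and the Trigger.Continuous trigger available in Spark 2.3; the broker address, topic, and checkpoint path below are placeholder values.

// Sketch only: starting a continuous-mode query against the Kafka source.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-continuous-sketch").getOrCreate()

val df = spark.readStream
  .format("kafka")                                  // resolves to the Kafka source in this module
  .option("kafka.bootstrap.servers", "host:9092")   // placeholder broker list
  .option("subscribe", "events")                    // placeholder topic
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// Trigger.Continuous requests continuous execution, the mode ContinuousReader backs;
// offsets are checkpointed roughly once per the given interval.
val query = df.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/kafka-continuous-checkpoint")  // placeholder path
  .trigger(Trigger.Continuous("1 second"))
  .start()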
