HasOffsetRanges and OffsetRange


HasOffsetRanges represents an object that holds a collection of OffsetRanges, each of which is a range of offsets from a single Kafka topic partition.

HasOffsetRanges is part of the org.apache.spark.streaming.kafka010 package.

KafkaRDD is a HasOffsetRanges object.

You can access HasOffsetRanges given a KafkaRDD as follows:

import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}
KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
  // The cast works because the RDDs of a direct stream are KafkaRDDs
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}
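
The snippet above leaves the createDirectStream arguments out. A fuller sketch could look like the following, assuming a broker at localhost:9092, the spark-logs topic used elsewhere on this page, and a made-up group.id; the object name and the describe helper are hypothetical and only serve the illustration. Note that the cast is done on the RDD handed to foreachRDD directly, before any transformation, since a transformed RDD is no longer a KafkaRDD.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object OffsetRangesDemo {
  // Hypothetical helper: renders one OffsetRange as a readable line
  def describe(o: OffsetRange): String =
    s"${o.topic}-${o.partition}: [${o.fromOffset}, ${o.untilOffset}) = ${o.count()} records"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("OffsetRangesDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Assumed connection settings; adjust to your environment
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "offset-ranges-demo",
      "auto.offset.reset" -> "earliest"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("spark-logs"), kafkaParams))

    stream.foreachRDD { rdd =>
      // Cast the untransformed RDD to reach the offsets of the current batch
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(o => println(describe(o)))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Each batch then prints one line per topic partition that contributed records to it.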


OffsetRange represents a range of offsets from a single Kafka TopicPartition (i.e. a topic name and partition number).

OffsetRange holds a topic, partition number, fromOffset (inclusive) and untilOffset (exclusive) offsets.
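
To make the inclusive/exclusive bounds concrete, here is a small sketch that can be pasted into spark-shell started with the spark-streaming-kafka-0-10 dependency. The topic name and offsets are illustrative values only.

```scala
import org.apache.spark.streaming.kafka010.OffsetRange

// fromOffset = 2 is included, untilOffset = 5 is not
val first = OffsetRange("spark-logs", 0, 2L, 5L)

// The offsets the range actually covers: 2, 3 and 4
val covered = (first.fromOffset until first.untilOffset).toList

// Since untilOffset is exclusive, the next range for the same partition
// can start exactly where the previous one ended, with no gap or overlap
val next = OffsetRange("spark-logs", 0, first.untilOffset, 8L)
```

The half-open convention is what lets consecutive batches tile a partition: one range's untilOffset is the next range's fromOffset.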

You can create instances of OffsetRange using the factory methods of the OffsetRange companion object. You can then count the number of records in the range using the count method.

// Start spark-shell with spark-streaming-kafka-0-10_2.11 dependency
// --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0-SNAPSHOT
import org.apache.spark.streaming.kafka010.OffsetRange

scala> val offsets = OffsetRange(topic = "spark-logs", partition = 0, fromOffset = 2, untilOffset = 5)
offsets: org.apache.spark.streaming.kafka010.OffsetRange = OffsetRange(topic: 'spark-logs', partition: 0, range: [2 -> 5])

scala> offsets.count
res0: Long = 3

scala> offsets.topicPartition
res1: org.apache.kafka.common.TopicPartition = spark-logs-0

OffsetRange is part of the org.apache.spark.streaming.kafka010 package.

Creating OffsetRange Instance

You can create instances of OffsetRange using the following factory methods (from the OffsetRange companion object):

def create(
  topic: String,
  partition: Int,
  fromOffset: Long,
  untilOffset: Long): OffsetRange

def create(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long): OffsetRange

def apply(
  topic: String,
  partition: Int,
  fromOffset: Long,
  untilOffset: Long): OffsetRange

def apply(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long): OffsetRange
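
As a quick illustration of the overloads, the sketch below builds the same range three ways in spark-shell (with the spark-streaming-kafka-0-10 dependency on the classpath, which also brings in Kafka's TopicPartition). The topic name and offsets are illustrative values.

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

// apply with a topic name and partition number
val viaApply = OffsetRange("spark-logs", 0, 2L, 5L)

// create with the same arguments, spelled out as a regular method call
val viaCreate = OffsetRange.create("spark-logs", 0, 2L, 5L)

// apply with a TopicPartition instead of separate topic and partition
val tp = new TopicPartition("spark-logs", 0)
val viaTopicPartition = OffsetRange(tp, 2L, 5L)

// All three describe the same topic, partition and offsets
val allEqual = viaApply == viaCreate && viaCreate == viaTopicPartition
```

The variants differ only in how the topic partition is passed, so the choice is mostly a matter of what you already have at hand.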

Counting Records in Topic Partition — count method

count(): Long

count gives the number of records in an OffsetRange, i.e. the difference between untilOffset and fromOffset.
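
Since count is per range, the size of a whole batch is the sum over all its ranges. A minimal sketch for spark-shell, using hand-built ranges with illustrative offsets in place of the array you would get from HasOffsetRanges:

```scala
import org.apache.spark.streaming.kafka010.OffsetRange

// Two ranges, one per partition of the same topic
val ranges = Array(
  OffsetRange("spark-logs", 0, 2L, 5L),   // 5 - 2 = 3 records
  OffsetRange("spark-logs", 1, 10L, 14L)  // 14 - 10 = 4 records
)

// Total number of records across all ranges
val total: Long = ranges.map(_.count()).sum
```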