SPARKC-94: Remove describe_splits_ex call.
The Thrift method describe_splits_ex was called to estimate the
amount of data per token range, in order to split token ranges
into sub-splits of appropriate size. Cassandra 2.1.5
introduces the system.size_estimates table, which lets us do the
same with CQL queries, without relying on deprecated Thrift calls.

The new size estimates allow split size to be controlled more
precisely than describe_splits_ex could. Cassandra exposes the
data size in bytes for each token range. Therefore the
configuration option spark.cassandra.input.split.size, which
previously controlled split size as a number of C* partitions, has
been renamed to spark.cassandra.input.split.size_in_mb and now
controls the size in megabytes.

Additionally, the spark.cassandra.input.page.row.size property has been
renamed to spark.cassandra.input.fetch.size_in_rows to make the units
explicit.
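
As a rough illustration (not the connector's exact partitioning code), the new option maps a table size estimate to a Spark partition count roughly like this:

```scala
// Illustrative sketch only: how many Spark partitions a table of a given
// estimated size yields under spark.cassandra.input.split.size_in_mb.
def estimatedSplitCount(dataSizeInBytes: Long, splitSizeInMB: Int): Int = {
  val splitSizeInBytes = splitSizeInMB.toLong * 1024 * 1024
  math.max(1, (dataSizeInBytes / splitSizeInBytes).toInt)
}

// A ~6.4 GB table with the default 64 MB split size gives roughly 100 partitions.
estimatedSplitCount(6871947674L, 64)  // ≈ 102
```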
pkolaczk committed Jun 2, 2015
1 parent 46747a7 commit 67b2d5a
Showing 15 changed files with 274 additions and 185 deletions.
10 changes: 5 additions & 5 deletions doc/2_loading.md
@@ -300,11 +300,11 @@ val emptyJoin = internalJoin.toEmptyCassandraRDD // Makes an EmptyRDD
The following options can be specified in the SparkConf object or as a jvm
-Doption to adjust the read parameters of a Cassandra table.

| Environment Variable | Controls | Default
|-----------------------------------------|------------------------------------------------------------|---------
| spark.cassandra.input.split.size | approx number of Cassandra partitions in a Spark partition | 100000
| spark.cassandra.input.page.row.size | number of CQL rows fetched per driver request | 1000
| spark.cassandra.input.consistency.level | consistency level to use when reading | LOCAL_ONE
| Environment Variable | Controls | Default
|-------------------------------------------|------------------------------------------------------------|---------
| spark.cassandra.input.split.size_in_mb | approx amount of data to be fetched into a Spark partition | 64 MB
| spark.cassandra.input.fetch.size_in_rows | number of CQL rows fetched per driver request | 1000
| spark.cassandra.input.consistency.level | consistency level to use when reading | LOCAL_ONE
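
A minimal sketch of supplying the renamed options through `SparkConf`; the values are arbitrary examples, not recommendations:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.cassandra.input.split.size_in_mb", "32")      // smaller Spark partitions
  .set("spark.cassandra.input.fetch.size_in_rows", "5000")  // bigger pages per driver request
  .set("spark.cassandra.input.consistency.level", "LOCAL_QUORUM")
```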

### Using Implicits for Configuration

5 changes: 3 additions & 2 deletions doc/FAQ.md
@@ -13,7 +13,8 @@ when running stages, the numbers represent (Completed Tasks + Running Tasks) / T

If you see that only a single task has been created this means that the Cassandra Token range has
not been split into enough tasks to be well parallelized on your cluster. The number of
Spark partitions(tasks) created is directly controlled by the setting `spark.cassandra.input.split.size`.
Spark partitions(tasks) created is directly controlled by the setting
`spark.cassandra.input.split.size_in_mb`.
This number reflects the approximate number of live Cassandra Partitions in a given Spark partition.
To increase the number of Spark Partitions decrease this number from the default (100k) to one that
will sufficiently break up your C* token range. This can also be adjusted on a per cassandraTable basis
@@ -77,7 +78,7 @@ the executor's heap can handle. Remember that all of the executors run in the sa
of the data is multiplied by the number of executor slots.

To fix this either increase the heap size of the executors `spark.executor.memory`
or shrink the size of the partitions by decreasing `spark.cassandra.input.split.size`
or shrink the size of the partitions by decreasing `spark.cassandra.input.split.size_in_mb`
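
If changing the global setting is too coarse, the split size can also be overridden for a single table read. A hedged sketch, assuming the connector's `withReadConf` method together with the `ReadConf` case class changed in this commit:

```scala
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.ReadConf

// Read just this table with smaller, 16 MB Spark partitions (illustrative value).
val rdd = sc.cassandraTable("ks", "large_table")
  .withReadConf(ReadConf(splitSizeInMB = 16))
```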

### Why can't my spark job find My Application Classes / Anonymous Functions?

@@ -16,17 +16,17 @@ public class SparkContextJavaFunctionsTest
@Test
public void testReadConfPopulating() {
SparkConf conf = new SparkConf();
conf.set("spark.cassandra.input.page.row.size", "1234");
conf.set("spark.cassandra.input.split.size", "4321");
conf.set("spark.cassandra.input.fetch.size_in_rows", "1234");
conf.set("spark.cassandra.input.split.size_in_mb", "4321");
conf.set("spark.cassandra.input.consistency.level", "THREE");

SparkContext sc = Mockito.mock(SparkContext.class);
when(sc.getConf()).thenReturn(conf);

ReadConf readConf = CassandraJavaUtil.javaFunctions(sc).cassandraTable("a", "b").rdd().readConf();

assertEquals(readConf.fetchSize(), 1234);
assertEquals(readConf.splitSize(), 4321);
assertEquals(readConf.fetchSizeInRows(), 1234);
assertEquals(readConf.splitSizeInMB(), 4321);
assertEquals(readConf.consistencyLevel(), ConsistencyLevel.THREE);
}
}
@@ -0,0 +1,60 @@
package com.datastax.spark.connector.rdd.partitioner

import org.scalatest.{Matchers, FlatSpec}

import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.embedded.EmbeddedCassandra
import com.datastax.spark.connector.rdd.partitioner.dht.LongToken
import com.datastax.spark.connector.testkit.SharedEmbeddedCassandra

class DataSizeEstimatesSpec extends FlatSpec with Matchers with SharedEmbeddedCassandra {

  useCassandraConfig(Seq("cassandra-default.yaml.template"))
  val conn = CassandraConnector(hosts = Set(EmbeddedCassandra.getHost(0)))

  val keyspaceName = "data_size_estimates"

  conn.withSessionDo { session =>
    session.execute(
      s"CREATE KEYSPACE IF NOT EXISTS $keyspaceName " +
        s"WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }")
  }

  // TODO: enable this test once we upgrade to 2.1.5, which populates the size estimates table
  "DataSizeEstimates" should "fetch data size estimates for a known table" ignore {
    val tableName = "table1"
    conn.withSessionDo { session =>
      session.execute(
        s"CREATE TABLE IF NOT EXISTS $keyspaceName.$tableName(key int PRIMARY KEY, value VARCHAR)")
      for (i <- 1 to 10000)
        session.execute(
          s"INSERT INTO $keyspaceName.$tableName(key, value) VALUES (?, ?)",
          i.asInstanceOf[AnyRef],
          "value" + i)
    }

    val estimates = new DataSizeEstimates[Long, LongToken](conn, keyspaceName, tableName)
    estimates.partitionCount should be > 5000L
    estimates.partitionCount should be < 20000L
    estimates.dataSizeInBytes should be > 0L
  }

  it should "return zeroes for an empty table" in {
    val tableName = "table2"
    conn.withSessionDo { session =>
      session.execute(
        s"CREATE TABLE IF NOT EXISTS $keyspaceName.$tableName(key int PRIMARY KEY, value VARCHAR)")
    }

    val estimates = new DataSizeEstimates[Long, LongToken](conn, keyspaceName, tableName)
    estimates.partitionCount shouldBe 0L
    estimates.dataSizeInBytes shouldBe 0L
  }

  it should "return zeroes for a non-existing table" in {
    val tableName = "table3"
    val estimates = new DataSizeEstimates[Long, LongToken](conn, keyspaceName, tableName)
    estimates.partitionCount shouldBe 0L
    estimates.dataSizeInBytes shouldBe 0L
  }
}
@@ -29,13 +29,13 @@ trait CassandraTableRowReaderProvider[R] {

protected def readConf: ReadConf

protected def fetchSize: Int = readConf.fetchSize
protected def fetchSize: Int = readConf.fetchSizeInRows

protected def splitSize: Int = readConf.splitSize
protected def splitSizeInMB: Int = readConf.splitSizeInMB

protected def consistencyLevel: ConsistencyLevel = readConf.consistencyLevel

/** RowReaderFactory and ClassTag should be provided from implicit parameters in the constructor
* of the class implementing this trait
* @see CassandraTableScanRDD */
protected val rowReaderFactory: RowReaderFactory[R]
@@ -38,9 +38,9 @@ import com.datastax.spark.connector.util.Quote._
* To reduce the number of roundtrips to Cassandra, every partition is fetched in batches.
*
* The following properties control the number of partitions and the fetch size:
* - spark.cassandra.input.split.size: approx number of Cassandra partitions in a Spark partition,
* default 100000
* - spark.cassandra.input.page.row.size: number of CQL rows fetched per roundtrip,
* - spark.cassandra.input.split.size_in_mb: approx amount of data to be fetched into a single Spark
* partition, default 64 MB
* - spark.cassandra.input.fetch.size_in_rows: number of CQL rows fetched per roundtrip,
* default 1000
*
* A `CassandraRDD` object gets serialized and sent to every Spark Executor, which then
@@ -117,7 +117,7 @@ class CassandraTableScanRDD[R] private[connector](
override def getPartitions: Array[Partition] = {
verify() // let's fail fast
val tf = TokenFactory.forCassandraPartitioner(cassandraPartitionerClassName)
val partitions = new CassandraRDDPartitioner(connector, tableDef, splitSize)(tf).partitions(where)
val partitions = new CassandraRDDPartitioner(connector, tableDef, splitSizeInMB)(tf).partitions(where)
logDebug(s"Created total ${partitions.length} partitions for $keyspaceName.$tableName.")
logTrace("Partitions: \n" + partitions.mkString("\n"))
partitions
@@ -6,35 +6,35 @@ import org.apache.spark.SparkConf

/** Read settings for RDD
*
* @param splitSize number of Cassandra partitions to be read in a single Spark task
* @param fetchSize number of CQL rows to fetch in a single round-trip to Cassandra
* @param splitSizeInMB approximate amount of data, in megabytes, to be read in a single Spark task
* @param fetchSizeInRows number of CQL rows to fetch in a single round-trip to Cassandra
* @param consistencyLevel consistency level for reads, default LOCAL_ONE;
* higher consistency level will disable data-locality
* @param taskMetricsEnabled whether or not to enable task metrics updates (requires Spark 1.2+) */
case class ReadConf(
splitSize: Int = ReadConf.DefaultSplitSize,
fetchSize: Int = ReadConf.DefaultFetchSize,
splitSizeInMB: Int = ReadConf.DefaultSplitSizeInMB,
fetchSizeInRows: Int = ReadConf.DefaultFetchSizeInRows,
consistencyLevel: ConsistencyLevel = ReadConf.DefaultConsistencyLevel,
taskMetricsEnabled: Boolean = ReadConf.DefaultReadTaskMetricsEnabled)


object ReadConf {

val ReadFetchSizeProperty = "spark.cassandra.input.page.row.size"
val ReadSplitSizeProperty = "spark.cassandra.input.split.size"
val ReadFetchSizeInRowsProperty = "spark.cassandra.input.fetch.size_in_rows"
val ReadSplitSizeInMBProperty = "spark.cassandra.input.split.size_in_mb"
val ReadConsistencyLevelProperty = "spark.cassandra.input.consistency.level"
val ReadTaskMetricsProperty = "spark.cassandra.input.metrics"

// Whitelist for allowed Read environment variables
val Properties = Set(
ReadFetchSizeProperty,
ReadSplitSizeProperty,
ReadFetchSizeInRowsProperty,
ReadSplitSizeInMBProperty,
ReadConsistencyLevelProperty,
ReadTaskMetricsProperty
)

val DefaultSplitSize = 100000
val DefaultFetchSize = 1000
val DefaultSplitSizeInMB = 64 // 64 MB
val DefaultFetchSizeInRows = 1000
val DefaultConsistencyLevel = ConsistencyLevel.LOCAL_ONE
val DefaultReadTaskMetricsEnabled = true

@@ -43,8 +43,8 @@ object ReadConf {
ConfigCheck.checkConfig(conf)

ReadConf(
fetchSize = conf.getInt(ReadFetchSizeProperty, DefaultFetchSize),
splitSize = conf.getInt(ReadSplitSizeProperty, DefaultSplitSize),
fetchSizeInRows = conf.getInt(ReadFetchSizeInRowsProperty, DefaultFetchSizeInRows),
splitSizeInMB = conf.getInt(ReadSplitSizeInMBProperty, DefaultSplitSizeInMB),
consistencyLevel = ConsistencyLevel.valueOf(
conf.get(ReadConsistencyLevelProperty, DefaultConsistencyLevel.name())),
taskMetricsEnabled = conf.getBoolean(ReadTaskMetricsProperty, DefaultReadTaskMetricsEnabled)
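
For reference, a short sketch of the two ways to obtain a `ReadConf` with the renamed fields; the `fromSparkConf` factory name is assumed here, since the diff above shows only the method body:

```scala
import org.apache.spark.SparkConf
import com.datastax.spark.connector.rdd.ReadConf

// Construct directly, overriding only the settings that differ from the defaults.
val explicit = ReadConf(splitSizeInMB = 128, fetchSizeInRows = 2000)

// Or derive from Spark properties (factory method name assumed here).
val conf = new SparkConf()
  .set("spark.cassandra.input.split.size_in_mb", "128")
  .set("spark.cassandra.input.fetch.size_in_rows", "2000")
val fromProps = ReadConf.fromSparkConf(conf)
```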
@@ -36,7 +36,7 @@ class CassandraRDDPartitioner[V, T <: Token[V]](
new TokenRange(startToken, endToken, replicas, None)
}

private def describeRing(client: Cassandra.Iface): Seq[TokenRange] = {
private def describeRing: Seq[TokenRange] = {
connector.withClusterDo { cluster =>
val metadata = cluster.getMetadata
for (tr <- metadata.getTokenRanges.toSeq) yield tokenRange(tr, metadata)
@@ -77,28 +77,16 @@ class CassandraRDDPartitioner[V, T <: Token[V]](
end - start + tokenFactory.totalTokenCount
}

/** Rows per token average is required for fast local range splitting.
* Used only for Murmur3Partitioner and RandomPartitioner. */
private def estimateCassandraPartitionsPerToken(tokenRanges: Seq[TokenRange]): Double = {
val random = new scala.util.Random(0)
val tokenRangeSample = random.shuffle(tokenRanges).take(CassandraRDDPartitioner.TokenRangeSampleSize)
val splitter = new ServerSideTokenRangeSplitter(connector, keyspaceName, tableName, tokenFactory)
val splits = splitsOf(tokenRangeSample, splitter)
val tokenCountSum = splits.map(tokenCount).sum
val rowCountSum = splits.map(_.rowCount.get).sum
rowCountSum.toDouble / tokenCountSum.toDouble
}

private def createSplitterFor(tokenRanges: Seq[TokenRange]): TokenRangeSplitter[V, T] = {
private def createTokenRangeSplitter: TokenRangeSplitter[V, T] = {
val dataSizeEstimates = new DataSizeEstimates(connector, keyspaceName, tableName)
val dataSize = dataSizeEstimates.dataSizeInBytes
tokenFactory.asInstanceOf[TokenFactory[_, _]] match {
case TokenFactory.RandomPartitionerTokenFactory =>
val partitionsPerToken = estimateCassandraPartitionsPerToken(tokenRanges)
new RandomPartitionerTokenRangeSplitter(partitionsPerToken).asInstanceOf[TokenRangeSplitter[V, T]]
new RandomPartitionerTokenRangeSplitter(dataSize).asInstanceOf[TokenRangeSplitter[V, T]]
case TokenFactory.Murmur3TokenFactory =>
val partitionsPerToken = estimateCassandraPartitionsPerToken(tokenRanges)
new Murmur3PartitionerTokenRangeSplitter(partitionsPerToken).asInstanceOf[TokenRangeSplitter[V, T]]
new Murmur3PartitionerTokenRangeSplitter(dataSize).asInstanceOf[TokenRangeSplitter[V, T]]
case _ =>
new ServerSideTokenRangeSplitter(connector, keyspaceName, tableName, tokenFactory)
throw new UnsupportedOperationException(s"Unsupported TokenFactory $tokenFactory")
}
}
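
The splitters now receive a total data size instead of a rows-per-token estimate. A rough sketch of the arithmetic a size-based splitter can apply to one token range (illustrative, not the connector's exact implementation):

```scala
// Illustrative only: number of sub-splits needed so that each sub-split
// holds roughly splitSizeInMB of data.
def subSplitCount(rangeRingFraction: Double,   // share of the token ring covered by this range
                  totalDataSizeInBytes: Long,  // e.g. DataSizeEstimates.dataSizeInBytes
                  splitSizeInMB: Int): Int = {
  val rangeSizeInBytes = rangeRingFraction * totalDataSizeInBytes
  math.max(1, math.round(rangeSizeInBytes / (splitSizeInMB * 1024.0 * 1024.0)).toInt)
}

subSplitCount(0.01, 6871947674L, 64)  // 1% of a ~6.4 GB table -> 1 sub-split
```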

@@ -130,9 +118,9 @@ class CassandraRDDPartitioner[V, T <: Token[V]](
def partitions(whereClause: CqlWhereClause): Array[Partition] = {
connector.withCassandraClientDo {
client =>
val tokenRanges = describeRing(client)
val tokenRanges = describeRing
val endpointCount = tokenRanges.map(_.replicas).reduce(_ ++ _).size
val splitter = createSplitterFor(tokenRanges)
val splitter = createTokenRangeSplitter
val splits = splitsOf(tokenRanges, splitter).toSeq
val maxGroupSize = tokenRanges.size / endpointCount
val clusterer = new TokenRangeClusterer[V, T](splitSize, maxGroupSize)
@@ -0,0 +1,86 @@
package com.datastax.spark.connector.rdd.partitioner

import scala.collection.JavaConversions._

import org.apache.spark.Logging

import com.datastax.driver.core.exceptions.InvalidQueryException
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.rdd.partitioner.dht.{TokenFactory, Token}


/** Estimates amount of data in the Cassandra table.
  * Takes token range size estimates from the `system.size_estimates` table,
  * available since Cassandra 2.1.5. */
class DataSizeEstimates[V, T <: Token[V]](
    conn: CassandraConnector,
    keyspaceName: String,
    tableName: String)(
  implicit
    tokenFactory: TokenFactory[V, T])
  extends Logging {

  /** Represents a single `system.size_estimates` table row */
  private case class TokenRangeSizeEstimate(
      rangeStart: T,
      rangeEnd: T,
      partitionsCount: Long,
      meanPartitionSize: Long) {

    def ringFraction: Double =
      tokenFactory.ringFraction(rangeStart, rangeEnd)

    def totalSizeInBytes: Long =
      partitionsCount * meanPartitionSize
  }

  private lazy val tokenRanges: Seq[TokenRangeSizeEstimate] =
    conn.withSessionDo { session =>
      try {
        val rs = session.execute(
          "SELECT range_start, range_end, partitions_count, mean_partition_size " +
            "FROM system.size_estimates " +
            "WHERE keyspace_name = ? AND table_name = ?", keyspaceName, tableName)

        for (row <- rs.all()) yield TokenRangeSizeEstimate(
          rangeStart = tokenFactory.tokenFromString(row.getString("range_start")),
          rangeEnd = tokenFactory.tokenFromString(row.getString("range_end")),
          partitionsCount = row.getLong("partitions_count"),
          meanPartitionSize = row.getLong("mean_partition_size")
        )

        // The table may not contain the estimates yet if the data was just inserted and the
        // amount of data in the table was small. This is a very common situation during tests,
        // when we insert a few rows and immediately query them. However, for tiny data sets the lack
        // of size estimates is not a problem at all, because we don't want to split tiny data anyways.
        // Therefore, we're not issuing a warning if the result set was empty.
      }
      catch {
        case e: InvalidQueryException =>
          logError(
            s"Failed to fetch size estimates for $keyspaceName.$tableName from system.size_estimates " +
              s"table. The number of created Spark partitions may be inaccurate. " +
              s"Please make sure you use Cassandra 2.1.5 or newer.", e)
          Seq.empty
      }
    }

  private lazy val ringFraction =
    tokenRanges.map(_.ringFraction).sum

  /** Estimates the total number of partitions in a ring */
  lazy val partitionCount: Long = {
    val partitionsCount = tokenRanges.map(_.partitionsCount).sum
    val normalizedCount = (partitionsCount / ringFraction).toLong
    logDebug(s"Estimated partition count of $keyspaceName.$tableName is $normalizedCount")
    normalizedCount
  }

  /** Estimates the total amount of data in a table assuming no replication. */
  lazy val dataSizeInBytes: Long = {
    val byteCount = tokenRanges.map(_.totalSizeInBytes).sum
    val normalizedCount = (byteCount / ringFraction).toLong
    logDebug(s"Estimated size of $keyspaceName.$tableName is $normalizedCount bytes")
    normalizedCount
  }
}
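
The division by `ringFraction` extrapolates from the token ranges that actually have estimates to the whole ring. A small worked example with made-up numbers:

```scala
// Hypothetical figures: size estimates cover only half of the token ring.
val coveredRingFraction = 0.5
val partitionsInCoveredRanges = 10000L
val bytesInCoveredRanges = 512L * 1024 * 1024  // 512 MB

val estimatedPartitionCount = (partitionsInCoveredRanges / coveredRingFraction).toLong // 20000
val estimatedDataSizeInBytes = (bytesInCoveredRanges / coveredRingFraction).toLong     // 1073741824 (1 GB)
```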