Added support for IsNotNull & IsNull filters #222

Merged
merged 1 commit on Jan 17, 2017
README.md: 9 changes (6 additions, 3 deletions)
@@ -14,10 +14,13 @@ The Spark-Riak connector enables you to connect Spark applications to Riak KV an
* Construct a Spark RDD using Riak KV bucket's enhanced 2i query (a.k.a. full bucket read)
* Perform parallel full bucket reads from a Riak KV bucket into multiple partitions

## Compatibility
## Version Compatibility

| Connector | Spark | Riak TS | Riak KV |
|------------|-------|---------|---------|
| 2.X | 2.X | 1.5 | 2.2.0 |
| 1.6.X | 1.6.X | 1.4 | 2.2.0 |

* Riak TS 1.3.1+
* Apache Spark 1.6+
* Scala 2.10 and 2.11
* Java 8
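
For orientation, here is a minimal sketch of how the new IS NULL / IS NOT NULL pushdown surfaces from the DataFrame API. The `org.apache.spark.sql.riak` format name, the connection host setting, and the `ts_weather` table with its columns are assumptions for illustration, not part of this change:

```scala
import org.apache.spark.sql.{SparkSession, functions => F}

object IsNullPushdownExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("riak-ts-null-filter-example")
      .config("spark.riak.connection.host", "127.0.0.1:8087") // placeholder host
      .getOrCreate()

    // Read a hypothetical Riak TS table through the connector's SQL source.
    val df = spark.read
      .format("org.apache.spark.sql.riak") // assumed format name
      .load("ts_weather")                  // hypothetical table

    // Spark turns these column predicates into IsNotNull / IsNull source filters,
    // which the connector can now push down as IS NOT NULL / IS NULL.
    df.filter(F.col("temperature").isNotNull)
      .filter(F.col("humidity").isNull)
      .show()

    spark.stop()
  }
}
```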

@@ -85,7 +85,7 @@ trait RiakTSPartitioner {
}
recursiveInterpolateFirst(sql, values.iterator)
}

//scalastyle:off
protected def toSql(columnNames: Option[Seq[String]], tableName: String, schema: Option[StructType], whereConstraints: (String, Seq[Any])): (String, Seq[Any]) = {
var values: Seq[Any] = Seq.empty[Nothing]
val sql = "SELECT " +
@@ -112,26 +112,32 @@ trait RiakTSPartitioner {
}

/** Construct Sql clause */
protected def filterToSqlAndValue(filter: Any): (String, Any) = {
protected def filterToSqlAndValue(filter: Any): (String, Option[Any]) = {
val (attribute, sqlOperator, value) = filter match {
case EqualTo(a, v) => (a, "=", v)
case LessThan(a, v) => (a, "<", v)
case LessThanOrEqual(a, v) => (a, "<=", v)
case GreaterThan(a, v) => (a, ">", v)
case GreaterThanOrEqual(a, v) => (a, ">=", v)
case IsNotNull(a) => (a, "IS NOT NULL", None)
case IsNull(a) => (a, "IS NULL", None)
case _ =>
throw new UnsupportedOperationException(
s"It's not a valid filter $filter to be pushed down, only >, <, >=, <= and = are allowed.")
s"It's not a valid filter $filter to be pushed down, only is not null, is null, >, <, >=, <= and = are allowed.")
}

// TODO: need to add pattern matching for values, to be sure that they are used correctly
(s"$attribute $sqlOperator ?", value)
}

value match {
case None => (s"$attribute $sqlOperator", None)
case _ => (s"$attribute $sqlOperator ?", Some(value))
}
}
//scalastyle:on
protected def whereClause(filters: Array[Filter]): (String, Seq[Any]) = {
val sqlValue = filters.map(filterToSqlAndValue)
val sql = sqlValue.map(_._1).mkString(" AND ")
val args = sqlValue.map(_._2)
val args = sqlValue.flatMap(_._2) // Changed to use flatMap to remove None arguments
(sql, args.seq)
}
}
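
The effect of returning `Option[Any]` is easiest to see in isolation: null-check filters contribute SQL text but no bind value, and `flatMap` keeps the remaining values aligned with the `?` markers. The sketch below mirrors the changed logic in a standalone object (simplified names, not the connector's actual class):

```scala
import org.apache.spark.sql.sources._

object FilterSqlSketch {
  // Simplified mirror of filterToSqlAndValue: IS [NOT] NULL carries no bind value.
  def filterToSqlAndValue(filter: Filter): (String, Option[Any]) = filter match {
    case EqualTo(a, v)  => (s"$a = ?", Some(v))
    case LessThan(a, v) => (s"$a < ?", Some(v))
    case IsNotNull(a)   => (s"$a IS NOT NULL", None)
    case IsNull(a)      => (s"$a IS NULL", None)
    case other          => throw new UnsupportedOperationException(s"Unsupported filter $other")
  }

  // Simplified mirror of whereClause: flatMap drops the None placeholders.
  def whereClause(filters: Array[Filter]): (String, Seq[Any]) = {
    val sqlValue = filters.map(filterToSqlAndValue)
    (sqlValue.map(_._1).mkString(" AND "), sqlValue.flatMap(_._2).toSeq)
  }

  def main(args: Array[String]): Unit = {
    val (sql, values) = whereClause(Array(EqualTo("field1", "value1"), IsNotNull("field6")))
    println(sql)                   // field1 = ? AND field6 IS NOT NULL
    println(values.mkString(", ")) // value1 -- only one bind value for the single '?'
  }
}
```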
@@ -0,0 +1,136 @@
package com.basho.riak.spark.rdd

import com.basho.riak.spark.rdd.connector.RiakConnector
import com.basho.riak.spark.rdd.partitioner.RiakTSCoveragePlanBasedPartitioner
import org.apache.spark.sql.sources._
import org.junit.{Rule, Test}
import org.junit.runner.RunWith
import org.junit.Assert._
import org.junit.rules.ExpectedException
import org.mockito.Mock
import org.mockito.runners.MockitoJUnitRunner

@RunWith(classOf[MockitoJUnitRunner])
class Filters2sqlConversionTest {

@Mock
private val rc: RiakConnector = null

val _expectedException: ExpectedException = ExpectedException.none()

@Rule
def expectedException: ExpectedException = _expectedException

private val bucketName = "test"

private val equalTo = EqualTo("field1", "value1")
private val lessThan = LessThan("field2", 2)
private val lessThanOrEqual = LessThanOrEqual("field3", 3)
private val greaterThan = GreaterThan("field4", 4)
private val greaterThanOrEqual = GreaterThanOrEqual("field5", 5)
private val isNotNull = IsNotNull("field6")
private val isNull = IsNull("field7")

// Unsupported filters
private val equalNullSafe = EqualNullSafe("field", "value")
private val in = In("field", Array("val0","val1"))
private val and = And(equalTo, lessThan)
private val or = Or(equalTo,lessThan)
private val not = Not(equalTo)
private val stringStartsWith = StringStartsWith("field","value")
private val stringEndsWith = StringEndsWith("field","value")
private val stringContains = StringContains("field","value")

private def verifyFilters(expectedSql: String, filters: Filter*) = {
val partitioner = new RiakTSCoveragePlanBasedPartitioner(rc, bucketName, None, None, filters.toArray, new ReadConf())
assertEquals(expectedSql, partitioner.query)
}

private def verifyUnsupportedFilter(filter: Filter, expectedFilter: String) = {
expectedException.expect(classOf[UnsupportedOperationException])
expectedException.expectMessage(s"It's not a valid filter $expectedFilter " +
s"to be pushed down, only is not null, is null, >, <, >=, <= and = are allowed")
new RiakTSCoveragePlanBasedPartitioner(rc, bucketName, None, None, Array(filter), new ReadConf())
}

@Test
def testEqualToConversion(): Unit = {
verifyFilters("SELECT * FROM test WHERE field1 = 'value1'", equalTo)
}

@Test
def testLessThanConversion(): Unit = {
verifyFilters("SELECT * FROM test WHERE field2 < 2", lessThan)
}

@Test
def testLessThanOrEqualConversion(): Unit = {
verifyFilters("SELECT * FROM test WHERE field3 <= 3", lessThanOrEqual)
}

@Test
def testGreaterThanConversion(): Unit = {
verifyFilters("SELECT * FROM test WHERE field4 > 4", greaterThan)
}

@Test
def testGreaterThanOrEqualConversion(): Unit = {
verifyFilters("SELECT * FROM test WHERE field5 >= 5", greaterThanOrEqual)
}

@Test
def testIsNotNullConversion(): Unit = {
verifyFilters("SELECT * FROM test WHERE field6 IS NOT NULL", isNotNull)
}

@Test
def testIsNullConversion(): Unit = {
verifyFilters("SELECT * FROM test WHERE field7 IS NULL", isNull)
}

@Test
def testMultipleFiltersConversion(): Unit = {
verifyFilters("SELECT * FROM test WHERE field1 = 'value1' AND field2 < 2 AND field6 IS NOT NULL",
equalTo, lessThan, isNotNull)
}

@Test
def testUnsupportedFiltersEqualNullSafeConversion(): Unit = {
verifyUnsupportedFilter(equalNullSafe, "EqualNullSafe(field,value)")
}

@Test
def testUnsupportedFiltersInConversion(): Unit = {
verifyUnsupportedFilter(in, "In(field, [val0,val1]")
}

@Test
def testUnsupportedFiltersAndConversion(): Unit = {
verifyUnsupportedFilter(and, "And(EqualTo(field1,value1),LessThan(field2,2))")
}

@Test
def testUnsupportedFiltersOrConversion(): Unit = {
verifyUnsupportedFilter(or, "Or(EqualTo(field1,value1),LessThan(field2,2))")
}

@Test
def testUnsupportedFiltersNotConversion(): Unit = {
verifyUnsupportedFilter(not, "Not(EqualTo(field1,value1))")
}

@Test
def testUnsupportedFiltersStringStartsWithConversion(): Unit = {
verifyUnsupportedFilter(stringStartsWith, "StringStartsWith(field,value)")
}

@Test
def testUnsupportedFiltersStringEndsWithConversion(): Unit = {
verifyUnsupportedFilter(stringEndsWith, "StringEndsWith(field,value)")
}

@Test
def testUnsupportedFiltersStringContainsConversion(): Unit = {
verifyUnsupportedFilter(stringContains, "StringContains(field,value)")
}
}
@@ -187,7 +187,6 @@ class TimeSeriesWriteTest extends AbstractTimeSeriesTest(false) {
}

@Test
@Ignore //FIXME: doesn't work even with null value in Row
def dataFrameWriteWithEmptyCells(): Unit = {
val sqlContext = new SQLContext(sc)

@@ -197,7 +196,7 @@ class TimeSeriesWriteTest extends AbstractTimeSeriesTest(false) {
val udfGetMillis = udf(getMillis)

val tsRows = Seq[org.apache.spark.sql.Row] (
org.apache.spark.sql.Row(2L, "f", 111111L, "test", None),
org.apache.spark.sql.Row(2L, "f", 111111L, "test", null),
org.apache.spark.sql.Row(2L, "f", 111222L, "test", 123.123),
org.apache.spark.sql.Row(2L, "f", 111333L, "test", 345.34)
)
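
A note on the `None` to `null` change above: Spark SQL marks a missing cell with `null`, while Scala's `None` is just an ordinary non-null object inside the `Row`. A minimal standalone sketch of the distinction (illustration only, not connector code):

```scala
import org.apache.spark.sql.Row

object NullVsNoneSketch {
  def main(args: Array[String]): Unit = {
    val withNull = Row(2L, "f", 111111L, "test", null)
    val withNone = Row(2L, "f", 111111L, "test", None)

    // null is recognized as a missing value; None is passed through as data.
    println(withNull.isNullAt(4)) // true  -> can be written as an empty TS cell
    println(withNone.isNullAt(4)) // false -> None would be sent as an opaque value
  }
}
```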