Commit 37f5f9d

Documentation and test were adjusted to describe/cover Spark Datasets API usage.
1 parent 41a2748 commit 37f5f9d

3 files changed: +84 −2 lines

connector/src/test/scala/com/basho/riak/spark/rdd/timeseries/AbstractTimeSeriesTest.scala

Lines changed: 3 additions & 2 deletions

@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2015 Basho Technologies, Inc.
+ * Copyright (c) 2015-2017 Basho Technologies, Inc.
  *
  * This file is provided to you under the Apache License,
  * Version 2.0 (the "License"); you may not use this file
@@ -101,7 +101,8 @@ abstract class AbstractTimeSeriesTest(val createTestData: Boolean = true) extend
       new Cell(f.temperature_k))
   )
 
-  final val sqlWhereClause = s"WHERE time >= $queryFromMillis AND time <= $queryToMillis AND surrogate_key = 1 AND family = 'f'"
+  final val filterExpression = s"time >= $queryFromMillis AND time <= $queryToMillis AND surrogate_key = 1 AND family = 'f'"
+  final val sqlWhereClause = s"WHERE $filterExpression"
 
   final val sqlQuery = s"SELECT surrogate_key, family, time, user_id, temperature_k FROM $bucketName $sqlWhereClause"
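Splitting `filterExpression` out of `sqlWhereClause` lets the same predicate string drive both the SQL query above and the `filter()` call in the new Dataset test below, since `Dataset.filter` also accepts a SQL-style condition expression. A minimal standalone sketch of that reuse (the session setup and sample rows here are illustrative, not part of the test):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical local session, for illustration only
val spark = SparkSession.builder()
  .appName("filter-expression-demo")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Same shape as the predicate built in AbstractTimeSeriesTest
val filterExpression = "time >= 111111 AND time <= 111555 AND user_id = 'bryce'"

// Illustrative in-memory data standing in for a Riak TS table
val df: DataFrame = Seq(
  (111111L, "bryce", 305.37),
  (999999L, "ratman", 362.121)
).toDF("time", "user_id", "temperature_k")

// filter(conditionExpr: String) parses the predicate like a SQL WHERE clause
df.filter(filterExpression).show()
```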
connector/src/test/scala/com/basho/riak/spark/rdd/timeseries/SparkDataSetTest.scala

Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
+/**
+ * Copyright (c) 2015-2017 Basho Technologies, Inc.
+ *
+ * This file is provided to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.basho.riak.spark.rdd.timeseries
+
+import com.basho.riak.spark.rdd.RiakTSTests
+import org.junit.Test
+import org.junit.experimental.categories.Category
+
+/**
+ * @author Sergey Galkin <srggal at gmail dot com>
+ */
+@Category(Array(classOf[RiakTSTests]))
+class SparkDataSetTest extends AbstractTimeSeriesTest {
+
+  @Test
+  def genericLoadAsDataSet(): Unit = {
+    import sparkSession.implicits._
+
+    val ds = sparkSession.read
+      .format("org.apache.spark.sql.riak")
+      .option("spark.riakts.bindings.timestamp", "useLong")
+      .load(bucketName)
+      .filter(filterExpression)
+      .as[TimeSeriesData]
+
+    val data: Array[TimeSeriesData] = ds.collect()
+
+    // -- verification
+    assertEqualsUsingJSONIgnoreOrder(
+      """
+        |[
+        |  {time:111111, user_id:'bryce', temperature_k:305.37},
+        |  {time:111222, user_id:'bryce', temperature_k:300.12},
+        |  {time:111333, user_id:'bryce', temperature_k:295.95},
+        |  {time:111444, user_id:'ratman', temperature_k:362.121},
+        |  {time:111555, user_id:'ratman', temperature_k:3502.212}
+        |]
+      """.stripMargin, data)
+  }
+}
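For reference, the `as[TimeSeriesData]` conversion above depends on a case class that is not part of this file's diff; it matches the definition shown in the documentation change below, and `import sparkSession.implicits._` derives the product encoder for it:

```scala
// Field names and types must line up with the columns returned by the query;
// this mirrors the definition in docs/using-connector.md below.
case class TimeSeriesData(time: Long, user_id: String, temperature_k: Double)
```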

docs/using-connector.md

Lines changed: 26 additions & 0 deletions

@@ -12,6 +12,7 @@ Scroll down or click below for the desired information:
 - [Writing Data To TS Table](./using-connector.md#writing-data-to-ts-table)
 - [Spark Dataframes With KV Bucket](./using-connector.md#spark-dataframes-with-kv-bucket)
 - [Spark Dataframes With TS Table](./using-connector.md#spark-dataframes-with-ts-table)
+- [Spark Datasets With TS Table](./using-connector.md#spark-datasets-with-ts-table)
 - [Partitioning for KV Buckets](./using-connector.md#partitioning-for-kv-buckets)
 - [Working With TS Dates](./using-connector.md#working-with-ts-dates)
 - [Partitioning for Riak TS Table Queries](./using-connector.md#partitioning-for-riak-ts-table-queries)
@@ -419,6 +420,31 @@ inputDF.write \
 So far SaveMode.Append is the only mode available.
 Any of the Spark Connector options can be provided in `.option()` or `.options()`.
 
+## Spark Datasets With TS Table
+Spark Datasets (strongly typed DataFrames) are created in much the same way as DataFrames; there are only two differences:
+
+* Datasets require an Encoder. Built-in encoders for common Scala types and their product types are available in the `implicits` object; you only need to import them:
+```scala
+import spark.implicits._
+```
+
+* The target data type should be provided by calling the `as()` method.
+
+Here is an example of creating a Dataset:
+```scala
+import spark.implicits._
+
+case class TimeSeriesData(time: Long, user_id: String, temperature_k: Double)
+
+val ds = sparkSession.read
+  .format("org.apache.spark.sql.riak")
+  .option("spark.riakts.bindings.timestamp", "useLong")
+  .load(bucketName)
+  .filter(filterExpression)
+  .as[TimeSeriesData]
+```
+
+NOTE: There is no Dataset support for Python, since Spark does not support Datasets in Python.