Spark 2.0 support #221

Open · wants to merge 12 commits into base: develop

6 changes: 3 additions & 3 deletions .travis.yml
@@ -14,12 +14,12 @@ cache:
- "$HOME/.sbt"

scala:
- 2.10.6
# - 2.10.6
- 2.11.7

before_script:
- "./travis/start-cluster.sh"
- curl -q -sSL http://d3kbcqa49mib13.cloudfront.net/spark-1.6.2-bin-hadoop2.6.tgz | tar -zxf - -C /opt
- curl -q -sSL http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz | tar -zxf - -C /opt

script:
- "./travis/build.sh"
@@ -32,7 +32,7 @@ env:
- RIAK_FLAVOR=riak-ts
- RIAK_FLAVOR=riak-kv
global:
- SPARK_HOME=/opt/spark-1.6.2-bin-hadoop2.6
- SPARK_HOME=/opt/spark-2.1.0-bin-hadoop2.7
- secure: r9cf5Jdfxsr0MngcKwyvmlKvA0NQF2GnKCDekbsmnQONnW5WUHJjzD5k1LbqiQHdZqqxb72RnOpIEXfJhasteseofsGbZMU5ROAFiohl8WhxHnFb65pMWrXzKg/iqdKhcKZg5akP2fCIDwMz4xVFz0JrG3CtHotjQQ0/6HQ7uDCG2On7h9QB6pt50IarFGRBKd4wBTCkhR+6QKfE4S13mI8gGvq/R7ly7tfqY7i/vFg3z0KN0Vb/+KLwYv/b2aBTBZUli0L/tfBZUQbt9J5Ty9k6lnw+2vlFV9nhKRXMoQjuFi4VDskO1n59eu9pJZ9bhIYngdpnUlrrTpLzRbnXfqSXDufShA4PFOa0EQX/9meFdd1O54y2dnR2K1Gd80Id+NtC66eMa5FND03vGtqNrjjOW1yHkR9FxUhfCMpPWGWXbyBruk/M2Qou+pWp9F7BEvYOeVAkvA2mZ3AGriuBi1iSbJhpDH0plZsQAk2pnXn4CEbXVtpX759KPiA3YyHaMcSpC4QSdXaDqoYGyqazgZL1/MHCKuG8crCn4xe4/4ZbBNLldILtGC6KINas/YhgBctwkY7Sq7VDfxHaMCrSFBDkB0nHRAfG8H3ap5XfaLQkGArPsh0QsjVS92wcbnulIXK3Huf8HLPQTcy+aNyvgTE4O0lMuoHZUA9Tja84kok=
- secure: n3+vykdc/JiFBGXgUCt3O1sfWGexlWStERslgeTCxQJXQOv7UbpH/c7tH9wnsndA6Hd1vWLk6MUXu0cH/BrI97LejoCQ+uGm1CHSlOFJ6hSEuC2wxOKGOjbH9HzFQd5EaNIa5XhFisRRjK+anPXicV1zwsda5NFpspVwGcVs+ai7pnh6ysvPOiXKU19uaFPPvv3z9oO1qo7+99fJAR4ty21VGoJmH+hgK5DJ4QLOWdaKgsdclA49Ek9Fwd1Ni+lZW98imnVhHQdtpUqScfYHZcXQTRs6E/daN34Z6cwZ0v8UrX+Y3yatRhUTBy4zE8Nq+OVi02u7kaIAw7rtb5echFfUTW4nMgBSvGnnV3t0MY66JZoEpBzxWaWvhqMdmwbmPpDPdBG/NtPvshtFE6Dle6eBzzGJdu3lpK/QRgeJrdHkDpKt5aX2lAmnGJo0EwQQ6G5U1dQ7ScIZTJndSObyGB7IJtbsc3HFLA4YU5SAdMjQxHREvg+XzXuaL/mMbyJ3ZWA5M5acH43N5UkVKMpqfggdICb6rKdnRKQZRsVtv+oiBGawD1ue5gnsTziwD2xneJ9MSOJ4PQOPfoUHs8q39n8gvsxHpnkPwxgsF7Ed0vXzN7kt5OF/nR1rJc97ZSjSn6oNMYqWfdVRjZHhEVo6EcgdV0xN++3LTVgR4SR+NSs=
- secure: eigM8++nPdZRaIq4AM6VFef/X2R+SlCdvh+HICdCz2A+eBiulDNSMH5AzFwriCbUBljffDaWeqHcdmq8R4st9XcVZEk/mtIGs6j24H76ypjb0C6pH9tvze6yzFoCmc2cXa3x+NUcAwgXnM5rzmJYVk8P4ZwwxGsQ/IF7Gr4WgcFaBpg2DVYgKML4DsS+xHEOnN3WOGHg14qTDd+lu4kv99JCRTVYDtqmw3fZO658qQxzat0c2+pRwvRlGFfp2U7U0zDaAhU39CXMGi7nzBEAaWjw5ObVtCa7MQZd6Fh7l8KPV4goA4ngnKBpnJPTvsSrnm3VoP+coT9F6nU+IQCnJW+7N6R8szY1wGteWqrw8I+XA+ts1lo4A2+fnkykzO8TqhS2K5xCCp4EOu4wGRWfHKRQnvdnXKO85FtOIjxdrdsaaA+EG6v3YHvs8rg3VxAsjo1x/cHNFchac5/AUrz3QfJoKWbjybwPMl/sFm01JQMvQLGjPARSLFuUJG9OQQMCmHwxzUGuQXG11Ls4zZWHiJxUub1dhDKV94mc7jJfcyIQd9jyodcgDAB5/Sin7Q9r4Gye/z3R0i4iwx0SNKYNxG15XLFeAk0YxW+h90U3K3t4SjVYW54rtbn156CnImhVIRmRCAJ8IlJJ55+9ObTtTFsNEKS0OaXlSfdGxZOinXQ=
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,4 +1,7 @@
# Change Log
## [2.0](https://github.com/basho/spark-riak-connector/releases/tag/v2.0.0)

* Spark 2.0.2 support; RiakCatalog and RiakSQLContext were removed due to changes in the Spark 2.0 API

## [1.6.2](https://github.com/basho/spark-riak-connector/releases/tag/v1.6.2)
Critical fix for Python KV: an exception was thrown when object values were JSON objects with list fields, empty or not (https://github.com/basho/spark-riak-connector/pull/219).
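
The CHANGELOG entry above notes that RiakCatalog and RiakSQLContext are gone in the Spark 2.x line. Below is a minimal pyspark migration sketch, assuming a Riak node at 127.0.0.1:8087, a hypothetical TS table name, and that the connector's DataFrame source is still registered as `org.apache.spark.sql.riak`:

```python
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Pre-2.0 code obtained DataFrames through the now-removed RiakSQLContext;
# with Spark 2.x the SparkSession replaces it.
conf = SparkConf() \
    .setAppName("riak-spark2-migration") \
    .set("spark.riak.connection.host", "127.0.0.1:8087")

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Read a Riak TS table as a DataFrame through the connector's data source.
df = spark.read \
    .format("org.apache.spark.sql.riak") \
    .load("ts_weather_demo")  # hypothetical table name

df.show(5)
spark.stop()
```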
4 changes: 2 additions & 2 deletions Dockerfile
@@ -6,8 +6,8 @@ MAINTAINER Alexey Suprun <asuprun@contractor.basho.com>
# These options can be changed when starting the container by using --build-arg with the following syntax:
# --build-arg ARGUMENT_NAME=value
ARG SBT_VERSION=0.13.12
ARG SPARK_VERSION=1.6.1
ARG SPARK_HADOOP_VERSION=hadoop2.6
ARG SPARK_VERSION=2.1.0
ARG SPARK_HADOOP_VERSION=hadoop2.7

# Set env vars
ENV SBT_HOME /usr/local/sbt
9 changes: 6 additions & 3 deletions README.md
@@ -14,10 +14,13 @@ The Spark-Riak connector enables you to connect Spark applications to Riak KV an
* Construct a Spark RDD using Riak KV bucket's enhanced 2i query (a.k.a. full bucket read)
* Perform parallel full bucket reads from a Riak KV bucket into multiple partitions

## Compatibility
## Version Compatibility

| Connector | Spark | Riak TS | Riak KV |
|------------|-------|---------|---------|
| 2.X | 2.X | 1.5 | 2.2.0 |
| 1.6.X | 1.6.X | 1.4 | 2.2.0 |

* Riak TS 1.3.1+
* Apache Spark 1.6+
* Scala 2.10 and 2.11
* Java 8

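The feature list in this README hunk mentions parallel full bucket reads from Riak KV. A sketch of how that might look under Spark 2.x with the Python bindings, assuming a node at 127.0.0.1:8087 and a hypothetical bucket; `riak_context`, `saveToRiak`, and `riakBucket(...).queryAll()` follow the project's Python API as documented elsewhere, so treat the exact signatures as assumptions:

```python
import pyspark_riak
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf() \
    .setAppName("riak-full-bucket-read") \
    .set("spark.riak.connection.host", "127.0.0.1:8087")

spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

# Patch the SparkContext with the Riak KV helpers (riakBucket, saveToRiak, ...).
pyspark_riak.riak_context(sc)

# Write a few key/value pairs, then read the whole bucket back in parallel.
source = sc.parallelize([{"key-%d" % i: {"value": i}} for i in range(3)])
source.saveToRiak("test-bucket", "default")  # hypothetical bucket and type

rows = sc.riakBucket("test-bucket", "default").queryAll().collect()
print(rows)
spark.stop()
```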
19 changes: 13 additions & 6 deletions build.sbt
@@ -83,7 +83,7 @@ lazy val sparkRiakConnector = (project in file("connector"))
}.filter(_.contains("test-utils")).mkString(":")
val uberJar = buildDir.relativize((assemblyOutputPath in assembly).value.toPath)

if(!scalaBinaryVersion.value.equals("2.11")) {
if(scalaBinaryVersion.value.equals("2.11")) {
val rtnCode = s"connector/python/test.sh $uberJar:$cp $pyTestMark" ! streams.value.log
//val rtnCode = s"docker build -t $namespace ." #&& s"docker run --rm -i -e RIAK_HOSTS=$riakHosts -e SPARK_CLASSPATH=$uberJar:$cp -v ${buildDir.toString}:/usr/src/$namespace -v ${home.toString}/.ivy2:/root/.ivy2 -v /var/run/docker.sock:/var/run/docker.sock -v /usr/bin/docker:/bin/docker -w /usr/src/$namespace $namespace ./connector/python/test.sh" ! streams.value.log
if (rtnCode != 0) {
@@ -101,7 +101,8 @@ lazy val examples = (project in file("examples"))
.settings(
name := s"$namespace-examples",
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-streaming-kafka" % Versions.spark,
"org.apache.spark" %% "spark-streaming-kafka" % Versions.sparkStreamingKafka
exclude("org.scalatest", s"scalatest_${scalaBinaryVersion.value}"),
"org.apache.kafka" %% "kafka" % Versions.kafka))
.settings(publishSettings)
.dependsOn(sparkRiakConnector, sparkRiakConnectorTestUtils)
@@ -122,7 +123,7 @@ lazy val sparkRiakConnectorTestUtils = (project in file("test-utils"))
lazy val commonSettings = Seq(
organization := "com.basho.riak",
version := "1.6.3-SNAPSHOT",
scalaVersion := "2.10.6",
scalaVersion := "2.11.8",
crossPaths := true,
spName := s"basho/$namespace",
sparkVersion := Versions.spark,
@@ -142,8 +143,10 @@ lazy val commonDependencies = Seq(
"com.basho.riak" % "riak-client" % Versions.riakClient exclude("io.netty", "netty-all")
exclude("org.slf4j", "slf4j-api")
exclude("com.fasterxml.jackson.datatype", "jackson-datatype-joda"),
"org.apache.spark" %% "spark-sql" % Versions.spark % "provided",
"org.apache.spark" %% "spark-streaming" % Versions.spark % "provided",
"org.apache.spark" %% "spark-sql" % Versions.spark % "provided"
exclude("org.scalatest", s"scalatest_${scalaBinaryVersion.value}"),
"org.apache.spark" %% "spark-streaming" % Versions.spark % "provided"
exclude("org.scalatest", s"scalatest_${scalaBinaryVersion.value}"),
"com.google.guava" % "guava" % Versions.guava,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % Versions.jacksonModule exclude("com.google.guava", "guava")
exclude("com.google.code.findbugs", "jsr305")
@@ -155,7 +158,11 @@ lazy val commonDependencies = Seq(
"org.powermock" % "powermock-module-junit4" % Versions.powermokc % "test",
"org.powermock" % "powermock-api-mockito" % Versions.powermokc % "test",
"com.novocode" % "junit-interface" % Versions.junitInterface % "test",
"com.basho.riak.test" % "riak-test-docker" % Versions.riakTestDocker % "test"
"com.basho.riak.test" % "riak-test-docker" % Versions.riakTestDocker % "test",
"com.spotify" % "docker-client" % "5.0.2" % "test"
exclude("com.fasterxml.jackson.core", "jackson-databind")
exclude("com.fasterxml.jackson.core", "jackson-annotations")
exclude("com.fasterxml.jackson.core", "jackson-core")
),

// Connector will use same version of Jackson that Spark uses. No need to incorporate it into uber jar.
28 changes: 21 additions & 7 deletions connector/python/tests/pyspark_tests_fixtures.py
@@ -2,21 +2,39 @@
import pytest
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf, SQLContext, Row
from pyspark import SparkContext, SparkConf, Row
from pyspark.sql import SparkSession
import riak, pyspark_riak

@pytest.fixture(scope="session")
def docker_cli(request):
# Start spark context to get access to py4j gateway
conf = SparkConf().setMaster("local[*]").setAppName("pytest-pyspark-py4j")
sc = SparkContext(conf=conf)
sparkSession = SparkSession.builder.config(conf=conf).getOrCreate()
sc = sparkSession.sparkContext
docker_cli = sc._gateway.jvm.com.basho.riak.test.cluster.DockerRiakCluster(1, 2)
docker_cli.start()
sc.stop()
# Stop this Spark context: it is not aware of the Riak nodes and thus can't be used to test Riak
request.addfinalizer(lambda: docker_cli.stop())
return docker_cli

@pytest.fixture(scope="session")
def spark_session(request):
if not os.environ.has_key('RIAK_HOSTS'):
docker_cli = request.getfuncargvalue('docker_cli')
host_and_port = get_host_and_port(docker_cli)
os.environ['RIAK_HOSTS'] = host_and_port
os.environ['USE_DOCKER'] = 'true'
# Start a new Spark session
conf = SparkConf().setMaster('local[*]').setAppName('pytest-pyspark-local-testing')
conf.set('spark.riak.connection.host', os.environ['RIAK_HOSTS'])
conf.set('spark.driver.memory', '4g')
conf.set('spark.executor.memory', '4g')
spark_session = SparkSession.builder.config(conf=conf).getOrCreate()
return spark_session


@pytest.fixture(scope="session")
def spark_context(request):
# If RIAK_HOSTS is not set, use Docker to start a Riak node
@@ -30,16 +48,12 @@ def spark_context(request):
conf.set('spark.riak.connection.host', os.environ['RIAK_HOSTS'])
conf.set('spark.driver.memory', '4g')
conf.set('spark.executor.memory', '4g')
spark_context = SparkContext(conf=conf)
spark_context = SparkSession.builder.config(conf=conf).getOrCreate().sparkContext
spark_context.setLogLevel('INFO')
pyspark_riak.riak_context(spark_context)
request.addfinalizer(lambda: spark_context.stop())
return spark_context

@pytest.fixture(scope="session")
def sql_context(request, spark_context):
sqlContext = SQLContext(spark_context)
return sqlContext

@pytest.fixture(scope="session")
def riak_client(request):
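With the SQLContext-based fixture removed, DataFrame tests can request the new session-scoped spark_session fixture directly. A minimal sketch of such a test, assuming these fixtures are exposed via conftest.py; it only exercises the fixture wiring, not Riak itself:

```python
from pyspark.sql import Row

def test_spark_session_dataframe(spark_session):
    # The session-scoped fixture above returns a ready-to-use SparkSession.
    df = spark_session.createDataFrame([Row(k=1, v="a"), Row(k=2, v="b")])

    # DataFrame operations now go through SparkSession instead of SQLContext.
    assert df.count() == 2
    assert {r.k for r in df.collect()} == {1, 2}
```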