
Commit 900e280

Rename to sparkgeo; initial version of spark_read_geojson

1 parent 791a204

14 files changed: +73 -59 lines

DESCRIPTION (+6 -6)

@@ -1,6 +1,6 @@
-Package: sparknyc
+Package: sparkgeo
 Type: Package
-Title: sparknyc: NYC neighborhood matching
+Title: Geospatial Analytics for Spark
 Version: 0.1.0
 Authors@R: c(person(family = "Mirai Solutions GmbH", role = "aut",
     email = "info@mirai-solutions.com"),
@@ -10,13 +10,13 @@ Authors@R: c(person(family = "Mirai Solutions GmbH", role = "aut",
     email = "nicola.lambiase@mirai-solutions.com"),
     person("Ömer", "Demirel", role = c("ctb"),
     email = "omer.demirel@mirai-solutions.com"))
-Description: The 'sparknyc' package is a 'sparklyr' extension package providing NYC geospatial
-    neighborhood matching.
+Description: The 'sparkgeo' package is a 'sparklyr' extension package providing geospatial
+    analytics capabilities to Spark and R.
 Depends: R (>= 3.3.2)
 Imports: sparklyr
 Suggests: dplyr
-License: GPL-3
-SystemRequirements: Java (>= 1.8)
+License: GPL-3 | file LICENSE
+SystemRequirements: Java (>= 1.8), Spark 2.x
 Encoding: UTF-8
 LazyData: true
 RoxygenNote: 6.0.1

NAMESPACE (+3 -1)

@@ -1,6 +1,8 @@
 # Generated by roxygen2: do not edit by hand

-export(sparknyc_register)
+export(spark_read_geojson)
+export(sparkgeo_register_udfs)
 importFrom(sparklyr,invoke_static)
 importFrom(sparklyr,register_extension)
+importFrom(sparklyr,spark_read_source)
 importFrom(sparklyr,spark_session)

R/dependencies.R (+2 -2)

@@ -2,8 +2,8 @@ spark_dependencies <- function(spark_version, scala_version, ...) {
   sparklyr::spark_dependency(
     jars = c(
       system.file(
-        sprintf("java/sparknyc-%s-%s.jar", spark_version, scala_version),
-        package = "sparknyc"
+        sprintf("java/sparkgeo-%s-%s.jar", spark_version, scala_version),
+        package = "sparkgeo"
       )
     ),
     packages = sprintf("harsha2010:magellan:1.0.5-s_%s", scala_version)
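
For orientation: spark_dependencies is the sparklyr hook through which the extension declares its jar and the Magellan Spark package, so both get attached when a connection is opened. A minimal connection sketch (the master and version values below are illustrative, not part of this commit):

    library(sparklyr)
    library(sparkgeo)  # loading the package registers the sparklyr extension

    # sparklyr resolves spark_dependencies() for the connection's Spark/Scala
    # versions, attaching sparkgeo-2.2-2.11.jar and harsha2010:magellan:1.0.5
    sc <- spark_connect(master = "local", version = "2.2")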

R/main.R (-6)

This file was deleted.

R/spark_read_geojson.R (+40)

@@ -0,0 +1,40 @@
+#' @title Reading GeoJSON files
+#' @description Imports data from GeoJSON files into Spark DataFrames.
+#' @param sc \code{\link[sparklyr]{spark_connection}} provided by sparklyr.
+#' @param name The name to assign to the newly generated table (see also
+#'   \code{\link[sparklyr]{spark_read_source}}).
+#' @param path The path to the GeoJSON file. This may be a local path or
+#'   an HDFS path.
+#' @param magellanIndex \code{logical} specifying whether geometries should
+#'   be indexed when loading the data (see
+#'   \url{https://github.com/harsha2010/magellan#creating-indexes-while-loading-data}).
+#'   Indexing creates an additional column called "index" which holds the list of
+#'   ZOrder curves of the given precision (see argument \code{magellanIndexPrecision}).
+#'   Defaults to \code{TRUE}.
+#' @param magellanIndexPrecision \code{integer} specifying the precision to use for creating
+#'   the ZOrder curves.
+#' @param ... Additional arguments passed to \code{\link[sparklyr]{spark_read_source}}.
+#' @return A \code{tbl_spark} which provides a \code{dplyr}-compatible reference to a
+#'   Spark DataFrame.
+#' @references
+#' \url{https://github.com/harsha2010/magellan}
+#' \url{http://geojson.org/}
+#' @family Spark serialization routines
+#' @seealso \code{\link[sparklyr]{spark_read_source}}
+#' @keywords file, connection
+#' @importFrom sparklyr spark_read_source
+#' @export
+spark_read_geojson <- function(sc, name, path, magellanIndex = TRUE, magellanIndexPrecision = 30L, ...) {
+  spark_read_source(
+    sc = sc,
+    name = name,
+    source = "magellan",
+    options = list(
+      "type" = "geojson",
+      "magellan.index" = ifelse(magellanIndex, "true", "false"),
+      "magellan.index.precision" = as.character(magellanIndexPrecision),
+      "path" = path
+    ),
+    ...
+  )
+}
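
With a connection open (sc from the sketch above), reading a GeoJSON file reduces to a single call; the table name and file path here are hypothetical:

    # Load polygons with Z-order indexing at the default precision of 30
    neighborhoods <- spark_read_geojson(
      sc,
      name = "neighborhoods",
      path = "hdfs:///data/neighborhoods.geojson"
    )

Because the return value is a tbl_spark, the usual dplyr verbs apply to it directly.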

R/sparkgeo_register_udfs.R (+10)

@@ -0,0 +1,10 @@
+#' @title Register Spark User-Defined Functions (UDFs)
+#' @description Registers UDFs with Spark
+#' @param sc \code{\link[sparklyr]{spark_connection}} provided by sparklyr.
+#' @importFrom sparklyr invoke_static
+#' @importFrom sparklyr spark_session
+#' @export
sparkgeo_register_udfs <- function(sc) {
+  sparklyr::invoke_static(sc, "com.miraisolutions.spark.geo.UDF", "register", spark_session(sc))
+  invisible()
+}
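
Registering the UDFs is then one call on the open connection; it wires up the Scala-side functions defined in java/main.scala below:

    # Make point(), within() and metadata_string() available to Spark SQL
    sparkgeo_register_udfs(sc)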

inst/java/sparkgeo-2.0-2.11.jar (added, 12.1 KB; binary file not shown)

inst/java/sparkgeo-2.1-2.11.jar (added, 12.1 KB; binary file not shown)

inst/java/sparkgeo-2.2-2.11.jar (added, 12.1 KB; binary file not shown)

inst/java/sparknyc-2.0-2.11.jar (deleted, 7.73 KB; binary file not shown)

inst/java/sparknyc-2.1-2.11.jar (deleted, 7.73 KB; binary file not shown)

inst/java/sparknyc-2.2-2.11.jar (deleted, 7.73 KB; binary file not shown)

java/main.scala (+12 -32)

@@ -1,37 +1,17 @@
-package com.miraisolutions.spark.nyc
+package com.miraisolutions.spark.geo

-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.magellan.dsl.expressions._
-import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+import magellan.{Point, Polygon}
+import scala.collection.immutable.Map

-object Main {
+object UDF {
+  def register(spark: SparkSession): Unit = {
+    // See https://github.com/harsha2010/magellan#spatial-joins
+    magellan.Utils.injectRules(spark)

-  private def read_neighborhoods(spark: SparkSession, path: String): DataFrame = {
-    import spark.implicits._
-
-    spark.read
-      .format("magellan")
-      .option("type", "geojson")
-      .option("magellan.index", "true")
-      .load(path)
-      .select($"polygon", $"metadata"("neighborhood").as("name"))
-      .cache()
-  }
-
-  private var neighborhoods: DataFrame = _
-
-  private val match_neighborhood = (latitude: Double, longitude: Double) => {
-    neighborhoods
-      .filter(point(lit(longitude), lit(latitude)) within col("polygon"))
-      .select("name")
-      .collect()
-      .map(_.getString(0))
-      .headOption
-      .getOrElse(null)
-  }
-
-  def register_nyc(spark: SparkSession, neighborhoodFile: String): Unit = {
-    neighborhoods = read_neighborhoods(spark, neighborhoodFile)
-    spark.udf.register("neighborhood", match_neighborhood)
+    spark.udf.register("point", (latitude: Double, longitude: Double) => Point(longitude, latitude))
+    spark.udf.register("within", (point: Point, polygon: Polygon) => point.within(polygon))
+    spark.udf.register("metadata_string", (metadata: Map[String, Any], name: String) =>
+      metadata(name).asInstanceOf[String])
   }
 }
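
Once registered, point, within and metadata_string can be called from Spark SQL, for example via sparklyr's DBI interface. A hypothetical query against the neighborhoods table loaded earlier (the polygon and metadata columns come from Magellan's GeoJSON source; the 'neighborhood' metadata key is illustrative):

    library(DBI)

    # Which polygon contains the given point?
    # Note: point() takes (latitude, longitude) and swaps them into
    # Magellan's (x = longitude, y = latitude) convention internally.
    dbGetQuery(sc, "
      SELECT metadata_string(metadata, 'neighborhood') AS name
      FROM neighborhoods
      WHERE within(point(40.7484, -73.9857), polygon)
    ")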

man/hello.Rd (-12)

This file was deleted.
