
Commit 8c2cb27

Tile distribution

1 parent 1e8244f commit 8c2cb27

File tree: 7 files changed, +198 −18 lines

README.md
Lines changed: 40 additions & 2 deletions

@@ -4,6 +4,23 @@
 
 This repository contains different analyses of the data distribution in the OSM dataset.
 
+## Spark locally
+If you don't have access to a Spark cluster, you can run it locally.
+A laptop with 16 GB of memory and 8 cores should be enough.
+In my case, I'm using a desktop with 16 cores and 32 GB of RAM. Full specs are at the very bottom.
+
+To start Spark in local mode, after downloading and uncompressing it:
+```shell script
+sbin/start-all.sh
+```
+
+To access the UI: [http://localhost:8080/](http://localhost:8080/)
+
+To stop Spark in local mode:
+```shell script
+sbin/stop-all.sh
+```
+
 
 ## Extract blocks
 To be able to parallelize, let's extract all blocks. The full universe will take around 4 minutes:

@@ -36,14 +53,35 @@ It will take around 30 minutes.
     -o file:///home/angelcc/Downloads/osm/planet/distribution/nodeId/100
 ```
 
+## Tile distribution
+Below, an example of how to generate the distribution report for 10000x10000 tiles, locally,
+using 5 cores and 4 GB per core. It will take around 30 minutes.
+
+```shell script
+/home/angelcc/apps/spark-2.4.5-bin-hadoop2.7/bin/spark-submit \
+    --class com.simplexportal.simplexspatial.analysis.Driver \
+    --master "spark://angelcc-B450-AORUS-ELITE:7077" \
+    --deploy-mode cluster \
+    --executor-memory 4G \
+    --total-executor-cores 5 \
+    --num-executors 1 \
+    target/scala-2.11/simplexspatial-data-distribution-analysis-assembly-0.1.jar \
+    tile \
+    --latPartitions 10000 \
+    --lonPartitions 10000 \
+    -i file:///home/angelcc/Downloads/osm/planet/blobs \
+    -o file:///home/angelcc/Downloads/osm/planet/distribution/tile/10000x10000
+```
 
 ## Zeppelin
-To start the notebook, from {root_project}/zeppelin:
+To start the notebook, from a temporary folder:
 ```shell script
-docker run -p 8080:8080 --rm \
+mkdir logs notebook
+docker run -p 8081:8080 --rm \
   -v $PWD/logs:/logs \
   -v $PWD/notebook:/notebook \
   -v /home/angelcc/Downloads/osm/planet/distribution/nodeId/100:/zeppelin/data/nodeId \
+  -v /home/angelcc/Downloads/osm/planet/distribution/tile/10000x10000:/zeppelin/data/tile \
   -e ZEPPELIN_LOG_DIR='/logs' \
   -e ZEPPELIN_NOTEBOOK_DIR='/notebook' \
   --name zeppelin \
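The tile report is written as ORC under the `-o` path, which is the same directory the Zeppelin container mounts at `/zeppelin/data/tile`. Before wiring up the notebook, the output can be sanity-checked from a `spark-shell`. A minimal sketch, assuming the paths used above:

```scala
// spark-shell sketch: peek at the tile distribution report written by the job above.
val dist = spark.read.orc(
  "file:///home/angelcc/Downloads/osm/planet/distribution/tile/10000x10000"
)
dist.printSchema()                  // latPart, lonPart, maxId, minId, ids
dist.orderBy($"ids".desc).show(20)  // the 20 densest tiles
```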

src/main/scala/com/simplexportal/simplexspatial/analysis/AppConfig.scala
Lines changed: 9 additions & 7 deletions

@@ -17,16 +17,15 @@
 
 package com.simplexportal.simplexspatial.analysis
 
-import com.acervera.osm4scala.model.OSMEntity
 import com.simplexportal.simplexspatial.analysis.AppConfig.{Command, NoneCmd}
 
 case class AppConfig(
     cmd: Command = NoneCmd,
     input: String = "",
     output: String = "",
-    modPartitions: Long = 0,
-    latPartitions: Long = 0,
-    lonPartitions: Long = 0
+    modPartitions: Int = 0,
+    latPartitions: Int = 0,
+    lonPartitions: Int = 0
 )
 
 object AppConfig {

@@ -65,28 +64,31 @@ object AppConfig {
     cmd(EXTRACT.id)
       .action((_, cfg) => cfg.copy(cmd = EXTRACT))
       .text("Extract blobs from osm pbf file")
+
     cmd(MOD.id)
       .action((_, cfg) => cfg.copy(cmd = MOD))
       .text("Calculate distribution using a module of the node id as partitioner.")
       .children(
-        opt[Long]("partitions")
+        opt[Int]("partitions")
           .abbr("p")
           .required()
           .action((v, args) => args.copy(modPartitions = v))
       )
+
     cmd(TILE.id)
       .action((_, cfg) => cfg.copy(cmd = TILE))
       .text("Calculate distribution partitioning data by Tile")
       .children(
-        opt[Long]("latPartitions")
+        opt[Int]("latPartitions")
          .abbr("latP")
          .required()
          .action((v, cfg) => cfg.copy(latPartitions = v)),
-        opt[Long]("lonPartitions")
+        opt[Int]("lonPartitions")
          .abbr("lonP")
          .required()
          .action((v, cfg) => cfg.copy(lonPartitions = v))
      )
+
    checkConfig {
      case cfg: AppConfig if cfg.cmd == "" => failure("partitioner not present.")
      case _ => success
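The switch from `opt[Long]` to `opt[Int]` keeps the parsed option types aligned with the new `Int` fields of the case class; scopt derives the value parser from the type parameter. A minimal, self-contained sketch of the same scopt pattern (the `Cfg` wrapper and parser name here are illustrative assumptions, not part of the commit):

```scala
import scopt.OptionParser

case class Cfg(cmd: String = "", latPartitions: Int = 0, lonPartitions: Int = 0)

// Mirrors the cmd(TILE.id).children(...) block from the diff above.
val parser = new OptionParser[Cfg]("analysis") {
  cmd("tile")
    .action((_, c) => c.copy(cmd = "tile"))
    .children(
      opt[Int]("latPartitions").abbr("latP").required()
        .action((v, c) => c.copy(latPartitions = v)),
      opt[Int]("lonPartitions").abbr("lonP").required()
        .action((v, c) => c.copy(lonPartitions = v))
    )
}

// abbr() registers a short alias, so -latP and --latPartitions parse identically.
parser.parse(Seq("tile", "-latP", "10000", "--lonPartitions", "10000"), Cfg())
// => Some(Cfg("tile", 10000, 10000))
```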

src/main/scala/com/simplexportal/simplexspatial/analysis/Driver.scala
Lines changed: 4 additions & 2 deletions

@@ -18,9 +18,8 @@
 package com.simplexportal.simplexspatial.analysis
 
 import com.simplexportal.simplexspatial.analysis.AppConfig._
-import com.simplexportal.simplexspatial.analysis.NodeIdDistribution._
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.{SparkConf, SparkContext}
 
 object Driver {
 

@@ -39,6 +38,9 @@ object Driver {
       case MOD =>
         NodeIdDistribution
           .run(cfg.input, cfg.output, cfg.modPartitions)
+      case AppConfig.TILE =>
+        TileDistribution
+          .run(cfg.input, cfg.output, cfg.latPartitions, cfg.lonPartitions)
       case EXTRACT =>
         println(s"Extracted ${BlocksExtraction.extractBlobs(cfg.input, cfg.output)} blocks")
       case x =>

src/main/scala/com/simplexportal/simplexspatial/analysis/NodeIdDistribution.scala
Lines changed: 2 additions & 2 deletions

@@ -49,10 +49,10 @@ object NodeIdDistribution {
       entities
         .withColumnRenamed("_1", "partition")
         .withColumnRenamed("_2", "id")
-        .createTempView("ids_per_partition")
+        .createTempView("mod_ids_per_partition")
 
       sparkSession.sql(
-        "select partition, max(id) as maxId, min(id) as minId, count(*) as ids from ids_per_partition group by partition"
+        "select partition, max(id) as maxId, min(id) as minId, count(*) as ids from mod_ids_per_partition group by partition"
       )
     }
 }
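The temp-view rename avoids a collision now that `TileDistribution` registers its own `tile_ids_per_partition` view in the same `SparkSession`: temp views are session-scoped, and `createTempView` fails if the name already exists. For reference, a sketch of the same aggregation written with the DataFrame API, which needs no view at all (assuming a `Dataset[(Long, Long)]` as in the source):

```scala
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{count, lit, max, min}

// Equivalent of the SQL in distribution(), without registering a temp view.
def distribution(entities: Dataset[(Long, Long)]): DataFrame =
  entities
    .toDF("partition", "id")
    .groupBy("partition")
    .agg(max("id").as("maxId"), min("id").as("minId"), count(lit(1)).as("ids"))
```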
src/main/scala/com/simplexportal/simplexspatial/analysis/TileDistribution.scala
Lines changed: 84 additions & 0 deletions

@@ -0,0 +1,84 @@
+/*
+ * Copyright 2020 Ángel Cervera Claudio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.simplexportal.simplexspatial.analysis
+
+import com.acervera.osm4scala.model.{NodeEntity, OSMEntity}
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.slf4j.LoggerFactory
+
+object TileDistribution {
+  val logger = LoggerFactory.getLogger(TileDistribution.getClass.getName)
+
+  def run(input: String, output: String, latPartitions: Int, lonPartitions: Int)(
+      implicit ctx: SparkContext,
+      spark: SparkSession
+  ): Unit = {
+    import spark.implicits._
+    Common
+      .fromBlobs(input, extractor(latPartitions, lonPartitions))
+      .toDS
+      .distribution()
+      .write
+      .orc(output)
+  }
+
+  def extractor(
+      latPartitions: Int,
+      lonPartitions: Int,
+      decimalPrecision: Byte = 6
+  ): OSMEntity => Option[(Long, Long)] = {
+    val PRECISION_ROUNDING: Int = Math.pow(10, decimalPrecision).toInt
+
+    def latPartition(lat: Double): Int =
+      ((lat + 90) * PRECISION_ROUNDING).toInt / ((180 * PRECISION_ROUNDING) / latPartitions)
+
+    def lonPartition(lon: Double): Int =
+      ((lon + 180) * PRECISION_ROUNDING).toInt / ((360 * PRECISION_ROUNDING) / lonPartitions)
+
+    (entity: OSMEntity) =>
+      entity match {
+        case node: NodeEntity =>
+          // Lat partition in the high bits, lon partition in the low 16 bits.
+          Some((latPartition(node.latitude) << 16 | lonPartition(node.longitude), node.id))
+        case _ => None
+      }
+  }
+
+  implicit class Entities(entities: Dataset[(Long, Long)])(implicit sparkSession: SparkSession) {
+
+    def distribution(): DataFrame = {
+      entities
+        .withColumnRenamed("_1", "partition")
+        .withColumnRenamed("_2", "id")
+        .createTempView("tile_ids_per_partition")
+
+      val mask = 0x0000ffff // keeps the low 16 bits, matching the 16-bit shift used when packing
+
+      sparkSession.sql(
+        "select" +
+          " int(shiftrightunsigned(partition, 16)) as latPart," +
+          s" int(partition & $mask) as lonPart," +
+          " max(id) as maxId," +
+          " min(id) as minId," +
+          " count(*) as ids" +
+          " from tile_ids_per_partition group by partition"
+      )
+    }
+  }
+}
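The extractor packs both tile indices into a single `Long`: the latitude partition above bit 16 and the longitude partition in the low 16 bits, which is why the SQL unpacks with `shiftrightunsigned(partition, 16)` and the `0xffff` mask. A standalone round-trip sketch of that arithmetic (the coordinates are arbitrary example values):

```scala
val latPartitions = 10000
val lonPartitions = 10000
val precision = 1000000 // 10^6, matching the default decimalPrecision of 6

// Same integer arithmetic as extractor(): scale first, then integer-divide.
def latPartition(lat: Double): Int =
  ((lat + 90) * precision).toInt / ((180 * precision) / latPartitions)
def lonPartition(lon: Double): Int =
  ((lon + 180) * precision).toInt / ((360 * precision) / lonPartitions)

// Pack: latitude partition in the high bits, longitude partition in the low 16 bits.
val packed: Long = latPartition(40.4168) << 16 | lonPartition(-3.7038)

// Unpack, mirroring the SQL.
val latPart = (packed >>> 16).toInt   // 7245
val lonPart = (packed & 0xffff).toInt // 4897; values above 4095 need the full 16-bit mask
```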

src/test/scala/com/simplexportal/simplexspatial/analysis/NodeIdDistributionSpec.scala
Lines changed: 3 additions & 5 deletions

@@ -17,17 +17,15 @@
 
 package com.simplexportal.simplexspatial.analysis
 
-import com.acervera.osm4scala.model.{NodeEntity, OSMEntity, WayEntity}
+import com.acervera.osm4scala.model.{NodeEntity, WayEntity}
 import org.apache.spark.sql.Row
 import org.scalatest.matchers.should.Matchers
 
-import scala.util.Random
-
 class NodeIdDistributionSpec
     extends org.scalatest.wordspec.AnyWordSpecLike
     with Matchers
     with SparkBaseSQLTesting {
-  "Calculate distribution" should {
+  "Calculate mod of id distribution" should {
     "correctly" in {
 
       import NodeIdDistribution._

@@ -47,7 +45,7 @@ class NodeIdDistributionSpec
         .createDataset(data.flatMap(NodeIdDistribution.extractor(10)(_)))
         .distribution
 
-      result.sort($"partition".asc).collect().toSet shouldBe (
+      result.collect().toSet shouldBe (
         Set(
           Row(0, 30, 10, 3),
          Row(1, 21, 11, 2),
src/test/scala/com/simplexportal/simplexspatial/analysis/TileDistributionSpec.scala
Lines changed: 56 additions & 0 deletions

@@ -0,0 +1,56 @@
+/*
+ * Copyright 2020 Ángel Cervera Claudio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.simplexportal.simplexspatial.analysis
+
+import com.acervera.osm4scala.model.{NodeEntity, WayEntity}
+import org.apache.spark.sql.Row
+import org.scalatest.matchers.should.Matchers
+
+class TileDistributionSpec
+    extends org.scalatest.wordspec.AnyWordSpecLike
+    with Matchers
+    with SparkBaseSQLTesting {
+  "Calculate Tile distribution" should {
+    "correctly" in {
+
+      import TileDistribution._
+      import sparkSession.implicits._
+
+      val data = Seq(
+        NodeEntity(10, -10.1, -10.1, Map.empty),
+        NodeEntity(20, -20.1, -10.1, Map.empty),
+        NodeEntity(30, -30.1, -10.1, Map.empty),
+        NodeEntity(11, -11.1, -10.1, Map.empty),
+        NodeEntity(21, 21.1, 10.1, Map.empty),
+        NodeEntity(32, 32.1, 10.1, Map.empty),
+        WayEntity(100, Seq(10, 20, 30), Map.empty)
+      )
+
+      val result = sparkSession
+        .createDataset(data.flatMap(TileDistribution.extractor(2, 2)(_)))
+        .distribution
+
+      result.collect().toSet shouldBe (
+        Set(
+          Row(0, 0, 30, 10, 4),
+          Row(1, 1, 32, 21, 2)
+        )
+      )
+    }
+  }
+}
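The expected rows can be derived by hand: with `extractor(2, 2)` the grid splits at latitude 0 and longitude 0, so the sign of each coordinate picks the tile. A small sketch of that check (using the default `decimalPrecision` of 6):

```scala
val p = 1000000 // 10^6, the default decimalPrecision
def latP(lat: Double): Int = ((lat + 90) * p).toInt / ((180 * p) / 2)  // divisor 90,000,000
def lonP(lon: Double): Int = ((lon + 180) * p).toInt / ((360 * p) / 2) // divisor 180,000,000

latP(-10.1) // 0: southern half
lonP(-10.1) // 0: western half
latP(21.1)  // 1: northern half
lonP(10.1)  // 1: eastern half
// Nodes 10, 20, 30, 11 all fall in tile (0, 0) -> Row(0, 0, 30, 10, 4);
// nodes 21, 32 fall in tile (1, 1)             -> Row(1, 1, 32, 21, 2);
// the WayEntity carries no coordinates, so the extractor drops it.
```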
