1
+ library(sparklyr )
2
+ library(sparknyc )
3
+ library(dplyr )
4
+
5
+ sc <- spark_connect(master = " local[*]" )
6
+
7
+ # Downloaded from
8
+ # https://raw.githubusercontent.com/harsha2010/magellan/master/examples/datasets/NYC-NEIGHBORHOODS/neighborhoods.geojson
9
+ sparknyc_register(sc , " /home/nicola/Downloads/neighborhoods.geojson" )
10
+
11
+ latlon <- data.frame (
12
+ lon = c(- 74.013127 , - 73.968703 ),
13
+ lat = c(40.700940 , 40.775303 ))
14
+
15
+ latlonSpark <- copy_to(sc , latlon , " latlon" )
16
+
17
+ latlonSpark %> %
18
+ mutate(neighborhood = neighborhood(lat , lon )) %> %
19
+ collect()
20
+
21
+ # #####################################################################################
22
+
23
+ library(sparklyr )
24
+ library(sparkbq )
25
+ library(sparknyc )
26
+ library(dplyr )
27
+
28
+ gcpJsonKeyfile <- " /home/nicola/Downloads/mirai-sbb-da847ce33b19.json"
29
+ Sys.setenv(" GOOGLE_APPLICATION_CREDENTIALS" = gcpJsonKeyfile )
30
+
31
+ config <- spark_config()
32
+ config [[" spark.hadoop.google.cloud.auth.service.account.json.keyfile" ]] = gcpJsonKeyfile
33
+
34
+ sc <- spark_connect(master = " local[*]" , config = config )
35
+
36
+ sparknyc_register(sc , " /home/nicola/Downloads/neighborhoods.geojson" )
37
+
38
+ bigquery_defaults(
39
+ billingProjectId = " mirai-sbb" ,
40
+ gcsBucket = " sbb" ,
41
+ datasetLocation = " US"
42
+ )
43
+
44
+ start.pickup.time = as.POSIXct(" 2014-01-06 13:00:00" , tz = " UTC" )
45
+ end.pickup.time = as.POSIXct(" 2014-01-12 13:30:00" , tz = " UTC" )
46
+
47
+ # trips <-
48
+ # spark_read_bigquery(
49
+ # sc = sc,
50
+ # name = "trips2014",
51
+ # projectId = "bigquery-public-data",
52
+ # datasetId = "new_york",
53
+ # tableId = "tlc_yellow_trips_2014"
54
+ # ) %>%
55
+ # filter(payment_type == "CRD") %>%
56
+ # select(pickup_latitude, pickup_longitude, tip_amount, total_amount) %>% # pickup_datetime,
57
+ # mutate(
58
+ # tip_pct = tip_amount / total_amount,
59
+ # neighborhood = neighborhood(pickup_latitude, pickup_longitude)
60
+ # ) %>%
61
+ # group_by(neighborhood) %>%
62
+ # summarize(avg_tip_pct = mean(tip_pct)) %>%
63
+ # collect()
64
+
65
+ # TODO: use the full 2014 dataset once we are happy with the tests
66
+ average_tips_per_neighborhood <- spark_read_bigquery(
67
+ sc = sc ,
68
+ name = " trips2014" ,
69
+ projectId = " bigquery-public-data" ,
70
+ sqlQuery = paste0(' SELECT * FROM `bigquery-public-data.new_york.tlc_yellow_trips_2014` WHERE pickup_datetime >= "' ,
71
+ format(start.pickup.time ),
72
+ ' " AND pickup_datetime < "' ,
73
+ format(end.pickup.time ),
74
+ ' "' )
75
+ ) %> %
76
+ filter(payment_type == " CRD" ) %> %
77
+ select(pickup_latitude , pickup_longitude , tip_amount , total_amount ) %> % # pickup_datetime,
78
+ mutate(
79
+ tip_pct = tip_amount / total_amount ,
80
+ neighborhood = neighborhood(pickup_latitude , pickup_longitude )
81
+ ) %> %
82
+ group_by(neighborhood ) %> %
83
+ summarize(avg_tip_pct = mean(tip_pct )) %> %
84
+ collect()
85
+
86
+ library(geojsonio )
87
+ library(leaflet )
88
+ geoJsonfile = " /home/nicola/Downloads/neighborhoods.geojson"
89
+ nyc_neighborhoods <- geojsonio :: geojson_read(geoJsonfile ,
90
+ what = " sp" )
91
+
92
+ average_tips_with_shapes <- nyc_neighborhoods
93
+ average_tips_with_shapes @ data <- merge(nyc_neighborhoods @ data , average_tips_per_neighborhood , all.x = T )
94
+
95
+ pal <- colorNumeric(c(" #00FF00" , " #FFFF00" , " #FF0000" ), average_tips_with_shapes $ avg_tip_pct )
96
+
97
+ average_tips_with_shapes %> %
98
+ leaflet() %> %
99
+ addProviderTiles(providers $ OpenStreetMap.BlackAndWhite ,
100
+ options = providerTileOptions(noWrap = TRUE )) %> %
101
+ addPolygons(weight = 1 , opacity = 0.7 ,
102
+ smoothFactor = 0.3 , fillOpacity = 0.7 ,
103
+ fillColor = ~ pal(avg_tip_pct ),
104
+ label = ~ paste0(neighborhood , " - " , round(avg_tip_pct * 100 , 2 ), " %" ),
105
+ highlightOptions = highlightOptions(color = " yellow" , weight = 2 ,
106
+ bringToFront = TRUE )) %> %
107
+ addLegend(pal = pal ,
108
+ values = ~ avg_tip_pct ,
109
+ opacity = 1 ,
110
+ title = " Tips %" ,
111
+ labFormat = labelFormat(
112
+ suffix = ' %' ,
113
+ transform = function (x ) 100 * x ))
114
+
115
+ # only 2 observations, of which one with a very generous person
116
+ boerum_hill_check <- spark_read_bigquery(
117
+ sc = sc ,
118
+ name = " trips2014" ,
119
+ projectId = " bigquery-public-data" ,
120
+ sqlQuery = paste0(' SELECT * FROM `bigquery-public-data.new_york.tlc_yellow_trips_2014` WHERE pickup_datetime >= "' ,
121
+ format(start.pickup.time ),
122
+ ' " AND pickup_datetime < "' ,
123
+ format(end.pickup.time ),
124
+ ' "' )
125
+ ) %> %
126
+ filter(payment_type == " CRD" ) %> %
127
+ select(pickup_latitude , pickup_longitude , tip_amount , total_amount ) %> % # pickup_datetime,
128
+ mutate(
129
+ tip_pct = tip_amount / total_amount ,
130
+ neighborhood = neighborhood(pickup_latitude , pickup_longitude )
131
+ ) %> %
132
+ filter(neighborhood == " Boerum Hill" ) %> %
133
+ collect()
0 commit comments