Skip to content

Commit d57d17b

Browse files
(author metadata lost in page capture) committed
2.3 AdvancedAccumulators
1 parent 2813023 commit d57d17b

16 files changed

+431
-159
lines changed

project/Build.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ object Build extends Build {
4646
// raise memory limits here if necessary
4747
javaOptions += "-Xmx4G",
4848

49-
mainClass in (Compile, run) := Some("com.packt.spark.section5.FeatureExtraction"),
49+
// mainClass in (Compile, run) := Some("com.packt.spark.section5.FeatureExtraction"),
50+
mainClass in (Compile, run) := Some("com.packt.spark.section2.AdvancedAccumulators"),
5051

5152
fork := true,
5253
connectInput in run := true,

src/main/scala/com/packt/spark/section2/Accumulators.complete

Lines changed: 31 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,42 +6,36 @@ import org.apache.spark._
66
object Accumulators extends ExampleApp {
77
def run() =
88
withSparkContext { implicit sc =>
9-
val infoAcc = sc.accumulable(DatasetInfo())
10-
11-
val violationEntries =
12-
sampleDataset
13-
.flatMap { line =>
14-
val parsed = Violation.fromRow(line)
15-
infoAcc += parsed
16-
parsed
17-
}
18-
19-
// OR
20-
21-
val violationEntries =
22-
sampleDataset
23-
.map(Violation.fromRow _)
24-
.map(infoAcc += _)
25-
.flatten
26-
27-
// More computations would happen here...
28-
violationEntries.foreach { x => }
29-
30-
val DatasetInfo(
31-
dateRange,
32-
validCount,
33-
invalidCount,
34-
bigTicketItems,
35-
totalFines
36-
) = infoAcc.value
37-
38-
println(s"Valid count: $validCount")
39-
println(s"Invalid count: $invalidCount")
40-
println(s"Date range: ${dateRange.start} to ${dateRange.end}")
41-
println("Big ticket items:")
42-
for( (desc, fine) <- bigTicketItems) {
43-
println(s" $fine $desc")
44-
}
45-
9+
val validAcc = sc.accumulator(0)
10+
val invalidAcc = sc.accumulator(0)
11+
val totalFineAcc = sc.accumulator(0.0)
12+
13+
val violations =
14+
fullDataset
15+
.mapPartitions { partition =>
16+
val parse = Violation.rowParser
17+
partition.flatMap { line =>
18+
parse(line) match {
19+
case v @ Some(violation) if violation.ticket.fine > 5.0 =>
20+
validAcc += 1
21+
totalFineAcc += violation.ticket.fine
22+
v
23+
case _ =>
24+
invalidAcc += 1
25+
None
26+
}
27+
}
28+
}
29+
30+
violations.foreach { x => }
31+
32+
val validCount = validAcc.value
33+
val invalidCount = invalidAcc.value
34+
val totalFines = totalFineAcc.value
35+
36+
println(s"Valid count is $validCount")
37+
println(s"Invalid count is $invalidCount")
38+
println(f"Total fines: $$${totalFines}%,1.2f")
39+
waitForUser()
4640
}
4741
}
Lines changed: 22 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,41 @@
11
package com.packt.spark.section2
22

33
import com.packt.spark._
4-
54
import org.apache.spark._
6-
import com.github.nscala_time.time.Imports._
75

86
object Accumulators extends ExampleApp {
9-
implicit object MaxFineAccumulatorParam extends AccumulatorParam[Ticket] {
10-
def zero(ticket: Ticket) = Ticket(0.0, "None")
11-
def addInPlace(ticket1: Ticket, ticket2: Ticket): Ticket =
12-
if(ticket1.fine > ticket2.fine) ticket1
13-
else ticket2
14-
}
157
def run() =
168
withSparkContext { implicit sc =>
179
val validAcc = sc.accumulator(0)
1810
val invalidAcc = sc.accumulator(0)
19-
val sumAcc = sc.accumulator(0.0)
20-
val maxFineAcc = sc.accumulator(Ticket(0.0, "None"))
21-
val dateRangeAcc = sc.accumulator(DataDateRange.empty)
22-
23-
val violationEntries =
24-
sampleDataset
25-
.flatMap { line =>
26-
Violation.fromRow(line) match {
27-
case e @ Some(entry) =>
28-
validAcc += 1
29-
sumAcc += entry.ticket.fine
30-
maxFineAcc += entry.ticket
31-
dateRangeAcc += entry.issueTime
32-
e
33-
case None =>
34-
invalidAcc += 1
35-
None
11+
val totalFineAcc = sc.accumulator(0.0)
12+
13+
val violations =
14+
fullDataset
15+
.mapPartitions { partition =>
16+
val parse = Violation.rowParser
17+
partition.flatMap { line =>
18+
parse(line) match {
19+
case v @ Some(violation) if violation.ticket.fine > 5.0 =>
20+
validAcc += 1
21+
totalFineAcc += violation.ticket.fine
22+
v
23+
case _ =>
24+
invalidAcc += 1
25+
None
26+
}
3627
}
3728
}
3829

39-
violationEntries.foreach { x => }
30+
violations.foreach { x => }
4031

4132
val validCount = validAcc.value
4233
val invalidCount = invalidAcc.value
43-
val totalFines = sumAcc.value
44-
val maxFineTicket = maxFineAcc.value
34+
val totalFines = totalFineAcc.value
4535

46-
println(s"Valid count: ${validAcc.value}")
47-
println(s"Invalid count: ${invalidAcc.value}")
48-
println(f"Total fines: $$${sumAcc.value.toLong}%,d")
49-
println(s"Max fine ticket: $maxFineTicket")
50-
println(s"Date range: ${dateRangeAcc.value.start} to ${dateRangeAcc.value.end}")
36+
println(s"Valid count is $validCount")
37+
println(s"Invalid count is $invalidCount")
38+
println(f"Total fines: $$${totalFines}%,1.2f")
39+
waitForUser()
5140
}
5241
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package com.packt.spark.section2
2+
3+
import com.packt.spark._
4+
5+
import org.apache.spark._
6+
import com.github.nscala_time.time.Imports._
7+
8+
object AdvancedAccumulators extends ExampleApp {
9+
implicit object MaxFineAccumulatorParam extends AccumulatorParam[Ticket] {
10+
def zero(ticket: Ticket) = Ticket(0.0, "None")
11+
def addInPlace(ticket1: Ticket, ticket2: Ticket): Ticket =
12+
if(ticket1.fine > ticket2.fine) ticket1
13+
else ticket2
14+
}
15+
def run() =
16+
withSparkContext { implicit sc =>
17+
val invalidAcc = sc.accumulator(0)
18+
val infoAcc = sc.accumulable(DatasetInfo())
19+
20+
val violationEntries =
21+
fullDataset
22+
.mapPartitions { partition =>
23+
val parse = Violation.rowParser
24+
partition.flatMap { line =>
25+
parse(line) match {
26+
case v @ Some(violation) if violation.ticket.fine > 5.0 =>
27+
infoAcc += violation
28+
v
29+
case _ =>
30+
invalidAcc += 1
31+
None
32+
}
33+
}
34+
}
35+
36+
violationEntries.foreach { x => }
37+
38+
val DatasetInfo(
39+
dateRange,
40+
validCount,
41+
bigTicketItems,
42+
totalFines
43+
) = infoAcc.value
44+
45+
val invalidCount = invalidAcc.value
46+
47+
println(s"Valid count is $validCount")
48+
println(s"Invalid count is $invalidCount")
49+
println(f"Total fines: $$${totalFines}%,1.2f")
50+
println(s"Date range: ${dateRange.start} to ${dateRange.end}")
51+
println("Big ticket items:")
52+
for( (desc, fine) <- bigTicketItems) {
53+
println(s" $fine $desc")
54+
}
55+
waitForUser()
56+
}
57+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.packt.spark.section2
2+
3+
import com.packt.spark._
4+
5+
import org.apache.spark._
6+
import com.github.nscala_time.time.Imports._
7+
8+
object AdvancedAccumulators extends ExampleApp {
9+
def run() =
10+
withSparkContext { implicit sc =>
11+
val invalidAcc = sc.accumulator(0)
12+
val infoAcc = sc.accumulable(DatasetInfo())
13+
14+
val violationEntries =
15+
fullDataset
16+
.mapPartitions { partition =>
17+
val parse = Violation.rowParser
18+
partition.flatMap { line =>
19+
parse(line) match {
20+
case v @ Some(violation) if violation.ticket.fine > 5.0 =>
21+
infoAcc += violation
22+
v
23+
case _ =>
24+
invalidAcc += 1
25+
None
26+
}
27+
}
28+
}
29+
30+
violationEntries.foreach { x => }
31+
32+
val DatasetInfo(
33+
dateRange,
34+
validCount,
35+
bigTicketItems,
36+
totalFines
37+
) = infoAcc.value
38+
39+
val invalidCount = invalidAcc.value
40+
41+
println(s"Valid count is $validCount")
42+
println(s"Invalid count is $invalidCount")
43+
println(f"Total fines: $$${totalFines}%,1.2f")
44+
println(s"Date range: ${dateRange.start} to ${dateRange.end}")
45+
println("Big ticket items:")
46+
for( (desc, fine) <- bigTicketItems) {
47+
println(s" $fine $desc")
48+
}
49+
waitForUser()
50+
}
51+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package com.packt.spark.section2
2+
3+
import com.packt.spark._
4+
import org.apache.spark._
5+
6+
object Accumulators extends ExampleApp {
7+
def run() =
8+
withSparkContext { implicit sc =>
9+
val validAcc = sc.accumulator(0)
10+
val invalidAcc = sc.accumulator(0)
11+
val totalFineAcc = sc.accumulator(0.0)
12+
13+
val violations =
14+
fullDataset
15+
.mapPartitions { partition =>
16+
val parse = Violation.rowParser
17+
partition.flatMap { line =>
18+
parse(line) match {
19+
case v @ Some(violation) if violation.ticket.fine > 5.0 =>
20+
validAcc += 1
21+
totalFineAcc += violation.ticket.fine
22+
v
23+
case _ =>
24+
invalidAcc += 1
25+
None
26+
}
27+
}
28+
}
29+
30+
violations.foreach { x => }
31+
32+
val validCount = validAcc.value
33+
val invalidCount = invalidAcc.value
34+
val totalFines = totalFineAcc.value
35+
36+
println(s"Valid count is $validCount")
37+
println(s"Invalid count is $invalidCount")
38+
println(f"Total fines: $$${totalFines}%,1.2f")
39+
waitForUser()
40+
}
41+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.packt.spark.section2
2+
3+
import com.packt.spark._
4+
import org.apache.spark._
5+
6+
object AdvancedAccumulators extends ExampleApp {
7+
implicit object MaxFineAccumulatorParam extends AccumulatorParam[Ticket] {
8+
def zero(ticket: Ticket) = Ticket(0.0, "None")
9+
def addInPlace(ticket1: Ticket, ticket2: Ticket): Ticket =
10+
if(ticket1.fine > ticket2.fine) ticket1
11+
else ticket2
12+
}
13+
14+
def run() =
15+
withSparkContext { implicit sc =>
16+
val validAcc = sc.accumulator(0)
17+
val invalidAcc = sc.accumulator(0)
18+
val totalFineAcc = sc.accumulator(0.0)
19+
val maxFineAcc = sc.accumulator(Ticket(0.0, "None"))
20+
21+
val violations =
22+
fullDataset
23+
.mapPartitions { partition =>
24+
val parse = Violation.rowParser
25+
partition.flatMap { line =>
26+
parse(line) match {
27+
case v @ Some(violation) if violation.ticket.fine > 5.0 =>
28+
validAcc += 1
29+
totalFineAcc += violation.ticket.fine
30+
maxFineAcc += violation.ticket
31+
v
32+
case _ =>
33+
invalidAcc += 1
34+
None
35+
}
36+
}
37+
}
38+
39+
violations.foreach { x => }
40+
41+
val validCount = validAcc.value
42+
val invalidCount = invalidAcc.value
43+
val totalFines = totalFineAcc.value
44+
val maxFineTicket = maxFineAcc.value
45+
46+
println(s"Valid count is $validCount")
47+
println(s"Invalid count is $invalidCount")
48+
println(f"Total fines: $$${totalFines}%,1.2f")
49+
println(s"Max fine ticket: $maxFineTicket")
50+
waitForUser()
51+
}
52+
}

0 commit comments

Comments (0)