Commit 04a0ad0

Add custom accumulator script
1 parent 8d80cd8 commit 04a0ad0

File tree

1 file changed: +56 -0 lines changed

accumulator_word_count.py

Lines changed: 56 additions & 0 deletions
# Sample Spark script demonstrating a custom accumulator
# Author: Mateusz Dolinski

from pyspark import SparkConf, SparkContext
from pyspark.accumulators import AccumulatorParam


class WordAccumulator(AccumulatorParam):
    """
    Accumulator for counting words.

    Stores the words appearing in a text as a dictionary of the form
    {word: number_of_occurrences_in_text}.
    """

    def zero(self, value):
        """
        Return the initial value of the accumulator.

        :param value: currently unused
        :return: empty dictionary
        """
        return {}

    def addInPlace(self, v1, v2):
        """
        Merge two partial word-count dictionaries into one.

        :param v1: word counts accumulated so far
        :param v2: word counts to add
        :return: merged word counts
        """
        if not v1:
            return v2
        for key, count in v2.items():
            v1[key] = v1.get(key, 0) + count
        return v1


conf = SparkConf() \
    .setAppName('Wordcount') \
    .setMaster('local[*]')

sc = SparkContext(conf=conf)

# Shared accumulator, initialised to an empty dictionary.
wordcount = sc.accumulator({}, WordAccumulator())

# foreach is an action that returns None, so its result is not assigned;
# each worker adds its per-word counts to the accumulator.
sc.textFile('/path/to/sample/textfile/on/hdfs') \
    .flatMap(lambda line: line.split(' ')) \
    .filter(lambda word: word != '') \
    .map(lambda word: {word: 1}) \
    .foreach(lambda pair: wordcount.add(pair))

print(wordcount.value)
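
As a quick sanity check, the AccumulatorParam methods can be exercised directly, without a SparkContext; a minimal sketch, assuming the WordAccumulator class above is available in the current session (importing the whole script would also run its driver code, so paste the class definition instead):

# Sanity-check sketch (not part of the commit): call zero() and addInPlace()
# directly; no Spark cluster is needed for this.
acc = WordAccumulator()
assert acc.zero(None) == {}                                     # initial value is an empty dict
merged = acc.addInPlace({'spark': 2}, {'spark': 1, 'count': 1})
print(merged)                                                   # {'spark': 3, 'count': 1}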
