
Commit 0eef63a

Will McGinnis committed
added two more examples and a runner script.
1 parent 3587eb9 commit 0eef63a

File tree

18 files changed: +1863 -3 lines changed


.idea/.name

Lines changed: 1 addition & 0 deletions

.idea/flink-python-examples.iml

Lines changed: 8 additions & 0 deletions

.idea/misc.xml

Lines changed: 14 additions & 0 deletions

.idea/modules.xml

Lines changed: 8 additions & 0 deletions

.idea/vcs.xml

Lines changed: 6 additions & 0 deletions

.idea/workspace.xml

Lines changed: 517 additions & 0 deletions

README.md

Lines changed: 142 additions & 1 deletion
@@ -1,4 +1,145 @@
Python Flink Examples
=====================

A collection of examples using Apache Flink's new python API. To set up your local environment with
the latest Flink build, see the guide [here](http://willmcginnis.com/2015/11/08/getting-started-with-python-and-apache-flink/).

The examples here use the v1.0 python API (they won't work with the current stable, pre-1.0 release), and
are meant to serve as demonstrations of simple use cases. Currently the python API supports a portion of the DataSet
API, which, from the user's perspective, offers functionality similar to Spark's.

To run the examples, I've included a runner script at the top level with a method for each example; simply
add in the path to your pyflink script and you should be good to go (as long as you have a Flink cluster running locally).
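The runner script itself isn't rendered in this excerpt of the commit, so the following is only a hypothetical sketch
of the idea; the launcher location is a placeholder, and it assumes the pyflink launcher syntax of the time, where
parameters for the script follow a `-` separator:

    import os
    import subprocess

    # placeholder: point this at the pyflink launcher inside your Flink build
    PYFLINK = '/path/to/flink/bin/pyflink3.sh'
    BASE_PATH = os.path.dirname(os.path.abspath(__file__))

    def run_example(name):
        # each example script takes the repo base path as its first argument
        script = os.path.join(BASE_PATH, name, name + '.py')
        subprocess.check_call([PYFLINK, script, '-', BASE_PATH])

    if __name__ == '__main__':
        run_example('data_enrichment')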

The currently included examples are:

Examples
========

A listing of the examples included here.

Word Count
----------

An extremely simple analysis program: it uses a simple string as its source, counts the occurrences of each word,
and outputs to a file on disk (using the overwrite functionality).
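The word count source itself isn't rendered in this excerpt; as a hedged sketch, a pipeline in the spirit of the
official Flink python word count example of the era (the input string and output path here are illustrative) looks like:

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import INT, STRING, WriteMode
    from flink.functions.GroupReduceFunction import GroupReduceFunction

    class Adder(GroupReduceFunction):
        def reduce(self, iterator, collector):
            # each group holds (1, word) pairs for one word; sum the counts
            count, word = iterator.next()
            count += sum([x[0] for x in iterator])
            collector.collect((count, word))

    if __name__ == "__main__":
        env = get_environment()
        data = env.from_elements("a simple string source with some simple words")

        data \
            .flat_map(lambda x, c: [(1, word) for word in x.lower().split()], (INT, STRING)) \
            .group_by(1) \
            .reduce_group(Adder(), (INT, STRING), combinable=True) \
            .write_text('file:///tmp/wordcount_out.txt', write_mode=WriteMode.OVERWRITE)

        env.execute(local=True)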

Trending Hashtags
-----------------

A very similar example to word count, but it includes a filter step to keep only hashtags, and uses a different
source and sink. The input data in this case is read off of disk, and the output is written as a csv. The input file
is generated dynamically at run time, so you can play with different volumes of tweets to get an idea of Flink's
scalability and performance.
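As with word count, this example's source isn't rendered here; a minimal sketch of the described pipeline, with a
placeholder tweet file path, might look like:

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import INT, STRING, WriteMode
    from flink.functions.GroupReduceFunction import GroupReduceFunction

    class Adder(GroupReduceFunction):
        # same summing group-reducer pattern as in the word count sketch
        def reduce(self, iterator, collector):
            count, tag = iterator.next()
            count += sum([x[0] for x in iterator])
            collector.collect((count, tag))

    if __name__ == "__main__":
        env = get_environment()
        tweets = env.read_text('file:///tmp/generated_tweets.txt')  # placeholder path

        tweets \
            .flat_map(lambda x, c: [(1, word) for word in x.split()], (INT, STRING)) \
            .filter(lambda x: x[1].startswith('#')) \
            .group_by(1) \
            .reduce_group(Adder(), (INT, STRING), combinable=True) \
            .write_csv('file:///tmp/trending_hashtags.csv', write_mode=WriteMode.OVERWRITE)

        env.execute(local=True)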

Data Enrichment
---------------

In this example, we have row-wise json in one file, with an attribute field that refers to a csv dimension table of
colors. So we load both datasets in, convert the json data into an ordered and typed tuple, and join the two together
to get a nice dataset of cars and their colors.
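As a concrete walk-through (the input row below is illustrative, since the repo's input data file isn't rendered in
this excerpt), the conversion-and-join behaves like:

    import json

    # hypothetical input row; the real input file isn't shown in this diff
    row = '{"car": "tesla", "attr": "3"}'
    fields = ['car', 'attr']
    as_tuple = tuple([str(json.loads(row).get(f, '')) for f in fields])
    # as_tuple == ('tesla', '3'); joined on attr against the dimension
    # row ('3', 'blue'), the final map yields 'This tesla is blue'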

Features
========

A quick listing of high-level features, and the examples that include them.

Text data-source (read\_text)
-----------------------------

* trending hashtags
* data enrichment

CSV data-source (read\_csv)
---------------------------

* data enrichment

String data-source (from\_elements)
-----------------------------------

* word count

Text output (write\_text)
-------------------------

* word count
* data enrichment

CSV output (write\_csv)
-----------------------

* trending hashtags

Log to stdout output (output)
-----------------------------

Transformations: Map
--------------------

* word count
* trending hashtags
* data enrichment

Transformations: FlatMap
------------------------

* word count
* trending hashtags

Transformations: MapPartition
-----------------------------

Transformations: Filter
-----------------------

* trending hashtags

Transformations: Reduce
-----------------------

Transformations: ReduceGroup
----------------------------

* word count
* trending hashtags

Transformations: Join
---------------------

* data enrichment

Transformations: CoGroup
------------------------

Transformations: Cross
----------------------

Transformations: Union
----------------------

Gotchas We've Found
===================

As we go through the process of building these examples with an extremely young library, we run across quirks, which
we will mention here and, if appropriate, report as bugs (if they turn out to be bugs, we will take these notes down
once they are fixed).

Using os.path to set file paths dynamically
-------------------------------------------

There is a tendency to want to write code without hard-coded paths, so we may be tempted to set the path to the output
file in the word count example as:

    import os
    output_path = os.path.dirname(os.path.abspath(__file__)) + os.sep + 'output_file.txt'

But this doesn't seem to work: some part of how pyflink executes the python code moves it, so the abspath
term evaluates to some temp directory.
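The workaround used in this repo's examples (see data_enrichment.py in this same commit) is to have the runner pass
the base path in as a command-line argument; the exact output path below is illustrative:

    import sys

    # the runner supplies the repo base path as the first script argument,
    # sidestepping __file__ entirely
    base_path = sys.argv[1]
    output_path = 'file://' + base_path + '/word_count/output_file.txt'  # illustrative path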

data_enrichment/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
__author__ = 'willmcginnis'

data_enrichment/data_enrichment.py

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
import os
import json
import sys
from flink.plan.Environment import get_environment
from flink.plan.Constants import INT, STRING, WriteMode
from flink.functions.GroupReduceFunction import GroupReduceFunction

__author__ = 'willmcginnis'


class Adder(GroupReduceFunction):
    def reduce(self, iterator, collector):
        # sum the counts within a group, keeping the group's word
        count, word = iterator.next()
        count += sum([x[0] for x in iterator])
        collector.collect((count, word))


def json_to_tuple(js, fields):
    # pull the given fields out of a parsed json dict, as an ordered tuple of strings
    return tuple([str(js.get(f, '')) for f in fields])

if __name__ == "__main__":
    # get the base path out of the runtime params
    base_path = sys.argv[1]

    # set up paths to input and output files on disk
    dim_file = 'file://' + base_path + '/data_enrichment/dimensional_data.csv'
    input_file = 'file://' + base_path + '/data_enrichment/input_data.csv'
    output_file = 'file://' + base_path + '/data_enrichment/out.txt'

    # remove the output file if one is already there (strip the file://
    # scheme first, since os.path operates on plain filesystem paths)
    if os.path.isfile(output_file.replace('file://', '')):
        os.remove(output_file.replace('file://', ''))

    # set up the environment with a text file source and a csv source
    env = get_environment()
    input_data = env.read_text(input_file)
    dimensional_data = env.read_csv(dim_file, types=[STRING, STRING])

    # parse each json row into a (car, attr) tuple, join attr against the
    # dimension table's key, and write out one sentence per match
    input_data \
        .map(lambda x: json_to_tuple(json.loads(x), ['car', 'attr']), (STRING, STRING)) \
        .join(dimensional_data).where(1).equal_to(0) \
        .map(lambda x: 'This %s is %s' % (x[0][0], x[1][1]), STRING) \
        .write_text(output_file, write_mode=WriteMode.OVERWRITE)

    env.execute(local=True)
data_enrichment/dimensional_data.csv

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
1,red
2,green
3,blue
