|
1 |
| -# 老司机带你用 Go 语言实现 MapReduce 框架 |
| 1 | +# 老司机带你飞系列 |
2 | 2 |
|
3 |
| -  MapReduce 是 Google 提出的一个软件架构,用于大规模数据集(大于1TB)的并行运算。简而言之,就是将任务切分成很小的任务然后一个一个区的执行最后汇总,这就像小时候我们老师经常教育我们一样,大事化小,小事化了(瞬间感觉那时候老师好言简意赅啊!!!)思想就这么一个思想,那么按照这个思想在现代软件定义一切的世界里面,我们怎么运用这样的方式来解决海量数据的处理,这篇就告诉你一个这样的一个简单的实现使用 Go 语言。 |
4 |
| - |
5 |
| -## 上车 |
6 |
| - |
7 |
| -  简单介绍一下几个概念: |
8 |
| - |
9 |
| -  概念“Map(映射)”和“Reduce(归纳)”,及他们的主要思想,都是从函数式编程语言借来的,还有从矢量编程语言借来的特性。当前的软件实现是指定一个 Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的 Reduce(归纳)函数,用来保证所有映射的键值对中的每一个共享相同的键组。 |
10 |
| - |
11 |
| -  以一个例子为简单的开始: |
12 |
| - |
13 |
| -  词频的统计(WorldCount),在现实的需求的上面可能我们可能有这样的一个需求,就是计算出一篇文章里面出现每个单词的个数。具体到生活就是,就算 Top N 的结果,比如全校要开表彰大会,找出 10 个好学生这样的 Top N 这样的例子比比皆是,而 World Count 就是他的一个实现,只是最终的结果只取出排在前面的结果而已。 |
14 |
| - |
15 |
| -  有了上面找出 10 个好学生的需求的时候,我们来想想怎么去实现它呢,很显然这个需求可能是校长在开会的时候提出来的,那么具体的实现就是每个年级组长是不是要把每个年级排名前 10 的学生找出来,然后年级组长的领导,将这些信息在汇总取出 前 10 的学生咯,那么具体的每个年级怎么做呢?同理,将每个班的前10名学生找出来,然后汇总到年级部门咯。 |
16 |
| - |
17 |
| -## 发车 |
18 |
| - |
19 |
| -  基本概览和思路已经明白了,现在开始构建整个 MapReduce 框架了,首先我们明确一个思想就是,将任务划分成合适的大小,然后对其进行计算,然后将每一步计算的的结果,进行一个汇总合并的过程。那么这两个过程我们先分别定义为Map 和Reduce 过程。 |
20 |
| - |
21 |
| -  还是以 World Count 这个为例子: |
22 |
| - |
23 |
| -  Map 的处理过程就是读取给定的文件,将文件里面的每个单词的出现频率初始化为 1。 |
24 |
| - |
25 |
| -  Reduce 的处理过程就是将相同的单词,数据进行一个累加的过程。那么,我们 MapReduce 框架的目的是调用在合适的时候调用这个 Map 和 Reduce 的过程。 |
26 |
| -在 common_map.go 里面 doMap 方法就是给定文件,读取数据然后,调用 Map 这个过程,代码里面有注释,在这里进行一个简单概述一下主要有这几个步骤: |
27 |
| - |
28 |
| -1. 读取文件; |
29 |
| -2. 将读文件的内容,调用用户 Map 函数,生产对于的 KeyValue 值; |
30 |
| -3. 最后按照 KeyValue 里面的 Key 进行分区,将内容写入到文件里面,以便于后面的 Reduce 过程执行; |
31 |
| - |
32 |
| -``` go |
33 |
| -func doMap( |
34 |
| - jobName string, // // the name of the MapReduce job |
35 |
| - mapTaskNumber int, // which map task this is |
36 |
| - inFile string, |
37 |
| - nReduce int, // the number of reduce task that will be run |
38 |
| - mapF func(file string, contents string) []KeyValue, |
39 |
| -) { |
40 |
| - |
41 |
| - //setp 1 read file |
42 |
| - contents, err := ioutil.ReadFile(inFile) |
43 |
| - if err != nil { |
44 |
| - log.Fatal("do map error for inFile ",err) |
45 |
| - } |
46 |
| - //setp 2 call user user-map method ,to get kv |
47 |
| - kvResult := mapF(inFile, string(contents)) |
48 |
| - |
49 |
| - /** |
50 |
| - * setp 3 use key of kv generator nReduce file ,partition |
51 |
| - * a. create tmpFiles |
52 |
| - * b. create encoder for tmpFile to write contents |
53 |
| - * c. partition by key, then write tmpFile |
54 |
| - */ |
55 |
| - |
56 |
| - var tmpFiles [] *os.File = make([] *os.File, nReduce) |
57 |
| - var encoders [] *json.Encoder = make([] *json.Encoder, nReduce) |
58 |
| - |
59 |
| - for i := 0; i < nReduce; i++ { |
60 |
| - tmpFileName := reduceName(jobName,mapTaskNumber,i) |
61 |
| - tmpFiles[i],err = os.Create(tmpFileName) |
62 |
| - if err!=nil { |
63 |
| - log.Fatal(err) |
64 |
| - } |
65 |
| - |
66 |
| - defer tmpFiles[i].Close() |
67 |
| - encoders[i] = json.NewEncoder(tmpFiles[i]) |
68 |
| - if err!=nil { |
69 |
| - log.Fatal(err) |
70 |
| - } |
71 |
| - } |
72 |
| - |
73 |
| - for _ , kv := range kvResult { |
74 |
| - hashKey := int(ihash(kv.Key)) % nReduce |
75 |
| - err := encoders[hashKey].Encode(&kv) |
76 |
| - if err!=nil { |
77 |
| - log.Fatal("do map encoders ",err) |
78 |
| - } |
79 |
| - } |
80 |
| -} |
81 |
| -``` |
82 |
| - |
83 |
| -  doReduce 函数在 common_reduce.go 里面,主要步骤: |
84 |
| - |
85 |
| -1. 读取 doMap 过程中产生的中间文件; |
86 |
| -2. 按照读取相同文件中的 Key 进新按照字典顺序进行排序; |
87 |
| -3. 遍历读取的 KeyValue,并且调用用户的 Reduce 方法,将计算的结果继续写入到文件中; |
88 |
| - |
89 |
| -``` go |
90 |
| -func doReduce( |
91 |
| - jobName string, // the name of the whole MapReduce job |
92 |
| - reduceTaskNumber int, // which reduce task this is |
93 |
| - nMap int, // the number of map tasks that were run ("M" in the paper) |
94 |
| - reduceF func(key string, values []string) string, |
95 |
| -) { |
96 |
| - |
97 |
| - // file.Close() |
98 |
| - |
99 |
| - //setp 1,read map generator file ,same key merge put map[string][]string |
100 |
| - |
101 |
| - kvs := make(map[string][]string) |
102 |
| - |
103 |
| - for i := 0; i < nMap; i++ { |
104 |
| - fileName := reduceName(jobName, i, reduceTaskNumber) |
105 |
| - file, err := os.Open(fileName) |
106 |
| - if err != nil { |
107 |
| - log.Fatal("doReduce1: ", err) |
108 |
| - } |
109 |
| - |
110 |
| - dec := json.NewDecoder(file) |
111 |
| - |
112 |
| - for { |
113 |
| - var kv KeyValue |
114 |
| - err = dec.Decode(&kv) |
115 |
| - if err != nil { |
116 |
| - break |
117 |
| - } |
118 |
| - |
119 |
| - _, ok := kvs[kv.Key] |
120 |
| - if !ok { |
121 |
| - kvs[kv.Key] = []string{} |
122 |
| - } |
123 |
| - kvs[kv.Key] = append(kvs[kv.Key], kv.Value) |
124 |
| - } |
125 |
| - file.Close() |
126 |
| - } |
127 |
| - |
128 |
| - var keys []string |
129 |
| - |
130 |
| - for k := range kvs { |
131 |
| - keys = append(keys, k) |
132 |
| - } |
133 |
| - |
134 |
| - //setp 2 sort by keys |
135 |
| - sort.Strings(keys) |
136 |
| - |
137 |
| - //setp 3 create result file |
138 |
| - p := mergeName(jobName, reduceTaskNumber) |
139 |
| - file, err := os.Create(p) |
140 |
| - if err != nil { |
141 |
| - log.Fatal("doReduce2: ceate ", err) |
142 |
| - } |
143 |
| - enc := json.NewEncoder(file) |
144 |
| - |
145 |
| - //setp 4 call user reduce each key of kvs |
146 |
| - for _, k := range keys { |
147 |
| - res := reduceF(k, kvs[k]) |
148 |
| - enc.Encode(KeyValue{k, res}) |
149 |
| - } |
150 |
| - |
151 |
| - file.Close() |
152 |
| -} |
153 |
| -``` |
154 |
| - |
155 |
| -  Merge 过程 |
156 |
| - |
157 |
| -  当然最后就是将每个 Reduce 产生的结果进行一个Merge 的过程,在 merge 的过程中,同样也是需要进行按照 Key 进行字典顺序排列,然后写入到最终的文件中。代码跟 reduce 还是相似的,这里就不自爱赘述了。 |
158 |
| - |
159 |
| -  使用 go 的多线程来实现分布式的任务执行,这里主要是是 schedule.go 里面的 schedule 方法,主要是步骤: |
160 |
| - |
161 |
| -1. 通过不同的阶段( Map or Reduce ),获取到需要执行多少个 map (reduce),然后调用远程的 worker.go 里面的 DoTask 方法; |
162 |
| -2. 等待所有的任务完成,然后才结束。这里主要使用了go 语言的一些特性,[Go RPC documentation](https://golang.org/pkg/net/rpc/) 和 [Concurrency in Go](https://golang.org/doc/effective_go.html#concurrency)。 |
163 |
| - |
164 |
| -``` go |
165 |
| -func (mr *Master) schedule(phase jobPhase) { |
166 |
| - var ntasks int |
167 |
| - var nios int // number of inputs (for reduce) or outputs (for map) |
168 |
| - switch phase { |
169 |
| - case mapPhase: |
170 |
| - ntasks = len(mr.files) |
171 |
| - nios = mr.nReduce |
172 |
| - case reducePhase: |
173 |
| - ntasks = mr.nReduce |
174 |
| - nios = len(mr.files) |
175 |
| - } |
176 |
| - |
177 |
| - fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios) |
178 |
| - |
179 |
| - //use go routing,worker rpc executor task, |
180 |
| - done := make(chan bool) |
181 |
| - for i := 0; i < ntasks; i++ { |
182 |
| - go func(number int) { |
183 |
| - |
184 |
| - args := DoTaskArgs{mr.jobName, mr.files[ntasks], phase, number, nios} |
185 |
| - var worker string |
186 |
| - reply := new(struct{}) |
187 |
| - ok := false |
188 |
| - for ok != true { |
189 |
| - worker = <- mr.registerChannel |
190 |
| - ok = call(worker, "Worker.DoTask", args, reply) |
191 |
| - } |
192 |
| - done <- true |
193 |
| - mr.registerChannel <- worker |
194 |
| - }(i) |
195 |
| - |
196 |
| - } |
197 |
| - |
198 |
| - //wait for all task is complate |
199 |
| - for i := 0; i< ntasks; i++ { |
200 |
| - <- done |
201 |
| - } |
202 |
| - fmt.Printf("Schedule: %v phase done\n", phase) |
203 |
| -} |
204 |
| -``` |
205 |
| - |
206 |
| -## 到站 |
207 |
| - |
208 |
| -- 运行测试: |
209 |
| - |
210 |
| - |
211 |
| - |
212 |
| -- 测试结果: |
213 |
| - |
214 |
| - |
215 |
| - |
216 |
| -- 测试倒排结果: |
217 |
| - |
218 |
| - |
| 3 | +1. [《老司机带你用 Go 语言实现 MapReduce 框架》](src/mapreduce) |
| 4 | +2. [《老司机带你用 Go 语言实现 Raft 分布式一致性协议》](src/raft) |
0 commit comments