-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsort.go
75 lines (63 loc) · 1.51 KB
/
sort.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package pipeline
import (
"io"
"sort"
)
type Sorter struct {
w *io.PipeWriter
r *io.PipeReader
DeserializerFunc DeserializerFunc
}
// Emitters is a slice of Emitter interfaces that implements the sort.Interface
type Reducers []Reducer
func (e Reducers) Len() int { return len(e) }
func (e Reducers) Swap(i, j int) { e[i], e[j] = e[j], e[i] }
func (e Reducers) Less(i, j int) bool { return e[i].Key() < e[j].Key() }
func NewSorter(d DeserializerFunc) *Sorter {
pr, pw := io.Pipe()
return &Sorter{
w: pw,
r: pr,
DeserializerFunc: d,
}
}
func (m *Sorter) Sort(w io.Writer) {
// Return a channel from the formatter
ch := deserializeInput(m.DeserializerFunc, m)
s := make(Reducers, 0)
// Iterate over the channel and call fn
for r := range ch {
// Continue if Where method returns false
if !r.Where() {
continue
}
reducer, ok := r.(Reducer)
if ok {
s = append(s, reducer)
}
}
sort.Sort(s)
for _, o := range s {
o.Emit(w)
}
}
func (m *Sorter) In(r io.Reader) Pipeline {
go func(w *io.PipeWriter, r io.Reader) {
io.Copy(m.w, r)
m.w.Close()
}(m.w, r)
return m
}
func (m *Sorter) Then(p Pipeline) Pipeline {
go func(p Pipeline) {
m.Sort(p)
p.Close()
}(p)
return p
}
func (m *Sorter) Out(w io.Writer) {
m.Sort(w)
}
func (m *Sorter) Close() error { return m.w.Close() }
func (m *Sorter) Write(p []byte) (int, error) { return m.w.Write(p) }
func (m *Sorter) Read(p []byte) (int, error) { return m.r.Read(p) }