This repository has been archived by the owner on Jun 19, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mapping_thread.go
118 lines (90 loc) · 2.98 KB
/
mapping_thread.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package meduce
import (
"fmt"
"github.com/djordje200179/extendedlibrary/misc/functions/comparison"
"sort"
"strings"
"sync"
)
// mappingThread is one mapping worker of a Process. It embeds the owning
// Process and buffers the key-value pairs emitted by the mapper, together
// with the counters that run reports to the logger.
type mappingThread[KeyIn, ValueIn, KeyOut, ValueOut any] struct {
	*Process[KeyIn, ValueIn, KeyOut, ValueOut]

	// keys and values are parallel slices: values[i] is the value that was
	// emitted together with keys[i].
	keys   []KeyOut
	values []ValueOut

	// mappingsCount is the number of source pairs handed to the mapper,
	// emitsCount the number of buffered pairs before combining, and
	// combinationsCount the number of pairs left after combining.
	mappingsCount, emitsCount, combinationsCount int
}
// run drains thread.Source, passing every pair to the Mapper with
// thread.append as the emit callback. It then sorts the buffered pairs,
// combines pairs whose keys compare equal and, when a Logger is set, prints
// a summary of the collected counters.
//
// finishSignal.Done is deferred so that the WaitGroup is released on every
// exit path, including a panic raised by one of the user-supplied callbacks.
func (thread *mappingThread[KeyIn, ValueIn, KeyOut, ValueOut]) run(finishSignal *sync.WaitGroup) {
	defer finishSignal.Done()

	for pair := range thread.Source {
		thread.Mapper(pair.First, pair.Second, thread.append)
		thread.mappingsCount++
	}

	// Record the raw number of emitted pairs before combine shrinks the buffers.
	thread.emitsCount = thread.Len()

	// combine relies on equal keys being adjacent, so sort first.
	sort.Sort(thread)
	thread.combine()
	thread.combinationsCount = thread.Len()

	if thread.Logger == nil {
		return
	}

	// Build the whole report first so it is handed to the logger in one call.
	var sb strings.Builder
	fmt.Fprintf(&sb, "Process %d: mapping thread finished\n", thread.uid)
	fmt.Fprintf(&sb, "\t%d mappings finished\n", thread.mappingsCount)
	fmt.Fprintf(&sb, "\t%d emmited key-value pairs\n", thread.emitsCount)
	fmt.Fprintf(&sb, "\t%d unique keys\n", thread.combinationsCount)
	thread.Logger.Print(sb.String())
}
// append records one emitted key-value pair at the end of the parallel
// keys/values buffers. It is the emit callback that run passes to the Mapper.
func (thread *mappingThread[KeyIn, ValueIn, KeyOut, ValueOut]) append(key KeyOut, value ValueOut) {
	thread.keys, thread.values = append(thread.keys, key), append(thread.values, value)
}
// Len reports how many key-value pairs are currently buffered. Together with
// Less and Swap it lets the thread be passed to sort.Sort.
func (thread *mappingThread[KeyIn, ValueIn, KeyOut, ValueOut]) Len() int {
	bufferedKeys := thread.keys
	return len(bufferedKeys)
}
// Less reports whether the pair at index i sorts before the pair at index j.
// Pairs are ordered by key using KeyComparator. When the keys compare equal,
// the ValueComparator (if one is set) breaks the tie; without it, pairs with
// equal keys are never considered less than one another.
func (thread *mappingThread[KeyIn, ValueIn, KeyOut, ValueOut]) Less(i, j int) bool {
	switch thread.KeyComparator(thread.keys[i], thread.keys[j]) {
	case comparison.FirstSmaller:
		return true
	case comparison.Equal:
		// Short-circuit keeps the comparator from being called when it is nil.
		return thread.ValueComparator != nil &&
			thread.ValueComparator(thread.values[i], thread.values[j]) == comparison.FirstSmaller
	default:
		return false
	}
}
// Swap exchanges the pairs at indices i and j, moving the key and its value
// together so the two parallel slices stay aligned.
func (thread *mappingThread[KeyIn, ValueIn, KeyOut, ValueOut]) Swap(i, j int) {
	keys, values := thread.keys, thread.values

	keys[i], keys[j] = keys[j], keys[i]
	values[i], values[j] = values[j], values[i]
}
// combine collapses every run of adjacent pairs whose keys compare equal
// (per KeyComparator) into a single pair and replaces the thread's buffers
// with the result. A run of length one keeps its value unchanged; a longer
// run is passed to the configured Reducer, which yields the combined value.
//
// The buffers must already be sorted so that equal keys are adjacent.
func (thread *mappingThread[KeyIn, ValueIn, KeyOut, ValueOut]) combine() {
	total := thread.Len()
	if total == 0 {
		return
	}

	var (
		uniqueKeys     []KeyOut
		combinedValues []ValueOut
	)

	// groupStart is the index of the first pair of the run being scanned.
	groupStart := 0
	for i := 1; i <= total; i++ {
		key := thread.keys[i-1]

		// Still inside the same run: the next key equals the current one.
		if i < total && thread.KeyComparator(key, thread.keys[i]) == comparison.Equal {
			continue
		}

		// The run covers [groupStart, i). Capacity is capped at i so that a
		// Reducer appending to its argument cannot overwrite the values of
		// the runs that have not been combined yet.
		group := thread.values[groupStart:i:i]
		groupStart = i

		value := group[0]
		if len(group) > 1 {
			value = thread.Config.Reducer(key, group)
		}

		uniqueKeys = append(uniqueKeys, key)
		combinedValues = append(combinedValues, value)
	}

	thread.keys = uniqueKeys
	thread.values = combinedValues
}