Skip to content

Commit 9a0dc86

Browse files
authored
refine code structure for expression resolver (#760)
* refine code structure for expression resolver * add columns package
1 parent b741910 commit 9a0dc86

28 files changed

+1262
-1348
lines changed

sql/attribute.go

Lines changed: 105 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,110 @@ type attribute struct {
2626
Value interface{}
2727
}
2828

29+
type gitLabModule struct {
30+
ModuleName string
31+
ProjectName string
32+
Sha string
33+
PrivateToken string
34+
SourceRoot string
35+
GitLabServer string
36+
}
37+
38+
type engineSpec struct {
39+
etype string
40+
ps resourceSpec
41+
worker resourceSpec
42+
cluster string
43+
queue string
44+
masterResourceRequest string
45+
masterResourceLimit string
46+
workerResourceRequest string
47+
workerResourceLimit string
48+
volume string
49+
imagePullPolicy string
50+
restartPolicy string
51+
extraPypiIndex string
52+
namespace string
53+
minibatchSize int
54+
masterPodPriority string
55+
clusterSpec string
56+
recordsPerTask int
57+
}
58+
59+
func getEngineSpec(attrs map[string]*attribute) engineSpec {
60+
getInt := func(key string, defaultValue int) int {
61+
if p, ok := attrs[key]; ok {
62+
strVal, _ := p.Value.(string)
63+
intVal, err := strconv.Atoi(strVal)
64+
65+
if err == nil {
66+
return intVal
67+
}
68+
}
69+
return defaultValue
70+
}
71+
getString := func(key string, defaultValue string) string {
72+
if p, ok := attrs[key]; ok {
73+
strVal, ok := p.Value.(string)
74+
if ok {
75+
// TODO(joyyoj): use the parser to do those validations.
76+
if strings.HasPrefix(strVal, "\"") && strings.HasSuffix(strVal, "\"") {
77+
return strVal[1 : len(strVal)-1]
78+
}
79+
return strVal
80+
}
81+
}
82+
return defaultValue
83+
}
84+
85+
psNum := getInt("ps_num", 1)
86+
psMemory := getInt("ps_memory", 2400)
87+
workerMemory := getInt("worker_memory", 1600)
88+
workerNum := getInt("worker_num", 2)
89+
engineType := getString("type", "local")
90+
if (psNum > 0 || workerNum > 0) && engineType == "local" {
91+
engineType = "yarn"
92+
}
93+
cluster := getString("cluster", "")
94+
queue := getString("queue", "")
95+
96+
// ElasticDL engine specs
97+
masterResourceRequest := getString("master_resource_request", "cpu=0.1,memory=1024Mi")
98+
masterResourceLimit := getString("master_resource_limit", "")
99+
workerResourceRequest := getString("worker_resource_request", "cpu=1,memory=4096Mi")
100+
workerResourceLimit := getString("worker_resource_limit", "")
101+
volume := getString("volume", "")
102+
imagePullPolicy := getString("image_pull_policy", "Always")
103+
restartPolicy := getString("restart_policy", "Never")
104+
extraPypiIndex := getString("extra_pypi_index", "")
105+
namespace := getString("namespace", "default")
106+
minibatchSize := getInt("minibatch_size", 64)
107+
masterPodPriority := getString("master_pod_priority", "")
108+
clusterSpec := getString("cluster_spec", "")
109+
recordsPerTask := getInt("records_per_task", 100)
110+
111+
return engineSpec{
112+
etype: engineType,
113+
ps: resourceSpec{Num: psNum, Memory: psMemory},
114+
worker: resourceSpec{Num: workerNum, Memory: workerMemory},
115+
cluster: cluster,
116+
queue: queue,
117+
masterResourceRequest: masterResourceRequest,
118+
masterResourceLimit: masterResourceLimit,
119+
workerResourceRequest: workerResourceRequest,
120+
workerResourceLimit: workerResourceLimit,
121+
volume: volume,
122+
imagePullPolicy: imagePullPolicy,
123+
restartPolicy: restartPolicy,
124+
extraPypiIndex: extraPypiIndex,
125+
namespace: namespace,
126+
minibatchSize: minibatchSize,
127+
masterPodPriority: masterPodPriority,
128+
clusterSpec: clusterSpec,
129+
recordsPerTask: recordsPerTask,
130+
}
131+
}
132+
29133
func (a *attribute) GenerateCode() (string, error) {
30134
if val, ok := a.Value.(string); ok {
31135
// auto convert to int first.
@@ -45,7 +149,7 @@ func (a *attribute) GenerateCode() (string, error) {
45149
return "", fmt.Errorf("value of attribute must be string or list of int, given %s", a.Value)
46150
}
47151

48-
func filter(attrs map[string]*attribute, prefix string, remove bool) map[string]*attribute {
152+
func attrFilter(attrs map[string]*attribute, prefix string, remove bool) map[string]*attribute {
49153
ret := make(map[string]*attribute, 0)
50154
for _, a := range attrs {
51155
if strings.EqualFold(a.Prefix, prefix) {
@@ -57,26 +161,3 @@ func filter(attrs map[string]*attribute, prefix string, remove bool) map[string]
57161
}
58162
return ret
59163
}
60-
61-
func resolveAttribute(attrs *attrs) (map[string]*attribute, error) {
62-
ret := make(map[string]*attribute)
63-
for k, v := range *attrs {
64-
subs := strings.SplitN(k, ".", 2)
65-
name := subs[len(subs)-1]
66-
prefix := ""
67-
if len(subs) == 2 {
68-
prefix = subs[0]
69-
}
70-
r, _, err := resolveExpression(v)
71-
if err != nil {
72-
return nil, err
73-
}
74-
a := &attribute{
75-
FullName: k,
76-
Prefix: prefix,
77-
Name: name,
78-
Value: r}
79-
ret[a.FullName] = a
80-
}
81-
return ret, nil
82-
}

sql/attribute_test.go

Lines changed: 0 additions & 45 deletions
This file was deleted.

sql/bucket_column.go

Lines changed: 0 additions & 84 deletions
This file was deleted.

sql/bucket_column_test.go

Lines changed: 0 additions & 55 deletions
This file was deleted.

0 commit comments

Comments
 (0)