Make raw column kinds optional #111

Merged 29 commits on Jun 3, 2019
2 changes: 1 addition & 1 deletion cli/cmd/get.go
@@ -306,7 +306,7 @@ func describeRawColumn(name string, resourcesRes *schema.GetResourcesResponse) (
}
dataStatus := resourcesRes.DataStatuses[rawColumn.GetID()]
out := dataStatusSummary(dataStatus)
out += resourceStr(rawColumn.GetUserConfig())
out += resourceStr(context.GetRawColumnUserConfig(rawColumn))
return out, nil
}

@@ -1,3 +1,13 @@
- kind: environment
  name: dev
  data:
    type: csv
    path: s3a://cortex-examples/reviews.csv
    csv_config:
      header: true
      escape: "\""
    schema: ["review", "label"]

- kind: transformed_column
  name: embedding_input
  transformer_path: implementations/transformers/tokenize_string_to_int.py
17 changes: 0 additions & 17 deletions examples/reviews/resources/raw_columns.yaml

This file was deleted.

11 changes: 1 addition & 10 deletions examples/reviews/resources/vocab.yaml
@@ -1,15 +1,6 @@
- kind: aggregator
  name: vocab
  output_type: {STRING: INT}
  inputs:
    columns:
      col: STRING_COLUMN
    args:
      vocab_size: INT

- kind: aggregate
  name: reviews_vocab
  aggregator: vocab
  aggregator_path: implementations/aggregators/vocab.py
  inputs:
    columns:
      col: review
25 changes: 24 additions & 1 deletion pkg/operator/api/context/raw_columns.go
@@ -27,7 +27,6 @@ type RawColumns map[string]RawColumn
type RawColumn interface {
Column
GetCompute() *userconfig.SparkCompute
GetUserConfig() userconfig.Resource
}

type RawIntColumn struct {
@@ -45,6 +44,11 @@ type RawStringColumn struct {
*ComputedResourceFields
}

type RawInferredColumn struct {
*userconfig.RawInferredColumn
*ComputedResourceFields
}

func (rawColumns RawColumns) OneByID(id string) RawColumn {
for _, rawColumn := range rawColumns {
if rawColumn.GetID() == id {
@@ -79,6 +83,21 @@ func (rawColumns RawColumns) columnInputsID(columnInputValues map[string]interfa
return hash.Any(columnIDMap)
}

func GetRawColumnUserConfig(rawColumn RawColumn) userconfig.Resource {
switch rawColumn.GetType() {
case userconfig.IntegerColumnType:
return rawColumn.(*RawIntColumn).RawIntColumn
case userconfig.FloatColumnType:
return rawColumn.(*RawFloatColumn).RawFloatColumn
case userconfig.StringColumnType:
return rawColumn.(*RawStringColumn).RawStringColumn
case userconfig.InferredColumnType:
return rawColumn.(*RawInferredColumn).RawInferredColumn
}

return nil
}

func (rawColumns RawColumns) ColumnInputsID(columnInputValues map[string]interface{}) string {
return rawColumns.columnInputsID(columnInputValues, false)
}
@@ -98,3 +117,7 @@ func (rawColumn *RawFloatColumn) GetInputRawColumnNames() []string {
func (rawColumn *RawStringColumn) GetInputRawColumnNames() []string {
return []string{rawColumn.GetName()}
}

func (rawColumn *RawInferredColumn) GetInputRawColumnNames() []string {
return []string{rawColumn.GetName()}
}
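The GetUserConfig() method is dropped from the RawColumn interface here and replaced by the package-level GetRawColumnUserConfig helper above, which type-switches on the concrete column type and returns the embedded user config. A minimal self-contained sketch of that dispatch pattern, with stand-in types rather than the actual Cortex structs:

package main

import "fmt"

// Stand-ins for the userconfig resource and the context raw column types.
type Resource interface{ GetName() string }

type RawIntColumn struct{ Name string }

func (c *RawIntColumn) GetName() string { return c.Name }

type RawInferredColumn struct{ Name string }

func (c *RawInferredColumn) GetName() string { return c.Name }

// RawColumn no longer exposes GetUserConfig(); callers go through a helper.
type RawColumn interface {
    GetName() string
}

// getRawColumnUserConfig dispatches on the concrete type, mirroring
// context.GetRawColumnUserConfig in this PR.
func getRawColumnUserConfig(rawColumn RawColumn) Resource {
    switch col := rawColumn.(type) {
    case *RawIntColumn:
        return col
    case *RawInferredColumn:
        return col
    }
    return nil // unknown column kind
}

func main() {
    fmt.Println(getRawColumnUserConfig(&RawInferredColumn{Name: "review"}).GetName())
}

In the real helper the returned value is the embedded userconfig struct (for example rawColumn.(*RawIntColumn).RawIntColumn), which is what the CLI's describeRawColumn now passes to resourceStr.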
20 changes: 14 additions & 6 deletions pkg/operator/api/context/serialize.go
@@ -25,9 +25,10 @@ import (
)

type RawColumnsTypeSplit struct {
RawIntColumns map[string]*RawIntColumn `json:"raw_int_columns"`
RawStringColumns map[string]*RawStringColumn `json:"raw_string_columns"`
RawFloatColumns map[string]*RawFloatColumn `json:"raw_float_columns"`
RawIntColumns map[string]*RawIntColumn `json:"raw_int_columns"`
RawStringColumns map[string]*RawStringColumn `json:"raw_string_columns"`
RawFloatColumns map[string]*RawFloatColumn `json:"raw_float_columns"`
RawInferredColumns map[string]*RawInferredColumn `json:"raw_inferred_columns"`
}

type DataSplit struct {
@@ -45,6 +46,7 @@ func (ctx Context) splitRawColumns() *RawColumnsTypeSplit {
var rawIntColumns = make(map[string]*RawIntColumn)
var rawFloatColumns = make(map[string]*RawFloatColumn)
var rawStringColumns = make(map[string]*RawStringColumn)
var rawInferredColumns = make(map[string]*RawInferredColumn)
for name, rawColumn := range ctx.RawColumns {
switch typedRawColumn := rawColumn.(type) {
case *RawIntColumn:
@@ -53,13 +55,16 @@ func (ctx Context) splitRawColumns() *RawColumnsTypeSplit {
rawFloatColumns[name] = typedRawColumn
case *RawStringColumn:
rawStringColumns[name] = typedRawColumn
case *RawInferredColumn:
rawInferredColumns[name] = typedRawColumn
}
}

return &RawColumnsTypeSplit{
RawIntColumns: rawIntColumns,
RawFloatColumns: rawFloatColumns,
RawStringColumns: rawStringColumns,
RawIntColumns: rawIntColumns,
RawFloatColumns: rawFloatColumns,
RawStringColumns: rawStringColumns,
RawInferredColumns: rawInferredColumns,
}
}

@@ -75,6 +80,9 @@ func (serial Serial) collectRawColumns() RawColumns {
for name, rawColumn := range serial.RawColumnSplit.RawStringColumns {
rawColumns[name] = rawColumn
}
for name, rawColumn := range serial.RawColumnSplit.RawInferredColumns {
rawColumns[name] = rawColumn
}

return rawColumns
}
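Go's encoding/json cannot unmarshal into an interface-typed map such as RawColumns without knowing which concrete struct to build, so the context is serialized as one map per concrete column type and merged back together in collectRawColumns; the new raw_inferred_columns field is simply a fourth bucket. A self-contained sketch of that round trip, with placeholder types standing in for the Cortex ones:

package main

import (
    "encoding/json"
    "fmt"
)

type Column interface{ GetName() string }

type IntColumn struct {
    Name string `json:"name"`
}

func (c *IntColumn) GetName() string { return c.Name }

type InferredColumn struct {
    Name string `json:"name"`
}

func (c *InferredColumn) GetName() string { return c.Name }

// One map per concrete type so json.Unmarshal knows which struct to build.
type columnsSplit struct {
    Ints     map[string]*IntColumn      `json:"int_columns"`
    Inferred map[string]*InferredColumn `json:"inferred_columns"`
}

func main() {
    out, _ := json.Marshal(columnsSplit{
        Ints:     map[string]*IntColumn{"age": {Name: "age"}},
        Inferred: map[string]*InferredColumn{"review": {Name: "review"}},
    })

    var in columnsSplit
    if err := json.Unmarshal(out, &in); err != nil {
        panic(err)
    }

    // Merge the typed maps back into one heterogeneous map,
    // as collectRawColumns does above.
    columns := map[string]Column{}
    for name, col := range in.Ints {
        columns[name] = col
    }
    for name, col := range in.Inferred {
        columns[name] = col
    }
    fmt.Println(len(columns), columns["review"].GetName()) // 2 review
}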
3 changes: 3 additions & 0 deletions pkg/operator/api/userconfig/column_type.go
@@ -31,6 +31,7 @@ const (
IntegerListColumnType
FloatListColumnType
StringListColumnType
InferredColumnType
)

var columnTypes = []string{
@@ -41,6 +42,7 @@ var columnTypes = []string{
"INT_LIST_COLUMN",
"FLOAT_LIST_COLUMN",
"STRING_LIST_COLUMN",
"INFERRED_COLUMN",
}

var columnJSONPlaceholders = []string{
@@ -51,6 +53,7 @@ var columnJSONPlaceholders = []string{
"[INT]",
"[FLOAT]",
"[\"STRING\"]",
"VALUE",
}

func ColumnTypeFromString(s string) ColumnType {
22 changes: 19 additions & 3 deletions pkg/operator/api/userconfig/config.go
@@ -154,9 +154,9 @@ func (config *Config) Validate(envName string) error {
rawColumnNames := config.RawColumns.Names()
for _, env := range config.Environments {
ingestedColumnNames := env.Data.GetIngestedColumns()
missingColumns := slices.SubtractStrSlice(rawColumnNames, ingestedColumnNames)
if len(missingColumns) > 0 {
return errors.Wrap(ErrorRawColumnNotInEnv(env.Name), Identify(config.RawColumns.Get(missingColumns[0])))
missingColumnNames := slices.SubtractStrSlice(rawColumnNames, ingestedColumnNames)
if len(missingColumnNames) > 0 {
return errors.Wrap(ErrorRawColumnNotInEnv(env.Name), Identify(config.RawColumns.Get(missingColumnNames[0])))
}
extraColumns := slices.SubtractStrSlice(rawColumnNames, ingestedColumnNames)
if len(extraColumns) > 0 {
@@ -440,6 +440,22 @@ func New(configs map[string][]byte, envName string) (*Config, error) {
}
}

for _, env := range config.Environments {
ingestedColumnNames := env.Data.GetIngestedColumns()
missingColumnNames := slices.SubtractStrSlice(ingestedColumnNames, config.RawColumns.Names())
for _, inferredColumnName := range missingColumnNames {
inferredRawColumn := &RawInferredColumn{
ResourceFields: ResourceFields{
Name: inferredColumnName,
},
Type: InferredColumnType,
Compute: &SparkCompute{},
}
cr.Struct(inferredRawColumn.Compute, make(map[string]interface{}), sparkComputeStructValidation)
config.RawColumns = append(config.RawColumns, inferredRawColumn)
}
}

if err := config.Validate(envName); err != nil {
return nil, err
}
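The loop added to New above is the heart of this PR: every column ingested by an environment that has no explicit raw_column declaration is backfilled with a RawInferredColumn placeholder (InferredColumnType and an empty SparkCompute), which is what makes declaring raw column kinds optional. A small sketch of the set-difference step, assuming unique column names and using plain Go in place of the slices helpers:

package main

import "fmt"

// inferredColumnNames returns the ingested column names that were not
// declared as raw_columns; each one would be backfilled with a
// RawInferredColumn placeholder.
func inferredColumnNames(ingested, declared []string) []string {
    declaredSet := make(map[string]struct{}, len(declared))
    for _, name := range declared {
        declaredSet[name] = struct{}{}
    }
    var missing []string
    for _, name := range ingested {
        if _, ok := declaredSet[name]; !ok {
            missing = append(missing, name)
        }
    }
    return missing
}

func main() {
    // The reviews example after deleting raw_columns.yaml: both ingested
    // columns are inferred.
    fmt.Println(inferredColumnNames([]string{"review", "label"}, nil))
}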
8 changes: 8 additions & 0 deletions pkg/operator/api/userconfig/environments.go
@@ -21,6 +21,7 @@ import (
cr "github.com/cortexlabs/cortex/pkg/lib/configreader"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/pointer"
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
"github.com/cortexlabs/cortex/pkg/lib/slices"
"github.com/cortexlabs/cortex/pkg/operator/api/resource"
)
@@ -337,6 +338,13 @@ func (environments Environments) Validate() error {
return ErrorDuplicateResourceName(dups...)
}

ingestedColumns := environments[0].Data.GetIngestedColumns()
for _, env := range environments[1:] {
if !strset.New(ingestedColumns...).IsEqual(strset.New(env.Data.GetIngestedColumns()...)) {
return ErrorEnvSchemaMismatch(environments[0], env)
}
}

return nil
}
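Since raw columns can now be inferred from the ingested data, the new check requires every environment to ingest the same set of columns and returns ErrorEnvSchemaMismatch otherwise. A sketch of an order-insensitive comparison equivalent to the strset.IsEqual call, assuming column names are unique within an environment:

package main

import "fmt"

// sameIngestedColumns reports whether two environments ingest exactly the
// same column names, ignoring order (mirrors the strset.IsEqual check).
func sameIngestedColumns(a, b []string) bool {
    if len(a) != len(b) {
        return false
    }
    seen := make(map[string]struct{}, len(a))
    for _, name := range a {
        seen[name] = struct{}{}
    }
    for _, name := range b {
        if _, ok := seen[name]; !ok {
            return false
        }
    }
    return true
}

func main() {
    fmt.Println(sameIngestedColumns([]string{"review", "label"}, []string{"label", "review"})) // true
    fmt.Println(sameIngestedColumns([]string{"review", "label"}, []string{"review"}))          // false
}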

16 changes: 15 additions & 1 deletion pkg/operator/api/userconfig/errors.go
@@ -59,6 +59,7 @@ const (
ErrRegressionTargetType
ErrClassificationTargetType
ErrSpecifyOnlyOneMissing
ErrEnvSchemaMismatch
)

var errorKinds = []string{
Expand Down Expand Up @@ -92,9 +93,10 @@ var errorKinds = []string{
"err_regression_target_type",
"err_classification_target_type",
"err_specify_only_one_missing",
"err_env_schema_mismatch",
}

var _ = [1]int{}[int(ErrSpecifyOnlyOneMissing)-(len(errorKinds)-1)] // Ensure list length matches
var _ = [1]int{}[int(ErrEnvSchemaMismatch)-(len(errorKinds)-1)] // Ensure list length matches

func (t ErrorKind) String() string {
return errorKinds[t]
@@ -397,3 +399,15 @@ func ErrorSpecifyOnlyOneMissing(vals ...string) error {
message: message,
}
}

func ErrorEnvSchemaMismatch(env1, env2 *Environment) error {
return Error{
Kind: ErrEnvSchemaMismatch,
message: fmt.Sprintf("schemas diverge between environments (%s lists %s, and %s lists %s)",
env1.Name,
s.StrsAnd(env1.Data.GetIngestedColumns()),
env2.Name,
s.StrsAnd(env2.Data.GetIngestedColumns()),
),
}
}
31 changes: 20 additions & 11 deletions pkg/operator/api/userconfig/raw_columns.go
@@ -25,7 +25,6 @@ type RawColumn interface {
Column
GetType() ColumnType
GetCompute() *SparkCompute
GetUserConfig() Resource
}

type RawColumns []RawColumn
@@ -181,6 +180,12 @@ var rawStringColumnFieldValidations = []*cr.StructFieldValidation{
typeFieldValidation,
}

type RawInferredColumn struct {
ResourceFields
Type ColumnType `json:"type" yaml:"type"`
Compute *SparkCompute `json:"compute" yaml:"compute"`
}

func (rawColumns RawColumns) Validate() error {
resources := make([]Resource, len(rawColumns))
for i, res := range rawColumns {
@@ -224,6 +229,10 @@ func (column *RawStringColumn) GetType() ColumnType {
return column.Type
}

func (column *RawInferredColumn) GetType() ColumnType {
return column.Type
}

func (column *RawIntColumn) GetCompute() *SparkCompute {
return column.Compute
}
@@ -236,6 +245,10 @@ func (column *RawStringColumn) GetCompute() *SparkCompute {
return column.Compute
}

func (column *RawInferredColumn) GetCompute() *SparkCompute {
return column.Compute
}

func (column *RawIntColumn) GetResourceType() resource.Type {
return resource.RawColumnType
}
@@ -248,6 +261,10 @@ func (column *RawStringColumn) GetResourceType() resource.Type {
return resource.RawColumnType
}

func (column *RawInferredColumn) GetResourceType() resource.Type {
return resource.RawColumnType
}

func (column *RawIntColumn) IsRaw() bool {
return true
}
@@ -260,14 +277,6 @@ func (column *RawStringColumn) IsRaw() bool {
return true
}

func (column *RawIntColumn) GetUserConfig() Resource {
return column
}

func (column *RawFloatColumn) GetUserConfig() Resource {
return column
}

func (column *RawStringColumn) GetUserConfig() Resource {
return column
func (column *RawInferredColumn) IsRaw() bool {
return true
}
5 changes: 5 additions & 0 deletions pkg/operator/api/userconfig/validators.go
@@ -139,6 +139,11 @@ func CheckColumnRuntimeTypesMatch(columnRuntimeTypes map[string]interface{}, col
if !ok {
return errors.Wrap(ErrorUnsupportedColumnType(columnRuntimeTypeInter, validTypes), columnInputName)
}

if columnRuntimeType == InferredColumnType {
continue
}

if !slices.HasString(validTypes, columnRuntimeType.String()) {
return errors.Wrap(ErrorUnsupportedColumnType(columnRuntimeType, validTypes), columnInputName)
}
14 changes: 13 additions & 1 deletion pkg/operator/context/raw_columns.go
@@ -93,8 +93,20 @@ func getRawColumns(
},
RawStringColumn: typedColumnConfig,
}
case *userconfig.RawInferredColumn:
buf.WriteString(typedColumnConfig.Name)
id := hash.Bytes(buf.Bytes())
rawColumn = &context.RawInferredColumn{
ComputedResourceFields: &context.ComputedResourceFields{
ResourceFields: &context.ResourceFields{
ID: id,
ResourceType: resource.RawColumnType,
},
},
RawInferredColumn: typedColumnConfig,
}
default:
return nil, errors.Wrap(configreader.ErrorInvalidStr(userconfig.TypeKey, userconfig.IntegerColumnType.String(), userconfig.FloatColumnType.String(), userconfig.StringColumnType.String()), userconfig.Identify(columnConfig)) // unexpected error
return nil, errors.Wrap(configreader.ErrorInvalidStr(typedColumnConfig.GetType().String(), userconfig.IntegerColumnType.String(), userconfig.FloatColumnType.String(), userconfig.StringColumnType.String()), userconfig.Identify(columnConfig)) // unexpected error
}

rawColumns[columnConfig.GetName()] = rawColumn
3 changes: 2 additions & 1 deletion pkg/operator/context/transformers.go
@@ -67,7 +67,8 @@ func loadUserTransformers(
ResourceFields: userconfig.ResourceFields{
Name: implHash,
},
Path: *transColConfig.TransformerPath,
OutputType: userconfig.InferredColumnType,
Path: *transColConfig.TransformerPath,
}
transformer, err := newTransformer(*anonTransformerConfig, impl, nil, pythonPackages)
if err != nil {