Skip to content

Commit

Permalink
Updates job mapping validator to return error codes (#3179)
Browse files Browse the repository at this point in the history
  • Loading branch information
alishakawaguchi authored Jan 23, 2025
1 parent ba356bc commit cfeb6fb
Show file tree
Hide file tree
Showing 13 changed files with 4,853 additions and 2,525 deletions.
4,608 changes: 2,683 additions & 1,925 deletions backend/gen/go/protos/mgmt/v1alpha1/job.pb.go

Large diffs are not rendered by default.

50 changes: 50 additions & 0 deletions backend/gen/go/protos/mgmt/v1alpha1/job.pb.json.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

131 changes: 123 additions & 8 deletions backend/protos/mgmt/v1alpha1/job.proto
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ message PostgresDestinationConnectionOptions {

message PostgresOnConflictConfig {
// @deprecated - Use strategy nothing instead
bool do_nothing = 1;
bool do_nothing = 1 [deprecated = true];

oneof strategy {
option (buf.validate.oneof).required = false;
Expand Down Expand Up @@ -1111,8 +1111,47 @@ message ColumnError {
string table = 2;
// The column of the error
string column = 3;
// The list of errors
repeated string errors = 4;
// @deprecated - Use error_reports instead
repeated string errors = 4 [deprecated = true];

// The list of error reports
repeated ColumnErrorReport error_reports = 5;

// An enumeration of column error codes
enum ColumnErrorCode {
// Default unspecified value
COLUMN_ERROR_CODE_UNSPECIFIED = 0;
// Column not found in source database
COLUMN_ERROR_CODE_NOT_FOUND_IN_SOURCE = 1;
// Column not found in job mapping
COLUMN_ERROR_CODE_NOT_FOUND_IN_MAPPING = 2;
// Required column not found in job mapping
COLUMN_ERROR_CODE_REQUIRED_COLUMN_NOT_FOUND_IN_MAPPING = 3;
// Required foreign key not found in job mapping
COLUMN_ERROR_CODE_REQUIRED_FOREIGN_KEY_NOT_FOUND_IN_MAPPING = 4;
// Unsupported circular dependency detected
COLUMN_ERROR_CODE_UNSUPPORTED_CIRCULAR_DEPENDENCY_AT_LEAST_ONE_NULLABLE = 5;
// Virtual foreign key source column not found in mapping
COLUMN_ERROR_CODE_VFK_SOURCE_COLUMN_NOT_FOUND_IN_MAPPING = 6;
// Virtual foreign key source column not found in source
COLUMN_ERROR_CODE_VFK_SOURCE_COLUMN_NOT_FOUND_IN_SOURCE = 7;
// Virtual foreign key target column not found in mapping
COLUMN_ERROR_CODE_VFK_TARGET_COLUMN_NOT_FOUND_IN_MAPPING = 8;
// Virtual foreign key target column not found in source
COLUMN_ERROR_CODE_VFK_TARGET_COLUMN_NOT_FOUND_IN_SOURCE = 9;
// Virtual foreign key column datatype mismatch
COLUMN_ERROR_CODE_VFK_COLUMN_DATATYPE_MISMATCH = 10;
// Virtual foreign key source column not unique
COLUMN_ERROR_CODE_VFK_SOURCE_COLUMN_NOT_UNIQUE = 11;
}

// Column error report
message ColumnErrorReport {
// The error code
ColumnErrorCode code = 1;
// The error message
string message = 2;
}
}

message ColumnWarning {
Expand All @@ -1122,21 +1161,97 @@ message ColumnWarning {
string table = 2;
// The column of the warning
string column = 3;
// The list of warnings
repeated string warnings = 5;
// @deprecated - Use warning_reports instead
repeated string warnings = 5 [deprecated = true];
// The list of warning reports
repeated ColumnWarningReport warning_reports = 6;

// An enumeration of column warning codes
enum ColumnWarningCode {
// Default unspecified value
COLUMN_WARNING_CODE_UNSPECIFIED = 0;
// Column not found in source database
COLUMN_WARNING_CODE_NOT_FOUND_IN_SOURCE = 1;
// Column not found in job mapping
COLUMN_WARNING_CODE_NOT_FOUND_IN_MAPPING = 2;
}

// Column warning report
message ColumnWarningReport {
// The warning code
ColumnWarningCode code = 1;
// The warning message
string message = 2;
}
}

message DatabaseError {
// The list of errors
repeated string errors = 1;
// @deprecated - Use error_reports instead
repeated string errors = 1 [deprecated = true];
// The list of error reports
repeated DatabaseErrorReport error_reports = 2;

// An enumeration of database error codes
enum DatabaseErrorCode {
// Default unspecified value
DATABASE_ERROR_CODE_UNSPECIFIED = 0;
// Unsupported circular dependency detected
DATABASE_ERROR_CODE_UNSUPPORTED_CIRCULAR_DEPENDENCY_AT_LEAST_ONE_NULLABLE = 1;
// Virtual foreign key column mismatch
DATABASE_ERROR_CODE_VFK_COLUMN_MISMATCH = 2;
}

// Database error report
message DatabaseErrorReport {
// The error code
DatabaseErrorCode code = 1;
// The error message
string message = 2;
}
}

message TableError {
// The schema of the table
string schema = 1;
// The table of the error
string table = 2;
// The list of error reports
repeated TableErrorReport error_reports = 3;

// An enumeration of table error codes
enum TableErrorCode {
// Default unspecified value
TABLE_ERROR_CODE_UNSPECIFIED = 0;
// Table not found in source database
TABLE_ERROR_CODE_TABLE_NOT_FOUND_IN_SOURCE = 1;
// Virtual foreign key source table not found in mapping
TABLE_ERROR_CODE_VFK_SOURCE_TABLE_NOT_FOUND_IN_MAPPING = 2;
// Virtual foreign key source table not found in source
TABLE_ERROR_CODE_VFK_SOURCE_TABLE_NOT_FOUND_IN_SOURCE = 3;
// Virtual foreign key target table not found in mapping
TABLE_ERROR_CODE_VFK_TARGET_TABLE_NOT_FOUND_IN_MAPPING = 4;
// Virtual foreign key target table not found in source
TABLE_ERROR_CODE_VFK_TARGET_TABLE_NOT_FOUND_IN_SOURCE = 5;
}

// Database error report
message TableErrorReport {
// The error code
TableErrorCode code = 1;
// The error message
string message = 2;
}
}

message ValidateJobMappingsResponse {
// The list of column errors
repeated ColumnError column_errors = 1;
// The database error
// The list of database errors
DatabaseError database_errors = 2;
// The list of column warnings
repeated ColumnWarning column_warnings = 3;
// The list of table errors
repeated TableError table_errors = 4;
}

message VirtualForeignKey {
Expand Down
54 changes: 44 additions & 10 deletions backend/services/mgmt/v1alpha1/job-service/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1561,20 +1561,40 @@ func (s *Service) ValidateJobMappings(
return nil, err
}

dbErrMsgs := []string{}
dbErrReports := []*mgmtv1alpha1.DatabaseError_DatabaseErrorReport{}
for _, err := range result.DatabaseErrors {
dbErrMsgs = append(dbErrMsgs, err.Message)
dbErrReports = append(dbErrReports, &mgmtv1alpha1.DatabaseError_DatabaseErrorReport{
Code: err.Code,
Message: err.Message,
})
}
dbErrors := &mgmtv1alpha1.DatabaseError{
Errors: []string{},
Errors: dbErrMsgs,
ErrorReports: dbErrReports,
}

tableErrors := []*mgmtv1alpha1.TableError{}
for tableName, errs := range result.TableErrors {
schema, table := sqlmanager_shared.SplitTableKey(tableName)
tableErrors = append(tableErrors, &mgmtv1alpha1.TableError{
Schema: schema,
Table: table,
ErrorReports: errs,
})
}
dbErrors.Errors = append(dbErrors.Errors, result.DatabaseErrors...)

colErrors := []*mgmtv1alpha1.ColumnError{}
for tableName, colMap := range result.ColumnErrors {
for col, errors := range colMap {
schema, table := sqlmanager_shared.SplitTableKey(tableName)
colErrors = append(colErrors, &mgmtv1alpha1.ColumnError{
Schema: schema,
Table: table,
Column: col,
Errors: errors,
Schema: schema,
Table: table,
Column: col,
Errors: getErrorMessages(errors),
ErrorReports: errors,
})
}
}
Expand All @@ -1584,21 +1604,35 @@ func (s *Service) ValidateJobMappings(
for col, warnings := range colMap {
schema, table := sqlmanager_shared.SplitTableKey(tableName)
colWarnings = append(colWarnings, &mgmtv1alpha1.ColumnWarning{
Schema: schema,
Table: table,
Column: col,
Warnings: warnings,
Schema: schema,
Table: table,
Column: col,
Warnings: getErrorMessages(warnings),
WarningReports: warnings,
})
}
}

return connect.NewResponse(&mgmtv1alpha1.ValidateJobMappingsResponse{
DatabaseErrors: dbErrors,
TableErrors: tableErrors,
ColumnErrors: colErrors,
ColumnWarnings: colWarnings,
}), nil
}

type ErrorReport interface {
GetMessage() string
}

func getErrorMessages[T ErrorReport](errorsReports []T) []string {
messages := []string{}
for _, err := range errorsReports {
messages = append(messages, err.GetMessage())
}
return messages
}

func getJobSourceConnectionId(jobSource *mgmtv1alpha1.JobSource) (*string, error) {
var connectionIdToVerify *string
switch config := jobSource.Options.Config.(type) {
Expand Down
Loading

0 comments on commit cfeb6fb

Please sign in to comment.