Skip to content

Commit 548857e

Browse files
committed
lint
1 parent 8bcef5b commit 548857e

File tree

7 files changed

+36
-27
lines changed

7 files changed

+36
-27
lines changed

flow/connectors/postgres/postgres.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -150,20 +150,20 @@ func (c *PostgresConnector) fetchCustomTypeMapping(ctx context.Context, version
150150
c.customTypeMapping = customTypeMapping
151151

152152
if version >= shared.InternalVersion_CompositeTypeAsTuple {
153-
var compositeTypeNames []string
154-
for _, typeData := range customTypeMapping {
155-
if typeData.Type == 'c' && typeData.Delim == 0 { // Only composite types
156-
compositeTypeNames = append(compositeTypeNames, typeData.Name)
153+
var compositeTypeNames []string
154+
for _, typeData := range customTypeMapping {
155+
if typeData.Type == 'c' && typeData.Delim == 0 { // Only composite types
156+
compositeTypeNames = append(compositeTypeNames, typeData.Name)
157+
}
157158
}
159+
types, err := c.conn.LoadTypes(ctx, compositeTypeNames)
160+
if err != nil {
161+
c.logger.Error("failed to load composite types",
162+
slog.Any("error", err), slog.Any("composite_type_names", compositeTypeNames))
163+
return nil, fmt.Errorf("failed to load composite types: %w", err)
164+
}
165+
c.typeMap.RegisterTypes(types)
158166
}
159-
types, err := c.conn.LoadTypes(ctx, compositeTypeNames)
160-
if err != nil {
161-
c.logger.Error("failed to load composite types",
162-
slog.Any("error", err), slog.Any("composite_type_names", compositeTypeNames))
163-
return nil, fmt.Errorf("failed to load composite types: %w", err)
164-
}
165-
c.typeMap.RegisterTypes(types)
166-
}
167167
}
168168
return c.customTypeMapping, nil
169169
}
@@ -941,11 +941,12 @@ func (c *PostgresConnector) GetSelectedColumns(
941941
}
942942

943943
func (c *PostgresConnector) getCompositeTypeDetails(ctx context.Context, system protos.TypeSystem, version uint32,
944-
customTypeMapping map[uint32]shared.CustomDataType, OID uint32) ([]*protos.FieldDescription, error) {
944+
customTypeMapping map[uint32]shared.CustomDataType, oid uint32,
945+
) ([]*protos.FieldDescription, error) {
945946
result := make([]*protos.FieldDescription, 0)
946-
subfields, err := shared.GetCompositeDataTypeDetails(ctx, c.conn, OID)
947+
subfields, err := shared.GetCompositeDataTypeDetails(ctx, c.conn, oid)
947948
if err != nil {
948-
return nil, fmt.Errorf("error getting composite data type details for %d: %w", OID, err)
949+
return nil, fmt.Errorf("error getting composite data type details for %d: %w", oid, err)
949950
}
950951
for _, subfield := range subfields {
951952
var subColType string

flow/connectors/postgres/qrep_query_executor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (qe *QRepQueryExecutor) buildQFieldFromOID(name string, oid uint32, typeMod
101101
}
102102
}
103103
qe.logger.Error("[pg_query_executor] type not found for oid or not a composite type",
104-
slog.String("type_oid", fmt.Sprintf("%d", oid)),
104+
slog.Uint64("type_oid", uint64(oid)),
105105
slog.String("type_name", name))
106106
return types.QField{
107107
Name: name,
@@ -123,7 +123,7 @@ func (qe *QRepQueryExecutor) buildQFieldFromOID(name string, oid uint32, typeMod
123123
}
124124
}
125125
qe.logger.Error("[pg_query_executor] type not found for oid or not an array composite type",
126-
slog.String("type_oid", fmt.Sprintf("%d", oid)),
126+
slog.Uint64("type_oid", uint64(oid)),
127127
slog.String("type_name", name))
128128
return types.QField{
129129
Name: name,

flow/connectors/postgres/qvalue_convert.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -765,7 +765,12 @@ func (c *PostgresConnector) parseFieldFromPostgresOID(
765765
return nil, fmt.Errorf("failed to parse value %v (%T) into QValueKind %v", value, value, qvalueKind)
766766
}
767767

768-
func (c *PostgresConnector) compositeToQValue(oid uint32, value any, customTypeMapping map[uint32]shared.CustomDataType, version uint32) (types.QValue, error) {
768+
func (c *PostgresConnector) compositeToQValue(
769+
oid uint32,
770+
value any,
771+
customTypeMapping map[uint32]shared.CustomDataType,
772+
version uint32,
773+
) (types.QValue, error) {
769774
typ, ok := c.typeMap.TypeForOID(oid)
770775
if !ok {
771776
return nil, fmt.Errorf("composite type OID %d not found in typeMap", oid)

flow/model/qvalue/avro_converter.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/base64"
66
"encoding/hex"
7+
"errors"
78
"fmt"
89
"log/slog"
910
"math/big"
@@ -115,8 +116,8 @@ func GetAvroSchemaFromQValueKind(
115116
case types.QValueKindHStore, types.QValueKindJSON, types.QValueKindJSONB:
116117
return avro.NewPrimitiveSchema(avro.String, nil), nil
117118
case types.QValueKindComposite:
118-
if qField.SubFields == nil || len(qField.SubFields) == 0 {
119-
return nil, fmt.Errorf("composite subfields are required for composite type but none provided")
119+
if len(qField.SubFields) == 0 {
120+
return nil, errors.New("composite subfields are required for composite type but none provided")
120121
}
121122

122123
avroFields := make([]*avro.Field, 0, len(qField.SubFields))
@@ -143,8 +144,8 @@ func GetAvroSchemaFromQValueKind(
143144

144145
return avro.NewRecordSchema(qField.Name, "", avroFields)
145146
case types.QValueKindArrayComposite:
146-
if qField.SubFields == nil || len(qField.SubFields) != 1 {
147-
return nil, fmt.Errorf("array composite subfields must contain exactly one field definition")
147+
if len(qField.SubFields) != 1 {
148+
return nil, errors.New("array composite subfields must contain exactly one field definition")
148149
}
149150
fieldSchema, err := GetAvroSchemaFromQValueKind(
150151
ctx, env, *qField.SubFields[0], targetDWH)
@@ -682,7 +683,6 @@ func (c *QValueAvroConverter) processComposite(
682683
ctx, subfieldValue, subfield, c.TargetDWH, c.logger,
683684
c.UnboundedNumericAsString, c.Stat, binaryFormat,
684685
)
685-
686686
if err != nil {
687687
return nil, fmt.Errorf("failed to convert composite field %s: %w", subfield.Name, err)
688688
}

flow/model/qvalue/kind.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func ToDWHColumnType(
9191
colType = "JSON"
9292
} else if kind == types.QValueKindComposite {
9393
colType = "Tuple("
94-
if column.Composite == nil || len(column.Composite) == 0 {
94+
if len(column.Composite) == 0 {
9595
return "", fmt.Errorf("composite type is nil or empty for column %s", column.Name)
9696
}
9797
for _, subfield := range column.Composite {
@@ -110,10 +110,11 @@ func ToDWHColumnType(
110110
}
111111
colType = strings.TrimSuffix(colType, ", ") + ")"
112112
} else if kind == types.QValueKindArrayComposite {
113-
if column.Composite == nil || len(column.Composite) != 1 {
113+
if len(column.Composite) != 1 {
114114
return "", fmt.Errorf("composite array type %s must have exactly 1 subfield", column.Name)
115115
}
116-
elementType, err := ToDWHColumnType(ctx, types.QValueKind(column.Composite[0].Type), env, dwhType, dwhVersion, column.Composite[0], nullableEnabled)
116+
elementType, err := ToDWHColumnType(ctx, types.QValueKind(column.Composite[0].Type),
117+
env, dwhType, dwhVersion, column.Composite[0], nullableEnabled)
117118
if err != nil {
118119
return "", fmt.Errorf("failed to get DWH column type for composite field %s: %w", column.Composite[0].Name, err)
119120
}

flow/shared/types/qschema.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ import (
88
type QField struct {
99
Name string
1010
Type QValueKind
11+
SubFields []*QField
1112
Precision int16
1213
Scale int16
1314
Nullable bool
14-
SubFields []*QField // For composite types, this holds the sub-fields
1515
}
1616

1717
type QRecordSchema struct {

flow/shared/types/qvalue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -862,9 +862,11 @@ type QValueArrayComposite struct {
862862
func (QValueArrayComposite) Kind() QValueKind {
863863
return QValueKindArrayComposite
864864
}
865+
865866
func (v QValueArrayComposite) Value() any {
866867
return v.Val
867868
}
869+
868870
func (v QValueArrayComposite) LValue(ls *lua.LState) lua.LValue {
869871
table := ls.NewTable()
870872
for i, composite := range v.Val {

0 commit comments

Comments
 (0)