Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/component/base/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Job struct {
// InputReader is an interface for reading input data from a job.
type InputReader interface {
// ReadData reads the input data from the job into the provided struct.
// The unmarshaler automatically detects the correct naming convention for each field
// based on available input data, providing seamless integration with any external package.
ReadData(ctx context.Context, input any) (err error)

// Deprecated: Read() is deprecated and will be removed in a future version.
Expand Down
278 changes: 253 additions & 25 deletions pkg/data/struct.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package data

import (
"container/list"
"context"
"errors"
"fmt"
"reflect"
"strconv"
"strings"
"sync"

"github.com/iancoleman/strcase"

"github.com/instill-ai/pipeline-backend/pkg/data/format"
"github.com/instill-ai/pipeline-backend/pkg/external"
Expand Down Expand Up @@ -43,23 +47,100 @@ import (
// - For Document: "application/pdf", "text/plain", etc
// - For pointers: "default=value" to specify default value when nil

// fieldMappingCache implements an LRU cache for struct field name mappings
type fieldMappingCache struct {
cache map[reflect.Type]map[string]string // Type -> FieldName -> ResolvedName
lru *list.List // LRU list for eviction
items map[reflect.Type]*list.Element // Type -> List element mapping
maxSize int // Maximum number of cached types
mu sync.RWMutex // Read-write mutex for thread safety
}

// newFieldMappingCache creates a new LRU cache with the specified maximum size
func newFieldMappingCache(maxSize int) *fieldMappingCache {
return &fieldMappingCache{
cache: make(map[reflect.Type]map[string]string),
lru: list.New(),
items: make(map[reflect.Type]*list.Element),
maxSize: maxSize,
}
}

// get retrieves field mappings for a struct type from cache
func (c *fieldMappingCache) get(structType reflect.Type) (map[string]string, bool) {
if c == nil {
return nil, false
}
c.mu.RLock()
mappings, exists := c.cache[structType]
if exists {
// Move to front (most recently used)
c.lru.MoveToFront(c.items[structType])
}
c.mu.RUnlock()
return mappings, exists
}

// set stores field mappings for a struct type in cache
func (c *fieldMappingCache) set(structType reflect.Type, mappings map[string]string) {
if c == nil {
return
}
c.mu.Lock()
defer c.mu.Unlock()

// If already exists, update and move to front
if elem, exists := c.items[structType]; exists {
c.cache[structType] = mappings
c.lru.MoveToFront(elem)
return
}

// Check if we need to evict
if c.lru.Len() >= c.maxSize {
// Remove least recently used
oldest := c.lru.Back()
if oldest != nil {
oldestType := oldest.Value.(reflect.Type)
delete(c.cache, oldestType)
delete(c.items, oldestType)
c.lru.Remove(oldest)
}
}

// Add new entry
c.cache[structType] = mappings
elem := c.lru.PushFront(structType)
c.items[structType] = elem
}

// Marshaler is used to marshal a struct into a Map.
type Marshaler struct {
fieldCache *fieldMappingCache
}

// Unmarshaler is used to unmarshal data into a struct.
type Unmarshaler struct {
binaryFetcher external.BinaryFetcher
fieldCache *fieldMappingCache
}

// NewMarshaler creates a new Marshaler.
// NewMarshaler creates a new Marshaler with field name caching enabled.
func NewMarshaler() *Marshaler {
return &Marshaler{}
return &Marshaler{
fieldCache: newFieldMappingCache(200), // Cache up to 200 struct types
}
}

// NewUnmarshaler creates a new Unmarshaler with a binary fetcher.
// NewUnmarshaler creates a new Unmarshaler with automatic naming convention detection.
// The unmarshaler automatically detects the correct naming convention for each field
// based on available input data, providing seamless integration with any external package.
// Field name mappings are cached for improved performance on repeated operations.
func NewUnmarshaler(binaryFetcher external.BinaryFetcher) *Unmarshaler {
return &Unmarshaler{binaryFetcher}
return &Unmarshaler{
binaryFetcher: binaryFetcher,
fieldCache: newFieldMappingCache(200), // Cache up to 200 struct types
}
}

// Unmarshal converts a Map value into the provided struct s using `instill` tags.
Expand Down Expand Up @@ -112,7 +193,7 @@ func (u *Unmarshaler) unmarshalStruct(ctx context.Context, m Map, v reflect.Valu
}

// Get the field name from the embedded struct's field
embFieldName := u.getFieldName(embField)
embFieldName := u.getFieldNameFromMap(embField, m)
if val, ok := m[embFieldName]; ok {
if err := u.unmarshalValue(ctx, val, embValue, embField); err != nil {
return fmt.Errorf("error unmarshaling embedded field %s: %w", embFieldName, err)
Expand All @@ -126,7 +207,7 @@ func (u *Unmarshaler) unmarshalStruct(ctx context.Context, m Map, v reflect.Valu
continue
}

fieldName := u.getFieldName(field)
fieldName := u.getFieldNameFromMap(field, m)
val, ok := m[fieldName]
if !ok {
// Check for default value if field is nil pointer or zero value
Expand Down Expand Up @@ -411,13 +492,20 @@ func (u *Unmarshaler) unmarshalToReflectMap(ctx context.Context, v Map, field re

// unmarshalToStruct handles unmarshaling of Map values into struct.
func (u *Unmarshaler) unmarshalToStruct(ctx context.Context, v Map, field reflect.Value) error {
structType := field.Type()

// Get cached field mappings for this struct type
fieldMappings := u.getFieldMappingsForType(structType, v)

for i := 0; i < field.NumField(); i++ {
structField := field.Type().Field(i)
structField := structType.Field(i)
fieldValue := field.Field(i)
if !fieldValue.CanSet() {
continue
}
fieldName := u.getFieldName(structField)

// Use cached mapping instead of computing each time
fieldName := fieldMappings[structField.Name]
val, ok := v[fieldName]
if !ok {
continue
Expand Down Expand Up @@ -552,14 +640,76 @@ func (u *Unmarshaler) unmarshalInterface(v format.Value, field reflect.Value, st
return fmt.Errorf("cannot unmarshal %T into %v", v, field.Type())
}

// getFieldName returns the field name from the struct tag or the field name itself.
func (u *Unmarshaler) getFieldName(field reflect.StructField) string {
tag := field.Tag.Get("instill")
if tag == "" {
return field.Name
// getFieldMappingsForType returns cached field name mappings for a struct type.
// If not cached, it computes the mappings using automatic naming convention detection.
func (u *Unmarshaler) getFieldMappingsForType(structType reflect.Type, inputMap Map) map[string]string {
// Initialize cache if nil (for backward compatibility with tests)
if u.fieldCache == nil {
u.fieldCache = newFieldMappingCache(200)
}

// Try to get from cache first
if mappings, exists := u.fieldCache.get(structType); exists {
return mappings
}

// Cache miss - compute mappings for all fields in this struct type
mappings := make(map[string]string)
for i := 0; i < structType.NumField(); i++ {
field := structType.Field(i)
mappings[field.Name] = u.computeFieldName(field, inputMap)
}

// Cache the computed mappings
u.fieldCache.set(structType, mappings)
return mappings
}

// computeFieldName computes the field name using automatic naming convention detection.
// This is the core logic that tries different naming conventions and returns the one that exists in the input map.
func (u *Unmarshaler) computeFieldName(field reflect.StructField, inputMap Map) string {
// First priority: instill tag (always takes precedence)
if tag := field.Tag.Get("instill"); tag != "" {
parts := strings.Split(tag, ",")
return parts[0]
}
parts := strings.Split(tag, ",")
return parts[0]

// Second priority: try json tag with automatic convention detection
jsonTag := field.Tag.Get("json")
if jsonTag != "" && jsonTag != "-" {
parts := strings.Split(jsonTag, ",")
jsonFieldName := parts[0]
if jsonFieldName != "" {
// Try different naming conventions and return the one that exists in input
conversions := []struct {
name string
converted string
}{
{"kebab-case", jsonFieldName}, // No conversion
{"camelCase", strcase.ToKebab(jsonFieldName)}, // camelCase -> kebab-case
{"snake_case", strcase.ToKebab(jsonFieldName)}, // snake_case -> kebab-case
{"PascalCase", strcase.ToKebab(jsonFieldName)}, // PascalCase -> kebab-case
}

for _, conv := range conversions {
if _, exists := inputMap[conv.converted]; exists {
return conv.converted
}
}
}
}

// Fallback: use field name as-is
return field.Name
}

// getFieldNameFromMap returns the field name using cached mappings when possible.
// This method maintains backward compatibility while leveraging caching for performance.
func (u *Unmarshaler) getFieldNameFromMap(field reflect.StructField, inputMap Map) string {
// For single field lookups, we still use the direct computation to avoid
// computing mappings for the entire struct when only one field is needed.
// The cache is most beneficial for full struct unmarshaling.
return u.computeFieldName(field, inputMap)
}

// Marshal converts a struct into a Map that represents the struct fields as values.
Expand Down Expand Up @@ -621,34 +771,112 @@ func (m *Marshaler) marshalValue(v reflect.Value) (format.Value, error) {
}
}

// marshalStruct handles marshaling of struct values.
func (m *Marshaler) marshalStruct(v reflect.Value) (Map, error) {
t := v.Type()
mp := Map{}
// getMarshalFieldMappings returns cached field name mappings for marshaling a struct type.
// If not cached, it computes the mappings for consistent kebab-case output.
func (m *Marshaler) getMarshalFieldMappings(structType reflect.Type) map[string]marshalFieldInfo {
// Initialize cache if nil (for backward compatibility with tests)
if m.fieldCache == nil {
m.fieldCache = newFieldMappingCache(200)
}

for i := 0; i < t.NumField(); i++ {
field := t.Field(i)
fieldValue := v.Field(i)
// Try to get from cache first
if mappings, exists := m.fieldCache.get(structType); exists {
// Convert cached string mappings to marshalFieldInfo
result := make(map[string]marshalFieldInfo)
for fieldName, resolvedName := range mappings {
// We need to recompute format tags since they're not cached in string form
field, _ := structType.FieldByName(fieldName)
var formatTag string
if instillTag := field.Tag.Get("instill"); instillTag != "" {
parts := strings.Split(instillTag, ",")
if len(parts) > 1 {
formatTag = parts[1]
}
}
result[fieldName] = marshalFieldInfo{
resolvedName: resolvedName,
formatTag: formatTag,
}
}
return result
}

// Cache miss - compute mappings for all fields in this struct type
mappings := make(map[string]string)
fieldInfos := make(map[string]marshalFieldInfo)

for i := 0; i < structType.NumField(); i++ {
field := structType.Field(i)

// Skip unexported fields
if field.PkgPath != "" {
continue
}

tag := field.Tag.Get("instill")
var fieldName string
var formatTag string

if tag != "" {
parts := strings.Split(tag, ",")
// First priority: instill tag
if instillTag := field.Tag.Get("instill"); instillTag != "" {
parts := strings.Split(instillTag, ",")
fieldName = parts[0]
if len(parts) > 1 {
formatTag = parts[1]
}
} else if jsonTag := field.Tag.Get("json"); jsonTag != "" && jsonTag != "-" {
// Second priority: json tag, convert to kebab-case
parts := strings.Split(jsonTag, ",")
jsonFieldName := parts[0]
if jsonFieldName != "" {
fieldName = strcase.ToKebab(jsonFieldName)
} else {
fieldName = field.Name
}
} else {
// Fallback: use field name as-is
fieldName = field.Name
}

mappings[field.Name] = fieldName
fieldInfos[field.Name] = marshalFieldInfo{
resolvedName: fieldName,
formatTag: formatTag,
}
}

// Cache the computed mappings (string form for compatibility with cache)
m.fieldCache.set(structType, mappings)
return fieldInfos
}

// marshalFieldInfo contains field mapping information for marshaling
type marshalFieldInfo struct {
resolvedName string
formatTag string
}

// marshalStruct handles marshaling of struct values.
func (m *Marshaler) marshalStruct(v reflect.Value) (Map, error) {
t := v.Type()
mp := Map{}

// Get cached field mappings for this struct type
fieldMappings := m.getMarshalFieldMappings(t)

for i := 0; i < t.NumField(); i++ {
field := t.Field(i)
fieldValue := v.Field(i)

// Skip unexported fields
if field.PkgPath != "" {
continue
}

// Use cached mapping
fieldInfo := fieldMappings[field.Name]
fieldName := fieldInfo.resolvedName
formatTag := fieldInfo.formatTag

// Handle format conversion before marshaling
if formatTag != "" && fieldValue.CanInterface() {
if val, ok := fieldValue.Interface().(format.Value); ok {
Expand Down
Loading
Loading