Skip to content

Commit

Permalink
feat: support trace projectset (#3286)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
Yisaer authored Oct 11, 2024
1 parent f1ee479 commit ca531f5
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions internal/topo/operator/projectset_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/lf-edge/ekuiper/contract/v2/api"

"github.com/lf-edge/ekuiper/v2/internal/topo/node/tracenode"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
)

Expand All @@ -42,7 +43,7 @@ func (ps *ProjectSetOperator) Apply(ctx api.StreamContext, data interface{}, _ *
case error:
return input
case xsql.Row:
results, err := ps.handleSRFRow(input)
results, err := ps.handleSRFRow(ctx, input)
if err != nil {
return err
}
Expand All @@ -58,7 +59,7 @@ func (ps *ProjectSetOperator) Apply(ctx api.StreamContext, data interface{}, _ *
}
input = input.Filter(sel)
}
if err := ps.handleSRFRowForCollection(input); err != nil {
if err := ps.handleSRFRowForCollection(ctx, input); err != nil {
return err
}
if ps.EnableLimit && ps.LimitCount > 0 && input.Len() > ps.LimitCount {
Expand All @@ -75,12 +76,12 @@ func (ps *ProjectSetOperator) Apply(ctx api.StreamContext, data interface{}, _ *
}
}

func (ps *ProjectSetOperator) handleSRFRowForCollection(data xsql.Collection) error {
func (ps *ProjectSetOperator) handleSRFRowForCollection(ctx api.StreamContext, data xsql.Collection) error {
switch collection := data.(type) {
case *xsql.JoinTuples:
newContent := make([]*xsql.JoinTuple, 0)
for _, c := range collection.Content {
rs, err := ps.handleSRFRow(c)
rs, err := ps.handleSRFRow(ctx, c)
if err != nil {
return err
}
Expand All @@ -90,7 +91,7 @@ func (ps *ProjectSetOperator) handleSRFRowForCollection(data xsql.Collection) er
case *xsql.GroupedTuplesSet:
newGroups := make([]*xsql.GroupedTuples, 0)
for _, c := range collection.Groups {
rs, err := ps.handleSRFRow(c)
rs, err := ps.handleSRFRow(ctx, c)
if err != nil {
return err
}
Expand All @@ -100,7 +101,7 @@ func (ps *ProjectSetOperator) handleSRFRowForCollection(data xsql.Collection) er
case *xsql.WindowTuples:
newContent := make([]xsql.Row, 0)
for _, c := range collection.Content {
rs, err := ps.handleSRFRow(c)
rs, err := ps.handleSRFRow(ctx, c)
if err != nil {
return err
}
Expand All @@ -113,7 +114,7 @@ func (ps *ProjectSetOperator) handleSRFRowForCollection(data xsql.Collection) er
return nil
}

func (ps *ProjectSetOperator) handleSRFRow(row xsql.Row) (*resultWrapper, error) {
func (ps *ProjectSetOperator) handleSRFRow(ctx api.StreamContext, row xsql.Row) (*resultWrapper, error) {
// for now we only support 1 srf function in the field
srfName := ""
for k := range ps.SrfMapping {
Expand All @@ -131,6 +132,8 @@ func (ps *ProjectSetOperator) handleSRFRow(row xsql.Row) (*resultWrapper, error)
res := newResultWrapper(len(aValues), row)
for i, v := range aValues {
newRow := row.Clone().(xsql.Row)
newRow.SetTracerCtx(row.GetTracerCtx())
traced, _, span := tracenode.TraceInput(ctx, newRow, ctx.GetOpId())
// clear original column value
newRow.Del(srfName)
if mv, ok := v.(map[string]interface{}); ok {
Expand All @@ -140,6 +143,10 @@ func (ps *ProjectSetOperator) handleSRFRow(row xsql.Row) (*resultWrapper, error)
} else {
newRow.Set(srfName, v)
}
if traced {
tracenode.RecordRowOrCollection(newRow, span)
span.End()
}
res.appendTuple(i, newRow)
}
return res, nil
Expand Down

0 comments on commit ca531f5

Please sign in to comment.