Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support trace projectset #3286

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions internal/topo/operator/projectset_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

"github.com/lf-edge/ekuiper/contract/v2/api"

"github.com/lf-edge/ekuiper/v2/internal/topo/node/tracenode"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
)

Expand All @@ -42,7 +43,7 @@
case error:
return input
case xsql.Row:
results, err := ps.handleSRFRow(input)
results, err := ps.handleSRFRow(ctx, input)
if err != nil {
return err
}
Expand All @@ -58,7 +59,7 @@
}
input = input.Filter(sel)
}
if err := ps.handleSRFRowForCollection(input); err != nil {
if err := ps.handleSRFRowForCollection(ctx, input); err != nil {

Check warning on line 62 in internal/topo/operator/projectset_operator.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/operator/projectset_operator.go#L62

Added line #L62 was not covered by tests
return err
}
if ps.EnableLimit && ps.LimitCount > 0 && input.Len() > ps.LimitCount {
Expand All @@ -75,12 +76,12 @@
}
}

func (ps *ProjectSetOperator) handleSRFRowForCollection(data xsql.Collection) error {
func (ps *ProjectSetOperator) handleSRFRowForCollection(ctx api.StreamContext, data xsql.Collection) error {

Check warning on line 79 in internal/topo/operator/projectset_operator.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/operator/projectset_operator.go#L79

Added line #L79 was not covered by tests
switch collection := data.(type) {
case *xsql.JoinTuples:
newContent := make([]*xsql.JoinTuple, 0)
for _, c := range collection.Content {
rs, err := ps.handleSRFRow(c)
rs, err := ps.handleSRFRow(ctx, c)

Check warning on line 84 in internal/topo/operator/projectset_operator.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/operator/projectset_operator.go#L84

Added line #L84 was not covered by tests
if err != nil {
return err
}
Expand All @@ -90,7 +91,7 @@
case *xsql.GroupedTuplesSet:
newGroups := make([]*xsql.GroupedTuples, 0)
for _, c := range collection.Groups {
rs, err := ps.handleSRFRow(c)
rs, err := ps.handleSRFRow(ctx, c)

Check warning on line 94 in internal/topo/operator/projectset_operator.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/operator/projectset_operator.go#L94

Added line #L94 was not covered by tests
if err != nil {
return err
}
Expand All @@ -100,7 +101,7 @@
case *xsql.WindowTuples:
newContent := make([]xsql.Row, 0)
for _, c := range collection.Content {
rs, err := ps.handleSRFRow(c)
rs, err := ps.handleSRFRow(ctx, c)

Check warning on line 104 in internal/topo/operator/projectset_operator.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/operator/projectset_operator.go#L104

Added line #L104 was not covered by tests
if err != nil {
return err
}
Expand All @@ -113,7 +114,7 @@
return nil
}

func (ps *ProjectSetOperator) handleSRFRow(row xsql.Row) (*resultWrapper, error) {
func (ps *ProjectSetOperator) handleSRFRow(ctx api.StreamContext, row xsql.Row) (*resultWrapper, error) {
// for now we only support 1 srf function in the field
srfName := ""
for k := range ps.SrfMapping {
Expand All @@ -131,6 +132,8 @@
res := newResultWrapper(len(aValues), row)
for i, v := range aValues {
newRow := row.Clone().(xsql.Row)
newRow.SetTracerCtx(row.GetTracerCtx())
traced, _, span := tracenode.TraceInput(ctx, newRow, ctx.GetOpId())
// clear original column value
newRow.Del(srfName)
if mv, ok := v.(map[string]interface{}); ok {
Expand All @@ -140,6 +143,10 @@
} else {
newRow.Set(srfName, v)
}
if traced {
tracenode.RecordRowOrCollection(newRow, span)
span.End()
}

Check warning on line 149 in internal/topo/operator/projectset_operator.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/operator/projectset_operator.go#L147-L149

Added lines #L147 - L149 were not covered by tests
res.appendTuple(i, newRow)
}
return res, nil
Expand Down
Loading