Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: more logs for rule state and error #3158

Merged
merged 1 commit into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/topo/rule/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (s *State) transit(newState RunState, err error) {
default:
// do nothing
}
s.logger.Infof("rule %s transit to state %s", s.Rule.Id, StateName[s.currentState])
}

func (s *State) GetState() RunState {
Expand Down Expand Up @@ -263,6 +264,7 @@ func (s *State) Start() error {
}
// Start normally or start in schedule period Rule
// doStart trigger the Rule run. If no trigger error, the Rule will run async and control the state by itself
s.logger.Infof("start to run rule %s", s.Rule.Id)
err := s.doStart()
if err != nil {
s.transit(StoppedByErr, err)
Expand All @@ -281,6 +283,7 @@ func (s *State) ScheduleStart() error {
return nil
}
// doStart trigger the Rule run. If no trigger error, the Rule will run async and control the state by itself
s.logger.Infof("schedule to run rule %s", s.Rule.Id)
err := s.doStart()
if err != nil {
s.transit(StoppedByErr, err)
Expand Down Expand Up @@ -372,6 +375,7 @@ func (s *State) Stop() {
return
}
// do stop, stopping action and starting action are mutual exclusive. No concurrent problem here
s.logger.Infof("stopping rule %s", s.Rule.Id)
err := s.doStop()
if err == nil {
err = errors.New("canceled manually")
Expand All @@ -389,6 +393,7 @@ func (s *State) ScheduleStop() {
return
}
// do stop, stopping action and starting action are mutual exclusive. No concurrent problem here
s.logger.Infof("schedule to stop rule %s", s.Rule.Id)
err := s.doStop()
// currentState may be accessed concurrently
if schedule.IsAfterTimeRanges(timex.GetNow(), s.Rule.Options.CronDatetimeRange) {
Expand Down
3 changes: 2 additions & 1 deletion internal/topo/topotest/plugin_rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,15 @@ func TestExtensions(t *testing.T) {
nm := make([]map[string]any, 0, len(rt))
for _, mm := range rt {
nm = append(nm, mm.ToMap())
break
}
maps = append(maps, nm)
break
default:
conf.Log.Errorf("receive wrong tuple %v", rt)
}
}
return maps
return maps[:1]
})
}
}
Expand Down
19 changes: 4 additions & 15 deletions pkg/infra/saferun.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package infra
import (
"errors"
"fmt"
"runtime"
"runtime/debug"

"github.com/lf-edge/ekuiper/contract/v2/api"
Expand Down Expand Up @@ -53,27 +54,15 @@ func SafeRun(fn func() error) (err error) {
// It is usually the error outlet of an op/rule.
func DrainError(ctx api.StreamContext, err error, errCh chan<- error) {
if err != nil {
_, file, line, _ := runtime.Caller(1) // 1 means the caller of DrainError
if ctx != nil {
ctx.GetLogger().Errorf("runtime error: %v", err)
ctx.GetLogger().Errorf("runtime error from %s/l%d: %v", file, line, err)
} else {
conf.Log.Errorf("runtime error: %v", err)
conf.Log.Errorf("runtime error %s/l%d: %v", file, line, err)
}
}
select {
case errCh <- err:
default:
}
}

// DrainCtrl a non-block function to send out the signal to the ctrl channel
// It will retry in blocking mode once the channel is full and make sure it is delivered finally but may lose order
func DrainCtrl(ctx api.StreamContext, signal any, ctrlCh chan<- any) {
select {
case ctrlCh <- signal:
default:
ctx.GetLogger().Warnf("failed to send signal %v to ctrl channel, retry", signal)
go func() {
ctrlCh <- signal
}()
}
}
Loading