Skip to content

Commit

Permalink
indicator: fix klines stream emitter
Browse files Browse the repository at this point in the history
  • Loading branch information
c9s committed Jun 1, 2023
1 parent 9c43c75 commit 23a49a8
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions pkg/indicator/klinestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,43 @@ type KLineStream struct {
kLines []types.KLine
}

func (s *KLineStream) Length() int {
return len(s.kLines)
}

func (s *KLineStream) Last(i int) *types.KLine {
l := len(s.kLines)
if i < 0 || l-1-i < 0 {
return nil
}

return &s.kLines[l-1-i]
}

// AddSubscriber adds the subscriber function and push historical data to the subscriber
func (s *KLineStream) AddSubscriber(f func(k types.KLine)) {
s.OnUpdate(f)

if len(s.kLines) > 0 {
// push historical klines to the subscriber
for _, k := range s.kLines {
f(k)
}
}
s.OnUpdate(f)
}

// KLines creates a KLine stream that pushes the klines to the subscribers
func KLines(source types.Stream) *KLineStream {
func KLines(source types.Stream, symbol string, interval types.Interval) *KLineStream {
s := &KLineStream{}

source.OnKLineClosed(func(k types.KLine) {
source.OnKLineClosed(types.KLineWith(symbol, interval, func(k types.KLine) {
s.kLines = append(s.kLines, k)
s.EmitUpdate(k)

if len(s.kLines) > MaxNumOfKLines {
s.kLines = s.kLines[len(s.kLines)-1-MaxNumOfKLines:]
}
s.EmitUpdate(k)
})
}))

return s
}

0 comments on commit 23a49a8

Please sign in to comment.