Skip to content

Commit

Permalink
refactor SlidingWindow
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Jan 6, 2023
1 parent 40a2384 commit cd92d06
Showing 1 changed file with 28 additions and 18 deletions.
46 changes: 28 additions & 18 deletions flow/sliding_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flow

import (
"container/heap"
"errors"
"sync"
"time"

Expand Down Expand Up @@ -30,44 +31,50 @@ var _ streams.Flow = (*SlidingWindow)(nil)
// NewSlidingWindow returns a new processing time based SlidingWindow.
// Processing time refers to the system time of the machine that is executing the respective operation.
//
// size is the Duration of generated windows.
// slide is the sliding interval of generated windows.
func NewSlidingWindow(size time.Duration, slide time.Duration) *SlidingWindow {
return NewSlidingWindowWithTSExtractor(size, slide, nil)
// windowSize is the Duration of generated windows.
// slidingInterval is the sliding interval of generated windows.
func NewSlidingWindow(windowSize time.Duration, slidingInterval time.Duration) (*SlidingWindow, error) {
return NewSlidingWindowWithTSExtractor(windowSize, slidingInterval, nil)
}

// NewSlidingWindowWithTSExtractor returns a new event time based SlidingWindow.
// Event time is the time that each individual event occurred on its producing device.
// Gives correct results on out-of-order events, late events, or on replays of data.
//
// size is the Duration of generated windows.
// slide is the sliding interval of generated windows.
// windowSize is the Duration of generated windows.
// slidingInterval is the sliding interval of generated windows.
// timestampExtractor is the record timestamp (in nanoseconds) extractor.
func NewSlidingWindowWithTSExtractor(size time.Duration, slide time.Duration,
timestampExtractor func(interface{}) int64) *SlidingWindow {
func NewSlidingWindowWithTSExtractor(windowSize time.Duration, slidingInterval time.Duration,
timestampExtractor func(interface{}) int64) (*SlidingWindow, error) {

if windowSize < slidingInterval {
return nil, errors.New("slidingInterval is larger than windowSize")
}

window := &SlidingWindow{
windowSize: size,
slidingInterval: slide,
windowSize: windowSize,
slidingInterval: slidingInterval,
queue: &PriorityQueue{},
in: make(chan interface{}),
out: make(chan interface{}),
done: make(chan struct{}),
timestampExtractor: timestampExtractor,
}
go window.receive()
go window.emit()

return window
return window, nil
}

// Via streams data through the given flow
func (sw *SlidingWindow) Via(flow streams.Flow) streams.Flow {
go sw.emit()
go sw.transmit(flow)
return flow
}

// To streams data to the given sink
func (sw *SlidingWindow) To(sink streams.Sink) {
go sw.emit()
sw.transmit(sink)
}

Expand Down Expand Up @@ -111,6 +118,9 @@ func (sw *SlidingWindow) receive() {

// emit is triggered by the sliding interval
func (sw *SlidingWindow) emit() {
// wait for the sliding window to start
time.Sleep(sw.windowSize - sw.slidingInterval)

ticker := time.NewTicker(sw.slidingInterval)
defer ticker.Stop()

Expand All @@ -134,8 +144,8 @@ func (sw *SlidingWindow) emit() {
break
}
}
windowSlice := extract(sw.queue.Slice(windowBottomIndex, slideUpperIndex))
if windowUpperIndex > 0 {
windowSlice := extract(sw.queue.Slice(windowBottomIndex, windowUpperIndex))
if windowUpperIndex > 0 { // the queue is not empty
s := sw.queue.Slice(slideUpperIndex, windowUpperIndex)
// reset the queue
sw.queue = &s
Expand All @@ -154,11 +164,11 @@ func (sw *SlidingWindow) emit() {
}
}

// extract generates a new window.
// extract generates a new window slice out of the given items.
func extract(items []*Item) []interface{} {
rt := make([]interface{}, len(items))
messages := make([]interface{}, len(items))
for i, item := range items {
rt[i] = item.Msg
messages[i] = item.Msg
}
return rt
return messages
}

0 comments on commit cd92d06

Please sign in to comment.