Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: filter batch duration opt was not propagated correctly #1224

Merged
merged 1 commit into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions waku/v2/api/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ func (fc FilterConfig) String() string {
return string(jsonStr)
}

const filterSubLoopInterval = 5 * time.Second

type Sub struct {
ContentFilter protocol.ContentFilter
DataCh chan *protocol.Envelope
Expand Down Expand Up @@ -69,13 +71,7 @@ func defaultOptions() []SubscribeOptions {
}

// Subscribe
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, opts ...SubscribeOptions) (*Sub, error) {
optList := append(defaultOptions(), opts...)
params := new(subscribeParameters)
for _, opt := range optList {
opt(params)
}

func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) {
sub := new(Sub)
sub.id = uuid.NewString()
sub.wf = wf
Expand All @@ -95,8 +91,9 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
sub.multiplex(subs)
}
}

go sub.subscriptionLoop(params.batchInterval)
// filter subscription loop is to check if target subscriptions for a filter are active and if not
// trigger resubscribe.
go sub.subscriptionLoop(filterSubLoopInterval)
return sub, nil
}

Expand Down
14 changes: 10 additions & 4 deletions waku/v2/api/filter/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type appFilterMap map[string]filterConfig
type FilterManager struct {
sync.Mutex
ctx context.Context
opts []SubscribeOptions
params *subscribeParameters
minPeersPerFilter int
onlineChecker *onlinechecker.DefaultOnlineChecker
filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details
Expand Down Expand Up @@ -64,18 +64,24 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
// This fn is being mocked in test
mgr := new(FilterManager)
mgr.ctx = ctx
mgr.opts = opts
mgr.logger = logger
mgr.minPeersPerFilter = minPeersPerFilter
mgr.envProcessor = envProcessor
mgr.filterSubscriptions = make(map[string]SubDetails)
mgr.node = node
mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker)
mgr.node.SetOnlineChecker(mgr.onlineChecker)
mgr.filterSubBatchDuration = 5 * time.Second
mgr.incompleteFilterBatch = make(map[string]filterConfig)
mgr.filterConfigs = make(appFilterMap)
mgr.waitingToSubQueue = make(chan filterConfig, 100)

//parsing the subscribe params only to read the batchInterval passed.
mgr.params = new(subscribeParameters)
opts = append(defaultOptions(), opts...)
for _, opt := range opts {
opt(mgr.params)
}
mgr.filterSubBatchDuration = mgr.params.batchInterval
go mgr.startFilterSubLoop()
return mgr
}
Expand Down Expand Up @@ -153,7 +159,7 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi
func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
ctx, cancel := context.WithCancel(mgr.ctx)
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.opts...)
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)
mgr.Lock()
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
mgr.Unlock()
Expand Down
3 changes: 2 additions & 1 deletion waku/v2/api/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func (s *FilterApiTestSuite) TestSubscribe() {
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)
ctx, cancel := context.WithCancel(context.Background())
s.Log.Info("About to perform API Subscribe()")
apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log)
params := subscribeParameters{300 * time.Second, 1024}
apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log, &params)
s.Require().NoError(err)
s.Require().Equal(apiSub.ContentFilter, contentFilter)
s.Log.Info("Subscribed")
Expand Down
Loading