Skip to content

Commit

Permalink
Event source iterable leak (#285)
Browse files Browse the repository at this point in the history
  • Loading branch information
maguro authored Apr 11, 2021
1 parent 16f17ed commit 7bab1e8
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 25 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ require (
github.com/stretchr/testify v1.7.0
github.com/teivah/onecontext v0.0.0-20200513185103-40f981bfd775
go.uber.org/goleak v1.1.10
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
Expand Down
55 changes: 33 additions & 22 deletions iterable_eventsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,48 @@ func newEventSourceIterable(ctx context.Context, next <-chan Item, strategy Back
}

go func() {
defer func() {
it.closeAllObservers()
}()

deliver := func(item Item) (done bool) {
it.RLock()
defer it.RUnlock()

switch strategy {
default:
fallthrough
case Block:
for _, observer := range it.observers {
if !item.SendContext(ctx, observer) {
return true
}
}
case Drop:
for _, observer := range it.observers {
select {
default:
case <-ctx.Done():
return true
case observer <- item:
}
}
}
return
}

for {
select {
case <-ctx.Done():
it.closeAllObservers()
return
case item, ok := <-next:
if !ok {
it.closeAllObservers()
return
}
it.RLock()
switch strategy {
default:
fallthrough
case Block:
for _, observer := range it.observers {
if !item.SendContext(ctx, observer) {
return
}
}
case Drop:
for _, observer := range it.observers {
select {
default:
case <-ctx.Done():
return
case observer <- item:
}
}

if done := deliver(item); done {
return
}
it.RUnlock()
}
}
}()
Expand Down

0 comments on commit 7bab1e8

Please sign in to comment.