From 315c05b35105e988e7ee54958c2d8e483e522eb4 Mon Sep 17 00:00:00 2001 From: Preston Van Loon Date: Mon, 23 Sep 2024 14:57:04 -0500 Subject: [PATCH] async/event: Use geth's implementation of event/feed.go (#14362) * Use geth's event * run gazelle --- async/event/BUILD.bazel | 12 +- async/event/example_feed_test.go | 73 ----- async/event/feed.go | 236 +------------- async/event/feed_test.go | 509 ------------------------------- 4 files changed, 8 insertions(+), 822 deletions(-) delete mode 100644 async/event/example_feed_test.go delete mode 100644 async/event/feed_test.go diff --git a/async/event/BUILD.bazel b/async/event/BUILD.bazel index 1ceb12f4ff73..6d32b727adf6 100644 --- a/async/event/BUILD.bazel +++ b/async/event/BUILD.bazel @@ -8,22 +8,20 @@ go_library( ], importpath = "github.com/prysmaticlabs/prysm/v5/async/event", visibility = ["//visibility:public"], - deps = ["//time/mclock:go_default_library"], + deps = [ + "//time/mclock:go_default_library", + "@com_github_ethereum_go_ethereum//event:go_default_library", + ], ) go_test( name = "go_default_test", size = "small", srcs = [ - "example_feed_test.go", "example_scope_test.go", "example_subscription_test.go", - "feed_test.go", "subscription_test.go", ], embed = [":go_default_library"], - deps = [ - "//testing/assert:go_default_library", - "//testing/require:go_default_library", - ], + deps = ["//testing/require:go_default_library"], ) diff --git a/async/event/example_feed_test.go b/async/event/example_feed_test.go deleted file mode 100644 index ddc3730e2202..000000000000 --- a/async/event/example_feed_test.go +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package event_test - -import ( - "fmt" - - "github.com/prysmaticlabs/prysm/v5/async/event" -) - -func ExampleFeed_acknowledgedEvents() { - // This example shows how the return value of Send can be used for request/reply - // interaction between event consumers and producers. - var feed event.Feed - type ackedEvent struct { - i int - ack chan<- struct{} - } - - // Consumers wait for events on the feed and acknowledge processing. - done := make(chan struct{}) - defer close(done) - for i := 0; i < 3; i++ { - ch := make(chan ackedEvent, 100) - sub := feed.Subscribe(ch) - go func() { - defer sub.Unsubscribe() - for { - select { - case ev := <-ch: - fmt.Println(ev.i) // "process" the event - ev.ack <- struct{}{} - case <-done: - return - } - } - }() - } - - // The producer sends values of type ackedEvent with increasing values of i. - // It waits for all consumers to acknowledge before sending the next event. - for i := 0; i < 3; i++ { - acksignal := make(chan struct{}) - n := feed.Send(ackedEvent{i, acksignal}) - for ack := 0; ack < n; ack++ { - <-acksignal - } - } - // Output: - // 0 - // 0 - // 0 - // 1 - // 1 - // 1 - // 2 - // 2 - // 2 -} diff --git a/async/event/feed.go b/async/event/feed.go index 46d7f77753c8..1ebb581c14b0 100644 --- a/async/event/feed.go +++ b/async/event/feed.go @@ -14,241 +14,11 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -// Package event contains an event feed implementation for process communication. package event import ( - "errors" - "reflect" - "slices" - "sync" + geth_event "github.com/ethereum/go-ethereum/event" ) -var errBadChannel = errors.New("event: Subscribe argument does not have sendable channel type") - -// Feed implements one-to-many subscriptions where the carrier of events is a channel. -// Values sent to a Feed are delivered to all subscribed channels simultaneously. -// -// Feeds can only be used with a single type. The type is determined by the first Send or -// Subscribe operation. Subsequent calls to these methods panic if the type does not -// match. -// -// The zero value is ready to use. -type Feed struct { - once sync.Once // ensures that init only runs once - sendLock chan struct{} // sendLock has a one-element buffer and is empty when held.It protects sendCases. - removeSub chan interface{} // interrupts Send - sendCases caseList // the active set of select cases used by Send - - // The inbox holds newly subscribed channels until they are added to sendCases. - mu sync.Mutex - inbox caseList - etype reflect.Type -} - -// This is the index of the first actual subscription channel in sendCases. -// sendCases[0] is a SelectRecv case for the removeSub channel. -const firstSubSendCase = 1 - -type feedTypeError struct { - got, want reflect.Type - op string -} - -func (e feedTypeError) Error() string { - return "event: wrong type in " + e.op + " got " + e.got.String() + ", want " + e.want.String() -} - -func (f *Feed) init() { - f.removeSub = make(chan interface{}) - f.sendLock = make(chan struct{}, 1) - f.sendLock <- struct{}{} - f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}} -} - -// Subscribe adds a channel to the feed. Future sends will be delivered on the channel -// until the subscription is canceled. All channels added must have the same element type. -// -// The channel should have ample buffer space to avoid blocking other subscribers. -// Slow subscribers are not dropped. -func (f *Feed) Subscribe(channel interface{}) Subscription { - f.once.Do(f.init) - - chanval := reflect.ValueOf(channel) - chantyp := chanval.Type() - if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 { - panic(errBadChannel) - } - sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)} - - f.mu.Lock() - defer f.mu.Unlock() - if !f.typecheck(chantyp.Elem()) { - panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)}) - } - // Add the select case to the inbox. - // The next Send will add it to f.sendCases. - cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval} - f.inbox = append(f.inbox, cas) - return sub -} - -// note: callers must hold f.mu -func (f *Feed) typecheck(typ reflect.Type) bool { - if f.etype == nil { - f.etype = typ - return true - } - // In the event the feed's type is an actual interface, we - // perform an interface conformance check here. - if f.etype.Kind() == reflect.Interface && typ.Implements(f.etype) { - return true - } - return f.etype == typ -} - -func (f *Feed) remove(sub *feedSub) { - // Delete from inbox first, which covers channels - // that have not been added to f.sendCases yet. - ch := sub.channel.Interface() - f.mu.Lock() - index := f.inbox.find(ch) - if index != -1 { - f.inbox = f.inbox.delete(index) - f.mu.Unlock() - return - } - f.mu.Unlock() - - select { - case f.removeSub <- ch: - // Send will remove the channel from f.sendCases. - case <-f.sendLock: - // No Send is in progress, delete the channel now that we have the send lock. - f.sendCases = f.sendCases.delete(f.sendCases.find(ch)) - f.sendLock <- struct{}{} - } -} - -// Send delivers to all subscribed channels simultaneously. -// It returns the number of subscribers that the value was sent to. -func (f *Feed) Send(value interface{}) (nsent int) { - rvalue := reflect.ValueOf(value) - - f.once.Do(f.init) - <-f.sendLock - - // Add new cases from the inbox after taking the send lock. - f.mu.Lock() - f.sendCases = append(f.sendCases, f.inbox...) - f.inbox = nil - - if !f.typecheck(rvalue.Type()) { - f.sendLock <- struct{}{} - f.mu.Unlock() - panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype}) - } - f.mu.Unlock() - - // Set the sent value on all channels. - for i := firstSubSendCase; i < len(f.sendCases); i++ { - f.sendCases[i].Send = rvalue - } - - // Send until all channels except removeSub have been chosen. 'cases' tracks a prefix - // of sendCases. When a send succeeds, the corresponding case moves to the end of - // 'cases' and it shrinks by one element. - cases := f.sendCases - for { - // Fast path: try sending without blocking before adding to the select set. - // This should usually succeed if subscribers are fast enough and have free - // buffer space. - for i := firstSubSendCase; i < len(cases); i++ { - if cases[i].Chan.TrySend(rvalue) { - nsent++ - cases = cases.deactivate(i) - i-- - } - } - if len(cases) == firstSubSendCase { - break - } - // Select on all the receivers, waiting for them to unblock. - chosen, recv, _ := reflect.Select(cases) - if chosen == 0 /* <-f.removeSub */ { - index := f.sendCases.find(recv.Interface()) - f.sendCases = f.sendCases.delete(index) - if index >= 0 && index < len(cases) { - // Shrink 'cases' too because the removed case was still active. - cases = f.sendCases[:len(cases)-1] - } - } else { - cases = cases.deactivate(chosen) - nsent++ - } - } - - // Forget about the sent value and hand off the send lock. - for i := firstSubSendCase; i < len(f.sendCases); i++ { - f.sendCases[i].Send = reflect.Value{} - } - f.sendLock <- struct{}{} - return nsent -} - -type feedSub struct { - feed *Feed - channel reflect.Value - errOnce sync.Once - err chan error -} - -// Unsubscribe remove feed subscription. -func (sub *feedSub) Unsubscribe() { - sub.errOnce.Do(func() { - sub.feed.remove(sub) - close(sub.err) - }) -} - -// Err returns error channel. -func (sub *feedSub) Err() <-chan error { - return sub.err -} - -type caseList []reflect.SelectCase - -// find returns the index of a case containing the given channel. -func (cs caseList) find(channel interface{}) int { - return slices.IndexFunc(cs, func(selectCase reflect.SelectCase) bool { - return selectCase.Chan.Interface() == channel - }) -} - -// delete removes the given case from cs. -func (cs caseList) delete(index int) caseList { - return append(cs[:index], cs[index+1:]...) -} - -// deactivate moves the case at index into the non-accessible portion of the cs slice. -func (cs caseList) deactivate(index int) caseList { - last := len(cs) - 1 - cs[index], cs[last] = cs[last], cs[index] - return cs[:last] -} - -// func (cs caseList) String() string { -// s := "[" -// for i, cas := range cs { -// if i != 0 { -// s += ", " -// } -// switch cas.Dir { -// case reflect.SelectSend: -// s += fmt.Sprintf("%v<-", cas.Chan.Interface()) -// case reflect.SelectRecv: -// s += fmt.Sprintf("<-%v", cas.Chan.Interface()) -// } -// } -// return s + "]" -// } +// Feed is a re-export of the go-ethereum event feed. +type Feed = geth_event.Feed diff --git a/async/event/feed_test.go b/async/event/feed_test.go deleted file mode 100644 index 5ad594046b17..000000000000 --- a/async/event/feed_test.go +++ /dev/null @@ -1,509 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package event - -import ( - "fmt" - "reflect" - "sync" - "testing" - "time" - - "github.com/prysmaticlabs/prysm/v5/testing/assert" -) - -func TestFeedPanics(t *testing.T) { - { - var f Feed - f.Send(2) - want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(0)} - assert.NoError(t, checkPanic(want, func() { f.Send(uint64(2)) })) - // Validate it doesn't deadlock. - assert.NoError(t, checkPanic(want, func() { f.Send(uint64(2)) })) - } - { - var f Feed - ch := make(chan int) - f.Subscribe(ch) - want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(0)} - assert.NoError(t, checkPanic(want, func() { f.Send(uint64(2)) })) - } - { - var f Feed - f.Send(2) - want := feedTypeError{op: "Subscribe", got: reflect.TypeOf(make(chan uint64)), want: reflect.TypeOf(make(chan<- int))} - assert.NoError(t, checkPanic(want, func() { f.Subscribe(make(chan uint64)) })) - } - { - var f Feed - assert.NoError(t, checkPanic(errBadChannel, func() { f.Subscribe(make(<-chan int)) })) - } - { - var f Feed - assert.NoError(t, checkPanic(errBadChannel, func() { f.Subscribe(0) })) - } -} - -func checkPanic(want error, fn func()) (err error) { - defer func() { - panicResult := recover() - if panicResult == nil { - err = fmt.Errorf("didn't panic") - } else if !reflect.DeepEqual(panicResult, want) { - err = fmt.Errorf("panicked with wrong error: got %q, want %q", panicResult, want) - } - }() - fn() - return nil -} - -func TestFeed(t *testing.T) { - var feed Feed - var done, subscribed sync.WaitGroup - subscriber := func(i int) { - defer done.Done() - - subchan := make(chan int) - sub := feed.Subscribe(subchan) - timeout := time.NewTimer(2 * time.Second) - subscribed.Done() - - select { - case v := <-subchan: - if v != 1 { - t.Errorf("%d: received value %d, want 1", i, v) - } - case <-timeout.C: - t.Errorf("%d: receive timeout", i) - } - - sub.Unsubscribe() - select { - case _, ok := <-sub.Err(): - if ok { - t.Errorf("%d: error channel not closed after unsubscribe", i) - } - case <-timeout.C: - t.Errorf("%d: unsubscribe timeout", i) - } - } - - const n = 1000 - done.Add(n) - subscribed.Add(n) - for i := 0; i < n; i++ { - go subscriber(i) - } - subscribed.Wait() - if nsent := feed.Send(1); nsent != n { - t.Errorf("first send delivered %d times, want %d", nsent, n) - } - if nsent := feed.Send(2); nsent != 0 { - t.Errorf("second send delivered %d times, want 0", nsent) - } - done.Wait() -} - -func TestFeedSubscribeSameChannel(t *testing.T) { - var ( - feed Feed - done sync.WaitGroup - ch = make(chan int) - sub1 = feed.Subscribe(ch) - sub2 = feed.Subscribe(ch) - _ = feed.Subscribe(ch) - ) - expectSends := func(value, n int) { - if nsent := feed.Send(value); nsent != n { - t.Errorf("send delivered %d times, want %d", nsent, n) - } - done.Done() - } - expectRecv := func(wantValue, n int) { - for i := 0; i < n; i++ { - if v := <-ch; v != wantValue { - t.Errorf("received %d, want %d", v, wantValue) - } - } - } - - done.Add(1) - go expectSends(1, 3) - expectRecv(1, 3) - done.Wait() - - sub1.Unsubscribe() - - done.Add(1) - go expectSends(2, 2) - expectRecv(2, 2) - done.Wait() - - sub2.Unsubscribe() - - done.Add(1) - go expectSends(3, 1) - expectRecv(3, 1) - done.Wait() -} - -func TestFeedSubscribeBlockedPost(_ *testing.T) { - var ( - feed Feed - nsends = 2000 - ch1 = make(chan int) - ch2 = make(chan int) - wg sync.WaitGroup - ) - defer wg.Wait() - - feed.Subscribe(ch1) - wg.Add(nsends) - for i := 0; i < nsends; i++ { - go func() { - feed.Send(99) - wg.Done() - }() - } - - sub2 := feed.Subscribe(ch2) - defer sub2.Unsubscribe() - - // We're done when ch1 has received N times. - // The number of receives on ch2 depends on scheduling. - for i := 0; i < nsends; { - select { - case <-ch1: - i++ - case <-ch2: - } - } -} - -func TestFeedUnsubscribeBlockedPost(_ *testing.T) { - var ( - feed Feed - nsends = 200 - chans = make([]chan int, 2000) - subs = make([]Subscription, len(chans)) - bchan = make(chan int) - bsub = feed.Subscribe(bchan) - wg sync.WaitGroup - ) - for i := range chans { - chans[i] = make(chan int, nsends) - } - - // Queue up some Sends. None of these can make progress while bchan isn't read. - wg.Add(nsends) - for i := 0; i < nsends; i++ { - go func() { - feed.Send(99) - wg.Done() - }() - } - // Subscribe the other channels. - for i, ch := range chans { - subs[i] = feed.Subscribe(ch) - } - // Unsubscribe them again. - for _, sub := range subs { - sub.Unsubscribe() - } - // Unblock the Sends. - bsub.Unsubscribe() - wg.Wait() -} - -// Checks that unsubscribing a channel during Send works even if that -// channel has already been sent on. -func TestFeedUnsubscribeSentChan(_ *testing.T) { - var ( - feed Feed - ch1 = make(chan int) - ch2 = make(chan int) - sub1 = feed.Subscribe(ch1) - sub2 = feed.Subscribe(ch2) - wg sync.WaitGroup - ) - defer sub2.Unsubscribe() - - wg.Add(1) - go func() { - feed.Send(0) - wg.Done() - }() - - // Wait for the value on ch1. - <-ch1 - // Unsubscribe ch1, removing it from the send cases. - sub1.Unsubscribe() - - // Receive ch2, finishing Send. - <-ch2 - wg.Wait() - - // Send again. This should send to ch2 only, so the wait group will unblock - // as soon as a value is received on ch2. - wg.Add(1) - go func() { - feed.Send(0) - wg.Done() - }() - <-ch2 - wg.Wait() -} - -func TestFeedUnsubscribeFromInbox(t *testing.T) { - var ( - feed Feed - ch1 = make(chan int) - ch2 = make(chan int) - sub1 = feed.Subscribe(ch1) - sub2 = feed.Subscribe(ch1) - sub3 = feed.Subscribe(ch2) - ) - assert.Equal(t, 3, len(feed.inbox)) - assert.Equal(t, 1, len(feed.sendCases), "sendCases is non-empty after unsubscribe") - - sub1.Unsubscribe() - sub2.Unsubscribe() - sub3.Unsubscribe() - assert.Equal(t, 0, len(feed.inbox), "Inbox is non-empty after unsubscribe") - assert.Equal(t, 1, len(feed.sendCases), "sendCases is non-empty after unsubscribe") -} - -func BenchmarkFeedSend1000(b *testing.B) { - var ( - done sync.WaitGroup - feed Feed - nsubs = 1000 - ) - subscriber := func(ch <-chan int) { - for i := 0; i < b.N; i++ { - <-ch - } - done.Done() - } - done.Add(nsubs) - for i := 0; i < nsubs; i++ { - ch := make(chan int, 200) - feed.Subscribe(ch) - go subscriber(ch) - } - - // The actual benchmark. - b.ResetTimer() - for i := 0; i < b.N; i++ { - if feed.Send(i) != nsubs { - panic("wrong number of sends") - } - } - - b.StopTimer() - done.Wait() -} - -func TestFeed_Send(t *testing.T) { - tests := []struct { - name string - evFeed *Feed - testSetup func(fd *Feed, t *testing.T, o interface{}) - obj interface{} - expectPanic bool - }{ - { - name: "normal struct", - evFeed: new(Feed), - testSetup: func(fd *Feed, t *testing.T, o interface{}) { - testChan := make(chan testFeedWithPointer, 1) - fd.Subscribe(testChan) - }, - obj: testFeedWithPointer{ - a: new(uint64), - b: new(string), - }, - expectPanic: false, - }, - { - name: "un-implemented interface", - evFeed: new(Feed), - testSetup: func(fd *Feed, t *testing.T, o interface{}) { - testChan := make(chan testFeedIface, 1) - fd.Subscribe(testChan) - }, - obj: testFeedWithPointer{ - a: new(uint64), - b: new(string), - }, - expectPanic: true, - }, - { - name: "semi-implemented interface", - evFeed: new(Feed), - testSetup: func(fd *Feed, t *testing.T, o interface{}) { - testChan := make(chan testFeedIface, 1) - fd.Subscribe(testChan) - }, - obj: testFeed2{ - a: 0, - b: "", - c: []byte{'A'}, - }, - expectPanic: true, - }, - { - name: "fully-implemented interface", - evFeed: new(Feed), - testSetup: func(fd *Feed, t *testing.T, o interface{}) { - testChan := make(chan testFeedIface) - // Make it unbuffered to allow message to - // pass through - go func() { - a := <-testChan - if !reflect.DeepEqual(a, o) { - t.Errorf("Got = %v, want = %v", a, o) - } - }() - fd.Subscribe(testChan) - }, - obj: testFeed{ - a: 0, - b: "", - }, - expectPanic: false, - }, - { - name: "fully-implemented interface with additional methods", - evFeed: new(Feed), - testSetup: func(fd *Feed, t *testing.T, o interface{}) { - testChan := make(chan testFeedIface) - // Make it unbuffered to allow message to - // pass through - go func() { - a := <-testChan - if !reflect.DeepEqual(a, o) { - t.Errorf("Got = %v, want = %v", a, o) - } - }() - fd.Subscribe(testChan) - }, - obj: testFeed3{ - a: 0, - b: "", - c: []byte{'A'}, - d: []byte{'B'}, - }, - expectPanic: false, - }, - { - name: "concrete types implementing the same interface", - evFeed: new(Feed), - testSetup: func(fd *Feed, t *testing.T, o interface{}) { - testChan := make(chan testFeed, 1) - // Make it unbuffered to allow message to - // pass through - go func() { - a := <-testChan - if !reflect.DeepEqual(a, o) { - t.Errorf("Got = %v, want = %v", a, o) - } - }() - fd.Subscribe(testChan) - }, - obj: testFeed3{ - a: 0, - b: "", - c: []byte{'A'}, - d: []byte{'B'}, - }, - expectPanic: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - defer func() { - if r := recover(); r != nil { - if !tt.expectPanic { - t.Errorf("panic triggered when unexpected: %v", r) - } - } else { - if tt.expectPanic { - t.Error("panic not triggered when expected") - } - } - }() - tt.testSetup(tt.evFeed, t, tt.obj) - if gotNsent := tt.evFeed.Send(tt.obj); gotNsent != 1 { - t.Errorf("Send() = %v, want %v", gotNsent, 1) - } - }) - } -} - -// The following objects below are a collection of different -// struct types to test with. -type testFeed struct { - a uint64 - b string -} - -func (testFeed) method1() { - -} - -func (testFeed) method2() { - -} - -type testFeedWithPointer struct { - a *uint64 - b *string -} - -type testFeed2 struct { - a uint64 - b string - c []byte -} - -func (testFeed2) method1() { - -} - -type testFeed3 struct { - a uint64 - b string - c, d []byte -} - -func (testFeed3) method1() { - -} - -func (testFeed3) method2() { - -} - -func (testFeed3) method3() { - -} - -type testFeedIface interface { - method1() - method2() -}