Skip to content

Commit a2ecabc

Browse files
GsantomaggioZerpet
andcommitted
Notify forced disconnection (#62)
* Notify socket connection closed * Check when the socket is disconnected in an unexpected way * Add 2e2 test for the disconnection * add github.com/michaelklishin/rabbit-hole/v2 lib for testing --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> Signed-off-by: Aitor Pérez Cedres <acedres@vmware.com> Co-authored-by: Aitor Pérez Cedres <acedres@vmware.com>
1 parent e43229f commit a2ecabc

File tree

12 files changed

+359
-35
lines changed

12 files changed

+359
-35
lines changed

.github/workflows/build_test_linux.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@ jobs:
3232
runs-on: ubuntu-latest
3333
services:
3434
rabbitmq:
35-
image: "rabbitmq:3"
35+
image: "rabbitmq:3-management"
3636
ports:
3737
- 5552
3838
- 5672
39+
- 15672:15672
3940
options: >-
4041
--health-cmd "rabbitmq-diagnostics check_running"
4142
--health-interval 10s
@@ -54,4 +55,4 @@ jobs:
5455
- name: E2E Tests
5556
env:
5657
RABBITMQ_CONTAINER_NAME: ${{ job.services.rabbitmq.id }}
57-
run: make e2e_tests RMQ_E2E_SKIP_CONTAINER_START="please" RABBITMQ_URI="rabbitmq-stream://guest:guest@localhost:${{ job.services.rabbitmq.ports[5552] }}/%2F"
58+
run: make e2e_tests RMQ_E2E_SKIP_CONTAINER_START="please" RABBITMQ_URI="rabbitmq-stream://guest:guest@localhost:${{ job.services.rabbitmq.ports[5552] }}/%2F" CI="actions"

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ require (
1414
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
1515
github.com/google/go-cmp v0.5.9 // indirect
1616
github.com/google/pprof v0.0.0-20230502171905-255e3b9b56de // indirect
17+
github.com/michaelklishin/rabbit-hole/v2 v2.13.0 // indirect
1718
golang.org/x/net v0.9.0 // indirect
1819
golang.org/x/sys v0.8.0 // indirect
1920
golang.org/x/text v0.9.0 // indirect

go.sum

Lines changed: 157 additions & 0 deletions
Large diffs are not rendered by default.

internal/peer_properties.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func NewPeerPropertiesRequest(connectionName string) *PeerPropertiesRequest {
2020
p.clientProperties["copyright"] = "Copyright (c) 2023 VMware, Inc. or its affiliates."
2121
p.clientProperties["information"] = "Licensed under the MPL 2.0. See https://www.rabbitmq.com/"
2222
//c.clientProperties.items["version"] = ClientVersion
23-
p.clientProperties["platform"] = "Golang V2"
23+
p.clientProperties["platform"] = "Golang"
2424
return p
2525
}
2626

main.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ func main() {
3434
}
3535
log.Info("connection status", "open", streamClient.IsOpen())
3636

37+
var closeChan = streamClient.NotifyConnectionClosed()
38+
go func() {
39+
for c := range closeChan {
40+
log.Info("connection closed", "reason", c, "isOpen", streamClient.IsOpen())
41+
}
42+
}()
43+
3744
err = streamClient.DeclareStream(ctx, stream, map[string]string{"name": "test-stream"})
3845
if err != nil && err.Error() != "stream already exists" {
3946
log.Error("error in declaring stream", "error", err)
@@ -50,8 +57,6 @@ func main() {
5057
const batchSize = 100
5158
const iterations = 2
5259
const totalMessages = iterations * batchSize
53-
// PublishingId 555 --> [] messages 551, 552, 553, 554, 555
54-
// ONLY 555
5560
publishChan := streamClient.NotifyPublish(make(chan *raw.PublishConfirm, 100))
5661
go func() {
5762
var confirmed int

pkg/constants/types.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,10 @@ const (
6767
CompressionNone uint8 = 0x00
6868
CompressionGzip uint8 = 0x01
6969
)
70+
71+
// Connections states
72+
const (
73+
ConnectionClosed = 0x01
74+
ConnectionOpen = 0x02
75+
ConnectionClosing = 0x03
76+
)

pkg/e2e/e2e_suite_test.go

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -72,23 +72,26 @@ var _ = SynchronizedBeforeSuite(func(ctx SpecContext) {
7272
containerName = defaultContainerName
7373
}
7474

75-
pluginsCmd := exec.Command(
76-
"docker",
77-
"exec",
78-
"--user=rabbitmq",
79-
containerName,
80-
"rabbitmq-plugins",
81-
"enable",
82-
"rabbitmq_stream",
83-
"rabbitmq_stream_management",
84-
"rabbitmq_management",
85-
)
86-
session, err := gexec.Start(pluginsCmd, GinkgoWriter, GinkgoWriter)
87-
Expect(err).ToNot(HaveOccurred())
88-
Eventually(session).
89-
WithTimeout(time.Second*15).
90-
WithPolling(time.Second).
91-
Should(gexec.Exit(0), "expected to enable stream plugin")
75+
if isCi := os.Getenv("CI"); isCi != "" {
76+
pluginsCmd := exec.Command(
77+
"docker",
78+
"exec",
79+
"--user=rabbitmq",
80+
containerName,
81+
"rabbitmq-plugins",
82+
"enable",
83+
"rabbitmq_stream",
84+
"rabbitmq_stream_management",
85+
"rabbitmq_management",
86+
)
87+
88+
session, err := gexec.Start(pluginsCmd, GinkgoWriter, GinkgoWriter)
89+
Expect(err).ToNot(HaveOccurred())
90+
Eventually(session).
91+
WithTimeout(time.Second*15).
92+
WithPolling(time.Second).
93+
Should(gexec.Exit(0), "expected to enable stream plugin")
94+
}
9295

9396
if rabbitDebugLog {
9497
debugCmd := exec.Command(

pkg/e2e/end_to_end_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,44 @@ var _ = Describe("E2E", Serial, Label("e2e"), func() {
217217
Expect(streamClient.Close(itCtx)).To(Succeed())
218218
}, SpecTimeout(15*time.Second))
219219

220+
// Test the notification in case of force disconnection.
221+
// The disconnection is based on the connection name.
222+
// With the HTTP API, we can check the connection name and kill it.
223+
// The client has to notify the disconnection.
224+
It("connection name and notify disconnection", Label("behaviour"), func(ctx SpecContext) {
225+
h := slog.HandlerOptions{Level: slog.LevelDebug}.NewTextHandler(GinkgoWriter)
226+
debugLogger := slog.New(h)
227+
itCtx := raw.NewContextWithLogger(ctx, *debugLogger)
228+
streamClientConfiguration, err := raw.NewClientConfiguration(rabbitmqUri)
229+
Expect(err).ToNot(HaveOccurred())
230+
connectionName := "notify-disconnection-test-1"
231+
streamClientConfiguration.SetConnectionName(connectionName)
232+
233+
By("preparing the environment")
234+
streamClient, err := raw.DialConfig(itCtx, streamClientConfiguration)
235+
Expect(err).ToNot(HaveOccurred())
236+
// Force close the connection is done with the HTTP API and based on the connection name.
237+
httpUtils := NewHTTPUtils()
238+
Eventually(func() string {
239+
name, _ := httpUtils.GetConnectionByConnectionName(connectionName)
240+
return name
241+
}, 10*time.Second).WithPolling(1*time.Second).Should(Equal(connectionName),
242+
"expected connection to be present")
243+
244+
c := streamClient.NotifyConnectionClosed()
245+
By("Forcing closing the connection")
246+
errClose := httpUtils.ForceCloseConnectionByConnectionName(connectionName)
247+
Eventually(errClose).WithTimeout(time.Second * 10).WithPolling(time.Second).ShouldNot(HaveOccurred())
248+
249+
// the channel should be open and the notification should be received
250+
select {
251+
case notify, ok := <-c:
252+
Expect(ok).To(BeTrue(), "expected the channel to be open")
253+
Expect(notify).To(Equal(raw.ErrConnectionClosed))
254+
case <-itCtx.Done():
255+
Fail("expected to receive a closed notification")
256+
}
257+
}, SpecTimeout(20*time.Second))
220258
})
221259

222260
func wrap[T any](v T) []T {

pkg/e2e/http_utils_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package e2e_test
2+
3+
import (
4+
"fmt"
5+
hole "github.com/michaelklishin/rabbit-hole/v2"
6+
)
7+
8+
type HTTPUtils struct {
9+
HttpClient *hole.Client
10+
}
11+
12+
func NewHTTPUtils() *HTTPUtils {
13+
client, err := hole.NewClient("http://localhost:15672", "guest", "guest")
14+
if err != nil {
15+
return nil
16+
}
17+
return &HTTPUtils{
18+
HttpClient: client,
19+
}
20+
}
21+
22+
func (h *HTTPUtils) getConnectionInfoByName(connectionName string) (*hole.ConnectionInfo, error) {
23+
connections, err := h.HttpClient.ListConnections()
24+
if err != nil {
25+
return nil, err
26+
}
27+
for i := range connections {
28+
if connections[i].ClientProperties["connection_name"] != nil &&
29+
connections[i].ClientProperties["connection_name"] == connectionName {
30+
return &connections[i], nil
31+
}
32+
}
33+
return nil, fmt.Errorf("connection not found %s", connectionName)
34+
}
35+
36+
func (h *HTTPUtils) GetConnectionByConnectionName(connectionName string) (string, error) {
37+
connection, err := h.getConnectionInfoByName(connectionName)
38+
if err != nil {
39+
return "", err
40+
}
41+
return connection.ClientProperties["connection_name"].(string), nil
42+
}
43+
44+
func (h *HTTPUtils) ForceCloseConnectionByConnectionName(connectionName string) error {
45+
46+
connection, err := h.getConnectionInfoByName(connectionName)
47+
if err != nil {
48+
return err
49+
}
50+
_, err = h.HttpClient.CloseConnection(connection.Name)
51+
return err
52+
}

pkg/raw/client.go

Lines changed: 64 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ type Client struct {
4545
mu sync.Mutex
4646
// this channel is used for correlation-less incoming frames from the server
4747
frameBodyListener chan internal.SyncCommandRead
48-
isOpen bool
4948
// this function is used during shutdown to stop the background loop that reads from the network
5049
ioLoopCancelFn context.CancelFunc
5150
connection *internal.Connection
@@ -57,16 +56,23 @@ type Client struct {
5756
publishErrorCh chan *PublishError
5857
chunkCh chan *Chunk
5958
notifyCh chan *CreditError
60-
metadataUpdateCh chan *MetadataUpdate
61-
consumerUpdateCh chan *ConsumerUpdate
59+
// see constants states of the connection
60+
// we need different states to handle the case where the connection is closed by the server
61+
// Open/ConnectionClosing/Closed
62+
// ConnectionClosing is used to set the connection status no longer open, but still waiting for the server to close the connection
63+
connectionStatus uint8
64+
// socketClosedCh is used to notify the client that the socket has been closed
65+
socketClosedCh chan error
66+
metadataUpdateCh chan *MetadataUpdate
67+
consumerUpdateCh chan *ConsumerUpdate
6268
}
6369

6470
// IsOpen returns true if the connection is open, false otherwise
6571
// IsOpen is thread-safe
6672
func (tc *Client) IsOpen() bool {
6773
tc.mu.Lock()
6874
defer tc.mu.Unlock()
69-
return tc.isOpen
75+
return tc.connectionStatus == constants.ConnectionOpen
7076
}
7177

7278
// NewClient returns a common.Clienter implementation to interact with RabbitMQ stream using low level primitives.
@@ -76,7 +82,7 @@ func NewClient(connection net.Conn, configuration *ClientConfiguration) Clienter
7682
rawClient := &Client{
7783
frameBodyListener: make(chan internal.SyncCommandRead),
7884
connection: internal.NewConnection(connection),
79-
isOpen: false,
85+
connectionStatus: constants.ConnectionClosed,
8086
correlationsMap: sync.Map{},
8187
configuration: configuration,
8288
}
@@ -222,6 +228,24 @@ func (tc *Client) handleIncoming(ctx context.Context) error {
222228
if errors.Is(err, io.ErrClosedPipe) {
223229
return nil
224230
}
231+
232+
if errors.Is(err, io.EOF) {
233+
// EOF is returned when the connection is closed
234+
if tc.socketClosedCh != nil {
235+
if tc.IsOpen() {
236+
tc.socketClosedCh <- ErrConnectionClosed
237+
}
238+
// the TCP connection here is closed
239+
// we close the channel since we don't need to send more than one message
240+
close(tc.socketClosedCh)
241+
tc.socketClosedCh = nil
242+
}
243+
// set the shutdown flag to false since we don't want to close the connection
244+
// since it's already closed
245+
_ = tc.shutdown(false)
246+
return nil
247+
}
248+
225249
if err != nil {
226250
// TODO: some errors may be recoverable. We only need to return if reconnection
227251
// is needed
@@ -278,7 +302,7 @@ func (tc *Client) handleIncoming(ctx context.Context) error {
278302
// we do not return here because we must execute the shutdown process and close the socket
279303
}
280304

281-
err = tc.shutdown()
305+
err = tc.shutdown(true)
282306
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
283307
return err
284308
}
@@ -594,21 +618,34 @@ func (tc *Client) open(ctx context.Context, brokerIndex int) error {
594618
return streamErrorOrNil(openResp.ResponseCode())
595619
}
596620

597-
func (tc *Client) shutdown() error {
621+
func (tc *Client) shutdown(closeConnection bool) error {
598622
tc.mu.Lock()
599623
defer tc.mu.Unlock()
600-
tc.isOpen = false
624+
// The method can be called by the Close() method or by the
625+
// connection error handler, see: handleIncoming EOF error
626+
// the shutdown method has to be idempotent since it can be called multiple times
627+
// In case of unexpected connection error, the shutdown is called just once
628+
tc.connectionStatus = constants.ConnectionClosed
601629
tc.ioLoopCancelFn()
602630

603631
if tc.confirmsCh != nil {
604632
close(tc.confirmsCh)
633+
tc.confirmsCh = nil
605634
}
606635

607636
if tc.chunkCh != nil {
608637
close(tc.chunkCh)
638+
tc.chunkCh = nil
609639
}
610640

611-
return tc.connection.Close()
641+
// if the caller is handleIncoming EOF error closeConnection is false,
642+
// The connection is already closed
643+
// So we don't need to close it again
644+
645+
if closeConnection {
646+
return tc.connection.Close()
647+
}
648+
return nil
612649
}
613650

614651
func (tc *Client) handleClose(ctx context.Context, req *internal.CloseRequest) error {
@@ -863,7 +900,7 @@ func (tc *Client) Connect(ctx context.Context) error {
863900
}
864901

865902
tc.mu.Lock()
866-
tc.isOpen = true
903+
tc.connectionStatus = constants.ConnectionOpen
867904
defer tc.mu.Unlock()
868905
log.Info("connection is open")
869906

@@ -1113,6 +1150,8 @@ func (tc *Client) Close(ctx context.Context) error {
11131150
return errNilContext
11141151
}
11151152

1153+
tc.connectionStatus = constants.ConnectionClosing
1154+
11161155
log := loggerFromCtxOrDiscard(ctx).WithGroup("close")
11171156
log.Debug("starting connection close")
11181157

@@ -1127,7 +1166,7 @@ func (tc *Client) Close(ctx context.Context) error {
11271166
log.Error("close response code is not OK", "error", err)
11281167
}
11291168

1130-
err = tc.shutdown()
1169+
err = tc.shutdown(true)
11311170
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
11321171
return err
11331172
}
@@ -1238,6 +1277,20 @@ func (tc *Client) NotifyChunk(c chan *Chunk) <-chan *Chunk {
12381277
return c
12391278
}
12401279

1280+
// NotifyConnectionClosed receives notifications about connection closed events.
1281+
// It is raised only once when the connection is closed in unexpected way.
1282+
// Connection gracefully closed by the client will not raise this event.
1283+
func (tc *Client) NotifyConnectionClosed() <-chan error {
1284+
tc.mu.Lock()
1285+
defer tc.mu.Unlock()
1286+
// The user can't decide the size of the channel, so we use a buffer of 1.
1287+
// NotifyConnectionClosed is one shot notification, so we don't need a buffer.
1288+
// buffer greater than 1 cloud cause a deadlock since the channel is closed after the first notification.
1289+
c := make(chan error, 1)
1290+
tc.socketClosedCh = c
1291+
return c
1292+
}
1293+
12411294
// NotifyCreditError TODO: go docs
12421295
func (tc *Client) NotifyCreditError(notification chan *CreditError) <-chan *CreditError {
12431296
tc.mu.Lock()

0 commit comments

Comments
 (0)