Skip to content

Commit dc3c53f

Browse files
committed
Add GetNextPublishSeqNo for channel in confirm mode
1 parent e6b33f4 commit dc3c53f

File tree

3 files changed

+51
-0
lines changed

3 files changed

+51
-0
lines changed

channel.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1290,6 +1290,15 @@ func (ch *Channel) ExchangeUnbind(destination, key, source string, noWait bool,
12901290
)
12911291
}
12921292

1293+
// GetNextPublishSeqNo returns the sequence number of the next message to be
1294+
// published, when in confirm mode.
1295+
func (ch *Channel) GetNextPublishSeqNo() uint64 {
1296+
ch.confirms.Lock()
1297+
defer ch.confirms.Unlock()
1298+
1299+
return ch.confirms.published + 1
1300+
}
1301+
12931302
/*
12941303
Publish sends a Publishing from the client to an exchange on the server.
12951304

confirms.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,13 @@ func (c *confirms) Close() error {
9292
c.listeners = nil
9393
return nil
9494
}
95+
96+
// Lock acquire the lock on confirms.
97+
func (c *confirms) Lock() {
98+
c.m.Lock()
99+
}
100+
101+
// Unlock release the locks on confirms.
102+
func (c *confirms) Unlock() {
103+
c.m.Unlock()
104+
}

integration_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,6 +1160,38 @@ func TestIntegrationCancel(t *testing.T) {
11601160
}
11611161
}
11621162

1163+
func TestIntegrationGetNextPublishSeqNo(t *testing.T) {
1164+
if c := integrationConnection(t, "GetNextPublishSeqNo"); c != nil {
1165+
defer c.Close()
1166+
1167+
ch, err := c.Channel()
1168+
if err != nil {
1169+
t.Fatalf("channel: %v", err)
1170+
}
1171+
1172+
if err = ch.Confirm(false); err != nil {
1173+
t.Fatalf("could not confirm")
1174+
}
1175+
1176+
ex := "test-get-next-pub"
1177+
if err = ch.ExchangeDeclare(ex, "direct", false, false, false, false, nil); err != nil {
1178+
t.Fatalf("cannot declare %v: got: %v", ex, err)
1179+
}
1180+
1181+
n := ch.GetNextPublishSeqNo()
1182+
if n != 1 {
1183+
t.Errorf("wrong next publish seqence number before any publish, expected: %d, got: %d", 1, n)
1184+
}
1185+
1186+
ch.Publish("test-get-next-pub-seq", "", false, false, Publishing{})
1187+
1188+
n = ch.GetNextPublishSeqNo()
1189+
if n != 2 {
1190+
t.Errorf("wrong next publish seqence number after 1 publishing, expected: %d, got: %d", 2, n)
1191+
}
1192+
}
1193+
}
1194+
11631195
func TestIntegrationConfirm(t *testing.T) {
11641196
if c, ch := integrationQueue(t, "confirm"); c != nil {
11651197
defer c.Close()

0 commit comments

Comments
 (0)