Skip to content

Commit

Permalink
Change queue message id to big integer in Cas (#3149)
Browse files Browse the repository at this point in the history
* Change queue message id to big integer
  • Loading branch information
yux0 committed Mar 31, 2020
1 parent daa8998 commit a3e60fa
Show file tree
Hide file tree
Showing 22 changed files with 237 additions and 216 deletions.
16 changes: 8 additions & 8 deletions common/domain/dlqMessageHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ import (
type (
// DLQMessageHandler is the interface handles domain DLQ messages
DLQMessageHandler interface {
Read(lastMessageID int, pageSize int, pageToken []byte) ([]*replicator.ReplicationTask, []byte, error)
Purge(lastMessageID int) error
Merge(lastMessageID int, pageSize int, pageToken []byte) ([]byte, error)
Read(lastMessageID int64, pageSize int, pageToken []byte) ([]*replicator.ReplicationTask, []byte, error)
Purge(lastMessageID int64) error
Merge(lastMessageID int64, pageSize int, pageToken []byte) ([]byte, error)
}

dlqMessageHandlerImpl struct {
Expand All @@ -60,7 +60,7 @@ func NewDLQMessageHandler(

// ReadMessages reads domain replication DLQ messages
func (d *dlqMessageHandlerImpl) Read(
lastMessageID int,
lastMessageID int64,
pageSize int,
pageToken []byte,
) ([]*replicator.ReplicationTask, []byte, error) {
Expand All @@ -80,7 +80,7 @@ func (d *dlqMessageHandlerImpl) Read(

// PurgeMessages purges domain replication DLQ messages
func (d *dlqMessageHandlerImpl) Purge(
lastMessageID int,
lastMessageID int64,
) error {

ackLevel, err := d.domainReplicationQueue.GetDLQAckLevel()
Expand All @@ -106,7 +106,7 @@ func (d *dlqMessageHandlerImpl) Purge(

// MergeMessages merges domain replication DLQ messages
func (d *dlqMessageHandlerImpl) Merge(
lastMessageID int,
lastMessageID int64,
pageSize int,
pageToken []byte,
) ([]byte, error) {
Expand All @@ -126,7 +126,7 @@ func (d *dlqMessageHandlerImpl) Merge(
return nil, err
}

var ackedMessageID int
var ackedMessageID int64
for _, message := range messages {
domainTask := message.GetDomainTaskAttributes()
if domainTask == nil {
Expand All @@ -138,7 +138,7 @@ func (d *dlqMessageHandlerImpl) Merge(
); err != nil {
return nil, err
}
ackedMessageID = int(*message.SourceTaskId)
ackedMessageID = *message.SourceTaskId
}

if err := d.domainReplicationQueue.RangeDeleteMessagesFromDLQ(
Expand Down
6 changes: 3 additions & 3 deletions common/domain/dlqMessageHandler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 37 additions & 37 deletions common/domain/dlqMessageHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ func (s *dlqMessageHandlerSuite) TearDownTest() {
}

func (s *dlqMessageHandlerSuite) TestReadMessages() {
ackLevel := 10
lastMessageID := 20
ackLevel := int64(10)
lastMessageID := int64(20)
pageSize := 100
pageToken := []byte{}

Expand All @@ -105,7 +105,7 @@ func (s *dlqMessageHandlerSuite) TestReadMessages() {
}

func (s *dlqMessageHandlerSuite) TestReadMessages_ThrowErrorOnGetDLQAckLevel() {
lastMessageID := 20
lastMessageID := int64(20)
pageSize := 100
pageToken := []byte{}

Expand All @@ -116,7 +116,7 @@ func (s *dlqMessageHandlerSuite) TestReadMessages_ThrowErrorOnGetDLQAckLevel() {
},
}
testError := fmt.Errorf("test")
s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(-1, testError).Times(1)
s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(int64(-1), testError).Times(1)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(tasks, nil, nil).Times(0)

Expand All @@ -126,8 +126,8 @@ func (s *dlqMessageHandlerSuite) TestReadMessages_ThrowErrorOnGetDLQAckLevel() {
}

func (s *dlqMessageHandlerSuite) TestReadMessages_ThrowErrorOnReadMessages() {
ackLevel := 10
lastMessageID := 20
ackLevel := int64(10)
lastMessageID := int64(20)
pageSize := 100
pageToken := []byte{}

Expand All @@ -142,8 +142,8 @@ func (s *dlqMessageHandlerSuite) TestReadMessages_ThrowErrorOnReadMessages() {
}

func (s *dlqMessageHandlerSuite) TestPurgeMessages() {
ackLevel := 10
lastMessageID := 20
ackLevel := int64(10)
lastMessageID := int64(20)

s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(ackLevel, nil).Times(1)
s.mockReplicationQueue.EXPECT().RangeDeleteMessagesFromDLQ(ackLevel, lastMessageID).Return(nil).Times(1)
Expand All @@ -154,10 +154,10 @@ func (s *dlqMessageHandlerSuite) TestPurgeMessages() {
}

func (s *dlqMessageHandlerSuite) TestPurgeMessages_ThrowErrorOnGetDLQAckLevel() {
lastMessageID := 20
lastMessageID := int64(20)
testError := fmt.Errorf("test")

s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(-1, testError).Times(1)
s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(int64(-1), testError).Times(1)
s.mockReplicationQueue.EXPECT().RangeDeleteMessagesFromDLQ(gomock.Any(), gomock.Any()).Return(nil).Times(0)
s.mockReplicationQueue.EXPECT().UpdateDLQAckLevel(gomock.Any()).Times(0)
err := s.dlqMessageHandler.Purge(lastMessageID)
Expand All @@ -166,8 +166,8 @@ func (s *dlqMessageHandlerSuite) TestPurgeMessages_ThrowErrorOnGetDLQAckLevel()
}

func (s *dlqMessageHandlerSuite) TestPurgeMessages_ThrowErrorOnPurgeMessages() {
ackLevel := 10
lastMessageID := 20
ackLevel := int64(10)
lastMessageID := int64(20)
testError := fmt.Errorf("test")

s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(ackLevel, nil).Times(1)
Expand All @@ -179,11 +179,11 @@ func (s *dlqMessageHandlerSuite) TestPurgeMessages_ThrowErrorOnPurgeMessages() {
}

func (s *dlqMessageHandlerSuite) TestMergeMessages() {
ackLevel := 10
lastMessageID := 20
ackLevel := int64(10)
lastMessageID := int64(20)
pageSize := 100
pageToken := []byte{}
messageID := 11
messageID := int64(11)

domainAttribute := &replicator.DomainTaskAttributes{
ID: common.StringPtr(uuid.New()),
Expand All @@ -192,7 +192,7 @@ func (s *dlqMessageHandlerSuite) TestMergeMessages() {
tasks := []*replicator.ReplicationTask{
{
TaskType: replicator.ReplicationTaskTypeDomain.Ptr(),
SourceTaskId: common.Int64Ptr(int64(messageID)),
SourceTaskId: common.Int64Ptr(messageID),
DomainTaskAttributes: domainAttribute,
},
}
Expand All @@ -209,10 +209,10 @@ func (s *dlqMessageHandlerSuite) TestMergeMessages() {
}

func (s *dlqMessageHandlerSuite) TestMergeMessages_ThrowErrorOnGetDLQAckLevel() {
lastMessageID := 20
lastMessageID := int64(20)
pageSize := 100
pageToken := []byte{}
messageID := 11
messageID := int64(11)
testError := fmt.Errorf("test")
domainAttribute := &replicator.DomainTaskAttributes{
ID: common.StringPtr(uuid.New()),
Expand All @@ -225,7 +225,7 @@ func (s *dlqMessageHandlerSuite) TestMergeMessages_ThrowErrorOnGetDLQAckLevel()
DomainTaskAttributes: domainAttribute,
},
}
s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(-1, testError).Times(1)
s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(int64(-1), testError).Times(1)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(tasks, nil, nil).Times(0)
s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any()).Times(0)
Expand All @@ -238,8 +238,8 @@ func (s *dlqMessageHandlerSuite) TestMergeMessages_ThrowErrorOnGetDLQAckLevel()
}

func (s *dlqMessageHandlerSuite) TestMergeMessages_ThrowErrorOnGetDLQMessages() {
ackLevel := 10
lastMessageID := 20
ackLevel := int64(10)
lastMessageID := int64(20)
pageSize := 100
pageToken := []byte{}
testError := fmt.Errorf("test")
Expand All @@ -257,12 +257,12 @@ func (s *dlqMessageHandlerSuite) TestMergeMessages_ThrowErrorOnGetDLQMessages()
}

func (s *dlqMessageHandlerSuite) TestMergeMessages_ThrowErrorOnHandleReceivingTask() {
ackLevel := 10
lastMessageID := 20
ackLevel := int64(10)
lastMessageID := int64(20)
pageSize := 100
pageToken := []byte{}
messageID1 := 11
messageID2 := 12
messageID1 := int64(11)
messageID2 := int64(12)
testError := fmt.Errorf("test")
domainAttribute1 := &replicator.DomainTaskAttributes{
ID: common.StringPtr(uuid.New()),
Expand All @@ -273,12 +273,12 @@ func (s *dlqMessageHandlerSuite) TestMergeMessages_ThrowErrorOnHandleReceivingTa
tasks := []*replicator.ReplicationTask{
{
TaskType: replicator.ReplicationTaskTypeDomain.Ptr(),
SourceTaskId: common.Int64Ptr(int64(messageID1)),
SourceTaskId: common.Int64Ptr(messageID1),
DomainTaskAttributes: domainAttribute1,
},
{
TaskType: replicator.ReplicationTaskTypeDomain.Ptr(),
SourceTaskId: common.Int64Ptr(int64(messageID2)),
SourceTaskId: common.Int64Ptr(messageID2),
DomainTaskAttributes: domainAttribute2,
},
}
Expand All @@ -297,12 +297,12 @@ func (s *dlqMessageHandlerSuite) TestMergeMessages_ThrowErrorOnHandleReceivingTa
}

func (s *dlqMessageHandlerSuite) TestMergeMessages_ThrowErrorOnDeleteMessages() {
ackLevel := 10
lastMessageID := 20
ackLevel := int64(10)
lastMessageID := int64(20)
pageSize := 100
pageToken := []byte{}
messageID1 := 11
messageID2 := 12
messageID1 := int64(11)
messageID2 := int64(12)
testError := fmt.Errorf("test")
domainAttribute1 := &replicator.DomainTaskAttributes{
ID: common.StringPtr(uuid.New()),
Expand All @@ -313,12 +313,12 @@ func (s *dlqMessageHandlerSuite) TestMergeMessages_ThrowErrorOnDeleteMessages()
tasks := []*replicator.ReplicationTask{
{
TaskType: replicator.ReplicationTaskTypeDomain.Ptr(),
SourceTaskId: common.Int64Ptr(int64(messageID1)),
SourceTaskId: common.Int64Ptr(messageID1),
DomainTaskAttributes: domainAttribute1,
},
{
TaskType: replicator.ReplicationTaskTypeDomain.Ptr(),
SourceTaskId: common.Int64Ptr(int64(messageID2)),
SourceTaskId: common.Int64Ptr(messageID2),
DomainTaskAttributes: domainAttribute2,
},
}
Expand All @@ -336,11 +336,11 @@ func (s *dlqMessageHandlerSuite) TestMergeMessages_ThrowErrorOnDeleteMessages()
}

func (s *dlqMessageHandlerSuite) TestMergeMessages_IgnoreErrorOnUpdateDLQAckLevel() {
ackLevel := 10
lastMessageID := 20
ackLevel := int64(10)
lastMessageID := int64(20)
pageSize := 100
pageToken := []byte{}
messageID := 11
messageID := int64(11)
testError := fmt.Errorf("test")
domainAttribute := &replicator.DomainTaskAttributes{
ID: common.StringPtr(uuid.New()),
Expand All @@ -349,7 +349,7 @@ func (s *dlqMessageHandlerSuite) TestMergeMessages_IgnoreErrorOnUpdateDLQAckLeve
tasks := []*replicator.ReplicationTask{
{
TaskType: replicator.ReplicationTaskTypeDomain.Ptr(),
SourceTaskId: common.Int64Ptr(int64(messageID)),
SourceTaskId: common.Int64Ptr(messageID),
DomainTaskAttributes: domainAttribute,
},
}
Expand Down
Loading

0 comments on commit a3e60fa

Please sign in to comment.