Skip to content

Commit

Permalink
EnqueueMessage and ReadMessage methods for QueueV2 SQL Implementation (
Browse files Browse the repository at this point in the history
…temporalio#4937)

<!-- Describe what has changed in this PR -->
**What changed?**
Adding EnqueueMessage() and ReadMessage() methods for SQL version of
persistence.QueueV2, which supersedes the persistence.Queue interface.
The implementations for delete and merge are left out for now and will
be implemented later.


<!-- Tell your future self why have you made these changes -->
**Why?**
We plan on using this for the upcoming history task DLQ project.


<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
Unit tests.

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**


<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
  • Loading branch information
prathyushpv authored Oct 10, 2023
1 parent 5f6768c commit 293928d
Show file tree
Hide file tree
Showing 35 changed files with 1,317 additions and 50 deletions.
2 changes: 1 addition & 1 deletion common/persistence/cassandra/queue_v2_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (s *queueV2Store) ReadMessages(
return nil, gocql.ConvertError("QueueV2ReadMessages", err)
}

nextPageToken := s.getNextPageToken(messages, messageID)
nextPageToken := persistence.GetNextPageTokenForQueueV2(messages)

return &persistence.InternalReadMessagesResponse{
Messages: messages,
Expand Down
75 changes: 75 additions & 0 deletions common/persistence/queue_v2_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package persistence

import (
"fmt"

persistencespb "go.temporal.io/server/api/persistence/v1"
)

const (
// pageTokenPrefixByte is the first byte of the serialized page token. It's used to ensure that the page token is
// not empty. Without this, if the last_read_message_id is 0, the serialized page token would be empty, and clients
// could erroneously assume that there are no more messages beyond the first page. This is purely used to ensure
// that tokens are non-empty; it is not used to verify that the token is valid like the magic byte in some other
// protocols.
pageTokenPrefixByte = 0
)

func GetNextPageTokenForQueueV2(result []QueueV2Message) []byte {
if len(result) == 0 {
return nil
}
lastReadMessageID := result[len(result)-1].MetaData.ID
token := &persistencespb.ReadQueueMessagesNextPageToken{
LastReadMessageId: lastReadMessageID,
}
// This can never fail if you inspect the implementation.
b, _ := token.Marshal()

// See the comment above pageTokenPrefixByte for why we want to do this.
return append([]byte{pageTokenPrefixByte}, b...)
}

func GetMinMessageIDForQueueV2(request *InternalReadMessagesRequest) (int64, error) {
// TODO: start from the ack level of the queue partition instead of the first message ID when there is no token.
if len(request.NextPageToken) == 0 {
return FirstQueueMessageID, nil
}
var token persistencespb.ReadQueueMessagesNextPageToken

// Skip the first byte. See the comment on pageTokenPrefixByte for more details.
err := token.Unmarshal(request.NextPageToken[1:])
if err != nil {
return 0, fmt.Errorf(
"%w: %q: %v",
ErrInvalidReadQueueMessagesNextPageToken,
request.NextPageToken,
err,
)
}
return token.LastReadMessageId + 1, nil
}
19 changes: 15 additions & 4 deletions common/persistence/sql/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ func NewFactory(
}
}

// GetDB return a new SQL DB connection
func (f *Factory) GetDB() (sqlplugin.DB, error) {
conn, err := f.mainDBConn.Get()
if err != nil {
return nil, err
}
return conn, err
}

// NewTaskStore returns a new task store
func (f *Factory) NewTaskStore() (p.TaskStore, error) {
conn, err := f.mainDBConn.Get()
Expand Down Expand Up @@ -130,11 +139,13 @@ func (f *Factory) NewQueue(queueType p.QueueType) (p.Queue, error) {
return newQueue(conn, f.logger, queueType)
}

// NewQueueV2 returns a new data-access object for queues and messages. The current implementation is a no-op because
// we haven't implemented QueueV2 in SQL yet. However, we still need to return something here so that the graph is built
// correctly. Calls to any method on the returned object will themselves return an error.
// NewQueueV2 returns a new data-access object for queues and messages.
func (f *Factory) NewQueueV2() (p.QueueV2, error) {
return NewQueueV2(), nil
conn, err := f.mainDBConn.Get()
if err != nil {
return nil, err
}
return NewQueueV2(conn, f.logger), nil
}

// Close closes the factory
Expand Down
144 changes: 132 additions & 12 deletions common/persistence/sql/queue_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,38 +26,158 @@ package sql

import (
"context"
"database/sql"
"errors"
"fmt"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
)

type (
queueV2 struct{}
const (
defaultPartition = 0
)

var (
ErrNotImplemented = errors.New("method is not implemented yet for SQL")
)

// NewQueueV2 returns an implementation of [persistence.QueueV2] which always returns an error because it is not
// implemented yet.
func NewQueueV2() persistence.QueueV2 {
return &queueV2{}
type (
queueV2 struct {
logger log.Logger
SqlStore
}

QueueV2MetadataPayload struct {
AckLevel int64
}
)

// NewQueueV2 returns an implementation of persistence.QueueV2.
func NewQueueV2(db sqlplugin.DB,
logger log.Logger,
) persistence.QueueV2 {
return &queueV2{
SqlStore: NewSqlStore(db, logger),
logger: logger,
}
}

func (q *queueV2) EnqueueMessage(
context.Context,
*persistence.InternalEnqueueMessageRequest,
ctx context.Context,
request *persistence.InternalEnqueueMessageRequest,
) (*persistence.InternalEnqueueMessageResponse, error) {
return nil, fmt.Errorf("%w: EnqueueMessage", ErrNotImplemented)
var lastMessageID int64
tx, err := q.Db.BeginTx(ctx)
if err != nil {
return nil, serviceerror.NewUnavailable(fmt.Sprintf("EnqueueMessage failed. Failed to start transaction. Error: %v", err))
}
lastMessageID, err = tx.GetLastEnqueuedMessageIDForUpdateV2(ctx, sqlplugin.QueueV2Filter{
QueueType: request.QueueType,
QueueName: request.QueueName,
Partition: defaultPartition,
})
switch {
case err == nil:
lastMessageID = lastMessageID + 1
case errors.Is(err, sql.ErrNoRows):
lastMessageID = persistence.FirstQueueMessageID
default:
rollBackErr := tx.Rollback()
if rollBackErr != nil {
q.logger.Error("transaction rollback error", tag.Error(rollBackErr))
}
return nil, fmt.Errorf("failed to get last enqueued message id: %w", err)
}
_, err = tx.InsertIntoQueueV2Messages(ctx, []sqlplugin.QueueV2MessageRow{
newQueueV2Row(request.QueueType, request.QueueName, lastMessageID, request.Blob),
})
if err != nil {
rollBackErr := tx.Rollback()
if rollBackErr != nil {
q.logger.Error("transaction rollback error", tag.Error(rollBackErr))
}
return nil, err
}

if err := tx.Commit(); err != nil {
return nil, serviceerror.NewUnavailable(fmt.Sprintf("EnqueueMessage failed. Failed to commit transaction. Error: %v", err))
}
return &persistence.InternalEnqueueMessageResponse{Metadata: persistence.MessageMetadata{ID: lastMessageID}}, err
}

func (q *queueV2) ReadMessages(
context.Context,
*persistence.InternalReadMessagesRequest,
ctx context.Context,
request *persistence.InternalReadMessagesRequest,
) (*persistence.InternalReadMessagesResponse, error) {
return nil, fmt.Errorf("%w: ReadMessages", ErrNotImplemented)
if request.PageSize <= 0 {
return nil, persistence.ErrNonPositiveReadQueueMessagesPageSize
}

var minMessageID int64
minMessageID, err := persistence.GetMinMessageIDForQueueV2(request)
if err != nil {
return nil, err
}

rows, err := q.Db.RangeSelectFromQueueV2Messages(ctx, sqlplugin.QueueV2MessagesFilter{
QueueType: request.QueueType,
QueueName: request.QueueName,
Partition: defaultPartition,
MinMessageID: minMessageID,
PageSize: request.PageSize,
})

if err != nil {
return nil, serviceerror.NewUnavailable(fmt.Sprintf("RangeSelectFromQueueV2Messages operation failed. Error %v", err))
}

var messages []persistence.QueueV2Message
for _, row := range rows {
encoding, ok := enums.EncodingType_value[row.MessageEncoding]
if !ok {
return nil, serialization.NewUnknownEncodingTypeError(row.MessageEncoding)
}
encodingType := enums.EncodingType(encoding)
message := persistence.QueueV2Message{
MetaData: persistence.MessageMetadata{ID: row.MessageID},
Data: commonpb.DataBlob{
EncodingType: encodingType,
Data: row.MessagePayload,
},
}
messages = append(messages, message)
}

nextPageToken := persistence.GetNextPageTokenForQueueV2(messages)
response := &persistence.InternalReadMessagesResponse{
Messages: messages,
NextPageToken: nextPageToken,
}
return response, nil
}

func newQueueV2Row(
queueType persistence.QueueV2Type,
queueName string,
messageID int64,
blob commonpb.DataBlob,
) sqlplugin.QueueV2MessageRow {

return sqlplugin.QueueV2MessageRow{
QueueType: queueType,
QueueName: queueName,
QueuePartition: defaultPartition,
MessageID: messageID,
MessagePayload: blob.Data,
MessageEncoding: blob.EncodingType.String(),
}
}

func (q *queueV2) CreateQueue(
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/sql/sqlplugin/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type (
Visibility
QueueMessage
QueueMetadata
QueueV2Message
QueueV2Metadata

MatchingTask
MatchingTaskQueue
Expand Down
Loading

0 comments on commit 293928d

Please sign in to comment.