Skip to content

Commit

Permalink
Fix domain replication queue cleanup query (#3259)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 committed May 13, 2020
1 parent 9821fb0 commit 25bb63e
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions common/persistence/cassandra/cassandraQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@ const (
)

const (
templateEnqueueMessageQuery = `INSERT INTO queue (queue_type, message_id, message_payload) VALUES(?, ?, ?) IF NOT EXISTS`
templateGetLastMessageIDQuery = `SELECT message_id FROM queue WHERE queue_type=? ORDER BY message_id DESC LIMIT 1`
templateGetMessagesQuery = `SELECT message_id, message_payload FROM queue WHERE queue_type = ? and message_id > ? LIMIT ?`
templateGetMessagesFromDLQQuery = `SELECT message_id, message_payload FROM queue WHERE queue_type = ? and message_id > ? and message_id <= ?`
templateDeleteMessagesQuery = `DELETE FROM queue WHERE queue_type = ? and message_id > ? and message_id <= ?`
templateDeleteMessageQuery = `DELETE FROM queue WHERE queue_type = ? and message_id = ?`
templateGetQueueMetadataQuery = `SELECT cluster_ack_level, version FROM queue_metadata WHERE queue_type = ?`
templateInsertQueueMetadataQuery = `INSERT INTO queue_metadata (queue_type, cluster_ack_level, version) VALUES(?, ?, ?) IF NOT EXISTS`
templateUpdateQueueMetadataQuery = `UPDATE queue_metadata SET cluster_ack_level = ?, version = ? WHERE queue_type = ? IF version = ?`
templateEnqueueMessageQuery = `INSERT INTO queue (queue_type, message_id, message_payload) VALUES(?, ?, ?) IF NOT EXISTS`
templateGetLastMessageIDQuery = `SELECT message_id FROM queue WHERE queue_type=? ORDER BY message_id DESC LIMIT 1`
templateGetMessagesQuery = `SELECT message_id, message_payload FROM queue WHERE queue_type = ? and message_id > ? LIMIT ?`
templateGetMessagesFromDLQQuery = `SELECT message_id, message_payload FROM queue WHERE queue_type = ? and message_id > ? and message_id <= ?`
templateDeleteMessagesBeforeQuery = `DELETE FROM queue WHERE queue_type = ? and message_id < ?`
templateDeleteMessagesQuery = `DELETE FROM queue WHERE queue_type = ? and message_id > ? and message_id <= ?`
templateDeleteMessageQuery = `DELETE FROM queue WHERE queue_type = ? and message_id = ?`
templateGetQueueMetadataQuery = `SELECT cluster_ack_level, version FROM queue_metadata WHERE queue_type = ?`
templateInsertQueueMetadataQuery = `INSERT INTO queue_metadata (queue_type, cluster_ack_level, version) VALUES(?, ?, ?) IF NOT EXISTS`
templateUpdateQueueMetadataQuery = `UPDATE queue_metadata SET cluster_ack_level = ?, version = ? WHERE queue_type = ? IF version = ?`
)

type (
Expand Down Expand Up @@ -295,13 +296,12 @@ func (q *cassandraQueue) DeleteMessagesBefore(
messageID int64,
) error {

query := q.session.Query(templateDeleteMessagesQuery, q.queueType, messageID)
query := q.session.Query(templateDeleteMessagesBeforeQuery, q.queueType, messageID)
if err := query.Exec(); err != nil {
return &workflow.InternalServiceError{
Message: fmt.Sprintf("DeleteMessagesBefore operation failed. Error %v", err),
}
}

return nil
}

Expand Down

0 comments on commit 25bb63e

Please sign in to comment.