Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dlq initial subscription support #273

Merged
merged 7 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Pulsar.Client/Api/Configuration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type ProducerConfiguration =
BlockIfQueueFull: bool
MessageEncryptor: IMessageEncryptor option
ProducerCryptoFailureAction: ProducerCryptoFailureAction
InitialSubscriptionName: SubscriptionName
}
member this.BatchingPartitionSwitchFrequencyIntervalMs =
this.BatchingPartitionSwitchFrequencyByPublishDelay * (int this.BatchingMaxPublishDelay.TotalMilliseconds)
Expand Down Expand Up @@ -167,6 +168,7 @@ type ProducerConfiguration =
BlockIfQueueFull = false
MessageEncryptor = None
ProducerCryptoFailureAction = ProducerCryptoFailureAction.FAIL
InitialSubscriptionName = %""
}

type ReaderConfiguration =
Expand Down
9 changes: 5 additions & 4 deletions src/Pulsar.Client/Api/ConsumerBuilder.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ type ConsumerBuilder<'T> private (createConsumerAsync, createProducerAsync, conf
let deadLettersProcessor (c: ConsumerConfiguration<'T>) (deadLettersPolicy: DeadLetterPolicy) (topic: TopicName) =
let getTopicName () =
topic.ToString()
let createProducer deadLetterTopic =
let createProducer deadLetterTopic initialSubscriptionName =
ProducerBuilder(createProducerAsync, schema)
.Topic(deadLetterTopic)
.InitialSubscriptionName(initialSubscriptionName)
.BlockIfQueueFull(false)
.CreateAsync()
DeadLetterProcessor(deadLettersPolicy, getTopicName, c.SubscriptionName, createProducer) :> IDeadLetterProcessor<'T>
Expand Down Expand Up @@ -70,11 +71,11 @@ type ConsumerBuilder<'T> private (createConsumerAsync, createProducerAsync, conf
let isEmptyDL = String.IsNullOrEmpty policy.DeadLetterTopic
let isEmptyRL = String.IsNullOrEmpty policy.RetryLetterTopic
if isEmptyDL && isEmptyRL then
DeadLetterPolicy(policy.MaxRedeliveryCount, defaultDeadLetterTopic, defaultRetryLetterTopic)
DeadLetterPolicy(policy.MaxRedeliveryCount, defaultDeadLetterTopic, defaultRetryLetterTopic, policy.InitialSubscriptionName)
elif isEmptyDL then
DeadLetterPolicy(policy.MaxRedeliveryCount, defaultDeadLetterTopic, policy.RetryLetterTopic)
DeadLetterPolicy(policy.MaxRedeliveryCount, defaultDeadLetterTopic, policy.RetryLetterTopic, policy.InitialSubscriptionName)
elif isEmptyRL then
DeadLetterPolicy(policy.MaxRedeliveryCount, policy.DeadLetterTopic, defaultRetryLetterTopic)
DeadLetterPolicy(policy.MaxRedeliveryCount, policy.DeadLetterTopic, defaultRetryLetterTopic, policy.InitialSubscriptionName)
else
policy
{ c with
Expand Down
2 changes: 2 additions & 0 deletions src/Pulsar.Client/Api/DeadLetters.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ open System.Threading.Tasks
type DeadLetterPolicy(maxRedeliveryCount: int
, [<Optional; DefaultParameterValue(null:string)>] deadLetterTopic: string
, [<Optional; DefaultParameterValue(null:string)>] retryLetterTopic: string
, [<Optional; DefaultParameterValue(null:string)>] initialSubscriptionName: string
) =
member __.MaxRedeliveryCount = maxRedeliveryCount
member __.DeadLetterTopic = deadLetterTopic
member __.RetryLetterTopic = retryLetterTopic
member __.InitialSubscriptionName = initialSubscriptionName

type IDeadLetterProcessor<'T> =
abstract member ClearMessages: unit -> unit
Expand Down
5 changes: 5 additions & 0 deletions src/Pulsar.Client/Api/ProducerBuilder.fs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ type ProducerBuilder<'T> private (сreateProducerAsync, config: ProducerConfigur
MessageEncryptor = Some messageEncryptor }
|> this.With

member internal this.InitialSubscriptionName initialSubscriptionName =
{ config with
InitialSubscriptionName = initialSubscriptionName }
|> this.With

member this.CreateAsync(): Task<IProducer<'T>> =
сreateProducerAsync(verify config, schema, producerInterceptors)

Expand Down
4 changes: 3 additions & 1 deletion src/Pulsar.Client/Common/Commands.fs
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,14 @@ let newLookup (topicName : CompleteTopicName) (requestId : RequestId) (authorita
command |> serializeSimpleCommand

let newProducer (topicName : CompleteTopicName) (producerName: string) (producerId : ProducerId) (requestId : RequestId)
(schemaInfo: SchemaInfo) (epoch: uint64) (txnEnabled: bool) =
(schemaInfo: SchemaInfo) (epoch: uint64) (txnEnabled: bool) (initialSubscriptionName: SubscriptionName) =
let schema = getProtoSchema schemaInfo
let request = CommandProducer(Topic = %topicName, ProducerId = %producerId, RequestId = %requestId,
Epoch = epoch, TxnEnabled = txnEnabled)
if producerName |> String.IsNullOrEmpty |> not then
request.ProducerName <- producerName
if %initialSubscriptionName |> String.IsNullOrEmpty |> not then
request.InitialSubscriptionName <- %initialSubscriptionName
if schema.``type`` <> Schema.Type.None then
request.Schema <- schema
let command = BaseCommand(``type`` = CommandType.Producer, Producer = request)
Expand Down
6 changes: 3 additions & 3 deletions src/Pulsar.Client/Internal/DeadLetters.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type internal DeadLetterProcessor<'T>
(policy: DeadLetterPolicy,
getTopicName: unit -> string,
subscriptionName: SubscriptionName,
createProducer: string -> Task<IProducer<'T>>) =
createProducer: string -> SubscriptionName -> Task<IProducer<'T>>) =

let topicName = getTopicName()
let store = Dictionary<MessageId, Message<'T>>()
Expand All @@ -24,11 +24,11 @@ type internal DeadLetterProcessor<'T>
$"{topicName}-{subscriptionName}{RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX}"

let dlProducer = lazy (
createProducer dlTopicName
createProducer dlTopicName %policy.InitialSubscriptionName
)

let rlProducer = lazy (
createProducer policy.RetryLetterTopic
createProducer policy.RetryLetterTopic %""
)

let getOptionalKey (message: Message<'T>) =
Expand Down
1 change: 1 addition & 0 deletions src/Pulsar.Client/Internal/ProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
try
let payload = Commands.newProducer producerConfig.Topic.CompleteTopicName producerConfig.ProducerName
producerId requestId schema.SchemaInfo epoch clientConfig.EnableTransaction
producerConfig.InitialSubscriptionName
let! response = clientCnx.SendAndWaitForReply requestId payload
let success = response |> PulsarResponseType.GetProducerSuccess
if String.IsNullOrEmpty producerName then
Expand Down
6 changes: 3 additions & 3 deletions tests/IntegrationTests/Common.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ open FSharp.UMX
[<Literal>]
let pulsarAddress = "pulsar://127.0.0.1:6650"

#if !NOTLS
[<Literal>]
let pulsarSslAddress = "pulsar+ssl://127.0.0.1:6651"
let pulsarHttpAddress = "http://127.0.0.1:8080"

#if !NOTLS
[<Literal>]
let pulsarHttpAddress = "http://127.0.0.1:8080"
let pulsarSslAddress = "pulsar+ssl://127.0.0.1:6651"

// ssl folder copied by from https://github.com/apache/pulsar/tree/master/tests/docker-images/latest-version-image/ssl
// generate pfx file from pem, leave the password blank
Expand Down
108 changes: 94 additions & 14 deletions tests/IntegrationTests/DeadLetters.fs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ let tests =
NumberOfMessages = 10
|}

let getTestConfigForInitialSubscription() =
let newGuid = Guid.NewGuid().ToString("N")
let topic = sprintf "public/default/initial-subscription-dl-%s-test" newGuid
{|
TopicName = topic
DeadLettersPolicy = DeadLetterPolicy(0, sprintf "%s-DLQ" topic, null, "test-initial-subscription")
SubscriptionName = "test-subscription"
NumberOfMessages = 10
|}

let receiveAndAckNegative (consumer: IConsumer<'T>) number =
task {
for _ in 1..number do
Expand Down Expand Up @@ -81,7 +91,7 @@ let tests =
.SubscriptionName(config.SubscriptionName)
.SubscriptionType(SubscriptionType.Shared)
.SubscribeAsync()

let producerTask =
Task.Run(fun () ->
task {
Expand All @@ -107,11 +117,11 @@ let tests =
dlqConsumerTask
|]

do! Task.WhenAll(tasks)
do! Task.WhenAll(tasks)

description |> logTestEnd
}

testTask "Partitioned topic failed messages stored in a configured dead letter topic" {

let description = "Failed messages in a partitioned topic are stored in a configured dead letter topic"
Expand Down Expand Up @@ -151,7 +161,7 @@ let tests =
.SubscribeAsync()

let messages = generateMessages config.NumberOfMessages producerName

let producerTask =
Task.Run(fun () ->
task {
Expand All @@ -177,7 +187,7 @@ let tests =
dlqConsumerTask
|]

do! Task.WhenAll(tasks)
do! Task.WhenAll(tasks)
do! Task.Delay(110) // wait for acks

description |> logTestEnd
Expand Down Expand Up @@ -244,7 +254,7 @@ let tests =
dlqConsumerTask
|]

do! Task.WhenAll(tasks)
do! Task.WhenAll(tasks)

description |> logTestEnd
}
Expand Down Expand Up @@ -310,7 +320,7 @@ let tests =
dlqConsumerTask
|]

do! Task.WhenAll(tasks)
do! Task.WhenAll(tasks)

description |> logTestEnd
}
Expand Down Expand Up @@ -398,11 +408,11 @@ let tests =
dlqConsumerTask
|]

do! Task.WhenAll(tasks)
do! Task.WhenAll(tasks)

description |> logTestEnd
}

testTask "Reconsume later works properly" {

let description = "Reconsume later works properly"
Expand All @@ -427,15 +437,85 @@ let tests =
.SubscriptionName(config.SubscriptionName)
.EnableRetry(true)
.SubscribeAsync()
let! msgId = producer.SendAsync([| 0uy; 1uy; 0uy |])
let! (msg1 : Message<byte[]>) = consumer.ReceiveAsync()
do! consumer.ReconsumeLaterAsync(msg1, %(DateTime.UtcNow.AddSeconds(1.0) |> convertToMsTimestamp))
let! (msg2 : Message<byte[]>) = consumer.ReceiveAsync()

let! msgId = producer.SendAsync([| 0uy; 1uy; 0uy |])
let! (msg1 : Message<byte[]>) = consumer.ReceiveAsync()
do! consumer.ReconsumeLaterAsync(msg1, %(DateTime.UtcNow.AddSeconds(1.0) |> convertToMsTimestamp))
let! (msg2 : Message<byte[]>) = consumer.ReceiveAsync()

Expect.equal "" msgId msg1.MessageId
Expect.equal "" (msg1.GetValue() |> Array.toList) (msg2.GetValue() |> Array.toList)

description |> logTestEnd
}

// TODO make active if server updated to 2.10.0 or newer
ptestTask "Dead-letter initial subscription is created" {

let description = "When the dead-letter policy has an initial subscription, dlq messages are not dropped"

description |> logTestStart

let config = getTestConfigForInitialSubscription()
let producerName = "initialSubscriptionProducer"
let consumerName = "initialSubscriptionConsumer"
let dlqConsumerName = "initialSubscriptionDlqConsumer"

let! producer =
createProducer()
.ProducerName(producerName)
.Topic(config.TopicName)
.EnableBatching(false)
.CreateAsync()

let! consumer =
createConsumer()
.ConsumerName(consumerName)
.Topic(config.TopicName)
.SubscriptionName(config.SubscriptionName)
.SubscriptionType(SubscriptionType.Shared)
.NegativeAckRedeliveryDelay(TimeSpan.FromMilliseconds(100))
.DeadLetterPolicy(config.DeadLettersPolicy)
.SubscribeAsync()

let producerTask =
Task.Run(fun () ->
task {
do! produceMessages producer config.NumberOfMessages producerName
}:> Task)

let consumerTask =
Task.Run(fun () ->
task {
do! receiveAndAckNegative consumer config.NumberOfMessages
}:> Task)


let tasks =
[|
producerTask
consumerTask
|]

do! Task.WhenAll(tasks)
do! Task.Delay(1000)

let! dlqConsumer =
createConsumer()
.ConsumerName(dlqConsumerName)
.Topic(config.DeadLettersPolicy.DeadLetterTopic)
.SubscriptionName(config.DeadLettersPolicy.InitialSubscriptionName)
.SubscriptionType(SubscriptionType.Shared)
.SubscribeAsync()

let dlqConsumerTask =
Task.Run(fun () ->
task {
do! consumeMessages dlqConsumer config.NumberOfMessages dlqConsumerName
}:> Task)

do! dlqConsumerTask

description |> logTestEnd
}
]
2 changes: 1 addition & 1 deletion tests/UnitTests/Common/CommandTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ module CommandsTests =
let requestId = %1UL

let totalSize, commandSize, command =
serializeDeserializeSimpleCommand (newProducer topicName producerName producerId requestId (Schema.BYTES().SchemaInfo) 0UL false)
serializeDeserializeSimpleCommand (newProducer topicName producerName producerId requestId (Schema.BYTES().SchemaInfo) 0UL false %"")

totalSize |> Expect.equal "" 43
commandSize |> Expect.equal "" 39
Expand Down
Loading