Skip to content

Commit

Permalink
Add support for setting replicateSubscriptionState for the subscrip…
Browse files Browse the repository at this point in the history
…tion (#261)

* Add support for setting `replicateSubscriptionState` for the subscription

* Forward port 8080 for tests
  • Loading branch information
RobertIndie authored Apr 19, 2024
1 parent 6592960 commit e0282f8
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 6 deletions.
2 changes: 2 additions & 0 deletions src/Pulsar.Client/Api/Configuration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type ConsumerConfiguration<'T> =
MaxPendingChunkedMessage: int
AutoAckOldestChunkedMessageOnQueueFull: bool
ExpireTimeOfIncompleteChunkedMessage: TimeSpan
ReplicateSubscriptionState: bool
}
member this.SingleTopic with get() = this.Topics |> Seq.head
static member Default =
Expand Down Expand Up @@ -112,6 +113,7 @@ type ConsumerConfiguration<'T> =
MaxPendingChunkedMessage = 10
AutoAckOldestChunkedMessageOnQueueFull = false
ExpireTimeOfIncompleteChunkedMessage = TimeSpan.FromSeconds(60.0)
ReplicateSubscriptionState = false
}

type ProducerConfiguration =
Expand Down
5 changes: 5 additions & 0 deletions src/Pulsar.Client/Api/ConsumerBuilder.fs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ type ConsumerBuilder<'T> private (createConsumerAsync, createProducerAsync, conf
{ config with
ConsumerCryptoFailureAction = action }
|> this.With

member this.ReplicateSubscriptionState replicateSubscriptionState =
{ config with
ReplicateSubscriptionState = replicateSubscriptionState }
|> this.With

member this.SubscribeAsync(): Task<IConsumer<'T>> =
createConsumerAsync(verify config, schema, consumerInterceptors)
Expand Down
6 changes: 4 additions & 2 deletions src/Pulsar.Client/Common/Commands.fs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ let newGetTopicsOfNamespaceRequest (ns : NamespaceName) (requestId : RequestId)
let newSubscribe (topicName: CompleteTopicName) (subscription: SubscriptionName) (consumerId: ConsumerId) (requestId: RequestId)
(consumerName: string) (subscriptionType: SubscriptionType) (subscriptionInitialPosition: SubscriptionInitialPosition)
(readCompacted: bool) (startMessageId: MessageIdData) (durable: bool) (startMessageRollbackDuration: TimeSpan)
(createTopicIfDoesNotExist: bool) (keySharedPolicy: KeySharedPolicy option) (schemaInfo: SchemaInfo) (priorityLevel: PriorityLevel) =
(createTopicIfDoesNotExist: bool) (keySharedPolicy: KeySharedPolicy option) (schemaInfo: SchemaInfo) (priorityLevel: PriorityLevel)
(replicateSubscriptionState: bool)=
let schema = getProtoSchema schemaInfo
let subType =
match subscriptionType with
Expand All @@ -272,7 +273,8 @@ let newSubscribe (topicName: CompleteTopicName) (subscription: SubscriptionName)
| _ -> failwith "Unknown initialPosition type"
let request = CommandSubscribe(Topic = %topicName, Subscription = %subscription, subType = subType, ConsumerId = %consumerId,
ConsumerName = consumerName, RequestId = %requestId, initialPosition = initialPosition, ReadCompacted = readCompacted,
StartMessageId = startMessageId, Durable = durable, ForceTopicCreation = createTopicIfDoesNotExist, PriorityLevel = %priorityLevel)
StartMessageId = startMessageId, Durable = durable, ForceTopicCreation = createTopicIfDoesNotExist, PriorityLevel = %priorityLevel,
ReplicateSubscriptionState = replicateSubscriptionState)
match keySharedPolicy with
| Some keySharedPolicy ->
let meta = KeySharedMeta()
Expand Down
2 changes: 1 addition & 1 deletion src/Pulsar.Client/Internal/ConsumerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
consumerId requestId consumerName consumerConfig.SubscriptionType
consumerConfig.SubscriptionInitialPosition consumerConfig.ReadCompacted msgIdData isDurable
startMessageRollbackDuration createTopicIfDoesNotExist consumerConfig.KeySharedPolicy
schema.SchemaInfo consumerConfig.PriorityLevel
schema.SchemaInfo consumerConfig.PriorityLevel consumerConfig.ReplicateSubscriptionState
try
let! response = clientCnx.SendAndWaitForReply requestId payload
response |> PulsarResponseType.GetEmpty
Expand Down
31 changes: 31 additions & 0 deletions tests/IntegrationTests/Basic.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module Pulsar.Client.IntegrationTests.Basic

open System
open System.Net.Http
open System.Text.Json
open System.Threading
open System.Diagnostics

Expand Down Expand Up @@ -349,6 +351,35 @@ let tests =

Log.Debug("Finished 'Scheduled message should be delivered at requested time'")
}

testTask "Create the replicated subscription should be successful" {
Log.Debug("Started 'Create the replicated subscription should be successful'")
let topicName = "public/default/topic-" + Guid.NewGuid().ToString("N")
let consumerName = "replicated-consumer"
let client = getClient()
let! (_ : IConsumer<byte[]>) =
client.NewConsumer()
.Topic(topicName)
.ConsumerName(consumerName)
.SubscriptionName("replicate")
.SubscriptionType(SubscriptionType.Shared)
.ReplicateSubscriptionState(true)
.SubscribeAsync()

do! Task.Delay 1000
let getJsonAsync (url: string) =
async {
use httpClient = new HttpClient()
let! response = httpClient.GetStringAsync(url) |> Async.AwaitTask
return JsonDocument.Parse(response)
}

let url = "http://localhost:8080/admin/v2/persistent/" + topicName + "/stats"
let json = Async.RunSynchronously (getJsonAsync url)
let isReplicated = json.RootElement.GetProperty("subscriptions").GetProperty("replicate").GetProperty("isReplicated").GetBoolean()
Expect.isTrue "" isReplicated
Log.Debug("Finished 'Create the replicated subscription should be successful'")
}

#if !NOTLS
// Before running this test set 'maxMessageSize' for broker and 'nettyMaxFrameSizeBytes' for bookkeeper
Expand Down
7 changes: 4 additions & 3 deletions tests/UnitTests/Common/CommandTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,16 @@ module CommandsTests =
let totalSize, commandSize, command =
serializeDeserializeSimpleCommand
(newSubscribe topicName %"test-subscription" consumerId requestId consumerName
SubscriptionType.Exclusive SubscriptionInitialPosition.Earliest false null true TimeSpan.Zero true None (Schema.BYTES().SchemaInfo) priorityLevel)
SubscriptionType.Exclusive SubscriptionInitialPosition.Earliest false null true TimeSpan.Zero true None (Schema.BYTES().SchemaInfo) priorityLevel false)

totalSize |> Expect.equal "" 70
commandSize |> Expect.equal "" 66
totalSize |> Expect.equal "" 72
commandSize |> Expect.equal "" 68
command.``type`` |> Expect.equal "" CommandType.Subscribe
command.Subscribe.Topic |> Expect.equal "" %topicName
command.Subscribe.RequestId |> Expect.equal "" %requestId
command.Subscribe.ConsumerId |> Expect.equal "" %consumerId
command.Subscribe.ConsumerName |> Expect.equal "" %consumerName
command.Subscribe.ReplicateSubscriptionState |> Expect.equal "" %false
}

test "newFlow should return correct frame" {
Expand Down
1 change: 1 addition & 0 deletions tests/compose/standalone/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ services:
ports:
- "6650:6650"
- "2181:2181"
- "8080:8080"
command: >
bash -c "bin/apply-config-from-env.py conf/standalone.conf &&
bin/pulsar standalone"
Expand Down

0 comments on commit e0282f8

Please sign in to comment.