Skip to content

Commit ce55d0f

Browse files
committed
Add test for sender properties
1 parent bc2a4cc commit ce55d0f

File tree

1 file changed

+92
-0
lines changed

1 file changed

+92
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.autoconfigure;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.time.Duration;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
import org.apache.pulsar.client.api.CompressionType;
26+
import org.apache.pulsar.client.api.HashingScheme;
27+
import org.apache.pulsar.client.api.MessageRoutingMode;
28+
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
29+
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
30+
import org.junit.jupiter.api.Nested;
31+
import org.junit.jupiter.api.Test;
32+
33+
import org.springframework.boot.context.properties.bind.Bindable;
34+
import org.springframework.boot.context.properties.bind.Binder;
35+
import org.springframework.boot.context.properties.source.ConfigurationPropertySource;
36+
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
37+
38+
/**
39+
* Unit tests for {@link PulsarReactiveProperties}.
40+
*
41+
* @author Christophe Bornet
42+
*/
43+
public class PulsarReactivePropertiesTests {
44+
45+
private final PulsarReactiveProperties properties = new PulsarReactiveProperties();
46+
47+
private void bind(Map<String, String> map) {
48+
ConfigurationPropertySource source = new MapConfigurationPropertySource(map);
49+
new Binder(source).bind("spring.pulsar.reactive", Bindable.ofInstance(this.properties));
50+
}
51+
52+
@Nested
53+
class SenderPropertiesTests {
54+
55+
@Test
56+
void senderPropsToSenderSpec() {
57+
Map<String, String> props = new HashMap<>();
58+
props.put("spring.pulsar.reactive.sender.topic-name", "my-topic");
59+
props.put("spring.pulsar.reactive.sender.producer-name", "my-producer");
60+
props.put("spring.pulsar.reactive.sender.send-timeout", "2s");
61+
props.put("spring.pulsar.reactive.sender.max-pending-messages", "3");
62+
props.put("spring.pulsar.reactive.sender.max-pending-messages-across-partitions", "4");
63+
props.put("spring.pulsar.reactive.sender.message-routing-mode", "CustomPartition");
64+
props.put("spring.pulsar.reactive.sender.hashing-scheme", "Murmur3_32Hash");
65+
props.put("spring.pulsar.reactive.sender.crypto-failure-action", "SEND");
66+
props.put("spring.pulsar.reactive.sender.batching-max-publish-delay", "5s");
67+
props.put("spring.pulsar.reactive.sender.batching-max-messages", "6");
68+
props.put("spring.pulsar.reactive.sender.batching-enabled", "false");
69+
props.put("spring.pulsar.reactive.sender.chunking-enabled", "true");
70+
props.put("spring.pulsar.reactive.sender.compression-type", "LZ4");
71+
72+
bind(props);
73+
ReactiveMessageSenderSpec senderSpec = properties.buildReactiveMessageSenderSpec();
74+
75+
assertThat(senderSpec.getTopicName()).isEqualTo("my-topic");
76+
assertThat(senderSpec.getProducerName()).isEqualTo("my-producer");
77+
assertThat(senderSpec.getSendTimeout()).isEqualTo(Duration.ofSeconds(2));
78+
assertThat(senderSpec.getMaxPendingMessages()).isEqualTo(3);
79+
assertThat(senderSpec.getMaxPendingMessagesAcrossPartitions()).isEqualTo(4);
80+
assertThat(senderSpec.getMessageRoutingMode()).isEqualTo(MessageRoutingMode.CustomPartition);
81+
assertThat(senderSpec.getHashingScheme()).isEqualTo(HashingScheme.Murmur3_32Hash);
82+
assertThat(senderSpec.getCryptoFailureAction()).isEqualTo(ProducerCryptoFailureAction.SEND);
83+
assertThat(senderSpec.getBatchingMaxPublishDelay()).isEqualTo(Duration.ofSeconds(5));
84+
assertThat(senderSpec.getBatchingMaxMessages()).isEqualTo(6);
85+
assertThat(senderSpec.getBatchingEnabled()).isEqualTo(false);
86+
assertThat(senderSpec.getChunkingEnabled()).isEqualTo(true);
87+
assertThat(senderSpec.getCompressionType()).isEqualTo(CompressionType.LZ4);
88+
}
89+
90+
}
91+
92+
}

0 commit comments

Comments
 (0)