Skip to content

Commit 08a27ae

Browse files
authored
[Feature] Add support for token authentication (#21)
1 parent dd1372d commit 08a27ae

File tree

4 files changed

+131
-26
lines changed

4 files changed

+131
-26
lines changed

README.md

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,47 @@ If you require features not yet available in this plugin (including client versi
1313
# Pulsar Output Configuration Options
1414
This plugin supports these configuration options.
1515

16-
| Settings | Output type | Required |
17-
|-----------------------------------|:---------------------------------------------------:|-----------:|
18-
| serviceUrl | string | No |
19-
| topic | string | Yes |
20-
| compression_type | string, one of["NONE","LZ4","ZLIB","ZSTD","SNAPPY"] | No |
21-
| block_if_queue_full | bool, default is true | No |
22-
| enable_batching | bool, default is true | No |
23-
| enable_tls | boolean, one of [true, false]. default is false | No |
24-
| tls_trust_store_path | string, required if enable_tls is set to true | No |
25-
| tls_trust_store_password | string, default is empty | No |
26-
| enable_tls_hostname_verification | boolean, one of [true, false]. default is false | No |
27-
| protocols | array, ciphers list. default is TLSv1.2 | No |
28-
| allow_tls_insecure_connection | boolean, one of [true, false].default is false | No |
29-
| auth_plugin_class_name | string | No |
30-
| ciphers | array, ciphers list | No |
31-
16+
| Settings | Output type | Required |
17+
|-----------------------------------|:-----------------------------------------------------------------------------:|-----------:|
18+
| serviceUrl | string | No |
19+
| topic | string | Yes |
20+
| producer_name | string | Yes |
21+
| compression_type | string, one of["NONE","LZ4","ZLIB","ZSTD","SNAPPY"] | No |
22+
| block_if_queue_full | bool, default is true | No |
23+
| enable_batching | bool, default is true | No |
24+
| enable_tls | boolean, one of [true, false]. default is false | No |
25+
| tls_trust_store_path | string, required if enable_tls is set to true | No |
26+
| tls_trust_store_password | string, default is empty | No |
27+
| enable_tls_hostname_verification | boolean, one of [true, false]. default is false | No |
28+
| protocols | array, ciphers list. default is TLSv1.2 | No |
29+
| allow_tls_insecure_connection | boolean, one of [true, false].default is false | No |
30+
| auth_plugin_class_name | string | No |
31+
| ciphers | array, ciphers list | No |
32+
| enable_token | boolean, one of [true, false]. default is false | No |
33+
| auth_plugin_params_String | string | No |
3234
# Example
33-
35+
pulsar without tls & token
3436
```
3537
output{
3638
pulsar{
37-
topic => "persistent://public/default/%{topic_name}"
3839
serviceUrl => "pulsar://127.0.0.1:6650"
40+
topic => "persistent://public/default/%{topic_name}"
41+
producer_name => "%{producer_name}"
42+
enable_batching => true
43+
}
44+
}
45+
```
46+
pulsar with token
47+
```
48+
output {
49+
pulsar{
50+
serviceUrl => "pulsar://localhost:6650"
51+
topic => "persistent://public/default/%{topic_name}"
52+
producer_name => "%{producer_name}"
3953
enable_batching => true
54+
enable_token => true
55+
auth_plugin_class_name => "org.apache.pulsar.client.impl.auth.AuthenticationToken"
56+
auth_plugin_params_String => "token:%{token}"
4057
}
4158
}
4259
```

build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ dependencies {
5151
implementation 'org.apache.pulsar:pulsar-client:2.10.2'
5252
implementation files(LOGSTASH_CORE_PATH + '/../logstash-core/build/libs/logstash-core-7.17.4.jar')
5353

54-
54+
testImplementation 'com.fasterxml.jackson.core:jackson-databind:2.12.3'
55+
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.12.3'
5556
testImplementation 'junit:junit:4.12'
5657
testImplementation 'org.jruby:jruby-complete:9.1.14.0'
5758
}

src/main/java/org/apache/pulsar/logstash/outputs/Pulsar.java

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ public class Pulsar implements Output {
9191
private static final PluginConfigSpec<List<Object>> CONFIG_PROTOCOLS =
9292
PluginConfigSpec.arraySetting("protocols", Collections.singletonList(protocols), false, false);
9393

94+
private static final PluginConfigSpec<Boolean> CONFIG_ENABLE_TOKEN =
95+
PluginConfigSpec.booleanSetting("enable_token",false);
96+
97+
private static final PluginConfigSpec<String> CONFIG_AUTH_PLUGIN_PARAMS_STRING =
98+
PluginConfigSpec.stringSetting("auth_plugin_params_String","");
9499

95100
private final CountDownLatch done = new CountDownLatch(1);
96101

@@ -117,6 +122,9 @@ public class Pulsar implements Output {
117122
//TLS
118123
private final boolean enableTls;
119124

125+
//Token
126+
private final boolean enableToken;
127+
120128
// TODO: batchingMaxPublishDelay milliseconds
121129

122130
// TODO: sendTimeoutMs milliseconds 30000
@@ -139,17 +147,24 @@ public Pulsar(final String id, final Configuration configuration, final Context
139147
compressionType = configuration.get(CONFIG_COMPRESSION_TYPE);
140148

141149
enableTls = configuration.get(CONFIG_ENABLE_TLS);
150+
enableToken = configuration.get(CONFIG_ENABLE_TOKEN);
151+
142152
try {
143-
if (enableTls) {
153+
if(enableTls && enableToken){
154+
logger.error("Unable to Tls and Token authentication at the same time");
155+
throw new IllegalStateException("Unable to Tls and Token authentication at the same time,enable_tls => true && enable_token => true" );
156+
} else if (enableTls) {
144157
// pulsar TLS
145158
client = buildTlsPulsar(configuration);
159+
} else if (enableToken) {
160+
// pulsar Token
161+
client = buildTokenPulsar(configuration);
146162
} else {
147163
client = buildNotTlsPulsar();
148164
}
149-
150165
producerMap = new HashMap<>();
151166
} catch (PulsarClientException e) {
152-
logger.error("fail to create pulsar client", e);
167+
logger.error("Fail to create pulsar client", e);
153168
throw new IllegalStateException("Unable to create pulsar client");
154169
}
155170
}
@@ -160,6 +175,13 @@ private PulsarClient buildNotTlsPulsar() throws PulsarClientException {
160175
.build();
161176
}
162177

178+
private PulsarClient buildTokenPulsar(Configuration configuration) throws PulsarClientException {
179+
return PulsarClient.builder()
180+
.serviceUrl(serviceUrl)
181+
.authentication(configuration.get(CONFIG_AUTH_PLUGIN_CLASS_NAME),configuration.get(CONFIG_AUTH_PLUGIN_PARAMS_STRING))
182+
.build();
183+
}
184+
163185
private PulsarClient buildTlsPulsar(Configuration configuration) throws PulsarClientException {
164186
Boolean allowTlsInsecureConnection = configuration.get(CONFIG_ALLOW_TLS_INSECURE_CONNECTION);
165187
Boolean enableTlsHostnameVerification = configuration.get(CONFIG_ENABLE_TLS_HOSTNAME_VERIFICATION);
@@ -288,16 +310,20 @@ public Collection<PluginConfigSpec<?>> configSchema() {
288310
CONFIG_COMPRESSION_TYPE,
289311
CONFIG_ENABLE_BATCHING,
290312
CONFIG_BLOCK_IF_QUEUE_FULL,
313+
CONFIG_AUTH_PLUGIN_CLASS_NAME,
291314

292315
// Pulsar TLS Config
293316
CONFIG_ENABLE_TLS,
294317
CONFIG_TLS_TRUST_STORE_PATH,
295318
CONFIG_TLS_TRUST_STORE_PASSWORD,
296319
CONFIG_PROTOCOLS,
297320
CONFIG_ALLOW_TLS_INSECURE_CONNECTION,
298-
CONFIG_AUTH_PLUGIN_CLASS_NAME,
299321
CONFIG_ENABLE_TLS_HOSTNAME_VERIFICATION,
300-
CONFIG_CIPHERS
322+
CONFIG_CIPHERS,
323+
324+
// Pulsar Token Config
325+
CONFIG_ENABLE_TOKEN,
326+
CONFIG_AUTH_PLUGIN_PARAMS_STRING
301327
));
302328

303329
}
@@ -306,6 +332,4 @@ public Collection<PluginConfigSpec<?>> configSchema() {
306332
public String getId() {
307333
return this.id;
308334
}
309-
310-
311335
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package org.apache.pulsar.logstash.outputs;
2+
3+
import co.elastic.logstash.api.Configuration;
4+
import co.elastic.logstash.api.Event;
5+
import java.util.ArrayList;
6+
import java.util.HashMap;
7+
import java.util.List;
8+
import java.util.Map;
9+
import org.junit.Assert;
10+
import org.junit.Before;
11+
import org.junit.Ignore;
12+
import org.junit.Test;
13+
import org.logstash.plugins.ConfigurationImpl;
14+
import org.logstash.plugins.codecs.Line;
15+
16+
public class PulsarOutputExampleTest {
17+
18+
String serviceUrl = "pulsar://127.0.0.1:6650";
19+
String topic = "public/default/logstash";
20+
String token = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MTY5NDIzNjAwOX0.eFlDpXiiPlC4jeElru0z7NvjwHbmv5eF8orKlr96hSE";
21+
Map<String, Object> configValues = new HashMap<>();
22+
23+
// Initialize configuration information
24+
@Before
25+
public void init() {
26+
configValues.put("serviceUrl", serviceUrl);
27+
configValues.put("producer_name", "jun_test");
28+
configValues.put("topic", topic);
29+
configValues.put("enable_batching", true);
30+
configValues.put("enable_token", true);
31+
configValues.put("auth_plugin_class_name", "org.apache.pulsar.client.impl.auth.AuthenticationToken");
32+
configValues.put("auth_plugin_params_String", "token:" + token);
33+
String delimiter = "/";
34+
Map<String, Object> codecMap = new HashMap<>();
35+
codecMap.put("delimiter", delimiter);
36+
Configuration codecConf = new ConfigurationImpl(codecMap);
37+
configValues.put("codec", new Line(codecConf, null));
38+
}
39+
40+
@Test
41+
@Ignore()
42+
public void testOutputWithPulsarToken() {
43+
Configuration config = new ConfigurationImpl(configValues);
44+
Pulsar output = new Pulsar("test-id", config, null);
45+
String sourceField = "message";
46+
int eventCount = 5;
47+
List<Event> events = new ArrayList<>();
48+
for (int k = 0; k < eventCount; k++) {
49+
Event e = new org.logstash.Event();
50+
e.setField(sourceField, "message : hello test " + k);
51+
events.add(e);
52+
}
53+
//output.output(events);
54+
Assert.assertEquals("events size is 5", events.size(), eventCount);
55+
56+
int index = 0;
57+
while (index < eventCount) {
58+
Assert.assertTrue("event" + index + " contains the specified str", events.get(index).getField("message").toString().contains("message : hello test " + index));
59+
index++;
60+
}
61+
}
62+
63+
}

0 commit comments

Comments
 (0)