Skip to content

Commit 486ca91

Browse files
committed
Fix basic auth error in http-call sink and sse source
1 parent 9fa22a9 commit 486ca91

File tree

6 files changed

+132
-18
lines changed

6 files changed

+132
-18
lines changed

component/src/main/java/io/siddhi/extension/io/http/sink/HttpSink.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818
*/
1919
package io.siddhi.extension.io.http.sink;
2020

21-
import io.netty.buffer.ByteBuf;
2221
import io.netty.buffer.Unpooled;
23-
import io.netty.handler.codec.base64.Base64;
2422
import io.netty.handler.codec.http.DefaultHttpRequest;
2523
import io.netty.handler.codec.http.DefaultLastHttpContent;
2624
import io.netty.handler.codec.http.HttpHeaders;
@@ -610,8 +608,8 @@ protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolde
610608
} else if (!(EMPTY_STRING.equals(userName))) {
611609
byte[] val = (userName + HttpConstants.AUTH_USERNAME_PASSWORD_SEPARATOR + userPassword).getBytes(Charset
612610
.defaultCharset());
613-
this.authorizationHeader = HttpConstants.AUTHORIZATION_METHOD + Base64.encode
614-
(Unpooled.copiedBuffer(val));
611+
this.authorizationHeader = HttpConstants.AUTHORIZATION_METHOD + HttpIoUtil.encodeBase64(new String(val,
612+
StandardCharsets.UTF_8));
615613
}
616614

617615
proxyServerConfiguration = createProxyServerConfiguration(optionHolder, streamID, siddhiAppContext.getName());
@@ -755,7 +753,7 @@ protected void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, L
755753
} else {
756754
consumerKeyValue = bodyConsumerKey + ":" + bodyConsumerSecret;
757755
}
758-
String encodedAuth = "Basic " + encodeBase64(consumerKeyValue)
756+
String encodedAuth = "Basic " + HttpIoUtil.encodeBase64(consumerKeyValue)
759757
.replaceAll(HttpConstants.NEW_LINE, HttpConstants.EMPTY_STRING);
760758
//check the availability of access token in the header
761759
setAccessToken(encodedAuth, dynamicOptions, headersList, clientConnector.getPublisherURL());
@@ -1187,12 +1185,6 @@ private String encodeMessage(Object s) {
11871185
}
11881186
}
11891187

1190-
private String encodeBase64(String consumerKeyValue) {
1191-
ByteBuf byteBuf = Unpooled.wrappedBuffer(consumerKeyValue.getBytes(StandardCharsets.UTF_8));
1192-
ByteBuf encodedByteBuf = Base64.encode(byteBuf);
1193-
return encodedByteBuf.toString(StandardCharsets.UTF_8);
1194-
}
1195-
11961188
private class HTTPResponseListener implements HttpConnectorListener {
11971189
Object payload;
11981190
DynamicOptions dynamicOptions;

component/src/main/java/io/siddhi/extension/io/http/source/SSESource.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package io.siddhi.extension.io.http.source;
1919

20-
import io.netty.buffer.Unpooled;
21-
import io.netty.handler.codec.base64.Base64;
2220
import io.netty.handler.codec.http.DefaultHttpRequest;
2321
import io.netty.handler.codec.http.HttpHeaders;
2422
import io.netty.handler.codec.http.HttpMethod;
@@ -44,6 +42,7 @@
4442
import io.siddhi.extension.io.http.sink.util.HttpSinkUtil;
4543
import io.siddhi.extension.io.http.util.HTTPSourceRegistry;
4644
import io.siddhi.extension.io.http.util.HttpConstants;
45+
import io.siddhi.extension.io.http.util.HttpIoUtil;
4746
import org.apache.logging.log4j.LogManager;
4847
import org.apache.logging.log4j.Logger;
4948
import org.wso2.carbon.messaging.Header;
@@ -57,6 +56,7 @@
5756
import org.wso2.transport.http.netty.message.HttpCarbonMessage;
5857

5958
import java.nio.charset.Charset;
59+
import java.nio.charset.StandardCharsets;
6060
import java.util.List;
6161
import java.util.Map;
6262
import java.util.Objects;
@@ -355,8 +355,8 @@ private String validateAndGetAuthType() {
355355
} else if (!(EMPTY_STRING.equals(userName))) {
356356
byte[] val = (userName + HttpConstants.AUTH_USERNAME_PASSWORD_SEPARATOR + userPassword).getBytes(Charset
357357
.defaultCharset());
358-
this.authorizationHeader = HttpConstants.AUTHORIZATION_METHOD + Base64.encode
359-
(Unpooled.copiedBuffer(val));
358+
this.authorizationHeader = HttpConstants.AUTHORIZATION_METHOD + HttpIoUtil.encodeBase64(new String(val,
359+
StandardCharsets.UTF_8));
360360
}
361361

362362
if (!HttpConstants.EMPTY_STRING.equals(userName) && !HttpConstants.EMPTY_STRING.equals(userPassword)) {

component/src/main/java/io/siddhi/extension/io/http/util/HttpIoUtil.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
*/
1919
package io.siddhi.extension.io.http.util;
2020

21+
import io.netty.buffer.ByteBuf;
2122
import io.netty.buffer.Unpooled;
23+
import io.netty.handler.codec.base64.Base64;
2224
import io.netty.handler.codec.http.DefaultHttpResponse;
2325
import io.netty.handler.codec.http.DefaultLastHttpContent;
2426
import io.netty.handler.codec.http.HttpHeaderNames;
@@ -48,6 +50,7 @@
4850

4951
import java.io.UnsupportedEncodingException;
5052
import java.nio.ByteBuffer;
53+
import java.nio.charset.StandardCharsets;
5154
import java.util.ArrayList;
5255
import java.util.Arrays;
5356
import java.util.HashMap;
@@ -334,4 +337,15 @@ public static CompiledCondition createTableDeleteResource(Map<String, Table> tab
334337
return table.compileCondition(condition, matchingMetaInfoHolder, null,
335338
tableMap, siddhiQueryContext);
336339
}
340+
341+
/**
342+
* Encode the given value using Base64 encoding scheme.
343+
* @param value value to be encoded
344+
* @return encoded value
345+
*/
346+
public static String encodeBase64(String value) {
347+
ByteBuf byteBuf = Unpooled.wrappedBuffer(value.getBytes(StandardCharsets.UTF_8));
348+
ByteBuf encodedByteBuf = Base64.encode(byteBuf);
349+
return encodedByteBuf.toString(StandardCharsets.UTF_8);
350+
}
337351
}

component/src/test/java/io/siddhi/extension/io/http/sink/HttpAuthTestCase.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,16 @@
2424
import io.siddhi.core.stream.input.InputHandler;
2525
import io.siddhi.extension.io.http.sink.exception.HttpSinkAdaptorRuntimeException;
2626
import io.siddhi.extension.io.http.sink.util.HttpServerListenerHandler;
27+
import io.siddhi.extension.io.http.source.util.Constants;
2728
import io.siddhi.extension.map.xml.sinkmapper.XMLSinkMapper;
2829
import io.siddhi.query.api.exception.SiddhiAppValidationException;
2930
import org.apache.logging.log4j.LogManager;
3031
import org.apache.logging.log4j.Logger;
3132
import org.testng.Assert;
3233
import org.testng.annotations.Test;
3334

35+
import java.util.List;
36+
3437
/**
3538
* test cases for basic authentication.
3639
*/
@@ -235,4 +238,55 @@ public void testHTTPWithoutURL() throws Exception {
235238
lst.shutdown();
236239
}
237240

241+
@Test(dependsOnMethods = "testHTTPWithoutURL")
242+
public void testHttpCallSinkBasicAuthTrue() throws Exception {
243+
log.info("Creating test for publishing events with basic authentication true.");
244+
SiddhiManager siddhiManager = new SiddhiManager();
245+
siddhiManager.setExtension("xml-output-mapper", XMLSinkMapper.class);
246+
String inStreamDefinition = "Define stream FooStream (message String,method String,headers String);"
247+
+ "@sink(type='http-call',sink.id='foo',publisher.url='http://localhost:8005/abc',method='{{method}}',"
248+
+ "headers='{{headers}}', basic.auth.username='admin',basic.auth.password='admin',"
249+
+ "@map(type='xml', "
250+
+ "@payload('{{message}}'))) "
251+
+ "Define stream BarStream (message String,method String,headers String);"
252+
+ "@source(type='http-call-response', sink.id='foo', @map(type='text', regex.A='(.*)', "
253+
+ "@attributes(message='A')))"
254+
+ "define stream ResponseStream(message string);";
255+
String query = ("@info(name = 'query') "
256+
+ "from FooStream "
257+
+ "select message,method,headers "
258+
+ "insert into BarStream;"
259+
);
260+
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(inStreamDefinition +
261+
query);
262+
InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream");
263+
siddhiAppRuntime.start();
264+
HttpServerListenerHandler lst = new HttpServerListenerHandler(8005);
265+
lst.run();
266+
String payload = "<events>"
267+
+ "<event>"
268+
+ "<symbol>WSO2</symbol>"
269+
+ "<price>55.645</price>"
270+
+ "<volume>100</volume>"
271+
+ "</event>"
272+
+ "</events>";
273+
fooStream.send(new Object[]{payload, "POST", "'Name:John','Age:23'"});
274+
while (!lst.getServerListener().isMessageArrive()) {
275+
Thread.sleep(10);
276+
}
277+
List<String> authHeader = lst.getServerListener().getHeaders().get(Constants.BASIC_AUTH_HEADER);
278+
String authHeaderValue = (authHeader != null && authHeader.size() > 0) ? authHeader.get(0) : null;
279+
Assert.assertEquals(authHeaderValue, Constants.BASIC_AUTH_HEADER_VALUE, "Invalid basic auth header present");
280+
String eventData = lst.getServerListener().getData();
281+
String expected = "<events>"
282+
+ "<event>"
283+
+ "<symbol>WSO2</symbol>"
284+
+ "<price>55.645</price>"
285+
+ "<volume>100</volume>"
286+
+ "</event>"
287+
+ "</events>\n";
288+
Assert.assertEquals(eventData, expected);
289+
siddhiAppRuntime.shutdown();
290+
lst.shutdown();
291+
}
238292
}

component/src/test/java/io/siddhi/extension/io/http/source/SSESourceTestCase.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import io.siddhi.core.util.SiddhiTestHelper;
2929
import io.siddhi.core.util.persistence.InMemoryPersistenceStore;
3030
import io.siddhi.core.util.persistence.PersistenceStore;
31+
import io.siddhi.extension.io.http.sink.util.HttpServerListenerHandler;
32+
import io.siddhi.extension.io.http.source.util.Constants;
3133
import io.siddhi.extension.io.http.util.HttpConstants;
3234
import io.siddhi.extension.map.json.sinkmapper.JsonSinkMapper;
3335
import io.siddhi.extension.map.json.sourcemapper.JsonSourceMapper;
@@ -60,7 +62,7 @@ public class SSESourceTestCase {
6062
private HttpServer sseServer;
6163
private ThreadPoolExecutor threadPoolExecutor;
6264

63-
@BeforeMethod
65+
@BeforeMethod(groups = "event-server")
6466
public void init() {
6567
eventCount.set(0);
6668
try {
@@ -104,7 +106,7 @@ public void handle(HttpExchange httpExchange) throws IOException {
104106
}
105107
}
106108

107-
@Test
109+
@Test(groups = "event-server")
108110
public void testSSESource() throws Exception {
109111
List<String> receivedEventList = new ArrayList<>(2);
110112
PersistenceStore persistenceStore = new InMemoryPersistenceStore();
@@ -147,7 +149,31 @@ public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
147149
siddhiAppRuntime.shutdown();
148150
}
149151

150-
@AfterMethod
152+
@Test(dependsOnMethods = "testSSESource")
153+
public void testSSESourceWithBasicAuth() throws Exception {
154+
SiddhiManager siddhiManager = new SiddhiManager();
155+
siddhiManager.setExtension("json-output-mapper", JsonSinkMapper.class);
156+
siddhiManager.setExtension("json-input-mapper", JsonSourceMapper.class);
157+
String sourceStreamDefinition = "@Source(type='sse', receiver.url='http://localhost:8005/abc', " +
158+
"basic.auth.username='admin', basic.auth.password='admin',\n" +
159+
"@map(type='json'))\n" +
160+
"define stream ReceiveProductionStream (param1 string);\n";
161+
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(sourceStreamDefinition);
162+
siddhiAppRuntime.start();
163+
HttpServerListenerHandler lst = new HttpServerListenerHandler(8005);
164+
lst.run();
165+
while (!lst.getServerListener().isMessageArrive()) {
166+
Thread.sleep(10);
167+
}
168+
List<String> authHeader = lst.getServerListener().getHeaders().get(Constants.BASIC_AUTH_HEADER);
169+
String authHeaderValue = (authHeader != null && authHeader.size() > 0) ? authHeader.get(0) : null;
170+
Assert.assertEquals(authHeaderValue, Constants.BASIC_AUTH_HEADER_VALUE, "Invalid basic auth header present");
171+
siddhiAppRuntime.shutdown();
172+
lst.shutdown();
173+
}
174+
175+
176+
@AfterMethod(groups = "event-server")
151177
public void destroy() {
152178
sseServer.stop(1);
153179
threadPoolExecutor.shutdownNow();
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) 2023 WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
3+
*
4+
* WSO2 LLC. licenses this file to you under the Apache License,
5+
* Version 2.0 (the "License"); you may not use this file except
6+
* in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing,
12+
* software distributed under the License is distributed on an
13+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
* KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations
16+
* under the License.
17+
*
18+
*/
19+
package io.siddhi.extension.io.http.source.util;
20+
21+
/**
22+
* Constants used by the test cases.
23+
*/
24+
public class Constants {
25+
public static final String BASIC_AUTH_HEADER = "Authorization";
26+
public static final String BASIC_AUTH_HEADER_VALUE = "Basic YWRtaW46YWRtaW4=";
27+
28+
}

0 commit comments

Comments
 (0)