Web Socket Proxy Server provides a simple way to interact with Pulsar under WSS
protocol.
- When a wss-producer was registered, Web Socket Proxy Server will create a one-to-one producer to actually send messages to the Broker.
- When a wss-consumer was registered, Web Socket Proxy Server will create a one-to-one consumer to actually receive messages from the Broker and send them to WSS Consumer.
- Solution 1: encrypt message payload before WSS Producer sends messages, and decrypt after WSS Consumer receives messages. If the user wants to use different encryption keys for different messages, they can set a property into messages to indicate the message was encrypted by which key. But this solution has a shortcoming: if the user also has consumers with Java clients, then these consumers cannot auto-decrypt the messages(Normally, java clients can decrypt messages automatically). And the benefit of this solution is that the user does not need to expose the private key to Web Socket Proxy Server.
- Solution 2: In the release
2.11
, there is a feature that provides a way to set encrypt keys for the internal producers and consumers of Web Socket Proxy Server, but needs the user to upload both public key and private key into the Web Socket Proxy Server(in other words: user should expose the keys to Web Socket Proxy Server), there is a un-recommended workaround for this shortcoming[1]. The benefit is that the WSS producer and WSS consumer should not care about encryption and decryption.
- The Producer will composite several message payloads into a batched message payload if the producer is enabled batch;
- The Producer will compress the batched message payload to a compressed payload if enabled compression;
- After the previous two steps, the Producer encrypts the compressed payload to an encrypted payload.
The Construction of the Encrypt Context:
{
"batchSize": 2, // How many single messages are in the batch. If null, it means it is not a batched message.
"compressionType": "NONE", // the compression type.
"uncompressedMessageSize": 0, // the size of the uncompressed payload.
"keys": {
"client-rsa.pem": { // key name.
"keyValue": "asdvfdw==", // key value.
"metadata": {} // extra props of the key.
}
},
"param": "Tfu1PxVm6S9D3+Hk" // the IV of current encryption for this message.
}
All the fields of Encrypt Context are used to parse the encrypted message payload.
keys
andparam
are used to decrypt the encrypted message payload.compressionType
anduncompressedMessageSize
are used to uncompress the compressed message payload.batchSize
is used to extract the batched message payload.
There is another attribute named encryptionAlgo
used to identify what encrypt algo is using, it is an optional attribute, so there is no such property in Encrypt Context.
When the internal consumer of the Web Socket Proxy Server receives a message, if the message metadata indicates that the message is encrypted, the consumer will add Encrypt Context into the response for the WSS consumer.
CryptoKeyReader
: an interface that requires users to implement to read public key and private key.MessageCrypto
: a tool interface to encrypt and decrypt the message payload and add and extract encryption information for message metadata.
Therefore, there is no way to enable encryption under the WSS protocol and meet the following conditions:
- WSS producer and WSS consumer did encrypt and decrypt themselves and did not share private keys to Web Socket Proxy Server.
- Other clients(such as Java and CPP) can automatically decrypt the messages which WSS producer sent.
Provide a way to make Web Socket Proxy Server just passes encrypt information to the client, the WSS producer and WSS consumer did encrypt and decrypt themselves.
Since the order of producer operation for message payloads is compression --> encryption,
users need to handle Compression themselves if needed.
If other clients(such as Java, CPP) are sending messages to the topic that the WSS consumer was subscribed to, it is possible that there are some batched messages in the topic, then the WSS consumer will inevitably receive the batched messages. Since the order of consumer operation for message payload is deencryption --> un-compression --> extract the batched messages
, users need to handle Un-compression and Extract Batch Messages themselves.
This proposal does not intend to support the three features:
- Support publishing "Null value messages" for WSS producers.
- Support publishing "Chunked messages" for WSS producers.
- Support publishing "Batched messages" for WSS producers.
For WSS producers:
Modify the definition of parameter encryptionKeys
to make it can set in two ways:
- The original mode: If the producer registered with a string parameter
encryptionKeys
, then Web Socket Proxy Server will still work in the original way, which is defined in the PIP Support encryption in Web Socket Proxy Server - The new mode: If a producer registered with a JSON parameter
encryptionKeys
, and theencryptionKeys[{key_name}].keyValue
is not empty, Web Socket Proxy Server will mark this Producer as Client-Side Encryption Producer, then discard server-side batch messages, server-side compression, and server-side encryption. The constructor ofencryptionKeys
is like below:
{
"client-ecdsa.pem": {
"keyValue": "BDJfN+Iw==",
"metadata": {
"k1": "v1"
}
}
}
For WSS consumers: Users can set the parameter cryptoFailureAction
to CONSUME
to directly receive the undecrypted message payload (it was supported before).
For the producers marked as Client-Side Encryption Producers:
- forcefully set the component
CryptoKeyReader
toDummyCryptoKeyReaderImpl
.DummyCryptoKeyReaderImpl
: doesn't provide any public key or private key, and just returnsnull
.
- forcefully set the component
MessageCrypto
toWSSDummyMessageCryptoImpl
to skip the message Server-Side encryption.WSSDummyMessageCryptoImpl
: only set the encryption info into the message metadata and discard payload encryption.
- forcefully set
enableBatching
tofalse
to skip Server-Side batch messages building, and print a log if the discarded parametersenableBatching
,batchingMaxMessages
,maxPendingMessages
,batchingMaxPublishDelay
were set. - forcefully set the
CompressionType
toNone
to skip the Server-Side compression, and print a log if the discarded parametercompressionType
was set. - forcefully set the param
enableChunking
tofalse
(the default value isfalse
) to prevent unexpected problems if the default setting is changed in the future.
For the client-side encryption consumers:
- To avoid too many warning logs: after setting the config
cryptoFailureAction
of the consumer isCONSUME
, just print anDEBUG
level log when receiving an encrypted message if the consumer could not decrypt it(the original log level isWARN
).
Define a new mode for the parameter encryptionKeys
:
param name | description | constructor (before encode) |
---|---|---|
encryptionKeys |
Base64 encoded and URL encoded and JSON formatted encryption keys | Map<String, EncryptionKey> |
Add JSON attributes below:
param name | description | constructor (before encode) |
---|---|---|
compressionType |
Compression type. Do not set it if compression is not performed | CompressionType |
uncompressedMessageSize |
The size of the payload before compression. Do not set it if compression is not performed | int |
encryptionParam |
Base64 encoded serialized initialization vector used when the client encrypts | byte[] |
public void connect() {
String protocolAndHostPort = "ws://localhost:55217";
String topicName = "perssitent://public/default/tp1";
String keys = ```
{
"client-ecdsa.pem": {
"keyValue": "BDJf/72DhLRs0C0/U+vkykeIBfXaaJiwpqPVgWJvV7B7GwqIMvY6OFXdFvi0gx7Co/0xO7vKTHLQP8GZAt8DWrsCb8W1jhxmOjpThHBaksXG0kN+Iw==",
"metadata": {
"k1": "v1"
}
}
}
```
StringBuilder producerUrL = new StringBuilder(protocolAndHostPort)
.append("/ws/v2/producer/persistent/")
.append(topicName)
.append("?")
.append("encryptionKeys=").append(base64AndURLEncode(keys));
WebSocketClient wssClient = new WebSocketClient();
wssClient.start();
Session session = wssClient.connect(this, producerUrL, new ClientUpgradeRequest()).get();
}
public void sendMessage() {
byte[] payload = "msg-123".getBytes(UTF-8); // [109, 115, 103, 45, 49, 50, 51]
String msgKey = "client-ecdsa.pem";
// Compression if needed(optional).
CompressionType compressionType = CompressionType.LZ4;
msg.uncompressedMessageSize = 5;
byte[] compressedPayload = compress(payload); // [109, 115, 103, 45, 49, 50, 51]
// Encrypt if needed.
bytes[] encryptionParam = getEncryptionParam(); // [-10, -5, -124, 23, 14, -122, 30, 127, 64, 63, 85, -79]
String base64EncodedEncryptionParam = base64Encode(encryptionParam); // 9vuEFw6GHn9AP1Wx
bytes[] encryptedPayload = encrypt(compressedPayload, encryptionParam); // H2RbToHyfXrAUJq3kCC81wlmpGRU5l4=
// Do send.
ProducerMessage msg = new ProducerMessage();
msg.key = msgKey;
msg.payload = encryptedPayload;
msg.encryptionParam = base64EncodedEncryptionParam;
msg.compressionType = compressionType;
msg.uncompressedMessageSize = uncompressedMessageSize;
this.session.getRemote().sendString(toJSON(msg));
}
public void connect() {
String protocolAndHostPort = "ws://localhost:55217";
String topicName = "perssitent://public/default/tp1";
StringBuilder consumerUri = new StringBuilder(protocolAndHostPort)
.append("/ws/v2/consumer/persistent/")
.append(topicName)
.append("/")
.append(subscriptionName)
.append("?")
.append("subscriptionType=").append(subscriptionType.toString())
// Set "cryptoFailureAction" to "CONSUME".
.append("&").append("cryptoFailureAction=CONSUME");
WebSocketClient wssClient = new WebSocketClient();
wssClient.start();
Session session = wssClient.connect(this, buildConnectURL(), new ClientUpgradeRequest()).get();
}
public byte[] messageReceived(String text) {
/**
* A demo of the parameter "text":
* {
* "messageId": "CAcQADAA",
* "payload": "ApU16CsV0iHO2zbX7T22jhGMzdjE5drm",
* "properties": {},
* "publishTime": "2023-08-22T02:40:32.856+08:00",
* "redeliveryCount": 0,
* "encryptionContext": {
* "keys": {
* "client-ecdsa.pem": {
* "keyValue": "BMQKA==",
* "metadata": {
* "k1": "v1"
* },
* "param": "SnqNyjPetp1dGBa6",
* "compressionType": "LZ4",
* "uncompressedMessageSize": 7,
* "batchSize": null
* }
* }
*/
ConsumerMessage msg = parseJsonToObject(text);
/**
* The constructor of encryptionContext:
* {
* "client-ecdsa.pem": {
* "keyValue": "BMQKA==",
* "metadata": {
* "k1": "v1"
* }
* }
* }
*/
EncryptionContext encryptionContext = msg.encryptionContext;
// base64Decode and decrypt message payload.
byte[] decryptedPayload = decrypt(base64Decode(msg.payload), encryptionContext);
//Un-compress is needed.
byte[] unCompressedPayload = unCompressIfNeeded(decryptedPayload);
return unCompressedPayload;
}
- Pub & Sub with WSS producer and consumer.
- compression & decryption.
- Pub with Java client library and Sub with WSS consumers.
- non-compression & decryption.
- compression & decryption.
- compression & decryption & batch send.
- Pub with WSS protocol and Sub with Java client library(verify it can auto decompression, decryption).
- non-compression & decryption.
- compression & decryption.
[1]: A workaround to avoid exposing the private key to Web Socket Proxy Server(should expose the public key to Web Socket Proxy Server). A quick background: there are three policies when a consumer cannot describe the message payload:
- CONSUME: it responds to the user's original message payload and prints a warning log.
- DISCARD: discard this message.
- FAIL: add this message into
unackMessagesTracker.
How this message is ultimately handled depends on the policy of unacknowledged messages.
Workaround
- Set
cryptoFailureAction
toCONSUME
for the WSS consumer - Make the return value
EncryptionKeyInfo
tonull
for theCryptoKeyReader
. This will make the internal consumer of Web Socket Proxy Server decrypt message payload fail.
Then the flow of Pub & Sub will be executed like the following:
- Users do not encrypt message payload before the WSS producer sends messages.
- The internal producer of WebSocket does message payload encryption by the feature: Support encryption in Web Socket Proxy Server
- The decryption of the internal consumer of Web Socket Proxy Server message payload will be failed, and just send original message payload to the users.
- Users decrypt the message payload themself.