Skip to content

Commit

Permalink
[fix][broker] Reformat property in generateResponseWithEntry (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
StevenLuMT authored Jun 30, 2023
1 parent 200fb56 commit e098ff1
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.github.zafarkhaja.semver.Version;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -3087,9 +3088,11 @@ private Response generateResponseWithEntry(Entry entry) throws IOException {

ResponseBuilder responseBuilder = Response.ok();
responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
for (KeyValue keyValue : metadata.getPropertiesList()) {
responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue());
}

Map<String, String> properties = metadata.getPropertiesList().stream()
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue, (v1, v2) -> v2));
responseBuilder.header("X-Pulsar-PROPERTY", new Gson().toJson(properties));

if (brokerEntryMetadata != null) {
if (brokerEntryMetadata.hasBrokerTimestamp()) {
responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-timestamp",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1386,6 +1386,31 @@ public void testGetMessageById() throws Exception {
}
}

@Test
public void testGetMessageById4SpecialPropsInMsg() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
final String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessageById1";
admin.topics().createNonPartitionedTopic(topicName1);
Map<String, String> inSpecialProps = new HashMap<>();
inSpecialProps.put("city=shanghai", "tag");
inSpecialProps.put("city,beijing", "haidian");
@Cleanup
ProducerBase<byte[]> producer1 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName1)
.enableBatching(false).create();
String data1 = "test1";
MessageIdImpl id1 = (MessageIdImpl) producer1.newMessage().value(data1.getBytes()).properties(inSpecialProps)
.send();

Message<byte[]> message1 = admin.topics().getMessageById(topicName1, id1.getLedgerId(), id1.getEntryId());
Assert.assertEquals(message1.getData(), data1.getBytes());
Map<String, String> outSpecialProps = message1.getProperties();
for (String k : inSpecialProps.keySet()) {
Assert.assertEquals(inSpecialProps.get(k), outSpecialProps.get(k));
}
}

@Test
public void testGetMessageIdByTimestamp() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.admin.internal;

import static com.google.common.base.Preconditions.checkArgument;
import com.google.gson.Gson;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.InputStream;
Expand Down Expand Up @@ -1391,9 +1392,10 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response

for (Entry<String, List<Object>> entry : headers.entrySet()) {
String header = entry.getKey();
if (header.contains("X-Pulsar-PROPERTY-")) {
String keyName = header.substring("X-Pulsar-PROPERTY-".length());
properties.put(keyName, (String) entry.getValue().get(0));
if ("X-Pulsar-PROPERTY".equals(header)) {
Map<String, String> msgPropsTmp = new Gson().fromJson((String) entry.getValue().get(0), Map.class);
properties.putAll(msgPropsTmp);
break;
}
}

Expand Down

0 comments on commit e098ff1

Please sign in to comment.