Skip to content

Commit

Permalink
Specify the RabbitMQSink type as Bytes by default. (#4007)
Browse files Browse the repository at this point in the history
  • Loading branch information
murong00 authored and merlimat committed Apr 10, 2019
1 parent bc6472d commit 357284a
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.pulsar.io.core.annotations.IOType;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
Expand All @@ -44,7 +43,7 @@
configClass = RabbitMQSinkConfig.class
)
@Slf4j
public class RabbitMQSink<T> implements Sink<T> {
public class RabbitMQSink implements Sink<byte[]> {

private Connection rabbitMQConnection;
private Channel rabbitMQChannel;
Expand Down Expand Up @@ -76,8 +75,8 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
}

@Override
public void write(Record<T> record) {
byte[] value = toBytes(record.getValue());
public void write(Record<byte[]> record) {
byte[] value = record.getValue();
try {
rabbitMQChannel.basicPublish(exchangeName, routingKey, null, value);
record.ack();
Expand All @@ -96,17 +95,4 @@ public void close() throws Exception {
rabbitMQConnection.close();
}
}

private byte[] toBytes(Object obj) {
final byte[] result;
if (obj instanceof String) {
String s = (String) obj;
result = s.getBytes(StandardCharsets.UTF_8);
} else if (obj instanceof byte[]) {
result = (byte[]) obj;
} else {
throw new IllegalArgumentException("The value of the record must be String or Bytes.");
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -68,23 +69,23 @@ public void TestOpenAndWriteSink() throws Exception {
sink.open(configs, null);

// write should success
Record<String> record = build("test-topic", "fakeKey", "fakeValue");
Record<byte[]> record = build("test-topic", "fakeKey", "fakeValue");
sink.write(record);

sink.close();
}

private Record<String> build(String topic, String key, String value) {
private Record<byte[]> build(String topic, String key, String value) {
// prepare a SinkRecord
SinkRecord<String> record = new SinkRecord<>(new Record<String>() {
SinkRecord<byte[]> record = new SinkRecord<>(new Record<byte[]>() {
@Override
public Optional<String> getKey() {
return Optional.empty();
}

@Override
public String getValue() {
return key;
public byte[] getValue() {
return value.getBytes(StandardCharsets.UTF_8);
}

@Override
Expand All @@ -95,7 +96,7 @@ public Optional<String> getDestinationTopic() {
return Optional.empty();
}
}
}, value);
}, value.getBytes(StandardCharsets.UTF_8));
return record;
}
}

0 comments on commit 357284a

Please sign in to comment.