Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement handling of Kafka header values of type int, long and short #1352

Merged
merged 1 commit into from
Jan 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/org/akhq/models/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.micronaut.context.annotation.Value;
import kafka.coordinator.group.GroupMetadataManager;
import kafka.coordinator.transaction.TransactionLog;
import kafka.coordinator.transaction.TxnKey;
import lombok.*;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.utils.AvroToJsonDeserializer;
import org.akhq.utils.AvroToJsonSerializer;
import org.akhq.utils.ContentUtils;
import org.akhq.utils.ProtobufToJsonDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -115,7 +115,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record
this.bytesValue = bytesValue;
this.valueSchemaId = getAvroSchemaId(this.bytesValue);
for (Header header: record.headers()) {
String headerValue = header.value() != null ? new String(header.value()) : null;
String headerValue = String.valueOf(ContentUtils.convertToObject(header.value()));
this.headers.add(new KeyValue<>(header.key(), headerValue));
}

Expand Down
93 changes: 93 additions & 0 deletions src/main/java/org/akhq/utils/ContentUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.akhq.utils;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.regex.Pattern;

public class ContentUtils {

/**
* Detects if bytes contain a UTF-8 string or something else
* Source: https://stackoverflow.com/questions/1193200/how-can-i-check-whether-a-byte-array-contains-a-unicode-string-in-java
* @param value the bytes to test for a UTF-8 encoded {@code java.lang.String} value
* @return true, if the byte[] contains a UTF-8 encode {@code java.lang.String}, false if it hold something else (e.g. a {@code int)
* @throws UnsupportedEncodingException
*/
private static boolean isValidUTF8(byte[] value) throws UnsupportedEncodingException
{
Pattern p = Pattern.compile("\\A(\n" +
" [\\x09\\x0A\\x0D\\x20-\\x7E] # ASCII\\n" +
"| [\\xC2-\\xDF][\\x80-\\xBF] # non-overlong 2-byte\n" +
"| \\xE0[\\xA0-\\xBF][\\x80-\\xBF] # excluding overlongs\n" +
"| [\\xE1-\\xEC\\xEE\\xEF][\\x80-\\xBF]{2} # straight 3-byte\n" +
"| \\xED[\\x80-\\x9F][\\x80-\\xBF] # excluding surrogates\n" +
"| \\xF0[\\x90-\\xBF][\\x80-\\xBF]{2} # planes 1-3\n" +
"| [\\xF1-\\xF3][\\x80-\\xBF]{3} # planes 4-15\n" +
"| \\xF4[\\x80-\\x8F][\\x80-\\xBF]{2} # plane 16\n" +
")*\\z", Pattern.COMMENTS);

String phonyString = new String(value, "ISO-8859-1");
return p.matcher(phonyString).matches();
}

/**
* Converts bytes to long.
*
* @param value the bytes to convert in to a long
* @return the long build from the given bytes
*/
private static Long asLong(byte[] value) {
return value != null ? ByteBuffer.wrap(value).getLong() : null;
}

/**
* Converts the given bytes to {@code int}.
*
* @param value the bytes to convert into a {@code int}
* @return the {@code int} build from the given bytes
*/
private static Integer asInt(byte[] value) {
return value != null ? ByteBuffer.wrap(value).getInt() : null;
}

/**
* Converts the given bytes to {@code short}.
*
* @param value the bytes to convert into a {@code short}
* @return the {@code short} build from the given bytes
*/
private static Short asShort(byte[] value) {
return value != null ? ByteBuffer.wrap(value).getShort() : null;
}

/**
* Converts the given bytes either into a {@code java.lang.string}, {@code int}, {@code long} or {@code short} depending on the content it contains.
* @param value the bytes to convert
* @return the value as an {@code java.lang.string}, {@code int}, {@code long} or {@code short}
*/
public static Object convertToObject(byte[] value) {
Object valueAsObject = null;

if (value != null) {
try {
if (ContentUtils.isValidUTF8(value)) {
valueAsObject = new String(value);
} else {
try {
valueAsObject = ContentUtils.asLong(value);
} catch (Exception e) {
try {
valueAsObject = ContentUtils.asInt(value);
} catch (Exception ex) {
valueAsObject = ContentUtils.asShort(value);
}
}
}
} catch(UnsupportedEncodingException ex) {
valueAsObject = "[encoding error]";
}
}
return valueAsObject;
}

}
73 changes: 73 additions & 0 deletions src/test/java/org/akhq/utils/ContentUtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.akhq.utils;

import org.akhq.models.Record;
import org.apache.kafka.common.header.Header;
import org.junit.jupiter.api.Test;

import javax.swing.text.AbstractDocument;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class ContentUtilsTest {

private static byte[] toBytes(Short value) {
ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES);
buffer.putShort(value);
return buffer.array();
}

private static byte[] toBytes(Integer value) {
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
buffer.putInt(value);
return buffer.array();
}

private static byte[] toBytes(Long value) {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.putLong(value);
return buffer.array();
}

private static byte[] toBytes(Float value) {
ByteBuffer buffer = ByteBuffer.allocate(Float.BYTES);
buffer.putFloat(value);
return buffer.array();
}

private static byte[] toBytes(Double value) {
ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES);
buffer.putDouble(value);
return buffer.array();
}

@Test
void testHeaderValueStringUTF8() {
String testValue = "Test";

assertEquals(testValue, ContentUtils.convertToObject(testValue.getBytes(StandardCharsets.UTF_8)));
}

@Test
void testHeaderValueInteger() {
int testValue = 1;

assertEquals(testValue, ContentUtils.convertToObject(toBytes(testValue)));
}

@Test
void testHeaderValueLong() {
long testValue = 111l;

assertEquals(testValue, ContentUtils.convertToObject(toBytes(testValue)));
}

@Test
void testHeaderValueShort() {
short testValue = 10;

assertEquals(testValue, ContentUtils.convertToObject(toBytes(testValue)));
}

}