Skip to content

Commit

Permalink
fixed offset by kafka-2.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
smartloli committed Dec 24, 2018
1 parent 9febe9f commit fbd29b7
Showing 1 changed file with 53 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.smartloli.kafka.eagle.ipc;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -33,6 +34,8 @@
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartloli.kafka.eagle.common.protocol.offsets.KeyAndValueSchemasInfo;
Expand All @@ -42,6 +45,8 @@
import org.smartloli.kafka.eagle.core.factory.KafkaFactory;
import org.smartloli.kafka.eagle.core.factory.KafkaService;

import com.google.gson.Gson;

import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadata;
import kafka.common.Topic;
Expand Down Expand Up @@ -79,11 +84,16 @@ public class TestKafkaOffsetGetter extends Thread {
private static Field KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("topic");
private static Field KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("partition");

private static Schema OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", Type.INT64), new Field("metadata", Type.STRING, "Associated metadata.", ""),
new Field("timestamp", Type.INT64));
private static Schema OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", Type.INT64), new Field("metadata", Type.STRING, "Associated metadata.", ""), new Field("timestamp", Type.INT64));

private static Schema OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", Type.INT64), new Field("metadata", Type.STRING, "Associated metadata.", ""), new Field("commit_timestamp", Type.INT64),
new Field("expire_timestamp", Type.INT64));

private static Schema OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", Type.INT64), new Field("metadata", Type.STRING, "Associated metadata.", ""),
new Field("commit_timestamp", Type.INT64), new Field("expire_timestamp", Type.INT64));
private static Schema OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", Type.INT64), new Field("metadata", Type.STRING, "Associated metadata.", ""), new Field("commit_timestamp", Type.INT64));

/** GroupMetadataManager . */
private static Schema OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(new Field("offset", Type.INT64), new Field("leader_epoch", Type.INT32), new Field("metadata", Type.STRING, "Associated metadata.", ""),
new Field("commit_timestamp", Type.INT64));

private static Field VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset");
private static Field VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata");
Expand All @@ -92,6 +102,15 @@ public class TestKafkaOffsetGetter extends Thread {
private static Field VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset");
private static Field VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata");
private static Field VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp");
private static Field OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp");

private static Field VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset");
private static Field VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata");
private static Field VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp");

private static Field VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset");
private static Field VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata");
private static Field VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp");
/** ============================ End Filter ========================= */

/** Kafka offset memory in schema. */
Expand All @@ -107,6 +126,17 @@ public class TestKafkaOffsetGetter extends Thread {
ks1.setKeySchema(OFFSET_COMMIT_KEY_SCHEMA_V0);
ks1.setValueSchema(OFFSET_COMMIT_VALUE_SCHEMA_V1);
put(1, ks1);

KeyAndValueSchemasInfo ks2 = new KeyAndValueSchemasInfo();
ks2.setKeySchema(OFFSET_COMMIT_KEY_SCHEMA_V0);
ks2.setValueSchema(OFFSET_COMMIT_VALUE_SCHEMA_V2);
put(2, ks2);

/** Fixed by kafka-2.1.0 version. */
KeyAndValueSchemasInfo ks3 = new KeyAndValueSchemasInfo();
ks3.setKeySchema(OFFSET_COMMIT_KEY_SCHEMA_V0);
ks3.setValueSchema(OFFSET_COMMIT_VALUE_SCHEMA_V3);
put(3, ks3);
}
};

Expand Down Expand Up @@ -170,7 +200,9 @@ private static synchronized void startOffsetSASLListener(String clusterAlias, Ka

/** Get instance K&V schema. */
private static KeyAndValueSchemasInfo schemaFor(int version) {
return OFFSET_SCHEMAS.get(version);
// return OFFSET_SCHEMAS.get(version);
KeyAndValueSchemasInfo kvsi = OFFSET_SCHEMAS.get(version);
return kvsi;
}

/** Analysis of Kafka data in topic in buffer. */
Expand Down Expand Up @@ -199,6 +231,17 @@ private static OffsetAndMetadata readMessageValue(ByteBuffer buffer) {
long offset = structAndVersion.getValue().getLong(VALUE_OFFSET_FIELD_V1);
String metadata = structAndVersion.getValue().getString(VALUE_METADATA_FIELD_V1);
long commitTimestamp = structAndVersion.getValue().getLong(VALUE_COMMIT_TIMESTAMP_FIELD_V1);
long expireTimestamp = structAndVersion.getValue().getLong(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1);
return new OffsetAndMetadata(new OffsetMetadata(offset, metadata), commitTimestamp, expireTimestamp);
} else if (structAndVersion.getVersion() == 2) {
long offset = structAndVersion.getValue().getLong(VALUE_OFFSET_FIELD_V2);
String metadata = structAndVersion.getValue().getString(VALUE_METADATA_FIELD_V2);
long commitTimestamp = structAndVersion.getValue().getLong(VALUE_COMMIT_TIMESTAMP_FIELD_V2);
return new OffsetAndMetadata(new OffsetMetadata(offset, metadata), commitTimestamp,commitTimestamp);
} else if (structAndVersion.getVersion() == 3) {
long offset = structAndVersion.getValue().getLong(VALUE_OFFSET_FIELD_V3);
String metadata = structAndVersion.getValue().getString(VALUE_METADATA_FIELD_V3);
long commitTimestamp = structAndVersion.getValue().getLong(VALUE_COMMIT_TIMESTAMP_FIELD_V3);
return new OffsetAndMetadata(new OffsetMetadata(offset, metadata), commitTimestamp, commitTimestamp);
} else {
throw new IllegalStateException("Unknown offset message version: " + structAndVersion.getVersion());
Expand All @@ -214,6 +257,7 @@ private static MessageValueStructAndVersionInfo readMessageValueStruct(ByteBuffe
mvs.setVersion(Short.valueOf("-1"));
} else {
short version = buffer.getShort();
KeyAndValueSchemasInfo kk = schemaFor(version);
Schema valueSchema = schemaFor(version).getValueSchema();
Struct value = (Struct) valueSchema.read(buffer);
mvs.setValue(value);
Expand All @@ -224,8 +268,8 @@ private static MessageValueStructAndVersionInfo readMessageValueStruct(ByteBuffe

static {
LOG.info("Initialize KafkaOffsetGetter clazz.");
// TestKafkaOffsetGetter kafka = new TestKafkaOffsetGetter();
// kafka.start();
// TestKafkaOffsetGetter kafka = new TestKafkaOffsetGetter();
// kafka.start();
}

/** Instance KafkaOffsetGetter clazz. */
Expand All @@ -252,10 +296,10 @@ public void run() {
props.put("enable.auto.commit", "true");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(Topic.GroupMetadataTopicName()));
startOffsetSASLListener(clusterAlias, consumer);
Expand Down

0 comments on commit fbd29b7

Please sign in to comment.