kafka-2195; Add versionId to AbstractRequest.getErrorResponse and AbstractRequest.getRequest; patched by Andrii Biletskyi; reviewed by Jun Rao
abiletskyi authored and junrao committed Jun 16, 2015
1 parent 20a31a2 commit 54e54f0
Showing 15 changed files with 192 additions and 100 deletions.
AbstractRequest.java

@@ -28,33 +28,33 @@ public AbstractRequest(Struct struct) {
}

/**
- * Get an error response for a request
+ * Get an error response for a request for a given api version
*/
- public abstract AbstractRequestResponse getErrorResponse(Throwable e);
+ public abstract AbstractRequestResponse getErrorResponse(int versionId, Throwable e);

/**
* Factory method for getting a request object based on ApiKey ID and a buffer
*/
- public static AbstractRequest getRequest(int requestId, ByteBuffer buffer) {
+ public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffer buffer) {
switch (ApiKeys.forId(requestId)) {
case PRODUCE:
- return ProduceRequest.parse(buffer);
+ return ProduceRequest.parse(buffer, versionId);
case FETCH:
- return FetchRequest.parse(buffer);
+ return FetchRequest.parse(buffer, versionId);
case LIST_OFFSETS:
- return ListOffsetRequest.parse(buffer);
+ return ListOffsetRequest.parse(buffer, versionId);
case METADATA:
- return MetadataRequest.parse(buffer);
+ return MetadataRequest.parse(buffer, versionId);
case OFFSET_COMMIT:
- return OffsetCommitRequest.parse(buffer);
+ return OffsetCommitRequest.parse(buffer, versionId);
case OFFSET_FETCH:
- return OffsetFetchRequest.parse(buffer);
+ return OffsetFetchRequest.parse(buffer, versionId);
case CONSUMER_METADATA:
- return ConsumerMetadataRequest.parse(buffer);
+ return ConsumerMetadataRequest.parse(buffer, versionId);
case JOIN_GROUP:
- return JoinGroupRequest.parse(buffer);
+ return JoinGroupRequest.parse(buffer, versionId);
case HEARTBEAT:
- return HeartbeatRequest.parse(buffer);
+ return HeartbeatRequest.parse(buffer, versionId);
default:
return null;
}
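For context, a minimal sketch (not part of this commit) of how a server-side handler can use the two-step flow this change establishes: read the request header, hand the api key and api version to the factory, and thread the same version into getErrorResponse when handling fails. RequestHeader.parse and its apiKey()/apiVersion() accessors are assumed here, and process() is a hypothetical handler.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.requests.AbstractRequest;
    import org.apache.kafka.common.requests.AbstractRequestResponse;
    import org.apache.kafka.common.requests.RequestHeader;

    public class RequestDispatchSketch {
        public static AbstractRequestResponse handle(ByteBuffer buffer) {
            RequestHeader header = RequestHeader.parse(buffer);   // assumed accessor names
            AbstractRequest request = AbstractRequest.getRequest(header.apiKey(), header.apiVersion(), buffer);
            try {
                return process(request);                          // hypothetical handler
            } catch (Exception e) {
                // Threading the header's version through means the error response
                // is serialized with the same schema version the client sent.
                return request.getErrorResponse(header.apiVersion(), e);
            }
        }

        private static AbstractRequestResponse process(AbstractRequest request) {
            throw new UnsupportedOperationException("placeholder");
        }
    }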
ConsumerMetadataRequest.java

@@ -41,14 +41,24 @@ public ConsumerMetadataRequest(Struct struct) {
}

@Override
- public AbstractRequestResponse getErrorResponse(Throwable e) {
- return new ConsumerMetadataResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ switch(versionId) {
+ case 0:
+ return new ConsumerMetadataResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.CONSUMER_METADATA.id)));
+ }
}

public String groupId() {
return groupId;
}

+ public static ConsumerMetadataRequest parse(ByteBuffer buffer, int versionId) {
+ return new ConsumerMetadataRequest(ProtoUtils.parseRequest(ApiKeys.CONSUMER_METADATA.id, versionId, buffer));
+ }

public static ConsumerMetadataRequest parse(ByteBuffer buffer) {
return new ConsumerMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
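The same version guard repeats in every request class this commit touches: a response is built only for schema versions the class knows how to serialize, and anything else fails fast with the valid range in the message. A small illustration of that contract (a sketch, not from the commit; the request is assumed to have been parsed successfully beforehand):

    import org.apache.kafka.common.requests.AbstractRequestResponse;
    import org.apache.kafka.common.requests.ConsumerMetadataRequest;

    class VersionGuardSketch {
        static void demonstrate(ConsumerMetadataRequest request) {
            // A supported version yields a serializable error response:
            AbstractRequestResponse ok = request.getErrorResponse(0, new RuntimeException("boom"));
            // An unsupported version fails fast instead of producing bytes the client cannot decode:
            try {
                request.getErrorResponse(99, new RuntimeException("boom"));
            } catch (IllegalArgumentException expected) {
                // message names the valid range, e.g. "Valid versions for ConsumerMetadataRequest are 0 to 0"
            }
        }
    }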
FetchRequest.java

@@ -120,17 +120,23 @@ public FetchRequest(Struct struct) {
}

@Override
- public AbstractRequestResponse getErrorResponse(Throwable e) {
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();

for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(),
FetchResponse.INVALID_HIGHWATERMARK,
FetchResponse.EMPTY_RECORD_SET);
responseData.put(entry.getKey(), partitionResponse);
}

- return new FetchResponse(responseData);
+ switch(versionId) {
+ case 0:
+ return new FetchResponse(responseData);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.FETCH.id)));
+ }
}

public int replicaId() {
@@ -149,6 +155,10 @@ public Map<TopicPartition, PartitionData> fetchData() {
return fetchData;
}

+ public static FetchRequest parse(ByteBuffer buffer, int versionId) {
+ return new FetchRequest(ProtoUtils.parseRequest(ApiKeys.FETCH.id, versionId, buffer));
+ }

public static FetchRequest parse(ByteBuffer buffer) {
return new FetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
HeartbeatRequest.java

@@ -48,6 +48,17 @@ public HeartbeatRequest(Struct struct) {
consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
}

+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ switch(versionId) {
+ case 0:
+ return new HeartbeatResponse(Errors.forException(e).code());
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.HEARTBEAT.id)));
+ }
+ }

public String groupId() {
return groupId;
}
@@ -60,12 +71,11 @@ public String consumerId() {
return consumerId;
}

- public static HeartbeatRequest parse(ByteBuffer buffer) {
- return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ public static HeartbeatRequest parse(ByteBuffer buffer, int versionId) {
+ return new HeartbeatRequest(ProtoUtils.parseRequest(ApiKeys.HEARTBEAT.id, versionId, buffer));
}

- @Override
- public AbstractRequestResponse getErrorResponse(Throwable e) {
- return new HeartbeatResponse(Errors.forException(e).code());
+ public static HeartbeatRequest parse(ByteBuffer buffer) {
+ return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
}
JoinGroupRequest.java

@@ -12,6 +12,7 @@
*/
package org.apache.kafka.common.requests;

+ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
@@ -20,6 +21,7 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
+ import java.util.Collections;
import java.util.List;

public class JoinGroupRequest extends AbstractRequest {
@@ -65,6 +67,21 @@ public JoinGroupRequest(Struct struct) {
strategy = struct.getString(STRATEGY_KEY_NAME);
}

+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ switch (versionId) {
+ case 0:
+ return new JoinGroupResponse(
+ Errors.forException(e).code(),
+ JoinGroupResponse.UNKNOWN_GENERATION_ID,
+ JoinGroupResponse.UNKNOWN_CONSUMER_ID,
+ Collections.<TopicPartition>emptyList());
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id)));
+ }
+ }

public String groupId() {
return groupId;
}
@@ -85,12 +102,11 @@ public String strategy() {
return strategy;
}

- public static JoinGroupRequest parse(ByteBuffer buffer) {
- return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ public static JoinGroupRequest parse(ByteBuffer buffer, int versionId) {
+ return new JoinGroupRequest(ProtoUtils.parseRequest(ApiKeys.JOIN_GROUP.id, versionId, buffer));
}

- @Override
- public AbstractRequestResponse getErrorResponse(Throwable e) {
- return new JoinGroupResponse(Errors.forException(e).code());
+ public static JoinGroupRequest parse(ByteBuffer buffer) {
+ return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
}
JoinGroupResponse.java

@@ -75,10 +75,6 @@ public JoinGroupResponse(short errorCode, int generationId, String consumerId, L
this.assignedPartitions = assignedPartitions;
}

- public JoinGroupResponse(short errorCode) {
- this(errorCode, UNKNOWN_GENERATION_ID, UNKNOWN_CONSUMER_ID, Collections.<TopicPartition>emptyList());
- }

public JoinGroupResponse(Struct struct) {
super(struct);
assignedPartitions = new ArrayList<TopicPartition>();
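With the one-argument convenience constructor removed, callers now spell out the defaults themselves, which is exactly what JoinGroupRequest.getErrorResponse does above. A sketch of the equivalent call site (placed in the same package, since it uses the UNKNOWN_* constants):

    package org.apache.kafka.common.requests;

    import java.util.Collections;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;

    class JoinGroupErrorSketch {
        // Mirrors the error path in JoinGroupRequest.getErrorResponse after this commit.
        static JoinGroupResponse errorResponse(Throwable e) {
            return new JoinGroupResponse(
                    Errors.forException(e).code(),
                    JoinGroupResponse.UNKNOWN_GENERATION_ID,
                    JoinGroupResponse.UNKNOWN_CONSUMER_ID,
                    Collections.<TopicPartition>emptyList());
        }
    }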
ListOffsetRequest.java

@@ -107,15 +107,21 @@ public ListOffsetRequest(Struct struct) {
}

@Override
- public AbstractRequestResponse getErrorResponse(Throwable e) {
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();

for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e).code(), new ArrayList<Long>());
responseData.put(entry.getKey(), partitionResponse);
}

- return new ListOffsetResponse(responseData);
+ switch(versionId) {
+ case 0:
+ return new ListOffsetResponse(responseData);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_OFFSETS.id)));
+ }
}

public int replicaId() {
@@ -126,6 +132,10 @@ public Map<TopicPartition, PartitionData> offsetData() {
return offsetData;
}

+ public static ListOffsetRequest parse(ByteBuffer buffer, int versionId) {
+ return new ListOffsetRequest(ProtoUtils.parseRequest(ApiKeys.LIST_OFFSETS.id, versionId, buffer));
+ }

public static ListOffsetRequest parse(ByteBuffer buffer) {
return new ListOffsetRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
MetadataRequest.java

@@ -18,6 +18,9 @@
import java.util.List;
import java.util.Map;

+ import org.apache.kafka.common.Cluster;
+ import org.apache.kafka.common.Node;
+ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
@@ -47,18 +50,30 @@ public MetadataRequest(Struct struct) {
}

@Override
- public AbstractRequestResponse getErrorResponse(Throwable e) {
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
Map<String, Errors> topicErrors = new HashMap<String, Errors>();
- for (String topic: topics) {
+ for (String topic : topics) {
topicErrors.put(topic, Errors.forException(e));
}
- return new MetadataResponse(topicErrors);
+
+ Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
+ switch (versionId) {
+ case 0:
+ return new MetadataResponse(cluster, topicErrors);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.METADATA.id)));
+ }
}

public List<String> topics() {
return topics;
}

+ public static MetadataRequest parse(ByteBuffer buffer, int versionId) {
+ return new MetadataRequest(ProtoUtils.parseRequest(ApiKeys.METADATA.id, versionId, buffer));
+ }

public static MetadataRequest parse(ByteBuffer buffer) {
return new MetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
}
MetadataResponse.java

@@ -28,7 +28,7 @@
import org.apache.kafka.common.protocol.types.Struct;

public class MetadataResponse extends AbstractRequestResponse {

private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
private static final String BROKERS_KEY_NAME = "brokers";
private static final String TOPIC_METATDATA_KEY_NAME = "topic_metadata";
@@ -67,30 +67,15 @@ public class MetadataResponse extends AbstractRequestResponse {
private final Cluster cluster;
private final Map<String, Errors> errors;

- /* Constructor for error responses where most of the data, except error per topic, is irrelevant */
- public MetadataResponse(Map<String, Errors> topicErrors) {
- super(new Struct(CURRENT_SCHEMA));
-
- struct.set(BROKERS_KEY_NAME, new ArrayList<Struct>().toArray());
- List<Struct> topicArray = new ArrayList<Struct>();
- for (Map.Entry<String, Errors> topicError : topicErrors.entrySet()) {
- Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
- topicData.set(TOPIC_ERROR_CODE_KEY_NAME, topicError.getValue().code());
- topicData.set(TOPIC_KEY_NAME, topicError.getKey());
- topicData.set(PARTITION_METADATA_KEY_NAME, new ArrayList<Struct>().toArray());
- topicArray.add(topicData);
- }
- struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray());
-
- this.errors = topicErrors;
- this.cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
- }
-
- public MetadataResponse(Cluster cluster) {
+ /**
+ * Constructor for MetadataResponse where there are errors for some of the topics,
+ * error data take precedence over cluster information for particular topic
+ */
+ public MetadataResponse(Cluster cluster, Map<String, Errors> errors) {
super(new Struct(CURRENT_SCHEMA));

List<Struct> brokerArray = new ArrayList<Struct>();
- for (Node node: cluster.nodes()) {
+ for (Node node : cluster.nodes()) {
Struct broker = struct.instance(BROKERS_KEY_NAME);
broker.set(NODE_ID_KEY_NAME, node.id());
broker.set(HOST_KEY_NAME, node.host());
@@ -100,27 +85,33 @@ public MetadataResponse(Cluster cluster) {
struct.set(BROKERS_KEY_NAME, brokerArray.toArray());

List<Struct> topicArray = new ArrayList<Struct>();
- for (String topic: cluster.topics()) {
+ for (String topic : cluster.topics()) {
Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
- topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code());
-
topicData.set(TOPIC_KEY_NAME, topic);
- List<Struct> partitionArray = new ArrayList<Struct>();
- for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
- Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
- partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, Errors.NONE.code());
- partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition());
- partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id());
- ArrayList<Integer> replicas = new ArrayList<Integer>();
- for (Node node: fetchPartitionData.replicas())
- replicas.add(node.id());
- partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
- ArrayList<Integer> isr = new ArrayList<Integer>();
- for (Node node: fetchPartitionData.inSyncReplicas())
- isr.add(node.id());
- partitionData.set(ISR_KEY_NAME, isr.toArray());
- partitionArray.add(partitionData);
- }
- topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray());
+ if (errors.containsKey(topic)) {
+ topicData.set(TOPIC_ERROR_CODE_KEY_NAME, errors.get(topic).code());
+ } else {
+ topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code());
+ List<Struct> partitionArray = new ArrayList<Struct>();
+ for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
+ Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
+ partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, Errors.NONE.code());
+ partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition());
+ partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id());
+ ArrayList<Integer> replicas = new ArrayList<Integer>();
+ for (Node node : fetchPartitionData.replicas())
+ replicas.add(node.id());
+ partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
+ ArrayList<Integer> isr = new ArrayList<Integer>();
+ for (Node node : fetchPartitionData.inSyncReplicas())
+ isr.add(node.id());
+ partitionData.set(ISR_KEY_NAME, isr.toArray());
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray());
+ }

topicArray.add(topicData);
}
struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray());
@@ -183,4 +174,4 @@ public Cluster cluster() {
public static MetadataResponse parse(ByteBuffer buffer) {
return new MetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
}
-}
+}
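To close the loop, the error-only metadata response is now assembled through the single unified constructor, pairing an empty cluster with per-topic error codes, which is what MetadataRequest.getErrorResponse passes above. A sketch of that assembly (not from the commit):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.MetadataResponse;

    class MetadataErrorSketch {
        static MetadataResponse errorResponse(Iterable<String> topics, Throwable e) {
            Map<String, Errors> topicErrors = new HashMap<String, Errors>();
            for (String topic : topics)
                topicErrors.put(topic, Errors.forException(e));
            // No broker or partition data is available on the error path,
            // so an empty cluster accompanies the per-topic error codes.
            Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
            return new MetadataResponse(cluster, topicErrors);
        }
    }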