Skip to content

Commit

Permalink
KAFKA-2136; Add throttle time (on quota violation) in fetch/produce
Browse files Browse the repository at this point in the history
responses; reviewed by Joel Koshy, Dong Lin and Jun Rao
  • Loading branch information
auradkar authored and jjkoshy committed Aug 26, 2015
1 parent 5d453ba commit 436b7dd
Show file tree
Hide file tree
Showing 24 changed files with 463 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
}
this.sensors.bytesFetched.record(totalBytes);
this.sensors.recordsFetched.record(totalCount);
this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
}
this.sensors.fetchLatency.record(resp.requestLatencyMs());
}
Expand Down Expand Up @@ -493,6 +494,7 @@ private class FetchManagerMetrics {
public final Sensor recordsFetched;
public final Sensor fetchLatency;
public final Sensor recordsFetchLag;
public final Sensor fetchThrottleTimeSensor;


public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
Expand Down Expand Up @@ -542,6 +544,17 @@ public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String,
this.metricGrpName,
"The maximum lag in terms of number of records for any partition in this window",
tags), new Max());

this.fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-avg",
this.metricGrpName,
"The average throttle time in ms",
tags), new Avg());

this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-max",
this.metricGrpName,
"The maximum throttle time in ms",
tags), new Max());
}

public void recordTopicFetchMetrics(String topic, int bytes, int records) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
completeBatch(batch, error, partResp.baseOffset, correlationId, now);
}
this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
this.sensors.recordThrottleTime(response.request().request().destination(),
produceResponse.getThrottleTime());
} else {
// this is the acks = 0 case, just complete all requests
for (RecordBatch batch : batches.values())
Expand Down Expand Up @@ -352,6 +354,7 @@ private class SenderMetrics {
public final Sensor batchSizeSensor;
public final Sensor compressionRateSensor;
public final Sensor maxRecordSizeSensor;
public final Sensor produceThrottleTimeSensor;

public SenderMetrics(Metrics metrics) {
this.metrics = metrics;
Expand Down Expand Up @@ -381,6 +384,12 @@ public SenderMetrics(Metrics metrics) {
m = new MetricName("request-latency-max", metricGrpName, "The maximum request latency in ms", metricTags);
this.requestTimeSensor.add(m, new Max());

this.produceThrottleTimeSensor = metrics.sensor("produce-throttle-time");
m = new MetricName("produce-throttle-time-avg", metricGrpName, "The average throttle time in ms", metricTags);
this.produceThrottleTimeSensor.add(m, new Avg());
m = new MetricName("produce-throttle-time-max", metricGrpName, "The maximum throttle time in ms", metricTags);
this.produceThrottleTimeSensor.add(m, new Max());

this.recordsPerRequestSensor = metrics.sensor("records-per-request");
m = new MetricName("record-send-rate", metricGrpName, "The average number of records sent per second.", metricTags);
this.recordsPerRequestSensor.add(m, new Rate());
Expand Down Expand Up @@ -515,6 +524,11 @@ public void recordLatency(String node, long latency) {
nodeRequestTime.record(latency, now);
}
}

public void recordThrottleTime(String node, long throttleTimeMs) {
this.produceThrottleTimeSensor.record(throttleTimeMs, time.milliseconds());
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,25 @@ public class Protocol {
INT16),
new Field("base_offset",
INT64))))))));

public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0};
public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0};
public static final Schema PRODUCE_REQUEST_V1 = PRODUCE_REQUEST_V0;

public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses",
new ArrayOf(new Schema(new Field("topic", STRING),
new Field("partition_responses",
new ArrayOf(new Schema(new Field("partition",
INT32),
new Field("error_code",
INT16),
new Field("base_offset",
INT64))))))),
new Field("throttle_time_ms",
INT32,
"Duration in milliseconds for which the request was throttled" +
" due to quota violation. (Zero if the request did not violate any quota.)",
0));

public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1};
public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1};

/* Offset commit api */
public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
Expand Down Expand Up @@ -342,6 +358,9 @@ public class Protocol {
new ArrayOf(FETCH_REQUEST_TOPIC_V0),
"Topics to fetch."));

// The V1 Fetch Request body is the same as V0.
// Only the version number is incremented to indicate a newer client
public static final Schema FETCH_REQUEST_V1 = FETCH_REQUEST_V0;
public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
INT32,
"Topic partition id."),
Expand All @@ -357,9 +376,16 @@ public class Protocol {

public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));

public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0};
public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0};
public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms",
INT32,
"Duration in milliseconds for which the request was throttled" +
" due to quota violation. (Zero if the request did not violate any quota.)",
0),
new Field("responses",
new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));

public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1};
public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1};

/* Consumer metadata api */
public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {

switch (versionId) {
case 0:
return new FetchResponse(responseData);
return new FetchResponse(responseData, 0);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.FETCH.id)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import java.util.List;
import java.util.Map;

/**
* This wrapper supports both v0 and v1 of FetchResponse.
*/
public class FetchResponse extends AbstractRequestResponse {

private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
Expand All @@ -37,12 +40,16 @@ public class FetchResponse extends AbstractRequestResponse {
// topic level field names
private static final String TOPIC_KEY_NAME = "topic";
private static final String PARTITIONS_KEY_NAME = "partition_responses";
private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";

// partition level field names
private static final String PARTITION_KEY_NAME = "partition";
private static final String ERROR_CODE_KEY_NAME = "error_code";

/**
// Default throttle time
private static final int DEFAULT_THROTTLE_TIME = 0;

/**
* Possible error code:
*
* OFFSET_OUT_OF_RANGE (1)
Expand All @@ -59,6 +66,7 @@ public class FetchResponse extends AbstractRequestResponse {
public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0);

private final Map<TopicPartition, PartitionData> responseData;
private final int throttleTime;

public static final class PartitionData {
public final short errorCode;
Expand All @@ -72,8 +80,50 @@ public PartitionData(short errorCode, long highWatermark, ByteBuffer recordSet)
}
}

/**
* Constructor for Version 0
* @param responseData fetched data grouped by topic-partition
*/
public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
super(new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, 0)));
initCommonFields(responseData);
this.responseData = responseData;
this.throttleTime = DEFAULT_THROTTLE_TIME;
}

/**
* Constructor for Version 1
* @param responseData fetched data grouped by topic-partition
* @param throttleTime Time in milliseconds the response was throttled
*/
public FetchResponse(Map<TopicPartition, PartitionData> responseData, int throttleTime) {
super(new Struct(CURRENT_SCHEMA));
initCommonFields(responseData);
struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
this.responseData = responseData;
this.throttleTime = throttleTime;
}

public FetchResponse(Struct struct) {
super(struct);
responseData = new HashMap<TopicPartition, PartitionData>();
for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
Struct topicResponse = (Struct) topicResponseObj;
String topic = topicResponse.getString(TOPIC_KEY_NAME);
for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
Struct partitionResponse = (Struct) partitionResponseObj;
int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
responseData.put(new TopicPartition(topic, partition), partitionData);
}
}
this.throttleTime = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
}

private void initCommonFields(Map<TopicPartition, PartitionData> responseData) {
Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);

List<Struct> topicArray = new ArrayList<Struct>();
Expand All @@ -94,32 +144,22 @@ public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
topicArray.add(topicData);
}
struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
this.responseData = responseData;
}

public FetchResponse(Struct struct) {
super(struct);
responseData = new HashMap<TopicPartition, PartitionData>();
for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
Struct topicResponse = (Struct) topicResponseObj;
String topic = topicResponse.getString(TOPIC_KEY_NAME);
for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
Struct partitionResponse = (Struct) partitionResponseObj;
int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
responseData.put(new TopicPartition(topic, partition), partitionData);
}
}
}

public Map<TopicPartition, PartitionData> responseData() {
return responseData;
}

public int getThrottleTime() {
return this.throttleTime;
}

public static FetchResponse parse(ByteBuffer buffer) {
return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
}

public static FetchResponse parse(ByteBuffer buffer, int version) {
return new FetchResponse((Struct) ProtoUtils.responseSchema(ApiKeys.FETCH.id, version).read(buffer));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.List;
import java.util.Map;

public class ProduceRequest extends AbstractRequest {
public class ProduceRequest extends AbstractRequest {

private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id);
private static final String ACKS_KEY_NAME = "acks";
Expand Down Expand Up @@ -103,7 +103,7 @@ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {

switch (versionId) {
case 0:
return new ProduceResponse(responseMap);
return new ProduceResponse(responseMap, 0);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)));
Expand Down
Loading

0 comments on commit 436b7dd

Please sign in to comment.