Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rate limit support for Replicator between clusters #4273

Merged
merged 4 commits into from
May 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,15 @@ dispatchThrottlingRatePerSubscriptionInMsg=0

# Default number of message-bytes dispatching throttling-limit for a subscription.
# Using a value of 0, is disabling default message-byte dispatch-throttling.
dispatchThrottlingRatePerSubscribeInByte=0
dispatchThrottlingRatePerSubscriptionInByte=0

# Default messages per second dispatch throttling-limit for every replicator in replication.
# Using a value of 0, is disabling replication message dispatch-throttling
dispatchThrottlingRatePerReplicatorInMsg=0

# Default bytes per second dispatch throttling-limit for every replicator in replication.
# Using a value of 0, is disabling replication message-byte dispatch-throttling
dispatchThrottlingRatePerReplicatorInByte=0

# By default we enable dispatch-throttling for both caught up consumers as well as consumers who have
# backlog.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Default number of message-bytes dispatching throttling-limit for every topic. \n\n"
+ "Using a value of 0, is disabling default message-byte dispatch-throttling")
private long dispatchThrottlingRatePerTopicInByte = 0;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
Expand All @@ -389,7 +390,20 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_POLICIES,
doc = "Default number of message-bytes dispatching throttling-limit for a subscription. \n\n"
+ "Using a value of 0, is disabling default message-byte dispatch-throttling.")
private long dispatchThrottlingRatePerSubscribeInByte = 0;
private long dispatchThrottlingRatePerSubscriptionInByte = 0;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Default number of message dispatching throttling-limit for every replicator in replication. \n\n"
+ "Using a value of 0, is disabling replication message dispatch-throttling")
private int dispatchThrottlingRatePerReplicatorInMsg = 0;
@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Default number of message-bytes dispatching throttling-limit for every replicator in replication. \n\n"
+ "Using a value of 0, is disabling replication message-byte dispatch-throttling")
private long dispatchThrottlingRatePerReplicatorInByte = 0;

@FieldContext(
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ protected void mergeNamespaceWithDefaults(Policies policies, String namespace, S

final String cluster = config.getClusterName();
// attach default dispatch rate polices
if (policies.clusterDispatchRate.isEmpty()) {
policies.clusterDispatchRate.put(cluster, dispatchRate());
if (policies.topicDispatchRate.isEmpty()) {
policies.topicDispatchRate.put(cluster, dispatchRate());
}

if (policies.subscriptionDispatchRate.isEmpty()) {
Expand All @@ -400,7 +400,7 @@ protected DispatchRate dispatchRate() {
protected DispatchRate subscriptionDispatchRate() {
return new DispatchRate(
pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(),
pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte(),
pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInByte(),
1
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritat
log.debug("Failed to validate cluster ownership for {}-{}, {}", namespaceName.toString(), bundleRange, e.getMessage(), e);
}
}

// validate namespace ownership only if namespace is not owned by local-cluster (it happens when broker doesn't
// receive replication-cluster change watch and still owning bundle
if (!isOwnedByLocalCluster) {
Expand Down Expand Up @@ -657,7 +657,7 @@ protected void internalSplitNamespaceBundle(String bundleRange, boolean authorit
}
}

protected void internalSetDispatchRate(DispatchRate dispatchRate) {
protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) {
log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
validateSuperUserAccess();

Expand All @@ -668,7 +668,7 @@ protected void internalSetDispatchRate(DispatchRate dispatchRate) {
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path).orElseThrow(
() -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
policiesNode.getKey().clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
policiesNode.getKey().topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);

// Write back the new policies into zookeeper
globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
Expand All @@ -694,11 +694,11 @@ protected void internalSetDispatchRate(DispatchRate dispatchRate) {
}
}

protected DispatchRate internalGetDispatchRate() {
protected DispatchRate internalGetTopicDispatchRate() {
validateAdminAccessForTenant(namespaceName.getTenant());

Policies policies = getNamespacePolicies(namespaceName);
DispatchRate dispatchRate = policies.clusterDispatchRate.get(pulsar().getConfiguration().getClusterName());
DispatchRate dispatchRate = policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName());
if (dispatchRate != null) {
return dispatchRate;
} else {
Expand Down Expand Up @@ -806,6 +806,56 @@ protected SubscribeRate internalGetSubscribeRate() {
}
}

protected void internalSetReplicatorDispatchRate(DispatchRate dispatchRate) {
log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
validateSuperUserAccess();

Entry<Policies, Stat> policiesNode = null;

try {
final String path = path(POLICIES, namespaceName.toString());
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path).orElseThrow(
() -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
policiesNode.getKey().replicatorDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);

// Write back the new policies into zookeeper
globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
policiesNode.getValue().getVersion());
policiesCache().invalidate(path);

log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
namespaceName);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}: does not exist",
clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
"[{}] Failed to update the replicatorDispatchRate for cluster on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());

throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
namespaceName, e);
throw new RestException(e);
}
}

protected DispatchRate internalGetReplicatorDispatchRate() {
validateAdminAccessForTenant(namespaceName.getTenant());

Policies policies = getNamespacePolicies(namespaceName);
DispatchRate dispatchRate = policies.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
if (dispatchRate != null) {
return dispatchRate;
} else {
throw new RestException(Status.NOT_FOUND,
"replicator-Dispatch-rate is not configured for cluster " + pulsar().getConfiguration().getClusterName());
}
}

protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ public void splitNamespaceBundle(@PathParam("property") String property, @PathPa
public void setDispatchRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, DispatchRate dispatchRate) {
validateNamespaceName(property, cluster, namespace);
internalSetDispatchRate(dispatchRate);
internalSetTopicDispatchRate(dispatchRate);
}

@GET
Expand All @@ -446,7 +446,7 @@ public void setDispatchRate(@PathParam("property") String property, @PathParam("
public DispatchRate getDispatchRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetDispatchRate();
return internalGetTopicDispatchRate();
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void grantPermissionOnSubscription(@PathParam("property") String property
validateNamespaceName(property, namespace);
internalGrantPermissionOnSubscription(subscription, roles);
}

@DELETE
@Path("/{tenant}/{namespace}/permissions/{role}")
@ApiOperation(value = "Revoke all permissions to a role on a namespace.")
Expand All @@ -208,7 +208,7 @@ public void revokePermissionOnSubscription(@PathParam("property") String propert
validateNamespaceName(property, namespace);
internalRevokePermissionsOnSubscription(subscription, role);
}

@GET
@Path("/{tenant}/{namespace}/replication")
@ApiOperation(value = "Get the replication clusters for a namespace.", response = String.class, responseContainer = "List")
Expand Down Expand Up @@ -336,7 +336,7 @@ public void splitNamespaceBundle(@PathParam("tenant") String tenant, @PathParam(
public void setDispatchRate(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
DispatchRate dispatchRate) {
validateNamespaceName(tenant, namespace);
internalSetDispatchRate(dispatchRate);
internalSetTopicDispatchRate(dispatchRate);
}

@GET
Expand All @@ -347,7 +347,7 @@ public void setDispatchRate(@PathParam("tenant") String tenant, @PathParam("name
public DispatchRate getDispatchRate(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetDispatchRate();
return internalGetTopicDispatchRate();
}

@POST
Expand Down Expand Up @@ -393,6 +393,28 @@ public SubscribeRate getSubscribeRate(@PathParam("tenant") String tenant,
return internalGetSubscribeRate();
}

@POST
@Path("/{tenant}/{namespace}/replicatorDispatchRate")
@ApiOperation(value = "Set replicator dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void setReplicatorDispatchRate(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
DispatchRate dispatchRate) {
validateNamespaceName(tenant, namespace);
internalSetReplicatorDispatchRate(dispatchRate);
}

@GET
@Path("/{tenant}/{namespace}/replicatorDispatchRate")
@ApiOperation(value = "Get replicator dispatch-rate configured for the namespace, -1 represents not configured yet")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetReplicatorDispatchRate();
}

@GET
@Path("/{tenant}/{namespace}/backlogQuotaMap")
@ApiOperation(value = "Get backlog quota map on a namespace.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1195,26 +1195,34 @@ private void updateConfigurationAndRegisterListeners() {
log.warn("Failed to change load manager due to {}", ex);
}
});
// add listener to update message-dispatch-rate in msg
// add listener to update message-dispatch-rate in msg for topic
registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> {
updateTopicMessageDispatchRate();
});
// add listener to update message-dispatch-rate in byte
// add listener to update message-dispatch-rate in byte for topic
registerConfigurationListener("dispatchThrottlingRatePerTopicInByte", (dispatchRatePerTopicInByte) -> {
updateTopicMessageDispatchRate();
});
// add listener to update managed-ledger config to skipNonRecoverableLedgers
registerConfigurationListener("autoSkipNonRecoverableData", (skipNonRecoverableLedger) -> {
updateManagedLedgerConfig();
});
// add listener to update message-dispatch-rate in msg
// add listener to update message-dispatch-rate in msg for subscription
registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInMsg", (dispatchRatePerTopicInMsg) -> {
updateSubscriptionMessageDispatchRate();
});
// add listener to update message-dispatch-rate in byte
registerConfigurationListener("dispatchThrottlingRatePerSubscribeInByte", (dispatchRatePerTopicInByte) -> {
// add listener to update message-dispatch-rate in byte for subscription
registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInByte", (dispatchRatePerTopicInByte) -> {
updateSubscriptionMessageDispatchRate();
});
// add listener to update message-dispatch-rate in msg for replicator
registerConfigurationListener("dispatchThrottlingRatePerReplicatorInMsg", (dispatchRatePerTopicInMsg) -> {
updateReplicatorMessageDispatchRate();
});
// add listener to update message-dispatch-rate in byte for replicator
registerConfigurationListener("dispatchThrottlingRatePerReplicatorInByte", (dispatchRatePerTopicInByte) -> {
updateReplicatorMessageDispatchRate();
});
// add more listeners here
}

Expand Down Expand Up @@ -1243,6 +1251,18 @@ private void updateSubscriptionMessageDispatchRate() {
});
}

private void updateReplicatorMessageDispatchRate() {
this.pulsar().getExecutor().submit(() -> {
// update message-rate for each topic Replicator in Geo-replication
forEachTopic(topic ->
topic.getReplicators().forEach((name, persistentReplicator) -> {
if (persistentReplicator.getRateLimiter().isPresent()) {
persistentReplicator.getRateLimiter().get().updateDispatchRate();
}
}));
});
}

private void updateManagedLedgerConfig() {
this.pulsar().getExecutor().execute(() -> {
// update managed-ledger config of each topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
*/
package org.apache.pulsar.broker.service;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.ReplicatorStats;

public interface Replicator {

void startProducer();

ReplicatorStats getStats();

CompletableFuture<Void> disconnect();
Expand All @@ -36,4 +39,11 @@ public interface Replicator {

String getRemoteCluster();

default void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
//No-op
}

default Optional<DispatchRateLimiter> getRateLimiter() {
return Optional.empty();
}
}
Loading