Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][offload]keep topic/ns set-offload-policies consistent behavior logic #20646

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
keep topic/ns set-offload-policies behavior same
  • Loading branch information
ethqunzhong committed Jun 25, 2023
commit 0b12e7fed85c7c41abd08d629ba8bb44a2d020ce
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,33 @@ public void tenants() throws Exception {
}

@Test
public void namespacesSetOffloadPolicies() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Namespaces mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
Lookup mockLookup = mock(Lookup.class);
when(admin.lookups()).thenReturn(mockLookup);

// filesystem offload
CmdNamespaces namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split(
"set-offload-policies myprop/clust/ns2 -d filesystem -oat 100M -oats 1h -oae 1h -orp bookkeeper-first"));
verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns2",
OffloadPoliciesImpl.create("filesystem", null, null,
null, null, null, null, null, 64 * 1024 * 1024, 1024 * 1024,
100 * 1024 * 1024L, 3600L, 3600 * 1000L, OffloadedReadPriority.BOOKKEEPER_FIRST));

// S3 offload
CmdNamespaces namespaces2 = new CmdNamespaces(() -> admin);
namespaces2.run(split(
"set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oats 100 -oae 10s -orp tiered-storage-first"));
verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket",
"http://test.endpoint",null, null, null, null, 32 * 1024 * 1024, 5 * 1024 * 1024,
10 * 1024 * 1024L, 100L, 10000L, OffloadedReadPriority.TIERED_STORAGE_FIRST));
}

@Test
public void namespaces() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Namespaces mockNamespaces = mock(Namespaces.class);
Expand Down Expand Up @@ -1455,6 +1482,34 @@ public void topicPolicies() throws Exception {
verify(mockGlobalTopicsPolicies).removeAutoSubscriptionCreation("persistent://prop/clust/ns1/ds1");
}

@Test
public void topicsSetOffloadPolicies() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Topics mockTopics = mock(Topics.class);
when(admin.topics()).thenReturn(mockTopics);
Schemas mockSchemas = mock(Schemas.class);
when(admin.schemas()).thenReturn(mockSchemas);
Lookup mockLookup = mock(Lookup.class);
when(admin.lookups()).thenReturn(mockLookup);

// filesystem offload
CmdTopics cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d filesystem -oat 100M -oats 1h -oae 1h -orp bookkeeper-first"));
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("filesystem", null, null
, null, null, null, null, null, 64 * 1024 * 1024, 1024 * 1024,
100 * 1024 * 1024L, 3600L, 3600 * 1000L, OffloadedReadPriority.BOOKKEEPER_FIRST);
verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1", offloadPolicies);

// S3 offload
CmdTopics cmdTopics2 = new CmdTopics(() -> admin);
cmdTopics2.run(split("set-offload-policies persistent://myprop/clust/ns1/ds2 -d s3 -r region -b bucket -e endpoint -ts 50 -m 8 -rb 9 -t 10 -orp tiered-storage-first"));
OffloadPoliciesImpl offloadPolicies2 = OffloadPoliciesImpl.create("s3", "region", "bucket"
, "endpoint", null, null, null, null,
8, 9, 10L, 50L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST);
verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds2", offloadPolicies2);
}


@Test
public void topics() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2229,7 +2229,7 @@ private class SetOffloadPolicies extends CliCommand {
@Parameter(
names = {"--bucket", "-b"},
description = "Bucket to place offloaded ledger into",
required = true)
required = false)
private String bucket;

@Parameter(
Expand Down Expand Up @@ -2265,7 +2265,8 @@ private class SetOffloadPolicies extends CliCommand {

@Parameter(
names = {"--maxBlockSize", "-mbs"},
description = "Max block size (eg: 32M, 64M), default is 64MB",
description = "Max block size (eg: 32M, 64M), default is 64MB"
+ "s3 and google-cloud-storage requires this parameter",
required = false)
private String maxBlockSizeStr;

Expand All @@ -2277,7 +2278,8 @@ private class SetOffloadPolicies extends CliCommand {

@Parameter(
names = {"--offloadAfterElapsed", "-oae"},
description = "Offload after elapsed in minutes (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w).",
description = "Delay time in Millis for deleting the bookkeeper ledger after offload "
+ "(or seconds,minutes,hours,days,weeks eg: 10s, 100m, 3h, 2d, 5w).",
required = false)
private String offloadAfterElapsedStr;

Expand All @@ -2289,7 +2291,7 @@ private class SetOffloadPolicies extends CliCommand {

@Parameter(
names = {"--offloadAfterThresholdInSeconds", "-oats"},
description = "Offload after threshold seconds (eg: 1,5,10)",
description = "Offload after threshold seconds (or minutes,hours,days,weeks eg: 100m, 3h, 2d, 5w).",
required = false
)
ethqunzhong marked this conversation as resolved.
Show resolved Hide resolved
private String offloadAfterThresholdInSecondsStr;
Expand Down Expand Up @@ -2390,7 +2392,13 @@ && maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed, Long.MAX_VALUE)) {

Long offloadThresholdInSeconds = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS;
if (StringUtils.isNotEmpty(offloadAfterThresholdInSecondsStr)) {
long offloadThresholdInSeconds0 = Long.parseLong(offloadAfterThresholdInSecondsStr.trim());
Long offloadThresholdInSeconds0;
try {
offloadThresholdInSeconds0 = TimeUnit.SECONDS.toSeconds(
RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterThresholdInSecondsStr.trim()));
} catch (IllegalArgumentException exception) {
throw new ParameterException(exception.getMessage());
}
if (maxValueCheck("OffloadAfterThresholdInSeconds", offloadThresholdInSeconds0, Long.MAX_VALUE)) {
offloadThresholdInSeconds = offloadThresholdInSeconds0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
Expand All @@ -47,6 +48,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
Expand All @@ -62,18 +64,7 @@
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.*;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.RelativeTimeUtil;
Expand Down Expand Up @@ -2143,27 +2134,32 @@ private class SetOffloadPolicies extends CliCommand {
, description = "S3 role session name used for STSAssumeRoleSessionCredentialsProvider")
private String s3RoleSessionName;

@Parameter(names = {"-m", "--maxBlockSizeInBytes"},
description = "ManagedLedger offload max block Size in bytes,"
+ "s3 and google-cloud-storage requires this parameter")
private int maxBlockSizeInBytes;
@Parameter(
names = {"-m", "--maxBlockSizeInBytes", "--maxBlockSize", "-mbs"},
description = "Max block size (eg: 32M, 64M), default is 64MB"
+ "s3 and google-cloud-storage requires this parameter",
required = false)
private String maxBlockSizeStr;

@Parameter(names = {"-rb", "--readBufferSizeInBytes"},
description = "ManagedLedger offload read buffer size in bytes,"
+ "s3 and google-cloud-storage requires this parameter")
private int readBufferSizeInBytes;
@Parameter(
names = {"-rb", "--readBufferSizeInBytes", "--readBufferSize", "-rbs"},
description = "Read buffer size (eg: 1M, 5M), default is 1MB"
+ "s3 and google-cloud-storage requires this parameter",
required = false)
private String readBufferSizeStr;

@Parameter(names = {"-t", "--offloadThresholdInBytes"}
, description = "ManagedLedger offload threshold in bytes", required = true)
private long offloadThresholdInBytes;
@Parameter(names = {"-t", "--offloadThresholdInBytes", "--offloadAfterThreshold", "-oat"}
, description = "Offload after threshold size (eg: 1M, 5M)", required = true)
private String offloadAfterThresholdStr;

@Parameter(names = {"-ts", "--offloadThresholdInSeconds"}
, description = "ManagedLedger offload threshold in seconds")
private Long offloadThresholdInSeconds;
@Parameter(names = {"-ts", "--offloadThresholdInSeconds", "--offloadAfterThresholdInSeconds", "-oats"},
description = "Offload after threshold seconds (or minutes,hours,days,weeks eg: 100m, 3h, 2d, 5w).")
private String offloadAfterThresholdInSecondsStr;

@Parameter(names = {"-dl", "--offloadDeletionLagInMillis"}
, description = "ManagedLedger offload deletion lag in bytes")
private Long offloadDeletionLagInMillis;
@Parameter(names = {"-dl", "--offloadDeletionLagInMillis", "--offloadAfterElapsed", "-oae"}
, description = "Delay time in Millis for deleting the bookkeeper ledger after offload "
+ "(or seconds,minutes,hours,days,weeks eg: 10s, 100m, 3h, 2d, 5w).")
private String offloadAfterElapsedStr;

@Parameter(names = {"--offloadedReadPriority", "-orp"},
description = "Read priority for offloaded messages. "
Expand All @@ -2175,10 +2171,102 @@ private class SetOffloadPolicies extends CliCommand {
)
private String offloadReadPriorityStr;

public final List<String> driverNames = OffloadPoliciesImpl.DRIVER_NAMES;

public boolean driverSupported(String driver) {
return driverNames.stream().anyMatch(d -> d.equalsIgnoreCase(driver));
}

public boolean isS3Driver(String driver) {
if (StringUtils.isEmpty(driver)) {
return false;
}
return driver.equalsIgnoreCase(driverNames.get(0)) || driver.equalsIgnoreCase(driverNames.get(1));
}

public boolean positiveCheck(String paramName, long value) {
if (value <= 0) {
throw new ParameterException(paramName + " is not be negative or 0!");
}
return true;
}

public boolean maxValueCheck(String paramName, long value, long maxValue) {
if (value > maxValue) {
throw new ParameterException(paramName + " is not bigger than " + maxValue + "!");
}
return true;
}

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);

if (!driverSupported(driver)) {
throw new ParameterException(
"The driver " + driver + " is not supported, "
+ "(Possible values: " + String.join(",", driverNames) + ").");
}
if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) {
throw new ParameterException(
"Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint must be set"
+ " if s3 offload enabled");
}

int maxBlockSizeInBytes = OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
if (StringUtils.isNotEmpty(maxBlockSizeStr)) {
long maxBlockSize = validateSizeString(maxBlockSizeStr);
if (positiveCheck("MaxBlockSize", maxBlockSize)
&& maxValueCheck("MaxBlockSize", maxBlockSize, Integer.MAX_VALUE)) {
maxBlockSizeInBytes = Long.valueOf(maxBlockSize).intValue();
}
}

int readBufferSizeInBytes = OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
if (StringUtils.isNotEmpty(readBufferSizeStr)) {
long readBufferSize = validateSizeString(readBufferSizeStr);
if (positiveCheck("ReadBufferSize", readBufferSize)
&& maxValueCheck("ReadBufferSize", readBufferSize, Integer.MAX_VALUE)) {
readBufferSizeInBytes = Long.valueOf(readBufferSize).intValue();
}
}

Long offloadAfterElapsedInMillis = OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
if (StringUtils.isNotEmpty(offloadAfterElapsedStr)) {
Long offloadAfterElapsed;
try {
offloadAfterElapsed = TimeUnit.SECONDS.toMillis(
RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterElapsedStr));
} catch (IllegalArgumentException exception) {
throw new ParameterException(exception.getMessage());
}
if (maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed, Long.MAX_VALUE)) {
offloadAfterElapsedInMillis = offloadAfterElapsed;
}
}

Long offloadAfterThresholdInBytes = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
if (StringUtils.isNotEmpty(offloadAfterThresholdStr)) {
long offloadAfterThreshold = validateSizeString(offloadAfterThresholdStr);
if (maxValueCheck("OffloadAfterThreshold", offloadAfterThreshold, Long.MAX_VALUE)) {
offloadAfterThresholdInBytes = offloadAfterThreshold;
}
}

Long offloadThresholdInSeconds = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS;
if (StringUtils.isNotEmpty(offloadAfterThresholdInSecondsStr)) {
Long offloadThresholdInSeconds0;
try {
offloadThresholdInSeconds0 = TimeUnit.SECONDS.toSeconds(
RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterThresholdInSecondsStr.trim()));
} catch (IllegalArgumentException exception) {
throw new ParameterException(exception.getMessage());
}
if (maxValueCheck("OffloadAfterThresholdInSeconds", offloadThresholdInSeconds0, Long.MAX_VALUE)) {
offloadThresholdInSeconds = offloadThresholdInSeconds0;
}
}

OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY;

if (this.offloadReadPriorityStr != null) {
Expand All @@ -2193,12 +2281,11 @@ void run() throws PulsarAdminException {
}
}

OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(driver, region, bucket, endpoint,
OffloadPolicies offloadPolicies = OffloadPoliciesImpl.create(driver, region, bucket, endpoint,
s3Role, s3RoleSessionName,
awsId, awsSecret,
maxBlockSizeInBytes,
readBufferSizeInBytes, offloadThresholdInBytes, offloadThresholdInSeconds,
offloadDeletionLagInMillis, offloadedReadPriority);
maxBlockSizeInBytes, readBufferSizeInBytes, offloadAfterThresholdInBytes,
offloadThresholdInSeconds, offloadAfterElapsedInMillis, offloadedReadPriority);

getTopics().setOffloadPolicies(persistentTopic, offloadPolicies);
}
Expand Down