diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 282336b4c37d3..8509d037cbabc 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -356,6 +356,33 @@ public void tenants() throws Exception { } @Test + public void namespacesSetOffloadPolicies() throws Exception { + PulsarAdmin admin = Mockito.mock(PulsarAdmin.class); + Namespaces mockNamespaces = mock(Namespaces.class); + when(admin.namespaces()).thenReturn(mockNamespaces); + Lookup mockLookup = mock(Lookup.class); + when(admin.lookups()).thenReturn(mockLookup); + + // filesystem offload + CmdNamespaces namespaces = new CmdNamespaces(() -> admin); + namespaces.run(split( + "set-offload-policies myprop/clust/ns2 -d filesystem -oat 100M -oats 1h -oae 1h -orp bookkeeper-first")); + verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns2", + OffloadPoliciesImpl.create("filesystem", null, null, + null, null, null, null, null, 64 * 1024 * 1024, 1024 * 1024, + 100 * 1024 * 1024L, 3600L, 3600 * 1000L, OffloadedReadPriority.BOOKKEEPER_FIRST)); + + // S3 offload + CmdNamespaces namespaces2 = new CmdNamespaces(() -> admin); + namespaces2.run(split( + "set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oats 100 -oae 10s -orp tiered-storage-first")); + verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1", + OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket", + "http://test.endpoint",null, null, null, null, 32 * 1024 * 1024, 5 * 1024 * 1024, + 10 * 1024 * 1024L, 100L, 10000L, OffloadedReadPriority.TIERED_STORAGE_FIRST)); + } + + @Test public void namespaces() throws Exception { PulsarAdmin admin = Mockito.mock(PulsarAdmin.class); Namespaces mockNamespaces = mock(Namespaces.class); @@ -1455,6 +1482,34 @@ public void topicPolicies() throws Exception { verify(mockGlobalTopicsPolicies).removeAutoSubscriptionCreation("persistent://prop/clust/ns1/ds1"); } + @Test + public void topicsSetOffloadPolicies() throws Exception { + PulsarAdmin admin = Mockito.mock(PulsarAdmin.class); + Topics mockTopics = mock(Topics.class); + when(admin.topics()).thenReturn(mockTopics); + Schemas mockSchemas = mock(Schemas.class); + when(admin.schemas()).thenReturn(mockSchemas); + Lookup mockLookup = mock(Lookup.class); + when(admin.lookups()).thenReturn(mockLookup); + + // filesystem offload + CmdTopics cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d filesystem -oat 100M -oats 1h -oae 1h -orp bookkeeper-first")); + OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("filesystem", null, null + , null, null, null, null, null, 64 * 1024 * 1024, 1024 * 1024, + 100 * 1024 * 1024L, 3600L, 3600 * 1000L, OffloadedReadPriority.BOOKKEEPER_FIRST); + verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1", offloadPolicies); + +// S3 offload + CmdTopics cmdTopics2 = new CmdTopics(() -> admin); + cmdTopics2.run(split("set-offload-policies persistent://myprop/clust/ns1/ds2 -d s3 -r region -b bucket -e endpoint -ts 50 -m 8 -rb 9 -t 10 -orp tiered-storage-first")); + OffloadPoliciesImpl offloadPolicies2 = OffloadPoliciesImpl.create("s3", "region", "bucket" + , "endpoint", null, null, null, null, + 8, 9, 10L, 50L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST); + verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds2", offloadPolicies2); + } + + @Test public void topics() throws Exception { PulsarAdmin admin = Mockito.mock(PulsarAdmin.class); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 998591f8177d1..3d18b97060a08 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -2229,7 +2229,7 @@ private class SetOffloadPolicies extends CliCommand { @Parameter( names = {"--bucket", "-b"}, description = "Bucket to place offloaded ledger into", - required = true) + required = false) private String bucket; @Parameter( @@ -2265,7 +2265,8 @@ private class SetOffloadPolicies extends CliCommand { @Parameter( names = {"--maxBlockSize", "-mbs"}, - description = "Max block size (eg: 32M, 64M), default is 64MB", + description = "Max block size (eg: 32M, 64M), default is 64MB" + + "s3 and google-cloud-storage requires this parameter", required = false) private String maxBlockSizeStr; @@ -2277,7 +2278,8 @@ private class SetOffloadPolicies extends CliCommand { @Parameter( names = {"--offloadAfterElapsed", "-oae"}, - description = "Offload after elapsed in minutes (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w).", + description = "Delay time in Millis for deleting the bookkeeper ledger after offload " + + "(or seconds,minutes,hours,days,weeks eg: 10s, 100m, 3h, 2d, 5w).", required = false) private String offloadAfterElapsedStr; @@ -2289,7 +2291,7 @@ private class SetOffloadPolicies extends CliCommand { @Parameter( names = {"--offloadAfterThresholdInSeconds", "-oats"}, - description = "Offload after threshold seconds (eg: 1,5,10)", + description = "Offload after threshold seconds (or minutes,hours,days,weeks eg: 100m, 3h, 2d, 5w).", required = false ) private String offloadAfterThresholdInSecondsStr; @@ -2390,7 +2392,13 @@ && maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed, Long.MAX_VALUE)) { Long offloadThresholdInSeconds = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS; if (StringUtils.isNotEmpty(offloadAfterThresholdInSecondsStr)) { - long offloadThresholdInSeconds0 = Long.parseLong(offloadAfterThresholdInSecondsStr.trim()); + Long offloadThresholdInSeconds0; + try { + offloadThresholdInSeconds0 = TimeUnit.SECONDS.toSeconds( + RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterThresholdInSecondsStr.trim())); + } catch (IllegalArgumentException exception) { + throw new ParameterException(exception.getMessage()); + } if (maxValueCheck("OffloadAfterThresholdInSeconds", offloadThresholdInSeconds0, Long.MAX_VALUE)) { offloadThresholdInSeconds = offloadThresholdInSeconds0; } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index f96410749aea6..b07ad0056617e 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -23,6 +23,7 @@ import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; import com.beust.jcommander.Parameters; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -47,6 +48,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.Getter; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.ListTopicsOptions; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.OffloadProcessStatus; @@ -67,6 +69,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.OffloadedReadPriority; import org.apache.pulsar.common.policies.data.PersistencePolicies; @@ -2143,27 +2146,32 @@ private class SetOffloadPolicies extends CliCommand { , description = "S3 role session name used for STSAssumeRoleSessionCredentialsProvider") private String s3RoleSessionName; - @Parameter(names = {"-m", "--maxBlockSizeInBytes"}, - description = "ManagedLedger offload max block Size in bytes," - + "s3 and google-cloud-storage requires this parameter") - private int maxBlockSizeInBytes; + @Parameter( + names = {"-m", "--maxBlockSizeInBytes", "--maxBlockSize", "-mbs"}, + description = "Max block size (eg: 32M, 64M), default is 64MB" + + "s3 and google-cloud-storage requires this parameter", + required = false) + private String maxBlockSizeStr; - @Parameter(names = {"-rb", "--readBufferSizeInBytes"}, - description = "ManagedLedger offload read buffer size in bytes," - + "s3 and google-cloud-storage requires this parameter") - private int readBufferSizeInBytes; + @Parameter( + names = {"-rb", "--readBufferSizeInBytes", "--readBufferSize", "-rbs"}, + description = "Read buffer size (eg: 1M, 5M), default is 1MB" + + "s3 and google-cloud-storage requires this parameter", + required = false) + private String readBufferSizeStr; - @Parameter(names = {"-t", "--offloadThresholdInBytes"} - , description = "ManagedLedger offload threshold in bytes", required = true) - private long offloadThresholdInBytes; + @Parameter(names = {"-t", "--offloadThresholdInBytes", "--offloadAfterThreshold", "-oat"} + , description = "Offload after threshold size (eg: 1M, 5M)", required = false) + private String offloadAfterThresholdStr; - @Parameter(names = {"-ts", "--offloadThresholdInSeconds"} - , description = "ManagedLedger offload threshold in seconds") - private Long offloadThresholdInSeconds; + @Parameter(names = {"-ts", "--offloadThresholdInSeconds", "--offloadAfterThresholdInSeconds", "-oats"}, + description = "Offload after threshold seconds (or minutes,hours,days,weeks eg: 100m, 3h, 2d, 5w).") + private String offloadAfterThresholdInSecondsStr; - @Parameter(names = {"-dl", "--offloadDeletionLagInMillis"} - , description = "ManagedLedger offload deletion lag in bytes") - private Long offloadDeletionLagInMillis; + @Parameter(names = {"-dl", "--offloadDeletionLagInMillis", "--offloadAfterElapsed", "-oae"} + , description = "Delay time in Millis for deleting the bookkeeper ledger after offload " + + "(or seconds,minutes,hours,days,weeks eg: 10s, 100m, 3h, 2d, 5w).") + private String offloadAfterElapsedStr; @Parameter(names = {"--offloadedReadPriority", "-orp"}, description = "Read priority for offloaded messages. " @@ -2175,10 +2183,102 @@ private class SetOffloadPolicies extends CliCommand { ) private String offloadReadPriorityStr; + public final List driverNames = OffloadPoliciesImpl.DRIVER_NAMES; + + public boolean driverSupported(String driver) { + return driverNames.stream().anyMatch(d -> d.equalsIgnoreCase(driver)); + } + + public boolean isS3Driver(String driver) { + if (StringUtils.isEmpty(driver)) { + return false; + } + return driver.equalsIgnoreCase(driverNames.get(0)) || driver.equalsIgnoreCase(driverNames.get(1)); + } + + public boolean positiveCheck(String paramName, long value) { + if (value <= 0) { + throw new ParameterException(paramName + " is not be negative or 0!"); + } + return true; + } + + public boolean maxValueCheck(String paramName, long value, long maxValue) { + if (value > maxValue) { + throw new ParameterException(paramName + " is not bigger than " + maxValue + "!"); + } + return true; + } + @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); + if (!driverSupported(driver)) { + throw new ParameterException( + "The driver " + driver + " is not supported, " + + "(Possible values: " + String.join(",", driverNames) + ")."); + } + if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) { + throw new ParameterException( + "Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint must be set" + + " if s3 offload enabled"); + } + + int maxBlockSizeInBytes = OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES; + if (StringUtils.isNotEmpty(maxBlockSizeStr)) { + long maxBlockSize = validateSizeString(maxBlockSizeStr); + if (positiveCheck("MaxBlockSize", maxBlockSize) + && maxValueCheck("MaxBlockSize", maxBlockSize, Integer.MAX_VALUE)) { + maxBlockSizeInBytes = Long.valueOf(maxBlockSize).intValue(); + } + } + + int readBufferSizeInBytes = OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES; + if (StringUtils.isNotEmpty(readBufferSizeStr)) { + long readBufferSize = validateSizeString(readBufferSizeStr); + if (positiveCheck("ReadBufferSize", readBufferSize) + && maxValueCheck("ReadBufferSize", readBufferSize, Integer.MAX_VALUE)) { + readBufferSizeInBytes = Long.valueOf(readBufferSize).intValue(); + } + } + + Long offloadAfterElapsedInMillis = OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS; + if (StringUtils.isNotEmpty(offloadAfterElapsedStr)) { + Long offloadAfterElapsed; + try { + offloadAfterElapsed = TimeUnit.SECONDS.toMillis( + RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterElapsedStr)); + } catch (IllegalArgumentException exception) { + throw new ParameterException(exception.getMessage()); + } + if (maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed, Long.MAX_VALUE)) { + offloadAfterElapsedInMillis = offloadAfterElapsed; + } + } + + Long offloadAfterThresholdInBytes = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES; + if (StringUtils.isNotEmpty(offloadAfterThresholdStr)) { + long offloadAfterThreshold = validateSizeString(offloadAfterThresholdStr); + if (maxValueCheck("OffloadAfterThreshold", offloadAfterThreshold, Long.MAX_VALUE)) { + offloadAfterThresholdInBytes = offloadAfterThreshold; + } + } + + Long offloadThresholdInSeconds = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS; + if (StringUtils.isNotEmpty(offloadAfterThresholdInSecondsStr)) { + Long offloadThresholdInSeconds0; + try { + offloadThresholdInSeconds0 = TimeUnit.SECONDS.toSeconds( + RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterThresholdInSecondsStr.trim())); + } catch (IllegalArgumentException exception) { + throw new ParameterException(exception.getMessage()); + } + if (maxValueCheck("OffloadAfterThresholdInSeconds", offloadThresholdInSeconds0, Long.MAX_VALUE)) { + offloadThresholdInSeconds = offloadThresholdInSeconds0; + } + } + OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY; if (this.offloadReadPriorityStr != null) { @@ -2193,12 +2293,11 @@ void run() throws PulsarAdminException { } } - OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(driver, region, bucket, endpoint, + OffloadPolicies offloadPolicies = OffloadPoliciesImpl.create(driver, region, bucket, endpoint, s3Role, s3RoleSessionName, awsId, awsSecret, - maxBlockSizeInBytes, - readBufferSizeInBytes, offloadThresholdInBytes, offloadThresholdInSeconds, - offloadDeletionLagInMillis, offloadedReadPriority); + maxBlockSizeInBytes, readBufferSizeInBytes, offloadAfterThresholdInBytes, + offloadThresholdInSeconds, offloadAfterElapsedInMillis, offloadedReadPriority); getTopics().setOffloadPolicies(persistentTopic, offloadPolicies); }