From 47981c93812a7556129d85b06e10ab82f4da0681 Mon Sep 17 00:00:00 2001 From: Tao Jiuming <95597048+tjiuming@users.noreply.github.com> Date: Mon, 21 Nov 2022 11:39:13 +0800 Subject: [PATCH] [feat][admin] Add offload managedLedgerOffloadThreshold RestAPI and CLI tools (#18218) --- .../mledger/impl/OffloadLedgerDeleteTest.java | 1 + .../mledger/impl/OffloadPrefixReadTest.java | 3 +- .../mledger/impl/OffloadPrefixTest.java | 1 + .../broker/admin/impl/NamespacesBase.java | 31 +++++++ .../pulsar/broker/admin/v2/Namespaces.java | 52 ++++++++++++ .../broker/admin/AdminApiOffloadTest.java | 77 ++++++++++++++++- .../pulsar/broker/admin/NamespacesTest.java | 26 +++++- .../service/ReplicatorTopicPoliciesTest.java | 2 +- .../pulsar/client/admin/Namespaces.java | 83 +++++++++++++++++++ .../pulsar/common/policies/data/Policies.java | 5 +- .../client/admin/internal/NamespacesImpl.java | 23 +++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 16 ++-- .../pulsar/admin/cli/CmdNamespaces.java | 18 +++- .../pulsar/admin/cli/CmdTopicPolicies.java | 7 +- .../apache/pulsar/admin/cli/CmdTopics.java | 7 +- .../policies/data/OffloadPoliciesImpl.java | 9 +- .../policies/data/OffloadPoliciesTest.java | 11 +++ .../sql/presto/TestPulsarSplitManager.java | 1 + 18 files changed, 354 insertions(+), 19 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java index c54cee0765384..56da315553ea4 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java @@ -82,6 +82,7 @@ Set deletedOffloads() { OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES, + OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS, OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java index 718d49f986110..cd224e33e2734 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java @@ -202,12 +202,13 @@ static class MockLedgerOffloader implements LedgerOffloader { ConcurrentHashMap offloads = new ConcurrentHashMap(); - OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("S3", "", "", "", + OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl .create("S3", "", "", "", null, null, null, null, OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES, + OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS, OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index 9edcbeb64814b..2cdb14fb71e41 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -1262,6 +1262,7 @@ Set deletedOffloads() { OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES, + OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS, OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 4dcd8809c7788..04fdc6d5104e2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1967,6 +1967,37 @@ protected void internalSetOffloadThreshold(long newThreshold) { } } + protected CompletableFuture internalSetOffloadThresholdInSecondsAsync(long newThreshold) { + CompletableFuture f = new CompletableFuture<>(); + + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE) + .thenApply(v -> validatePoliciesReadOnlyAccessAsync()) + .thenApply(v -> updatePoliciesAsync(namespaceName, + policies -> { + if (policies.offload_policies == null) { + policies.offload_policies = new OffloadPoliciesImpl(); + } + ((OffloadPoliciesImpl) policies.offload_policies) + .setManagedLedgerOffloadThresholdInSeconds(newThreshold); + policies.offload_threshold_in_seconds = newThreshold; + return policies; + }) + ) + .thenAccept(v -> { + log.info("[{}] Successfully updated offloadThresholdInSeconds configuration:" + + " namespace={}, value={}", clientAppId(), namespaceName, newThreshold); + f.complete(null); + }) + .exceptionally(t -> { + log.error("[{}] Failed to update offloadThresholdInSeconds configuration for namespace {}", + clientAppId(), namespaceName, t); + f.completeExceptionally(new RestException(t)); + return null; + }); + + return f; + } + protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) { validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 2acc763e56584..b6bf1f0927cc6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -2090,6 +2090,58 @@ public void setOffloadThreshold(@PathParam("tenant") String tenant, internalSetOffloadThreshold(newThreshold); } + @GET + @Path("/{tenant}/{namespace}/offloadThresholdInSeconds") + @ApiOperation(value = "Maximum number of bytes stored on the pulsar cluster for a topic," + + " before the broker will start offloading to longterm storage", + notes = "A negative value disables automatic offloading") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace doesn't exist") }) + public void getOffloadThresholdInSeconds( + @Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> { + if (policies.offload_policies == null + || policies.offload_policies.getManagedLedgerOffloadThresholdInSeconds() == null) { + asyncResponse.resume(policies.offload_threshold_in_seconds); + } else { + asyncResponse.resume(policies.offload_policies.getManagedLedgerOffloadThresholdInSeconds()); + } + }) + .exceptionally(ex -> { + log.error("[{}] Failed to get offload threshold on namespace {}", clientAppId(), namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @PUT + @Path("/{tenant}/{namespace}/offloadThresholdInSeconds") + @ApiOperation(value = "Set maximum number of seconds stored on the pulsar cluster for a topic," + + " before the broker will start offloading to longterm storage", + notes = "A negative value disables automatic offloading") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace doesn't exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 412, message = "offloadThresholdInSeconds value is not valid") }) + public void setOffloadThresholdInSeconds( + @Suspended final AsyncResponse response, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + long newThreshold) { + validateNamespaceName(tenant, namespace); + internalSetOffloadThresholdInSecondsAsync(newThreshold) + .thenAccept(response::resume) + .exceptionally(t -> { + resumeAsyncResponseExceptionally(response, t); + return null; + }); + } + @GET @Path("/{tenant}/{namespace}/offloadDeletionLagMs") @ApiOperation(value = "Number of milliseconds to wait before deleting a ledger segment which has been offloaded" diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 862ce926581b3..604bc437f1963 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -186,12 +186,13 @@ public void testOffloadPolicies() throws Exception { String bucket = "test-bucket"; String endpoint = "test-endpoint"; long offloadThresholdInBytes = 0; + long offloadThresholdInSeconds = 100; long offloadDeletionLagInMillis = 100L; OffloadedReadPriority priority = OffloadedReadPriority.TIERED_STORAGE_FIRST; OffloadPolicies offload1 = OffloadPoliciesImpl.create( driver, region, bucket, endpoint, null, null, null, null, - 100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis, priority); + 100, 100, offloadThresholdInBytes, offloadThresholdInSeconds, offloadDeletionLagInMillis, priority); admin.namespaces().setOffloadPolicies(namespaceName, offload1); OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName); assertEquals(offload1, offload2); @@ -239,6 +240,7 @@ public void testOffloadPoliciesAppliedApi() throws Exception { OffloadPoliciesImpl namespacePolicies = new OffloadPoliciesImpl(); namespacePolicies.setManagedLedgerOffloadThresholdInBytes(100L); + namespacePolicies.setManagedLedgerOffloadThresholdInSeconds(100L); namespacePolicies.setManagedLedgerOffloadDeletionLagInMillis(200L); namespacePolicies.setManagedLedgerOffloadDriver("s3"); namespacePolicies.setManagedLedgerOffloadBucket("buck"); @@ -250,6 +252,7 @@ public void testOffloadPoliciesAppliedApi() throws Exception { OffloadPoliciesImpl topicPolicies = new OffloadPoliciesImpl(); topicPolicies.setManagedLedgerOffloadThresholdInBytes(200L); + topicPolicies.setManagedLedgerOffloadThresholdInSeconds(200L); topicPolicies.setManagedLedgerOffloadDeletionLagInMillis(400L); topicPolicies.setManagedLedgerOffloadDriver("s3"); topicPolicies.setManagedLedgerOffloadBucket("buck2"); @@ -267,6 +270,78 @@ public void testOffloadPoliciesAppliedApi() throws Exception { -> assertEquals(admin.topics().getOffloadPolicies(topicName, true), brokerPolicies)); } + + @Test + public void testSetNamespaceOffloadPolicies() throws Exception { + conf.setManagedLedgerOffloadThresholdInSeconds(100); + conf.setManagedLedgerOffloadAutoTriggerSizeThresholdBytes(100); + + OffloadPoliciesImpl policies = new OffloadPoliciesImpl(); + policies.setManagedLedgerOffloadThresholdInBytes(200L); + policies.setManagedLedgerOffloadThresholdInSeconds(200L); + policies.setManagedLedgerOffloadDeletionLagInMillis(400L); + policies.setManagedLedgerOffloadDriver("s3"); + policies.setManagedLedgerOffloadBucket("buck2"); + + admin.namespaces().setOffloadThresholdInSeconds(myNamespace, 300); + assertEquals(admin.namespaces().getOffloadThresholdInSeconds(myNamespace), 300); + + admin.namespaces().setOffloadPolicies(myNamespace, policies); + assertEquals(admin.namespaces().getOffloadPolicies(myNamespace), policies); + } + + @Test + public void testSetTopicOffloadPolicies() throws Exception { + conf.setManagedLedgerOffloadThresholdInSeconds(100); + conf.setManagedLedgerOffloadAutoTriggerSizeThresholdBytes(100); + + LedgerOffloader topicOffloader = mock(LedgerOffloader.class); + when(topicOffloader.getOffloadDriverName()).thenReturn("mock"); + doReturn(topicOffloader).when(pulsar).createManagedLedgerOffloader(any()); + + OffloadPoliciesImpl namespacePolicies = new OffloadPoliciesImpl(); + namespacePolicies.setManagedLedgerOffloadThresholdInBytes(200L); + namespacePolicies.setManagedLedgerOffloadThresholdInSeconds(200L); + namespacePolicies.setManagedLedgerOffloadDeletionLagInMillis(400L); + namespacePolicies.setManagedLedgerOffloadDriver("s3"); + namespacePolicies.setManagedLedgerOffloadBucket("buck2"); + + admin.namespaces().setOffloadThresholdInSeconds(myNamespace, 300); + assertEquals(admin.namespaces().getOffloadThresholdInSeconds(myNamespace), 300); + + admin.namespaces().setOffloadPolicies(myNamespace, namespacePolicies); + assertEquals(admin.namespaces().getOffloadPolicies(myNamespace), namespacePolicies); + + OffloadPoliciesImpl topicPolicies = new OffloadPoliciesImpl(); + topicPolicies.setManagedLedgerOffloadThresholdInBytes(500L); + topicPolicies.setManagedLedgerOffloadThresholdInSeconds(500L); + topicPolicies.setManagedLedgerOffloadDeletionLagInMillis(400L); + topicPolicies.setManagedLedgerOffloadDriver("s3"); + topicPolicies.setManagedLedgerOffloadBucket("buck2"); + + String topicName = testTopic + UUID.randomUUID(); + admin.topics().createNonPartitionedTopic(topicName); + admin.topicPolicies().setOffloadPolicies(topicName, topicPolicies); + + // Wait until broker update policies finished. + for (int a = 1; a <= 5; a++) { + try { + OffloadPolicies policies = admin.topicPolicies().getOffloadPolicies(topicName); + + assertEquals(policies.getManagedLedgerOffloadThresholdInSeconds(), + topicPolicies.getManagedLedgerOffloadThresholdInSeconds()); + assertEquals(policies.getManagedLedgerOffloadThresholdInBytes(), + topicPolicies.getManagedLedgerOffloadThresholdInBytes()); + } catch (Exception e) { + if (a == 5) { + throw e; + } else { + Thread.sleep(1000L); + } + } + } + } + @Test public void testTopicLevelOffloadPartitioned() throws Exception { testOffload(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 307c8447674f6..0b640c16776e2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -1455,15 +1455,18 @@ public void testSetOffloadThreshold() throws Exception { System.out.println(namespace); // set a default pulsar.getConfiguration().setManagedLedgerOffloadAutoTriggerSizeThresholdBytes(1); + pulsar.getConfiguration().setManagedLedgerOffloadThresholdInSeconds(100); // create the namespace admin.namespaces().createNamespace(namespace, Set.of(testLocalCluster)); admin.topics().createNonPartitionedTopic(topicName.toString()); admin.namespaces().setOffloadDeleteLag(namespace, 10000, TimeUnit.SECONDS); - assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace)); + assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1); + assertEquals(admin.namespaces().getOffloadThresholdInSeconds(namespace), -1); // assert we get the default which indicates it will fall back to default - assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace)); + assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1); + assertEquals(admin.namespaces().getOffloadThresholdInSeconds(namespace), -1); // the ledger config should have the expected value ManagedLedgerConfig ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get(); MockLedgerOffloader offloader = new MockLedgerOffloader(OffloadPoliciesImpl.create("S3", "", "", "", @@ -1472,15 +1475,21 @@ public void testSetOffloadThreshold() throws Exception { OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, admin.namespaces().getOffloadThreshold(namespace), + admin.namespaces().getOffloadThresholdInSeconds(namespace), pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(), OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY)); ledgerConf.setLedgerOffloader(offloader); assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(), Long.valueOf(-1)); + assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInSeconds(), + Long.valueOf(-1)); + // set an override for the namespace admin.namespaces().setOffloadThreshold(namespace, 100); - assertEquals(100, admin.namespaces().getOffloadThreshold(namespace)); + admin.namespaces().setOffloadThresholdInSeconds(namespace, 100); + assertEquals(admin.namespaces().getOffloadThreshold(namespace), 100); + assertEquals(admin.namespaces().getOffloadThresholdInSeconds(namespace), 100); ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get(); admin.namespaces().getOffloadPolicies(namespace); offloader = new MockLedgerOffloader(OffloadPoliciesImpl.create("S3", "", "", "", @@ -1489,14 +1498,18 @@ public void testSetOffloadThreshold() throws Exception { OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, admin.namespaces().getOffloadThreshold(namespace), + admin.namespaces().getOffloadThresholdInSeconds(namespace), pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(), OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY)); ledgerConf.setLedgerOffloader(offloader); assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(), Long.valueOf(100)); + assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInSeconds(), + Long.valueOf(100)); // set another negative value to disable admin.namespaces().setOffloadThreshold(namespace, -2); + admin.namespaces().setOffloadThresholdInSeconds(namespace, -2); assertEquals(-2, admin.namespaces().getOffloadThreshold(namespace)); ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get(); offloader = new MockLedgerOffloader(OffloadPoliciesImpl.create("S3", "", "", "", @@ -1505,14 +1518,18 @@ public void testSetOffloadThreshold() throws Exception { OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, admin.namespaces().getOffloadThreshold(namespace), + admin.namespaces().getOffloadThresholdInSeconds(namespace), pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(), OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY)); ledgerConf.setLedgerOffloader(offloader); assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(), Long.valueOf(-2)); + assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInSeconds(), + Long.valueOf(-2)); // set back to -1 and fall back to default admin.namespaces().setOffloadThreshold(namespace, -1); + admin.namespaces().setOffloadThresholdInSeconds(namespace, -1); assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace)); ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get(); offloader = new MockLedgerOffloader(OffloadPoliciesImpl.create("S3", "", "", "", @@ -1521,11 +1538,14 @@ public void testSetOffloadThreshold() throws Exception { OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, admin.namespaces().getOffloadThreshold(namespace), + admin.namespaces().getOffloadThresholdInSeconds(namespace), pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(), OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY)); ledgerConf.setLedgerOffloader(offloader); assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(), Long.valueOf(-1)); + assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInSeconds(), + Long.valueOf(-1)); // cleanup admin.topics().delete(topicName.toString(), true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java index b2405ed6eec77..c0281f073cfd4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java @@ -703,7 +703,7 @@ public void testReplicatorOffloadPolicies() throws Exception { init(namespace, persistentTopicName); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("s3", "region", "bucket", "endpoint", null, null, null, null, - 8, 9, 10L, null, OffloadedReadPriority.BOOKKEEPER_FIRST); + 8, 9, 10L, 10L, null, OffloadedReadPriority.BOOKKEEPER_FIRST); // local try { admin1.topicPolicies().setOffloadPolicies(persistentTopicName, offloadPolicies); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 16d4e155635d8..f4c284bb48434 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -3532,6 +3532,45 @@ CompletableFuture removeMaxUnackedMessagesPerSubscriptionAsync( */ CompletableFuture getOffloadThresholdAsync(String namespace); + + /** + * Get the offloadThresholdInSeconds for a namespace. + * + *

+ * Response example: + * + *

+     * 10000000
+     * 
+ * + * @param namespace + * Namespace name + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + long getOffloadThresholdInSeconds(String namespace) throws PulsarAdminException; + + + /** + * Get the offloadThresholdInSeconds for a namespace. + * + *

+ * Response example: + * + *

+     * 10000000
+     * 
+ * + * @param namespace + * Namespace name + */ + CompletableFuture getOffloadThresholdInSecondsAsync(String namespace); + /** * Set the offloadThreshold for a namespace. *

@@ -3581,6 +3620,50 @@ CompletableFuture removeMaxUnackedMessagesPerSubscriptionAsync( */ CompletableFuture setOffloadThresholdAsync(String namespace, long offloadThreshold); + + /** + * Set the offloadThresholdInSeconds for a namespace. + *

+ * Negative values disabled automatic offloading. Setting a threshold of 0 will offload data as soon as possible. + *

+ * Request example: + * + *

+     * 10000000
+     * 
+ * + * @param namespace + * Namespace name + * @param offloadThresholdInSeconds + * maximum number of bytes stored before offloading is triggered + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + void setOffloadThresholdInSeconds(String namespace, long offloadThresholdInSeconds) throws PulsarAdminException; + + /** + * Set the offloadThresholdInSeconds for a namespace. + *

+ * Negative values disabled automatic offloading. Setting a threshold of 0 will offload data as soon as possible. + *

+ * Request example: + * + *

+     * 10000000
+     * 
+ * + * @param namespace + * Namespace name + * @param offloadThresholdInSeconds + * maximum number of seconds stored before offloading is triggered + */ + CompletableFuture setOffloadThresholdInSecondsAsync(String namespace, long offloadThresholdInSeconds); + /** * Get the offload deletion lag for a namespace, in milliseconds. * The number of milliseconds to wait before deleting a ledger segment which has been offloaded from diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index ae469e9c5966f..066fdf1df4f09 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -94,6 +94,8 @@ public class Policies { @SuppressWarnings("checkstyle:MemberName") public long offload_threshold = -1; @SuppressWarnings("checkstyle:MemberName") + public long offload_threshold_in_seconds = -1; + @SuppressWarnings("checkstyle:MemberName") public Long offload_deletion_lag_ms = null; @SuppressWarnings("checkstyle:MemberName") public Integer max_topics_per_namespace = null; @@ -145,7 +147,7 @@ public int hashCode() { max_producers_per_topic, max_consumers_per_topic, max_consumers_per_subscription, max_unacked_messages_per_consumer, max_unacked_messages_per_subscription, - compaction_threshold, offload_threshold, + compaction_threshold, offload_threshold, offload_threshold_in_seconds, offload_deletion_lag_ms, schema_auto_update_compatibility_strategy, schema_validation_enforced, @@ -191,6 +193,7 @@ public boolean equals(Object obj) { && Objects.equals(max_consumers_per_subscription, other.max_consumers_per_subscription) && Objects.equals(compaction_threshold, other.compaction_threshold) && offload_threshold == other.offload_threshold + && offload_threshold_in_seconds == other.offload_threshold_in_seconds && Objects.equals(offload_deletion_lag_ms, other.offload_deletion_lag_ms) && schema_auto_update_compatibility_strategy == other.schema_auto_update_compatibility_strategy && schema_validation_enforced == other.schema_validation_enforced diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 3d193c827732e..6d4889a751d37 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -1518,6 +1518,16 @@ public CompletableFuture getOffloadThresholdAsync(String namespace) { return asyncGetNamespaceParts(new FutureCallback(){}, namespace, "offloadThreshold"); } + @Override + public long getOffloadThresholdInSeconds(String namespace) throws PulsarAdminException { + return sync(() -> getOffloadThresholdInSecondsAsync(namespace)); + } + + @Override + public CompletableFuture getOffloadThresholdInSecondsAsync(String namespace) { + return asyncGetNamespaceParts(new FutureCallback(){}, namespace, "offloadThresholdInSeconds"); + } + @Override public void setOffloadThreshold(String namespace, long offloadThreshold) throws PulsarAdminException { sync(() -> setOffloadThresholdAsync(namespace, offloadThreshold)); @@ -1530,6 +1540,19 @@ public CompletableFuture setOffloadThresholdAsync(String namespace, long o return asyncPutRequest(path, Entity.entity(offloadThreshold, MediaType.APPLICATION_JSON)); } + @Override + public void setOffloadThresholdInSeconds(String namespace, long offloadThresholdInSeconds) + throws PulsarAdminException { + sync(() -> setOffloadThresholdInSecondsAsync(namespace, offloadThresholdInSeconds)); + } + + @Override + public CompletableFuture setOffloadThresholdInSecondsAsync(String namespace, long offloadThresholdInSeconds) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "offloadThresholdInSeconds"); + return asyncPutRequest(path, Entity.entity(offloadThresholdInSeconds, MediaType.APPLICATION_JSON)); + } + @Override public Long getOffloadDeleteLagMs(String namespace) throws PulsarAdminException { return sync(() -> getOffloadDeleteLagMsAsync(namespace)); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 8741af7c2672b..cf76910ce6dfd 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -813,11 +813,11 @@ public void namespaces() throws Exception { verify(mockNamespaces).clearOffloadDeleteLag("myprop/clust/ns1"); namespaces.run(split( - "set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae 10s -orp tiered-storage-first")); + "set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oats 100 -oae 10s -orp tiered-storage-first")); verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1", OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket", "http://test.endpoint",null, null, null, null, 32 * 1024 * 1024, 5 * 1024 * 1024, - 10 * 1024 * 1024L, 10000L, OffloadedReadPriority.TIERED_STORAGE_FIRST)); + 10 * 1024 * 1024L, 100L, 10000L, OffloadedReadPriority.TIERED_STORAGE_FIRST)); namespaces.run(split("remove-offload-policies myprop/clust/ns1")); verify(mockNamespaces).removeOffloadPolicies("myprop/clust/ns1"); @@ -973,11 +973,11 @@ public void topicPolicies() throws Exception { verify(mockTopicsPolicies).removeOffloadPolicies("persistent://myprop/clust/ns1/ds1"); cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r" + - " region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first")); + " region -b bucket -e endpoint -m 8 -rb 9 -t 10 -ts 10 -orp tiered-storage-first")); verify(mockTopicsPolicies) .setOffloadPolicies("persistent://myprop/clust/ns1/ds1", OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null, - 8, 9, 10L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST)); + 8, 9, 10L, 10L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST)); cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1")); verify(mockTopicsPolicies).getRetention("persistent://myprop/clust/ns1/ds1", false); @@ -1423,11 +1423,11 @@ public void topicPolicies() throws Exception { verify(mockGlobalTopicsPolicies).removeOffloadPolicies("persistent://myprop/clust/ns1/ds1"); cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r" + - " region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first -g")); + " region -b bucket -e endpoint -m 8 -rb 9 -t 10 -ts 100 -orp tiered-storage-first -g")); verify(mockGlobalTopicsPolicies) .setOffloadPolicies("persistent://myprop/clust/ns1/ds1", OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null, - 8, 9, 10L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST)); + 8, 9, 10L, 100L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST)); cmdTopics.run(split("set-auto-subscription-creation persistent://prop/clust/ns1/ds1 -e -g")); verify(mockGlobalTopicsPolicies).setAutoSubscriptionCreation("persistent://prop/clust/ns1/ds1", @@ -1720,10 +1720,10 @@ public void topics() throws Exception { cmdTopics.run(split("remove-delayed-delivery persistent://myprop/clust/ns1/ds1")); verify(mockTopics).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1") ; - cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first")); + cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -ts 50 -m 8 -rb 9 -t 10 -orp tiered-storage-first")); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null, - 8, 9, 10L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST); + 8, 9, 10L, 50L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST); verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1", offloadPolicies); cmdTopics.run(split("get-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1")); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index aba0a6cda547c..b64df272b4468 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -2273,6 +2273,13 @@ private class SetOffloadPolicies extends CliCommand { required = false) private String offloadAfterThresholdStr; + @Parameter( + names = {"--offloadAfterThresholdInSeconds", "-oats"}, + description = "Offload after threshold seconds (eg: 1,5,10)", + required = false + ) + private String offloadAfterThresholdInSecondsStr; + @Parameter( names = {"--offloadedReadPriority", "-orp"}, description = "Read priority for offloaded messages. By default, once messages are offloaded to " @@ -2366,6 +2373,15 @@ && maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed, Long.MAX_VALUE)) { offloadAfterThresholdInBytes = offloadAfterThreshold; } } + + Long offloadThresholdInSeconds = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS; + if (StringUtils.isNotEmpty(offloadAfterThresholdInSecondsStr)) { + long offloadThresholdInSeconds0 = Long.parseLong(offloadAfterThresholdInSecondsStr.trim()); + if (maxValueCheck("OffloadAfterThresholdInSeconds", offloadThresholdInSeconds0, Long.MAX_VALUE)) { + offloadThresholdInSeconds = offloadThresholdInSeconds0; + } + } + OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY; if (this.offloadReadPriorityStr != null) { @@ -2384,7 +2400,7 @@ && maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed, Long.MAX_VALUE)) { s3Role, s3RoleSessionName, awsId, awsSecret, maxBlockSizeInBytes, readBufferSizeInBytes, offloadAfterThresholdInBytes, - offloadAfterElapsedInMillis, offloadedReadPriority); + offloadThresholdInSeconds, offloadAfterElapsedInMillis, offloadedReadPriority); getAdmin().namespaces().setOffloadPolicies(namespace, offloadPolicies); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java index 1a29dd451d6a2..98e6530082634 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java @@ -1763,6 +1763,10 @@ private class SetOffloadPolicies extends CliCommand { , description = "ManagedLedger offload threshold in bytes", required = true) private long offloadThresholdInBytes; + @Parameter(names = {"-ts", "--offloadThresholdInSeconds"} + , description = "ManagedLedger offload threshold in seconds") + private Long offloadThresholdInSeconds; + @Parameter(names = {"-dl", "--offloadDeletionLagInMillis"} , description = "ManagedLedger offload deletion lag in bytes") private Long offloadDeletionLagInMillis; @@ -1804,7 +1808,8 @@ void run() throws PulsarAdminException { s3Role, s3RoleSessionName, awsId, awsSecret, maxBlockSizeInBytes, - readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis, offloadedReadPriority); + readBufferSizeInBytes, offloadThresholdInBytes, offloadThresholdInSeconds, + offloadDeletionLagInMillis, offloadedReadPriority); getTopicPolicies(isGlobal).setOffloadPolicies(persistentTopic, offloadPolicies); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 3b6f571333855..60aa649737878 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -2150,6 +2150,10 @@ private class SetOffloadPolicies extends CliCommand { , description = "ManagedLedger offload threshold in bytes", required = true) private long offloadThresholdInBytes; + @Parameter(names = {"-ts", "--offloadThresholdInSeconds"} + , description = "ManagedLedger offload threshold in seconds") + private Long offloadThresholdInSeconds; + @Parameter(names = {"-dl", "--offloadDeletionLagInMillis"} , description = "ManagedLedger offload deletion lag in bytes") private Long offloadDeletionLagInMillis; @@ -2186,7 +2190,8 @@ void run() throws PulsarAdminException { s3Role, s3RoleSessionName, awsId, awsSecret, maxBlockSizeInBytes, - readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis, offloadedReadPriority); + readBufferSizeInBytes, offloadThresholdInBytes, offloadThresholdInSeconds, + offloadDeletionLagInMillis, offloadedReadPriority); getTopics().setOffloadPolicies(persistentTopic, offloadPolicies); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java index 4eee063cc9618..fb33e3198aa60 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java @@ -184,11 +184,13 @@ public static OffloadPoliciesImpl create(String driver, String region, String bu String credentialId, String credentialSecret, Integer maxBlockSizeInBytes, Integer readBufferSizeInBytes, Long offloadThresholdInBytes, + Long offloadThresholdInSeconds, Long offloadDeletionLagInMillis, OffloadedReadPriority readPriority) { OffloadPoliciesImplBuilder builder = builder() .managedLedgerOffloadDriver(driver) .managedLedgerOffloadThresholdInBytes(offloadThresholdInBytes) + .managedLedgerOffloadThresholdInSeconds(offloadThresholdInSeconds) .managedLedgerOffloadDeletionLagInMillis(offloadDeletionLagInMillis) .managedLedgerOffloadBucket(bucket) .managedLedgerOffloadRegion(region) @@ -402,7 +404,8 @@ private static void setProperty(Properties properties, String key, Object value) * @return offload policies */ public static OffloadPoliciesImpl oldPoliciesCompatible(OffloadPoliciesImpl nsLevelPolicies, Policies policies) { - if (policies == null || (policies.offload_threshold == -1 && policies.offload_deletion_lag_ms == null)) { + if (policies == null || (policies.offload_threshold == -1 && policies.offload_threshold_in_seconds == -1 + && policies.offload_deletion_lag_ms == null)) { return nsLevelPolicies; } if (nsLevelPolicies == null) { @@ -412,6 +415,10 @@ public static OffloadPoliciesImpl oldPoliciesCompatible(OffloadPoliciesImpl nsLe && policies.offload_threshold != -1) { nsLevelPolicies.setManagedLedgerOffloadThresholdInBytes(policies.offload_threshold); } + if (nsLevelPolicies.getManagedLedgerOffloadThresholdInSeconds() == null + && policies.offload_threshold_in_seconds != -1) { + nsLevelPolicies.setManagedLedgerOffloadThresholdInSeconds(policies.offload_threshold_in_seconds); + } if (nsLevelPolicies.getManagedLedgerOffloadDeletionLagInMillis() == null && policies.offload_deletion_lag_ms != null) { nsLevelPolicies.setManagedLedgerOffloadDeletionLagInMillis(policies.offload_deletion_lag_ms); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java index c0d45389821f9..88036b1688437 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java @@ -50,6 +50,7 @@ public void testS3Configuration() { final Integer maxBlockSizeInBytes = 5 * M; final Integer readBufferSizeInBytes = 2 * M; final Long offloadThresholdInBytes = 10L * M; + final Long offloadThresholdInSeconds = 1000L; final Long offloadDeletionLagInMillis = 5L * MIN; OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create( @@ -64,6 +65,7 @@ public void testS3Configuration() { maxBlockSizeInBytes, readBufferSizeInBytes, offloadThresholdInBytes, + offloadThresholdInSeconds, offloadDeletionLagInMillis, OffloadedReadPriority.TIERED_STORAGE_FIRST ); @@ -78,6 +80,7 @@ public void testS3Configuration() { offloadThresholdInBytes); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), Long.valueOf(offloadDeletionLagInMillis)); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInSeconds(), offloadThresholdInSeconds); } @Test @@ -93,6 +96,7 @@ public void testGcsConfiguration() { final Integer maxBlockSizeInBytes = 5 * M; final Integer readBufferSizeInBytes = 2 * M; final Long offloadThresholdInBytes = 0L; + final Long offloadThresholdInSeconds = 1000L; final Long offloadDeletionLagInMillis = 5 * MIN; final OffloadedReadPriority readPriority = OffloadedReadPriority.TIERED_STORAGE_FIRST; @@ -108,6 +112,7 @@ public void testGcsConfiguration() { maxBlockSizeInBytes, readBufferSizeInBytes, offloadThresholdInBytes, + offloadThresholdInSeconds, offloadDeletionLagInMillis, readPriority ); @@ -120,6 +125,7 @@ public void testGcsConfiguration() { Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), offloadThresholdInBytes); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), offloadDeletionLagInMillis); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority(), readPriority); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInSeconds(), offloadThresholdInSeconds); } @Test @@ -255,22 +261,27 @@ public void compatibleWithConfigFileTest() { public void oldPoliciesCompatibleTest() { Policies policies = new Policies(); Assert.assertEquals(policies.offload_threshold, -1); + Assert.assertEquals(policies.offload_threshold_in_seconds, -1); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.oldPoliciesCompatible(null, policies); Assert.assertNull(offloadPolicies); policies.offload_deletion_lag_ms = 1000L; policies.offload_threshold = 0; + policies.offload_threshold_in_seconds = 0; offloadPolicies = OffloadPoliciesImpl.oldPoliciesCompatible(offloadPolicies, policies); Assert.assertNotNull(offloadPolicies); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), Long.valueOf(1000)); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), Long.valueOf(0)); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInSeconds(), Long.valueOf(0)); policies.offload_deletion_lag_ms = 2000L; policies.offload_threshold = 100; + policies.offload_threshold_in_seconds = 100; offloadPolicies = OffloadPoliciesImpl.oldPoliciesCompatible(offloadPolicies, policies); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), Long.valueOf(1000)); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), Long.valueOf(0)); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInSeconds(), Long.valueOf(0)); } @Test diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java index b10d575f88ca6..86b2ee56c85fe 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java @@ -426,6 +426,7 @@ public void pulsarSplitJsonCodecTest() throws JsonProcessingException, Unsupport 5000, 2000, 1000L, + 1000L, 5000L, OffloadedReadPriority.BOOKKEEPER_FIRST );